Files
gangyan/langchain-chat/server/translator_service/converter/docx.py

1262 lines
47 KiB
Python
Raw Normal View History

import os
import asyncio
import re
import shutil
import zipfile
from typing import Callable, Dict, List, Optional, Tuple, Any
from io import BytesIO
from docx import Document
from docx.enum.shape import WD_INLINE_SHAPE_TYPE
from dataclasses import dataclass, field
from lxml import etree as ET # 替换为 lxml 的 etree
from configs.basic_config import *
from configs.translate_config import *
from server.translator_service.translator.openai_translator import OpenAITranslator
class LLMTranslator:
    """Async wrapper around ``OpenAITranslator`` with skip rules and retries."""

    def __init__(
        self,
        lang_in: str,
        lang_out: str,
        temp_dict: Optional[dict[str, str]] = None,
    ) -> None:
        """Build the underlying OpenAI-backed translator.

        Args:
            lang_in: Source language; ``"auto"`` is mapped to an empty string.
            lang_out: Target language.
            temp_dict: Accepted for interface compatibility; not used here.
        """
        self.lang_in = lang_in if lang_in != "auto" else ""
        self.lang_out = lang_out
        self.translator = OpenAITranslator(
            lang_in=self.lang_in,
            lang_out=self.lang_out,
            model=LLM_MODEL,
            base_url=LLM_ENDPOINT,
            api_key=LLM_API_KEY,
            ignore_cache=True,
            qps=LLM_CONCURRENCY_LIMIT,
        )
        self.vocab = None

    async def translate(self, text: str) -> str:
        """Translate ``text``, returning it unchanged when translation is skipped or fails.

        Text is returned as-is when it is empty, a number (optionally with
        ``.``, ``,`` or ``-`` separators), a single ASCII letter, or found in
        ``SYMBOLS``. Otherwise the blocking translator runs in the default
        executor, with up to ``LLM_RETRIES`` attempts and a one second pause
        between attempts. ``<br>`` in the result is replaced by a newline.

        Returns:
            The translated text, or the original text if nothing was translated.
        """
        if not text:
            return text
        # Numbers are skipped; '.', ',' and '-' may appear between digit groups.
        if re.match(r"^-?\d+([.,\-]\d+)*$", text):
            return text
        # A single ASCII letter (either case) is returned untouched.
        if re.match(r"^[a-zA-Z]$", text):
            return text
        # Known punctuation/symbols are skipped.
        if text in SYMBOLS:
            return text

        loop = asyncio.get_running_loop()
        retries = LLM_RETRIES
        while retries > 0:
            try:
                logger.info(f"开始翻译: {text}")
                # NOTE(review): as written this literal is empty, which the
                # `if not text` guard above already excludes, so the branch
                # looks unreachable. Presumably a special glyph was lost from
                # the literal -- confirm against the original file.
                if text == "" and self.lang_out == "en":
                    return "Of"
                result = await loop.run_in_executor(
                    None, self.translator.translate, text
                )
                logger.info(f"翻译完成: {result}")
                return result.replace("<br>", "\n")
            except Exception as e:
                retries -= 1
                if retries > 0:
                    logger.warning(f"翻译失败,剩余重试次数{retries}: {str(e)}")
                    await asyncio.sleep(1)
                    continue
                logger.error(f"翻译失败且重试次数已用完: {str(e)}")
                return text
        # Reached only when LLM_RETRIES <= 0: fall back to the source text
        # instead of implicitly returning None.
        return text
async def processor(
    input_path: str,
    output_path: str,
    lang_in: str,
    lang_out: str,
    is_dual_language: bool,
    work_dir: str,
    progress_callback: Callable[[float], None],
    cancel_event: Optional[asyncio.Event] = None,
):
    """Entry point for translating a DOCX document.

    Builds a ``DocxTranslator`` from the arguments, initializes it and runs
    the translation pipeline.

    Args:
        input_path: Path of the source document.
        output_path: Path the translated document is written to.
        lang_in: Source language.
        lang_out: Target language.
        is_dual_language: Whether to produce bilingual output.
        work_dir: Scratch directory used while processing.
        progress_callback: Called with the progress percentage.
        cancel_event: Optional event used to request cancellation.

    Raises:
        Whatever ``initialize()`` or ``process()`` raise; errors are not swallowed.
    """
    docx_translator = DocxTranslator(
        input_path=input_path,
        output_path=output_path,
        lang_in=lang_in,
        lang_out=lang_out,
        is_dual_language=is_dual_language,
        work_dir=work_dir,
        progress_callback=progress_callback,
        cancel_event=cancel_event,
    )
    try:
        await docx_translator.initialize()
    except BaseException:
        # process() cleans up in its own `finally`, but it is never reached
        # when initialization fails; without this the work directory created
        # during initialize() would be left behind.
        await docx_translator.cleanup()
        raise
    await docx_translator.process()
@dataclass
class TranslationTask:
    """A unit of translation work: the source text plus the XML nodes it came from."""

    element: ET.Element  # XML element the task belongs to
    text_element: Optional[ET.Element] = None  # primary text node
    original_text: str = ""  # source text
    translated_text: Optional[str] = None  # translation, once available
    is_dual_language: bool = False  # bilingual output mode
    text_elements: List[ET.Element] = field(default_factory=list)  # all related text nodes
    number_text: str = ""  # numbering part of a table-of-contents entry
    page_text: str = ""  # page-number part of a table-of-contents entry
    is_toc: bool = False  # whether this is a table-of-contents entry
    page_number: str = ""  # page-number part of a table-of-contents entry
    comment_id: Optional[str] = None
    revision_id: Optional[str] = None
    is_comment: bool = False
    is_revision: bool = False
    comments: List[Tuple[str, ET.Element]] = field(
        default_factory=list
    )  # (comment text, element) pairs
    revisions: List[Tuple[str, ET.Element]] = field(
        default_factory=list
    )  # (revision text, element) pairs
    text_map: List[Tuple[str, str, ET.Element]] = field(
        default_factory=list
    )  # text fragments with their owning element

    def __post_init__(self):
        """Seed ``text_elements`` from ``text_element`` when only the latter is given."""
        # Compare against None explicitly: the truth value of an XML element
        # reflects its number of children, so a leaf text node is falsy.
        if self.text_element is not None and not self.text_elements:
            self.text_elements = [self.text_element]
class DocxTranslator:
    """DOCX document translator.

    Handles translation of DOCX documents and supports both
    single-language and dual-language (bilingual) modes.
    """

    # XML namespaces used as the prefix map for find()/findall() queries
    ns = {
        "w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main",
        "m": "http://schemas.openxmlformats.org/officeDocument/2006/math",
        "w14": "http://schemas.microsoft.com/office/word/2010/wordml",
    }
def __init__(
self,
input_path: str,
output_path: str,
lang_in: str,
lang_out: str,
is_dual_language: bool,
work_dir: str,
progress_callback: Callable[[float], None],
cancel_event: Optional[asyncio.Event] = None,
):
"""初始化文档翻译器
Args:
input_path: 输入文档路径
output_path: 输出文档路径
lang_in: 输入语言
lang_out: 输出语言
is_dual_language: 是否使用双语模式
work_dir: 工作目录
progress_callback: 进度回调函数
cancel_event: 取消事件
"""
# 文件路径相关
self.input_path = input_path
self.output_path = output_path
self.work_dir = work_dir
self.doc_dir = os.path.join(work_dir, "doc")
self.img_dir = os.path.join(work_dir, "img")
# 翻译配置
self.lang_in = lang_in
self.lang_out = lang_out
self.is_dual_language = is_dual_language
# 进度和控制
self.progress_callback = progress_callback
self.cancel_event = cancel_event
self.total_items = 0
self.processed_items = 0
# 翻译引擎
self.llm_translator = None
self.image_processor = None
# 文档对象
self.doc = None
self.doc_tree = None
self.doc_root = None
self.doc_xml_path = None
# 锁
self.doc_lock = asyncio.Lock()
self.progress_lock = asyncio.Lock()
# 批注和修订记录相关
self.comments_tasks = []
self.revision_tasks = []
# 页眉页脚相关
self.header_tasks = []
self.footer_tasks = []
# 目录项相关
self.toc_tasks = []
# ===== 初始化和主流程方法 =====
async def initialize(self):
    """Load the document, create the translator and prepare the work area.

    Checks for cancellation before starting and again once everything is set up.
    """
    await self.check_cancelled()

    # Load the document in the default executor so the event loop is not blocked.
    self.doc = await asyncio.get_event_loop().run_in_executor(
        None, Document, self.input_path
    )

    logger.info(
        f"初始化翻译器 - 输入语言: {self.lang_in}, 输出语言: {self.lang_out}"
    )
    self.llm_translator = LLMTranslator(lang_in=self.lang_in, lang_out=self.lang_out)

    # Work directories, then the unpacked XML, then the item count.
    await self._prepare_work_directories()
    await self._extract_and_parse_document()
    await self._calculate_total_items()

    await self.check_cancelled()
async def process(self):
    """Run the translation pipeline end to end.

    Collects the content to translate, translates the body text, the
    comments and revisions, and the headers and footers, then saves the
    result. ``cleanup()`` always runs afterwards, whether the run succeeded,
    failed or was cancelled.

    Raises:
        asyncio.CancelledError: Re-raised when the task is cancelled.
        Exception: Any processing error is logged and re-raised.
    """
    try:
        await self.check_cancelled()
        # Collect everything that needs translating.
        await self.collect_all_content()
        # The total computed during initialize() was taken before any
        # header/footer/comment/revision/TOC tasks existed, so those counts
        # were all zero. Recompute now that the task lists are populated.
        await self._calculate_total_items()
        # Body text
        await self.translate_document()
        # Comments and tracked revisions
        await self.translate_comments_and_revisions()
        # Headers and footers
        await self.translate_headers_and_footers()
        await self.check_cancelled()
        # Write the processed document
        await self.save_document()
    except asyncio.CancelledError:
        logger.info("任务被取消,正在退出...")
        raise
    except Exception as e:
        logger.error(f"处理文档失败: {str(e)}")
        raise
    finally:
        # Release resources and temporary files
        await self.cleanup()
# ===== 文档处理辅助方法 =====
async def _prepare_work_directories(self):
"""准备工作目录"""
if os.path.exists(self.work_dir):
shutil.rmtree(self.work_dir)
os.makedirs(self.work_dir, exist_ok=True)
os.makedirs(self.doc_dir, exist_ok=True)
os.makedirs(self.img_dir, exist_ok=True)
async def _extract_and_parse_document(self):
    """Unpack the DOCX archive and parse ``word/document.xml`` with lxml.

    Sets ``doc_xml_path``, ``doc_tree`` and ``doc_root``. Failures in either
    step are logged and re-raised.
    """
    # Step 1: unzip the whole package into the document directory.
    try:
        with zipfile.ZipFile(self.input_path, "r") as archive:
            archive.extractall(self.doc_dir)
    except Exception as exc:
        logger.error(f"解压文档失败: {exc}")
        raise

    # Step 2: parse the main document part, dropping ignorable blank text.
    try:
        self.doc_xml_path = os.path.join(self.doc_dir, "word", "document.xml")
        blank_stripping_parser = ET.XMLParser(remove_blank_text=True)
        self.doc_tree = ET.parse(self.doc_xml_path, blank_stripping_parser)
        self.doc_root = self.doc_tree.getroot()
    except Exception as exc:
        logger.error(f"解析文档失败: {exc}")
        raise
async def _calculate_total_items(self):
    """Set ``total_items`` to the number of items progress is measured against.

    The total is the number of paragraphs and tables in the main document
    plus the number of header, footer, comment, revision and
    table-of-contents tasks collected so far.
    """
    paragraph_count = len(self.doc_root.findall(".//w:p", self.ns))
    table_count = len(self.doc_root.findall(".//w:tbl", self.ns))

    task_groups = (
        self.header_tasks,
        self.footer_tasks,
        self.comments_tasks,
        self.revision_tasks,
        self.toc_tasks,
    )
    collected_count = sum(len(group) for group in task_groups)

    self.total_items = paragraph_count + table_count + collected_count
    logger.info(f"文档中共有 {self.total_items} 个文本元素需要处理")
# ===== 文本翻译相关方法 =====
async def translate_document(self):
    """Translate the body paragraphs in concurrent batches.

    Paragraphs containing math elements and paragraphs already claimed by a
    table-of-contents task are skipped. Tasks are flushed to
    ``process_translation_batch`` whenever ``LLM_CONCURRENCY_LIMIT`` of them
    have accumulated; table-of-contents entries are translated last.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        math_prefix = "{" + self.ns["m"] + "}"
        # Build the TOC lookup once instead of scanning toc_tasks for every
        # paragraph. Identity is what the element comparison amounts to, and
        # the tasks keep their elements alive so the ids stay valid.
        toc_element_ids = {id(task.element) for task in self.toc_tasks}

        tasks = []
        for element in self.doc_root.findall(".//w:p", self.ns):
            # Skip paragraphs containing math. Comment and processing
            # instruction nodes are also yielded by iter() and their tag is
            # not a string, so guard before calling startswith().
            if any(
                isinstance(child.tag, str) and child.tag.startswith(math_prefix)
                for child in element.iter()
            ):
                continue
            # TOC entries are handled by translate_toc_entries().
            if id(element) in toc_element_ids:
                continue
            if self.is_dual_language:
                # Dual-language mode: one task for the whole paragraph text.
                await self.collect_dual_language_task(element, tasks)
            else:
                # Single-language mode
                await self.collect_single_language_tasks(element, tasks)
            # Flush once enough tasks have accumulated.
            if len(tasks) >= LLM_CONCURRENCY_LIMIT:
                await self.process_translation_batch(tasks)
                tasks = []
        # Remaining tasks
        if tasks:
            await self.process_translation_batch(tasks)
        # Table-of-contents entries
        if self.toc_tasks:
            await self.translate_toc_entries()
    except Exception as e:
        logger.error(f"处理文档文本失败: {str(e)}")
        raise
async def collect_dual_language_task(
    self, element: ET.Element, tasks: List[TranslationTask]
) -> None:
    """Queue one dual-language task for the whole text of ``element``.

    Args:
        element: XML element whose text nodes are concatenated.
        tasks: List the new task is appended to; untouched when the
            element has no non-blank text.
    """
    text_tag = "{" + self.ns["w"] + "}t"
    fragments = [node.text for node in element.iter(text_tag) if node.text]
    paragraph_text = "".join(fragments).strip()
    if not paragraph_text:
        return
    tasks.append(
        TranslationTask(
            element=element, original_text=paragraph_text, is_dual_language=True
        )
    )
async def collect_single_language_tasks(
    self, element: ET.Element, tasks: List[TranslationTask]
) -> None:
    """Queue one single-language task for ``element``.

    The task carries every text node of the element, their concatenated
    text, the comment elements referenced from the element and its
    insertion/deletion elements. Comments and revisions are only recorded
    on the task, not added to the text.

    Args:
        element: XML element to collect from.
        tasks: List the new task is appended to; untouched when the
            element has no text nodes or only blank text.
    """
    w_prefix = "{" + self.ns["w"] + "}"
    text_elements = list(element.iter(w_prefix + "t"))
    if not text_elements:
        return

    # Main text (includes text inside revision markup).
    main_text = "".join(t.text or "" for t in text_elements)
    if not main_text.strip():
        # No task will be created, so skip the comment lookup entirely.
        return

    # Referenced comments: parse comments.xml at most once per call and
    # index it by id, instead of re-parsing it for every reference.
    comments = []
    comment_refs = element.findall(".//w:commentReference", self.ns)
    comment_xml = os.path.join(self.doc_dir, "word", "comments.xml")
    if comment_refs and os.path.exists(comment_xml):
        comments_by_id = {}
        for comment in ET.parse(comment_xml).getroot().iter(w_prefix + "comment"):
            # First occurrence wins, matching a first-match lookup.
            comments_by_id.setdefault(comment.get(w_prefix + "id"), comment)
        for comment_ref in comment_refs:
            comment_id = comment_ref.get(w_prefix + "id")
            if comment_id is None:
                continue
            comment = comments_by_id.get(comment_id)
            if comment is not None:
                comments.append(comment)

    # Revision markup: deletions first, then insertions.
    revisions = element.findall(".//w:del", self.ns) + element.findall(
        ".//w:ins", self.ns
    )

    tasks.append(
        TranslationTask(
            element=element,
            text_elements=text_elements,  # every text node of the element
            original_text=main_text,
            is_dual_language=False,
            comments=comments,
            revisions=revisions,
        )
    )
async def process_translation_batch(self, tasks: List[TranslationTask]) -> None:
    """Translate a batch of tasks concurrently, then write the results back.

    Args:
        tasks: Translation tasks; those without source text are not
            translated but still count towards progress.
    """
    await self.check_cancelled()

    # Translate every task that has source text, all at once.
    pending = [self.translate_text(task) for task in tasks if task.original_text]
    if pending:
        await asyncio.gather(*pending)

    # Apply the translations, advancing progress once per task.
    for task in tasks:
        if task.translated_text:
            apply_update = (
                self.update_dual_language_element
                if task.is_dual_language
                else self.update_single_language_element
            )
            await apply_update(task)
        await self.update_progress()
async def translate_text(self, task: TranslationTask) -> None:
    """Translate one task and store the result on ``task.translated_text``.

    The source text is stored instead when the translator returns nothing,
    returns the literal ``"error"``, or raises.

    Args:
        task: Translation task to fill in.
    """
    try:
        await self.check_cancelled()
        outcome = await self.llm_translator.translate(task.original_text)
        # An empty or "error" result means the translation failed.
        failed = not outcome or outcome == "error"
        task.translated_text = task.original_text if failed else outcome
    except Exception as exc:
        logger.error(f"翻译文本失败: {exc}")
        task.translated_text = task.original_text
async def update_dual_language_element(self, task: TranslationTask) -> None:
    """Insert the translation as a new paragraph right after the source one.

    The source paragraph is cloned, its text nodes are emptied and the
    translation is placed in the first one. The translation is bold only if
    every text-bearing run of the source paragraph is bold. When
    ``task.element`` is not a paragraph of the main document, the first text
    node of the element is set to ``"original (translation)"`` instead.

    Args:
        task: Task whose ``translated_text`` has been filled in.
    """
    w_prefix = "{" + self.ns["w"] + "}"
    source = task.element

    # Locate the element through its parent chain rather than scanning the
    # whole document for every task.
    parent = source.getparent()
    is_body_paragraph = (
        parent is not None
        and source.tag == w_prefix + "p"
        and any(ancestor is self.doc_root for ancestor in source.iterancestors())
    )
    if not is_body_paragraph:
        # Inline fallback: "original (translation)" in the first text node.
        text_nodes = list(source.iter(w_prefix + "t"))
        if text_nodes:
            text_nodes[0].text = f"{task.original_text} ({task.translated_text})"
        return

    # Is every run that carries text bold?
    is_all_bold = True
    for run in source.findall(".//w:r", self.ns):
        if run.find(".//w:t", self.ns) is None:
            continue
        run_props = run.find(".//w:rPr", self.ns)
        if run_props is None or run_props.find(".//w:b", self.ns) is None:
            is_all_bold = False
            break

    # Clone the paragraph and blank out all of its text.
    new_paragraph = ET.fromstring(ET.tostring(source))
    clone_text_nodes = new_paragraph.findall(".//w:t", self.ns)
    for node in clone_text_nodes:
        node.text = ""

    if clone_text_nodes:
        first_node = clone_text_nodes[0]
        first_node.text = task.translated_text
        run = first_node.getparent()  # run that holds the text node
        if run is not None:
            run_props = run.find(".//w:rPr", self.ns)
            if is_all_bold:
                if run_props is None:
                    # Run properties must come first in a run, so move the
                    # freshly created element to the front.
                    run_props = ET.SubElement(run, w_prefix + "rPr")
                    run.insert(0, run_props)
                if run_props.find(".//w:b", self.ns) is None:
                    bold = ET.SubElement(run_props, w_prefix + "b")
                    run_props.insert(0, bold)
            elif run_props is not None:
                bold = run_props.find(".//w:b", self.ns)
                if bold is not None:
                    # Remove from the actual parent; the match may be nested
                    # deeper than a direct child of the run properties.
                    bold.getparent().remove(bold)

    # Place the translated paragraph directly after the source paragraph.
    parent.insert(parent.index(source) + 1, new_paragraph)
def _smart_split_text(self, text: str) -> List[str]:
    """Split text into segments that each end at a symbol or a space.

    Args:
        text: Text to split.

    Returns:
        ``[text]`` when the text is at most 10 characters long; otherwise
        the consecutive segments, whose concatenation equals ``text``.
    """
    # Every entry of SYMBOLS plus the space character ends a segment.
    boundaries = tuple(list(SYMBOLS) + [" "])
    if len(text) <= 10:
        return [text]

    pieces = []
    pending = ""
    for ch in text:
        pending += ch
        if pending.endswith(boundaries):
            pieces.append(pending)
            pending = ""
    # Whatever follows the last boundary forms the final segment.
    if pending:
        pieces.append(pending)
    return pieces
async def update_single_language_element(self, task: TranslationTask) -> None:
    """Replace the element's text with the translation.

    The translation goes into the first text node of the task and every
    other text node is emptied. The translation is bold only if every
    text-bearing run of ``task.element`` is bold.

    Args:
        task: Task to apply; ignored when it has no translation.
    """
    if not task.translated_text:
        return
    w_prefix = "{" + self.ns["w"] + "}"

    # Is every run that carries text bold?
    is_all_bold = True
    for run in task.element.findall(".//w:r", self.ns):
        if run.find(".//w:t", self.ns) is None:
            continue
        run_props = run.find(".//w:rPr", self.ns)
        if run_props is None or run_props.find(".//w:b", self.ns) is None:
            is_all_bold = False
            break

    if not task.text_elements:
        return

    # Empty the trailing nodes first, then fill the first one, so the
    # translation survives even if the list repeats a node.
    for leftover in task.text_elements[1:]:
        leftover.text = ""
    first_node = task.text_elements[0]
    first_node.text = task.translated_text

    run = first_node.getparent()  # run that holds the text node
    if run is None:
        return
    run_props = run.find(".//w:rPr", self.ns)
    if is_all_bold:
        if run_props is None:
            # Run properties must come first in a run, so move the freshly
            # created element to the front.
            run_props = ET.SubElement(run, w_prefix + "rPr")
            run.insert(0, run_props)
        if run_props.find(".//w:b", self.ns) is None:
            bold = ET.SubElement(run_props, w_prefix + "b")
            run_props.insert(0, bold)
    elif run_props is not None:
        bold = run_props.find(".//w:b", self.ns)
        if bold is not None:
            # Remove from the actual parent; the match may be nested deeper
            # than a direct child of the run properties.
            bold.getparent().remove(bold)
async def translate_element_dual(self, element: ET.Element) -> None:
    """Dual-language mode: translate the whole text of one element in place.

    Kept for compatibility with existing code. With a single text node the
    node receives ``original + newline + translation``. With several, the
    first node keeps the original text and the translated words are spread
    over the remaining nodes. Elements containing math are left untouched.

    Args:
        element: XML element to translate.
    """
    w_text_tag = "{" + self.ns["w"] + "}t"
    math_prefix = "{" + self.ns["m"] + "}"
    xml_space = "{http://www.w3.org/XML/1998/namespace}space"

    # Skip elements containing math. Comment and processing-instruction
    # nodes have a non-string tag, so guard before calling startswith().
    if any(
        isinstance(child.tag, str) and child.tag.startswith(math_prefix)
        for child in element.iter()
    ):
        return

    text_elements = list(element.iter(w_text_tag))
    original_text = "".join(t.text for t in text_elements if t.text).strip()

    if original_text:
        try:
            await self.check_cancelled()
            translated_text = await self.llm_translator.translate(original_text)
            # Keep the source text when translation failed.
            if not translated_text or translated_text == "error":
                translated_text = original_text
        except Exception as e:
            logger.error(f"翻译文本失败: {str(e)}")
            translated_text = original_text

        if len(text_elements) == 1:
            # Single node: original followed by the translation.
            text_elements[0].text = f"{original_text}\n{translated_text}"
        else:
            # First node keeps the original; the rest share the translation.
            text_elements[0].text = original_text + "\n"
            remaining_elements = text_elements[1:]
            words = translated_text.split()
            words_per_element = max(1, len(words) // len(remaining_elements))
            last_index = len(remaining_elements) - 1
            for i, t_element in enumerate(remaining_elements):
                start_idx = i * words_per_element
                end_idx = None if i == last_index else start_idx + words_per_element
                chunk = words[start_idx:end_idx]
                piece = " ".join(chunk)
                if chunk and end_idx is not None and end_idx < len(words):
                    # More words follow in a later node: keep the word
                    # boundary, and mark the space as significant.
                    piece += " "
                    t_element.set(xml_space, "preserve")
                t_element.text = piece

    # Progress is advanced on every path that reaches this point, including
    # the single-node case.
    await self.update_progress()
async def translate_element_single(self, element: ET.Element) -> None:
    """Single-language mode: translate each text node of the element separately.

    Kept for compatibility with existing code. Elements containing math and
    elements without text nodes are left untouched. A node keeps its source
    text when its translation fails.

    Args:
        element: XML element to translate.
    """
    math_prefix = "{" + self.ns["m"] + "}"
    # Skip elements containing math. Comment and processing-instruction
    # nodes have a non-string tag, so guard before calling startswith().
    if any(
        isinstance(child.tag, str) and child.tag.startswith(math_prefix)
        for child in element.iter()
    ):
        return

    text_elements = list(element.iter("{" + self.ns["w"] + "}t"))
    if not text_elements:
        return

    # Handle every non-blank text node on its own.
    for t_element in text_elements:
        if not (t_element.text and t_element.text.strip()):
            continue
        original_text = t_element.text
        try:
            await self.check_cancelled()
            translated_text = await self.llm_translator.translate(original_text)
            # Keep the source text when translation failed.
            if not translated_text or translated_text == "error":
                translated_text = original_text
        except Exception as e:
            logger.error(f"翻译文本失败: {str(e)}")
            translated_text = original_text
        t_element.text = translated_text

    # Advance progress once for the element.
    await self.update_progress()
# ===== 文档保存和清理方法 =====
async def save_document(self):
"""保存处理后的文档"""
async with self.doc_lock:
await self.check_cancelled()
# 首先保存 XML 更改
self.doc_tree.write(
self.doc_xml_path, encoding="UTF-8", xml_declaration=True
)
# 然后打包所有文件到新的 docx
with zipfile.ZipFile(self.output_path, "w") as outzip:
for foldername, subfolders, filenames in os.walk(self.doc_dir):
for filename in filenames:
file_path = os.path.join(foldername, filename)
arcname = os.path.relpath(file_path, self.doc_dir)
outzip.write(file_path, arcname)
# 文档处理完成后设置进度为100%
self.progress_callback(100.0)
# async def save_document_by_docx(self):
# """用 python-docx 库保存处理后的文档"""
# async with self.doc_lock:
# await self.check_cancelled()
# loop = asyncio.get_event_loop()
# await loop.run_in_executor(None, lambda: self.doc.save(self.output_path))
# # 文档处理完成后设置进度为100%
# self.progress_callback(100.0)
async def cleanup(self):
    """Release in-memory resources, then delete the working directory."""
    # In-memory resources first.
    await self.cleanmem()

    # Then the temporary files, if the directory is still there.
    scratch_dir = self.work_dir
    if os.path.exists(scratch_dir):
        shutil.rmtree(scratch_dir)

    logger.info("资源和临时文件清理完成")
async def cleanmem(self):
    """Drop the references to the translator, image processor and document.

    The attributes are reset to ``None`` rather than deleted, so the method
    can safely run more than once and later attribute reads do not raise
    ``AttributeError``.
    """
    self.llm_translator = None
    self.image_processor = None
    self.doc = None
    logger.info("内存资源清理完成")
# ===== 工具方法 =====
async def check_cancelled(self):
"""检查任务是否被取消"""
if self.cancel_event and self.cancel_event.is_set():
logger.info("任务被取消,正在退出...")
raise asyncio.CancelledError("任务已被取消")
async def update_progress(self):
"""更新处理进度"""
await self.check_cancelled()
self.processed_items += 1
progress = (
min(100.0, (self.processed_items / self.total_items) * 100)
if self.total_items > 0
else 0
)
self.progress_callback(progress)
async def collect_all_content(self):
    """Run the collectors for everything that needs translating.

    Order: comments and revisions, then headers and footers, then
    table-of-contents entries.
    """
    collectors = (
        self.collect_comments_and_revisions,
        self.collect_headers_and_footers,
        self.collect_toc_entries,
    )
    for collect in collectors:
        await collect()
async def collect_headers_and_footers(self):
    """Collect one translation task per non-blank header/footer paragraph.

    Every ``word/header<N>.xml`` and ``word/footer<N>.xml`` part is read,
    in ascending order of ``N``, with no upper limit on ``N``. Tasks are
    appended to ``header_tasks`` and ``footer_tasks`` respectively.
    """

    def numbered_parts(prefix):
        """Return the paths of word/<prefix><N>.xml, sorted by N."""
        word_dir = os.path.join(self.doc_dir, "word")
        if not os.path.isdir(word_dir):
            return []
        name_pattern = re.compile(re.escape(prefix) + r"(\d+)\.xml")
        numbered = []
        for name in os.listdir(word_dir):
            match = name_pattern.fullmatch(name)
            if match:
                numbered.append((int(match.group(1)), name))
        return [os.path.join(word_dir, name) for _, name in sorted(numbered)]

    def collect_into(prefix, target):
        """Append a task to ``target`` for each non-blank paragraph of each part."""
        for part_path in numbered_parts(prefix):
            root = ET.parse(part_path).getroot()
            for paragraph in root.findall(".//w:p", self.ns):
                text_nodes = [
                    t
                    for r in paragraph.findall(".//w:r", self.ns)
                    for t in r.findall(".//w:t", self.ns)
                ]
                text = "".join(t.text for t in text_nodes if t.text)
                if text.strip():
                    target.append(
                        TranslationTask(
                            element=paragraph,
                            original_text=text,
                            is_dual_language=self.is_dual_language,
                            text_elements=text_nodes,
                        )
                    )

    # Headers first, then footers.
    collect_into("header", self.header_tasks)
    collect_into("footer", self.footer_tasks)
async def collect_comments_and_revisions(self):
    """Collect translation tasks for comments and for tracked revisions.

    Comments are read from ``word/comments.xml`` when it exists; revisions
    are the deletion and insertion elements of the main document. A task is
    created only when the gathered text is non-blank.
    """
    id_attr = "{" + self.ns["w"] + "}id"

    def gather(container):
        """Return (joined text, text nodes) for all runs inside ``container``.

        A single space is added after a run whenever the following run
        contains a text node.
        """
        runs = container.findall(".//w:r", self.ns)
        last_position = len(runs) - 1
        fragments = []
        text_nodes = []
        for position, run in enumerate(runs):
            run_nodes = run.findall(".//w:t", self.ns)
            text_nodes.extend(run_nodes)
            fragments.extend(node.text for node in run_nodes if node.text)
            if (
                position < last_position
                and runs[position + 1].find(".//w:t", self.ns) is not None
            ):
                fragments.append(" ")
        return "".join(fragments).strip(), text_nodes

    # Comments
    comments_xml = os.path.join(self.doc_dir, "word", "comments.xml")
    if os.path.exists(comments_xml):
        comments_root = ET.parse(comments_xml).getroot()
        for comment in comments_root.findall(".//w:comment", self.ns):
            text, text_nodes = gather(comment)
            if not text:
                continue
            self.comments_tasks.append(
                TranslationTask(
                    element=comment,
                    original_text=text,
                    is_dual_language=self.is_dual_language,
                    text_elements=text_nodes,
                    is_comment=True,
                    comment_id=comment.get(id_attr),
                )
            )

    # Revisions: deletions first, then insertions
    revision_elements = self.doc_root.findall(
        ".//w:del", self.ns
    ) + self.doc_root.findall(".//w:ins", self.ns)
    for element in revision_elements:
        text, text_nodes = gather(element)
        if not text:
            continue
        self.revision_tasks.append(
            TranslationTask(
                element=element,
                original_text=text,
                is_dual_language=self.is_dual_language,
                text_elements=text_nodes,
                is_revision=True,
                revision_id=element.get(id_attr),
            )
        )
async def collect_toc_entries(self):
    """Find paragraphs that look like table-of-contents entries and queue them.

    A paragraph is a candidate when its text is at most 255 characters,
    it contains a tab element and its text ends with a digit. A task is
    created for a candidate only if it has both title text and page-number
    text; the task also records the run structure for the later update.
    """
    # Pass 1: pick the candidate paragraphs.
    candidates = []
    for paragraph in self.doc_root.findall(".//w:p", self.ns):
        full_text = "".join(
            node.text or "" for node in paragraph.findall(".//w:t", self.ns)
        ).strip()
        if len(full_text) > 255:
            continue
        if paragraph.find(".//w:tab", self.ns) is None:
            continue
        if full_text and full_text[-1].isdigit():
            candidates.append(paragraph)

    # Pass 2: split each candidate into title text, tabs and page numbers.
    for paragraph in candidates:
        runs = paragraph.findall(".//w:r", self.ns)
        tail_threshold = len(runs) * 0.7
        title_parts = []
        tab_runs = []
        page_parts = []
        for position, run in enumerate(runs):
            # A run holding a tab is recorded as a separator and skipped.
            if run.find(".//w:tab", self.ns) is not None:
                tab_runs.append((position, run))
                continue
            for node in run.findall(".//w:t", self.ns):
                if not node.text:
                    continue
                # Digits-only text late in the paragraph is taken as the page number.
                is_page_number = (
                    node.text.strip().isdigit() and position > tail_threshold
                )
                bucket = page_parts if is_page_number else title_parts
                bucket.append((position, run, node))

        if not (title_parts and page_parts):
            continue

        title_text = "".join(node.text for _, _, node in title_parts)
        task = TranslationTask(
            element=paragraph,
            original_text=title_text.strip(),
            is_dual_language=False,
            text_elements=[node for _, _, node in title_parts],
        )
        # Structural information, attached as extra attributes on the task.
        task.is_toc_entry = True
        task.page_number_elements = page_parts
        task.tab_elements = tab_runs
        task.original_structure = {
            "text": title_parts,
            "tabs": tab_runs,
            "page_numbers": page_parts,
        }
        self.toc_tasks.append(task)
async def translate_toc_entries(self):
"""翻译目录项,保留特殊格式"""
if not self.toc_tasks:
return
# 创建翻译协程列表
translation_coroutines = []
for task in self.toc_tasks:
if task.original_text:
translation_coroutines.append(self.translate_text(task))
# 并发执行翻译
if translation_coroutines:
await asyncio.gather(*translation_coroutines)
# 更新目录项
for task in self.toc_tasks:
if task.translated_text:
# 特殊处理目录项的更新,保留页码和分隔符
await self.update_toc_entry(task)
# 更新进度
await self.update_progress()
async def update_toc_entry(self, task: TranslationTask) -> None:
    """Write the translation into a table-of-contents entry.

    Page-number nodes are not touched. Of the recorded tab runs only the
    last one is kept; the others are removed from their parents.

    Args:
        task: Translation task; ignored when it has no text nodes or no
            recorded ``original_structure``.
    """
    if not task.text_elements or not hasattr(task, "original_structure"):
        return

    # Blank every title node, then put the new text into the first one.
    for node in task.text_elements:
        node.text = ""
    if self.is_dual_language:
        # Dual-language mode: original (translation)
        replacement = f"{task.original_text} ({task.translated_text})"
    else:
        # Single-language mode: translation only
        replacement = task.translated_text
    task.text_elements[0].text = replacement

    # Keep a single separator: drop every recorded tab run except the last.
    tab_entries = getattr(task, "tab_elements", None)
    if tab_entries:
        for _, tab_run in tab_entries[:-1]:
            holder = tab_run.getparent()
            if holder is not None:
                holder.remove(tab_run)
async def translate_comments_and_revisions(self):
    """Translate the collected comments and tracked revisions.

    Comment translations are written to ``word/comments.xml`` (looked up by
    comment id). Revision translations are written, wrapped in parentheses,
    into the revision elements of the main document. In both cases the text
    goes into the first text node and the remaining text nodes are emptied,
    so a multi-run comment or revision does not repeat the translation.
    """

    def collapse_into_first(text_nodes, new_text):
        """Put ``new_text`` in the first node and empty all the others."""
        if not text_nodes:
            return
        for node in text_nodes[1:]:
            node.text = ""
        text_nodes[0].text = new_text

    # Comments
    if self.comments_tasks:
        await self.process_translation_batch(self.comments_tasks)
        comments_xml = os.path.join(self.doc_dir, "word", "comments.xml")
        if os.path.exists(comments_xml):
            tree = ET.parse(comments_xml)
            root = tree.getroot()
            for task in self.comments_tasks:
                if not task.translated_text:
                    continue
                comment = root.find(
                    f".//w:comment[@w:id='{task.comment_id}']", self.ns
                )
                if comment is not None:
                    collapse_into_first(
                        comment.findall(".//w:t", self.ns), task.translated_text
                    )
            tree.write(comments_xml, encoding="UTF-8", xml_declaration=True)

    # Revisions
    if self.revision_tasks:
        await self.process_translation_batch(self.revision_tasks)
        for task in self.revision_tasks:
            if task.translated_text:
                # Revision translations are wrapped in parentheses.
                collapse_into_first(
                    task.element.findall(".//w:t", self.ns),
                    f"({task.translated_text})",
                )
async def translate_headers_and_footers(self):
    """Translate the collected header and footer tasks and write them back.

    After each group is translated, every ``word/header<N>.xml`` (or
    ``word/footer<N>.xml``) part is re-read. A paragraph is updated when its
    stripped text equals the stripped source text of a task: the
    translation goes into its first text node, the other text nodes are
    emptied, and ``xml:space="preserve"`` is carried over to the first node
    if any node had it. Each part is written back after processing.
    """
    xml_space = "{http://www.w3.org/XML/1998/namespace}space"

    def numbered_parts(prefix):
        """Return the paths of word/<prefix><N>.xml, sorted by N."""
        word_dir = os.path.join(self.doc_dir, "word")
        if not os.path.isdir(word_dir):
            return []
        name_pattern = re.compile(re.escape(prefix) + r"(\d+)\.xml")
        numbered = []
        for name in os.listdir(word_dir):
            match = name_pattern.fullmatch(name)
            if match:
                numbered.append((int(match.group(1)), name))
        return [os.path.join(word_dir, name) for _, name in sorted(numbered)]

    def apply_translations(prefix, tasks):
        """Write the task translations into every matching paragraph of each part."""
        for part_path in numbered_parts(prefix):
            tree = ET.parse(part_path)
            root = tree.getroot()
            for task in tasks:
                if not task.translated_text:
                    continue
                # The stored source text is not stripped, while the
                # paragraph text below is; strip both sides so paragraphs
                # with leading/trailing whitespace still match.
                wanted = task.original_text.strip()
                for paragraph in root.findall(".//w:p", self.ns):
                    text_nodes = [
                        t
                        for r in paragraph.findall(".//w:r", self.ns)
                        for t in r.findall(".//w:t", self.ns)
                    ]
                    current = "".join(
                        t.text for t in text_nodes if t.text
                    ).strip()
                    if current != wanted or not text_nodes:
                        continue
                    # Empty the other nodes, then set the first one.
                    for t in text_nodes[1:]:
                        t.text = ""
                    text_nodes[0].text = task.translated_text
                    # Keep the whitespace-preservation attribute.
                    if any(t.get(xml_space) == "preserve" for t in text_nodes):
                        text_nodes[0].set(xml_space, "preserve")
            tree.write(part_path, encoding="UTF-8", xml_declaration=True)

    # Headers
    if self.header_tasks:
        await self.process_translation_batch(self.header_tasks)
        apply_translations("header", self.header_tasks)

    # Footers
    if self.footer_tasks:
        await self.process_translation_batch(self.footer_tasks)
        apply_translations("footer", self.footer_tasks)