from typing import List, Optional, Union, Dict, Any
from pathlib import Path
import logging
import os
import re
import shutil
import subprocess
import zipfile

from docx import Document as DocxDocument
from langchain_community.document_loaders.base import BaseLoader
from langchain_core.documents import Document
from lxml import etree as ET

logger = logging.getLogger(__name__)


class GCYWordLoader(BaseLoader):
    """Load a Word file (.doc / .docx) into a flat list of ``Document`` objects.

    The .docx package is unzipped into a temporary working directory and its
    XML parts are read directly.  One ``Document`` is produced per paragraph,
    table, header/footer paragraph, comment, tracked revision and
    table-of-contents entry; each carries a ``content_type`` metadata key.

    Args:
        file_path: Path of the .doc or .docx file to load.
        output_dir: Directory used for the converted .docx (when the input is
            a .doc) and for the temporary working directory.  Defaults to the
            directory part of ``file_path``.
        keep_doc_title: When true and the core properties contain a title,
            the title is emitted as the first ``Document``.
        start_with_title: Selects the ``heading`` metadata of the title
            document: ``"Document Title"`` when true, ``None`` otherwise.
        max_heading_level: Clamped to the range 1..6 and stored on the
            instance; the parsing methods in this class do not read it.
        metadata: Extra metadata merged into every produced ``Document``.

    Raises:
        FileNotFoundError: ``file_path`` is not an existing file.
        ValueError: ``file_path`` does not end in ``.doc`` or ``.docx``.
    """

    # WordprocessingML namespace map used by every XPath lookup.
    ns = {
        "w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
    }

    def __init__(
        self,
        file_path: Union[str, Path],
        output_dir: Optional[Union[str, Path]] = None,
        *,
        keep_doc_title: bool = True,
        start_with_title: bool = False,
        max_heading_level: int = 3,
        metadata: Optional[Dict[str, Any]] = None
    ):
        self.file_path = str(file_path)
        self.output_dir = str(output_dir) if output_dir else os.path.dirname(self.file_path)
        self.keep_doc_title = keep_doc_title
        self.start_with_title = start_with_title
        self.max_heading_level = min(max(1, max_heading_level), 6)
        self.metadata = metadata or {}

        # Temporary directory the .docx package is extracted into; the main
        # document parts live in its "word" sub-directory.
        self._work_dir = os.path.join(self.output_dir, '_pyc_work')
        self._doc_dir = os.path.join(self._work_dir, 'word')

        # Validate the input before any work is done.
        if not os.path.isfile(self.file_path):
            raise FileNotFoundError(f"文件不存在: {self.file_path}")
        if not self.file_path.lower().endswith(('.doc', '.docx')):
            raise ValueError("仅支持 .doc 或 .docx 格式")

    def load(self) -> List[Document]:
        """Parse the file and return every extracted fragment.

        Returns:
            Documents in this order: optional title, paragraphs, tables,
            headers and footers, comments, revisions, TOC entries.

        Raises:
            RuntimeError: Any failure during conversion, extraction or
                parsing; the original exception is chained as ``__cause__``.
        """
        try:
            # Convert .doc to .docx if needed, then unpack and parse the XML.
            processed_path = self._preprocess_document()
            self._prepare_work_directories()
            self._extract_docx(processed_path)
            self._parse_document_xml()

            docs: List[Document] = []
            base_meta = {
                "source": self.file_path,
                "file_name": os.path.basename(self.file_path),
                **self.metadata,
            }

            # Optional document title taken from the core properties.
            docx_core = DocxDocument(processed_path)
            if self.keep_doc_title and docx_core.core_properties.title:
                heading = "Document Title" if self.start_with_title else None
                docs.append(Document(
                    page_content=docx_core.core_properties.title,
                    metadata={**base_meta, "heading": heading},
                ))

            docs.extend(self._parse_paragraphs(base_meta))
            docs.extend(self._parse_tables(base_meta))
            docs.extend(self._parse_headers_and_footers(base_meta))
            docs.extend(self._parse_comments(base_meta))
            docs.extend(self._parse_revisions(base_meta))
            docs.extend(self._parse_toc(base_meta))
            return docs
        except Exception as e:
            logger.error("文档加载失败: %s", self.file_path, exc_info=True)
            raise RuntimeError(f"无法加载文档: {e}") from e
        finally:
            # Best-effort removal of the extraction directory.  Only OS-level
            # errors are ignored, so KeyboardInterrupt/SystemExit propagate.
            # NOTE: a .docx produced by _preprocess_document is not removed.
            shutil.rmtree(self._work_dir, ignore_errors=True)

    def _preprocess_document(self) -> str:
        """Return the path of a .docx for the input, converting a .doc first.

        A .docx input is returned unchanged.  A .doc input is converted by
        running ``soffice --headless --convert-to docx`` into ``output_dir``.

        Raises:
            subprocess.CalledProcessError: ``soffice`` exited with an error.
            RuntimeError: The expected output file was not produced.
        """
        if self.file_path.lower().endswith('.docx'):
            return self.file_path

        output_path = os.path.join(self.output_dir, Path(self.file_path).stem + '.docx')
        subprocess.run([
            'soffice', '--headless', '--convert-to', 'docx',
            '--outdir', self.output_dir, self.file_path
        ], check=True, capture_output=True)
        if not os.path.exists(output_path):
            raise RuntimeError('文档格式转换失败')
        return output_path

    def _prepare_work_directories(self):
        """Create an empty working directory, discarding any stale one."""
        if os.path.exists(self._work_dir):
            shutil.rmtree(self._work_dir)
        os.makedirs(self._doc_dir, exist_ok=True)

    def _extract_docx(self, path: str):
        """Unzip the .docx package at ``path`` into the working directory."""
        with zipfile.ZipFile(path, 'r') as z:
            z.extractall(self._work_dir)

    @staticmethod
    def _parse_xml(path: str, *, remove_blank_text: bool = False):
        """Parse an XML part without expanding entities.

        The parts come from a user-supplied archive, so entity resolution is
        switched off to avoid entity-expansion / external-entity attacks.
        """
        parser = ET.XMLParser(
            remove_blank_text=remove_blank_text,
            resolve_entities=False,
        )
        return ET.parse(path, parser)

    def _parse_document_xml(self):
        """Parse ``word/document.xml`` and cache its tree and root element."""
        xml_path = os.path.join(self._doc_dir, 'document.xml')
        self._doc_tree = self._parse_xml(xml_path, remove_blank_text=True)
        self._doc_root = self._doc_tree.getroot()

    def _get_text_from_runs(self, parent, text_tag: str = 'w:t') -> str:
        """Concatenate the text of every run below ``parent``.

        Args:
            parent: Element whose descendant ``w:r`` runs are read.
            text_tag: Prefixed name of the run child that holds the text.
                ``w:t`` for normal text, ``w:delText`` for deleted text.

        Returns:
            The joined text with leading/trailing whitespace stripped.
        """
        parts = []
        for r in parent.findall('.//w:r', self.ns):
            for t in r.findall(text_tag, self.ns):
                if t.text:
                    parts.append(t.text)
        return ''.join(parts).strip()

    def _parse_paragraphs(self, base_meta) -> List[Document]:
        """Return one ``paragraph`` document per non-empty ``w:p``."""
        docs = []
        for p in self._doc_root.findall('.//w:p', self.ns):
            text = self._get_text_from_runs(p)
            if not text:
                continue
            docs.append(Document(
                page_content=text,
                metadata={**base_meta, 'content_type': 'paragraph'},
            ))
        return docs

    def _parse_tables(self, base_meta) -> List[Document]:
        """Return one ``table`` document per ``w:tbl``.

        Cells of a row are joined with ``|`` and rows with a newline.
        """
        docs = []
        for tbl in self._doc_root.findall('.//w:tbl', self.ns):
            rows = []
            for row in tbl.findall('.//w:tr', self.ns):
                cells = [
                    self._get_text_from_runs(cell)
                    for cell in row.findall('.//w:tc', self.ns)
                ]
                rows.append('|'.join(cells))
            content = '\n'.join(rows)
            docs.append(Document(
                page_content=content,
                metadata={**base_meta, 'content_type': 'table'},
            ))
        return docs

    def _numbered_part_paths(self, part: str) -> List[str]:
        """Return paths of ``<part>N.xml`` files in the word dir, by N."""
        pattern = re.compile(rf'^{re.escape(part)}(\d+)\.xml$')
        try:
            names = os.listdir(self._doc_dir)
        except OSError:
            return []
        found = []
        for name in names:
            match = pattern.match(name)
            if match:
                found.append((int(match.group(1)), name))
        return [os.path.join(self._doc_dir, name) for _, name in sorted(found)]

    def _parse_headers_and_footers(self, base_meta) -> List[Document]:
        """Return ``header`` / ``footer`` documents for every such part.

        All ``headerN.xml`` files are read first (ascending N), then all
        ``footerN.xml`` files; there is no upper limit on N.
        """
        docs = []
        for part in ['header', 'footer']:
            for path in self._numbered_part_paths(part):
                root = self._parse_xml(path).getroot()
                for p in root.findall('.//w:p', self.ns):
                    text = self._get_text_from_runs(p)
                    if text:
                        docs.append(Document(
                            page_content=text,
                            metadata={**base_meta, 'content_type': part},
                        ))
        return docs

    def _parse_comments(self, base_meta) -> List[Document]:
        """Return one ``comment`` document per non-empty ``w:comment``."""
        docs = []
        path = os.path.join(self._doc_dir, 'comments.xml')
        if os.path.exists(path):
            root = self._parse_xml(path).getroot()
            for c in root.findall('.//w:comment', self.ns):
                text = self._get_text_from_runs(c)
                if text:
                    docs.append(Document(
                        page_content=text,
                        metadata={**base_meta, 'content_type': 'comment'},
                    ))
        return docs

    def _parse_revisions(self, base_meta) -> List[Document]:
        """Return ``revision`` documents for tracked deletions and insertions.

        Deleted runs keep their text in ``w:delText`` rather than ``w:t``,
        so each revision kind is read with its own text element.
        """
        docs = []
        for tag, text_tag in (('del', 'w:delText'), ('ins', 'w:t')):
            for el in self._doc_root.findall(f'.//w:{tag}', self.ns):
                text = self._get_text_from_runs(el, text_tag)
                if text:
                    docs.append(Document(
                        page_content=text,
                        metadata={**base_meta, 'content_type': 'revision'},
                    ))
        return docs

    def _parse_toc(self, base_meta) -> List[Document]:
        """Return ``toc_entry`` documents for paragraphs that look like TOC lines.

        Heuristic: a paragraph of at most 255 characters whose runs contain a
        tab and whose text ends in a digit is treated as "title <tab> page".
        The page content is the text before the final tab-separated segment,
        with inner tab runs replaced by a single space.
        """
        w_ns = '{%s}' % self.ns['w']
        run_tag, text_tag, tab_tag = w_ns + 'r', w_ns + 't', w_ns + 'tab'

        docs = []
        for p in self._doc_root.findall('.//w:p', self.ns):
            # Rebuild the text in document order.  Only tabs that are run
            # children count; w:tab under w:pPr/w:tabs is a tab-stop
            # definition, not a tab character.
            pieces = []
            for run in p.iter(run_tag):
                for child in run:
                    if child.tag == text_tag:
                        pieces.append(child.text or '')
                    elif child.tag == tab_tag:
                        pieces.append('\t')
            full = ''.join(pieces).strip()

            if not full or len(full) > 255:
                continue
            if '\t' not in full or not full[-1].isdigit():
                continue

            # Last segment is the page number; everything before is the title.
            segments = re.split(r'\t+', full)
            title = ' '.join(s.strip() for s in segments[:-1]).strip()
            if title:
                docs.append(Document(
                    page_content=title,
                    metadata={**base_meta, 'content_type': 'toc_entry'},
                ))
        return docs