Files
gangyan/langchain-chat/document_loaders/gycWordLoader.py

205 lines
8.1 KiB
Python
Raw Normal View History

from typing import List, Optional, Union, Dict, Any
from pathlib import Path
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseLoader
from docx import Document as DocxDocument
import os
import subprocess
import logging
import zipfile
from lxml import etree as ET
import re
logger = logging.getLogger(__name__)
class GCYWordLoader(BaseLoader):
    """Custom loader that parses a Word document at the XML level.

    The .docx archive is unpacked into a temporary work directory and the
    WordprocessingML parts are read directly. ``load`` emits one ``Document``
    per body paragraph, table, header/footer paragraph, comment, tracked
    revision and table-of-contents entry. Legacy ``.doc`` files are first
    converted to ``.docx`` with LibreOffice (``soffice``).
    """

    # WordprocessingML namespace
    ns = {
        "w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
    }

    def __init__(
        self,
        file_path: Union[str, Path],
        output_dir: Optional[Union[str, Path]] = None,
        *,
        keep_doc_title: bool = True,
        start_with_title: bool = False,
        max_heading_level: int = 3,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """Validate the input path and store the loader configuration.

        Args:
            file_path: Path to a ``.doc`` or ``.docx`` file.
            output_dir: Directory for the converted file and the temporary
                work directory; defaults to the directory of ``file_path``.
            keep_doc_title: Emit the core-properties title as a document.
            start_with_title: When true the title document gets the heading
                ``"Document Title"``; otherwise its heading is ``None``.
            max_heading_level: Clamped to the range 1..6 and stored; it is
                not consulted by any method of this class.
            metadata: Extra metadata merged into every emitted document.

        Raises:
            FileNotFoundError: ``file_path`` is not an existing file.
            ValueError: The extension is neither ``.doc`` nor ``.docx``.
        """
        self.file_path = str(file_path)
        # dirname() is '' for a bare file name; fall back to the current
        # directory so that soffice's --outdir always receives a real path.
        self.output_dir = (
            str(output_dir)
            if output_dir
            else (os.path.dirname(self.file_path) or os.curdir)
        )
        self.keep_doc_title = keep_doc_title
        self.start_with_title = start_with_title
        self.max_heading_level = min(max(1, max_heading_level), 6)
        self.metadata = metadata or {}
        # Temporary extraction directory
        self._work_dir = os.path.join(self.output_dir, '_pyc_work')
        self._doc_dir = os.path.join(self._work_dir, 'word')
        # Validation
        if not os.path.isfile(self.file_path):
            raise FileNotFoundError(f"文件不存在: {self.file_path}")
        if not self.file_path.lower().endswith(('.doc', '.docx')):
            raise ValueError("仅支持 .doc 或 .docx 格式")

    def load(self) -> List[Document]:
        """Parse the file and return all extracted document fragments.

        Returns:
            Documents in this order: optional title, paragraphs, tables,
            headers/footers, comments, revisions, TOC entries.

        Raises:
            RuntimeError: Any failure during conversion, extraction or
                parsing; the original exception is attached as the cause.
        """
        import shutil

        try:
            # Pre-process (.doc -> .docx)
            processed_path = self._preprocess_document()
            # Prepare the work directory and unpack the archive
            self._prepare_work_directories()
            self._extract_docx(processed_path)
            # Parse the main document XML
            self._parse_document_xml()
            # Build the list of document fragments
            docs: List[Document] = []
            base_meta = {
                "source": self.file_path,
                "file_name": os.path.basename(self.file_path),
                **self.metadata,
            }
            # Optionally keep the document title
            docx_core = DocxDocument(processed_path)
            if self.keep_doc_title and docx_core.core_properties.title:
                title = docx_core.core_properties.title
                heading = "Document Title" if self.start_with_title else None
                docs.append(
                    Document(
                        page_content=title,
                        metadata={**base_meta, "heading": heading},
                    )
                )
            docs.extend(self._parse_paragraphs(base_meta))
            docs.extend(self._parse_tables(base_meta))
            docs.extend(self._parse_headers_and_footers(base_meta))
            docs.extend(self._parse_comments(base_meta))
            docs.extend(self._parse_revisions(base_meta))
            docs.extend(self._parse_toc(base_meta))
            return docs
        except Exception as e:
            logger.error(f"文档加载失败: {self.file_path}", exc_info=True)
            # Chain the cause so the original traceback is preserved.
            raise RuntimeError(f"无法加载文档: {e}") from e
        finally:
            # Best-effort cleanup of the temporary directory; a failure here
            # must not mask the result or the exception raised above.
            shutil.rmtree(self._work_dir, ignore_errors=True)

    def _preprocess_document(self) -> str:
        """Return the path of a .docx file, converting from .doc if needed.

        Raises:
            subprocess.CalledProcessError: ``soffice`` exited non-zero.
            RuntimeError: The converted file was not produced.
        """
        # .docx is returned unchanged
        if self.file_path.lower().endswith('.docx'):
            return self.file_path
        # .doc -> .docx
        # NOTE(review): the converted file is written next to the other
        # outputs and is not removed afterwards; an existing file with the
        # same stem would presumably be overwritten by soffice - confirm.
        output_path = os.path.join(
            self.output_dir, Path(self.file_path).stem + '.docx'
        )
        subprocess.run(
            [
                'soffice', '--headless', '--convert-to', 'docx',
                '--outdir', self.output_dir, self.file_path,
            ],
            check=True,
            capture_output=True,
        )
        if not os.path.exists(output_path):
            raise RuntimeError('文档格式转换失败')
        return output_path

    def _prepare_work_directories(self):
        """Create an empty work directory, removing any previous one."""
        import shutil

        if os.path.exists(self._work_dir):
            shutil.rmtree(self._work_dir)
        os.makedirs(self._doc_dir, exist_ok=True)

    def _extract_docx(self, path: str):
        """Unpack the .docx archive at ``path`` into the work directory."""
        with zipfile.ZipFile(path, 'r') as z:
            z.extractall(self._work_dir)

    def _parse_document_xml(self):
        """Parse ``word/document.xml`` and cache its tree and root element."""
        xml_path = os.path.join(self._doc_dir, 'document.xml')
        parser = ET.XMLParser(remove_blank_text=True)
        self._doc_tree = ET.parse(xml_path, parser)
        self._doc_root = self._doc_tree.getroot()

    def _get_text_from_runs(self, parent, text_tag: str = 't') -> str:
        """Concatenate the text of all runs below ``parent``.

        Args:
            parent: Element whose descendant ``w:r`` runs are read.
            text_tag: Local name of the text element inside each run.
                ``'t'`` is regular text; deleted text inside ``w:del`` is
                stored in ``w:delText`` instead.

        Returns:
            The joined text with surrounding whitespace stripped.
        """
        parts = [
            t.text
            for r in parent.findall('.//w:r', self.ns)
            for t in r.findall(f'w:{text_tag}', self.ns)
            if t.text
        ]
        return ''.join(parts).strip()

    def _paragraph_text_with_tabs(self, p) -> str:
        """Return the run text of ``p`` with each in-run ``w:tab`` as a tab.

        Only direct children of runs are considered, so the tab-stop
        definitions under ``w:pPr/w:tabs`` are not mistaken for tabs.
        """
        w = self.ns['w']
        t_tag = f'{{{w}}}t'
        tab_tag = f'{{{w}}}tab'
        pieces = []
        for r in p.findall('.//w:r', self.ns):
            for child in r:
                if child.tag == t_tag:
                    pieces.append(child.text or '')
                elif child.tag == tab_tag:
                    pieces.append('\t')
        return ''.join(pieces)

    def _parse_paragraphs(self, base_meta) -> List[Document]:
        """Emit one document per non-empty ``w:p`` in the main document."""
        docs = []
        for p in self._doc_root.findall('.//w:p', self.ns):
            text = self._get_text_from_runs(p)
            if not text:
                continue
            docs.append(
                Document(
                    page_content=text,
                    metadata={**base_meta, 'content_type': 'paragraph'},
                )
            )
        return docs

    def _parse_tables(self, base_meta) -> List[Document]:
        """Emit one document per ``w:tbl``: cells joined by '|', rows by newline."""
        docs = []
        for tbl in self._doc_root.findall('.//w:tbl', self.ns):
            rows = []
            for row in tbl.findall('.//w:tr', self.ns):
                cells = [
                    self._get_text_from_runs(cell)
                    for cell in row.findall('.//w:tc', self.ns)
                ]
                rows.append('|'.join(cells))
            content = '\n'.join(rows)
            docs.append(
                Document(
                    page_content=content,
                    metadata={**base_meta, 'content_type': 'table'},
                )
            )
        return docs

    def _parse_headers_and_footers(self, base_meta) -> List[Document]:
        """Emit one document per non-empty paragraph of every header/footer part.

        All ``header<N>.xml`` / ``footer<N>.xml`` parts in the extracted
        ``word`` directory are read, headers first, each group in numeric
        order, with no upper bound on ``N``.
        """
        part_re = re.compile(r'^(header|footer)(\d+)\.xml$')
        found: Dict[str, List[Any]] = {'header': [], 'footer': []}
        names = os.listdir(self._doc_dir) if os.path.isdir(self._doc_dir) else []
        for name in names:
            m = part_re.match(name)
            if m:
                found[m.group(1)].append((int(m.group(2)), name))

        docs = []
        for part in ('header', 'footer'):
            for _, name in sorted(found[part]):
                root = ET.parse(os.path.join(self._doc_dir, name)).getroot()
                for p in root.findall('.//w:p', self.ns):
                    text = self._get_text_from_runs(p)
                    if text:
                        docs.append(
                            Document(
                                page_content=text,
                                metadata={**base_meta, 'content_type': part},
                            )
                        )
        return docs

    def _parse_comments(self, base_meta) -> List[Document]:
        """Emit one document per non-empty ``w:comment`` in ``comments.xml``."""
        docs = []
        path = os.path.join(self._doc_dir, 'comments.xml')
        if os.path.exists(path):
            root = ET.parse(path).getroot()
            for c in root.findall('.//w:comment', self.ns):
                text = self._get_text_from_runs(c)
                if text:
                    docs.append(
                        Document(
                            page_content=text,
                            metadata={**base_meta, 'content_type': 'comment'},
                        )
                    )
        return docs

    def _parse_revisions(self, base_meta) -> List[Document]:
        """Emit one document per tracked deletion/insertion that has text.

        Deletions are listed before insertions. Deleted run text lives in
        ``w:delText`` rather than ``w:t``, so each revision kind is read
        with its own text element.
        """
        docs = []
        for tag, text_tag in (('del', 'delText'), ('ins', 't')):
            for el in self._doc_root.findall(f'.//w:{tag}', self.ns):
                text = self._get_text_from_runs(el, text_tag)
                if text:
                    docs.append(
                        Document(
                            page_content=text,
                            metadata={**base_meta, 'content_type': 'revision'},
                        )
                    )
        return docs

    def _parse_toc(self, base_meta) -> List[Document]:
        """Emit one document per paragraph that looks like a TOC entry.

        Heuristic: a paragraph of at most 255 characters that contains a
        tab inside its text and ends with a digit (the page number). The
        emitted content is everything before the last tab, with inner tab
        runs collapsed to a single space.
        """
        docs = []
        for p in self._doc_root.findall('.//w:p', self.ns):
            full = self._paragraph_text_with_tabs(p).strip()
            if len(full) > 255:
                continue
            if '\t' not in full or not full[-1].isdigit():
                continue
            # Split the title from the trailing page number.
            title_part = full.rpartition('\t')[0]
            title = re.sub(r'\t+', ' ', title_part).strip()
            if not title:
                continue
            docs.append(
                Document(
                    page_content=title,
                    metadata={**base_meta, 'content_type': 'toc_entry'},
                )
            )
        return docs