Files
gangyan/langchain-chat/document_loaders/gycWordLoader.py

205 lines
8.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import os
import re
import shutil
import subprocess
import zipfile
from pathlib import Path
from typing import List, Optional, Union, Dict, Any

from docx import Document as DocxDocument
from langchain_community.document_loaders.base import BaseLoader
from langchain_core.documents import Document
from lxml import etree as ET
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class GCYWordLoader(BaseLoader):
    """Custom loader that parses a Word document at the XML level.

    The ``.docx`` archive is unpacked into a temporary working directory and
    its XML parts are read directly.  ``load`` emits one ``Document`` per
    paragraph, table, header/footer paragraph, comment, tracked revision and
    table-of-contents entry, each tagged with a ``content_type`` metadata key.
    Legacy ``.doc`` files are first converted to ``.docx`` by invoking the
    ``soffice`` command-line tool.

    Note: ``max_heading_level`` is validated and stored, but none of the
    methods in this class currently reads it.
    """

    # WordprocessingML namespace map used for every element lookup.
    ns = {
        "w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
    }

    def __init__(
        self,
        file_path: Union[str, Path],
        output_dir: Optional[Union[str, Path]] = None,
        *,
        keep_doc_title: bool = True,
        start_with_title: bool = False,
        max_heading_level: int = 3,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """Store the configuration and validate the input path.

        Args:
            file_path: Path of the ``.doc`` / ``.docx`` file to load.
            output_dir: Directory for the converted file and the temporary
                working directory; defaults to the directory of ``file_path``.
            keep_doc_title: Emit the core-properties title as a document.
            start_with_title: When true the title document gets the heading
                ``"Document Title"``; otherwise its heading is ``None``.
            max_heading_level: Clamped to the range 1..6 and stored.
            metadata: Extra metadata merged into every emitted document.

        Raises:
            FileNotFoundError: ``file_path`` is not an existing file.
            ValueError: The extension is neither ``.doc`` nor ``.docx``.
        """
        self.file_path = str(file_path)
        # A bare file name has an empty dirname; fall back to '.' so that the
        # value handed to ``soffice --outdir`` is never an empty string.
        self.output_dir = (
            str(output_dir) if output_dir
            else (os.path.dirname(self.file_path) or '.')
        )
        self.keep_doc_title = keep_doc_title
        self.start_with_title = start_with_title
        self.max_heading_level = min(max(1, max_heading_level), 6)
        self.metadata = metadata or {}
        # Temporary extraction directory and the 'word' part folder inside it.
        self._work_dir = os.path.join(self.output_dir, '_pyc_work')
        self._doc_dir = os.path.join(self._work_dir, 'word')
        # Validation.
        if not os.path.isfile(self.file_path):
            raise FileNotFoundError(f"文件不存在: {self.file_path}")
        if not self.file_path.lower().endswith(('.doc', '.docx')):
            raise ValueError("仅支持 .doc 或 .docx 格式")

    def load(self) -> List[Document]:
        """Parse the file and return all extracted document fragments.

        Fragments are returned in this order: optional title, paragraphs,
        tables, headers/footers, comments, revisions, TOC entries.

        Raises:
            RuntimeError: Any failure during conversion, extraction or
                parsing; the original exception is chained as ``__cause__``.
        """
        try:
            # Pre-process (.doc -> .docx), then unpack into the work directory.
            processed_path = self._preprocess_document()
            self._prepare_work_directories()
            self._extract_docx(processed_path)
            # Parse the main document XML.
            self._parse_document_xml()

            docs: List[Document] = []
            base_meta = {
                "source": self.file_path,
                "file_name": os.path.basename(self.file_path),
                **self.metadata,
            }

            # Optionally keep the document title from the core properties.
            docx_core = DocxDocument(processed_path)
            title = docx_core.core_properties.title
            if self.keep_doc_title and title:
                heading = "Document Title" if self.start_with_title else None
                docs.append(Document(
                    page_content=title,
                    metadata={**base_meta, "heading": heading},
                ))

            docs.extend(self._parse_paragraphs(base_meta))
            docs.extend(self._parse_tables(base_meta))
            docs.extend(self._parse_headers_and_footers(base_meta))
            docs.extend(self._parse_comments(base_meta))
            docs.extend(self._parse_revisions(base_meta))
            docs.extend(self._parse_toc(base_meta))
            return docs
        except Exception as e:
            logger.error(f"文档加载失败: {self.file_path}", exc_info=True)
            # Chain the cause so callers can still inspect the real failure.
            raise RuntimeError(f"无法加载文档: {e}") from e
        finally:
            # Best-effort cleanup of the temporary directory; filesystem
            # errors are ignored, but nothing else is swallowed.
            shutil.rmtree(self._work_dir, ignore_errors=True)

    def _preprocess_document(self) -> str:
        """Return the path of a ``.docx`` file, converting ``.doc`` if needed.

        Raises:
            RuntimeError: ``soffice`` finished but the expected output file
                does not exist.
            subprocess.CalledProcessError: ``soffice`` exited with an error.
        """
        # .docx is used as-is.
        if self.file_path.lower().endswith('.docx'):
            return self.file_path
        # .doc is converted to .docx inside output_dir.
        output_path = os.path.join(
            self.output_dir, Path(self.file_path).stem + '.docx'
        )
        subprocess.run([
            'soffice', '--headless', '--convert-to', 'docx',
            '--outdir', self.output_dir, self.file_path
        ], check=True, capture_output=True)
        if not os.path.exists(output_path):
            raise RuntimeError('文档格式转换失败')
        return output_path

    def _prepare_work_directories(self):
        """Recreate an empty working directory containing the 'word' folder."""
        if os.path.exists(self._work_dir):
            shutil.rmtree(self._work_dir)
        os.makedirs(self._doc_dir, exist_ok=True)

    def _extract_docx(self, path: str):
        """Unpack the ``.docx`` zip archive into the working directory."""
        with zipfile.ZipFile(path, 'r') as z:
            z.extractall(self._work_dir)

    def _parse_xml(self, path: str, *, remove_blank_text: bool = False):
        """Parse one XML part with entity resolution disabled.

        The loader reads user-supplied documents, so entity references are
        deliberately not expanded.
        """
        parser = ET.XMLParser(
            remove_blank_text=remove_blank_text,
            resolve_entities=False,
        )
        return ET.parse(path, parser)

    def _parse_document_xml(self):
        """Load ``word/document.xml`` into ``_doc_tree`` / ``_doc_root``."""
        xml_path = os.path.join(self._doc_dir, 'document.xml')
        self._doc_tree = self._parse_xml(xml_path, remove_blank_text=True)
        self._doc_root = self._doc_tree.getroot()

    def _get_text_from_runs(self, parent, text_tag: str = 'w:t') -> str:
        """Concatenate the text of all runs below ``parent``, stripped.

        Args:
            parent: Element whose descendant ``w:r`` runs are read.
            text_tag: Prefixed name of the text element inside each run;
                ``'w:t'`` for normal text, ``'w:delText'`` for deleted text.
        """
        parts = []
        for r in parent.findall('.//w:r', self.ns):
            for t in r.findall(text_tag, self.ns):
                if t.text:
                    parts.append(t.text)
        return ''.join(parts).strip()

    def _paragraph_text_with_tabs(self, paragraph) -> str:
        """Return run text with each in-run ``w:tab`` rendered as a tab.

        Only ``w:tab`` elements that are direct children of a run are
        counted, so tab-stop definitions elsewhere in the paragraph do not
        produce a separator.
        """
        text_tag = f"{{{self.ns['w']}}}t"
        tab_tag = f"{{{self.ns['w']}}}tab"
        pieces = []
        for run in paragraph.findall('.//w:r', self.ns):
            for child in run:
                if child.tag == text_tag:
                    pieces.append(child.text or '')
                elif child.tag == tab_tag:
                    pieces.append('\t')
        return ''.join(pieces).strip()

    def _numbered_part_paths(self, part: str) -> List[str]:
        """Return paths of ``<part><N>.xml`` files in numeric order."""
        pattern = re.compile(rf'^{re.escape(part)}(\d+)\.xml$')
        try:
            names = os.listdir(self._doc_dir)
        except OSError:
            return []
        numbered = []
        for name in names:
            match = pattern.match(name)
            if match:
                numbered.append((int(match.group(1)), name))
        return [os.path.join(self._doc_dir, name) for _, name in sorted(numbered)]

    def _parse_paragraphs(self, base_meta) -> List[Document]:
        """Emit one ``paragraph`` document per non-empty ``w:p`` element."""
        docs = []
        for p in self._doc_root.findall('.//w:p', self.ns):
            text = self._get_text_from_runs(p)
            if not text:
                continue
            docs.append(Document(
                page_content=text,
                metadata={**base_meta, 'content_type': 'paragraph'},
            ))
        return docs

    def _parse_tables(self, base_meta) -> List[Document]:
        """Emit one ``table`` document per ``w:tbl`` element.

        Cells are joined with ``|`` and rows with a newline.
        """
        docs = []
        for tbl in self._doc_root.findall('.//w:tbl', self.ns):
            rows = []
            for row in tbl.findall('.//w:tr', self.ns):
                cells = [
                    self._get_text_from_runs(cell)
                    for cell in row.findall('.//w:tc', self.ns)
                ]
                rows.append('|'.join(cells))
            content = '\n'.join(rows)
            docs.append(Document(
                page_content=content,
                metadata={**base_meta, 'content_type': 'table'},
            ))
        return docs

    def _parse_headers_and_footers(self, base_meta) -> List[Document]:
        """Emit ``header`` / ``footer`` documents for every numbered part.

        All ``headerN.xml`` / ``footerN.xml`` files present in the extracted
        ``word`` folder are read, headers first, each group in numeric order.
        """
        docs = []
        for part in ['header', 'footer']:
            for path in self._numbered_part_paths(part):
                root = self._parse_xml(path).getroot()
                for p in root.findall('.//w:p', self.ns):
                    text = self._get_text_from_runs(p)
                    if text:
                        docs.append(Document(
                            page_content=text,
                            metadata={**base_meta, 'content_type': part},
                        ))
        return docs

    def _parse_comments(self, base_meta) -> List[Document]:
        """Emit one ``comment`` document per non-empty ``w:comment``."""
        docs = []
        path = os.path.join(self._doc_dir, 'comments.xml')
        if os.path.exists(path):
            root = self._parse_xml(path).getroot()
            for c in root.findall('.//w:comment', self.ns):
                text = self._get_text_from_runs(c)
                if text:
                    docs.append(Document(
                        page_content=text,
                        metadata={**base_meta, 'content_type': 'comment'},
                    ))
        return docs

    def _parse_revisions(self, base_meta) -> List[Document]:
        """Emit one ``revision`` document per tracked deletion or insertion.

        Deleted runs keep their text in ``w:delText`` rather than ``w:t``, so
        each revision kind is read with its own text element.
        """
        docs = []
        for tag, text_tag in (('del', 'w:delText'), ('ins', 'w:t')):
            for el in self._doc_root.findall(f'.//w:{tag}', self.ns):
                text = self._get_text_from_runs(el, text_tag)
                if text:
                    docs.append(Document(
                        page_content=text,
                        metadata={**base_meta, 'content_type': 'revision'},
                    ))
        return docs

    def _parse_toc(self, base_meta) -> List[Document]:
        """Emit ``toc_entry`` documents using a simple heuristic.

        A paragraph counts as a TOC entry when its text is at most 255
        characters, contains a tab and ends with a digit.  The text before
        the last tab is used as the title; tabs inside the title are
        replaced by single spaces.
        """
        docs = []
        for p in self._doc_root.findall('.//w:p', self.ns):
            full = self._paragraph_text_with_tabs(p)
            if not full or len(full) > 255:
                continue
            if '\t' not in full or not full[-1].isdigit():
                continue
            # Split the title from the trailing page number.
            title = re.sub(r'\t+', ' ', full.rpartition('\t')[0]).strip()
            if not title:
                continue
            docs.append(Document(
                page_content=title,
                metadata={**base_meta, 'content_type': 'toc_entry'},
            ))
        return docs