Files
gangyan/langchain-chat/server/knowledge_base/file_converter.py

1256 lines
52 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
from datetime import datetime
from html import escape
import os
import re
import subprocess
import tempfile
import uuid
import docx
import markdown
import fitz # PyMuPDF
from docx import Document
from typing import Optional
from collections import defaultdict
import zipfile
from lxml import etree
from docx import Document
from docx.oxml import parse_xml
from io import BytesIO
import base64
import os
import xml.etree.ElementTree as ET
import openpyxl
import pandas as pd
import xlrd
from openpyxl.styles import Font, PatternFill
from configs.kb_config import (
GENERATED_IMAGES_BASE_PATH,
IMAGE_SERVER_URL_TEMPLATE,
PDF_CONVERT_KB_ROOT,
)
# Prefix -> namespace URI table shared by every XPath query this module runs
# against the XML parts of a DOCX package.
NS = dict(
    w="http://schemas.openxmlformats.org/wordprocessingml/2006/main",
    wp="http://schemas.openxmlformats.org/drawingml/2006/wordprocessingDrawing",
    a="http://schemas.openxmlformats.org/drawingml/2006/main",
    r="http://schemas.openxmlformats.org/officeDocument/2006/relationships",
    m="http://schemas.openxmlformats.org/officeDocument/2006/math",
    pic="http://schemas.openxmlformats.org/drawingml/2006/picture",
    v="urn:schemas-microsoft-com:vml",
    pkg="http://schemas.openxmlformats.org/package/2006/relationships",
    wps="http://schemas.microsoft.com/office/word/2010/wordprocessingShape",
    o="urn:schemas-microsoft-com:office:office",
    mc="http://schemas.openxmlformats.org/markup-compatibility/2006",
)
class FileConverter:
def __init__(self, libreoffice_path: str = "libreoffice"):
self.libreoffice_path = libreoffice_path
self._default_image_dir = "/home/albert/Documents/docx_images"
def _clean_html(self, html: str) -> str:
"""HTML后处理方法"""
# 提取body内容
body_match = re.search(r'<body[^>]*>(.*?)</body>', html, re.DOTALL)
content = body_match.group(1) if body_match else html
# 清理不需要的标签和属性
content = re.sub(r'<style.*?>.*?</style>', '', content, flags=re.DOTALL)
# content = re.sub(r'</?span[^>]*>', '', content)
content = re.sub(r'\s+style="[^"]*"', '', content)
# 添加元素ID
return self._add_element_ids(content).strip()
def _add_element_ids(self, content: str) -> str:
"""为元素添加唯一ID"""
counters = defaultdict(int)
def replace_tag(match):
tag = match.group(1).lower()
counters[tag] += 1
attrs = re.sub(r'\s+id="[^"]*"', '', match.group(2))
return f'<{tag} id="{tag}-{counters[tag]}"{attrs}>'
# 扩展匹配规则包含表格相关标签
content = re.sub(
r'<(h[1-6]|p|div|span|table|td|th|tr)(\b[^>]*)>',
replace_tag,
content,
flags=re.IGNORECASE
)
return content
def _save_html(self, content: str, output_path: Optional[str] = None) -> str:
"""统一保存方法"""
cleaned = self._clean_html(content)
if output_path:
with open(output_path, 'w', encoding='utf-8') as f:
f.write(cleaned)
return cleaned
def _clean_docx_html(self, html: str) -> str:
"""HTML后处理方法"""
# 提取body内容
body_match = re.search(r'<body[^>]*>(.*?)</body>', html, re.DOTALL)
content = body_match.group(1) if body_match else html
# 清理不需要的标签和属性
# content = re.sub(r'<style.*?>.*?</style>', '', content, flags=re.DOTALL)
# content = re.sub(r'</?span[^>]*>', '', content)
# content = re.sub(r'\s+style="[^"]*"', '', content)
# 添加元素ID
return self._add_docx_element_ids(content).strip()
def _add_docx_element_ids(self, content: str) -> str:
"""为元素添加唯一ID"""
counters = defaultdict(int)
def replace_tag(match):
tag = match.group(1).lower()
# 检查是否是 div 标签且 id 为 comment-数字 格式
if tag == 'div':
id_pattern = re.compile(r'\s+id="comment-\d+"')
if id_pattern.search(match.group(2)):
return match.group(0) # 如果匹配,不做替换,直接返回原标签
counters[tag] += 1
attrs = re.sub(r'\s+id="[^"]*"', '', match.group(2))
return f'<{tag} id="{tag}-{counters[tag]}"{attrs}>'
# 扩展匹配规则包含表格相关标签
content = re.sub(
r'<(h[1-6]|p|div|span|table|td|th|tr|style|strong|em|a|u)(\b[^>]*)>',
replace_tag,
content,
flags=re.IGNORECASE
)
return content
def _save_docx_html(self, content: str, output_path: Optional[str] = None) -> str:
"""统一保存方法"""
cleaned = self._clean_docx_html(content)
if output_path:
with open(output_path, 'w', encoding='utf-8') as f:
f.write(cleaned)
return cleaned
def txt_to_html(self, input_path: str, output_path: Optional[str] = None) -> str:
"""txt转换方法"""
try:
with open(input_path, 'r', encoding='utf-8') as f:
content = f.read()
# 将每行文本转换为p标签
paragraphs = [f'<p>{line}</p>' for line in content.splitlines() if line.strip()]
return self._save_html(f'<body>{"".join(paragraphs)}</body>', output_path)
except Exception as e:
raise RuntimeError(f"文本转换失败: {str(e)}")
def doc_to_html(self, input_path: str, output_path: Optional[str] = None) -> str:
"""DOC转换方法"""
try:
with tempfile.TemporaryDirectory() as temp_dir:
# 转换DOC到DOCX
cmd = [
self.libreoffice_path,
'--headless',
'--convert-to', 'docx',
'--outdir', temp_dir,
input_path
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f"LibreOffice错误: {result.stderr}")
# 获取转换后的DOCX路径
base_name = os.path.splitext(os.path.basename(input_path))[0]
converted_docx = os.path.join(temp_dir, f"{base_name}.docx")
if not os.path.exists(converted_docx):
raise FileNotFoundError("转换后的DOCX文件未找到")
# 使用DOCX处理流程
return self.docx_to_html(converted_docx, output_path)
except Exception as e:
raise RuntimeError(f"DOC转换失败: {str(e)}")
# def docx_to_html(self, input_path: str, output_path: Optional[str] = None) -> str:
# """DOCX转换方法"""
# try:
# doc = Document(input_path)
# html = ['<body>']
# # 按文档顺序处理所有元素
# for element in doc.element.body:
# # 处理段落
# if element.tag.endswith('p'):
# para = docx.text.paragraph.Paragraph(element, doc)
# if not para.text.strip():
# continue
# style_name = getattr(para.style, "name", None)
# if style_name and style_name.startswith('Heading'):
# level = min(int(para.style.name[-1]), 6)
# html.append(f'<h{level}>{para.text}</h{level}>')
# else:
# html.append(f'<p>{para.text}</p>')
# # 处理表格
# elif element.tag.endswith('tbl'):
# table = docx.table.Table(element, doc)
# # 添加表格容器
# html.append('<div class="table-container">')
# html.append('<table border="1" style="border-collapse: collapse">')
# for row in table.rows:
# html.append('<tr>')
# for cell in row.cells:
# html.append(f'<td style="padding: 5px">{cell.text}</td>')
# html.append('</tr>')
# html.append('</table>')
# html.append('</div>') # 关闭表格容器
# return self._save_html(''.join(html) + '</body>', output_path)
# except Exception as e:
# raise RuntimeError(f"DOCX转换失败: {str(e)}")
def md_to_html(self, input_path: str, output_path: Optional[str] = None) -> str:
    """Convert a Markdown file to cleaned HTML.

    Formulas are swapped for placeholders before the Markdown renderer
    runs and put back afterwards, so the renderer never sees their
    content; tables are then wrapped and styled by ``_process_tables``.

    Raises:
        RuntimeError: If any step of the conversion fails.
    """
    try:
        with open(input_path, 'r', encoding='utf-8') as source:
            raw_markdown = source.read()

        protected, formulas = self._preserve_formulas(raw_markdown)
        rendered = markdown.markdown(
            protected,
            extensions=['extra', 'tables', 'codehilite'],
        )
        # Drop any document wrapper tags; a single <body> is added below.
        unwrapped = re.sub(r'<html>|<body>|</body>|</html>', '', rendered)
        with_formulas = self._restore_formulas(unwrapped, formulas)
        styled = self._process_tables(with_formulas)
        return self._save_html(f'<body>{styled}</body>', output_path)
    except Exception as e:
        raise RuntimeError(f"Markdown转换失败: {str(e)}")
def _process_tables(self, html: str) -> str:
"""MD表格处理方法"""
# 添加表格容器
html = re.sub(
r'(<table\b[^>]*>)',
r'<div class="table-container">\1',
html,
flags=re.IGNORECASE
)
html = re.sub(
r'(</table>)',
r'\1</div>',
html,
flags=re.IGNORECASE
)
# 添加基础表格样式
html = re.sub(
r'<table\b([^>]*)>',
r'<table\1 border="1" style="border-collapse: collapse; width: 100%; margin: 1em 0;">',
html,
flags=re.IGNORECASE
)
# 单元格样式增强
html = re.sub(
r'<(td|th)\b([^>]*)>',
r'<\1\2 style="padding: 8px; border: 1px solid #ddd;">',
html,
flags=re.IGNORECASE
)
# 表头样式
html = re.sub(
r'<th\b([^>]*)>',
r'<th\1 style="background-color: #f8f9fa; font-weight: bold;">',
html,
flags=re.IGNORECASE
)
return html
def _preserve_formulas(self, md_content: str) -> tuple:
"""公式预处理:将公式替换为唯一占位符"""
formula_map = {}
# 匹配块级公式 $$...$$
def block_replace(match):
formula_id = uuid.uuid4().hex
formula_map[formula_id] = match.group(0)
return f'\n\nFORMULA_BLOCK_{formula_id}\n\n'
# 匹配行内公式 $...$
def inline_replace(match):
formula_id = uuid.uuid4().hex
formula_map[formula_id] = match.group(0)
return f'FORMULA_INLINE_{formula_id}'
# 按顺序处理块级公式和行内公式
processed_content = re.sub(
r'\$\$(.*?)\$\$',
block_replace,
md_content,
flags=re.DOTALL
)
processed_content = re.sub(
r'(?<!\\)\$([^\n$]+)(?<!\\)\$',
inline_replace,
processed_content
)
return processed_content, formula_map
def _restore_formulas(self, html: str, formula_map: dict) -> str:
"""公式还原:将占位符替换回原始公式内容"""
# 输入校验
if not isinstance(html, str) or not isinstance(formula_map, dict):
raise ValueError("参数类型错误html 必须是字符串formula_map 必须是字典")
# 定义通用的替换函数
def replace_formula(match):
key = match.group(1)
return formula_map.get(key, f"{{UNKNOWN_FORMULA_{key}}}") # 防止公式丢失时静默失败
# 块级公式还原
html = re.sub(r'FORMULA_BLOCK_([a-f0-9]{32})', replace_formula, html)
# 行内公式还原
html = re.sub(r'FORMULA_INLINE_([a-f0-9]{32})', replace_formula, html)
return html
def docx_to_html(self, input_path: str, output_path: Optional[str] = None) -> str:
    """Convert a DOCX file to HTML, including review comments.

    Top-level paragraphs and tables of ``word/document.xml`` are rendered
    in document order. Active review comments are appended in a
    ``<div class="comments">`` section after the document content, and
    comments referenced from deleted revisions are listed after that.
    Comment author, date and text are HTML-escaped because they come from
    the document.

    Args:
        input_path: Path of the ``.docx`` file.
        output_path: Optional path. When given, the complete HTML document
            (doctype, head, body) is written there as UTF-8.

    Returns:
        The body fragment produced by ``_save_docx_html`` (element IDs
        added). Note this differs from the file written to
        ``output_path``, which holds the un-processed full document.
    """
    image_dir = self._default_image_dir
    os.makedirs(image_dir, exist_ok=True)

    # The parsed python-docx object is not used below; the call is kept so
    # an unreadable package still fails here, as it did before.
    Document(input_path)

    html = ['<!DOCTYPE html><html><head><meta charset="UTF-8"></head><body>']

    comment_result = self._extract_comments(input_path)
    active_comments = comment_result["active_comments"]
    deleted_comments = comment_result["deleted_comments"]

    with zipfile.ZipFile(input_path) as z:
        doc_xml = z.read('word/document.xml')
        doc_tree = etree.fromstring(doc_xml)
        rels_xml = z.read('word/_rels/document.xml.rels') if 'word/_rels/document.xml.rels' in z.namelist() else None
        rels_tree = etree.fromstring(rels_xml) if rels_xml else None

        # Only direct children of <w:body> are visited; nested content is
        # handled by the paragraph/table renderers.
        for element in doc_tree.xpath('//w:body/*', namespaces=NS):
            if element.tag.endswith('p'):
                html.append(self._process_paragraph(element, z, image_dir, rels_tree))
            elif element.tag.endswith('tbl'):
                html.append(self._process_table(element, z, image_dir, rels_tree))

    if active_comments:
        # The container is opened here, after the document content, so it
        # wraps only the comment list.
        html.append('<div class="comments" style="border-top: 1px solid #ccc; margin-top: 20px;">')
        html.append('<h4>审阅批注:</h4>')
        for comment_id, comment_info in active_comments.items():
            author = escape(comment_info["author"], quote=False)
            date = escape(comment_info["date"], quote=False)
            text = escape(comment_info["text"], quote=False)
            html.append(f'<div id="comment-{comment_id}">[{comment_id}] {author},{date} 批注: {text}</div>')
        html.append('</div>')

    if deleted_comments:
        html.append('<h4>删除:</h4>')
        for comment_id, comment_info in deleted_comments.items():
            author = escape(comment_info["author"], quote=False)
            date = escape(comment_info["date"], quote=False)
            text = escape(comment_info["text"], quote=False)
            html.append(f'<div id="comment-{comment_id}">[{comment_id}] {author}({date})删除的内容: {text}</div>')

    html.append('</body></html>')

    html_str = '\n'.join(html)
    if output_path:
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(html_str)
    return self._save_docx_html(html_str)
def _extract_comments(self, docx_path: str) -> dict:
    """Read review comments from a DOCX package.

    Comments come from ``word/comments.xml``. A comment counts as deleted
    when ``word/document.xml`` references it from inside a ``<w:del>``
    revision. Each comment's display ID is its original ID plus one.

    Returns:
        ``{"active_comments": {...}, "deleted_comments": {...}}``, each
        keyed by display ID with entries holding ``original_id``,
        ``new_id``, ``author``, ``date``, ``text`` and ``deleted``.
    """
    w_ns = NS["w"]
    id_attr = f'{{{w_ns}}}id'
    author_attr = f'{{{w_ns}}}author'
    date_attr = f'{{{w_ns}}}date'

    def readable(stamp):
        """Format an ISO 8601 timestamp; fall back to the raw string."""
        if not stamp:
            return ''
        try:
            parsed = datetime.fromisoformat(stamp.replace('Z', '+00:00'))
            return parsed.strftime('%Y-%m-%d %H:%M:%S')
        except ValueError:
            return stamp

    collected = {}
    removed_ids = set()

    with zipfile.ZipFile(docx_path) as archive:
        if 'word/comments.xml' in archive.namelist():
            comments_root = etree.fromstring(archive.read('word/comments.xml'))
            for node in comments_root.xpath('//w:comment', namespaces=NS):
                ident = node.get(id_attr)
                author = node.get(author_attr, 'Unknown')
                shown_date = readable(node.get(date_attr, ''))
                body = ''.join(node.xpath('.//w:t/text()', namespaces=NS))
                collected[ident] = {
                    "original_id": ident,
                    "new_id": str(int(ident) + 1),
                    "author": author,
                    "date": shown_date,
                    "text": body.strip(),
                    "deleted": False
                }

        if 'word/document.xml' in archive.namelist():
            document_root = etree.fromstring(archive.read('word/document.xml'))
            removed_ids.update(
                ref.get(id_attr)
                for ref in document_root.xpath('//w:del//w:commentReference', namespaces=NS)
            )

    for ident in removed_ids.intersection(collected):
        collected[ident]["deleted"] = True

    # Split into the two result maps, preserving the order comments were read.
    active, deleted = {}, {}
    for entry in collected.values():
        bucket = deleted if entry["deleted"] else active
        bucket[entry["new_id"]] = entry
    return {"active_comments": active, "deleted_comments": deleted}
def _process_paragraph(self, para, zip_file, image_dir, rels_tree, in_textbox=False) -> str:
    """Render one ``<w:p>`` element as HTML.

    When the paragraph anchors VML or DrawingML text boxes (and this call
    is not already inside one), only the text boxes are rendered, each as
    ``<div class="textbox">`` containing its paragraphs. Otherwise the
    paragraph's runs are rendered inside a ``<p>`` with text styling,
    strike-through, comment reference links and inline images.

    Run text and values copied into HTML attributes are escaped, since
    they come from the document being converted.

    Args:
        para: lxml element for the paragraph. It is modified in place (see
            the AlternateContent note below).
        zip_file: Open ``zipfile.ZipFile`` of the DOCX package.
        image_dir: Passed through to ``_save_image_by_rid``.
        rels_tree: Parsed relationships part; forwarded on recursion.
        in_textbox: True when rendering a paragraph inside a text box,
            which disables text-box detection.

    Returns:
        The HTML for the paragraph.
    """
    # NOTE(review): addprevious() makes each Choice/Fallback node a sibling
    # in front of the paragraph (not a child of it), and the wrapper is then
    # removed, so AlternateContent content is no longer part of this
    # paragraph for the queries below. Behaviour kept as-is; confirm intent.
    for ac in para.xpath('.//mc:AlternateContent', namespaces=NS):
        for node in ac.xpath('.//mc:Choice | .//mc:Fallback', namespaces=NS):
            para.addprevious(node)
        ac.getparent().remove(ac)

    fragments = []

    def background_css(img_url):
        """Build the background-image declaration, or '' without a URL."""
        if not img_url:
            return ''
        return f'background-image:url({escape(img_url, quote=True)});background-size:cover;'

    def append_textbox_paragraphs(container, content_path):
        for txbx in container.xpath(content_path, namespaces=NS):
            for p in txbx.xpath('.//w:p', namespaces=NS):
                fragments.append(
                    self._process_paragraph(p, zip_file, image_dir, rels_tree, True)
                )

    if not in_textbox:
        # VML text boxes
        shapes_vml = para.xpath(
            './/w:pict//v:shape[v:textbox] | .//w:pict//v:rect[v:textbox]',
            namespaces=NS
        )
        for shape in shapes_vml:
            style = escape((shape.get('style') or '').rstrip(';') + ';', quote=True)
            fill = shape.find('.//v:fill', namespaces=NS)
            img_url = None
            if fill is not None:
                relid = fill.get(f"{{{NS['o']}}}relid") or fill.get('src')
                if relid:
                    if relid.startswith('rId'):
                        # A relationship id is only usable once resolved to
                        # a saved image; on failure there is no background.
                        img_url = self._save_image_by_rid(relid, zip_file, image_dir)
                    else:
                        img_url = relid
            fragments.append(f'<div class="textbox" style="{style}{background_css(img_url)}">')
            append_textbox_paragraphs(shape, './/v:textbox//w:txbxContent')
            fragments.append('</div>')

        # DrawingML text boxes
        shapes_dml = para.xpath(
            './/w:drawing//wps:wsp[wps:txbx]',
            namespaces=NS
        )
        for wsp in shapes_dml:
            blip = wsp.find('.//wps:spPr/a:blipFill/a:blip', namespaces=NS)
            img_url = None
            if blip is not None:
                rid = blip.get(f"{{{NS['r']}}}embed")
                if rid:
                    img_url = self._save_image_by_rid(rid, zip_file, image_dir)
            fragments.append(f'<div class="textbox" style="{background_css(img_url)}">')
            append_textbox_paragraphs(wsp, './/w:txbxContent')
            fragments.append('</div>')

        # A paragraph that anchors text boxes is rendered as those boxes only.
        if fragments:
            return ''.join(fragments)

    # Ordinary paragraph
    p_props = para.xpath('.//w:pPr', namespaces=NS)
    p_style = self._apply_paragraph_styles(p_props[0]) if p_props else ""
    p_html = [f'<p {p_style}>']

    for run in para.xpath('.//w:r', namespaces=NS):
        # Escape before any markup is wrapped around the text.
        text = escape(''.join(run.xpath('.//w:t/text()', namespaces=NS)), quote=False)
        run_props = run.xpath('.//w:rPr', namespaces=NS)
        if run_props:
            text = self._apply_text_styles(run_props[0], text)
            if run_props[0].xpath('.//w:strike', namespaces=NS):
                text = f'<del>{text}</del>'
            if run_props[0].xpath('.//w:dstrike', namespaces=NS):
                text = f'<s>{text}</s>'

        comment_ref = run.xpath('.//w:commentReference', namespaces=NS)
        if comment_ref:
            comm_id = comment_ref[0].get(f"{{{NS['w']}}}id")
            # Display IDs are the original comment ID plus one, matching
            # the anchors emitted for the comment list.
            new_id = str(int(comm_id) + 1)
            text += f'<sup><a href="#comment-{new_id}">[{new_id}]</a></sup>'

        if self._has_valid_image(run):
            text += self._process_image(run, zip_file)

        p_html.append(text)

    p_html.append('</p>')
    return ''.join(p_html)
def _save_image_by_rid(self, r_id, zip_file, image_dir=None):
    """Extract the image behind a relationship id and return its URL.

    The relationship is looked up in ``word/_rels/document.xml.rels``, the
    target is read from the package and written under
    ``GENERATED_IMAGES_BASE_PATH``.

    The saved file name combines a sanitised form of ``r_id`` with a hash
    of the image bytes. Relationship ids repeat across documents (and come
    from the document itself), so using the bare id as file name would let
    documents overwrite each other's images and would let a crafted id
    point outside the output directory.

    Args:
        r_id: Relationship id such as ``rId5``.
        zip_file: Open ``zipfile.ZipFile`` of the DOCX package.
        image_dir: Accepted for call compatibility; not used.

    Returns:
        ``IMAGE_SERVER_URL_TEMPLATE`` formatted with the saved file name,
        or ``None`` when the relationship or image cannot be found.
    """
    import hashlib
    import posixpath

    out_dir = os.path.abspath(GENERATED_IMAGES_BASE_PATH)
    os.makedirs(out_dir, exist_ok=True)

    try:
        rels_data = zip_file.read('word/_rels/document.xml.rels')
    except KeyError:
        return None

    # Compare the Id attribute in Python instead of interpolating r_id into
    # a path expression, which breaks on ids containing quotes.
    rel_tag = '{http://schemas.openxmlformats.org/package/2006/relationships}Relationship'
    root = ET.fromstring(rels_data)
    rel = next((node for node in root.iter(rel_tag) if node.get('Id') == r_id), None)
    if rel is None or not rel.get('Target'):
        return None

    target = rel.get('Target')  # e.g. "media/image1.png"
    # ZIP member names always use forward slashes; a leading slash marks a
    # package-absolute target rather than one relative to word/.
    if target.startswith('/'):
        internal_path = posixpath.normpath(target.lstrip('/'))
    else:
        internal_path = posixpath.normpath(posixpath.join('word', target))

    try:
        img_data = zip_file.read(internal_path)
    except KeyError:
        return None

    ext = os.path.splitext(target)[1].lstrip('.').lower()
    if ext not in ('jpeg', 'jpg', 'gif', 'png', 'webp'):
        ext = 'png'

    safe_id = re.sub(r'[^A-Za-z0-9_-]', '_', r_id)
    digest = hashlib.sha256(img_data).hexdigest()[:16]
    filename = f"{safe_id}_{digest}.{ext}"
    out_path = os.path.join(out_dir, filename)
    with open(out_path, 'wb') as f:
        f.write(img_data)

    return IMAGE_SERVER_URL_TEMPLATE.format(filename)
def _has_valid_image(self, run) -> bool:
    """Return True when the run contains a complete picture element.

    Both inline and anchored (floating) drawings are checked for the full
    ``graphic/graphicData/pic`` structure.

    The result is based on whether either query found anything. Testing
    the found elements themselves for truthiness would be wrong: an lxml
    element without children evaluates as false.
    """
    return bool(
        run.xpath('.//wp:inline/a:graphic/a:graphicData/pic:pic', namespaces=NS)
        or run.xpath('.//wp:anchor/a:graphic/a:graphicData/pic:pic', namespaces=NS)
    )
def _process_image(self, run, zip_file, image_dir=None) -> str:
    """Save the first image of a run and return an ``<img>`` tag for it.

    The image's relationship id is resolved through
    ``word/_rels/document.xml.rels``; the bytes are written under
    ``GENERATED_IMAGES_BASE_PATH`` and the tag points at
    ``IMAGE_SERVER_URL_TEMPLATE`` formatted with the saved file name.

    The saved file name combines a sanitised relationship id with a hash
    of the image bytes. Relationship ids repeat across documents (and come
    from the document itself), so using the bare id as file name would let
    documents overwrite each other's images and would let a crafted id
    point outside the output directory.

    Args:
        run: lxml element for the ``<w:r>`` run.
        zip_file: Open ``zipfile.ZipFile`` of the DOCX package.
        image_dir: Accepted for call compatibility; not used.

    Returns:
        The ``<img>`` tag, or ``''`` when no image can be resolved.
    """
    import hashlib
    import posixpath

    blips = run.xpath('.//a:blip', namespaces=NS)
    if not blips:
        return ''
    rid = blips[0].get(f"{{{NS['r']}}}embed")
    if not rid:
        return ''

    # Best effort: any failure to read or parse the relationships part
    # simply means no image is emitted.
    try:
        with zip_file.open('word/_rels/document.xml.rels') as rels_file:
            tree = ET.parse(rels_file)
    except Exception:
        return ''

    # Compare the Id attribute in Python instead of interpolating rid into
    # a path expression, which breaks on ids containing quotes.
    rel_tag = '{http://schemas.openxmlformats.org/package/2006/relationships}Relationship'
    rel = next((node for node in tree.getroot().iter(rel_tag) if node.get('Id') == rid), None)
    if rel is None:
        return ''
    target = rel.get('Target')
    if not target:
        return ''

    # ZIP member names always use forward slashes; a leading slash marks a
    # package-absolute target rather than one relative to word/.
    if target.startswith('/'):
        internal_path = posixpath.normpath(target.lstrip('/'))
    else:
        internal_path = posixpath.normpath(posixpath.join('word', target))
    try:
        img_data = zip_file.read(internal_path)
    except KeyError:
        # Fall back to the target exactly as written in the relationship.
        try:
            img_data = zip_file.read(target)
        except Exception:
            return ''

    ext = os.path.splitext(target)[1].lstrip('.').lower()
    if ext not in ('jpeg', 'jpg', 'gif', 'png', 'webp'):
        ext = 'png'

    safe_id = re.sub(r'[^A-Za-z0-9_-]', '_', rid)
    digest = hashlib.sha256(img_data).hexdigest()[:16]
    file_name = f"{safe_id}_{digest}.{ext}"
    out_dir = os.path.abspath(GENERATED_IMAGES_BASE_PATH)
    os.makedirs(out_dir, exist_ok=True)
    out_path = os.path.join(out_dir, file_name)
    with open(out_path, 'wb') as f:
        f.write(img_data)

    url = escape(IMAGE_SERVER_URL_TEMPLATE.format(file_name), quote=True)
    return f'<img src="{url}" style="max-width:600px"/>'
def _apply_paragraph_styles(self, p_props) -> str:
    """Build the ``style="..."`` attribute for a paragraph's alignment.

    Args:
        p_props: lxml element for the paragraph properties (``<w:pPr>``).

    Returns:
        A ``style`` attribute for centred, right-aligned or justified
        paragraphs, or ``""`` for any other (or no) alignment.
    """
    # w:jc value -> CSS text-align. Left/start alignment is the default
    # rendering and needs no style.
    css_alignment = {
        'center': 'center',
        'right': 'right',
        'both': 'justify',
        'distribute': 'justify',
    }
    style_attrs = []
    align = p_props.xpath('.//w:jc/@w:val', namespaces=NS)
    if align and align[0] in css_alignment:
        style_attrs.append(f'text-align: {css_alignment[align[0]]};')
    return f'style="{" ".join(style_attrs)}"' if style_attrs else ""
def _get_highlight_color(self, highlight_val):
# 定义颜色映射表
color_map = {
'yellow': '#ffff00',
'green': '#00ff00',
'cyan': '#00ffff',
'magenta': '#ff00ff',
'red': '#ff0000',
'blue': '#0000ff',
'black': '#000000',
'white': '#ffffff',
'gray': '#808080',
'orange': '#ffa500',
'purple': '#800080',
'pink': '#ffc0cb',
'brown': '#a52a2a',
'lime': '#00ff00',
'olive': '#808000',
'navy': '#000080',
'teal': '#008080',
'maroon': '#800000',
'silver': '#c0c0c0',
'gold': '#ffd700',
'indigo': '#4b0082',
'violet': '#ee82ee',
'turquoise': '#40e0d0',
'coral': '#ff7f50',
'salmon': '#fa8072',
'khaki': '#f0e68c',
'tan': '#d2b48c',
'sienna': '#a0522d',
'chocolate': '#d2691e',
'peru': '#cd853f',
'saddlebrown': '#8b4513',
'rosybrown': '#bc8f8f',
'moccasin': '#ffe4b5',
'bisque': '#ffe4c4',
'peachpuff': '#ffdab9',
'papayawhip': '#ffefd5',
'blanchedalmond': '#ffebcd',
'navajowhite': '#ffdead',
'antiquewhite': '#faebd7',
'linen': '#faf0e6',
'oldlace': '#fdf5e6',
'azure': '#f0ffff',
'mintcream': '#f5fffa',
'aliceblue': '#f0f8ff',
'lavender': '#e6e6fa',
'lavenderblush': '#fff0f5',
'mistyrose': '#ffe4e1',
'gainsboro': '#dcdcdc',
'lightgrey': '#d3d3d3',
'lightgray': '#d3d3d3',
'silver': '#c0c0c0',
'darkgray': '#a9a9a9',
'darkgrey': '#a9a9a9',
'dimgray': '#696969',
'dimgrey': '#696969',
'lightslategray': '#778899',
'lightslategrey': '#778899',
'slategray': '#708090',
'slategrey': '#708090',
'darkslategray': '#2f4f4f',
'darkslategrey': '#2f4f4f',
'lightsteelblue': '#b0c4de',
'powderblue': '#b0e0e6',
'lightblue': '#add8e6',
'skyblue': '#87ceeb',
'lightskyblue': '#87cefa',
'deepskyblue': '#00bfff',
'dodgerblue': '#1e90ff',
'royalblue': '#4169e1',
'blueviolet': '#8a2be2',
'mediumorchid': '#ba55d3',
'thistle': '#d8bfd8',
'plum': '#dda0dd',
'violet': '#ee82ee',
'orchid': '#da70d6',
'magenta': '#ff00ff',
'hotpink': '#ff69b4',
'deeppink': '#ff1493',
'palevioletred': '#db7093',
'crimson': '#dc143c',
'firebrick': '#b22222',
'darkred': '#8b0000',
'indianred': '#cd5c5c',
'rosybrown': '#bc8f8f',
'saddlebrown': '#8b4513',
'sienna': '#a0522d',
'chocolate': '#d2691e',
'peru': '#cd853f',
'burlywood': '#deb887',
'beige': '#f5f5dc',
'wheat': '#f5deb3',
'sandybrown': '#f4a460',
'goldenrod': '#daa520',
'darkgoldenrod': '#b8860b',
'gold': '#ffd700',
'orange': '#ffa500',
'darkorange': '#ff8c00',
'coral': '#ff7f50',
'tomato': '#ff6347',
'orangered': '#ff4500',
'red': '#ff0000',
'darkred': '#8b0000',
'salmon': '#fa8072',
'lightsalmon': '#ffa07a',
'darksalmon': '#e9967a',
'crimson': '#dc143c',
'firebrick': '#b22222',
'darkred': '#8b0000',
'lightcoral': '#f08080',
'indianred': '#cd5c5c',
'rosybrown': '#bc8f8f',
'saddlebrown': '#8b4513',
'sienna': '#a0522d',
'chocolate': '#d2691e',
'peru': '#cd853f',
'burlywood': '#deb887',
'beige': '#f5f5dc',
'wheat': '#f5deb3',
'sandybrown': '#f4a460',
'tan': '#d2b48c',
'navajowhite': '#ffdead',
'bisque': '#ffe4c4',
'blanchedalmond': '#ffebcd',
'papayawhip': '#ffefd5',
'moccasin': '#ffe4b5',
'antiquewhite': '#faebd7',
'linen': '#faf0e6',
'oldlace': '#fdf5e6',
'floralwhite': '#fffaf0',
'ivory': '#fffff0',
'lemonchiffon': '#fffacd',
'cornsilk': '#fff8dc',
'seashell': '#fff5ee',
'mintcream': '#f5fffa',
'azure': '#f0ffff',
'aliceblue': '#f0f8ff',
'lavender': '#e6e6fa',
'lavenderblush': '#fff0f5',
'mistyrose': '#ffe4e1',
'white': '#ffffff',
'snow': '#fffafa',
'honeydew': '#f0fff0',
'mintcream': '#f5fffa',
'azure': '#f0ffff',
'aliceblue': '#f0f8ff',
'ghostwhite': '#f8f8ff',
'whitesmoke': '#f5f5f5',
'seashell': '#fff5ee',
'cornsilk': '#fff8dc',
'blanchedalmond': '#ffebcd',
'bisque': '#ffe4c4',
'navajowhite': '#ffdead',
'antiquewhite': '#faebd7',
'burlywood': '#deb887',
'wheat': '#f5deb3',
'tan': '#d2b48c',
'rosybrown': '#bc8f8f',
'sandybrown': '#f4a460',
'goldenrod': '#daa520',
'darkgoldenrod': '#b8860b',
'peru': '#cd853f',
'chocolate': '#d2691e',
'saddlebrown': '#8b4513',
'sienna': '#a0522d',
'brown': '#a52a2a',
'maroon': '#800000',
'transparent': 'transparent',
}
# 将输入的颜色名称转换为小写,以确保大小写不影响匹配
highlight_val = highlight_val.lower()
# 返回映射的颜色,如果未找到则返回默认值 'transparent'
return color_map.get(highlight_val, 'transparent')
def _apply_text_styles(self, run_props, text: str) -> str:
    """Wrap run text in HTML reflecting its run properties.

    Handles font size and family, bold/italic/underline,
    superscript/subscript, list-paragraph marking and highlight colour.

    Bold, italic and underline are applied only when the property is
    switched on: an element such as ``<w:b w:val="0"/>`` or
    ``<w:u w:val="none"/>`` explicitly turns the formatting off.

    Args:
        run_props: lxml element for the run properties (``<w:rPr>``).
        text: Run text, already safe to embed in HTML.

    Returns:
        The text wrapped in the applicable tags.
    """
    val_attr = f"{{{NS['w']}}}val"

    def is_on(tag):
        """True when the toggle property exists and is not disabled."""
        nodes = run_props.xpath(f'.//w:{tag}', namespaces=NS)
        if not nodes:
            return False
        val = nodes[0].get(val_attr)
        return val is None or val.lower() not in ('0', 'false', 'off', 'none')

    style_stack = []

    # w:sz is expressed in half-points.
    if sz := run_props.xpath('.//w:sz/@w:val', namespaces=NS):
        size_pt = int(sz[0]) / 2
        style_stack.append(f"font-size: {size_pt}pt;")

    # The font name is copied into an attribute, so escape it.
    if font := run_props.xpath('.//w:rFonts/@w:ascii', namespaces=NS):
        style_stack.append(f"font-family: {escape(font[0], quote=True)};")

    if is_on('b'):
        text = f'<strong>{text}</strong>'
    if is_on('i'):
        text = f'<em>{text}</em>'
    if is_on('u'):
        text = f'<u>{text}</u>'

    if vert_align := run_props.xpath('.//w:vertAlign/@w:val', namespaces=NS):
        if vert_align[0] == 'superscript':
            text = f'<sup>{text}</sup>'
        elif vert_align[0] == 'subscript':
            text = f'<sub>{text}</sub>'

    # Runs of a numbered/bulleted paragraph get a marker span.
    if self._is_list_number(run_props):
        list_type = self._get_list_type(run_props)
        text = f'<span class="list-number {list_type}">{text}</span>'

    if highlight := run_props.xpath('.//w:highlight/@w:val', namespaces=NS):
        color = self._get_highlight_color(highlight[0])
        style_stack.append(f"background-color: {color};")

    if style_stack:
        style_str = ' '.join(style_stack)
        text = f'<span style="{style_str}">{text}</span>'

    return text
def _is_list_number(self, run_props) -> bool:
    """Return True when the run belongs to a paragraph with numbering.

    The check looks for a ``<w:numPr>`` element in the properties of the
    paragraph that contains the run. The XPath result list is converted
    to a real ``bool`` so the return value matches the annotation.
    """
    return bool(
        run_props.getparent().xpath('ancestor::w:p/w:pPr/w:numPr', namespaces=NS)
    )
def _get_list_type(self, run_props) -> str:
    """Classify the list the run's paragraph belongs to.

    Returns ``'ordered'`` when the paragraph's numbering properties carry
    a ``w:numId`` value and ``'unordered'`` otherwise. The numbering
    definitions themselves are not consulted.
    """
    numbering_ids = run_props.getparent().xpath(
        'ancestor::w:p/w:pPr/w:numPr/w:numId/@w:val',
        namespaces=NS,
    )
    if numbering_ids:
        return 'ordered'
    return 'unordered'
def _process_table(self, table, zip_file, image_dir: str, rels_tree) -> str:
    """Render a ``<w:tbl>`` element as an HTML table.

    Only rows, cells and content that belong to this table are rendered
    here. A table nested inside a cell is rendered recursively inside that
    cell's ``<td>`` instead of having its rows, cells and paragraphs
    flattened into (and repeated in) the outer table.

    Args:
        table: lxml element for the table.
        zip_file: Open ``zipfile.ZipFile`` of the DOCX package.
        image_dir: Forwarded to the paragraph renderer.
        rels_tree: Parsed relationships part; forwarded as-is.

    Returns:
        The HTML for the table.
    """
    table_tag = f"{{{NS['w']}}}tbl"
    # Number of tables enclosing this table's own rows and cells, counting
    # this table itself. Anything deeper belongs to a nested table.
    depth = len(table.xpath('ancestor-or-self::w:tbl', namespaces=NS))
    own = '[count(ancestor::w:tbl) = $depth]'

    html = ['<table border="1" style="border-collapse: collapse;">']
    for row in table.xpath(f'.//w:tr{own}', namespaces=NS, depth=depth):
        html.append('<tr>')
        for cell in row.xpath(f'.//w:tc{own}', namespaces=NS, depth=depth):
            html.append('<td style="padding: 4px;">')
            # Paragraphs and directly nested tables, in document order.
            blocks = cell.xpath(
                f'.//w:p{own} | .//w:tbl{own}',
                namespaces=NS,
                depth=depth,
            )
            for block in blocks:
                if block.tag == table_tag:
                    html.append(self._process_table(block, zip_file, image_dir, rels_tree))
                else:
                    html.append(self._process_paragraph(block, zip_file, image_dir, rels_tree))
            html.append('</td>')
        html.append('</tr>')
    html.append('</table>')
    return ''.join(html)
def _pdf_plain_text_to_html(self, text: str) -> str:
"""将单页纯文本转为简单段落 HTML已 escape"""
if not (text or "").strip():
return '<p><em>(本页无文本内容)</em></p>'
parts: list[str] = []
for line in text.splitlines():
if line.strip():
parts.append(f"<p>{escape(line)}</p>")
else:
parts.append("<br/>")
return "".join(parts) if parts else '<p><em>(本页无文本内容)</em></p>'
@staticmethod
def _escape_html(text: str) -> str:
"""HTML 转义"""
return text.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;').replace('"', '&quot;')
def pdf_to_html(self, input_path: str, output_path: Optional[str] = None) -> str:
    """Build an HTML preview of a PDF from pdfplumber text and tables.

    The file must be a ``.pdf`` located under ``PDF_CONVERT_KB_ROOT``. For
    each page, consecutive non-blank text lines are merged into
    paragraphs, short numbered lines are emitted as ``<h3>`` headings, and
    every extracted table is rendered with its first row as header cells.

    Text is extracted from the whole page, so the content of a table can
    appear both in the paragraphs and in the rendered table.

    NOTE(review): the result goes through ``_save_html``, whose cleaning
    step removes ``<style>`` blocks, so the stylesheet built below does not
    reach the returned HTML. Confirm whether that is intended.

    Args:
        input_path: Path of the PDF.
        output_path: Optional path the cleaned HTML is also written to.

    Returns:
        The cleaned HTML on success. Failures are reported as a plain
        message string rather than an exception.
    """
    allowed_pdf_root = os.path.abspath(PDF_CONVERT_KB_ROOT)
    abs_input = os.path.abspath(input_path)
    if abs_input != allowed_pdf_root and not abs_input.startswith(allowed_pdf_root + os.sep):
        return (
            f"PDF路径不在知识库根目录下: input={input_path!r}, root={allowed_pdf_root!r}"
            "可设置环境变量 PDF_CONVERT_KB_ROOT。"
        )
    if not os.path.isfile(abs_input):
        return f"PDF文件不存在: {abs_input}"
    if os.path.splitext(abs_input)[1].lower() != ".pdf":
        return "不是 PDF 文件"

    # A line ending in sentence punctuation, or opening with a bracket, is
    # body text. The tuples must not contain '' because every string ends
    # and starts with the empty string, which would disable headings.
    sentence_endings = ('。', ',', ';', ':', '!', '?', ',', '.', ';')
    opening_brackets = ('(', '(')
    numbered_prefix = re.compile(r'^[一二三四五六七八九十\d]+[、.]')

    def is_heading(stripped):
        """True for a short numbered line that does not read as a sentence."""
        return (
            len(stripped) < 30
            and not stripped.endswith(sentence_endings)
            and not stripped.startswith(opening_brackets)
            and numbered_prefix.match(stripped) is not None
        )

    def flush_paragraph(buffer, page_parts):
        """Emit the buffered lines as one paragraph and reset the buffer."""
        if buffer:
            page_parts.append(f'<p>{self._escape_html("".join(buffer))}</p>')
            buffer.clear()

    try:
        import pdfplumber

        sections = []
        any_text = False

        with pdfplumber.open(abs_input) as pdf:
            for i, page in enumerate(pdf.pages):
                page_parts = []
                tables = page.extract_tables()

                text = page.extract_text() or ""
                if text.strip():
                    any_text = True
                    current_para = []
                    for line in text.split('\n'):
                        stripped = line.strip()
                        if not stripped:
                            # A blank line ends the current paragraph.
                            flush_paragraph(current_para, page_parts)
                            continue
                        if is_heading(stripped):
                            flush_paragraph(current_para, page_parts)
                            page_parts.append(f'<h3>{self._escape_html(stripped)}</h3>')
                        else:
                            current_para.append(stripped)
                    flush_paragraph(current_para, page_parts)

                for table in tables:
                    if not table:
                        continue
                    page_parts.append('<table class="pdf-table">')
                    for row_idx, row in enumerate(table):
                        page_parts.append('<tr>')
                        tag = 'th' if row_idx == 0 else 'td'
                        for cell in row:
                            cell_text = self._escape_html(str(cell)) if cell is not None else ''
                            page_parts.append(f'<{tag}>{cell_text}</{tag}>')
                        page_parts.append('</tr>')
                    page_parts.append('</table>')

                page_html = '\n'.join(page_parts)
                sections.append(
                    f'<section class="pdf-page" data-page="{i + 1}">'
                    f'<div class="pdf-page-num">第 {i + 1} 页</div>'
                    f'{page_html}</section>'
                )

        css = '''<style>
.pdf-preview { font-family: "PingFang SC", "Microsoft YaHei", system-ui, sans-serif; line-height: 1.8; color: #333; }
.pdf-page { margin-bottom: 1.5em; padding-bottom: 1em; border-bottom: 1px solid #e5e5e5; }
.pdf-page-num { font-size: 12px; color: #999; margin-bottom: 8px; }
.pdf-preview p { margin: 0.3em 0; font-size: 15px; text-indent: 0; }
.pdf-preview h3 { font-size: 16px; font-weight: bold; margin: 1em 0 0.4em; color: #222; }
.pdf-table { border-collapse: collapse; width: 100%; margin: 1em 0; font-size: 14px; }
.pdf-table th, .pdf-table td { border: 1px solid #d0d0d0; padding: 6px 10px; text-align: left; vertical-align: top; }
.pdf-table th { background: #f5f7fa; font-weight: bold; }
.pdf-table tr:nth-child(even) { background: #fafbff; }
</style>'''

        if not any_text:
            wrapper = (
                f'{css}<div class="pdf-preview">'
                '<p><em>(未能从 PDF 提取到文本,可能是扫描件或加密文档。)</em></p></div>'
            )
        else:
            wrapper = (
                f'{css}<div class="pdf-preview">'
                f"{''.join(sections)}</div>"
            )
        return self._save_html(f"<body>{wrapper}</body>", output_path)
    except Exception as e:
        return f"PDF预览生成失败: {str(e)}"
def get_cell_style(self, cell, mode='xlsx', xls_book=None):
    """Build the inline CSS for a spreadsheet cell.

    For ``xlsx`` (openpyxl) cells this covers font colour, background
    fill, underline/strike-through, superscript/subscript, bold/italic,
    font family and size. For ``xls`` (xlrd) cells it covers font colour,
    underline/strike-through, bold/italic and font family.

    Args:
        cell: An openpyxl cell (``mode='xlsx'``) or an xlrd cell
            (``mode='xls'``). An xls cell is located either through
            ``rowx``/``colx``/``sheet`` attributes or, when those are
            absent, through its own ``xf_index``.
        mode: ``'xlsx'`` or ``'xls'``.
        xls_book: The xlrd workbook; required for ``mode='xls'``.

    Returns:
        Concatenated CSS declarations, or ``''`` when nothing applies.
    """
    styles = []

    if mode == 'xlsx':
        font = cell.font
        fill = cell.fill

        # Font colour: explicit RGB, or a colour name given as a string.
        if font and font.color and getattr(font.color, 'type', None) == 'rgb' and font.color.rgb:
            rgb = font.color.rgb[-6:]  # drop the leading alpha byte
            styles.append(f"color: #{rgb};")
        elif font and font.color and isinstance(font.color, str):
            color_css = self._get_highlight_color(font.color)
            styles.append(f"color: {color_css};")

        # Background fill
        if fill and getattr(fill, 'patternType', None) not in (None, 'none') and getattr(fill, 'fgColor', None):
            fg = fill.fgColor
            if getattr(fg, 'type', None) == 'rgb' and fg.rgb:
                rgb = fg.rgb[-6:]
                styles.append(f"background-color: #{rgb};")
            elif isinstance(fg, str):
                bg_css = self._get_highlight_color(fg)
                styles.append(f"background-color: {bg_css};")

        # Underline and strike-through share one declaration.
        td = []
        if font and getattr(font, 'underline', False):
            td.append("underline")
        if font and getattr(font, 'strike', False):
            td.append("line-through")
        if td:
            styles.append(f"text-decoration: {' '.join(td)};")

        if font and getattr(font, 'vertAlign', None) == 'superscript':
            styles.append("vertical-align: super; font-size: smaller;")
        elif font and getattr(font, 'vertAlign', None) == 'subscript':
            styles.append("vertical-align: sub; font-size: smaller;")

        if font and font.bold:
            styles.append("font-weight: bold;")
        if font and font.italic:
            styles.append("font-style: italic;")

        if font and font.name:
            styles.append(f"font-family: '{font.name}';")
        if font and font.size:
            styles.append(f"font-size: {font.size}pt;")

    elif mode == 'xls' and xls_book is not None:
        rowx, colx = getattr(cell, 'rowx', None), getattr(cell, 'colx', None)
        if rowx is not None and colx is not None:
            xf_index = cell.sheet.cell_xf_index(rowx, colx)
        else:
            # Plain xlrd cells carry their format index themselves.
            xf_index = getattr(cell, 'xf_index', None)

        if xf_index is not None and 0 <= xf_index < len(xls_book.xf_list):
            xf = xls_book.xf_list[xf_index]
            fnt = xls_book.font_list[xf.font_index]

            # Resolve the palette index through the workbook's own colour
            # map; indexes without an RGB entry produce no colour.
            colour_map = getattr(xls_book, 'colour_map', None) or {}
            rgb = colour_map.get(fnt.colour_index)
            if rgb:
                styles.append("color: #{:02x}{:02x}{:02x};".format(*rgb))

            td = []
            if getattr(fnt, 'underline_type', 0):
                td.append("underline")
            # xlrd names the strike-through flag `struck_out`; the older
            # `strike_out` spelling is still honoured if present.
            if getattr(fnt, 'struck_out', False) or getattr(fnt, 'strike_out', False):
                td.append("line-through")
            if td:
                styles.append(f"text-decoration: {' '.join(td)};")

            if getattr(fnt, 'bold', False):
                styles.append("font-weight: bold;")
            if getattr(fnt, 'italic', False):
                styles.append("font-style: italic;")

            if getattr(fnt, 'name', None):
                styles.append(f"font-family: '{fnt.name}';")

    return ''.join(styles)
def xlsx_to_html(self, input_path: str, output_path: Optional[str] = None) -> str:
    """Render every worksheet of an .xlsx workbook as HTML tables.

    Each sheet becomes an ``<h3>`` heading followed by a ``<table>`` wrapped
    in a horizontally scrollable ``<div>``. Column widths come from the
    sheet's column dimensions when set, otherwise they are estimated from
    the text length of the leading rows. Merged ranges are emitted as a
    single ``<td>`` carrying ``rowspan``/``colspan``.

    Sheet titles and cell values are HTML-escaped so that characters such
    as ``<`` or ``&`` in the workbook cannot break or inject markup.

    Args:
        input_path: Path of the .xlsx file to convert.
        output_path: Forwarded unchanged to ``self._save_html``.

    Returns:
        The value returned by ``self._save_html``; if anything raises, the
        string ``"转换失败: <error>"`` is returned instead of propagating.
    """
    try:
        import openpyxl
        from openpyxl.utils import get_column_letter

        # data_only=True asks openpyxl for cached formula results rather
        # than the formula text.
        wb = openpyxl.load_workbook(input_path, data_only=True)
        style = '''<style>
.excel-table-wrapper{overflow-x:auto;margin:1em 0;}
.excel-table{border-collapse:collapse;margin:0;table-layout:fixed;}
.excel-table td,.excel-table th{padding:6px 10px;border:1px solid #d0d0d0;white-space:nowrap;overflow:hidden;text-overflow:ellipsis;font-size:13px;vertical-align:middle;}
.excel-table tr:nth-child(even){background:#fafbff;}
</style>'''
        parts = []
        for idx, sheet in enumerate(wb.worksheets):
            title = escape(str(sheet.title), quote=False)
            parts.append(f'<h3>Sheet {idx+1}: {title}</h3>')

            # Read the sheet bounds once instead of on every loop iteration.
            max_row = sheet.max_row
            max_col = sheet.max_column

            # Compute a pixel width for every column.
            col_widths = {}
            for col_idx in range(1, max_col + 1):
                col_letter = get_column_letter(col_idx)
                dim = sheet.column_dimensions.get(col_letter)
                if dim and dim.width and dim.width > 0:
                    # openpyxl widths are in characters; scale to pixels.
                    col_widths[col_idx] = max(60, int(dim.width * 7.5))
                else:
                    # No explicit width: estimate from the longest value
                    # among the leading rows (at most rows 1..49).
                    max_len = 8
                    for row_idx in range(1, min(max_row + 1, 50)):
                        value = sheet.cell(row=row_idx, column=col_idx).value
                        if value is not None:
                            max_len = max(max_len, len(str(value)))
                    col_widths[col_idx] = max(60, min(300, max_len * 9))

            parts.append('<div class="excel-table-wrapper">')
            parts.append('<table class="excel-table" border="1">')

            # <colgroup> fixes the column widths.
            parts.append('<colgroup>')
            for col_idx in range(1, max_col + 1):
                w = col_widths.get(col_idx, 80)
                parts.append(f'<col style="width:{w}px;min-width:{w}px;">')
            parts.append('</colgroup>')

            # Map every cell of a merged range to
            # (anchor_row, anchor_col, rowspan, colspan).
            merged_map = {}
            for rng in sheet.merged_cells.ranges:
                min_row, min_col = rng.min_row, rng.min_col
                span = (
                    min_row,
                    min_col,
                    rng.max_row - min_row + 1,
                    rng.max_col - min_col + 1,
                )
                for row in range(min_row, rng.max_row + 1):
                    for col in range(min_col, rng.max_col + 1):
                        merged_map[(row, col)] = span

            for row in range(1, max_row + 1):
                parts.append('<tr>')
                for col in range(1, max_col + 1):
                    merge_info = merged_map.get((row, col))
                    # Only the top-left anchor of a merged range is rendered.
                    if merge_info and (row, col) != (merge_info[0], merge_info[1]):
                        continue
                    cell = sheet.cell(row=row, column=col)
                    if cell.value is None:
                        cell_text = ""
                    else:
                        cell_text = escape(str(cell.value), quote=False)
                    style_str = self.get_cell_style(cell, mode='xlsx')
                    td_attrs = ''
                    if merge_info:
                        _, _, rowspan, colspan = merge_info
                        if rowspan > 1:
                            td_attrs += f' rowspan="{rowspan}"'
                        if colspan > 1:
                            td_attrs += f' colspan="{colspan}"'
                        # Merged cells are allowed to wrap.
                        style_str += 'white-space:normal;word-wrap:break-word;'
                    # A double quote inside the style (e.g. from a font name)
                    # must not terminate the attribute.
                    style_attr = style_str.replace('"', '&quot;')
                    parts.append(f'<td{td_attrs} style="{style_attr}">{cell_text}</td>')
                parts.append('</tr>')
            parts.append('</table></div>')
        html_str = style + ''.join(parts)
        return self._save_html(f'<body>{html_str}</body>', output_path)
    except Exception as e:
        return f"转换失败: {str(e)}"
def xls_to_html(self, input_path: str, output_path: Optional[str] = None) -> str:
    """Render every sheet of a legacy .xls workbook as HTML tables.

    The workbook is opened with ``formatting_info=True`` so that
    ``self.get_cell_style`` can look up per-cell font information through
    the workbook's XF and font tables. Each sheet becomes an ``<h3>``
    heading followed by a ``<table>``.

    Sheet names and cell values are HTML-escaped so that characters such
    as ``<`` or ``&`` in the workbook cannot break or inject markup.

    Args:
        input_path: Path of the .xls file to convert.
        output_path: Forwarded unchanged to ``self._save_html``.

    Returns:
        The value returned by ``self._save_html``; if anything raises, the
        string ``"转换失败: <error>"`` is returned instead of propagating.
    """
    try:
        import xlrd

        xls = xlrd.open_workbook(input_path, formatting_info=True)
        body = []
        style = '''<style>.excel-table{border-collapse:collapse;width:100%;margin:1em 0;}.excel-table td,.excel-table th{padding:8px;border:1px solid #ddd;}</style>'''
        for idx, sheet in enumerate(xls.sheets()):
            sheet_name = escape(str(sheet.name), quote=False)
            body.append(f'<h3>Sheet {idx+1}: {sheet_name}</h3>')
            body.append('<div class="table-container">')
            body.append('<table class="excel-table" border="1" style="border-collapse:collapse;width:100%;margin:1em 0;">')
            for row_idx in range(sheet.nrows):
                body.append('<tr>')
                for col_idx in range(sheet.ncols):
                    cell = sheet.cell(row_idx, col_idx)
                    # get_cell_style reads the position and owning sheet
                    # from these attributes, so attach them to the cell.
                    cell.rowx = row_idx
                    cell.colx = col_idx
                    cell.sheet = sheet
                    if cell.value is None:
                        cell_html = ''
                    else:
                        cell_html = escape(str(cell.value), quote=False)
                    style_str = self.get_cell_style(cell, mode='xls', xls_book=xls)
                    # A double quote inside the style (e.g. from a font name)
                    # must not terminate the attribute.
                    style_attr = style_str.replace('"', '&quot;')
                    body.append(f'<td style="{style_attr}">{cell_html}</td>')
                body.append('</tr>')
            body.append('</table></div>')
        body_content = '\n'.join(body)
        html_body = f'{style}{body_content}'
        return self._save_html(f'<body>{html_body}</body>', output_path)
    except Exception as e:
        return f"转换失败: {str(e)}"
# Usage example
if __name__ == "__main__":
    converter = FileConverter()
    try:
        # Sample conversions; every call targets the same output path.
        converter.txt_to_html("input.md", "output.html")
        converter.txt_to_html("input.txt", "output.html")
        converter.doc_to_html("input.doc", "output.html")
        converter.docx_to_html("input.docx", "output.html")
        # Spreadsheets go through their dedicated converters rather than
        # the .docx converter.
        converter.xlsx_to_html("input.xlsx", "output.html")
        converter.xls_to_html("input.xls", "output.html")
    except Exception as e:
        print(f"转换错误: {str(e)}")