373 lines
17 KiB
Python
373 lines
17 KiB
Python
import os
|
||
import re
|
||
from typing import Optional
|
||
from bs4 import BeautifulSoup
|
||
from collections import defaultdict
|
||
import cssutils
|
||
from server.knowledge_base.file_converter import FileConverter
|
||
import uuid
|
||
import base64
|
||
|
||
class PdfConverter(FileConverter):
|
||
def _clean_pdf_html(self, html: str) -> str:
    """Post-process converter-generated HTML before it is stored or shown.

    The steps run in this order:

    1. Remove every ``<span>`` that has no visible text.
    2. Re-serialise each ``<style>`` sheet through cssutils after dropping
       the ``user-select`` family of properties, the background of rules
       whose selector mentions ``#page-container-1`` and the
       ``box-shadow`` / ``border-collapse`` of rules targeting ``.pf``.
    3. Apply the same clean-up to the inline ``style`` attribute of
       ``#page-container-1`` and ``.pf`` elements.
    4. Serialise the tree and assign element ids through
       ``_add_pdf_element_ids``.
    5. If that step recorded a page-container id, patch the ``<head>``
       section with two regex substitutions.
    6. Strip all ``<script>`` elements.

    Args:
        html: Raw HTML document text.

    Returns:
        The cleaned HTML with surrounding whitespace stripped.
    """
    user_select_props = (
        'user-select',
        '-webkit-user-select',
        '-moz-user-select',
        '-ms-user-select',
    )
    soup = BeautifulSoup(html, 'html.parser')

    def process_rule(rule):
        """Clean one stylesheet rule, recursing into @media blocks."""
        if rule.type == rule.MEDIA_RULE:
            for nested_rule in rule:
                process_rule(nested_rule)
            return
        if rule.type != rule.STYLE_RULE:
            return

        # Re-enable text selection.
        for prop in user_select_props:
            rule.style.removeProperty(prop)

        # NOTE(review): ids are only rewritten to "page-container-1" later,
        # by _add_pdf_element_ids, so this matches only when the incoming
        # sheet already uses that id - confirm this is intended.
        if any('#page-container-1' in selector.selectorText for selector in rule.selectorList):
            rule.style.removeProperty('background-color')
            rule.style.removeProperty('background-image')

        if any(re.search(r'(^|[\s>+~])\.pf($|[\s\[.:>+~])', selector.selectorText)
               for selector in rule.selectorList):
            # A single call per property is sufficient: repeating
            # removeProperty after it reported nothing to remove cannot
            # change the declaration block.
            for prop in ('box-shadow', 'border-collapse'):
                rule.style.removeProperty(prop)

    def clean_inline_styles(tag):
        """Apply the same property removal to a tag's ``style`` attribute."""
        if not tag.has_attr('style'):
            return
        style = cssutils.parseStyle(tag['style'])
        for prop in user_select_props:
            style.removeProperty(prop)
        if tag.get('id') == 'page-container-1':
            style.removeProperty('background-color')
            style.removeProperty('background-image')
        if 'pf' in tag.get('class', []):
            style.removeProperty('box-shadow')
            style.removeProperty('border-collapse')
        tag['style'] = style.cssText.replace('\n', ' ').strip()
        # Do not leave an empty style="" attribute behind.
        if not tag['style']:
            del tag['style']

    # Drop spans that are empty or contain only whitespace. A span that
    # survives this check has visible text by definition, so no further
    # whitespace-only handling is needed for it.
    for span in soup.find_all('span'):
        if not span.text.strip():
            span.decompose()

    for style_tag in soup.find_all('style'):
        if not style_tag.string:
            continue
        try:
            sheet = cssutils.parseString(style_tag.string)
            for rule in sheet:
                process_rule(rule)
            style_tag.string = (
                sheet.cssText.decode('utf-8')
                .replace('\\n', '\n')
                .replace(' !important', '!important')
            )
        except Exception as e:
            # Best effort: a sheet that cannot be processed is reported and
            # left as it was so the rest of the document is still cleaned.
            print(f"CSS处理错误: {str(e)}")
            continue

    for container in soup.select('#page-container-1'):
        clean_inline_styles(container)
    for pf_element in soup.select('.pf'):
        clean_inline_styles(pf_element)

    content = self._add_pdf_element_ids(str(soup))

    # _add_pdf_element_ids records the id it gave the page container, or
    # None when the document has none.
    new_id = getattr(self, 'page_container_id', None)
    if new_id:
        head_pattern = re.compile(
            r'(<head[^>]*>)(.*?)(</head>)',
            re.DOTALL | re.IGNORECASE
        )

        def replace_head(match):
            head_content = match.group(2)
            # Point id="page-container" references inside <head> at the
            # new id.
            head_content = re.sub(
                r'(id\s*=\s*["\']?)page-container(["\'\]>])',
                f'\\g<1>{new_id}\\g<2>',
                head_content
            )
            # Remove a background-color / background-image declaration
            # that follows an id selector.
            # NOTE(review): with DOTALL the lazy ".*?" may run past the
            # closing brace of the rule that owns the id selector, so the
            # removed declaration can belong to a later rule - confirm
            # this breadth is acceptable.
            head_content = re.sub(
                r'(#[^{\s>]+?{.*?)(\bbackground-(color|image)\s*:[^;]+;?)',
                lambda m: m.group(1),
                head_content,
                flags=re.DOTALL | re.IGNORECASE
            )
            return f"{match.group(1)}{head_content}{match.group(3)}"

        content = head_pattern.sub(replace_head, content)

    # Strip scripts.
    content = re.sub(
        r'<script\b[^>]*>[\s\S]*?</script>',
        '',
        content,
        flags=re.IGNORECASE
    )

    return content.strip()
|
||
|
||
def _add_pdf_element_ids(self, content: str) -> str:
    """Give every h1-h6, p, div and span opening tag a sequential id.

    Ids have the form ``<tag>-<n>`` with one counter per tag name, and any
    id the tag already carried is replaced. A ``div`` whose id is
    ``page-container`` is handled separately: it receives the id
    ``page-container-1``, which is also stored on
    ``self.page_container_id`` (``None`` when no such div exists). Such
    divs do not advance the ``div`` counter.

    Args:
        content: HTML text to rewrite.

    Returns:
        The HTML text with the id attributes inserted. Matched tag names
        are emitted in lower case.
    """
    counters = defaultdict(int)
    # Reset so that the value from a previous document cannot leak through.
    self.page_container_id = None

    # An existing id attribute, in either quote style and with optional
    # spaces around "=". It must be removed in every form that the
    # page-container check below accepts, otherwise the rewritten tag ends
    # up with two id attributes.
    existing_id = re.compile(r'''\s+id\s*=\s*(?:"[^"]*"|'[^']*')''', re.IGNORECASE)
    # The lookbehind keeps attributes such as data-id="page-container" from
    # being mistaken for the real id.
    page_container = re.compile(
        r'(?<![\w-])id\s*=\s*["\']page-container["\']',
        re.IGNORECASE
    )

    def replace_tag(match):
        tag = match.group(1).lower()
        attrs = match.group(2)
        clean_attrs = existing_id.sub('', attrs)

        if tag == "div" and page_container.search(attrs):
            # Every page-container div shares the single generated id.
            if not self.page_container_id:
                counters['page-container'] += 1
                self.page_container_id = f"page-container-{counters['page-container']}"
            return f'<div id="{self.page_container_id}"{clean_attrs}>'

        counters[tag] += 1
        return f'<{tag} id="{tag}-{counters[tag]}"{clean_attrs}>'

    # The lookahead requires the tag name to end right here, so names that
    # merely start with a target tag (<pre>, <p-x>, <div-foo>) are skipped.
    return re.sub(
        r'<(h[1-6]|p|div|span)(?=[\s/>])([^>]*)>',
        replace_tag,
        content,
        flags=re.IGNORECASE
    )
|
||
def _save_pdf_html(self, content: str, output_path: Optional[str] = None) -> str:
    """Clean the HTML and, when a path is given, also write it to disk.

    Args:
        content: Raw HTML text, passed to ``_clean_pdf_html``.
        output_path: Destination file. When empty or ``None`` nothing is
            written.

    Returns:
        The cleaned HTML text.
    """
    result = self._clean_pdf_html(content)
    if not output_path:
        return result
    with open(output_path, 'w', encoding='utf-8') as handle:
        handle.write(result)
    return result
|
||
|
||
# def pdf_to_html(self, input_path: str, output_path: Optional[str] = None) -> str:
|
||
# """PDF转换方法"""
|
||
# cmd = [
|
||
# 'pdf2htmlEX',
|
||
# '--zoom', '1.2', # 放大
|
||
# '--split-pages', '0', # 保持整体布局
|
||
# # '--embed-css', '0', # 避免内联样式冲突
|
||
# # '--embed-image', '0', # 避免内联图片冲突
|
||
# # '--optimize-text', '1', # 优化文本渲染
|
||
# input_path
|
||
# ]
|
||
# result = subprocess.run(
|
||
# 'cd /data3/pdffiles && ' + ' '.join(cmd),
|
||
# shell=True,
|
||
# stdout=subprocess.PIPE,
|
||
# stderr=subprocess.STDOUT
|
||
# )
|
||
# print(f"转换状态: {result.returncode}\n输出: {result.stdout.decode()[:200]}")
|
||
|
||
# # 准备文件名
|
||
# file_name = os.path.basename(input_path)[:-3] + "html"
|
||
# html_path = f"/data3/pdffiles/{file_name}"
|
||
|
||
# if not os.path.exists(html_path):
|
||
# return f"{file_name} 转换失败"
|
||
|
||
# # 读取并处理HTML内容
|
||
# with open(html_path, 'r', encoding='utf-8') as file:
|
||
# soup = BeautifulSoup(file, 'html.parser')
|
||
|
||
# # 移除注释
|
||
# for comment in soup.find_all(string=lambda text: isinstance(text, str) and "Created by pdf2htmlEX" in text):
|
||
# comment.extract()
|
||
|
||
# # 移除loading-indicator
|
||
# for div in soup.find_all('div', class_='loading-indicator'):
|
||
# div.decompose()
|
||
|
||
# # 移除所有包含sidebar的div
|
||
# for div in soup.find_all('div', id=lambda x: x and 'sidebar' in x.lower()):
|
||
# div.decompose()
|
||
|
||
# # 转换为字符串并处理base64
|
||
# html_content = str(soup)
|
||
|
||
# # 清理临时文件
|
||
# os.remove(html_path)
|
||
|
||
# # 处理base64图片
|
||
# html_content = self.read_and_replace_base64(
|
||
# html_content,
|
||
# output_dir={GENERATED_IMAGES_BASE_PATH}
|
||
# )
|
||
|
||
# return f"{self._save_pdf_html(html_content, output_path)}"
|
||
|
||
def pdf_to_html(self, input_path: str, output_path: Optional[str] = None) -> str:
    """Build the PDF preview by delegating unchanged to the base class.

    Per the original note, the base implementation extracts text locally
    with PyMuPDF; that code is not visible from this file. If
    post-processing is ever needed, wrap the ``super()`` result here.
    """
    return super().pdf_to_html(input_path, output_path)
|
||
|
||
def read_and_replace_base64(
    self,
    html_content,
    output_dir,
    url_prefix="http://127.0.0.1:8099/chat_web_backend/get-image?file_name=",
):
    """Move inline base64 images out of the HTML and into files.

    Every ``data:image/(png|jpg|jpeg);base64,...`` URI is decoded, written
    to ``output_dir`` as ``image_<uuid1>_<index>.<ext>`` and replaced in
    the text by ``url_prefix`` followed by that file name.

    Args:
        html_content: HTML text that may contain base64 data URIs.
        output_dir: Directory receiving the image files. It is created on
            demand when the first image is written.
        url_prefix: Text placed in front of the file name to form the
            replacement URL. Defaults to the local image endpoint.

    Returns:
        The HTML text with every decodable data URI replaced. A data URI
        whose payload is not valid base64 is left untouched.
    """
    image_index = 0  # running counter, part of the generated file name

    def replace_base64(match):
        nonlocal image_index
        data_uri = match.group(0)
        encoded = data_uri.split(',', 1)[1]
        # Decode before touching the file system so that a malformed
        # payload neither aborts the whole conversion nor leaves an empty
        # file behind (binascii.Error is a ValueError).
        try:
            image_bytes = base64.b64decode(encoded)
        except ValueError:
            return data_uri

        # The pattern's capture group is the image subtype (png/jpg/jpeg).
        file_name = f'image_{uuid.uuid1()}_{image_index}.{match.group(1)}'
        os.makedirs(output_dir, exist_ok=True)
        with open(os.path.join(output_dir, file_name), 'wb') as image_file:
            image_file.write(image_bytes)
        image_index += 1
        return f"{url_prefix}{file_name}"

    # Only png/jpg/jpeg data URIs are matched; an earlier variant of this
    # pattern also covered data:application/font-woff.
    base64_pattern = r'data:image/(png|jpg|jpeg);base64,[A-Za-z0-9+/=]+'
    return re.sub(base64_pattern, replace_base64, html_content)
|
||
|
||
# def pdf_to_html(self, input_path: str, output_path: Optional[str] = None) -> str:
|
||
# """PDF转换方法"""
|
||
# try:
|
||
# doc = fitz.open(input_path)
|
||
# page_width = doc[0].rect.width
|
||
# page_height = doc[0].rect.height
|
||
# border_radius = 5
|
||
# html = ['<style>','pre { background-color: #2d2d2d;color: #f8f8f2; padding: 10px;margin: 0;width: 80%;box-sizing: border-box;border-radius: 0px;}', '</style>', '<body style="position: relative;">']
|
||
# image_save_path = '{GENERATED_IMAGES_BASE_PATH}'
|
||
# pic_num =0
|
||
# # 确保图片保存路径存在
|
||
# os.makedirs(image_save_path, exist_ok=True)
|
||
|
||
# for page in doc:
|
||
# blocks = page.get_text("dict")["blocks"]
|
||
# sorted_blocks = sorted(blocks, key=lambda b: (b["bbox"][1], b["bbox"][0])) # 按y坐标和x坐标排序
|
||
|
||
# for block in sorted_blocks:
|
||
# if "image" in block:
|
||
# pic_num += 1
|
||
# bbox = block["bbox"]
|
||
# image_bytes = block["image"]
|
||
# image_ext = block["ext"]
|
||
# image_name = f'image_{page.number}_{pic_num}.{image_ext}'
|
||
# image_url = f'http://127.0.0.1:8099/chat_web_backend/get-image?file_name={image_name}'
|
||
# image_path = os.path.join(image_save_path, image_name)
|
||
# # 保存图片到指定路径
|
||
# with open(image_path, 'wb') as img_file:
|
||
# img_file.write(image_bytes)
|
||
# percent_left = (bbox[0]) / page_width * 100
|
||
# # 获取页面的宽度和高度
|
||
# container_width = page_width # 页面宽度
|
||
# container_height = page_height # 页面高度
|
||
|
||
# # 计算图像的宽度和高度
|
||
# img_width = bbox[2] - bbox[0] # 计算宽度
|
||
# img_height = bbox[3] - bbox[1] # 计算高度
|
||
|
||
# # 计算百分比
|
||
# width_percent = (img_width / container_width) * 100
|
||
# height_percent = (img_height / container_height) * 100
|
||
# html.append(f'<div style="width: {width_percent}%; height: {height_percent}%; margin-left: {percent_left}%;clear: both;overflow: auto;"><img src="{image_url}" alt="Image {pic_num}" style="max-width: 100%; height: auto;display: block;"/></div>')
|
||
# if "lines" in block:
|
||
# text_nums = 0
|
||
# for line in block["lines"]:
|
||
# is_code_block =any(span["font"].startswith(("Courier", "NSimSun")) for span in line["spans"]) # 假设代码使用Courier字体
|
||
# if is_code_block:
|
||
# html.append(f"<pre>")
|
||
# for span in line["spans"]:
|
||
# text_nums += 1
|
||
# bbox = span["bbox"]
|
||
# text = span["text"]
|
||
# font = span["font"] # 字体
|
||
# size = span["size"] # 字体大小
|
||
# color = span["color"] # 字体颜色
|
||
|
||
# # 动态生成CSS样式
|
||
# css_style = f'font-family: {font}; font-size: {size}px; color: #{color:06x};'
|
||
# percent_left = (bbox[0]) / page_width * 100
|
||
# # 根据字体大小判断标题
|
||
# if size > 20: # 假设大于20的字体为标题
|
||
# if text_nums == 1:
|
||
# html.append(f'<h2 style="{css_style};display: inline; margin-left: {percent_left}%; ">{text.strip()}</h2>')
|
||
# else:
|
||
# html.append(f'<h3 style="{css_style};display: inline;">{text.strip()}</h3>')
|
||
# else:
|
||
# if text_nums == 1:
|
||
# html.append(f'<p style="{css_style};display: inline; margin-left: {percent_left}%; ">{text.strip()}</p>')
|
||
# else:
|
||
# html.append(f'<p style="{css_style};display: inline; ">{text.strip()}</p>')
|
||
|
||
# if is_code_block or size<=20:
|
||
# if is_code_block:
|
||
# html.append("</pre>")
|
||
# else:
|
||
# html.append("<br>")
|
||
# else:
|
||
# html.append('<br>')
|
||
# # html.append('<br>')
|
||
|
||
# html.append('</body>')
|
||
|
||
# # 将HTML内容保存到指定路径
|
||
# html_content = ''.join(html)
|
||
# if output_path:
|
||
# with open(output_path, 'w', encoding='utf-8') as file:
|
||
# file.write(html_content)
|
||
# else:
|
||
# # 如果没有指定路径,使用默认路径或返回HTML内容
|
||
# output_path = 'output.html'
|
||
# with open(output_path, 'w', encoding='utf-8') as file:
|
||
# file.write(html_content)
|
||
|
||
# return output_path
|
||
# except Exception as e:
|
||
# raise RuntimeError(f"PDF转换失败: {str(e)}")
|
||
# def replace_base64_with_url(self,html_content, output_dir):
|
||
# image_index = 0 # 用于生成唯一的文件名
|
||
# def replace_base64(match):
|
||
# nonlocal image_index
|
||
# base64_data = match.group(0)
|
||
# # 保存 Base64 图片并获取文件路径
|
||
# # 提取文件类型和实际的 Base64 数据
|
||
# header, data = base64_data.split(',', 1)
|
||
# file_extension = header.split(';')[0].split('/')[1] # 获取文件扩展名
|
||
# file_name = f'image_{uuid.uuid1()}_{image_index}.{file_extension}' # 生成文件名
|
||
# file_path = os.path.join(output_dir, file_name)
|
||
|
||
# # 将 Base64 数据解码并保存为文件
|
||
# with open(file_path, 'wb') as image_file:
|
||
# image_file.write(base64.b64decode(data))
|
||
# image_index += 1
|
||
# # 返回文件的 URL
|
||
# return f"http://127.0.0.1:8099/chat_web_backend/get-image?file_name={os.path.basename(file_path)}"
|
||
|
||
# # 使用正则表达式匹配 Base64 字符串
|
||
# base64_pattern = r'data:image/(png|jpg|jpeg);base64,[A-Za-z0-9+/=]+'
|
||
# updated_html_content = re.sub(base64_pattern, replace_base64, html_content)
|
||
# return updated_html_content
|