Files

373 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
from typing import Optional
from bs4 import BeautifulSoup
from collections import defaultdict
import cssutils
from server.knowledge_base.file_converter import FileConverter
import uuid
import base64
class PdfConverter(FileConverter):
    """PDF converter with helpers that post-process converted HTML.

    The helpers look for ``#page-container`` and ``.pf`` elements, which is
    the markup the disabled pdf2htmlEX pipeline (kept below as a comment)
    would hand to them.
    NOTE(review): confirm against the converter output actually in use.
    """

    # CSS properties that prevent the user from selecting text.
    _USER_SELECT_PROPS = (
        'user-select',
        '-webkit-user-select',
        '-moz-user-select',
        '-ms-user-select',
    )
    # ``.pf`` as a complete class token inside a selector (not ``.pfx``).
    _PF_SELECTOR_RE = re.compile(r'(^|[\s>+~])\.pf($|[\s\[.:>+~])')
    # Opening tags that receive a generated id. The lookahead keeps names
    # such as ``<p-foo>`` or ``<div:x>`` from being mistaken for ``<p>``.
    _TARGET_TAG_RE = re.compile(
        r'<(h[1-6]|p|div|span)(?![\w:-])([^>]*)>',
        re.IGNORECASE,
    )
    # An existing id attribute, double- or single-quoted, any letter case.
    _ID_ATTR_RE = re.compile(
        r'''\s+id\s*=\s*(?:"[^"]*"|'[^']*')''',
        re.IGNORECASE,
    )
    # ``id="page-container"``; the lookbehind rejects ``data-id=...``.
    _PAGE_CONTAINER_ID_RE = re.compile(
        r'''(?<![\w-])id\s*=\s*["']page-container["']''',
        re.IGNORECASE,
    )
    # Inline PNG/JPEG data URIs.
    _BASE64_IMAGE_RE = re.compile(
        r'data:image/(png|jpg|jpeg);base64,[A-Za-z0-9+/=]+'
    )
    _DEFAULT_IMAGE_URL_PREFIX = (
        "http://127.0.0.1:8099/chat_web_backend/get-image?file_name="
    )

    def _clean_pdf_html(self, html: str) -> str:
        """Sanitize converted HTML and return it as a string.

        Steps, in order:

        1. Drop ``<span>`` elements that contain no visible text.
        2. Rewrite every ``<style>`` sheet with cssutils: remove the
           text-selection restrictions, the background of
           ``#page-container-1`` rules, and ``box-shadow`` /
           ``border-collapse`` of ``.pf`` rules.
        3. Apply the same clean-up to the inline ``style`` attribute of
           ``#page-container-1`` and ``.pf`` elements.
        4. Assign generated ids via :meth:`_add_pdf_element_ids`.
        5. If a page container was renamed, patch ``<head>`` to match.
        6. Strip every ``<script>`` element.

        NOTE(review): steps 2 and 3 look for ``page-container-1`` before
        step 4 performs the rename, so they only take effect when the input
        already carries that id -- confirm this ordering is intended.

        Args:
            html: The HTML document to clean.

        Returns:
            The cleaned document with surrounding whitespace stripped.
        """
        soup = BeautifulSoup(html, 'html.parser')

        def process_rule(rule):
            """Clean one stylesheet rule, recursing into ``@media`` blocks."""
            if rule.type == rule.MEDIA_RULE:
                for nested_rule in rule:
                    process_rule(nested_rule)
                return
            if rule.type != rule.STYLE_RULE:
                return

            for prop in self._USER_SELECT_PROPS:
                rule.style.removeProperty(prop)

            selectors = [sel.selectorText for sel in rule.selectorList]
            if any('#page-container-1' in text for text in selectors):
                rule.style.removeProperty('background-color')
                rule.style.removeProperty('background-image')
            if any(self._PF_SELECTOR_RE.search(text) for text in selectors):
                rule.style.removeProperty('box-shadow')
                rule.style.removeProperty('border-collapse')

        def clean_inline_styles(tag):
            """Apply the stylesheet clean-up to a tag's ``style`` attribute."""
            if not tag.has_attr('style'):
                return
            style = cssutils.parseStyle(tag['style'])
            for prop in self._USER_SELECT_PROPS:
                style.removeProperty(prop)
            if tag.get('id') == 'page-container-1':
                style.removeProperty('background-color')
                style.removeProperty('background-image')
            if 'pf' in tag.get('class', []):
                style.removeProperty('box-shadow')
                style.removeProperty('border-collapse')
            # cssutils serializes one declaration per line; keep it inline.
            tag['style'] = style.cssText.replace('\n', ' ').strip()
            if not tag['style']:
                del tag['style']

        # Remove spans without visible text. A span nested in an already
        # removed span has been destroyed together with its parent, so it
        # must not be touched again.
        for span in soup.find_all('span'):
            if getattr(span, 'decomposed', False):
                continue
            if not span.text.strip():
                span.decompose()

        for style_tag in soup.find_all('style'):
            if not style_tag.string:
                continue
            # Best effort: a sheet that cannot be processed is left as it is.
            try:
                sheet = cssutils.parseString(style_tag.string)
                for rule in sheet:
                    process_rule(rule)
                style_tag.string = (
                    sheet.cssText.decode('utf-8')
                    .replace('\\n', '\n')
                    .replace(' !important', '!important')
                )
            except Exception as e:
                print(f"CSS处理错误: {str(e)}")

        for container in soup.select('#page-container-1'):
            clean_inline_styles(container)
        for pf_element in soup.select('.pf'):
            clean_inline_styles(pf_element)

        content = self._add_pdf_element_ids(str(soup))

        new_id = getattr(self, 'page_container_id', None)
        if new_id:
            head_pattern = re.compile(
                r'(<head[^>]*>)(.*?)(</head>)',
                re.DOTALL | re.IGNORECASE
            )

            def replace_head(match):
                """Point head-level references at the renamed container."""
                head_content = re.sub(
                    r'(id\s*=\s*["\']?)page-container(["\'\]>])',
                    f'\\g<1>{new_id}\\g<2>',
                    match.group(2)
                )
                # Drop the background declaration that follows an id
                # selector. The value stops at ';' or '}' so a declaration
                # without a trailing semicolon cannot swallow the closing
                # brace and the start of the next rule.
                head_content = re.sub(
                    r'(#[^{\s>]+?\{.*?)(\bbackground-(color|image)\s*:[^;}]+;?)',
                    r'\g<1>',
                    head_content,
                    flags=re.DOTALL | re.IGNORECASE
                )
                return f"{match.group(1)}{head_content}{match.group(3)}"

            content = head_pattern.sub(replace_head, content)

        content = re.sub(
            r'<script\b[^>]*>[\s\S]*?</script>',
            '',
            content,
            flags=re.IGNORECASE
        )
        return content.strip()

    def _add_pdf_element_ids(self, content: str) -> str:
        """Give every ``h1``-``h6``, ``p``, ``div`` and ``span`` a generated id.

        Ids have the form ``<tag>-<n>`` with a separate counter per tag name.
        Any id the tag already had is replaced. A ``div`` whose id is
        ``page-container`` becomes ``page-container-1`` instead and does not
        advance the ``div`` counter.

        Side effect: ``self.page_container_id`` is reset to ``None`` and then
        set to the container's new id if such a ``div`` is found.

        The rewrite is regex-based, so an attribute value containing ``>``
        is not handled.

        Args:
            content: HTML markup to rewrite.

        Returns:
            The markup with ids assigned and tag names lower-cased.
        """
        counters = defaultdict(int)
        self.page_container_id = None  # forget the id from any earlier call

        def replace_tag(match):
            tag = match.group(1).lower()
            attrs = match.group(2)
            # Strip the old id so the tag never ends up with two of them.
            clean_attrs = self._ID_ATTR_RE.sub('', attrs)

            if tag == "div" and self._PAGE_CONTAINER_ID_RE.search(attrs):
                if not self.page_container_id:
                    counters['page-container'] += 1
                    self.page_container_id = (
                        f"page-container-{counters['page-container']}"
                    )
                return f'<div id="{self.page_container_id}"{clean_attrs}>'

            counters[tag] += 1
            return f'<{tag} id="{tag}-{counters[tag]}"{clean_attrs}>'

        return self._TARGET_TAG_RE.sub(replace_tag, content)

    def _save_pdf_html(self, content: str, output_path: Optional[str] = None) -> str:
        """Clean ``content`` and optionally write the result to disk.

        Args:
            content: Raw HTML to clean with :meth:`_clean_pdf_html`.
            output_path: If given, the cleaned HTML is written there as UTF-8.

        Returns:
            The cleaned HTML.
        """
        cleaned = self._clean_pdf_html(content)
        if output_path:
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(cleaned)
        return cleaned

    # NOTE: disabled pdf2htmlEX-based implementation, kept for reference.
    # def pdf_to_html(self, input_path: str, output_path: Optional[str] = None) -> str:
    #     """PDF conversion method"""
    #     cmd = [
    #         'pdf2htmlEX',
    #         '--zoom', '1.2',  # zoom in
    #         '--split-pages', '0',  # keep the overall layout
    #         # '--embed-css', '0',  # avoid inline style conflicts
    #         # '--embed-image', '0',  # avoid inline image conflicts
    #         # '--optimize-text', '1',  # optimize text rendering
    #         input_path
    #     ]
    #     result = subprocess.run(
    #         'cd /data3/pdffiles && ' + ' '.join(cmd),
    #         shell=True,
    #         stdout=subprocess.PIPE,
    #         stderr=subprocess.STDOUT
    #     )
    #     print(f"转换状态: {result.returncode}\n输出: {result.stdout.decode()[:200]}")
    #     # Build the output file name
    #     file_name = os.path.basename(input_path)[:-3] + "html"
    #     html_path = f"/data3/pdffiles/{file_name}"
    #     if not os.path.exists(html_path):
    #         return f"{file_name} 转换失败"
    #     # Read and process the HTML content
    #     with open(html_path, 'r', encoding='utf-8') as file:
    #         soup = BeautifulSoup(file, 'html.parser')
    #     # Remove the generator comment
    #     for comment in soup.find_all(string=lambda text: isinstance(text, str) and "Created by pdf2htmlEX" in text):
    #         comment.extract()
    #     # Remove the loading-indicator
    #     for div in soup.find_all('div', class_='loading-indicator'):
    #         div.decompose()
    #     # Remove every div whose id contains "sidebar"
    #     for div in soup.find_all('div', id=lambda x: x and 'sidebar' in x.lower()):
    #         div.decompose()
    #     # Serialize to a string before handling base64
    #     html_content = str(soup)
    #     # Clean up the temporary file
    #     os.remove(html_path)
    #     # Replace base64 images
    #     html_content = self.read_and_replace_base64(
    #         html_content,
    #         output_dir={GENERATED_IMAGES_BASE_PATH}
    #     )
    #     return f"{self._save_pdf_html(html_content, output_path)}"

    def pdf_to_html(self, input_path: str, output_path: Optional[str] = None) -> str:
        """Convert a PDF to an HTML preview by delegating to the base class.

        Per the original note, the base class extracts text locally with
        PyMuPDF; any post-processing could wrap the ``super()`` result here.

        Args:
            input_path: Path of the PDF file.
            output_path: Optional destination, passed through unchanged.

        Returns:
            Whatever the base class implementation returns.
        """
        return super().pdf_to_html(input_path, output_path)

    def read_and_replace_base64(self, html_content, output_dir,
                                url_prefix=_DEFAULT_IMAGE_URL_PREFIX):
        """Write inline base64 images to files and link to them instead.

        Every ``data:image/(png|jpg|jpeg);base64,...`` URI in the markup is
        decoded and saved in ``output_dir`` under a unique name, then
        replaced by ``url_prefix`` followed by that file name.

        Args:
            html_content: Markup that may contain base64 image data URIs.
            output_dir: Directory for the image files; created if missing.
            url_prefix: Text placed before the file name to form the URL.
                Defaults to the local image endpoint.

        Returns:
            The markup with each decodable data URI replaced by a URL. A URI
            whose payload cannot be decoded is left unchanged.
        """
        image_index = 0  # keeps names distinct within one call

        def replace_base64(match):
            nonlocal image_index
            data_uri = match.group(0)
            payload = data_uri.split(',', 1)[1]
            # Decode before creating the file so that a malformed payload
            # neither leaves an empty file behind nor aborts the conversion.
            try:
                image_bytes = base64.b64decode(payload)
            except ValueError:
                return data_uri

            file_extension = match.group(1)
            file_name = f'image_{uuid.uuid1()}_{image_index}.{file_extension}'
            if output_dir:
                os.makedirs(output_dir, exist_ok=True)
            with open(os.path.join(output_dir, file_name), 'wb') as image_file:
                image_file.write(image_bytes)
            image_index += 1
            return f"{url_prefix}{file_name}"

        return self._BASE64_IMAGE_RE.sub(replace_base64, html_content)
# def pdf_to_html(self, input_path: str, output_path: Optional[str] = None) -> str:
# """PDF转换方法"""
# try:
# doc = fitz.open(input_path)
# page_width = doc[0].rect.width
# page_height = doc[0].rect.height
# border_radius = 5
# html = ['<style>','pre { background-color: #2d2d2d;color: #f8f8f2; padding: 10px;margin: 0;width: 80%;box-sizing: border-box;border-radius: 0px;}', '</style>', '<body style="position: relative;">']
# image_save_path = '{GENERATED_IMAGES_BASE_PATH}'
# pic_num =0
# # 确保图片保存路径存在
# os.makedirs(image_save_path, exist_ok=True)
# for page in doc:
# blocks = page.get_text("dict")["blocks"]
# sorted_blocks = sorted(blocks, key=lambda b: (b["bbox"][1], b["bbox"][0])) # 按y坐标和x坐标排序
# for block in sorted_blocks:
# if "image" in block:
# pic_num += 1
# bbox = block["bbox"]
# image_bytes = block["image"]
# image_ext = block["ext"]
# image_name = f'image_{page.number}_{pic_num}.{image_ext}'
# image_url = f'http://127.0.0.1:8099/chat_web_backend/get-image?file_name={image_name}'
# image_path = os.path.join(image_save_path, image_name)
# # 保存图片到指定路径
# with open(image_path, 'wb') as img_file:
# img_file.write(image_bytes)
# percent_left = (bbox[0]) / page_width * 100
# # 获取页面的宽度和高度
# container_width = page_width # 页面宽度
# container_height = page_height # 页面高度
# # 计算图像的宽度和高度
# img_width = bbox[2] - bbox[0] # 计算宽度
# img_height = bbox[3] - bbox[1] # 计算高度
# # 计算百分比
# width_percent = (img_width / container_width) * 100
# height_percent = (img_height / container_height) * 100
# html.append(f'<div style="width: {width_percent}%; height: {height_percent}%; margin-left: {percent_left}%;clear: both;overflow: auto;"><img src="{image_url}" alt="Image {pic_num}" style="max-width: 100%; height: auto;display: block;"/></div>')
# if "lines" in block:
# text_nums = 0
# for line in block["lines"]:
# is_code_block =any(span["font"].startswith(("Courier", "NSimSun")) for span in line["spans"]) # 假设代码使用Courier字体
# if is_code_block:
# html.append(f"<pre>")
# for span in line["spans"]:
# text_nums += 1
# bbox = span["bbox"]
# text = span["text"]
# font = span["font"] # 字体
# size = span["size"] # 字体大小
# color = span["color"] # 字体颜色
# # 动态生成CSS样式
# css_style = f'font-family: {font}; font-size: {size}px; color: #{color:06x};'
# percent_left = (bbox[0]) / page_width * 100
# # 根据字体大小判断标题
# if size > 20: # 假设大于20的字体为标题
# if text_nums == 1:
# html.append(f'<h2 style="{css_style};display: inline; margin-left: {percent_left}%; ">{text.strip()}</h2>')
# else:
# html.append(f'<h3 style="{css_style};display: inline;">{text.strip()}</h3>')
# else:
# if text_nums == 1:
# html.append(f'<p style="{css_style};display: inline; margin-left: {percent_left}%; ">{text.strip()}</p>')
# else:
# html.append(f'<p style="{css_style};display: inline; ">{text.strip()}</p>')
# if is_code_block or size<=20:
# if is_code_block:
# html.append("</pre>")
# else:
# html.append("<br>")
# else:
# html.append('<br>')
# # html.append('<br>')
# html.append('</body>')
# # 将HTML内容保存到指定路径
# html_content = ''.join(html)
# if output_path:
# with open(output_path, 'w', encoding='utf-8') as file:
# file.write(html_content)
# else:
# # 如果没有指定路径使用默认路径或返回HTML内容
# output_path = 'output.html'
# with open(output_path, 'w', encoding='utf-8') as file:
# file.write(html_content)
# return output_path
# except Exception as e:
# raise RuntimeError(f"PDF转换失败: {str(e)}")
# def replace_base64_with_url(self,html_content, output_dir):
# image_index = 0 # 用于生成唯一的文件名
# def replace_base64(match):
# nonlocal image_index
# base64_data = match.group(0)
# # 保存 Base64 图片并获取文件路径
# # 提取文件类型和实际的 Base64 数据
# header, data = base64_data.split(',', 1)
# file_extension = header.split(';')[0].split('/')[1] # 获取文件扩展名
# file_name = f'image_{uuid.uuid1()}_{image_index}.{file_extension}' # 生成文件名
# file_path = os.path.join(output_dir, file_name)
# # 将 Base64 数据解码并保存为文件
# with open(file_path, 'wb') as image_file:
# image_file.write(base64.b64decode(data))
# image_index += 1
# # 返回文件的 URL
# return f"http://127.0.0.1:8099/chat_web_backend/get-image?file_name={os.path.basename(file_path)}"
# # 使用正则表达式匹配 Base64 字符串
# base64_pattern = r'data:image/(png|jpg|jpeg);base64,[A-Za-z0-9+/=]+'
# updated_html_content = re.sub(base64_pattern, replace_base64, html_content)
# return updated_html_content