import base64
import os
import re
import uuid
from collections import defaultdict
from typing import Optional

import cssutils
from bs4 import BeautifulSoup

from server.knowledge_base.file_converter import FileConverter

# CSS properties that prevent the user from selecting text in the page.
_USER_SELECT_PROPS = (
    'user-select',
    '-webkit-user-select',
    '-moz-user-select',
    '-ms-user-select',
)
# Properties stripped from the page container and from ``.pf`` elements.
_CONTAINER_BACKGROUND_PROPS = ('background-color', 'background-image')
_PAGE_FRAME_PROPS = ('box-shadow', 'border-collapse')

# Matches a selector that targets the ``.pf`` class as a whole token
# (so ``.pfx`` or ``.apf`` do not match).
_PF_SELECTOR_RE = re.compile(r'(^|[\s>+~])\.pf($|[\s\[.:>+~])')

# The whole <head> element, split into opening tag / content / closing tag.
_HEAD_RE = re.compile(r'(<head[^>]*>)(.*?)(</head>)', re.DOTALL | re.IGNORECASE)

# NOTE(review): the tag name of this pattern was lost in the copy of the file
# this was reconstructed from; <script> is the presumed target - confirm.
_SCRIPT_RE = re.compile(r'<script[^>]*>[\s\S]*?</script>', re.IGNORECASE)

# Opening tags that receive a generated id.
_ID_TARGET_TAG_RE = re.compile(r'<(h[1-6]|p|div|span)(\b[^>]*)>', re.IGNORECASE)

# ``id="page-container"`` in either quote style, with optional spaces.
_PAGE_CONTAINER_ID_RE = re.compile(
    r'\bid\s*=\s*["\']page-container["\']', re.IGNORECASE
)

# An existing id attribute, in the same forms the detection above accepts.
_ID_ATTR_RE = re.compile(r'\s+id\s*=\s*(?:"[^"]*"|\'[^\']*\')', re.IGNORECASE)

# Inline base64 images that are written out to disk.
_BASE64_IMAGE_RE = re.compile(
    r'data:image/(png|jpg|jpeg);base64,[A-Za-z0-9+/=]+'
)


class PdfConverter(FileConverter):
    """Post-process PDF-derived HTML: clean CSS, add element ids, extract images.

    NOTE: two earlier ``pdf_to_html`` implementations (one shelling out to
    ``pdf2htmlEX``, one walking PyMuPDF text blocks) and a duplicate of
    ``read_and_replace_base64`` used to live in this file as commented-out
    code. They have been removed; ``pdf_to_html`` now defers to the base class.
    """

    def _clean_pdf_html(self, html: str) -> str:
        """Clean up converted HTML and return it as a string.

        Steps, in order:
          1. remove ``<span>`` elements without visible text;
          2. strip text-selection, background and page-frame properties from
             ``<style>`` sheets and from inline styles;
          3. give headings, paragraphs, divs and spans generated ids
             (see ``_add_pdf_element_ids``);
          4. rewrite the ``<head>`` for the renamed page container;
          5. drop script elements.

        Args:
            html: The HTML document to clean.

        Returns:
            The cleaned HTML with surrounding whitespace stripped.
        """
        soup = BeautifulSoup(html, 'html.parser')

        def process_rule(rule):
            """Strip unwanted properties from one stylesheet rule."""
            if rule.type == rule.MEDIA_RULE:
                # @media blocks contain nested rules; clean those as well.
                for nested_rule in rule:
                    process_rule(nested_rule)
            elif rule.type == rule.STYLE_RULE:
                for prop in _USER_SELECT_PROPS:
                    rule.style.removeProperty(prop)

                selectors = [sel.selectorText for sel in rule.selectorList]
                if any('#page-container-1' in text for text in selectors):
                    for prop in _CONTAINER_BACKGROUND_PROPS:
                        rule.style.removeProperty(prop)
                if any(_PF_SELECTOR_RE.search(text) for text in selectors):
                    # A single removeProperty call drops every declaration of
                    # the property, so no retry loop is needed.
                    for prop in _PAGE_FRAME_PROPS:
                        rule.style.removeProperty(prop)

        def clean_inline_styles(tag):
            """Strip the same properties from a tag's ``style`` attribute."""
            if not tag.has_attr('style'):
                return
            style = cssutils.parseStyle(tag['style'])
            for prop in _USER_SELECT_PROPS:
                style.removeProperty(prop)
            if tag.get('id') == 'page-container-1':
                for prop in _CONTAINER_BACKGROUND_PROPS:
                    style.removeProperty(prop)
            if 'pf' in tag.get('class', []):
                for prop in _PAGE_FRAME_PROPS:
                    style.removeProperty(prop)

            tag['style'] = style.cssText.replace('\n', ' ').strip()
            if not tag['style']:
                # Nothing left to declare: drop the empty attribute.
                del tag['style']

        # Remove spans that are empty or contain only whitespace. Walking in
        # reverse document order handles nested spans before their ancestors,
        # so an already-decomposed tag is never touched again.
        for span in reversed(soup.find_all('span')):
            if not span.text.strip():
                span.decompose()

        for style_tag in soup.find_all('style'):
            if not style_tag.string:
                continue
            try:
                sheet = cssutils.parseString(style_tag.string)
                for rule in sheet:
                    process_rule(rule)
                style_tag.string = (
                    sheet.cssText.decode('utf-8')
                    .replace('\\n', '\n')
                    .replace(' !important', '!important')
                )
            except Exception as e:
                # Best effort: a stylesheet that cannot be processed is left
                # as it was and the remaining ones are still cleaned.
                print(f"CSS处理错误: {str(e)}")

        # NOTE(review): these lookups use the id "page-container-1", which is
        # only assigned by _add_pdf_element_ids further down; they match only
        # when the incoming HTML already carries that id - confirm intent.
        for container in soup.select('#page-container-1'):
            clean_inline_styles(container)
        for pf_element in soup.select('.pf'):
            clean_inline_styles(pf_element)

        content = self._add_pdf_element_ids(str(soup))

        new_id = getattr(self, 'page_container_id', None)
        if new_id:

            def replace_head(match):
                """Rewrite the <head> content for the renamed container."""
                head_content = match.group(2)
                # Point id="page-container" references at the generated id.
                head_content = re.sub(
                    r'(id\s*=\s*["\']?)page-container(["\'\]>])',
                    f'\\g<1>{new_id}\\g<2>',
                    head_content,
                )
                # Drop a background-color/background-image declaration that
                # follows an id selector, keeping everything in front of it.
                head_content = re.sub(
                    r'(#[^{\s>]+?{.*?)(\bbackground-(color|image)\s*:[^;]+;?)',
                    r'\g<1>',
                    head_content,
                    flags=re.DOTALL | re.IGNORECASE,
                )
                return f"{match.group(1)}{head_content}{match.group(3)}"

            content = _HEAD_RE.sub(replace_head, content)

        content = _SCRIPT_RE.sub('', content)
        return content.strip()

    def _add_pdf_element_ids(self, content: str) -> str:
        """Give every h1-h6, p, div and span opening tag a generated id.

        Ids have the form ``<tag>-<n>`` with a separate counter per tag name.
        A ``div`` whose id is ``page-container`` is treated specially: it gets
        the id ``page-container-<n>``, which is also stored in
        ``self.page_container_id`` (reset to ``None`` at the start of each
        call). Any id already present on a matched tag is replaced.

        Args:
            content: HTML markup as a string.

        Returns:
            The markup with ids added to the matched opening tags.
        """
        counters = defaultdict(int)
        self.page_container_id = None  # reset the recorded container id

        def replace_tag(match):
            tag = match.group(1).lower()
            attrs = match.group(2)
            # Strip the old id in every form the detection below accepts, so
            # the rewritten tag never ends up with two id attributes.
            clean_attrs = _ID_ATTR_RE.sub('', attrs)

            if tag == 'div' and _PAGE_CONTAINER_ID_RE.search(attrs):
                # Generate the container id once; later page-container divs
                # reuse the recorded value.
                if not self.page_container_id:
                    counters['page-container'] += 1
                    self.page_container_id = (
                        f"page-container-{counters['page-container']}"
                    )
                return f'<div id="{self.page_container_id}"{clean_attrs}>'

            counters[tag] += 1
            return f'<{tag} id="{tag}-{counters[tag]}"{clean_attrs}>'

        return _ID_TARGET_TAG_RE.sub(replace_tag, content)

    def _save_pdf_html(self, content: str, output_path: Optional[str] = None) -> str:
        """Clean ``content`` and optionally write the result to a file.

        Args:
            content: Raw HTML to clean with ``_clean_pdf_html``.
            output_path: When given, the cleaned HTML is written there as
                UTF-8.

        Returns:
            The cleaned HTML.
        """
        cleaned = self._clean_pdf_html(content)
        if output_path:
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(cleaned)
        return cleaned

    def pdf_to_html(self, input_path: str, output_path: Optional[str] = None) -> str:
        """Convert a PDF for preview by delegating to the base class.

        Per the original note, the base class extracts text locally with
        PyMuPDF; wrap the ``super()`` result here if post-processing is needed.
        """
        return super().pdf_to_html(input_path, output_path)

    def read_and_replace_base64(self, html_content, output_dir):
        """Save inline base64 images to disk and replace them with URLs.

        Every ``data:image/(png|jpg|jpeg);base64,...`` URI in ``html_content``
        is decoded, written to ``output_dir`` under a unique file name and
        replaced by a ``get-image`` URL that carries that file name.

        Args:
            html_content: HTML text that may contain inline base64 images.
            output_dir: Directory the image files are written to; it is
                created if it does not exist.

        Returns:
            ``html_content`` with each matched data URI replaced by its URL.

        Raises:
            binascii.Error: If a matched payload is not valid base64.
        """
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        image_index = 0  # keeps file names unique within one call

        def replace_base64(match):
            nonlocal image_index
            file_extension = match.group(1)
            _, data = match.group(0).split(',', 1)
            # Decode before opening the file so an invalid payload does not
            # leave an empty image file behind.
            image_bytes = base64.b64decode(data)

            file_name = f'image_{uuid.uuid1()}_{image_index}.{file_extension}'
            with open(os.path.join(output_dir, file_name), 'wb') as image_file:
                image_file.write(image_bytes)
            image_index += 1
            return f"http://127.0.0.1:8099/chat_web_backend/get-image?file_name={file_name}"

        return _BASE64_IMAGE_RE.sub(replace_base64, html_content)