From ee7c4a73ed2473612f53cfada1c693433f7d6d9f Mon Sep 17 00:00:00 2001 From: liuguancen Date: Thu, 2 Apr 2026 16:45:27 +0800 Subject: [PATCH] =?UTF-8?q?[RAG]=20=E5=BD=BB=E5=BA=95=E6=94=B9=E5=9B=9E?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E4=B8=8A=E4=BC=A0(=E6=A8=A1=E5=9E=8B?= =?UTF-8?q?=E5=B7=B2=E6=8D=A2v3=E8=B6=B3=E5=A4=9F=E5=BF=AB)=EF=BC=8C?= =?UTF-8?q?=E5=88=A0=E9=99=A4=E5=BC=82=E6=AD=A5=E5=90=8E=E5=8F=B0=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../server/knowledge_base/kb_doc_api.py | 139 +++++------------- 1 file changed, 40 insertions(+), 99 deletions(-) diff --git a/langchain-chat/server/knowledge_base/kb_doc_api.py b/langchain-chat/server/knowledge_base/kb_doc_api.py index 6364b86..ed83e0c 100644 --- a/langchain-chat/server/knowledge_base/kb_doc_api.py +++ b/langchain-chat/server/knowledge_base/kb_doc_api.py @@ -270,75 +270,6 @@ def upload_docs( return BaseResponse(code=200, msg="文件上传与向量化完成", data={"failed_files": failed_files}) -def _background_llm_and_vectorize( - knowledge_base_name: str, - file_names: List[str], - chunk_size: int, - chunk_overlap: int, - zh_title_enhance: bool, - docs: dict, - not_refresh_vs_cache: bool, - embedding_ids: dict, -): - """后台线程:LLM 导读 + 向量化,完成后直连 MySQL 更新结果。""" - import time - import pymysql - start_time = time.time() - - kb = KBServiceFactory.get_service_by_name(knowledge_base_name) - - for filename in file_names: - try: - knowledge_file = KnowledgeFile(filename=filename, knowledge_base_name=knowledge_base_name) - new_loop = asyncio.new_event_loop() - asyncio.set_event_loop(new_loop) - try: - llm_result = new_loop.run_until_complete(knowledge_file.get_llm_result()) - finally: - new_loop.close() - - # 直连 MySQL 更新导读结果 - embedding_id = embedding_ids.get(filename, filename) - try: - conn = pymysql.connect(**ck_mysql_config) - with conn.cursor() as cursor: - cursor.execute( - "UPDATE gpt_upload_file SET article_abstract=%s, article_keywords=%s, article_paragraph=%s WHERE embedding_id=%s", - ( - str(llm_result.get("article_abstract", "生成摘要失败")), - str(llm_result.get("article_keywords", "生成关键词失败")), - str(llm_result.get("article_paragraph", "生成章节速览失败")), - embedding_id - ) - ) - conn.commit() - conn.close() - logger.info(f"[后台] LLM 导读已更新到数据库: {filename}") - except Exception as db_e: - logger.error(f"[后台] MySQL 更新失败 {filename}: {db_e}") - - except Exception as e: - logger.error(f"[后台] LLM 导读生成失败 {filename}: {e}") - - # 向量化 - try: - _update_docs_impl( - knowledge_base_name=knowledge_base_name, - file_names=file_names, - override_custom_docs=True, - chunk_size=chunk_size, - chunk_overlap=chunk_overlap, - zh_title_enhance=zh_title_enhance, - docs=docs, - not_refresh_vs_cache=True, - ) - if kb and not not_refresh_vs_cache: - kb.save_vector_store() - except Exception as e: - logger.error(f"[后台] 向量化失败: {e}") - logger.info(f"[后台] 总耗时: {time.time() - start_time:.2f}s") - - def upload_docs_new( files: List[UploadFile] = File(..., description="上传文件,支持多文件"), knowledge_base_name: str = Form(..., description="知识库名称", examples=["samples"]), @@ -352,7 +283,7 @@ def upload_docs_new( not_refresh_vs_cache: bool = Form(False, description="暂不保存向量库(用于FAISS)"), ) -> BaseResponse: """ - API接口:上传文件,提取全文后快速返回,LLM导读+向量化后台异步执行并直连MySQL更新结果 + API接口:上传文件,同步生成导读(模型已优化为deepseek-v3),然后向量化 """ import time start_time = time.time() @@ -373,51 +304,61 @@ def upload_docs_new( failed_files = {} file_names = list(docs.keys()) llm_results = {} - embedding_ids = {} - # 保存文件到磁盘 + 提取全文(快速) for result in _save_files_in_thread(files, knowledge_base_name=knowledge_base_name, override=override): filename = result["data"]["file_name"] if result["code"] != 200: failed_files[filename] = result["msg"] + if filename not in file_names: file_names.append(filename) - embedding_ids[filename] = filename try: knowledge_file = KnowledgeFile(filename=filename, knowledge_base_name=knowledge_base_name) - full_text_data = knowledge_file.get_full_text() - import json as _json - try: - full_text = _json.loads(full_text_data).get("full_text", "") - except: - full_text = "" + import concurrent.futures + def run_async_in_thread(): + new_loop = asyncio.new_event_loop() + asyncio.set_event_loop(new_loop) + try: + return new_loop.run_until_complete(knowledge_file.get_llm_result()) + finally: + new_loop.close() + + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(run_async_in_thread) + llm_result = future.result() llm_results[filename] = { - "full_text": full_text, - "article_abstract": "导读生成中,请稍后刷新查看...", - "article_keywords": "导读生成中,请稍后刷新查看...", - "article_paragraph": "导读生成中,请稍后刷新查看..." + "full_text": llm_result.get("full_text", "获取全文失败"), + "article_abstract": llm_result.get("article_abstract", "生成摘要失败"), + "article_keywords": llm_result.get("article_keywords", "生成关键词失败"), + "article_paragraph": llm_result.get("article_paragraph", "生成章节速览失败") } except Exception as e: - logger.error(f"提取全文失败 {filename}: {e}") + logger.error(f"生成LLM结果时出错:{e}", exc_info=e if log_verbose else None) llm_results[filename] = { - "full_text": "", - "article_abstract": "导读生成中,请稍后刷新查看...", - "article_keywords": "导读生成中,请稍后刷新查看...", - "article_paragraph": "导读生成中,请稍后刷新查看..." + "article_abstract": "生成摘要失败", + "article_keywords": "生成关键词失败", + "article_paragraph": "生成章节速览失败" } - # 后台异步:LLM 导读 + 向量化,完成后直连 MySQL 更新 - import threading - threading.Thread( - target=_background_llm_and_vectorize, - args=(knowledge_base_name, file_names, chunk_size, chunk_overlap, - zh_title_enhance, docs, not_refresh_vs_cache, embedding_ids), - daemon=True - ).start() - - logger.info(f"文件上传+全文提取: {time.time() - start_time:.2f}s,LLM+向量化转后台") - return BaseResponse(code=200, msg="文件上传完成,导读生成中", data={ + if to_vector_store: + update_st = time.time() + result = _update_docs_impl( + knowledge_base_name=knowledge_base_name, + file_names=file_names, + override_custom_docs=True, + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + zh_title_enhance=zh_title_enhance, + docs=docs, + not_refresh_vs_cache=True, + ) + failed_files.update(result.data["failed_files"]) + if not not_refresh_vs_cache: + kb.save_vector_store() + logger.info(f'向量化用时:{time.time() - update_st}') + logger.info(f"总执行时间: {time.time() - start_time:.2f}s") + return BaseResponse(code=200, msg="文件上传与向量化完成", data={ "failed_files": failed_files, "llm_results": llm_results })