"""HTTP handlers for searching, uploading, updating, deleting and downloading
knowledge-base documents."""

import asyncio
import os
import threading
import urllib
import urllib.parse  # `import urllib` alone does not guarantee the submodule is loaded
from time import time
# Third-party / project imports are kept in their original relative order.
from fastapi import File, Form, Body, Query, Response, UploadFile
from configs import (DEFAULT_VS_TYPE, EMBEDDING_MODEL,
                     VECTOR_SEARCH_TOP_K, SCORE_THRESHOLD, EXPR,
                     CHUNK_SIZE, OVERLAP_SIZE, ZH_TITLE_ENHANCE,
                     logger, log_verbose, POLICY_KNOWLEDGE_BASE)
from configs.model_config import LLM_MODELS
from server.knowledge_base.cleanpdf import PdfConverter
from server.knowledge_base.file_converter import FileConverter
from server.utils import BaseResponse, ListResponse, flatten, run_in_thread_pool
from server.knowledge_base.utils import (validate_kb_name, list_files_from_folder, get_file_path,
                                         files2docs_in_thread, KnowledgeFile)
from fastapi.responses import FileResponse
from sse_starlette import EventSourceResponse
from pydantic import Json
import json
from server.knowledge_base.kb_service.base import KBServiceFactory
from server.db.repository.knowledge_file_repository import get_file_detail
from langchain.docstore.document import Document
from server.knowledge_base.model.kb_document_model import DocumentWithVSId
from typing import List, Dict, Optional
from server.chat.policy_fun_iast import get_llm_model_response
from datetime import datetime


# File extension -> name of the FileConverter method that renders it as HTML.
_HTML_CONVERTER_METHODS = {
    "pdf": "pdf_to_html",
    "docx": "docx_to_html",
    "doc": "doc_to_html",
    "md": "md_to_html",
    "txt": "txt_to_html",
    "xlsx": "xlsx_to_html",
    "xls": "xls_to_html",
}

# Placeholder shown for the LLM-generated fields until the background job finishes.
_GUIDE_PENDING = "导读生成中..."


def _to_doc_with_id(doc, score) -> DocumentWithVSId:
    """Wrap one (document, score) search hit as a DocumentWithVSId."""
    return DocumentWithVSId(**doc.dict(), score=score, id=doc.metadata.get("id"))


def _pending_guide(full_text: str) -> dict:
    """Build the per-file upload result returned before the LLM guide exists."""
    return {
        "full_text": full_text,
        "article_abstract": _GUIDE_PENDING,
        "article_keywords": _GUIDE_PENDING,
        "article_paragraph": _GUIDE_PENDING,
    }


def search_docs(
    fileName: list = Body([], description="文件名称", examples=["123.txt"]),
    query: str = Body("", description="改写后的query", examples=["你好"]),
    usr_query: str = Body("", description="用户输入的问题", examples=["你好"]),
    knowledge_base_name: str = Body(..., description="知识库名称", examples=["samples"]),
    top_k: int = Body(VECTOR_SEARCH_TOP_K, description="匹配向量数"),
    score_threshold: float = Body(SCORE_THRESHOLD,
                                  description="知识库匹配相关度阈值,取值范围在0-1之间,"
                                              "SCORE越小,相关度越高,"
                                              "取到1相当于不筛选,建议设置在0.5左右",
                                  ge=0, le=1),
    expr: str = Body(EXPR, description="milvus混合检索条件"),
    file_name: str = Body("", description="文件名称,支持 sql 通配符"),
    metadata: dict = Body({}, description="根据 metadata 进行过滤,仅支持一级键"),
    custom_strategy_config: dict = Body({}, description="自定义策略配置"),
    query_rewrite_model_name = LLM_MODELS[0]
) -> List[DocumentWithVSId]:
    """Search a knowledge base by query, or list its documents by name/metadata.

    When the knowledge base name contains POLICY_KNOWLEDGE_BASE, the filter
    expression is produced by the query-rewrite LLM from ``usr_query`` and
    today's date. With a non-empty ``query`` a vector search is run; if
    ``fileName`` is given, the first file name is prepended to the query and
    hits are restricted to documents whose ``source`` metadata is in
    ``fileName``. Without a query, ``file_name``/``metadata`` trigger a plain
    listing. Returns an empty list when the knowledge base is not found or
    no criterion is supplied.
    """
    # Current date formatted as YYYYMMDD, passed to the policy-time prompt.
    today = datetime.now().strftime("%Y%m%d")
    if POLICY_KNOWLEDGE_BASE in knowledge_base_name:
        expr = get_llm_model_response(
            strategy_name="get policy time",
            llm_model_name=query_rewrite_model_name,
            template_prompt_name="get_policy_time",
            prompt_param_dict={"query": usr_query, "time": today},
            temperature=0.01,
            max_tokens=512
        ).replace("None", "")
    logger.info(f'Milvus混合检索表达式:{expr}')

    kb = KBServiceFactory.get_service_by_name(knowledge_base_name)
    if not isinstance(expr, str):
        # A direct (non-HTTP) call leaves the Body(...) default object here.
        expr = EXPR
    if kb is None:
        return []

    if query:
        if fileName:
            # Only the first file name is injected into the prompt text.
            scoped_query = "请查询以下几篇文件:" + str(fileName[0]) + "并" + query
            hits = kb.search_docs(scoped_query, top_k, score_threshold, expr)
            return [_to_doc_with_id(doc, score) for doc, score in
                    ((x[0], x[1]) for x in hits)
                    if doc.metadata.get("source") in fileName]
        hits = kb.search_docs(query, top_k, score_threshold, expr)
        return [_to_doc_with_id(x[0], x[1]) for x in hits]

    if file_name or metadata:
        return kb.list_docs(file_name=file_name, metadata=metadata)
    return []


def search_self_docs(
    fileNames: list = Body([], description="文件名称", examples=["123.txt"]),
    query: str = Body("", description="改写后的query", examples=["你好"]),
    knowledge_base_name: str = Body(..., description="知识库名称", examples=["samples"]),
    top_k: int = Body(VECTOR_SEARCH_TOP_K, description="匹配向量数"),
    score_threshold: float = Body(SCORE_THRESHOLD,
                                  description="知识库匹配相关度阈值,取值范围在0-1之间,"
                                              "SCORE越小,相关度越高,"
                                              "取到1相当于不筛选,建议设置在0.5左右",
                                  ge=0, le=1),
    expr: str = Body("", description="milvus混合检索条件"),
) -> List[DocumentWithVSId]:
    """Search a personal knowledge base, restricted to the given files.

    ``fileNames`` may be a flat list or a list of lists (flattened first).
    When no usable ``expr`` is supplied, one is built that matches any of the
    file names on the ``source`` field. Each returned document's
    ``page_content`` is wrapped with a ``【^[n]^ ...】`` citation marker, where
    ``n`` is the 1-based position of the hit in the unfiltered search result.
    """
    kb = KBServiceFactory.get_service_by_name(knowledge_base_name)

    if fileNames and isinstance(fileNames[0], list):
        # Nested list: flatten it. Materialized because it is iterated for the
        # expression and then used again for membership tests.
        flat_file_names = list(flatten(fileNames))
    else:
        flat_file_names = fileNames if fileNames else []

    if not expr or not isinstance(expr, str):
        # NOTE(review): file names are interpolated unescaped into the filter
        # expression; a name containing '"' will break or alter it — confirm
        # the escaping rules of the vector store before changing this.
        expr = ' || '.join(f'source == "{name}"' for name in flat_file_names)
    logger.info(f"个人知识库检索EXPR: {expr}")

    if kb is None:
        logger.warning(f"未找到知识库服务: {knowledge_base_name}")
        return []

    docs = kb.search_docs(query, top_k, score_threshold, expr)
    if top_k > 50:
        # NOTE(review): this branch returns the raw search hits, unfiltered and
        # not converted to DocumentWithVSId — kept as-is; confirm it is intended.
        return docs

    return [
        DocumentWithVSId(
            # Drop the original page_content; it is replaced by the marked-up one.
            **{k: v for k, v in doc.dict().items() if k != 'page_content'},
            score=score,
            id=doc.metadata.get("id"),
            page_content=f"【^[{index + 1}]^ {doc.page_content}】 ",
        )
        for index, (doc, score) in enumerate((x[0], x[1]) for x in docs)
        if doc.metadata.get("source") in flat_file_names
    ]


def update_docs_by_id(
    knowledge_base_name: str = Body(..., description="知识库名称", examples=["samples"]),
    docs: Dict[str, Document] = Body(..., description="要更新的文档内容,形如:{id: Document, ...}")
) -> BaseResponse:
    """Update document contents by document ID.

    Returns a 500 response when the knowledge base does not exist; otherwise
    the message reports whether ``kb.update_doc_by_ids`` succeeded.
    """
    kb = KBServiceFactory.get_service_by_name(knowledge_base_name)
    if kb is None:
        return BaseResponse(code=500, msg=f"指定的知识库 {knowledge_base_name} 不存在")
    if kb.update_doc_by_ids(docs=docs):
        return BaseResponse(msg=f"文档更新成功")
    # NOTE(review): the failure response keeps BaseResponse's default code.
    return BaseResponse(msg=f"文档更新失败")


def list_files(
    knowledge_base_name: str
) -> ListResponse:
    """List the file names stored in a knowledge base."""
    # Decode first so validation sees the name that is actually used;
    # validating the still-encoded value would let "..%2F" through.
    knowledge_base_name = urllib.parse.unquote(knowledge_base_name)
    if not validate_kb_name(knowledge_base_name):
        return ListResponse(code=403, msg="Don't attack me", data=[])

    kb = KBServiceFactory.get_service_by_name(knowledge_base_name)
    if kb is None:
        return ListResponse(code=404, msg=f"未找到知识库 {knowledge_base_name}", data=[])
    return ListResponse(data=kb.list_files())


def _save_files_in_thread(files: List[UploadFile],
                          knowledge_base_name: str,
                          override: bool):
    """Save uploaded files into the knowledge base directory using a thread pool.

    Generator yielding one result per file, shaped like
    ``{"code": 200, "msg": "xxx", "data": {"knowledge_base_name": "xxx", "file_name": "xxx"}}``.
    """

    def save_file(file: UploadFile, knowledge_base_name: str, override: bool) -> dict:
        """Save a single file and report the outcome as a result dict."""
        # Bound before the try so the except handler can always reference them.
        filename = file.filename
        data = {"knowledge_base_name": knowledge_base_name, "file_name": filename}
        try:
            file_path = get_file_path(knowledge_base_name=knowledge_base_name, doc_name=filename)

            file_content = file.file.read()  # read the uploaded content
            # An existing file of the same size is treated as already uploaded.
            if (os.path.isfile(file_path)
                    and not override
                    and os.path.getsize(file_path) == len(file_content)):
                file_status = f"文件 {filename} 已存在。"
                logger.warning(file_status)
                return dict(code=404, msg=file_status, data=data)

            # exist_ok avoids a race between pool threads creating the same directory.
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            with open(file_path, "wb") as f:
                f.write(file_content)
            return dict(code=200, msg=f"成功上传文件 {filename}", data=data)
        except Exception as e:
            msg = f"{filename} 文件上传失败,报错信息为: {e}"
            logger.error(f'{e.__class__.__name__}: {msg}',
                         exc_info=e if log_verbose else None)
            return dict(code=500, msg=msg, data=data)

    params = [{"file": file, "knowledge_base_name": knowledge_base_name, "override": override}
              for file in files]
    for result in run_in_thread_pool(save_file, params=params):
        yield result


# Disabled endpoint, kept for reference:
# def files2docs(files: List[UploadFile] = File(..., description="上传文件,支持多文件"),
#                 knowledge_base_name: str = Form(..., description="知识库名称", examples=["samples"]),
#                 override: bool = Form(False, description="覆盖已有文件"),
#                 save: bool = Form(True, description="是否将文件保存到知识库目录")):
#     def save_files(files, knowledge_base_name, override):
#         for result in _save_files_in_thread(files, knowledge_base_name=knowledge_base_name, override=override):
#             yield json.dumps(result, ensure_ascii=False)
#     def files_to_docs(files):
#         for result in files2docs_in_thread(files):
#             yield json.dumps(result, ensure_ascii=False)


def upload_docs(
    files: List[UploadFile] = File(..., description="上传文件,支持多文件"),
    knowledge_base_name: str = Form(..., description="知识库名称", examples=["samples"]),
    override: bool = Form(False, description="覆盖已有文件"),
    to_vector_store: bool = Form(True, description="上传文件后是否进行向量化"),
    chunk_size: int = Form(CHUNK_SIZE, description="知识库中单段文本最大长度"),
    chunk_overlap: int = Form(OVERLAP_SIZE, description="知识库中相邻文本重合长度"),
    zh_title_enhance: bool = Form(ZH_TITLE_ENHANCE, description="是否开启中文标题加强"),
    docs: Json = Form({}, description="自定义的docs,需要转为json字符串",
                      examples=[{"test.txt": [Document(page_content="custom doc")]}]),
    not_refresh_vs_cache: bool = Form(False, description="暂不保存向量库(用于FAISS)"),
) -> BaseResponse:
    """API endpoint: upload files and/or vectorize them synchronously.

    Returns a response whose ``data["failed_files"]`` maps file name to the
    error message for every file that failed to save or vectorize.
    """
    if not validate_kb_name(knowledge_base_name):
        return BaseResponse(code=403, msg="Don't attack me")

    kb = KBServiceFactory.get_service_by_name(knowledge_base_name)
    if kb is None:
        return BaseResponse(code=404, msg=f"未找到知识库 {knowledge_base_name}")

    failed_files = {}
    file_names = list(docs.keys())

    # Save the uploaded files to disk first.
    for result in _save_files_in_thread(files, knowledge_base_name=knowledge_base_name, override=override):
        filename = result["data"]["file_name"]
        if result["code"] != 200:
            failed_files[filename] = result["msg"]

        if filename not in file_names:
            file_names.append(filename)

    # Vectorize the saved files.
    if to_vector_store:
        result = _update_docs_impl(
            knowledge_base_name=knowledge_base_name,
            file_names=file_names,
            override_custom_docs=True,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            zh_title_enhance=zh_title_enhance,
            docs=docs,
            not_refresh_vs_cache=True,  # the store is saved once, below
        )
        failed_files.update(result.data["failed_files"])
        if not not_refresh_vs_cache:
            kb.save_vector_store()

    return BaseResponse(code=200, msg="文件上传与向量化完成", data={"failed_files": failed_files})


def _background_llm_and_vectorize(
    knowledge_base_name: str,
    file_names: List[str],
    chunk_size: int,
    chunk_overlap: int,
    zh_title_enhance: bool,
    docs: dict,
    not_refresh_vs_cache: bool,
    to_vector_store: bool = True,
):
    """Background-thread job: generate the LLM reading guide, then vectorize.

    For every file the result of ``KnowledgeFile.get_llm_result()`` is written
    to ``<filename>.llm_result.json`` under the knowledge base's directory.
    Failures are logged, never raised. Vectorization is skipped when
    ``to_vector_store`` is false.
    """
    start_time = time()
    kb = KBServiceFactory.get_service_by_name(knowledge_base_name)

    # 1. Generate the LLM guide (abstract, keywords, section overview).
    for filename in file_names:
        try:
            knowledge_file = KnowledgeFile(filename=filename, knowledge_base_name=knowledge_base_name)
            # This runs in a plain thread, so it needs its own event loop.
            new_loop = asyncio.new_event_loop()
            asyncio.set_event_loop(new_loop)
            try:
                llm_result = new_loop.run_until_complete(knowledge_file.get_llm_result())
            finally:
                new_loop.close()
                # Do not leave a closed loop registered for this thread.
                asyncio.set_event_loop(None)

            # Write the LLM result to a cache file next to the knowledge base.
            cache_dir = os.path.join(
                os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
                "knowledge_base", knowledge_base_name)
            cache_file = os.path.join(cache_dir, f"{filename}.llm_result.json")
            # Also covers file names that contain a sub-directory.
            os.makedirs(os.path.dirname(cache_file), exist_ok=True)
            with open(cache_file, 'w', encoding='utf-8') as f:
                json.dump(llm_result, f, ensure_ascii=False)
            logger.info(f"[后台] LLM 导读生成完成: {filename}")
        except Exception as e:
            logger.error(f"[后台] LLM 导读生成失败 {filename}: {e}")

    if not to_vector_store:
        return

    # 2. Vectorize.
    try:
        result = _update_docs_impl(
            knowledge_base_name=knowledge_base_name,
            file_names=file_names,
            override_custom_docs=True,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            zh_title_enhance=zh_title_enhance,
            docs=docs,
            not_refresh_vs_cache=True,
        )
        # Nobody receives this response, so surface per-file failures in the log.
        failed = result.data.get("failed_files") if isinstance(result.data, dict) else None
        if failed:
            logger.warning(f"[后台] 向量化失败的文件: {failed}")
        if kb and not not_refresh_vs_cache:
            kb.save_vector_store()
        logger.info(f"[后台] 向量化完成,总耗时: {time() - start_time:.2f}s")
    except Exception as e:
        logger.error(f"[后台] 向量化失败: {e}")


def upload_docs_new(
    files: List[UploadFile] = File(..., description="上传文件,支持多文件"),
    knowledge_base_name: str = Form(..., description="知识库名称", examples=["samples"]),
    override: bool = Form(False, description="覆盖已有文件"),
    to_vector_store: bool = Form(True, description="上传文件后是否进行向量化"),
    chunk_size: int = Form(CHUNK_SIZE, description="知识库中单段文本最大长度"),
    chunk_overlap: int = Form(OVERLAP_SIZE, description="知识库中相邻文本重合长度"),
    zh_title_enhance: bool = Form(ZH_TITLE_ENHANCE, description="是否开启中文标题加强"),
    docs: Json = Form({}, description="自定义的docs,需要转为json字符串",
                      examples=[{"test.txt": [Document(page_content="custom doc")]}]),
    not_refresh_vs_cache: bool = Form(False, description="暂不保存向量库(用于FAISS)"),
) -> BaseResponse:
    """API endpoint: upload files and return quickly with the extracted full text.

    The knowledge base is created if it does not exist. LLM guide generation
    and (when ``to_vector_store`` is true) vectorization run in a daemon
    thread after the response is built. ``data["llm_results"]`` maps each file
    name to its full text plus placeholder guide fields.
    """
    start_time = time()
    if not validate_kb_name(knowledge_base_name):
        return BaseResponse(code=403, msg="Don't attack me")

    kb = KBServiceFactory.get_service_by_name(knowledge_base_name)
    if kb is None:
        kb = KBServiceFactory.get_service(knowledge_base_name, DEFAULT_VS_TYPE, EMBEDDING_MODEL)
        try:
            kb.create_kb()
            logger.info(f"自动创建知识库: {knowledge_base_name}")
        except Exception as e:
            msg = f"创建知识库出错: {e}"
            logger.error(f'{e.__class__.__name__}: {msg}',
                         exc_info=e if log_verbose else None)
            return BaseResponse(code=500, msg=msg)

    failed_files = {}
    file_names = list(docs.keys())
    llm_results = {}

    # Save the files to disk and extract their full text (the fast part).
    for result in _save_files_in_thread(files, knowledge_base_name=knowledge_base_name, override=override):
        filename = result["data"]["file_name"]
        if result["code"] != 200:
            failed_files[filename] = result["msg"]

        if filename not in file_names:
            file_names.append(filename)

        # Full-text extraction only; no LLM call here.
        try:
            knowledge_file = KnowledgeFile(filename=filename, knowledge_base_name=knowledge_base_name)
            full_text_data = knowledge_file.get_full_text()
            try:
                full_text = json.loads(full_text_data).get("full_text", "")
            except (ValueError, TypeError, AttributeError):
                # Not JSON, not a string, or not a JSON object: no full text.
                full_text = ""
            llm_results[filename] = _pending_guide(full_text)
        except Exception as e:
            logger.error(f"提取全文失败 {filename}: {e}")
            llm_results[filename] = _pending_guide("")

    # Run the LLM guide + vectorization in the background (non-blocking).
    bg_thread = threading.Thread(
        target=_background_llm_and_vectorize,
        args=(knowledge_base_name, file_names, chunk_size, chunk_overlap,
              zh_title_enhance, docs, not_refresh_vs_cache, to_vector_store),
        daemon=True
    )
    bg_thread.start()

    logger.info(f"文件上传+全文提取用时: {time() - start_time:.2f}s,LLM+向量化已转后台")
    return BaseResponse(code=200, msg="文件上传完成,导读生成中", data={
        "failed_files": failed_files,
        "llm_results": llm_results
    })


def delete_docs(
    knowledge_base_name: str = Body(..., examples=["samples"]),
    file_names: List[str] = Body(..., examples=[["file_name.md", "test.txt"]]),
    delete_content: bool = Body(False),
    not_refresh_vs_cache: bool = Body(False, description="暂不保存向量库(用于FAISS)"),
) -> BaseResponse:
    """Delete the given files from a knowledge base.

    ``data["failed_files"]`` maps each file that was missing or failed to
    delete to its error message.
    """
    # Decode before validating so the check applies to the name actually used.
    knowledge_base_name = urllib.parse.unquote(knowledge_base_name)
    if not validate_kb_name(knowledge_base_name):
        return BaseResponse(code=403, msg="Don't attack me")

    kb = KBServiceFactory.get_service_by_name(knowledge_base_name)
    if kb is None:
        return BaseResponse(code=404, msg=f"未找到知识库 {knowledge_base_name}")

    failed_files = {}
    for file_name in file_names:
        if not kb.exist_doc(file_name):
            failed_files[file_name] = f"未找到文件 {file_name}"

        # Deletion is still attempted for unknown files, as in the original flow.
        try:
            kb_file = KnowledgeFile(filename=file_name,
                                    knowledge_base_name=knowledge_base_name)
            kb.delete_doc(kb_file, delete_content, not_refresh_vs_cache=True)
        except Exception as e:
            msg = f"{file_name} 文件删除失败,错误信息:{e}"
            logger.error(f'{e.__class__.__name__}: {msg}',
                         exc_info=e if log_verbose else None)
            failed_files[file_name] = msg

    if not not_refresh_vs_cache:
        kb.save_vector_store()

    return BaseResponse(code=200, msg=f"文件删除完成", data={"failed_files": failed_files})


def update_info(
    knowledge_base_name: str = Body(..., description="知识库名称", examples=["samples"]),
    kb_info: str = Body(..., description="知识库介绍", examples=["这是一个知识库"]),
):
    """Update the description text of a knowledge base."""
    if not validate_kb_name(knowledge_base_name):
        return BaseResponse(code=403, msg="Don't attack me")

    kb = KBServiceFactory.get_service_by_name(knowledge_base_name)
    if kb is None:
        return BaseResponse(code=404, msg=f"未找到知识库 {knowledge_base_name}")
    kb.update_info(kb_info)

    return BaseResponse(code=200, msg=f"知识库介绍修改完成", data={"kb_info": kb_info})


def _update_docs_impl(
    knowledge_base_name: str,
    file_names: List[str],
    chunk_size: int = CHUNK_SIZE,
    chunk_overlap: int = OVERLAP_SIZE,
    zh_title_enhance: bool = ZH_TITLE_ENHANCE,
    override_custom_docs: bool = False,
    docs: Optional[Dict] = None,
    not_refresh_vs_cache: bool = False,
) -> BaseResponse:
    """Core implementation of the document update (for internal callers).

    Files listed in ``file_names`` are loaded, split and re-vectorized, except
    those that previously used custom docs (unless ``override_custom_docs``)
    and those present in ``docs``. Entries of ``docs`` (file name -> list of
    Document or dict) are vectorized as given. ``data["failed_files"]`` maps
    file name to error for everything that failed.
    """
    docs = {} if docs is None else docs

    if not validate_kb_name(knowledge_base_name):
        return BaseResponse(code=403, msg="Don't attack me")

    kb = KBServiceFactory.get_service_by_name(knowledge_base_name)
    if kb is None:
        return BaseResponse(code=404, msg=f"未找到知识库 {knowledge_base_name}")

    failed_files = {}
    kb_files = []

    # Build the list of files whose docs must be loaded from disk.
    for file_name in file_names:
        file_detail = get_file_detail(kb_name=knowledge_base_name, filename=file_name)
        # A file that previously used custom docs is skipped unless overriding.
        if file_detail.get("custom_docs") and not override_custom_docs:
            continue
        if file_name not in docs:
            try:
                kb_files.append(KnowledgeFile(filename=file_name, knowledge_base_name=knowledge_base_name))
            except Exception as e:
                msg = f"加载文档 {file_name} 时出错:{e}"
                logger.error(f'{e.__class__.__name__}: {msg}',
                             exc_info=e if log_verbose else None)
                failed_files[file_name] = msg

    update_st = time()
    # Generate docs from the files and vectorize them. The Documents are loaded
    # in worker threads and then attached to a KnowledgeFile.
    for status, result in files2docs_in_thread(kb_files,
                                               chunk_size=chunk_size,
                                               chunk_overlap=chunk_overlap,
                                               zh_title_enhance=zh_title_enhance):
        if status:
            kb_name, file_name, new_docs = result
            kb_file = KnowledgeFile(filename=file_name,
                                    knowledge_base_name=knowledge_base_name)
            kb_file.splited_docs = new_docs
            kb.update_doc(kb_file, not_refresh_vs_cache=True)
        else:
            kb_name, file_name, error = result
            failed_files[file_name] = error
    logger.info(f"use time: {time() - update_st}")

    # Vectorize the caller-supplied custom docs.
    for file_name, v in docs.items():
        try:
            v = [x if isinstance(x, Document) else Document(**x) for x in v]
            kb_file = KnowledgeFile(filename=file_name, knowledge_base_name=knowledge_base_name)
            kb.update_doc(kb_file, docs=v, not_refresh_vs_cache=True)
        except Exception as e:
            msg = f"为 {file_name} 添加自定义docs时出错:{e}"
            logger.error(f'{e.__class__.__name__}: {msg}',
                         exc_info=e if log_verbose else None)
            failed_files[file_name] = msg

    if not not_refresh_vs_cache:
        kb.save_vector_store()

    return BaseResponse(code=200, msg=f"更新文档完成", data={"failed_files": failed_files})


def update_docs(
    knowledge_base_name: str = Body(..., description="知识库名称", examples=["samples"]),
    file_names: List[str] = Body(..., description="文件名称,支持多文件", examples=[["file_name1", "text.txt"]]),
    chunk_size: int = Body(CHUNK_SIZE, description="知识库中单段文本最大长度"),
    chunk_overlap: int = Body(OVERLAP_SIZE, description="知识库中相邻文本重合长度"),
    zh_title_enhance: bool = Body(ZH_TITLE_ENHANCE, description="是否开启中文标题加强"),
    override_custom_docs: bool = Body(False, description="是否覆盖之前自定义的docs"),
    docs: Json = Body({}, description="自定义的docs,需要转为json字符串",
                      examples=[{"test.txt": [Document(page_content="custom doc")]}]),
    not_refresh_vs_cache: bool = Body(False, description="暂不保存向量库(用于FAISS)"),
) -> BaseResponse:
    """Update knowledge base documents (API route); delegates to _update_docs_impl."""
    return _update_docs_impl(
        knowledge_base_name=knowledge_base_name,
        file_names=file_names,
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        zh_title_enhance=zh_title_enhance,
        override_custom_docs=override_custom_docs,
        docs=docs,
        not_refresh_vs_cache=not_refresh_vs_cache,
    )


def download_doc(
    knowledge_base_name: str = Query(..., description="知识库名称", examples=["samples"]),
    file_name: str = Query(..., description="文件名称", examples=["test.txt"]),
    preview: bool = Query(True, description="是:浏览器内预览;否:下载"),
):
    """Download or preview a knowledge base document.

    Files whose extension has an HTML converter are converted and returned as
    ``text/html``; all other files are returned as raw bytes. ``preview``
    selects an ``inline`` versus ``attachment`` Content-Disposition. Errors
    are reported as a BaseResponse with code 403/404/500.
    """
    logger.info(f"是否预览: {preview}")
    if not validate_kb_name(knowledge_base_name):
        return BaseResponse(code=403, msg="Don't attack me")

    kb = KBServiceFactory.get_service_by_name(knowledge_base_name)
    if kb is None:
        return BaseResponse(code=404, msg=f"未找到知识库 {knowledge_base_name}")

    no_cache_headers = {
        "Cache-Control": "no-cache, no-store, must-revalidate",
        "Pragma": "no-cache",
        "Expires": "0",
    }

    try:
        kb_file = KnowledgeFile(filename=file_name,
                                knowledge_base_name=knowledge_base_name)

        if not os.path.exists(kb_file.filepath):
            return BaseResponse(code=404, msg=f"文件 {file_name} 不存在")

        file_ext = os.path.splitext(file_name)[1].lower().lstrip('.')

        if file_ext in _HTML_CONVERTER_METHODS:
            converter = FileConverter()
            convert_method = getattr(converter, _HTML_CONVERTER_METHODS[file_ext])
            try:
                # Convert in memory and get the HTML text back.
                html_content = convert_method(kb_file.filepath, output_path=None)
                # The converter signals failure inside its returned text.
                if "转换失败" in html_content:
                    return BaseResponse(code=500, msg=f"文件:{file_name} 处理失败", data=html_content)

                new_filename = f"{os.path.splitext(os.path.basename(file_name))[0]}.html"
                # Percent-encode the name for the RFC 5987 filename* parameter.
                encoded_filename = urllib.parse.quote(new_filename)
                content_disposition = "inline" if preview else f"attachment; filename*=UTF-8''{encoded_filename}"

                return Response(
                    content=html_content.encode('utf-8'),
                    media_type="text/html",
                    headers={"Content-Disposition": content_disposition, **no_cache_headers},
                )
            except RuntimeError as e:
                msg = f"文件转换失败: {str(e)}"
                logger.error(msg)
                return BaseResponse(code=500, msg=msg)

        # File types that need no conversion: return the bytes unchanged.
        content_disposition_type = "inline" if preview else "attachment"
        encoded_filename = urllib.parse.quote(kb_file.filename)
        with open(kb_file.filepath, 'rb') as file:
            file_content = file.read()

        return Response(
            # Always the file bytes: no HTML exists on this path, so the former
            # `html_content` fallback raised NameError for every plain download.
            content=file_content,
            media_type="application/octet-stream",
            headers={
                "Content-Disposition": f"{content_disposition_type}; filename*=UTF-8''{encoded_filename}",
                **no_cache_headers,
            },
        )

    except Exception as e:
        msg = f"{file_name} 处理失败,错误信息是:{e}"
        logger.error(f'{e.__class__.__name__}: {msg}',
                     exc_info=e if log_verbose else None)
        return BaseResponse(code=500, msg=msg)


def recreate_vector_store(
    knowledge_base_name: str = Body(..., examples=["samples"]),
    allow_empty_kb: bool = Body(True),
    vs_type: str = Body(DEFAULT_VS_TYPE),
    embed_model: str = Body(EMBEDDING_MODEL),
    chunk_size: int = Body(CHUNK_SIZE, description="知识库中单段文本最大长度"),
    chunk_overlap: int = Body(OVERLAP_SIZE, description="知识库中相邻文本重合长度"),
    zh_title_enhance: bool = Body(ZH_TITLE_ENHANCE, description="是否开启中文标题加强"),
    not_refresh_vs_cache: bool = Body(False, description="暂不保存向量库(用于FAISS)"),
):
    """Recreate the vector store from the files in the content folder.

    This is useful when files were copied into the content folder directly
    instead of being uploaded through the API. By default,
    ``get_service_by_name`` only returns knowledge bases that are recorded in
    info.db and contain document files; set ``allow_empty_kb`` to True to also
    apply this to a knowledge base that is not in info.db or has no documents.

    Progress is streamed as server-sent events, one JSON string per file.
    """

    def output():
        kb = KBServiceFactory.get_service(knowledge_base_name, vs_type, embed_model)
        if not kb.exists() and not allow_empty_kb:
            # Serialized like every other event: a raw dict would be unpacked
            # into ServerSentEvent keyword arguments by sse-starlette.
            yield json.dumps({"code": 404, "msg": f"未找到知识库 ‘{knowledge_base_name}’"},
                             ensure_ascii=False)
            return

        if kb.exists():
            kb.clear_vs()
        kb.create_kb()
        files = list_files_from_folder(knowledge_base_name)
        kb_files = [(file, knowledge_base_name) for file in files]
        loader = files2docs_in_thread(kb_files,
                                      chunk_size=chunk_size,
                                      chunk_overlap=chunk_overlap,
                                      zh_title_enhance=zh_title_enhance)
        # `done` counts every processed file, failed ones included.
        for done, (status, result) in enumerate(loader, start=1):
            if status:
                kb_name, file_name, docs = result
                kb_file = KnowledgeFile(filename=file_name, knowledge_base_name=kb_name)
                kb_file.splited_docs = docs
                yield json.dumps({
                    "code": 200,
                    "msg": f"({done} / {len(files)}): {file_name}",
                    "total": len(files),
                    "finished": done,
                    "doc": file_name,
                }, ensure_ascii=False)
                kb.add_doc(kb_file, not_refresh_vs_cache=True)
            else:
                kb_name, file_name, error = result
                msg = f"添加文件‘{file_name}’到知识库‘{knowledge_base_name}’时出错:{error}。已跳过。"
                logger.error(msg)
                yield json.dumps({
                    "code": 500,
                    "msg": msg,
                }, ensure_ascii=False)
        if not not_refresh_vs_cache:
            kb.save_vector_store()

    return EventSourceResponse(output())