72 lines
2.5 KiB
Python
72 lines
2.5 KiB
Python
|
|
import os
|
|||
|
|
import pandas as pd
|
|||
|
|
from pathlib import Path
|
|||
|
|
import logging
|
|||
|
|
|
|||
|
|
from typing import List, Optional, Union, Dict, Any
|
|||
|
|
from langchain_core.documents import Document
|
|||
|
|
from langchain_community.document_loaders.base import BaseLoader
|
|||
|
|
|
|||
|
|
# Module-level logger named after this module; ExcelLoader.load uses it to
# report failures while reading an Excel file.
logger = logging.getLogger(__name__)
|
|||
|
|
|
|||
|
|
class ExcelLoader(BaseLoader):
    """Load an Excel workbook (.xls/.xlsx) into one Document per worksheet.

    Every worksheet is parsed with pandas and its non-empty cells are
    flattened, row by row, into a single comma-separated string.
    """

    def __init__(
        self,
        file_path: Union[str, Path],
        *,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """Validate the path and store the loader configuration.

        Args:
            file_path: Path to the Excel file; must end in .xls or .xlsx
                (case-insensitive).
            metadata: Extra document-level metadata merged into the metadata
                of every returned Document. Its keys take precedence over
                the built-in "source" and "sheet_name" entries.

        Raises:
            ValueError: If the file extension is neither .xls nor .xlsx.
            FileNotFoundError: If the path is not an existing regular file.
        """
        self.file_path = str(file_path)
        self.metadata = metadata or {}

        suffix = Path(self.file_path).suffix.lower()
        if suffix not in (".xls", ".xlsx"):
            raise ValueError(f"ExcelLoader 仅支持 .xls/.xlsx 文件: {self.file_path}")
        if not os.path.isfile(self.file_path):
            raise FileNotFoundError(f"文件不存在: {self.file_path}")

    @staticmethod
    def _flatten_sheet(df: pd.DataFrame) -> str:
        """Join the stripped text of every non-empty cell of *df* with commas."""
        segments: List[str] = []
        for row in df.itertuples(index=False, name=None):
            for cell in row:
                # Missing values (NaN/NaT/None) represent empty cells.
                if pd.isna(cell):
                    continue
                text = str(cell).strip()
                # Whitespace-only cells are treated as empty as well.
                if text:
                    segments.append(text)
        return ",".join(segments)

    def load(self) -> List[Document]:
        """Read every worksheet and return one Document per sheet.

        Each Document's page_content is the comma-separated text of all
        non-empty cells of that sheet (including the first row); its
        metadata holds "source", "sheet_name" and the user-supplied metadata.

        Returns:
            A list with one Document per worksheet, in workbook order.

        Raises:
            RuntimeError: If pandas fails to read the file; the original
                exception is chained as the cause.
        """
        try:
            # sheet_name=None -> dict {sheet name: DataFrame} for all sheets.
            # header=None -> the first row is read as ordinary data instead
            # of being consumed as column labels, so its cells are not lost
            # when iterating over the rows below.
            sheets: Dict[str, pd.DataFrame] = pd.read_excel(
                self.file_path,
                sheet_name=None,
                header=None,
            )
        except Exception as e:
            logger.exception("读取 Excel 文件失败: %s", self.file_path)
            raise RuntimeError(f"无法加载 Excel 文件: {e}") from e

        return [
            Document(
                page_content=self._flatten_sheet(df),
                metadata={
                    "source": self.file_path,
                    "sheet_name": sheet_name,
                    # User metadata comes last so it can override the defaults.
                    **self.metadata,
                },
            )
            for sheet_name, df in sheets.items()
        ]
|