RAGFlow 是一款开源、面向复杂文档理解的企业级 RAG(检索增强生成)引擎。本文演示如何让 LangChain 集成 RAGFlow,轻松实现文档检索。
ragflow_service.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2026/6/7 21:45
@Author: sql668
@File : ragflow_service.py
"""
import http.client
import json
from typing import List
from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever
class RAGFlowRetrieverService(BaseRetriever):
    """LangChain retriever that queries a RAGFlow server over plain HTTP.

    Each query is POSTed as JSON to ``/api/v1/retrieval`` on
    ``RAGFLOW_HOST:RAGFLOW_PORT`` and every chunk in the response is wrapped
    in a ``Document``. The connection settings below are hard-coded defaults;
    nothing is read from environment variables.
    """

    # Every setting is type-annotated so pydantic (the base of BaseRetriever)
    # treats it as a regular field; pydantic v2 refuses un-annotated class
    # attributes when the class is created.
    RAGFLOW_HOST: str = "0.0.0.0"
    RAGFLOW_PORT: int = 80
    # NOTE(review): a credential is committed in source here. Rotate it and
    # inject the real key at construction time before deploying.
    RAGFLOW_API_KEY: str = "ragflow-U4NGE0YWYwNjI3NzExZjFhYTdiMDI0Mm"
    RAGFLOW_KB_ID: str = "3e82dfba624011f1b40e0242ac120006"
    # NOTE(review): declared but neither sent to the API nor used to truncate
    # the result list; confirm the intended meaning before wiring it in.
    top_k: int = 3
    # Socket timeout in seconds (http.client timeouts are seconds, not ms).
    request_timeout: float = 30.0

    def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        """Fetch the chunks RAGFlow returns for ``query``.

        Args:
            query: Question text sent as the ``question`` field.
            run_manager: Callback manager supplied by LangChain; not used here.

        Returns:
            One ``Document`` per returned chunk. ``page_content`` is the
            chunk's ``content`` value and ``metadata`` is the whole chunk dict.

        Raises:
            Exception: On any transport failure, a non-200 HTTP status, an
                unparsable body, or a non-zero ``code`` in the response
                envelope. The original error is chained as ``__cause__``.
        """
        payload = json.dumps({
            "question": query,
            "dataset_ids": [self.RAGFLOW_KB_ID],
        })
        headers = {
            'Authorization': f'Bearer {self.RAGFLOW_API_KEY}',
            'Content-Type': 'application/json',
        }

        conn = None
        try:
            conn = http.client.HTTPConnection(
                self.RAGFLOW_HOST, self.RAGFLOW_PORT, timeout=self.request_timeout
            )
            conn.request("POST", "/api/v1/retrieval", payload, headers)
            res = conn.getresponse()
            data = res.read().decode("utf-8")

            if res.status == 502:
                raise Exception(
                    "502 Bad Gateway 错误!\n"
                    "请检查 RAGFlow 服务日志以获取详细错误信息。"
                )
            if res.status != 200:
                raise Exception(
                    f"请求失败,状态码: {res.status}\n"
                    f"响应内容: {data}"
                )

            response_data = json.loads(data)
            # Debug aid: dump the full response so its structure can be inspected.
            print("🔍 完整响应结构:", json.dumps(response_data, ensure_ascii=False, indent=2))

            # RAGFlow reports application-level failures (bad key, unknown
            # dataset, ...) with HTTP 200 and a non-zero "code"; surface them
            # instead of pretending nothing matched.
            code = response_data.get("code", 0)
            if code != 0:
                raise Exception(
                    f"RAGFlow 返回错误,code: {code}\n"
                    f"响应内容: {data}"
                )

            # "data", "chunks" and "content" may be missing or null.
            chunks = (response_data.get("data") or {}).get("chunks") or []
            return [
                Document(page_content=chunk.get("content") or "", metadata=chunk)
                for chunk in chunks
            ]
        except Exception as e:
            raise Exception(
                f"调用 RAGFlow API 失败: {str(e)}"
            ) from e
        finally:
            # Always release the socket, on success and on failure alike.
            if conn is not None:
                conn.close()
if __name__ == "__main__":
    # Manual smoke test: needs a reachable RAGFlow server at the configured
    # host/port. Uses the Runnable entry point ``invoke``; the older
    # ``get_relevant_documents`` method is deprecated in langchain-core.
    retriever = RAGFlowRetrieverService()
    docs = retriever.invoke("结构化分析方法")
    print("✅ 成功!文档数量:", len(docs))
handler.py
import os
import uuid
from dataclasses import dataclass
from operator import itemgetter
from typing import Any

from injector import inject
from langchain.memory import ConversationBufferWindowMemory
from langchain_community.chat_message_histories import FileChatMessageHistory
from langchain_community.chat_models import ChatTongyi
from langchain_core.memory import BaseMemory
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnablePassthrough, RunnableLambda, RunnableConfig
from langchain_core.tracers import Run

from internal.schema.app_schema import CompletionReq
from internal.service import AppService, VectorDatabaseService, RAGFlowRetrieverService
from pkg.response import success_json, validate_error_json, success_message


@inject
@dataclass
class AppHandler:
    """Application controller: app CRUD endpoints plus a RAG chat debug endpoint."""

    app_service: AppService
    vector_database_service: VectorDatabaseService
    ragflow_service: RAGFlowRetrieverService

    def create_app(self):
        """Create an app through the app service and report its id."""
        app = self.app_service.create_app()
        return success_message(f"应用创建成功,id为{app.id}")

    def get_app(self, id: uuid.UUID):
        """Look up the app with the given id and report its name."""
        app = self.app_service.get_app(id)
        return success_message(f"应用获取成功,名字是{app.name}")

    def update_app(self, id: uuid.UUID):
        """Update the app with the given id and report its name."""
        app = self.app_service.update_app(id)
        return success_message(f"应用更新成功,名字是{app.name}")

    def delete_app(self, id: uuid.UUID):
        """Delete the app with the given id and report its name."""
        app = self.app_service.delete_app(id)
        return success_message(f"应用删除成功,名字是{app.name}")

    def ping(self):
        """Liveness probe."""
        return {"ping": "pong"}

    def debug(self, app_id: uuid.UUID):
        """Chat endpoint: answer the posted query using RAGFlow context and chat memory.

        Validates the request, builds a prompt -> LLM -> string-parser chain
        whose ``context`` comes from the RAGFlow retriever and whose
        ``history`` comes from a file-backed window memory, runs it, and
        returns the answer under the ``content`` key.
        """
        # 1. Read and validate the POSTed input.
        req = CompletionReq()
        if not req.validate():
            return validate_error_json(req.errors)

        # 2. Build the prompt and the conversation memory.
        system_prompt = "你是一个强大的聊天机器人,能根据对应的上下文和历史对话信息回复用户的问题。\n\n<context>{context}</context>"
        prompt = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            MessagesPlaceholder("history"),
            ("human", "{query}"),
        ])

        history_path = "./storage/memory/chat_history.txt"
        # FileChatMessageHistory creates the file but not its parent directory.
        os.makedirs(os.path.dirname(history_path), exist_ok=True)
        memory = ConversationBufferWindowMemory(
            k=3,  # keep the last 3 conversation rounds
            input_key="query",
            output_key="output",
            return_messages=True,
            chat_memory=FileChatMessageHistory(history_path),
        )

        # 3. Build the chat model.
        # NOTE(review): the DashScope key is read from OPENAI_API_KEY; confirm
        # that this environment variable name is intentional.
        llm = ChatTongyi(model="qwen3.7-max", dashscope_api_key=os.getenv("OPENAI_API_KEY"))

        # 4. Assemble the chain. The retrieved documents are merged into a
        # context string with combine_documents before they reach the prompt;
        # otherwise the prompt would receive the raw repr of a Document list.
        # To retrieve from the vector store instead, replace
        # self.ragflow_service with self.vector_database_service.get_retriever().
        chain = (RunnablePassthrough.assign(
            history=RunnableLambda(self._load_memory_variables) | itemgetter("history"),
            context=(
                itemgetter("query")
                | self.ragflow_service
                | self.vector_database_service.combine_documents
            ),
        ) | prompt | llm | StrOutputParser()).with_listeners(on_end=self._save_context)

        # 5. Run the chain; the memory travels in the runnable config so the
        # load/save hooks below can reach it.
        chain_input = {"query": req.query.data}
        content = chain.invoke(chain_input, config={
            "configurable": {"memory": memory}
        })

        return success_json({"content": content})

    @classmethod
    def _load_memory_variables(cls, input: dict[str, Any], config: RunnableConfig) -> dict[str, Any]:
        """Load memory variables from the memory object carried in ``config``.

        Returns the memory's variables when ``config["configurable"]["memory"]``
        is a ``BaseMemory``; otherwise an empty history, so the chain's
        ``itemgetter("history")`` step still works without memory.
        """
        configurable = config.get("configurable") or {}
        configurable_memory = configurable.get("memory")
        if isinstance(configurable_memory, BaseMemory):
            return configurable_memory.load_memory_variables(input)
        return {"history": []}

    @classmethod
    def _save_context(cls, run_obj: Run, config: RunnableConfig) -> None:
        """Persist the finished run's inputs and outputs into the configured memory, if any."""
        configurable = config.get("configurable") or {}
        configurable_memory = configurable.get("memory")
        if isinstance(configurable_memory, BaseMemory):
            configurable_memory.save_context(run_obj.inputs, run_obj.outputs)


本文作者:繁星
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!