2026-06-07
AI
0

目录

整体思路
代码

RAGFlow 是一款开源、面向复杂文档理解的企业级 RAG(检索增强生成)引擎。本文介绍如何让 LangChain 集成 RAGFlow,轻松实现文档检索。

整体思路

  • RAGFlow:独立服务,擅长文档解析、分块、向量检索、溯源,对外提供 HTTP API。
  • LangChain:编排框架,检索流程为 BaseRetriever 检索文档 → 将文档送入 LLM 链。
  • 集成方式:写一个自定义 Retriever,继承 LangChain 的 BaseRetriever,内部调用 RAGFlow 的检索接口,把结果转成 LangChain 的 Document。

代码

python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time    : 2026/6/7 21:45
@Author  : sql668
@File    : ragflow_service.py
"""
import http.client
import json
import logging
from typing import ClassVar, List

from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever

logger = logging.getLogger(__name__)


class RAGFlowRetrieverService(BaseRetriever):
    """LangChain retriever backed by RAGFlow's ``POST /api/v1/retrieval`` HTTP API.

    Connection settings are intentionally hard-coded class constants (they are
    not read from environment variables). They are annotated as ``ClassVar`` so
    that pydantic, which ``BaseRetriever`` is built on, treats them as plain
    class constants instead of model fields. Pydantic v2 refuses to build a
    model that has non-annotated attributes.
    """

    # NOTE(review): a real API key is committed in source; move it to a secret
    # store / configuration file before publishing or sharing this module.
    # NOTE(review): connecting to 0.0.0.0 reaches localhost on Linux but is not
    # portable; 127.0.0.1 or the real service host is safer.
    RAGFLOW_HOST: ClassVar[str] = "0.0.0.0"
    RAGFLOW_PORT: ClassVar[int] = 80
    RAGFLOW_API_KEY: ClassVar[str] = "ragflow-U4NGE0YWYwNjI3NzExZjFhYTdiMDI0Mm"
    RAGFLOW_KB_ID: ClassVar[str] = "3e82dfba624011f1b40e0242ac120006"
    # Socket timeout in SECONDS (http.client does not use milliseconds).
    RAGFLOW_TIMEOUT: ClassVar[float] = 30.0

    # Maximum number of chunks to return per query (sent as RAGFlow ``page_size``).
    top_k: int = 3

    def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        """Retrieve chunks relevant to ``query`` from the configured RAGFlow dataset.

        Args:
            query: The question sent to RAGFlow.
            run_manager: LangChain callback manager for this run (not used here).

        Returns:
            One ``Document`` per chunk returned by RAGFlow. ``page_content`` is the
            chunk's ``content`` field, and ``metadata`` is the full chunk dict.

        Raises:
            Exception: if the request fails, the HTTP status is not 200, RAGFlow
                reports a non-zero ``code``, or the body cannot be parsed. The
                underlying error is chained as ``__cause__``.
        """
        payload = json.dumps({
            "question": query,
            "dataset_ids": [self.RAGFLOW_KB_ID],
            "page_size": self.top_k,
        })
        headers = {
            "Authorization": f"Bearer {self.RAGFLOW_API_KEY}",
            "Content-Type": "application/json",
        }

        # The constructor does not open a socket; the connection is made by request().
        conn = http.client.HTTPConnection(
            self.RAGFLOW_HOST, self.RAGFLOW_PORT, timeout=self.RAGFLOW_TIMEOUT
        )
        try:
            conn.request("POST", "/api/v1/retrieval", payload, headers)
            res = conn.getresponse()
            data = res.read().decode("utf-8")

            if res.status == 502:
                raise Exception(
                    "502 Bad Gateway 错误!\n"
                    "请检查 RAGFlow 服务日志以获取详细错误信息。"
                )
            if res.status != 200:
                raise Exception(
                    f"请求失败,状态码: {res.status}\n"
                    f"响应内容: {data}"
                )

            response_data = json.loads(data)
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(
                    "🔍 完整响应结构: %s",
                    json.dumps(response_data, ensure_ascii=False, indent=2),
                )

            # RAGFlow signals application errors (bad key, unknown dataset, ...)
            # with HTTP 200 and a non-zero "code", so the status check alone is
            # not enough to detect failure.
            code = response_data.get("code", 0)
            if code != 0:
                raise Exception(
                    f"RAGFlow 返回错误(code={code}): "
                    f"{response_data.get('message', data)}"
                )

            # Treat a missing or null "data"/"chunks" as "no results".
            chunks = (response_data.get("data") or {}).get("chunks") or []
            return [
                Document(page_content=chunk.get("content", ""), metadata=chunk)
                for chunk in chunks
            ]
        except Exception as e:
            raise Exception(
                f"调用 RAGFlow API 失败: {str(e)}"
            ) from e
        finally:
            conn.close()


if __name__ == "__main__":
    # Show the debug dump of the raw RAGFlow response when run as a script.
    logging.basicConfig(level=logging.DEBUG)
    retriever = RAGFlowRetrieverService()
    # ``invoke`` replaces the deprecated ``get_relevant_documents``.
    docs = retriever.invoke("结构化分析方法")
    print("✅ 成功!文档数量:", len(docs))

handler.py

import os
import uuid
from dataclasses import dataclass
from operator import itemgetter
from typing import Any

from injector import inject
from langchain.memory import ConversationBufferWindowMemory
from langchain_community.chat_message_histories import FileChatMessageHistory
from langchain_community.chat_models import ChatTongyi
from langchain_core.memory import BaseMemory
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnablePassthrough, RunnableLambda, RunnableConfig
from langchain_core.tracers import Run

from internal.schema.app_schema import CompletionReq
from internal.service import AppService, VectorDatabaseService, RAGFlowRetrieverService
from pkg.response import success_json, validate_error_json, success_message


@inject
@dataclass
class AppHandler:
    """Application controller: app CRUD endpoints plus a RAG-backed chat endpoint."""

    app_service: AppService
    vector_database_service: VectorDatabaseService
    ragflow_service: RAGFlowRetrieverService

    def create_app(self):
        """Create an app via AppService and return a message containing its id."""
        app = self.app_service.create_app()
        return success_message(f"应用创建成功,id为{app.id}")

    def get_app(self, id: uuid.UUID):
        """Fetch the app identified by ``id`` and return a message containing its name."""
        app = self.app_service.get_app(id)
        return success_message(f"应用获取成功,名字是{app.name}")

    def update_app(self, id: uuid.UUID):
        """Update the app identified by ``id`` and return a message containing its name."""
        app = self.app_service.update_app(id)
        return success_message(f"应用更新成功,名字是{app.name}")

    def delete_app(self, id: uuid.UUID):
        """Delete the app identified by ``id`` and return a message containing its name."""
        app = self.app_service.delete_app(id)
        return success_message(f"应用删除成功,名字是{app.name}")

    def ping(self):
        """Health-check endpoint."""
        return {"ping": "pong"}

    def debug(self, app_id: uuid.UUID):
        """Chat endpoint: answer the request's ``query`` using RAGFlow context and windowed memory.

        Args:
            app_id: Id of the app being debugged.
                NOTE(review): not used yet; every app shares one history file.

        Returns:
            A validation-error response if the request is invalid; otherwise
            ``success_json({"content": <answer>})``.
        """
        # 1. Validate the POSTed request body.
        req = CompletionReq()
        if not req.validate():
            return validate_error_json(req.errors)

        # 2. Build the prompt (context + history slots) and the windowed memory.
        system_prompt = "你是一个强大的聊天机器人,能根据对应的上下文和历史对话信息回复用户的问题。\n\n<context>{context}</context>"
        prompt = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            MessagesPlaceholder("history"),
            ("human", "{query}"),
        ])
        memory = ConversationBufferWindowMemory(
            k=3,  # keep only the 3 most recent exchanges
            input_key="query",
            output_key="output",
            return_messages=True,
            chat_memory=FileChatMessageHistory("./storage/memory/chat_history.txt"),
        )

        # 3. LLM client.
        # NOTE(review): the DashScope key is read from OPENAI_API_KEY — confirm this is intended.
        llm = ChatTongyi(model="qwen3.7-max", dashscope_api_key=os.getenv("OPENAI_API_KEY"))

        # 4. Build the chain. The retriever returns a List[Document]; combine_documents
        #    turns it into the text placed in {context}. Without it, the list's repr
        #    (including every chunk's metadata) would be injected into the prompt.
        #    To use the local vector store instead, substitute
        #    self.vector_database_service.get_retriever() for self.ragflow_service.
        chain = (
            RunnablePassthrough.assign(
                history=RunnableLambda(self._load_memory_variables) | itemgetter("history"),
                context=itemgetter("query")
                | self.ragflow_service
                | self.vector_database_service.combine_documents,
            )
            | prompt
            | llm
            | StrOutputParser()
        ).with_listeners(on_end=self._save_context)

        # 5. Invoke the chain. Memory travels in config["configurable"] so both the
        #    history loader and the on_end listener can reach it.
        chain_input = {"query": req.query.data}
        content = chain.invoke(chain_input, config={
            "configurable": {"memory": memory}
        })

        return success_json({"content": content})

    @classmethod
    def _load_memory_variables(cls, input: dict[str, Any], config: RunnableConfig) -> dict[str, Any]:
        """Load memory variables for the chain input.

        Looks for a ``BaseMemory`` in ``config["configurable"]["memory"]``, falling
        back to a top-level ``config["memory"]``.

        Returns:
            The memory's variables (which include ``"history"``). If no memory is
            supplied, returns ``{"history": []}`` so the downstream
            ``itemgetter("history")`` always yields a message list. (The previous
            fallback dereferenced ``None`` and nested the result one level too deep.)
        """
        memory = config.get("configurable", {}).get("memory")
        if not isinstance(memory, BaseMemory):
            memory = config.get("memory")
        if isinstance(memory, BaseMemory):
            return memory.load_memory_variables(input)
        return {"history": []}

    @classmethod
    def _save_context(cls, run_obj: Run, config: RunnableConfig) -> None:
        """Persist the finished run's inputs/outputs into the configured memory, if any."""
        configurable = config.get("configurable", {})
        configurable_memory = configurable.get("memory", None)
        if isinstance(configurable_memory, BaseMemory):
            configurable_memory.save_context(run_obj.inputs, run_obj.outputs)
如果对你有用的话,可以打赏哦
打赏
Alipay
WeChat Pay

本文作者:繁星

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!