Integration: LangChain
Integration: LangChain
Section titled “Integration: LangChain”

Using EdgeQuake as a Retriever in LangChain Applications
This guide shows how to integrate EdgeQuake with LangChain Python applications for building custom RAG pipelines.
Overview
Section titled “Overview”EdgeQuake provides a REST API that can be wrapped as a LangChain BaseRetriever, enabling you to:
- Use EdgeQuake’s Graph-RAG in LangChain chains
- Combine with other retrievers (ensemble retrieval)
- Build custom RAG applications with LangChain’s tooling
┌─────────────────────────────────────────────────────────────────┐│ LANGCHAIN + EDGEQUAKE │├─────────────────────────────────────────────────────────────────┤│ ││ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────┐ ││ │ LangChain │ │ EdgeQuakeRet. │ │ EdgeQuake │ ││ │ Application │───▶│ (Custom) │───▶│ API │ ││ │ │ │ │ │ │ ││ │ • Chains │ │ • _get_relevant │ │ • /query │ ││ │ • Agents │◀───│ _documents() │◀───│ • /chat │ ││ │ • Tools │ │ │ │ │ ││ └─────────────────┘ └─────────────────┘ └─────────────┘ ││ │└─────────────────────────────────────────────────────────────────┘Prerequisites
Section titled “Prerequisites”

pip install langchain langchain-core requests

Ensure EdgeQuake is running:
curl http://localhost:8080/health
# {"status":"healthy","database_connected":true}

Custom EdgeQuake Retriever
Section titled “Custom EdgeQuake Retriever”Create a custom retriever that wraps EdgeQuake’s API:
"""EdgeQuake Retriever for LangChain."""
from typing import List, Optionalimport requestsfrom langchain_core.documents import Documentfrom langchain_core.retrievers import BaseRetrieverfrom langchain_core.callbacks import CallbackManagerForRetrieverRun
class EdgeQuakeRetriever(BaseRetriever):
    """Retriever that uses EdgeQuake's Graph-RAG API.

    Example:
        retriever = EdgeQuakeRetriever(
            base_url="http://localhost:8080",
            workspace_id="default",
            query_mode="hybrid"
        )
        docs = retriever.invoke("What is the main topic?")
    """

    base_url: str = "http://localhost:8080"
    workspace_id: str = "default"
    query_mode: str = "hybrid"  # local, global, naive, hybrid, mix
    top_k: int = 10
    timeout: int = 60

    def _get_relevant_documents(
        self,
        query: str,
        *,
        run_manager: CallbackManagerForRetrieverRun,
    ) -> List[Document]:
        """Retrieve documents from EdgeQuake.

        Args:
            query: The search query.
            run_manager: Callback manager for the retrieval run.

        Returns:
            List of LangChain Document objects.
        """
        # POST the query to EdgeQuake's query endpoint.
        response = requests.post(
            f"{self.base_url}/api/v1/query",
            json={
                "query": query,
                "mode": self.query_mode,
                "top_k": self.top_k,
            },
            headers={
                "Content-Type": "application/json",
                "X-Workspace-ID": self.workspace_id,
            },
            timeout=self.timeout,
        )
        response.raise_for_status()
        result = response.json()

        documents = []

        # Text chunks returned by the API become LangChain Documents.
        for chunk in result.get("chunks", []):
            documents.append(
                Document(
                    page_content=chunk.get("content", ""),
                    metadata={
                        "source": chunk.get("document_id", ""),
                        "chunk_id": chunk.get("chunk_id", ""),
                        "score": chunk.get("score", 0.0),
                        "workspace_id": self.workspace_id,
                        "query_mode": self.query_mode,
                    },
                )
            )

        # Graph entities, when present, are surfaced as extra Documents.
        for entity in result.get("entities", []):
            documents.append(
                Document(
                    page_content=f"Entity: {entity.get('name', '')} - {entity.get('description', '')}",
                    metadata={
                        "type": "entity",
                        "entity_type": entity.get("type", ""),
                        "entity_name": entity.get("name", ""),
                    },
                )
            )

        return documents
class EdgeQuakeStreamRetriever(BaseRetriever): """Retriever with streaming support for real-time responses."""
base_url: str = "http://localhost:8080" workspace_id: str = "default" query_mode: str = "hybrid"
def _get_relevant_documents( self, query: str, *, run_manager: CallbackManagerForRetrieverRun, ) -> List[Document]: """Stream documents from EdgeQuake.
Uses Server-Sent Events for real-time retrieval. """ documents = []
with requests.post( f"{self.base_url}/api/v1/chat/stream", json={ "message": query, "workspace_id": self.workspace_id, "mode": self.query_mode, }, stream=True, timeout=60, ) as response: response.raise_for_status()
for line in response.iter_lines(): if line: # Parse SSE data if line.startswith(b"data: "): import json data = json.loads(line[6:])
# Check for context in the stream if "context" in data: for chunk in data["context"]: doc = Document( page_content=chunk.get("content", ""), metadata={ "source": chunk.get("document_id", ""), "chunk_id": chunk.get("chunk_id", ""), } ) documents.append(doc)
return documentsBasic Usage
Section titled “Basic Usage”

Simple Retrieval
Section titled “Simple Retrieval”from edgequake_retriever import EdgeQuakeRetriever
# Create retriever
retriever = EdgeQuakeRetriever(
    base_url="http://localhost:8080",
    workspace_id="default",
    query_mode="hybrid",
    top_k=5,
)

# Retrieve documents
docs = retriever.invoke("What are the key concepts?")

for doc in docs:
    print(f"Content: {doc.page_content[:100]}...")
    print(f"Source: {doc.metadata.get('source')}")
    print(f"Score: {doc.metadata.get('score')}")
    print("---")

With Different Query Modes
Section titled “With Different Query Modes”

# Local mode - entity-focused
local_retriever = EdgeQuakeRetriever(query_mode="local")
local_docs = local_retriever.invoke("Who is John Smith?")

# Global mode - relationship-focused
global_retriever = EdgeQuakeRetriever(query_mode="global")
global_docs = global_retriever.invoke("What are the themes?")

# Naive mode - vector search only (fastest)
naive_retriever = EdgeQuakeRetriever(query_mode="naive")
naive_docs = naive_retriever.invoke("climate change impacts")

RAG Chain with EdgeQuake
Section titled “RAG Chain with EdgeQuake”Build a complete RAG chain using EdgeQuake as the retriever:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

from edgequake_retriever import EdgeQuakeRetriever

# Components
retriever = EdgeQuakeRetriever(
    base_url="http://localhost:8080",
    workspace_id="default",
    query_mode="hybrid",
)

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# Prompt template
template = """Answer the question based on the following context:

Context:
{context}

Question: {question}

Answer:"""

prompt = ChatPromptTemplate.from_template(template)

# Helper function to format documents
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# Build the chain
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

# Use the chain
response = rag_chain.invoke("What is the main topic of the documents?")
print(response)

Ensemble Retriever
Section titled “Ensemble Retriever”Combine EdgeQuake with other retrievers for hybrid search:
from langchain.retrievers import EnsembleRetriever
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings

from edgequake_retriever import EdgeQuakeRetriever

# EdgeQuake retriever (Graph-RAG)
edgequake = EdgeQuakeRetriever(
    base_url="http://localhost:8080",
    query_mode="hybrid",
)

# Local FAISS retriever (fallback)
embeddings = OpenAIEmbeddings()
local_texts = ["Local document 1", "Local document 2"]
faiss_store = FAISS.from_texts(local_texts, embeddings)
faiss_retriever = faiss_store.as_retriever()

# Ensemble with weights
ensemble_retriever = EnsembleRetriever(
    retrievers=[edgequake, faiss_retriever],
    weights=[0.7, 0.3],  # Prefer EdgeQuake
)

# Use ensemble
docs = ensemble_retriever.invoke("What are the key topics?")

Agent with EdgeQuake Tool
Section titled “Agent with EdgeQuake Tool”Create a LangChain agent that can query EdgeQuake:
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_openai import ChatOpenAI
from langchain_core.tools import Tool
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

from edgequake_retriever import EdgeQuakeRetriever

# Create the tool
retriever = EdgeQuakeRetriever(
    base_url="http://localhost:8080",
    query_mode="hybrid",
)

def search_knowledge_base(query: str) -> str:
    """Search the knowledge base for relevant information."""
    docs = retriever.invoke(query)
    if not docs:
        return "No relevant documents found."
    return "\n\n".join(doc.page_content for doc in docs[:3])

knowledge_tool = Tool(
    name="search_knowledge_base",
    description="Search the knowledge base for information about documents and entities. Use this when you need specific information.",
    func=search_knowledge_base,
)

# Create agent
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant with access to a knowledge base."),
    ("user", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad"),
])

agent = create_openai_tools_agent(llm, [knowledge_tool], prompt)
agent_executor = AgentExecutor(agent=agent, tools=[knowledge_tool], verbose=True)

# Use the agent
response = agent_executor.invoke({
    "input": "What companies are mentioned in the documents?"
})
print(response["output"])

LangGraph Integration
Section titled “LangGraph Integration”Use EdgeQuake in a LangGraph workflow:
from typing import Annotated, Sequence, TypedDict
from langgraph.graph import StateGraph, END
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage

from edgequake_retriever import EdgeQuakeRetriever

# State definition
class GraphState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], "chat history"]
    context: str
    answer: str

# Nodes
def retrieve(state: GraphState) -> GraphState:
    """Retrieve relevant documents."""
    retriever = EdgeQuakeRetriever()
    last_message = state["messages"][-1].content

    docs = retriever.invoke(last_message)
    context = "\n\n".join(doc.page_content for doc in docs)

    # Unpack the old state first so the fresh context is not
    # overwritten by the stale value already stored in the state.
    return {**state, "context": context}

def generate(state: GraphState) -> GraphState:
    """Generate response using context."""
    from langchain_openai import ChatOpenAI

    llm = ChatOpenAI(model="gpt-4o-mini")

    prompt = f"""Based on the following context, answer the question.

Context:
{state["context"]}

Question: {state["messages"][-1].content}

Answer:"""

    response = llm.invoke([HumanMessage(content=prompt)])

    # Old state first, so the new answer wins over the initial empty one.
    return {**state, "answer": response.content}

# Build graph
workflow = StateGraph(GraphState)
workflow.add_node("retrieve", retrieve)
workflow.add_node("generate", generate)
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", END)
workflow.set_entry_point("retrieve")

app = workflow.compile()

# Use the graph
result = app.invoke({
    "messages": [HumanMessage(content="What are the main topics?")],
    "context": "",
    "answer": "",
})
print(result["answer"])

Configuration Options
Section titled “Configuration Options”EdgeQuakeRetriever Parameters
Section titled “EdgeQuakeRetriever Parameters”| Parameter | Type | Default | Description |
|---|---|---|---|
base_url | str | ”http://localhost:8080” | EdgeQuake API URL |
workspace_id | str | ”default” | Workspace for document scope |
query_mode | str | ”hybrid” | Query mode (local/global/naive/hybrid/mix) |
top_k | int | 10 | Maximum documents to retrieve |
timeout | int | 60 | Request timeout in seconds |
Query Mode Selection
Section titled “Query Mode Selection”| Mode | Best For | Performance |
|---|---|---|
local | Entity-specific questions | Medium |
global | Theme/relationship questions | Slower |
naive | Quick keyword search | Fastest |
hybrid | General queries | Medium |
mix | Adaptive blending | Medium |
Error Handling
Section titled “Error Handling”from langchain_core.documents import Document
class RobustEdgeQuakeRetriever(EdgeQuakeRetriever): """Retriever with error handling and fallbacks."""
fallback_message: str = "Unable to retrieve documents at this time."
def _get_relevant_documents( self, query: str, *, run_manager, ) -> List[Document]: try: return super()._get_relevant_documents(query, run_manager=run_manager) except requests.exceptions.ConnectionError: # EdgeQuake is not available return [Document( page_content=self.fallback_message, metadata={"error": "connection_error"} )] except requests.exceptions.Timeout: # Request timed out return [Document( page_content=self.fallback_message, metadata={"error": "timeout"} )] except requests.exceptions.HTTPError as e: # HTTP error from EdgeQuake return [Document( page_content=f"API error: {e}", metadata={"error": "http_error"} )]Best Practices
Section titled “Best Practices”

1. Choose the Right Query Mode
   - Use naive for simple keyword searches
   - Use local for entity-focused questions
   - Use global for relationship/theme questions
   - Use hybrid as the default
-
Handle Rate Limits
- EdgeQuake may have rate limits configured
- Implement exponential backoff for retries
-
Cache Results
- Use LangChain’s caching for repeated queries
- Reduces API calls and latency
-
Monitor Performance
- Check EdgeQuake’s cost dashboard
- Use workspace isolation for different use cases
-
Batch Queries
- Group similar queries when possible
- Reduces overhead and improves throughput
See Also
Section titled “See Also”- REST API Reference - Full API documentation
- Query Modes Deep Dive - Understanding query modes
- Open WebUI Integration - Alternative UI integration