Overview
Retrieval-Augmented Generation (RAG) combines large language models with external knowledge retrieval. This example shows how to build production-ready RAG pipelines using Flow Core.
Basic RAG Pipeline
from nadoo_flow import ChainableNode, LLMNode, ParallelNode, FunctionNode
import numpy as np
from typing import List, Dict, Any

class DocumentLoader(ChainableNode):
    """Load documents from various sources"""

    def __init__(self, source_type="file"):
        super().__init__()
        self.source_type = source_type

    async def execute(self, data):
        source_path = data.get("source", "")

        # Mock document loading
        documents = [
            {
                "id": "doc1",
                "content": "Nadoo Flow Core is a Python framework for building AI workflows.",
                "metadata": {"source": source_path, "page": 1}
            },
            {
                "id": "doc2",
                "content": "It supports node-based composition and async execution.",
                "metadata": {"source": source_path, "page": 2}
            },
            {
                "id": "doc3",
                "content": "Flow Core integrates with various LLM providers seamlessly.",
                "metadata": {"source": source_path, "page": 3}
            }
        ]

        return {"documents": documents, "count": len(documents)}
class TextSplitter(ChainableNode):
    """Split documents into chunks"""

    def __init__(self, chunk_size=500, overlap=50):
        super().__init__()
        self.chunk_size = chunk_size
        self.overlap = overlap

    async def execute(self, data):
        documents = data.get("documents", [])
        chunks = []

        for doc in documents:
            content = doc["content"]
            doc_chunks = self._split_text(content)

            for i, chunk_text in enumerate(doc_chunks):
                chunks.append({
                    "id": f"{doc['id']}_chunk_{i}",
                    "content": chunk_text,
                    "metadata": {
                        **doc["metadata"],
                        "chunk_index": i,
                        "parent_doc": doc["id"]
                    }
                })

        return {"chunks": chunks, "total_chunks": len(chunks)}

    def _split_text(self, text: str) -> List[str]:
        """Split text into overlapping chunks"""
        chunks = []
        # Guard against overlap >= chunk_size, which would produce a non-positive step
        step = max(1, self.chunk_size - self.overlap)
        for i in range(0, len(text), step):
            chunk = text[i:i + self.chunk_size]
            if chunk:
                chunks.append(chunk)
        return chunks
class Embedder(ChainableNode):
    """Generate embeddings for text chunks"""

    def __init__(self, model="text-embedding-ada-002"):
        super().__init__()
        self.model = model

    async def execute(self, data):
        chunks = data.get("chunks", [])
        embedded_chunks = []

        for chunk in chunks:
            # Mock embedding generation
            embedding = np.random.randn(1536).tolist()
            embedded_chunks.append({
                **chunk,
                "embedding": embedding
            })

        return {"embedded_chunks": embedded_chunks}
class VectorStore(ChainableNode):
    """Store and retrieve vectors"""

    def __init__(self):
        super().__init__()
        self.store = []

    async def index(self, chunks: List[Dict]):
        """Index chunks with embeddings"""
        self.store.extend(chunks)
        return {"indexed": len(chunks)}

    async def search(self, query_embedding: List[float], top_k: int = 5):
        """Search similar chunks"""
        # Mock similarity search: replace the random score with a real
        # cosine similarity against chunk["embedding"] in production
        scores = []
        for chunk in self.store:
            score = np.random.random()
            scores.append((score, chunk))

        # Sort by score and return top_k
        scores.sort(key=lambda x: x[0], reverse=True)
        results = [chunk for score, chunk in scores[:top_k]]
        return results

    async def execute(self, data):
        if "embedded_chunks" in data:
            # Indexing mode
            result = await self.index(data["embedded_chunks"])
            return {"indexed": result["indexed"], "total_stored": len(self.store)}
        elif "query_embedding" in data:
            # Search mode
            results = await self.search(
                data["query_embedding"],
                data.get("top_k", 5)
            )
            return {"retrieved_chunks": results}
        else:
            return {"error": "Invalid operation"}
# Build RAG pipeline
rag_pipeline = (
    DocumentLoader(source_type="file")
    | TextSplitter(chunk_size=200, overlap=20)
    | Embedder(model="text-embedding-ada-002")
    | VectorStore()
)

# Index documents (run inside an async function or event loop)
index_result = await rag_pipeline.execute({"source": "/docs/manual.pdf"})
print(f"Indexed {index_result['indexed']} chunks")
# Query pipeline
class RAGQueryPipeline(ChainableNode):
    def __init__(self, vector_store: VectorStore):
        super().__init__()
        self.vector_store = vector_store
        self.embedder = Embedder()
        self.llm = LLMNode(model="gpt-4")

    async def execute(self, data):
        query = data.get("query", "")

        # Embed query
        query_data = {"chunks": [{"content": query}]}
        embedded = await self.embedder.execute(query_data)
        query_embedding = embedded["embedded_chunks"][0]["embedding"]

        # Retrieve relevant chunks
        retrieved = await self.vector_store.execute({
            "query_embedding": query_embedding,
            "top_k": 5
        })

        # Generate response with context
        context = "\n".join([
            chunk["content"] for chunk in retrieved["retrieved_chunks"]
        ])

        prompt = f"""Based on the following context, answer the question.

Context:
{context}

Question: {query}

Answer:"""

        response = await self.llm.execute({"prompt": prompt})

        return {
            "answer": response["content"],
            "sources": retrieved["retrieved_chunks"],
            "query": query
        }
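A minimal usage sketch for the query side, assuming you keep a reference to the VectorStore instance so the query pipeline can reuse the index built during ingestion (the store is created separately here instead of inline in the chain):

# Keep a handle on the store so the query pipeline can reuse it
vector_store = VectorStore()
indexing_pipeline = (
    DocumentLoader(source_type="file")
    | TextSplitter(chunk_size=200, overlap=20)
    | Embedder()
    | vector_store
)
await indexing_pipeline.execute({"source": "/docs/manual.pdf"})

# Query against the populated store
query_pipeline = RAGQueryPipeline(vector_store=vector_store)
result = await query_pipeline.execute({"query": "What is Flow Core?"})
print(result["answer"])
for source in result["sources"]:
    print("-", source["metadata"]["parent_doc"])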
Advanced RAG with Hybrid Search
class HybridRetriever(ChainableNode):
    """Combine vector and keyword search"""

    def __init__(self):
        super().__init__()
        self.vector_store = VectorStore()
        self.keyword_index = {}

    async def index_documents(self, documents: List[Dict]):
        """Index for both vector and keyword search"""
        # Vector indexing
        await self.vector_store.index(documents)

        # Keyword indexing (inverted index)
        for doc in documents:
            words = doc["content"].lower().split()
            for word in words:
                if word not in self.keyword_index:
                    self.keyword_index[word] = []
                self.keyword_index[word].append(doc["id"])

        return {"indexed": len(documents)}

    async def hybrid_search(self, query: str, query_embedding: List[float]):
        """Perform hybrid search"""
        # Vector search
        vector_results = await self.vector_store.search(query_embedding, top_k=10)

        # Keyword search
        query_words = query.lower().split()
        keyword_scores = {}
        for word in query_words:
            if word in self.keyword_index:
                for doc_id in self.keyword_index[word]:
                    keyword_scores[doc_id] = keyword_scores.get(doc_id, 0) + 1

        # Combine and rerank
        combined_scores = {}
        for result in vector_results:
            doc_id = result["id"]
            vector_score = 0.7  # Mock score; use the real similarity in production
            keyword_score = keyword_scores.get(doc_id, 0) / max(len(query_words), 1)
            combined_scores[doc_id] = (0.6 * vector_score + 0.4 * keyword_score)

        # Sort by combined score
        sorted_results = sorted(
            vector_results,
            key=lambda x: combined_scores.get(x["id"], 0),
            reverse=True
        )

        return sorted_results[:5]

    async def execute(self, data):
        if "documents" in data:
            return await self.index_documents(data["documents"])
        elif "query" in data:
            return {
                "retrieved": await self.hybrid_search(
                    data["query"],
                    data["query_embedding"]
                )
            }
        else:
            return {"error": "Invalid operation"}
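A hypothetical end-to-end call, assuming chunks embedded with the Embedder above (each carrying "id", "content", and "embedding") are indexed before the first query:

retriever = HybridRetriever()

# Embed and index a chunk so both the vector store and the inverted index are populated
embedded = await Embedder().execute({"chunks": [
    {"id": "doc1", "content": "Flow Core supports async execution."}
]})
await retriever.execute({"documents": embedded["embedded_chunks"]})

# Hybrid query: the caller supplies both the raw text and its embedding
hybrid_results = await retriever.execute({
    "query": "async execution in Flow Core",
    "query_embedding": np.random.randn(1536).tolist()  # mock embedding
})
print(f"Retrieved {len(hybrid_results['retrieved'])} chunks")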
RAG with Reranking
class Reranker(ChainableNode):
    """Rerank retrieved documents for better relevance"""

    def __init__(self, model="cross-encoder/ms-marco-MiniLM-L-12-v2"):
        super().__init__()
        self.model = model

    async def execute(self, data):
        query = data.get("query", "")
        candidates = data.get("candidates", [])

        # Score each candidate
        scored_candidates = []
        for candidate in candidates:
            # Mock cross-encoder scoring
            score = np.random.random()
            scored_candidates.append({
                **candidate,
                "rerank_score": score
            })

        # Sort by rerank score
        scored_candidates.sort(key=lambda x: x["rerank_score"], reverse=True)

        return {
            "reranked": scored_candidates[:data.get("top_k", 5)],
            "original_count": len(candidates)
        }
# RAG with reranking pipeline
class AdvancedRAGPipeline(ChainableNode):
    def __init__(self):
        super().__init__()
        self.retriever = HybridRetriever()
        self.reranker = Reranker()
        self.llm = LLMNode(model="gpt-4")

    async def execute(self, data):
        query = data.get("query", "")

        # Initial retrieval (get more candidates)
        retrieved = await self.retriever.execute({
            "query": query,
            "query_embedding": np.random.randn(1536).tolist()  # mock embedding
        })

        # Rerank for better relevance
        reranked = await self.reranker.execute({
            "query": query,
            "candidates": retrieved["retrieved"],
            "top_k": 3
        })

        # Generate response with best sources
        context = "\n".join([
            f"[{i+1}] {chunk['content']}"
            for i, chunk in enumerate(reranked["reranked"])
        ])

        prompt = f"""Answer based on the numbered sources below.

Sources:
{context}

Question: {query}

Provide a detailed answer and cite sources using [1], [2], etc.:"""

        response = await self.llm.execute({"prompt": prompt})

        return {
            "answer": response["content"],
            "sources": reranked["reranked"],
            "query": query
        }
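A short usage sketch with a hypothetical document set; chunks are indexed through the pipeline's own HybridRetriever before the first question:

advanced_rag = AdvancedRAGPipeline()

# Index embedded chunks into the pipeline's retriever first
embedded = await Embedder().execute({"chunks": [
    {"id": "doc1", "content": "Nodes compose with the | operator."}
]})
await advanced_rag.retriever.execute({"documents": embedded["embedded_chunks"]})

# Ask a question; the prompt cites the reranked sources as [1], [2], ...
result = await advanced_rag.execute({"query": "How do I compose nodes in Flow Core?"})
print(result["answer"])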
Streaming RAG Pipeline
import asyncio

from nadoo_flow import StreamingNode
from typing import AsyncGenerator

class StreamingRAG(StreamingNode):
    """RAG with streaming response generation"""

    def __init__(self):
        super().__init__()
        self.retriever = VectorStore()
        self.embedder = Embedder()

    async def stream_execute(self, data) -> AsyncGenerator:
        query = data.get("query", "")

        # Retrieve context (non-streaming)
        yield {"type": "status", "message": "Searching knowledge base..."}

        # Embed query
        query_embedded = await self.embedder.execute({
            "chunks": [{"content": query}]
        })
        query_embedding = query_embedded["embedded_chunks"][0]["embedding"]

        # Retrieve
        retrieved = await self.retriever.execute({
            "query_embedding": query_embedding,
            "top_k": 3
        })

        yield {
            "type": "sources",
            "sources": retrieved["retrieved_chunks"],
            "count": len(retrieved["retrieved_chunks"])
        }

        # Stream response generation
        yield {"type": "status", "message": "Generating response..."}

        context = "\n".join([
            chunk["content"] for chunk in retrieved["retrieved_chunks"]
        ])  # would be passed to a streaming LLM call in a real pipeline

        # Simulate streaming LLM response
        response_parts = [
            "Based on the context, ",
            "I can tell you that ",
            "Flow Core is ",
            "a powerful framework ",
            "for building AI workflows."
        ]

        for i, part in enumerate(response_parts):
            await asyncio.sleep(0.1)
            yield {
                "type": "token",
                "content": part,
                "progress": (i + 1) / len(response_parts)
            }

        yield {"type": "complete", "message": "Response generated successfully"}

# Use streaming RAG
streaming_rag = StreamingRAG()

async for chunk in streaming_rag.stream_execute({"query": "What is Flow Core?"}):
    if chunk["type"] == "token":
        print(chunk["content"], end="", flush=True)
    elif chunk["type"] == "status":
        print(f"\n[{chunk['message']}]")
    elif chunk["type"] == "sources":
        print(f"\nFound {chunk['count']} relevant sources")
Multi-Step Reasoning RAG
class ReasoningRAG(ChainableNode):
    """RAG with multi-step reasoning"""

    def __init__(self):
        super().__init__()
        self.retriever = VectorStore()
        self.llm = LLMNode(model="gpt-4")

    async def execute(self, data):
        query = data.get("query", "")

        # Step 1: Decompose query
        decompose_prompt = f"""Break down this question into sub-questions:

Question: {query}

Sub-questions (numbered list):"""

        decomposition = await self.llm.execute({"prompt": decompose_prompt})
        sub_questions = decomposition["content"].split("\n")

        # Step 2: Answer each sub-question
        sub_answers = []
        for sub_q in sub_questions:
            if sub_q.strip():
                # Retrieve for sub-question (mock query embedding)
                retrieved = await self.retriever.execute({
                    "query_embedding": np.random.randn(1536).tolist(),
                    "top_k": 2
                })

                # Generate sub-answer
                sub_prompt = f"""Context: {retrieved}

Question: {sub_q}

Answer:"""
                sub_answer = await self.llm.execute({"prompt": sub_prompt})
                sub_answers.append({
                    "question": sub_q,
                    "answer": sub_answer["content"]
                })

        # Step 3: Synthesize final answer
        qa_summary = "\n".join(
            f"Q: {sa['question']}\nA: {sa['answer']}" for sa in sub_answers
        )
        synthesis_prompt = f"""Original question: {query}

Sub-answers:
{qa_summary}

Provide a comprehensive answer to the original question:"""

        final_answer = await self.llm.execute({"prompt": synthesis_prompt})

        return {
            "query": query,
            "reasoning_steps": sub_answers,
            "final_answer": final_answer["content"]
        }
RAG with Feedback Loop
from datetime import datetime, timezone

class FeedbackRAG(ChainableNode):
    """RAG with user feedback integration"""

    def __init__(self):
        super().__init__()
        self.retriever = VectorStore()
        self.llm = LLMNode(model="gpt-4")
        self.feedback_history = []

    async def execute(self, data):
        query = data.get("query", "")
        feedback = data.get("feedback", None)

        # Store feedback if provided
        if feedback:
            self.feedback_history.append({
                "query": query,
                "feedback": feedback,
                "timestamp": datetime.now(timezone.utc).isoformat()
            })

        # Retrieve with feedback consideration (mock query embedding)
        retrieved = await self.retriever.execute({
            "query_embedding": np.random.randn(1536).tolist(),
            "top_k": 5
        })

        # Filter based on negative feedback
        filtered_chunks = []
        for chunk in retrieved["retrieved_chunks"]:
            # Check if chunk was marked as irrelevant
            is_relevant = True
            for fb in self.feedback_history:
                if fb["feedback"].get("type") == "irrelevant" and \
                        chunk["id"] in fb["feedback"].get("chunk_ids", []):
                    is_relevant = False
                    break
            if is_relevant:
                filtered_chunks.append(chunk)

        # Generate response
        context = "\n".join([chunk["content"] for chunk in filtered_chunks])
        prompt = f"""Context: {context}

Previous feedback: {len(self.feedback_history)} responses rated

Question: {query}

Answer:"""

        response = await self.llm.execute({"prompt": prompt})

        return {
            "answer": response["content"],
            "sources": filtered_chunks,
            "feedback_considered": len(self.feedback_history) > 0
        }

    async def process_feedback(self, query: str, rating: int, chunk_ids: List[str] = None):
        """Process user feedback"""
        feedback_data = {
            "query": query,
            "feedback": {
                "rating": rating,
                "type": "irrelevant" if rating < 3 else "relevant",
                "chunk_ids": chunk_ids or []
            }
        }

        # Re-process with feedback
        return await self.execute(feedback_data)
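A hypothetical round trip showing the feedback loop: index a chunk, ask, rate the answer poorly, and re-ask with the flagged chunks excluded:

feedback_rag = FeedbackRAG()

# Index at least one embedded chunk so retrieval has something to return
embedded = await Embedder().execute({"chunks": [
    {"id": "doc1_chunk_0", "content": "Flow Core supports streaming nodes."}
]})
await feedback_rag.retriever.index(embedded["embedded_chunks"])

# First pass
first = await feedback_rag.execute({"query": "Does Flow Core support streaming?"})

# The user rates the answer poorly and flags the unhelpful chunks
flagged_ids = [chunk["id"] for chunk in first["sources"]]
second = await feedback_rag.process_feedback(
    query="Does Flow Core support streaming?",
    rating=2,
    chunk_ids=flagged_ids
)
print(second["feedback_considered"])  # True once any feedback has been recorded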
Production RAG Configuration
class ProductionRAG:
    """Production-ready RAG configuration"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.setup_pipeline()

    def setup_pipeline(self):
        """Configure production pipeline"""
        # Document processing
        self.loader = DocumentLoader(
            source_type=self.config.get("source_type", "file")
        )
        self.splitter = TextSplitter(
            chunk_size=self.config.get("chunk_size", 500),
            overlap=self.config.get("chunk_overlap", 50)
        )

        # Embedding
        self.embedder = Embedder(
            model=self.config.get("embedding_model", "text-embedding-ada-002")
        )

        # Retrieval
        self.retriever = HybridRetriever()

        # Reranking
        self.reranker = Reranker(
            model=self.config.get("reranker_model", "cross-encoder/ms-marco")
        )

        # Generation
        self.generator = LLMNode(
            model=self.config.get("llm_model", "gpt-4"),
            temperature=self.config.get("temperature", 0.3),
            max_tokens=self.config.get("max_tokens", 500)
        )
    async def index_documents(self, sources: List[str]):
        """Index documents from sources"""
        all_chunks = []

        for source in sources:
            # Load documents
            docs = await self.loader.execute({"source": source})

            # Split into chunks
            chunks = await self.splitter.execute(docs)

            # Generate embeddings
            embedded = await self.embedder.execute(chunks)

            all_chunks.extend(embedded["embedded_chunks"])

        # Index all chunks
        await self.retriever.index_documents(all_chunks)

        return {"indexed": len(all_chunks), "sources": len(sources)}

    async def query(self, question: str, **kwargs):
        """Query the RAG system"""
        # Embed query
        query_embedded = await self.embedder.execute({
            "chunks": [{"content": question}]
        })
        query_embedding = query_embedded["embedded_chunks"][0]["embedding"]

        # Retrieve candidates
        retrieved = await self.retriever.execute({
            "query": question,
            "query_embedding": query_embedding
        })

        # Rerank
        reranked = await self.reranker.execute({
            "query": question,
            "candidates": retrieved["retrieved"],
            "top_k": kwargs.get("top_k", 3)
        })

        # Generate response
        context = "\n".join([
            chunk["content"] for chunk in reranked["reranked"]
        ])

        response = await self.generator.execute({
            "prompt": self._build_prompt(question, context, **kwargs)
        })

        return {
            "answer": response["content"],
            "sources": reranked["reranked"],
            "metadata": {
                "model": self.config.get("llm_model"),
                "chunks_used": len(reranked["reranked"])
            }
        }
    def _build_prompt(self, question: str, context: str, **kwargs):
        """Build prompt for generation"""
        style = kwargs.get("style", "detailed")

        prompts = {
            "detailed": f"""Use the context below to answer the question in detail.

Context:
{context}

Question: {question}

Provide a comprehensive answer:""",
            "concise": f"""Answer briefly based on: {context}

Q: {question}
A:""",
            "academic": f"""Based on the following sources, provide an academic response.

Sources:
{context}

Query: {question}

Response (cite sources):"""
        }

        return prompts.get(style, prompts["detailed"])
# Use production RAG
config = {
    "source_type": "file",
    "chunk_size": 500,
    "chunk_overlap": 50,
    "embedding_model": "text-embedding-ada-002",
    "llm_model": "gpt-4",
    "temperature": 0.3,
    "max_tokens": 500
}

rag = ProductionRAG(config)

# Index documents
await rag.index_documents(["/docs/manual.pdf", "/docs/api.md"])

# Query
result = await rag.query(
    "How does Flow Core handle parallel execution?",
    style="detailed",
    top_k=5
)
print(result["answer"])
Best Practices
Document Processing
- Choose appropriate chunk sizes (typically 300-800 tokens; see the sketch after this list)
- Maintain overlap between chunks (10-20%)
- Preserve document metadata through pipeline
- Handle multiple document formats
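A minimal sketch of token-aware chunk sizing, assuming a rough 4-characters-per-token heuristic when no tokenizer is available; it drives the character-based TextSplitter defined above:

def chunk_params(target_tokens: int = 500, overlap_ratio: float = 0.15,
                 chars_per_token: int = 4) -> dict:
    """Translate token targets into character-based TextSplitter settings."""
    chunk_size = target_tokens * chars_per_token
    overlap = int(chunk_size * overlap_ratio)
    return {"chunk_size": chunk_size, "overlap": overlap}

# ~500-token chunks with ~15% overlap
splitter = TextSplitter(**chunk_params(target_tokens=500, overlap_ratio=0.15))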
Retrieval Optimization
- Use hybrid search for better recall
- Implement reranking for precision
- Cache frequently accessed embeddings (see the caching sketch after this list)
- Consider query expansion techniques
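A minimal caching sketch, assuming an in-memory dict keyed by chunk text is sufficient for your deployment; swap in Redis or similar when the cache must be shared across workers:

class CachedEmbedder(Embedder):
    """Embedder that reuses embeddings for previously seen text."""

    def __init__(self, model="text-embedding-ada-002"):
        super().__init__(model=model)
        self._cache: Dict[str, List[float]] = {}

    async def execute(self, data):
        chunks = data.get("chunks", [])
        misses = [c for c in chunks if c["content"] not in self._cache]

        # Only embed texts we have not seen before
        if misses:
            embedded = await super().execute({"chunks": misses})
            for chunk in embedded["embedded_chunks"]:
                self._cache[chunk["content"]] = chunk["embedding"]

        return {"embedded_chunks": [
            {**c, "embedding": self._cache[c["content"]]} for c in chunks
        ]}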
Response Generation
- Provide clear source attribution
- Handle cases with no relevant context (see the fallback sketch after this list)
- Implement response validation
- Consider streaming for better UX
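A small sketch of a guard for the no-relevant-context case, assuming a hypothetical minimum rerank score below which the system instructs the model to decline rather than guess:

MIN_RERANK_SCORE = 0.3  # hypothetical threshold; tune against your evaluation set

def build_grounded_prompt(question: str, reranked_chunks: List[Dict]) -> str:
    """Return a grounded prompt, or a refusal instruction when context is too weak."""
    usable = [c for c in reranked_chunks if c.get("rerank_score", 0) >= MIN_RERANK_SCORE]
    if not usable:
        return (
            f"Question: {question}\n"
            "No relevant context was found. Reply that the knowledge base does not "
            "cover this question instead of guessing."
        )
    context = "\n".join(f"[{i+1}] {c['content']}" for i, c in enumerate(usable))
    return (
        "Answer using only the sources below and cite them.\n\n"
        f"Sources:\n{context}\n\nQuestion: {question}"
    )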
Production Considerations
- Monitor retrieval quality metrics
- Implement feedback loops
- Version control document indexes (see the versioning sketch after this list)
- Plan for index updates
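A sketch of lightweight index versioning, assuming version metadata stamped on every chunk is enough to roll back or rebuild; this is a hypothetical helper, not a Flow Core API:

import hashlib

def stamp_index_version(chunks: List[Dict], source_files: List[str]) -> str:
    """Tag chunks with an index version derived from the source file list."""
    version = hashlib.sha256("|".join(sorted(source_files)).encode()).hexdigest()[:12]
    for chunk in chunks:
        chunk["metadata"]["index_version"] = version
    return version

# Later, stale chunks can be dropped by filtering on metadata["index_version"]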