Overview

Retrieval-Augmented Generation (RAG) grounds large language model responses in external knowledge retrieved at query time. This example shows how to build production-ready RAG pipelines with Flow Core.

Basic RAG Pipeline

from nadoo_flow import ChainableNode, LLMNode, ParallelNode, FunctionNode
import numpy as np
from typing import List, Dict, Any

class DocumentLoader(ChainableNode):
    """Load documents from various sources"""

    def __init__(self, source_type="file"):
        super().__init__()
        self.source_type = source_type

    async def execute(self, data):
        source_path = data.get("source", "")

        # Mock document loading
        documents = [
            {
                "id": "doc1",
                "content": "Nadoo Flow Core is a Python framework for building AI workflows.",
                "metadata": {"source": source_path, "page": 1}
            },
            {
                "id": "doc2",
                "content": "It supports node-based composition and async execution.",
                "metadata": {"source": source_path, "page": 2}
            },
            {
                "id": "doc3",
                "content": "Flow Core integrates with various LLM providers seamlessly.",
                "metadata": {"source": source_path, "page": 3}
            }
        ]

        return {"documents": documents, "count": len(documents)}

class TextSplitter(ChainableNode):
    """Split documents into chunks"""

    def __init__(self, chunk_size=500, overlap=50):
        super().__init__()
        self.chunk_size = chunk_size
        self.overlap = overlap

    async def execute(self, data):
        documents = data.get("documents", [])
        chunks = []

        for doc in documents:
            content = doc["content"]
            doc_chunks = self._split_text(content)

            for i, chunk_text in enumerate(doc_chunks):
                chunks.append({
                    "id": f"{doc['id']}_chunk_{i}",
                    "content": chunk_text,
                    "metadata": {
                        **doc["metadata"],
                        "chunk_index": i,
                        "parent_doc": doc["id"]
                    }
                })

        return {"chunks": chunks, "total_chunks": len(chunks)}

    def _split_text(self, text: str) -> List[str]:
        """Split text into overlapping chunks"""
        chunks = []
        for i in range(0, len(text), self.chunk_size - self.overlap):
            chunk = text[i:i + self.chunk_size]
            if chunk:
                chunks.append(chunk)
        return chunks

class Embedder(ChainableNode):
    """Generate embeddings for text chunks"""

    def __init__(self, model="text-embedding-ada-002"):
        super().__init__()
        self.model = model

    async def execute(self, data):
        chunks = data.get("chunks", [])
        embedded_chunks = []

        for chunk in chunks:
            # Mock embedding generation
            embedding = np.random.randn(1536).tolist()

            embedded_chunks.append({
                **chunk,
                "embedding": embedding
            })

        return {"embedded_chunks": embedded_chunks}

class VectorStore(ChainableNode):
    """Store and retrieve vectors"""

    def __init__(self):
        super().__init__()
        self.store = []

    async def index(self, chunks: List[Dict]):
        """Index chunks with embeddings"""
        self.store.extend(chunks)
        return {"indexed": len(chunks)}

    async def search(self, query_embedding: List[float], top_k: int = 5):
        """Search similar chunks by cosine similarity"""
        query_vec = np.array(query_embedding)
        scores = []
        for chunk in self.store:
            chunk_vec = np.array(chunk.get("embedding", []))
            if chunk_vec.size == 0:
                continue
            # Cosine similarity between query and chunk embeddings
            score = float(
                np.dot(query_vec, chunk_vec)
                / (np.linalg.norm(query_vec) * np.linalg.norm(chunk_vec) + 1e-10)
            )
            scores.append((score, chunk))

        # Sort by score and return top_k
        scores.sort(key=lambda x: x[0], reverse=True)
        results = [chunk for score, chunk in scores[:top_k]]

        return results

    async def execute(self, data):
        if "embedded_chunks" in data:
            # Indexing mode
            result = await self.index(data["embedded_chunks"])
            return {"indexed": result["indexed"], "total_stored": len(self.store)}
        elif "query_embedding" in data:
            # Search mode
            results = await self.search(
                data["query_embedding"],
                data.get("top_k", 5)
            )
            return {"retrieved_chunks": results}
        else:
            return {"error": "Invalid operation"}

# Build RAG pipeline
rag_pipeline = (
    DocumentLoader(source_type="file")
    | TextSplitter(chunk_size=200, overlap=20)
    | Embedder(model="text-embedding-ada-002")
    | VectorStore()
)

# Index documents
index_result = await rag_pipeline.execute({"source": "/docs/manual.pdf"})
print(f"Indexed {index_result['indexed']} chunks")

# Query pipeline
class RAGQueryPipeline(ChainableNode):
    def __init__(self, vector_store: VectorStore):
        super().__init__()
        self.vector_store = vector_store
        self.embedder = Embedder()
        self.llm = LLMNode(model="gpt-4")

    async def execute(self, data):
        query = data.get("query", "")

        # Embed query
        query_data = {"chunks": [{"content": query}]}
        embedded = await self.embedder.execute(query_data)
        query_embedding = embedded["embedded_chunks"][0]["embedding"]

        # Retrieve relevant chunks
        retrieved = await self.vector_store.execute({
            "query_embedding": query_embedding,
            "top_k": 5
        })

        # Generate response with context
        context = "\n".join([
            chunk["content"] for chunk in retrieved["retrieved_chunks"]
        ])

        prompt = f"""Based on the following context, answer the question.

Context:
{context}

Question: {query}

Answer:"""

        response = await self.llm.execute({"prompt": prompt})

        return {
            "answer": response["content"],
            "sources": retrieved["retrieved_chunks"],
            "query": query
        }
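
A minimal usage sketch for the query pipeline. It assumes the VectorStore at the end of the indexing pipeline is kept as a named instance so both pipelines share the same index, and that an existing node instance can be placed in a | chain the same way as a freshly constructed one:

# Keep a handle on the store so the query pipeline can search what was indexed
vector_store = VectorStore()

index_pipeline = (
    DocumentLoader(source_type="file")
    | TextSplitter(chunk_size=200, overlap=20)
    | Embedder()
    | vector_store
)
await index_pipeline.execute({"source": "/docs/manual.pdf"})

query_pipeline = RAGQueryPipeline(vector_store)
result = await query_pipeline.execute({"query": "What is Flow Core?"})
print(result["answer"])
print(f"Answer backed by {len(result['sources'])} retrieved chunks")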

Hybrid Retrieval

class HybridRetriever(ChainableNode):
    """Combine vector and keyword search"""

    def __init__(self):
        super().__init__()
        self.vector_store = VectorStore()
        self.keyword_index = {}

    async def index_documents(self, documents: List[Dict]):
        """Index for both vector and keyword search"""
        # Vector indexing
        await self.vector_store.index(documents)

        # Keyword indexing (inverted index)
        for doc in documents:
            words = doc["content"].lower().split()
            for word in words:
                if word not in self.keyword_index:
                    self.keyword_index[word] = []
                self.keyword_index[word].append(doc["id"])

        return {"indexed": len(documents)}

    async def hybrid_search(self, query: str, query_embedding: List[float]):
        """Perform hybrid search"""
        # Vector search
        vector_results = await self.vector_store.search(query_embedding, top_k=10)

        # Keyword search
        query_words = query.lower().split()
        keyword_scores = {}

        for word in query_words:
            if word in self.keyword_index:
                for doc_id in self.keyword_index[word]:
                    keyword_scores[doc_id] = keyword_scores.get(doc_id, 0) + 1

        # Combine and rerank (vector score derived from retrieval rank here;
        # use the retriever's actual similarity scores when available)
        combined_scores = {}
        for rank, result in enumerate(vector_results):
            doc_id = result["id"]
            vector_score = 1.0 - rank / max(len(vector_results), 1)
            keyword_score = keyword_scores.get(doc_id, 0) / max(len(query_words), 1)
            combined_scores[doc_id] = 0.6 * vector_score + 0.4 * keyword_score

        # Sort by combined score
        sorted_results = sorted(
            vector_results,
            key=lambda x: combined_scores.get(x["id"], 0),
            reverse=True
        )

        return sorted_results[:5]

    async def execute(self, data):
        if "documents" in data:
            return await self.index_documents(data["documents"])
        elif "query" in data:
            return {
                "retrieved": await self.hybrid_search(
                    data["query"],
                    data["query_embedding"]
                )
            }
        else:
            return {"error": "Invalid operation"}
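
A short usage sketch, assuming embedded_chunks is the output of the Embedder above; the query embedding is a random placeholder, matching the mock embeddings used elsewhere in this example:

retriever = HybridRetriever()

# Index chunks that already carry embeddings (and content for the keyword index)
await retriever.execute({"documents": embedded_chunks})

# Hybrid query: keyword terms plus a query embedding
results = await retriever.execute({
    "query": "async execution",
    "query_embedding": np.random.randn(1536).tolist()
})
print(f"Retrieved {len(results['retrieved'])} chunks")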

RAG with Reranking

class Reranker(ChainableNode):
    """Rerank retrieved documents for better relevance"""

    def __init__(self, model="cross-encoder/ms-marco-MiniLM-L-12-v2"):
        super().__init__()
        self.model = model

    async def execute(self, data):
        query = data.get("query", "")
        candidates = data.get("candidates", [])

        # Score each candidate
        scored_candidates = []
        for candidate in candidates:
            # Mock cross-encoder scoring
            score = np.random.random()
            scored_candidates.append({
                **candidate,
                "rerank_score": score
            })

        # Sort by rerank score
        scored_candidates.sort(key=lambda x: x["rerank_score"], reverse=True)

        return {
            "reranked": scored_candidates[:data.get("top_k", 5)],
            "original_count": len(candidates)
        }

# RAG with reranking pipeline
class AdvancedRAGPipeline(ChainableNode):
    def __init__(self):
        super().__init__()
        self.retriever = HybridRetriever()
        self.reranker = Reranker()
        self.llm = LLMNode(model="gpt-4")

    async def execute(self, data):
        query = data.get("query", "")

        # Initial retrieval (get more candidates than needed)
        retrieved = await self.retriever.execute({
            "query": query,
            # Mock query embedding; in practice, embed the query with the Embedder
            "query_embedding": np.random.randn(1536).tolist()
        })

        # Rerank for better relevance
        reranked = await self.reranker.execute({
            "query": query,
            "candidates": retrieved["retrieved"],
            "top_k": 3
        })

        # Generate response with best sources
        context = "\n".join([
            f"[{i+1}] {chunk['content']}"
            for i, chunk in enumerate(reranked["reranked"])
        ])

        prompt = f"""Answer based on the numbered sources below.

Sources:
{context}

Question: {query}

Provide a detailed answer and cite sources using [1], [2], etc.:"""

        response = await self.llm.execute({"prompt": prompt})

        return {
            "answer": response["content"],
            "sources": reranked["reranked"],
            "query": query
        }
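
Usage follows the same pattern as the basic pipeline; the hybrid retriever needs to be indexed before querying. A sketch with an illustrative document path:

advanced_rag = AdvancedRAGPipeline()

# Load, split, and embed documents, then index them into the hybrid retriever
docs = await DocumentLoader().execute({"source": "/docs/manual.pdf"})
chunks = await TextSplitter(chunk_size=200, overlap=20).execute(docs)
embedded = await Embedder().execute(chunks)
await advanced_rag.retriever.index_documents(embedded["embedded_chunks"])

result = await advanced_rag.execute({"query": "Does Flow Core support async execution?"})
print(result["answer"])
for i, source in enumerate(result["sources"], start=1):
    print(f"[{i}] {source['content'][:60]}")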

Streaming RAG Pipeline

import asyncio

from nadoo_flow import StreamingNode
from typing import AsyncGenerator

class StreamingRAG(StreamingNode):
    """RAG with streaming response generation"""

    def __init__(self):
        super().__init__()
        self.retriever = VectorStore()
        self.embedder = Embedder()

    async def stream_execute(self, data) -> AsyncGenerator:
        query = data.get("query", "")

        # Retrieve context (non-streaming)
        yield {"type": "status", "message": "Searching knowledge base..."}

        # Embed query
        query_embedded = await self.embedder.execute({
            "chunks": [{"content": query}]
        })
        query_embedding = query_embedded["embedded_chunks"][0]["embedding"]

        # Retrieve
        retrieved = await self.retriever.execute({
            "query_embedding": query_embedding,
            "top_k": 3
        })

        yield {
            "type": "sources",
            "sources": retrieved["retrieved_chunks"],
            "count": len(retrieved["retrieved_chunks"])
        }

        # Stream response generation
        yield {"type": "status", "message": "Generating response..."}

        context = "\n".join([
            chunk["content"] for chunk in retrieved["retrieved_chunks"]
        ])

        # Simulate streaming LLM response
        response_parts = [
            "Based on the context, ",
            "I can tell you that ",
            "Flow Core is ",
            "a powerful framework ",
            "for building AI workflows."
        ]

        for i, part in enumerate(response_parts):
            await asyncio.sleep(0.1)
            yield {
                "type": "token",
                "content": part,
                "progress": (i + 1) / len(response_parts)
            }

        yield {"type": "complete", "message": "Response generated successfully"}

# Use streaming RAG
streaming_rag = StreamingRAG()

async for chunk in streaming_rag.stream_execute({"query": "What is Flow Core?"}):
    if chunk["type"] == "token":
        print(chunk["content"], end="", flush=True)
    elif chunk["type"] == "status":
        print(f"\n[{chunk['message']}]")
    elif chunk["type"] == "sources":
        print(f"\nFound {chunk['count']} relevant sources")

Multi-Step Reasoning RAG

class ReasoningRAG(ChainableNode):
    """RAG with multi-step reasoning"""

    def __init__(self):
        super().__init__()
        self.retriever = VectorStore()
        self.llm = LLMNode(model="gpt-4")

    async def execute(self, data):
        query = data.get("query", "")

        # Step 1: Decompose query
        decompose_prompt = f"""Break down this question into sub-questions:
Question: {query}

Sub-questions (numbered list):"""

        decomposition = await self.llm.execute({"prompt": decompose_prompt})
        sub_questions = decomposition["content"].split("\n")

        # Step 2: Answer each sub-question
        sub_answers = []
        for sub_q in sub_questions:
            if sub_q.strip():
                # Retrieve for sub-question
                # (mock query embedding; in practice, embed sub_q with the Embedder)
                retrieved = await self.retriever.execute({
                    "query_embedding": np.random.randn(1536).tolist(),
                    "top_k": 2
                })
                context = "\n".join(
                    chunk["content"] for chunk in retrieved["retrieved_chunks"]
                )

                # Generate sub-answer
                sub_prompt = f"""Context: {context}
Question: {sub_q}
Answer:"""

                sub_answer = await self.llm.execute({"prompt": sub_prompt})
                sub_answers.append({
                    "question": sub_q,
                    "answer": sub_answer["content"]
                })

        # Step 3: Synthesize final answer
        sub_answer_text = "\n".join(
            f"Q: {sa['question']}\nA: {sa['answer']}" for sa in sub_answers
        )
        synthesis_prompt = f"""Original question: {query}

Sub-answers:
{sub_answer_text}

Provide a comprehensive answer to the original question:"""

        final_answer = await self.llm.execute({"prompt": synthesis_prompt})

        return {
            "query": query,
            "reasoning_steps": sub_answers,
            "final_answer": final_answer["content"]
        }
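
A usage sketch; the VectorStore inside ReasoningRAG starts empty here, so the retrieved context is illustrative, but the decompose / answer / synthesize flow is the same once it has been indexed:

reasoning_rag = ReasoningRAG()
result = await reasoning_rag.execute({
    "query": "How does Flow Core handle parallel execution and streaming?"
})

for step in result["reasoning_steps"]:
    print(f"Sub-question: {step['question']}")
print(result["final_answer"])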

RAG with Feedback Loop

class FeedbackRAG(ChainableNode):
    """RAG with user feedback integration"""

    def __init__(self):
        super().__init__()
        self.retriever = VectorStore()
        self.llm = LLMNode(model="gpt-4")
        self.feedback_history = []

    async def execute(self, data):
        query = data.get("query", "")
        feedback = data.get("feedback", None)

        # Store feedback if provided
        if feedback:
            self.feedback_history.append({
                "query": query,
                "feedback": feedback,
                "timestamp": "2024-01-01T12:00:00Z"
            })

        # Retrieve with feedback consideration
        # (mock query embedding; in practice, embed the query with the Embedder)
        retrieved = await self.retriever.execute({
            "query_embedding": np.random.randn(1536).tolist(),
            "top_k": 5
        })

        # Filter based on negative feedback
        filtered_chunks = []
        for chunk in retrieved["retrieved_chunks"]:
            # Check if chunk was marked as irrelevant
            is_relevant = True
            for fb in self.feedback_history:
                if fb["feedback"]["type"] == "irrelevant" and \
                   chunk["id"] in fb["feedback"].get("chunk_ids", []):
                    is_relevant = False
                    break

            if is_relevant:
                filtered_chunks.append(chunk)

        # Generate response
        context = "\n".join([chunk["content"] for chunk in filtered_chunks])

        prompt = f"""Context: {context}

Previous feedback: {len(self.feedback_history)} responses rated

Question: {query}
Answer:"""

        response = await self.llm.execute({"prompt": prompt})

        return {
            "answer": response["content"],
            "sources": filtered_chunks,
            "feedback_considered": len(self.feedback_history) > 0
        }

    async def process_feedback(self, query: str, rating: int, chunk_ids: List[str] = None):
        """Process user feedback"""
        feedback_data = {
            "query": query,
            "feedback": {
                "rating": rating,
                "type": "irrelevant" if rating < 3 else "relevant",
                "chunk_ids": chunk_ids or []
            }
        }

        # Re-process with feedback
        return await self.execute(feedback_data)
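
A sketch of the feedback round trip. The retriever has not been indexed here, so sources may be empty; the point is the call pattern, where chunk ids from a previous answer are passed back with a low rating:

feedback_rag = FeedbackRAG()

# First answer
first = await feedback_rag.execute({"query": "What is Flow Core?"})

# The user rates the answer poorly; mark the cited chunks as irrelevant
irrelevant_ids = [chunk["id"] for chunk in first["sources"]]
second = await feedback_rag.process_feedback(
    query="What is Flow Core?",
    rating=2,
    chunk_ids=irrelevant_ids
)
print(second["feedback_considered"])  # True once feedback has been stored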

Production RAG Configuration

class ProductionRAG:
    """Production-ready RAG configuration"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.setup_pipeline()

    def setup_pipeline(self):
        """Configure production pipeline"""
        # Document processing
        self.loader = DocumentLoader(
            source_type=self.config.get("source_type", "file")
        )

        self.splitter = TextSplitter(
            chunk_size=self.config.get("chunk_size", 500),
            overlap=self.config.get("chunk_overlap", 50)
        )

        # Embedding
        self.embedder = Embedder(
            model=self.config.get("embedding_model", "text-embedding-ada-002")
        )

        # Retrieval
        self.retriever = HybridRetriever()

        # Reranking
        self.reranker = Reranker(
            model=self.config.get("reranker_model", "cross-encoder/ms-marco")
        )

        # Generation
        self.generator = LLMNode(
            model=self.config.get("llm_model", "gpt-4"),
            temperature=self.config.get("temperature", 0.3),
            max_tokens=self.config.get("max_tokens", 500)
        )

    async def index_documents(self, sources: List[str]):
        """Index documents from sources"""
        all_chunks = []

        for source in sources:
            # Load documents
            docs = await self.loader.execute({"source": source})

            # Split into chunks
            chunks = await self.splitter.execute(docs)

            # Generate embeddings
            embedded = await self.embedder.execute(chunks)

            all_chunks.extend(embedded["embedded_chunks"])

        # Index all chunks
        await self.retriever.index_documents(all_chunks)

        return {"indexed": len(all_chunks), "sources": len(sources)}

    async def query(self, question: str, **kwargs):
        """Query the RAG system"""
        # Embed query
        query_embedded = await self.embedder.execute({
            "chunks": [{"content": question}]
        })
        query_embedding = query_embedded["embedded_chunks"][0]["embedding"]

        # Retrieve candidates
        retrieved = await self.retriever.execute({
            "query": question,
            "query_embedding": query_embedding
        })

        # Rerank
        reranked = await self.reranker.execute({
            "query": question,
            "candidates": retrieved["retrieved"],
            "top_k": kwargs.get("top_k", 3)
        })

        # Generate response
        context = "\n".join([
            chunk["content"] for chunk in reranked["reranked"]
        ])

        response = await self.generator.execute({
            "prompt": self._build_prompt(question, context, **kwargs)
        })

        return {
            "answer": response["content"],
            "sources": reranked["reranked"],
            "metadata": {
                "model": self.config.get("llm_model"),
                "chunks_used": len(reranked["reranked"])
            }
        }

    def _build_prompt(self, question: str, context: str, **kwargs):
        """Build prompt for generation"""
        style = kwargs.get("style", "detailed")

        prompts = {
            "detailed": f"""Use the context below to answer the question in detail.

Context:
{context}

Question: {question}

Provide a comprehensive answer:""",

            "concise": f"""Answer briefly based on: {context}

Q: {question}
A:""",

            "academic": f"""Based on the following sources, provide an academic response.

Sources:
{context}

Query: {question}

Response (cite sources):"""
        }

        return prompts.get(style, prompts["detailed"])

# Use production RAG
config = {
    "source_type": "file",
    "chunk_size": 500,
    "chunk_overlap": 50,
    "embedding_model": "text-embedding-ada-002",
    "llm_model": "gpt-4",
    "temperature": 0.3,
    "max_tokens": 500
}

rag = ProductionRAG(config)

# Index documents
await rag.index_documents(["/docs/manual.pdf", "/docs/api.md"])

# Query
result = await rag.query(
    "How does Flow Core handle parallel execution?",
    style="detailed",
    top_k=5
)

print(result["answer"])

Best Practices

  • Choose appropriate chunk sizes (300-800 tokens typically)
  • Maintain overlap between chunks (10-20%)
  • Preserve document metadata through pipeline
  • Handle multiple document formats
  • Use hybrid search for better recall
  • Implement reranking for precision
  • Cache frequently accessed embeddings (see the sketch after this list)
  • Consider query expansion techniques
  • Provide clear source attribution
  • Handle cases with no relevant context
  • Implement response validation
  • Consider streaming for better UX
  • Monitor retrieval quality metrics
  • Implement feedback loops
  • Version control document indexes
  • Plan for index updates
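
A minimal sketch of the caching bullet above: wrap the Embedder with an in-memory lookup keyed by chunk text so repeated chunks and queries skip the embedding call. The CachedEmbedder name is illustrative, not part of Flow Core.

class CachedEmbedder(Embedder):
    """Embedder with a simple in-memory cache keyed by chunk text"""

    def __init__(self, model="text-embedding-ada-002"):
        super().__init__(model=model)
        self._cache: Dict[str, List[float]] = {}

    async def execute(self, data):
        chunks = data.get("chunks", [])
        cached, uncached = [], []

        for chunk in chunks:
            embedding = self._cache.get(chunk["content"])
            if embedding is not None:
                cached.append({**chunk, "embedding": embedding})
            else:
                uncached.append(chunk)

        # Only embed chunks that have not been seen before
        fresh = []
        if uncached:
            result = await super().execute({"chunks": uncached})
            fresh = result["embedded_chunks"]
            for chunk in fresh:
                self._cache[chunk["content"]] = chunk["embedding"]

        return {"embedded_chunks": cached + fresh}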

Next Steps