Working Examples

Learn Flow Core through practical, working examples from our GitHub repository. Each example progressively introduces more advanced concepts.
All examples are fully functional and can be found in the examples/ directory of the Flow Core repository.

Available Examples

Example Progression

🎯 Level 1: Basics

Start with fundamental concepts:

01_basic_chat_bot.py

Learn the basics of Flow Core with a simple chatbot that maintains conversation history:
# Key concepts demonstrated:
- BaseNode implementation
- NodeContext and WorkflowContext
- ChatHistoryNode for memory management
- InMemoryChatHistory for session storage
- Message handling (user/assistant)
Features:
  • Session-based conversation history
  • Window-based memory (keeps last N messages)
  • Simple rule-based responses
  • History inspection
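
To see the Level 1 ideas in isolation, here is a rough, framework-free sketch of the same logic: a rule-based reply plus window-based, per-session history. In 01_basic_chat_bot.py this logic lives inside a BaseNode subclass backed by InMemoryChatHistory; the plain dict and list below are only stand-ins, and the message format is an assumption.

import asyncio

# Stand-in for InMemoryChatHistory: one message list per session id.
sessions: dict[str, list[dict]] = {}
WINDOW_SIZE = 10  # keep only the last N messages, as in the example

async def chat(session_id: str, user_message: str) -> str:
    history = sessions.setdefault(session_id, [])
    history.append({"role": "user", "content": user_message})

    # Simple rule-based response (the real example swaps in smarter logic)
    if "hello" in user_message.lower():
        reply = "Hello! How can I help you?"
    else:
        reply = f"You said: {user_message}"

    history.append({"role": "assistant", "content": reply})
    del history[:-WINDOW_SIZE]  # window-based memory
    return reply

async def main():
    print(await chat("demo", "Hello there"))
    print(await chat("demo", "What did I just say?"))
    print(sessions["demo"])  # history inspection

asyncio.run(main())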

🚀 Level 2: Streaming

Add real-time capabilities:

02_streaming_chat.py

Implement token-by-token streaming for responsive chat experiences:
# Key concepts demonstrated:
- StreamingNode implementation
- AsyncGenerator for streaming
- Real-time token emission
- Progress tracking
- Stream aggregation
Features:
  • Character-by-character streaming
  • Simulated typing effect
  • Complete response aggregation
  • Stream event handling
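
The core of 02_streaming_chat.py is an async generator that emits tokens as they are produced. Here is a framework-free sketch of that pattern with a simulated typing delay; in the example the generator lives inside a StreamingNode and is consumed through its stream() interface, so treat the function names below as illustrative only.

import asyncio
from typing import AsyncGenerator

async def stream_response(text: str) -> AsyncGenerator[str, None]:
    """Yield the response character by character, simulating a typing effect."""
    for char in text:
        await asyncio.sleep(0.02)  # simulated typing delay
        yield char

async def main():
    chunks = []
    async for token in stream_response("Streaming feels more responsive."):
        print(token, end="", flush=True)  # real-time token emission
        chunks.append(token)              # aggregate the complete response
    print()
    full_response = "".join(chunks)
    print(f"Aggregated {len(full_response)} characters.")

asyncio.run(main())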

⚡ Level 3: Parallel Execution

Scale with concurrent processing:

03_parallel_search.py

Execute multiple search operations simultaneously:
# Key concepts demonstrated:
- ParallelNode for concurrent execution
- Multiple search engine integration
- Result aggregation strategies
- Error handling in parallel flows
- Performance optimization
Features:
  • Concurrent API calls to multiple sources
  • Aggregated search results
  • Fallback handling for failed searches
  • Response time tracking
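
Conceptually, 03_parallel_search.py fans one query out to several sources at once and merges whatever comes back, tolerating individual failures. The framework-free sketch below uses asyncio.gather to show that shape; the example itself expresses it with ParallelNode (see Pattern 2 further down), and the search backends here are fakes.

import asyncio
import random
import time

async def search(source: str, query: str) -> dict:
    """Pretend search backend; replace with real API calls."""
    await asyncio.sleep(random.uniform(0.1, 0.5))
    if source == "flaky":
        raise RuntimeError(f"{source} timed out")
    return {"source": source, "hits": [f"{source} result for {query!r}"]}

async def parallel_search(query: str) -> list[dict]:
    start = time.perf_counter()
    tasks = [search(s, query) for s in ("google", "bing", "flaky")]
    # return_exceptions=True lets one failed source degrade gracefully
    outcomes = await asyncio.gather(*tasks, return_exceptions=True)
    results = [o for o in outcomes if not isinstance(o, Exception)]
    print(f"{len(results)}/{len(tasks)} sources answered "
          f"in {time.perf_counter() - start:.2f}s")
    return results

print(asyncio.run(parallel_search("flow core")))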

🧠 Level 4: RAG Pipeline

Build production-ready AI workflows:

04_rag_pipeline.py

Complete Retrieval-Augmented Generation implementation:
# Key concepts demonstrated:
- Complex workflow composition
- Document ingestion and chunking
- Vector embedding generation
- Similarity search
- Context-aware generation
- Pipeline orchestration
Features:
  • Document processing pipeline
  • Vector store integration
  • Semantic search
  • Context injection
  • LLM-powered responses
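
At its core, 04_rag_pipeline.py chains ingestion, chunking, embedding, retrieval, and generation into one workflow. The sketch below is a toy version that uses keyword overlap in place of real vector similarity so it runs with no model or vector store; the example plugs actual embeddings, a vector store, and an LLM into the same stages.

# Toy RAG pipeline: chunk -> "embed" -> retrieve -> generate.
# Keyword overlap stands in for real vector similarity.

def chunk(document: str, size: int = 40) -> list[str]:
    return [document[i:i + size] for i in range(0, len(document), size)]

def embed(text: str) -> set[str]:
    return set(text.lower().split())  # stand-in for a vector embedding

def retrieve(query: str, chunks: list[str], k: int = 2) -> list[str]:
    q = embed(query)
    scored = sorted(chunks, key=lambda c: len(q & embed(c)), reverse=True)
    return scored[:k]  # top-k similarity search

def generate(query: str, context: list[str]) -> str:
    # Context injection: the real example sends this prompt to an LLM.
    return f"Answer to {query!r} based on: {' | '.join(context)}"

doc = ("Flow Core composes nodes into workflows. "
       "Nodes can stream, run in parallel, and retry on failure.")
chunks = chunk(doc)
context = retrieve("How do nodes handle failure?", chunks)
print(generate("How do nodes handle failure?", context))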

🛡️ Level 5: Advanced Patterns

Production-grade resilience:

advanced_features_example.py

Comprehensive demonstration of enterprise features:
# Key concepts demonstrated:
- Retry mechanisms with exponential backoff
- Fallback strategies
- Circuit breaker patterns
- Response caching
- Rate limiting
- Error recovery
Features:
  • Automatic retry on transient failures
  • Graceful degradation
  • Performance optimization through caching
  • API rate limit handling
  • Comprehensive error handling
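
The pattern worth internalizing from advanced_features_example.py is retry with exponential backoff, a fallback answer, and a response cache in front of it all. A framework-free sketch of that control flow follows; the example wires the equivalent behaviour through Flow Core nodes (compare Pattern 3 further down), and the flaky API here is simulated.

import asyncio
import random

cache: dict[str, str] = {}  # simple response cache

async def flaky_api(query: str) -> str:
    if random.random() < 0.6:  # simulated transient failure
        raise ConnectionError("transient error")
    return f"primary answer for {query!r}"

async def resilient_call(query: str, max_retries: int = 3) -> str:
    if query in cache:  # cached responses skip the network entirely
        return cache[query]
    delay = 0.1
    for attempt in range(max_retries):
        try:
            result = await flaky_api(query)
            cache[query] = result
            return result
        except ConnectionError:
            await asyncio.sleep(delay)  # exponential backoff
            delay *= 2
    return f"fallback answer for {query!r}"  # graceful degradation

print(asyncio.run(resilient_call("status")))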

👥 Level 6: Human-in-the-Loop

Interactive workflows with human oversight:

hitl_example.py

Implement workflows that pause for human input:
# Key concepts demonstrated:
- Workflow interruption
- Human approval gates
- State persistence
- Resume from checkpoint
- Conditional execution
Features:
  • Pause workflow for approval
  • State preservation during interruption
  • Manual intervention points
  • Workflow resumption
  • Audit trail
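
The heart of hitl_example.py is a workflow that stops at an approval gate, persists its state, and resumes once a human has decided. The framework-free sketch below shows that control flow with a JSON checkpoint file; the example implements the same idea with Flow Core's interruption and checkpointing support, so the file-based persistence is only a stand-in.

import asyncio
import json

CHECKPOINT = "checkpoint.json"

async def run_until_approval(data: dict) -> None:
    draft = {"summary": data["text"].upper()}  # some automated step
    # Pause: persist state so the workflow can resume later.
    with open(CHECKPOINT, "w") as f:
        json.dump({"draft": draft, "status": "awaiting_approval"}, f)
    print("Paused for human approval.")

async def resume(approved: bool) -> None:
    with open(CHECKPOINT) as f:
        state = json.load(f)  # resume from checkpoint
    if approved:
        print("Publishing:", state["draft"])  # conditional execution
    else:
        print("Rejected; nothing published.")
    state["status"] = "approved" if approved else "rejected"
    print("Audit trail entry:", state)

asyncio.run(run_until_approval({"text": "quarterly report"}))
asyncio.run(resume(approved=True))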

Running the Examples

Prerequisites

# Install Flow Core
pip install nadoo-flow-core

# Clone the repository for examples
git clone https://github.com/nadoo-ai/nadoo-flow-core
cd nadoo-flow-core/examples

Running Individual Examples

# Run basic chatbot
python 01_basic_chat_bot.py

# Run streaming chat
python 02_streaming_chat.py

# Run parallel search
python 03_parallel_search.py

# Run RAG pipeline
python 04_rag_pipeline.py

# Run advanced features
python advanced_features_example.py

# Run human-in-the-loop
python hitl_example.py

Example Structure

Each example follows a consistent structure:
"""
Example Title
Description in Korean and English
"""

import asyncio
from nadoo_flow import ...

# 1. Node Implementation
class CustomNode(BaseNode):
    async def execute(self, node_context, workflow_context):
        # Node logic here
        pass

# 2. Workflow Composition
workflow = Node1() | Node2() | Node3()

# 3. Main Execution
async def main():
    # Setup (constructor arguments elided in this template)
    context = WorkflowContext(...)

    # Execute (input_data is a placeholder for the workflow's input)
    result = await workflow.run(input_data)

    # Display results
    print(result)

if __name__ == "__main__":
    asyncio.run(main())

Key Learning Points

From Basic Examples

  • Node Creation: How to implement custom nodes
  • Context Management: Working with node and workflow contexts
  • Memory Systems: Implementing conversation history
  • Error Handling: Proper error propagation

From Advanced Examples

  • Parallel Processing: Scaling with concurrent execution
  • Streaming: Real-time data processing
  • Resilience: Production-grade error handling
  • Optimization: Caching and performance tuning

Common Patterns

Pattern 1: Stateful Conversations

# Using memory to maintain conversation state
history_manager = SessionHistoryManager(
    history_factory=lambda sid: InMemoryChatHistory(),
    window_size=10
)

chatbot = ChatNode("chatbot", history_manager)

Pattern 2: Parallel Data Fetching

# Fetch from multiple sources concurrently
parallel = ParallelNode([
    SearchEngineNode("google"),
    SearchEngineNode("bing"),
    DatabaseNode("internal")
])

results = await parallel.run(query)

Pattern 3: Retry with Fallback

# Resilient execution with fallback
retry_node = RetryNode(
    primary=PrimaryAPINode(),
    max_retries=3,
    backoff=ExponentialBackoff()
)

fallback_chain = retry_node | FallbackNode()

Pattern 4: Stream Processing

# Process streaming data
async for token in streaming_node.stream(input_data):
    print(token, end="", flush=True)
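
If you also need the complete response (for logging or chat history), collect the tokens while streaming. Continuing the snippet above, with streaming_node and input_data as defined there:

# Print tokens as they arrive and keep the aggregated response
chunks = []
async for token in streaming_node.stream(input_data):
    print(token, end="", flush=True)
    chunks.append(token)
full_response = "".join(chunks)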

Best Practices Demonstrated

  • Every example shows proper error handling with meaningful error messages and graceful degradation.
  • All examples use type hints and Pydantic models for runtime validation.
  • Proper use of Python’s async/await for concurrent operations.
  • Cleanup of resources, proper context management, and memory efficiency.
  • Clear comments in both Korean and English explaining key concepts.

Building Your Own

After studying these examples, you’ll be able to:
  1. Create Custom Nodes: Build nodes for any use case
  2. Compose Workflows: Chain nodes into complex workflows
  3. Handle State: Manage conversation and application state
  4. Scale Performance: Use parallel execution and streaming
  5. Ensure Reliability: Implement retry, fallback, and caching
  6. Add Human Oversight: Create approval workflows

Next Steps

Ready to dive deeper? Start with these examples:
  1. Run Basic Chat Bot: Start with 01_basic_chat_bot.py to understand core concepts
  2. Try Streaming: Move to 02_streaming_chat.py for real-time features
  3. Scale with Parallel: Explore 03_parallel_search.py for concurrent execution
  4. Build RAG Pipeline: Study 04_rag_pipeline.py for production patterns