Skip to main content

Overview

This example demonstrates the fundamentals of Flow Core by building a simple chatbot that maintains conversation history. You’ll learn about nodes, contexts, memory management, and the basic workflow composition pattern.
Source Code: View the complete example at 01_basic_chat_bot.py

What You’ll Learn

  • Creating custom nodes by extending BaseNode
  • Managing workflow and node contexts
  • Using ChatHistoryNode for conversation memory
  • Implementing InMemoryChatHistory for session storage
  • Handling user and assistant messages
  • Building interactive chat workflows

Complete Code

"""
Basic Chat Bot Example
A simple chatbot with conversation-history management

This example shows:
- BaseNode implementation
- NodeContext and WorkflowContext usage
- ChatHistoryNode for memory management
- InMemoryChatHistory for session storage
- Message handling (user/assistant)
"""

import asyncio
from datetime import datetime
from nadoo_flow import (
    BaseNode, NodeResult, NodeContext, WorkflowContext,
    ChatHistoryNode, SessionHistoryManager,
    InMemoryChatHistory, Message
)

class SimpleChatBot(BaseNode, ChatHistoryNode):
    """Simple rule-based chatbot with per-session conversation memory.

    Combines BaseNode (workflow execution) with the ChatHistoryNode mixin
    (conversation persistence). Replies are produced by keyword matching;
    every user/assistant exchange is appended to the session's history.
    """

    def __init__(self, name: str = "ChatBot"):
        # Initialize the workflow-node side of the class.
        BaseNode.__init__(self, node_id="chatbot")

        # Each session id gets its own in-memory history, trimmed to a
        # sliding window of the 10 most recent messages.
        history_manager = SessionHistoryManager(
            history_factory=lambda session_id: InMemoryChatHistory(),
            window_size=10,  # Keep only last 10 messages
        )

        ChatHistoryNode.__init__(
            self,
            history_manager=history_manager,
            session_key="session_id",  # Key to extract session ID from context
        )

        self.name = name  # Display name used in greeting responses

    async def execute(self, node_context: NodeContext, workflow_context: WorkflowContext) -> NodeResult:
        """Process one user message and return the bot's reply.

        Reads "message" from the node input, consults this session's
        history, generates a reply, and records both sides of the exchange.
        """

        # Get user input (empty string if the key is absent).
        user_input = node_context.input_data.get("message", "")

        # History is fetched BEFORE saving this turn, so `messages` holds
        # only prior exchanges.
        history = await self.get_history(workflow_context)
        messages = await history.get_messages()

        # Generate response based on input and history
        response = await self.generate_response(user_input, messages)

        # Persist both sides of this turn for future context.
        await self.save_interaction(
            workflow_context,
            Message.user(user_input),
            Message.assistant(response),
        )

        return NodeResult(
            success=True,
            output={
                "response": response,
                # +2 accounts for the user/assistant pair just saved.
                "message_count": len(messages) + 2,
                "session_id": workflow_context.get_global_variable("session_id"),
            },
        )

    async def generate_response(self, user_input: str, history: list[Message]) -> str:
        """Generate a bot reply using simple keyword rules and history recall."""

        # Simple rule-based responses
        user_input_lower = user_input.lower()

        if "hello" in user_input_lower or "hi" in user_input_lower:
            return f"Hello! I'm {self.name}. How can I help you today?"

        elif "bye" in user_input_lower or "goodbye" in user_input_lower:
            return "Goodbye! It was nice chatting with you!"

        elif "history" in user_input_lower:
            if not history:
                return "This is the beginning of our conversation."
            else:
                return f"We've exchanged {len(history)} messages so far."

        elif "time" in user_input_lower:
            current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            return f"The current time is {current_time}"

        # BUGFIX: a statement such as "My name is Alice" contains "name" and
        # previously hit this branch, returning the ask-name prompt instead of
        # the echo shown in this page's Expected Output. Name *statements*
        # now fall through to the default echo branch below.
        elif "name" in user_input_lower and "my name is" not in user_input_lower:
            # Check if user mentioned their name before
            for msg in history:
                if msg.role == "user" and "my name is" in msg.content.lower():
                    # Extract name from previous message
                    parts = msg.content.lower().split("my name is")
                    # BUGFIX: also require non-empty text after "my name is";
                    # previously split()[0] raised IndexError when the message
                    # ended exactly with the phrase.
                    if len(parts) > 1 and parts[1].strip():
                        name = parts[1].strip().split()[0]
                        return f"I remember! Your name is {name.title()}."

            return "What's your name? You can tell me by saying 'My name is...'"

        else:
            # Echo with acknowledgment
            return f"You said: '{user_input}'. Tell me more!"


async def main():
    """Drive a scripted demo conversation against SimpleChatBot."""

    print("🤖 Simple Chat Bot Example")
    print("=" * 50)

    # Build the bot and a workflow context bound to a single session.
    bot = SimpleChatBot(name="Assistant")
    ctx = WorkflowContext(
        workflow_id="chat_session_001",
        global_variables={"session_id": "user_123"},
    )

    # Scripted user turns that exercise each response rule.
    script = [
        "Hello!",
        "What's the time?",
        "My name is Alice",
        "Do you remember my name?",
        "Show me our history",
        "Goodbye!",
    ]

    print(f"Starting conversation with session: {ctx.global_variables['session_id']}\n")

    for turn in script:
        print(f"👤 User: {turn}")

        # One node execution per user turn.
        outcome = await bot.execute(
            NodeContext(node_id="chatbot", input_data={"message": turn}),
            ctx,
        )

        if outcome.success:
            print(f"🤖 Bot: {outcome.output['response']}")
            print(f"   (Total messages: {outcome.output['message_count']})\n")
        else:
            print(f"❌ Error: {outcome.error}\n")

        # Small delay for readability
        await asyncio.sleep(0.5)

    # Dump the full transcript recorded in the session history.
    print("\n" + "=" * 50)
    print("📜 Conversation History:")
    session_history = await bot.get_history(ctx)
    transcript = await session_history.get_messages()

    for idx, msg in enumerate(transcript, 1):
        marker = "👤" if msg.role == "user" else "🤖"
        print(f"{idx}. {marker} {msg.role.title()}: {msg.content}")


if __name__ == "__main__":
    # Run the async demo only when executed as a script, not on import.
    asyncio.run(main())

Key Concepts Explained

1. BaseNode Implementation

Every Flow Core workflow component extends BaseNode:
class SimpleChatBot(BaseNode):
    def __init__(self):
        super().__init__(node_id="chatbot")

    async def execute(self, node_context, workflow_context):
        # Your logic here
        return NodeResult(success=True, output={})

2. Memory Management

The ChatHistoryNode mixin adds conversation memory:
class SimpleChatBot(BaseNode, ChatHistoryNode):
    def __init__(self):
        # Initialize both base classes
        BaseNode.__init__(self, node_id="chatbot")
        ChatHistoryNode.__init__(
            self,
            history_manager=SessionHistoryManager(...),
            session_key="session_id"
        )

3. Session Management

Each user gets their own conversation history:
history_manager = SessionHistoryManager(
    history_factory=lambda sid: InMemoryChatHistory(),
    window_size=10  # Sliding window of 10 messages
)

4. Context System

Two types of context flow through the workflow:
  • NodeContext: Per-node execution state
  • WorkflowContext: Shared state across all nodes
# Node context carries input/output
node_context = NodeContext(
    node_id="chatbot",
    input_data={"message": "Hello"}
)

# Workflow context carries global state
workflow_context = WorkflowContext(
    workflow_id="session_001",
    global_variables={"session_id": "user_123"}
)

Running the Example

Prerequisites

pip install nadoo-flow-core

Execute

python 01_basic_chat_bot.py

Expected Output

🤖 Simple Chat Bot Example
==================================================
Starting conversation with session: user_123

👤 User: Hello!
🤖 Bot: Hello! I'm Assistant. How can I help you today?
   (Total messages: 2)

👤 User: What's the time?
🤖 Bot: The current time is 2024-01-15 14:30:45
   (Total messages: 4)

👤 User: My name is Alice
🤖 Bot: You said: 'My name is Alice'. Tell me more!
   (Total messages: 6)

👤 User: Do you remember my name?
🤖 Bot: I remember! Your name is Alice.
   (Total messages: 8)

👤 User: Show me our history
🤖 Bot: We've exchanged 8 messages so far.
   (Total messages: 10)

👤 User: Goodbye!
🤖 Bot: Goodbye! It was nice chatting with you!
   (Total messages: 12)

==================================================
📜 Conversation History:
1. 👤 User: Hello!
2. 🤖 Assistant: Hello! I'm Assistant. How can I help you today?
3. 👤 User: What's the time?
4. 🤖 Assistant: The current time is 2024-01-15 14:30:45
5. 👤 User: My name is Alice
6. 🤖 Assistant: You said: 'My name is Alice'. Tell me more!
7. 👤 User: Do you remember my name?
8. 🤖 Assistant: I remember! Your name is Alice.
9. 👤 User: Show me our history
10. 🤖 Assistant: We've exchanged 8 messages so far.

Extending the Example

Add Persistent Storage

Replace InMemoryChatHistory with Redis-backed storage:
from nadoo_flow import RedisChatHistory
import redis.asyncio as redis

redis_client = redis.Redis(host='localhost', port=6379)

history_manager = SessionHistoryManager(
    history_factory=lambda sid: RedisChatHistory(
        session_id=sid,
        redis_client=redis_client,
        ttl=3600  # 1 hour expiration
    )
)

Add LLM Integration

Replace rule-based responses with LLM:
async def generate_response(self, user_input: str, history: list[Message]) -> str:
    # Format messages for LLM
    prompt = self.format_prompt(history + [Message.user(user_input)])

    # Call LLM (example with OpenAI)
    response = await self.llm.generate(prompt)

    return response.content

Add Streaming Support

Make the bot stream responses:
class StreamingChatBot(SimpleChatBot, StreamingNode):
    async def execute(self, node_context, workflow_context):
        # Get streaming context
        stream_ctx = self.get_streaming_context(workflow_context)

        # Stream tokens
        async for token in self.generate_streaming_response():
            if stream_ctx:
                await self.emit_token(stream_ctx, token, self.node_id)

Common Patterns

Pattern 1: Multi-Turn Conversations

# Keep conversation flowing
while True:
    user_input = input("You: ")
    if user_input.lower() == 'quit':
        break

    result = await chatbot.execute(
        NodeContext(input_data={"message": user_input}),
        workflow_context
    )
    print(f"Bot: {result.output['response']}")

Pattern 2: Multiple Sessions

# Different users get different sessions
for user_id in ["alice", "bob", "charlie"]:
    context = WorkflowContext(
        workflow_id=f"chat_{user_id}",
        global_variables={"session_id": user_id}
    )
    # Each user has isolated conversation history

Pattern 3: History Analysis

# Analyze conversation patterns
history = await chatbot.get_history(workflow_context)
messages = await history.get_messages()

user_messages = [m for m in messages if m.role == "user"]
avg_length = sum(len(m.content) for m in user_messages) / len(user_messages)
print(f"Average message length: {avg_length:.1f} characters")

Next Steps

Now that you understand the basics, explore: