Skip to main content

Build Your First Workflow in 5 Minutes

Let’s create a simple text processing workflow that demonstrates the power and elegance of Nadoo Flow Core.

Prerequisites

Make sure you have:
  • Python 3.11+ installed
  • nadoo-flow-core package installed (pip install nadoo-flow-core)

Your First Workflow

We’ll build a workflow that:
  1. Takes user input
  2. Transforms the text (uppercase)
  3. Analyzes sentiment
  4. Returns formatted results
import asyncio
from nadoo_flow import (
    ChainableNode,
    FunctionNode,
    NodeResult,
    WorkflowContext
)

# Step 1: Define custom nodes
class TextTransformNode(ChainableNode):
    """Uppercase the incoming text and remember the original for later nodes."""

    def __init__(self):
        super().__init__(
            node_id="transform",
            node_type="transform",
            name="Text Transformer"
        )

    async def execute(self, node_context, workflow_context):
        """Read `text` from the input, stash the raw value in the workflow
        context under "original_text", and emit the uppercased text."""
        original = node_context.input_data.get("text", "")

        # Keep the pre-transform text available to any downstream node.
        workflow_context.set_global_variable("original_text", original)

        return NodeResult(
            success=True,
            output={"text": original.upper()}
        )

class SentimentAnalyzerNode(ChainableNode):
    """Score text sentiment by counting keyword hits."""

    def __init__(self):
        super().__init__(
            node_id="sentiment",
            node_type="analysis",
            name="Sentiment Analyzer"
        )

    async def execute(self, node_context, workflow_context):
        """Return the text plus a sentiment label and an integer score."""
        text = node_context.input_data.get("text", "")

        # Simple sentiment analysis (in production, use a real model).
        # Keywords are uppercase and matched by substring: the upstream
        # transform node uppercases the text, and substring matching lets
        # punctuation-adjacent words (e.g. "EXCELLENT!") still count.
        positive_words = ["GOOD", "GREAT", "HAPPY", "LOVE", "EXCELLENT"]
        negative_words = ["BAD", "HATE", "TERRIBLE", "AWFUL", "HORRIBLE"]

        # bool sums: each matched keyword contributes +1 / -1.
        score = (
            sum(word in text for word in positive_words)
            - sum(word in text for word in negative_words)
        )

        if score > 0:
            sentiment = "positive"
        elif score < 0:
            sentiment = "negative"
        else:
            sentiment = "neutral"

        return NodeResult(
            success=True,
            output={
                "text": text,
                "sentiment": sentiment,
                "score": score
            }
        )

# Step 2: Build the workflow chain
def create_workflow():
    """Assemble the demo pipeline: uppercase -> sentiment -> summary dict."""

    # Final stage: collapse the sentiment output into a display-ready dict.
    summarize = FunctionNode(lambda x: {
        "summary": f"Sentiment: {x['sentiment']} (score: {x['score']})",
        "processed_text": x['text']
    })

    # The pipe operator chains nodes; each node's output feeds the next.
    return TextTransformNode() | SentimentAnalyzerNode() | summarize

# Step 3: Run the workflow
async def main():
    """Run the demo workflow over a few sample inputs and print the results."""
    workflow = create_workflow()

    # One positive, one neutral, one negative sample.
    samples = [
        {"text": "I love this framework, it's excellent!"},
        {"text": "This is a neutral statement"},
        {"text": "I hate bugs, they're terrible"}
    ]

    print("🚀 Nadoo Flow Core - Quick Start Demo\n")
    print("=" * 50)

    for sample in samples:
        print(f"\nInput: {sample['text']}")
        print("-" * 30)

        # Execute the full chain for this sample.
        result = await workflow.run(sample)

        print(f"Result: {result['summary']}")
        print(f"Processed: {result['processed_text']}")

if __name__ == "__main__":
    asyncio.run(main())

Expected Output

🚀 Nadoo Flow Core - Quick Start Demo

==================================================

Input: I love this framework, it's excellent!
------------------------------
Result: Sentiment: positive (score: 2)
Processed: I LOVE THIS FRAMEWORK, IT'S EXCELLENT!

Input: This is a neutral statement
------------------------------
Result: Sentiment: neutral (score: 0)
Processed: THIS IS A NEUTRAL STATEMENT

Input: I hate bugs, they're terrible
------------------------------
Result: Sentiment: negative (score: -2)
Processed: I HATE BUGS, THEY'RE TERRIBLE

Understanding the Code

Let’s break down what’s happening:
Step 1: Node Definition

We define custom nodes by extending ChainableNode:
  • Override execute() method with your logic
  • Return NodeResult with success status and output
  • Access input via node_context.input_data
  • Store data in workflow_context for sharing
Step 2: Workflow Composition

Use the pipe operator (|) to chain nodes:
  • Output of one node becomes input of the next
  • Create linear pipelines easily
  • Mix custom nodes with FunctionNode for quick transforms
Step 3: Execution

Run workflows asynchronously:
  • Call workflow.run() with input data
  • Get the final output as a dictionary
  • Handle errors with try/except blocks

Adding Streaming

Want real-time updates? Enable streaming:
streaming_example.py
# Stream outputs from each node
async def stream_example():
    """Print each node's intermediate output as the workflow streams."""
    wf = create_workflow()
    payload = {"text": "Hello world"}

    print("Streaming workflow execution:")
    # stream() yields one chunk per node as it completes.
    async for chunk in wf.stream(payload):
        print(f"  → Node output: {chunk}")

# Run: asyncio.run(stream_example())
Output:
Streaming workflow execution:
  → Node output: {'text': 'HELLO WORLD'}
  → Node output: {'text': 'HELLO WORLD', 'sentiment': 'neutral', 'score': 0}
  → Node output: {'summary': 'Sentiment: neutral (score: 0)', 'processed_text': 'HELLO WORLD'}

Parallel Processing

Process multiple items concurrently:
parallel_example.py
from nadoo_flow import ParallelNode, ParallelStrategy

# Create parallel processor
class ParallelProcessor(ChainableNode):
    """Fan the same input out to several independent processors at once."""

    def __init__(self):
        super().__init__(
            node_id="parallel",
            node_type="parallel",
            name="Parallel Processor"
        )

        # Sub-workflows; each one receives the full input independently.
        self.processors = [
            TextTransformNode(),
            SentimentAnalyzerNode(),
            FunctionNode(lambda x: {"length": len(x.get("text", ""))})
        ]

    async def execute(self, node_context, workflow_context):
        """Run every registered processor concurrently and return the
        combined result."""
        # ALL_SETTLED waits for every branch rather than failing fast.
        fan_out = ParallelNode(
            nodes=self.processors,
            strategy=ParallelStrategy.ALL_SETTLED
        )
        return await fan_out.execute(node_context, workflow_context)

# Usage
# NOTE: `await` is only valid inside an async function — wrap these two
# lines in `async def demo(): ...` and call `asyncio.run(demo())`.
processor = ParallelProcessor()
result = await processor.run({"text": "Process this in parallel!"})

Error Handling

Add resilience with retry and fallback:
resilient_example.py
from nadoo_flow import RetryableNode, RetryPolicy

class ResilientNode(RetryableNode):
    """API-calling node that retries transient failures automatically."""

    def __init__(self):
        # Exponential backoff: delays of 1s, 2s, 4s across up to 3 attempts.
        backoff = RetryPolicy(
            max_attempts=3,
            initial_delay=1.0,
            exponential_base=2.0
        )
        super().__init__(
            node_id="resilient",
            node_type="api",
            name="API Caller",
            retry_policy=backoff
        )

    async def _execute_with_retry(self, node_context, workflow_context):
        """Hook invoked by RetryableNode; re-run on failure per the policy."""
        # Your API call logic here
        # Will automatically retry on failure
        pass

Adding Context

Share data between nodes using workflow context:
context_example.py
class ContextAwareNode(ChainableNode):
    """Demonstrate sharing data between nodes via the workflow context."""

    async def execute(self, node_context, workflow_context):
        # Imported locally so this example snippet is self-contained —
        # the original used `datetime.now()` without any import, which
        # raises NameError when the snippet is copied as-is.
        from datetime import datetime

        # Set global variables visible to every node in the workflow
        workflow_context.set_global_variable("user_id", "123")
        workflow_context.set_global_variable("timestamp", datetime.now())

        # Get variables set by previous nodes (None if never set —
        # TODO confirm the missing-key behavior against the library docs)
        original = workflow_context.get_global_variable("original_text")

        # Access node-specific context: the previous node's output
        previous_output = node_context.input_data

        return NodeResult(
            success=True,
            output={"processed": True},
            metadata={"user": "123"}
        )

Next Steps

Now that you’ve built your first workflow, explore the more advanced features covered in the rest of the documentation.

Quick Tips

Always use type hints for better IDE support and error detection:
from typing import Dict, Any

async def execute(
    self,
    node_context: NodeContext,
    workflow_context: WorkflowContext
) -> NodeResult:
    ...
Insert PassthroughNode to inspect data flow:
workflow = (
    transform
    | PassthroughNode()  # Inspect here
    | analyze
)
Cache expensive operations:
from nadoo_flow import CachedNode, InMemoryCache

cached = CachedNode(
    node=expensive_node,
    cache=InMemoryCache(),
    ttl=3600  # 1 hour
)
Always return proper NodeResult on failure:
try:
    # Your logic
    return NodeResult(success=True, output=data)
except Exception as e:
    return NodeResult(
        success=False,
        error=str(e),
        output={}
    )
Pro Tip: Start simple, then add complexity. Begin with basic nodes, then add parallel execution, caching, and error handling as needed.