Overview

Follow these best practices to build robust, scalable, and maintainable AI workflows with Nadoo Flow Core and Builder.

Workflow Design

Keep Workflows Focused

Do: Create small, focused workflows
# Good - Single responsibility
user_classifier = (
    InputValidation()
    | LLMNode(model="gpt-3.5-turbo", system_prompt="Classify user type")
    | OutputFormatter()
)
Don’t: Create monolithic workflows
# Bad - Too many responsibilities
everything_workflow = (
    validate | classify | process | store |
    notify | update | log | report | cleanup
)

Use Meaningful Names

Do: Descriptive node and workflow names
class EmailSentimentAnalyzer(ChainableNode):
    """Analyze sentiment of customer emails"""

class CustomerOnboardingWorkflow:
    """Handle new customer registration and setup"""
Don’t: Generic or unclear names
class Node1(ChainableNode): ...  # Bad - meaningless
class Process(ChainableNode): ...  # Too generic

Implement Proper Error Handling

Do: Handle errors gracefully
from nadoo_flow import ErrorHandlerNode, RetryNode

workflow = (
    DataLoader()
    | ErrorHandlerNode(
        node=ProcessingNode(),
        fallback={"status": "failed", "reason": "processing_error"}
    )
    | RetryNode(
        node=APICall(),
        max_retries=3,
        backoff_factor=2.0
    )
)
Don’t: Ignore errors
# Bad - No error handling
workflow = DataLoader() | ProcessingNode() | APICall()

Performance Optimization

Use Async Properly

Do: Async throughout
import asyncio

async def process_batch(items):
    tasks = [workflow.execute(item) for item in items]
    results = await asyncio.gather(*tasks)
    return results
Don’t: Block the event loop
# Bad - Blocking calls
def process_batch(items):
    return [workflow.execute(item) for item in items]  # Synchronous

Leverage Parallel Execution

Do: Parallelize independent operations
workflow = (
    InputNode()
    | ParallelNode([
        SentimentAnalysis(),
        TopicClassification(),
        LanguageDetection()
    ])
    | MergeResults()
)
Don’t: Sequential when unnecessary
# Bad - Unnecessary sequential execution
workflow = (
    InputNode()
    | SentimentAnalysis()
    | TopicClassification()
    | LanguageDetection()
)

Implement Caching

Do: Cache expensive operations
class CachedEmbedding(ChainableNode):
    def __init__(self):
        super().__init__()
        self._cache = {}

    async def get_embedding(self, text: str):
        # functools.lru_cache does not work on async methods: it would
        # cache the coroutine object, not the awaited result
        if text not in self._cache:
            self._cache[text] = await self.embedding_api.embed(text)
        return self._cache[text]
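If embeddings can go stale or memory is a concern, a time-bounded cache works too. A sketch using the third-party cachetools package (an assumption; install it separately):

from cachetools import TTLCache

class TTLCachedEmbedding(ChainableNode):
    def __init__(self):
        super().__init__()
        # Up to 1000 entries, each expiring after one hour
        self._cache = TTLCache(maxsize=1000, ttl=3600)

    async def get_embedding(self, text: str):
        if text not in self._cache:
            self._cache[text] = await self.embedding_api.embed(text)
        return self._cache[text]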

Optimize LLM Calls

Do: Use appropriate models
# Use cheaper models for simple tasks
simple_task = LLMNode(model="gpt-3.5-turbo")

# Use powerful models only when needed
complex_task = LLMNode(model="gpt-4")
Don’t: Overuse expensive models
# Bad - Using GPT-4 for simple classification
classifier = LLMNode(model="gpt-4")  # Expensive and unnecessary

Code Organization

Structure Your Project

my_nadoo_project/
├── workflows/
│   ├── __init__.py
│   ├── customer_service.py
│   ├── data_processing.py
│   └── analytics.py
├── nodes/
│   ├── __init__.py
│   ├── custom_nodes.py
│   ├── validators.py
│   └── transformers.py
├── config/
│   ├── __init__.py
│   ├── settings.py
│   └── prompts.py
├── tests/
│   ├── test_workflows.py
│   └── test_nodes.py
└── main.py
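With this layout, main.py stays a thin entry point that wires the packages together. A minimal sketch (create_support_workflow is a hypothetical factory you would define in workflows/customer_service.py):

# main.py
import asyncio

from config.settings import Config
from workflows.customer_service import create_support_workflow  # hypothetical factory

async def main():
    workflow = create_support_workflow(model=Config.LLM_MODEL)
    result = await workflow.execute({"message": "Hello"})
    print(result)

if __name__ == "__main__":
    asyncio.run(main())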

Use Configuration Files

Do: Externalize configuration
# config/settings.py
import os

class Config:
    LLM_MODEL = os.getenv("LLM_MODEL", "gpt-3.5-turbo")
    MAX_RETRIES = int(os.getenv("MAX_RETRIES", "3"))
    TEMPERATURE = float(os.getenv("TEMPERATURE", "0.7"))

# workflows/customer_service.py
from config.settings import Config

llm = LLMNode(
    model=Config.LLM_MODEL,
    temperature=Config.TEMPERATURE
)
Don’t: Hardcode values
# Bad - Hardcoded configuration
llm = LLMNode(model="gpt-4", temperature=0.7)

Data Management

Validate Inputs

Do: Validate early
from pydantic import BaseModel, validator

class UserInput(BaseModel):
    email: str
    message: str

    @validator('email')
    def validate_email(cls, v):
        if '@' not in v:
            raise ValueError("Invalid email")
        return v

class InputValidator(ChainableNode):
    async def execute(self, data):
        validated = UserInput(**data)
        return validated.dict()
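If a workflow should degrade gracefully rather than crash on bad input, pydantic's ValidationError can be converted into a structured failure. A sketch (SafeInputValidator and its return shape are illustrative):

from pydantic import ValidationError

class SafeInputValidator(ChainableNode):
    async def execute(self, data):
        try:
            return UserInput(**data).dict()
        except ValidationError as e:
            # Surface field-level errors instead of raising into the workflow
            return {"status": "failed", "errors": e.errors()}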

Handle Sensitive Data

Do: Sanitize and encrypt
class DataSanitizer(ChainableNode):
    async def execute(self, data):
        # Remove PII
        sanitized = {
            k: v for k, v in data.items()
            if k not in ['ssn', 'credit_card']
        }

        # Mask email
        if 'email' in sanitized:
            sanitized['email'] = self.mask_email(sanitized['email'])

        return sanitized
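The mask_email helper is referenced but not shown; one possible implementation, written as a method on DataSanitizer (the exact masking policy is an assumption):

    def mask_email(self, email: str) -> str:
        # "alice@example.com" -> "a***@example.com"
        local, _, domain = email.partition("@")
        if not domain:
            return "***"
        return f"{local[:1]}***@{domain}"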

Implement Data Versioning

Do: Version your data schemas
class DataV2(BaseModel):
    version: str = "2.0"
    # new fields

class MigrationNode(ChainableNode):
    async def execute(self, data):
        if data.get("version") == "1.0":
            return self.migrate_v1_to_v2(data)
        return data
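A sketch of the referenced migrate_v1_to_v2 helper, as a method on MigrationNode (the renamed field is illustrative):

    def migrate_v1_to_v2(self, data: dict) -> dict:
        migrated = dict(data)
        migrated["version"] = "2.0"
        # Example: v2 renamed "name" to "full_name"
        if "name" in migrated:
            migrated["full_name"] = migrated.pop("name")
        return migrated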

Testing

Write Comprehensive Tests

Do: Test all components
import pytest

@pytest.mark.asyncio
async def test_workflow_success():
    workflow = create_test_workflow()
    result = await workflow.execute({"input": "test"})
    assert result["success"] is True

@pytest.mark.asyncio
async def test_workflow_error_handling():
    workflow = create_test_workflow()
    result = await workflow.execute({"input": "invalid"})
    assert "error" in result

Use Test Fixtures

@pytest.fixture
def mock_llm():
    class MockLLM(ChainableNode):
        async def execute(self, data):
            return {"content": "mocked response"}
    return MockLLM()

@pytest.mark.asyncio
async def test_with_mock(mock_llm):
    workflow = InputNode() | mock_llm
    result = await workflow.execute({"input": "test"})
    assert result["content"] == "mocked response"

Monitoring and Logging

Implement Structured Logging

Do: Use structured logs
import json
import logging
from datetime import datetime

logger = logging.getLogger(__name__)

class LoggingNode(ChainableNode):
    async def execute(self, data):
        logger.info(json.dumps({
            "event": "workflow_start",
            "workflow_id": data.get("id"),
            "timestamp": datetime.now().isoformat()
        }))

        result = await self.process(data)

        logger.info(json.dumps({
            "event": "workflow_complete",
            "workflow_id": data.get("id"),
            "duration_ms": result.get("duration")
        }))

        return result

Track Metrics

Do: Monitor key metrics
import time

from prometheus_client import Counter, Histogram

workflow_executions = Counter(
    'workflow_executions_total',
    'Total workflow executions',
    ['workflow_name', 'status']
)

workflow_duration = Histogram(
    'workflow_duration_seconds',
    'Workflow execution duration'
)

class MetricsNode(ChainableNode):
    async def execute(self, data):
        start = time.time()
        try:
            result = await self.process(data)
            workflow_executions.labels(
                workflow_name=self.name,
                status='success'
            ).inc()
            return result
        except Exception as e:
            workflow_executions.labels(
                workflow_name=self.name,
                status='error'
            ).inc()
            raise
        finally:
            duration = time.time() - start
            workflow_duration.observe(duration)
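For Prometheus to actually scrape these metrics, expose them over HTTP; prometheus_client ships a minimal server (port 8000 is an arbitrary choice):

from prometheus_client import start_http_server

# Serves the /metrics endpoint from a background thread
start_http_server(8000)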

Security

Protect API Keys

Do: Use environment variables
import os

api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
    raise ValueError("OPENAI_API_KEY not set")

llm = LLMNode(model="gpt-4", api_key=api_key)
Don’t: Hardcode secrets
# Bad - Hardcoded API key
llm = LLMNode(model="gpt-4", api_key="sk-1234567890")

Validate External Inputs

Do: Sanitize user inputs
class InputSanitizer(ChainableNode):
    async def execute(self, data):
        # Remove potentially harmful content
        sanitized = {
            "message": self.clean_text(data["message"]),
            "user_id": self.validate_uuid(data["user_id"])
        }
        return sanitized

    def clean_text(self, text: str) -> str:
        # Remove SQL injection attempts, XSS, etc.
        return text.strip()[:1000]  # Limit length

Implement Rate Limiting

Do: Protect against abuse
from ratelimit import limits, sleep_and_retry

class RateLimitedNode(ChainableNode):
    # Caution: sleep_and_retry waits with a synchronous time.sleep(),
    # which blocks the event loop; see the async-native sketch below
    @sleep_and_retry
    @limits(calls=10, period=60)  # 10 calls per minute
    async def execute(self, data):
        return await self.process(data)
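Because the ratelimit package blocks while it waits, async workflows are better served by an async-native limiter. A sketch using the third-party aiolimiter package (an assumption; install it separately):

from aiolimiter import AsyncLimiter

# 10 calls per 60-second window, enforced without blocking the event loop
limiter = AsyncLimiter(max_rate=10, time_period=60)

class AsyncRateLimitedNode(ChainableNode):
    async def execute(self, data):
        async with limiter:
            return await self.process(data)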

Deployment

Use Environment-Specific Configs

# config/environments.py
import os

class DevelopmentConfig:
    DEBUG = True
    LLM_MODEL = "gpt-3.5-turbo"

class ProductionConfig:
    DEBUG = False
    LLM_MODEL = "gpt-4"
    ENABLE_CACHING = True

config = ProductionConfig if os.getenv("ENV") == "prod" else DevelopmentConfig

Implement Health Checks

from datetime import datetime

from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()

@app.get("/health")
async def health_check():
    try:
        # Check critical dependencies
        await db.ping()
        await redis.ping()

        return {
            "status": "healthy",
            "version": "1.0.0",
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        # FastAPI ignores Flask-style (body, status) tuples; return an
        # explicit JSONResponse so the 503 status code is actually set
        return JSONResponse(
            status_code=503,
            content={"status": "unhealthy", "error": str(e)}
        )

Version Your Workflows

class WorkflowV1(ChainableNode):
    VERSION = "1.0"

    async def execute(self, data):
        # V1 logic
        pass

class WorkflowV2(ChainableNode):
    VERSION = "2.0"

    async def execute(self, data):
        # V2 logic with improvements
        pass

# Route based on version
def get_workflow(version: str):
    workflows = {
        "1.0": WorkflowV1(),
        "2.0": WorkflowV2()
    }
    return workflows.get(version, WorkflowV2())

Documentation

Document Your Workflows

class CustomerSupportBot(ChainableNode):
    """
    Automated customer support chatbot workflow.

    This workflow:
    1. Classifies customer intent
    2. Searches knowledge base
    3. Generates personalized response
    4. Escalates to human if needed

    Args:
        data: Dict containing 'message' and 'user_id'

    Returns:
        Dict with 'response', 'intent', and 'escalated' flag

    Example:
        >>> bot = CustomerSupportBot()
        >>> result = await bot.execute({
        ...     "message": "How do I reset my password?",
        ...     "user_id": "user123"
        ... })
    """

    async def execute(self, data):
        # Implementation
        pass

Maintain a Changelog

## Changelog

### [2.0.0] - 2024-01-15
#### Added
- Multi-language support
- Streaming responses
- Advanced error handling

#### Changed
- Improved performance by 40%
- Updated to GPT-4

#### Deprecated
- Legacy sync API

### [1.0.0] - 2023-12-01
- Initial release

Common Pitfalls to Avoid

- Forgetting await: always use await for async operations and async def for async functions
- Swallowing errors: always handle potential failures with try/except or ErrorHandlerNode
- Premature complexity: start simple and add complexity only when needed
- Leaking resources: always close connections, clean up resources, and manage memory (see the sketch after this list)
- Skipping tests: write tests for all critical workflows and edge cases
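A minimal resource-cleanup sketch using an async context manager (connect_db and connect_redis are placeholders for your own clients):

class ManagedWorkflow:
    """Acquires connections on entry and always releases them on exit."""

    async def __aenter__(self):
        self.db = await connect_db()        # placeholder
        self.redis = await connect_redis()  # placeholder
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Runs even if the workflow raised, so connections never leak
        await self.redis.close()
        await self.db.close()
        return False

# Usage:
# async with ManagedWorkflow() as wf:
#     result = await wf.run(data)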

Next Steps