Skip to main content

Overview

Flow Core’s multi-backend architecture allows you to swap execution engines while maintaining the same workflow definitions. This provides flexibility to choose the best backend for your use case.
Currently only the native backend is implemented. LangGraph, CrewAI, and A2A backends are planned for future releases.

Backend Architecture

Available Backends

Native Backend (Available)

The default Flow Core execution engine:
from nadoo_flow.backends import BackendRegistry

# Use native backend (default)
backend = BackendRegistry.create("native")
workflow = backend.create_workflow(config)
result = await workflow.run(input_data)
Features:
  • Full async/await support
  • Built-in resilience patterns
  • Streaming capabilities
  • Memory management
  • Native Python execution

LangGraph Backend (Coming Soon)

Integration with LangChain’s LangGraph:
# Future implementation
backend = BackendRegistry.create("langgraph")
workflow = backend.create_workflow(config)
Planned Features:
  • LangChain ecosystem integration
  • LCEL compatibility
  • LangSmith tracing
  • Built-in agent tools

CrewAI Backend (Coming Soon)

Integration with CrewAI framework:
# Future implementation
backend = BackendRegistry.create("crewai")
workflow = backend.create_workflow(config)
Planned Features:
  • Multi-agent orchestration
  • Role-based agents
  • Task delegation
  • Crew collaboration

A2A Backend (Coming Soon)

Agent-to-Agent communication backend:
# Future implementation
backend = BackendRegistry.create("a2a")
workflow = backend.create_workflow(config)
Planned Features:
  • Inter-agent messaging
  • Distributed execution
  • Agent discovery
  • Protocol negotiation

Backend Protocol

All backends implement the same protocol:
from typing import Protocol

class IBackend(Protocol):
    """Backend protocol interface"""

    async def create_workflow(self, config: dict) -> IWorkflow:
        """Create workflow from configuration"""
        ...

    async def execute(
        self,
        workflow: IWorkflow,
        input_data: dict
    ) -> ExecutionResult:
        """Execute workflow"""
        ...

    async def validate(self, workflow: IWorkflow) -> bool:
        """Validate workflow"""
        ...

Backend Selection

Choosing a Backend

BackendUse CasePerformanceEcosystem
NativeGeneral purpose, productionHighFlow Core
LangGraphLLM workflowsMediumLangChain
CrewAIMulti-agent systemsMediumCrewAI
A2ADistributed agentsVariableCustom

Configuration

# Backend-specific configuration
config = {
    "backend": "native",
    "backend_config": {
        "max_parallel": 10,
        "timeout": 300,
        "retry_strategy": "exponential"
    }
}

backend = BackendRegistry.create(
    config["backend"],
    **config["backend_config"]
)

Backend Registry

Registration

from nadoo_flow.backends import BackendRegistry

# Register custom backend
BackendRegistry.register(
    name="custom",
    backend_class=CustomBackend
)

# List available backends
backends = BackendRegistry.list_backends()
print(backends)  # ["native", "custom"]

Discovery

# Get backend information
info = BackendRegistry.get_info("native")
print(info)
# {
#     "name": "native",
#     "version": "0.1.0",
#     "features": ["async", "streaming", "memory"],
#     "status": "stable"
# }

Creating Custom Backends

Backend Implementation

from nadoo_flow.backends import BaseBackend

class CustomBackend(BaseBackend):
    """Custom backend implementation"""

    def __init__(self, config=None):
        super().__init__(config)
        self.engine = CustomEngine()

    async def create_workflow(self, config: dict):
        """Create workflow using custom engine"""
        return CustomWorkflow(config, self.engine)

    async def execute(self, workflow, input_data):
        """Execute using custom engine"""
        return await self.engine.run(workflow, input_data)

    async def validate(self, workflow):
        """Validate workflow for custom engine"""
        return self.engine.validate(workflow)

Workflow Adapter

class CustomWorkflow:
    """Adapt Flow Core workflow to custom engine"""

    def __init__(self, config, engine):
        self.config = config
        self.engine = engine
        self.nodes = self._convert_nodes(config["nodes"])

    def _convert_nodes(self, nodes):
        """Convert Flow Core nodes to engine format"""
        converted = []
        for node in nodes:
            converted.append(self._adapt_node(node))
        return converted

    async def run(self, input_data):
        """Execute workflow"""
        return await self.engine.execute(
            self.nodes,
            input_data
        )

Backend Compatibility

Feature Matrix

FeatureNativeLangGraphCrewAIA2A
Async Execution🔄🔄🔄
Streaming🔄🔄🔄
Memory🔄🔄🔄
Parallel🔄🔄🔄
Callbacks🔄🔄🔄
Resilience🔄🔄🔄
✅ Available | 🔄 Planned | ❌ Not Supported

Migration Between Backends

# Workflow definition remains the same
workflow_config = {
    "nodes": [...],
    "edges": [...],
    "config": {...}
}

# Switch backends without changing workflow
native_backend = BackendRegistry.create("native")
native_result = await native_backend.execute(workflow_config, data)

# Future: Same workflow, different backend
# langgraph_backend = BackendRegistry.create("langgraph")
# langgraph_result = await langgraph_backend.execute(workflow_config, data)

Performance Comparison

Benchmark Results

# Benchmark different backends
async def benchmark_backends(workflow, input_data):
    results = {}

    for backend_name in BackendRegistry.list_backends():
        backend = BackendRegistry.create(backend_name)

        start = time.time()
        result = await backend.execute(workflow, input_data)
        duration = time.time() - start

        results[backend_name] = {
            "duration": duration,
            "success": result.success,
            "memory_used": get_memory_usage()
        }

    return results

Configuration Examples

Native Backend Config

native_config = {
    "backend": "native",
    "execution": {
        "max_parallel_nodes": 10,
        "timeout_seconds": 300,
        "retry_failed_nodes": True,
        "enable_checkpoints": True
    },
    "memory": {
        "type": "conversation",
        "max_messages": 100
    },
    "streaming": {
        "enabled": True,
        "buffer_size": 100
    }
}

Future Backend Configs

# LangGraph configuration (planned)
langgraph_config = {
    "backend": "langgraph",
    "llm": {
        "model": "gpt-4",
        "temperature": 0.7
    },
    "tools": ["search", "calculator"],
    "memory": "conversation_buffer"
}

# CrewAI configuration (planned)
crewai_config = {
    "backend": "crewai",
    "agents": [
        {"role": "researcher", "goal": "find information"},
        {"role": "writer", "goal": "create content"}
    ],
    "process": "sequential"
}

Best Practices

Begin development with the native backend for full feature support and migrate later if needed.
Write backend-agnostic workflows using the common protocol interface.
When multiple backends are available, test your workflows across them.
Different backends have different performance characteristics - monitor and optimize.

Roadmap

Q1 2024

  • ✅ Native backend stabilization
  • 🔄 LangGraph backend development

Q2 2024

  • 🔄 CrewAI backend integration
  • 🔄 Backend benchmarking suite

Q3 2024

  • 🔄 A2A backend prototype
  • 🔄 Distributed execution support

Next Steps