Skip to main content

Overview

Flow Core’s backend system is designed to be extensible. You can create custom backends to integrate with any workflow orchestration system or implement your own execution logic.

Backend Interface

All backends must implement the BaseBackend interface:
from nadoo_flow.backends import BaseBackend
from typing import Dict, Any, Optional

class CustomBackend(BaseBackend):
    """Skeleton backend showing the methods a custom backend provides."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Forward the optional configuration to the base class and name the backend."""
        super().__init__(config)
        self.name = "custom"

    async def execute_node(
        self,
        node: BaseNode,
        node_context: NodeContext,
        workflow_context: WorkflowContext
    ) -> NodeResult:
        """Run one node by delegating directly to ``node.execute``."""
        # Hook point: backend-specific behaviour would wrap this call
        node_result = await node.execute(node_context, workflow_context)
        return node_result

    async def execute_workflow(
        self,
        workflow: Workflow,
        input_data: Dict[str, Any],
        context: Optional[WorkflowContext] = None
    ) -> Dict[str, Any]:
        """Run a whole workflow.

        Placeholder only: no orchestration is implemented in this skeleton,
        so the call currently yields ``None``.
        """
        # Custom workflow orchestration goes here
        return None

    def validate_node(self, node: BaseNode) -> bool:
        """Report whether ``node`` can run on this backend (always ``True`` here)."""
        return True

    def get_capabilities(self) -> Dict[str, bool]:
        """Return the feature flags this backend advertises."""
        capabilities = {"streaming": True, "parallel": True}
        capabilities["async"] = True
        capabilities["distributed"] = False
        return capabilities

Implementation Example

Simple Custom Backend

from nadoo_flow.backends import BaseBackend
from nadoo_flow import NodeResult, NodeStatus
import asyncio
import logging

class LoggingBackend(BaseBackend):
    """Backend that writes a log trail for every node and workflow run."""

    def __init__(self, config=None):
        """Initialise the base backend and attach a logger named after this module."""
        super().__init__(config)
        self.name = "logging"
        self.logger = logging.getLogger(__name__)

    async def execute_node(self, node, node_context, workflow_context):
        """Run ``node`` and log its start, input, completion and output.

        Any exception raised while executing (or while logging the success
        path) is logged and converted into a ``NodeResult`` with
        ``success=False`` rather than propagated.
        """
        log = self.logger
        log.info(f"Starting node: {node.node_id}")
        log.debug(f"Input: {node_context.input_data}")

        try:
            result = await node.execute(node_context, workflow_context)
            log.info(f"Node {node.node_id} completed successfully")
            log.debug(f"Output: {result.output}")
            return result
        except Exception as e:
            log.error(f"Node {node.node_id} failed: {e}")
            return NodeResult(success=False, error=str(e))

    async def execute_workflow(self, workflow, input_data, context=None):
        """Run the workflow's nodes one after another, logging each step.

        The output of each node becomes the input of the next. The first
        node whose result is unsuccessful aborts the run with an
        ``Exception`` carrying that result's error. Returns the output of
        the last node (or ``input_data`` when there are no nodes).
        """
        self.logger.info(f"Starting workflow: {workflow.workflow_id}")

        # Callers may omit the context; build one tied to this workflow
        if context is None:
            context = WorkflowContext(workflow_id=workflow.workflow_id)

        payload = input_data
        for node in workflow.nodes:
            step_context = NodeContext(node_id=node.node_id, input_data=payload)
            outcome = await self.execute_node(node, step_context, context)

            if outcome.success:
                # Feed this node's output into the next node
                payload = outcome.output
                continue

            self.logger.error(f"Workflow failed at node {node.node_id}")
            raise Exception(outcome.error)

        self.logger.info("Workflow completed successfully")
        return payload

Distributed Backend

class DistributedBackend(BaseBackend):
    """Backend that hands node execution to workers taken from a pool."""

    def __init__(self, config=None):
        """Initialise the base backend and build the worker pool from ``config``."""
        super().__init__(config)
        self.name = "distributed"
        self.worker_pool = self._init_workers(config)

    def _init_workers(self, config):
        """Create a ``WorkerPool`` sized by ``config["num_workers"]`` (default 4)."""
        if config:
            pool_size = config.get("num_workers", 4)
        else:
            # No (or empty) configuration: fall back to the default size
            pool_size = 4
        return WorkerPool(pool_size)

    async def execute_node(self, node, node_context, workflow_context):
        """Run one node on a worker obtained from the pool.

        The node and both contexts are converted to plain data, sent to the
        worker, and the worker's reply is rebuilt into a ``NodeResult``.
        """
        # Ask the pool for a worker
        worker = await self.worker_pool.get_worker()

        # Everything the worker receives is serialized up front
        payload = {
            "node": serialize_node(node),
            "context": node_context.to_dict(),
            "workflow_context": workflow_context.to_dict(),
        }

        raw_result = await worker.execute(payload)
        return NodeResult.from_dict(raw_result)

    async def execute_parallel(self, nodes, contexts, workflow_context):
        """Submit one ``execute_node`` call per node and await them together.

        ``nodes`` and ``contexts`` are paired positionally with ``zip``, so
        surplus items in the longer sequence are ignored. Results come back
        in submission order.
        """
        pending = [
            self.worker_pool.submit(
                self.execute_node,
                node, context, workflow_context
            )
            for node, context in zip(nodes, contexts)
        ]
        return await asyncio.gather(*pending)

    def get_capabilities(self):
        """Return the feature flags this backend advertises."""
        capabilities = {"streaming": False, "parallel": True}
        capabilities["async"] = True
        capabilities["distributed"] = True
        return capabilities

Integration with External Systems

LangChain Integration

from langchain.schema.runnable import Runnable

class LangChainBackend(BaseBackend):
    """Backend that wraps Flow Core nodes as LangChain runnables.

    Each node becomes a ``Runnable``; a workflow is executed by piping the
    runnables together in ``workflow.nodes`` order.
    """

    def __init__(self, config=None):
        """Initialise the base backend and name this backend "langchain"."""
        super().__init__(config)
        self.name = "langchain"

    def node_to_runnable(self, node: BaseNode, workflow_context=None) -> Runnable:
        """Convert a Flow Core node to a LangChain runnable.

        Args:
            node: Node whose ``execute`` coroutine the runnable calls.
            workflow_context: Optional workflow context handed to every
                execution of the node. When omitted, a fresh
                ``WorkflowContext()`` is created for each invocation.

        Returns:
            A ``Runnable`` whose ``invoke``/``ainvoke`` return the
            ``output`` of the node's result.
        """
        def build_contexts(input_data):
            # One node context per invocation; the workflow context is
            # shared when the caller supplied one.
            node_context = NodeContext(
                node_id=node.node_id,
                input_data=input_data
            )
            if workflow_context is not None:
                return node_context, workflow_context
            return node_context, WorkflowContext()

        class NodeRunnable(Runnable):
            # LangChain passes ``config`` (and sometimes extra keyword
            # arguments) to each step when runnables are composed with
            # ``|``, so both entry points must accept them even though the
            # node itself has no use for them.

            def invoke(self, input_data, config=None, **kwargs):
                # Sync wrapper for async node. asyncio.run() cannot be
                # called while an event loop is already running in this
                # thread; use ``ainvoke`` from async code instead.
                import asyncio
                node_context, wf_context = build_contexts(input_data)
                result = asyncio.run(
                    node.execute(node_context, wf_context)
                )
                return result.output

            async def ainvoke(self, input_data, config=None, **kwargs):
                node_context, wf_context = build_contexts(input_data)
                result = await node.execute(node_context, wf_context)
                return result.output

        return NodeRunnable()

    async def execute_workflow(self, workflow, input_data, context=None):
        """Execute the workflow as a LangChain chain.

        Args:
            workflow: Workflow whose ``nodes`` are chained in order.
            input_data: Input handed to the first node.
            context: Optional workflow context shared by all nodes; one is
                created for this workflow when omitted.

        Returns:
            The output of the last node, or ``input_data`` unchanged when
            the workflow has no nodes.
        """
        # Create context if not provided so every node sees the same one
        if context is None:
            context = WorkflowContext(workflow_id=workflow.workflow_id)

        # Convert workflow to chain
        chain = None
        for node in workflow.nodes:
            runnable = self.node_to_runnable(node, context)
            chain = runnable if chain is None else chain | runnable

        # Nothing to run: pass the input through instead of failing on None
        if chain is None:
            return input_data

        # Execute chain
        return await chain.ainvoke(input_data)

Backend Registration

Register your custom backend:
from nadoo_flow import BackendRegistry

# Build the backend, passing a DEBUG log level in its configuration
custom_backend = LoggingBackend(config=dict(log_level="DEBUG"))

# Register the instance under the name "logging"
BackendRegistry.register("logging", custom_backend)

# Refer to the registered backend by name when creating a workflow
workflow = Workflow(backend="logging")

Testing Custom Backends

import pytest
from nadoo_flow.testing import BackendTestSuite

class TestCustomBackend(BackendTestSuite):
    """Test suite for custom backend.

    The tests are coroutines, which pytest does not run on its own; each
    one carries ``@pytest.mark.asyncio`` so the pytest-asyncio plugin
    executes it (the marker is redundant but harmless when the project
    already sets ``asyncio_mode = auto``).
    """

    @pytest.fixture
    def backend(self):
        """Provide backend instance"""
        return CustomBackend()

    @pytest.mark.asyncio
    async def test_node_execution(self, backend):
        """Test single node execution"""
        node = SimpleNode("test")
        context = NodeContext(
            node_id="test",
            input_data={"value": 42}
        )

        result = await backend.execute_node(
            node, context, WorkflowContext()
        )

        assert result.success
        # NOTE(review): presumes SimpleNode passes its input through
        # unchanged - confirm against the SimpleNode implementation.
        assert result.output["value"] == 42

    @pytest.mark.asyncio
    async def test_workflow_execution(self, backend):
        """Test complete workflow"""
        workflow = create_test_workflow()
        result = await backend.execute_workflow(
            workflow,
            {"input": "test"}
        )

        # NOTE(review): requires execute_workflow to return a mapping with
        # a "status" key - confirm the backend under test does so.
        assert result["status"] == "completed"

    @pytest.mark.asyncio
    async def test_error_handling(self, backend):
        """Test error scenarios"""
        node = FailingNode("fail")
        context = NodeContext(node_id="fail", input_data={})

        result = await backend.execute_node(
            node, context, WorkflowContext()
        )

        assert not result.success
        # NOTE(review): passes only if the message contains the literal
        # text "error" - confirm against FailingNode's message.
        assert "error" in result.error

Best Practices

Ensure your backend works with all standard Flow Core nodes:
def validate_node(self, node):
    """Return whether ``node`` exposes the methods a backend needs to call.

    Args:
        node: Candidate node object.

    Returns:
        ``True`` if ``node`` has callable ``execute`` and ``validate``
        attributes, otherwise ``False``.
    """
    # Check node compatibility
    required_methods = ("execute", "validate")
    # hasattr() alone would accept a non-callable attribute such as
    # ``execute = None``; require something that can actually be called.
    return all(
        callable(getattr(node, method, None)) for method in required_methods
    )
Manage workflow and node state correctly:
async def execute_node(self, node, node_context, workflow_context):
    """Execute ``node`` while recording its progress on both contexts.

    Marks the node context as running and points the workflow context at
    the node, then, once ``node.execute`` returns, marks the node context
    completed and stores the result's output on it.
    """
    # Record the in-flight state before handing control to the node
    node_context.status = NodeStatus.RUNNING
    workflow_context.current_node_id = node.node_id

    outcome = await node.execute(node_context, workflow_context)

    # Reached only when execute() returned; if it raised, the status set
    # above is left untouched.
    node_context.status = NodeStatus.COMPLETED
    node_context.output_data = outcome.output

    return outcome
Add retry and fallback mechanisms:
async def execute_with_retry(self, node, context, max_retries=3, workflow_context=None):
    """Execute a node, retrying with exponential backoff when it raises.

    Args:
        node: Node to execute.
        context: Node context passed to ``execute_node``.
        max_retries: Total number of attempts; must be at least 1.
        workflow_context: Workflow context forwarded to ``execute_node``
            as its third argument.

    Returns:
        Whatever ``execute_node`` returns on the first successful attempt.

    Raises:
        ValueError: If ``max_retries`` is less than 1 (previously the node
            was never run and ``None`` was returned silently).
        Exception: The error from the final attempt, re-raised unchanged.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        try:
            # execute_node takes the workflow context as well; omitting it
            # made every attempt fail with TypeError and get retried.
            return await self.execute_node(node, context, workflow_context)
        except Exception:
            if attempt == max_retries - 1:
                raise
            # Back off 1s, 2s, 4s, ... before the next attempt
            await asyncio.sleep(2 ** attempt)
Include observability features:
async def execute_node(self, node, context, workflow_context):
    """Execute a node via the parent backend and record outcome metrics.

    Args:
        node: Node to execute; its ``node_id`` labels the metric.
        context: Node context forwarded to the parent implementation.
        workflow_context: Workflow context forwarded to the parent
            implementation.

    Returns:
        The result returned by the parent ``execute_node``.

    Raises:
        Exception: Any error from the parent ``execute_node`` is recorded
            through ``self.metrics.record_failure`` and re-raised.
    """
    # perf_counter() is monotonic, so the measured duration cannot be
    # skewed by system clock adjustments the way time.time() deltas can.
    start_time = time.perf_counter()

    try:
        result = await super().execute_node(node, context, workflow_context)
    except Exception as e:
        self.metrics.record_failure(node.node_id, str(e))
        raise
    else:
        # Kept outside the try body so a failure inside the metrics sink
        # is not misreported as a node failure.
        self.metrics.record_success(
            node.node_id,
            time.perf_counter() - start_time
        )
        return result

See Also