Overview
Flow Core’s backend system is designed to be extensible. You can create custom backends to integrate with any workflow orchestration system or implement your own execution logic.
Backend Interface
All backends must implement the BaseBackend interface:
Copy
from nadoo_flow.backends import BaseBackend
from typing import Dict, Any, Optional
class CustomBackend(BaseBackend):
    """Skeleton of a custom backend: the methods a backend provides.

    Flow Core types in the signatures are written as quoted forward
    references because this snippet only imports ``BaseBackend`` and the
    ``typing`` helpers; unquoted names would be evaluated when the class is
    defined and fail with ``NameError``.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Initialize the backend with configuration.

        Args:
            config: Optional configuration mapping, forwarded unchanged to
                ``BaseBackend.__init__``.
        """
        super().__init__(config)
        self.name = "custom"

    async def execute_node(
        self,
        node: "BaseNode",
        node_context: "NodeContext",
        workflow_context: "WorkflowContext",
    ) -> "NodeResult":
        """Execute a single node.

        Args:
            node: The node to run.
            node_context: Per-node context handed to ``node.execute``.
            workflow_context: Workflow-level context handed to ``node.execute``.

        Returns:
            Whatever ``node.execute`` returns.
        """
        # Custom execution logic goes here; the default simply delegates.
        return await node.execute(node_context, workflow_context)

    async def execute_workflow(
        self,
        workflow: "Workflow",
        input_data: Dict[str, Any],
        context: Optional["WorkflowContext"] = None,
    ) -> Dict[str, Any]:
        """Execute a complete workflow.

        Args:
            workflow: The workflow to run.
            input_data: Initial input for the workflow.
            context: Optional workflow context supplied by the caller.

        Returns:
            The workflow's output data.

        Raises:
            NotImplementedError: Always, until custom orchestration is
                written. Failing loudly avoids silently returning ``None``
                where callers expect a dict.
        """
        # Custom workflow orchestration goes here.
        raise NotImplementedError(
            "CustomBackend.execute_workflow must be implemented"
        )

    def validate_node(self, node: "BaseNode") -> bool:
        """Return whether ``node`` is compatible with this backend.

        The skeleton accepts every node.
        """
        return True

    def get_capabilities(self) -> Dict[str, bool]:
        """Return the capability flags advertised by this backend."""
        return {
            "streaming": True,
            "parallel": True,
            "async": True,
            "distributed": False,
        }
Implementation Example
Simple Custom Backend
Copy
from nadoo_flow.backends import BaseBackend
from nadoo_flow import NodeResult, NodeStatus
import asyncio
import logging
class LoggingBackend(BaseBackend):
    """Backend that logs all operations.

    Nodes of a workflow are executed one after another, each node receiving
    the previous node's output as its input.
    """

    def __init__(self, config=None):
        """Initialize the backend and its logger.

        Args:
            config: Optional configuration mapping. If it contains a
                ``"log_level"`` entry (for example ``"DEBUG"``), that level
                is applied to this backend's logger.
        """
        super().__init__(config)
        self.name = "logging"
        self.logger = logging.getLogger(__name__)
        # Honour the "log_level" option instead of silently ignoring it.
        log_level = (config or {}).get("log_level")
        if log_level is not None:
            self.logger.setLevel(log_level)

    async def execute_node(self, node, node_context, workflow_context):
        """Execute a node with detailed logging.

        Args:
            node: The node to run.
            node_context: Per-node context; its ``input_data`` is logged.
            workflow_context: Workflow-level context passed to the node.

        Returns:
            The node's result, or ``NodeResult(success=False, error=...)``
            when ``node.execute`` raises an ``Exception``.
        """
        self.logger.info("Starting node: %s", node.node_id)
        self.logger.debug("Input: %s", node_context.input_data)
        try:
            result = await node.execute(node_context, workflow_context)
        except Exception as e:
            # logger.exception keeps the traceback, which a plain error()
            # call would drop.
            self.logger.exception("Node %s failed: %s", node.node_id, e)
            return NodeResult(
                success=False,
                error=str(e),
            )
        self.logger.info("Node %s completed successfully", node.node_id)
        self.logger.debug("Output: %s", result.output)
        return result

    async def execute_workflow(self, workflow, input_data, context=None):
        """Execute a workflow with step-by-step logging.

        Args:
            workflow: Workflow whose ``nodes`` are run in order.
            input_data: Input for the first node.
            context: Optional workflow context; one is created from
                ``workflow.workflow_id`` when omitted.

        Returns:
            The output of the last node, or ``input_data`` unchanged when
            the workflow has no nodes.

        Raises:
            RuntimeError: If any node reports an unsuccessful result; the
                message is the node's error text.
        """
        self.logger.info("Starting workflow: %s", workflow.workflow_id)
        # Create context if not provided
        if context is None:
            context = WorkflowContext(workflow_id=workflow.workflow_id)
        # Execute nodes in sequence, feeding each output into the next node
        current_data = input_data
        for node in workflow.nodes:
            node_context = NodeContext(
                node_id=node.node_id,
                input_data=current_data,
            )
            result = await self.execute_node(node, node_context, context)
            if not result.success:
                self.logger.error("Workflow failed at node %s", node.node_id)
                raise RuntimeError(result.error)
            current_data = result.output
        self.logger.info("Workflow completed successfully")
        return current_data
Distributed Backend
Copy
class DistributedBackend(BaseBackend):
    """Backend that hands node execution to a pool of workers."""

    def __init__(self, config=None):
        """Set up the backend and build its worker pool from ``config``."""
        super().__init__(config)
        self.name = "distributed"
        self.worker_pool = self._init_workers(config)

    def _init_workers(self, config):
        """Create the worker pool.

        The pool size is ``config["num_workers"]`` when present; it falls
        back to 4 when the key is missing or ``config`` is empty/``None``.
        """
        if config:
            worker_count = config.get("num_workers", 4)
        else:
            worker_count = 4
        return WorkerPool(worker_count)

    async def execute_node(self, node, node_context, workflow_context):
        """Run one node on a worker obtained from the pool.

        The node and both contexts are converted to plain data, sent to the
        worker, and the worker's reply is rebuilt into a ``NodeResult``.
        """
        # Acquire the worker first, then build the payload it will receive
        assigned_worker = await self.worker_pool.get_worker()
        payload = {
            "node": serialize_node(node),
            "context": node_context.to_dict(),
            "workflow_context": workflow_context.to_dict()
        }
        raw_result = await assigned_worker.execute(payload)
        return NodeResult.from_dict(raw_result)

    async def execute_parallel(self, nodes, contexts, workflow_context):
        """Run several nodes concurrently, pairing each node with its context.

        Nodes and contexts are paired positionally with ``zip``, so surplus
        items in the longer sequence are ignored.
        """
        pending = [
            self.worker_pool.submit(
                self.execute_node,
                current_node, current_context, workflow_context
            )
            for current_node, current_context in zip(nodes, contexts)
        ]
        return await asyncio.gather(*pending)

    def get_capabilities(self):
        """Report the capability flags of this backend."""
        capabilities = {
            "streaming": False,
            "parallel": True,
            "async": True,
            "distributed": True
        }
        return capabilities
Integration with External Systems
LangChain Integration
Copy
from langchain.schema.runnable import Runnable
class LangChainBackend(BaseBackend):
    """Backend that wraps Flow Core nodes as LangChain runnables."""

    def __init__(self, config=None):
        """Initialize the backend; ``config`` is forwarded to the base class."""
        super().__init__(config)
        self.name = "langchain"

    def node_to_runnable(self, node: "BaseNode", workflow_context=None) -> Runnable:
        """Convert a Flow Core node to a LangChain runnable.

        Args:
            node: The node to wrap.
            workflow_context: Optional workflow context passed to every
                execution of the node. When omitted, a fresh
                ``WorkflowContext()`` is created for each invocation.

        Returns:
            A ``Runnable`` whose ``invoke``/``ainvoke`` execute ``node`` and
            return the result's ``output``.
        """

        def resolve_workflow_context():
            # Reuse the caller's context when one was supplied.
            if workflow_context is not None:
                return workflow_context
            return WorkflowContext()

        class NodeRunnable(Runnable):
            # The parameter is named ``input`` and ``config``/``**kwargs``
            # are accepted to match the Runnable interface: composed chains
            # call each step with the config as a second argument.
            def invoke(self, input, config=None, **kwargs):
                # Sync wrapper for async node.
                # NOTE: asyncio.run() cannot be called while an event loop is
                # already running in this thread; use ainvoke() there.
                import asyncio
                node_context = NodeContext(
                    node_id=node.node_id,
                    input_data=input,
                )
                result = asyncio.run(
                    node.execute(node_context, resolve_workflow_context())
                )
                return result.output

            async def ainvoke(self, input, config=None, **kwargs):
                node_context = NodeContext(
                    node_id=node.node_id,
                    input_data=input,
                )
                result = await node.execute(
                    node_context, resolve_workflow_context()
                )
                return result.output

        return NodeRunnable()

    async def execute_workflow(self, workflow, input_data, context=None):
        """Execute the workflow as a LangChain chain.

        Args:
            workflow: Workflow whose ``nodes`` are piped together in order.
            input_data: Input for the first node.
            context: Optional workflow context shared by all nodes; one is
                created from ``workflow.workflow_id`` when omitted.

        Returns:
            The output of the final runnable, or ``input_data`` unchanged
            when the workflow has no nodes.
        """
        if context is None:
            context = WorkflowContext(workflow_id=workflow.workflow_id)
        # Convert workflow to chain
        chain = None
        for node in workflow.nodes:
            runnable = self.node_to_runnable(node, context)
            chain = runnable if chain is None else chain | runnable
        # An empty workflow has nothing to run; avoid calling None.ainvoke.
        if chain is None:
            return input_data
        # Execute chain
        return await chain.ainvoke(input_data)
Backend Registration
Register your custom backend:
Copy
from nadoo_flow import BackendRegistry
# Configuration handed to the backend constructor
logging_backend_config = {"log_level": "DEBUG"}

# Build the backend instance that will be registered
custom_backend = LoggingBackend(config=logging_backend_config)

# Register the instance globally under the name "logging"
BackendRegistry.register("logging", custom_backend)

# Refer to the registered backend by name when creating a workflow
workflow = Workflow(backend="logging")
Testing Custom Backends
Copy
import pytest
from nadoo_flow.testing import BackendTestSuite
class TestCustomBackend(BackendTestSuite):
    """Tests exercising ``CustomBackend`` through the shared backend suite."""

    # NOTE(review): the tests below are coroutines; presumably
    # BackendTestSuite (or the pytest configuration) enables async test
    # support - confirm.

    @pytest.fixture
    def backend(self):
        """Backend instance under test."""
        return CustomBackend()

    async def test_node_execution(self, backend):
        """A single node runs successfully and its output carries the value."""
        simple_node = SimpleNode("test")
        node_context = NodeContext(node_id="test", input_data={"value": 42})
        outcome = await backend.execute_node(
            simple_node, node_context, WorkflowContext()
        )
        assert outcome.success
        assert outcome.output["value"] == 42

    async def test_workflow_execution(self, backend):
        """A complete workflow reports the "completed" status."""
        test_workflow = create_test_workflow()
        workflow_input = {"input": "test"}
        outcome = await backend.execute_workflow(test_workflow, workflow_input)
        assert outcome["status"] == "completed"

    async def test_error_handling(self, backend):
        """A failing node yields an unsuccessful result with an error text."""
        failing_node = FailingNode("fail")
        node_context = NodeContext(node_id="fail", input_data={})
        outcome = await backend.execute_node(
            failing_node, node_context, WorkflowContext()
        )
        assert not outcome.success
        assert "error" in outcome.error
Best Practices
Maintain Compatibility
Maintain Compatibility
Ensure your backend works with all standard Flow Core nodes:
Copy
def validate_node(self, node):
    """Check that ``node`` exposes the methods this backend relies on.

    Args:
        node: Candidate node object.

    Returns:
        ``True`` when ``node`` has callable ``execute`` and ``validate``
        attributes, ``False`` otherwise.
    """
    required_methods = ("execute", "validate")
    # hasattr() alone would accept a non-callable attribute that merely
    # shares a required method's name.
    return all(
        callable(getattr(node, method, None)) for method in required_methods
    )
Handle State Properly
Handle State Properly
Manage workflow and node state correctly:
Copy
async def execute_node(self, node, node_context, workflow_context):
    """Run ``node`` while keeping node and workflow state up to date.

    Before execution the node context is marked RUNNING and the workflow
    context records the node as current; afterwards the node context is
    marked COMPLETED and receives the result's output.

    Returns:
        The result returned by ``node.execute``.
    """
    # NOTE(review): if node.execute raises, the status stays RUNNING, and an
    # unsuccessful result is still marked COMPLETED - confirm whether that
    # is intended.
    active_node_id = node.node_id

    # State before execution
    node_context.status = NodeStatus.RUNNING
    workflow_context.current_node_id = active_node_id

    node_result = await node.execute(node_context, workflow_context)

    # State after execution
    node_context.status = NodeStatus.COMPLETED
    node_context.output_data = node_result.output
    return node_result
Implement Error Recovery
Implement Error Recovery
Add retry and fallback mechanisms:
Copy
async def execute_with_retry(self, node, context, max_retries=3, workflow_context=None):
    """Execute a node, retrying with exponential backoff on failure.

    Args:
        node: The node to run.
        context: Per-node context passed to ``execute_node``.
        max_retries: Total number of attempts; must be at least 1.
        workflow_context: Workflow-level context forwarded to
            ``execute_node`` as its third argument.

    Returns:
        The result of the first attempt that does not raise.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1.
        Exception: Whatever the final attempt raised, re-raised unchanged.
    """
    if max_retries < 1:
        # Without this guard the loop body never runs and None is returned.
        raise ValueError("max_retries must be at least 1")
    for attempt in range(max_retries):
        try:
            return await self.execute_node(node, context, workflow_context)
        except Exception:
            if attempt == max_retries - 1:
                raise
            # Back off 1s, 2s, 4s, ... before the next attempt.
            await asyncio.sleep(2 ** attempt)
Add Monitoring
Add Monitoring
Include observability features:
Copy
async def execute_node(self, node, context, workflow_context):
    """Execute a node via the parent backend and record metrics.

    On success the elapsed time is reported through
    ``self.metrics.record_success``; on failure the error text is reported
    through ``self.metrics.record_failure`` and the exception is re-raised.

    Returns:
        The result returned by the parent class's ``execute_node``.
    """
    # Imported here because the snippet has no module-level ``import time``.
    import time

    # perf_counter is monotonic, so the measured duration is not affected
    # by system clock adjustments.
    start_time = time.perf_counter()
    try:
        result = await super().execute_node(node, context, workflow_context)
    except Exception as e:
        self.metrics.record_failure(node.node_id, str(e))
        raise
    self.metrics.record_success(
        node.node_id,
        time.perf_counter() - start_time,
    )
    return result