Overview
The Backend Registry is the central management system for execution backends in Flow Core. It handles registration, discovery, and instantiation of different backend implementations.Registry API
Basic Usage
Copy
from nadoo_flow import BackendRegistry
# Get the default backend (native)
backend = BackendRegistry.get_default()
# Get a specific backend
backend = BackendRegistry.create("native")
# List available backends
backends = BackendRegistry.list_available()
print(backends) # ['native']
Registering Backends
Global Registration
Copy
from nadoo_flow import BackendRegistry, BaseBackend
class CustomBackend(BaseBackend):
"""Custom backend implementation"""
def __init__(self, config=None):
super().__init__(config)
self.name = "custom"
async def execute_node(self, node, node_context, workflow_context):
# Implementation
pass
# Register globally
BackendRegistry.register("custom", CustomBackend)
# Now available everywhere
backend = BackendRegistry.create("custom")
Factory Registration
Register a factory function for lazy initialization:Copy
def create_custom_backend(config):
"""Factory function for backend creation"""
return CustomBackend(config)
BackendRegistry.register_factory("custom", create_custom_backend)
# Backend created on demand
backend = BackendRegistry.create("custom", config={"debug": True})
Class Registration
Register a backend class directly:Copy
@BackendRegistry.register_class("enhanced")
class EnhancedBackend(BaseBackend):
"""Auto-registered backend"""
def __init__(self, config=None):
super().__init__(config)
self.name = "enhanced"
Configuration Management
Backend Configuration
Copy
# Configure backend at creation
backend = BackendRegistry.create(
"native",
config={
"max_retries": 3,
"timeout": 60,
"debug": True
}
)
# Global configuration
BackendRegistry.configure_default({
"backend": "native",
"config": {
"max_workers": 4,
"enable_caching": True
}
})
Environment-based Configuration
Copy
import os
class BackendConfig:
"""Environment-aware configuration"""
@staticmethod
def from_env():
return {
"backend": os.getenv("NADOO_BACKEND", "native"),
"config": {
"debug": os.getenv("NADOO_DEBUG", "false").lower() == "true",
"max_workers": int(os.getenv("NADOO_MAX_WORKERS", "4"))
}
}
# Apply configuration
config = BackendConfig.from_env()
BackendRegistry.configure_default(config)
Backend Discovery
Plugin System
Copy
from nadoo_flow.backends import BackendPlugin
class PluginRegistry:
"""Discover and load backend plugins"""
@staticmethod
def discover_plugins(path="backends/plugins"):
"""Scan directory for backend plugins"""
import importlib
import pkgutil
plugins = []
for finder, name, ispkg in pkgutil.iter_modules([path]):
module = importlib.import_module(f"backends.plugins.{name}")
if hasattr(module, "Backend"):
plugins.append(module.Backend)
return plugins
@staticmethod
def register_plugins():
"""Auto-register discovered plugins"""
for backend_cls in PluginRegistry.discover_plugins():
if hasattr(backend_cls, "backend_name"):
BackendRegistry.register(
backend_cls.backend_name,
backend_cls
)
Entry Points
Use setuptools entry points for plugin discovery:Copy
# In setup.py or pyproject.toml
[project.entry-points."nadoo.backends"]
ray = "nadoo_ray:RayBackend"
dask = "nadoo_dask:DaskBackend"
celery = "nadoo_celery:CeleryBackend"
Copy
def load_entry_point_backends():
"""Load backends from entry points"""
import importlib.metadata
eps = importlib.metadata.entry_points()
backend_eps = eps.get("nadoo.backends", [])
for ep in backend_eps:
backend_cls = ep.load()
BackendRegistry.register(ep.name, backend_cls)
Backend Selection
Strategy Pattern
Copy
class BackendSelector:
"""Select appropriate backend based on context"""
@staticmethod
def select(workflow_type, requirements):
"""Choose backend based on requirements"""
if requirements.get("distributed"):
if BackendRegistry.is_available("ray"):
return "ray"
elif BackendRegistry.is_available("dask"):
return "dask"
if requirements.get("streaming"):
if BackendRegistry.is_available("native"):
return "native"
# Default fallback
return BackendRegistry.get_default_name()
# Usage
requirements = {
"distributed": True,
"streaming": False,
"gpu": False
}
backend_name = BackendSelector.select("batch_processing", requirements)
backend = BackendRegistry.create(backend_name)
Capability Matching
Copy
def find_backend_by_capabilities(required_capabilities):
"""Find backend matching required capabilities"""
for backend_name in BackendRegistry.list_available():
backend = BackendRegistry.create(backend_name)
capabilities = backend.get_capabilities()
if all(capabilities.get(cap) for cap in required_capabilities):
return backend_name
return None
# Find backend with specific capabilities
backend_name = find_backend_by_capabilities([
"streaming",
"parallel",
"async"
])
Monitoring and Metrics
Backend Metrics
Copy
class BackendMetrics:
"""Track backend usage and performance"""
def __init__(self):
self.metrics = {}
def record_execution(self, backend_name, duration, success):
"""Record execution metrics"""
if backend_name not in self.metrics:
self.metrics[backend_name] = {
"total_executions": 0,
"successful": 0,
"failed": 0,
"total_duration": 0
}
self.metrics[backend_name]["total_executions"] += 1
self.metrics[backend_name]["total_duration"] += duration
if success:
self.metrics[backend_name]["successful"] += 1
else:
self.metrics[backend_name]["failed"] += 1
def get_stats(self, backend_name):
"""Get backend statistics"""
if backend_name not in self.metrics:
return None
stats = self.metrics[backend_name]
return {
"total": stats["total_executions"],
"success_rate": stats["successful"] / stats["total_executions"],
"avg_duration": stats["total_duration"] / stats["total_executions"]
}
# Global metrics
metrics = BackendMetrics()
# Wrap backend execution
async def execute_with_metrics(backend, node, context):
start = time.time()
try:
result = await backend.execute_node(node, context)
duration = time.time() - start
metrics.record_execution(backend.name, duration, result.success)
return result
except Exception as e:
duration = time.time() - start
metrics.record_execution(backend.name, duration, False)
raise
Testing Backends
Mock Backend for Testing
Copy
class MockBackend(BaseBackend):
"""Mock backend for testing"""
def __init__(self, config=None):
super().__init__(config)
self.name = "mock"
self.executed_nodes = []
async def execute_node(self, node, node_context, workflow_context):
"""Record execution for testing"""
self.executed_nodes.append({
"node_id": node.node_id,
"input": node_context.input_data
})
return NodeResult(
success=True,
output={"mocked": True}
)
# Use in tests
@pytest.fixture
def mock_backend():
backend = MockBackend()
BackendRegistry.register("mock", backend)
return backend
async def test_workflow_execution(mock_backend):
workflow = create_test_workflow()
workflow.backend = "mock"
result = await workflow.execute({"input": "test"})
assert len(mock_backend.executed_nodes) == 3
assert mock_backend.executed_nodes[0]["node_id"] == "node1"
Backend Validation
Copy
class BackendValidator:
"""Validate backend implementations"""
@staticmethod
async def validate(backend):
"""Run validation suite"""
errors = []
# Check required methods
required_methods = [
"execute_node",
"execute_workflow",
"get_capabilities"
]
for method in required_methods:
if not hasattr(backend, method):
errors.append(f"Missing required method: {method}")
# Test execution
try:
test_node = TestNode()
test_context = NodeContext(
node_id="test",
input_data={"test": True}
)
result = await backend.execute_node(
test_node,
test_context,
WorkflowContext()
)
if not isinstance(result, NodeResult):
errors.append("execute_node must return NodeResult")
except Exception as e:
errors.append(f"Execution test failed: {e}")
return errors
# Validate on registration
async def register_with_validation(name, backend_cls):
backend = backend_cls()
errors = await BackendValidator.validate(backend)
if errors:
raise ValueError(f"Backend validation failed: {errors}")
BackendRegistry.register(name, backend_cls)
Advanced Features
Backend Chaining
Copy
class ChainedBackend(BaseBackend):
"""Execute through multiple backends"""
def __init__(self, backends):
super().__init__()
self.backends = backends
async def execute_node(self, node, node_context, workflow_context):
"""Execute through backend chain"""
current_context = node_context
for backend in self.backends:
result = await backend.execute_node(
node,
current_context,
workflow_context
)
if not result.success:
return result
# Pass output to next backend
current_context = NodeContext(
node_id=node.node_id,
input_data=result.output
)
return result
# Usage
logging_backend = LoggingBackend()
native_backend = BackendRegistry.create("native")
chained = ChainedBackend([logging_backend, native_backend])
BackendRegistry.register("chained", chained)
Backend Pooling
Copy
class BackendPool:
"""Pool of backend instances"""
def __init__(self, backend_name, pool_size=5):
self.backends = [
BackendRegistry.create(backend_name)
for _ in range(pool_size)
]
self.current = 0
def get_next(self):
"""Round-robin backend selection"""
backend = self.backends[self.current]
self.current = (self.current + 1) % len(self.backends)
return backend
async def execute_parallel(self, nodes):
"""Execute nodes in parallel across pool"""
tasks = []
for i, node in enumerate(nodes):
backend = self.backends[i % len(self.backends)]
task = backend.execute_node(node, ...)
tasks.append(task)
return await asyncio.gather(*tasks)
Best Practices
Registry Initialization
Registry Initialization
Initialize the registry early in your application:
Copy
# In __init__.py or main.py
def initialize_backends():
# Register custom backends
BackendRegistry.register("custom", CustomBackend)
# Load plugins
PluginRegistry.register_plugins()
# Load entry points
load_entry_point_backends()
# Set default
BackendRegistry.set_default("native")
# Call on startup
initialize_backends()
Error Handling
Error Handling
Handle backend registration errors gracefully:
Copy
def safe_register(name, backend_cls):
try:
BackendRegistry.register(name, backend_cls)
logger.info(f"Registered backend: {name}")
except Exception as e:
logger.error(f"Failed to register {name}: {e}")
# Use fallback
return False
return True
Version Compatibility
Version Compatibility
Check backend version compatibility:
Copy
def check_compatibility(backend):
required_version = "1.0.0"
if hasattr(backend, "__version__"):
from packaging import version
if version.parse(backend.__version__) < version.parse(required_version):
raise ValueError(f"Backend version {backend.__version__} is too old")