Skip to main content

Overview

The Backend Registry is the central management system for execution backends in Flow Core. It handles registration, discovery, and instantiation of different backend implementations.

Registry API

Basic Usage

from nadoo_flow import BackendRegistry

# Get the default backend (native)
backend = BackendRegistry.get_default()

# Get a specific backend
backend = BackendRegistry.create("native")

# List available backends
backends = BackendRegistry.list_available()
print(backends)  # ['native']

Registering Backends

Global Registration

from nadoo_flow import BackendRegistry, BaseBackend

class CustomBackend(BaseBackend):
    """Custom backend implementation"""

    def __init__(self, config=None):
        super().__init__(config)
        self.name = "custom"

    async def execute_node(self, node, node_context, workflow_context):
        # Implementation
        pass

# Register globally
BackendRegistry.register("custom", CustomBackend)

# Now available everywhere
backend = BackendRegistry.create("custom")

Factory Registration

Register a factory function for lazy initialization:
def create_custom_backend(config):
    """Factory function for backend creation"""
    return CustomBackend(config)

BackendRegistry.register_factory("custom", create_custom_backend)

# Backend created on demand
backend = BackendRegistry.create("custom", config={"debug": True})

Class Registration

Register a backend class directly:
@BackendRegistry.register_class("enhanced")
class EnhancedBackend(BaseBackend):
    """Auto-registered backend"""

    def __init__(self, config=None):
        super().__init__(config)
        self.name = "enhanced"

Configuration Management

Backend Configuration

# Configure backend at creation
backend = BackendRegistry.create(
    "native",
    config={
        "max_retries": 3,
        "timeout": 60,
        "debug": True
    }
)

# Global configuration
BackendRegistry.configure_default({
    "backend": "native",
    "config": {
        "max_workers": 4,
        "enable_caching": True
    }
})

Environment-based Configuration

import os

class BackendConfig:
    """Environment-aware configuration"""

    @staticmethod
    def from_env():
        return {
            "backend": os.getenv("NADOO_BACKEND", "native"),
            "config": {
                "debug": os.getenv("NADOO_DEBUG", "false").lower() == "true",
                "max_workers": int(os.getenv("NADOO_MAX_WORKERS", "4"))
            }
        }

# Apply configuration
config = BackendConfig.from_env()
BackendRegistry.configure_default(config)

Backend Discovery

Plugin System

from nadoo_flow.backends import BackendPlugin

class PluginRegistry:
    """Discover and load backend plugins"""

    @staticmethod
    def discover_plugins(path="backends/plugins"):
        """Scan directory for backend plugins"""
        import importlib
        import pkgutil

        plugins = []
        for finder, name, ispkg in pkgutil.iter_modules([path]):
            module = importlib.import_module(f"backends.plugins.{name}")
            if hasattr(module, "Backend"):
                plugins.append(module.Backend)

        return plugins

    @staticmethod
    def register_plugins():
        """Auto-register discovered plugins"""
        for backend_cls in PluginRegistry.discover_plugins():
            if hasattr(backend_cls, "backend_name"):
                BackendRegistry.register(
                    backend_cls.backend_name,
                    backend_cls
                )

Entry Points

Use setuptools entry points for plugin discovery:
# In setup.py or pyproject.toml
[project.entry-points."nadoo.backends"]
ray = "nadoo_ray:RayBackend"
dask = "nadoo_dask:DaskBackend"
celery = "nadoo_celery:CeleryBackend"
Load entry point backends:
def load_entry_point_backends():
    """Load backends from entry points"""
    import importlib.metadata

    eps = importlib.metadata.entry_points()
    backend_eps = eps.get("nadoo.backends", [])

    for ep in backend_eps:
        backend_cls = ep.load()
        BackendRegistry.register(ep.name, backend_cls)

Backend Selection

Strategy Pattern

class BackendSelector:
    """Select appropriate backend based on context"""

    @staticmethod
    def select(workflow_type, requirements):
        """Choose backend based on requirements"""

        if requirements.get("distributed"):
            if BackendRegistry.is_available("ray"):
                return "ray"
            elif BackendRegistry.is_available("dask"):
                return "dask"

        if requirements.get("streaming"):
            if BackendRegistry.is_available("native"):
                return "native"

        # Default fallback
        return BackendRegistry.get_default_name()

# Usage
requirements = {
    "distributed": True,
    "streaming": False,
    "gpu": False
}

backend_name = BackendSelector.select("batch_processing", requirements)
backend = BackendRegistry.create(backend_name)

Capability Matching

def find_backend_by_capabilities(required_capabilities):
    """Find backend matching required capabilities"""

    for backend_name in BackendRegistry.list_available():
        backend = BackendRegistry.create(backend_name)
        capabilities = backend.get_capabilities()

        if all(capabilities.get(cap) for cap in required_capabilities):
            return backend_name

    return None

# Find backend with specific capabilities
backend_name = find_backend_by_capabilities([
    "streaming",
    "parallel",
    "async"
])

Monitoring and Metrics

Backend Metrics

class BackendMetrics:
    """Track backend usage and performance"""

    def __init__(self):
        self.metrics = {}

    def record_execution(self, backend_name, duration, success):
        """Record execution metrics"""
        if backend_name not in self.metrics:
            self.metrics[backend_name] = {
                "total_executions": 0,
                "successful": 0,
                "failed": 0,
                "total_duration": 0
            }

        self.metrics[backend_name]["total_executions"] += 1
        self.metrics[backend_name]["total_duration"] += duration

        if success:
            self.metrics[backend_name]["successful"] += 1
        else:
            self.metrics[backend_name]["failed"] += 1

    def get_stats(self, backend_name):
        """Get backend statistics"""
        if backend_name not in self.metrics:
            return None

        stats = self.metrics[backend_name]
        return {
            "total": stats["total_executions"],
            "success_rate": stats["successful"] / stats["total_executions"],
            "avg_duration": stats["total_duration"] / stats["total_executions"]
        }

# Global metrics
metrics = BackendMetrics()

# Wrap backend execution
async def execute_with_metrics(backend, node, context):
    start = time.time()
    try:
        result = await backend.execute_node(node, context)
        duration = time.time() - start
        metrics.record_execution(backend.name, duration, result.success)
        return result
    except Exception as e:
        duration = time.time() - start
        metrics.record_execution(backend.name, duration, False)
        raise

Testing Backends

Mock Backend for Testing

class MockBackend(BaseBackend):
    """Mock backend for testing"""

    def __init__(self, config=None):
        super().__init__(config)
        self.name = "mock"
        self.executed_nodes = []

    async def execute_node(self, node, node_context, workflow_context):
        """Record execution for testing"""
        self.executed_nodes.append({
            "node_id": node.node_id,
            "input": node_context.input_data
        })

        return NodeResult(
            success=True,
            output={"mocked": True}
        )

# Use in tests
@pytest.fixture
def mock_backend():
    backend = MockBackend()
    BackendRegistry.register("mock", backend)
    return backend

async def test_workflow_execution(mock_backend):
    workflow = create_test_workflow()
    workflow.backend = "mock"

    result = await workflow.execute({"input": "test"})

    assert len(mock_backend.executed_nodes) == 3
    assert mock_backend.executed_nodes[0]["node_id"] == "node1"

Backend Validation

class BackendValidator:
    """Validate backend implementations"""

    @staticmethod
    async def validate(backend):
        """Run validation suite"""
        errors = []

        # Check required methods
        required_methods = [
            "execute_node",
            "execute_workflow",
            "get_capabilities"
        ]

        for method in required_methods:
            if not hasattr(backend, method):
                errors.append(f"Missing required method: {method}")

        # Test execution
        try:
            test_node = TestNode()
            test_context = NodeContext(
                node_id="test",
                input_data={"test": True}
            )

            result = await backend.execute_node(
                test_node,
                test_context,
                WorkflowContext()
            )

            if not isinstance(result, NodeResult):
                errors.append("execute_node must return NodeResult")

        except Exception as e:
            errors.append(f"Execution test failed: {e}")

        return errors

# Validate on registration
async def register_with_validation(name, backend_cls):
    backend = backend_cls()
    errors = await BackendValidator.validate(backend)

    if errors:
        raise ValueError(f"Backend validation failed: {errors}")

    BackendRegistry.register(name, backend_cls)

Advanced Features

Backend Chaining

class ChainedBackend(BaseBackend):
    """Execute through multiple backends"""

    def __init__(self, backends):
        super().__init__()
        self.backends = backends

    async def execute_node(self, node, node_context, workflow_context):
        """Execute through backend chain"""
        current_context = node_context

        for backend in self.backends:
            result = await backend.execute_node(
                node,
                current_context,
                workflow_context
            )

            if not result.success:
                return result

            # Pass output to next backend
            current_context = NodeContext(
                node_id=node.node_id,
                input_data=result.output
            )

        return result

# Usage
logging_backend = LoggingBackend()
native_backend = BackendRegistry.create("native")

chained = ChainedBackend([logging_backend, native_backend])
BackendRegistry.register("chained", chained)

Backend Pooling

class BackendPool:
    """Pool of backend instances"""

    def __init__(self, backend_name, pool_size=5):
        self.backends = [
            BackendRegistry.create(backend_name)
            for _ in range(pool_size)
        ]
        self.current = 0

    def get_next(self):
        """Round-robin backend selection"""
        backend = self.backends[self.current]
        self.current = (self.current + 1) % len(self.backends)
        return backend

    async def execute_parallel(self, nodes):
        """Execute nodes in parallel across pool"""
        tasks = []
        for i, node in enumerate(nodes):
            backend = self.backends[i % len(self.backends)]
            task = backend.execute_node(node, ...)
            tasks.append(task)

        return await asyncio.gather(*tasks)

Best Practices

Initialize the registry early in your application:
# In __init__.py or main.py
def initialize_backends():
    # Register custom backends
    BackendRegistry.register("custom", CustomBackend)

    # Load plugins
    PluginRegistry.register_plugins()

    # Load entry points
    load_entry_point_backends()

    # Set default
    BackendRegistry.set_default("native")

# Call on startup
initialize_backends()
Handle backend registration errors gracefully:
def safe_register(name, backend_cls):
    try:
        BackendRegistry.register(name, backend_cls)
        logger.info(f"Registered backend: {name}")
    except Exception as e:
        logger.error(f"Failed to register {name}: {e}")
        # Use fallback
        return False
    return True
Check backend version compatibility:
def check_compatibility(backend):
    required_version = "1.0.0"
    if hasattr(backend, "__version__"):
        from packaging import version
        if version.parse(backend.__version__) < version.parse(required_version):
            raise ValueError(f"Backend version {backend.__version__} is too old")

See Also