Skip to main content

Overview

Flow Core’s Tools API enables seamless integration of Python functions as LLM-callable tools. With automatic schema inference, type validation, and multi-provider support, you can expose any function to AI models with minimal code.

StructuredTool

Creating Tools from Functions

The simplest way to create a tool is from an existing function:
from nadoo_flow import StructuredTool

def search_database(
    query: str,
    limit: int = 10,
    filters: dict[str, Any] | None = None
) -> list[dict]:
    """Search the knowledge base for relevant documents.

    Args:
        query: The search query string
        limit: Maximum number of results to return
        filters: Optional filters to apply to search

    Returns:
        List of matching documents with metadata
    """
    # Implementation
    return db.search(query, limit, filters)

# Create tool with automatic schema inference
tool = StructuredTool.from_function(
    func=search_database,
    infer_schema=True,      # Auto-generate schema from type hints
    parse_docstring=True    # Parse descriptions from docstring
)

# The tool now has:
# - Pydantic schema for validation
# - OpenAI/Anthropic compatible format
# - Automatic error handling

Manual Tool Creation

For more control, create tools manually:
from pydantic import BaseModel, Field

class SearchArgs(BaseModel):
    """Arguments for database search"""
    query: str = Field(description="Search query")
    limit: int = Field(default=10, ge=1, le=100, description="Max results")
    filters: dict[str, Any] | None = Field(default=None, description="Filters")

tool = StructuredTool(
    node_id="search_tool",
    func=search_database,
    args_schema=SearchArgs,
    description="Search the knowledge base",
    return_direct=False,  # Wrap output in dict
    metadata={"category": "search", "version": "1.0"}
)

Async Tool Support

Tools can handle both sync and async functions:
async def async_search(query: str) -> list[dict]:
    """Async database search"""
    async with get_db() as db:
        return await db.search(query)

# Provide both versions
tool = StructuredTool.from_function(
    func=sync_search,           # Sync version
    coroutine=async_search,     # Async version
    infer_schema=True
)

# Tool automatically uses appropriate version
result = await tool.arun({"query": "AI agents"})  # Uses async
result = tool.run({"query": "AI agents"})         # Uses sync

Tool Registry

Manage collections of tools with the ToolRegistry:
from nadoo_flow import ToolRegistry

# Create registry
registry = ToolRegistry()

# Register tools
registry.register(search_tool)
registry.register(calculator_tool)
registry.register(weather_tool)

# Get tool by name
search = registry.get("search_tool")

# List all tools
tools = registry.list_tools()
for tool in tools:
    print(f"{tool.name}: {tool.description}")

# Remove tool
registry.unregister("weather_tool")

Export for LLM Providers

Convert tools to provider-specific formats:
# OpenAI format
openai_tools = registry.to_openai_tools()
# Returns:
[{
    "type": "function",
    "function": {
        "name": "search_database",
        "description": "Search the knowledge base...",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Search query"
                },
                "limit": {
                    "type": "integer",
                    "description": "Max results",
                    "default": 10
                }
            },
            "required": ["query"]
        }
    }
}]

# Anthropic format
anthropic_tools = registry.to_anthropic_tools()
# Returns:
[{
    "name": "search_database",
    "description": "Search the knowledge base...",
    "input_schema": {
        "type": "object",
        "properties": {...},
        "required": ["query"]
    }
}]

# Google Gemini format
gemini_tools = registry.to_gemini_tools()

Tool Node Integration

Using Tools in Workflows

Integrate tools into your workflows as nodes:
from nadoo_flow import ToolNode, BaseNode

# Create tool node
search_node = ToolNode(
    node_id="search",
    tool=search_tool,
    input_mapping={"user_query": "query"},  # Map input keys
    output_key="search_results"
)

# Use in workflow
workflow = InputNode() | search_node | FormatNode() | OutputNode()

# Execute
result = await workflow.run({"user_query": "What is RAG?"})

Dynamic Tool Selection

Let LLMs choose which tool to use:
class ToolSelectorNode(BaseNode):
    def __init__(self, tools: list[StructuredTool]):
        super().__init__("tool_selector")
        self.tools = tools
        self.registry = ToolRegistry()
        for tool in tools:
            self.registry.register(tool)

    async def execute(self, node_context, workflow_context):
        query = node_context.input_data["query"]

        # Ask LLM to select appropriate tool
        llm_response = await self.llm.generate(
            prompt=f"Select the best tool for: {query}",
            tools=self.registry.to_openai_tools()
        )

        # Execute selected tool
        tool_name = llm_response.tool_choice.name
        tool_args = llm_response.tool_choice.arguments

        tool = self.registry.get(tool_name)
        result = await tool.arun(tool_args)

        return NodeResult(
            success=True,
            output={"tool_used": tool_name, "result": result}
        )

Advanced Tool Features

Schema Inference

Automatic schema generation from function signatures:
from nadoo_flow.tools import infer_schema_from_function

def complex_function(
    text: str,
    count: int = 5,
    options: list[str] | None = None,
    metadata: dict[str, Any] = {}
) -> dict[str, Any]:
    """Process text with options"""
    pass

# Infer Pydantic schema
schema = infer_schema_from_function(complex_function)

# Schema includes:
# - Type validation
# - Default values
# - Optional handling
# - Nested structures

Docstring Parsing

Extract descriptions from docstrings:
from nadoo_flow.tools import parse_docstring

def well_documented_function(arg1: str, arg2: int) -> str:
    """
    Brief description of the function.

    Detailed explanation of what this function does
    and how it should be used.

    Args:
        arg1: Description of first argument
        arg2: Description of second argument
            with multi-line support

    Returns:
        Description of return value

    Raises:
        ValueError: When input is invalid

    Examples:
        >>> well_documented_function("test", 5)
        "result"
    """
    pass

info = parse_docstring(well_documented_function)
# Returns:
{
    "description": "Brief description of the function.",
    "long_description": "Detailed explanation...",
    "args": {
        "arg1": "Description of first argument",
        "arg2": "Description of second argument with multi-line support"
    },
    "returns": "Description of return value",
    "raises": {"ValueError": "When input is invalid"},
    "examples": ['well_documented_function("test", 5)']
}

Tool Validation

Validate tool inputs before execution:
class ValidatedTool(StructuredTool):
    def validate_input(self, args: dict) -> dict:
        """Custom validation logic"""
        # Run Pydantic validation
        validated = super().validate_input(args)

        # Additional custom validation
        if validated.get("query", "").strip() == "":
            raise ValueError("Query cannot be empty")

        if validated.get("limit", 0) > 1000:
            raise ValueError("Limit cannot exceed 1000")

        return validated

    async def arun(self, args: dict) -> Any:
        # Validation happens automatically
        validated_args = self.validate_input(args)
        return await self.coroutine(**validated_args)

Tool Patterns

Pattern 1: Multi-Step Tools

Tools that perform multiple operations:
class MultiStepTool(StructuredTool):
    def __init__(self):
        super().__init__(
            node_id="multi_step_tool",
            func=self.execute,
            description="Perform multi-step operation"
        )

    async def execute(
        self,
        query: str,
        process: bool = True,
        summarize: bool = True
    ) -> dict:
        results = {}

        # Step 1: Search
        results["search"] = await self.search(query)

        # Step 2: Process (optional)
        if process:
            results["processed"] = await self.process(results["search"])

        # Step 3: Summarize (optional)
        if summarize:
            data = results.get("processed", results["search"])
            results["summary"] = await self.summarize(data)

        return results

Pattern 2: Tool Chains

Chain multiple tools together:
class ToolChain:
    def __init__(self, tools: list[StructuredTool]):
        self.tools = tools

    async def execute(self, initial_input: dict) -> dict:
        """Execute tools in sequence"""
        current_input = initial_input

        for tool in self.tools:
            # Each tool's output becomes next tool's input
            result = await tool.arun(current_input)
            current_input = {"previous_output": result, **current_input}

        return current_input

# Usage
chain = ToolChain([
    search_tool,
    filter_tool,
    format_tool
])

result = await chain.execute({"query": "AI agents"})

Pattern 3: Conditional Tools

Tools with conditional execution:
class ConditionalTool(StructuredTool):
    def __init__(self):
        super().__init__(
            node_id="conditional_tool",
            func=self.execute,
            description="Tool with conditional logic"
        )

    async def execute(
        self,
        action: Literal["search", "calculate", "generate"],
        **kwargs
    ) -> Any:
        """Execute different operations based on action"""

        if action == "search":
            return await self.search_handler(**kwargs)
        elif action == "calculate":
            return await self.calculate_handler(**kwargs)
        elif action == "generate":
            return await self.generate_handler(**kwargs)
        else:
            raise ValueError(f"Unknown action: {action}")

Pattern 4: Retry with Fallback

Tools with built-in retry and fallback:
class ResilientTool(StructuredTool):
    def __init__(self, primary_func, fallback_func, max_retries=3):
        super().__init__(
            node_id="resilient_tool",
            func=self.execute_with_retry,
            description="Tool with retry and fallback"
        )
        self.primary_func = primary_func
        self.fallback_func = fallback_func
        self.max_retries = max_retries

    async def execute_with_retry(self, **kwargs) -> Any:
        """Try primary function, fall back if needed"""
        last_error = None

        # Try primary function with retries
        for attempt in range(self.max_retries):
            try:
                return await self.primary_func(**kwargs)
            except Exception as e:
                last_error = e
                if attempt < self.max_retries - 1:
                    await asyncio.sleep(2 ** attempt)  # Exponential backoff

        # Fall back to alternative
        logger.warning(f"Primary failed, using fallback: {last_error}")
        return await self.fallback_func(**kwargs)

Real-World Examples

Web Search Tool

import httpx
from typing import List, Dict

class WebSearchTool(StructuredTool):
    def __init__(self, api_key: str):
        self.api_key = api_key
        super().__init__(
            node_id="web_search",
            func=self.search,
            description="Search the web for information",
            infer_schema=True
        )

    async def search(
        self,
        query: str,
        max_results: int = 5,
        region: str = "us"
    ) -> List[Dict[str, str]]:
        """
        Search the web using search API.

        Args:
            query: Search query
            max_results: Maximum number of results
            region: Region code for search

        Returns:
            List of search results with title, url, and snippet
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(
                "https://api.search.example.com/search",
                params={
                    "q": query,
                    "count": max_results,
                    "region": region
                },
                headers={"Authorization": f"Bearer {self.api_key}"}
            )
            response.raise_for_status()

            results = response.json()["results"]
            return [
                {
                    "title": r["title"],
                    "url": r["url"],
                    "snippet": r["snippet"]
                }
                for r in results
            ]

Calculator Tool

import ast
import operator

class CalculatorTool(StructuredTool):
    """Safe mathematical expression evaluator"""

    ALLOWED_OPS = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.Pow: operator.pow,
        ast.Mod: operator.mod,
        ast.USub: operator.neg
    }

    def __init__(self):
        super().__init__(
            node_id="calculator",
            func=self.calculate,
            description="Perform mathematical calculations",
            infer_schema=True
        )

    def calculate(self, expression: str) -> float:
        """
        Safely evaluate a mathematical expression.

        Args:
            expression: Mathematical expression to evaluate

        Returns:
            Result of the calculation

        Raises:
            ValueError: If expression contains invalid operations
        """
        try:
            node = ast.parse(expression, mode='eval')
            return self._eval_node(node.body)
        except Exception as e:
            raise ValueError(f"Invalid expression: {e}")

    def _eval_node(self, node):
        if isinstance(node, ast.Constant):
            return node.value
        elif isinstance(node, ast.BinOp):
            op = self.ALLOWED_OPS.get(type(node.op))
            if op is None:
                raise ValueError(f"Unsupported operation: {node.op}")
            left = self._eval_node(node.left)
            right = self._eval_node(node.right)
            return op(left, right)
        elif isinstance(node, ast.UnaryOp):
            op = self.ALLOWED_OPS.get(type(node.op))
            if op is None:
                raise ValueError(f"Unsupported operation: {node.op}")
            return op(self._eval_node(node.operand))
        else:
            raise ValueError(f"Unsupported node type: {type(node)}")

File Operations Tool

import aiofiles
from pathlib import Path

class FileOperationsTool(StructuredTool):
    def __init__(self, base_dir: str = "."):
        self.base_dir = Path(base_dir).resolve()
        super().__init__(
            node_id="file_ops",
            func=self.execute,
            description="Perform file operations",
            infer_schema=True
        )

    async def execute(
        self,
        operation: Literal["read", "write", "list", "delete"],
        path: str,
        content: str | None = None
    ) -> dict:
        """
        Perform file operations.

        Args:
            operation: Type of operation to perform
            path: File path relative to base directory
            content: Content for write operation

        Returns:
            Operation result
        """
        # Ensure path is within base directory (security)
        file_path = (self.base_dir / path).resolve()
        if not str(file_path).startswith(str(self.base_dir)):
            raise ValueError("Path outside base directory")

        if operation == "read":
            async with aiofiles.open(file_path, 'r') as f:
                content = await f.read()
            return {"content": content}

        elif operation == "write":
            if content is None:
                raise ValueError("Content required for write")
            async with aiofiles.open(file_path, 'w') as f:
                await f.write(content)
            return {"success": True, "bytes_written": len(content)}

        elif operation == "list":
            files = list(Path(file_path).iterdir())
            return {
                "files": [str(f.relative_to(self.base_dir)) for f in files]
            }

        elif operation == "delete":
            file_path.unlink()
            return {"success": True}

Best Practices

Always use type hints for automatic schema generation:
# Good - clear types
def search(query: str, limit: int = 10) -> list[dict]:
    pass

# Bad - no type information
def search(query, limit=10):
    pass
Use docstrings for better tool descriptions:
def process_data(data: dict) -> dict:
    """Process and validate input data.

    Args:
        data: Input data to process

    Returns:
        Processed and validated data
    """
Return meaningful error messages:
def safe_tool(**kwargs):
    try:
        return process(**kwargs)
    except ValidationError as e:
        return {"error": f"Invalid input: {e}"}
    except Exception as e:
        return {"error": f"Unexpected error: {e}"}
Use Pydantic models for complex validation:
class SearchArgs(BaseModel):
    query: str = Field(min_length=1, max_length=500)
    limit: int = Field(ge=1, le=100)

    @validator('query')
    def clean_query(cls, v):
        return v.strip().lower()

Complete Example

from nadoo_flow import (
    StructuredTool, ToolRegistry, ToolNode,
    BaseNode, NodeResult, WorkflowContext
)
from pydantic import BaseModel, Field
from typing import List, Dict, Any
import asyncio

# Define tools
class WebSearchArgs(BaseModel):
    query: str = Field(description="Search query")
    max_results: int = Field(default=5, ge=1, le=20)

async def web_search(query: str, max_results: int = 5) -> List[Dict]:
    """Search the web for information"""
    # Simulated web search
    await asyncio.sleep(0.5)
    return [
        {"title": f"Result {i}", "url": f"https://example.com/{i}"}
        for i in range(max_results)
    ]

class CalculateArgs(BaseModel):
    expression: str = Field(description="Math expression")

def calculate(expression: str) -> float:
    """Calculate mathematical expressions"""
    # Safe evaluation
    return eval(expression, {"__builtins__": {}}, {})

# Create tools
search_tool = StructuredTool.from_function(
    func=web_search,
    coroutine=web_search,
    args_schema=WebSearchArgs,
    description="Search the web"
)

calc_tool = StructuredTool.from_function(
    func=calculate,
    args_schema=CalculateArgs,
    description="Calculate math"
)

# Create registry
registry = ToolRegistry()
registry.register(search_tool)
registry.register(calc_tool)

# Tool-using workflow
class ToolUserNode(BaseNode):
    def __init__(self, registry: ToolRegistry):
        super().__init__("tool_user")
        self.registry = registry

    async def execute(self, node_context, workflow_context):
        task = node_context.input_data["task"]

        # Determine which tool to use
        if "search" in task.lower():
            tool = self.registry.get("web_search")
            args = {"query": task, "max_results": 3}
        elif "calculate" in task.lower() or "math" in task.lower():
            tool = self.registry.get("calculate")
            # Extract expression from task
            expression = task.split("calculate")[-1].strip()
            args = {"expression": expression}
        else:
            return NodeResult(
                success=False,
                error="No suitable tool found"
            )

        # Execute tool
        result = await tool.arun(args) if tool.coroutine else tool.run(args)

        return NodeResult(
            success=True,
            output={
                "tool_used": tool.node_id,
                "result": result
            }
        )

# Create workflow
workflow = ToolUserNode(registry)

# Test different tasks
async def test_tools():
    tasks = [
        "search for Python tutorials",
        "calculate 15 * 23 + 7",
        "unknown task"
    ]

    for task in tasks:
        result = await workflow.execute(
            NodeContext(node_id="test", input_data={"task": task}),
            WorkflowContext(workflow_id="test_workflow")
        )
        print(f"Task: {task}")
        print(f"Result: {result.output}\n")

# Run
asyncio.run(test_tools())

See Also