Overview
Flow Core’s Tools API turns Python functions into LLM-callable tools. With automatic schema inference, type validation, and multi-provider support, you can expose any function to AI models with minimal code.
Creating Tools from Functions
The simplest way to create a tool is from an existing function:
from typing import Any

from nadoo_flow import StructuredTool

def search_database(
    query: str,
    limit: int = 10,
    filters: dict[str, Any] | None = None
) -> list[dict]:
    """Search the knowledge base for relevant documents.

    Args:
        query: The search query string
        limit: Maximum number of results to return
        filters: Optional filters to apply to search

    Returns:
        List of matching documents with metadata
    """
    # Implementation
    return db.search(query, limit, filters)

# Create tool with automatic schema inference
tool = StructuredTool.from_function(
    func=search_database,
    infer_schema=True,    # Auto-generate schema from type hints
    parse_docstring=True  # Parse descriptions from docstring
)

# The tool now has:
# - Pydantic schema for validation
# - OpenAI/Anthropic compatible format
# - Automatic error handling
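Once created, the tool can be invoked directly. A minimal usage sketch, assuming the dict-based run API shown in the async example below:

# Arguments are validated against the inferred schema before the function is called
result = tool.run({"query": "vector databases", "limit": 5})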
Manual Tool Creation
For more control, create tools manually:
from typing import Any

from pydantic import BaseModel, Field

class SearchArgs(BaseModel):
    """Arguments for database search"""
    query: str = Field(description="Search query")
    limit: int = Field(default=10, ge=1, le=100, description="Max results")
    filters: dict[str, Any] | None = Field(default=None, description="Filters")

tool = StructuredTool(
    node_id="search_tool",
    func=search_database,
    args_schema=SearchArgs,
    description="Search the knowledge base",
    return_direct=False,  # Wrap output in dict
    metadata={"category": "search", "version": "1.0"}
)
Async Tool Support
Tools can handle both sync and async functions:
async def async_search(query: str) -> list[dict]:
    """Async database search"""
    async with get_db() as db:
        return await db.search(query)

# Provide both versions
tool = StructuredTool.from_function(
    func=sync_search,        # Sync version
    coroutine=async_search,  # Async version
    infer_schema=True
)

# Tool automatically uses appropriate version
result = await tool.arun({"query": "AI agents"})  # Uses async
result = tool.run({"query": "AI agents"})         # Uses sync
Tool Registry
Manage collections of tools with the ToolRegistry:
from nadoo_flow import ToolRegistry

# Create registry
registry = ToolRegistry()

# Register tools
registry.register(search_tool)
registry.register(calculator_tool)
registry.register(weather_tool)

# Get tool by name
search = registry.get("search_tool")

# List all tools
tools = registry.list_tools()
for tool in tools:
    print(f"{tool.name}: {tool.description}")

# Remove tool
registry.unregister("weather_tool")
Export for LLM Providers
Convert tools to provider-specific formats:
# OpenAI format
openai_tools = registry.to_openai_tools()
# Returns:
[{
    "type": "function",
    "function": {
        "name": "search_database",
        "description": "Search the knowledge base...",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Search query"
                },
                "limit": {
                    "type": "integer",
                    "description": "Max results",
                    "default": 10
                }
            },
            "required": ["query"]
        }
    }
}]

# Anthropic format
anthropic_tools = registry.to_anthropic_tools()
# Returns:
[{
    "name": "search_database",
    "description": "Search the knowledge base...",
    "input_schema": {
        "type": "object",
        "properties": {...},
        "required": ["query"]
    }
}]

# Google Gemini format
gemini_tools = registry.to_gemini_tools()
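The exported lists plug directly into the providers' function-calling APIs. A sketch using the OpenAI Python SDK (the model name is illustrative; any client that accepts the standard tools schema works the same way):

from openai import OpenAI

client = OpenAI()
response = client.chat.completions.create(
    model="gpt-4o-mini",  # illustrative model name
    messages=[{"role": "user", "content": "Find docs about retrieval"}],
    tools=registry.to_openai_tools(),
)
# The model may respond with tool_calls that map back to registered tools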
Tool Node Integration
Using Tools in Workflows
Integrate tools into your workflows as nodes:
from nadoo_flow import ToolNode, BaseNode

# Create tool node
search_node = ToolNode(
    node_id="search",
    tool=search_tool,
    input_mapping={"user_query": "query"},  # Map input keys
    output_key="search_results"
)

# Use in workflow
workflow = InputNode() | search_node | FormatNode() | OutputNode()

# Execute
result = await workflow.run({"user_query": "What is RAG?"})
Dynamic Tool Selection
Let LLMs choose which tool to use:
class ToolSelectorNode(BaseNode):
    def __init__(self, tools: list[StructuredTool], llm):
        super().__init__("tool_selector")
        self.tools = tools
        self.llm = llm  # LLM client used to pick a tool
        self.registry = ToolRegistry()
        for tool in tools:
            self.registry.register(tool)

    async def execute(self, node_context, workflow_context):
        query = node_context.input_data["query"]

        # Ask LLM to select appropriate tool
        llm_response = await self.llm.generate(
            prompt=f"Select the best tool for: {query}",
            tools=self.registry.to_openai_tools()
        )

        # Execute selected tool
        tool_name = llm_response.tool_choice.name
        tool_args = llm_response.tool_choice.arguments
        tool = self.registry.get(tool_name)
        result = await tool.arun(tool_args)

        return NodeResult(
            success=True,
            output={"tool_used": tool_name, "result": result}
        )
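The selector drops into a workflow like any other node. A sketch, assuming an llm_client (hypothetical) that exposes the generate/tool_choice interface used above:

selector = ToolSelectorNode(tools=[search_tool, calculator_tool], llm=llm_client)
workflow = InputNode() | selector | OutputNode()
result = await workflow.run({"query": "What's the weather in Berlin?"})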
Advanced Tool Features
Schema Inference
Automatic schema generation from function signatures:
from typing import Any

from nadoo_flow.tools import infer_schema_from_function

def complex_function(
    text: str,
    count: int = 5,
    options: list[str] | None = None,
    metadata: dict[str, Any] = {}
) -> dict[str, Any]:
    """Process text with options"""
    pass

# Infer Pydantic schema
schema = infer_schema_from_function(complex_function)

# Schema includes:
# - Type validation
# - Default values
# - Optional handling
# - Nested structures
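Because the inferred schema is a regular Pydantic model (per the comment above), you can validate arguments with it before calling the function. A brief sketch:

# Valid arguments are coerced and validated
args = schema(text="hello", count=3)

# Invalid types raise a pydantic.ValidationError
schema(text="hello", count="not a number")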
Docstring Parsing
Extract descriptions from docstrings:
from nadoo_flow.tools import parse_docstring

def well_documented_function(arg1: str, arg2: int) -> str:
    """
    Brief description of the function.

    Detailed explanation of what this function does
    and how it should be used.

    Args:
        arg1: Description of first argument
        arg2: Description of second argument
            with multi-line support

    Returns:
        Description of return value

    Raises:
        ValueError: When input is invalid

    Examples:
        >>> well_documented_function("test", 5)
        "result"
    """
    pass

info = parse_docstring(well_documented_function)
# Returns:
{
    "description": "Brief description of the function.",
    "long_description": "Detailed explanation...",
    "args": {
        "arg1": "Description of first argument",
        "arg2": "Description of second argument with multi-line support"
    },
    "returns": "Description of return value",
    "raises": {"ValueError": "When input is invalid"},
    "examples": ['well_documented_function("test", 5)']
}
Tool Validation
Validate tool inputs before execution:
class ValidatedTool(StructuredTool):
    def validate_input(self, args: dict) -> dict:
        """Custom validation logic"""
        # Run Pydantic validation
        validated = super().validate_input(args)

        # Additional custom validation
        if validated.get("query", "").strip() == "":
            raise ValueError("Query cannot be empty")
        if validated.get("limit", 0) > 1000:
            raise ValueError("Limit cannot exceed 1000")

        return validated

    async def arun(self, args: dict) -> Any:
        # Validation happens automatically
        validated_args = self.validate_input(args)
        return await self.coroutine(**validated_args)
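A sketch of the extra checks in action, assuming a validated_tool instance built the same way as the manual StructuredTool above:

# Pydantic validation plus the custom checks run before execution
validated_tool.validate_input({"query": "  ", "limit": 10})    # raises ValueError("Query cannot be empty")
validated_tool.validate_input({"query": "ai", "limit": 5000})  # raises ValueError("Limit cannot exceed 1000")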
Tool Patterns
Pattern 1: Multi-Step Tools
Tools that perform multiple operations:
class MultiStepTool(StructuredTool):
    def __init__(self):
        super().__init__(
            node_id="multi_step_tool",
            func=self.execute,
            description="Perform multi-step operation"
        )

    async def execute(
        self,
        query: str,
        process: bool = True,
        summarize: bool = True
    ) -> dict:
        results = {}

        # Step 1: Search
        results["search"] = await self.search(query)

        # Step 2: Process (optional)
        if process:
            results["processed"] = await self.process(results["search"])

        # Step 3: Summarize (optional)
        if summarize:
            data = results.get("processed", results["search"])
            results["summary"] = await self.summarize(data)

        return results
Pattern 2: Tool Chains
Chain multiple tools together:
class ToolChain:
    def __init__(self, tools: list[StructuredTool]):
        self.tools = tools

    async def execute(self, initial_input: dict) -> dict:
        """Execute tools in sequence"""
        current_input = initial_input
        for tool in self.tools:
            # Each tool's output is merged into the next tool's input
            result = await tool.arun(current_input)
            current_input = {"previous_output": result, **current_input}
        return current_input

# Usage
chain = ToolChain([
    search_tool,
    filter_tool,
    format_tool
])
result = await chain.execute({"query": "AI agents"})
Pattern 3: Conditional Tools
Tools with conditional execution:
from typing import Any, Literal

class ConditionalTool(StructuredTool):
    def __init__(self):
        super().__init__(
            node_id="conditional_tool",
            func=self.execute,
            description="Tool with conditional logic"
        )

    async def execute(
        self,
        action: Literal["search", "calculate", "generate"],
        **kwargs
    ) -> Any:
        """Execute different operations based on action"""
        if action == "search":
            return await self.search_handler(**kwargs)
        elif action == "calculate":
            return await self.calculate_handler(**kwargs)
        elif action == "generate":
            return await self.generate_handler(**kwargs)
        else:
            raise ValueError(f"Unknown action: {action}")
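Assuming the handler methods (search_handler and friends) are implemented, calls route on the action argument:

tool = ConditionalTool()
result = await tool.execute(action="search", query="AI agents")  # dispatches to search_handler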
Pattern 4: Retry with Fallback
Tools with built-in retry and fallback:
import asyncio
import logging

logger = logging.getLogger(__name__)

class ResilientTool(StructuredTool):
    def __init__(self, primary_func, fallback_func, max_retries=3):
        super().__init__(
            node_id="resilient_tool",
            func=self.execute_with_retry,
            description="Tool with retry and fallback"
        )
        self.primary_func = primary_func
        self.fallback_func = fallback_func
        self.max_retries = max_retries

    async def execute_with_retry(self, **kwargs) -> Any:
        """Try primary function, fall back if needed"""
        last_error = None

        # Try primary function with retries
        for attempt in range(self.max_retries):
            try:
                return await self.primary_func(**kwargs)
            except Exception as e:
                last_error = e
                if attempt < self.max_retries - 1:
                    await asyncio.sleep(2 ** attempt)  # Exponential backoff

        # Fall back to alternative
        logger.warning(f"Primary failed, using fallback: {last_error}")
        return await self.fallback_func(**kwargs)
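Construction only needs the two async callables and an optional retry count. A minimal sketch with hypothetical primary and fallback functions:

async def primary_search(query: str) -> list[dict]:
    ...  # e.g. call the main search backend

async def cached_search(query: str) -> list[dict]:
    ...  # e.g. serve stale results from a local cache

tool = ResilientTool(primary_func=primary_search, fallback_func=cached_search, max_retries=3)
result = await tool.execute_with_retry(query="AI agents")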
Real-World Examples
Web Search Tool
import httpx
from typing import List, Dict

class WebSearchTool(StructuredTool):
    def __init__(self, api_key: str):
        self.api_key = api_key
        super().__init__(
            node_id="web_search",
            func=self.search,
            description="Search the web for information",
            infer_schema=True
        )

    async def search(
        self,
        query: str,
        max_results: int = 5,
        region: str = "us"
    ) -> List[Dict[str, str]]:
        """
        Search the web using search API.

        Args:
            query: Search query
            max_results: Maximum number of results
            region: Region code for search

        Returns:
            List of search results with title, url, and snippet
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(
                "https://api.search.example.com/search",
                params={
                    "q": query,
                    "count": max_results,
                    "region": region
                },
                headers={"Authorization": f"Bearer {self.api_key}"}
            )
            response.raise_for_status()

            results = response.json()["results"]
            return [
                {
                    "title": r["title"],
                    "url": r["url"],
                    "snippet": r["snippet"]
                }
                for r in results
            ]
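A usage sketch (the environment variable name is illustrative):

import os

search = WebSearchTool(api_key=os.environ["SEARCH_API_KEY"])  # hypothetical env var
results = await search.search("retrieval-augmented generation", max_results=3)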
Calculator Tool
import ast
import operator

class CalculatorTool(StructuredTool):
    """Safe mathematical expression evaluator"""

    ALLOWED_OPS = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.Pow: operator.pow,
        ast.Mod: operator.mod,
        ast.USub: operator.neg
    }

    def __init__(self):
        super().__init__(
            node_id="calculator",
            func=self.calculate,
            description="Perform mathematical calculations",
            infer_schema=True
        )

    def calculate(self, expression: str) -> float:
        """
        Safely evaluate a mathematical expression.

        Args:
            expression: Mathematical expression to evaluate

        Returns:
            Result of the calculation

        Raises:
            ValueError: If expression contains invalid operations
        """
        try:
            node = ast.parse(expression, mode='eval')
            return self._eval_node(node.body)
        except Exception as e:
            raise ValueError(f"Invalid expression: {e}")

    def _eval_node(self, node):
        if isinstance(node, ast.Constant):
            return node.value
        elif isinstance(node, ast.BinOp):
            op = self.ALLOWED_OPS.get(type(node.op))
            if op is None:
                raise ValueError(f"Unsupported operation: {node.op}")
            left = self._eval_node(node.left)
            right = self._eval_node(node.right)
            return op(left, right)
        elif isinstance(node, ast.UnaryOp):
            op = self.ALLOWED_OPS.get(type(node.op))
            if op is None:
                raise ValueError(f"Unsupported operation: {node.op}")
            return op(self._eval_node(node.operand))
        else:
            raise ValueError(f"Unsupported node type: {type(node)}")
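A quick check of the evaluator:

calc = CalculatorTool()
calc.calculate("2 ** 10 + 5 * 3")   # 1039
calc.calculate("__import__('os')")  # raises ValueError: function calls are not supported nodes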
File Operations Tool
import aiofiles
from pathlib import Path
from typing import Literal

class FileOperationsTool(StructuredTool):
    def __init__(self, base_dir: str = "."):
        self.base_dir = Path(base_dir).resolve()
        super().__init__(
            node_id="file_ops",
            func=self.execute,
            description="Perform file operations",
            infer_schema=True
        )

    async def execute(
        self,
        operation: Literal["read", "write", "list", "delete"],
        path: str,
        content: str | None = None
    ) -> dict:
        """
        Perform file operations.

        Args:
            operation: Type of operation to perform
            path: File path relative to base directory
            content: Content for write operation

        Returns:
            Operation result
        """
        # Ensure path is within base directory (security)
        file_path = (self.base_dir / path).resolve()
        if not str(file_path).startswith(str(self.base_dir)):
            raise ValueError("Path outside base directory")

        if operation == "read":
            async with aiofiles.open(file_path, 'r') as f:
                content = await f.read()
            return {"content": content}

        elif operation == "write":
            if content is None:
                raise ValueError("Content required for write")
            async with aiofiles.open(file_path, 'w') as f:
                await f.write(content)
            return {"success": True, "bytes_written": len(content)}

        elif operation == "list":
            files = list(Path(file_path).iterdir())
            return {
                "files": [str(f.relative_to(self.base_dir)) for f in files]
            }

        elif operation == "delete":
            file_path.unlink()
            return {"success": True}
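A usage sketch, assuming a scratch directory (the directory name is illustrative):

files = FileOperationsTool(base_dir="./workspace")
await files.execute(operation="write", path="notes.txt", content="hello")
await files.execute(operation="read", path="notes.txt")  # {"content": "hello"}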
Best Practices
Type Hints Are Essential
Always use type hints for automatic schema generation:
# Good - clear types
def search(query: str, limit: int = 10) -> list[dict]:
    pass

# Bad - no type information
def search(query, limit=10):
    pass
Document Your Functions
Use docstrings for better tool descriptions:
def process_data(data: dict) -> dict:
    """Process and validate input data.

    Args:
        data: Input data to process

    Returns:
        Processed and validated data
    """
Handle Errors Gracefully
Return meaningful error messages:
def safe_tool(**kwargs):
    try:
        return process(**kwargs)
    except ValidationError as e:
        return {"error": f"Invalid input: {e}"}
    except Exception as e:
        return {"error": f"Unexpected error: {e}"}
Validate Inputs
Use Pydantic models for complex validation:
from pydantic import BaseModel, Field, validator

class SearchArgs(BaseModel):
    query: str = Field(min_length=1, max_length=500)
    limit: int = Field(ge=1, le=100)

    @validator('query')
    def clean_query(cls, v):
        return v.strip().lower()
Complete Example
from nadoo_flow import (
    StructuredTool, ToolRegistry, ToolNode,
    BaseNode, NodeContext, NodeResult, WorkflowContext
)
from pydantic import BaseModel, Field
from typing import List, Dict, Any
import asyncio

# Define tools
class WebSearchArgs(BaseModel):
    query: str = Field(description="Search query")
    max_results: int = Field(default=5, ge=1, le=20)

async def web_search(query: str, max_results: int = 5) -> List[Dict]:
    """Search the web for information"""
    # Simulated web search
    await asyncio.sleep(0.5)
    return [
        {"title": f"Result {i}", "url": f"https://example.com/{i}"}
        for i in range(max_results)
    ]

class CalculateArgs(BaseModel):
    expression: str = Field(description="Math expression")

def calculate(expression: str) -> float:
    """Calculate mathematical expressions"""
    # Restricted eval for demonstration only (see the AST-based CalculatorTool above for a safer approach)
    return eval(expression, {"__builtins__": {}}, {})

# Create tools
search_tool = StructuredTool.from_function(
    func=web_search,
    coroutine=web_search,
    args_schema=WebSearchArgs,
    description="Search the web"
)

calc_tool = StructuredTool.from_function(
    func=calculate,
    args_schema=CalculateArgs,
    description="Calculate math"
)

# Create registry
registry = ToolRegistry()
registry.register(search_tool)
registry.register(calc_tool)

# Tool-using workflow
class ToolUserNode(BaseNode):
    def __init__(self, registry: ToolRegistry):
        super().__init__("tool_user")
        self.registry = registry

    async def execute(self, node_context, workflow_context):
        task = node_context.input_data["task"]

        # Determine which tool to use
        if "search" in task.lower():
            tool = self.registry.get("web_search")
            args = {"query": task, "max_results": 3}
        elif "calculate" in task.lower() or "math" in task.lower():
            tool = self.registry.get("calculate")
            # Extract expression from task
            expression = task.split("calculate")[-1].strip()
            args = {"expression": expression}
        else:
            return NodeResult(
                success=False,
                error="No suitable tool found"
            )

        # Execute tool
        result = (await tool.arun(args)) if tool.coroutine else tool.run(args)

        return NodeResult(
            success=True,
            output={
                "tool_used": tool.node_id,
                "result": result
            }
        )

# Create workflow
workflow = ToolUserNode(registry)

# Test different tasks
async def test_tools():
    tasks = [
        "search for Python tutorials",
        "calculate 15 * 23 + 7",
        "unknown task"
    ]

    for task in tasks:
        result = await workflow.execute(
            NodeContext(node_id="test", input_data={"task": task}),
            WorkflowContext(workflow_id="test_workflow")
        )
        print(f"Task: {task}")
        print(f"Result: {result.output}\n")

# Run
asyncio.run(test_tools())