Overview
Flow Core’s Parsers API provides robust parsing and validation of LLM outputs, converting unstructured text into validated Pydantic models. With automatic retry capabilities and format instruction generation, parsers ensure reliable data extraction from AI responses.
OutputParser Base Class
All parsers inherit from the OutputParser base class:
from typing import Any

from nadoo_flow import OutputParser


class CustomParser(OutputParser):
    """Custom parser implementation.

    Skeleton showing the three hooks a parser can provide: ``parse``,
    ``get_format_instructions`` and ``parse_with_prompt``.
    """

    def parse(self, text: str) -> Any:
        """Parse text and return structured data.

        Args:
            text: Raw LLM completion to parse.

        Returns:
            The parsed value. This skeleton returns the whitespace-stripped
            text; replace the placeholder with real parsing logic.
        """
        # Parsing logic here (placeholder so the skeleton runs as written)
        parsed_data = text.strip()
        return parsed_data

    def get_format_instructions(self) -> str:
        """Return instructions for LLM on output format."""
        return "Please format your response as..."

    def parse_with_prompt(self, completion: str, prompt: str) -> Any:
        """Parse considering original prompt context.

        Args:
            completion: Raw LLM completion to parse.
            prompt: The prompt that produced ``completion``; unused here.

        Returns:
            Whatever ``parse`` returns for ``completion``.
        """
        # Optional: Use prompt for better parsing
        return self.parse(completion)
StructuredOutputParser
Parse LLM outputs into Pydantic models with validation:
from nadoo_flow import StructuredOutputParser
from pydantic import BaseModel, Field
from typing import Any, List, Literal


# Define your data model
class AgentAction(BaseModel):
    """Action for an AI agent to take"""

    # Closed set of action kinds; any other value fails validation.
    action: Literal["search", "calculate", "respond", "clarify"]
    reasoning: str = Field(description="Why this action was chosen")
    # `Any` must be imported from typing for this annotation to resolve.
    parameters: dict[str, Any] = Field(description="Action parameters")
    # Constrained to the closed interval [0, 1] by ge/le.
    confidence: float = Field(ge=0, le=1, description="Confidence score")
# Create parser
parser = StructuredOutputParser(pydantic_model=AgentAction)
# Get format instructions for LLM
instructions = parser.get_format_instructions()
print(instructions)
# Output:
# Please provide your response as a valid JSON object that matches this schema:
# - action: search|calculate|respond|clarify
# - reasoning: string
# - parameters: object
# - confidence: 0.0-1.0
# Parse LLM output
llm_output = "LLM output with JSON containing action, reasoning, parameters, and confidence fields"
result = parser.parse(llm_output)
print(result)
# Returns AgentAction object with validated and parsed fields
Advanced Features
JSON Extraction
Automatically extracts JSON from various formats:
# Handles JSON in markdown code blocks
text_with_markdown = """
Here's the result:
```json
{"key": "value"}
```
"""
# Handles inline JSON
text_with_inline = 'The answer is in JSON format as shown'
# Handles malformed JSON (attempts repair)
text_with_errors = 'Malformed JSON example'  # Missing closing brace
#### Nested Models
Support for complex nested structures:
```python
class Step(BaseModel):
    """A single named step with a description and a duration."""

    name: str
    description: str
    duration: int  # minutes
class WorkflowPlan(BaseModel):
    """A goal plus the list of ``Step`` objects planned to reach it."""

    goal: str
    # Nested models: each list element is validated as a Step.
    steps: List[Step]
    # NOTE(review): presumably minutes, matching Step.duration — confirm.
    total_duration: int
    requires_human_input: bool
parser = StructuredOutputParser(pydantic_model=WorkflowPlan)
# Parser handles nested validation automatically
ParserNode
Integrate parsers into workflows:
from nadoo_flow import ParserNode, BaseNode
# Create parser node
parser_node = ParserNode(
node_id="parse_response",
parser=StructuredOutputParser(AgentAction),
input_key="llm_output", # Where to find text to parse
output_key="parsed_action" # Where to store parsed result
)
# Use in workflow
workflow = LLMNode() | parser_node | ActionExecutorNode()
# The parser node automatically:
# - Extracts text from input
# - Parses to Pydantic model
# - Validates data
# - Passes structured data to next node
RetryableParserNode
Automatic retry with LLM on parse failure:
from nadoo_flow import RetryableParserNode
class ComplexOutput(BaseModel):
    """Schema with a free-text analysis, recommendations and named metrics."""

    analysis: str
    recommendations: List[str]
    # Metric name -> numeric value.
    metrics: dict[str, float]
# Create retryable parser
parser_node = RetryableParserNode(
node_id="parse_with_retry",
parser=StructuredOutputParser(ComplexOutput),
llm_node=my_llm_node, # LLM to use for retry
max_retries=3,
retry_prompt_template="""
The previous output could not be parsed correctly.
Error: {error}
Please provide a properly formatted response according to these instructions:
{format_instructions}
Original output that failed:
{completion}
"""
)
# On parse failure:
# 1. Sends error + format instructions back to LLM
# 2. LLM generates corrected output
# 3. Attempts parsing again
# 4. Repeats up to max_retries
Built-in Parsers
JsonOutputParser
Simple JSON parsing without schema:
from nadoo_flow import JsonOutputParser
parser = JsonOutputParser()
# Parse any JSON string
json_text = '{"name": "John", "age": 30, "tags": ["python", "ai"]}'
result = parser.parse(json_text)
# Returns: dict with parsed JSON
# Extracts from markdown code blocks
markdown_text = """
```json
{"status": "success"}
```
"""
result = parser.parse(markdown_text)
# Returns: dict with status key
### StringOutputParser
Pass-through parser for plain text:
```python
from nadoo_flow import StringOutputParser
parser = StringOutputParser()
# Simply returns the input text
result = parser.parse("Any text here")
# Returns: "Any text here"
# Useful as default/fallback parser
ListOutputParser
Parse text into lists:
from nadoo_flow import ListOutputParser
parser = ListOutputParser(
delimiter="\n", # Split by newline
strip=True # Strip whitespace
)
text = """
1. First item
2. Second item
3. Third item
"""
result = parser.parse(text)
# Returns: ["1. First item", "2. Second item", "3. Third item"]
# Custom delimiter
parser = ListOutputParser(delimiter=", ")
result = parser.parse("apple, banana, orange")
# Returns: ["apple", "banana", "orange"]
Parser Patterns
Pattern 1: Multi-Format Parser
Handle multiple output formats:
class MultiFormatParser(OutputParser):
    """Parser that delegates to per-format parsers, tried in a fixed order."""

    def __init__(self, parsers: dict[str, OutputParser]):
        # Keys consulted by parse(): "json", "yaml" and "custom".
        self.parsers = parsers

    def parse(self, text: str) -> Any:
        """Try multiple parsers in order.

        The "json" parser is attempted first, then "yaml", then "custom";
        formats missing from ``self.parsers`` are skipped.

        Returns:
            The result of the first parser that does not raise.

        Raises:
            ValueError: If no configured parser succeeds; the message lists
                one labelled error per attempted format.
        """
        # (lookup key, label used in the collected error message)
        attempt_order = (
            ("json", "JSON"),
            ("yaml", "YAML"),
            ("custom", "Custom"),
        )
        failures = []
        for key, label in attempt_order:
            if key not in self.parsers:
                continue
            try:
                parsed = self.parsers[key].parse(text)
            except Exception as exc:
                failures.append(f"{label}: {exc}")
            else:
                return parsed
        raise ValueError(f"Could not parse with any format: {failures}")
# Usage
parsers_dict = {
"json": JsonOutputParser(),
"yaml": YamlOutputParser(),
"custom": CustomFormatParser()
}
parser = MultiFormatParser(parsers_dict)
Pattern 2: Partial Parsing
Parse incomplete/streaming outputs:
class StreamingParser(OutputParser):
    """Accumulate streamed chunks and parse once the buffer looks complete."""

    def __init__(self, base_parser: OutputParser):
        self.base_parser = base_parser
        # Text received so far that has not yet been parsed successfully.
        self.buffer = ""

    def parse(self, text: str) -> Any:
        """Parse a complete, non-streamed text with the wrapped parser.

        Provided so this class offers the same ``parse`` entry point as the
        other OutputParser subclasses; it does not touch the buffer.
        """
        return self.base_parser.parse(text)

    def parse_partial(self, chunk: str) -> Any | None:
        """Parse streaming chunks.

        Args:
            chunk: The next piece of streamed text; appended to the buffer.

        Returns:
            The parsed result once the buffered text parses, otherwise
            ``None`` (meaning: wait for more data). The buffer is cleared
            only after a successful parse.
        """
        self.buffer += chunk
        # Try to parse only if we appear to have complete JSON
        if not self.looks_complete():
            return None
        try:
            result = self.base_parser.parse(self.buffer)
        except Exception:
            # Looks complete but does not parse yet (e.g. a nested object
            # just closed); keep the buffer and wait for more data.
            # `Exception` rather than a bare except so KeyboardInterrupt
            # and SystemExit are not swallowed.
            return None
        self.buffer = ""  # Clear on success
        return result

    def looks_complete(self) -> bool:
        """Check if buffer might be complete.

        Simple JSON heuristic: the stripped buffer starts with ``{`` and
        ends with ``}``.
        """
        candidate = self.buffer.strip()
        return candidate.startswith("{") and candidate.endswith("}")
Pattern 3: Validation Chain
Chain multiple validators:
from typing import Any, Callable


class ValidatedParser(OutputParser):
    """Parser that post-processes a base parser's result through validators."""

    def __init__(
        self,
        base_parser: OutputParser,
        validators: list[Callable[[Any], Any]],
    ):
        """Store the wrapped parser and the validator chain.

        Args:
            base_parser: Parser that produces the initial result.
            validators: Callables applied in order; each receives the
                previous result and returns the value passed to the next.
        """
        self.base_parser = base_parser
        self.validators = validators

    def parse(self, text: str) -> Any:
        """Parse and validate through chain.

        Returns:
            The value returned by the last validator, or the base parser's
            result when there are no validators.

        Raises:
            Whatever the base parser or any validator raises; nothing is
            caught here.
        """
        # Initial parse
        result = self.base_parser.parse(text)
        # Run validators; each one may transform the value it returns.
        for validator in self.validators:
            result = validator(result)
        return result
# Usage
def validate_completeness(data):
    """Ensure all required fields are present.

    Args:
        data: Parsed output; expected to support ``in`` membership tests
            (typically a dict).

    Returns:
        ``data`` unchanged when both "action" and "reasoning" are present.

    Raises:
        ValueError: If a required field is missing (the first missing one
            is named), or if ``data`` does not support membership tests at
            all (e.g. ``None`` or a number).
    """
    required = ("action", "reasoning")
    try:
        missing = next((name for name in required if name not in data), None)
    except TypeError as err:
        # Non-container input would otherwise leak a TypeError; validators
        # in this chain signal failure with ValueError.
        raise ValueError(
            f"Expected a mapping of fields, got {type(data).__name__}"
        ) from err
    if missing is not None:
        raise ValueError(f"Missing required field: {missing}")
    return data
def validate_consistency(data):
    """Check logical consistency.

    Args:
        data: Parsed output; expected to provide ``.get`` (typically a dict).

    Returns:
        ``data`` unchanged when it is consistent.

    Raises:
        ValueError: If the action is "calculate" but "expression" is missing
            or falsy, or if ``data`` has no ``.get`` method (e.g. a list).
    """
    try:
        action = data.get("action")
        expression = data.get("expression")
    except AttributeError as err:
        # Non-mapping input would otherwise leak an AttributeError;
        # validators in this chain signal failure with ValueError.
        raise ValueError(
            f"Expected a mapping of fields, got {type(data).__name__}"
        ) from err
    if action == "calculate" and not expression:
        raise ValueError("Calculate action requires expression")
    return data
parser = ValidatedParser(
base_parser=JsonOutputParser(),
validators=[validate_completeness, validate_consistency]
)
Pattern 4: Fallback Parser
Graceful degradation on parse failure:
class FallbackParser(OutputParser):
    """Parser that degrades gracefully through an ordered list of parsers."""

    def __init__(self, parsers: list[OutputParser]):
        # Tried front to back by parse().
        self.parsers = parsers

    def parse(self, text: str) -> Any:
        """Try parsers in order until one succeeds.

        Returns:
            The first successful parser's result. If every parser raises,
            a dict with the raw text, the string form of the last error,
            and ``"fallback": True`` is returned instead of raising.
        """
        failure = None
        for candidate in self.parsers:
            try:
                parsed = candidate.parse(text)
            except Exception as exc:
                # Remember only the most recent failure for the fallback.
                failure = exc
            else:
                return parsed
        # All failed - return raw text as fallback
        fallback_payload = {
            "raw_text": text,
            "parse_error": str(failure),
            "fallback": True,
        }
        return fallback_payload
# Usage
parsers_list = [
StructuredOutputParser(MyModel), # Try structured first
JsonOutputParser(), # Then any JSON
StringOutputParser() # Finally just text
]
parser = FallbackParser(parsers_list)
Real-World Examples
Code Generation Parser
Copy
class CodeBlock(BaseModel):
    """One code snippet together with its language tag."""

    language: str
    code: str
    # Optional free-text explanation; defaults to None.
    explanation: str | None = None
class CodeGenerationOutput(BaseModel):
    """Structured result of a code-generation response."""

    description: str
    # Each element is validated as a CodeBlock.
    code_blocks: List[CodeBlock]
    dependencies: List[str]
    # Optional usage example; defaults to None.
    usage_example: str | None = None
class CodeParser(OutputParser):
    """Extract fenced code blocks and pip dependencies from LLM output."""

    def __init__(self):
        # Kept for callers that read this attribute; parse() below does not
        # use it.
        self.structured_parser = StructuredOutputParser(CodeGenerationOutput)

    def parse(self, text: str) -> CodeGenerationOutput:
        """Extract code blocks and metadata from LLM output.

        Args:
            text: Raw LLM output, possibly containing ``` fenced blocks and
                ``pip install <package>`` lines.

        Returns:
            A CodeGenerationOutput whose description is the first 200
            characters of ``text``, with one CodeBlock per fenced block and
            the first package named after each ``pip install``.
        """
        import re

        # Opening fence: optional language tag, then anything up to the end
        # of that line, then the body up to the next closing fence.
        # The tag class includes + # . - so tags like "c++", "c#" or
        # "objective-c" match; with a plain \w+ such an opening fence failed
        # to match and the *closing* fence was then paired with the next
        # opening one. The [^\n]* also tolerates trailing spaces/attributes.
        code_pattern = r'```([\w+#.\-]+)?[^\n]*\n(.*?)```'
        code_blocks = [
            CodeBlock(
                language=match.group(1) or "text",  # untagged fence -> "text"
                code=match.group(2).strip(),
                explanation=None,
            )
            for match in re.finditer(code_pattern, text, re.DOTALL)
        ]

        # Extract dependencies (pip install lines)
        dep_pattern = r'pip install ([\w\-\[\]]+)'
        dependencies = re.findall(dep_pattern, text)

        # Build structured output
        return CodeGenerationOutput(
            description=text[:200],  # First 200 chars as description
            code_blocks=code_blocks,
            dependencies=dependencies,
            usage_example=None,
        )
# Usage
parser = CodeParser()
llm_output = """
Here's a Python function to calculate fibonacci:
```python
def fibonacci(n):
    if n <= 1:
        return n
    return fibonacci(n-1) + fibonacci(n-2)
```
"""
result = parser.parse(llm_output)
### SQL Query Parser
```python
class SQLQuery(BaseModel):
    """An SQL query with its explanation and the tables it uses."""

    query: str
    explanation: str
    tables_used: List[str]
    # Optional row estimate; defaults to None.
    estimated_rows: int | None = None
class SQLParser(OutputParser):
    """Parse LLM output into an SQLQuery: structured first, then extraction."""

    def __init__(self):
        self.base_parser = StructuredOutputParser(SQLQuery)

    def parse(self, text: str) -> SQLQuery:
        """Extract and validate SQL query.

        Args:
            text: Raw LLM output.

        Returns:
            The structured parse result when ``text`` parses as an SQLQuery.
            Otherwise an SQLQuery built from the first ```sql fenced block
            or bare SELECT statement found, reformatted by sqlparse, with
            the first 200 characters of ``text`` as the explanation.

        Raises:
            ValueError: If no (non-empty) SQL query is found.
        """
        import re
        import sqlparse

        # Try structured parse first
        try:
            return self.base_parser.parse(text)
        except Exception:
            # Deliberate best-effort: any structured-parse failure falls
            # through to extraction. `Exception` rather than a bare except
            # so KeyboardInterrupt/SystemExit still propagate.
            pass

        # Fallback to extraction
        # Find SQL in code blocks or after keywords
        sql_pattern = r'```sql\n(.*?)```|SELECT.*?(?=\n\n|\Z)'
        # re.search, not re.findall: with a single capture group findall
        # returns only group 1, which is '' whenever the bare-SELECT
        # alternative matched, silently yielding an empty query.
        match = re.search(sql_pattern, text, re.DOTALL | re.IGNORECASE)
        if match is None:
            raise ValueError("No SQL query found in output")
        fenced = match.group(1)
        query = (match.group(0) if fenced is None else fenced).strip()
        if not query:
            # e.g. an empty ```sql fence
            raise ValueError("No SQL query found in output")

        # Clean and format
        query = sqlparse.format(
            query,
            reindent=True,
            keyword_case='upper'
        )

        # Extract table names from the first parsed statement
        statements = sqlparse.parse(query)
        tables = self._extract_tables(statements[0]) if statements else []

        return SQLQuery(
            query=query,
            explanation=text[:200],
            tables_used=tables
        )

    def _extract_tables(self, parsed_query) -> List[str]:
        """Extract table names from parsed SQL.

        Simplified extraction: only the statement's top-level tokens are
        inspected, so tables referenced solely inside subqueries or WHERE
        clauses are not reported.

        Args:
            parsed_query: A statement object as returned by
                ``sqlparse.parse(...)[0]``.

        Returns:
            Table names following FROM / JOIN keywords, in order of first
            appearance, without duplicates.
        """
        from sqlparse.sql import Identifier, IdentifierList
        from sqlparse.tokens import Keyword

        tables = []
        expecting_table = False
        for token in parsed_query.tokens:
            if token.is_whitespace:
                continue
            if expecting_table:
                # The first non-whitespace token after FROM/JOIN names the
                # table(s): a single identifier or a comma-separated list.
                if isinstance(token, IdentifierList):
                    candidates = [
                        ident for ident in token.get_identifiers()
                        if isinstance(ident, Identifier)
                    ]
                elif isinstance(token, Identifier):
                    candidates = [token]
                else:
                    candidates = []
                for ident in candidates:
                    name = ident.get_real_name()
                    if name and name not in tables:
                        tables.append(name)
                expecting_table = False
            elif token.ttype is Keyword:
                keyword = token.value.upper()
                # Covers FROM plus JOIN variants such as "LEFT JOIN".
                expecting_table = keyword == 'FROM' or keyword.endswith('JOIN')
        return tables
Report Parser
Copy
class ReportSection(BaseModel):
    """One titled section of a report."""

    title: str
    content: str
    # Optional metric name -> value mapping; defaults to None.
    metrics: dict[str, Any] | None = None
class Report(BaseModel):
    """A report: summary, sections, conclusions and next steps."""

    summary: str
    # Each element is validated as a ReportSection.
    sections: List[ReportSection]
    conclusions: List[str]
    next_steps: List[str]
class ReportParser(OutputParser):
    """Parse a markdown-formatted report into a Report model."""

    def parse(self, text: str) -> Report:
        """Parse markdown-formatted report.

        Rules, applied line by line:
          * The first non-empty line that does not start with ``#`` becomes
            the summary (and is not added to any section).
          * A line starting with ``##`` opens a new section; following
            lines are that section's content.
          * A section whose title contains "conclusion" / "next step"
            (case-insensitive) also contributes its non-empty lines, with
            list markers removed, to ``conclusions`` / ``next_steps``.
          * Outside any section, a line containing "conclusion" or
            "next step" switches collection so that following non-empty
            lines go to the corresponding list.

        Args:
            text: Markdown report text.

        Returns:
            The populated Report.
        """
        import re

        summary = ""
        sections = []
        conclusions = []
        next_steps = []
        current_section = None
        current_content = []
        # List receiving loose (outside-section) lines, or None.
        loose_target = None

        def strip_marker(item):
            # Drop a leading "-", "*", "•", "1." or "1)" list marker.
            return re.sub(r'^(?:[-*•]|\d+[.)])\s+', '', item)

        def bucket_for(title):
            lowered = title.lower()
            if 'conclusion' in lowered:
                return conclusions
            if 'next step' in lowered:
                return next_steps
            return None

        def close_section(title, body_lines):
            sections.append(
                ReportSection(
                    title=title,
                    content='\n'.join(body_lines).strip()
                )
            )
            bucket = bucket_for(title)
            if bucket is not None:
                bucket.extend(
                    strip_marker(entry.strip())
                    for entry in body_lines
                    if entry.strip()
                )

        for line in text.split('\n'):
            stripped = line.strip()
            # Extract summary (first paragraph)
            if not summary and stripped and not line.startswith('#'):
                summary = stripped
            # New section
            elif line.startswith('##'):
                if current_section:
                    close_section(current_section, current_content)
                # lstrip('#') rather than replace('##', ''): the latter left
                # a stray '#' on '###' headings and also removed '##'
                # occurring later in the title.
                current_section = line.lstrip('#').strip()
                current_content = []
                loose_target = None
            # Section content
            elif current_section:
                current_content.append(line)
            # Conclusions announced outside any section
            elif 'conclusion' in line.lower():
                loose_target = conclusions
            # Next steps announced outside any section
            elif 'next step' in line.lower():
                loose_target = next_steps
            elif loose_target is not None and stripped:
                loose_target.append(strip_marker(stripped))

        # Add last section
        if current_section:
            close_section(current_section, current_content)

        return Report(
            summary=summary,
            sections=sections,
            conclusions=conclusions,
            next_steps=next_steps
        )
Best Practices
Provide Clear Format Instructions
Give LLMs explicit formatting requirements:
def get_format_instructions(self) -> str:
    """Return the fixed format-instruction text for the LLM.

    The returned string shows an example JSON object with three fields and
    asks for valid, properly quoted JSON.
    """
    return """
Provide your response as JSON with this exact structure:
{
"field1": "value",
"field2": ["item1", "item2"],
"field3": {"nested": "object"}
}
Ensure all strings are properly quoted and the JSON is valid.
"""
Handle Parse Failures Gracefully
Always have a fallback strategy:
# Attempt the parse; on failure keep the raw text instead of propagating.
# NOTE(review): `ParseError`, `logger`, `parser` and `text` are not defined
# in this snippet — presumably provided by the surrounding code; confirm.
try:
    parsed = parser.parse(text)
except ParseError as e:
    # Log error
    logger.warning(f"Parse failed: {e}")
    # Use fallback: a dict carrying the raw text and the error message
    parsed = {"raw": text, "error": str(e)}
Validate Parsed Data
Don’t trust LLM output blindly:
def parse(self, text: str) -> Any:
    """Extract data from ``text``, then validate and sanitize it.

    Returns:
        The extracted data with its "description" entry replaced by the
        sanitized value.

    Raises:
        ValueError: If the "price" entry is negative (a missing price is
            treated as 0).
    """
    # NOTE(review): extract_json and sanitize_html are helpers on self that
    # are not shown here; presumably extract_json returns a dict — confirm.
    data = self.extract_json(text)
    # Validate business logic
    if data.get("price", 0) < 0:
        raise ValueError("Price cannot be negative")
    # Sanitize strings (a missing description is sanitized as "")
    data["description"] = self.sanitize_html(
        data.get("description", "")
    )
    return data
Use Type Hints
Leverage Pydantic for automatic validation:
class StrictModel(BaseModel):
    """Model combining config-level options with per-field constraints."""

    # NOTE(review): option names are pydantic-version specific (e.g.
    # `str_strip_whitespace` vs. the older `anystr_strip_whitespace`);
    # confirm they match the installed pydantic version.
    class Config:
        str_strip_whitespace = True  # Auto-strip strings
        use_enum_values = True  # Convert enums
        validate_assignment = True  # Validate on update

    # Length constrained to 1..100 characters.
    field: str = Field(min_length=1, max_length=100)
    # Constrained to the closed interval [0, 1].
    score: float = Field(ge=0, le=1)
Complete Example
Copy
from nadoo_flow import (
StructuredOutputParser, RetryableParserNode,
BaseNode, NodeResult, WorkflowContext
)
from pydantic import BaseModel, Field, validator
from typing import List, Literal, Optional
from datetime import datetime
# Complex structured output
class Task(BaseModel):
    """A single project task, optionally assigned, dated and dependent."""

    id: str
    title: str
    priority: Literal["low", "medium", "high", "critical"]
    assigned_to: Optional[str] = None
    due_date: Optional[datetime] = None
    # IDs of other tasks this one depends on; empty by default.
    dependencies: List[str] = Field(default_factory=list)

    @validator('due_date')
    def validate_future_date(cls, v):
        """Reject due dates that are not in the future.

        Args:
            v: The parsed due date, or None when absent.

        Returns:
            ``v`` unchanged.

        Raises:
            ValueError: If ``v`` is earlier than the current time.
        """
        # Take "now" in the value's own timezone: tzinfo is None for naive
        # values (giving naive local time) and set for aware ones. Comparing
        # an aware due date such as "2030-01-01T00:00:00Z" against a naive
        # datetime.now() raises TypeError.
        if v is not None and v < datetime.now(tz=v.tzinfo):
            raise ValueError("Due date must be in the future")
        return v
class ProjectPlan(BaseModel):
    """A project plan: objective, tasks, milestones, duration and risks."""

    project_name: str
    objective: str
    tasks: List[Task]
    milestones: List[str]
    estimated_duration: int  # days
    risks: List[str] = Field(default_factory=list)

    @validator('tasks')
    def validate_task_dependencies(cls, v):
        """Ensure every task dependency refers to a task in the same plan.

        Raises:
            ValueError: Naming the first dependency ID (in task order, then
                dependency order) that matches no task's ``id``.
        """
        known_ids = set()
        for entry in v:
            known_ids.add(entry.id)
        # Lazily walk all dependencies; the first unknown one aborts.
        unresolved = (
            dep
            for entry in v
            for dep in entry.dependencies
            if dep not in known_ids
        )
        for dep in unresolved:
            raise ValueError(f"Unknown dependency: {dep}")
        return v
# Create parser
parser = StructuredOutputParser(pydantic_model=ProjectPlan)
# LLM Node that generates project plans
class ProjectPlannerNode(BaseNode):
    """Node that produces a (simulated) LLM project-plan response."""

    def __init__(self):
        # Registers the node under the ID "project_planner".
        super().__init__("project_planner")

    async def execute(self, node_context, workflow_context):
        """Build the planning prompt and return a simulated LLM output.

        Returns:
            A successful NodeResult whose output dict stores the text under
            the "llm_output" key.
        """
        # The prompt embeds the request and the parser's format
        # instructions; it is built but not sent anywhere in this
        # simplified example.
        prompt = f"""
Create a detailed project plan for: {node_context.input_data['request']}
{parser.get_format_instructions()}
"""
        # Simulate LLM response (simplified for brevity)
        llm_output = """Project plan output with structured JSON format"""
        result = NodeResult(success=True, output=dict(llm_output=llm_output))
        return result
# Create workflow with retryable parser
workflow = (
ProjectPlannerNode() |
RetryableParserNode(
node_id="parse_plan",
parser=parser,
llm_node=ProjectPlannerNode(), # Use same LLM for retry
max_retries=3,
input_key="llm_output",
output_key="project_plan"
)
)
# Execute
async def create_project_plan():
    """Run the planning workflow and print a summary of the parsed plan.

    Reads the parsed plan from the workflow result under "project_plan" and
    prints its name, duration, task count and one line per task. If no plan
    is present, prints a notice and returns instead of failing on attribute
    access.
    """
    input_data = {"request": "Build an AI recommendation system"}
    result = await workflow.run(
        input_data,
        WorkflowContext(workflow_id="planning")
    )
    plan = result.get("project_plan")
    if plan is None:
        # Nothing was stored under the parser node's output key.
        print("No project plan was produced")
        return
    print(f"Project: {plan.project_name}")
    print(f"Duration: {plan.estimated_duration} days")
    print(f"Tasks: {len(plan.tasks)}")
    for task in plan.tasks:
        print(f" - {task.id}: {task.title} ({task.priority})")


# Run the workflow
# A bare top-level `await` is a SyntaxError in a regular module; asyncio.run
# starts an event loop and drives the coroutine to completion.
if __name__ == "__main__":
    import asyncio

    asyncio.run(create_project_plan())