Skip to main content

Overview

Flow Core’s Parsers API provides robust parsing and validation of LLM outputs, converting unstructured text into validated Pydantic models. With automatic retry capabilities and format instruction generation, parsers ensure reliable data extraction from AI responses.

OutputParser Base Class

All parsers inherit from the OutputParser base class:
from typing import Any, Callable

from nadoo_flow import OutputParser

class CustomParser(OutputParser):
    """Skeleton showing the hooks a custom parser can provide.

    ``parse`` and ``get_format_instructions`` carry the real work;
    ``parse_with_prompt`` ignores the prompt unless overridden.
    """

    def parse(self, text: str) -> Any:
        """Parse text and return structured data.

        Args:
            text: Raw completion text to parse.

        Returns:
            The parsed value. This skeleton returns the stripped text so the
            example runs as written; replace the body with real parsing logic.
        """
        # Placeholder parsing logic - swap in your own.
        parsed_data = text.strip()
        return parsed_data

    def get_format_instructions(self) -> str:
        """Return instructions for the LLM describing the expected output format."""
        return "Please format your response as..."

    def parse_with_prompt(self, completion: str, prompt: str) -> Any:
        """Parse a completion, optionally taking the original prompt into account.

        Args:
            completion: Raw completion text to parse.
            prompt: The prompt that produced ``completion``; unused here.

        Returns:
            Whatever ``parse`` returns for ``completion``.
        """
        # Optional: use the prompt for better parsing. By default it is ignored.
        return self.parse(completion)

StructuredOutputParser

Parse LLM outputs into Pydantic models with validation:
from nadoo_flow import StructuredOutputParser
from pydantic import BaseModel, Field
from typing import List, Literal

# Define your data model
class AgentAction(BaseModel):
    """Action for an AI agent to take."""
    # Restricted to exactly these four action names.
    action: Literal["search", "calculate", "respond", "clarify"]
    # Free-text justification; required (no default).
    reasoning: str = Field(description="Why this action was chosen")
    # Arbitrary string-keyed mapping; its contents are not constrained here.
    parameters: dict[str, Any] = Field(description="Action parameters")
    # Bounded to the closed interval [0, 1] by the ge/le constraints.
    confidence: float = Field(ge=0, le=1, description="Confidence score")

# Create parser
parser = StructuredOutputParser(pydantic_model=AgentAction)

# Get format instructions for LLM
instructions = parser.get_format_instructions()
print(instructions)
# Output:
# Please provide your response as a valid JSON object that matches this schema:
# - action: search|calculate|respond|clarify
# - reasoning: string
# - parameters: object
# - confidence: 0.0-1.0

# Parse LLM output
llm_output = "LLM output with JSON containing action, reasoning, parameters, and confidence fields"

result = parser.parse(llm_output)
print(result)
# Returns AgentAction object with validated and parsed fields

Advanced Features

JSON Extraction

Automatically extracts JSON from various formats:
# Handles JSON in markdown code blocks
text_with_markdown = """
Here's the result:

```json
{"key": "value"}
```
"""

# Handles inline JSON
text_with_inline = 'The answer is in JSON format as shown'

# Handles malformed JSON (attempts repair)
text_with_errors = 'Malformed JSON example'  # Missing closing brace

Nested Models

Support for complex nested structures:
class Step(BaseModel):
    """A single step of a workflow plan."""
    name: str
    description: str
    duration: int  # minutes

class WorkflowPlan(BaseModel):
    """A plan made of nested ``Step`` models."""
    goal: str
    # Each element is validated as a nested Step model.
    steps: List[Step]
    # NOTE(review): presumably minutes, matching Step.duration - confirm.
    total_duration: int
    requires_human_input: bool

parser = StructuredOutputParser(pydantic_model=WorkflowPlan)

# Parser handles nested validation automatically

ParserNode

Integrate parsers into workflows:
from nadoo_flow import ParserNode, BaseNode

# Create parser node
parser_node = ParserNode(
    node_id="parse_response",
    parser=StructuredOutputParser(AgentAction),
    input_key="llm_output",    # Where to find text to parse
    output_key="parsed_action"  # Where to store parsed result
)

# Use in workflow
workflow = LLMNode() | parser_node | ActionExecutorNode()

# The parser node automatically:
# - Extracts text from input
# - Parses to Pydantic model
# - Validates data
# - Passes structured data to next node

RetryableParserNode

Automatic retry with LLM on parse failure:
from nadoo_flow import RetryableParserNode

class ComplexOutput(BaseModel):
    """Example output schema used with the retryable parser node."""
    analysis: str
    recommendations: List[str]
    # Mapping of metric name to float value.
    metrics: dict[str, float]

# Create retryable parser
parser_node = RetryableParserNode(
    node_id="parse_with_retry",
    parser=StructuredOutputParser(ComplexOutput),
    llm_node=my_llm_node,  # LLM to use for retry
    max_retries=3,
    retry_prompt_template="""
    The previous output could not be parsed correctly.
    Error: {error}

    Please provide a properly formatted response according to these instructions:
    {format_instructions}

    Original output that failed:
    {completion}
    """
)

# On parse failure:
# 1. Sends error + format instructions back to LLM
# 2. LLM generates corrected output
# 3. Attempts parsing again
# 4. Repeats up to max_retries

Built-in Parsers

JsonOutputParser

Simple JSON parsing without schema:
from nadoo_flow import JsonOutputParser

parser = JsonOutputParser()

# Parse any JSON string
json_text = '{"name": "John", "age": 30, "tags": ["python", "ai"]}'
result = parser.parse(json_text)
# Returns: dict with parsed JSON

# Extracts from markdown code blocks
markdown_text = """
```json
{"status": "success"}
```
"""
result = parser.parse(markdown_text)
# Returns: dict with status key


StringOutputParser

Pass-through parser for plain text:
from nadoo_flow import StringOutputParser

parser = StringOutputParser()

# Simply returns the input text
result = parser.parse("Any text here")
# Returns: "Any text here"

# Useful as default/fallback parser

ListOutputParser

Parse text into lists:
from nadoo_flow import ListOutputParser

parser = ListOutputParser(
    delimiter="\n",  # Split by newline
    strip=True       # Strip whitespace
)

text = """
1. First item
2. Second item
3. Third item
"""

result = parser.parse(text)
# Returns: ["1. First item", "2. Second item", "3. Third item"]

# Custom delimiter
parser = ListOutputParser(delimiter=", ")
result = parser.parse("apple, banana, orange")
# Returns: ["apple", "banana", "orange"]

Parser Patterns

Pattern 1: Multi-Format Parser

Handle multiple output formats:
class MultiFormatParser(OutputParser):
    """Parser that tries the "json", "yaml" and "custom" sub-parsers in turn."""

    def __init__(self, parsers: dict[str, OutputParser]):
        """Store the sub-parsers, keyed by format name."""
        self.parsers = parsers

    def parse(self, text: str) -> Any:
        """Return the result of the first sub-parser that succeeds.

        Sub-parsers are attempted in the fixed order json, yaml, custom;
        formats missing from ``self.parsers`` are skipped.

        Raises:
            ValueError: If every attempted sub-parser raised. The message
                lists the collected per-format errors.
        """
        # (registry key, label used in the error report), in attempt order.
        attempts = (("json", "JSON"), ("yaml", "YAML"), ("custom", "Custom"))
        errors = []

        for key, label in attempts:
            if key not in self.parsers:
                continue
            try:
                return self.parsers[key].parse(text)
            except Exception as exc:
                errors.append(f"{label}: {exc}")

        raise ValueError(f"Could not parse with any format: {errors}")

# Usage
parsers_dict = {
    "json": JsonOutputParser(),
    "yaml": YamlOutputParser(),
    "custom": CustomFormatParser()
}
parser = MultiFormatParser(parsers_dict)

Pattern 2: Partial Parsing

Parse incomplete/streaming outputs:
class StreamingParser(OutputParser):
    """Accumulates streamed chunks and parses once the buffer looks complete."""

    def __init__(self, base_parser: OutputParser):
        """Wrap ``base_parser`` and start with an empty buffer."""
        self.base_parser = base_parser
        self.buffer = ""

    def parse_partial(self, chunk: str) -> Any | None:
        """Append ``chunk`` to the buffer and try to parse it.

        Args:
            chunk: The next piece of streamed text.

        Returns:
            The parsed result once the buffer parses successfully (the buffer
            is then cleared), otherwise ``None`` to signal that more data is
            needed.
        """
        self.buffer += chunk

        if not self.looks_complete():
            return None

        try:
            result = self.base_parser.parse(self.buffer)
        except Exception:
            # Parse failures mean "wait for more data". Catching Exception
            # rather than everything lets KeyboardInterrupt/SystemExit through.
            return None

        self.buffer = ""  # Clear on success
        return result

    def looks_complete(self) -> bool:
        """Check if the buffer might be a complete JSON object.

        Simple heuristic: the stripped buffer starts with "{" and ends with "}".
        """
        candidate = self.buffer.strip()
        return candidate.startswith("{") and candidate.endswith("}")

Pattern 3: Validation Chain

Chain multiple validators:
class ValidatedParser(OutputParser):
    """Parser that pipes the base parser's result through a validator chain."""

    def __init__(
        self,
        base_parser: OutputParser,
        validators: list[Callable[[Any], Any]]
    ):
        """Store the base parser and the validators.

        Args:
            base_parser: Parser that produces the initial result.
            validators: Callables applied in order; each receives the current
                result and returns the value passed to the next one.
        """
        self.base_parser = base_parser
        self.validators = validators

    def parse(self, text: str) -> Any:
        """Parse ``text`` and run the result through every validator in order.

        Returns:
            The value returned by the last validator (or the base parser's
            result when there are no validators).

        Raises:
            Whatever the base parser or any validator raises.
        """
        # Initial parse
        result = self.base_parser.parse(text)

        # Each validator may transform the value, so thread it through.
        for validator in self.validators:
            result = validator(result)

        return result

# Usage
def validate_completeness(data):
    """Ensure the required fields "action" and "reasoning" are present.

    Returns:
        ``data`` unchanged when both fields are present.

    Raises:
        ValueError: Naming the first missing field, checked in the order
            action, reasoning.
    """
    missing = next(
        (name for name in ("action", "reasoning") if name not in data),
        None,
    )
    if missing is not None:
        raise ValueError(f"Missing required field: {missing}")
    return data

def validate_consistency(data):
    """Check logical consistency: a "calculate" action needs an expression.

    Returns:
        ``data`` unchanged when the check passes.

    Raises:
        ValueError: If ``action`` is "calculate" and ``expression`` is
            missing or falsy.
    """
    if data.get("action") != "calculate":
        return data
    if data.get("expression"):
        return data
    raise ValueError("Calculate action requires expression")

parser = ValidatedParser(
    base_parser=JsonOutputParser(),
    validators=[validate_completeness, validate_consistency]
)

Pattern 4: Fallback Parser

Graceful degradation on parse failure:
class FallbackParser(OutputParser):
    """Parser that degrades gracefully through an ordered list of parsers."""

    def __init__(self, parsers: list[OutputParser]):
        """Store the parsers in the order they should be attempted."""
        self.parsers = parsers

    def parse(self, text: str) -> Any:
        """Return the first successful parse, or a raw-text fallback dict.

        Returns:
            The result of the first parser that does not raise. If all of
            them raise, a dict with the keys ``raw_text``, ``parse_error``
            (string form of the last error) and ``fallback`` set to True.
        """
        failure = None

        for candidate in self.parsers:
            try:
                parsed = candidate.parse(text)
            except Exception as exc:
                failure = exc
            else:
                return parsed

        # Nothing succeeded - hand back the raw text instead of raising.
        return {
            "raw_text": text,
            "parse_error": str(failure),
            "fallback": True
        }

# Usage
parsers_list = [
    StructuredOutputParser(MyModel),  # Try structured first
    JsonOutputParser(),                # Then any JSON
    StringOutputParser()               # Finally just text
]
parser = FallbackParser(parsers_list)

Real-World Examples

Code Generation Parser

class CodeBlock(BaseModel):
    """One fenced code block extracted from LLM output."""
    language: str
    code: str
    # Optional; defaults to None when no explanation is attached.
    explanation: str | None = None

class CodeGenerationOutput(BaseModel):
    """Structured result of a code-generation response."""
    description: str
    code_blocks: List[CodeBlock]
    dependencies: List[str]
    # Optional; defaults to None.
    usage_example: str | None = None

class CodeParser(OutputParser):
    """Parser that pulls fenced code blocks and pip dependencies out of text."""

    def __init__(self):
        self.structured_parser = StructuredOutputParser(CodeGenerationOutput)

    def parse(self, text: str) -> CodeGenerationOutput:
        """Extract code blocks and metadata from LLM output.

        Args:
            text: Raw LLM output, possibly containing fenced code blocks and
                ``pip install <package>`` mentions.

        Returns:
            A ``CodeGenerationOutput`` whose description is the first 200
            characters of ``text``, with one ``CodeBlock`` per fenced block
            (language "text" when the fence has no tag) and one dependency
            per ``pip install`` match.
        """
        import re

        # The language tag is any run of non-space, non-backtick characters,
        # so tags such as "c++", "c#" or "objective-c" are recognised.
        # Trailing spaces and CRLF after the tag are tolerated.
        code_pattern = r'```([^\s`]+)?[ \t]*\r?\n(.*?)```'
        code_blocks = [
            CodeBlock(
                language=match.group(1) or "text",
                code=match.group(2).strip(),
                explanation=None
            )
            for match in re.finditer(code_pattern, text, re.DOTALL)
        ]

        # Extract dependencies (pip install lines)
        dep_pattern = r'pip install ([\w\-\[\]]+)'
        dependencies = re.findall(dep_pattern, text)

        # Build structured output
        return CodeGenerationOutput(
            description=text[:200],  # First 200 chars as description
            code_blocks=code_blocks,
            dependencies=dependencies,
            usage_example=None
        )

# Usage
parser = CodeParser()
llm_output = """
Here's a Python function to calculate fibonacci:

```python
def fibonacci(n):
    if n <= 1:
        return n
    return fibonacci(n-1) + fibonacci(n-2)
```

You'll need to install numpy for optimized version:
pip install numpy
"""
result = parser.parse(llm_output)
print(result.code_blocks[0].code)  # The fibonacci function
print(result.dependencies)  # ['numpy']

SQL Query Parser
class SQLQuery(BaseModel):
    """A SQL query together with descriptive metadata."""
    query: str
    explanation: str
    tables_used: List[str]
    # Optional; defaults to None.
    estimated_rows: int | None = None

class SQLParser(OutputParser):
    """Parser that returns a ``SQLQuery`` from structured or free-form output."""

    def __init__(self):
        self.base_parser = StructuredOutputParser(SQLQuery)

    def parse(self, text: str) -> SQLQuery:
        """Extract and validate SQL query.

        The structured parser is tried first. If it fails, the query is taken
        from the first ```sql fenced block or the first SELECT statement
        (running to the next blank line or end of text), whichever occurs
        first in ``text``.

        Args:
            text: Raw LLM output.

        Returns:
            A ``SQLQuery``. On the fallback path the explanation is the first
            200 characters of ``text``.

        Raises:
            ValueError: If no SQL query can be found in ``text``.
        """
        import re
        import sqlparse

        # Try structured parse first
        try:
            return self.base_parser.parse(text)
        except Exception:
            # Not structured output - fall through to text extraction.
            pass

        # Fallback to extraction
        # Find SQL in code blocks or after keywords
        sql_pattern = r'```sql\n(.*?)```|SELECT.*?(?=\n\n|\Z)'
        match = re.search(sql_pattern, text, re.DOTALL | re.IGNORECASE)

        if match is None:
            raise ValueError("No SQL query found in output")

        # Group 1 is only set for the fenced alternative. For a bare SELECT
        # the whole match is the query (findall would have returned '' here).
        fenced = match.group(1)
        query = fenced if fenced is not None else match.group(0)

        if not query.strip():
            # An empty fenced block would make sqlparse.parse() return nothing.
            raise ValueError("No SQL query found in output")

        # Clean and format
        query = sqlparse.format(
            query,
            reindent=True,
            keyword_case='upper'
        )

        # Extract table names
        parsed = sqlparse.parse(query)[0]
        tables = self._extract_tables(parsed)

        return SQLQuery(
            query=query,
            explanation=text[:200],
            tables_used=tables
        )

    def _extract_tables(self, parsed_query) -> List[str]:
        """Extract table names from parsed SQL.

        Placeholder: tokens containing FROM are located but no names are
        collected yet, so the result is always an empty list.
        """
        # Simplified extraction
        tables = []
        for token in parsed_query.tokens:
            if token.ttype is None and 'FROM' in str(token).upper():
                # Extract table name after FROM
                pass
        return tables

Report Parser

class ReportSection(BaseModel):
    """One titled section of a report."""
    title: str
    content: str
    # Optional string-keyed mapping; defaults to None.
    metrics: dict[str, Any] | None = None

class Report(BaseModel):
    """A report: summary, sections, conclusions and next steps."""
    summary: str
    sections: List[ReportSection]
    conclusions: List[str]
    next_steps: List[str]

class ReportParser(OutputParser):
    """Parser that turns a markdown-formatted report into a ``Report``."""

    def parse(self, text: str) -> Report:
        """Parse markdown-formatted report.

        The first non-blank line that is not a heading becomes the summary.
        Every heading starting with ``##`` opens a section that collects the
        lines up to the next such heading. Sections whose title mentions
        "conclusion" or "next step" additionally populate ``conclusions`` /
        ``next_steps`` with one entry per non-blank line (leading list
        markers removed); they are still listed in ``sections``.

        Args:
            text: The report as markdown text.

        Returns:
            The assembled ``Report``.
        """
        summary = ""
        sections = []

        current_section = None
        current_content = []

        for line in text.split('\n'):
            # Extract summary (first paragraph)
            if not summary and line.strip() and not line.startswith('#'):
                summary = line.strip()

            # New section
            elif line.startswith('##'):
                if current_section:
                    sections.append(ReportSection(
                        title=current_section,
                        content='\n'.join(current_content).strip()
                    ))
                # Strip only the leading marker so '###' headings and titles
                # containing '##' keep their text intact.
                current_section = line.lstrip('#').strip()
                current_content = []

            # Section content
            elif current_section:
                current_content.append(line)

        # Add last section
        if current_section:
            sections.append(ReportSection(
                title=current_section,
                content='\n'.join(current_content).strip()
            ))

        # Route list-style sections into their dedicated fields.
        conclusions = []
        next_steps = []
        for section in sections:
            heading = section.title.lower()
            if 'conclusion' in heading:
                conclusions.extend(self._list_items(section.content))
            elif 'next step' in heading:
                next_steps.extend(self._list_items(section.content))

        return Report(
            summary=summary,
            sections=sections,
            conclusions=conclusions,
            next_steps=next_steps
        )

    @staticmethod
    def _list_items(content: str) -> list[str]:
        """Return the non-blank lines of ``content`` without list markers."""
        import re

        # Matches "-", "*", "+" bullets and "1." / "1)" numbering.
        marker = re.compile(r'^\s*(?:[-*+]|\d+[.)])\s+')
        items = []
        for raw in content.split('\n'):
            item = marker.sub('', raw).strip()
            if item:
                items.append(item)
        return items

Best Practices

Give LLMs explicit formatting requirements:
def get_format_instructions(self) -> str:
    """Return explicit JSON formatting requirements for the LLM."""
    instructions = """
    Provide your response as JSON with this exact structure:
    {
      "field1": "value",
      "field2": ["item1", "item2"],
      "field3": {"nested": "object"}
    }

    Ensure all strings are properly quoted and the JSON is valid.
    """
    return instructions
Always have a fallback strategy:
try:
    parsed = parser.parse(text)
except ParseError as e:
    # Log error
    logger.warning(f"Parse failed: {e}")
    # Use fallback
    parsed = {"raw": text, "error": str(e)}
Don’t trust LLM output blindly:
def parse(self, text: str) -> Any:
    """Extract JSON from ``text``, then validate and sanitize it.

    Returns:
        The extracted mapping with its "description" replaced by the
        sanitized version (an empty string is sanitized when it is absent).

    Raises:
        ValueError: If the "price" entry is negative.
    """
    payload = self.extract_json(text)

    # Validate business logic; a missing price counts as 0.
    price = payload.get("price", 0)
    if price < 0:
        raise ValueError("Price cannot be negative")

    # Sanitize strings
    raw_description = payload.get("description", "")
    payload["description"] = self.sanitize_html(raw_description)

    return payload
Leverage Pydantic for automatic validation:
class StrictModel(BaseModel):
    """Model demonstrating validation-related configuration and constraints."""

    # NOTE(review): a nested ``Config`` class is the pydantic v1 configuration
    # style, while ``str_strip_whitespace`` is a pydantic v2 option name -
    # confirm which pydantic version this example targets.
    class Config:
        str_strip_whitespace = True  # Auto-strip strings
        use_enum_values = True        # Convert enums
        validate_assignment = True    # Validate on update

    # Length must be between 1 and 100 characters.
    field: str = Field(min_length=1, max_length=100)
    # Bounded to the closed interval [0, 1].
    score: float = Field(ge=0, le=1)

Complete Example

from nadoo_flow import (
    StructuredOutputParser, RetryableParserNode,
    BaseNode, NodeResult, WorkflowContext
)
from pydantic import BaseModel, Field, validator
from typing import List, Literal, Optional
from datetime import datetime

# Complex structured output
class Task(BaseModel):
    """A single task of a project plan."""
    id: str
    title: str
    priority: Literal["low", "medium", "high", "critical"]
    assigned_to: Optional[str] = None
    due_date: Optional[datetime] = None
    # IDs of the tasks this one depends on; empty by default.
    dependencies: List[str] = Field(default_factory=list)

    @validator('due_date')
    def validate_future_date(cls, v):
        """Reject due dates that lie in the past.

        Raises:
            ValueError: If a due date is given and it is earlier than now.
        """
        # Take "now" in the value's own timezone (naive local time when the
        # value is naive). Comparing an aware datetime, e.g. one parsed from
        # an ISO string ending in "Z", with a naive one raises TypeError.
        if v and v < datetime.now(v.tzinfo):
            raise ValueError("Due date must be in the future")
        return v

class ProjectPlan(BaseModel):
    """A project plan whose task dependencies must reference known tasks."""
    project_name: str
    objective: str
    tasks: List[Task]
    milestones: List[str]
    estimated_duration: int  # days
    risks: List[str] = Field(default_factory=list)

    @validator('tasks')
    def validate_task_dependencies(cls, v):
        """Ensure every dependency names a task in the same plan.

        Raises:
            ValueError: For the first dependency whose ID is not among the
                plan's task IDs.
        """
        known_ids = {item.id for item in v}
        referenced = (dep for item in v for dep in item.dependencies)
        for dep in referenced:
            if dep not in known_ids:
                raise ValueError(f"Unknown dependency: {dep}")
        return v

# Create parser
parser = StructuredOutputParser(pydantic_model=ProjectPlan)

# LLM Node that generates project plans
class ProjectPlannerNode(BaseNode):
    """Node that stands in for an LLM call producing a project plan."""

    def __init__(self):
        super().__init__("project_planner")

    async def execute(self, node_context, workflow_context):
        """Build the planning prompt and return a simulated LLM response.

        Reads ``request`` from ``node_context.input_data`` and returns a
        successful ``NodeResult`` whose output holds the text under
        ``llm_output``.
        """
        request = node_context.input_data['request']
        format_instructions = parser.get_format_instructions()
        # The prompt is built for illustration; the simulated response below
        # does not consume it.
        prompt = f"""
        Create a detailed project plan for: {request}

        {format_instructions}
        """

        # Simulate LLM response (simplified for brevity)
        simulated_response = """Project plan output with structured JSON format"""

        return NodeResult(success=True, output={"llm_output": simulated_response})

# Create workflow with retryable parser
workflow = (
    ProjectPlannerNode() |
    RetryableParserNode(
        node_id="parse_plan",
        parser=parser,
        llm_node=ProjectPlannerNode(),  # Use same LLM for retry
        max_retries=3,
        input_key="llm_output",
        output_key="project_plan"
    )
)

# Execute
async def create_project_plan():
    """Run the planning workflow and print an overview of the parsed plan."""
    request = {"request": "Build an AI recommendation system"}
    outcome = await workflow.run(request, WorkflowContext(workflow_id="planning"))

    project_plan = outcome.get("project_plan")
    print(f"Project: {project_plan.project_name}")
    print(f"Duration: {project_plan.estimated_duration} days")
    print(f"Tasks: {len(project_plan.tasks)}")
    for item in project_plan.tasks:
        print(f"  - {item.id}: {item.title} ({item.priority})")

# Run the workflow. Top-level `await` only works in an async REPL or notebook;
# a plain script needs an event loop, which asyncio.run() creates and closes.
if __name__ == "__main__":
    import asyncio

    asyncio.run(create_project_plan())

See Also