Human-in-the-Loop (HITL) Workflows

Overview

Human-in-the-Loop (HITL) workflows combine AI automation with human judgment, enabling validation, approval, and feedback mechanisms within your Flow Core pipelines.

Basic Human Approval Workflow

import asyncio
from datetime import datetime, timezone
from typing import Callable, Optional

from nadoo_flow import ChainableNode, LLMNode, FunctionNode

class HumanApprovalNode(ChainableNode):
    """Pause the pipeline until a human approves, or the wait times out.

    Prints an approval prompt, waits up to ``timeout`` seconds for a
    decision, and annotates the payload with the outcome.
    """

    def __init__(self, timeout: int = 300):
        """
        Args:
            timeout: Maximum seconds to wait for a decision before the
                request is auto-rejected with reason ``"timeout"``.
        """
        super().__init__()
        self.timeout = timeout
        # Reserved for tracking in-flight requests (e.g. by a UI backend);
        # not consulted by the simulated flow below.
        self.pending_approvals = {}

    async def execute(self, data):
        """Request approval for ``data["content"]``.

        Returns the input payload extended with:
            approved (bool): the decision (False on timeout).
            timestamp (str): ISO-8601 UTC time the decision was recorded.
            reason / timeout_seconds: present only on timeout.
        """
        request_id = data.get("id", "default")
        content = data.get("content", "")

        print(f"\n{'='*60}")
        print(f"APPROVAL REQUIRED")
        print(f"{'='*60}")
        print(f"Content: {content}")
        print(f"ID: {request_id}")
        print(f"\nApprove this action? (yes/no): ", end="", flush=True)

        # Wait for approval with timeout
        try:
            approval = await asyncio.wait_for(
                self._get_approval(),
                timeout=self.timeout
            )

            return {
                **data,
                "approved": approval,
                # Record the actual decision time; the original hard-coded a
                # placeholder timestamp.
                "timestamp": datetime.now(timezone.utc).isoformat()
            }
        except asyncio.TimeoutError:
            # No decision arrived in time: fail closed.
            return {
                **data,
                "approved": False,
                "reason": "timeout",
                "timeout_seconds": self.timeout
            }

    async def _get_approval(self) -> bool:
        """Get user approval (simulated).

        In production, this would interface with a UI or messaging system.
        """
        await asyncio.sleep(1)
        # Simulate approval
        return True

# Create workflow with human approval.
# NOTE(review): LLMNode/FunctionNode come from nadoo_flow; the `|` operator
# chains nodes into a pipeline (see the imports at the top of the page).
content_generator = LLMNode(model="gpt-4")
approval_gate = HumanApprovalNode(timeout=300)
publisher = FunctionNode(lambda x: {"published": x["approved"]})

# Pipeline: generate content -> wrap with an id -> human gate -> publish flag.
workflow = (
    content_generator
    | FunctionNode(lambda x: {"id": "post_1", "content": x["content"]})
    | approval_gate
    | publisher
)

# Execute
# NOTE(review): top-level `await` only works inside an already-running event
# loop (notebook / async REPL); in a plain script wrap this in asyncio.run().
result = await workflow.execute({"prompt": "Write a blog post about AI"})
print(f"Published: {result['published']}")

Advanced HITL with Feedback Loop

class FeedbackNode(ChainableNode):
    """Collect human feedback on generated output and tag the payload with a
    review status: approved, needs_revision, rejected, or
    max_iterations_reached."""

    def __init__(self, max_iterations: int = 3):
        super().__init__()
        self.max_iterations = max_iterations

    async def execute(self, data):
        draft = data.get("output", "")
        round_no = data.get("iteration", 0)

        # Review budget exhausted: finalize with whatever we have.
        if round_no >= self.max_iterations:
            return {
                **data,
                "final_output": draft,
                "iterations": round_no,
                "status": "max_iterations_reached",
            }

        self._show_review_menu(draft, round_no)
        choice = input("Your choice (1/2/3): ")

        if choice == "1":
            # Approved as-is.
            return {
                **data,
                "final_output": draft,
                "status": "approved",
                "iterations": round_no + 1,
            }

        if choice == "2":
            # Collect concrete change requests and hand back for revision.
            feedback = input("What changes would you like? ")
            return {
                **data,
                "feedback": feedback,
                "iteration": round_no + 1,
                "status": "needs_revision",
            }

        # Any other answer counts as a rejection.
        return {
            **data,
            "status": "rejected",
            "iterations": round_no + 1,
        }

    def _show_review_menu(self, draft, round_no):
        """Print the review banner, the draft, and the option menu."""
        print(f"\n{'='*60}")
        print(f"REVIEW OUTPUT (Iteration {round_no + 1}/{self.max_iterations})")
        print(f"{'='*60}")
        print(draft)
        print(f"\nOptions:")
        print("1. Approve")
        print("2. Request changes")
        print("3. Reject")

class RevisionNode(ChainableNode):
    """Rewrite content with an LLM, guided by the human feedback collected
    upstream; passes the payload through untouched unless a revision was
    requested."""

    def __init__(self):
        super().__init__()
        self.llm = LLMNode(model="gpt-4")

    async def execute(self, data):
        # Only act on payloads explicitly flagged for revision.
        if data.get("status") != "needs_revision":
            return data

        previous = data.get("output", "")
        notes = data.get("feedback", "")

        # Same prompt text as before, assembled piecewise.
        prompt = (
            "Original content:\n"
            f"{previous}\n\n"
            "User feedback:\n"
            f"{notes}\n\n"
            "Please revise the content based on the feedback:"
        )

        response = await self.llm.execute({"prompt": prompt})

        # Keep the superseded draft around for auditability.
        return {
            **data,
            "output": response["content"],
            "previous_version": previous,
        }

# Feedback loop workflow
async def feedback_loop_workflow(initial_prompt: str):
    """Run generate -> review -> revise until a terminal status is reached.

    Args:
        initial_prompt: Prompt used to generate the first draft.

    Returns:
        The final payload from ``FeedbackNode`` whose ``status`` is one of
        "approved", "rejected", or "max_iterations_reached".
    """

    # Generate initial content
    generator = LLMNode(model="gpt-4")
    initial = await generator.execute({"prompt": initial_prompt})

    data = {
        "output": initial["content"],
        "iteration": 0
    }

    # Build the nodes once; the original re-created both on every loop pass.
    feedback_node = FeedbackNode(max_iterations=3)
    revision_node = RevisionNode()

    # Iterate until approved, rejected, or max iterations reached.
    while True:
        result = await feedback_node.execute(data)

        if result["status"] in ["approved", "rejected", "max_iterations_reached"]:
            return result

        # Revise based on feedback, then loop for another review round.
        data = await revision_node.execute(result)

# Execute
# NOTE(review): top-level `await` requires a running event loop (notebook /
# async REPL); in a plain script use asyncio.run(feedback_loop_workflow(...)).
result = await feedback_loop_workflow("Write a product description")
print(f"Final status: {result['status']}")

Multi-Reviewer Workflow

class MultiReviewerNode(ChainableNode):
    """Collect reviews from multiple people and vote on approval.

    The content is approved when the fraction of approving reviewers is at
    least ``approval_threshold``.
    """

    def __init__(self, reviewers: list, approval_threshold: float = 0.5):
        """
        Args:
            reviewers: Names/ids of the humans to poll.
            approval_threshold: Minimum approval fraction (0-1) required.
        """
        super().__init__()
        self.reviewers = reviewers
        self.approval_threshold = approval_threshold

    async def execute(self, data):
        """Poll every reviewer and aggregate their verdicts.

        Returns the payload extended with ``reviews``, ``approval_rate``,
        ``approved``, and ``total_reviewers``. An empty reviewer list now
        yields a 0.0 approval rate (fail closed) instead of the
        ZeroDivisionError the original raised.
        """
        content = data.get("content", "")
        reviews = []

        print(f"\n{'='*60}")
        print(f"MULTI-REVIEWER APPROVAL")
        print(f"{'='*60}")

        for reviewer in self.reviewers:
            print(f"\nReviewer: {reviewer}")
            print(f"Content: {content[:100]}...")

            # Simulate review collection
            review = await self._get_review(reviewer, content)
            reviews.append(review)

        # Calculate approval rate; guard the empty-reviewers case.
        approvals = sum(1 for r in reviews if r["approved"])
        approval_rate = approvals / len(reviews) if reviews else 0.0

        return {
            **data,
            "reviews": reviews,
            "approval_rate": approval_rate,
            "approved": approval_rate >= self.approval_threshold,
            "total_reviewers": len(reviews)
        }

    async def _get_review(self, reviewer: str, content: str):
        """Get one (simulated) review verdict from ``reviewer``."""
        # Simulate review latency.
        await asyncio.sleep(0.5)

        # Mock verdict: two of three choices approve (~66% approval rate).
        import random
        approved = random.choice([True, True, False])

        return {
            "reviewer": reviewer,
            "approved": approved,
            "comments": f"Review from {reviewer}",
            # Real submission time instead of the original placeholder.
            "timestamp": datetime.now(timezone.utc).isoformat()
        }

# Multi-reviewer workflow
# With three reviewers, a 0.66 threshold requires at least 2 approvals.
reviewers = ["Alice", "Bob", "Charlie"]
multi_review = MultiReviewerNode(reviewers, approval_threshold=0.66)

# NOTE(review): top-level `await` requires a running event loop.
result = await multi_review.execute({
    "content": "This is the content to be reviewed."
})

print(f"Approved: {result['approved']} ({result['approval_rate']*100:.0f}%)")

Conditional Human Intervention

class SmartHITLNode(ChainableNode):
    """Escalate to a human only when the payload's confidence is below a
    configurable threshold; high-confidence payloads pass straight through."""

    def __init__(self, confidence_threshold: float = 0.7):
        super().__init__()
        self.confidence_threshold = confidence_threshold

    async def execute(self, data):
        score = data.get("confidence", 1.0)
        payload_text = data.get("content", "")

        # High confidence: no human needed, proceed automatically.
        if score >= self.confidence_threshold:
            return {
                **data,
                "auto_approved": True
            }

        # Low confidence: present the case to a human reviewer.
        print(f"\n{'='*60}")
        print(f"LOW CONFIDENCE DETECTED ({score:.2f})")
        print(f"{'='*60}")
        print(f"Content: {payload_text}")
        print(f"\nApprove or provide correction: ")

        verdict = input("Action (approve/correct/reject): ")

        if verdict == "correct":
            # Human supplies a replacement; treat it as fully trusted.
            replacement = input("Provide correction: ")
            return {
                **data,
                "content": replacement,
                "confidence": 1.0,
                "human_corrected": True
            }

        if verdict == "reject":
            return {
                **data,
                "rejected": True,
                "human_reviewed": True
            }

        # Any other answer (including "approve") counts as approval.
        return {
            **data,
            "confidence": 1.0,
            "human_approved": True
        }

# Workflow with smart HITL
class ClassificationWorkflow:
    """Classify text with an LLM and escalate low-confidence results to a
    human via ``SmartHITLNode``."""

    def __init__(self):
        self.classifier = LLMNode(model="gpt-3.5-turbo")
        self.hitl = SmartHITLNode(confidence_threshold=0.7)

    async def classify(self, text: str):
        """Classify ``text``, falling back to human review when confidence
        is below the HITL threshold.

        Returns:
            The payload produced by ``SmartHITLNode.execute``; its
            ``content`` key holds the (possibly human-corrected) category.
        """
        # Ask for a machine-parseable "category|confidence" answer.
        prompt = f"""Classify this text and provide confidence (0-1).
Text: {text}

Format: category|confidence
Example: technology|0.95"""

        result = await self.classifier.execute({"prompt": prompt})

        # Parse "category|confidence". maxsplit=1 tolerates a '|' inside the
        # confidence part, and the narrow except replaces the original bare
        # `except:` which silently swallowed every error (even KeyboardInterrupt).
        try:
            category, confidence_text = result["content"].split("|", 1)
            confidence = float(confidence_text)
        except (KeyError, ValueError, AttributeError):
            # Missing content, non-string content, or unparseable format.
            category = "unknown"
            confidence = 0.0

        # Human review if low confidence
        reviewed = await self.hitl.execute({
            "content": category,
            "confidence": confidence,
            "text": text
        })

        return reviewed

# Execute
# NOTE(review): top-level `await` requires a running event loop.
classifier = ClassificationWorkflow()
result = await classifier.classify("AI and machine learning news")

Task Assignment and Routing

class TaskRouter(ChainableNode):
    """Route tasks to appropriate human workers via per-type worker pools
    and round-robin assignment."""

    def __init__(self):
        super().__init__()
        # Worker pools keyed by task type.
        self.workers = {
            "translation": ["translator_1", "translator_2"],
            "review": ["reviewer_1", "reviewer_2", "reviewer_3"],
            "annotation": ["annotator_1", "annotator_2"]
        }
        # task_id -> assignment record for every task routed so far.
        self.task_queue = {}

    async def execute(self, data):
        """Assign the task to a worker and wait for (simulated) completion.

        Reads ``task_type`` (default "review") and ``priority`` (default
        "normal") from the payload. Returns the payload extended with
        ``task_id``/``worker``/``result``/``status`` = "completed", or with
        ``status`` = "queued" when no worker pool exists for the task type.
        """
        task_type = data.get("task_type", "review")
        priority = data.get("priority", "normal")

        # Find available worker
        worker = await self._assign_worker(task_type, priority)

        if not worker:
            return {
                **data,
                "status": "queued",
                "reason": "no_available_workers"
            }

        # Assign task
        task_id = f"task_{len(self.task_queue)}"
        self.task_queue[task_id] = {
            "worker": worker,
            "data": data,
            # Actual assignment time; the original hard-coded a placeholder.
            "assigned_at": datetime.now(timezone.utc).isoformat()
        }

        print(f"\nTask {task_id} assigned to {worker}")
        print(f"Type: {task_type}, Priority: {priority}")

        # Simulate task completion
        result = await self._wait_for_completion(task_id)

        return {
            **data,
            "task_id": task_id,
            "worker": worker,
            "result": result,
            "status": "completed"
        }

    async def _assign_worker(self, task_type: str, priority: str):
        """Pick a worker for ``task_type`` by round-robin, or None if the
        type has no pool.

        NOTE(review): ``priority`` is accepted but never consulted; a real
        implementation would order assignment by it.
        """
        workers = self.workers.get(task_type, [])
        if workers:
            # Simple round-robin assignment
            return workers[len(self.task_queue) % len(workers)]
        return None

    async def _wait_for_completion(self, task_id: str):
        """Wait for worker to complete task (simulated)."""
        await asyncio.sleep(1)  # Simulate work time
        return {"completed": True, "quality_score": 0.95}

# Task routing workflow
router = TaskRouter()

tasks = [
    {"task_type": "translation", "priority": "high", "text": "Hello world"},
    {"task_type": "review", "priority": "normal", "document": "doc.pdf"},
    {"task_type": "annotation", "priority": "low", "data": "image.jpg"}
]

# NOTE(review): if a task comes back with status "queued" (unknown task
# type), the payload has no "worker" key and this print raises KeyError.
for task in tasks:
    result = await router.execute(task)
    print(f"Task completed by {result['worker']}")

Quality Control Pipeline

class QualityControl(ChainableNode):
    """Multi-stage quality control with human validators.

    Stages run in order (automated -> peer -> expert) and the pipeline fails
    fast on the first stage scoring below ``pass_threshold``.
    """

    def __init__(self, pass_threshold: float = 0.7):
        """
        Args:
            pass_threshold: Minimum per-stage score required to continue.
                Generalizes the 0.7 the original hard-coded in two places.
        """
        super().__init__()
        self.pass_threshold = pass_threshold
        self.stages = [
            "automated_check",
            "peer_review",
            "expert_review"
        ]

    async def execute(self, data):
        """Run every QC stage over ``data["content"]``.

        Returns the payload extended with a ``quality_control`` dict holding
        ``passed``, per-stage ``scores``, ``average_score``, and (on failure)
        ``failed_at``. ``average_score`` is now present on the failure path
        too, so callers can read it unconditionally.
        """
        content = data.get("content", "")
        quality_scores = []

        for stage in self.stages:
            score = await self._execute_stage(stage, content, data)
            quality_scores.append({
                "stage": stage,
                "score": score,
                "passed": score >= self.pass_threshold
            })

            # Fail fast if stage fails
            if score < self.pass_threshold:
                return {
                    **data,
                    "quality_control": {
                        "passed": False,
                        "failed_at": stage,
                        "scores": quality_scores,
                        "average_score": self._average(quality_scores)
                    }
                }

        return {
            **data,
            "quality_control": {
                "passed": True,
                "scores": quality_scores,
                "average_score": self._average(quality_scores)
            }
        }

    @staticmethod
    def _average(quality_scores: list) -> float:
        """Mean of the stage scores collected so far (list is never empty
        when called)."""
        return sum(s["score"] for s in quality_scores) / len(quality_scores)

    async def _execute_stage(self, stage: str, content: str, data: dict):
        """Dispatch one quality-control stage by name."""
        if stage == "automated_check":
            # Automated checks (grammar, formatting, etc.)
            return await self._automated_check(content)
        elif stage == "peer_review":
            # Peer review
            return await self._peer_review(content, data)
        else:
            # Expert review
            return await self._expert_review(content, data)

    async def _automated_check(self, content: str) -> float:
        """Automated quality checks (simulated via a length heuristic)."""
        await asyncio.sleep(0.1)
        # Check length, formatting, etc.
        score = 0.85 if len(content) > 10 else 0.5
        print(f"Automated check: {score}")
        return score

    async def _peer_review(self, content: str, data: dict) -> float:
        """Peer review by human (simulated fixed score)."""
        print(f"\nPeer review required for: {content[:50]}...")
        # Simulate peer review
        await asyncio.sleep(0.5)
        score = 0.9
        print(f"Peer review score: {score}")
        return score

    async def _expert_review(self, content: str, data: dict) -> float:
        """Expert review (simulated fixed score)."""
        print(f"\nExpert review required for: {content[:50]}...")
        # Simulate expert review
        await asyncio.sleep(0.5)
        score = 0.95
        print(f"Expert review score: {score}")
        return score

# Quality control workflow
qc = QualityControl()
# NOTE(review): top-level `await` requires a running event loop.
result = await qc.execute({
    "content": "This is high-quality content that needs validation."
})

print(f"QC Passed: {result['quality_control']['passed']}")
# NOTE(review): as written in QualityControl, the failure path omits
# "average_score", so this line raises KeyError when QC fails — confirm.
print(f"Average Score: {result['quality_control']['average_score']:.2f}")

Async Human Input Queue

class AsyncHumanQueue(ChainableNode):
    """Queue system for async human tasks.

    Tasks go into an ``asyncio.Queue``, a background consumer drains them,
    and results are published to ``self.results`` keyed by task id.
    """

    def __init__(self):
        super().__init__()
        self.queue = asyncio.Queue()
        self.results = {}
        # Strong references to consumer tasks: a task referenced only by the
        # return value of asyncio.create_task may be garbage-collected
        # mid-flight, silently killing the consumer.
        self._consumers = set()

    async def submit_task(self, task_id: str, data: dict):
        """Enqueue ``data`` under ``task_id``."""
        await self.queue.put({
            "id": task_id,
            "data": data,
            # Real submission time instead of the original placeholder.
            "submitted_at": datetime.now(timezone.utc).isoformat()
        })
        print(f"Task {task_id} queued (queue size: {self.queue.qsize()})")

    async def process_queue(self):
        """Drain the queue until it stays empty for a full second."""
        while True:
            try:
                task = await asyncio.wait_for(self.queue.get(), timeout=1.0)
                result = await self._process_task(task)
                self.results[task["id"]] = result
                self.queue.task_done()
            except asyncio.TimeoutError:
                # Idle for one second: assume no more work is coming.
                break

    async def _process_task(self, task: dict):
        """Process individual task (simulated human work)."""
        print(f"\nProcessing task {task['id']}")
        print(f"Data: {task['data']}")

        # Simulate human processing
        await asyncio.sleep(1)

        return {
            "task_id": task["id"],
            "status": "completed",
            "result": "Task completed successfully"
        }

    async def get_result(self, task_id: str, timeout: int = 60):
        """Poll for the result of ``task_id``.

        Raises:
            TimeoutError: If no result appears within ``timeout`` seconds.
        """
        # get_running_loop() is the supported call inside a coroutine;
        # get_event_loop() is deprecated for this use.
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        while loop.time() - start_time < timeout:
            if task_id in self.results:
                return self.results[task_id]
            await asyncio.sleep(0.5)

        raise TimeoutError(f"Task {task_id} not completed within {timeout}s")

    async def execute(self, data):
        """Submit ``data``, ensure a consumer is running, and await the result."""
        task_id = data.get("task_id", f"task_{id(data)}")
        await self.submit_task(task_id, data)

        # Start processing; keep a strong reference so the task survives GC,
        # and drop it automatically once the consumer finishes.
        consumer = asyncio.create_task(self.process_queue())
        self._consumers.add(consumer)
        consumer.add_done_callback(self._consumers.discard)

        # Wait for result
        result = await self.get_result(task_id)

        return result

# Async queue workflow
queue = AsyncHumanQueue()

# Submit multiple tasks
tasks = [
    {"task_id": "task_1", "type": "review", "content": "Content 1"},
    {"task_id": "task_2", "type": "approve", "content": "Content 2"},
    {"task_id": "task_3", "type": "translate", "content": "Content 3"}
]

# Process tasks concurrently.
# NOTE(review): each execute() spawns its own process_queue consumer on the
# shared queue; the consumers cooperate but idle ones exit after a 1s timeout.
results = await asyncio.gather(*[
    queue.execute(task) for task in tasks
])

for result in results:
    print(f"Task {result['task_id']}: {result['status']}")

Best Practices

  • Provide clear context for human reviewers
  • Set reasonable timeouts for human tasks
  • Send notifications for pending tasks
  • Allow task reassignment if needed
  • Only request human input when necessary
  • Use confidence thresholds to trigger HITL
  • Implement escalation paths
  • Track metrics on human intervention rate
  • Implement multi-stage validation
  • Collect feedback for continuous improvement
  • Track reviewer agreement rates
  • Provide reviewer training and guidelines
  • Use async queues for task distribution
  • Implement load balancing across reviewers
  • Cache common decisions
  • Automate repeated patterns

Integration Patterns

Slack Integration

class SlackApprovalNode(ChainableNode):
    """Request approval via Slack (integration sketch — not implemented)."""

    async def execute(self, data):
        # Send message to Slack
        # Wait for button click response
        # Return approval status
        # NOTE(review): stub — as written it returns None for every payload.
        pass

# Usage
# NOTE(review): SlackApprovalNode.execute is a stub, so running this node
# yields None until the Slack integration is implemented.
slack_approval = SlackApprovalNode()

Web UI Integration

class WebUIApprovalNode(ChainableNode):
    """Request approval via a web interface (integration sketch — not
    implemented)."""

    async def execute(self, data):
        # Create pending approval in database
        # Return task URL for user
        # Poll for approval status
        # NOTE(review): stub — as written it returns None for every payload.
        pass

Email Integration

class EmailApprovalNode(ChainableNode):
    """Request approval via email (integration sketch — not implemented)."""

    async def execute(self, data):
        # Send approval request email
        # Listen for email response
        # Parse and return approval
        # NOTE(review): stub — as written it returns None for every payload.
        pass

Next Steps