Overview
Human-in-the-Loop (HITL) workflows combine AI automation with human judgment, enabling validation, approval, and feedback mechanisms within your Flow Core pipelines.Basic Human Approval Workflow
Copy
from nadoo_flow import ChainableNode, LLMNode, FunctionNode
import asyncio
from typing import Optional, Callable
class HumanApprovalNode(ChainableNode):
"""Request human approval before proceeding"""
def __init__(self, timeout: int = 300):
super().__init__()
self.timeout = timeout
self.pending_approvals = {}
async def execute(self, data):
request_id = data.get("id", "default")
content = data.get("content", "")
print(f"\n{'='*60}")
print(f"APPROVAL REQUIRED")
print(f"{'='*60}")
print(f"Content: {content}")
print(f"ID: {request_id}")
print(f"\nApprove this action? (yes/no): ", end="", flush=True)
# Wait for approval with timeout
try:
approval = await asyncio.wait_for(
self._get_approval(),
timeout=self.timeout
)
return {
**data,
"approved": approval,
"timestamp": "2024-01-01T12:00:00Z"
}
except asyncio.TimeoutError:
return {
**data,
"approved": False,
"reason": "timeout",
"timeout_seconds": self.timeout
}
async def _get_approval(self) -> bool:
"""Get user approval (simulated)"""
# In production, this would interface with a UI or messaging system
await asyncio.sleep(1)
# Simulate approval
return True
# Create workflow with human approval
content_generator = LLMNode(model="gpt-4")
approval_gate = HumanApprovalNode(timeout=300)
publisher = FunctionNode(lambda x: {"published": x["approved"]})
workflow = (
content_generator
| FunctionNode(lambda x: {"id": "post_1", "content": x["content"]})
| approval_gate
| publisher
)
# Execute
result = await workflow.execute({"prompt": "Write a blog post about AI"})
print(f"Published: {result['published']}")
Advanced HITL with Feedback Loop
Copy
class FeedbackNode(ChainableNode):
"""Collect and incorporate human feedback"""
def __init__(self, max_iterations: int = 3):
super().__init__()
self.max_iterations = max_iterations
async def execute(self, data):
current_output = data.get("output", "")
iteration = data.get("iteration", 0)
if iteration >= self.max_iterations:
return {
**data,
"final_output": current_output,
"iterations": iteration,
"status": "max_iterations_reached"
}
# Present to user
print(f"\n{'='*60}")
print(f"REVIEW OUTPUT (Iteration {iteration + 1}/{self.max_iterations})")
print(f"{'='*60}")
print(current_output)
print(f"\nOptions:")
print("1. Approve")
print("2. Request changes")
print("3. Reject")
choice = input("Your choice (1/2/3): ")
if choice == "1":
return {
**data,
"final_output": current_output,
"status": "approved",
"iterations": iteration + 1
}
elif choice == "2":
feedback = input("What changes would you like? ")
return {
**data,
"feedback": feedback,
"iteration": iteration + 1,
"status": "needs_revision"
}
else:
return {
**data,
"status": "rejected",
"iterations": iteration + 1
}
class RevisionNode(ChainableNode):
"""Revise content based on feedback"""
def __init__(self):
super().__init__()
self.llm = LLMNode(model="gpt-4")
async def execute(self, data):
if data.get("status") != "needs_revision":
return data
original = data.get("output", "")
feedback = data.get("feedback", "")
revision_prompt = f"""Original content:
{original}
User feedback:
{feedback}
Please revise the content based on the feedback:"""
revised = await self.llm.execute({"prompt": revision_prompt})
return {
**data,
"output": revised["content"],
"previous_version": original
}
# Feedback loop workflow
async def feedback_loop_workflow(initial_prompt: str):
"""Workflow with human feedback loop"""
# Generate initial content
generator = LLMNode(model="gpt-4")
initial = await generator.execute({"prompt": initial_prompt})
data = {
"output": initial["content"],
"iteration": 0
}
# Iterate until approved or max iterations
while True:
# Get feedback
feedback_node = FeedbackNode(max_iterations=3)
result = await feedback_node.execute(data)
if result["status"] in ["approved", "rejected", "max_iterations_reached"]:
return result
# Revise based on feedback
revision_node = RevisionNode()
data = await revision_node.execute(result)
# Execute
result = await feedback_loop_workflow("Write a product description")
print(f"Final status: {result['status']}")
Multi-Reviewer Workflow
Copy
class MultiReviewerNode(ChainableNode):
"""Collect reviews from multiple people"""
def __init__(self, reviewers: list, approval_threshold: float = 0.5):
super().__init__()
self.reviewers = reviewers
self.approval_threshold = approval_threshold
async def execute(self, data):
content = data.get("content", "")
reviews = []
print(f"\n{'='*60}")
print(f"MULTI-REVIEWER APPROVAL")
print(f"{'='*60}")
for reviewer in self.reviewers:
print(f"\nReviewer: {reviewer}")
print(f"Content: {content[:100]}...")
# Simulate review collection
review = await self._get_review(reviewer, content)
reviews.append(review)
# Calculate approval rate
approvals = sum(1 for r in reviews if r["approved"])
approval_rate = approvals / len(reviews)
return {
**data,
"reviews": reviews,
"approval_rate": approval_rate,
"approved": approval_rate >= self.approval_threshold,
"total_reviewers": len(reviews)
}
async def _get_review(self, reviewer: str, content: str):
"""Get review from a reviewer"""
# Simulate review process
await asyncio.sleep(0.5)
# Mock review
import random
approved = random.choice([True, True, False]) # 66% approval rate
return {
"reviewer": reviewer,
"approved": approved,
"comments": f"Review from {reviewer}",
"timestamp": "2024-01-01T12:00:00Z"
}
# Multi-reviewer workflow
reviewers = ["Alice", "Bob", "Charlie"]
multi_review = MultiReviewerNode(reviewers, approval_threshold=0.66)
result = await multi_review.execute({
"content": "This is the content to be reviewed."
})
print(f"Approved: {result['approved']} ({result['approval_rate']*100:.0f}%)")
Conditional Human Intervention
Copy
class SmartHITLNode(ChainableNode):
"""Automatically determine when human input is needed"""
def __init__(self, confidence_threshold: float = 0.7):
super().__init__()
self.confidence_threshold = confidence_threshold
async def execute(self, data):
confidence = data.get("confidence", 1.0)
content = data.get("content", "")
# Only request human review for low-confidence cases
if confidence < self.confidence_threshold:
print(f"\n{'='*60}")
print(f"LOW CONFIDENCE DETECTED ({confidence:.2f})")
print(f"{'='*60}")
print(f"Content: {content}")
print(f"\nApprove or provide correction: ")
# Get human input
action = input("Action (approve/correct/reject): ")
if action == "correct":
correction = input("Provide correction: ")
return {
**data,
"content": correction,
"confidence": 1.0,
"human_corrected": True
}
elif action == "reject":
return {
**data,
"rejected": True,
"human_reviewed": True
}
else:
return {
**data,
"confidence": 1.0,
"human_approved": True
}
else:
# High confidence - proceed automatically
return {
**data,
"auto_approved": True
}
# Workflow with smart HITL
class ClassificationWorkflow:
def __init__(self):
self.classifier = LLMNode(model="gpt-3.5-turbo")
self.hitl = SmartHITLNode(confidence_threshold=0.7)
async def classify(self, text: str):
# Classify with confidence score
prompt = f"""Classify this text and provide confidence (0-1).
Text: {text}
Format: category|confidence
Example: technology|0.95"""
result = await self.classifier.execute({"prompt": prompt})
# Parse result
try:
category, confidence = result["content"].split("|")
confidence = float(confidence)
except:
category = "unknown"
confidence = 0.0
# Human review if low confidence
reviewed = await self.hitl.execute({
"content": category,
"confidence": confidence,
"text": text
})
return reviewed
# Execute
classifier = ClassificationWorkflow()
result = await classifier.classify("AI and machine learning news")
Task Assignment and Routing
Copy
class TaskRouter(ChainableNode):
"""Route tasks to appropriate human workers"""
def __init__(self):
super().__init__()
self.workers = {
"translation": ["translator_1", "translator_2"],
"review": ["reviewer_1", "reviewer_2", "reviewer_3"],
"annotation": ["annotator_1", "annotator_2"]
}
self.task_queue = {}
async def execute(self, data):
task_type = data.get("task_type", "review")
priority = data.get("priority", "normal")
# Find available worker
worker = await self._assign_worker(task_type, priority)
if not worker:
return {
**data,
"status": "queued",
"reason": "no_available_workers"
}
# Assign task
task_id = f"task_{len(self.task_queue)}"
self.task_queue[task_id] = {
"worker": worker,
"data": data,
"assigned_at": "2024-01-01T12:00:00Z"
}
print(f"\nTask {task_id} assigned to {worker}")
print(f"Type: {task_type}, Priority: {priority}")
# Simulate task completion
result = await self._wait_for_completion(task_id)
return {
**data,
"task_id": task_id,
"worker": worker,
"result": result,
"status": "completed"
}
async def _assign_worker(self, task_type: str, priority: str):
"""Assign task to available worker"""
workers = self.workers.get(task_type, [])
if workers:
# Simple round-robin assignment
return workers[len(self.task_queue) % len(workers)]
return None
async def _wait_for_completion(self, task_id: str):
"""Wait for worker to complete task"""
await asyncio.sleep(1) # Simulate work time
return {"completed": True, "quality_score": 0.95}
# Task routing workflow
router = TaskRouter()
tasks = [
{"task_type": "translation", "priority": "high", "text": "Hello world"},
{"task_type": "review", "priority": "normal", "document": "doc.pdf"},
{"task_type": "annotation", "priority": "low", "data": "image.jpg"}
]
for task in tasks:
result = await router.execute(task)
print(f"Task completed by {result['worker']}")
Quality Control Pipeline
Copy
class QualityControl(ChainableNode):
"""Multi-stage quality control with human validators"""
def __init__(self):
super().__init__()
self.stages = [
"automated_check",
"peer_review",
"expert_review"
]
async def execute(self, data):
content = data.get("content", "")
quality_scores = []
for stage in self.stages:
score = await self._execute_stage(stage, content, data)
quality_scores.append({
"stage": stage,
"score": score,
"passed": score >= 0.7
})
# Fail fast if stage fails
if score < 0.7:
return {
**data,
"quality_control": {
"passed": False,
"failed_at": stage,
"scores": quality_scores
}
}
return {
**data,
"quality_control": {
"passed": True,
"scores": quality_scores,
"average_score": sum(s["score"] for s in quality_scores) / len(quality_scores)
}
}
async def _execute_stage(self, stage: str, content: str, data: dict):
"""Execute quality control stage"""
if stage == "automated_check":
# Automated checks (grammar, formatting, etc.)
return await self._automated_check(content)
elif stage == "peer_review":
# Peer review
return await self._peer_review(content, data)
else:
# Expert review
return await self._expert_review(content, data)
async def _automated_check(self, content: str) -> float:
"""Automated quality checks"""
await asyncio.sleep(0.1)
# Check length, formatting, etc.
score = 0.85 if len(content) > 10 else 0.5
print(f"Automated check: {score}")
return score
async def _peer_review(self, content: str, data: dict) -> float:
"""Peer review by human"""
print(f"\nPeer review required for: {content[:50]}...")
# Simulate peer review
await asyncio.sleep(0.5)
score = 0.9
print(f"Peer review score: {score}")
return score
async def _expert_review(self, content: str, data: dict) -> float:
"""Expert review"""
print(f"\nExpert review required for: {content[:50]}...")
# Simulate expert review
await asyncio.sleep(0.5)
score = 0.95
print(f"Expert review score: {score}")
return score
# Quality control workflow
qc = QualityControl()
result = await qc.execute({
"content": "This is high-quality content that needs validation."
})
print(f"QC Passed: {result['quality_control']['passed']}")
print(f"Average Score: {result['quality_control']['average_score']:.2f}")
Async Human Input Queue
Copy
class AsyncHumanQueue(ChainableNode):
"""Queue system for async human tasks"""
def __init__(self):
super().__init__()
self.queue = asyncio.Queue()
self.results = {}
async def submit_task(self, task_id: str, data: dict):
"""Submit task to queue"""
await self.queue.put({
"id": task_id,
"data": data,
"submitted_at": "2024-01-01T12:00:00Z"
})
print(f"Task {task_id} queued (queue size: {self.queue.qsize()})")
async def process_queue(self):
"""Process tasks from queue"""
while True:
try:
task = await asyncio.wait_for(self.queue.get(), timeout=1.0)
result = await self._process_task(task)
self.results[task["id"]] = result
self.queue.task_done()
except asyncio.TimeoutError:
break
async def _process_task(self, task: dict):
"""Process individual task"""
print(f"\nProcessing task {task['id']}")
print(f"Data: {task['data']}")
# Simulate human processing
await asyncio.sleep(1)
return {
"task_id": task["id"],
"status": "completed",
"result": "Task completed successfully"
}
async def get_result(self, task_id: str, timeout: int = 60):
"""Get result for task"""
start_time = asyncio.get_event_loop().time()
while asyncio.get_event_loop().time() - start_time < timeout:
if task_id in self.results:
return self.results[task_id]
await asyncio.sleep(0.5)
raise TimeoutError(f"Task {task_id} not completed within {timeout}s")
async def execute(self, data):
task_id = data.get("task_id", f"task_{id(data)}")
await self.submit_task(task_id, data)
# Start processing if not already running
asyncio.create_task(self.process_queue())
# Wait for result
result = await self.get_result(task_id)
return result
# Async queue workflow
queue = AsyncHumanQueue()
# Submit multiple tasks
tasks = [
{"task_id": "task_1", "type": "review", "content": "Content 1"},
{"task_id": "task_2", "type": "approve", "content": "Content 2"},
{"task_id": "task_3", "type": "translate", "content": "Content 3"}
]
# Process tasks concurrently
results = await asyncio.gather(*[
queue.execute(task) for task in tasks
])
for result in results:
print(f"Task {result['task_id']}: {result['status']}")
Best Practices
User Experience
User Experience
- Provide clear context for human reviewers
- Set reasonable timeouts for human tasks
- Send notifications for pending tasks
- Allow task reassignment if needed
Workflow Design
Workflow Design
- Only request human input when necessary
- Use confidence thresholds to trigger HITL
- Implement escalation paths
- Track metrics on human intervention rate
Quality Assurance
Quality Assurance
- Implement multi-stage validation
- Collect feedback for continuous improvement
- Track reviewer agreement rates
- Provide reviewer training and guidelines
Scalability
Scalability
- Use async queues for task distribution
- Implement load balancing across reviewers
- Cache common decisions
- Automate repeated patterns
Integration Patterns
Slack Integration
Copy
class SlackApprovalNode(ChainableNode):
"""Request approval via Slack"""
async def execute(self, data):
# Send message to Slack
# Wait for button click response
# Return approval status
pass
# Usage
slack_approval = SlackApprovalNode()
Web UI Integration
Copy
class WebUIApprovalNode(ChainableNode):
"""Request approval via web interface"""
async def execute(self, data):
# Create pending approval in database
# Return task URL for user
# Poll for approval status
pass
Email Integration
Copy
class EmailApprovalNode(ChainableNode):
"""Request approval via email"""
async def execute(self, data):
# Send approval request email
# Listen for email response
# Parse and return approval
pass