Overview
Parallel search demonstrates how to execute multiple search operations simultaneously using Flow Core’s parallel execution capabilities. This pattern significantly reduces latency when querying multiple data sources.

Basic Parallel Search
Copy
from nadoo_flow import ParallelNode, ChainableNode, MergeNode
import asyncio
import aiohttp
class WebSearchNode(ChainableNode):
    """Mock web search node that returns canned hits for one engine."""

    # Canned hits per engine; an engine missing from this table yields no hits.
    _CANNED_HITS = {
        "google": ["Result 1 from Google", "Result 2 from Google"],
        "bing": ["Result 1 from Bing", "Result 2 from Bing"],
        "duckduckgo": ["Result 1 from DuckDuckGo", "Result 2 from DuckDuckGo"],
    }

    def __init__(self, search_engine="google"):
        super().__init__()
        self.search_engine = search_engine

    async def execute(self, data):
        """Return the canned hits for this node's engine.

        Reads ``data["query"]`` (``KeyError`` if it is absent), sleeps for
        0.5 s in place of a real API call, then returns a dict with the keys
        ``source``, ``results`` and ``query``.
        """
        query = data["query"]
        await asyncio.sleep(0.5)  # stand-in for the real API round trip
        # Copy so callers never share (and mutate) the class-level lists.
        hits = list(self._CANNED_HITS.get(self.search_engine, []))
        return {"source": self.search_engine, "results": hits, "query": query}
# Create parallel search workflow: fan the same query out to three engines,
# merge the branch outputs, then summarize them.
parallel_search = (
    ParallelNode([
        WebSearchNode("google"),
        WebSearchNode("bing"),
        WebSearchNode("duckduckgo"),
    ])
    | MergeNode(merge_strategy="combine")
    # The summarizer iterates over the merged value and reads "source" and
    # "results" from each element, i.e. it expects one dict per branch.
    | ChainableNode.create(lambda x: {
        "all_results": [
            {"source": r["source"], "results": r["results"]}
            for r in x
        ],
        "total_results": sum(len(r["results"]) for r in x),
    })
)


async def main():
    """Run the parallel search once and return its summary dict."""
    return await parallel_search.execute({"query": "Nadoo AI"})


# Execute parallel search.
# A bare top-level `await` is a SyntaxError in a regular script, so the
# coroutine is driven with asyncio.run(). In a notebook or asyncio REPL
# (where a loop is already running) use `result = await main()` instead.
result = asyncio.run(main())
print(f"Found {result['total_results']} results from {len(result['all_results'])} sources")
Advanced Multi-Source Search
Copy
import asyncio
from typing import Any, Dict, List, Optional

from nadoo_flow import ChainableNode, LLMNode, ParallelNode
class DatabaseSearchNode(ChainableNode):
    """Mock search against a named internal database.

    Args:
        db_name: Label of the database; embedded in the ``source`` field and
            in the mock result titles.
        connection_string: Connection string for the database. It is stored
            on the instance but not used by the mock ``execute``. ``None``
            (the default) means no connection string was supplied.
    """

    def __init__(self, db_name: str, connection_string: Optional[str] = None):
        super().__init__()
        self.db_name = db_name
        self.connection_string = connection_string

    async def execute(self, data):
        """Return two mock rows for this database.

        Args:
            data: Mapping that must contain a ``"query"`` key.

        Returns:
            A dict with ``source`` (``"database:<db_name>"``), ``results``
            (the list of mock rows) and ``count`` (number of rows).

        Raises:
            KeyError: If ``data`` has no ``"query"`` entry.
        """
        # The mock ignores the query text, but a missing key still fails here.
        query = data["query"]

        # Simulate database query latency
        await asyncio.sleep(0.3)

        # Mock results
        results = [
            {"id": 1, "title": f"DB Result 1 from {self.db_name}"},
            {"id": 2, "title": f"DB Result 2 from {self.db_name}"},
        ]
        return {
            "source": f"database:{self.db_name}",
            "results": results,
            "count": len(results),
        }
class APISearchNode(ChainableNode):
    """Mock search against a named external API."""

    def __init__(self, api_name: str, endpoint: str):
        super().__init__()
        self.api_name = api_name
        self.endpoint = endpoint

    async def execute(self, data):
        """Return two mock items labelled with this node's API name.

        Reads ``data["query"]`` (``KeyError`` if absent), opens an aiohttp
        client session, sleeps 0.4 s in place of a request, and returns a
        dict with the keys ``source``, ``results`` and ``api_version``.
        """
        query = data["query"]

        async with aiohttp.ClientSession() as session:
            # A production implementation would issue the request through
            # `session`; the demo only waits.
            await asyncio.sleep(0.4)
            payload = {
                "items": [
                    {"name": f"API Result 1 from {self.api_name}"},
                    {"name": f"API Result 2 from {self.api_name}"},
                ]
            }

        response = {"source": f"api:{self.api_name}"}
        response["results"] = payload["items"]
        response["api_version"] = "v1"
        return response
class FileSearchNode(ChainableNode):
    """Mock search over files under one directory."""

    def __init__(self, directory: str):
        super().__init__()
        self.directory = directory

    async def execute(self, data):
        """Return two mock file hits and their combined match count.

        Reads ``data["query"]`` (``KeyError`` if absent), sleeps 0.2 s in
        place of a real scan, and returns a dict with the keys ``source``,
        ``directory``, ``results`` and ``total_matches``.
        """
        query = data["query"]
        await asyncio.sleep(0.2)  # stand-in for walking the directory

        hits = [
            {"path": f"{self.directory}/file1.txt", "matches": 3},
            {"path": f"{self.directory}/file2.md", "matches": 1},
        ]
        match_total = 0
        for hit in hits:
            match_total += hit["matches"]

        return {
            "source": "filesystem",
            "directory": self.directory,
            "results": hits,
            "total_matches": match_total,
        }
# Create comprehensive search pipeline
multi_source_search = (
    # Parallel search across different sources
    ParallelNode([
        DatabaseSearchNode("products", "postgresql://..."),
        DatabaseSearchNode("customers", "postgresql://..."),
        APISearchNode("wikipedia", "https://api.wikipedia.org"),
        APISearchNode("news", "https://newsapi.org"),
        FileSearchNode("/docs"),
        WebSearchNode("google"),
    ])
    | MergeNode(merge_strategy="combine")
    # The summarizer takes len() of the merged value and reads "results" from
    # each element, i.e. it expects one dict per source.
    | ChainableNode.create(lambda x: {
        "all_sources": x,
        "source_count": len(x),
        "total_results": sum(len(s.get("results", [])) for s in x),
    })
    | LLMNode(
        model="gpt-4",
        system_prompt="Synthesize these search results into a coherent answer",
    )
)


async def main():
    """Run the multi-source search once and return the pipeline output."""
    return await multi_source_search.execute({"query": "What is Nadoo AI?"})


# Execute multi-source search.
# A bare top-level `await` is a SyntaxError in a regular script, so the
# coroutine is driven with asyncio.run(). In a notebook or asyncio REPL
# (where a loop is already running) use `result = await main()` instead.
result = asyncio.run(main())
Semantic Search Pipeline
Copy
from nadoo_flow import ParallelNode, ChainableNode, FunctionNode
import numpy as np
from typing import List
class EmbeddingNode(ChainableNode):
    """Produce a (mock) embedding vector for the incoming text."""

    def __init__(self, model="text-embedding-ada-002"):
        super().__init__()
        self.model = model

    async def execute(self, data):
        """Return the text together with a random 1536-element embedding.

        ``data["text"]`` is optional and defaults to the empty string. The
        vector is drawn from ``np.random.randn`` purely for demonstration; a
        production node would call an embedding API here.
        """
        source_text = data.get("text", "")
        vector = np.random.randn(1536).tolist()
        return dict(text=source_text, embedding=vector, model=self.model)
class VectorSearchNode(ChainableNode):
    """Mock nearest-neighbour lookup against one named vector index."""

    def __init__(self, index_name: str, top_k: int = 5):
        super().__init__()
        self.index_name = index_name
        self.top_k = top_k

    async def execute(self, data):
        """Return ``top_k`` mock documents with descending scores.

        Reads ``data["embedding"]`` (``KeyError`` if absent), sleeps 0.3 s in
        place of a real lookup, and returns a dict with the keys ``index``,
        ``results`` and ``top_score`` (0 when there are no results).
        """
        query_embedding = data["embedding"]
        await asyncio.sleep(0.3)  # stand-in for the vector store round trip

        matches = []
        for rank in range(self.top_k):
            matches.append({
                "id": f"doc_{rank}",
                "score": 0.95 - (rank * 0.05),
                "text": f"Similar document {rank} from {self.index_name}",
            })

        if matches:
            best_score = matches[0]["score"]
        else:
            best_score = 0

        return {
            "index": self.index_name,
            "results": matches,
            "top_score": best_score,
        }
# Semantic search across multiple indexes
semantic_search = (
    # Generate query embedding
    EmbeddingNode()
    | ParallelNode([
        VectorSearchNode("products_index", top_k=10),
        VectorSearchNode("documentation_index", top_k=5),
        VectorSearchNode("support_tickets_index", top_k=3),
    ])
    | MergeNode(merge_strategy="combine")
    # Keep only the three best hits per index; the function iterates over the
    # merged value and reads "index", "results" and "top_score" from each item.
    | FunctionNode(lambda x: {
        "semantic_results": [
            {
                "index": idx["index"],
                "top_results": idx["results"][:3],
                "best_score": idx["top_score"],
            }
            for idx in x
        ]
    })
)


async def main():
    """Run the semantic search once and return the per-index summary."""
    return await semantic_search.execute({
        "text": "How to integrate Nadoo AI with my application?"
    })


# Execute semantic search.
# A bare top-level `await` is a SyntaxError in a regular script, so the
# coroutine is driven with asyncio.run(). In a notebook or asyncio REPL
# (where a loop is already running) use `result = await main()` instead.
result = asyncio.run(main())
Real-Time Market Data Aggregation
Copy
class StockDataNode(ChainableNode):
    """Mock quote fetcher for a single market-data provider."""

    # Simulated response time per provider, in seconds.
    _LATENCY_BY_PROVIDER = {"yahoo": 0.5, "alpha": 0.7, "iex": 0.3}

    def __init__(self, provider: str):
        super().__init__()
        self.provider = provider

    async def execute(self, data):
        """Return a randomized mock quote for ``data["symbol"]``.

        Sleeps for the provider's simulated latency (0.5 s for providers not
        in the table), then returns a dict with the keys ``provider``,
        ``symbol``, ``price``, ``volume`` and a fixed ``timestamp``.
        """
        symbol = data["symbol"]
        await asyncio.sleep(self._LATENCY_BY_PROVIDER.get(self.provider, 0.5))

        # Mock quote: price first, then volume (same draw order as before).
        raw_price = 100 + np.random.randn() * 10
        shares = int(1000000 * (1 + np.random.rand()))

        quote = {"provider": self.provider, "symbol": symbol}
        quote["price"] = round(raw_price, 2)
        quote["volume"] = shares
        quote["timestamp"] = "2024-01-01T12:00:00Z"
        return quote
class NewsSearchNode(ChainableNode):
    """Mock financial-news lookup for a single news source."""

    def __init__(self, source: str):
        super().__init__()
        self.source = source

    async def execute(self, data):
        """Return two mock positive articles about ``data["symbol"]``.

        Sleeps 0.4 s in place of a real request and returns a dict with the
        keys ``source``, ``articles`` and a fixed ``avg_sentiment`` of 0.7.
        """
        symbol = data["symbol"]
        await asyncio.sleep(0.4)

        headlines = (
            f"{symbol} announces Q4 earnings",
            f"Analysts upgrade {symbol} to buy",
        )
        articles = [
            {"title": headline, "source": self.source, "sentiment": "positive"}
            for headline in headlines
        ]
        return {"source": self.source, "articles": articles, "avg_sentiment": 0.7}
class SocialSentimentNode(ChainableNode):
    """Mock social-media sentiment lookup."""

    async def execute(self, data):
        """Return a fixed sentiment snapshot after a 0.6 s simulated delay.

        ``data["symbol"]`` must be present (``KeyError`` otherwise), although
        the mock payload does not depend on its value.
        """
        symbol = data["symbol"]
        await asyncio.sleep(0.6)
        return dict(
            platform="twitter",
            mentions=1234,
            sentiment_score=0.65,
            trending=True,
        )
# Real-time market analysis pipeline
market_analysis = (
    ParallelNode([
        # Price data from multiple providers
        # NOTE(review): the final step indexes x[0] as a list of per-provider
        # dicts (x[0][0]["symbol"], p["price"] for p in x[0]). Confirm that
        # the "average_price" strategy preserves that shape; if it collapses
        # the quotes into one value, "combine" is what the final step needs.
        ParallelNode([
            StockDataNode("yahoo"),
            StockDataNode("alpha"),
            StockDataNode("iex"),
        ]) | MergeNode(merge_strategy="average_price"),
        # News from multiple sources
        ParallelNode([
            NewsSearchNode("reuters"),
            NewsSearchNode("bloomberg"),
            NewsSearchNode("cnbc"),
        ]) | MergeNode(merge_strategy="combine"),
        # Social sentiment
        SocialSentimentNode(),
    ])
    | MergeNode(merge_strategy="combine")
    # x[0] = price branch, x[1] = news branch, x[2] = social sentiment dict.
    | ChainableNode.create(lambda x: {
        "symbol": x[0][0]["symbol"],
        "consensus_price": round(
            sum(p["price"] for p in x[0]) / len(x[0]), 2
        ),
        "total_news": sum(len(n["articles"]) for n in x[1]),
        "social_sentiment": x[2]["sentiment_score"],
        "recommendation": "BUY" if x[2]["sentiment_score"] > 0.5 else "HOLD",
    })
)


async def main():
    """Run the market analysis for AAPL once and return its summary dict."""
    return await market_analysis.execute({"symbol": "AAPL"})


# Execute market analysis.
# A bare top-level `await` is a SyntaxError in a regular script, so the
# coroutine is driven with asyncio.run(). In a notebook or asyncio REPL
# (where a loop is already running) use `result = await main()` instead.
result = asyncio.run(main())
print(f"Analysis for {result['symbol']}: {result['recommendation']}")
Performance Optimization Strategies
Timeout and Fallback
Copy
class TimeoutSearchNode(ChainableNode):
    """Mock search that falls back to an empty result after a timeout.

    Args:
        source: Label reported in the ``source`` field of every result.
        timeout: Seconds to wait for the search before falling back.
        simulated_latency: Seconds the mock search takes. The default of
            10 s exceeds the default timeout, so the node times out unless a
            shorter latency (or a longer timeout) is configured, e.g.
            ``TimeoutSearchNode("fast_api", timeout=5.0, simulated_latency=0.1)``.
    """

    def __init__(self, source: str, timeout: float = 5.0,
                 simulated_latency: float = 10.0):
        super().__init__()
        self.source = source
        self.timeout = timeout
        self.simulated_latency = simulated_latency

    async def execute(self, data):
        """Run the search, bounded by ``self.timeout`` seconds.

        Returns:
            The search result, or on timeout a fallback dict with
            ``"error": "timeout"`` and an empty ``results`` list so that
            downstream nodes still receive an entry for this source.
        """
        try:
            # wait_for cancels the pending search when the deadline passes.
            return await asyncio.wait_for(
                self._search(data),
                timeout=self.timeout,
            )
        except asyncio.TimeoutError:
            return {
                "source": self.source,
                "error": "timeout",
                "results": [],
            }

    async def _search(self, data):
        """Simulate a search that takes ``simulated_latency`` seconds."""
        await asyncio.sleep(self.simulated_latency)
        return {"source": self.source, "results": ["data"]}
# Parallel search with timeouts
def _split_by_outcome(outcomes):
    """Partition merged outputs into those with and without an "error" key."""
    succeeded, timed_out = [], []
    for outcome in outcomes:
        bucket = timed_out if "error" in outcome else succeeded
        bucket.append(outcome)
    return {
        "successful": succeeded,
        "failed": timed_out,
        "total_attempted": len(outcomes),
    }


resilient_search = (
    ParallelNode([
        TimeoutSearchNode("slow_api", timeout=2.0),
        TimeoutSearchNode("fast_api", timeout=5.0),
        TimeoutSearchNode("medium_api", timeout=3.0),
    ])
    | MergeNode(merge_strategy="combine")
    | FunctionNode(_split_by_outcome)
)
Batch Processing
Copy
class BatchSearchNode(ChainableNode):
    """Run many queries, at most ``batch_size`` of them concurrently.

    Args:
        batch_size: Number of queries searched concurrently per batch.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1. A size of 0 would
            otherwise crash inside ``range`` at execute time, and a negative
            size would silently drop every query.
    """

    def __init__(self, batch_size: int = 10):
        super().__init__()
        if batch_size < 1:
            raise ValueError(
                f"batch_size must be a positive integer, got {batch_size!r}"
            )
        self.batch_size = batch_size

    async def execute(self, data):
        """Search every query in ``data["queries"]`` batch by batch.

        Batches run one after another; the queries inside a batch run
        concurrently via ``asyncio.gather``, which returns results in the
        order the queries were given.

        Returns:
            ``{"batch_results": [...]}`` with one entry per input query.
        """
        queries = data["queries"]
        results = []

        # Process in batches
        for start in range(0, len(queries), self.batch_size):
            batch = queries[start:start + self.batch_size]
            # Process batch in parallel
            batch_results = await asyncio.gather(
                *(self._search_single(q) for q in batch)
            )
            results.extend(batch_results)

        return {"batch_results": results}

    async def _search_single(self, query):
        """Return one mock result for ``query`` after a 0.1 s delay."""
        await asyncio.sleep(0.1)
        return {"query": query, "results": [f"Result for {query}"]}
# Batch parallel search
def _to_queries(payload):
    """Re-key the incoming "query_list" as the "queries" the batch nodes read."""
    return {"queries": payload["query_list"]}


_batch_variants = [
    BatchSearchNode(batch_size=5),
    BatchSearchNode(batch_size=10),
    BatchSearchNode(batch_size=20),
]

batch_search = (
    FunctionNode(_to_queries)
    | ParallelNode(_batch_variants)
    | MergeNode(merge_strategy="first_complete")
)
Monitoring and Metrics
Copy
class MetricsNode(ChainableNode):
    """Attach timing and size metrics to a search result.

    Args:
        node: Optional node to wrap. When given, ``execute`` runs
            ``node.execute(data)`` and times it, so ``execution_time``
            reflects the wrapped search. When omitted (the original usage,
            ``SomeSearchNode() | MetricsNode()``), the node only annotates
            the incoming result; the measured interval then covers just the
            copy of ``data`` and is close to zero, because the upstream
            search has already finished by the time this node runs.
    """

    def __init__(self, node=None):
        super().__init__()
        self.wrapped_node = node

    async def execute(self, data):
        """Return a copy of the result with a ``"metrics"`` entry added.

        ``metrics`` holds ``execution_time`` (seconds on the event loop's
        monotonic clock), ``result_count`` (length of ``"results"``, 0 if
        absent) and ``source`` (``"unknown"`` if absent).
        """
        # get_running_loop() is the recommended accessor inside a coroutine.
        clock = asyncio.get_running_loop().time
        start_time = clock()

        if self.wrapped_node is not None:
            # Assumes the wrapped node returns a mapping, as the search nodes
            # in this guide do; copy it so the node's own dict is not mutated.
            result = dict(await self.wrapped_node.execute(data))
        else:
            result = data.copy()

        end_time = clock()

        result["metrics"] = {
            "execution_time": end_time - start_time,
            "result_count": len(result.get("results", [])),
            "source": result.get("source", "unknown"),
        }
        return result
# Search with metrics
def _summarize_metrics(entries):
    """Collect the merged outputs plus their summed and fastest timings."""
    def elapsed(entry):
        return entry["metrics"]["execution_time"]

    return {
        "results": list(entries),
        "total_time": sum(elapsed(entry) for entry in entries),
        "fastest_source": min(entries, key=elapsed)["source"],
    }


monitored_search = (
    ParallelNode([
        WebSearchNode("google") | MetricsNode(),
        DatabaseSearchNode("products") | MetricsNode(),
        APISearchNode("news", "https://api.news.com") | MetricsNode(),
    ])
    | MergeNode(merge_strategy="combine")
    | FunctionNode(_summarize_metrics)
)
Best Practices
Resource Management
- Limit concurrent connections
- Implement connection pooling
- Use timeouts for all external calls
- Monitor memory usage in large batches
Error Handling
- Always handle timeouts gracefully
- Provide fallback results
- Log failures for debugging
- Implement circuit breakers for unreliable sources
Performance Tuning
- Balance parallelism with resource constraints
- Use caching for frequently accessed data
- Implement result deduplication
- Consider priority-based execution