Skip to main content

Overview

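Parallel search shows how to run multiple search operations concurrently using Flow Core’s parallel execution capabilities. Because all sources are queried at the same time, total latency is roughly that of the slowest source rather than the sum of every source, which significantly reduces latency when querying multiple data sources.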
from nadoo_flow import ParallelNode, ChainableNode, MergeNode
import asyncio
import aiohttp

class WebSearchNode(ChainableNode):
    """Query a (simulated) web search engine.

    The engine is fixed at construction time. ``execute`` sleeps briefly to
    stand in for network latency and then returns canned results for that
    engine.
    """

    def __init__(self, search_engine="google"):
        super().__init__()
        # Engine whose mock results are returned: "google", "bing" or
        # "duckduckgo"; any other name yields an empty result list.
        self.search_engine = search_engine

    async def execute(self, data):
        """Return mock search results for ``data["query"]``.

        Args:
            data: Mapping that must contain a ``"query"`` key.

        Returns:
            Dict with ``source`` (the engine name), ``results`` (list of
            strings) and the echoed ``query``.
        """
        search_query = data["query"]

        # Stand-in for the real HTTP request to the search API.
        await asyncio.sleep(0.5)

        canned = {
            "google": ["Result 1 from Google", "Result 2 from Google"],
            "bing": ["Result 1 from Bing", "Result 2 from Bing"],
            "duckduckgo": ["Result 1 from DuckDuckGo", "Result 2 from DuckDuckGo"],
        }
        hits = canned.get(self.search_engine, [])

        return {"source": self.search_engine, "results": hits, "query": search_query}

# Create parallel search workflow: the three engines are queried concurrently,
# their outputs merged, then summarized.
parallel_search = (
    ParallelNode([
        WebSearchNode("google"),
        WebSearchNode("bing"),
        WebSearchNode("duckduckgo"),
    ])
    # The summary step below treats the merged value as a list of the
    # per-engine result dicts.
    | MergeNode(merge_strategy="combine")
    | ChainableNode.create(lambda x: {
        "all_results": [
            {"source": r["source"], "results": r["results"]}
            for r in x
        ],
        "total_results": sum(len(r["results"]) for r in x),
    })
)


async def run_parallel_search():
    """Execute the parallel search once, print a summary and return it."""
    result = await parallel_search.execute({"query": "Nadoo AI"})
    print(f"Found {result['total_results']} results from {len(result['all_results'])} sources")
    return result


# Execute parallel search.
# A bare top-level `await` is a SyntaxError in a regular .py module, so the
# example is driven with asyncio.run(). In a notebook or the
# `python -m asyncio` REPL (where an event loop is already running),
# use `result = await run_parallel_search()` instead.
result = asyncio.run(run_parallel_search())
from nadoo_flow import ParallelNode, ChainableNode, LLMNode
import asyncio
from typing import List, Dict, Any

class DatabaseSearchNode(ChainableNode):
    """Run a (simulated) query against an internal database.

    Args:
        db_name: Logical database name; it appears in ``source`` and in the
            titles of the mock rows.
        connection_string: Connection URL for a real implementation. It is
            stored but not used by this mock.
    """

    def __init__(self, db_name: str, connection_string: str = None):
        super().__init__()
        self.db_name = db_name
        self.connection_string = connection_string

    async def execute(self, data):
        """Return two mock rows for ``data["query"]``.

        Args:
            data: Mapping that must contain a ``"query"`` key (a missing key
                raises ``KeyError`` before any simulated work happens).

        Returns:
            Dict with ``source`` (``"database:<db_name>"``), ``results`` (list
            of ``{"id", "title"}`` rows) and ``count`` (number of rows).
        """
        search_term = data["query"]

        # Simulated query latency.
        await asyncio.sleep(0.3)

        name = self.db_name
        rows = [
            {"id": 1, "title": f"DB Result 1 from {name}"},
            {"id": 2, "title": f"DB Result 2 from {name}"},
        ]

        payload = {"source": f"database:{name}", "results": rows}
        payload["count"] = len(rows)
        return payload

class APISearchNode(ChainableNode):
    """Query an external HTTP API (simulated).

    Args:
        api_name: Label used in the ``source`` field and in the mock results.
        endpoint: Base URL of the API. It is stored for a real implementation;
            the mock below never contacts it.
    """

    def __init__(self, api_name: str, endpoint: str):
        super().__init__()
        self.api_name = api_name
        self.endpoint = endpoint

    async def execute(self, data):
        """Return mock API results for ``data["query"]``.

        Args:
            data: Mapping that must contain a ``"query"`` key.

        Returns:
            Dict with ``source`` (``"api:<api_name>"``), ``results`` (list of
            ``{"name": ...}`` dicts) and ``api_version``.
        """
        # Read up front so a missing key fails fast, as in the other nodes.
        query = data["query"]

        # Do not open an aiohttp.ClientSession per call: aiohttp recommends a
        # single session per application, reused across requests. In
        # production, inject a shared session and do something like:
        #     async with session.get(self.endpoint, params={"q": query}) as resp:
        #         payload = await resp.json()
        # (query-parameter name and response shape depend on the API.)
        await asyncio.sleep(0.4)  # simulated network latency

        payload = {
            "items": [
                {"name": f"API Result 1 from {self.api_name}"},
                {"name": f"API Result 2 from {self.api_name}"},
            ]
        }

        return {
            "source": f"api:{self.api_name}",
            "results": payload["items"],
            "api_version": "v1",
        }

class FileSearchNode(ChainableNode):
    """Search files below a directory (simulated).

    Args:
        directory: Directory prefix used to build the mock file paths.
    """

    def __init__(self, directory: str):
        super().__init__()
        self.directory = directory

    async def execute(self, data):
        """Return two mock file hits for ``data["query"]``.

        Args:
            data: Mapping that must contain a ``"query"`` key.

        Returns:
            Dict with ``source`` (always ``"filesystem"``), ``directory``,
            ``results`` (list of ``{"path", "matches"}`` dicts) and
            ``total_matches`` (sum of ``matches`` across hits).
        """
        search_term = data["query"]

        # Simulated file-scan time.
        await asyncio.sleep(0.2)

        root = self.directory
        hits = [
            {"path": f"{root}/file1.txt", "matches": 3},
            {"path": f"{root}/file2.md", "matches": 1},
        ]

        match_total = 0
        for hit in hits:
            match_total += hit["matches"]

        return {
            "source": "filesystem",
            "directory": root,
            "results": hits,
            "total_matches": match_total,
        }

# Create comprehensive search pipeline.
# NOTE: this reuses WebSearchNode and MergeNode from the first example; when
# copying this snippet on its own, import/define those as well.
multi_source_search = (
    # Parallel search across different sources
    ParallelNode([
        DatabaseSearchNode("products", "postgresql://..."),
        DatabaseSearchNode("customers", "postgresql://..."),
        APISearchNode("wikipedia", "https://api.wikipedia.org"),
        APISearchNode("news", "https://newsapi.org"),
        FileSearchNode("/docs"),
        WebSearchNode("google"),
    ])
    | MergeNode(merge_strategy="combine")
    # Each source returns a dict with a "results" list; .get() keeps the
    # count robust if a source omits it.
    | ChainableNode.create(lambda x: {
        "all_sources": x,
        "source_count": len(x),
        "total_results": sum(len(s.get("results", [])) for s in x),
    })
    | LLMNode(
        model="gpt-4",
        system_prompt="Synthesize these search results into a coherent answer",
    )
)


async def run_multi_source_search():
    """Execute the multi-source search once and return the pipeline output."""
    return await multi_source_search.execute({"query": "What is Nadoo AI?"})


# Execute multi-source search.
# A bare top-level `await` is a SyntaxError in a regular .py module, so the
# example is driven with asyncio.run(). In a notebook or the
# `python -m asyncio` REPL (where an event loop is already running),
# use `result = await run_multi_source_search()` instead.
result = asyncio.run(run_multi_source_search())

Semantic Search Pipeline

from nadoo_flow import ParallelNode, ChainableNode, FunctionNode
import numpy as np
from typing import List

class EmbeddingNode(ChainableNode):
    """Produce an embedding vector for ``data["text"]`` (mocked).

    Args:
        model: Embedding model name, echoed back in the output.
    """

    def __init__(self, model="text-embedding-ada-002"):
        super().__init__()
        self.model = model

    async def execute(self, data):
        """Return the input text together with a mock embedding.

        Instead of calling an embedding API, this draws 1536 standard-normal
        floats from NumPy's global random generator. Repeated calls therefore
        return different vectors.

        Args:
            data: Mapping; ``"text"`` defaults to ``""`` when absent.

        Returns:
            Dict with ``text``, ``embedding`` (list of 1536 floats) and
            ``model``.
        """
        source_text = data.get("text", "")
        vector = np.random.randn(1536).tolist()
        return {"text": source_text, "embedding": vector, "model": self.model}

class VectorSearchNode(ChainableNode):
    """Nearest-neighbour lookup in a vector index (simulated).

    Args:
        index_name: Name of the index; echoed in the output and mock texts.
        top_k: Number of mock hits to return.
    """

    def __init__(self, index_name: str, top_k: int = 5):
        super().__init__()
        self.index_name = index_name
        self.top_k = top_k

    async def execute(self, data):
        """Return ``top_k`` mock hits with linearly decreasing scores.

        Args:
            data: Mapping that must contain an ``"embedding"`` key. The mock
                does not use the vector itself.

        Returns:
            Dict with ``index``, ``results`` (list of ``{"id", "score",
            "text"}`` dicts, best first) and ``top_score`` (score of the
            first hit, or ``0`` when there are no hits).
        """
        query_vector = data["embedding"]

        # Simulated index lookup latency.
        await asyncio.sleep(0.3)

        hits = []
        for rank in range(self.top_k):
            hits.append({
                "id": f"doc_{rank}",
                "score": 0.95 - (rank * 0.05),
                "text": f"Similar document {rank} from {self.index_name}",
            })

        if hits:
            best = hits[0]["score"]
        else:
            best = 0

        return {"index": self.index_name, "results": hits, "top_score": best}

# Semantic search across multiple indexes: one query embedding is computed,
# then the same embedding dict is fanned out to every index concurrently.
semantic_search = (
    # Generate query embedding
    EmbeddingNode()
    | ParallelNode([
        VectorSearchNode("products_index", top_k=10),
        VectorSearchNode("documentation_index", top_k=5),
        VectorSearchNode("support_tickets_index", top_k=3),
    ])
    | MergeNode(merge_strategy="combine")
    # Keep only the three best hits per index for the summary.
    | FunctionNode(lambda x: {
        "semantic_results": [
            {
                "index": idx["index"],
                "top_results": idx["results"][:3],
                "best_score": idx["top_score"],
            }
            for idx in x
        ]
    })
)


async def run_semantic_search():
    """Execute the semantic search once and return its summary."""
    return await semantic_search.execute({
        "text": "How to integrate Nadoo AI with my application?"
    })


# Execute semantic search.
# A bare top-level `await` is a SyntaxError in a regular .py module, so the
# example is driven with asyncio.run(). In a notebook or the
# `python -m asyncio` REPL (where an event loop is already running),
# use `result = await run_semantic_search()` instead.
result = asyncio.run(run_semantic_search())

Real-Time Market Data Aggregation

class StockDataNode(ChainableNode):
    """Fetch a stock quote from one data provider (simulated).

    Args:
        provider: Provider name; selects the simulated latency and is echoed
            in the output.
    """

    # Simulated response time per provider, in seconds; unknown providers
    # fall back to 0.5.
    _LATENCY_BY_PROVIDER = {"yahoo": 0.5, "alpha": 0.7, "iex": 0.3}

    def __init__(self, provider: str):
        super().__init__()
        self.provider = provider

    async def execute(self, data):
        """Return a mock quote for ``data["symbol"]``.

        The price is ``100 + 10 * N(0, 1)`` and the volume lies in
        ``[1_000_000, 2_000_000)``. Both come from NumPy's global random
        generator, so quotes differ from call to call.

        Returns:
            Dict with ``provider``, ``symbol``, ``price`` (rounded to 2
            decimals), ``volume`` and a fixed ``timestamp``.
        """
        ticker = data["symbol"]

        delay = self._LATENCY_BY_PROVIDER.get(self.provider, 0.5)
        await asyncio.sleep(delay)

        # Draw order (randn, then rand) is kept so seeded runs are reproducible.
        raw_price = 100 + np.random.randn() * 10
        shares = int(1000000 * (1 + np.random.rand()))

        quote = {
            "provider": self.provider,
            "symbol": ticker,
            "price": round(raw_price, 2),
            "volume": shares,
            "timestamp": "2024-01-01T12:00:00Z",
        }
        return quote

class NewsSearchNode(ChainableNode):
    """Search one financial-news source for a ticker (simulated).

    Args:
        source: News outlet name, attached to every article.
    """

    def __init__(self, source: str):
        super().__init__()
        self.source = source

    async def execute(self, data):
        """Return two mock, positive-sentiment articles about ``data["symbol"]``.

        Returns:
            Dict with ``source``, ``articles`` (list of ``{"title", "source",
            "sentiment"}`` dicts) and a fixed ``avg_sentiment`` of 0.7.
        """
        ticker = data["symbol"]

        # Simulated request latency.
        await asyncio.sleep(0.4)

        headlines = (
            f"{ticker} announces Q4 earnings",
            f"Analysts upgrade {ticker} to buy",
        )
        articles = [
            {"title": headline, "source": self.source, "sentiment": "positive"}
            for headline in headlines
        ]

        return {"source": self.source, "articles": articles, "avg_sentiment": 0.7}

class SocialSentimentNode(ChainableNode):
    """Report social-media sentiment for a ticker (mocked with fixed values)."""

    async def execute(self, data):
        """Return fixed Twitter sentiment metrics.

        Args:
            data: Mapping that must contain a ``"symbol"`` key. The mock does
                not use its value.

        Returns:
            Dict with ``platform``, ``mentions``, ``sentiment_score`` and
            ``trending``.
        """
        ticker = data["symbol"]

        # Simulated API latency.
        await asyncio.sleep(0.6)

        snapshot = dict(
            platform="twitter",
            mentions=1234,
            sentiment_score=0.65,
            trending=True,
        )
        return snapshot

# Real-time market analysis pipeline.
# The outer ParallelNode runs three branches concurrently. The summary lambda
# relies on the outer "combine" merge keeping branch order:
#   x[0] = list of provider quotes, x[1] = list of news results,
#   x[2] = the social-sentiment dict.
market_analysis = (
    ParallelNode([
        # Price data from multiple providers. Merged with "combine" (a list
        # of per-provider quotes) because the summary step indexes x[0][0]
        # and computes the average over x[0] itself; a strategy that
        # pre-averaged the quotes would leave nothing list-shaped to index.
        ParallelNode([
            StockDataNode("yahoo"),
            StockDataNode("alpha"),
            StockDataNode("iex"),
        ]) | MergeNode(merge_strategy="combine"),

        # News from multiple sources
        ParallelNode([
            NewsSearchNode("reuters"),
            NewsSearchNode("bloomberg"),
            NewsSearchNode("cnbc"),
        ]) | MergeNode(merge_strategy="combine"),

        # Social sentiment
        SocialSentimentNode(),
    ])
    | MergeNode(merge_strategy="combine")
    | ChainableNode.create(lambda x: {
        "symbol": x[0][0]["symbol"],
        "consensus_price": round(
            sum(p["price"] for p in x[0]) / len(x[0]), 2
        ),
        "total_news": sum(len(n["articles"]) for n in x[1]),
        "social_sentiment": x[2]["sentiment_score"],
        "recommendation": "BUY" if x[2]["sentiment_score"] > 0.5 else "HOLD",
    })
)


async def run_market_analysis():
    """Analyze one ticker, print the recommendation and return the result."""
    result = await market_analysis.execute({"symbol": "AAPL"})
    print(f"Analysis for {result['symbol']}: {result['recommendation']}")
    return result


# Execute market analysis.
# A bare top-level `await` is a SyntaxError in a regular .py module, so the
# example is driven with asyncio.run(). In a notebook or the
# `python -m asyncio` REPL (where an event loop is already running),
# use `result = await run_market_analysis()` instead.
result = asyncio.run(run_market_analysis())

Performance Optimization Strategies

Timeout and Fallback

class TimeoutSearchNode(ChainableNode):
    """Run a search bounded by a timeout, degrading to an empty result.

    Args:
        source: Name of the searched source, echoed in every output.
        timeout: Seconds to wait for ``_search`` before giving up.
    """

    def __init__(self, source: str, timeout: float = 5.0):
        super().__init__()
        self.source = source
        self.timeout = timeout

    async def execute(self, data):
        """Return the search result, or an error marker if it times out.

        On timeout the returned dict has ``error="timeout"`` and an empty
        ``results`` list, so downstream merges can still proceed.
        """
        try:
            return await asyncio.wait_for(self._search(data), timeout=self.timeout)
        except asyncio.TimeoutError:
            return {"source": self.source, "error": "timeout", "results": []}

    async def _search(self, data):
        """Simulate a slow backend: sleeps 10 s, longer than any timeout used here."""
        await asyncio.sleep(10)
        return {"source": self.source, "results": ["data"]}

def _partition_by_error(outcomes):
    """Split merged search outputs into successes and failures.

    An output counts as failed when it carries an ``"error"`` key (as
    TimeoutSearchNode produces on timeout). Input order is preserved
    within each group.
    """
    ok = []
    failed = []
    for outcome in outcomes:
        if "error" in outcome:
            failed.append(outcome)
        else:
            ok.append(outcome)
    return {"successful": ok, "failed": failed, "total_attempted": len(outcomes)}


# Parallel search with timeouts: each source has its own deadline, and the
# summary reports which ones succeeded versus timed out.
resilient_search = (
    ParallelNode([
        TimeoutSearchNode("slow_api", timeout=2.0),
        TimeoutSearchNode("fast_api", timeout=5.0),
        TimeoutSearchNode("medium_api", timeout=3.0),
    ])
    | MergeNode(merge_strategy="combine")
    | FunctionNode(_partition_by_error)
)

Batch Processing

class BatchSearchNode(ChainableNode):
    """Search a list of queries, running each fixed-size batch concurrently.

    Batches are processed one after another; queries inside a batch are
    awaited together with ``asyncio.gather``.

    Args:
        batch_size: Maximum number of queries in flight at once.
    """

    def __init__(self, batch_size: int = 10):
        super().__init__()
        self.batch_size = batch_size

    async def execute(self, data):
        """Return ``{"batch_results": [...]}`` in the same order as ``data["queries"]``."""
        pending = data["queries"]
        step = self.batch_size
        collected = []

        for offset in range(0, len(pending), step):
            chunk = pending[offset:offset + step]
            collected += await asyncio.gather(
                *(self._search_single(item) for item in chunk)
            )

        return {"batch_results": collected}

    async def _search_single(self, query):
        """Simulate a single 0.1 s lookup returning one mock result."""
        await asyncio.sleep(0.1)
        return {"query": query, "results": [f"Result for {query}"]}

def _to_batch_input(payload):
    """Rename the caller's ``query_list`` to the ``queries`` key BatchSearchNode reads."""
    return {"queries": payload["query_list"]}


# Batch parallel search: the same query list is fanned out to three batch
# sizes; the "first_complete" strategy presumably keeps whichever branch
# finishes first (confirm against MergeNode's documentation).
batch_search = (
    FunctionNode(_to_batch_input)
    | ParallelNode([
        BatchSearchNode(batch_size=5),
        BatchSearchNode(batch_size=10),
        BatchSearchNode(batch_size=20),
    ])
    | MergeNode(merge_strategy="first_complete")
)

Monitoring and Metrics

class MetricsNode(ChainableNode):
    """Attach timing and result-count metrics to a search result.

    When constructed with ``node``, the wrapped node is executed inside the
    timed region, so ``execution_time`` measures that node's real duration.
    Without ``node``, a MetricsNode only sees output that has already been
    produced upstream. In that case ``execution_time`` covers just its own
    trivial copy and says nothing about the search.

    Args:
        node: Optional node to execute and time; its output is the result
            that gets annotated. If omitted, the incoming data is annotated.
    """

    def __init__(self, node=None):
        super().__init__()
        self._timed_node = node

    async def execute(self, data):
        """Return a shallow copy of the result with a ``metrics`` entry.

        ``metrics`` holds ``execution_time`` (seconds, from the running
        loop's monotonic clock), ``result_count`` (length of ``results``,
        0 if absent) and ``source`` (``"unknown"`` if absent).
        """
        # get_running_loop() is the recommended accessor inside coroutines.
        clock = asyncio.get_running_loop().time
        start_time = clock()

        if self._timed_node is not None:
            output = await self._timed_node.execute(data)
        else:
            output = data
        # Copy so the upstream dict is not mutated when metrics are added.
        result = output.copy()

        end_time = clock()

        result["metrics"] = {
            "execution_time": end_time - start_time,
            "result_count": len(result.get("results", [])),
            "source": result.get("source", "unknown"),
        }

        return result


# Search with metrics: each MetricsNode wraps, and therefore times, its search.
monitored_search = (
    ParallelNode([
        MetricsNode(WebSearchNode("google")),
        MetricsNode(DatabaseSearchNode("products")),
        MetricsNode(APISearchNode("news", "https://api.news.com")),
    ])
    | MergeNode(merge_strategy="combine")
    | FunctionNode(lambda x: {
        "results": list(x),
        # Sum of per-source durations. The sources run concurrently, so the
        # elapsed wall time is closer to the slowest source (max_time).
        "total_time": sum(r["metrics"]["execution_time"] for r in x),
        "max_time": max(r["metrics"]["execution_time"] for r in x),
        "fastest_source": min(x, key=lambda r: r["metrics"]["execution_time"])["source"],
    })
)

Best Practices

  • Limit concurrent connections
  • Implement connection pooling
  • Use timeouts for all external calls
  • Monitor memory usage in large batches
  • Always handle timeouts gracefully
  • Provide fallback results
  • Log failures for debugging
  • Implement circuit breakers for unreliable sources
  • Balance parallelism with resource constraints
  • Use caching for frequently accessed data
  • Implement result deduplication
  • Consider priority-based execution

Next Steps