Skip to main content

Overview

This guide helps you migrate existing AI workflows from other frameworks to Nadoo Flow Core. We cover the most common patterns and provide code examples for smooth transitions.

From LangChain

Basic Chain Migration

from langchain.chains import LLMChain
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate

prompt = PromptTemplate(
    template="Summarize: {text}",
    input_variables=["text"]
)

llm = OpenAI(temperature=0.7)
chain = LLMChain(llm=llm, prompt=prompt)

result = chain.run(text="Long article here...")
print(result)

Sequential Chain

from langchain.chains import SimpleSequentialChain

chain1 = LLMChain(llm=llm, prompt=prompt1)
chain2 = LLMChain(llm=llm, prompt=prompt2)

overall_chain = SimpleSequentialChain(
    chains=[chain1, chain2]
)

result = overall_chain.run("input")

Memory/Chat History

from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain

memory = ConversationBufferMemory()
conversation = ConversationChain(
    llm=llm,
    memory=memory
)

response1 = conversation.predict(input="Hi")
response2 = conversation.predict(input="What did I say?")

Retrieval QA

from langchain.chains import RetrievalQA
from langchain.vectorstores import FAISS
from langchain.embeddings import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()
vectorstore = FAISS.from_texts(texts, embeddings)

qa = RetrievalQA.from_chain_type(
    llm=llm,
    retriever=vectorstore.as_retriever()
)

result = qa.run("What is Nadoo?")

From CrewAI

Agent Migration

from crewai import Agent, Task, Crew

researcher = Agent(
    role="Researcher",
    goal="Research and gather information",
    backstory="Expert researcher",
    verbose=True
)

task = Task(
    description="Research AI frameworks",
    agent=researcher
)

crew = Crew(agents=[researcher], tasks=[task])
result = crew.kickoff()

Multi-Agent Workflow

researcher = Agent(role="Researcher", ...)
writer = Agent(role="Writer", ...)

research_task = Task(description="Research", agent=researcher)
write_task = Task(description="Write", agent=writer)

crew = Crew(agents=[researcher, writer], tasks=[research_task, write_task])

From AutoGen

Conversational Agents

from autogen import AssistantAgent, UserProxyAgent

assistant = AssistantAgent(
    name="assistant",
    llm_config={"model": "gpt-4"}
)

user_proxy = UserProxyAgent(
    name="user_proxy",
    human_input_mode="NEVER"
)

user_proxy.initiate_chat(
    assistant,
    message="Solve this problem"
)

Migration Checklist

1

Assess Current Implementation

  • List all chains/agents/workflows
  • Identify external dependencies
  • Document data flows
  • Note any custom components
2

Plan Migration

  • Prioritize workflows by complexity
  • Identify reusable patterns
  • Plan for testing strategy
  • Schedule migration phases
3

Migrate Core Workflows

  • Start with simplest workflows
  • Convert one workflow at a time
  • Test thoroughly
  • Update integrations
4

Update Dependencies

  • Install Nadoo Flow Core
  • Update import statements
  • Configure environment
  • Update deployment configs
5

Test & Validate

  • Unit test each workflow
  • Integration testing
  • Performance testing
  • User acceptance testing
6

Deploy

  • Deploy to staging
  • Monitor for issues
  • Gradual rollout to production
  • Keep old system as fallback

Common Patterns

Pattern 1: Simple LLM Call

# Before (any framework)
result = llm.generate("prompt")

# After (Nadoo)
result = await LLMNode(model="gpt-4").execute({"prompt": "prompt"})

Pattern 2: Chain of Operations

# Before
step1 = operation1(input)
step2 = operation2(step1)
step3 = operation3(step2)

# After
workflow = (
    FunctionNode(operation1)
    | FunctionNode(operation2)
    | FunctionNode(operation3)
)
result = await workflow.execute(input)

Pattern 3: Parallel Execution

# Before
results = [func(input) for func in functions]

# After
workflow = ParallelNode([
    FunctionNode(func) for func in functions
])
results = await workflow.execute(input)

Pattern 4: Conditional Logic

# Before
if condition:
    result = path_a(input)
else:
    result = path_b(input)

# After
router = ConditionalNode()
router.add_path("condition_true", path_a_node)
router.add_path("condition_false", path_b_node)
result = await router.execute(input)

Troubleshooting

Problem: Your old code was synchronousSolution:
  • Add async to function definitions
  • Use await for all node executions
  • Update calling code to handle async
# Wrap sync functions
async def async_wrapper(data):
    return sync_function(data)
Problem: Input/output formats don’t matchSolution:
  • Use FunctionNode to transform data
  • Create adapter nodes
adapter = FunctionNode(lambda x: {
    "new_format": x["old_format"]
})
workflow = adapter | main_workflow
Problem: Framework-specific features not availableSolution:
  • Implement custom nodes
  • Use parallel workflows
  • Check Flow Core examples
  • Request feature via GitHub
Problem: Migration is slower than expectedSolution:
  • Enable async throughout
  • Use ParallelNode for independent tasks
  • Implement caching
  • Profile and optimize bottlenecks

Best Practices

1. Incremental Migration

Don’t migrate everything at once:
# Start small
simple_workflow = LLMNode(model="gpt-4")

# Gradually add complexity
workflow = (
    PreprocessNode()
    | LLMNode(model="gpt-4")
    | PostprocessNode()
)

# Finally migrate complex workflows
full_workflow = build_complex_workflow()

2. Maintain Backward Compatibility

Create compatibility layers:
class LangChainCompatNode(ChainableNode):
    """Wrapper for LangChain components"""

    def __init__(self, langchain_chain):
        super().__init__()
        self.chain = langchain_chain

    async def execute(self, data):
        # Convert to LangChain format
        lc_input = convert_to_langchain(data)

        # Run LangChain chain
        result = self.chain.run(lc_input)

        # Convert back
        return convert_from_langchain(result)

3. Test Continuously

# Unit tests
async def test_workflow():
    result = await workflow.execute(test_input)
    assert result["output"] == expected_output

# Integration tests
async def test_end_to_end():
    result = await full_workflow.execute(real_data)
    validate_result(result)

4. Document Changes

Keep a migration log:
## Migration Log

### 2024-01-15
- Migrated user_onboarding workflow
- Replaced LangChain SimpleSequentialChain
- Performance: 45% faster
- Issues: None

### 2024-01-20
- Migrated RAG pipeline
- Custom vector search implementation
- Performance: Similar
- Issues: Memory management (resolved)

Getting Help

Next Steps