Monday, February 3, 2025

How to convert RAG pipeline from synchronous to asynchronous?


 "Converting RAG pipeline from synchronous to asynchronous" means modifying a RAG pipeline so that it can handle tasks concurrently rather than executing them one after another in a blocking manner.

Key Differences:

  • Synchronous (Blocking Execution): Each step (retrieving documents, processing them, generating responses) happens one at a time, waiting for the previous step to complete.

  • Asynchronous (Non-Blocking Execution): Multiple steps can run in parallel or without waiting for each other, improving efficiency and response time.

Why Convert to Asynchronous?

  • Faster processing, especially for multiple requests.

  • Better resource utilization.

  • Improved scalability in real-time applications.

How to Convert?

  • Use async/await in Python (e.g., with asyncio).

  • Modify API calls (e.g., to vector databases) to support async execution.

  • Use async-compatible libraries for retrieval and generation steps.

Here’s how you can convert a synchronous RAG pipeline to an asynchronous RAG pipeline using asyncio and LangChain.

Synchronous RAG Pipeline (Blocking Execution)

from langchain.chains import RetrievalQA
from langchain.vectorstores import FAISS
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.chat_models import ChatOpenAI

# Load the vector database
vectorstore = FAISS.load_local("faiss_index", OpenAIEmbeddings())

# Create a retriever
retriever = vectorstore.as_retriever()

# Initialize the LLM (Language Model)
llm = ChatOpenAI(model="gpt-3.5-turbo")

# Create RAG pipeline
qa = RetrievalQA.from_chain_type(llm=llm, retriever=retriever)

# Run the pipeline synchronously
query = "What is Artificial Intelligence?"
response = qa.run(query)
print(response)


👆 Problem: Each step (retrieval, LLM call) waits for the previous step, making it slow for multiple queries.


Asynchronous RAG Pipeline (Non-Blocking Execution)


import asyncio
from langchain.chains import RetrievalQA
from langchain.vectorstores import FAISS
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.chat_models import ChatOpenAI

# Load vector database asynchronously
vectorstore = FAISS.load_local("faiss_index", OpenAIEmbeddings())
retriever = vectorstore.as_retriever()
llm = ChatOpenAI(model="gpt-3.5-turbo")

# Create an asynchronous RAG pipeline
qa = RetrievalQA.from_chain_type(llm=llm, retriever=retriever)

# Define an async function for querying
async def async_query(query):
    return await asyncio.to_thread(qa.run, query)

# Run multiple queries asynchronously
async def main():
    queries = ["What is Artificial Intelligence?", "How does AI impact healthcare?", "What is Machine Learning?"]
    tasks = [async_query(q) for q in queries]
    responses = await asyncio.gather(*tasks)
   
    for q, r in zip(queries, responses):
        print(f"Q: {q}\nA: {r}\n")

# Run the async main function
asyncio.run(main())


👆 Benefits of Asynchronous Execution:
✅ Handles multiple queries concurrently.
✅ Faster execution, especially useful for APIs or web apps.
✅ Uses asyncio.gather() to send multiple queries at the same time.

No comments:

Search This Blog