"Converting RAG pipeline from synchronous to asynchronous" means modifying a RAG pipeline so that it can handle tasks concurrently rather than executing them one after another in a blocking manner.
Key Differences:
Synchronous (Blocking Execution): Each step (retrieving documents, processing them, generating responses) happens one at a time, waiting for the previous step to complete.
Asynchronous (Non-Blocking Execution): Independent steps can run concurrently rather than waiting for each other, improving throughput and response time (see the sketch below).
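To see the difference in isolation before touching any RAG code, here is a minimal sketch (the step names and one-second delays are made up for illustration) that uses asyncio.sleep to stand in for I/O-bound work such as a network call:

```python
import asyncio
import time

async def fake_io_step(name, seconds):
    # asyncio.sleep stands in for an I/O-bound step (e.g., a network call)
    await asyncio.sleep(seconds)
    return name

async def main():
    start = time.perf_counter()
    # All three steps run concurrently, so total time is ~1s, not ~3s
    results = await asyncio.gather(
        fake_io_step("retrieve", 1),
        fake_io_step("rerank", 1),
        fake_io_step("generate", 1),
    )
    print(results, f"took {time.perf_counter() - start:.1f}s")

asyncio.run(main())
```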
Why Convert to Asynchronous?
Faster processing, especially for multiple requests.
Better resource utilization.
Improved scalability in real-time applications.
How to Convert?
Use async/await in Python (e.g., with asyncio).
Modify API calls (e.g., to vector databases) to support async execution.
Use async-compatible libraries for the retrieval and generation steps (a sketch of both conversion options follows this list).
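Concretely, the conversion usually means one of two things: call the library's native async API where it exists, or wrap the blocking call in a worker thread where it doesn't. A hedged sketch of both options (aget_relevant_documents is the async counterpart that older LangChain retriever versions expose; check your installed version):

```python
import asyncio

# Option 1: use the library's native async API where it exists.
# Older LangChain retrievers expose aget_relevant_documents as the
# async counterpart of get_relevant_documents.
async def retrieve_async(retriever, query):
    return await retriever.aget_relevant_documents(query)

# Option 2: wrap a blocking call in a worker thread when no async
# counterpart exists (asyncio.to_thread requires Python 3.9+).
async def run_blocking_async(blocking_fn, *args):
    return await asyncio.to_thread(blocking_fn, *args)
```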
Here’s how you can convert a synchronous RAG pipeline to an asynchronous RAG pipeline using asyncio and LangChain.
Synchronous RAG Pipeline (Blocking Execution)
```python
from langchain.chains import RetrievalQA
from langchain.vectorstores import FAISS
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.chat_models import ChatOpenAI

# Load the vector database
vectorstore = FAISS.load_local("faiss_index", OpenAIEmbeddings())

# Create a retriever
retriever = vectorstore.as_retriever()

# Initialize the LLM (language model)
llm = ChatOpenAI(model="gpt-3.5-turbo")

# Create the RAG pipeline
qa = RetrievalQA.from_chain_type(llm=llm, retriever=retriever)

# Run the pipeline synchronously
query = "What is Artificial Intelligence?"
response = qa.run(query)
print(response)
```
👆 Problem: Each step (retrieval, the LLM call) blocks until the previous one finishes, so multiple queries can only be answered one after another.
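For example, answering several questions with this pipeline means a plain loop (continuing from the qa object defined above), and the total latency is roughly the sum of the individual calls:

```python
# Sequential querying: each qa.run() blocks until the previous one finishes
queries = [
    "What is Artificial Intelligence?",
    "How does AI impact healthcare?",
    "What is Machine Learning?",
]
for q in queries:
    print(qa.run(q))  # total time ≈ sum of all per-query latencies
```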
Asynchronous RAG Pipeline (Non-Blocking Execution)
```python
import asyncio
from langchain.chains import RetrievalQA
from langchain.vectorstores import FAISS
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.chat_models import ChatOpenAI

# Load the vector database (this part is still synchronous)
vectorstore = FAISS.load_local("faiss_index", OpenAIEmbeddings())
retriever = vectorstore.as_retriever()
llm = ChatOpenAI(model="gpt-3.5-turbo")

# Create the RAG pipeline (same as in the synchronous version)
qa = RetrievalQA.from_chain_type(llm=llm, retriever=retriever)

# Wrap the blocking qa.run call so it runs in a worker thread
# without blocking the event loop (requires Python 3.9+)
async def async_query(query):
    return await asyncio.to_thread(qa.run, query)

# Run multiple queries concurrently
async def main():
    queries = [
        "What is Artificial Intelligence?",
        "How does AI impact healthcare?",
        "What is Machine Learning?",
    ]
    tasks = [async_query(q) for q in queries]
    responses = await asyncio.gather(*tasks)
    for q, r in zip(queries, responses):
        print(f"Q: {q}\nA: {r}\n")

# Run the async entry point
asyncio.run(main())
```
👆 Benefits of Asynchronous Execution:
✅ Handles multiple queries concurrently.
✅ Faster execution, especially useful for APIs or web apps.
✅ Uses asyncio.gather() to run all queries concurrently instead of one at a time.
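Note that asyncio.to_thread only offloads the blocking qa.run call to a worker thread. Many LangChain components also ship native async methods; if your installed version exposes Chain.arun (older releases do, before it was superseded by ainvoke), the thread wrapper can be dropped entirely. A sketch under that assumption, replacing main() in the example above:

```python
async def main():
    queries = [
        "What is Artificial Intelligence?",
        "How does AI impact healthcare?",
        "What is Machine Learning?",
    ]
    # arun is the native async counterpart of run, so no worker
    # threads are needed; the event loop overlaps the API calls
    responses = await asyncio.gather(*(qa.arun(q) for q in queries))
    for q, r in zip(queries, responses):
        print(f"Q: {q}\nA: {r}\n")

asyncio.run(main())
```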