Agentic RAG with Web Search enhances local document querying with real-time web search capabilities. This multi-agent system uses CrewAI to orchestrate specialized agents that search both a user-uploaded PDF and the web, then synthesize comprehensive answers.
import os
import uuid

import pdfplumber
from openai import OpenAI

# NOTE(review): `qdrant`, `collection_name`, `VectorParams`, `Distance` and
# `PointStruct` are used below but are not defined in this snippet; they are
# presumably provided by setup code elsewhere -- confirm before running.

EMBEDDING_MODEL = "text-embedding-3-large"
EMBEDDING_DIM = 3072  # dimensions of EMBEDDING_MODEL; must match the collection

client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))


def extract_text_from_pdf(pdf_path):
    """Extract text from a PDF using pdfplumber.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        A list with one stripped text string per page. Pages that yield no
        text, or only whitespace, are skipped.
    """
    text = []
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            # Strip before testing so whitespace-only pages do not turn into
            # empty chunks further down the pipeline.
            page_text = (page.extract_text() or "").strip()
            if page_text:
                text.append(page_text)
    return text


def get_openai_embedding(text):
    """Generate an OpenAI embedding for ``text``.

    Args:
        text: The string to embed.

    Returns:
        The embedding of the first (only) item in the API response.
    """
    response = client.embeddings.create(
        input=text,
        model=EMBEDDING_MODEL,
    )
    return response.data[0].embedding


def load_pdf_to_qdrant(pdf_path):
    """Load a PDF into the Qdrant collection, replacing previous contents.

    Each page with text becomes one point whose payload keeps the original
    text under the ``"text"`` key.

    Args:
        pdf_path: Path of the PDF file to index.

    Raises:
        ValueError: If no text could be extracted from the PDF. The existing
            collection is left untouched in that case.
    """
    text_chunks = extract_text_from_pdf(pdf_path)
    if not text_chunks:
        raise ValueError(f"No extractable text found in PDF: {pdf_path}")

    # Embed first: if an embedding call fails, the existing collection has
    # not been dropped yet.
    points = [
        PointStruct(
            id=str(uuid.uuid4()),
            vector=get_openai_embedding(chunk),
            payload={"text": chunk},  # Store original text as metadata
        )
        for chunk in text_chunks
    ]

    # Always start from an empty collection: drop the old one if present,
    # then create it again.
    if qdrant.collection_exists(collection_name):
        qdrant.delete_collection(collection_name)
    qdrant.create_collection(
        collection_name=collection_name,
        vectors_config=VectorParams(size=EMBEDDING_DIM, distance=Distance.COSINE),
    )

    qdrant.upsert(collection_name=collection_name, points=points)
from crewai import Agent, Task, Crew, Process
from crewai_tools import EXASearchTool, QdrantVectorSearchTool

# ---------------------------------------------------------------------------
# Tools
# ---------------------------------------------------------------------------
search_tool = EXASearchTool()

# Connection and retrieval settings for the vector search tool.
_qdrant_settings = {
    "qdrant_url": os.getenv("QDRANT_URL"),
    "qdrant_api_key": os.getenv("QDRANT_API_KEY"),
    "collection_name": collection_name,
    "limit": 3,  # Return top 3 results
    "score_threshold": 0.35,  # Minimum similarity score
}
qdrant_tool = QdrantVectorSearchTool(**_qdrant_settings)

# ---------------------------------------------------------------------------
# Agents
# ---------------------------------------------------------------------------
_db_backstory = (
    "You are an expert research assistant who can find relevant "
    "information using semantic search in a Qdrant database."
)
_web_backstory = (
    "You are an expert search assistant who can find relevant "
    "information using the EXA search tool."
)
_answer_backstory = (
    "You are an expert answer assistant who can generate comprehensive "
    "answers based on multiple sources."
)

# Searches the Qdrant collection through `qdrant_tool`.
db_search_agent = Agent(
    role="Senior Semantic Search Agent",
    goal="Find and analyze documents based on semantic search",
    backstory=_db_backstory,
    tools=[qdrant_tool],
    verbose=True,
)

# Searches the web through `search_tool`.
search_agent = Agent(
    role="Senior Search Agent",
    goal="Search for relevant information using web search",
    backstory=_web_backstory,
    tools=[search_tool],
    verbose=True,
)

# Writes the final answer; it is given no tools of its own.
answer_agent = Agent(
    role="Senior Answer Assistant",
    goal="Generate answers based on context provided",
    backstory=_answer_backstory,
    verbose=True,
)
from crewai import Task

# The `{query}` placeholders below are left as plain text here; the value is
# supplied through `crew.kickoff(inputs={"query": ...})`.

# Database search task
db_search_task = Task(
    description=(
        "Search for relevant documents about the {query}. "
        "Your final answer should include: "
        "- The relevant information found "
        "- The similarity scores of the results "
        "- The metadata of the relevant documents"
    ),
    expected_output="A list of relevant documents with similarity scores and metadata.",
    agent=db_search_agent,
    tools=[qdrant_tool],
)

# Web search task
search_task = Task(
    description="Search for relevant information about the {query} using web search.",
    expected_output="Search results with relevant context and ranking.",
    agent=search_agent,
    tools=[search_tool],
)

# Answer generation task
answer_task = Task(
    description=(
        "Given the context from database and web search, "
        "generate a comprehensive answer to: {query} "
        "Format your response with: "
        "- Summary of findings "
        "- Key results from both sources "
        "- Actionable insights "
        "- References with links "
    ),
    expected_output="A comprehensive, well-formatted markdown answer.",
    agent=answer_agent,
    # Declare both upstream tasks explicitly so the answer is built from the
    # database results AND the web results, as the description promises,
    # instead of relying on the crew's default context passing.
    context=[db_search_task, search_task],
)
from crewai import Crew, Process

# Agents and their tasks, listed in the order the tasks are meant to run:
# database search, then web search, then answer synthesis.
_crew_agents = [db_search_agent, search_agent, answer_agent]
_crew_tasks = [db_search_task, search_task, answer_task]

crew = Crew(
    agents=_crew_agents,
    tasks=_crew_tasks,
    process=Process.sequential,  # Run tasks in order
    verbose=True,
)

# Example run: the inputs fill the `{query}` placeholder used by the tasks.
_example_inputs = {"query": "What is quantum computing?"}
result = crew.kickoff(inputs=_example_inputs)
print(result)
import streamlit as st
import tempfile
import os

st.title("🤖 Agentic RAG with Web Search")

# Sidebar: PDF upload and API keys
with st.sidebar:
    st.header("Configuration")
    # NOTE(review): these two values are collected but never used in this
    # snippet -- confirm how they are meant to reach the Qdrant/Exa tools.
    qdrant_api_key = st.text_input("Qdrant API Key", type="password")
    exa_api_key = st.text_input("Exa API Key", type="password")

    st.divider()

    st.header("Upload PDF")
    uploaded_file = st.file_uploader("Choose a PDF", type=["pdf"])

    if uploaded_file:
        # The script is re-executed on every interaction, so remember which
        # upload was already indexed and skip re-embedding it on each rerun.
        file_key = (uploaded_file.name, uploaded_file.size)
        if st.session_state.get("loaded_pdf_key") != file_key:
            # Save to temp file so the loader can open it by path
            with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
                tmp_file.write(uploaded_file.getbuffer())
                temp_path = tmp_file.name

            try:
                with st.spinner("Loading PDF into Qdrant..."):
                    load_pdf_to_qdrant(temp_path)
                st.success("✅ PDF loaded successfully!")
                st.session_state.pdf_loaded = True
                st.session_state.loaded_pdf_key = file_key
            except Exception as e:
                # UI boundary: show the failure instead of crashing the app.
                st.error(f"Error: {str(e)}")
            finally:
                # Clean up temp file whether or not loading succeeded
                os.unlink(temp_path)

# Chat interface
if "messages" not in st.session_state:
    st.session_state.messages = []

# Replay the conversation so far (it is redrawn on every rerun).
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])

if prompt := st.chat_input("Ask a question about your document..."):
    if not st.session_state.get('pdf_loaded', False):
        st.warning("Please upload a PDF first.")
    else:
        # Add user message
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)

        # Run crew
        with st.chat_message("assistant"):
            with st.spinner("Thinking..."):
                result = crew.kickoff(inputs={"query": prompt})
            # kickoff() may return a result object rather than a plain string;
            # convert once so rendering and the stored history both get text.
            answer = str(result)
            st.markdown(answer)

        st.session_state.messages.append({"role": "assistant", "content": answer})