Overview
The Conference Talk Generator is a RAG application that creates unique KubeCon talk proposals by combining historical conference data from Couchbase vector search with real-time web research using Google ADK agents.

Dual-Context RAG
Historical data + Real-time research
Vector Search
Couchbase with 4096-dim embeddings
ADK Research
Parallel web research agents
Multi-Stage Pipeline
Crawling, embedding, RAG generation
Architecture Overview
This application follows a comprehensive 5-stage pipeline:

Multi-Agent Architecture
RAG Application Flow
import os

import streamlit as st
from openai import OpenAI
from couchbase.cluster import Cluster
from adk_research_agent import run_adk_research

# OpenAI-compatible client pointed at the Nebius inference endpoint.
# NOTE(review): `os` was used below without being imported — added it.
client = OpenAI(
    base_url=os.getenv("NEBIUS_API_BASE"),
    api_key=os.getenv("NEBIUS_API_KEY"),
)
def generate_talk_suggestion(
    query: str,
    similar_talks: list[dict],
    adk_research: str,
) -> str:
    """Synthesize historical and real-time context into a final talk proposal.

    Args:
        query: The user's core talk idea.
        similar_talks: Vector-search hits; each dict is expected to carry
            ``title``, ``description`` and ``category`` keys.
        adk_research: Free-text summary produced by the ADK research agent.

    Returns:
        The LLM-generated proposal text.
    """
    # NOTE(review): annotations previously used typing.List/Dict without a
    # typing import anywhere in this module; builtin generics (3.9+) avoid
    # the NameError without changing the interface.
    # Historical context assembled from the vector-search hits.
    historical_context = "\n\n".join(
        f"Title: {talk['title']}\n"
        f"Description: {talk['description']}\n"
        f"Category: {talk['category']}"
        for talk in similar_talks
    )
    prompt = f"""
You are an expert conference program advisor.
**User's Core Idea:** "{query}"
---
**PART 1: HISTORICAL CONTEXT (FROM DATABASE)**
Similar talks from past conferences. Offer fresh perspective:
{historical_context}
---
**PART 2: REAL-TIME WEB ANALYSIS (FROM RESEARCH AGENT)**
Current discussions and emerging trends:
{adk_research}
---
**YOUR TASK:**
Synthesize ALL parts. Create timely talk proposal that:
1. Avoids repetition of historical talks
2. Addresses current trends from web research
3. Provides genuine gap or novel angle
**FORMAT:**
**Title:** [Compelling modern title]
**Abstract:** [2-3 paragraphs with new insights]
**Key Learning Objectives:** [3-4 bullet points]
**Target Audience:** [Specify audience level]
**Why This Talk is Unique:** [Differentiation from past talks]
"""
    response = client.chat.completions.create(
        model="Qwen/Qwen3-235B-A22B",
        messages=[
            {"role": "system", "content": "Expert conference program advisor"},
            {"role": "user", "content": prompt},
        ],
        temperature=0.7,
        max_tokens=2048,
    )
    return response.choices[0].message.content
Stage 1: URL Extraction
import sys
from bs4 import BeautifulSoup
import re
def extract_event_urls():
    """Extract KubeCon talk URLs from an HTML schedule piped on stdin.

    Merges newly found links with any URLs already in ``event_urls.txt``
    and rewrites the file deduplicated and sorted.
    """
    html_content = sys.stdin.read()
    soup = BeautifulSoup(html_content, 'html.parser')
    # Anchor tags whose href points at an individual event page.
    links = soup.find_all('a', href=re.compile(r'event/'))
    urls = set()
    for link in links:
        href = link.get('href')
        if href:
            urls.add(f"https://events.linuxfoundation.org{href}")
    # Merge with URLs collected by a previous run.
    try:
        with open('event_urls.txt', 'r') as f:
            # Skip blank lines so they don't accumulate as empty "URLs".
            existing = {line.strip() for line in f if line.strip()}
        urls.update(existing)
    except FileNotFoundError:
        pass  # First run: nothing to merge.
    # Persist the deduplicated, sorted URL list.
    with open('event_urls.txt', 'w') as f:
        for url in sorted(urls):
            f.write(f"{url}\n")
    print(f"Extracted {len(urls)} unique URLs")
# Usage:
# python extract_events.py < schedule.html
Stage 2: Data Crawling
import asyncio
from crawl4ai import AsyncWebCrawler
from bs4 import BeautifulSoup
from couchbase.cluster import Cluster
from couchbase.auth import PasswordAuthenticator
async def extract_talk_info(html_content):
    """Parse a crawled event page into a structured talk record.

    Missing fields fall back to placeholder strings so downstream
    storage always sees a consistent schema.
    """
    soup = BeautifulSoup(html_content, 'html.parser')
    info = {
        'title': 'Unknown',
        'description': 'No description available',
        'speaker': 'Unknown',
        'category': 'Uncategorized',
        'date': 'Unknown',
        'location': 'Unknown',
    }

    # Title lives in an <a class="name"> nested inside <span class="event">.
    event_span = soup.find('span', class_='event')
    name_link = event_span.find('a', class_='name') if event_span else None
    if name_link:
        info['title'] = name_link.text.strip()

    # Description text.
    description_div = soup.find('div', class_='tip-description')
    if description_div:
        info['description'] = description_div.text.strip()

    # Speakers: one <h2><a>…</a></h2> per speaker in the roles section.
    names = []
    roles_div = soup.find('div', class_='sched-event-details-roles')
    if roles_div:
        for heading in roles_div.find_all('h2'):
            anchor = heading.find('a')
            if anchor:
                names.append(anchor.text.strip())
    info['speaker'] = ' & '.join(names) if names else 'Unknown'

    # Talk category / track.
    type_div = soup.find('div', class_='sched-event-type')
    type_link = type_div.find('a') if type_div else None
    if type_link:
        info['category'] = type_link.text.strip()

    return info
async def crawl_talks():
    """Crawl every URL in event_urls.txt and upsert results into Couchbase."""
    # NOTE(review): these names were used without imports in the original
    # snippet; importing locally keeps the function self-contained.
    import os
    from datetime import datetime, timezone
    from couchbase.options import ClusterOptions

    # Read the URL list produced by the extraction stage, skipping blanks.
    with open('event_urls.txt', 'r') as f:
        urls = [line.strip() for line in f if line.strip()]

    # Connect to Couchbase.
    auth = PasswordAuthenticator(
        os.getenv('CB_USERNAME'),
        os.getenv('CB_PASSWORD'),
    )
    cluster = Cluster(os.getenv('CB_CONNECTION_STRING'), ClusterOptions(auth))
    bucket = cluster.bucket(os.getenv('CB_BUCKET'))
    collection = bucket.collection(os.getenv('CB_COLLECTION'))

    # Crawl in small batches to avoid hammering the event site.
    async with AsyncWebCrawler() as crawler:
        batch_size = 5
        for i in range(0, len(urls), batch_size):
            batch_urls = urls[i:i + batch_size]
            batch_results = await crawler.arun_many(batch_urls)
            for url, result in zip(batch_urls, batch_results):
                if result and result.html:
                    talk_info = await extract_talk_info(result.html)
                    talk_info['url'] = url
                    # Timezone-aware UTC timestamp (datetime.utcnow() is
                    # naive and deprecated since Python 3.12).
                    talk_info['crawled_at'] = datetime.now(timezone.utc).isoformat()
                    # Document key derived from the URL slug.
                    doc_key = f"talk_{url.split('/')[-1]}"
                    collection.upsert(doc_key, talk_info)
                    print(f"Stored: {talk_info['title']}")
            await asyncio.sleep(1)  # Rate limiting between batches.
# Run:
# python couchbase_utils.py
Stage 3: Embedding Generation
import os

from openai import OpenAI
from couchbase.cluster import Cluster

# NOTE(review): `os` was referenced without an import in the original snippet.
client = OpenAI(
    base_url=os.getenv("NEBIUS_API_BASE"),
    api_key=os.getenv("NEBIUS_API_KEY"),
)
def generate_embeddings():
    """Generate a vector embedding for every stored talk document.

    Reads all talks back via N1QL, embeds title+description+category with
    the e5-mistral model (4096 dimensions), and writes the vector back onto
    each document under its original key.
    """
    import os
    from couchbase.auth import PasswordAuthenticator
    from couchbase.options import ClusterOptions

    # Connect to Couchbase. The original snippet left `Cluster(...)` as a
    # placeholder and referenced `bucket` without ever defining it; this
    # mirrors the connection pattern used by the crawling stage.
    auth = PasswordAuthenticator(os.getenv('CB_USERNAME'), os.getenv('CB_PASSWORD'))
    cluster = Cluster(os.getenv('CB_CONNECTION_STRING'), ClusterOptions(auth))
    bucket = cluster.bucket(os.getenv('CB_BUCKET'))
    collection = bucket.collection(os.getenv('CB_COLLECTION'))

    # Fetch each document together with its key. `SELECT *` alone does not
    # return the document id, so the original `talk['id']` lookup failed;
    # META(t).id exposes the key and `t.*` flattens the document fields.
    query = "SELECT META(t).id AS doc_id, t.* FROM `kubecon-talks` t"
    for row in cluster.query(query):
        doc_id = row['doc_id']
        talk = {k: v for k, v in row.items() if k != 'doc_id'}

        # Combine the searchable fields into one embedding input.
        text = f"{talk['title']} {talk['description']} {talk['category']}"

        # Generate the 4096-dimensional embedding.
        response = client.embeddings.create(
            model="intfloat/e5-mistral-7b-instruct",
            input=text,
        )
        talk['embedding'] = response.data[0].embedding

        # Write the enriched document back under its original key.
        collection.upsert(doc_id, talk)
        print(f"Embedded: {talk['title']}")
# Run:
# python embeddinggeneration.py
Stage 4: ADK Research Agent
from google.adk.agents.parallel_agent import ParallelAgent
from google.adk.agents.sequential_agent import SequentialAgent
from google.adk.agents.llm_agent import LlmAgent
from exa_py import Exa
from tavily import TavilyClient
from linkup import LinkupClient
def exa_search_ai(topic: str) -> dict:
    """Search Exa for the latest developments about *topic*.

    Returns a dict with a ``type`` tag and a list of plain-dict results,
    restricted to items published within the last 90 days.
    """
    # NOTE(review): os/datetime/timedelta were used without imports in the
    # original snippet; imported locally to keep the tool self-contained.
    import os
    from datetime import datetime, timedelta

    exa_client = Exa(api_key=os.getenv("EXA_API_KEY"))
    results = exa_client.search_and_contents(
        query=f"Latest developments, discussions, and news about {topic}",
        num_results=5,
        start_published_date=(datetime.now() - timedelta(days=90)).isoformat(),
    )
    return {"type": "exa", "results": [r.__dict__ for r in results.results]}
def tavily_search_ai_analysis(topic: str) -> dict:
    """Search social platforms via Tavily for community sentiment on *topic*.

    Returns a dict with a ``type`` tag and the raw result list (empty list
    when the response carries no results).
    """
    import os  # not imported at module level in the original snippet

    client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))
    response = client.search(
        query=f"Community sentiment and technical questions about {topic}",
        include_domains=["x.com", "reddit.com", "dev.to"],
        time_range="month",
    )
    return {"type": "tavily", "results": response.get("results", [])}
def run_adk_research(topic: str) -> str:
    """Run parallel research agents and synthesize their results.

    Fans out Exa and Tavily searches concurrently, then a synthesis agent
    merges both result sets into one summary string.
    """
    # NOTE(review): these names were used without imports in the original
    # snippet — confirm the module paths against the installed ADK version.
    import os
    from google.adk.models.lite_llm import LiteLlm
    from google.adk.runners import Runner
    from google.adk.sessions import InMemorySessionService
    from google.genai import types

    model = LiteLlm(
        model="nebius/Qwen/Qwen3-235B-A22B",
        api_base=os.getenv("NEBIUS_API_BASE"),
        api_key=os.getenv("NEBIUS_API_KEY"),
    )

    # Research agents.
    exa_agent = LlmAgent(
        name="ExaAgent",
        model=model,
        instruction=f"Use exa_search_ai to fetch latest about '{topic}'",
        tools=[exa_search_ai],
        output_key="exa_results",
    )
    tavily_agent = LlmAgent(
        name="TavilyAgent",
        model=model,
        instruction=f"Use tavily_search_ai_analysis for sentiment on '{topic}'",
        tools=[tavily_search_ai_analysis],
        output_key="tavily_results",
    )

    # Synthesis agent. BUG FIX: the original instruction only referenced
    # 'exa_results', silently dropping the Tavily research.
    summary_agent = LlmAgent(
        name="SummaryAgent",
        model=model,
        instruction="""
Combine 'exa_results' and 'tavily_results' into coherent summary.
Focus on latest trends, key talking points, emerging technologies.
""",
        output_key="final_summary",
    )

    # Parallel fan-out, then sequential synthesis. BUG FIX: the original
    # built tavily_agent but never added it to the parallel stage, so it
    # never ran (the "Parallel Research with ADK" pattern below shows the
    # intended multi-agent fan-out).
    parallel_search = ParallelAgent(
        name="ParallelSearch",
        sub_agents=[exa_agent, tavily_agent],
    )
    pipeline = SequentialAgent(
        name="ResearchPipeline",
        sub_agents=[parallel_search, summary_agent],
    )

    # Execute the pipeline and return the final synthesized text.
    session_service = InMemorySessionService()
    runner = Runner(agent=pipeline, session_service=session_service)
    content = types.Content(
        role="user",
        parts=[types.Part(text=f"Start analysis for {topic}")],
    )
    events = runner.run(session_id="session", new_message=content)
    for event in events:
        if event.is_final_response():
            return event.content.parts[0].text
    return "Research failed"
Stage 5: Vector Search & Generation
from couchbase.vector_search import VectorQuery, VectorSearch
from couchbase.search import SearchRequest, MatchNoneQuery
class CouchbaseConnection:
    def generate_embedding(self, text: str) -> List[float]:
        """Embed *text* with the e5-mistral model and return the vector."""
        embedding_response = client.embeddings.create(
            model="intfloat/e5-mistral-7b-instruct",
            input=text,
        )
        return embedding_response.data[0].embedding

    def get_similar_talks(self, query: str, num_results: int = 5):
        """Vector-search the index for talks similar to *query*.

        Returns a list of dicts with title/description/category/speaker
        plus the search score for each hit.
        """
        query_vector = self.generate_embedding(query)

        # MatchNoneQuery + vector search = pure KNN ranking, no text match.
        vector_part = VectorSearch.from_vector_query(
            VectorQuery("embedding", query_vector, num_candidates=num_results)
        )
        request = SearchRequest.create(MatchNoneQuery()).with_vector_search(vector_part)

        search_result = self.scope.search(self.search_index_name, request)

        talks = []
        for hit in search_result.rows():
            # Hits carry only the key + score; fetch the full document.
            fetched = self.collection.get(hit.id)
            if not (fetched and fetched.value):
                continue
            document = fetched.value
            talks.append({
                "title": document.get("title"),
                "description": document.get("description"),
                "category": document.get("category"),
                "speaker": document.get("speaker"),
                "score": hit.score,
            })
        return talks
Streamlit Application
import streamlit as st

st.title("KubeCon Talk Proposal Generator")

# Cache the Couchbase connection across Streamlit reruns.
if 'cb_connection' not in st.session_state:
    st.session_state.cb_connection = CouchbaseConnection()
cb = st.session_state.cb_connection

user_query = st.text_area(
    "Enter your talk idea:",
    placeholder="e.g., Using OpenTelemetry inferred spans in serverless"
)

if st.button("Generate Proposal"):
    # Guard: the original pipeline ran even on blank input, wasting
    # research and LLM calls on an empty idea.
    if not user_query.strip():
        st.warning("Please enter a talk idea first.")
        st.stop()

    adk_research_results = ""
    similar_talks = []

    # Step 1: ADK real-time web research.
    with st.spinner("Running web research..."):
        adk_research_results = run_adk_research(user_query)
    st.success("Real-time research complete")

    # Step 2: Vector search over historical talks.
    with st.spinner("Searching historical talks..."):
        similar_talks = cb.get_similar_talks(user_query)
    st.success("Historical context retrieved")

    # Step 3: Synthesize the final proposal from both contexts.
    with st.spinner("Synthesizing proposal..."):
        final_proposal = generate_talk_suggestion(
            user_query,
            similar_talks,
            adk_research_results,
        )
    st.success("Proposal generated")

    st.divider()
    st.subheader("Generated Talk Proposal")
    st.markdown(final_proposal)

    # Show the raw context that fed the generation.
    with st.expander("View Web Research"):
        st.markdown(adk_research_results)
    with st.expander("View Historical Context"):
        st.json(similar_talks)
Couchbase Vector Search Setup
Index Configuration
{
"name": "kubecontalks",
"type": "fulltext-index",
"params": {
"mapping": {
"types": {
"_default": {
"enabled": true,
"dynamic": true,
"properties": {
"embedding": {
"enabled": true,
"fields": [
{
"name": "embedding",
"type": "vector",
"dims": 4096,
"similarity": "dot_product"
}
]
}
}
}
}
}
}
}
Advanced Patterns
Dual-Context RAG
# Historical Context (vector search over the Couchbase talk database)
similar_talks = cb.get_similar_talks(query)
# Real-Time Context (ADK web research agents)
adk_research = run_adk_research(query)
# Synthesis (LLM combines both contexts into one proposal)
proposal = generate_talk_suggestion(query, similar_talks, adk_research)
Parallel Research with ADK
# Multiple agents search simultaneously.
# NOTE(review): linkup_agent is not constructed in the snippets above,
# although LinkupClient is imported — confirm it is built alongside
# exa_agent/tavily_agent in the full source.
parallel_search = ParallelAgent(
    sub_agents=[exa_agent, tavily_agent, linkup_agent]
)
# Then sequential synthesis over the parallel results.
pipeline = SequentialAgent(
    sub_agents=[parallel_search, summary_agent]
)
Configuration
Environment Variables
# Couchbase
CB_CONNECTION_STRING=couchbase://cluster-url
CB_USERNAME=admin
CB_PASSWORD=password
CB_BUCKET=kubecon-talks
CB_COLLECTION=talks
CB_SEARCH_INDEX=kubecontalks
# AI Inference
NEBIUS_API_KEY=your_key
NEBIUS_API_BASE=https://api.tokenfactory.nebius.com/v1
# Research APIs
EXA_API_KEY=your_key
TAVILY_API_KEY=your_key
LINKUP_API_KEY=your_key
Use Cases
Conference Proposals
Generate unique talk proposals for KubeCon, re:Invent, etc.
Content Ideas
Blog posts, webinars based on current trends
Research Synthesis
Combine historical data with latest developments
Trend Analysis
Identify gaps in conference coverage
Project Structure
conference_talk_abstract_generator/
├── extract_events.py # URL extraction
├── couchbase_utils.py # Web crawling + storage
├── embeddinggeneration.py # Vector embedding
├── adk_research_agent.py # Real-time research
├── talk_suggestions_app.py # Streamlit RAG app
├── event_urls.txt # Extracted URLs
└── .env # Configuration
Related Patterns
Trend Analyzer
Similar ADK pipeline without vector search
Deep Researcher
Web research with different framework
Learn More
RAG Workflows
Retrieval-Augmented Generation patterns
Multi-Agent Patterns
Parallel and sequential agent orchestration
Advanced Agents
More advanced agent examples