# src/remem/information_extraction/my_custom_extraction_openai.py
from typing import Dict, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from remem.llm.openai_gpt import CacheOpenAI
from remem.prompts import PromptTemplateManager
from remem.utils.misc_utils import NerRawOutput, TripleRawOutput
from remem.information_extraction.openie_openai import ChunkInfo
class MyCustomExtraction:
    """Example skeleton for a custom information-extraction backend.

    Mirrors the interface of the stock OpenIE extractors: construct it with a
    ``CacheOpenAI`` client, then call :meth:`batch_openie` with a mapping of
    chunk keys to ``ChunkInfo`` entries.
    """

    def __init__(self, llm_model: CacheOpenAI, global_config=None):
        """Store the LLM client and set up prompt-template rendering.

        Args:
            llm_model: cached OpenAI-compatible client used for inference.
            global_config: optional pipeline-wide configuration object
                (kept as-is; not interpreted here).
        """
        # Map the abstract prompt roles 1:1 onto OpenAI chat roles.
        self.prompt_template_manager = PromptTemplateManager(
            role_mapping={"system": "system", "user": "user", "assistant": "assistant"}
        )
        self.llm_model = llm_model
        self.global_config = global_config

    def extract_chunk(self, chunk_key: str, passage: str) -> Dict:
        """Run the custom extraction prompt over a single passage.

        Args:
            chunk_key: unique identifier of the chunk; echoed back as
                ``chunk_id`` so results can be matched after threaded fan-out.
            passage: raw text of the chunk to extract from.

        Returns:
            A dict carrying the chunk id, the raw LLM response, and the call
            metadata; extend it with your parsed structures.
        """
        # Render your custom prompt template for this passage.
        input_message = self.prompt_template_manager.render(
            name="my_custom_template",
            passage=passage
        )
        # Request a JSON object so the response is machine-parseable.
        # infer() also returns a cache-hit flag, which is not needed here.
        raw_response, metadata, _cache_hit = self.llm_model.infer(
            messages=input_message,
            response_format={"type": "json_object"}
        )
        # Parse response and return structured output
        # ... your parsing logic ...
        return {
            "chunk_id": chunk_key,
            "response": raw_response,
            "metadata": metadata,
            # ... your extracted data ...
        }

    def batch_openie(self, chunks: Dict[str, ChunkInfo]) -> Dict[str, Dict]:
        """Extract from every chunk concurrently with a thread pool.

        Note: the return annotation was previously ``Tuple``, but the method
        has always returned a single dict; the annotation now matches.

        Args:
            chunks: mapping of chunk key to ``ChunkInfo``; each entry must
                provide a ``"content"`` field with the passage text.

        Returns:
            Mapping of chunk id to the per-chunk result dict produced by
            :meth:`extract_chunk`.
        """
        chunk_passages = {chunk_key: chunk["content"] for chunk_key, chunk in chunks.items()}
        results_list = []
        with ThreadPoolExecutor() as executor:
            # Fan out one task per chunk; keep the future -> key mapping so a
            # failure can be attributed to a specific chunk if needed.
            futures = {
                executor.submit(self.extract_chunk, chunk_key, passage): chunk_key
                for chunk_key, passage in chunk_passages.items()
            }
            # Consume results as they finish, with a progress bar.
            for future in tqdm(as_completed(futures), total=len(futures), desc="Custom extraction"):
                results_list.append(future.result())
        # Key results by chunk id for O(1) lookup downstream.
        results_dict = {res["chunk_id"]: res for res in results_list}
        return results_dict