Hyperbolic AgentKit supports multiple agent types and execution modes, each optimized for different use cases. All agents are powered by the ReAct (Reasoning and Acting) pattern via LangGraph.
One-on-one conversation with the agent via terminal.
chatbot.py
async def run_chat_mode(agent_executor, config, runnable_config):
    """Run the agent interactively based on user input.

    Reads one line at a time from the terminal, wraps it in a
    ``HumanMessage``, streams it through the agent, and prints every
    ``"agent"`` or ``"tools"`` chunk that comes back. The loop ends when the
    user types ``exit`` (case-insensitive, surrounding whitespace ignored) or
    when stdin is closed.

    Args:
        agent_executor: Object exposing ``astream(inputs, config)`` that
            yields dict chunks keyed by ``"agent"`` or ``"tools"``.
        config: Not used by this mode; kept so the signature matches
            ``run_twitter_automation``.
        runnable_config: Passed through unchanged to ``astream``.
    """
    print_system("Starting chat mode... Type 'exit' to end.")
    prompt = f"{Colors.BLUE}{Colors.BOLD}User: {Colors.ENDC}"

    while True:
        try:
            user_input = input(prompt)
        except EOFError:
            # stdin was closed (Ctrl-D or end of piped input): end the
            # session cleanly instead of crashing with a traceback.
            break

        command = user_input.strip()
        if command.lower() == "exit":
            break
        if not command:
            # Nothing to send; prompt again rather than invoking the agent
            # with an empty message.
            continue

        async for chunk in agent_executor.astream(
            {"messages": [HumanMessage(content=user_input)]}, runnable_config
        ):
            if "agent" in chunk:
                # Only the first message of each chunk is rendered.
                response = chunk["agent"]["messages"][0].content
                print_ai(format_ai_message_content(response))
            elif "tools" in chunk:
                print_system(chunk["tools"]["messages"][0].content)
Use Cases:
Testing and debugging
Direct interaction with GPU compute
Blockchain operations
General assistance tasks
Starting Chat Mode:
poetry run python chatbot.py
# Select option 1: Interactive chat mode
Autonomous agent that monitors Twitter mentions and interacts with key opinion leaders (KOLs).
chatbot.py
async def run_twitter_automation(agent_executor, config, runnable_config):
    """Run the agent autonomously with specified intervals.

    Loops forever. Each cycle waits until ``twitter_state`` allows another
    mention check, records and saves the check time, samples KOLs from the
    character config, and streams a fixed three-task prompt through the
    agent before sleeping for ``MENTION_CHECK_INTERVAL`` seconds.

    Args:
        agent_executor: Object exposing ``astream(inputs, config)``.
        config: Mapping that provides ``config['character']['name']`` and
            ``config['character']['kol_list']``.
        runnable_config: Passed through unchanged to ``astream``.
    """
    print_system(f"Starting autonomous mode as {config['character']['name']}...")
    twitter_state.load()

    while True:
        # Check if enough time has passed since last check
        if not twitter_state.can_check_mentions():
            elapsed = (datetime.now() - twitter_state.last_check_time).total_seconds()
            wait_time = MENTION_CHECK_INTERVAL - elapsed
            if wait_time > 0:
                await asyncio.sleep(wait_time)
            # Re-evaluate the gate rather than assuming it is open now.
            continue

        # Update check time
        twitter_state.last_check_time = datetime.now()
        twitter_state.save()

        # Select KOLs for interaction. random.sample raises ValueError when
        # asked for more items than the population holds, so clamp the sample
        # size to the length of the configured list.
        kol_list = config['character']['kol_list']
        selected_kols = random.sample(kol_list, min(NUM_KOLS, len(kol_list)))
        # NOTE(review): selected_kols is not referenced by the prompt in this
        # excerpt -- confirm whether it should be interpolated below.

        # Create autonomous task prompt
        thought = f"""
        Task 1: Query podcast knowledge base and recent tweets
        Task 2: Check for and reply to new Twitter mentions
        Task 3: Interact with KOLs
        """

        # Execute autonomous workflow
        async for chunk in agent_executor.astream(
            {"messages": [HumanMessage(content=thought)]}, runnable_config
        ):
            # Process results...
            pass  # chunk handling is elided in this excerpt

        # Wait before next cycle
        await asyncio.sleep(MENTION_CHECK_INTERVAL)
try: async for chunk in agent_executor.astream( {"messages": [HumanMessage(content=thought)]}, runnable_config ): # Process chunksexcept KeyboardInterrupt: print_system("\nSaving state and exiting...") twitter_state.save() sys.exit(0)except Exception as e: print_error(f"Unexpected error: {str(e)}") print_error(f"Error type: {type(e).__name__}") # Continue after error in autonomous mode print_system("Continuing after error...") await asyncio.sleep(MENTION_CHECK_INTERVAL)