The IHiveChainInterface extends IWaxBaseInterface with online blockchain operations. It provides API access, transaction broadcasting, and automatic TAPOS data retrieval.
Properties
api
Returns the API collection for making RPC calls to the Hive node.
chain = create_hive_chain()
# Access various APIs
accounts = await chain.api.database_api.find_accounts(accounts=["alice"])
dgpo = await chain.api.database_api.get_dynamic_global_properties()
history = await chain.api.account_history_api.get_account_history(
account="alice",
start=-1,
limit=10
)
Collection of API endpoints including:
database_api - Account, block, and state queries
account_history_api - Transaction history
network_broadcast_api - Transaction broadcasting
rc_api - Resource credit queries
- And more…
endpoint_url
Gets or sets the Hive node endpoint URL.
chain = create_hive_chain()
# Get current endpoint
print(chain.endpoint_url) # "https://api.hive.blog"
# Change endpoint
chain.endpoint_url = "https://api.deathwing.me"
Current endpoint URL being used for API calls.
Setter raises: InvalidEndpointUrlFormatError if the URL format is invalid.
Inherited properties
All properties from IWaxBaseInterface are available:
chain_id - Chain identifier
config - Protocol configuration
address_prefix - Public key prefix
hive - HIVE asset factory
hbd - HBD asset factory
vests - VESTS asset factory
See Wax base interface for details.
Transaction methods
create_transaction
Creates an online transaction with automatic TAPOS data from the blockchain.
import asyncio
from datetime import timedelta
from wax import create_hive_chain
from wax.proto.operations import vote
async def main():
chain = create_hive_chain()
# Create with default 1-minute expiration
tx = await chain.create_transaction()
# Create with custom expiration
tx = await chain.create_transaction(expiration=timedelta(minutes=5))
# Add operations
tx.push_operation(
vote(
voter="alice",
author="bob",
permlink="example-post",
weight=10000
)
)
chain.teardown()
asyncio.run(main())
expiration
TTimestamp | None
default:"timedelta(minutes=1)"
Transaction expiration time. Can be:
datetime - Absolute UTC expiration time
timedelta - Duration from now
None - Uses default 1 minute
Online transaction object with TAPOS data automatically filled from the current head block.
Raises: AssertionError if expiration type is invalid.
broadcast
Broadcasts a signed transaction to the blockchain.
import asyncio
from wax import create_hive_chain
from wax.proto.operations import transfer
from beekeepy import AsyncBeekeeper
async def main():
chain = create_hive_chain()
# Create and sign transaction
tx = await chain.create_transaction()
tx.push_operation(
transfer(
from_account="alice",
to_account="bob",
amount=chain.hive.satoshis(1000),
memo="Payment"
)
)
async with await AsyncBeekeeper.factory() as beekeeper:
session = await beekeeper.create_session(salt="")
wallet = await session.create_wallet(name="wallet", password="pass")
await wallet.import_key(private_key="5J...")
await tx.sign(wallet=wallet, public_key="STM...")
# Broadcast to blockchain
await chain.broadcast(tx)
print(f"Transaction {tx.id} broadcasted successfully")
chain.teardown()
asyncio.run(main())
transaction
ITransaction | IOnlineTransaction
required
Signed transaction to broadcast.
Raises:
TransactionNotSignedError - Transaction has no signatures
WaxValidationFailedError - Transaction validation failed
AccountNotFoundError - One or more impacted accounts not found (IOnlineTransaction only)
PrivateKeyDetectedInMemoError - Private key detected in operation content (IOnlineTransaction only)
When broadcasting an IOnlineTransaction, the perform_on_chain_verification() method is called automatically to verify accounts exist and scan for leaked private keys.
Account authority methods
collect_account_authorities
Retrieves authority information for one or more accounts.
import asyncio
from wax import create_hive_chain
async def main():
chain = create_hive_chain()
# Get single account
auth_info = await chain.collect_account_authorities("alice")
print(f"Account: {auth_info.account}")
print(f"Memo key: {auth_info.memo_key}")
print(f"Owner threshold: {auth_info.authorities.owner.weight_threshold}")
# Get multiple accounts
auth_infos = await chain.collect_account_authorities("alice", "bob", "carol")
for auth_info in auth_infos:
print(f"Account: {auth_info.account}")
print(f" Posting keys: {list(auth_info.authorities.posting.key_auths.keys())}")
print(f" Active keys: {list(auth_info.authorities.active.key_auths.keys())}")
print(f" Owner keys: {list(auth_info.authorities.owner.key_auths.keys())}")
chain.teardown()
asyncio.run(main())
One or more account names to retrieve authorities for.
return
WaxAccountAuthorityInfo | list[WaxAccountAuthorityInfo]
If a single account is provided, returns a single WaxAccountAuthorityInfo. If multiple accounts are provided, returns a list. Each WaxAccountAuthorityInfo contains:
account - Account name
authorities - Owner, active, and posting authorities
memo_key - Memo public key
last_owner_update - Last owner key update time
previous_owner_update - Previous owner key update time
Raises:
InvalidAccountNameError - Account name is invalid
AccountNotFoundError - Account not found on the blockchain
API extension methods
extends
Extends the API collection with custom API classes.
import asyncio
from typing import Any
from beekeepy.handle.remote import AbstractAsyncApi
from wax import create_hive_chain
from wax.api.collection import DatabaseApi as BaseDatabaseApi
# Extend existing API
class DatabaseApi(BaseDatabaseApi):
@BaseDatabaseApi.endpoint_jsonrpc
async def get_config(self) -> Any:
...
# Add new API
class BlockApi(AbstractAsyncApi):
@AbstractAsyncApi.endpoint_jsonrpc
async def get_block_header(self, *, block_num: int) -> Any:
...
# Create API collection
class MyApiCollection:
def __init__(self):
self.database_api = DatabaseApi
self.block_api = BlockApi
async def main():
chain = create_hive_chain()
extended = chain.extends(MyApiCollection)
# Use extended APIs with full type support
config = await extended.api.database_api.get_config()
header = await extended.api.block_api.get_block_header(block_num=123)
extended.teardown()
asyncio.run(main())
new_api
type[ExtendedApiCollectionT]
required
New API collection class to add. Must contain API classes as attributes.
return
IHiveChainInterface[ExtendedApiCollectionT | ApiCollectionT]
New chain instance with extended API collection. Original instance remains unchanged.
API class requirements:
- Extend existing API classes (like DatabaseApi) or use AbstractAsyncApi for new APIs
- Use the @endpoint_jsonrpc decorator for endpoint methods
- Endpoint parameters must be keyword-only (use * before params)
- Class name must match the Hive API name (e.g., BlockApi for block_api)
extend_rest
Extends the REST API collection with custom REST API classes.
import asyncio
from beekeepy.handle.remote import AbstractAsyncApi
from wax import create_hive_chain
class HafbeApi(AbstractAsyncApi):
@AbstractAsyncApi.endpoint_rest
async def get_account(self, *, account: str):
...
class MyRestApiCollection:
def __init__(self):
self.hafbe_api = HafbeApi
async def main():
chain = create_hive_chain()
extended = chain.extend_rest(MyRestApiCollection)
account_data = await extended.api.hafbe_api.get_account(account="alice")
extended.teardown()
asyncio.run(main())
new_rest_api
type[ExtendedApiCollectionT]
required
New REST API collection class to add.
return
IHiveChainInterface[ExtendedApiCollectionT | ApiCollectionT]
New chain instance with extended REST API collection.
REST API requirements:
- Use AbstractAsyncApi as the base class
- Use the @endpoint_rest decorator for REST endpoints
- Endpoint methods should return parsed response data
Cleanup methods
teardown
Cleans up API communication resources. Call this when finished with the chain instance.
import asyncio
from wax import create_hive_chain
async def main():
chain = create_hive_chain()
try:
# Use chain for operations
accounts = await chain.api.database_api.find_accounts(accounts=["alice"])
# ... more operations ...
finally:
# Always clean up
chain.teardown()
asyncio.run(main())
Always call teardown() when finished with a chain instance to properly close connections and free resources. Use a try/finally block to ensure cleanup happens even if errors occur.
Complete example
Here’s a complete example showing common operations:
import asyncio
from datetime import timedelta
from wax import create_hive_chain, WaxChainOptions
from wax.proto.operations import vote, comment
from beekeepy import AsyncBeekeeper
async def main():
# Create chain instance
chain = create_hive_chain(
WaxChainOptions(
endpoint_url="https://api.hive.blog"
)
)
try:
# Query blockchain
dgpo = await chain.api.database_api.get_dynamic_global_properties()
print(f"Head block: {dgpo.head_block_number}")
# Get account data
accounts = await chain.api.database_api.find_accounts(accounts=["alice"])
account = accounts.accounts[0]
print(f"Account balance: {account.balance}")
# Collect authorities
auth_info = await chain.collect_account_authorities("alice")
print(f"Posting keys: {list(auth_info.authorities.posting.key_auths.keys())}")
# Create and broadcast transaction
tx = await chain.create_transaction(expiration=timedelta(minutes=2))
tx.push_operation(
comment(
parent_author="",
parent_permlink="test",
author="alice",
permlink="my-post",
title="Hello Hive",
body="My first post using Wax!",
json_metadata="{}"
)
)
tx.push_operation(
vote(
voter="alice",
author="alice",
permlink="my-post",
weight=10000
)
)
# Sign transaction
async with await AsyncBeekeeper.factory() as beekeeper:
session = await beekeeper.create_session(salt="")
wallet = await session.create_wallet(name="my_wallet", password="secret")
await wallet.import_key(private_key="5J...")
await tx.sign(wallet=wallet, public_key="STM...")
# Broadcast
await chain.broadcast(tx)
print(f"Transaction {tx.id} broadcasted!")
finally:
# Clean up
chain.teardown()
asyncio.run(main())