Overview
Clients in the Atlan Application SDK handle connection management and HTTP communication with external systems. They provide a clean abstraction for making API calls, handling authentication, and managing retries.
Clients focus on connection management and HTTP operations. Business logic should be implemented in Handlers.
ClientInterface
The ClientInterface defines the base contract that all client implementations must follow.
from abc import ABC, abstractmethod
from typing import Any


class ClientInterface(ABC):
    """Base interface for implementing client connections."""

    @abstractmethod
    async def load(self, *args: Any, **kwargs: Any) -> None:
        """Establish the client connection."""
        raise NotImplementedError("load method is not implemented")

    async def close(self, *args: Any, **kwargs: Any) -> None:
        """Close the client connection."""
        return
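As a minimal sketch of how a concrete client might satisfy this contract, the block below restates the interface so it runs on its own and adds a hypothetical InMemoryClient. The connected flag and the way credentials are stored are illustrative assumptions, not part of the SDK.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict


class ClientInterface(ABC):
    """Restated from the section above so the sketch is self-contained."""

    @abstractmethod
    async def load(self, *args: Any, **kwargs: Any) -> None:
        raise NotImplementedError("load method is not implemented")

    async def close(self, *args: Any, **kwargs: Any) -> None:
        return


class InMemoryClient(ClientInterface):
    """Hypothetical client that only records its connection state."""

    def __init__(self) -> None:
        self.connected = False
        self.credentials: Dict[str, Any] = {}

    async def load(self, *args: Any, **kwargs: Any) -> None:
        # Keep the credentials and mark the client as ready.
        self.credentials = kwargs.get("credentials", {})
        self.connected = True

    async def close(self, *args: Any, **kwargs: Any) -> None:
        self.connected = False


async def demo() -> InMemoryClient:
    client = InMemoryClient()
    await client.load(credentials={"api_key": "example-key"})
    return client
```

Because load() is abstract, instantiating ClientInterface directly raises TypeError; only subclasses that implement load() can be created.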
BaseClient
The BaseClient class provides a complete implementation for HTTP-based clients with built-in support for authentication, custom headers, and retry logic.
from typing import Any

from application_sdk.clients.base import BaseClient


class MyClient(BaseClient):
    async def load(self, **kwargs: Any) -> None:
        credentials = kwargs.get("credentials", {})
        # Set up authentication headers
        self.http_headers = {
            "Authorization": f"Bearer {credentials.get('api_key')}",
            "User-Agent": "MyApp/1.0",
        }
Constructor Parameters
credentials
Dict[str, Any]
default: {}
Client credentials for authentication.
http_headers
HTTP headers for all requests. Supports a dict, an httpx Headers object, or a list of tuples.
# As dict
client = BaseClient(http_headers={"User-Agent": "MyApp"})
# As Headers object
from httpx import Headers
client = BaseClient(http_headers=Headers({"User-Agent": "MyApp"}))
# As list of tuples
client = BaseClient(http_headers=[("User-Agent", "MyApp")])
Instance Attributes
http_headers
HTTP headers applied to all requests. Set in the load() method.
http_retry_transport
HTTP transport layer for requests. Override in load() for custom retry behavior. Default: httpx.AsyncHTTPTransport() (no status code retries).
credentials
Stored credentials for the client.
Required Methods
load()
Initialize the client with credentials and set up authentication.
async def load(self, **kwargs: Any) -> None:
    """Initialize the client."""
    credentials = kwargs.get("credentials", {})
    # Extract credentials
    api_key = credentials.get("api_key")
    if not api_key:
        raise ValueError("API key is required")
    # Set up headers for all requests
    self.http_headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "User-Agent": "MyApp/1.0",
    }
    # Store credentials for later use
    self.credentials = credentials
Keyword arguments typically including:
credentials (Dict[str, Any]): Authentication credentials
Configuration parameters for retry logic
Always set self.http_headers in the load() method rather than the constructor. This ensures headers are properly configured with credentials.
HTTP Request Methods
execute_http_get_request()
Perform an HTTP GET request.
response = await client.execute_http_get_request(
    url="https://api.example.com/data",
    headers={"Custom-Header": "value"},
    params={"limit": 100, "offset": 0},
    auth=("username", "password"),
    timeout=30,
)
if response and response.status_code == 200:
    data = response.json()
The URL to make the GET request to.
Additional headers for this request. Merged with client-level headers. Supports dict, httpx Headers object, or list of tuples.
Query parameters. Supports a dict, a list of tuples, or a string.
# As dict
params = {"key": "value", "limit": 100}
# As list of tuples (for duplicate keys)
params = [("tag", "python"), ("tag", "api")]
# As string
params = "key=value&limit=100"
Authentication for the request. Supports:
Tuple: ("username", "password") for basic auth
httpx.BasicAuth: BasicAuth("username", "password")
httpx.DigestAuth: For digest authentication
Custom auth classes
from httpx import BasicAuth
# Using tuple
auth = ("user", "pass")
# Using BasicAuth
auth = BasicAuth("user", "pass")
Request timeout in seconds.
HTTP response object if successful, None if failed.
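To make the three query parameter forms concrete, the sketch below encodes them with Python's standard urllib.parse module. It only illustrates how the dict and list-of-tuples forms map onto a query string; it is not the SDK's or httpx's own encoding routine, which may differ in details such as escaping.

```python
from urllib.parse import parse_qs, urlencode

# Dict form: one value per key.
dict_query = urlencode({"key": "value", "limit": 100})

# List-of-tuples form: the same key may appear more than once.
tuple_query = urlencode([("tag", "python"), ("tag", "api")])

# String form: already encoded, so parsing it recovers the key/value pairs.
parsed = parse_qs("key=value&limit=100")

print(dict_query)   # key=value&limit=100
print(tuple_query)  # tag=python&tag=api
print(parsed)       # {'key': ['value'], 'limit': ['100']}
```

The list-of-tuples form is the one to reach for when an API expects a repeated key, since a dict can hold each key only once.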
execute_http_post_request()
Perform an HTTP POST request with support for JSON, form data, files, and raw content.
# JSON request
response = await client.execute_http_post_request(
    url="https://api.example.com/data",
    json_data={"name": "test", "value": 123},
    headers={"Content-Type": "application/json"},
    timeout=30,
)

# Form data
response = await client.execute_http_post_request(
    url="https://api.example.com/form",
    data={"field1": "value1", "field2": "value2"},
)

# File upload
with open("file.txt", "rb") as f:
    response = await client.execute_http_post_request(
        url="https://api.example.com/upload",
        files={"file": ("file.txt", f, "text/plain")},
        data={"description": "My file"},
    )
The URL to make the POST request to.
Form data to send in the request body. Supports dict, list of tuples, or other httpx-compatible formats.
JSON data to send in the request body. Any JSON-serializable object.
Raw binary content to send in the request body.
Files to upload. Supports various formats:
# Single file
files = {"file": open("file.txt", "rb")}
# With filename and content type
files = {"file": ("file.txt", open("file.txt", "rb"), "text/plain")}
# Multiple files
files = {
    "file1": open("file1.txt", "rb"),
    "file2": open("file2.txt", "rb"),
}
Additional headers for this request. Merged with client-level headers.
Query parameters to include in the URL.
Cookies to include in the request.
Authentication for the request. Same options as GET request.
Whether to follow HTTP redirects.
Whether to verify SSL certificates.
Request timeout in seconds.
HTTP response object if successful, None if failed.
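The multi-file form shown for the files parameter opens handles that nothing closes afterwards. One possible way to avoid that, sketched here with the standard library's contextlib.ExitStack, is to register every handle on a stack that closes them all when the upload finishes. The helper name open_upload_files is hypothetical and not part of the SDK.

```python
from contextlib import ExitStack
from typing import BinaryIO, Dict


def open_upload_files(stack: ExitStack, paths: Dict[str, str]) -> Dict[str, BinaryIO]:
    """Open each path in binary mode and register it on the stack for cleanup."""
    return {
        field: stack.enter_context(open(path, "rb"))
        for field, path in paths.items()
    }


# Usage inside an async method (client and url are placeholders):
#
# with ExitStack() as stack:
#     files = open_upload_files(stack, {"file1": "file1.txt", "file2": "file2.txt"})
#     response = await client.execute_http_post_request(url=url, files=files)
# # All handles are closed here, even if the request raised.
```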
The client supports a two-level header system:
Client-level headers : Set in load() and applied to all requests
Method-level headers : Passed to individual methods and merged with client headers
class MyClient(BaseClient):
    async def load(self, **kwargs):
        # Client-level headers (all requests)
        self.http_headers = {
            "Authorization": f"Bearer {kwargs['api_key']}",
            "User-Agent": "MyApp/1.0",
        }


# Method-level headers (specific request)
response = await client.execute_http_get_request(
    url="https://api.example.com/data",
    headers={"X-Request-ID": "123"},  # Merged with client headers
)
Method-level headers override client-level headers for the same header name.
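The override rule can be pictured with plain dictionaries. The sketch below is an illustration of the documented behavior, not the SDK's actual merge code; merge_headers is a hypothetical helper, and it assumes header names are matched case-insensitively, as HTTP specifies.

```python
from typing import Dict, Optional


def merge_headers(
    client_headers: Dict[str, str],
    method_headers: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
    """Merge two header dicts; method-level values win on a name clash."""
    merged: Dict[str, str] = {}
    canonical: Dict[str, str] = {}  # lowercase name -> name as stored in merged
    for source in (client_headers, method_headers or {}):
        for name, value in source.items():
            key = name.lower()
            if key in canonical:
                # Same header under a different spelling: drop the older entry.
                del merged[canonical[key]]
            canonical[key] = name
            merged[name] = value
    return merged


client_level = {"Authorization": "Bearer token", "User-Agent": "MyApp/1.0"}
method_level = {"X-Request-ID": "123", "user-agent": "MyApp/2.0"}
merged = merge_headers(client_level, method_level)
```

Here merged keeps Authorization from the client level, gains X-Request-ID from the method level, and carries the method-level user-agent value in place of the client-level one.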
Advanced Retry Configuration
For applications requiring advanced retry logic (status code-based retries, rate limiting, custom backoff), use the httpx-retries library:
from typing import Any

from application_sdk.clients.base import BaseClient


class ResilientClient(BaseClient):
    async def load(self, **kwargs: Any) -> None:
        credentials = kwargs.get("credentials", {})
        # Set up headers
        self.http_headers = {
            "Authorization": f"Bearer {credentials['api_key']}"
        }
        # Install httpx-retries: pip install httpx-retries
        from httpx_retries import Retry, RetryTransport

        # Configure retry for status codes and network errors
        retry = Retry(
            total=5,  # Maximum retry attempts
            backoff_factor=10,  # Exponential backoff multiplier
            status_forcelist=[429, 500, 502, 503, 504],  # Retry these status codes
            raise_on_status=False,  # Don't raise on retry-able errors
        )
        # Override transport with retry support
        self.http_retry_transport = RetryTransport(retry=retry)
Maximum number of retry attempts.
Multiplier for exponential backoff between retries. Wait time = backoff_factor * (2 ** retry_count)
HTTP status codes that should trigger a retry. Common codes:
429: Too Many Requests (rate limiting)
500: Internal Server Error
502: Bad Gateway
503: Service Unavailable
504: Gateway Timeout
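To get a feel for what these settings mean in practice, the sketch below evaluates the backoff formula quoted above for the values used in the example (backoff_factor of 10, total of 5). It assumes retry_count starts at 0 for the first retry and ignores any jitter or maximum-wait cap the retry library may apply, so treat the numbers as an approximation.

```python
from typing import List


def backoff_schedule(backoff_factor: float, total: int) -> List[float]:
    """Wait before each retry, using backoff_factor * (2 ** retry_count).

    Assumes retry_count is 0 for the first retry.
    """
    return [backoff_factor * (2 ** retry_count) for retry_count in range(total)]


waits = backoff_schedule(backoff_factor=10, total=5)
total_wait = sum(waits)

print(waits)       # [10, 20, 40, 80, 160]
print(total_wait)  # 310
```

Under these assumptions a request that fails on every attempt spends about 310 seconds waiting, so the backoff factor and the per-request timeout are worth choosing together.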
The default httpx.AsyncHTTPTransport() only retries network-level errors (connection failures, timeouts). It does NOT retry based on HTTP status codes. For status code retries, you must use httpx-retries or a similar library.
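For readers who want to see what status-code retrying amounts to, here is a small standard-library sketch of the idea. It is not how httpx-retries is implemented; send is a hypothetical coroutine function that returns an HTTP status code, and the backoff follows the formula quoted earlier with retry_count starting at 0.

```python
import asyncio
from typing import Awaitable, Callable, Iterable

RETRYABLE_STATUSES = frozenset({429, 500, 502, 503, 504})


async def send_with_status_retries(
    send: Callable[[], Awaitable[int]],
    total: int = 3,
    backoff_factor: float = 0.0,
    retry_statuses: Iterable[int] = RETRYABLE_STATUSES,
) -> int:
    """Call send() until it returns a non-retryable status or retries run out.

    Returns the last status code seen.
    """
    statuses = set(retry_statuses)
    status = await send()
    for retry_count in range(total):
        if status not in statuses:
            break
        # Exponential backoff before the next attempt.
        await asyncio.sleep(backoff_factor * (2 ** retry_count))
        status = await send()
    return status
```

With total=3 the function makes at most four calls: the initial attempt plus three retries.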
Complete Client Example
from application_sdk.clients.base import BaseClient
from application_sdk.observability.logger_adaptor import get_logger
from typing import Dict, Any, List, Optional
import httpx

logger = get_logger(__name__)


class DataAPIClient(BaseClient):
    """Client for interacting with a data API."""

    def __init__(self):
        super().__init__()
        self.base_url = None

    async def load(self, **kwargs: Any) -> None:
        """Initialize the client with credentials."""
        credentials = kwargs.get("credentials", {})
        # Extract configuration
        api_key = credentials.get("api_key")
        self.base_url = credentials.get("base_url", "https://api.datasource.com")
        if not api_key:
            raise ValueError("API key is required")
        # Set up authentication headers
        self.http_headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
            "User-Agent": "AtlanSDK/1.0",
        }
        # Optional: Set up retry logic
        if kwargs.get("enable_retries", True):
            try:
                from httpx_retries import Retry, RetryTransport

                retry = Retry(
                    total=3,
                    backoff_factor=5,
                    status_forcelist=[429, 500, 502, 503, 504],
                )
                self.http_retry_transport = RetryTransport(retry=retry)
                logger.info("Retry logic enabled")
            except ImportError:
                logger.warning("httpx-retries not installed, using default transport")
        logger.info(f"Client initialized for {self.base_url}")

    async def get_sources(self) -> List[Dict[str, Any]]:
        """Fetch list of data sources."""
        response = await self.execute_http_get_request(
            url=f"{self.base_url}/v1/sources",
            timeout=30,
        )
        if response and response.status_code == 200:
            return response.json().get("sources", [])
        logger.error(f"Failed to fetch sources: {response.status_code if response else 'No response'}")
        return []

    async def get_source_details(self, source_id: str) -> Optional[Dict[str, Any]]:
        """Fetch details for a specific source."""
        response = await self.execute_http_get_request(
            url=f"{self.base_url}/v1/sources/{source_id}",
            timeout=30,
        )
        if response and response.status_code == 200:
            return response.json()
        return None

    async def query_data(
        self,
        source_id: str,
        query: str,
        params: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """Execute a query against a data source."""
        response = await self.execute_http_post_request(
            url=f"{self.base_url}/v1/query",
            json_data={
                "source_id": source_id,
                "query": query,
                "parameters": params or {},
            },
            timeout=300,  # Longer timeout for queries
        )
        if response and response.status_code == 200:
            return response.json().get("data", [])
        error_msg = f"Query failed: {response.status_code if response else 'No response'}"
        logger.error(error_msg)
        raise Exception(error_msg)

    async def upload_data(self, file_path: str, destination: str) -> Dict[str, Any]:
        """Upload a data file."""
        with open(file_path, "rb") as f:
            response = await self.execute_http_post_request(
                url=f"{self.base_url}/v1/upload",
                files={"file": (file_path, f, "application/octet-stream")},
                data={"destination": destination},
                timeout=600,  # Long timeout for uploads
            )
        if response and response.status_code == 200:
            return response.json()
        raise Exception(f"Upload failed: {response.status_code if response else 'No response'}")

    async def close(self) -> None:
        """Clean up client resources."""
        logger.info("Closing client connection")
        # Perform any cleanup if needed
Best Practices
Set Headers in load() Method
Use Retry Logic for Production
For production applications, configure retry logic to handle transient failures.
from httpx_retries import Retry, RetryTransport

async def load(self, **kwargs):
    # ... headers setup ...
    retry = Retry(
        total=5,
        backoff_factor=10,
        status_forcelist=[429, 500, 502, 503, 504],
    )
    self.http_retry_transport = RetryTransport(retry=retry)
Always check for None responses and handle errors appropriately.
response = await self.client.execute_http_get_request(url)
if response is None:
    logger.error("Request failed: No response received")
    return None
if response.status_code != 200:
    logger.error(f"Request failed: {response.status_code}")
    return None
return response.json()
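If the same None and status checks recur across many methods, one option is to pull them into a small helper. The sketch below is only an illustration: json_or_none is a hypothetical name, and FakeResponse is a stand-in that mimics the status_code attribute and json() method the checks above rely on.

```python
from typing import Any, Optional


def json_or_none(response: Optional[Any], expected_status: int = 200) -> Optional[Any]:
    """Return the decoded JSON body, or None if the request failed.

    response is assumed to expose status_code and json().
    """
    if response is None:
        return None
    if response.status_code != expected_status:
        return None
    return response.json()


class FakeResponse:
    """Stand-in response used only to exercise the helper."""

    def __init__(self, status_code: int, payload: Any) -> None:
        self.status_code = status_code
        self._payload = payload

    def json(self) -> Any:
        return self._payload
```

A caller would still log the failure reason where it matters; the helper only removes the repeated branching.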
Set realistic timeouts based on the operation type.
# Quick metadata request
await client.execute_http_get_request(url, timeout=10)
# Long-running query
await client.execute_http_post_request(url, json_data=query, timeout=300)
# Large file upload
await client.execute_http_post_request(url, files=files, timeout=600)
Handlers - Use clients to implement business logic
Activities - Access handlers (which use clients) from workflows