TransformerInterface
application_sdk.transformers.TransformerInterface
Abstract base class for metadata transformers.
Methods
transform_metadata
Transform metadata from one format to another.

def transform_metadata(
self,
typename: str,
dataframe: daft.DataFrame,
workflow_id: str,
workflow_run_id: str,
entity_class_definitions: Dict[str, Type[Any]] | None = None,
**kwargs: Any
) -> daft.DataFrame
typename: The type identifier for the metadata being transformed.
dataframe: The source metadata DataFrame to transform.
workflow_id: Identifier for the workflow requesting the transformation.
workflow_run_id: Identifier for the specific workflow run.
entity_class_definitions: Mapping of entity types to their class definitions.
**kwargs: Additional keyword arguments for specific transformer implementations.
Returns: The transformed metadata DataFrame.
Raises NotImplementedError: If the subclass does not implement this method.
Example Usage
Basic Transformer
from application_sdk.transformers import TransformerInterface
import daft
from typing import Dict, Type, Any
class JSONToAtlanTransformer(TransformerInterface):
    """Transform JSON metadata to Atlan format."""

    def transform_metadata(
        self,
        typename: str,
        dataframe: daft.DataFrame,
        workflow_id: str,
        workflow_run_id: str,
        entity_class_definitions: Dict[str, Type[Any]] | None = None,
        **kwargs: Any
    ) -> daft.DataFrame:
        """Transform JSON metadata to Atlan entities.

        Stamps the workflow context onto every row, then dispatches to the
        type-specific transform. Unknown typenames pass through unchanged.
        """
        # Stamp workflow context columns onto every row.
        for column, value in (
            ("workflow_id", workflow_id),
            ("workflow_run_id", workflow_run_id),
        ):
            dataframe = dataframe.with_column(column, daft.lit(value))
        # Dispatch on the entity type; fall back to the untouched frame.
        handlers = {
            "Table": self._transform_table,
            "Column": self._transform_column,
        }
        handler = handlers.get(typename)
        return handler(dataframe) if handler else dataframe

    def _transform_table(self, df: daft.DataFrame) -> daft.DataFrame:
        """Transform table metadata: derive qualified_name, tag type_name."""
        # qualified_name = database.schema.name
        qualified = (
            daft.col("database") + "." + daft.col("schema") + "." + daft.col("name")
        )
        df = df.with_column("qualified_name", qualified)
        return df.with_column("type_name", daft.lit("Table"))

    def _transform_column(self, df: daft.DataFrame) -> daft.DataFrame:
        """Transform column metadata: derive qualified_name, tag type_name."""
        # qualified_name = <table qualified name>.name
        qualified = daft.col("table_qualified_name") + "." + daft.col("name")
        df = df.with_column("qualified_name", qualified)
        return df.with_column("type_name", daft.lit("Column"))
SQL Transformer
from application_sdk.transformers import TransformerInterface
import daft
from typing import Dict, Type, Any
class SQLMetadataTransformer(TransformerInterface):
    """Transform SQL database metadata to Atlan format."""

    def transform_metadata(
        self,
        typename: str,
        dataframe: daft.DataFrame,
        workflow_id: str,
        workflow_run_id: str,
        entity_class_definitions: Dict[str, Type[Any]] | None = None,
        **kwargs: Any
    ) -> daft.DataFrame:
        """Transform SQL metadata with connection info.

        Keyword Args:
            connection_qualified_name: Prefix used to build qualified names.
                Required when transforming Database/Schema/Table entities.
            connector_type: Source connector type (defaults to "mysql").

        Returns:
            The transformed DataFrame; unknown typenames pass through with
            only the connection/workflow context columns added.

        Raises:
            ValueError: If connection_qualified_name is missing while
                transforming a type whose qualified name depends on it.
        """
        connection_qn = kwargs.get("connection_qualified_name")
        connector_type = kwargs.get("connector_type", "mysql")
        # Fail fast: a missing connection prefix would otherwise propagate
        # literal "None/..." qualified names into the transformed entities.
        if connection_qn is None and typename in ("Database", "Schema", "Table"):
            raise ValueError(
                "connection_qualified_name is required to transform "
                f"{typename} entities"
            )
        # Stamp connection context and workflow tracking columns.
        for column, value in (
            ("connection_qualified_name", connection_qn),
            ("connector_type", connector_type),
            ("workflow_id", workflow_id),
            ("workflow_run_id", workflow_run_id),
        ):
            dataframe = dataframe.with_column(column, daft.lit(value))
        # Dispatch by entity type.
        if typename == "Database":
            return self._transform_database(dataframe, connection_qn)
        if typename == "Schema":
            return self._transform_schema(dataframe, connection_qn)
        if typename == "Table":
            return self._transform_table(dataframe, connection_qn)
        if typename == "Column":
            return self._transform_column(dataframe)
        return dataframe

    def _transform_database(self, df: daft.DataFrame, conn_qn: str) -> daft.DataFrame:
        """Transform database entities: qualified_name = <conn>/<name>."""
        df = df.with_column(
            "qualified_name",
            daft.lit(conn_qn) + "/" + daft.col("name")
        )
        return df.with_column("type_name", daft.lit("Database"))

    def _transform_schema(self, df: daft.DataFrame, conn_qn: str) -> daft.DataFrame:
        """Transform schema entities: qualified_name = <conn>/<db>/<name>."""
        df = df.with_column(
            "qualified_name",
            daft.lit(conn_qn) + "/" + daft.col("database_name") + "/" + daft.col("name")
        )
        return df.with_column("type_name", daft.lit("Schema"))

    def _transform_table(self, df: daft.DataFrame, conn_qn: str) -> daft.DataFrame:
        """Transform table entities: qualified_name = <conn>/<db>/<schema>/<name>."""
        df = df.with_column(
            "qualified_name",
            daft.lit(conn_qn) + "/" +
            daft.col("database_name") + "/" +
            daft.col("schema_name") + "/" +
            daft.col("name")
        )
        df = df.with_column("type_name", daft.lit("Table"))
        # Normalize row_count to int64 when the source provides it.
        if "row_count" in df.column_names:
            df = df.with_column(
                "row_count",
                daft.col("row_count").cast(daft.DataType.int64())
            )
        return df

    def _transform_column(self, df: daft.DataFrame) -> daft.DataFrame:
        """Transform column entities: qualified_name = <table qn>/<name>."""
        df = df.with_column(
            "qualified_name",
            daft.col("table_qualified_name") + "/" + daft.col("name")
        )
        df = df.with_column("type_name", daft.lit("Column"))
        # Normalize optional typed columns when present.
        if "data_type" in df.column_names:
            df = df.with_column(
                "data_type",
                daft.col("data_type").cast(daft.DataType.string())
            )
        if "order" in df.column_names:
            df = df.with_column(
                "order",
                daft.col("order").cast(daft.DataType.int32())
            )
        return df
Enrichment Transformer
from application_sdk.transformers import TransformerInterface
import daft
from typing import Dict, Type, Any
import hashlib
from datetime import datetime
class EnrichmentTransformer(TransformerInterface):
    """Transformer that enriches metadata with additional fields."""

    def transform_metadata(
        self,
        typename: str,
        dataframe: daft.DataFrame,
        workflow_id: str,
        workflow_run_id: str,
        entity_class_definitions: Dict[str, Type[Any]] | None = None,
        **kwargs: Any
    ) -> daft.DataFrame:
        """Enrich metadata with computed fields.

        Adds an ingestion timestamp, workflow context, a hash-derived
        entity_id (when absent), and type-specific enrichments.
        """
        # Stamp ingestion time and workflow context.
        context = {
            "ingestion_timestamp": datetime.now().isoformat(),
            "workflow_id": workflow_id,
            "workflow_run_id": workflow_run_id,
        }
        for column, value in context.items():
            dataframe = dataframe.with_column(column, daft.lit(value))
        # Derive stable entity IDs when the source did not supply them.
        if "entity_id" not in dataframe.column_names:
            dataframe = self._add_entity_ids(dataframe)
        # Type-specific enrichment; other types pass through unchanged.
        enrichers = {
            "Table": self._enrich_table,
            "Column": self._enrich_column,
        }
        enrich = enrichers.get(typename)
        return enrich(dataframe) if enrich else dataframe

    def _add_entity_ids(self, df: daft.DataFrame) -> daft.DataFrame:
        """Generate entity IDs by hashing qualified names (first 16 hex chars)."""

        def hash_qualified_name(qualified_name: str) -> str:
            digest = hashlib.sha256(qualified_name.encode()).hexdigest()
            return digest[:16]

        return df.with_column(
            "entity_id",
            daft.col("qualified_name").apply(
                hash_qualified_name, return_dtype=daft.DataType.string()
            ),
        )

    def _enrich_table(self, df: daft.DataFrame) -> daft.DataFrame:
        """Add a size_category bucket derived from row_count, when present."""
        if "row_count" not in df.column_names:
            return df

        def bucket(row_count: int) -> str:
            # Buckets: <1K small, <1M medium, otherwise large.
            if row_count < 1000:
                return "small"
            return "medium" if row_count < 1000000 else "large"

        return df.with_column(
            "size_category",
            daft.col("row_count").apply(
                bucket, return_dtype=daft.DataType.string()
            ),
        )

    def _enrich_column(self, df: daft.DataFrame) -> daft.DataFrame:
        """Flag columns whose names suggest PII content."""
        pii_keywords = ("email", "phone", "ssn", "credit_card", "password")

        def looks_like_pii(column_name: str) -> bool:
            lowered = column_name.lower()
            return any(keyword in lowered for keyword in pii_keywords)

        return df.with_column(
            "potential_pii",
            daft.col("name").apply(
                looks_like_pii, return_dtype=daft.DataType.bool()
            ),
        )
Validation Transformer
from application_sdk.transformers import TransformerInterface
import daft
from typing import Dict, Type, Any, List
class ValidationTransformer(TransformerInterface):
    """Transformer that validates and cleans metadata."""

    # Minimum column set each entity type must carry before transformation.
    REQUIRED_FIELDS = {
        "Table": ["name", "qualified_name", "type_name"],
        "Column": ["name", "qualified_name", "type_name", "data_type"],
        "Schema": ["name", "qualified_name", "type_name"],
    }

    def transform_metadata(
        self,
        typename: str,
        dataframe: daft.DataFrame,
        workflow_id: str,
        workflow_run_id: str,
        entity_class_definitions: Dict[str, Type[Any]] | None = None,
        **kwargs: Any
    ) -> daft.DataFrame:
        """Validate and clean metadata.

        Returns:
            The cleaned DataFrame with ``validated`` and
            ``validation_workflow_id`` columns appended.

        Raises:
            ValueError: If required fields for ``typename`` are missing.
        """
        # Validate required fields before doing any work.
        self._validate_required_fields(typename, dataframe)
        # Clean data.
        dataframe = self._clean_data(dataframe)
        # Record that this frame passed validation and which workflow ran it.
        dataframe = dataframe.with_column("validated", daft.lit(True))
        dataframe = dataframe.with_column(
            "validation_workflow_id",
            daft.lit(workflow_id)
        )
        return dataframe

    def _validate_required_fields(self, typename: str, df: daft.DataFrame) -> None:
        """Raise ValueError if any required field for ``typename`` is absent."""
        required = self.REQUIRED_FIELDS.get(typename, [])
        missing = [field for field in required if field not in df.column_names]
        if missing:
            raise ValueError(
                f"Missing required fields for {typename}: {missing}"
            )

    def _clean_data(self, df: daft.DataFrame) -> daft.DataFrame:
        """Trim string columns and replace empty strings with null."""
        for col_name in df.column_names:
            if df.schema()[col_name].dtype != daft.DataType.string():
                continue
            # Daft's string namespace exposes lstrip/rstrip (no combined
            # strip); chain both to trim whitespace from either side.
            df = df.with_column(
                col_name,
                daft.col(col_name).str.lstrip().str.rstrip()
            )
            # In Daft, if_else(if_true, if_false) is called on the boolean
            # predicate expression — the original three-argument call on
            # the column itself was not a valid use of the API.
            df = df.with_column(
                col_name,
                (daft.col(col_name) == "").if_else(
                    daft.lit(None),
                    daft.col(col_name)
                )
            )
        return df
Best Practices
Transformer Design
- Keep transformers focused on data transformation logic
- Make transformers stateless and reusable
- Handle multiple entity types in a single transformer
- Use type-specific methods for clarity
Data Processing
- Use Daft DataFrame operations for efficiency
- Add workflow context to transformed data
- Generate qualified names consistently
- Validate data before transformation
Error Handling
- Validate required fields early
- Provide clear error messages
- Handle missing or null values gracefully
- Log transformation errors with context
Performance
- Use vectorized operations
- Minimize column operations
- Avoid unnecessary data copies
- Process data in batches when possible