Transformers handle the conversion of metadata between different formats while preserving semantic meaning and relationships.

TransformerInterface

application_sdk.transformers.TransformerInterface

Abstract base class for metadata transformers.

Methods

transform_metadata

Transform metadata from one format to another.
def transform_metadata(
    self,
    typename: str,
    dataframe: daft.DataFrame,
    workflow_id: str,
    workflow_run_id: str,
    entity_class_definitions: Dict[str, Type[Any]] | None = None,
    **kwargs: Any
) -> daft.DataFrame
Parameters

typename (str, required)
    The type identifier for the metadata being transformed
dataframe (daft.DataFrame, required)
    The source metadata DataFrame to transform
workflow_id (str, required)
    Identifier for the workflow requesting the transformation
workflow_run_id (str, required)
    Identifier for the specific workflow run
entity_class_definitions (Dict[str, Type[Any]] | None, default: None)
    Mapping of entity types to their class definitions
**kwargs (Any)
    Additional keyword arguments for specific transformer implementations

Returns

daft.DataFrame
    The transformed metadata DataFrame

Raises

NotImplementedError
    If the subclass does not implement this method

Example Usage

Basic Transformer

from application_sdk.transformers import TransformerInterface
import daft
from typing import Dict, Type, Any

class JSONToAtlanTransformer(TransformerInterface):
    """Transform JSON metadata to Atlan format."""
    
    def transform_metadata(
        self,
        typename: str,
        dataframe: daft.DataFrame,
        workflow_id: str,
        workflow_run_id: str,
        entity_class_definitions: Dict[str, Type[Any]] | None = None,
        **kwargs: Any
    ) -> daft.DataFrame:
        """Transform JSON metadata to Atlan entities."""
        
        # Add workflow context
        dataframe = dataframe.with_column(
            "workflow_id",
            daft.lit(workflow_id)
        )
        dataframe = dataframe.with_column(
            "workflow_run_id",
            daft.lit(workflow_run_id)
        )
        
        # Transform based on typename
        if typename == "Table":
            return self._transform_table(dataframe)
        elif typename == "Column":
            return self._transform_column(dataframe)
        else:
            return dataframe
    
    def _transform_table(self, df: daft.DataFrame) -> daft.DataFrame:
        """Transform table metadata."""
        # Add qualified name
        df = df.with_column(
            "qualified_name",
            daft.col("database") + "." + daft.col("schema") + "." + daft.col("name")
        )
        
        # Add entity type
        df = df.with_column("type_name", daft.lit("Table"))
        
        return df
    
    def _transform_column(self, df: daft.DataFrame) -> daft.DataFrame:
        """Transform column metadata."""
        # Add qualified name
        df = df.with_column(
            "qualified_name",
            daft.col("table_qualified_name") + "." + daft.col("name")
        )
        
        # Add entity type
        df = df.with_column("type_name", daft.lit("Column"))
        
        return df
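
A minimal sketch of invoking the transformer above; the column names match what _transform_table reads, and the sample values and workflow identifiers are made up:

transformer = JSONToAtlanTransformer()

# Hypothetical table metadata with the columns _transform_table expects
tables = daft.from_pydict({
    "database": ["sales"],
    "schema": ["public"],
    "name": ["orders"],
})

result = transformer.transform_metadata(
    typename="Table",
    dataframe=tables,
    workflow_id="wf-001",
    workflow_run_id="run-001",
)
result.show()  # now includes qualified_name, type_name, and workflow columns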

SQL Transformer

from application_sdk.transformers import TransformerInterface
import daft
from typing import Dict, Type, Any

class SQLMetadataTransformer(TransformerInterface):
    """Transform SQL database metadata to Atlan format."""
    
    def transform_metadata(
        self,
        typename: str,
        dataframe: daft.DataFrame,
        workflow_id: str,
        workflow_run_id: str,
        entity_class_definitions: Dict[str, Type[Any]] | None = None,
        **kwargs: Any
    ) -> daft.DataFrame:
        """Transform SQL metadata with connection info."""
        
        connection_qn = kwargs.get("connection_qualified_name")
        connector_type = kwargs.get("connector_type", "mysql")
        
        # Add connection context
        dataframe = dataframe.with_column(
            "connection_qualified_name",
            daft.lit(connection_qn)
        )
        dataframe = dataframe.with_column(
            "connector_type",
            daft.lit(connector_type)
        )
        
        # Add workflow tracking
        dataframe = dataframe.with_column(
            "workflow_id",
            daft.lit(workflow_id)
        )
        dataframe = dataframe.with_column(
            "workflow_run_id",
            daft.lit(workflow_run_id)
        )
        
        # Transform by type
        if typename == "Database":
            return self._transform_database(dataframe, connection_qn)
        elif typename == "Schema":
            return self._transform_schema(dataframe, connection_qn)
        elif typename == "Table":
            return self._transform_table(dataframe, connection_qn)
        elif typename == "Column":
            return self._transform_column(dataframe)
        
        return dataframe
    
    def _transform_database(self, df: daft.DataFrame, conn_qn: str) -> daft.DataFrame:
        """Transform database entities."""
        df = df.with_column(
            "qualified_name",
            daft.lit(conn_qn) + "/" + daft.col("name")
        )
        df = df.with_column("type_name", daft.lit("Database"))
        return df
    
    def _transform_schema(self, df: daft.DataFrame, conn_qn: str) -> daft.DataFrame:
        """Transform schema entities."""
        df = df.with_column(
            "qualified_name",
            daft.lit(conn_qn) + "/" + daft.col("database_name") + "/" + daft.col("name")
        )
        df = df.with_column("type_name", daft.lit("Schema"))
        return df
    
    def _transform_table(self, df: daft.DataFrame, conn_qn: str) -> daft.DataFrame:
        """Transform table entities."""
        df = df.with_column(
            "qualified_name",
            daft.lit(conn_qn) + "/" + 
            daft.col("database_name") + "/" + 
            daft.col("schema_name") + "/" + 
            daft.col("name")
        )
        df = df.with_column("type_name", daft.lit("Table"))
        
        # Add row count if available
        if "row_count" in df.column_names:
            df = df.with_column(
                "row_count",
                daft.col("row_count").cast(daft.DataType.int64())
            )
        
        return df
    
    def _transform_column(self, df: daft.DataFrame) -> daft.DataFrame:
        """Transform column entities."""
        df = df.with_column(
            "qualified_name",
            daft.col("table_qualified_name") + "/" + daft.col("name")
        )
        df = df.with_column("type_name", daft.lit("Column"))
        
        # Add data type and order
        if "data_type" in df.column_names:
            df = df.with_column(
                "data_type",
                daft.col("data_type").cast(daft.DataType.string())
            )
        
        if "order" in df.column_names:
            df = df.with_column(
                "order",
                daft.col("order").cast(daft.DataType.int32())
            )
        
        return df
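
A minimal sketch of passing connection context through **kwargs; the connection_qualified_name value and sample rows are hypothetical:

transformer = SQLMetadataTransformer()

schemas = daft.from_pydict({
    "database_name": ["sales"],
    "name": ["public"],
})

result = transformer.transform_metadata(
    typename="Schema",
    dataframe=schemas,
    workflow_id="wf-001",
    workflow_run_id="run-001",
    connection_qualified_name="default/mysql/1700000000",  # hypothetical
    connector_type="mysql",
)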

Enrichment Transformer

from application_sdk.transformers import TransformerInterface
import daft
from typing import Dict, Type, Any
import hashlib
from datetime import datetime

class EnrichmentTransformer(TransformerInterface):
    """Transformer that enriches metadata with additional fields."""
    
    def transform_metadata(
        self,
        typename: str,
        dataframe: daft.DataFrame,
        workflow_id: str,
        workflow_run_id: str,
        entity_class_definitions: Dict[str, Type[Any]] | None = None,
        **kwargs: Any
    ) -> daft.DataFrame:
        """Enrich metadata with computed fields."""
        
        # Add timestamps
        current_time = datetime.now().isoformat()
        dataframe = dataframe.with_column(
            "ingestion_timestamp",
            daft.lit(current_time)
        )
        
        # Add workflow context
        dataframe = dataframe.with_column(
            "workflow_id",
            daft.lit(workflow_id)
        )
        dataframe = dataframe.with_column(
            "workflow_run_id",
            daft.lit(workflow_run_id)
        )
        
        # Generate entity IDs if not present
        if "entity_id" not in dataframe.column_names:
            dataframe = self._add_entity_ids(dataframe)
        
        # Add type-specific enrichments
        if typename == "Table":
            dataframe = self._enrich_table(dataframe)
        elif typename == "Column":
            dataframe = self._enrich_column(dataframe)
        
        return dataframe
    
    def _add_entity_ids(self, df: daft.DataFrame) -> daft.DataFrame:
        """Generate entity IDs from qualified names."""
        # Use UDF to generate hash-based IDs
        def generate_id(qualified_name: str) -> str:
            return hashlib.sha256(qualified_name.encode()).hexdigest()[:16]
        
        df = df.with_column(
            "entity_id",
            daft.col("qualified_name").apply(generate_id, return_dtype=daft.DataType.string())
        )
        return df
    
    def _enrich_table(self, df: daft.DataFrame) -> daft.DataFrame:
        """Add table-specific enrichments."""
        # Add size category based on row count
        if "row_count" in df.column_names:
            def categorize_size(row_count: int) -> str:
                if row_count < 1000:
                    return "small"
                elif row_count < 1000000:
                    return "medium"
                else:
                    return "large"
            
            df = df.with_column(
                "size_category",
                daft.col("row_count").apply(
                    categorize_size,
                    return_dtype=daft.DataType.string()
                )
            )
        
        return df
    
    def _enrich_column(self, df: daft.DataFrame) -> daft.DataFrame:
        """Add column-specific enrichments."""
        # Detect potential PII based on column name
        def detect_pii(column_name: str) -> bool:
            pii_keywords = ["email", "phone", "ssn", "credit_card", "password"]
            return any(keyword in column_name.lower() for keyword in pii_keywords)
        
        df = df.with_column(
            "potential_pii",
            daft.col("name").apply(
                detect_pii,
                return_dtype=daft.DataType.bool()
            )
        )
        
        return df
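
A minimal sketch of the enrichment path for columns; the sample values are made up:

transformer = EnrichmentTransformer()

columns = daft.from_pydict({
    "qualified_name": ["sales/public/users/email"],
    "name": ["email"],
})

result = transformer.transform_metadata(
    typename="Column",
    dataframe=columns,
    workflow_id="wf-001",
    workflow_run_id="run-001",
)
# result now carries ingestion_timestamp, entity_id, and potential_pii (True here)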

Validation Transformer

from application_sdk.transformers import TransformerInterface
import daft
from typing import Dict, Type, Any, List

class ValidationTransformer(TransformerInterface):
    """Transformer that validates and cleans metadata."""
    
    REQUIRED_FIELDS = {
        "Table": ["name", "qualified_name", "type_name"],
        "Column": ["name", "qualified_name", "type_name", "data_type"],
        "Schema": ["name", "qualified_name", "type_name"],
    }
    
    def transform_metadata(
        self,
        typename: str,
        dataframe: daft.DataFrame,
        workflow_id: str,
        workflow_run_id: str,
        entity_class_definitions: Dict[str, Type[Any]] | None = None,
        **kwargs: Any
    ) -> daft.DataFrame:
        """Validate and clean metadata."""
        
        # Validate required fields
        self._validate_required_fields(typename, dataframe)
        
        # Clean data
        dataframe = self._clean_data(dataframe)
        
        # Add validation metadata
        dataframe = dataframe.with_column(
            "validated",
            daft.lit(True)
        )
        dataframe = dataframe.with_column(
            "validation_workflow_id",
            daft.lit(workflow_id)
        )
        
        return dataframe
    
    def _validate_required_fields(self, typename: str, df: daft.DataFrame) -> None:
        """Validate that required fields are present."""
        required = self.REQUIRED_FIELDS.get(typename, [])
        missing = [field for field in required if field not in df.column_names]
        
        if missing:
            raise ValueError(
                f"Missing required fields for {typename}: {missing}"
            )
    
    def _clean_data(self, df: daft.DataFrame) -> daft.DataFrame:
        """Clean and normalize data."""
        for col_name in df.column_names:
            col_type = df.schema()[col_name].dtype
            if col_type != daft.DataType.string():
                continue
            
            # Trim surrounding whitespace from string fields
            df = df.with_column(
                col_name,
                daft.col(col_name).str.lstrip().str.rstrip()
            )
            
            # Replace empty strings with null; if_else is called on the
            # boolean predicate, not on the column being replaced
            df = df.with_column(
                col_name,
                (daft.col(col_name) == "").if_else(
                    daft.lit(None),
                    daft.col(col_name)
                )
            )
        
        return df
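
A minimal sketch exercising both the success and failure paths, with hypothetical data:

transformer = ValidationTransformer()

tables = daft.from_pydict({
    "name": ["  orders  "],
    "qualified_name": ["sales/public/orders"],
    "type_name": ["Table"],
})

validated = transformer.transform_metadata(
    typename="Table",
    dataframe=tables,
    workflow_id="wf-001",
    workflow_run_id="run-001",
)

# A frame missing required fields fails before any transformation runs
incomplete = daft.from_pydict({"name": ["orders"]})
try:
    transformer.transform_metadata("Table", incomplete, "wf-001", "run-001")
except ValueError as err:
    print(err)  # Missing required fields for Table: ['qualified_name', 'type_name']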

Best Practices

Transformer Design

  • Keep transformers focused on data transformation logic
  • Make transformers stateless and reusable
  • Handle multiple entity types in a single transformer
  • Use type-specific methods for clarity, as in the dispatch sketch below
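
One way to follow these guidelines is a dispatch table that keeps the transformer stateless and routes each typename to a dedicated method. This sketch is illustrative, not part of the SDK:

from typing import Any, Dict, Type

import daft

from application_sdk.transformers import TransformerInterface

class DispatchingTransformer(TransformerInterface):
    """Stateless transformer that routes each typename to a dedicated method."""
    
    def transform_metadata(
        self,
        typename: str,
        dataframe: daft.DataFrame,
        workflow_id: str,
        workflow_run_id: str,
        entity_class_definitions: Dict[str, Type[Any]] | None = None,
        **kwargs: Any,
    ) -> daft.DataFrame:
        handlers = {
            "Table": self._transform_table,
            "Column": self._transform_column,
        }
        handler = handlers.get(typename)
        # Pass unhandled types through unchanged
        return handler(dataframe) if handler else dataframe
    
    def _transform_table(self, df: daft.DataFrame) -> daft.DataFrame:
        return df.with_column("type_name", daft.lit("Table"))
    
    def _transform_column(self, df: daft.DataFrame) -> daft.DataFrame:
        return df.with_column("type_name", daft.lit("Column"))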

Data Processing

  • Use Daft DataFrame operations for efficiency
  • Add workflow context to transformed data
  • Generate qualified names consistently (see the helper sketch after this list)
  • Validate data before transformation
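
A hypothetical helper that centralizes qualified-name construction so every entity type composes names the same way:

import daft

def qualified_name_expr(*parts: str, separator: str = "/") -> daft.Expression:
    """Build a qualified-name expression from column names,
    e.g. qualified_name_expr("database_name", "schema_name", "name")."""
    expr = daft.col(parts[0])
    for part in parts[1:]:
        expr = expr + separator + daft.col(part)
    return expr

# df = df.with_column("qualified_name", qualified_name_expr("database_name", "schema_name", "name"))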

Error Handling

  • Validate required fields early
  • Provide clear error messages
  • Handle missing or null values gracefully
  • Log transformation errors with context, as in the wrapper sketch below
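
A minimal sketch of a wrapper that logs failures with their workflow context; safe_transform is a hypothetical helper, not an SDK function:

import logging

logger = logging.getLogger(__name__)

def safe_transform(transformer, typename, dataframe, workflow_id, workflow_run_id, **kwargs):
    """Run a transformer, logging the typename and workflow context on failure."""
    try:
        return transformer.transform_metadata(
            typename, dataframe, workflow_id, workflow_run_id, **kwargs
        )
    except Exception:
        logger.exception(
            "Transformation failed: typename=%s workflow_id=%s workflow_run_id=%s",
            typename, workflow_id, workflow_run_id,
        )
        raise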

Performance

  • Use vectorized operations instead of row-wise Python functions (see the sketch after this list)
  • Minimize column operations
  • Avoid unnecessary data copies
  • Process data in batches when possible
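
Native expressions are evaluated by the engine in a vectorized fashion, while .apply() calls back into Python once per value. A sketch of the difference, assuming an equivalent native expression exists:

import daft

df = daft.from_pydict({"name": ["  Orders  ", " Users "]})

# Row-at-a-time: a Python function invoked once per value
slow = df.with_column(
    "name_clean",
    daft.col("name").apply(lambda s: s.strip().lower(), return_dtype=daft.DataType.string()),
)

# Vectorized: native string expressions
fast = df.with_column(
    "name_clean",
    daft.col("name").str.lstrip().str.rstrip().str.lower(),
)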
