Stages are the building blocks of PDAL pipelines. Each stage represents a specific operation on point cloud data, from reading files to applying transformations to writing output.
Stage hierarchy
PDAL Python provides three main stage types that correspond to PDAL’s stage categories:
Reader
Readers load point cloud data from various sources and formats. The Reader class can infer the driver type from the filename extension:
import pdal
# Type is automatically inferred from .las extension
reader = pdal.Reader("input.las")
print(reader.type) # "readers.las"
# Explicitly specify the type
reader = pdal.Reader(type="readers.las", filename="input.las")
# Use the convenience method
reader = pdal.Reader.las(filename="input.las")
Constructor signature:
class Reader(InferableTypeStage):
    def __init__(self, filename: Optional[str] = None, **options: Any):
        """Create a Reader stage.

        Args:
            filename: Path to input file (type inferred from extension)
            **options: Additional reader-specific options
        """
Filter
Filters transform, analyze, or filter point cloud data. Unlike readers and writers, filters require an explicit type specification:
import pdal
# Filters require the type parameter
filter_stage = pdal.Filter(type="filters.sort", dimension="X")
# Use the convenience method (recommended)
filter_stage = pdal.Filter.sort(dimension="X")
Constructor signature:
class Filter(Stage):
    def __init__(self, type: str, **options: Any):
        """Create a Filter stage.

        Args:
            type: PDAL filter type (e.g., 'filters.sort')
            **options: Filter-specific options
        """
Writer
Writers save processed point cloud data to files or databases. Like readers, writers can infer the driver type from the filename:
import pdal
# Type is automatically inferred from .las extension
writer = pdal.Writer("output.las")
print(writer.type) # "writers.las"
# Explicitly specify the type
writer = pdal.Writer(type="writers.las", filename="output.las")
# Use the convenience method
writer = pdal.Writer.las(filename="output.las")
Constructor signature:
class Writer(InferableTypeStage):
    def __init__(self, filename: Optional[str] = None, **options: Any):
        """Create a Writer stage.

        Args:
            filename: Path to output file (type inferred from extension)
            **options: Additional writer-specific options
        """
How stages work together
Stages are connected in sequence to form a pipeline. Data flows from readers through filters and finally to writers:
import pdal
# Each stage processes the output of the previous stage
pipeline = (
    pdal.Reader("test/data/autzen-utm.las")
    | pdal.Filter.range(limits="Intensity[50:200)")
    | pdal.Filter.splitter(length=1000)
    | pdal.Writer.las(filename="output.las")
)
In this example:
- The Reader loads points from the LAS file
- The Filter.range removes points outside the intensity range
- The Filter.splitter divides the data into tiles
- The Writer saves the processed points to a new file
Stage options and configuration
All stage options are passed as keyword arguments during construction. These options correspond to the PDAL stage documentation:
import pdal
# Reader with options
reader = pdal.Reader.las(
    filename="input.las",
    spatialreference="EPSG:4326"
)
# Filter with options
range_filter = pdal.Filter.range(
    limits="Classification[2:2]",  # Keep only ground points
    tag="ground_only"  # Optional tag for referencing
)
# Writer with options
writer = pdal.Writer.las(
    filename="output.las",
    offset_x="auto",
    offset_y="auto",
    offset_z="auto",
    scale_x=0.01,
    scale_y=0.01,
    scale_z=0.01
)
Stage properties
Each stage provides several read-only properties:
import pdal
stage = pdal.Filter.sort(dimension="X", tag="my_sorter")
# Get the PDAL type identifier
print(stage.type) # "filters.sort"
# Check if the stage supports streaming
print(stage.streamable) # True or False
# Get the stage tag (if provided)
print(stage.tag) # "my_sorter"
# Get all stage options as a dictionary
print(stage.options) # {"type": "filters.sort", "dimension": "X", "tag": "my_sorter"}
# Get input stages
print(stage.inputs) # []
The pipe operator
The pipe operator (|) provides an intuitive way to chain stages together. This creates a new Pipeline object:
import pdal
# Chain stages with the pipe operator
read = pdal.Reader("test/data/autzen-utm.las")
frange = pdal.Filter.range(limits="Intensity[50:200)")
fsplitter = pdal.Filter.splitter(length=1000)
# Create pipeline by piping stages together
pipeline = read | frange | fsplitter
pipeline.execute()
# You can also pipe a stage to a pipeline
pipeline = read | (frange | fsplitter)
pipeline.execute()
# Or pipe pipelines together
pipeline = (read | frange) | (fsplitter | pdal.Writer.null())
pipeline.execute()
The pipe operator always creates a new Pipeline object. To modify an existing pipeline, use the in-place operator |= instead.
Some filters need to reference specific stages as inputs, especially when working with multiple data streams:
import pdal
read = pdal.Reader("test/data/autzen-utm.las")
frange = pdal.Filter.range(limits="Intensity[50:200)")
fsplitter = pdal.Filter.splitter(length=1000)
# Delaunay filter takes both range and splitter outputs as input
fdelaunay = pdal.Filter.delaunay(inputs=[frange, fsplitter])
pipeline = read | frange | fsplitter | fdelaunay
pipeline.execute()
Inputs can be specified as:
- Stage objects: Direct reference to a stage instance
- String tags: Reference by the stage’s tag name
import pdal
# Reference by stage object
stage1 = pdal.Filter.sort(dimension="X", tag="sorter")
stage2 = pdal.Filter.merge(inputs=[stage1])
# Reference by tag name
stage1 = pdal.Filter.sort(dimension="X", tag="sorter")
stage2 = pdal.Filter.merge(inputs=["sorter"])
Convenience methods
PDAL Python auto-generates convenience methods for all available PDAL drivers. These methods are accessible as static methods on the stage classes:
import pdal
# Instead of:
reader = pdal.Reader(type="readers.las", filename="input.las")
# Use the convenience method:
reader = pdal.Reader.las(filename="input.las")
# View documentation for any stage
help(pdal.Filter.head)
The help output shows all available options:
Help on function head in module pdal.pipeline:

head(**kwargs)
    Return N points from beginning of the point cloud.

    user_data: User JSON
    log: Debug output filename
    option_file: File from which to read additional options
    where: Expression describing points to be passed to this filter
    where_merge='auto': If 'where' option is set, describes how skipped points should be merged with kept points in standard mode.
    count='10': Number of points to return from beginning. If 'invert' is true, number of points to drop from the beginning.
    invert='false': If true, 'count' specifies the number of points to skip from the beginning.
Streamable stages
Some PDAL stages support streaming execution, which allows processing large datasets without loading all points into memory:
import pdal
# Check if individual stages are streamable
print(pdal.Reader.las("foo").streamable) # True
print(pdal.Filter.crop().streamable) # True
print(pdal.Filter.cluster().streamable) # False
print(pdal.Writer.las(filename="foo").streamable) # True
# A pipeline is streamable only if all stages are streamable
streamable_pipeline = (
    pdal.Reader.las("input.las")
    | pdal.Filter.range(limits="Z[0:100]")
    | pdal.Writer.las(filename="output.las")
)
print(streamable_pipeline.streamable)  # True
non_streamable_pipeline = (
    pdal.Reader.las("input.las")
    | pdal.Filter.cluster()  # Not streamable
    | pdal.Writer.las(filename="output.las")
)
print(non_streamable_pipeline.streamable)  # False
Real-world example
Here’s a complete example demonstrating stage configuration for creating a Digital Terrain Model (DTM):
import pdal
pc_path = 'https://github.com/PDAL/data/raw/refs/heads/main/autzen/autzen.laz'
out_file = 'autzen_dtm.tif'
# Read point cloud
reader = pdal.Reader.las(pc_path)
# Remove noisy points
lownoise_filter = pdal.Filter.range(
    limits='Classification![7:7]', tag='lownoise'
)
highnoise_filter = pdal.Filter.range(
    limits='Classification![18:]', tag='highnoise'
)
# Fix incorrectly labeled returns
prepare_ground = pdal.Filter.assign(
    value=[
        'Classification=0',
        'ReturnNumber=1 WHERE ReturnNumber < 1',
        'NumberOfReturns=1 WHERE NumberOfReturns < 1',
    ],
    tag='prepare_ground_classifier',
)
# Classify ground points
smrf_classifier = pdal.Filter.smrf(tag='ground_classifier')
# Write DTM with GDAL
gdal_writer = pdal.Writer.gdal(
    filename=out_file,
    where='Classification == 2',
    data_type='float32',
    resolution=10,
    output_type='idw',
    window_size=3,
    pdal_metadata=True,
)
# Build and execute pipeline
pipeline = (
    reader
    | lownoise_filter
    | highnoise_filter
    | prepare_ground
    | smrf_classifier
    | gdal_writer
)
pipeline.execute()