A Pipeline is the core object in PDAL Python that coordinates the execution of processing stages on point cloud data. It manages the flow of data through a sequence of readers, filters, and writers.
What is a Pipeline?
The Pipeline class extends PDAL’s native pipeline functionality to provide a Python-friendly interface. A pipeline defines a sequence of operations to perform on point cloud data, from reading input files to applying transformations and writing results.
```python
import pdal

# Create a simple pipeline
pipeline = pdal.Pipeline()
```
Construction methods
You can construct pipelines in several ways, depending on your needs and preferences.
JSON string
The traditional PDAL approach uses a JSON string to define the pipeline configuration:
```python
import pdal

json = """
{
  "pipeline": [
    "1.2-with-color.las",
    {
      "type": "filters.sort",
      "dimension": "X"
    }
  ]
}"""

pipeline = pdal.Pipeline(json)
count = pipeline.execute()
```
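When the pipeline definition is assembled dynamically, it can be convenient to build it as plain Python objects and serialize with the standard json module before handing the string to pdal.Pipeline. A minimal sketch, reusing the filename from the example above:

```python
import json

# Assemble the same specification as Python dicts/lists,
# then serialize it into the JSON string pdal.Pipeline accepts.
spec = {
    "pipeline": [
        "1.2-with-color.las",
        {"type": "filters.sort", "dimension": "X"},
    ]
}
pipeline_json = json.dumps(spec, indent=2)
# pipeline = pdal.Pipeline(pipeline_json)  # equivalent to the JSON literal above
```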
Sequence of stages
You can construct a pipeline from a list or sequence of Stage objects:
```python
import pdal

reader = pdal.Reader("1.2-with-color.las")
filter_stage = pdal.Filter.sort(dimension="X")

# Pass as a sequence
pipeline = pdal.Pipeline([reader, filter_stage])
```
Pipe operator
The most Pythonic approach uses the pipe operator (|) to chain stages together:
```python
import pdal

pipeline = pdal.Reader("1.2-with-color.las") | pdal.Filter.sort(dimension="X")
```
You can pipe stages to pipelines, pipelines to stages, or pipelines to other pipelines:
```python
# Stage to stage
pipeline = stage1 | stage2

# Stage to pipeline
pipeline = stage1 | existing_pipeline

# Pipeline to stage
pipeline = existing_pipeline | stage1

# Pipeline to pipeline
pipeline = pipeline1 | pipeline2
```
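The composition rules above can be pictured with a toy model (this is a sketch of the idea, not PDAL's actual implementation): each side contributes its stages, and the result is a pipeline holding the concatenated list.

```python
class ToyStage:
    def __init__(self, name):
        self.name = name

    def __or__(self, other):
        # Promote the left-hand stage to a one-stage pipeline, then chain
        return ToyPipeline([self]) | other


class ToyPipeline:
    def __init__(self, stages):
        self.stages = list(stages)

    def __or__(self, other):
        # Accept either a single stage or another pipeline on the right
        extra = other.stages if isinstance(other, ToyPipeline) else [other]
        return ToyPipeline(self.stages + extra)

    def __ior__(self, other):
        # In-place |= : extend this pipeline instead of building a new one
        extra = other.stages if isinstance(other, ToyPipeline) else [other]
        self.stages.extend(extra)
        return self


reader = ToyStage("readers.las")
sort = ToyStage("filters.sort")
combined = reader | sort              # stage | stage  -> pipeline
combined |= ToyStage("writers.las")   # pipeline |= stage
print([s.name for s in combined.stages])
```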
In-place pipeline updates
To update an existing pipeline without creating a new one, use the in-place pipe operator (|=):
```python
import pdal

# Update pipeline in-place
pipeline = pdal.Pipeline()
pipeline |= pdal.Reader("input.las")
pipeline |= pdal.Filter.range(limits="Intensity[50:200)")
pipeline |= pdal.Writer.las(filename="output.las")
```
Execution methods
PDAL Python provides multiple execution methods depending on your memory and performance requirements.
Standard execution
The execute() method runs the entire pipeline and loads all point data into memory:
```python
import pdal

pipeline = pdal.Reader("test/data/autzen-utm.las").pipeline()
count = pipeline.execute()     # Returns the number of points processed
arrays = pipeline.arrays       # Access the resulting data
metadata = pipeline.metadata
log = pipeline.log
```
Signature:

```python
def execute(self) -> int:
    """Execute the pipeline and return the point count."""
```
Streaming execution
For pipelines that consist exclusively of streamable stages, use execute_streaming() to process data in chunks without storing all points in memory:
```python
import pdal

pipeline = pdal.Reader("large-file.las") | pdal.Filter.range(limits="Classification[2:2]")

if pipeline.streamable:
    count = pipeline.execute_streaming(chunk_size=10000)
    print(f"Processed {count} points")
```
Signature:

```python
def execute_streaming(self, chunk_size: int = 10000) -> int:
    """Execute a streamable pipeline in streaming mode."""
```
This method is functionally equivalent to sum(map(len, pipeline.iterator(chunk_size))) but more efficient as it avoids allocating arrays in memory.
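That equivalence can be seen with a stand-in iterator; here plain lists take the place of the NumPy arrays the real iterator() yields.

```python
def fake_iterator(total_points, chunk_size):
    # Stand-in for pipeline.iterator(): yields one "array" per chunk
    for start in range(0, total_points, chunk_size):
        yield list(range(start, min(start + chunk_size, total_points)))

# Counting points chunk by chunk, like execute_streaming() does internally
count = sum(map(len, fake_iterator(1065, 500)))
print(count)  # 1065
```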
Iterator execution
The iterator() method returns an iterator that yields NumPy arrays of processed points, allowing you to work with data incrementally:
```python
import pdal
import numpy as np

pipeline = pdal.Reader("test/data/autzen-utm.las") | pdal.Filter.range(limits="Intensity[80:120]")

for array in pipeline.iterator(chunk_size=500):
    print(f"Processing chunk of {len(array)} points")
    # Process each chunk as needed

# Or concatenate all chunks into one array
full_array = np.concatenate(list(pipeline.iterator(chunk_size=500)))
```
Signature:

```python
def iterator(self, chunk_size: int = 10000, prefetch: int = 0) -> Iterator[np.ndarray]:
    """Return an iterator that yields arrays of up to chunk_size points."""
```
The optional prefetch parameter allows up to that many arrays to be fetched in parallel and buffered until they are yielded.
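One reason to iterate rather than materialize everything is that per-chunk reductions keep memory bounded. A sketch of the pattern, with a synthetic generator of structured arrays standing in for the pipeline's iterator:

```python
import numpy as np

def synthetic_chunks(n_chunks, chunk_size, seed=0):
    # Stand-in for pipeline.iterator(): yields structured arrays
    # shaped like PDAL's output (X, Y, Z dimensions)
    rng = np.random.default_rng(seed)
    for _ in range(n_chunks):
        chunk = np.zeros(chunk_size, dtype=[("X", float), ("Y", float), ("Z", float)])
        chunk["Z"] = rng.uniform(0.0, 100.0, chunk_size)
        yield chunk

# Keep running totals instead of concatenating every chunk
total = 0
z_sum = 0.0
for array in synthetic_chunks(4, 250):
    total += len(array)
    z_sum += array["Z"].sum()

mean_z = z_sum / total
print(total)  # 1000
```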
Only pipelines where all stages are streamable can use execute_streaming() or iterator(). Check the pipeline.streamable property first.
Working with NumPy arrays
You can pass NumPy arrays directly to a pipeline as input data:
```python
import numpy as np
import pdal

# Create sample data
x_vals = [1.0, 2.0, 3.0, 4.0, 5.0]
y_vals = [6.0, 7.0, 8.0, 9.0, 10.0]
z_vals = [1.5, 3.5, 5.5, 7.5, 9.5]
test_data = np.array(
    [(x, y, z) for x, y, z in zip(x_vals, y_vals, z_vals)],
    dtype=[("X", float), ("Y", float), ("Z", float)],
)

# Create pipeline with input array
pipeline = pdal.Pipeline(
    '{"pipeline": [{"type":"filters.range", "limits":"X[2.5:4.5]"}]}',
    arrays=[test_data],
)
count = pipeline.execute()
filtered_arrays = pipeline.arrays
```
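PDAL exchanges data as NumPy structured arrays whose field names are PDAL dimension names. Besides the tuple-list construction above, the same array can be built field by field; a sketch that also previews the X[2.5:4.5] filter from the example using a plain NumPy mask:

```python
import numpy as np

# Allocate an empty structured array, then fill each dimension
n = 5
pts = np.zeros(n, dtype=[("X", float), ("Y", float), ("Z", float)])
pts["X"] = [1.0, 2.0, 3.0, 4.0, 5.0]
pts["Y"] = [6.0, 7.0, 8.0, 9.0, 10.0]
pts["Z"] = [1.5, 3.5, 5.5, 7.5, 9.5]

# The same selection filters.range applies: X in [2.5, 4.5]
mask = (pts["X"] >= 2.5) & (pts["X"] <= 4.5)
print(len(pts[mask]))  # 2
```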
You can also use the .pipeline() method on a filter stage:
```python
import pdal

# Filter intensity values from a structured NumPy array
# (intensity_array is assumed to be a structured array with an Intensity field)
filter_stage = pdal.Filter.expression(expression="Intensity >= 100 && Intensity < 300")
pipeline = filter_stage.pipeline(intensity_array)
count = pipeline.execute()
filtered = pipeline.arrays[0]
```
Pipeline properties
After constructing a pipeline, you can inspect its properties:
```python
import logging

import pdal

pipeline = pdal.Reader("input.las") | pdal.Filter.sort(dimension="X")

# Check if pipeline is streamable
print(pipeline.streamable)  # True or False

# Get the list of stages
print(pipeline.stages)  # [<Reader>, <Filter>]

# Get or set the log level
pipeline.loglevel = logging.DEBUG
print(pipeline.loglevel)  # logging.DEBUG
```
Real-world example
Here’s a complete example that reads a LAS file, filters it with NumPy, passes it back to PDAL for further filtering, and writes the results:
```python
import pdal

data = "https://github.com/PDAL/PDAL/blob/master/test/data/las/1.2-with-color.las?raw=true"

pipeline = pdal.Reader.las(filename=data).pipeline()
print(pipeline.execute())  # 1065 points

# Get the data from the first array
arr = pipeline.arrays[0]

# Filter out entries that have intensity <= 30
intensity = arr[arr["Intensity"] > 30]
print(len(intensity))  # 704 points

# Now use pdal to clamp points that have intensity 100 <= v < 300
pipeline = pdal.Filter.expression(
    expression="Intensity >= 100 && Intensity < 300"
).pipeline(intensity)
print(pipeline.execute())  # 387 points
clamped = pipeline.arrays[0]

# Write the filtered data
pipeline = pdal.Writer.las(
    filename="clamped.las",
    offset_x="auto",
    offset_y="auto",
    offset_z="auto",
    scale_x=0.01,
    scale_y=0.01,
    scale_z=0.01,
).pipeline(clamped)
print(pipeline.execute())  # 387 points
```
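The NumPy step in the middle of that example (boolean-mask selection on the Intensity field) works on any structured array, independently of PDAL. A self-contained illustration with made-up values:

```python
import numpy as np

# Synthetic structured array mimicking PDAL's output dimensions
pts = np.array(
    [(1.0, 2.0, 0.5, 20), (3.0, 4.0, 1.5, 120), (5.0, 6.0, 2.5, 250), (7.0, 8.0, 3.5, 310)],
    dtype=[("X", float), ("Y", float), ("Z", float), ("Intensity", np.uint16)],
)

# Keep points with Intensity > 30, the same mask pattern as the example
kept = pts[pts["Intensity"] > 30]
print(len(kept))  # 3
```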