Skip to main content
A Pipeline is the core object in PDAL Python that coordinates the execution of processing stages on point cloud data. It manages the flow of data through a sequence of readers, filters, and writers.

What is a Pipeline?

The Pipeline class extends PDAL’s native pipeline functionality to provide a Python-friendly interface. A pipeline defines a sequence of operations to perform on point cloud data, from reading input files to applying transformations and writing results.
import pdal

# Create a simple pipeline
pipeline = pdal.Pipeline()

Construction methods

You can construct pipelines in several ways, depending on your needs and preferences.

JSON string

The traditional PDAL approach uses a JSON string to define the pipeline configuration:
import pdal

json = """
{
  "pipeline": [
    "1.2-with-color.las",
    {
        "type": "filters.sort",
        "dimension": "X"
    }
  ]
}"""

pipeline = pdal.Pipeline(json)
count = pipeline.execute()

Sequence of stages

You can construct a pipeline from a list or sequence of Stage objects:
import pdal

reader = pdal.Reader("1.2-with-color.las")
filter_stage = pdal.Filter.sort(dimension="X")

# Pass as a sequence
pipeline = pdal.Pipeline([reader, filter_stage])

Pipe operator

The most Pythonic approach uses the pipe operator (|) to chain stages together:
import pdal

pipeline = pdal.Reader("1.2-with-color.las") | pdal.Filter.sort(dimension="X")
You can pipe stages to pipelines, pipelines to stages, or pipelines to other pipelines:
# Stage to stage
pipeline = stage1 | stage2

# Stage to pipeline
pipeline = stage1 | existing_pipeline

# Pipeline to stage
pipeline = existing_pipeline | stage1

# Pipeline to pipeline
pipeline = pipeline1 | pipeline2

In-place pipeline updates

To update an existing pipeline without creating a new one, use the in-place pipe operator (|=):
import pdal

# Update pipeline in-place
pipeline = pdal.Pipeline()
pipeline |= pdal.Reader("input.las")
pipeline |= pdal.Filter.range(limits="Intensity[50:200)")
pipeline |= pdal.Writer.las(filename="output.las")

Execution methods

PDAL Python provides multiple execution methods depending on your memory and performance requirements.

Standard execution

The execute() method runs the entire pipeline and loads all point data into memory:
import pdal

pipeline = pdal.Reader("test/data/autzen-utm.las").pipeline()
count = pipeline.execute()  # Returns the number of points processed
arrays = pipeline.arrays    # Access the resulting data
metadata = pipeline.metadata
log = pipeline.log
Signature:
def execute(self) -> int:
    """Execute the pipeline and return the point count."""

Streaming execution

For pipelines that consist exclusively of streamable stages, use execute_streaming() to process data in chunks without storing all points in memory:
import pdal

pipeline = pdal.Reader("large-file.las") | pdal.Filter.range(limits="Classification[2:2]")

if pipeline.streamable:
    count = pipeline.execute_streaming(chunk_size=10000)
    print(f"Processed {count} points")
Signature:
def execute_streaming(self, chunk_size: int = 10000) -> int:
    """Execute a streamable pipeline in streaming mode."""
This method is functionally equivalent to sum(map(len, pipeline.iterator(chunk_size))) but more efficient as it avoids allocating arrays in memory.

Iterator execution

The iterator() method returns an iterator that yields NumPy arrays of processed points, allowing you to work with data incrementally:
import pdal
import numpy as np

pipeline = pdal.Reader("test/data/autzen-utm.las") | pdal.Filter.range(limits="Intensity[80:120]")

for array in pipeline.iterator(chunk_size=500):
    print(f"Processing chunk of {len(array)} points")
    # Process each chunk as needed

# Or concatenate all chunks into one array
full_array = np.concatenate(list(pipeline.iterator(chunk_size=500)))
Signature:
def iterator(self, chunk_size: int = 10000, prefetch: int = 0) -> Iterator[np.ndarray]:
    """Return an iterator that yields arrays of up to chunk_size points."""
The optional prefetch parameter allows prefetching up to this number of arrays in parallel and buffering them until they are yielded.
Only pipelines where all stages are streamable can use execute_streaming() or iterator(). Check the pipeline.streamable property first.

Working with input arrays

You can pass NumPy arrays directly to a pipeline as input data:
import numpy as np
import pdal

# Create sample data
x_vals = [1.0, 2.0, 3.0, 4.0, 5.0]
y_vals = [6.0, 7.0, 8.0, 9.0, 10.0]
z_vals = [1.5, 3.5, 5.5, 7.5, 9.5]
test_data = np.array(
    [(x, y, z) for x, y, z in zip(x_vals, y_vals, z_vals)],
    dtype=[("X", float), ("Y", float), ("Z", float)],
)

# Create pipeline with input array
pipeline = pdal.Pipeline(
    '{"pipeline": [{"type":"filters.range", "limits":"X[2.5:4.5]"}]}',
    arrays=[test_data]
)
count = pipeline.execute()
filtered_arrays = pipeline.arrays
You can also use the .pipeline() method on a filter stage:
import pdal

# Filter intensity values from an array
filter_stage = pdal.Filter.expression(expression="Intensity >= 100 && Intensity < 300")
pipeline = filter_stage.pipeline(intensity_array)
count = pipeline.execute()
filtered = pipeline.arrays[0]

Pipeline properties

After constructing a pipeline, you can inspect its properties:
import pdal

pipeline = pdal.Reader("input.las") | pdal.Filter.sort(dimension="X")

# Check if pipeline is streamable
print(pipeline.streamable)  # True or False

# Get the list of stages
print(pipeline.stages)  # [<Reader>, <Filter>]

# Get or set log level
pipeline.loglevel = logging.DEBUG
print(pipeline.loglevel)  # logging.DEBUG

Real-world example

Here’s a complete example that reads a LAS file, filters it with NumPy, passes it back to PDAL for further filtering, and writes the results:
import pdal

data = "https://github.com/PDAL/PDAL/blob/master/test/data/las/1.2-with-color.las?raw=true"

pipeline = pdal.Reader.las(filename=data).pipeline()
print(pipeline.execute())  # 1065 points

# Get the data from the first array
arr = pipeline.arrays[0]

# Filter out entries that have intensity < 50
intensity = arr[arr["Intensity"] > 30]
print(len(intensity))  # 704 points

# Now use pdal to clamp points that have intensity 100 <= v < 300
pipeline = pdal.Filter.expression(expression="Intensity >= 100 && Intensity < 300").pipeline(intensity)
print(pipeline.execute())  # 387 points
clamped = pipeline.arrays[0]

# Write the filtered data
pipeline = pdal.Writer.las(
    filename="clamped.las",
    offset_x="auto",
    offset_y="auto",
    offset_z="auto",
    scale_x=0.01,
    scale_y=0.01,
    scale_z=0.01,
).pipeline(clamped)
print(pipeline.execute())  # 387 points

Build docs developers (and LLMs) love