Streamable pipelines (pipelines that consist exclusively of streamable PDAL stages) can be executed in streaming mode. This allows you to process large point cloud datasets that may not fit entirely in memory.

Executing streamable pipelines with iterator()

The Pipeline.iterator() method returns an iterator that yields NumPy arrays of at most chunk_size points each.
import pdal
pipeline = pdal.Reader("test/data/autzen-utm.las") | pdal.Filter.expression(expression="Intensity > 80 && Intensity < 120")
for array in pipeline.iterator(chunk_size=500):
    print(len(array))
# or, with numpy imported as np, concatenate all arrays into one:
# import numpy as np
# full_array = np.concatenate(list(pipeline))

Parameters

  • chunk_size (default=10000): The maximum number of points to include in each yielded array
  • prefetch (default=0): Allows prefetching up to this number of arrays in parallel and buffering them until they are yielded to the caller

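To make the chunking behavior concrete, here is a minimal sketch of the consumption pattern using a plain generator in place of a real PDAL iterator (mock_iterator, its dtype, and the sizes are illustrative, not part of the PDAL API):

```python
import numpy as np

def mock_iterator(total_points, chunk_size):
    """Yield structured arrays of up to chunk_size points, mimicking the
    shape of what Pipeline.iterator() would produce."""
    dtype = [("X", float), ("Y", float), ("Z", float)]
    for start in range(0, total_points, chunk_size):
        n = min(chunk_size, total_points - start)
        yield np.zeros(n, dtype=dtype)

chunks = list(mock_iterator(total_points=1200, chunk_size=500))
print([len(a) for a in chunks])  # [500, 500, 200]
full = np.concatenate(chunks)
print(len(full))                 # 1200
```

Note that only the last chunk is smaller than chunk_size; all earlier chunks are full.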
execute_streaming() method

If you just want to execute a streamable pipeline in streaming mode and don’t need to access the data points (typically when the pipeline has Writer stage(s)), you can use the Pipeline.execute_streaming(chunk_size) method instead. This is functionally equivalent to sum(map(len, pipeline.iterator(chunk_size))) but more efficient as it avoids allocating and filling any arrays in memory.
import pdal
pipeline = pdal.Reader("input.las") | pdal.Writer.las(filename="output.las")
count = pipeline.execute_streaming(chunk_size=10000)
print(f"Processed {count} points")
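The equivalence to sum(map(len, pipeline.iterator(chunk_size))) can be spelled out with a stand-in iterator (fake_chunks and its sizes are illustrative; no PDAL required):

```python
import numpy as np

def fake_chunks():
    # Stand-in for pipeline.iterator(chunk_size): yields arrays of points
    dtype = [("X", float)]
    for n in (10_000, 10_000, 2_357):
        yield np.zeros(n, dtype=dtype)

# What execute_streaming(chunk_size) returns, computed the slow way:
count = sum(map(len, fake_chunks()))
print(count)  # 22357
```

execute_streaming() produces the same count but never materializes the arrays on the Python side.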

Using arrays as buffers with stream handlers

It’s possible to treat NumPy arrays passed to PDAL as buffers that are iteratively populated through custom Python functions during the execution of the pipeline. This may be useful in cases where you want the reading of the input data to be handled in a streamable fashion, such as:
  • When the total NumPy array data wouldn’t fit into memory
  • To initiate execution of a streamable PDAL pipeline while the input data is still being read
To enable this mode, you need to include the Python populate function along with each corresponding NumPy array.
import numpy as np
import pdal

# NumPy array to be used as a buffer, sized to the largest expected chunk
max_chunk_size = 10_000
in_buffer = np.zeros(max_chunk_size, dtype=[("X", float), ("Y", float), ("Z", float)])

# The function to populate the buffer iteratively
def load_next_chunk() -> int:
    """
    Function called by PDAL before reading the data from the buffer.

    IMPORTANT: must return the total number of items to be read from the buffer.
    The Pipeline execution will keep calling this function in a loop until 0 is returned.
    """
    #
    # Replace this block with code that populates the buffer and returns
    # the number of elements to read. Here `next_chunk` is a placeholder
    # for a chunk of data obtained elsewhere (e.g. from another iterator):
    #
    chunk_size = next_chunk.size
    in_buffer[:chunk_size]["X"] = next_chunk["X"]
    in_buffer[:chunk_size]["Y"] = next_chunk["Y"]
    in_buffer[:chunk_size]["Z"] = next_chunk["Z"]

    return chunk_size

# Configure input array and handler during Pipeline initialization
# (pipeline_json holds your pipeline definition)...
p = pdal.Pipeline(pipeline_json, arrays=[in_buffer], stream_handlers=[load_next_chunk])

# ...alternatively you can use the setter on an existing Pipeline
# p.inputs = [(in_buffer, load_next_chunk)]
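Since any iterator of structured arrays can drive this mechanism, the handler pattern above can be factored into a reusable helper. The sketch below (make_stream_handler is our name, not PDAL's) wraps an iterator and a buffer into a handler with the required contract: copy a chunk in, return its size, and return 0 when exhausted. The handler is exercised directly here; in real use it would be passed via stream_handlers= or Pipeline.inputs:

```python
import numpy as np

def make_stream_handler(source_iter, buffer):
    """Wrap an iterator of structured arrays into a zero-argument handler
    suitable for Pipeline stream handlers."""
    def handler():
        try:
            chunk = next(source_iter)
        except StopIteration:
            return 0          # tells PDAL to stop streaming
        n = chunk.size
        buffer[:n] = chunk    # whole-record copy; dtypes must match exactly
        return n
    return handler

dtype = [("X", float), ("Y", float), ("Z", float)]
buf = np.zeros(10, dtype=dtype)
chunks = iter([np.ones(4, dtype=dtype), np.ones(2, dtype=dtype)])
handler = make_stream_handler(chunks, buf)
print(handler(), handler(), handler())  # 4 2 0
```

The whole-record copy (buffer[:n] = chunk) replaces the per-field copies only when the buffer and chunk dtypes match; otherwise copy field by field as in the example above.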

Complete streaming example

The following example demonstrates how to stream the read and write of a very large LAZ file with a low memory footprint:
import numpy as np
import pdal

in_chunk_size = 10_000_000
in_pipeline = pdal.Reader.las(filename="in_test.laz").pipeline()

in_pipeline_it = iter(in_pipeline.iterator(in_chunk_size))

out_chunk_size = 50_000_000
out_file = "out_test.laz"
out_pipeline = pdal.Writer.las(
    filename=out_file
).pipeline()

out_buffer = np.zeros(in_chunk_size, dtype=[("X", float), ("Y", float), ("Z", float)])

def load_next_chunk():
    try:
        next_chunk = next(in_pipeline_it)
    except StopIteration:
        # Stops the streaming
        return 0

    chunk_size = next_chunk.size
    out_buffer[:chunk_size]["X"] = next_chunk["X"]
    out_buffer[:chunk_size]["Y"] = next_chunk["Y"]
    out_buffer[:chunk_size]["Z"] = next_chunk["Z"]

    print(f"Loaded next chunk -> {chunk_size}")

    return chunk_size

out_pipeline.inputs = [(out_buffer, load_next_chunk)]

out_pipeline.loglevel = 20 # INFO
count = out_pipeline.execute_streaming(out_chunk_size)

print(f"\nWROTE - {count}")
This example:
  1. Creates an input pipeline to read a LAZ file in chunks of 10 million points
  2. Sets up an output pipeline to write to a new LAZ file
  3. Uses a buffer array and handler function to transfer data between pipelines
  4. Executes the streaming write operation with a 50 million point chunk size
  5. Prints progress as chunks are loaded and the final point count
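The buffer transfer in step 3 boils down to copying structured-array fields into a preallocated buffer. A standalone sketch of just that copy, with illustrative sizes and no PDAL dependency:

```python
import numpy as np

dtype = [("X", float), ("Y", float), ("Z", float)]
next_chunk = np.ones(300, dtype=dtype)    # stand-in for a chunk from the reader
out_buffer = np.zeros(1000, dtype=dtype)  # reusable buffer sized to the max chunk

chunk_size = next_chunk.size
for dim in ("X", "Y", "Z"):
    out_buffer[dim][:chunk_size] = next_chunk[dim]  # copy one dimension

# When the dtypes match exactly, the per-field copies can be collapsed
# into a single record copy: out_buffer[:chunk_size] = next_chunk
print(chunk_size)  # 300
```

Because the buffer is reused across calls, only the first chunk_size entries are meaningful on each iteration; the returned count tells PDAL how many to read.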
