The Writer class represents PDAL writer stages that save point cloud data to files or other destinations. Writers are typically the final stage in PDAL pipelines.
Constructor
Writer(filename: Optional[str] = None, **options: Any)
filename
Optional[str]
default:"None"
Path to the output file. Can be passed as the first positional argument or as a keyword argument.
Additional writer-specific options as keyword arguments.
Example
import pdal
# Specify filename positionally
writer = pdal.Writer("output.las")
# Specify filename as keyword argument
writer = pdal.Writer(filename="output.laz")
# With additional options
writer = pdal.Writer(
"output.las",
type="writers.las",
compression="lazperf",
scale_x=0.01,
scale_y=0.01,
scale_z=0.01
)
Type Inference
The Writer class automatically infers the appropriate PDAL writer driver from the file extension when type is not explicitly specified.
# Type automatically inferred as "writers.las"
writer = pdal.Writer("output.las")
# Type automatically inferred as "writers.bpf"
writer = pdal.Writer("output.bpf")
# Explicitly specify type (overrides inference)
writer = pdal.Writer("output.txt", type="writers.text")
Properties
Inherits all properties from Stage:
type - The writer type (e.g., "writers.las")
streamable - Whether the writer supports streaming
tag - Optional tag identifier
inputs - List of input stages or tags
options - Dictionary of all writer options
Methods
Inherits all methods from Stage:
pipeline(*arrays, loglevel) - Create a Pipeline from this writer
__or__(other) - Pipe operator for composing with other stages
Driver-Specific Static Methods
The Writer class provides static methods for each available PDAL writer driver. These are automatically generated and provide convenient shortcuts.
Common writers
las
Writer.las(filename: str, **kwargs) -> Writer
Write LAS/LAZ format files (ASPRS LAS).
writer = pdal.Writer.las("output.las")
# With compression and scaling
writer = pdal.Writer.las(
filename="output.laz",
compression="lazperf",
scale_x=0.01,
scale_y=0.01,
scale_z=0.01,
offset_x="auto",
offset_y="auto",
offset_z="auto"
)
bpf
Writer.bpf(filename: str, **kwargs) -> Writer
Write BPF (Binary Point Format) files.
writer = pdal.Writer.bpf("output.bpf")
text
Writer.text(filename: str, **kwargs) -> Writer
Write point cloud data to text/CSV files.
writer = pdal.Writer.text(
filename="points.csv",
order="X,Y,Z,Intensity",
delimiter=",",
write_header=True
)
gdal
Writer.gdal(filename: str, **kwargs) -> Writer
Write raster data using GDAL.
writer = pdal.Writer.gdal(
filename="elevation.tif",
resolution=1.0,
output_type="idw",
data_type="float32",
where="Classification == 2" # Ground points only
)
tiledb
Writer.tiledb(array_name: str, **kwargs) -> Writer
Write point cloud data to TileDB arrays.
# Requires stats filter to determine domain extent
pipeline = (
pdal.Reader.las("input.las")
| pdal.Filter.stats()
| pdal.Writer.tiledb(array_name="my_point_cloud")
)
ply
Writer.ply(filename: str, **kwargs) -> Writer
Write PLY (Polygon File Format) files.
writer = pdal.Writer.ply(
filename="output.ply",
dims="X,Y,Z,Red,Green,Blue"
)
null
Writer.null(**kwargs) -> Writer
Discard output (useful for testing or when using side effects).
writer = pdal.Writer.null()
The specific static methods available depend on the PDAL drivers installed in your environment. Use help(pdal.Writer.driver_name) to see documentation for a specific driver.
Usage Examples
Basic file writing
import pdal
# Read, process, and write
pipeline = (
pdal.Reader.las("input.las")
| pdal.Filter.range(limits="Classification[2:2]")
| pdal.Writer.las("ground_points.laz", compression="lazperf")
)
count = pipeline.execute()
print(f"Wrote {count} points")
Writing Numpy arrays
import numpy as np
import pdal
# Create point cloud data in Numpy
points = np.array(
[(0, 0, 0), (1, 1, 1), (2, 2, 2)],
dtype=[('X', float), ('Y', float), ('Z', float)]
)
# Write to file
pipeline = pdal.Writer.las(
filename="output.las",
offset_x="auto",
offset_y="auto",
offset_z="auto",
scale_x=0.01,
scale_y=0.01,
scale_z=0.01
).pipeline(points)
pipeline.execute()
import pdal
# Write to both LAS and CSV
reader = pdal.Reader.las("input.las")
filter = pdal.Filter.range(limits="Intensity[100:200]")
las_writer = pdal.Writer.las("output.laz", compression="lazperf", tag="las_out")
csv_writer = pdal.Writer.text(
filename="output.csv",
order="X,Y,Z,Intensity",
delimiter=",",
tag="csv_out"
)
# Both writers receive the same filtered data
pipeline = pdal.Pipeline([reader, filter, las_writer, csv_writer])
pipeline.execute()
Creating a raster DTM
import pdal
pipeline = (
pdal.Reader.las("input.laz")
| pdal.Filter.range(limits="Classification![7:7]") # Remove noise
| pdal.Filter.assign(value=["Classification=0"]) # Reset
| pdal.Filter.smrf() # Classify ground
| pdal.Writer.gdal(
filename="dtm.tif",
where="Classification == 2",
data_type="float32",
resolution=10,
output_type="idw",
window_size=3
)
)
pipeline.execute()
Writing to TileDB with optimal settings
import pdal
# Stats filter is recommended for TileDB to determine domain extent
pipeline = (
pdal.Reader.las("input.laz")
| pdal.Filter.range(limits="Intensity[100:200]")
| pdal.Filter.stats()
| pdal.Writer.tiledb(array_name="filtered_points")
)
count = pipeline.execute()
# Inspect the TileDB schema
import tiledb
with tiledb.open("filtered_points") as array:
print(array.schema)
Streaming large file writes
import numpy as np
import pdal
# Set up input pipeline for reading
in_pipeline = pdal.Reader.las("large_input.laz").pipeline()
in_iterator = in_pipeline.iterator(chunk_size=10_000_000).__iter__()
# Set up output pipeline for writing
out_buffer = np.zeros(10_000_000, dtype=[('X', float), ('Y', float), ('Z', float)])
def load_next_chunk():
try:
next_chunk = next(in_iterator)
except StopIteration:
return 0
chunk_size = next_chunk.size
out_buffer[:chunk_size]['X'] = next_chunk[:]['X']
out_buffer[:chunk_size]['Y'] = next_chunk[:]['Y']
out_buffer[:chunk_size]['Z'] = next_chunk[:]['Z']
return chunk_size
out_pipeline = pdal.Writer.las(
filename="output.laz",
compression="lazperf"
).pipeline()
out_pipeline.inputs = [(out_buffer, load_next_chunk)]
count = out_pipeline.execute_streaming(chunk_size=50_000_000)
print(f"Wrote {count} points")
import pdal
pipeline = (
pdal.Reader.las("input.las")
| pdal.Filter.reprojection(
in_srs="EPSG:4326",
out_srs="EPSG:3857"
)
| pdal.Writer.las(
filename="reprojected.las",
a_srs="EPSG:3857"
)
)
pipeline.execute()