import logging
from copy import deepcopy
from typing import BinaryIO, Iterable, Optional, Union
from ._pointwriter import IPointWriter
from .compression import LazBackend
from .errors import LaspyException
from .header import LasHeader
from .point import dims
from .point.record import PackedPointRecord
from .vlrs.vlrlist import VLRList
logger = logging.getLogger(__name__)
[docs]class LasWriter:
"""
Allows to write a complete LAS/LAZ file to the destination.
"""
[docs] def __init__(
self,
dest: BinaryIO,
header: LasHeader,
do_compress: Optional[bool] = None,
laz_backend: Optional[Union[LazBackend, Iterable[LazBackend]]] = None,
closefd: bool = True,
encoding_errors: str = "strict",
) -> None:
"""
Parameters
----------
dest: file_object
file object where the LAS/LAZ will be written
header: LasHeader
The header of the file to be written
do_compress: bool, optional
Whether the file data should be written as LAS (uncompressed)
or LAZ (compressed).
If None, the file won't be compressed, unless a laz_backend is provided
laz_backend: LazBackend or list of LazBackend, optional
The LazBackend to use (or if it is a sequence the LazBackend to try)
for the compression
closefd: bool, default True
should the `dest` be closed when the writer is closed
encoding_errors: str, default 'strict'
How encoding errors should be treated.
Possible values and their explanation can be seen here:
https://docs.python.org/3/library/codecs.html#error-handlers.
"""
self.closefd = closefd
self.encoding_errors = encoding_errors
self.header = deepcopy(header)
# The point writer will take take of creating and writing
# the correct laszip vlr, however we have to make sure
# no prior laszip vlr exists
try:
self.header.vlrs.pop(header.vlrs.index("LasZipVlr"))
except ValueError:
pass
self.header.partial_reset()
self.dest = dest
self.done = False
dims.raise_if_version_not_compatible_with_fmt(
header.point_format.id, str(self.header.version)
)
if laz_backend is not None:
if do_compress is None:
do_compress = True
self.laz_backend = laz_backend
else:
if do_compress is None:
do_compress = False
self.laz_backend = LazBackend.detect_available()
self.header.are_points_compressed = do_compress
if do_compress:
self.point_writer: IPointWriter = self._create_laz_backend(self.laz_backend)
else:
self.point_writer: IPointWriter = UncompressedPointWriter(self.dest)
self.point_writer.write_initial_header_and_vlrs(
self.header, self.encoding_errors
)
[docs] def write_points(self, points: PackedPointRecord) -> None:
"""
.. note ::
If you are writing points coming from multiple different input files
into one output file, you have to make sure the point record
you write all use the same scales and offset of the writer.
You can use :meth:`.LasData.change_scaling` or :meth:`.ScaleAwarePointRecord.change_scaling`
to do that.
Parameters
----------
points: PackedPointRecord or ScaleAwarePointRecord
The points to be written
Raises
------
LaspyException
If the point format of the points does not match
the point format of the writer.
"""
if not points:
return
if self.done:
raise LaspyException("Cannot write points anymore")
if points.point_format != self.header.point_format:
raise LaspyException("Incompatible point formats")
self.header.grow(points)
self.point_writer.write_points(points)
[docs] def write_evlrs(self, evlrs: VLRList) -> None:
"""Writes the EVLRs to the file
Parameters
----------
evlrs: VLRList
The EVLRs to be written
Raises
------
LaspyException
If the file's version is not >= 1.4
"""
if self.header.version.minor < 4:
raise LaspyException(
"EVLRs are not supported on files with version less than 1.4"
)
if len(evlrs) > 0:
self.point_writer.done()
self.done = True
self.header.number_of_evlrs = len(evlrs)
self.header.start_of_first_evlr = self.dest.tell()
evlrs.write_to(self.dest, as_extended=True)
[docs] def close(self) -> None:
"""Closes the writer.
flushes the points, updates the header, making it impossible
to write points afterwards.
"""
if self.point_writer is not None:
if not self.done:
self.point_writer.done()
if self.header.point_count == 0:
self.header.maxs = [0.0, 0.0, 0.0]
self.header.mins = [0.0, 0.0, 0.0]
self.point_writer.write_updated_header(self.header, self.encoding_errors)
if self.closefd:
self.dest.close()
self.done = True
def _create_laz_backend(
self, laz_backends: Union[LazBackend, Iterable[LazBackend]]
) -> "IPointWriter":
try:
laz_backends = iter(laz_backends)
except TypeError:
laz_backends = (laz_backends,)
last_error: Optional[Exception] = None
for backend in laz_backends:
try:
if not backend.is_available():
raise LaspyException(f"The '{backend}' is not available")
return backend.create_writer(self.dest, self.header)
except Exception as e:
logger.error(e)
last_error = e
if last_error is not None:
raise LaspyException(f"No LazBackend could be initialized: {last_error}")
else:
raise LaspyException("No LazBackend selected, cannot compress")
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
class UncompressedPointWriter(IPointWriter):
"""
Writing points in the simple uncompressed case.
"""
def __init__(self, dest: BinaryIO) -> None:
self.dest = dest
@property
def destination(self) -> BinaryIO:
return self.dest
def write_points(self, points: PackedPointRecord) -> None:
self.dest.write(points.memoryview())
def done(self) -> None:
pass