Source code for laspy.lasappender

import io
from typing import BinaryIO, Iterable, Optional, Union

from ._pointappender import IPointAppender
from .compression import LazBackend
from .errors import LaspyException
from .header import LasHeader
from .point.record import PackedPointRecord
from .vlrs.vlrlist import VLRList


[docs]class LasAppender: """Allows to append points to and existing LAS/LAZ file. Appending to LAZ is only supported by the lazrs backend """ def __init__( self, dest: BinaryIO, laz_backend: Optional[Union[LazBackend, Iterable[LazBackend]]] = None, closefd: bool = True, encoding_errors: str = "strict", ) -> None: if not dest.seekable(): raise TypeError("Expected the 'dest' to be a seekable file object") header = LasHeader.read_from(dest) if laz_backend is None: laz_backend = [ bck for bck in LazBackend.detect_available() if bck.supports_append ] self.dest = dest self.header = header if not header.are_points_compressed: self.points_appender = UncompressedPointAppender(self.dest) self.dest.seek( (self.header.point_count * self.header.point_format.size) + self.header.offset_to_point_data, io.SEEK_SET, ) else: self.points_appender = self._create_laz_backend(laz_backend) if header.version.minor >= 4 and header.number_of_evlrs > 0: assert ( self.dest.tell() <= self.header.start_of_first_evlr ), "The position is past the start of evlrs" pos = self.dest.tell() self.dest.seek(self.header.start_of_first_evlr, io.SEEK_SET) self.evlrs: Optional[VLRList] = VLRList.read_from( self.dest, self.header.number_of_evlrs, extended=True ) dest.seek(self.header.start_of_first_evlr, io.SEEK_SET) self.dest.seek(pos, io.SEEK_SET) else: self.evlrs: Optional[VLRList] = None self.closefd = closefd self.encoding_errors = encoding_errors
[docs] def append_points(self, points: PackedPointRecord) -> None: """Append the points to the file, the points must have the same point format as the points already contained within the file. :param points: The points to append """ if points.point_format != self.header.point_format: raise LaspyException("Point formats do not match") self.points_appender.append_points(points) self.header.grow(points)
def close(self) -> None: self.points_appender.done() self._write_evlrs() self._write_updated_header() if self.closefd: self.dest.close() def _write_evlrs(self) -> None: if ( self.header.version.minor >= 4 and self.evlrs is not None and len(self.evlrs) > 0 ): self.header.number_of_evlr = len(self.evlrs) self.header.start_of_first_evlr = self.dest.tell() self.evlrs.write_to(self.dest, as_extended=True) def _write_updated_header(self) -> None: pos = self.dest.tell() self.dest.seek(0, io.SEEK_SET) self.header.write_to( self.dest, ensure_same_size=True, encoding_errors=self.encoding_errors ) self.dest.seek(pos, io.SEEK_SET) def _create_laz_backend( self, laz_backend: Union[LazBackend, Iterable[LazBackend]] = ( LazBackend.LazrsParallel, LazBackend.Lazrs, ), ) -> IPointAppender: try: laz_backend = iter(laz_backend) except TypeError: laz_backend = (laz_backend,) last_error: Optional[Exception] = None for backend in laz_backend: try: return backend.create_appender(self.dest, self.header) except Exception as e: last_error = e if last_error is not None: raise LaspyException(f"Could not initialize a laz backend: {last_error}") else: raise LaspyException(f"No valid laz backend selected") def __enter__(self) -> "LasAppender": return self def __exit__(self, exc_type, exc_val, exc_tb) -> None: self.close()
class UncompressedPointAppender(IPointAppender): """ Appending points in the simple uncompressed case. """ def __init__(self, dest: BinaryIO) -> None: self.dest = dest def append_points(self, points: PackedPointRecord) -> None: self.dest.write(points.memoryview()) def done(self) -> None: pass