Source code for pdfnaut.cos.serializer

from collections import defaultdict
from io import BytesIO
from typing import Any, Literal

from ..cos.objects.base import PdfComment, PdfHexString, PdfName, PdfNull, PdfObject, PdfReference
from ..cos.objects.containers import PdfArray, PdfDictionary
from ..cos.objects.stream import PdfStream
from ..cos.objects.xref import (
    CompressedXRefEntry,
    FreeXRefEntry,
    InUseXRefEntry,
    PdfXRefEntry,
    PdfXRefSection,
    PdfXRefSubsection,
)
from ..exceptions import PdfWriteError
from .tokenizer import STRING_ESCAPE


def serialize_comment(comment: PdfComment) -> bytes:
    return b"%" + comment.value


def serialize_null(_) -> bytes:
    return b"null"


def serialize_bool(boolean: bool) -> bytes:
    return b"true" if boolean else b"false"


def serialize_literal_string(byte_str: bytes, *, keep_ascii: bool = False) -> bytes:
    output = bytearray()
    escape = {v: k for k, v in STRING_ESCAPE.items()}

    for char in byte_str:
        char = char.to_bytes(1, "big")

        if (esc := escape.get(char)) is not None:
            # character needs to be escaped
            output += esc
        elif keep_ascii and not char.isascii():
            # octal \ddd notation
            output += rf"\{ord(char):0>3o}".encode()
        else:
            # character does not need escaping
            output += char

    return b"(" + output + b")"


def serialize_name(name: PdfName) -> bytes:
    output = b"/"

    for char in name.value:
        char = char.to_bytes(1, "big")
        if char.isalnum():
            output += char
        else:
            output += rf"#{ord(char):x}".encode()

    return output


def serialize_hex_string(string: PdfHexString) -> bytes:
    return b"<" + string.raw + b">"


def serialize_indirect_ref(reference: PdfReference) -> bytes:
    return f"{reference.object_number} {reference.generation} R".encode()


def serialize_numeric(number: int | float) -> bytes:
    return str(number).encode()


def serialize_array(array: PdfArray) -> bytes:
    return b"[" + b" ".join(serialize(item) for item in array.data) + b"]"


def serialize_dictionary(dictionary: PdfDictionary) -> bytes:
    items = []
    for key, val in dictionary.data.items():
        items.append(serialize(PdfName(key.encode())))
        items.append(serialize(val))

    return b"<<" + b" ".join(items) + b">>"


def serialize_stream(stream: PdfStream, *, eol: bytes) -> bytes:
    return b"".join(
        [
            serialize_dictionary(stream.details) + eol,
            b"stream" + eol,
            stream.raw + eol,
            b"endstream",
        ]
    )


def serialize(object_: PdfObject | PdfComment, *, params: dict[str, Any] | None = None) -> bytes:
    if params is None:
        params = {}

    if isinstance(object_, PdfComment):
        return serialize_comment(object_)
    elif isinstance(object_, PdfName):
        return serialize_name(object_)
    elif isinstance(object_, bytes):
        return serialize_literal_string(object_, keep_ascii=params.get("keep_ascii", False))
    elif isinstance(object_, bool):
        return serialize_bool(object_)
    elif isinstance(object_, PdfNull):
        return serialize_null(object_)
    elif isinstance(object_, PdfHexString):
        return serialize_hex_string(object_)
    elif isinstance(object_, PdfReference):
        return serialize_indirect_ref(object_)
    elif isinstance(object_, (int, float)):
        return serialize_numeric(object_)
    elif isinstance(object_, PdfArray):
        return serialize_array(object_)
    elif isinstance(object_, PdfDictionary):
        return serialize_dictionary(object_)
    elif isinstance(object_, PdfStream):
        return serialize_stream(object_, eol=params["eol"])

    raise PdfWriteError(f"Cannot serialize type {type(object_)}")


[docs] class PdfSerializer: """A serializer that is able to produce a valid PDF document. Arguments: eol (bytes, optional): The end of line sequence to use when serializing. One of CR, LF, or CRLF. Defaults to CRLF. """
[docs] def __init__(self, *, eol: Literal[b"\r\n", b"\r", b"\n"] = b"\r\n") -> None: self.content = BytesIO() """The buffer containing the serialized content to be written.""" self.eol = eol
[docs] def write_header(self, version: str, *, with_binary_marker: bool = True) -> None: """Appends the PDF file header to the document (see § 7.5.2, "File Header"). Arguments: version (str): A string representing the version of the PDF file. with_binary_marker (bool, optional): Whether to also append the recommended binary marker. Defaults to True. """ comment = PdfComment(f"PDF-{version}".encode()) self.content.write(serialize_comment(comment) + self.eol) if with_binary_marker: marker = PdfComment(b"\xee\xe1\xf5\xf4") self.content.write(serialize_comment(marker) + self.eol)
[docs] def write_object(self, reference: PdfReference | tuple[int, int], contents: PdfObject) -> int: """Appends an indirect object to the document. Arguments: reference (PdfReference | tuple[int, int]): The object number and generation to which the object should be assigned. contents (PdfObject): The contents to associate with the reference. Returns: int: The offset where the indirect object starts. """ if isinstance(reference, tuple): reference = PdfReference(*reference) start_offset = self.content.tell() self.content.writelines( [ f"{reference.object_number} {reference.generation} obj".encode() + self.eol, serialize(contents, params={"eol": self.eol}) + self.eol, b"endobj" + self.eol, ] ) return start_offset
[docs] def generate_xref_section( self, rows: list[tuple[int, PdfXRefEntry]] ) -> list[PdfXRefSubsection]: """Generates a cross-reference section from a list of ``rows``. Each row consists of a two-element tuple containing the object number of the XRef entry and the entry itself. The object numbers will determine the amount of subsections created and the entries within them. The output is a list of XRef subsections that can be then serialized by either :meth:`.write_standard_xref_section` or :meth:`.write_compressed_xref_section`. """ rows = sorted(rows, key=lambda entry: entry[0]) subsections: defaultdict[int, list[tuple[int, PdfXRefEntry]]] = defaultdict(list) first_obj_num = rows[0][0] for entry in rows: subsections[first_obj_num].append(entry) if len(subsections[first_obj_num]) <= 1: continue first_key, _ = subsections[first_obj_num][-1] second_key, _ = subsections[first_obj_num][-2] if first_key != second_key and abs(first_key - second_key) != 1: # The keys should belong in different subsections. Move the last key to a # different subsection and set that subsection for the rest of entries. last = subsections[first_obj_num].pop() first_obj_num = last[0] subsections[first_obj_num].append(last) return [ PdfXRefSubsection(obj_num, len(entries), [ent for _, ent in entries]) for obj_num, entries in subsections.items() ]
[docs] def write_standard_xref_section(self, subsections: list[PdfXRefSubsection]) -> int: """Appends a standard XRef section (see ISO 32000-2:2020 § 7.5.4 "Cross-Reference Table") to the document. Returns the ``startxref`` offset that should be written to the document.""" startxref = self.content.tell() self.content.write(b"xref" + self.eol) for subsection in subsections: self.content.write( f"{subsection.first_obj_number} {subsection.count}".encode() + self.eol ) for entry in subsection.entries: if isinstance(entry, InUseXRefEntry): self.content.write(f"{entry.offset:0>10} {entry.generation:0>5} n".encode()) elif isinstance(entry, FreeXRefEntry): self.content.write( f"{entry.next_free_object:0>10} {entry.gen_if_used_again:0>5} f".encode() ) else: raise PdfWriteError( "cannot write a compressed xref entry within a standard xref section." ) self.content.write(self.eol) return startxref
[docs] def write_compressed_xref_section(self, section: PdfXRefSection) -> int: """Appends a compressed XRef stream (see ISO 32000-2:2020 § 7.5.8 "Cross-Reference Streams") from ``section`` (to use as part of the extent) to the document. Returns the ``startxref`` offset that should be written to the document. """ indices: PdfArray[PdfArray[int]] = PdfArray() table_rows: list[list[int]] = [] for subsection in section.subsections: indices.append(PdfArray([subsection.first_obj_number, subsection.count])) for entry in subsection.entries: if isinstance(entry, FreeXRefEntry): table_rows.append([0, entry.next_free_object, entry.gen_if_used_again]) elif isinstance(entry, InUseXRefEntry): table_rows.append([1, entry.offset, entry.generation]) elif isinstance(entry, CompressedXRefEntry): table_rows.append([2, entry.objstm_number, entry.index_within]) def max_byte_width(col: tuple[int, ...]) -> int: return ((max(col).bit_length() + 7) // 8) or 1 widths = [max_byte_width(column) for column in zip(*table_rows)] content_per_row = [] for row in table_rows: content_per_row.append( b"".join(item.to_bytes(widths[idx], "big") for idx, item in enumerate(row)) ) contents = b"".join(content_per_row) stream = PdfStream( PdfDictionary( Type=PdfName(b"XRef"), W=PdfArray(widths), Index=PdfArray(sum(indices, start=PdfArray())), Length=len(contents), **section.trailer.data, ), contents, ) highest_objnum = sum(max(indices, key=sum)) return self.write_object((highest_objnum, 0), stream)
[docs] def write_trailer( self, trailer: PdfDictionary | None = None, startxref: int | None = None ) -> None: """Appends a standard ``trailer`` to the document (see ISO 32000-2:2020 § 7.5.5 "File Trailer") alongside the ``startxref`` offset. Both arguments are optional, indicating their presence in the appended output. If the XRef section written previously was an XRef stream, the trailer has already been written and should be ``None``. """ if trailer is not None: self.content.write(b"trailer" + self.eol) self.content.write(serialize_dictionary(trailer) + self.eol) if startxref is not None: self.content.write(b"startxref" + self.eol) self.content.write(str(startxref).encode() + self.eol)
[docs] def write_eof(self) -> None: """Appends the end of file (EOF) marker to the document.""" self.content.write(b"%%EOF" + self.eol)