from __future__ import annotations
import hashlib
import logging
import re
from collections import UserDict
from datetime import time
from enum import IntEnum
from functools import partial
from io import BufferedIOBase, BytesIO
from pathlib import Path
from typing import IO, BinaryIO, TypeVar, cast
from typing_extensions import TypeAlias
from ..common._utils import get_closest
from ..cos.objects.base import PdfHexString, PdfName, PdfNull, PdfObject, PdfReference
from ..cos.objects.containers import PdfArray, PdfDictionary
from ..cos.objects.stream import PdfStream
from ..cos.objects.xref import (
CompressedXRefEntry,
FreeXRefEntry,
InUseXRefEntry,
PdfXRefEntry,
PdfXRefSection,
PdfXRefSubsection,
)
from ..exceptions import PdfParseError
from ..security.standard_handler import StandardSecurityHandler
from .serializer import PdfSerializer, serialize
from .tokenizer import PdfTokenizer
LOGGER = logging.getLogger(__name__)
[docs]
def generate_file_id(filename: str, content_size: int) -> PdfHexString:
"""Generates a file identifier using ``filename`` and ``content_size`` as
described in ISO 32000-2:2020 § 14.4 "File identifiers".
File identifiers are values that uniquely separate a revision of a document
from another. The file identifier is generated using the same information
specified in the standard, that is, the current time, the file path and
the file size in bytes.
"""
id_digest = hashlib.md5(time().isoformat("auto").encode())
id_digest.update(filename.encode())
id_digest.update(str(content_size).encode())
return PdfHexString(id_digest.hexdigest().encode())
[docs]
class PermsAcquired(IntEnum):
"""Permissions acquired after opening or decrypting a document."""
NONE = 0
"""No permissions acquired, document is still encrypted."""
USER = 1
"""User permissions within the limits specified by the security handler."""
OWNER = 2
"""Owner permissions (all permissions)."""
class FreeObject:
def __repr__(self) -> str:
return "free"
MapObject: TypeAlias = "PdfObject | FreeObject"
[docs]
class ObjectStream:
"""A mapping of object numbers to PDF objects representing an object stream
(see ISO 32000-2:2020 § 7.5.7 "Object Streams")."""
[docs]
def __init__(self, pdf: PdfParser, stream: PdfStream, stream_objnum: int) -> None:
"""
Arguments:
pdf (PdfParser):
The PDF parser or document to which this object stream belongs.
stream (PdfStream):
The stream being represented by this object.
stream_objnum (int):
The object number of this stream within the PDF document.
"""
self.pdf = pdf
self.stream = stream
self.stream_objnum = stream_objnum
# object index: resolved object
self.resolved_objects: dict[int, PdfObject] = {}
self._decoded = self.stream.decode()
self._first = cast(int, self.stream.details["First"])
self._n_objects = cast(int, self.stream.details["N"])
# list of tuples of (object number, relative offset within)
self.index_pairs = self.parse_indices()
[docs]
def parse_indices(self) -> list[tuple[int, int]]:
"""Parses the object stream's indices.
The indices are a list of 2-element pairs specifying, in order, the object
number of an item within the stream and the object's location within the
stream relative to the offset in the /First key.
"""
index_tokenizer = PdfTokenizer(self._decoded[: self._first])
index_pairs = []
for _ in range(self._n_objects):
obj_num = cast(int, next(index_tokenizer))
relative_offset = cast(int, next(index_tokenizer)) # relative to /First
index_pairs.append((obj_num, relative_offset))
return index_pairs
[docs]
def get_object(self, index: int, *, cache: bool = True) -> PdfObject:
"""Gets an object at a specified ``index`` inside an object stream.
Arguments:
index (int):
The index of an object within the stream.
cache (bool, optional, keyword only):
Whether to access or write to the object store (by default, True).
If True, this method will always retrieve from and write objects
to the object store if possible. If False, this method will always
retrieve objects from the contents of the stream.
"""
if cache and index in self.resolved_objects:
return self.resolved_objects[index]
_, relative_offset = self.index_pairs[index]
start_of_obj_tokenizer = PdfTokenizer(self._decoded[self._first + relative_offset :])
start_of_obj_tokenizer.resolver = self.pdf.get_object
resolved = cast(PdfObject, next(start_of_obj_tokenizer))
if cache:
self.resolved_objects[index] = resolved
return resolved
[docs]
def to_stream(self) -> PdfStream:
"""Returns a :class:`.PdfStream` representing the contents of this object stream."""
object_string = b""
indices = []
for idx in range(self._n_objects):
if cached := self.resolved_objects.get(idx):
writing_object = cached
else:
writing_object = self.get_object(idx, cache=False)
obj_num, _ = self.index_pairs[idx]
entry = self.pdf.xref.get((obj_num, 0))
start_offset = len(object_string)
if entry is None:
new_obj_num = self.pdf.objects.add(writing_object).object_number
# guarantee that the pdf processor doesn't write a new object
# but rather uses the one from the object stream
self.pdf.xref[(new_obj_num, 0)] = CompressedXRefEntry(
self.stream_objnum, start_offset
)
else:
new_obj_num = obj_num
object_string += serialize(writing_object) + b" "
indices.append((new_obj_num, start_offset))
index_string = b""
for obj_num, rel_offset in indices:
index_string += f"{obj_num} {rel_offset}".encode() + b" "
objstm_data = self.stream.details | PdfDictionary(
Type=PdfName(b"ObjStm"),
N=len(indices),
First=len(index_string),
Length=0, # to be filled in
)
return PdfStream.create(index_string + object_string, cast(PdfDictionary, objstm_data))
[docs]
class ObjectMap(UserDict[int, MapObject]):
"""A mapping of object numbers to either object references, in-use objects or free objects.
Object references included in :attr:`.ObjectMap.unresolved` are items that have not been
requested yet. Once an object is requested, it is removed from the unresolved set and
added to the map as is.
Free objects are indicated with the :class:`.FreeObject` class.
"""
[docs]
def __init__(self, pdf: PdfParser) -> None:
super().__init__()
self._pdf = pdf
self.initial_reference_map: dict[int, tuple[int, int]] = {}
"""A mapping of object numbers to reference tuples for the initial entries made
when the object map is filled."""
self.unresolved = set()
"""A set of unresolved object numbers (objects that have not been requested
or cached yet)."""
[docs]
def fill(self) -> None:
"""Fills the object map with the items available in the PDF's xref table."""
self.initial_reference_map = {obj: (obj, gen) for (obj, gen) in self._pdf.xref.keys()}
self.unresolved.clear()
for obj, gen in self.initial_reference_map.values():
entry = self._pdf.xref[(obj, gen)]
if isinstance(entry, FreeXRefEntry):
self[obj] = FreeObject()
else:
self[obj] = PdfReference(obj, gen).with_resolver(self._pdf.get_object)
self.unresolved.add(obj)
T = TypeVar("T")
[docs]
def get_next_ref(self) -> PdfReference:
"""Creates a new reference based on the current object number in the map."""
if not self:
return PdfReference(1, 0)
highest_objnum = max(self.keys())
return PdfReference(highest_objnum + 1, 0)
[docs]
def add(self, pdf_object: PdfObject) -> PdfReference[PdfObject]:
"""Adds a new ``pdf_object`` to the map. Returns its reference."""
reference = self.get_next_ref()
self[reference.object_number] = pdf_object
return reference.with_resolver(self._pdf.get_object)
[docs]
def delete(self, obj_num: int) -> MapObject | None:
"""Deletes object with number ``obj_num``. Returns the object if it
exists, otherwise returns None."""
return self.pop(obj_num, None)
[docs]
def free(self, obj_num: int) -> None:
"""Marks object with number ``obj_num`` as a free object."""
self[obj_num] = FreeObject()
def __getitem__(self, obj_num: int) -> MapObject:
value = super().__getitem__(obj_num)
if isinstance(value, PdfReference) and obj_num in self.unresolved:
resolved = self[obj_num] = value.get()
self.unresolved.discard(obj_num)
return resolved
return value
[docs]
class PdfParser:
"""A parser that can completely parse a PDF document.
It consumes the PDF's cross-reference tables and trailers. It merges the tables
into a single one and provides an interface to individually parse each indirect
object using :class:`~pdfnaut.cos.tokenizer.PdfTokenizer`.
Arguments:
data (bytes):
The document to be processed.
strict (bool, optional, keyword only):
Whether to warn or fail on issues caused by non-spec-compliance.
Defaults to False.
"""
[docs]
def __init__(self, data: bytes, *, strict: bool = False) -> None:
self.strict = strict
self._tokenizer = PdfTokenizer(data)
self._tokenizer.resolver = self.get_object
# object number: object stream
self._objstm_cache: dict[int, ObjectStream] = {}
# object number: direct object
self.objects = ObjectMap(self)
"""A mapping of objects present in the document."""
self.updates: list[PdfXRefSection] = []
"""A list of all incremental updates present in the document (most recent update first)."""
# placeholder to make the type checker happy
self.trailer = PdfDictionary[str, PdfObject]({"Size": 0, "Root": PdfReference(0, 0)})
"""The most recent trailer in the PDF document.
For details on the contents of the trailer, see ISO 32000-2:2020 § 7.5.5 "File Trailer".
"""
self.xref: dict[tuple[int, int], PdfXRefEntry] = {}
"""A cross-reference mapping combining the entries of all XRef tables present
in the document.
The key is a tuple of two integers: object number and generation number.
The value is any of the 3 types of XRef entries (free, in use, compressed).
This attribute reflects the state of the XRef table when the document was
first loaded. Assume read-only.
"""
self.header_version = ""
"""The document's PDF version as seen in the header.
This value should be used if no Version entry exists in the document catalog or
if the header's version is newer. Otherwise, use the Version entry.
"""
self.security_handler = None
"""The document's standard security handler, if any, as specified in the Encrypt
dictionary of the PDF trailer.
This field being set indicates that a supported security handler was used for
encryption. If not set, the parser will not attempt to decrypt this document.
"""
self._encryption_key = None
self._hot_references: list[PdfReference] = []
"""A list of references being currently processed by :meth:`.get_object()`.
This is here as a measure to prevent circular reference loops.
"""
[docs]
def parse(self, start_xref: int | None = None) -> None:
"""Parses the entire document.
It begins by parsing the most recent XRef table and trailer. If this trailer
points to a previous XRef, this function is called again with a ``start_xref``
offset until no more XRefs are found.
It also sets up the Standard security handler for use in case the document
is encrypted.
Arguments:
start_xref (int, optional):
The offset where the most recent XRef can be found. If no offset is
provided, this function will attempt to locate one.
"""
# Move to the header
self._tokenizer.position = 0
self.header_version = self.parse_header()
# Because the function may be called recursively, we check if this is the first call.
if start_xref is None:
start_xref = self.lookup_xref_start()
# Move to the offset where the XRef and trailer are
self._tokenizer.position = start_xref
section = self.parse_xref_and_trailer()
self.updates.append(section)
if "Prev" in section.trailer:
# More XRefs were found. Recurse!
self._tokenizer.position = 0
self.parse(cast(int, section.trailer["Prev"]))
else:
# That's it. Merge them together.
self.xref = self.get_merged_xrefs()
self.trailer = self.updates[0].trailer
# Fills the object store so we can refer to objects now!
self.objects.fill()
# Is the document encrypted with a standard security handler?
if "Encrypt" in self.trailer:
assert "ID" in self.trailer
encryption = cast(PdfDictionary, self.trailer["Encrypt"])
if cast(PdfName, encryption["Filter"]).value == b"Standard":
self.security_handler = StandardSecurityHandler(
encryption, cast("list[PdfHexString | bytes]", self.trailer["ID"])
)
[docs]
def build_xref_map(
self, subsections: list[PdfXRefSubsection]
) -> dict[tuple[int, int], PdfXRefEntry]:
"""Creates a dictionary mapping references to XRef entries in the document."""
entry_map: dict[tuple[int, int], PdfXRefEntry] = {}
for subsection in subsections:
for idx, entry in enumerate(subsection.entries, subsection.first_obj_number):
if isinstance(entry, FreeXRefEntry):
gen = entry.gen_if_used_again
elif isinstance(entry, InUseXRefEntry):
gen = entry.generation
else:
# compressed entries are assumed 0
gen = 0
entry_map[(idx, gen)] = entry
return entry_map
[docs]
def get_merged_xrefs(self) -> dict[tuple[int, int], PdfXRefEntry]:
"""Combines all XRef updates in the document into a cross-reference mapping
that includes all entries."""
entry_map: dict[tuple[int, int], PdfXRefEntry] = {}
hybrid_objnums = []
# from least recent to most recent
for section in self.updates[::-1]:
update_map = self.build_xref_map(section.subsections)
# if the document is a hybrid-reference file, append any hidden objects.
if "XRefStm" in section.trailer:
self._tokenizer.position = cast(int, section.trailer["XRefStm"])
xrefstm = self.parse_compressed_xref()
hybrid_map = self.build_xref_map(xrefstm.subsections)
for (obj, gen), hybrid_entry in hybrid_map.items():
update_entry = update_map.get((obj, gen))
# But only append if they aren't a thing or they are marked as "free"
if update_entry is None or (
isinstance(update_entry, FreeXRefEntry) and hybrid_entry is not None
):
entry_map[(obj, gen)] = hybrid_entry
hybrid_objnums.append(obj)
entry_map.update(update_map)
# If entries from the "hybrid section" were added, we have to remove
# the free entries they are meant to replace. Otherwise, the object store
# might get a bit confused and panic.
for objnum in hybrid_objnums:
for (num, gen), entry in entry_map.items():
if num == objnum and isinstance(entry, FreeXRefEntry):
del entry_map[(num, gen)]
break
return entry_map
[docs]
def lookup_xref_start(self) -> int:
"""Scans through the PDF until it finds the XRef offset then returns it."""
contents = bytearray()
# The PDF spec tells us we need to parse from the end of the file
# and the XRef comes first
self._tokenizer.position = len(self._tokenizer.data) - 1
while self._tokenizer.position > 0:
contents.insert(0, ord(self._tokenizer.peek()))
if contents.startswith(b"startxref"):
break
self._tokenizer.position -= 1
if not contents.startswith(b"startxref"):
raise PdfParseError("Cannot locate XRef table. 'startxref' offset missing.")
# advance to the startxref offset, we know it's there.
self._tokenizer.skip(9)
self._tokenizer.skip_whitespace()
return int(self._tokenizer.parse_numeric()) # startxref
[docs]
def parse_xref_and_trailer(self) -> PdfXRefSection:
"""Parses both the cross-reference table and the PDF trailer.
PDFs may include a typical uncompressed XRef table (and hence separate XRefs and
trailers) or an XRef stream that combines both.
"""
start_offset = self._tokenizer.position
if self._tokenizer.matches(b"xref"):
xref = self.parse_simple_xref()
self._tokenizer.skip_whitespace()
trailer = self.parse_simple_trailer()
return PdfXRefSection(xref, trailer)
elif self._tokenizer.try_parse_indirect(header=True) is not None:
self._tokenizer.position = start_offset
return self.parse_compressed_xref()
elif not self.strict:
LOGGER.warning("did not find xref table at offset %d", self._tokenizer.position)
# let's attempt to locate a nearby xref table
target = self._tokenizer.position
table_offsets = self._find_xref_offsets()
# get the xref table nearest to our offset
self._tokenizer.position = get_closest(table_offsets, target)
section = self.parse_xref_and_trailer()
# make sure the user can see our corrections
if "Prev" in section.trailer:
section.trailer["Prev"] = get_closest(
table_offsets, cast(int, section.trailer["Prev"])
)
return section
else:
raise PdfParseError("XRef offset does not point to XRef section.")
def _find_xref_offsets(self) -> list[int]:
table_offsets = []
# looks for the start of a xref table
for mat in re.finditer(rb"(?<!start)xref(\W*)(\d+) (\d+)", self._tokenizer.data):
table_offsets.append(mat.start())
# looks for indirect objects, then checks if they are xref streams
for mat in re.finditer(rb"(?P<num>\d+)\s+(?P<gen>\d+)\s+obj", self._tokenizer.data):
self._tokenizer.position = mat.start()
self._tokenizer.skip(mat.end() - mat.start())
self._tokenizer.skip_whitespace()
if self._tokenizer.matches(b"<<"):
mapping = self._tokenizer.parse_dictionary()
if isinstance(typ := mapping.get("Type"), PdfName) and typ.value == b"XRef":
table_offsets.append(mat.start())
return sorted(table_offsets)
[docs]
def parse_simple_trailer(self) -> PdfDictionary:
"""Parses the PDF's standard trailer which is used to quickly locate other
cross reference tables and special objects.
The trailer is separate if the XRef table is standard (uncompressed).
Otherwise it is part of the XRef object.
"""
self._tokenizer.skip(7) # past the 'trailer' keyword
self._tokenizer.skip_whitespace()
# next token is a dictionary
return self._tokenizer.parse_dictionary()
[docs]
def parse_simple_xref(self) -> list[PdfXRefSubsection]:
"""Parses a standard, uncompressed XRef table of the format described in
ISO 32000-2:2020 § 7.5.4 "Cross-Reference table".
If ``startxref`` points to an XRef object, :meth:`.parse_compressed_xref`
should be called instead.
"""
self._tokenizer.skip(4)
self._tokenizer.skip_whitespace()
subsections = []
while not self._tokenizer.done:
# subsection
subsection = re.match(
rb"(?P<first_obj>\d+)\s(?P<count>\d+)", self._tokenizer.peek_line()
)
if subsection is None:
break
self._tokenizer.skip(subsection.end())
self._tokenizer.skip_whitespace()
# xref entries
entries: list[PdfXRefEntry] = []
for idx in range(int(subsection.group("count"))):
entry = re.match(
rb"(?P<offset>\d{10}) (?P<gen>\d{5}) (?P<status>[fn])",
self._tokenizer.peek(20),
)
if entry is None:
raise PdfParseError(f"Expected valid XRef entry at row {idx + 1}")
offset = int(entry.group("offset"))
generation = int(entry.group("gen"))
if entry.group("status") == b"n":
entries.append(InUseXRefEntry(offset, generation))
else:
entries.append(FreeXRefEntry(offset, generation))
# some files do not respect the 20-byte length req. for entries
# hence this is here for tolerance
self._tokenizer.skip(entry.end())
self._tokenizer.skip_whitespace()
subsections.append(
PdfXRefSubsection(
int(subsection.group("first_obj")),
int(subsection.group("count")),
entries,
)
)
return subsections
[docs]
def parse_compressed_xref(self) -> PdfXRefSection:
"""Parses a compressed cross-reference stream which includes both the XRef table
and information from the PDF trailer as described in ISO 32000-2:2020 § 7.5.8
"Cross-reference streams".
"""
xref_stream = self.parse_indirect_object(InUseXRefEntry(self._tokenizer.position, 0), None)
assert isinstance(xref_stream, PdfStream)
contents = BytesIO(xref_stream.decode())
xref_widths = cast(PdfArray[int], xref_stream.details["W"])
xref_indices = cast(
PdfArray[int],
xref_stream.details.get("Index", PdfArray([0, xref_stream.details["Size"]])),
)
subsections = []
for idx in range(0, len(xref_indices), 2):
subsection = PdfXRefSubsection(
first_obj_number=xref_indices[idx],
count=xref_indices[idx + 1],
entries=[],
)
for _ in range(subsection.count):
field_type = int.from_bytes(contents.read(xref_widths[0]) or b"\x01", "big")
second = int.from_bytes(contents.read(xref_widths[1]), "big")
third = int.from_bytes(contents.read(xref_widths[2]), "big")
if field_type == 0:
subsection.entries.append(
FreeXRefEntry(next_free_object=second, gen_if_used_again=third)
)
elif field_type == 1:
subsection.entries.append(InUseXRefEntry(offset=second, generation=third))
elif field_type == 2:
subsection.entries.append(
CompressedXRefEntry(objstm_number=second, index_within=third)
)
else:
LOGGER.warning("ignoring unknown field type %s in xref table", field_type)
subsections.append(subsection)
return PdfXRefSection(subsections, xref_stream.details)
[docs]
def parse_indirect_object(
self, xref_entry: InUseXRefEntry, reference: PdfReference | None
) -> PdfObject:
"""Parses an indirect object not within an object stream, or basically, an object
that is directly referred to by an ``xref_entry`` and a ``reference``."""
self._tokenizer.position = xref_entry.offset
self._tokenizer.skip_whitespace()
obj_header = self._tokenizer.try_parse_indirect(header=True)
if obj_header is None:
raise PdfParseError("XRef entry does not point to a valid indirect object.")
self._tokenizer.skip_whitespace()
contents = self._tokenizer.get_next_token()
self._tokenizer.skip_whitespace()
# uh oh, a stream?
if self._tokenizer.matches(b"stream"):
extent = cast(PdfDictionary, contents)
# the implicit get_object call might move us around so we must save and then
# restore the previous position
_current = self._tokenizer.position
length = extent["Length"]
self._tokenizer.position = _current
if not isinstance(length, int):
raise PdfParseError("Length entry of stream extent not an integer")
item = PdfStream(extent, self.parse_stream(xref_entry, length))
else:
item = cast(PdfObject, contents)
return self._get_decrypted(item, reference)
def _get_decrypted(self, pdf_object: PdfObject, reference: PdfReference | None) -> PdfObject:
if self.security_handler is None or not self._encryption_key or reference is None:
return pdf_object
if isinstance(pdf_object, PdfStream):
use_stmf = True
# Don't use StmF if the stream handles its own encryption
if filter_ := pdf_object.details.get("Filter"):
if isinstance(filter_, PdfName):
filters = PdfArray[PdfName]([filter_])
else:
filters = cast(PdfArray[PdfName], filter_)
for name in filters:
if name.value == b"Crypt":
use_stmf = False
pdf_object._crypt_params = PdfDictionary(
Handler=self.security_handler,
EncryptionKey=self._encryption_key,
Reference=reference,
)
break
if use_stmf:
pdf_object.raw = self.security_handler.decrypt_object(
self._encryption_key, pdf_object, reference
)
return pdf_object
elif isinstance(pdf_object, PdfHexString):
return PdfHexString.from_raw(
self.security_handler.decrypt_object(
self._encryption_key, pdf_object.value, reference
)
)
elif isinstance(pdf_object, bytes):
return self.security_handler.decrypt_object(self._encryption_key, pdf_object, reference)
elif isinstance(pdf_object, PdfArray):
return PdfArray((self._get_decrypted(obj, reference) for obj in pdf_object.data))
elif isinstance(pdf_object, PdfDictionary):
# The Encrypt key does not need decrypting.
if reference == self.trailer.data["Encrypt"]:
return pdf_object
return PdfDictionary(
{
name: self._get_decrypted(cast(PdfObject, value), reference)
for name, value in pdf_object.data.items()
}
)
# Why would a number be encrypted?
return pdf_object
[docs]
def parse_stream(self, xref_entry: InUseXRefEntry, extent: int) -> bytes:
"""Parses the contents of a PDF stream at ``xref_entry``.
``extent`` specifies the amount of bytes the stream is expected to have.
"""
self._tokenizer.skip(6) # past the 'stream' keyword
self._tokenizer.skip_next_eol(no_cr=True)
contents = self._tokenizer.consume(extent)
self._tokenizer.skip_next_eol(no_cr=True)
if self.xref:
# We get the offset of the entry directly following this one as a bounds check
next_entry_at = iter(
val
for val in self.xref.values()
if isinstance(val, InUseXRefEntry) and val.offset > xref_entry.offset
)
else:
# The stream being parsed is (most likely) part of an XRef object
next_entry_at = iter([])
# Have we gone way beyond the stream?
try:
if self._tokenizer.position >= next(next_entry_at).offset:
raise PdfParseError("\\Length key in stream extent parses beyond object.")
except StopIteration:
pass
self._tokenizer.skip_whitespace()
# Are we done?
if not self._tokenizer.skip_if_matches(b"endstream"):
raise PdfParseError("\\Length key in stream extent does not match end of stream.")
return contents
[docs]
def get_object(
self, reference: PdfReference | tuple[int, int], cache: bool = True
) -> MapObject:
"""Resolves a reference into the indirect object it points to.
Arguments:
reference (PdfReference | tuple[int, int]):
A :class:`.PdfReference` object or a tuple of two integers representing,
in order, the object number and the generation number.
cache (bool, optional):
Whether to interact with the object store when resolving references.
Defaults to True.
When True, the parser will read entries from the object store and write new
ones if they are not present. If False, the parser will always fetch new
entries and will not write to the object store.
Note that the object store will be accessed regardless of the value of
``cache`` if the object is new and is not included in the xref table.
Returns:
The object the reference resolves to.
If the reference is invalid (i.e. does not exist), returns :class:`.PdfNull`.
If the object referred to is a free object, returns :class:`.FreeObject`.
"""
if isinstance(reference, tuple):
reference = PdfReference(*reference).with_resolver(self.get_object)
self._hot_references.append(reference)
if self._hot_references.count(reference) > 1:
loop = " -> ".join(
f"{ref.object_number} {ref.generation} R" for ref in self._hot_references
)
self._hot_references.clear()
raise PdfParseError(f"Possible circular reference loop hit: {loop}")
# If cache requested and the object is cached.
if cache and reference.object_number not in self.objects.unresolved:
self._hot_references.remove(reference)
return self.objects[reference.object_number]
root_entry = self.xref.get((reference.object_number, reference.generation))
if root_entry is None:
# the reference is referring to a new object not registered in the xref table
if (obj_entry := self.objects.get(reference.object_number)) is not None:
self._hot_references.remove(reference)
return obj_entry
return PdfNull()
if isinstance(root_entry, InUseXRefEntry):
obj = self.parse_indirect_object(root_entry, reference)
if not cache:
self._hot_references.remove(reference)
return obj
# Add to cache then set the object as resolved.
self.objects[reference.object_number] = obj
self.objects.unresolved.discard(reference.object_number)
self._hot_references.remove(reference)
return self.objects[reference.object_number]
elif isinstance(root_entry, CompressedXRefEntry):
# Get the object stream it's part of (gen always 0)
objstm_ref = (root_entry.objstm_number, 0)
objstm_entry = self.xref[objstm_ref]
assert isinstance(objstm_entry, InUseXRefEntry)
if cache and root_entry.objstm_number not in self.objects.unresolved:
objstm = self.objects[root_entry.objstm_number]
else:
objstm = self.parse_indirect_object(
objstm_entry,
PdfReference(*objstm_ref).with_resolver(partial(self.get_object, cache=False)),
)
assert isinstance(objstm, PdfStream)
if cache:
self.objects[root_entry.objstm_number] = objstm
self.objects.unresolved.discard(root_entry.objstm_number)
if cache and root_entry.objstm_number in self._objstm_cache:
stm = self._objstm_cache[root_entry.objstm_number]
else:
stm = ObjectStream(self, objstm, root_entry.objstm_number)
if cache:
self._objstm_cache[root_entry.objstm_number] = stm
self._hot_references.remove(reference)
return stm.get_object(root_entry.index_within)
self._hot_references.remove(reference)
return PdfNull()
[docs]
def decrypt(self, password: str) -> PermsAcquired:
"""Decrypts this document through the Standard security handler using the
provided ``password``.
The standard security handler may specify 2 passwords: an owner password and a user
password. The owner password would allow full access to the PDF and the user password
should allow access according to the permissions specified in the document.
When the document is decrypted successfully, the object cache is cleared to make way
for the new objects in decrypted form.
Returns:
PermsAcquired: A value specifying the permissions acquired by ``password``.
- If the document is not encrypted, defaults to :attr:`.PermsAcquired.OWNER`
- if the document was not decrypted, defaults to :attr:`.PermsAcquired.NONE`
"""
if self.security_handler is None:
return PermsAcquired.OWNER
# Is this the owner password?
encryption_key, is_owner_pass = self.security_handler.authenticate_owner_password(
password.encode()
)
if is_owner_pass:
self._encryption_key = encryption_key
self.objects.fill()
return PermsAcquired.OWNER
# Is this the user password?
encryption_key, is_user_pass = self.security_handler.authenticate_user_password(
password.encode()
)
if is_user_pass:
self._encryption_key = encryption_key
self.objects.fill()
return PermsAcquired.USER
return PermsAcquired.NONE
[docs]
def save(self, filepath: str | Path | IO[bytes]) -> None:
"""Saves the contents of this parser to ``filepath``.
``filepath`` may be either a string containing a path, a :class:`pathlib.Path`
instance, or a byte stream (that is, any class implementing :class:`IO[bytes]`).
"""
builder = PdfSerializer()
builder.write_header("2.0")
rows: list[tuple[int, PdfXRefEntry]] = []
use_compressed = False
update_freelist = False
for obj_num in self.objects:
ref_tup = self.objects.initial_reference_map.get(obj_num)
# Object is new
if ref_tup is None:
resolved = self._objstm_cache.get(obj_num, self.objects[obj_num])
if isinstance(resolved, ObjectStream):
resolved = resolved.to_stream()
if isinstance(resolved, FreeObject):
rows.append((obj_num, FreeXRefEntry(-1, 0)))
update_freelist = True
else:
offset = builder.write_object((obj_num, 0), resolved)
rows.append((obj_num, InUseXRefEntry(offset, 0)))
continue
# Object is modified or left intact
entry = self.xref[ref_tup]
if isinstance(entry, FreeXRefEntry):
resolved = self._objstm_cache.get(obj_num, self.objects[obj_num])
if isinstance(resolved, ObjectStream):
resolved = resolved.to_stream()
# Free entry left unmodified or can no longer be used
if isinstance(resolved, FreeObject) or entry.gen_if_used_again >= 65535:
rows.append(
(
obj_num,
FreeXRefEntry(entry.next_free_object, entry.gen_if_used_again),
)
)
continue
# Free entry now in use
offset = builder.write_object((obj_num, entry.gen_if_used_again), resolved)
rows.append((obj_num, InUseXRefEntry(offset, entry.gen_if_used_again)))
update_freelist = True
elif isinstance(entry, InUseXRefEntry):
resolved = self._objstm_cache.get(obj_num, self.objects[obj_num])
if isinstance(resolved, ObjectStream):
resolved = resolved.to_stream()
# In use object freed
if isinstance(resolved, FreeObject):
rows.append((obj_num, FreeXRefEntry(-1, entry.generation + 1)))
update_freelist = True
continue
# In use object either modified or left intact
offset = builder.write_object(ref_tup, resolved)
rows.append((obj_num, InUseXRefEntry(offset, entry.generation)))
elif isinstance(entry, CompressedXRefEntry):
use_compressed = True
rows.append(
(
obj_num,
CompressedXRefEntry(entry.objstm_number, entry.index_within),
)
)
if update_freelist:
# let's first get the members of the freelist
freelist_members = [
idx for idx, (_, entry) in enumerate(rows) if isinstance(entry, FreeXRefEntry)
]
for freelist_idx, xref_idx in enumerate(freelist_members):
obj_num, entry = rows[xref_idx]
assert isinstance(entry, FreeXRefEntry)
if freelist_idx + 1 < len(freelist_members):
entry.next_free_object = rows[freelist_members[freelist_idx + 1]][0]
else:
entry.next_free_object = 0
rows[xref_idx] = (obj_num, entry)
xref_section = builder.generate_xref_section(rows)
new_trailer = PdfDictionary(
{
"Size": len(self.build_xref_map(xref_section)),
"Root": self.trailer.data["Root"],
}
)
if "Info" in self.trailer.data:
new_trailer.data["Info"] = self.trailer.data["Info"]
if isinstance(filepath, BinaryIO):
filename = filepath.name
elif isinstance(filepath, BufferedIOBase):
filename = "" # no filename
else:
filename = str(filepath)
if "ID" in self.trailer.data:
ids = cast(PdfArray["PdfHexString | bytes"], self.trailer.data["ID"])
new_trailer.data["ID"] = PdfArray(
[ids[0], generate_file_id(filename, builder.content.tell())]
)
else:
new_id = generate_file_id(filename, builder.content.tell())
new_trailer.data["ID"] = PdfArray([new_id, new_id])
if use_compressed:
startxref = builder.write_compressed_xref_section(
PdfXRefSection(xref_section, new_trailer)
)
builder.write_trailer(None, startxref)
else:
startxref = builder.write_standard_xref_section(xref_section)
builder.write_trailer(new_trailer, startxref)
builder.write_eof()
if isinstance(filepath, (str, Path)):
with open(filepath, "wb") as output_fp:
output_fp.write(builder.content.getbuffer())
else:
filepath.write(builder.content.getbuffer())