from abc import ABC, abstractmethod
from collections.abc import Generator, Iterator, MutableMapping, MutableSequence
from typing import Any, Generic, Iterable, SupportsIndex, TypeGuard, cast, overload
from typing_extensions import Self, TypeVar
from ...common._utils import batched
from ..helpers import into_bytes, is_null_like
from .base import PdfHexString, PdfObject
from .containers import PdfArray, PdfDictionary
_K = TypeVar("_K", bound=PdfObject, default=PdfObject)
_V = TypeVar("_V", bound=PdfObject, default=PdfObject)
class _FlatMapping(Generic[_K, _V], MutableMapping[_K, _V]):
def __init__(self, tree: "_NNTree[_K, _V]") -> None:
self._tree = tree
def __getitem__(self, key: _K) -> _V:
source = self._tree._get_source_items()
if source is not None:
return self._tree._into_output_value(source[key])
items = self._tree._get_items() or PdfArray()
for current_key, value in batched(items, 2, strict=True):
if self._tree._into_output_key(current_key) == key:
return self._tree._into_output_value(value)
raise KeyError(key)
def __setitem__(self, key: _K, value: _V) -> None:
source = self._tree._get_source_items()
if source is not None:
source[key] = value
self._tree._sync_items_from_source()
return
items = self._tree._get_items()
if items is None:
self._tree._set_items(items := PdfArray())
# set key if present
for index, current_key in enumerate(items[::2]):
if self._tree._into_output_key(current_key) == key:
items[2 * index + 1] = self._tree._into_input_value(value)
return
# add key otherwise
for index in range(0, len(items), 2):
current_key = items[index]
if self._tree._into_output_key(current_key) > key:
items[index:index] = [
self._tree._into_input_key(key),
self._tree._into_input_value(value),
]
break
else:
items.extend([self._tree._into_input_key(key), self._tree._into_input_value(value)])
self._tree._compute_and_set_limits()
def __delitem__(self, key: _K) -> None:
source = self._tree._get_source_items()
if source is not None:
del source[key]
self._tree._sync_items_from_source()
return
items = self._tree._get_items() or PdfArray()
for index, current_key in enumerate(items[::2]):
if self._tree._into_output_key(current_key) == key:
del items[2 * index : 2 * index + 2]
return
raise KeyError(key)
def __iter__(self) -> Iterator[_K]:
source = self._tree._get_source_items()
if source is not None:
yield from source
return
items = self._tree._get_items() or PdfArray()
for key, _ in batched(items, 2, strict=True):
yield self._tree._into_output_key(key)
def __len__(self) -> int:
source = self._tree._get_source_items()
if source is not None:
return len(source)
items = self._tree._get_items() or PdfArray()
return len(items) // 2
def __repr__(self) -> str:
return repr(self._tree._get_items_map())
_NT = TypeVar("_NT", bound="_NNTree")
class _NNTreeKidList(MutableSequence[_NT], Generic[_NT]):
def __init__(self, arr: PdfArray, tree_type: type[_NT], *, parent: _NT | None = None) -> None:
self._arr = arr
self._parent = parent
self._tree_type = tree_type
def _get_source_kids(self) -> MutableSequence[_NT] | None:
if self._parent is not None:
return self._parent._get_source_kids()
def _sync_from_source(self) -> None:
source = self._get_source_kids()
if source is None:
return
assert self._parent is not None
self._parent._sync_kids_from_source()
self._arr = cast(PdfArray[PdfObject], self._parent._raw["Kids"])
def _decode_type(self, obj: PdfObject) -> _NT:
assert isinstance(obj, PdfDictionary)
return self._tree_type.from_dict(obj, parent=self._parent)
def _encode_type(self, obj: _NT) -> PdfObject:
return obj._raw
@overload
def __getitem__(self, index: SupportsIndex) -> _NT: ...
@overload
def __getitem__(self, index: slice) -> MutableSequence[_NT]: ...
def __getitem__(self, index: SupportsIndex | slice) -> _NT | MutableSequence[_NT]:
self._sync_from_source()
if isinstance(index, slice):
return [self._decode_type(it) for it in self._arr[index]]
return self._decode_type(self._arr[index])
def __len__(self) -> int:
self._sync_from_source()
return len(self._arr)
def insert(self, index: int, value: _NT) -> None:
self._sync_from_source()
source = self._get_source_kids()
if source is not None:
source.insert(index, value)
self._sync_from_source()
return
self._arr.insert(index, self._encode_type(value))
@overload
def __setitem__(self, index: SupportsIndex, value: _NT) -> None: ...
@overload
def __setitem__(self, index: slice, value: Iterable[_NT]) -> None: ...
def __setitem__(self, index: SupportsIndex | slice, value: _NT | Iterable[_NT]) -> None:
self._sync_from_source()
source = self._get_source_kids()
if source is not None:
source[index] = value
self._sync_from_source()
return
if isinstance(index, slice):
assert isinstance(value, Iterable)
self._arr[index] = [self._encode_type(it) for it in value]
else:
value = cast(_NT, value)
self._arr[index] = self._encode_type(value)
def __delitem__(self, index: SupportsIndex | slice) -> None:
self._sync_from_source()
source = self._get_source_kids()
if source is not None:
del source[index]
self._sync_from_source()
return
del self._arr[index]
class _NNTree(ABC, Generic[_K, _V], MutableMapping[_K, _V]):
@classmethod
def from_dict(cls, data: PdfDictionary, *, parent: "_NNTree[_K, _V] | None" = None) -> Self:
tree = cls(parent=parent)
tree._raw = data
return tree
def __init__(
self,
items: dict[_K, _V] | None = None,
kids: MutableSequence[Self] | None = None,
limits: tuple[_K, _V] | None = None,
*,
parent: "_NNTree[_K, _V] | None" = None,
) -> None:
"""
Initializes a tree object.
All arguments are optional. ``items`` and ``kids`` are mutually exclusive
and shall not be specified both at once. If ``limits`` is not provided,
it is computed based on the provided items or children, where applicable.
Arguments:
items: The mapping of items contained in this node.
kids: The immediate children nodes of this node.
limits: The least and greatest keys of this node.
"""
self.parent = parent
self._raw = PdfDictionary()
self._source_items: MutableMapping[_K, _V] | None = None
self._source_kids: MutableSequence[Self] | None = None
if items is not None:
self._set_items_map(items)
self.kids = kids
if limits is not None:
self._raw["Limits"] = PdfArray([self._into_output_key(lim) for lim in limits])
else:
self._compute_and_set_limits()
def _get_source_items(self) -> MutableMapping[_K, _V] | None:
return self._source_items
def _get_source_kids(self) -> MutableSequence[Self] | None:
return self._source_kids
def _sync_items_from_source(self) -> None:
source = self._source_items
if source is None:
return
items = PdfArray()
for key, value in sorted(source.items(), key=lambda item: item[0]):
items.extend([self._into_input_key(key), self._into_input_value(value)])
self._set_items(items)
def _sync_kids_from_source(self) -> None:
source = self._source_kids
if source is None:
return
kids = PdfArray([kid._raw for kid in sorted(source, key=lambda k: k.limits or ())])
for kid in source:
kid.parent = self
self._raw["Kids"] = kids
@abstractmethod
def _get_items(self) -> PdfArray[PdfObject] | None:
raise NotImplementedError
@abstractmethod
def _set_items(self, items: PdfArray[PdfObject] | None) -> None:
raise NotImplementedError
@abstractmethod
def _into_output_key(self, key: PdfObject) -> _K:
return cast(_K, key)
@abstractmethod
def _into_output_value(self, value: PdfObject) -> _V:
return cast(_V, value)
@abstractmethod
def _into_input_value(self, value: _V) -> PdfObject:
return cast(PdfObject, value)
@abstractmethod
def _into_input_key(self, key: _K) -> PdfObject:
return cast(PdfObject, key)
@abstractmethod
def _is_valid_key(self, key: object) -> bool:
raise NotImplementedError
@property
def kids(self) -> "_NNTreeKidList[Self] | None":
"""The immediate children of this node."""
if self._get_source_kids() is not None:
self._sync_kids_from_source()
kids = self._raw.get("Kids")
if is_null_like(kids):
return
kids = cast(PdfArray, kids)
return _NNTreeKidList[Self](kids, type(self), parent=self)
@kids.setter
def kids(self, value: MutableSequence[Self] | None) -> None:
if value is None:
self._source_kids = None
self._raw.pop("Kids", None)
return
if self._get_items() is not None:
raise ValueError("cannot set kids because items are already present")
self._source_kids = value
self._sync_kids_from_source()
@property
def limits(self) -> tuple[_K, _K] | None:
"""Two items representing the least and greatest keys included in the
key-value pairs of the tree and any of its descendants."""
limits = self._raw.get("Limits")
if is_null_like(limits):
return
limits = cast(PdfArray, limits)
return self._into_output_key(limits[0]), self._into_output_key(limits[1])
def walk(self, compare_key: _K | None = None) -> Generator[tuple[_K, _V]]:
"""Walks the tree and yields the key-value pairs as found.
When ``compare_key`` is specified, trees will be skipped if the comparison
key does not fall within the range of the tree's :attr:`.limits` value.
Raises :exc:`ValueError` if the tree contains both nodes and key-value pairs.
"""
if self._source_items is not None:
for key, value in self._source_items.items():
yield (self._into_output_key(key), self._into_output_value(value))
return
if self.kids is not None and self._get_items() is not None:
raise ValueError("nodes and items in tree are mutually exclusive")
items = iter(self._get_items() or [])
while not is_null_like(key := next(items, None)):
assert key is not None
value = next(items, None)
if value is None:
break
yield (self._into_output_key(key), self._into_output_value(value))
for tree in self.kids or []:
if compare_key is not None and tree.limits is not None:
first, last = tree.limits
compare_key = cast(Any, compare_key)
first = cast(Any, first)
last = cast(Any, last)
if compare_key < first or last < compare_key:
continue
yield from tree.walk(compare_key)
def find_leaf(self, target_key: _K) -> Self:
"""Finds the leaf node that contains ``target_key`` or otherwise the
closest leaf node that can contain ``target_key``."""
if not self.kids:
return self
closest = None
for tree in self.kids:
if tree.limits is None and (limits := tree._compute_limits()):
tree._raw["Limits"] = limits
assert tree.limits is not None
first, last = tree.limits
first = cast(Any, first)
last = cast(Any, last)
if first <= target_key <= last or target_key < first:
return tree.find_leaf(target_key)
closest = tree
assert closest is not None
return closest.find_leaf(target_key)
def _get_items_map(self) -> dict[_K, _V] | None:
if self._source_items is not None:
self._sync_items_from_source()
return dict(self._source_items)
items = self._get_items()
if items is None:
return
items_map: dict[_K, _V] = {}
for key, value in batched(items, 2, strict=True):
key = self._into_output_key(key)
items_map[key] = self._into_output_value(value)
return items_map
def _set_items_map(self, mapping: dict[_K, _V] | None) -> None:
self._source_items = mapping
if mapping is None:
self._set_items(None)
return
self._sync_items_from_source()
def _compute_limits(self) -> PdfArray[PdfObject] | None:
if self.parent is None:
return
first_item = next(self.walk(), None)
if first_item is None:
return
lower_limit = upper_limit = first_item[0]
for key, _ in self.walk():
key = cast(Any, key)
if key < lower_limit:
lower_limit = key
if key > upper_limit:
upper_limit = key
return PdfArray([self._into_input_key(lower_limit), self._into_input_key(upper_limit)])
def _compute_and_set_limits(self) -> None:
limits = self._compute_limits()
if not limits:
self._raw.pop("Limits", None)
else:
self._raw["Limits"] = limits
def _update_node_limits(self, node: "_NNTree[_K, _V]") -> None:
while node.parent is not None:
node._compute_and_set_limits()
node = node.parent
def __iter__(self) -> Iterator[_K]:
for key, _ in self.walk():
yield key
def __len__(self) -> int:
return sum(1 for _ in self)
def __getitem__(self, key: _K) -> _V:
for ret_key, value in self.walk(key):
if ret_key == key:
return value
raise KeyError(key) from None
def __setitem__(self, key: _K, value: _V) -> None:
leaf = self.find_leaf(key)
items = _FlatMapping(leaf)
items[key] = value
self._update_node_limits(leaf)
def __delitem__(self, key: _K) -> None:
leaf = self.find_leaf(key)
items = _FlatMapping(leaf)
del items[key]
self._update_node_limits(leaf)
def __contains__(self, key: object) -> bool:
if not self._is_valid_key(key):
return False
if not isinstance(key, PdfObject):
return False
key = self._into_output_key(key)
for ret_key, _ in self.walk(key):
if ret_key == key:
return True
return False
[docs]
class NameTree(Generic[_V], _NNTree[bytes, _V]):
"""A tree object associating string keys with values. See ISO 32000-2:2020 §
7.9.6 "Name trees"."""
def _get_items(self) -> PdfArray[PdfObject] | None:
self._sync_items_from_source()
names = self._raw.get("Names")
if is_null_like(names):
return
return cast(PdfArray, names)
def _set_items(self, items: PdfArray[PdfObject] | None) -> None:
if items is None:
self._raw.pop("Names", None)
return
if not is_null_like(self._raw.get("Kids")):
raise ValueError("cannot set names because kids is already present")
self._raw["Names"] = items
self._compute_and_set_limits()
def _into_input_key(self, key: bytes) -> PdfObject:
return cast(PdfObject, key)
def _into_input_value(self, value: _V) -> PdfObject:
return cast(PdfObject, value)
def _into_output_key(self, key: PdfObject) -> bytes:
assert self._is_valid_key(key)
return into_bytes(key)
def _into_output_value(self, value: PdfObject) -> _V:
return cast(_V, value)
def _is_valid_key(self, key: object) -> TypeGuard[PdfHexString | bytes]:
return isinstance(key, PdfHexString | bytes)
@property
def names(self) -> MutableMapping[bytes, _V] | None:
"""The key-value pairs of this tree node."""
if self._get_items() is not None:
return _FlatMapping(self)
@names.setter
def names(self, value: dict[bytes, _V] | None) -> None:
self._set_items_map(value)
[docs]
class NumberTree(Generic[_V], _NNTree[int, _V]):
"""A tree object associating integer keys with values. See ISO 32000-2:2020 §
7.9.7 "Number trees"."""
def _get_items(self) -> PdfArray[PdfObject] | None:
self._sync_items_from_source()
nums = self._raw.get("Nums")
if is_null_like(nums):
return
return cast(PdfArray, nums)
def _set_items(self, items: PdfArray[PdfObject] | None) -> None:
if items is None:
self._raw.pop("Nums", None)
return
if not is_null_like(self._raw.get("Kids")):
raise ValueError("cannot set nums because kids is already present")
self._raw["Nums"] = items
self._compute_and_set_limits()
def _into_input_key(self, key: int) -> PdfObject:
return cast(PdfObject, key)
def _into_input_value(self, value: _V) -> PdfObject:
return cast(PdfObject, value)
def _into_output_key(self, key: PdfObject) -> int:
assert self._is_valid_key(key)
return key
def _into_output_value(self, value: PdfObject) -> _V:
return cast(_V, value)
def _is_valid_key(self, key: object) -> TypeGuard[int]:
return isinstance(key, int)
@property
def nums(self) -> MutableMapping[int, _V] | None:
"""The key-value pairs of this tree node."""
if self._get_items() is not None:
return _FlatMapping(self)
@nums.setter
def nums(self, value: dict[int, _V] | None) -> None:
self._set_items_map(value)