Source code for pdfnaut.filters

import zlib
from base64 import a85decode, a85encode, b16decode, b16encode
from itertools import groupby
from math import ceil, floor
from typing import TYPE_CHECKING, Protocol, cast

from .common._utils import batched
from .cos.objects import PdfDictionary, PdfName, PdfReference
from .cos.tokenizer import WHITESPACE
from .exceptions import PdfFilterError

if TYPE_CHECKING:
    from .security.standard_handler import StandardSecurityHandler


class PdfFilter(Protocol):
    """Structural interface implemented by every stream filter in this module.

    A filter offers two inverse operations, :meth:`decode` and :meth:`encode`.
    Both take the stream bytes positionally and an optional, keyword-only
    ``params`` dictionary.
    """

    def decode(self, contents: bytes, *, params: PdfDictionary | None = None) -> bytes:
        """Return the decoded form of ``contents``."""
        ...

    def encode(self, contents: bytes, *, params: PdfDictionary | None = None) -> bytes:
        """Return the encoded form of ``contents``."""
        ...
class ASCIIHexFilter(PdfFilter):
    """Filter for hexadecimal strings. EOD is '>'.

    See ISO 32000-2:2020 § 7.4.2 "ASCIIHexDecode Filter" for details.

    This filter does not take any parameters. ``params`` will be ignored.
    """

    def decode(self, contents: bytes, *, params: PdfDictionary | None = None) -> bytes:
        """Decode hexadecimal ``contents`` terminated by the ``>`` EOD marker.

        Bytes found in ``WHITESPACE`` are ignored wherever they appear, including
        after the EOD marker. Digits are accepted in either case. If an odd number
        of digits precedes the EOD marker, the data is decoded as if a ``0``
        followed the last digit.

        Raises:
            PdfFilterError: If the last non-whitespace byte is not ``>``.
            binascii.Error: If the data contains a non-hexadecimal character
                (raised by :func:`base64.b16decode`).
        """
        # Strip whitespace first so that an EOD marker followed by, say, a newline
        # is still recognized as being at the end of the data.
        hexdata = bytearray(ch for ch in contents if ch not in WHITESPACE)
        if hexdata[-1:] != b">":
            raise PdfFilterError("ASCIIHex: EOD not at end of stream.")
        del hexdata[-1]

        # A dangling final digit is the high nibble of a byte whose low nibble is 0.
        if len(hexdata) % 2:
            hexdata += b"0"

        return b16decode(hexdata, casefold=True)

    def encode(self, contents: bytes, *, params: PdfDictionary | None = None) -> bytes:
        """Encode ``contents`` as uppercase hexadecimal followed by the EOD marker."""
        return b16encode(contents) + b">"
class ASCII85Filter(PdfFilter):
    """Filter for Adobe's ASCII85 implementation. EOD is '~>'.

    See ISO 32000-2:2020 § 7.4.3 "ASCII85Decode Filter" for details.

    This filter does not take any parameters. ``params`` will be ignored.
    """

    def decode(self, contents: bytes, *, params: PdfDictionary | None = None) -> bytes:
        """Decode Adobe-flavoured Ascii85 ``contents``, ignoring ``WHITESPACE`` bytes."""
        decoded = a85decode(contents, adobe=True, ignorechars=WHITESPACE)
        return decoded

    def encode(self, contents: bytes, *, params: PdfDictionary | None = None) -> bytes:
        """Encode ``contents`` as Ascii85 ending in ``~>`` but without a leading ``<~``."""
        framed = a85encode(contents, adobe=True)
        # PDF streams do not carry the 2-byte "<~" start delimiter, so drop it.
        return framed[2:]
class RunLengthFilter(PdfFilter):
    """Filter for a form of byte-oriented run-length encoding (RLE) scheme resembling
    the Apple PackBits format (see ISO 32000-2:2020 § 7.4.5 "RunLengthDecode Filter").

    In this filter, data is formatted as a sequence of runs. Each run starts with
    a length byte and is followed by 1 to 128 bytes of data.

    - If the length byte is in the range 0 to 127, the following ``length byte + 1``
      bytes shall be copied exactly.
    - If the length byte is in the range 129 to 255, the following byte shall be
      repeated ``257 - length`` times.
    - A length byte of 128 means EOD.

    Implementation note: encoding is performed using a threshold determined by the
    average of the lengths of each run. Runs at or under such threshold are copied.
    Runs over such threshold are repeated.

    This filter does not take any parameters. ``params`` will be ignored.
    """

    def decode(self, contents: bytes, *, params: PdfDictionary | None = None) -> bytes:
        """Expand run-length encoded ``contents``.

        Decoding stops at the EOD length byte (128) or at the end of ``contents``,
        whichever comes first. A literal run cut short by the end of the data
        yields the bytes that are available.

        Raises:
            PdfFilterError: If a repeat run has no data byte to repeat.
        """
        # Accumulate into a bytearray; repeated ``bytes +=`` would copy the whole
        # output on every run.
        output = bytearray()
        total = len(contents)
        idx = 0

        while idx < total:
            lenbyte = contents[idx]
            idx += 1

            if lenbyte == 128:
                break

            if lenbyte < 128:
                count = lenbyte + 1
                output += contents[idx : idx + count]
                idx += count
            else:
                if idx >= total:
                    raise PdfFilterError("RunLength: Repeated run is missing its data byte.")
                output += contents[idx : idx + 1] * (257 - lenbyte)
                idx += 1

        return bytes(output)

    def _encode_repeat_runs(self, runs: list[bytes]) -> bytes:
        """Encode each run in ``runs`` (a single byte value repeated) as repeat runs.

        Runs longer than 128 bytes are split into batches of at most 128.
        """
        output = bytearray()

        for run in runs:
            for batch in batched(run, 128):
                batch_len = len(batch)
                if not batch_len:
                    continue

                if batch_len < 2:
                    # 257 - 1 is 256 which wouldn't fit in a byte, so emit this
                    # leftover byte as a one-byte literal run (length byte 0).
                    output.append(batch_len - 1)
                    output += bytes(batch)
                    continue

                # length byte for "repeat the next byte batch_len times"
                output.append(257 - batch_len)
                output += run[:1]

        return bytes(output)

    def _encode_copy_run(self, run: bytes) -> bytes:
        """Encode ``run`` as literal runs of at most 128 bytes each."""
        output = bytearray()

        for batch in batched(run, 128):
            if not batch:
                continue
            output.append(len(batch) - 1)
            output += bytes(batch)

        return bytes(output)

    def encode(self, contents: bytes, *, params: PdfDictionary | None = None) -> bytes:
        """Run-length encode ``contents`` and append the EOD marker.

        Empty input produces just the EOD marker.
        """
        if not contents:
            # No runs exist, so the average below would divide by zero.
            return b"\x80"

        # Split the input into maximal runs of one repeated byte value.
        runs = [bytes([val]) * len(list(group)) for val, group in groupby(contents)]

        # Runs longer than this are encoded using the "repeating" method; the rest
        # use the "copying" method. The lengths of all runs add up to len(contents),
        # so this is the average run length.
        run_length_threshold = len(contents) / len(runs)

        pieces: list[bytes] = []
        # Grouping neighbouring runs by length lets consecutive short runs be merged
        # into a single literal run when the "copying" method is selected.
        for run_length, group in groupby(runs, key=len):
            same_length_runs = list(group)
            if run_length > run_length_threshold:
                pieces.append(self._encode_repeat_runs(same_length_runs))
            else:
                pieces.append(self._encode_copy_run(b"".join(same_length_runs)))

        pieces.append(b"\x80")
        return b"".join(pieces)
class FlateFilter(PdfFilter):
    """Filter for zlib/deflate compression (see ISO 32000-2:2020 § 7.4.4
    "LZWDecode and FlateDecode Filters").

    This filter supports predictors which can increase predictability of data and
    hence improve compression. 2 predictor groups are supported by the spec: the PNG
    filters defined in § 9 "Filtering" of the PNG spec, and TIFF Predictor 2 defined
    in the TIFF 6.0 spec, which is currently unimplemented.

    The predictor is specified by means of the Predictor key in ``params`` (default: 1).
    If the Predictor is not 1, the following parameters can be provided:

    - **Colors**: Amount of color components per sample. Can be any value of 1
      or greater (default: 1).
    - **BitsPerComponent**: Bit length of each of the color components.
      Possible values are: 1, 2, 4, 8 (default), and 16.
    - **Columns**: Amount of samples per row. Can be any value of 1 or greater
      (default: 1).

    Given these values, the length of a sample in bytes is given by
    ``Length(Sample) = ceil((Colors * BitsPerComponent) / 8)``
    and the length of a row in bytes is given by
    ``Length(Row) = ceil((Columns * Colors * BitsPerComponent) / 8)``
    """

    def decode(  # pyright: ignore[reportIncompatibleMethodOverride]
        self, contents: bytes, *, params: PdfDictionary[str, int] | None = None
    ) -> bytes:
        """Decompress ``contents`` and undo the predictor named in ``params``, if any.

        Raises:
            PdfFilterError: If the predictor is TIFF Predictor 2 or is neither 1
                nor in the PNG range 10 to 15.
        """
        if params is None:
            params = PdfDictionary()

        # wbits=0 lets zlib take the window size from the stream header.
        uncomp = zlib.decompress(contents, 0)

        # No predictor applied, return uncompressed.
        if (predictor := params.get("Predictor", 1)) == 1:
            return uncomp

        cols = params.get("Columns", 1)
        colors = params.get("Colors", 1)
        bpc = params.get("BitsPerComponent", 8)

        if predictor == 2:
            raise PdfFilterError("FlateDecode: TIFF Predictor 2 not supported.")
        elif 10 <= predictor <= 15:
            return bytes(self._undo_png_prediction(bytearray(uncomp), cols, colors, bpc))
        else:
            raise PdfFilterError(f"FlateDecode: Predictor {predictor} not supported.")

    def encode(  # pyright: ignore[reportIncompatibleMethodOverride]
        self, contents: bytes, *, params: PdfDictionary[str, int] | None = None
    ) -> bytes:
        """Apply the predictor named in ``params``, if any, then compress ``contents``.

        Predictors 10 to 14 select PNG filter types 0 to 4 for every row; predictor 15
        ("optimum") currently uses Paeth for every row.

        Raises:
            PdfFilterError: If the predictor is TIFF Predictor 2 or is neither 1
                nor in the PNG range 10 to 15.
        """
        if params is None:
            params = PdfDictionary()

        if (predictor := params.get("Predictor", 1)) == 1:
            return zlib.compress(contents)

        cols = params.get("Columns", 1)
        colors = params.get("Colors", 1)
        bpc = params.get("BitsPerComponent", 8)

        if predictor == 2:
            raise PdfFilterError("FlateDecode: TIFF Predictor 2 not supported.")
        elif 10 <= predictor <= 15:
            return zlib.compress(
                self._apply_png_prediction(bytearray(contents), predictor - 10, cols, colors, bpc)
            )
        else:
            raise PdfFilterError(f"FlateDecode: Predictor {predictor} not supported.")

    @staticmethod
    def _row_layout(cols: int, colors: int, bpc: int) -> tuple[int, int]:
        """Return ``(sample_length, row_length)`` in bytes for the given image layout."""
        bits_per_sample = colors * bpc
        # Samples narrower than a byte still use a 1-byte offset for the "left"
        # neighbour, but rows are packed, so the row length must be derived from
        # the total bit count rather than from the rounded-up sample length.
        return ceil(bits_per_sample / 8), ceil(cols * bits_per_sample / 8)

    def _predict_paeth(self, a: int, b: int, c: int) -> int:
        """Return whichever of ``a`` (left), ``b`` (up) or ``c`` (up-left) is closest
        to ``a + b - c``, preferring them in that order on ties."""
        p = a + b - c
        pa = abs(p - a)
        pb = abs(p - b)
        pc = abs(p - c)

        if pa <= pb and pa <= pc:
            return a
        elif pb <= pc:
            return b
        else:
            return c

    def _process_png_row(
        self,
        encode: bool,
        row: bytearray,
        filter_type: int,
        previous: bytearray,
        sample_length: int,
    ) -> bytearray:
        """Apply (``encode=True``) or undo (``encode=False``) a PNG filter on one row.

        ``row`` is modified in place and returned. ``previous`` is the unfiltered
        row above it and ``sample_length`` is the byte offset of the left neighbour.

        Raises:
            PdfFilterError: If ``filter_type`` is not in the range 0 to 4.
        """
        # Predictions are always based on *unfiltered* neighbours. When decoding, the
        # bytes to the left in ``row`` have already been reconstructed, so reading them
        # in place is correct. When encoding, writing in place would replace them with
        # filtered values, so neighbours are read from a snapshot of the raw row.
        raw = bytes(row) if encode else row

        for idx in range(len(row)):
            # (Fig. 19 in the PNG spec)
            # cur_byte is x, byte_left is a, byte_up is b, byte_up_left is c
            cur_byte = raw[idx]
            has_left = idx >= sample_length
            byte_left = raw[idx - sample_length] if has_left else 0
            byte_up = previous[idx]
            byte_up_left = previous[idx - sample_length] if has_left else 0

            if filter_type == 0:  # None
                predicted = 0
            elif filter_type == 1:  # Sub
                predicted = byte_left
            elif filter_type == 2:  # Up
                predicted = byte_up
            elif filter_type == 3:  # Average
                predicted = floor((byte_left + byte_up) / 2)
            elif filter_type == 4:  # Paeth
                predicted = self._predict_paeth(byte_left, byte_up, byte_up_left)
            else:
                raise PdfFilterError(
                    f"FlateDecode [png]: Row uses unsupported filter {filter_type}"
                )

            value = cur_byte - predicted if encode else cur_byte + predicted
            row[idx] = value % 256

        return row

    def _undo_png_prediction(
        self, filtered: bytearray, cols: int, colors: int, bpc: int
    ) -> bytearray:
        """Reconstruct the original rows from PNG-filtered data.

        Each row in ``filtered`` is prefixed by one byte naming its filter type.
        """
        sample_length, row_length = self._row_layout(cols, colors, bpc)

        # The row above the first one is treated as all zeros.
        previous = bytearray(row_length)
        output = bytearray()

        # 1 + row_length because the first byte is the filter type
        for start in range(0, len(filtered), 1 + row_length):
            filter_type = filtered[start]
            decoded = self._process_png_row(
                False,
                filtered[start + 1 : start + 1 + row_length],
                filter_type,
                previous,
                sample_length,
            )
            output.extend(decoded)
            previous = decoded

        return output

    def _apply_png_prediction(
        self, to_filter: bytearray, filter_type: int, cols: int, colors: int, bpc: int
    ) -> bytearray:
        """Filter every row of ``to_filter`` and prefix it with its filter type byte.

        ``filter_type`` 0 to 4 is used as is; 5 ("optimum") falls back to Paeth.

        Raises:
            PdfFilterError: If ``filter_type`` is not in the range 0 to 5.
        """
        sample_length, row_length = self._row_layout(cols, colors, bpc)

        # The row above the first one is treated as all zeros.
        previous = bytearray(row_length)
        output = bytearray()

        for start in range(0, len(to_filter), row_length):
            raw_row = to_filter[start : start + row_length]

            if 0 <= filter_type <= 4:
                row_filter = filter_type
            elif filter_type == 5:  # Optimum
                # TODO: we will default optimum to be paeth for now
                # TODO: implement actual heuristic
                row_filter = 4
            else:
                raise PdfFilterError(
                    f"FlateDecode [png]: Row uses unsupported filter {filter_type}"
                )

            # Hand over a copy: the row is filtered in place, and the untouched
            # ``raw_row`` is needed as the "previous" row of the next iteration.
            encoded = self._process_png_row(
                True, raw_row.copy(), row_filter, previous, sample_length
            )
            output.append(row_filter)
            output.extend(encoded)

            previous = raw_row

        return output
# TODO: Please test
class CryptFetchFilter(PdfFilter):
    """Filter for encrypted streams (see ISO 32000-2:2020 § 7.4.10 "Crypt Filter").

    This filter takes two optional parameters: ``Type``, which defines the decode
    parameters as being for this filter; and ``Name``, which defines what filter
    should be used to decrypt the stream.

    This filter requires 3 additional parameters. These parameters are for use
    exclusively within the PDF processor and shall not be written to the document.

    - **Handler**: An instance of the security handler.
    - **EncryptionKey**: The encryption key generated from the security handler.
    - **Reference**: The indirect reference of the object to decrypt.
    """

    def encode(  # pyright: ignore[reportIncompatibleMethodOverride]
        self, contents: bytes, *, params: PdfDictionary | None = None
    ) -> bytes:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Crypt: Encrypting streams not implemented.")

    def decode(  # pyright: ignore[reportIncompatibleMethodOverride]
        self, contents: bytes, *, params: PdfDictionary | None = None
    ) -> bytes:
        """Decrypt ``contents`` with the crypt filter named by ``params["Name"]``.

        When ``Name`` is absent or is ``Identity``, ``contents`` is returned unchanged.
        Otherwise the entry of that name is looked up in the handler's ``CF``
        dictionary and decryption is delegated to ``Handler.decrypt_object``.

        Raises:
            ValueError: If ``params`` is ``None``.
        """
        if params is None:
            raise ValueError("Crypt: This filter requires parameters.")

        fallback_name = PdfName(b"Identity")
        cf_name = cast(PdfName, params.get("Name", fallback_name))
        if cf_name.value == b"Identity":
            # The identity filter leaves the data as is.
            return contents

        handler = cast("StandardSecurityHandler", params["Handler"])

        available_filters = cast(PdfDictionary, handler.encryption.get("CF", PdfDictionary()))
        crypt_filter = available_filters.get(cf_name.value.decode())

        encryption_key = cast(bytes, params["EncryptionKey"])
        # NOTE(review): the reference is read through ``params.data`` rather than
        # ``params[...]`` like the other entries -- presumably to obtain the raw
        # reference object; confirm against PdfDictionary before changing.
        reference = cast(PdfReference, params.data["Reference"])

        return handler.decrypt_object(
            encryption_key,
            contents,
            reference,
            crypt_filter=cast("PdfDictionary | None", crypt_filter),
        )
# Registry mapping each supported filter name (as bytes) to the class implementing it.
SUPPORTED_FILTERS: dict[bytes, type[PdfFilter]] = {
    b"FlateDecode": FlateFilter,
    b"ASCII85Decode": ASCII85Filter,
    b"ASCIIHexDecode": ASCIIHexFilter,
    b"RunLengthDecode": RunLengthFilter,
    b"Crypt": CryptFetchFilter,
}