import zlib
from base64 import a85decode, a85encode, b16decode, b16encode
from itertools import groupby
from math import ceil, floor
from typing import TYPE_CHECKING, Protocol, cast
from .common._utils import batched
from .cos.objects import PdfDictionary, PdfName, PdfReference
from .cos.tokenizer import WHITESPACE
from .exceptions import PdfFilterError
if TYPE_CHECKING:
from .security.standard_handler import StandardSecurityHandler
class PdfFilter(Protocol):
    """Structural interface implemented by every stream filter in this module.

    A filter exposes a ``decode``/``encode`` pair operating on raw bytes. Both
    methods accept an optional, keyword-only ``params`` dictionary carrying
    filter-specific parameters.
    """

    def decode(self, contents: bytes, *, params: PdfDictionary | None = None) -> bytes:
        """Return ``contents`` with this filter's encoding removed."""

    def encode(self, contents: bytes, *, params: PdfDictionary | None = None) -> bytes:
        """Return ``contents`` with this filter's encoding applied."""
class ASCIIHexFilter(PdfFilter):
    """Filter for hexadecimal strings. EOD is '>'.

    See ISO 32000-2:2020 § 7.4.2 "ASCIIHexDecode Filter" for details.

    This filter does not take any parameters. ``params`` will be ignored.
    """

    def decode(self, contents: bytes, *, params: PdfDictionary | None = None) -> bytes:
        """Decode ASCII hexadecimal data terminated by the ``>`` EOD marker.

        Whitespace between digits is skipped and digits of either case are
        accepted. If an odd number of digits precedes the EOD marker, the data
        is decoded as if a ``0`` followed the last digit (§ 7.4.2).

        Raises:
            PdfFilterError: If ``contents`` does not end with the EOD marker.
            binascii.Error: If a non-hexadecimal character is present.
        """
        if contents[-1:] != b">":
            raise PdfFilterError("ASCIIHex: EOD not at end of stream.")

        hexdata = bytearray(ch for ch in contents[:-1] if ch not in WHITESPACE)
        if len(hexdata) % 2:
            # b16decode rejects odd-length input, but the spec mandates that a
            # dangling digit is completed with an implicit trailing zero.
            hexdata += b"0"

        return b16decode(hexdata, casefold=True)

    def encode(self, contents: bytes, *, params: PdfDictionary | None = None) -> bytes:
        """Encode ``contents`` as uppercase hexadecimal followed by the ``>`` EOD marker."""
        return b16encode(contents) + b">"
class ASCII85Filter(PdfFilter):
    """Filter for Adobe's ASCII85 implementation. EOD is '~>'.

    See ISO 32000-2:2020 § 7.4.3 "ASCII85Decode Filter" for details.

    This filter does not take any parameters. ``params`` will be ignored.
    """

    def decode(self, contents: bytes, *, params: PdfDictionary | None = None) -> bytes:
        """Decode Adobe-flavoured ASCII85 data, skipping any ``WHITESPACE`` characters."""
        decoded = a85decode(contents, adobe=True, ignorechars=WHITESPACE)
        return decoded

    def encode(self, contents: bytes, *, params: PdfDictionary | None = None) -> bytes:
        """Encode ``contents`` as ASCII85 ending in ``~>``, without the leading ``<~``."""
        framed = a85encode(contents, adobe=True)
        # The Adobe framing opens with a two-byte delimiter that PDF streams
        # do not use; only the closing EOD marker is kept.
        return framed[2:]
class RunLengthFilter(PdfFilter):
    """Filter for a form of byte-oriented run-length encoding (RLE) scheme resembling
    the Apple PackBits format (see ISO 32000-2:2020 § 7.4.5 "RunLengthDecode Filter").

    In this filter, data is formatted as a sequence of runs. Each run starts with a length
    byte and is followed by 1 to 128 bytes of data.

    - If the length byte is in the range 0 to 127, the following ``length byte + 1`` \
        bytes shall be copied exactly.
    - If the length byte is in the range 129 to 255, the following byte shall be copied \
        ``257 - length`` times.
    - A length byte of 128 means EOD.

    Implementation note: encoding is performed using a threshold determined by the
    average of the lengths of each run. Values under such threshold are copied.
    Values over such threshold are repeated. Because the comparison is strict, input
    whose runs all share the same length is encoded entirely with the copying method.

    This filter does not take any parameters. ``params`` will be ignored.
    """

    def decode(self, contents: bytes, *, params: PdfDictionary | None = None) -> bytes:
        """Expand run-length encoded ``contents``.

        Decoding stops at the EOD length byte (128) or at the end of the input,
        whichever comes first. A literal run that is cut short by the end of the
        input yields whatever bytes are available.

        Raises:
            PdfFilterError: If a repeat run has no data byte following its length byte.
        """
        idx = 0
        total = len(contents)
        # A bytearray keeps appends amortized O(1); repeated ``bytes +=`` is quadratic.
        output = bytearray()

        while idx < total:
            lenbyte = contents[idx]
            idx += 1

            if lenbyte == 128:
                break

            if lenbyte < 128:
                count = lenbyte + 1
                output += contents[idx : idx + count]
                idx += count
            else:
                if idx >= total:
                    raise PdfFilterError("RunLength: Repeat run is missing its data byte.")
                output += contents[idx : idx + 1] * (257 - lenbyte)
                idx += 1

        return bytes(output)

    def _encode_repeat_runs(self, runs: list[bytes]) -> bytes:
        """Encode each run in ``runs`` (a single repeated byte value) as repeat runs.

        Runs are split into batches of at most 128 bytes, the longest a single
        length byte can describe.
        """
        parts: list[bytes] = []

        for run in runs:
            for batch in batched(run, 128):
                if not batch:
                    continue

                batch_len = len(batch)
                if batch_len < 2:
                    # 257 - 1 is 256 which wouldn't fit in a byte
                    # so simply use the "copying" method for this batch
                    parts.append(bytes([batch_len - 1]) + bytes(batch))
                    continue

                # repeat the first byte of the run ``batch_len`` times
                parts.append(bytes([257 - batch_len]) + run[:1])

        return b"".join(parts)

    def _encode_copy_run(self, run: bytes) -> bytes:
        """Encode ``run`` verbatim as literal runs of at most 128 bytes each."""
        parts: list[bytes] = []

        for batch in batched(run, 128):
            if not batch:
                continue
            parts.append(bytes([len(batch) - 1]) + bytes(batch))

        return b"".join(parts)

    def encode(self, contents: bytes, *, params: PdfDictionary | None = None) -> bytes:
        """Run-length encode ``contents`` and terminate the result with the EOD byte.

        Empty input encodes to the EOD byte alone.
        """
        if not contents:
            # Without this guard the average run length below divides by zero.
            return b"\x80"

        # split the input into maximal runs of one repeated byte value
        runs = [bytes([val]) * sum(1 for _ in group) for val, group in groupby(contents)]

        # values above this threshold are encoded using the "repeating" method.
        # values below are encoded using the "copying" method.
        # this is the first heuristic that came to mind and it seems to work decently.
        run_length_threshold = sum(len(run) for run in runs) / len(runs)

        parts: list[bytes] = []
        # grouping consecutive runs by length lets neighbouring runs be merged
        # into one literal run when the "copying" method is selected.
        for run_length, group in groupby(runs, key=len):
            same_length_runs = list(group)
            if run_length > run_length_threshold:
                parts.append(self._encode_repeat_runs(same_length_runs))
            else:
                parts.append(self._encode_copy_run(b"".join(same_length_runs)))

        parts.append(b"\x80")
        return b"".join(parts)
class FlateFilter(PdfFilter):
    """Filter for zlib/deflate compression (see ISO 32000-2:2020 § 7.4.4 "LZWDecode and
    FlateDecode Filters").

    This filter supports predictors which can increase predictability of data and hence
    improve compression. 2 predictor groups are defined by the spec: the PNG filters
    defined in § 9 "Filtering" of the PNG spec, and TIFF Predictor 2 defined in the TIFF
    6.0 spec, which is currently unimplemented.

    The predictor is specified by means of the Predictor key in ``params`` (default: 1).
    If the Predictor is not 1, the following parameters can be provided:

    - **Colors**: Amount of color components per sample. Can be any value of 1 or \
        greater (default: 1).
    - **BitsPerComponent**: Bit length of each of the color components. \
        Possible values are: 1, 2, 4, 8 (default), and 16.
    - **Columns**: Amount of samples per row. Can be any value of 1 or greater \
        (default: 1).

    Given these values, the length of a sample in bytes is given by
        ``Length(Sample) = ceil((Colors * BitsPerComponent) / 8)``
    and the length of a row is given by
        ``Length(Row) = ceil((Colors * BitsPerComponent * Columns) / 8)``
    """

    def decode(self, contents: bytes, *, params: PdfDictionary[str, int] | None = None) -> bytes:  # pyright: ignore[reportIncompatibleMethodOverride]
        """Decompress ``contents`` and undo the predictor named in ``params``, if any.

        Raises:
            PdfFilterError: If the predictor is TIFF Predictor 2 or any value other
                than 1 or 10 to 15.
        """
        if params is None:
            params = PdfDictionary()

        uncomp = zlib.decompress(contents, 0)

        # No predictor applied, return uncompressed.
        if (predictor := params.get("Predictor", 1)) == 1:
            return uncomp

        cols = params.get("Columns", 1)
        colors = params.get("Colors", 1)
        bpc = params.get("BitsPerComponent", 8)

        if predictor == 2:
            raise PdfFilterError("FlateDecode: TIFF Predictor 2 not supported.")
        elif 10 <= predictor <= 15:
            # The filter actually used is tagged on each row, so the exact PNG
            # predictor value is not needed to decode.
            return bytes(self._undo_png_prediction(bytearray(uncomp), cols, colors, bpc))
        else:
            raise PdfFilterError(f"FlateDecode: Predictor {predictor} not supported.")

    def encode(self, contents: bytes, *, params: PdfDictionary[str, int] | None = None) -> bytes:  # pyright: ignore[reportIncompatibleMethodOverride]
        """Apply the predictor named in ``params``, if any, then compress ``contents``.

        Raises:
            PdfFilterError: If the predictor is TIFF Predictor 2 or any value other
                than 1 or 10 to 15.
        """
        if params is None:
            params = PdfDictionary()

        if (predictor := params.get("Predictor", 1)) == 1:
            return zlib.compress(contents)

        cols = params.get("Columns", 1)
        colors = params.get("Colors", 1)
        bpc = params.get("BitsPerComponent", 8)

        if predictor == 2:
            raise PdfFilterError("FlateDecode: TIFF Predictor 2 not supported.")
        elif 10 <= predictor <= 15:
            # Predictors 10..15 map onto PNG filter types 0..4 plus "optimum" (5).
            return zlib.compress(
                self._apply_png_prediction(bytearray(contents), predictor - 10, cols, colors, bpc)
            )
        else:
            raise PdfFilterError(f"FlateDecode: Predictor {predictor} not supported.")

    def _predict_paeth(self, a: int, b: int, c: int) -> int:
        """Return whichever of ``a`` (left), ``b`` (up) or ``c`` (upper left) is closest
        to ``a + b - c``, preferring ``a``, then ``b``, on ties."""
        p = a + b - c
        pa = abs(p - a)
        pb = abs(p - b)
        pc = abs(p - c)

        if pa <= pb and pa <= pc:
            return a
        elif pb <= pc:
            return b
        else:
            return c

    def _row_geometry(self, cols: int, colors: int, bpc: int) -> tuple[int, int]:
        """Return ``(sample_length, row_length)`` in bytes for the given layout.

        The row length is rounded up once for the whole row rather than once per
        sample, so that samples narrower than a byte are packed together.
        """
        bits_per_sample = colors * bpc
        sample_length = ceil(bits_per_sample / 8)
        row_length = ceil(bits_per_sample * cols / 8)
        return sample_length, row_length

    def _process_png_row(
        self,
        encode: bool,
        row: bytearray,
        filter_type: int,
        previous: bytearray,
        sample_length: int,
    ) -> bytearray:
        """Apply (``encode=True``) or undo (``encode=False``) a PNG filter on one row.

        ``row`` is modified in place and also returned. ``previous`` is the prior
        row in unfiltered form; ``sample_length`` is the distance in bytes to the
        corresponding byte of the sample to the left.

        Raises:
            PdfFilterError: If ``row`` is not empty and ``filter_type`` is not 0 to 4.
        """
        if not row:
            return row
        if not 0 <= filter_type <= 4:
            raise PdfFilterError(
                f"FlateDecode [png]: Row uses unsupported filter {filter_type}"
            )
        if filter_type == 0:  # None: the row is stored as-is
            return row

        # ``row`` is overwritten as we go. Undoing a filter needs the already
        # reconstructed bytes to the left, which is exactly what ``row`` holds.
        # Applying a filter needs the *unfiltered* bytes to the left, so those
        # must come from a snapshot taken before any byte is overwritten.
        snapshot = bytes(row)
        left_source = snapshot if encode else row

        for idx, cur_byte in enumerate(snapshot):
            # (Fig. 19 in the PNG spec)
            # cur_byte is x, byte_left is a, byte_up is b, byte_up_left is c
            if idx >= sample_length:
                byte_left = left_source[idx - sample_length]
                byte_up_left = previous[idx - sample_length]
            else:
                byte_left = byte_up_left = 0
            byte_up = previous[idx]

            if filter_type == 1:  # Sub
                predicted = byte_left
            elif filter_type == 2:  # Up
                predicted = byte_up
            elif filter_type == 3:  # Average
                predicted = (byte_left + byte_up) // 2
            else:  # Paeth
                predicted = self._predict_paeth(byte_left, byte_up, byte_up_left)

            delta = -predicted if encode else predicted
            row[idx] = (cur_byte + delta) % 256

        return row

    def _undo_png_prediction(
        self, filtered: bytearray, cols: int, colors: int, bpc: int
    ) -> bytearray:
        """Reverse PNG prediction on ``filtered``, whose rows each start with a
        one-byte filter type, and return the concatenated unfiltered rows."""
        sample_length, row_length = self._row_geometry(cols, colors, bpc)

        previous = bytearray(row_length)
        output = bytearray()

        # 1 + row_length because the first byte is the filter type
        stride = 1 + row_length
        for start in range(0, len(filtered), stride):
            filter_type = filtered[start]
            decoded = self._process_png_row(
                False,
                filtered[start + 1 : start + stride],
                filter_type,
                previous,
                sample_length,
            )
            output.extend(decoded)
            # ``decoded`` is a fresh slice that nothing else mutates afterwards.
            previous = decoded

        return output

    def _apply_png_prediction(
        self, to_filter: bytearray, filter_type: int, cols: int, colors: int, bpc: int
    ) -> bytearray:
        """Apply PNG prediction to ``to_filter`` and return the rows, each prefixed
        with the one-byte filter type that was used for it.

        ``filter_type`` is 0 to 4 for a fixed PNG filter or 5 for "optimum".

        Raises:
            PdfFilterError: If there is data to filter and ``filter_type`` is not 0 to 5.
        """
        sample_length, row_length = self._row_geometry(cols, colors, bpc)

        previous = bytearray(row_length)
        output = bytearray()

        for start in range(0, len(to_filter), row_length):
            raw_row = to_filter[start : start + row_length]

            if 0 <= filter_type <= 4:
                row_filter = filter_type
            elif filter_type == 5:  # Optimum
                # TODO: we will default optimum to be paeth for now
                # TODO: implement actual heuristic
                row_filter = 4
            else:
                raise PdfFilterError(
                    f"FlateDecode [png]: Row uses unsupported filter {filter_type}"
                )

            # Filter a copy: the untouched row is still needed as ``previous``.
            encoded = self._process_png_row(
                True, bytearray(raw_row), row_filter, previous, sample_length
            )
            output.append(row_filter)
            output.extend(encoded)

            previous = raw_row

        return output
# TODO: Please test
class CryptFetchFilter(PdfFilter):
    """Filter for encrypted streams (see ISO 32000-2:2020 § 7.4.10 "Crypt Filter").

    This filter takes two optional parameters: ``Type``, which defines the decode parameters
    as being for this filter; and ``Name``, which defines what filter should be used to
    decrypt the stream.

    This filter requires 3 additional parameters. These parameters are for use exclusively
    within the PDF processor and shall not be written to the document.

    - **Handler**: An instance of the security handler.
    - **EncryptionKey**: The encryption key generated from the security handler.
    - **Reference**: The indirect reference of the object to decrypt.
    """

    def encode(self, contents: bytes, *, params: PdfDictionary | None = None) -> bytes:  # pyright: ignore[reportIncompatibleMethodOverride]
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Crypt: Encrypting streams not implemented.")

    def decode(self, contents: bytes, *, params: PdfDictionary | None = None) -> bytes:  # pyright: ignore[reportIncompatibleMethodOverride]
        """Decrypt ``contents`` using the crypt filter named in ``params``.

        When ``Name`` is absent or is ``Identity``, ``contents`` is returned
        unchanged. Otherwise the named entry of the handler's ``CF`` dictionary
        (or ``None`` if there is no such entry) is handed to the security
        handler together with the key and object reference.

        Raises:
            ValueError: If ``params`` is ``None``.
        """
        if params is None:
            raise ValueError("Crypt: This filter requires parameters.")

        cf_name = cast(PdfName, params.get("Name", PdfName(b"Identity")))
        is_identity = cf_name.value == b"Identity"
        if is_identity:
            return contents

        handler = cast("StandardSecurityHandler", params["Handler"])

        available_filters = cast(PdfDictionary, handler.encryption.get("CF", PdfDictionary()))
        crypt_filter = available_filters.get(cf_name.value.decode())

        encryption_key = cast(bytes, params["EncryptionKey"])
        # NOTE(review): the reference is read through ``params.data`` rather than
        # ``params[...]`` -- presumably to get the raw entry; confirm before changing.
        reference = cast(PdfReference, params.data["Reference"])

        return handler.decrypt_object(
            encryption_key,
            contents,
            reference,
            crypt_filter=cast("PdfDictionary | None", crypt_filter),
        )
# Registry of the filters implemented in this module, keyed by filter name
# as a byte string; each value is the filter class itself, not an instance.
SUPPORTED_FILTERS: dict[bytes, type[PdfFilter]] = {
b"FlateDecode": FlateFilter,
b"ASCII85Decode": ASCII85Filter,
b"ASCIIHexDecode": ASCIIHexFilter,
b"RunLengthDecode": RunLengthFilter,
b"Crypt": CryptFetchFilter,
}