From 78fb7be3b73acaed870dd414f04c9a72060a531e Mon Sep 17 00:00:00 2001 From: BuildTools Date: Sat, 30 Nov 2024 12:55:39 +0100 Subject: [PATCH] + v3 PRD device format + Reprovisioning CLI function for v3 --- pyplayready/__init__.py | 2 +- pyplayready/bcert.py | 54 +++++++++++++++++++++-- pyplayready/device.py | 54 ++++++++++++++++++----- pyplayready/ecc_key.py | 23 +--------- pyplayready/exceptions.py | 8 ++++ pyplayready/main.py | 91 +++++++++++++++++++++++++++++++-------- pyproject.toml | 2 +- 7 files changed, 179 insertions(+), 55 deletions(-) diff --git a/pyplayready/__init__.py b/pyplayready/__init__.py index cc207d3..f71132f 100644 --- a/pyplayready/__init__.py +++ b/pyplayready/__init__.py @@ -10,4 +10,4 @@ from .session import * from .xml_key import * from .xmrlicense import * -__version__ = "0.3.9" +__version__ = "0.4.0" diff --git a/pyplayready/bcert.py b/pyplayready/bcert.py index fac1fd5..2bb25ce 100644 --- a/pyplayready/bcert.py +++ b/pyplayready/bcert.py @@ -1,6 +1,10 @@ from __future__ import annotations import collections.abc +from Crypto.PublicKey import ECC + +from pyplayready.exceptions import InvalidCertificateChain + # monkey patch for construct 2.8.8 compatibility if not hasattr(collections, 'Sequence'): collections.Sequence = collections.abc.Sequence @@ -51,7 +55,7 @@ class _BCertStructs: ) DrmBCertFeatureInfo = Struct( - "feature_count" / Int32ub, + "feature_count" / Int32ub, # max. 32 "features" / Array(this.feature_count, Int32ub) ) @@ -100,8 +104,8 @@ class _BCertStructs: # TODO: untested DrmBCertExtDataSignKeyInfo = Struct( - "type" / Int16ub, - "length" / Int16ub, + "key_type" / Int16ub, + "key_length" / Int16ub, "flags" / Int32ub, "key" / Bytes(this.length // 8) ) @@ -121,7 +125,7 @@ class _BCertStructs: # TODO: untested BCertExtDataContainer = Struct( - "record_count" / Int32ub, + "record_count" / Int32ub, # always 1 "records" / Array(this.record_count, BCertExtDataRecord), "signature" / DrmBCertExtDataSignature ) @@ -380,6 +384,26 @@ class Certificate(_BCertStructs): def struct(self) -> _BCertStructs.BCert: return self._BCERT + def verify_signature(self): + sign_payload = self.dumps()[:-144] + signature_attribute = self.get_attribute(8).attribute + + raw_signature_key = signature_attribute.signature_key + signature_key = ECC.construct( + curve='P-256', + point_x=int.from_bytes(raw_signature_key[:32], 'big'), + point_y=int.from_bytes(raw_signature_key[32:], 'big') + ) + + hash_obj = SHA256.new(sign_payload) + verifier = DSS.new(signature_key, 'fips-186-3') + + try: + verifier.verify(hash_obj, signature_attribute.signature) + return True + except ValueError: + return False + class CertificateChain(_BCertStructs): """Represents a BCertChain""" @@ -437,3 +461,25 @@ class CertificateChain(_BCertStructs): self.parsed.certificate_count += 1 self.parsed.certificates.insert(0, bcert.parsed) self.parsed.total_length += len(bcert.dumps()) + + def remove(self, index: int) -> None: + if self.parsed.certificate_count <= 0: + raise InvalidCertificateChain("CertificateChain does not contain any Certificates") + if index >= self.parsed.certificate_count: + raise IndexError(f"No Certificate at index {index}, {self.parsed.certificate_count} total") + + self.parsed.certificate_count -= 1 + bcert = Certificate(self.parsed.certificates[index]) + self.parsed.total_length -= len(bcert.dumps()) + self.parsed.certificates.pop(index) + + def get(self, index: int) -> Certificate: + if self.parsed.certificate_count <= 0: + raise InvalidCertificateChain("CertificateChain does not contain any Certificates") + if index >= self.parsed.certificate_count: + raise IndexError(f"No Certificate at index {index}, {self.parsed.certificate_count} total") + + return Certificate(self.parsed.certificates[index]) + + def count(self) -> int: + return self.parsed.certificate_count diff --git a/pyplayready/device.py b/pyplayready/device.py index a77259f..b52005a 100644 --- a/pyplayready/device.py +++ b/pyplayready/device.py @@ -20,6 +20,12 @@ class SecurityLevel(IntEnum): class _DeviceStructs: magic = Const(b"PRD") + header = Struct( + "signature" / magic, + "version" / Int8ub, + ) + + # was never in production v1 = Struct( "signature" / magic, "version" / Int8ub, @@ -38,36 +44,53 @@ class _DeviceStructs: "signing_key" / Bytes(96), ) + v3 = Struct( + "signature" / magic, + "version" / Int8ub, + "group_key" / Bytes(96), + "encryption_key" / Bytes(96), + "signing_key" / Bytes(96), + "group_certificate_length" / Int32ub, + "group_certificate" / Bytes(this.group_certificate_length), + ) + class Device: """Represents a PlayReady Device (.prd)""" - CURRENT_STRUCT = _DeviceStructs.v2 + CURRENT_STRUCT = _DeviceStructs.v3 + CURRENT_VERSION = 3 def __init__( self, *_: Any, - group_certificate: Union[str, bytes], + group_key: Union[str, bytes, None], encryption_key: Union[str, bytes], signing_key: Union[str, bytes], + group_certificate: Union[str, bytes], **__: Any ): - if isinstance(group_certificate, str): - group_certificate = base64.b64decode(group_certificate) - if not isinstance(group_certificate, bytes): - raise ValueError(f"Expecting Bytes or Base64 input, got {group_certificate!r}") + if isinstance(group_key, str): + group_key = base64.b64decode(group_key) if isinstance(encryption_key, str): encryption_key = base64.b64decode(encryption_key) if not isinstance(encryption_key, bytes): raise ValueError(f"Expecting Bytes or Base64 input, got {encryption_key!r}") + if isinstance(signing_key, str): signing_key = base64.b64decode(signing_key) if not isinstance(signing_key, bytes): raise ValueError(f"Expecting Bytes or Base64 input, got {signing_key!r}") - self.group_certificate = CertificateChain.loads(group_certificate) + if isinstance(group_certificate, str): + group_certificate = base64.b64decode(group_certificate) + if not isinstance(group_certificate, bytes): + raise ValueError(f"Expecting Bytes or Base64 input, got {group_certificate!r}") + + self.group_key = None if group_key is None else ECCKey.loads(group_key) self.encryption_key = ECCKey.loads(encryption_key) self.signing_key = ECCKey.loads(signing_key) + self.group_certificate = CertificateChain.loads(group_certificate) self.security_level = self.group_certificate.get_security_level() @classmethod @@ -76,6 +99,14 @@ class Device: data = base64.b64decode(data) if not isinstance(data, bytes): raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}") + + prd_header = _DeviceStructs.header.parse(data) + if prd_header.version == 2: + return cls( + group_key=None, + **_DeviceStructs.v2.parse(data) + ) + return cls(**cls.CURRENT_STRUCT.parse(data)) @classmethod @@ -87,11 +118,12 @@ class Device: def dumps(self) -> bytes: return self.CURRENT_STRUCT.build(dict( - version=2, + version=self.CURRENT_VERSION, + group_key=self.group_key.dumps(), + encryption_key=self.encryption_key.dumps(), + signing_key=self.signing_key.dumps(), group_certificate_length=len(self.group_certificate.dumps()), group_certificate=self.group_certificate.dumps(), - encryption_key=self.encryption_key.dumps(), - signing_key=self.signing_key.dumps() )) def dump(self, path: Union[Path, str]) -> None: @@ -101,6 +133,6 @@ class Device: path.parent.mkdir(parents=True, exist_ok=True) path.write_bytes(self.dumps()) - def get_name(self): + def get_name(self) -> str: name = f"{self.group_certificate.get_name()}_sl{self.group_certificate.get_security_level()}" return ''.join(char for char in name if (char.isalnum() or char in '_- ')).strip().lower().replace(" ", "_") diff --git a/pyplayready/ecc_key.py b/pyplayready/ecc_key.py index 49a0d16..8a39d0f 100644 --- a/pyplayready/ecc_key.py +++ b/pyplayready/ecc_key.py @@ -22,28 +22,13 @@ class ECCKey: return cls(key=ECC.generate(curve='P-256')) @classmethod - def construct( - cls, - private_key: Union[bytes, int], - public_key_x: Union[bytes, int], - public_key_y: Union[bytes, int] - ): + def construct(cls, private_key: Union[bytes, int]): """Construct an ECC key pair from private/public bytes/ints""" if isinstance(private_key, bytes): private_key = int.from_bytes(private_key, 'big') if not isinstance(private_key, int): raise ValueError(f"Expecting Bytes or Int input, got {private_key!r}") - if isinstance(public_key_x, bytes): - public_key_x = int.from_bytes(public_key_x, 'big') - if not isinstance(public_key_x, int): - raise ValueError(f"Expecting Bytes or Int input, got {public_key_x!r}") - - if isinstance(public_key_y, bytes): - public_key_y = int.from_bytes(public_key_y, 'big') - if not isinstance(public_key_y, int): - raise ValueError(f"Expecting Bytes or Int input, got {public_key_y!r}") - # The public is always derived from the private key; loading the other stuff won't work key = ECC.construct( curve='P-256', @@ -62,11 +47,7 @@ class ECCKey: if len(data) not in [96, 32]: raise ValueError(f"Invalid data length. Expecting 96 or 32 bytes, got {len(data)}") - return cls.construct( - private_key=data[:32], - public_key_x=data[32:64], - public_key_y=data[64:96] - ) + return cls.construct(private_key=data[:32]) @classmethod def load(cls, path: Union[Path, str]) -> ECCKey: diff --git a/pyplayready/exceptions.py b/pyplayready/exceptions.py index 5be8e0f..8f2281f 100644 --- a/pyplayready/exceptions.py +++ b/pyplayready/exceptions.py @@ -24,3 +24,11 @@ class DeviceMismatch(PyPlayreadyException): class InvalidLicense(PyPlayreadyException): """Unable to parse XMR License.""" + + +class InvalidCertificateChain(PyPlayreadyException): + """The BCert is not correctly formatted.""" + + +class OutdatedDevice(PyPlayreadyException): + """The PlayReady Device is outdated and does not support a specific operation.""" diff --git a/pyplayready/main.py b/pyplayready/main.py index 2fd5b93..55cbf06 100644 --- a/pyplayready/main.py +++ b/pyplayready/main.py @@ -12,6 +12,7 @@ from pyplayready.bcert import CertificateChain, Certificate from pyplayready.cdm import Cdm from pyplayready.device import Device from pyplayready.ecc_key import ECCKey +from pyplayready.exceptions import OutdatedDevice from pyplayready.pssh import PSSH @@ -144,8 +145,8 @@ def create_device( encryption_key = ECCKey.generate() signing_key = ECCKey.generate() - certificate_chain = CertificateChain.load(group_certificate) group_key = ECCKey.load(group_key) + certificate_chain = CertificateChain.load(group_certificate) new_certificate = Certificate.new_leaf_cert( cert_id=get_random_bytes(16), @@ -159,13 +160,12 @@ def create_device( certificate_chain.prepend(new_certificate) device = Device( - group_certificate=certificate_chain.dumps(), + group_key=group_key.dumps(), encryption_key=encryption_key.dumps(), - signing_key=signing_key.dumps() + signing_key=signing_key.dumps(), + group_certificate=certificate_chain.dumps(), ) - prd_bin = device.dumps() - if output and output.suffix: if output.suffix.lower() != ".prd": log.warning(f"Saving PRD with the file extension '{output.suffix}' but '.prd' is recommended.") @@ -179,23 +179,78 @@ def create_device( return out_path.parent.mkdir(parents=True, exist_ok=True) - out_path.write_bytes(prd_bin) + out_path.write_bytes(device.dumps()) log.info("Created Playready Device (.prd) file, %s", out_path.name) log.info(" + Security Level: %s", device.security_level) - log.info(" + Group Certificate: %s (%s bytes)", bool(device.group_certificate.dumps()), len(device.group_certificate.dumps())) - log.info(" + Encryption Key: %s (%s bytes)", bool(device.encryption_key.dumps()), len(device.encryption_key.dumps())) - log.info(" + Signing Key: %s (%s bytes)", bool(device.signing_key.dumps()), len(device.signing_key.dumps())) + log.info(" + Group Key: %s bytes", len(device.group_key.dumps())) + log.info(" + Encryption Key: %s bytes", len(device.encryption_key.dumps())) + log.info(" + Signing Key: %s bytes", len(device.signing_key.dumps())) + log.info(" + Group Certificate: %s bytes", len(device.group_certificate.dumps())) log.info(" + Saved to: %s", out_path.absolute()) +@main.command() +@click.argument("prd_path", type=Path) +@click.option("-o", "--output", type=Path, default=None, help="Output Path or Directory") +@click.pass_context +def reprovision_device(ctx: click.Context, prd_path: Path, output: Optional[Path] = None) -> None: + """ + Reprovision a Playready Device (.prd) by creating a new leaf certificate and new encryption/signing keys. + Will override the device if an output path or directory is not specified + + Only works on PRD Devices of v3 or higher + """ + if not prd_path.is_file(): + raise click.UsageError("prd_path: Not a path to a file, or it doesn't exist.", ctx) + + log = logging.getLogger("reprovision-device") + log.info("Reprovisioning Playready Device (.prd) file, %s", prd_path.name) + + device = Device.load(prd_path) + + if device.group_key is None: + raise OutdatedDevice("Device does not support reprovisioning, re-create it or use a Device with a version of 3 or higher") + + device.group_certificate.remove(0) + + encryption_key = ECCKey.generate() + signing_key = ECCKey.generate() + + device.encryption_key = encryption_key + device.signing_key = signing_key + + new_certificate = Certificate.new_leaf_cert( + cert_id=get_random_bytes(16), + security_level=device.group_certificate.get_security_level(), + client_id=get_random_bytes(16), + signing_key=signing_key, + encryption_key=encryption_key, + group_key=device.group_key, + parent=device.group_certificate + ) + device.group_certificate.prepend(new_certificate) + + if output and output.suffix: + if output.suffix.lower() != ".prd": + log.warning(f"Saving PRD with the file extension '{output.suffix}' but '.prd' is recommended.") + out_path = output + else: + out_path = prd_path + + out_path.parent.mkdir(parents=True, exist_ok=True) + out_path.write_bytes(device.dumps()) + + log.info("Reprovisioned Playready Device (.prd) file, %s", out_path.name) + + @main.command() @click.argument("prd_path", type=Path) @click.option("-o", "--out_dir", type=Path, default=None, help="Output Directory") @click.pass_context def export_device(ctx: click.Context, prd_path: Path, out_dir: Optional[Path] = None) -> None: """ - Export a Playready Device (.prd) file to a Group Certificate, Encryption Key and Signing Key + Export a Playready Device (.prd) file to a Group Key, Encryption Key, Signing Key and Group Certificate If an output directory is not specified, it will be stored in the current working directory """ if not prd_path.is_file(): @@ -222,9 +277,9 @@ def export_device(ctx: click.Context, prd_path: Path, out_dir: Optional[Path] = log.info(f"L{device.security_level} {device.get_name()}") log.info(f"Saving to: {out_path}") - client_id_path = out_path / "bgroupcert.dat" - client_id_path.write_bytes(device.group_certificate.dumps()) - log.info("Exported Group Certificate to bgroupcert.dat") + group_key_path = out_path / "zgpriv.dat" + group_key_path.write_bytes(device.group_key.dumps()) + log.info("Exported Group Key as zgpriv.dat") private_key_path = out_path / "zprivencr.dat" private_key_path.write_bytes(device.encryption_key.dumps()) @@ -234,6 +289,10 @@ def export_device(ctx: click.Context, prd_path: Path, out_dir: Optional[Path] = private_key_path.write_bytes(device.signing_key.dumps()) log.info("Exported Signing Key as zprivsig.dat") + client_id_path = out_path / "bgroupcert.dat" + client_id_path.write_bytes(device.group_certificate.dumps()) + log.info("Exported Group Certificate to bgroupcert.dat") + @main.command("serve", short_help="Serve your local CDM and Playready Devices Remotely.") @click.argument("config_path", type=Path) @@ -243,16 +302,14 @@ def serve_(config_path: Path, host: str, port: int) -> None: """ Serve your local CDM and Playready Devices Remotely. - \b [CONFIG] is a path to a serve config file. See `serve.example.yml` for an example config file. - \b Host as 127.0.0.1 may block remote access even if port-forwarded. Instead, use 0.0.0.0 and ensure the TCP port you choose is forwarded. """ - from pyplayready import serve # isort:skip - import yaml # isort:skip + from pyplayready import serve + import yaml config = yaml.safe_load(config_path.read_text(encoding="utf8")) serve.run(config, host, port) diff --git a/pyproject.toml b/pyproject.toml index 39bf93e..1867c6d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "pyplayready" -version = "0.3.9" +version = "0.4.0" description = "pyplayready CDM (Content Decryption Module) implementation in Python." license = "CC BY-NC-ND 4.0" authors = ["DevLARLEY, Erevoc", "DevataDev"]