diff --git a/README.md b/README.md
new file mode 100644
index 0000000..2dfaeff
--- /dev/null
+++ b/README.md
@@ -0,0 +1,71 @@
+# pyplayready
+All of this is already public. 100% of this code has been derived from the mspr_toolkit.
+
+## Installation
+```shell
+pip install pyplayready
+```
+
+Run `pyplayready --help` to view available cli functions
+
+## Devices
+Run the command below to create a Playready Device (.prd) from a `bgroupcert.dat` and `zgpriv.dat`:
+```shell
+pyplayready create-device -c bgroupcert.dat -g zgpriv.dat
+```
+
+## Usage
+```python
+from pyplayready.cdm import Cdm
+from pyplayready.device import Device
+from pyplayready.pssh import PSSH
+
+import requests
+
+device = Device.load("C:/Path/To/A/Device.prd")
+cdm = Cdm.from_device(device)
+
+pssh = PSSH(
+ "AAADfHBzc2gAAAAAmgTweZhAQoarkuZb4IhflQAAA1xcAwAAAQABAFIDPABXAFIATQBIAEUAQQBEAEUAUgAgAHgAbQBsAG4AcwA9ACIAaAB0AH"
+ "QAcAA6AC8ALwBzAGMAaABlAG0AYQBzAC4AbQBpAGMAcgBvAHMAbwBmAHQALgBjAG8AbQAvAEQAUgBNAC8AMgAwADAANwAvADAAMwAvAFAAbABh"
+ "AHkAUgBlAGEAZAB5AEgAZQBhAGQAZQByACIAIAB2AGUAcgBzAGkAbwBuAD0AIgA0AC4AMAAuADAALgAwACIAPgA8AEQAQQBUAEEAPgA8AFAAUg"
+ "BPAFQARQBDAFQASQBOAEYATwA+ADwASwBFAFkATABFAE4APgAxADYAPAAvAEsARQBZAEwARQBOAD4APABBAEwARwBJAEQAPgBBAEUAUwBDAFQA"
+ "UgA8AC8AQQBMAEcASQBEAD4APAAvAFAAUgBPAFQARQBDAFQASQBOAEYATwA+ADwASwBJAEQAPgA0AFIAcABsAGIAKwBUAGIATgBFAFMAOAB0AE"
+ "cAawBOAEYAVwBUAEUASABBAD0APQA8AC8ASwBJAEQAPgA8AEMASABFAEMASwBTAFUATQA+AEsATABqADMAUQB6AFEAUAAvAE4AQQA9ADwALwBD"
+ "AEgARQBDAEsAUwBVAE0APgA8AEwAQQBfAFUAUgBMAD4AaAB0AHQAcABzADoALwAvAHAAcgBvAGYAZgBpAGMAaQBhAGwAcwBpAHQAZQAuAGsAZQ"
+ "B5AGQAZQBsAGkAdgBlAHIAeQAuAG0AZQBkAGkAYQBzAGUAcgB2AGkAYwBlAHMALgB3AGkAbgBkAG8AdwBzAC4AbgBlAHQALwBQAGwAYQB5AFIA"
+ "ZQBhAGQAeQAvADwALwBMAEEAXwBVAFIATAA+ADwAQwBVAFMAVABPAE0AQQBUAFQAUgBJAEIAVQBUAEUAUwA+ADwASQBJAFMAXwBEAFIATQBfAF"
+ "YARQBSAFMASQBPAE4APgA4AC4AMQAuADIAMwAwADQALgAzADEAPAAvAEkASQBTAF8ARABSAE0AXwBWAEUAUgBTAEkATwBOAD4APAAvAEMAVQBT"
+ "AFQATwBNAEEAVABUAFIASQBCAFUAVABFAFMAPgA8AC8ARABBAFQAQQA+ADwALwBXAFIATQBIAEUAQQBEAEUAUgA+AA=="
+)
+
+request = cdm.get_license_challenge(pssh.wrm_headers[0])
+
+response = requests.post(
+ url="https://test.playready.microsoft.com/service/rightsmanager.asmx?cfg=(persist:false,sl:2000)",
+ headers={
+ 'Content-Type': 'text/xml; charset=UTF-8',
+ },
+ data=request,
+)
+
+cdm.parse_license(response.text)
+
+for key in cdm.get_keys():
+ print(f"{key.key_id.hex}:{key.key.hex()}")
+```
+
+## Disclaimer
+
+1. This project requires a valid Microsoft Certificate and Group Key, which are not provided by this project.
+2. Public test provisions are available and provided by Microsoft to use for testing projects such as this one.
+3. This project does not condone piracy or any action against the terms of the DRM systems.
+4. All efforts in this project have been the result of Reverse-Engineering, Publicly available research, and Trial & Error.
+5. Do not use this program to decrypt or access any content for which you do not have the legal rights or explicit permission.
+6. Unauthorized decryption or distribution of copyrighted materials is a violation of applicable laws and intellectual property rights.
+7. This tool must not be used for any illegal activities, including but not limited to piracy, circumventing digital rights management (DRM), or unauthorized access to protected content.
+8. The developers, contributors, and maintainers of this program are not responsible for any misuse or illegal activities performed using this software.
+9. By using this program, you agree to comply with all applicable laws and regulations governing digital rights and copyright protections.
+
+## Credits
++ [mspr_toolkit](https://security-explorations.com/materials/mspr_toolkit.zip)
diff --git a/pyplayready/__init__.py b/pyplayready/__init__.py
new file mode 100644
index 0000000..f102a9c
--- /dev/null
+++ b/pyplayready/__init__.py
@@ -0,0 +1 @@
+__version__ = "0.0.1"
diff --git a/pyplayready/bcert.py b/pyplayready/bcert.py
new file mode 100644
index 0000000..747e72d
--- /dev/null
+++ b/pyplayready/bcert.py
@@ -0,0 +1,428 @@
+from __future__ import annotations
+
+import base64
+from pathlib import Path
+from typing import Union
+
+from Crypto.Hash import SHA256
+from Crypto.Signature import DSS
+from construct import Bytes, Const, Int32ub, GreedyRange, Switch, Container, ListContainer
+from construct import Int16ub, Array
+from construct import Struct, this
+
+from pyplayready.ecc_key import ECCKey
+
+
+class _BCertStructs:
+ DrmBCertBasicInfo = Struct(
+ "cert_id" / Bytes(16),
+ "security_level" / Int32ub,
+ "flags" / Int32ub,
+ "cert_type" / Int32ub,
+ "public_key_digest" / Bytes(32),
+ "expiration_date" / Int32ub,
+ "client_id" / Bytes(16)
+ )
+
+ # TODO: untested
+ DrmBCertDomainInfo = Struct(
+ "service_id" / Bytes(16),
+ "account_id" / Bytes(16),
+ "revision_timestamp" / Int32ub,
+ "domain_url_length" / Int32ub,
+ "domain_url" / Bytes((this.domain_url_length + 3) & 0xfffffffc)
+ )
+
+ # TODO: untested
+ DrmBCertPCInfo = Struct(
+ "security_version" / Int32ub
+ )
+
+ # TODO: untested
+ DrmBCertDeviceInfo = Struct(
+ "max_license" / Int32ub,
+ "max_header" / Int32ub,
+ "max_chain_depth" / Int32ub
+ )
+
+ DrmBCertFeatureInfo = Struct(
+ "feature_count" / Int32ub,
+ "features" / Array(this.feature_count, Int32ub)
+ )
+
+ DrmBCertKeyInfo = Struct(
+ "key_count" / Int32ub,
+ "cert_keys" / Array(this.key_count, Struct(
+ "type" / Int16ub,
+ "length" / Int16ub,
+ "flags" / Int32ub,
+ "key" / Bytes(this.length // 8),
+ "usages_count" / Int32ub,
+ "usages" / Array(this.usages_count, Int32ub)
+ ))
+ )
+
+ DrmBCertManufacturerInfo = Struct(
+ "flags" / Int32ub,
+ "manufacturer_name_length" / Int32ub,
+ "manufacturer_name" / Bytes((this.manufacturer_name_length + 3) & 0xfffffffc),
+ "model_name_length" / Int32ub,
+ "model_name" / Bytes((this.model_name_length + 3) & 0xfffffffc),
+ "model_number_length" / Int32ub,
+ "model_number" / Bytes((this.model_number_length + 3) & 0xfffffffc),
+ )
+
+ DrmBCertSignatureInfo = Struct(
+ "signature_type" / Int16ub,
+ "signature_size" / Int16ub,
+ "signature" / Bytes(this.signature_size),
+ "signature_key_size" / Int32ub,
+ "signature_key" / Bytes(this.signature_key_size // 8)
+ )
+
+ # TODO: untested
+ DrmBCertSilverlightInfo = Struct(
+ "security_version" / Int32ub,
+ "platform_identifier" / Int32ub
+ )
+
+ # TODO: untested
+ DrmBCertMeteringInfo = Struct(
+ "metering_id" / Bytes(16),
+ "metering_url_length" / Int32ub,
+ "metering_url" / Bytes((this.metering_url_length + 3) & 0xfffffffc)
+ )
+
+ # TODO: untested
+ DrmBCertExtDataSignKeyInfo = Struct(
+ "type" / Int16ub,
+ "length" / Int16ub,
+ "flags" / Int32ub,
+ "key" / Bytes(this.length // 8)
+ )
+
+ # TODO: untested
+ BCertExtDataRecord = Struct(
+ "data_size" / Int32ub,
+ "data" / Bytes(this.data_size)
+ )
+
+ # TODO: untested
+ DrmBCertExtDataSignature = Struct(
+ "signature_type" / Int16ub,
+ "signature_size" / Int16ub,
+ "signature" / Bytes(this.signature_size)
+ )
+
+ # TODO: untested
+ BCertExtDataContainer = Struct(
+ "record_count" / Int32ub,
+ "records" / Array(this.record_count, BCertExtDataRecord),
+ "signature" / DrmBCertExtDataSignature
+ )
+
+ # TODO: untested
+ DrmBCertServerInfo = Struct(
+ "warning_days" / Int32ub
+ )
+
+ # TODO: untested
+ DrmBcertSecurityVersion = Struct(
+ "security_version" / Int32ub,
+ "platform_identifier" / Int32ub
+ )
+
+ Attribute = Struct(
+ "flags" / Int16ub,
+ "tag" / Int16ub,
+ "length" / Int32ub,
+ "attribute" / Switch(
+ lambda this_: this_.tag,
+ {
+ 1: DrmBCertBasicInfo,
+ 2: DrmBCertDomainInfo,
+ 3: DrmBCertPCInfo,
+ 4: DrmBCertDeviceInfo,
+ 5: DrmBCertFeatureInfo,
+ 6: DrmBCertKeyInfo,
+ 7: DrmBCertManufacturerInfo,
+ 8: DrmBCertSignatureInfo,
+ 9: DrmBCertSilverlightInfo,
+ 10: DrmBCertMeteringInfo,
+ 11: DrmBCertExtDataSignKeyInfo,
+ 12: BCertExtDataContainer,
+ 13: DrmBCertExtDataSignature,
+ 14: Bytes(this.length - 8),
+ 15: DrmBCertServerInfo,
+ 16: DrmBcertSecurityVersion,
+ 17: DrmBcertSecurityVersion
+ },
+ default=Bytes(this.length - 8)
+ )
+ )
+
+ BCert = Struct(
+ "signature" / Const(b"CERT"),
+ "version" / Int32ub,
+ "total_length" / Int32ub,
+ "certificate_length" / Int32ub,
+ "attributes" / GreedyRange(Attribute)
+ )
+
+ BCertChain = Struct(
+ "signature" / Const(b"CHAI"),
+ "version" / Int32ub,
+ "total_length" / Int32ub,
+ "flags" / Int32ub,
+ "certificate_count" / Int32ub,
+ "certificates" / GreedyRange(BCert)
+ )
+
+
+class Certificate(_BCertStructs):
+ def __init__(
+ self,
+ parsed_bcert: Container,
+ bcert_obj: _BCertStructs.BCert = _BCertStructs.BCert
+ ):
+ self.parsed = parsed_bcert
+ self._BCERT = bcert_obj
+
+ @classmethod
+ def new_key_cert(
+ cls,
+ cert_id: bytes,
+ security_level: int,
+ client_id: bytes,
+ signing_key: ECCKey,
+ encryption_key: ECCKey,
+ group_key: ECCKey,
+ parent: CertificateChain,
+ expiry: int = 0xFFFFFFFF,
+ max_license: int = 10240,
+ max_header: int = 15360,
+ max_chain_depth: int = 2
+ ) -> Certificate:
+ if not cert_id:
+ raise ValueError("Certificate ID is required")
+ if not client_id:
+ raise ValueError("Client ID is required")
+
+ basic_info = Container(
+ cert_id=cert_id,
+ security_level=security_level,
+ flags=0,
+ cert_type=2,
+ public_key_digest=signing_key.public_sha256_digest(),
+ expiration_date=expiry,
+ client_id=client_id
+ )
+ basic_info_attribute = Container(
+ flags=1,
+ tag=1,
+ length=len(_BCertStructs.DrmBCertBasicInfo.build(basic_info)) + 8,
+ attribute=basic_info
+ )
+
+ device_info = Container(
+ max_license=max_license,
+ max_header=max_header,
+ max_chain_depth=max_chain_depth
+ )
+ device_info_attribute = Container(
+ flags=1,
+ tag=4,
+ length=len(_BCertStructs.DrmBCertDeviceInfo.build(device_info)) + 8,
+ attribute=device_info
+ )
+
+ feature = Container(
+ feature_count=1,
+ features=ListContainer([
+ 4
+ ])
+ )
+ feature_attribute = Container(
+ flags=1,
+ tag=5,
+ length=len(_BCertStructs.DrmBCertFeatureInfo.build(feature)) + 8,
+ attribute=feature
+ )
+
+ cert_key_sign = Container(
+ type=1,
+ length=512, # bits
+ flags=0,
+ key=signing_key.public_bytes(),
+ usages_count=1,
+ usages=ListContainer([
+ 1
+ ])
+ )
+ cert_key_encrypt = Container(
+ type=1,
+ length=512, # bits
+ flags=0,
+ key=encryption_key.public_bytes(),
+ usages_count=1,
+ usages=ListContainer([
+ 2
+ ])
+ )
+ key_info = Container(
+ key_count=2,
+ cert_keys=ListContainer([
+ cert_key_sign,
+ cert_key_encrypt
+ ])
+ )
+ key_info_attribute = Container(
+ flags=1,
+ tag=6,
+ length=len(_BCertStructs.DrmBCertKeyInfo.build(key_info)) + 8,
+ attribute=key_info
+ )
+
+ manufacturer_info = parent.get_certificate(0).get_attribute(7)
+
+ new_bcert_container = Container(
+ signature=b"CERT",
+ version=1,
+ total_length=0, # filled at a later time
+ certificate_length=0, # filled at a later time
+ attributes=ListContainer([
+ basic_info_attribute,
+ device_info_attribute,
+ feature_attribute,
+ key_info_attribute,
+ manufacturer_info,
+ ])
+ )
+
+ payload = _BCertStructs.BCert.build(new_bcert_container)
+ new_bcert_container.certificate_length = len(payload)
+ new_bcert_container.total_length = len(payload) + 144 # signature length
+
+ sign_payload = _BCertStructs.BCert.build(new_bcert_container)
+
+ hash_obj = SHA256.new(sign_payload)
+ signer = DSS.new(group_key.key, 'fips-186-3')
+ signature = signer.sign(hash_obj)
+
+ signature_info = Container(
+ signature_type=1,
+ signature_size=64,
+ signature=signature,
+ signature_key_size=512, # bits
+ signature_key=group_key.public_bytes()
+ )
+ signature_info_attribute = Container(
+ flags=1,
+ tag=8,
+ length=len(_BCertStructs.DrmBCertSignatureInfo.build(signature_info)) + 8,
+ attribute=signature_info
+ )
+ new_bcert_container.attributes.append(signature_info_attribute)
+
+ return cls(new_bcert_container)
+
+ @classmethod
+ def loads(cls, data: Union[str, bytes]) -> Certificate:
+ if isinstance(data, str):
+ data = base64.b64decode(data)
+ if not isinstance(data, bytes):
+ raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
+
+ cert = _BCertStructs.BCert
+ return cls(
+ parsed_bcert=cert.parse(data),
+ bcert_obj=cert
+ )
+
+ @classmethod
+ def load(cls, path: Union[Path, str]) -> Certificate:
+ if not isinstance(path, (Path, str)):
+ raise ValueError(f"Expecting Path object or path string, got {path!r}")
+ with Path(path).open(mode="rb") as f:
+ return cls.loads(f.read())
+
+ def get_attribute(self, type_: int):
+ for attribute in self.parsed.attributes:
+ if attribute.tag == type_:
+ return attribute
+
+ def get_security_level(self) -> int:
+ basic_info_attribute = self.get_attribute(1).attribute
+ if basic_info_attribute:
+ return basic_info_attribute.security_level
+
+ @staticmethod
+ def _unpad(name: bytes):
+ return name.rstrip(b'\x00').decode("utf-8", errors="ignore")
+
+ def get_name(self):
+ manufacturer_info = self.get_attribute(7).attribute
+ if manufacturer_info:
+ return f"{self._unpad(manufacturer_info.manufacturer_name)} {self._unpad(manufacturer_info.model_name)} {self._unpad(manufacturer_info.model_number)}"
+
+ def dumps(self) -> bytes:
+ return self._BCERT.build(self.parsed)
+
+ def struct(self) -> _BCertStructs.BCert:
+ return self._BCERT
+
+
+class CertificateChain(_BCertStructs):
+ def __init__(
+ self,
+ parsed_bcert_chain: Container,
+ bcert_chain_obj: _BCertStructs.BCertChain = _BCertStructs.BCertChain
+ ):
+ self.parsed = parsed_bcert_chain
+ self._BCERT_CHAIN = bcert_chain_obj
+
+ @classmethod
+ def loads(cls, data: Union[str, bytes]) -> CertificateChain:
+ if isinstance(data, str):
+ data = base64.b64decode(data)
+ if not isinstance(data, bytes):
+ raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
+
+ cert_chain = _BCertStructs.BCertChain
+ return cls(
+ parsed_bcert_chain=cert_chain.parse(data),
+ bcert_chain_obj=cert_chain
+ )
+
+ @classmethod
+ def load(cls, path: Union[Path, str]) -> CertificateChain:
+ if not isinstance(path, (Path, str)):
+ raise ValueError(f"Expecting Path object or path string, got {path!r}")
+ with Path(path).open(mode="rb") as f:
+ return cls.loads(f.read())
+
+ def dumps(self) -> bytes:
+ return self._BCERT_CHAIN.build(self.parsed)
+
+ def struct(self) -> _BCertStructs.BCertChain:
+ return self._BCERT_CHAIN
+
+ def get_certificate(self, index: int) -> Certificate:
+ return Certificate(self.parsed.certificates[index])
+
+ def get_security_level(self) -> int:
+ # not sure if there's a better way than this
+ return self.get_certificate(0).get_security_level()
+
+ def get_name(self) -> str:
+ return self.get_certificate(0).get_name()
+
+ def append(self, bcert: Certificate) -> None:
+ self.parsed.certificate_count += 1
+ self.parsed.certificates.append(bcert.parsed)
+ self.parsed.total_length += len(bcert.dumps())
+
+ def prepend(self, bcert: Certificate) -> None:
+ self.parsed.certificate_count += 1
+ self.parsed.certificates.insert(0, bcert.parsed)
+ self.parsed.total_length += len(bcert.dumps())
diff --git a/pyplayready/cdm.py b/pyplayready/cdm.py
new file mode 100644
index 0000000..61defea
--- /dev/null
+++ b/pyplayready/cdm.py
@@ -0,0 +1,216 @@
+from __future__ import annotations
+
+import base64
+import math
+import time
+from typing import List
+from uuid import UUID
+import xml.etree.ElementTree as ET
+
+from Crypto.Cipher import AES
+from Crypto.Hash import SHA256
+from Crypto.Random import get_random_bytes
+from Crypto.Signature import DSS
+from Crypto.Util.Padding import pad
+from ecpy.curves import Point, Curve
+
+from pyplayready.bcert import CertificateChain
+from pyplayready.ecc_key import ECCKey
+from pyplayready.key import Key
+from pyplayready.xml_key import XmlKey
+from pyplayready.elgamal import ElGamal
+from pyplayready.xmrlicense import XMRLicense
+
+
+class Cdm:
+ def __init__(
+ self,
+ security_level: int,
+ certificate_chain: CertificateChain,
+ encryption_key: ECCKey,
+ signing_key: ECCKey,
+ client_version: str = "10.0.16384.10011"
+ ):
+ self.security_level = security_level
+ self.certificate_chain = certificate_chain
+ self.encryption_key = encryption_key
+ self.signing_key = signing_key
+ self.client_version = client_version
+
+ self.curve = Curve.get_curve("secp256r1")
+ self.elgamal = ElGamal(self.curve)
+
+ self._wmrm_key = Point(
+ x=0xc8b6af16ee941aadaa5389b4af2c10e356be42af175ef3face93254e7b0b3d9b,
+ y=0x982b27b5cb2341326e56aa857dbfd5c634ce2cf9ea74fca8f2af5957efeea562,
+ curve=self.curve
+ )
+ self._xml_key = XmlKey()
+
+ self._keys: List[Key] = []
+
+ @classmethod
+ def from_device(cls, device) -> Cdm:
+ """Initialize a Playready CDM from a Playready Device (.prd) file"""
+ return cls(
+ security_level=device.security_level,
+ certificate_chain=device.group_certificate,
+ encryption_key=device.encryption_key,
+ signing_key=device.signing_key
+ )
+
+ def get_key_data(self):
+ point1, point2 = self.elgamal.encrypt(
+ message_point=self._xml_key.get_point(self.elgamal.curve),
+ public_key=self._wmrm_key
+ )
+ return self.elgamal.to_bytes(point1.x) + self.elgamal.to_bytes(point1.y) + self.elgamal.to_bytes(point2.x) + self.elgamal.to_bytes(point2.y)
+
+ def get_cipher_data(self):
+ b64_chain = base64.b64encode(self.certificate_chain.dumps()).decode()
+ body = f"{b64_chain}"
+
+ cipher = AES.new(
+ key=self._xml_key.aes_key,
+ mode=AES.MODE_CBC,
+ iv=self._xml_key.aes_iv
+ )
+
+ ciphertext = cipher.encrypt(pad(
+ body.encode(),
+ AES.block_size
+ ))
+
+ return self._xml_key.aes_iv + ciphertext
+
+ def _build_digest_content(
+ self,
+ content_header: str,
+ nonce: str,
+ wmrm_cipher: str,
+ cert_cipher: str
+ ) -> str:
+ return (
+ ''
+ '1'
+ f'{content_header}'
+ ''
+ f'{self.client_version}'
+ ''
+ f'{nonce}'
+ f'{math.floor(time.time())}'
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ 'WMRMServer'
+ ''
+ ''
+ f'{wmrm_cipher}'
+ ''
+ ''
+ ''
+ ''
+ f'{cert_cipher}'
+ ''
+ ''
+ ''
+ )
+
+ @staticmethod
+ def _build_signed_info(digest_value: str) -> str:
+ return (
+ ''
+ ''
+ ''
+ ''
+ ''
+ f'{digest_value}'
+ ''
+ ''
+ )
+
+ def get_license_challenge(self, content_header: str) -> str:
+ la_content = self._build_digest_content(
+ content_header=content_header,
+ nonce=base64.b64encode(get_random_bytes(16)).decode(),
+ wmrm_cipher=base64.b64encode(self.get_key_data()).decode(),
+ cert_cipher=base64.b64encode(self.get_cipher_data()).decode()
+ )
+
+ la_hash_obj = SHA256.new()
+ la_hash_obj.update(la_content.encode())
+ la_hash = la_hash_obj.digest()
+
+ signed_info = self._build_signed_info(base64.b64encode(la_hash).decode())
+ signed_info_digest = SHA256.new(signed_info.encode())
+
+ signer = DSS.new(self.signing_key.key, 'fips-186-3')
+ signature = signer.sign(signed_info_digest)
+
+ # haven't found a better way to do this. xmltodict.unparse doesn't work
+ main_body = (
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ + la_content +
+ ''
+ + signed_info +
+ f'{base64.b64encode(signature).decode()}'
+ ''
+ ''
+ ''
+ f'{base64.b64encode(self.signing_key.public_bytes()).decode()}'
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ )
+
+ return main_body
+
+ def _decrypt_ecc256_key(self, encrypted_key: bytes) -> bytes:
+ point1 = Point(
+ x=int.from_bytes(encrypted_key[:32], 'big'),
+ y=int.from_bytes(encrypted_key[32:64], 'big'),
+ curve=self.curve
+ )
+ point2 = Point(
+ x=int.from_bytes(encrypted_key[64:96], 'big'),
+ y=int.from_bytes(encrypted_key[96:128], 'big'),
+ curve=self.curve
+ )
+
+ decrypted = self.elgamal.decrypt((point1, point2), int(self.encryption_key.key.d))
+ return self.elgamal.to_bytes(decrypted.x)[16:32]
+
+ def parse_license(self, licence: str) -> None:
+ try:
+ root = ET.fromstring(licence)
+ license_elements = root.findall(".//{http://schemas.microsoft.com/DRM/2007/03/protocols}License")
+ for license_element in license_elements:
+ parsed_licence = XMRLicense.loads(license_element.text)
+ for key in parsed_licence.get_content_keys():
+ if Key.CipherType(key.cipher_type) == Key.CipherType.ECC256:
+ self._keys.append(Key(
+ key_id=UUID(bytes_le=key.key_id),
+ key_type=key.key_type,
+ cipher_type=key.cipher_type,
+ key_length=key.key_length,
+ key=self._decrypt_ecc256_key(key.encrypted_key)
+ ))
+ except Exception as e:
+ raise Exception(f"Unable to parse license, {e}")
+
+ def get_keys(self) -> List[Key]:
+ return self._keys
diff --git a/pyplayready/device.py b/pyplayready/device.py
new file mode 100644
index 0000000..3b5f8ce
--- /dev/null
+++ b/pyplayready/device.py
@@ -0,0 +1,105 @@
+from __future__ import annotations
+
+import base64
+from enum import IntEnum
+from pathlib import Path
+from typing import Union, Any
+
+from construct import Struct, Const, Int8ub, Bytes, this, Int32ub
+
+from pyplayready.bcert import CertificateChain
+from pyplayready.ecc_key import ECCKey
+
+
+class SecurityLevel(IntEnum):
+ SL150 = 150
+ SL2000 = 2000
+ SL3000 = 3000
+
+
+class _DeviceStructs:
+ magic = Const(b"PRD")
+
+ v1 = Struct(
+ "signature" / magic,
+ "version" / Int8ub,
+ "group_key_length" / Int32ub,
+ "group_key" / Bytes(this.group_key_length),
+ "group_certificate_length" / Int32ub,
+ "group_certificate" / Bytes(this.group_certificate_length)
+ )
+
+ v2 = Struct(
+ "signature" / magic,
+ "version" / Int8ub,
+ "group_certificate_length" / Int32ub,
+ "group_certificate" / Bytes(this.group_certificate_length),
+ "encryption_key" / Bytes(96),
+ "signing_key" / Bytes(96),
+ )
+
+
+class Device:
+ CURRENT_STRUCT = _DeviceStructs.v2
+
+ def __init__(
+ self,
+ *_: Any,
+ group_certificate: Union[str, bytes],
+ encryption_key: Union[str, bytes],
+ signing_key: Union[str, bytes],
+ **__: Any
+ ):
+ if isinstance(group_certificate, str):
+ group_certificate = base64.b64decode(group_certificate)
+ if not isinstance(group_certificate, bytes):
+ raise ValueError(f"Expecting Bytes or Base64 input, got {group_certificate!r}")
+
+ if isinstance(encryption_key, str):
+ encryption_key = base64.b64decode(encryption_key)
+ if not isinstance(encryption_key, bytes):
+ raise ValueError(f"Expecting Bytes or Base64 input, got {encryption_key!r}")
+ if isinstance(signing_key, str):
+ signing_key = base64.b64decode(signing_key)
+ if not isinstance(signing_key, bytes):
+ raise ValueError(f"Expecting Bytes or Base64 input, got {signing_key!r}")
+
+ self.group_certificate = CertificateChain.loads(group_certificate)
+ self.encryption_key = ECCKey.loads(encryption_key)
+ self.signing_key = ECCKey.loads(signing_key)
+ self.security_level = self.group_certificate.get_security_level()
+
+ @classmethod
+ def loads(cls, data: Union[str, bytes]) -> Device:
+ if isinstance(data, str):
+ data = base64.b64decode(data)
+ if not isinstance(data, bytes):
+ raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
+ return cls(**cls.CURRENT_STRUCT.parse(data))
+
+ @classmethod
+ def load(cls, path: Union[Path, str]) -> Device:
+ if not isinstance(path, (Path, str)):
+ raise ValueError(f"Expecting Path object or path string, got {path!r}")
+ with Path(path).open(mode="rb") as f:
+ return cls.loads(f.read())
+
+ def dumps(self) -> bytes:
+ return self.CURRENT_STRUCT.build(dict(
+ version=2,
+ group_certificate_length=len(self.group_certificate.dumps()),
+ group_certificate=self.group_certificate.dumps(),
+ encryption_key=self.encryption_key.dumps(),
+ signing_key=self.signing_key.dumps()
+ ))
+
+ def dump(self, path: Union[Path, str]) -> None:
+ if not isinstance(path, (Path, str)):
+ raise ValueError(f"Expecting Path object or path string, got {path!r}")
+ path = Path(path)
+ path.parent.mkdir(parents=True, exist_ok=True)
+ path.write_bytes(self.dumps())
+
+ def get_name(self):
+ name = f"{self.group_certificate.get_name()}_sl{self.group_certificate.get_security_level()}"
+ return ''.join(char for char in name if char.isascii()).strip().lower().replace(" ", "_")
diff --git a/pyplayready/ecc_key.py b/pyplayready/ecc_key.py
new file mode 100644
index 0000000..bdd72f2
--- /dev/null
+++ b/pyplayready/ecc_key.py
@@ -0,0 +1,105 @@
+from __future__ import annotations
+
+import base64
+from pathlib import Path
+from typing import Union
+
+from Crypto.Hash import SHA256
+from Crypto.PublicKey import ECC
+from Crypto.PublicKey.ECC import EccKey
+from ecpy.curves import Curve, Point
+
+
+class ECCKey:
+ def __init__(
+ self,
+ key: EccKey
+ ):
+ """Represents a PlayReady ECC key"""
+ self.key = key
+
+ @classmethod
+ def generate(cls):
+ return cls(key=ECC.generate(curve='P-256'))
+
+ @classmethod
+ def construct(
+ cls,
+ private_key: Union[bytes, int],
+ public_key_x: Union[bytes, int],
+ public_key_y: Union[bytes, int]
+ ):
+ if isinstance(private_key, bytes):
+ private_key = int.from_bytes(private_key, 'big')
+ if not isinstance(private_key, int):
+ raise ValueError(f"Expecting Bytes or Int input, got {private_key!r}")
+
+ if isinstance(public_key_x, bytes):
+ public_key_x = int.from_bytes(public_key_x, 'big')
+ if not isinstance(public_key_x, int):
+ raise ValueError(f"Expecting Bytes or Int input, got {public_key_x!r}")
+
+ if isinstance(public_key_y, bytes):
+ public_key_y = int.from_bytes(public_key_y, 'big')
+ if not isinstance(public_key_y, int):
+ raise ValueError(f"Expecting Bytes or Int input, got {public_key_y!r}")
+
+ # The public is always derived from the private key; loading the other stuff won't work
+ key = ECC.construct(
+ curve='P-256',
+ d=private_key,
+ )
+
+ return cls(key=key)
+
+ @classmethod
+ def loads(cls, data: Union[str, bytes]) -> ECCKey:
+ if isinstance(data, str):
+ data = base64.b64decode(data)
+ if not isinstance(data, bytes):
+ raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
+
+ if len(data) not in [96, 32]:
+ raise ValueError(f"Invalid data length. Expecting 96 or 32 bytes, got {len(data)}")
+
+ return cls.construct(
+ private_key=data[:32],
+ public_key_x=data[32:64],
+ public_key_y=data[64:96]
+ )
+
+ @classmethod
+ def load(cls, path: Union[Path, str]) -> ECCKey:
+ if not isinstance(path, (Path, str)):
+ raise ValueError(f"Expecting Path object or path string, got {path!r}")
+ with Path(path).open(mode="rb") as f:
+ return cls.loads(f.read())
+
+ def dumps(self):
+ return self.private_bytes() + self.public_bytes()
+
+ def dump(self, path: Union[Path, str]) -> None:
+ if not isinstance(path, (Path, str)):
+ raise ValueError(f"Expecting Path object or path string, got {path!r}")
+ path = Path(path)
+ path.parent.mkdir(parents=True, exist_ok=True)
+ path.write_bytes(self.dumps())
+
+ def get_point(self, curve: Curve) -> Point:
+ return Point(self.key.pointQ.x, self.key.pointQ.y, curve)
+
+ def private_bytes(self) -> bytes:
+ return self.key.d.to_bytes()
+
+ def private_sha256_digest(self) -> bytes:
+ hash_object = SHA256.new()
+ hash_object.update(self.private_bytes())
+ return hash_object.digest()
+
+ def public_bytes(self) -> bytes:
+ return self.key.pointQ.x.to_bytes() + self.key.pointQ.y.to_bytes()
+
+ def public_sha256_digest(self) -> bytes:
+ hash_object = SHA256.new()
+ hash_object.update(self.public_bytes())
+ return hash_object.digest()
diff --git a/pyplayready/elgamal.py b/pyplayready/elgamal.py
new file mode 100644
index 0000000..0f0b9b5
--- /dev/null
+++ b/pyplayready/elgamal.py
@@ -0,0 +1,36 @@
+from typing import Tuple
+
+from ecpy.curves import Curve, Point
+import secrets
+
+
+class ElGamal:
+ def __init__(self, curve: Curve):
+ self.curve = curve
+
+ @staticmethod
+ def to_bytes(n: int) -> bytes:
+ byte_len = (n.bit_length() + 7) // 8
+ if byte_len % 2 != 0:
+ byte_len += 1
+ return n.to_bytes(byte_len, 'big')
+
+ def encrypt(
+ self,
+ message_point: Point,
+ public_key: Point
+ ) -> Tuple[Point, Point]:
+ ephemeral_key = secrets.randbelow(self.curve.order)
+ point1 = ephemeral_key * self.curve.generator
+ point2 = message_point + (ephemeral_key * public_key)
+ return point1, point2
+
+ @staticmethod
+ def decrypt(
+ encrypted: Tuple[Point, Point],
+ private_key: int
+ ) -> Point:
+ point1, point2 = encrypted
+ shared_secret = private_key * point1
+ decrypted_message = point2 - shared_secret
+ return decrypted_message
diff --git a/pyplayready/key.py b/pyplayready/key.py
new file mode 100644
index 0000000..bc656e9
--- /dev/null
+++ b/pyplayready/key.py
@@ -0,0 +1,42 @@
+from enum import Enum
+from uuid import UUID
+
+
+class Key:
+ class KeyType(Enum):
+ Invalid = 0x0000
+ AES128CTR = 0x0001
+ RC4 = 0x0002
+ AES128ECB = 0x0003
+ Cocktail = 0x0004
+ UNKNOWN = 0xffff
+
+ @classmethod
+ def _missing_(cls, value):
+ return cls.UNKNOWN
+
+ class CipherType(Enum):
+ Invalid = 0x0000
+ RSA128 = 0x0001
+ ChainedLicense = 0x0002
+ ECC256 = 0x0003
+ ECCforScalableLicenses = 4
+ UNKNOWN = 0xffff
+
+ @classmethod
+ def _missing_(cls, value):
+ return cls.UNKNOWN
+
+ def __init__(
+ self,
+ key_id: UUID,
+ key_type: int,
+ cipher_type: int,
+ key_length: int,
+ key: bytes
+ ):
+ self.key_id = key_id
+ self.key_type = self.KeyType(key_type)
+ self.cipher_type = self.CipherType(cipher_type)
+ self.key_length = key_length
+ self.key = key
diff --git a/pyplayready/main.py b/pyplayready/main.py
new file mode 100644
index 0000000..70879f6
--- /dev/null
+++ b/pyplayready/main.py
@@ -0,0 +1,230 @@
+import logging
+from datetime import datetime
+from pathlib import Path
+from typing import Optional
+from zlib import crc32
+
+import click
+import requests
+from Crypto.Random import get_random_bytes
+
+from pyplayready import __version__
+from pyplayready.bcert import CertificateChain, Certificate
+from pyplayready.cdm import Cdm
+from pyplayready.device import Device
+from pyplayready.ecc_key import ECCKey
+from pyplayready.pssh import PSSH
+
+
+@click.group(invoke_without_command=True)
+@click.option("-v", "--version", is_flag=True, default=False, help="Print version information.")
+@click.option("-d", "--debug", is_flag=True, default=False, help="Enable DEBUG level logs.")
+def main(version: bool, debug: bool) -> None:
+ """Python PlayReady CDM implementation"""
+ logging.basicConfig(level=logging.DEBUG if debug else logging.INFO)
+ log = logging.getLogger()
+
+ current_year = datetime.now().year
+ copyright_years = f"2024-{current_year}"
+
+ log.info("pyplayready version %s Copyright (c) %s DevLARLEY", __version__, copyright_years)
+ log.info("https://github.com/ready-dl/pyplayready")
+ log.info("Run 'pyplayready --help' for help")
+ if version:
+ return
+
+
+@main.command(name="license")
+@click.argument("device_path", type=Path)
+@click.argument("pssh", type=PSSH)
+@click.argument("server", type=str)
+def license_(device_path: Path, pssh: PSSH, server: str) -> None:
+ """
+ Make a License Request to a server using a given PSSH
+ Will return a list of all keys within the returned license
+
+ Only works for standard license servers that don't use any license wrapping
+ """
+ log = logging.getLogger("license")
+
+ device = Device.load(device_path)
+ log.info(f"Loaded Device: {device.get_name()}")
+
+ cdm = Cdm.from_device(device)
+ log.info("Loaded CDM")
+
+ challenge = cdm.get_license_challenge(pssh.wrm_headers[0])
+ log.info("Created License Request (Challenge)")
+ log.debug(challenge)
+
+ license_res = requests.post(
+ url=server,
+ headers={
+ 'Content-Type': 'text/xml; charset=UTF-8',
+ },
+ data=challenge
+ )
+
+ if license_res.status_code != 200:
+ log.error(f"Failed to send challenge: [{license_res.status_code}] {license_res.text}")
+ return
+
+ licence = license_res.text
+ log.info("Got License Message")
+ log.debug(licence)
+
+ cdm.parse_license(licence)
+ log.info("License Parsed Successfully")
+
+ for key in cdm.get_keys():
+ log.info(f"{key.key_id.hex}:{key.key.hex()}")
+
+
+@main.command()
+@click.argument("device", type=Path)
+@click.pass_context
+def test(ctx: click.Context, device: Path) -> None:
+ """
+ Test the CDM code by getting Content Keys for the Tears Of Steel demo on the Playready Test Server.
+ https://testweb.playready.microsoft.com/Content/Content2X
+ + DASH Manifest URL: https://test.playready.microsoft.com/media/profficialsite/tearsofsteel_4k.ism/manifest.mpd
+ + MSS Manifest URL: https://test.playready.microsoft.com/media/profficialsite/tearsofsteel_4k.ism.smoothstreaming/manifest
+
+ The device argument is a Path to a Playready Device (.prd) file which contains the device's group key and
+ group certificate.
+ """
+ pssh = PSSH(
+ "AAADfHBzc2gAAAAAmgTweZhAQoarkuZb4IhflQAAA1xcAwAAAQABAFIDPABXAFIATQBIAEUAQQBEAEUAUgAgAHgAbQBsAG4AcwA9ACIAaAB0AH"
+ "QAcAA6AC8ALwBzAGMAaABlAG0AYQBzAC4AbQBpAGMAcgBvAHMAbwBmAHQALgBjAG8AbQAvAEQAUgBNAC8AMgAwADAANwAvADAAMwAvAFAAbABh"
+ "AHkAUgBlAGEAZAB5AEgAZQBhAGQAZQByACIAIAB2AGUAcgBzAGkAbwBuAD0AIgA0AC4AMAAuADAALgAwACIAPgA8AEQAQQBUAEEAPgA8AFAAUg"
+ "BPAFQARQBDAFQASQBOAEYATwA+ADwASwBFAFkATABFAE4APgAxADYAPAAvAEsARQBZAEwARQBOAD4APABBAEwARwBJAEQAPgBBAEUAUwBDAFQA"
+ "UgA8AC8AQQBMAEcASQBEAD4APAAvAFAAUgBPAFQARQBDAFQASQBOAEYATwA+ADwASwBJAEQAPgA0AFIAcABsAGIAKwBUAGIATgBFAFMAOAB0AE"
+ "cAawBOAEYAVwBUAEUASABBAD0APQA8AC8ASwBJAEQAPgA8AEMASABFAEMASwBTAFUATQA+AEsATABqADMAUQB6AFEAUAAvAE4AQQA9ADwALwBD"
+ "AEgARQBDAEsAUwBVAE0APgA8AEwAQQBfAFUAUgBMAD4AaAB0AHQAcABzADoALwAvAHAAcgBvAGYAZgBpAGMAaQBhAGwAcwBpAHQAZQAuAGsAZQ"
+ "B5AGQAZQBsAGkAdgBlAHIAeQAuAG0AZQBkAGkAYQBzAGUAcgB2AGkAYwBlAHMALgB3AGkAbgBkAG8AdwBzAC4AbgBlAHQALwBQAGwAYQB5AFIA"
+ "ZQBhAGQAeQAvADwALwBMAEEAXwBVAFIATAA+ADwAQwBVAFMAVABPAE0AQQBUAFQAUgBJAEIAVQBUAEUAUwA+ADwASQBJAFMAXwBEAFIATQBfAF"
+ "YARQBSAFMASQBPAE4APgA4AC4AMQAuADIAMwAwADQALgAzADEAPAAvAEkASQBTAF8ARABSAE0AXwBWAEUAUgBTAEkATwBOAD4APAAvAEMAVQBT"
+ "AFQATwBNAEEAVABUAFIASQBCAFUAVABFAFMAPgA8AC8ARABBAFQAQQA+ADwALwBXAFIATQBIAEUAQQBEAEUAUgA+AA=="
+ )
+
+ license_server = "https://test.playready.microsoft.com/service/rightsmanager.asmx?cfg=(persist:false,sl:2000)"
+
+ ctx.invoke(
+ license_,
+ device_path=device,
+ pssh=pssh,
+ server=license_server
+ )
+
+
+@main.command()
+@click.option("-k", "--group_key", type=Path, required=True, help="Device ECC private group key")
+@click.option("-c", "--group_certificate", type=Path, required=True, help="Device group certificate chain")
+@click.option("-o", "--output", type=Path, default=None, help="Output Path or Directory")
+@click.pass_context
+def create_device(
+ ctx: click.Context,
+ group_key: Path,
+ group_certificate: Path,
+ output: Optional[Path] = None
+) -> None:
+ """Create a Playready Device (.prd) file from an ECC private group key and group certificate chain"""
+ if not group_key.is_file():
+ raise click.UsageError("group_key: Not a path to a file, or it doesn't exist.", ctx)
+ if not group_certificate.is_file():
+ raise click.UsageError("group_certificate: Not a path to a file, or it doesn't exist.", ctx)
+
+ log = logging.getLogger("create-device")
+
+ encryption_key = ECCKey.generate()
+ signing_key = ECCKey.generate()
+
+ certificate_chain = CertificateChain.load(group_certificate)
+ group_key = ECCKey.load(group_key)
+
+ new_certificate = Certificate.new_key_cert(
+ cert_id=get_random_bytes(16),
+ security_level=certificate_chain.get_security_level(),
+ client_id=get_random_bytes(16),
+ signing_key=signing_key,
+ encryption_key=encryption_key,
+ group_key=group_key,
+ parent=certificate_chain
+ )
+ certificate_chain.prepend(new_certificate)
+
+ device = Device(
+ group_certificate=certificate_chain.dumps(),
+ encryption_key=encryption_key.dumps(),
+ signing_key=signing_key.dumps()
+ )
+
+ prd_bin = device.dumps()
+
+ if output and output.suffix:
+ if output.suffix.lower() != ".prd":
+ log.warning(f"Saving PRD with the file extension '{output.suffix}' but '.prd' is recommended.")
+ out_path = output
+ else:
+ out_dir = output or Path.cwd()
+ out_path = out_dir / f"{device.get_name()}_{crc32(prd_bin).to_bytes(4, 'big').hex()}.prd"
+
+ if out_path.exists():
+ log.error(f"A file already exists at the path '{out_path}', cannot overwrite.")
+ return
+
+ out_path.parent.mkdir(parents=True, exist_ok=True)
+ out_path.write_bytes(prd_bin)
+
+ log.info("Created Playready Device (.prd) file, %s", out_path.name)
+ log.info(" + Security Level: %s", device.security_level)
+ log.info(" + Group Certificate: %s (%s bytes)", bool(device.group_certificate.dumps()), len(device.group_certificate.dumps()))
+ log.info(" + Encryption Key: %s (%s bytes)", bool(device.encryption_key.dumps()), len(device.encryption_key.dumps()))
+ log.info(" + Signing Key: %s (%s bytes)", bool(device.signing_key.dumps()), len(device.signing_key.dumps()))
+ log.info(" + Saved to: %s", out_path.absolute())
+
+
+@main.command()
+@click.argument("prd_path", type=Path)
+@click.option("-o", "--out_dir", type=Path, default=None, help="Output Directory")
+@click.pass_context
+def export_device(ctx: click.Context, prd_path: Path, out_dir: Optional[Path] = None) -> None:
+ """
+ Export a Playready Device (.prd) file to a Group Certificate, Encryption Key and Signing Key
+ If an output directory is not specified, it will be stored in the current working directory
+ """
+ if not prd_path.is_file():
+ raise click.UsageError("prd_path: Not a path to a file, or it doesn't exist.", ctx)
+
+ log = logging.getLogger("export-device")
+ log.info("Exporting Playready Device (.prd) file, %s", prd_path.stem)
+
+ if not out_dir:
+ out_dir = Path.cwd()
+
+ out_path = out_dir / prd_path.stem
+ if out_path.exists():
+ if any(out_path.iterdir()):
+ log.error("Output directory is not empty, cannot overwrite.")
+ return
+ else:
+ log.warning("Output directory already exists, but is empty.")
+ else:
+ out_path.mkdir(parents=True)
+
+ device = Device.load(prd_path)
+
+ log.info(f"L{device.security_level} {device.get_name()}")
+ log.info(f"Saving to: {out_path}")
+
+ client_id_path = out_path / "bgroupcert.dat"
+ client_id_path.write_bytes(device.group_certificate.dumps())
+ log.info("Exported Group Certificate to bgroupcert.dat")
+
+ private_key_path = out_path / "zprivencr.dat"
+ private_key_path.write_bytes(device.encryption_key.dumps())
+ log.info("Exported Encryption Key as zprivencr.dat")
+
+ private_key_path = out_path / "zprivsig.dat"
+ private_key_path.write_bytes(device.signing_key.dumps())
+ log.info("Exported Signing Key as zprivsig.dat")
diff --git a/pyplayready/pssh.py b/pyplayready/pssh.py
new file mode 100644
index 0000000..1793978
--- /dev/null
+++ b/pyplayready/pssh.py
@@ -0,0 +1,87 @@
+import base64
+from typing import Union
+from uuid import UUID
+
+from construct import Struct, Int32ul, Int16ul, Array, this, Bytes, PaddedString, Switch, Int32ub, Const, Container
+
+
+class _PlayreadyPSSHStructs:
+ PSSHBox = Struct(
+ "length" / Int32ub,
+ "pssh" / Const(b"pssh"),
+ "fullbox" / Int32ub,
+ "system_id" / Bytes(16),
+ "data_length" / Int32ub,
+ "data" / Bytes(this.data_length)
+ )
+
+ PlayreadyObject = Struct(
+ "type" / Int16ul,
+ "length" / Int16ul,
+ "data" / Switch(
+ this.type,
+ {
+ 1: PaddedString(this.length, "utf16")
+ },
+ default=Bytes(this.length)
+ )
+ )
+
+ PlayreadyHeader = Struct(
+ "length" / Int32ul,
+ "record_count" / Int16ul,
+ "records" / Array(this.record_count, PlayreadyObject)
+ )
+
+
+class PSSH:
+ SYSTEM_ID = UUID(hex="9a04f07998404286ab92e65be0885f95")
+
+ def __init__(
+ self,
+ data: Union[str, bytes]
+ ):
+ """Represents a PlayReady PSSH"""
+ if not data:
+ raise ValueError("Data must not be empty")
+
+ if isinstance(data, str):
+ try:
+ data = base64.b64decode(data)
+ except Exception as e:
+ raise Exception(f"Could not decode data as Base64, {e}")
+
+ try:
+ if self._is_playready_pssh_box(data):
+ pssh_box = _PlayreadyPSSHStructs.PSSHBox.parse(data)
+ if bool(self._is_utf_16(pssh_box.data)):
+ self.wrm_headers = [pssh_box.data.decode("utf-16-le")]
+ elif bool(self._is_utf_16(pssh_box.data[6:])):
+ self.wrm_headers = [pssh_box.data[6:].decode("utf-16-le")]
+ elif bool(self._is_utf_16(pssh_box.data[10:])):
+ self.wrm_headers = [pssh_box.data[10:].decode("utf-16-le")]
+ else:
+ self.wrm_headers = list(self._get_wrm_headers(_PlayreadyPSSHStructs.PlayreadyHeader.parse(pssh_box.data)))
+ elif bool(self._is_utf_16(data)):
+ self.wrm_headers = [data.decode("utf-16-le")]
+ elif bool(self._is_utf_16(data[6:])):
+ self.wrm_headers = [data[6:].decode("utf-16-le")]
+ elif bool(self._is_utf_16(data[10:])):
+ self.wrm_headers = [data[10:].decode("utf-16-le")]
+ else:
+ self.wrm_headers = list(self._get_wrm_headers(_PlayreadyPSSHStructs.PlayreadyHeader.parse(data)))
+ except Exception:
+ raise Exception("Could not parse data as a PSSH Box nor a PlayReadyHeader")
+
+ def _is_playready_pssh_box(self, data: bytes) -> bool:
+ return data[12:28] == self.SYSTEM_ID.bytes
+
+ @staticmethod
+ def _is_utf_16(data: bytes) -> bool:
+ return all(map(lambda i: data[i] == 0, range(1, len(data), 2)))
+
+ @staticmethod
+ def _get_wrm_headers(wrm_header: Container):
+ for record in wrm_header.records:
+ if record.type == 1:
+ yield record.data
diff --git a/pyplayready/xml_key.py b/pyplayready/xml_key.py
new file mode 100644
index 0000000..25872ef
--- /dev/null
+++ b/pyplayready/xml_key.py
@@ -0,0 +1,18 @@
+from ecpy.curves import Point, Curve
+
+from pyplayready.ecc_key import ECCKey
+from pyplayready.elgamal import ElGamal
+
+
+class XmlKey:
+ def __init__(self):
+ self._shared_point = ECCKey.generate()
+ self.shared_key_x = self._shared_point.key.pointQ.x
+ self.shared_key_y = self._shared_point.key.pointQ.y
+
+ self._shared_key_x_bytes = ElGamal.to_bytes(int(self.shared_key_x))
+ self.aes_iv = self._shared_key_x_bytes[:16]
+ self.aes_key = self._shared_key_x_bytes[16:]
+
+ def get_point(self, curve: Curve) -> Point:
+ return Point(self.shared_key_x, self.shared_key_y, curve)
diff --git a/pyplayready/xmrlicense.py b/pyplayready/xmrlicense.py
new file mode 100644
index 0000000..fd54125
--- /dev/null
+++ b/pyplayready/xmrlicense.py
@@ -0,0 +1,251 @@
+from __future__ import annotations
+
+import base64
+from pathlib import Path
+from typing import Union
+
+from construct import Const, GreedyRange, Struct, Int32ub, Bytes, Int16ub, this, Switch, LazyBound, Array, Container
+
+
+class _XMRLicenseStructs:
+ PlayEnablerType = Struct(
+ "player_enabler_type" / Bytes(16)
+ )
+
+ DomainRestrictionObject = Struct(
+ "account_id" / Bytes(16),
+ "revision" / Int32ub
+ )
+
+ IssueDateObject = Struct(
+ "issue_date" / Int32ub
+ )
+
+ RevInfoVersionObject = Struct(
+ "sequence" / Int32ub
+ )
+
+ SecurityLevelObject = Struct(
+ "minimum_security_level" / Int16ub
+ )
+
+ EmbeddedLicenseSettingsObject = Struct(
+ "indicator" / Int16ub
+ )
+
+ ECCKeyObject = Struct(
+ "curve_type" / Int16ub,
+ "key_length" / Int16ub,
+ "key" / Bytes(this.key_length)
+ )
+
+ SignatureObject = Struct(
+ "signature_type" / Int16ub,
+ "signature_data_length" / Int16ub,
+ "signature_data" / Bytes(this.signature_data_length)
+ )
+
+ ContentKeyObject = Struct(
+ "key_id" / Bytes(16),
+ "key_type" / Int16ub,
+ "cipher_type" / Int16ub,
+ "key_length" / Int16ub,
+ "encrypted_key" / Bytes(this.key_length)
+ )
+
+ RightsSettingsObject = Struct(
+ "rights" / Int16ub
+ )
+
+ OutputProtectionLevelRestrictionObject = Struct(
+ "minimum_compressed_digital_video_opl" / Int16ub,
+ "minimum_uncompressed_digital_video_opl" / Int16ub,
+ "minimum_analog_video_opl" / Int16ub,
+ "minimum_digital_compressed_audio_opl" / Int16ub,
+ "minimum_digital_uncompressed_audio_opl" / Int16ub,
+ )
+
+ ExpirationRestrictionObject = Struct(
+ "begin_date" / Int32ub,
+ "end_date" / Int32ub
+ )
+
+ RemovalDateObject = Struct(
+ "removal_date" / Int32ub
+ )
+
+ UplinkKIDObject = Struct(
+ "uplink_kid" / Bytes(16),
+ "chained_checksum_type" / Int16ub,
+ "chained_checksum_length" / Int16ub,
+ "chained_checksum" / Bytes(this.chained_checksum_length)
+ )
+
+ AnalogVideoOutputConfigurationRestriction = Struct(
+ "video_output_protection_id" / Bytes(16),
+ "binary_configuration_data" / Bytes(this._.length - 24)
+ )
+
+ DigitalVideoOutputRestrictionObject = Struct(
+ "video_output_protection_id" / Bytes(16),
+ "binary_configuration_data" / Bytes(this._.length - 24)
+ )
+
+ DigitalAudioOutputRestrictionObject = Struct(
+ "audio_output_protection_id" / Bytes(16),
+ "binary_configuration_data" / Bytes(this._.length - 24)
+ )
+
+ PolicyMetadataObject = Struct(
+ "metadata_type" / Bytes(16),
+ "policy_data" / Bytes(this._.length)
+ )
+
+ SecureStopRestrictionObject = Struct(
+ "metering_id" / Bytes(16)
+ )
+
+ MeteringRestrictionObject = Struct(
+ "metering_id" / Bytes(16)
+ )
+
+ ExpirationAfterFirstPlayRestrictionObject = Struct(
+ "seconds" / Int32ub
+ )
+
+ GracePeriodObject = Struct(
+ "grace_period" / Int32ub
+ )
+
+ SourceIdObject = Struct(
+ "source_id" / Int32ub
+ )
+
+ AuxiliaryKey = Struct(
+ "location" / Int32ub,
+ "key" / Bytes(16)
+ )
+
+ AuxiliaryKeysObject = Struct(
+ "count" / Int16ub,
+ "auxiliary_keys" / Array(this.count, AuxiliaryKey)
+ )
+
+ UplinkKeyObject3 = Struct(
+ "uplink_key_id" / Bytes(16),
+ "chained_length" / Int16ub,
+ "checksum" / Bytes(this.chained_length),
+ "count" / Int16ub,
+ "entries" / Array(this.count, Int32ub)
+ )
+
+ CopyEnablerObject = Struct(
+ "copy_enabler_type" / Bytes(16)
+ )
+
+ CopyCountRestrictionObject = Struct(
+ "count" / Int32ub
+ )
+
+ MoveObject = Struct(
+ "minimum_move_protection_level" / Int32ub
+ )
+
+ XMRObject = Struct(
+ "flags" / Int16ub,
+ "type" / Int16ub,
+ "length" / Int32ub,
+ "data" / Switch(
+ lambda this_: this_.type,
+ {
+ 0x0005: OutputProtectionLevelRestrictionObject,
+ 0x0008: AnalogVideoOutputConfigurationRestriction,
+ 0x000a: ContentKeyObject,
+ 0x000b: SignatureObject,
+ 0x000d: RightsSettingsObject,
+ 0x0012: ExpirationRestrictionObject,
+ 0x0013: IssueDateObject,
+ 0x0016: MeteringRestrictionObject,
+ 0x001a: GracePeriodObject,
+ 0x0022: SourceIdObject,
+ 0x002a: ECCKeyObject,
+ 0x002c: PolicyMetadataObject,
+ 0x0029: DomainRestrictionObject,
+ 0x0030: ExpirationAfterFirstPlayRestrictionObject,
+ 0x0031: DigitalAudioOutputRestrictionObject,
+ 0x0032: RevInfoVersionObject,
+ 0x0033: EmbeddedLicenseSettingsObject,
+ 0x0034: SecurityLevelObject,
+ 0x0037: MoveObject,
+ 0x0039: PlayEnablerType,
+ 0x003a: CopyEnablerObject,
+ 0x003b: UplinkKIDObject,
+ 0x003d: CopyCountRestrictionObject,
+ 0x0050: RemovalDateObject,
+ 0x0051: AuxiliaryKeysObject,
+ 0x0052: UplinkKeyObject3,
+ 0x005a: SecureStopRestrictionObject,
+ 0x0059: DigitalVideoOutputRestrictionObject
+ },
+ default=LazyBound(lambda: _XMRLicenseStructs.XMRObject)
+ )
+ )
+
+ XmrLicense = Struct(
+ "signature" / Const(b"XMR\x00"),
+ "xmr_version" / Int32ub,
+ "rights_id" / Bytes(16),
+ "containers" / GreedyRange(XMRObject)
+ )
+
+
+class XMRLicense(_XMRLicenseStructs):
+ def __init__(
+ self,
+ parsed_license: Container,
+ license_obj: _XMRLicenseStructs.XmrLicense = _XMRLicenseStructs.XmrLicense
+ ):
+ self.parsed = parsed_license
+ self._LICENSE = license_obj
+
+ @classmethod
+ def loads(cls, data: Union[str, bytes]) -> XMRLicense:
+ if isinstance(data, str):
+ data = base64.b64decode(data)
+ if not isinstance(data, bytes):
+ raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
+
+ licence = _XMRLicenseStructs.XmrLicense
+ return cls(
+ parsed_license=licence.parse(data),
+ license_obj=licence
+ )
+
+ @classmethod
+ def load(cls, path: Union[Path, str]) -> XMRLicense:
+ if not isinstance(path, (Path, str)):
+ raise ValueError(f"Expecting Path object or path string, got {path!r}")
+ with Path(path).open(mode="rb") as f:
+ return cls.loads(f.read())
+
+ def dumps(self) -> bytes:
+ return self._LICENSE.build(self.parsed)
+
+ def struct(self) -> _XMRLicenseStructs.XmrLicense:
+ return self._LICENSE
+
+ def _locate(self, container: Container):
+ if container.flags == 2 or container.flags == 3:
+ return self._locate(container.data)
+ else:
+ return container
+
+ def get_object(self, type_: int):
+ for obj in self.parsed.containers:
+ container = self._locate(obj)
+ if container.type == type_:
+ yield container.data
+
+ def get_content_keys(self):
+ for content_key in self.get_object(10):
+ yield content_key
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..68b670d
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,41 @@
+[build-system]
+requires = ["poetry-core>=1.0.0"]
+build-backend = "poetry.core.masonry.api"
+
+[tool.poetry]
+name = "pyplayready"
+version = "0.1.6"
+description = "pyplayready CDM (Content Decryption Module) implementation in Python."
+license = "GPL-3.0-only"
+authors = ["DevLARLEY"]
+readme = "README.md"
+repository = "https://github.com/ready-dl/pyplayready"
+keywords = ["python", "drm", "playready", "microsoft"]
+classifiers = [
+ "Development Status :: 5 - Production/Stable",
+ "Intended Audience :: Developers",
+ "Intended Audience :: End Users/Desktop",
+ "Natural Language :: English",
+ "Operating System :: OS Independent",
+ "Topic :: Multimedia :: Video",
+ "Topic :: Security :: Cryptography",
+ "Topic :: Software Development :: Libraries :: Python Modules"
+]
+include = [
+ { path = "README.md", format = "sdist" },
+ { path = "LICENSE", format = "sdist" },
+]
+
+[tool.poetry.urls]
+"Issues" = "https://github.com/ready-dl/pyplayready/issues"
+
+[tool.poetry.dependencies]
+python = ">=3.8,<4.0"
+requests = "^2.32.3"
+pycryptodome = "^3.21.0"
+construct = "^2.10.70"
+ECPy = "^1.2.5"
+click = "^8.1.7"
+
+[tool.poetry.scripts]
+pyplayready = "pyplayready.main:main"
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..b6d8bb8
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,5 @@
+requests
+pycryptodome
+ecpy
+construct
+click
\ No newline at end of file