diff --git a/README.md b/README.md
index eeecadb..5b8cf6d 100644
--- a/README.md
+++ b/README.md
@@ -24,6 +24,7 @@ pyplayready test DEVICE.prd
## Usage
An example code snippet:
+
```python
from pyplayready.cdm import Cdm
from pyplayready.device import Device
@@ -48,7 +49,9 @@ pssh = PSSH(
"AFQATwBNAEEAVABUAFIASQBCAFUAVABFAFMAPgA8AC8ARABBAFQAQQA+ADwALwBXAFIATQBIAEUAQQBEAEUAUgA+AA=="
)
-request = cdm.get_license_challenge(pssh.wrm_headers[0])
+# set to `True` if your device doesn't support scalable licenses (this projects also doesn't yet) to downgrade the WRMHEADERs to v4.0.0.0
+wrm_headers = pssh.get_wrm_headers(downgrade_to_v4=False)
+request = cdm.get_license_challenge(wrm_headers[0])
response = requests.post(
url="https://test.playready.microsoft.com/service/rightsmanager.asmx?cfg=(persist:false,sl:2000)",
diff --git a/pyplayready/__init__.py b/pyplayready/__init__.py
index d37927b..ea3b145 100644
--- a/pyplayready/__init__.py
+++ b/pyplayready/__init__.py
@@ -8,4 +8,4 @@ from .pssh import *
from .xml_key import *
from .xmrlicense import *
-__version__ = "0.0.2"
+__version__ = "0.1.0"
diff --git a/pyplayready/cdm.py b/pyplayready/cdm.py
index 78bf9c4..ff6cbb9 100644
--- a/pyplayready/cdm.py
+++ b/pyplayready/cdm.py
@@ -30,14 +30,14 @@ class Cdm:
encryption_key: ECCKey,
signing_key: ECCKey,
client_version: str = "10.0.16384.10011",
- la_version: int = 1
+ protocol_version: int = 1
):
self.security_level = security_level
self.certificate_chain = certificate_chain
self.encryption_key = encryption_key
self.signing_key = signing_key
self.client_version = client_version
- self.la_version = la_version
+ self.protocol_version = protocol_version
self.curve = Curve.get_curve("secp256r1")
self.elgamal = ElGamal(self.curve)
@@ -61,14 +61,14 @@ class Cdm:
signing_key=device.signing_key
)
- def get_key_data(self):
+ def get_key_data(self) -> bytes:
point1, point2 = self.elgamal.encrypt(
message_point=self._xml_key.get_point(self.elgamal.curve),
public_key=self._wmrm_key
)
return self.elgamal.to_bytes(point1.x) + self.elgamal.to_bytes(point1.y) + self.elgamal.to_bytes(point2.x) + self.elgamal.to_bytes(point2.y)
- def get_cipher_data(self):
+ def get_cipher_data(self) -> bytes:
b64_chain = base64.b64encode(self.certificate_chain.dumps()).decode()
body = f"{b64_chain}"
@@ -94,7 +94,7 @@ class Cdm:
) -> str:
return (
''
- f'{self.la_version}'
+ f'{self.protocol_version}'
f'{content_header}'
''
f'{self.client_version}'
diff --git a/pyplayready/main.py b/pyplayready/main.py
index 70879f6..4af1f6a 100644
--- a/pyplayready/main.py
+++ b/pyplayready/main.py
@@ -2,7 +2,6 @@ import logging
from datetime import datetime
from pathlib import Path
from typing import Optional
-from zlib import crc32
import click
import requests
@@ -53,7 +52,7 @@ def license_(device_path: Path, pssh: PSSH, server: str) -> None:
cdm = Cdm.from_device(device)
log.info("Loaded CDM")
- challenge = cdm.get_license_challenge(pssh.wrm_headers[0])
+ challenge = cdm.get_license_challenge(pssh.get_wrm_headers(downgrade_to_v4=True)[0])
log.info("Created License Request (Challenge)")
log.debug(challenge)
@@ -167,7 +166,7 @@ def create_device(
out_path = output
else:
out_dir = output or Path.cwd()
- out_path = out_dir / f"{device.get_name()}_{crc32(prd_bin).to_bytes(4, 'big').hex()}.prd"
+ out_path = out_dir / f"{device.get_name()}.prd"
if out_path.exists():
log.error(f"A file already exists at the path '{out_path}', cannot overwrite.")
diff --git a/pyplayready/pssh.py b/pyplayready/pssh.py
index 1793978..ec3e919 100644
--- a/pyplayready/pssh.py
+++ b/pyplayready/pssh.py
@@ -4,6 +4,8 @@ from uuid import UUID
from construct import Struct, Int32ul, Int16ul, Array, this, Bytes, PaddedString, Switch, Int32ub, Const, Container
+from pyplayready.wrmheader import WRMHeader
+
class _PlayreadyPSSHStructs:
PSSHBox = Struct(
@@ -55,24 +57,34 @@ class PSSH:
if self._is_playready_pssh_box(data):
pssh_box = _PlayreadyPSSHStructs.PSSHBox.parse(data)
if bool(self._is_utf_16(pssh_box.data)):
- self.wrm_headers = [pssh_box.data.decode("utf-16-le")]
+ self._wrm_headers = [pssh_box.data.decode("utf-16-le")]
elif bool(self._is_utf_16(pssh_box.data[6:])):
- self.wrm_headers = [pssh_box.data[6:].decode("utf-16-le")]
+ self._wrm_headers = [pssh_box.data[6:].decode("utf-16-le")]
elif bool(self._is_utf_16(pssh_box.data[10:])):
- self.wrm_headers = [pssh_box.data[10:].decode("utf-16-le")]
+ self._wrm_headers = [pssh_box.data[10:].decode("utf-16-le")]
else:
- self.wrm_headers = list(self._get_wrm_headers(_PlayreadyPSSHStructs.PlayreadyHeader.parse(pssh_box.data)))
+ self._wrm_headers = list(self._read_wrm_headers(_PlayreadyPSSHStructs.PlayreadyHeader.parse(pssh_box.data)))
elif bool(self._is_utf_16(data)):
- self.wrm_headers = [data.decode("utf-16-le")]
+ self._wrm_headers = [data.decode("utf-16-le")]
elif bool(self._is_utf_16(data[6:])):
- self.wrm_headers = [data[6:].decode("utf-16-le")]
+ self._wrm_headers = [data[6:].decode("utf-16-le")]
elif bool(self._is_utf_16(data[10:])):
- self.wrm_headers = [data[10:].decode("utf-16-le")]
+ self._wrm_headers = [data[10:].decode("utf-16-le")]
else:
- self.wrm_headers = list(self._get_wrm_headers(_PlayreadyPSSHStructs.PlayreadyHeader.parse(data)))
+ self._wrm_headers = list(self._read_wrm_headers(_PlayreadyPSSHStructs.PlayreadyHeader.parse(data)))
except Exception:
raise Exception("Could not parse data as a PSSH Box nor a PlayReadyHeader")
+ @staticmethod
+ def _downgrade(wrm_header: str) -> str:
+ return WRMHeader(wrm_header).to_v4_0_0_0()
+
+ def get_wrm_headers(self, downgrade_to_v4: bool = False):
+ return list(map(
+ self._downgrade if downgrade_to_v4 else (lambda _: _),
+ self._wrm_headers
+ ))
+
def _is_playready_pssh_box(self, data: bytes) -> bool:
return data[12:28] == self.SYSTEM_ID.bytes
@@ -81,7 +93,7 @@ class PSSH:
return all(map(lambda i: data[i] == 0, range(1, len(data), 2)))
@staticmethod
- def _get_wrm_headers(wrm_header: Container):
+ def _read_wrm_headers(wrm_header: Container):
for record in wrm_header.records:
if record.type == 1:
yield record.data
diff --git a/pyplayready/wrmheader.py b/pyplayready/wrmheader.py
new file mode 100644
index 0000000..7a6a325
--- /dev/null
+++ b/pyplayready/wrmheader.py
@@ -0,0 +1,188 @@
+import base64
+from enum import Enum
+from typing import Optional, List, Union, Tuple
+
+import xmltodict
+
+
+class WRMHeader:
+ class SignedKeyID:
+ def __init__(
+ self,
+ alg_id: str,
+ value: str,
+ checksum: str
+ ):
+ self.alg_id = alg_id
+ self.value = value
+ self.checksum = checksum
+
+ class Version(Enum):
+ VERSION_4_0_0_0 = "4.0.0.0"
+ VERSION_4_1_0_0 = "4.1.0.0"
+ VERSION_4_2_0_0 = "4.2.0.0"
+ VERSION_4_3_0_0 = "4.3.0.0"
+ UNKNOWN = "UNKNOWN"
+
+ @classmethod
+ def _missing_(cls, value):
+ return cls.UNKNOWN
+
+ _RETURN_STRUCTURE = Tuple[List[SignedKeyID], Union[str, None], Union[str, None], Union[str, None]]
+
+ def __init__(
+ self,
+ data: Union[str, bytes]
+ ):
+ """Represents a PlayReady WRM Header"""
+ if not data:
+ raise ValueError("Data must not be empty")
+
+ if isinstance(data, str):
+ try:
+ data = base64.b64decode(data).decode()
+ except Exception:
+ data = data.encode()
+
+ self._raw_data: bytes = data
+ self._parsed = xmltodict.parse(self._raw_data)
+
+ self._header = self._parsed.get('WRMHEADER')
+ if not self._header:
+ raise ValueError("Data is not a valid WRMHEADER")
+
+ self.version = self.Version(self._header.get('@version'))
+
+ @staticmethod
+ def _ensure_list(element: Union[dict, list]) -> List:
+ if isinstance(element, dict):
+ return [element]
+ return element
+
+ def to_v4_0_0_0(self) -> str:
+ """Will ignore any remaining Key IDs if there's more than just one"""
+ return self._build_v4_0_0_0_wrm_header(*self.read_attributes())
+
+ @staticmethod
+ def _read_v4_0_0_0(data: dict) -> _RETURN_STRUCTURE:
+ protect_info = data.get("PROTECTINFO")
+
+ return (
+ [WRMHeader.SignedKeyID(
+ alg_id=protect_info["ALGID"],
+ value=data["KID"],
+ checksum=data.get("CHECKSUM")
+ )],
+ data.get("LA_URL"),
+ data.get("LUI_URL"),
+ data.get("DS_ID")
+ )
+
+ @staticmethod
+ def _read_v4_1_0_0(data: dict) -> _RETURN_STRUCTURE:
+ protect_info = data.get("PROTECTINFO")
+
+ key_ids = []
+ if protect_info:
+ kid = protect_info["KID"]
+ if kid:
+ key_ids = [WRMHeader.SignedKeyID(
+ alg_id=kid["@ALGID"],
+ value=kid["@VALUE"],
+ checksum=kid.get("@CHECKSUM")
+ )]
+
+ return (
+ key_ids,
+ data.get("LA_URL"),
+ data.get("LUI_URL"),
+ data.get("DS_ID")
+ )
+
+ @staticmethod
+ def _read_v4_2_0_0(data: dict) -> _RETURN_STRUCTURE:
+ protect_info = data.get("PROTECTINFO")
+
+ key_ids = []
+ if protect_info:
+ kids = protect_info["KIDS"]
+ if kids:
+ for kid in WRMHeader._ensure_list(kids["KID"]):
+ key_ids.append(WRMHeader.SignedKeyID(
+ alg_id=kid["@ALGID"],
+ value=kid["@VALUE"],
+ checksum=kid.get("@CHECKSUM")
+ ))
+
+ return (
+ key_ids,
+ data.get("LA_URL"),
+ data.get("LUI_URL"),
+ data.get("DS_ID")
+ )
+
+ @staticmethod
+ def _read_v4_3_0_0(data: dict) -> _RETURN_STRUCTURE:
+ protect_info = data.get("PROTECTINFO")
+
+ key_ids = []
+ if protect_info:
+ kids = protect_info["KIDS"]
+ for kid in WRMHeader._ensure_list(kids["KID"]):
+ key_ids.append(WRMHeader.SignedKeyID(
+ alg_id=kid.get("@ALGID"),
+ value=kid["@VALUE"],
+ checksum=kid.get("@CHECKSUM")
+ ))
+
+ return (
+ key_ids,
+ data.get("LA_URL"),
+ data.get("LUI_URL"),
+ data.get("DS_ID")
+ )
+
+ def read_attributes(self) -> _RETURN_STRUCTURE:
+ data = self._header.get("DATA")
+ if not data:
+ raise ValueError("Not a valid PlayReady Header Record, WRMHEADER/DATA required")
+
+ match self.version:
+ case self.Version.VERSION_4_0_0_0:
+ return self._read_v4_0_0_0(data)
+ case self.Version.VERSION_4_1_0_0:
+ return self._read_v4_1_0_0(data)
+ case self.Version.VERSION_4_2_0_0:
+ return self._read_v4_2_0_0(data)
+ case self.Version.VERSION_4_3_0_0:
+ return self._read_v4_3_0_0(data)
+
+ @staticmethod
+ def _build_v4_0_0_0_wrm_header(
+ key_ids: List[SignedKeyID],
+ la_url: Optional[str],
+ lui_url: Optional[str],
+ ds_id: Optional[str]
+ ) -> str:
+ if len(key_ids) == 0:
+ raise Exception("No Key IDs available")
+
+ key_id = key_ids[0]
+ return (
+ ''
+ ''
+ ''
+ '16'
+ 'AESCTR'
+ ''
+ f'{key_id.value}' +
+ (f'{la_url}' if la_url else '') +
+ (f'{lui_url}' if lui_url else '') +
+ (f'{ds_id}' if ds_id else '') +
+ (f'{key_id.checksum}' if key_id.checksum else '') +
+ ''
+ ''
+ )
+
+ def dumps(self) -> str:
+ return self._raw_data.decode()
diff --git a/pyproject.toml b/pyproject.toml
index 6a11328..43f17a5 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "pyplayready"
-version = "0.0.2"
+version = "0.1.0"
description = "pyplayready CDM (Content Decryption Module) implementation in Python."
license = "GPL-3.0-only"
authors = ["DevLARLEY"]
@@ -36,6 +36,7 @@ pycryptodome = "^3.21.0"
construct = "^2.10.70"
ECPy = "^1.2.5"
click = "^8.1.7"
+xmltodict = "^0.14.2"
[tool.poetry.scripts]
pyplayready = "pyplayready.main:main"
diff --git a/requirements.txt b/requirements.txt
index b6d8bb8..308c138 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,4 +2,5 @@ requests
pycryptodome
ecpy
construct
-click
\ No newline at end of file
+click
+xmltodict
\ No newline at end of file