Session ID and RemoteCDM implementation

This commit is contained in:
Erevoc 2024-11-14 22:15:19 +02:00
parent 70e47800df
commit 8a4be776eb
11 changed files with 629 additions and 24 deletions

View file

@ -21,8 +21,14 @@ from pyplayready.xml_key import XmlKey
from pyplayready.elgamal import ElGamal
from pyplayready.xmrlicense import XMRLicense
from pyplayready.exceptions import (InvalidSession, TooManySessions)
from pyplayready.session import Session
class Cdm:
MAX_NUM_OF_SESSIONS = 16
def __init__(
self,
security_level: int,
@ -47,9 +53,8 @@ class Cdm:
y=0x982b27b5cb2341326e56aa857dbfd5c634ce2cf9ea74fca8f2af5957efeea562,
curve=self.curve
)
self._xml_key = XmlKey()
self._keys: List[Key] = []
self.__sessions: dict[bytes, Session] = {}
@classmethod
def from_device(cls, device) -> Cdm:
@ -61,21 +66,52 @@ class Cdm:
signing_key=device.signing_key
)
def get_key_data(self) -> bytes:
def open(self) -> bytes:
"""
Open a Playready Content Decryption Module (CDM) session.
Raises:
TooManySessions: If the session cannot be opened as limit has been reached.
"""
if len(self.__sessions) > self.MAX_NUM_OF_SESSIONS:
raise TooManySessions(f"Too many Sessions open ({self.MAX_NUM_OF_SESSIONS}).")
session = Session(len(self.__sessions) + 1)
self.__sessions[session.id] = session
session._xml_key = XmlKey()
return session.id
def close(self, session_id: bytes) -> None:
"""
Close a Playready Content Decryption Module (CDM) session.
Parameters:
session_id: Session identifier.
Raises:
InvalidSession: If the Session identifier is invalid.
"""
session = self.__sessions.get(session_id)
if not session:
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
del self.__sessions[session_id]
def get_key_data(self, session: Session) -> bytes:
point1, point2 = self.elgamal.encrypt(
message_point=self._xml_key.get_point(self.elgamal.curve),
message_point= session._xml_key.get_point(self.elgamal.curve),
public_key=self._wmrm_key
)
return self.elgamal.to_bytes(point1.x) + self.elgamal.to_bytes(point1.y) + self.elgamal.to_bytes(point2.x) + self.elgamal.to_bytes(point2.y)
def get_cipher_data(self) -> bytes:
def get_cipher_data(self, session: Session) -> bytes:
b64_chain = base64.b64encode(self.certificate_chain.dumps()).decode()
body = f"<Data><CertificateChains><CertificateChain>{b64_chain}</CertificateChain></CertificateChains></Data>"
cipher = AES.new(
key=self._xml_key.aes_key,
key=session._xml_key.aes_key,
mode=AES.MODE_CBC,
iv=self._xml_key.aes_iv
iv=session._xml_key.aes_iv
)
ciphertext = cipher.encrypt(pad(
@ -83,7 +119,7 @@ class Cdm:
AES.block_size
))
return self._xml_key.aes_iv + ciphertext
return session._xml_key.aes_iv + ciphertext
def _build_digest_content(
self,
@ -134,12 +170,19 @@ class Cdm:
'</SignedInfo>'
)
def get_license_challenge(self, content_header: str) -> str:
def get_license_challenge(self, session_id: bytes, content_header: str) -> str:
session = self.__sessions.get(session_id)
if not session:
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
session.signing_key = self.signing_key
session.encryption_key = self.encryption_key
la_content = self._build_digest_content(
content_header=content_header,
nonce=base64.b64encode(get_random_bytes(16)).decode(),
wmrm_cipher=base64.b64encode(self.get_key_data()).decode(),
cert_cipher=base64.b64encode(self.get_cipher_data()).decode()
wmrm_cipher=base64.b64encode(self.get_key_data(session)).decode(),
cert_cipher=base64.b64encode(self.get_cipher_data(session)).decode()
)
la_hash_obj = SHA256.new()
@ -149,7 +192,7 @@ class Cdm:
signed_info = self._build_signed_info(base64.b64encode(la_hash).decode())
signed_info_digest = SHA256.new(signed_info.encode())
signer = DSS.new(self.signing_key.key, 'fips-186-3')
signer = DSS.new(session.signing_key.key, 'fips-186-3')
signature = signer.sign(signed_info_digest)
# haven't found a better way to do this. xmltodict.unparse doesn't work
@ -167,7 +210,7 @@ class Cdm:
'<KeyInfo xmlns="http://www.w3.org/2000/09/xmldsig#">'
'<KeyValue>'
'<ECCKeyValue>'
f'<PublicKey>{base64.b64encode(self.signing_key.public_bytes()).decode()}</PublicKey>'
f'<PublicKey>{base64.b64encode(session.signing_key.public_bytes()).decode()}</PublicKey>'
'</ECCKeyValue>'
'</KeyValue>'
'</KeyInfo>'
@ -181,7 +224,7 @@ class Cdm:
return main_body
def _decrypt_ecc256_key(self, encrypted_key: bytes) -> bytes:
def _decrypt_ecc256_key(self, session: Session, encrypted_key: bytes) -> bytes:
point1 = Point(
x=int.from_bytes(encrypted_key[:32], 'big'),
y=int.from_bytes(encrypted_key[32:64], 'big'),
@ -193,10 +236,14 @@ class Cdm:
curve=self.curve
)
decrypted = self.elgamal.decrypt((point1, point2), int(self.encryption_key.key.d))
decrypted = self.elgamal.decrypt((point1, point2), int(session.encryption_key.key.d))
return self.elgamal.to_bytes(decrypted.x)[16:32]
def parse_license(self, licence: str) -> None:
def parse_license(self, session_id: bytes, licence: str) -> None:
session = self.__sessions.get(session_id)
if not session:
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
try:
root = ET.fromstring(licence)
license_elements = root.findall(".//{http://schemas.microsoft.com/DRM/2007/03/protocols}License")
@ -204,15 +251,19 @@ class Cdm:
parsed_licence = XMRLicense.loads(license_element.text)
for key in parsed_licence.get_content_keys():
if Key.CipherType(key.cipher_type) == Key.CipherType.ECC256:
self._keys.append(Key(
session.keys.append(Key(
key_id=UUID(bytes_le=key.key_id),
key_type=key.key_type,
cipher_type=key.cipher_type,
key_length=key.key_length,
key=self._decrypt_ecc256_key(key.encrypted_key)
key=self._decrypt_ecc256_key(session, key.encrypted_key)
))
except Exception as e:
raise Exception(f"Unable to parse license, {e}")
def get_keys(self) -> List[Key]:
return self._keys
def get_keys(self, session_id: bytes) -> List[Key]:
session = self.__sessions.get(session_id)
if not session:
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
return session.keys