#!/usr/bin/env python3

# Early proof-of-concept towards a SM-DP+ HTTP service for GSMA consumer eSIM RSP
#
# (C) 2023-2024 by Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20import json
21import sys
22import argparse
23import uuid
24import os
25import functools
26from typing import Optional, Dict, List
27from pprint import pprint as pp
28
29import base64
30from base64 import b64decode
31from klein import Klein
32from twisted.web.iweb import IRequest
33import asn1tools
34
35from pySim.utils import h2b, b2h, swap_nibbles
36
37import pySim.esim.rsp as rsp
38from pySim.esim.es8p import *
39
# HACK: both values are hard-coded for now and should become configurable.
# Base directory below which certificates, profile packages and session data are looked up.
DATA_DIR = "./smdpp-data"
# Host name handed to the server as its own address; must match certificates!
HOSTNAME = "testsmdpplus1.example.com"
43
44
def b64encode2str(req: bytes) -> str:
    """Return the base64 encoding of the given bytes as an ASCII string."""
    encoded = base64.b64encode(req)
    return str(encoded, 'ascii')
48
def set_headers(request: IRequest):
    """Attach the HTTP headers that GSMA eSIM RSP makes mandatory to the given request object."""
    mandatory_headers = (
        ('Content-Type', 'application/json;charset=UTF-8'),
        ('X-Admin-Protocol', 'gsma/rsp/v2.1.0'),
    )
    for name, value in mandatory_headers:
        request.setHeader(name, value)
53
def build_status_code(subject_code: str, reason_code: str, subject_id: Optional[str], message: Optional[str]) -> Dict:
    """Assemble a status code dictionary.

    'subjectCode' and 'reasonCode' are always present; 'subjectIdentifier' and 'message' are
    only added when a non-empty value was passed for them."""
    optional_members = (('subjectIdentifier', subject_id), ('message', message))
    status = {'subjectCode': subject_code, 'reasonCode': reason_code}
    status.update({key: value for key, value in optional_members if value})
    return status
61
def build_resp_header(js: dict, status: str = 'Executed-Success', status_code_data = None) -> None:
    """Set the 'header' member of the response dictionary `js` in place (SGP.22 v3.0 6.5.1.4).

    The header always carries the function execution status; 'statusCodeData' is only included
    when `status_code_data` is non-empty.  Any existing 'header' member is overwritten."""
    execution_status = {'status': status}
    if status_code_data:
        execution_status['statusCodeData'] = status_code_data
    js['header'] = {'functionExecutionStatus': execution_status}
71
72from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature, encode_dss_signature
73from cryptography.hazmat.primitives.serialization import load_pem_private_key, Encoding, PublicFormat, PrivateFormat, NoEncryption
74from cryptography.hazmat.primitives.asymmetric import ec
75from cryptography.hazmat.primitives import hashes
76from cryptography.exceptions import InvalidSignature
77from cryptography import x509
78
79
def ecdsa_dss_to_tr03111(sig: bytes) -> bytes:
    """Convert an ECDSA signature from DER format to BSI TR-03111 format.

    The DER signature is decoded into its two integers, each of which is then serialized as a
    32-byte big-endian value; the result is the concatenation of both."""
    return b''.join(value.to_bytes(32, 'big') for value in decode_dss_signature(sig))
84
def ecdsa_tr03111_to_dss(sig: bytes) -> bytes:
    """Convert an ECDSA signature from BSI TR-03111 format to DER.

    The input must be exactly 64 bytes: two 32-byte big-endian integers (r followed by s),
    which are extracted and then DER-encoded.

    Raises ValueError if the input does not have the expected length."""
    # An explicit check rather than an assert: asserts are removed when running with -O, and
    # a signature of the wrong length would then be silently mis-parsed.
    if len(sig) != 64:
        raise ValueError("TR-03111 signature must be 64 bytes long, got %u" % len(sig))
    r = int.from_bytes(sig[:32], 'big')
    s = int.from_bytes(sig[32:], 'big')
    return encode_dss_signature(r, s)
91
92
class CertAndPrivkey:
    """A pair of certificate and private key, as used for ECDSA signing."""

    def __init__(self, required_policy_oid: Optional[x509.ObjectIdentifier] = None,
                 cert: Optional[x509.Certificate] = None, priv_key = None):
        """Create the pair; certificate and key may also be loaded later from files.

        required_policy_oid: if given, cert_from_der_file() only accepts certificates that
                             carry this certificate policy OID."""
        self.required_policy_oid = required_policy_oid
        self.cert = cert
        self.priv_key = priv_key

    def cert_from_der_file(self, path: str):
        """Load the certificate from the DER file at `path`.

        Raises ValueError if a required policy OID is configured and the certificate does not
        carry it."""
        with open(path, 'rb') as f:
            cert = x509.load_der_x509_certificate(f.read())
        # verify it is the right type of certificate (id-rspRole-dp-auth, id-rspRole-dp-auth-v2, etc.)
        # Raised explicitly rather than asserted, so the check also runs under "python -O".
        if self.required_policy_oid and not cert_policy_has_oid(cert, self.required_policy_oid):
            raise ValueError("certificate %s doesn't have required policy %s" %
                             (path, self.required_policy_oid.dotted_string))
        self.cert = cert

    def privkey_from_pem_file(self, path: str, password: Optional[str] = None):
        """Load the private key from the PEM file at `path`.

        password: passphrase of an encrypted key file, or None.  A `str` is encoded to bytes,
                  as load_pem_private_key() only accepts bytes; bytes are passed through."""
        if isinstance(password, str):
            password = password.encode()
        with open(path, 'rb') as f:
            self.priv_key = load_pem_private_key(f.read(), password)

    def ecdsa_sign(self, plaintext: bytes) -> bytes:
        """Sign some input-data using an ECDSA signature compliant with SGP.22,
        which internally refers to Global Platform 2.2 Annex E, which in turn points
        to BSI TR-03111 which states "concatenated raw R + S values". """
        sig = self.priv_key.sign(plaintext, ec.ECDSA(hashes.SHA256()))
        # convert from DER format to BSI TR-03111; first get long integers; then convert those to bytes
        return ecdsa_dss_to_tr03111(sig)

    def get_authority_key_identifier(self) -> x509.AuthorityKeyIdentifier:
        """Return the AuthorityKeyIdentifier X.509 extension of the certificate.

        Raises IndexError if the certificate has no such extension."""
        return [ext.value for ext in self.cert.extensions
                if isinstance(ext.value, x509.AuthorityKeyIdentifier)][0]

    def get_subject_alt_name(self) -> x509.SubjectAlternativeName:
        """Return the SubjectAlternativeName X.509 extension of the certificate.

        Raises IndexError if the certificate has no such extension."""
        return [ext.value for ext in self.cert.extensions
                if isinstance(ext.value, x509.SubjectAlternativeName)][0]

    def get_cert_as_der(self) -> bytes:
        """Return certificate encoded as DER."""
        return self.cert.public_bytes(Encoding.DER)

    def get_curve(self) -> ec.EllipticCurve:
        """Return the elliptic curve of the public key contained in the certificate."""
        return self.cert.public_key().public_numbers().curve
135
136
137
138
class ApiError(Exception):
    """Exception carrying the status code data of a failed API function."""

    def __init__(self, subject_code: str, reason_code: str, message: Optional[str] = None,
                 subject_id: Optional[str] = None):
        # keyword arguments, as build_status_code() takes subject_id before message
        self.status_code = build_status_code(subject_code=subject_code, reason_code=reason_code,
                                             subject_id=subject_id, message=message)

    def encode(self) -> str:
        """Serialize the error as a JSON string consisting only of a 'Failed' response header."""
        response = {}
        build_resp_header(response, status='Failed', status_code_data=self.status_code)
        return json.dumps(response)
149
def cert_policy_has_oid(cert: x509.Certificate, match_oid: x509.ObjectIdentifier) -> bool:
    """Determine if given certificate has a certificatePolicy extension of matching OID.

    Returns True if any policy of any CertificatePolicies extension of `cert` has
    `match_oid` as its policy identifier, False otherwise (including when the certificate
    has no such extension)."""
    for ext in cert.extensions:
        if not isinstance(ext.value, x509.CertificatePolicies):
            continue
        # CertificatePolicies is iterable; no need to reach into its private _policies list
        if any(policy.policy_identifier == match_oid for policy in ext.value):
            return True
    return False
156
# Dotted-decimal OID prefixes; each one extends the previous by a single component.
ID_RSP = '2.23.146.1'
ID_RSP_CERT_OBJECTS = ID_RSP + '.2'
ID_RSP_ROLE = ID_RSP_CERT_OBJECTS + '.1'
160
class oid:
    """Namespace of x509.ObjectIdentifier constants for the components 0..7 below ID_RSP_ROLE."""
    id_rspRole_ci = x509.ObjectIdentifier('.'.join([ID_RSP_ROLE, '0']))
    id_rspRole_euicc_v2 = x509.ObjectIdentifier('.'.join([ID_RSP_ROLE, '1']))
    id_rspRole_eum_v2 = x509.ObjectIdentifier('.'.join([ID_RSP_ROLE, '2']))
    id_rspRole_dp_tls_v2 = x509.ObjectIdentifier('.'.join([ID_RSP_ROLE, '3']))
    id_rspRole_dp_auth_v2 = x509.ObjectIdentifier('.'.join([ID_RSP_ROLE, '4']))
    id_rspRole_dp_pb_v2 = x509.ObjectIdentifier('.'.join([ID_RSP_ROLE, '5']))
    id_rspRole_ds_tls_v2 = x509.ObjectIdentifier('.'.join([ID_RSP_ROLE, '6']))
    id_rspRole_ds_auth_v2 = x509.ObjectIdentifier('.'.join([ID_RSP_ROLE, '7']))
170
class SmDppHttpServer:
    """Klein-based HTTP application providing the ES9+ endpoints of an SM-DP+.

    An instance holds the list of CI certificates, the DPauth and DPpb certificate/key pairs
    and the store of ongoing RSP sessions (self.rss)."""
    app = Klein()

    @staticmethod
    def load_certs_from_path(path: str) -> List[x509.Certificate]:
        """Load all DER + PEM files from given directory path and return them as list of x509.Certificate
        instances.

        The directory is walked recursively; files with other extensions are ignored.
        Raises ValueError if a loaded certificate lacks the id-rspRole-ci certificate policy."""
        certs = []
        for dirpath, dirnames, filenames in os.walk(path):
            for filename in filenames:
                cert = None
                if filename.endswith('.der'):
                    with open(os.path.join(dirpath, filename), 'rb') as f:
                        cert = x509.load_der_x509_certificate(f.read())
                elif filename.endswith('.pem'):
                    with open(os.path.join(dirpath, filename), 'rb') as f:
                        cert = x509.load_pem_x509_certificate(f.read())
                if cert:
                    # verify it is a CI certificate (only the id-rspRole-ci policy is checked here;
                    # keyCertSign key usage is not)
                    if not cert_policy_has_oid(cert, oid.id_rspRole_ci):
                        raise ValueError("alleged CI certificate %s doesn't have CI policy" % filename)
                    certs.append(cert)
        return certs

    def ci_get_cert_for_pkid(self, ci_pkid: bytes) -> Optional[x509.Certificate]:
        """Find CI certificate for given key identifier.

        Returns the first loaded CI certificate whose SubjectKeyIdentifier extension equals
        `ci_pkid`, or None.  Certificates without that extension are skipped."""
        for cert in self.ci_certs:
            for ext in cert.extensions:
                if isinstance(ext.value, x509.SubjectKeyIdentifier) and ext.value.key_identifier == ci_pkid:
                    return cert
        return None

    def __init__(self, server_hostname: str, ci_certs_path: str, use_brainpool: bool = False):
        """Load all certificates and keys and open the session store.

        server_hostname: own SM-DP+ address, compared against the address sent by the client
        ci_certs_path:   directory from which the CI certificates are loaded
        use_brainpool:   use the '_BRP' instead of the '_NIST' DPauth/DPpb certificate + key files"""
        self.server_hostname = server_hostname
        self.ci_certs = self.load_certs_from_path(ci_certs_path)
        cert_dir = os.path.join(DATA_DIR, 'certs')
        # the file names of both variants only differ in this suffix
        variant = 'BRP' if use_brainpool else 'NIST'
        # load DPauth cert + key
        self.dp_auth = CertAndPrivkey(oid.id_rspRole_dp_auth_v2)
        self.dp_auth.cert_from_der_file(os.path.join(cert_dir, 'DPauth', 'CERT_S_SM_DPauth_ECDSA_%s.der' % variant))
        self.dp_auth.privkey_from_pem_file(os.path.join(cert_dir, 'DPauth', 'SK_S_SM_DPauth_ECDSA_%s.pem' % variant))
        # load DPpb cert + key
        self.dp_pb = CertAndPrivkey(oid.id_rspRole_dp_pb_v2)
        self.dp_pb.cert_from_der_file(os.path.join(cert_dir, 'DPpb', 'CERT_S_SM_DPpb_ECDSA_%s.der' % variant))
        self.dp_pb.privkey_from_pem_file(os.path.join(cert_dir, 'DPpb', 'SK_S_SM_DPpb_ECDSA_%s.pem' % variant))
        self.rss = rsp.RspSessionStore(os.path.join(DATA_DIR, "sm-dp-sessions"))

    @app.handle_errors(ApiError)
    def handle_apierror(self, request: IRequest, failure):
        """Turn an ApiError raised by an endpoint into a HTTP 200 response with a 'Failed' header."""
        request.setResponseCode(200)
        pp(failure)
        return failure.value.encode()

    @staticmethod
    def _ecdsa_verify(cert: x509.Certificate, signature: bytes, data: bytes) -> bool:
        """Verify a TR-03111 formatted ECDSA/SHA256 signature over `data` using the public key
        of `cert`.  Returns True if the signature is valid, False otherwise."""
        pubkey = cert.public_key()
        dss_sig = ecdsa_tr03111_to_dss(signature)
        try:
            pubkey.verify(dss_sig, data, ec.ECDSA(hashes.SHA256()))
            return True
        except InvalidSignature:
            return False

    # NOTE: this staticmethod is applied as a decorator inside the class body below; calling a
    # staticmethod object directly like that requires Python 3.10 or later.
    @staticmethod
    def rsp_api_wrapper(func):
        """Wrapper that can be used as decorator in order to perform common REST API endpoint entry/exit
        functionality, such as JSON decoding/encoding and debug-printing."""
        @functools.wraps(func)
        def _api_wrapper(self, request: IRequest):
            # TODO: evaluate User-Agent + X-Admin-Protocol header
            # TODO: reject any non-JSON Content-type

            content = json.loads(request.content.read())
            print("Rx JSON: %s" % json.dumps(content))
            set_headers(request)

            # endpoints may return None, which is treated like an empty response body
            output = func(self, request, content) or {}

            build_resp_header(output)
            print("Tx JSON: %s" % json.dumps(output))
            return json.dumps(output)
        return _api_wrapper

    @app.route('/gsma/rsp2/es9plus/initiateAuthentication', methods=['POST'])
    @rsp_api_wrapper
    def initiateAutentication(self, request: IRequest, content: dict) -> dict:
        """See ES9+ InitiateAuthentication SGP.22 Section 5.6.1

        Raises ApiError if the SM-DP+ address doesn't match or none of the offered CI public
        key identifiers is known; ValueError if the euiccChallenge has the wrong length."""
        # Verify that the received address matches its own SM-DP+ address, where the comparison SHALL be
        # case-insensitive. Otherwise, the SM-DP+ SHALL return a status code "SM-DP+ Address - Refused".
        if content['smdpAddress'].lower() != self.server_hostname.lower():
            raise ApiError('8.8.1', '3.8', 'Invalid SM-DP+ Address')

        euiccChallenge = b64decode(content['euiccChallenge'])
        if len(euiccChallenge) != 16:
            raise ValueError("euiccChallenge must be 16 bytes long, got %u" % len(euiccChallenge))

        euiccInfo1_bin = b64decode(content['euiccInfo1'])
        euiccInfo1 = rsp.asn1.decode('EUICCInfo1', euiccInfo1_bin)
        print("Rx euiccInfo1: %s" % euiccInfo1)
        #euiccInfo1['svn']

        # TODO: If euiccCiPKIdListForSigningV3 is present ...

        pkid_list = euiccInfo1['euiccCiPKIdListForSigning']
        if 'euiccCiPKIdListForSigningV3' in euiccInfo1:
            pkid_list = pkid_list + euiccInfo1['euiccCiPKIdListForSigningV3']
        # verify it supports one of the keys indicated by euiccCiPKIdListForSigning
        if not any(self.ci_get_cert_for_pkid(x) for x in pkid_list):
            raise ApiError('8.8.2', '3.1', 'None of the proposed Public Key Identifiers is supported by the SM-DP+')

        # TODO: Determine the set of CERT.DPauth.SIG that satisfy the following criteria:
        # * Part of a certificate chain ending at one of the eSIM CA RootCA Certificate, whose Public Keys is
        #   supported by the eUICC (indicated by euiccCiPKIdListForVerification).
        # * Using a certificate chain that the eUICC and the LPA both support:
        #euiccInfo1['euiccCiPKIdListForVerification']
        # raise ApiError('8.8.4', '3.7', 'The SM-DP+ has no CERT.DPauth.SIG which chains to one of the eSIM CA Root CA CErtificate with a Public Key supported by the eUICC')

        # Generate a TransactionID which is used to identify the ongoing RSP session. The TransactionID
        # SHALL be unique within the scope and lifetime of each SM-DP+.
        transactionId = uuid.uuid4().hex
        assert transactionId not in self.rss

        # Generate a serverChallenge for eUICC authentication attached to the ongoing RSP session.
        serverChallenge = os.urandom(16)

        # Generate a serverSigned1 data object as expected by the eUICC and described in section 5.7.13 "ES10b.AuthenticateServer". If and only if both eUICC and LPA indicate crlStaplingV3Support, the SM-DP+ SHALL indicate crlStaplingV3Used in sessionContext.
        serverSigned1 = {
            'transactionId': h2b(transactionId),
            'euiccChallenge': euiccChallenge,
            'serverAddress': self.server_hostname,
            'serverChallenge': serverChallenge,
        }
        print("Tx serverSigned1: %s" % serverSigned1)
        serverSigned1_bin = rsp.asn1.encode('ServerSigned1', serverSigned1)
        print("Tx serverSigned1: %s" % rsp.asn1.decode('ServerSigned1', serverSigned1_bin))
        output = {}
        output['serverSigned1'] = b64encode2str(serverSigned1_bin)

        # Generate a signature (serverSignature1) as described in section 5.7.13 "ES10b.AuthenticateServer" using the SK related to the selected CERT.DPauth.SIG.
        # serverSignature1 SHALL be created using the private key associated to the RSP Server Certificate for authentication, and verified by the eUICC using the contained public key as described in section 2.6.9. serverSignature1 SHALL apply on serverSigned1 data object.
        # The signature is prefixed with the bytes 5F 37 40 (0x40 = 64, the length of the raw signature).
        output['serverSignature1'] = b64encode2str(b'\x5f\x37\x40' + self.dp_auth.ecdsa_sign(serverSigned1_bin))

        output['transactionId'] = transactionId
        server_cert_aki = self.dp_auth.get_authority_key_identifier()
        # presumably 04 14 is the OCTET STRING tag + length (20) of the key identifier -- TODO confirm
        output['euiccCiPKIdToBeUsed'] = b64encode2str(b'\x04\x14' + server_cert_aki.key_identifier)
        output['serverCertificate'] = b64encode2str(self.dp_auth.get_cert_as_der()) # CERT.DPauth.SIG
        # FIXME: add those certificate
        #output['otherCertsInChain'] = b64encode2str()

        # create SessionState and store it in rss
        self.rss[transactionId] = rsp.RspSessionState(transactionId, serverChallenge)

        return output

    @app.route('/gsma/rsp2/es9plus/authenticateClient', methods=['POST'])
    @rsp_api_wrapper
    def authenticateClient(self, request: IRequest, content: dict) -> dict:
        """See ES9+ AuthenticateClient in SGP.22 Section 5.6.3

        Raises ApiError if the eUICC signature or serverChallenge don't verify or the
        transactionId is unknown; ValueError if the eUICC reported an authentication error."""
        transactionId = content['transactionId']

        authenticateServerResp_bin = b64decode(content['authenticateServerResponse'])
        authenticateServerResp = rsp.asn1.decode('AuthenticateServerResponse', authenticateServerResp_bin)
        # the decoded value is indexed as a (name, value) pair, which fills both %s
        print("Rx %s: %s" % authenticateServerResp)
        if authenticateServerResp[0] == 'authenticateResponseError':
            r_err = authenticateServerResp[1]
            #r_err['transactionId']
            #r_err['authenticateErrorCode']
            raise ValueError("authenticateResponseError %s" % r_err)

        r_ok = authenticateServerResp[1]
        euiccSigned1 = r_ok['euiccSigned1']
        # TODO: use original data, don't re-encode?
        euiccSigned1_bin = rsp.asn1.encode('EuiccSigned1', euiccSigned1)
        euiccSignature1_bin = r_ok['euiccSignature1']
        euiccCertificate_dec = r_ok['euiccCertificate']
        # TODO: use original data, don't re-encode?
        euiccCertificate_bin = rsp.asn1.encode('Certificate', euiccCertificate_dec)
        eumCertificate_dec = r_ok['eumCertificate']
        eumCertificate_bin = rsp.asn1.encode('Certificate', eumCertificate_dec)
        # TODO v3: otherCertsInChain

        # load certificate
        euicc_cert = x509.load_der_x509_certificate(euiccCertificate_bin)
        eum_cert = x509.load_der_x509_certificate(eumCertificate_bin)

        # TODO: Verify the validity of the eUICC certificate chain
        # raise ApiError('8.1.3', '6.1', 'Verification failed')
        # raise ApiError('8.1.3', '6.3', 'Expired')

        # TODO: Verify that the Root Certificate of the eUICC certificate chain corresponds to the
        # euiccCiPKIdToBeUsed or euiccCiPKIdToBeUsedV3
        # raise ApiError('8.11.1', '3.9', 'Unknown')

        # Verify euiccSignature1 over euiccSigned1 using pubkey from euiccCertificate.
        # Otherwise, the SM-DP+ SHALL return a status code "eUICC - Verification failed"
        if not self._ecdsa_verify(euicc_cert, euiccSignature1_bin, euiccSigned1_bin):
            raise ApiError('8.1', '6.1', 'Verification failed')

        # Verify that the transactionId is known and relates to an ongoing RSP session. Otherwise, the SM-DP+
        # SHALL return a status code "TransactionId - Unknown"
        ss = self.rss.get(transactionId, None)
        if ss is None:
            raise ApiError('8.10.1', '3.9', 'Unknown')
        ss.euicc_cert = euicc_cert
        ss.eum_cert = eum_cert # do we need this in the state?

        # TODO: verify eUICC cert is signed by EUM cert
        # TODO: verify EUM cert is signed by CI cert
        # TODO: verify EID of eUICC cert is within permitted range of EUM cert

        ss.eid = ss.euicc_cert.subject.get_attributes_for_oid(x509.oid.NameOID.SERIAL_NUMBER)[0].value
        print("EID (from eUICC cert): %s" % ss.eid)

        # Verify that the serverChallenge attached to the ongoing RSP session matches the
        # serverChallenge returned by the eUICC. Otherwise, the SM-DP+ SHALL return a status code "eUICC -
        # Verification failed".
        if euiccSigned1['serverChallenge'] != ss.serverChallenge:
            raise ApiError('8.1', '6.1', 'Verification failed')

        # Put together profileMetadata + _bin
        ss.profileMetadata = ProfileMetadata(iccid_bin= h2b(swap_nibbles('89000123456789012358')), spn="OsmocomSPN", profile_name="OsmocomProfile")
        profileMetadata_bin = ss.profileMetadata.gen_store_metadata_request()

        # Put together smdpSigned2 + _bin
        smdpSigned2 = {
            'transactionId': h2b(ss.transactionId),
            'ccRequiredFlag': False, # whether the Confirmation Code is required
            #'bppEuiccOtpk': None, # whether otPK.EUICC.ECKA already used for binding the BPP, tag '5F49'
        }
        smdpSigned2_bin = rsp.asn1.encode('SmdpSigned2', smdpSigned2)

        # signed with the DPpb key over smdpSigned2 followed by the prefixed euiccSignature1
        ss.smdpSignature2_do = b'\x5f\x37\x40' + self.dp_pb.ecdsa_sign(smdpSigned2_bin + b'\x5f\x37\x40' + euiccSignature1_bin)

        # update non-volatile state with updated ss object
        self.rss[transactionId] = ss
        return {
            'transactionId': transactionId,
            'profileMetadata': b64encode2str(profileMetadata_bin),
            'smdpSigned2': b64encode2str(smdpSigned2_bin),
            'smdpSignature2': b64encode2str(ss.smdpSignature2_do),
            'smdpCertificate': b64encode2str(self.dp_pb.get_cert_as_der()), # CERT.DPpb.SIG
        }

    @app.route('/gsma/rsp2/es9plus/getBoundProfilePackage', methods=['POST'])
    @rsp_api_wrapper
    def getBoundProfilePackage(self, request: IRequest, content: dict) -> dict:
        """See ES9+ GetBoundProfilePackage SGP.22 Section 5.6.2

        Raises ApiError if the transactionId is unknown or doesn't match the signed one, or if
        the eUICC signature is invalid; ValueError if the eUICC reported a download error."""
        transactionId = content['transactionId']

        # Verify that the received transactionId is known and relates to an ongoing RSP session
        ss = self.rss.get(transactionId, None)
        if not ss:
            raise ApiError('8.10.1', '3.9', 'The RSP session identified by the TransactionID is unknown')

        prepDownloadResp_bin = b64decode(content['prepareDownloadResponse'])
        prepDownloadResp = rsp.asn1.decode('PrepareDownloadResponse', prepDownloadResp_bin)
        print("Rx %s: %s" % prepDownloadResp)

        if prepDownloadResp[0] == 'downloadResponseError':
            r_err = prepDownloadResp[1]
            #r_err['transactionId']
            #r_err['downloadErrorCode']
            raise ValueError("downloadResponseError %s" % r_err)

        r_ok = prepDownloadResp[1]

        # Verify the euiccSignature2 computed over euiccSigned2 and smdpSignature2 using the PK.EUICC.SIG attached to the ongoing RSP session
        euiccSigned2 = r_ok['euiccSigned2']
        # TODO: use original data, don't re-encode?
        euiccSigned2_bin = rsp.asn1.encode('EUICCSigned2', euiccSigned2)
        if not self._ecdsa_verify(ss.euicc_cert, r_ok['euiccSignature2'], euiccSigned2_bin + ss.smdpSignature2_do):
            raise ApiError('8.1', '6.1', 'eUICC signature is invalid')

        # not in spec: Verify that signed TransactionID is outer transaction ID
        if h2b(transactionId) != euiccSigned2['transactionId']:
            raise ApiError('8.10.1', '3.9', 'The signed transactionId != outer transactionId')

        # store otPK.EUICC.ECKA in session state
        ss.euicc_otpk = euiccSigned2['euiccOtpk']
        print("euiccOtpk: %s" % (b2h(ss.euicc_otpk)))

        # Generate a one-time ECKA key pair (ot{PK,SK}.DP.ECKA) using the curve indicated by the Key Parameter
        # Reference value of CERT.DPpb.ECDDSA
        print("curve = %s" % self.dp_pb.get_curve())
        ss.smdp_ot = ec.generate_private_key(self.dp_pb.get_curve())
        # extract the public key in (hopefully) the right format for the ES8+ interface
        ss.smdp_otpk = ss.smdp_ot.public_key().public_bytes(Encoding.X962, PublicFormat.UncompressedPoint)
        print("smdpOtpk: %s" % b2h(ss.smdp_otpk))
        # NOTE(review): this and the shared_secret print below write secret key material to stdout
        print("smdpOtsk: %s" % b2h(ss.smdp_ot.private_bytes(Encoding.DER, PrivateFormat.PKCS8, NoEncryption())))

        ss.host_id = b'mahlzeit'

        # Generate Session Keys using the CRT, opPK.eUICC.ECKA and otSK.DP.ECKA according to annex G
        euicc_public_key = ec.EllipticCurvePublicKey.from_encoded_point(ss.smdp_ot.curve, ss.euicc_otpk)
        ss.shared_secret = ss.smdp_ot.exchange(ec.ECDH(), euicc_public_key)
        print("shared_secret: %s" % b2h(ss.shared_secret))

        # TODO: Check if this order requires a Confirmation Code verification

        # Perform actual protection + binding of profile package (or return pre-bound one)
        with open(os.path.join(DATA_DIR, 'upp', 'TS48 V2 eSIM_GTP_SAIP2.1_NoBERTLV.rename2der'), 'rb') as f:
            upp = UnprotectedProfilePackage.from_der(f.read(), metadata=ss.profileMetadata)
            # HACK: Use empty PPP as we're still debugging the configureISDP step, and we want to avoid
            # cluttering the log with stuff happening after the failure
            #upp = UnprotectedProfilePackage.from_der(b'', metadata=ss.profileMetadata)
        if False:
            # Use random keys
            bpp = BoundProfilePackage.from_upp(upp)
        else:
            # Use session keys
            # NOTE(review): the BspInstance is built from fixed byte patterns here, not from
            # ss.shared_secret computed above
            ppp = ProtectedProfilePackage.from_upp(upp, BspInstance(b'\x00'*16, b'\x11'*16, b'\x22'*16))
            bpp = BoundProfilePackage.from_ppp(ppp)

        # update non-volatile state with updated ss object
        self.rss[transactionId] = ss
        return {
            'transactionId': transactionId,
            'boundProfilePackage': b64encode2str(bpp.encode(ss, self.dp_pb)),
        }

    @app.route('/gsma/rsp2/es9plus/handleNotification', methods=['POST'])
    @rsp_api_wrapper
    def handleNotification(self, request: IRequest, content: dict) -> dict:
        """See ES9+ HandleNotification in SGP.22 Section 5.6.4

        Returns None in all handled cases (the wrapper then sends a header-only response).
        Raises ValueError for a notification type that is not known."""
        pendingNotification_bin = b64decode(content['pendingNotification'])
        pendingNotification = rsp.asn1.decode('PendingNotification', pendingNotification_bin)
        print("Rx %s: %s" % pendingNotification)
        if pendingNotification[0] == 'profileInstallationResult':
            profileInstallRes = pendingNotification[1]
            pird = profileInstallRes['profileInstallationResultData']
            transactionId = b2h(pird['transactionId'])
            ss = self.rss.get(transactionId, None)
            if ss is None:
                print("Unable to find session for transactionId")
                return
            # TODO: use original data, don't re-encode?
            pird_bin = rsp.asn1.encode('ProfileInstallationResultData', pird)
            # verify eUICC signature; a failure is only reported, the result is still processed
            if not self._ecdsa_verify(ss.euicc_cert, profileInstallRes['euiccSignPIR'], pird_bin):
                print("Unable to verify eUICC signature")
            print("Profile Installation Final Result: ", pird['finalResult'])
            # remove session state
            del self.rss[transactionId]
        elif pendingNotification[0] == 'otherSignedNotification':
            # TODO
            pass
        else:
            raise ValueError(pendingNotification)

    #@app.route('/gsma/rsp3/es9plus/handleDeviceChangeRequest, methods=['POST']')
    #@rsp_api_wrapper
        #"""See ES9+ ConfirmDeviceChange in SGP.22 Section 5.6.6"""
        # TODO: implement this

    @app.route('/gsma/rsp2/es9plus/cancelSession', methods=['POST'])
    @rsp_api_wrapper
    def cancelSession(self, request: IRequest, content: dict) -> dict:
        """See ES9+ CancelSession in SGP.22 Section 5.6.5

        Raises ApiError if the transactionId is unknown or doesn't match the signed one, if the
        eUICC signature is invalid or if the signed smdpOid is not one of this SM-DP+."""
        print("Rx JSON: %s" % content)
        transactionId = content['transactionId']

        # Verify that the received transactionId is known and relates to an ongoing RSP session
        ss = self.rss.get(transactionId, None)
        if ss is None:
            raise ApiError('8.10.1', '3.9', 'The RSP session identified by the transactionId is unknown')

        cancelSessionResponse_bin = b64decode(content['cancelSessionResponse'])
        cancelSessionResponse = rsp.asn1.decode('CancelSessionResponse', cancelSessionResponse_bin)
        print("Rx %s: %s" % cancelSessionResponse)

        if cancelSessionResponse[0] == 'cancelSessionResponseError':
            # FIXME: print some error
            return
        cancelSessionResponseOk = cancelSessionResponse[1]
        # TODO: use original data, don't re-encode?
        ecsr = cancelSessionResponseOk['euiccCancelSessionSigned']
        ecsr_bin = rsp.asn1.encode('EuiccCancelSessionSigned', ecsr)
        # Verify the eUICC signature (euiccCancelSessionSignature) using the PK.EUICC.SIG attached to the ongoing RSP session
        if not self._ecdsa_verify(ss.euicc_cert, cancelSessionResponseOk['euiccCancelSessionSignature'], ecsr_bin):
            raise ApiError('8.1', '6.1', 'eUICC signature is invalid')

        # Verify that the received smdpOid corresponds to the one in SM-DP+ CERT.DPauth.SIG
        # The OIDs to compare against are the RegisteredID entries of the SubjectAlternativeName;
        # the extension's own .oid attribute merely identifies the extension type itself.
        subj_alt_name = self.dp_auth.get_subject_alt_name()
        if x509.ObjectIdentifier(ecsr['smdpOid']) not in subj_alt_name.get_values_for_type(x509.RegisteredID):
            raise ApiError('8.8', '3.10', 'The provided SM-DP+ OID is invalid.')

        if ecsr['transactionId'] != h2b(transactionId):
            raise ApiError('8.10.1', '3.9', 'The signed transactionId != outer transactionId')

        # TODO: 1. Notify the Operator using the function "ES2+.HandleNotification" function
        # TODO: 2. Terminate the corresponding pending download process.
        # TODO: 3. If required, execute the SM-DS Event Deletion procedure described in section 3.6.3.

        # delete actual session data
        del self.rss[transactionId]
        return { 'transactionId': transactionId }
580
def main(argv):
    """Parse the command line and run the SM-DP+ HTTP server on localhost port 8000.

    argv: full argument vector including the program name, as in sys.argv."""
    parser = argparse.ArgumentParser()
    #parser.add_argument("-H", "--host", help="Host/IP to bind HTTP to", default="localhost")
    #parser.add_argument("-p", "--port", help="TCP port to bind HTTP to", default=8000)
    #parser.add_argument("-v", "--verbose", help="increase output verbosity", action='count', default=0)

    # Parse the arguments that were actually passed in rather than implicitly reading sys.argv.
    # No options are defined yet, so this only provides --help and rejects unknown arguments.
    parser.parse_args(argv[1:])

    hs = SmDppHttpServer(HOSTNAME, os.path.join(DATA_DIR, 'certs', 'CertificateIssuer'), use_brainpool=True)
    #hs.app.run(endpoint_description="ssl:port=8000:dhParameters=dh_param_2048.pem")
    hs.app.run("localhost", 8000)


if __name__ == "__main__":
    main(sys.argv)