blob: cfcd5f8b0a36b2188b9c06b779c82ecd82387252 [file] [log] [blame]
Harald Welte5bbb1442023-12-11 12:46:47 +01001#!/usr/bin/env python3
2
3# Early proof-of-concept towards a SM-DP+ HTTP service for GSMA consumer eSIM RSP
4#
5# (C) 2023-2024 by Harald Welte <laforge@osmocom.org>
6#
7# This program is free software: you can redistribute it and/or modify
8# it under the terms of the GNU Affero General Public License as published by
9# the Free Software Foundation, either version 3 of the License, or
10# (at your option) any later version.
11#
12# This program is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15# GNU Affero General Public License for more details.
16#
17# You should have received a copy of the GNU Affero General Public License
18# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20import json
21import sys
22import argparse
23import uuid
24import os
25import functools
26from typing import Optional, Dict, List
27from pprint import pprint as pp
28
29import base64
30from base64 import b64decode
31from klein import Klein
32from twisted.web.iweb import IRequest
33import asn1tools
34
35from pySim.utils import h2b, b2h, swap_nibbles
36
37import pySim.esim.rsp as rsp
38from pySim.esim.es8p import *
Harald Welteaf87cd52024-01-19 21:29:06 +010039from pySim.esim.x509_cert import oid, cert_policy_has_oid, cert_get_auth_key_id
40from pySim.esim.x509_cert import CertAndPrivkey, CertificateSet, cert_get_subject_key_id, VerifyError
Harald Welte5bbb1442023-12-11 12:46:47 +010041
42# HACK: make this configurable
43DATA_DIR = './smdpp-data'
44HOSTNAME = 'testsmdpplus1.example.com' # must match certificates!
45
46
47def b64encode2str(req: bytes) -> str:
48 """Encode given input bytes as base64 and return result as string."""
49 return base64.b64encode(req).decode('ascii')
50
51def set_headers(request: IRequest):
52 """Set the request headers as mandatory by GSMA eSIM RSP."""
53 request.setHeader('Content-Type', 'application/json;charset=UTF-8')
54 request.setHeader('X-Admin-Protocol', 'gsma/rsp/v2.1.0')
55
56def build_status_code(subject_code: str, reason_code: str, subject_id: Optional[str], message: Optional[str]) -> Dict:
57 r = {'subjectCode': subject_code, 'reasonCode': reason_code }
58 if subject_id:
59 r['subjectIdentifier'] = subject_id
60 if message:
61 r['message'] = message
62 return r
63
64def build_resp_header(js: dict, status: str = 'Executed-Success', status_code_data = None) -> None:
65 # SGP.22 v3.0 6.5.1.4
66 js['header'] = {
67 'functionExecutionStatus': {
68 'status': status,
69 }
70 }
71 if status_code_data:
72 js['header']['functionExecutionStatus']['statusCodeData'] = status_code_data
73
74from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature, encode_dss_signature
Harald Welte45b7dc92024-01-19 19:28:15 +010075from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat, PrivateFormat, NoEncryption
Harald Welte5bbb1442023-12-11 12:46:47 +010076from cryptography.hazmat.primitives.asymmetric import ec
77from cryptography.hazmat.primitives import hashes
78from cryptography.exceptions import InvalidSignature
79from cryptography import x509
80
Harald Welte5bbb1442023-12-11 12:46:47 +010081def ecdsa_tr03111_to_dss(sig: bytes) -> bytes:
82 """convert an ECDSA signature from BSI TR-03111 format to DER: first get long integers; then encode those."""
83 assert len(sig) == 64
84 r = int.from_bytes(sig[0:32], 'big')
85 s = int.from_bytes(sig[32:32*2], 'big')
86 return encode_dss_signature(r, s)
87
88
Harald Welte5bbb1442023-12-11 12:46:47 +010089class ApiError(Exception):
90 def __init__(self, subject_code: str, reason_code: str, message: Optional[str] = None,
91 subject_id: Optional[str] = None):
92 self.status_code = build_status_code(subject_code, reason_code, subject_id, message)
93
94 def encode(self) -> str:
95 """Encode the API Error into a responseHeader string."""
96 js = {}
97 build_resp_header(js, 'Failed', self.status_code)
98 return json.dumps(js)
99
Harald Welte5bbb1442023-12-11 12:46:47 +0100100class SmDppHttpServer:
101 app = Klein()
102
103 @staticmethod
104 def load_certs_from_path(path: str) -> List[x509.Certificate]:
105 """Load all DER + PEM files from given directory path and return them as list of x509.Certificate
106 instances."""
107 certs = []
108 for dirpath, dirnames, filenames in os.walk(path):
109 for filename in filenames:
110 cert = None
111 if filename.endswith('.der'):
112 with open(os.path.join(dirpath, filename), 'rb') as f:
113 cert = x509.load_der_x509_certificate(f.read())
114 elif filename.endswith('.pem'):
115 with open(os.path.join(dirpath, filename), 'rb') as f:
116 cert = x509.load_pem_x509_certificate(f.read())
117 if cert:
118 # verify it is a CI certificate (keyCertSign + i-rspRole-ci)
119 if not cert_policy_has_oid(cert, oid.id_rspRole_ci):
120 raise ValueError("alleged CI certificate %s doesn't have CI policy" % filename)
121 certs.append(cert)
122 return certs
123
124 def ci_get_cert_for_pkid(self, ci_pkid: bytes) -> Optional[x509.Certificate]:
125 """Find CI certificate for given key identifier."""
126 for cert in self.ci_certs:
127 print("cert: %s" % cert)
128 subject_exts = list(filter(lambda x: isinstance(x.value, x509.SubjectKeyIdentifier), cert.extensions))
129 print(subject_exts)
130 subject_pkid = subject_exts[0].value
131 print(subject_pkid)
132 if subject_pkid and subject_pkid.key_identifier == ci_pkid:
133 return cert
134 return None
135
136 def __init__(self, server_hostname: str, ci_certs_path: str, use_brainpool: bool = False):
137 self.server_hostname = server_hostname
138 self.ci_certs = self.load_certs_from_path(ci_certs_path)
139 # load DPauth cert + key
140 self.dp_auth = CertAndPrivkey(oid.id_rspRole_dp_auth_v2)
141 cert_dir = os.path.join(DATA_DIR, 'certs')
142 if use_brainpool:
143 self.dp_auth.cert_from_der_file(os.path.join(cert_dir, 'DPauth', 'CERT_S_SM_DPauth_ECDSA_BRP.der'))
144 self.dp_auth.privkey_from_pem_file(os.path.join(cert_dir, 'DPauth', 'SK_S_SM_DPauth_ECDSA_BRP.pem'))
145 else:
146 self.dp_auth.cert_from_der_file(os.path.join(cert_dir, 'DPauth', 'CERT_S_SM_DPauth_ECDSA_NIST.der'))
147 self.dp_auth.privkey_from_pem_file(os.path.join(cert_dir, 'DPauth', 'SK_S_SM_DPauth_ECDSA_NIST.pem'))
148 # load DPpb cert + key
149 self.dp_pb = CertAndPrivkey(oid.id_rspRole_dp_pb_v2)
150 if use_brainpool:
151 self.dp_pb.cert_from_der_file(os.path.join(cert_dir, 'DPpb', 'CERT_S_SM_DPpb_ECDSA_BRP.der'))
152 self.dp_pb.privkey_from_pem_file(os.path.join(cert_dir, 'DPpb', 'SK_S_SM_DPpb_ECDSA_BRP.pem'))
153 else:
154 self.dp_pb.cert_from_der_file(os.path.join(cert_dir, 'DPpb', 'CERT_S_SM_DPpb_ECDSA_NIST.der'))
155 self.dp_pb.privkey_from_pem_file(os.path.join(cert_dir, 'DPpb', 'SK_S_SM_DPpb_ECDSA_NIST.pem'))
156 self.rss = rsp.RspSessionStore(os.path.join(DATA_DIR, "sm-dp-sessions"))
157
158 @app.handle_errors(ApiError)
159 def handle_apierror(self, request: IRequest, failure):
160 request.setResponseCode(200)
161 pp(failure)
162 return failure.value.encode()
163
164 @staticmethod
165 def _ecdsa_verify(cert: x509.Certificate, signature: bytes, data: bytes) -> bool:
166 pubkey = cert.public_key()
167 dss_sig = ecdsa_tr03111_to_dss(signature)
168 try:
169 pubkey.verify(dss_sig, data, ec.ECDSA(hashes.SHA256()))
170 return True
171 except InvalidSignature:
172 return False
173
174 @staticmethod
175 def rsp_api_wrapper(func):
176 """Wrapper that can be used as decorator in order to perform common REST API endpoint entry/exit
177 functionality, such as JSON decoding/encoding and debug-printing."""
178 @functools.wraps(func)
179 def _api_wrapper(self, request: IRequest):
180 # TODO: evaluate User-Agent + X-Admin-Protocol header
181 # TODO: reject any non-JSON Content-type
182
183 content = json.loads(request.content.read())
Harald Welteebb6f7f2024-01-17 19:36:47 +0100184 print("Rx JSON: %s" % json.dumps(content))
Harald Welte5bbb1442023-12-11 12:46:47 +0100185 set_headers(request)
186
187 output = func(self, request, content) or {}
188
189 build_resp_header(output)
Harald Welteebb6f7f2024-01-17 19:36:47 +0100190 print("Tx JSON: %s" % json.dumps(output))
Harald Welte5bbb1442023-12-11 12:46:47 +0100191 return json.dumps(output)
192 return _api_wrapper
193
194 @app.route('/gsma/rsp2/es9plus/initiateAuthentication', methods=['POST'])
195 @rsp_api_wrapper
196 def initiateAutentication(self, request: IRequest, content: dict) -> dict:
197 """See ES9+ InitiateAuthentication SGP.22 Section 5.6.1"""
198 # Verify that the received address matches its own SM-DP+ address, where the comparison SHALL be
199 # case-insensitive. Otherwise, the SM-DP+ SHALL return a status code "SM-DP+ Address - Refused".
200 if content['smdpAddress'] != self.server_hostname:
201 raise ApiError('8.8.1', '3.8', 'Invalid SM-DP+ Address')
202
203 euiccChallenge = b64decode(content['euiccChallenge'])
204 if len(euiccChallenge) != 16:
205 raise ValueError
206
207 euiccInfo1_bin = b64decode(content['euiccInfo1'])
208 euiccInfo1 = rsp.asn1.decode('EUICCInfo1', euiccInfo1_bin)
209 print("Rx euiccInfo1: %s" % euiccInfo1)
210 #euiccInfo1['svn']
211
212 # TODO: If euiccCiPKIdListForSigningV3 is present ...
213
214 pkid_list = euiccInfo1['euiccCiPKIdListForSigning']
215 if 'euiccCiPKIdListForSigningV3' in euiccInfo1:
216 pkid_list = pkid_list + euiccInfo1['euiccCiPKIdListForSigningV3']
217 # verify it supports one of the keys indicated by euiccCiPKIdListForSigning
Harald Welteaf87cd52024-01-19 21:29:06 +0100218 ci_cert = None
219 for x in pkid_list:
220 ci_cert = self.ci_get_cert_for_pkid(x)
Harald Welte9fd4bbe2024-01-25 15:50:20 +0100221 # we already support multiple CI certificates but only one set of DPauth + DPpb keys. So we must
222 # make sure we choose a CI key-id which has issued both the eUICC as well as our own SM-DP side
223 # certs.
224 if ci_cert and cert_get_subject_key_id(ci_cert) == self.dp_auth.get_authority_key_identifier().key_identifier:
Harald Welteaf87cd52024-01-19 21:29:06 +0100225 break
Harald Welte9fd4bbe2024-01-25 15:50:20 +0100226 else:
227 ci_cert = None
Harald Welteaf87cd52024-01-19 21:29:06 +0100228 if not ci_cert:
Harald Welte5bbb1442023-12-11 12:46:47 +0100229 raise ApiError('8.8.2', '3.1', 'None of the proposed Public Key Identifiers is supported by the SM-DP+')
230
231 # TODO: Determine the set of CERT.DPauth.SIG that satisfy the following criteria:
232 # * Part of a certificate chain ending at one of the eSIM CA RootCA Certificate, whose Public Keys is
233 # supported by the eUICC (indicated by euiccCiPKIdListForVerification).
234 # * Using a certificate chain that the eUICC and the LPA both support:
235 #euiccInfo1['euiccCiPKIdListForVerification']
236 # raise ApiError('8.8.4', '3.7', 'The SM-DP+ has no CERT.DPauth.SIG which chains to one of the eSIM CA Root CA CErtificate with a Public Key supported by the eUICC')
237
238 # Generate a TransactionID which is used to identify the ongoing RSP session. The TransactionID
239 # SHALL be unique within the scope and lifetime of each SM-DP+.
240 transactionId = uuid.uuid4().hex
241 assert not transactionId in self.rss
242
243 # Generate a serverChallenge for eUICC authentication attached to the ongoing RSP session.
244 serverChallenge = os.urandom(16)
245
246 # Generate a serverSigned1 data object as expected by the eUICC and described in section 5.7.13 "ES10b.AuthenticateServer". If and only if both eUICC and LPA indicate crlStaplingV3Support, the SM-DP+ SHALL indicate crlStaplingV3Used in sessionContext.
247 serverSigned1 = {
248 'transactionId': h2b(transactionId),
249 'euiccChallenge': euiccChallenge,
250 'serverAddress': self.server_hostname,
251 'serverChallenge': serverChallenge,
252 }
253 print("Tx serverSigned1: %s" % serverSigned1)
254 serverSigned1_bin = rsp.asn1.encode('ServerSigned1', serverSigned1)
255 print("Tx serverSigned1: %s" % rsp.asn1.decode('ServerSigned1', serverSigned1_bin))
256 output = {}
257 output['serverSigned1'] = b64encode2str(serverSigned1_bin)
258
259 # Generate a signature (serverSignature1) as described in section 5.7.13 "ES10b.AuthenticateServer" using the SK related to the selected CERT.DPauth.SIG.
260 # serverSignature1 SHALL be created using the private key associated to the RSP Server Certificate for authentication, and verified by the eUICC using the contained public key as described in section 2.6.9. serverSignature1 SHALL apply on serverSigned1 data object.
261 output['serverSignature1'] = b64encode2str(b'\x5f\x37\x40' + self.dp_auth.ecdsa_sign(serverSigned1_bin))
262
263 output['transactionId'] = transactionId
264 server_cert_aki = self.dp_auth.get_authority_key_identifier()
265 output['euiccCiPKIdToBeUsed'] = b64encode2str(b'\x04\x14' + server_cert_aki.key_identifier)
266 output['serverCertificate'] = b64encode2str(self.dp_auth.get_cert_as_der()) # CERT.DPauth.SIG
267 # FIXME: add those certificate
268 #output['otherCertsInChain'] = b64encode2str()
269
270 # create SessionState and store it in rss
Harald Welteaf87cd52024-01-19 21:29:06 +0100271 self.rss[transactionId] = rsp.RspSessionState(transactionId, serverChallenge,
272 cert_get_subject_key_id(ci_cert))
Harald Welte5bbb1442023-12-11 12:46:47 +0100273
274 return output
275
276 @app.route('/gsma/rsp2/es9plus/authenticateClient', methods=['POST'])
277 @rsp_api_wrapper
278 def authenticateClient(self, request: IRequest, content: dict) -> dict:
279 """See ES9+ AuthenticateClient in SGP.22 Section 5.6.3"""
280 transactionId = content['transactionId']
281
282 authenticateServerResp_bin = b64decode(content['authenticateServerResponse'])
283 authenticateServerResp = rsp.asn1.decode('AuthenticateServerResponse', authenticateServerResp_bin)
284 print("Rx %s: %s" % authenticateServerResp)
285 if authenticateServerResp[0] == 'authenticateResponseError':
286 r_err = authenticateServerResp[1]
287 #r_err['transactionId']
288 #r_err['authenticateErrorCode']
289 raise ValueError("authenticateResponseError %s" % r_err)
290
291 r_ok = authenticateServerResp[1]
292 euiccSigned1 = r_ok['euiccSigned1']
293 # TODO: use original data, don't re-encode?
294 euiccSigned1_bin = rsp.asn1.encode('EuiccSigned1', euiccSigned1)
295 euiccSignature1_bin = r_ok['euiccSignature1']
296 euiccCertificate_dec = r_ok['euiccCertificate']
297 # TODO: use original data, don't re-encode?
298 euiccCertificate_bin = rsp.asn1.encode('Certificate', euiccCertificate_dec)
299 eumCertificate_dec = r_ok['eumCertificate']
300 eumCertificate_bin = rsp.asn1.encode('Certificate', eumCertificate_dec)
301 # TODO v3: otherCertsInChain
302
303 # load certificate
304 euicc_cert = x509.load_der_x509_certificate(euiccCertificate_bin)
305 eum_cert = x509.load_der_x509_certificate(eumCertificate_bin)
306
Harald Welte5bbb1442023-12-11 12:46:47 +0100307 # Verify that the transactionId is known and relates to an ongoing RSP session. Otherwise, the SM-DP+
308 # SHALL return a status code "TransactionId - Unknown"
309 ss = self.rss.get(transactionId, None)
310 if ss is None:
311 raise ApiError('8.10.1', '3.9', 'Unknown')
312 ss.euicc_cert = euicc_cert
Harald Welteaf87cd52024-01-19 21:29:06 +0100313 ss.eum_cert = eum_cert # TODO: do we need this in the state?
Harald Welte5bbb1442023-12-11 12:46:47 +0100314
Harald Welteaf87cd52024-01-19 21:29:06 +0100315 # Verify that the Root Certificate of the eUICC certificate chain corresponds to the
316 # euiccCiPKIdToBeUsed or TODO: euiccCiPKIdToBeUsedV3
317 if cert_get_auth_key_id(eum_cert) != ss.ci_cert_id:
318 raise ApiError('8.11.1', '3.9', 'Unknown')
319
320 # Verify the validity of the eUICC certificate chain
321 cs = CertificateSet(self.ci_get_cert_for_pkid(ss.ci_cert_id))
322 cs.add_intermediate_cert(eum_cert)
323 # TODO v3: otherCertsInChain
324 try:
325 cs.verify_cert_chain(euicc_cert)
326 except VerifyError:
327 raise ApiError('8.1.3', '6.1', 'Verification failed')
328 # raise ApiError('8.1.3', '6.3', 'Expired')
329
330
331 # Verify euiccSignature1 over euiccSigned1 using pubkey from euiccCertificate.
332 # Otherwise, the SM-DP+ SHALL return a status code "eUICC - Verification failed"
333 if not self._ecdsa_verify(euicc_cert, euiccSignature1_bin, euiccSigned1_bin):
334 raise ApiError('8.1', '6.1', 'Verification failed')
335
Harald Welte5bbb1442023-12-11 12:46:47 +0100336 # TODO: verify EID of eUICC cert is within permitted range of EUM cert
337
338 ss.eid = ss.euicc_cert.subject.get_attributes_for_oid(x509.oid.NameOID.SERIAL_NUMBER)[0].value
339 print("EID (from eUICC cert): %s" % ss.eid)
340
341 # Verify that the serverChallenge attached to the ongoing RSP session matches the
342 # serverChallenge returned by the eUICC. Otherwise, the SM-DP+ SHALL return a status code "eUICC -
343 # Verification failed".
344 if euiccSigned1['serverChallenge'] != ss.serverChallenge:
345 raise ApiError('8.1', '6.1', 'Verification failed')
346
347 # Put together profileMetadata + _bin
348 ss.profileMetadata = ProfileMetadata(iccid_bin= h2b(swap_nibbles('89000123456789012358')), spn="OsmocomSPN", profile_name="OsmocomProfile")
349 profileMetadata_bin = ss.profileMetadata.gen_store_metadata_request()
350
351 # Put together smdpSigned2 + _bin
352 smdpSigned2 = {
353 'transactionId': h2b(ss.transactionId),
354 'ccRequiredFlag': False, # whether the Confirmation Code is required
355 #'bppEuiccOtpk': None, # whether otPK.EUICC.ECKA already used for binding the BPP, tag '5F49'
356 }
357 smdpSigned2_bin = rsp.asn1.encode('SmdpSigned2', smdpSigned2)
358
359 ss.smdpSignature2_do = b'\x5f\x37\x40' + self.dp_pb.ecdsa_sign(smdpSigned2_bin + b'\x5f\x37\x40' + euiccSignature1_bin)
360
361 # update non-volatile state with updated ss object
362 self.rss[transactionId] = ss
363 return {
364 'transactionId': transactionId,
365 'profileMetadata': b64encode2str(profileMetadata_bin),
366 'smdpSigned2': b64encode2str(smdpSigned2_bin),
367 'smdpSignature2': b64encode2str(ss.smdpSignature2_do),
368 'smdpCertificate': b64encode2str(self.dp_pb.get_cert_as_der()), # CERT.DPpb.SIG
369 }
370
371 @app.route('/gsma/rsp2/es9plus/getBoundProfilePackage', methods=['POST'])
372 @rsp_api_wrapper
373 def getBoundProfilePackage(self, request: IRequest, content: dict) -> dict:
374 """See ES9+ GetBoundProfilePackage SGP.22 Section 5.6.2"""
375 transactionId = content['transactionId']
376
377 # Verify that the received transactionId is known and relates to an ongoing RSP session
378 ss = self.rss.get(transactionId, None)
379 if not ss:
380 raise ApiError('8.10.1', '3.9', 'The RSP session identified by the TransactionID is unknown')
381
382 prepDownloadResp_bin = b64decode(content['prepareDownloadResponse'])
383 prepDownloadResp = rsp.asn1.decode('PrepareDownloadResponse', prepDownloadResp_bin)
384 print("Rx %s: %s" % prepDownloadResp)
385
386 if prepDownloadResp[0] == 'downloadResponseError':
387 r_err = prepDownloadResp[1]
388 #r_err['transactionId']
389 #r_err['downloadErrorCode']
390 raise ValueError("downloadResponseError %s" % r_err)
391
392 r_ok = prepDownloadResp[1]
393
394 # Verify the euiccSignature2 computed over euiccSigned2 and smdpSignature2 using the PK.EUICC.SIG attached to the ongoing RSP session
395 euiccSigned2 = r_ok['euiccSigned2']
396 # TODO: use original data, don't re-encode?
397 euiccSigned2_bin = rsp.asn1.encode('EUICCSigned2', euiccSigned2)
398 if not self._ecdsa_verify(ss.euicc_cert, r_ok['euiccSignature2'], euiccSigned2_bin + ss.smdpSignature2_do):
399 raise ApiError('8.1', '6.1', 'eUICC signature is invalid')
400
401 # not in spec: Verify that signed TransactionID is outer transaction ID
402 if h2b(transactionId) != euiccSigned2['transactionId']:
403 raise ApiError('8.10.1', '3.9', 'The signed transactionId != outer transactionId')
404
405 # store otPK.EUICC.ECKA in session state
406 ss.euicc_otpk = euiccSigned2['euiccOtpk']
407 print("euiccOtpk: %s" % (b2h(ss.euicc_otpk)))
408
409 # Generate a one-time ECKA key pair (ot{PK,SK}.DP.ECKA) using the curve indicated by the Key Parameter
410 # Reference value of CERT.DPpb.ECDDSA
411 print("curve = %s" % self.dp_pb.get_curve())
412 ss.smdp_ot = ec.generate_private_key(self.dp_pb.get_curve())
413 # extract the public key in (hopefully) the right format for the ES8+ interface
414 ss.smdp_otpk = ss.smdp_ot.public_key().public_bytes(Encoding.X962, PublicFormat.UncompressedPoint)
415 print("smdpOtpk: %s" % b2h(ss.smdp_otpk))
416 print("smdpOtsk: %s" % b2h(ss.smdp_ot.private_bytes(Encoding.DER, PrivateFormat.PKCS8, NoEncryption())))
417
418 ss.host_id = b'mahlzeit'
419
420 # Generate Session Keys using the CRT, opPK.eUICC.ECKA and otSK.DP.ECKA according to annex G
421 euicc_public_key = ec.EllipticCurvePublicKey.from_encoded_point(ss.smdp_ot.curve, ss.euicc_otpk)
422 ss.shared_secret = ss.smdp_ot.exchange(ec.ECDH(), euicc_public_key)
423 print("shared_secret: %s" % b2h(ss.shared_secret))
424
425 # TODO: Check if this order requires a Confirmation Code verification
426
427 # Perform actual protection + binding of profile package (or return pre-bound one)
428 with open(os.path.join(DATA_DIR, 'upp', 'TS48 V2 eSIM_GTP_SAIP2.1_NoBERTLV.rename2der'), 'rb') as f:
429 upp = UnprotectedProfilePackage.from_der(f.read(), metadata=ss.profileMetadata)
430 # HACK: Use empty PPP as we're still debuggin the configureISDP step, and we want to avoid
431 # cluttering the log with stuff happening after the failure
432 #upp = UnprotectedProfilePackage.from_der(b'', metadata=ss.profileMetadata)
433 if False:
434 # Use random keys
435 bpp = BoundProfilePackage.from_upp(upp)
436 else:
437 # Use sesssion keys
438 ppp = ProtectedProfilePackage.from_upp(upp, BspInstance(b'\x00'*16, b'\x11'*16, b'\x22'*16))
439 bpp = BoundProfilePackage.from_ppp(ppp)
440
441 # update non-volatile state with updated ss object
442 self.rss[transactionId] = ss
443 return {
444 'transactionId': transactionId,
445 'boundProfilePackage': b64encode2str(bpp.encode(ss, self.dp_pb)),
446 }
447
448 @app.route('/gsma/rsp2/es9plus/handleNotification', methods=['POST'])
449 @rsp_api_wrapper
450 def handleNotification(self, request: IRequest, content: dict) -> dict:
451 """See ES9+ HandleNotification in SGP.22 Section 5.6.4"""
452 pendingNotification_bin = b64decode(content['pendingNotification'])
453 pendingNotification = rsp.asn1.decode('PendingNotification', pendingNotification_bin)
454 print("Rx %s: %s" % pendingNotification)
455 if pendingNotification[0] == 'profileInstallationResult':
456 profileInstallRes = pendingNotification[1]
457 pird = profileInstallRes['profileInstallationResultData']
458 transactionId = b2h(pird['transactionId'])
459 ss = self.rss.get(transactionId, None)
460 if ss is None:
461 print("Unable to find session for transactionId")
462 return
463 profileInstallRes['euiccSignPIR']
464 # TODO: use original data, don't re-encode?
465 pird_bin = rsp.asn1.encode('ProfileInstallationResultData', pird)
466 # verify eUICC signature
467 if not self._ecdsa_verify(ss.euicc_cert, profileInstallRes['euiccSignPIR'], pird_bin):
468 print("Unable to verify eUICC signature")
469 print("Profile Installation Final Result: ", pird['finalResult'])
470 # remove session state
471 del self.rss[transactionId]
472 elif pendingNotification[0] == 'otherSignedNotification':
473 # TODO
474 pass
475 else:
476 raise ValueError(pendingNotification)
477
478 #@app.route('/gsma/rsp3/es9plus/handleDeviceChangeRequest, methods=['POST']')
479 #@rsp_api_wrapper
480 #"""See ES9+ ConfirmDeviceChange in SGP.22 Section 5.6.6"""
481 # TODO: implement this
482
483 @app.route('/gsma/rsp2/es9plus/cancelSession', methods=['POST'])
484 @rsp_api_wrapper
485 def cancelSession(self, request: IRequest, content: dict) -> dict:
486 """See ES9+ CancelSession in SGP.22 Section 5.6.5"""
487 print("Rx JSON: %s" % content)
488 transactionId = content['transactionId']
489
490 # Verify that the received transactionId is known and relates to an ongoing RSP session
491 ss = self.rss.get(transactionId, None)
492 if ss is None:
493 raise ApiError('8.10.1', '3.9', 'The RSP session identified by the transactionId is unknown')
494
495 cancelSessionResponse_bin = b64decode(content['cancelSessionResponse'])
496 cancelSessionResponse = rsp.asn1.decode('CancelSessionResponse', cancelSessionResponse_bin)
497 print("Rx %s: %s" % cancelSessionResponse)
498
499 if cancelSessionResponse[0] == 'cancelSessionResponseError':
500 # FIXME: print some error
501 return
502 cancelSessionResponseOk = cancelSessionResponse[1]
503 # TODO: use original data, don't re-encode?
504 ecsr = cancelSessionResponseOk['euiccCancelSessionSigned']
505 ecsr_bin = rsp.asn1.encode('EuiccCancelSessionSigned', ecsr)
506 # Verify the eUICC signature (euiccCancelSessionSignature) using the PK.EUICC.SIG attached to the ongoing RSP session
507 if not self._ecdsa_verify(ss.euicc_cert, cancelSessionResponseOk['euiccCancelSessionSignature'], ecsr_bin):
508 raise ApiError('8.1', '6.1', 'eUICC signature is invalid')
509
510 # Verify that the received smdpOid corresponds to the one in SM-DP+ CERT.DPauth.SIG
511 subj_alt_name = self.dp_auth.get_subject_alt_name()
512 if x509.ObjectIdentifier(ecsr['smdpOid']) != subj_alt_name.oid:
513 raise ApiError('8.8', '3.10', 'The provided SM-DP+ OID is invalid.')
514
515 if ecsr['transactionId'] != h2b(transactionId):
516 raise ApiError('8.10.1', '3.9', 'The signed transactionId != outer transactionId')
517
518 # TODO: 1. Notify the Operator using the function "ES2+.HandleNotification" function
519 # TODO: 2. Terminate the corresponding pending download process.
520 # TODO: 3. If required, execute the SM-DS Event Deletion procedure described in section 3.6.3.
521
522 # delete actual session data
523 del self.rss[transactionId]
524 return { 'transactionId': transactionId }
525
526
527def main(argv):
528 parser = argparse.ArgumentParser()
529 #parser.add_argument("-H", "--host", help="Host/IP to bind HTTP to", default="localhost")
530 #parser.add_argument("-p", "--port", help="TCP port to bind HTTP to", default=8000)
531 #parser.add_argument("-v", "--verbose", help="increase output verbosity", action='count', default=0)
532
533 args = parser.parse_args()
534
535 hs = SmDppHttpServer(HOSTNAME, os.path.join(DATA_DIR, 'certs', 'CertificateIssuer'), use_brainpool=True)
536 #hs.app.run(endpoint_description="ssl:port=8000:dhParameters=dh_param_2048.pem")
537 hs.app.run("localhost", 8000)
538
539if __name__ == "__main__":
540 main(sys.argv)