blob: 8cb00824c8d8798e7dc79f2c7c9476017233f489 [file] [log] [blame]
Harald Welte5bbb1442023-12-11 12:46:47 +01001#!/usr/bin/env python3
2
3# Early proof-of-concept towards a SM-DP+ HTTP service for GSMA consumer eSIM RSP
4#
5# (C) 2023-2024 by Harald Welte <laforge@osmocom.org>
6#
7# This program is free software: you can redistribute it and/or modify
8# it under the terms of the GNU Affero General Public License as published by
9# the Free Software Foundation, either version 3 of the License, or
10# (at your option) any later version.
11#
12# This program is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15# GNU Affero General Public License for more details.
16#
17# You should have received a copy of the GNU Affero General Public License
18# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20import json
21import sys
22import argparse
23import uuid
24import os
25import functools
26from typing import Optional, Dict, List
27from pprint import pprint as pp
28
29import base64
30from base64 import b64decode
31from klein import Klein
32from twisted.web.iweb import IRequest
33import asn1tools
34
35from pySim.utils import h2b, b2h, swap_nibbles
36
37import pySim.esim.rsp as rsp
38from pySim.esim.es8p import *
Harald Welteaf87cd52024-01-19 21:29:06 +010039from pySim.esim.x509_cert import oid, cert_policy_has_oid, cert_get_auth_key_id
40from pySim.esim.x509_cert import CertAndPrivkey, CertificateSet, cert_get_subject_key_id, VerifyError
Harald Welte5bbb1442023-12-11 12:46:47 +010041
42# HACK: make this configurable
43DATA_DIR = './smdpp-data'
44HOSTNAME = 'testsmdpplus1.example.com' # must match certificates!
45
46
47def b64encode2str(req: bytes) -> str:
48 """Encode given input bytes as base64 and return result as string."""
49 return base64.b64encode(req).decode('ascii')
50
51def set_headers(request: IRequest):
52 """Set the request headers as mandatory by GSMA eSIM RSP."""
53 request.setHeader('Content-Type', 'application/json;charset=UTF-8')
54 request.setHeader('X-Admin-Protocol', 'gsma/rsp/v2.1.0')
55
56def build_status_code(subject_code: str, reason_code: str, subject_id: Optional[str], message: Optional[str]) -> Dict:
57 r = {'subjectCode': subject_code, 'reasonCode': reason_code }
58 if subject_id:
59 r['subjectIdentifier'] = subject_id
60 if message:
61 r['message'] = message
62 return r
63
64def build_resp_header(js: dict, status: str = 'Executed-Success', status_code_data = None) -> None:
65 # SGP.22 v3.0 6.5.1.4
66 js['header'] = {
67 'functionExecutionStatus': {
68 'status': status,
69 }
70 }
71 if status_code_data:
72 js['header']['functionExecutionStatus']['statusCodeData'] = status_code_data
73
74from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature, encode_dss_signature
Harald Welte45b7dc92024-01-19 19:28:15 +010075from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat, PrivateFormat, NoEncryption
Harald Welte5bbb1442023-12-11 12:46:47 +010076from cryptography.hazmat.primitives.asymmetric import ec
77from cryptography.hazmat.primitives import hashes
78from cryptography.exceptions import InvalidSignature
79from cryptography import x509
80
Harald Welte5bbb1442023-12-11 12:46:47 +010081def ecdsa_tr03111_to_dss(sig: bytes) -> bytes:
82 """convert an ECDSA signature from BSI TR-03111 format to DER: first get long integers; then encode those."""
83 assert len(sig) == 64
84 r = int.from_bytes(sig[0:32], 'big')
85 s = int.from_bytes(sig[32:32*2], 'big')
86 return encode_dss_signature(r, s)
87
88
Harald Welte5bbb1442023-12-11 12:46:47 +010089class ApiError(Exception):
90 def __init__(self, subject_code: str, reason_code: str, message: Optional[str] = None,
91 subject_id: Optional[str] = None):
92 self.status_code = build_status_code(subject_code, reason_code, subject_id, message)
93
94 def encode(self) -> str:
95 """Encode the API Error into a responseHeader string."""
96 js = {}
97 build_resp_header(js, 'Failed', self.status_code)
98 return json.dumps(js)
99
Harald Welte5bbb1442023-12-11 12:46:47 +0100100class SmDppHttpServer:
101 app = Klein()
102
103 @staticmethod
104 def load_certs_from_path(path: str) -> List[x509.Certificate]:
105 """Load all DER + PEM files from given directory path and return them as list of x509.Certificate
106 instances."""
107 certs = []
108 for dirpath, dirnames, filenames in os.walk(path):
109 for filename in filenames:
110 cert = None
111 if filename.endswith('.der'):
112 with open(os.path.join(dirpath, filename), 'rb') as f:
113 cert = x509.load_der_x509_certificate(f.read())
114 elif filename.endswith('.pem'):
115 with open(os.path.join(dirpath, filename), 'rb') as f:
116 cert = x509.load_pem_x509_certificate(f.read())
117 if cert:
118 # verify it is a CI certificate (keyCertSign + i-rspRole-ci)
119 if not cert_policy_has_oid(cert, oid.id_rspRole_ci):
120 raise ValueError("alleged CI certificate %s doesn't have CI policy" % filename)
121 certs.append(cert)
122 return certs
123
124 def ci_get_cert_for_pkid(self, ci_pkid: bytes) -> Optional[x509.Certificate]:
125 """Find CI certificate for given key identifier."""
126 for cert in self.ci_certs:
127 print("cert: %s" % cert)
128 subject_exts = list(filter(lambda x: isinstance(x.value, x509.SubjectKeyIdentifier), cert.extensions))
129 print(subject_exts)
130 subject_pkid = subject_exts[0].value
131 print(subject_pkid)
132 if subject_pkid and subject_pkid.key_identifier == ci_pkid:
133 return cert
134 return None
135
136 def __init__(self, server_hostname: str, ci_certs_path: str, use_brainpool: bool = False):
137 self.server_hostname = server_hostname
Harald Welte8a39d002024-01-30 21:01:01 +0100138 self.upp_dir = os.path.realpath(os.path.join(DATA_DIR, 'upp'))
Harald Welte5bbb1442023-12-11 12:46:47 +0100139 self.ci_certs = self.load_certs_from_path(ci_certs_path)
140 # load DPauth cert + key
141 self.dp_auth = CertAndPrivkey(oid.id_rspRole_dp_auth_v2)
142 cert_dir = os.path.join(DATA_DIR, 'certs')
143 if use_brainpool:
144 self.dp_auth.cert_from_der_file(os.path.join(cert_dir, 'DPauth', 'CERT_S_SM_DPauth_ECDSA_BRP.der'))
145 self.dp_auth.privkey_from_pem_file(os.path.join(cert_dir, 'DPauth', 'SK_S_SM_DPauth_ECDSA_BRP.pem'))
146 else:
147 self.dp_auth.cert_from_der_file(os.path.join(cert_dir, 'DPauth', 'CERT_S_SM_DPauth_ECDSA_NIST.der'))
148 self.dp_auth.privkey_from_pem_file(os.path.join(cert_dir, 'DPauth', 'SK_S_SM_DPauth_ECDSA_NIST.pem'))
149 # load DPpb cert + key
150 self.dp_pb = CertAndPrivkey(oid.id_rspRole_dp_pb_v2)
151 if use_brainpool:
152 self.dp_pb.cert_from_der_file(os.path.join(cert_dir, 'DPpb', 'CERT_S_SM_DPpb_ECDSA_BRP.der'))
153 self.dp_pb.privkey_from_pem_file(os.path.join(cert_dir, 'DPpb', 'SK_S_SM_DPpb_ECDSA_BRP.pem'))
154 else:
155 self.dp_pb.cert_from_der_file(os.path.join(cert_dir, 'DPpb', 'CERT_S_SM_DPpb_ECDSA_NIST.der'))
156 self.dp_pb.privkey_from_pem_file(os.path.join(cert_dir, 'DPpb', 'SK_S_SM_DPpb_ECDSA_NIST.pem'))
157 self.rss = rsp.RspSessionStore(os.path.join(DATA_DIR, "sm-dp-sessions"))
158
159 @app.handle_errors(ApiError)
160 def handle_apierror(self, request: IRequest, failure):
161 request.setResponseCode(200)
162 pp(failure)
163 return failure.value.encode()
164
165 @staticmethod
166 def _ecdsa_verify(cert: x509.Certificate, signature: bytes, data: bytes) -> bool:
167 pubkey = cert.public_key()
168 dss_sig = ecdsa_tr03111_to_dss(signature)
169 try:
170 pubkey.verify(dss_sig, data, ec.ECDSA(hashes.SHA256()))
171 return True
172 except InvalidSignature:
173 return False
174
175 @staticmethod
176 def rsp_api_wrapper(func):
177 """Wrapper that can be used as decorator in order to perform common REST API endpoint entry/exit
178 functionality, such as JSON decoding/encoding and debug-printing."""
179 @functools.wraps(func)
180 def _api_wrapper(self, request: IRequest):
181 # TODO: evaluate User-Agent + X-Admin-Protocol header
182 # TODO: reject any non-JSON Content-type
183
184 content = json.loads(request.content.read())
Harald Welteebb6f7f2024-01-17 19:36:47 +0100185 print("Rx JSON: %s" % json.dumps(content))
Harald Welte5bbb1442023-12-11 12:46:47 +0100186 set_headers(request)
187
188 output = func(self, request, content) or {}
189
190 build_resp_header(output)
Harald Welteebb6f7f2024-01-17 19:36:47 +0100191 print("Tx JSON: %s" % json.dumps(output))
Harald Welte5bbb1442023-12-11 12:46:47 +0100192 return json.dumps(output)
193 return _api_wrapper
194
195 @app.route('/gsma/rsp2/es9plus/initiateAuthentication', methods=['POST'])
196 @rsp_api_wrapper
197 def initiateAutentication(self, request: IRequest, content: dict) -> dict:
198 """See ES9+ InitiateAuthentication SGP.22 Section 5.6.1"""
199 # Verify that the received address matches its own SM-DP+ address, where the comparison SHALL be
200 # case-insensitive. Otherwise, the SM-DP+ SHALL return a status code "SM-DP+ Address - Refused".
201 if content['smdpAddress'] != self.server_hostname:
202 raise ApiError('8.8.1', '3.8', 'Invalid SM-DP+ Address')
203
204 euiccChallenge = b64decode(content['euiccChallenge'])
205 if len(euiccChallenge) != 16:
206 raise ValueError
207
208 euiccInfo1_bin = b64decode(content['euiccInfo1'])
209 euiccInfo1 = rsp.asn1.decode('EUICCInfo1', euiccInfo1_bin)
210 print("Rx euiccInfo1: %s" % euiccInfo1)
211 #euiccInfo1['svn']
212
213 # TODO: If euiccCiPKIdListForSigningV3 is present ...
214
215 pkid_list = euiccInfo1['euiccCiPKIdListForSigning']
216 if 'euiccCiPKIdListForSigningV3' in euiccInfo1:
217 pkid_list = pkid_list + euiccInfo1['euiccCiPKIdListForSigningV3']
218 # verify it supports one of the keys indicated by euiccCiPKIdListForSigning
Harald Welteaf87cd52024-01-19 21:29:06 +0100219 ci_cert = None
220 for x in pkid_list:
221 ci_cert = self.ci_get_cert_for_pkid(x)
Harald Welte9fd4bbe2024-01-25 15:50:20 +0100222 # we already support multiple CI certificates but only one set of DPauth + DPpb keys. So we must
223 # make sure we choose a CI key-id which has issued both the eUICC as well as our own SM-DP side
224 # certs.
225 if ci_cert and cert_get_subject_key_id(ci_cert) == self.dp_auth.get_authority_key_identifier().key_identifier:
Harald Welteaf87cd52024-01-19 21:29:06 +0100226 break
Harald Welte9fd4bbe2024-01-25 15:50:20 +0100227 else:
228 ci_cert = None
Harald Welteaf87cd52024-01-19 21:29:06 +0100229 if not ci_cert:
Harald Welte5bbb1442023-12-11 12:46:47 +0100230 raise ApiError('8.8.2', '3.1', 'None of the proposed Public Key Identifiers is supported by the SM-DP+')
231
232 # TODO: Determine the set of CERT.DPauth.SIG that satisfy the following criteria:
233 # * Part of a certificate chain ending at one of the eSIM CA RootCA Certificate, whose Public Keys is
234 # supported by the eUICC (indicated by euiccCiPKIdListForVerification).
235 # * Using a certificate chain that the eUICC and the LPA both support:
236 #euiccInfo1['euiccCiPKIdListForVerification']
237 # raise ApiError('8.8.4', '3.7', 'The SM-DP+ has no CERT.DPauth.SIG which chains to one of the eSIM CA Root CA CErtificate with a Public Key supported by the eUICC')
238
239 # Generate a TransactionID which is used to identify the ongoing RSP session. The TransactionID
240 # SHALL be unique within the scope and lifetime of each SM-DP+.
241 transactionId = uuid.uuid4().hex
242 assert not transactionId in self.rss
243
244 # Generate a serverChallenge for eUICC authentication attached to the ongoing RSP session.
245 serverChallenge = os.urandom(16)
246
247 # Generate a serverSigned1 data object as expected by the eUICC and described in section 5.7.13 "ES10b.AuthenticateServer". If and only if both eUICC and LPA indicate crlStaplingV3Support, the SM-DP+ SHALL indicate crlStaplingV3Used in sessionContext.
248 serverSigned1 = {
249 'transactionId': h2b(transactionId),
250 'euiccChallenge': euiccChallenge,
251 'serverAddress': self.server_hostname,
252 'serverChallenge': serverChallenge,
253 }
254 print("Tx serverSigned1: %s" % serverSigned1)
255 serverSigned1_bin = rsp.asn1.encode('ServerSigned1', serverSigned1)
256 print("Tx serverSigned1: %s" % rsp.asn1.decode('ServerSigned1', serverSigned1_bin))
257 output = {}
258 output['serverSigned1'] = b64encode2str(serverSigned1_bin)
259
260 # Generate a signature (serverSignature1) as described in section 5.7.13 "ES10b.AuthenticateServer" using the SK related to the selected CERT.DPauth.SIG.
261 # serverSignature1 SHALL be created using the private key associated to the RSP Server Certificate for authentication, and verified by the eUICC using the contained public key as described in section 2.6.9. serverSignature1 SHALL apply on serverSigned1 data object.
262 output['serverSignature1'] = b64encode2str(b'\x5f\x37\x40' + self.dp_auth.ecdsa_sign(serverSigned1_bin))
263
264 output['transactionId'] = transactionId
265 server_cert_aki = self.dp_auth.get_authority_key_identifier()
266 output['euiccCiPKIdToBeUsed'] = b64encode2str(b'\x04\x14' + server_cert_aki.key_identifier)
267 output['serverCertificate'] = b64encode2str(self.dp_auth.get_cert_as_der()) # CERT.DPauth.SIG
268 # FIXME: add those certificate
269 #output['otherCertsInChain'] = b64encode2str()
270
271 # create SessionState and store it in rss
Harald Welteaf87cd52024-01-19 21:29:06 +0100272 self.rss[transactionId] = rsp.RspSessionState(transactionId, serverChallenge,
273 cert_get_subject_key_id(ci_cert))
Harald Welte5bbb1442023-12-11 12:46:47 +0100274
275 return output
276
277 @app.route('/gsma/rsp2/es9plus/authenticateClient', methods=['POST'])
278 @rsp_api_wrapper
279 def authenticateClient(self, request: IRequest, content: dict) -> dict:
280 """See ES9+ AuthenticateClient in SGP.22 Section 5.6.3"""
281 transactionId = content['transactionId']
282
283 authenticateServerResp_bin = b64decode(content['authenticateServerResponse'])
284 authenticateServerResp = rsp.asn1.decode('AuthenticateServerResponse', authenticateServerResp_bin)
285 print("Rx %s: %s" % authenticateServerResp)
286 if authenticateServerResp[0] == 'authenticateResponseError':
287 r_err = authenticateServerResp[1]
288 #r_err['transactionId']
289 #r_err['authenticateErrorCode']
290 raise ValueError("authenticateResponseError %s" % r_err)
291
292 r_ok = authenticateServerResp[1]
293 euiccSigned1 = r_ok['euiccSigned1']
294 # TODO: use original data, don't re-encode?
295 euiccSigned1_bin = rsp.asn1.encode('EuiccSigned1', euiccSigned1)
296 euiccSignature1_bin = r_ok['euiccSignature1']
297 euiccCertificate_dec = r_ok['euiccCertificate']
298 # TODO: use original data, don't re-encode?
299 euiccCertificate_bin = rsp.asn1.encode('Certificate', euiccCertificate_dec)
300 eumCertificate_dec = r_ok['eumCertificate']
301 eumCertificate_bin = rsp.asn1.encode('Certificate', eumCertificate_dec)
302 # TODO v3: otherCertsInChain
303
304 # load certificate
305 euicc_cert = x509.load_der_x509_certificate(euiccCertificate_bin)
306 eum_cert = x509.load_der_x509_certificate(eumCertificate_bin)
307
Harald Welte5bbb1442023-12-11 12:46:47 +0100308 # Verify that the transactionId is known and relates to an ongoing RSP session. Otherwise, the SM-DP+
309 # SHALL return a status code "TransactionId - Unknown"
310 ss = self.rss.get(transactionId, None)
311 if ss is None:
312 raise ApiError('8.10.1', '3.9', 'Unknown')
313 ss.euicc_cert = euicc_cert
Harald Welteaf87cd52024-01-19 21:29:06 +0100314 ss.eum_cert = eum_cert # TODO: do we need this in the state?
Harald Welte5bbb1442023-12-11 12:46:47 +0100315
Harald Welteaf87cd52024-01-19 21:29:06 +0100316 # Verify that the Root Certificate of the eUICC certificate chain corresponds to the
317 # euiccCiPKIdToBeUsed or TODO: euiccCiPKIdToBeUsedV3
318 if cert_get_auth_key_id(eum_cert) != ss.ci_cert_id:
319 raise ApiError('8.11.1', '3.9', 'Unknown')
320
321 # Verify the validity of the eUICC certificate chain
322 cs = CertificateSet(self.ci_get_cert_for_pkid(ss.ci_cert_id))
323 cs.add_intermediate_cert(eum_cert)
324 # TODO v3: otherCertsInChain
325 try:
326 cs.verify_cert_chain(euicc_cert)
327 except VerifyError:
328 raise ApiError('8.1.3', '6.1', 'Verification failed')
329 # raise ApiError('8.1.3', '6.3', 'Expired')
330
331
332 # Verify euiccSignature1 over euiccSigned1 using pubkey from euiccCertificate.
333 # Otherwise, the SM-DP+ SHALL return a status code "eUICC - Verification failed"
334 if not self._ecdsa_verify(euicc_cert, euiccSignature1_bin, euiccSigned1_bin):
335 raise ApiError('8.1', '6.1', 'Verification failed')
336
Harald Welte5bbb1442023-12-11 12:46:47 +0100337 # TODO: verify EID of eUICC cert is within permitted range of EUM cert
338
339 ss.eid = ss.euicc_cert.subject.get_attributes_for_oid(x509.oid.NameOID.SERIAL_NUMBER)[0].value
340 print("EID (from eUICC cert): %s" % ss.eid)
341
342 # Verify that the serverChallenge attached to the ongoing RSP session matches the
343 # serverChallenge returned by the eUICC. Otherwise, the SM-DP+ SHALL return a status code "eUICC -
344 # Verification failed".
345 if euiccSigned1['serverChallenge'] != ss.serverChallenge:
346 raise ApiError('8.1', '6.1', 'Verification failed')
347
Harald Welte8a39d002024-01-30 21:01:01 +0100348 # If ctxParams1 contains a ctxParamsForCommonAuthentication data object, the SM-DP+ Shall [...]
349 # TODO: We really do a very simplistic job here, this needs to be properly implemented later,
350 # considering all the various cases, profile state, etc.
351 if euiccSigned1['ctxParams1'][0] == 'ctxParamsForCommonAuthentication':
352 cpca = euiccSigned1['ctxParams1'][1]
353 matchingId = cpca.get('matchingId', None)
354 if not matchingId:
355 # TODO: check if any pending profile downloads for the EID
356 raise ApiError('8.2.6', '3.8', 'Refused')
357 if matchingId:
358 # look up profile based on matchingID. We simply check if a given file exists for now..
359 path = os.path.join(self.upp_dir, matchingId) + '.der'
360 # prevent directory traversal attack
361 if os.path.commonprefix((os.path.realpath(path),self.upp_dir)) != self.upp_dir:
362 raise ApiError('8.2.6', '3.8', 'Refused')
363 if not os.path.isfile(path) or not os.access(path, os.R_OK):
364 raise ApiError('8.2.6', '3.8', 'Refused')
365 ss.matchingId = matchingId
366
367 # FIXME: we actually want to perform the profile binding herr, and read the profile metadat from the profile
368
Harald Welte5bbb1442023-12-11 12:46:47 +0100369 # Put together profileMetadata + _bin
370 ss.profileMetadata = ProfileMetadata(iccid_bin= h2b(swap_nibbles('89000123456789012358')), spn="OsmocomSPN", profile_name="OsmocomProfile")
371 profileMetadata_bin = ss.profileMetadata.gen_store_metadata_request()
372
373 # Put together smdpSigned2 + _bin
374 smdpSigned2 = {
375 'transactionId': h2b(ss.transactionId),
376 'ccRequiredFlag': False, # whether the Confirmation Code is required
377 #'bppEuiccOtpk': None, # whether otPK.EUICC.ECKA already used for binding the BPP, tag '5F49'
378 }
379 smdpSigned2_bin = rsp.asn1.encode('SmdpSigned2', smdpSigned2)
380
381 ss.smdpSignature2_do = b'\x5f\x37\x40' + self.dp_pb.ecdsa_sign(smdpSigned2_bin + b'\x5f\x37\x40' + euiccSignature1_bin)
382
383 # update non-volatile state with updated ss object
384 self.rss[transactionId] = ss
385 return {
386 'transactionId': transactionId,
387 'profileMetadata': b64encode2str(profileMetadata_bin),
388 'smdpSigned2': b64encode2str(smdpSigned2_bin),
389 'smdpSignature2': b64encode2str(ss.smdpSignature2_do),
390 'smdpCertificate': b64encode2str(self.dp_pb.get_cert_as_der()), # CERT.DPpb.SIG
391 }
392
393 @app.route('/gsma/rsp2/es9plus/getBoundProfilePackage', methods=['POST'])
394 @rsp_api_wrapper
395 def getBoundProfilePackage(self, request: IRequest, content: dict) -> dict:
396 """See ES9+ GetBoundProfilePackage SGP.22 Section 5.6.2"""
397 transactionId = content['transactionId']
398
399 # Verify that the received transactionId is known and relates to an ongoing RSP session
400 ss = self.rss.get(transactionId, None)
401 if not ss:
402 raise ApiError('8.10.1', '3.9', 'The RSP session identified by the TransactionID is unknown')
403
404 prepDownloadResp_bin = b64decode(content['prepareDownloadResponse'])
405 prepDownloadResp = rsp.asn1.decode('PrepareDownloadResponse', prepDownloadResp_bin)
406 print("Rx %s: %s" % prepDownloadResp)
407
408 if prepDownloadResp[0] == 'downloadResponseError':
409 r_err = prepDownloadResp[1]
410 #r_err['transactionId']
411 #r_err['downloadErrorCode']
412 raise ValueError("downloadResponseError %s" % r_err)
413
414 r_ok = prepDownloadResp[1]
415
416 # Verify the euiccSignature2 computed over euiccSigned2 and smdpSignature2 using the PK.EUICC.SIG attached to the ongoing RSP session
417 euiccSigned2 = r_ok['euiccSigned2']
418 # TODO: use original data, don't re-encode?
419 euiccSigned2_bin = rsp.asn1.encode('EUICCSigned2', euiccSigned2)
420 if not self._ecdsa_verify(ss.euicc_cert, r_ok['euiccSignature2'], euiccSigned2_bin + ss.smdpSignature2_do):
421 raise ApiError('8.1', '6.1', 'eUICC signature is invalid')
422
423 # not in spec: Verify that signed TransactionID is outer transaction ID
424 if h2b(transactionId) != euiccSigned2['transactionId']:
425 raise ApiError('8.10.1', '3.9', 'The signed transactionId != outer transactionId')
426
427 # store otPK.EUICC.ECKA in session state
428 ss.euicc_otpk = euiccSigned2['euiccOtpk']
429 print("euiccOtpk: %s" % (b2h(ss.euicc_otpk)))
430
431 # Generate a one-time ECKA key pair (ot{PK,SK}.DP.ECKA) using the curve indicated by the Key Parameter
432 # Reference value of CERT.DPpb.ECDDSA
433 print("curve = %s" % self.dp_pb.get_curve())
434 ss.smdp_ot = ec.generate_private_key(self.dp_pb.get_curve())
435 # extract the public key in (hopefully) the right format for the ES8+ interface
436 ss.smdp_otpk = ss.smdp_ot.public_key().public_bytes(Encoding.X962, PublicFormat.UncompressedPoint)
437 print("smdpOtpk: %s" % b2h(ss.smdp_otpk))
438 print("smdpOtsk: %s" % b2h(ss.smdp_ot.private_bytes(Encoding.DER, PrivateFormat.PKCS8, NoEncryption())))
439
440 ss.host_id = b'mahlzeit'
441
442 # Generate Session Keys using the CRT, opPK.eUICC.ECKA and otSK.DP.ECKA according to annex G
443 euicc_public_key = ec.EllipticCurvePublicKey.from_encoded_point(ss.smdp_ot.curve, ss.euicc_otpk)
444 ss.shared_secret = ss.smdp_ot.exchange(ec.ECDH(), euicc_public_key)
445 print("shared_secret: %s" % b2h(ss.shared_secret))
446
447 # TODO: Check if this order requires a Confirmation Code verification
448
449 # Perform actual protection + binding of profile package (or return pre-bound one)
Harald Welte8a39d002024-01-30 21:01:01 +0100450 with open(os.path.join(self.upp_dir, ss.matchingId)+'.der', 'rb') as f:
Harald Welte5bbb1442023-12-11 12:46:47 +0100451 upp = UnprotectedProfilePackage.from_der(f.read(), metadata=ss.profileMetadata)
452 # HACK: Use empty PPP as we're still debuggin the configureISDP step, and we want to avoid
453 # cluttering the log with stuff happening after the failure
454 #upp = UnprotectedProfilePackage.from_der(b'', metadata=ss.profileMetadata)
455 if False:
456 # Use random keys
457 bpp = BoundProfilePackage.from_upp(upp)
458 else:
459 # Use sesssion keys
460 ppp = ProtectedProfilePackage.from_upp(upp, BspInstance(b'\x00'*16, b'\x11'*16, b'\x22'*16))
461 bpp = BoundProfilePackage.from_ppp(ppp)
462
463 # update non-volatile state with updated ss object
464 self.rss[transactionId] = ss
465 return {
466 'transactionId': transactionId,
467 'boundProfilePackage': b64encode2str(bpp.encode(ss, self.dp_pb)),
468 }
469
470 @app.route('/gsma/rsp2/es9plus/handleNotification', methods=['POST'])
471 @rsp_api_wrapper
472 def handleNotification(self, request: IRequest, content: dict) -> dict:
473 """See ES9+ HandleNotification in SGP.22 Section 5.6.4"""
474 pendingNotification_bin = b64decode(content['pendingNotification'])
475 pendingNotification = rsp.asn1.decode('PendingNotification', pendingNotification_bin)
476 print("Rx %s: %s" % pendingNotification)
477 if pendingNotification[0] == 'profileInstallationResult':
478 profileInstallRes = pendingNotification[1]
479 pird = profileInstallRes['profileInstallationResultData']
480 transactionId = b2h(pird['transactionId'])
481 ss = self.rss.get(transactionId, None)
482 if ss is None:
483 print("Unable to find session for transactionId")
484 return
485 profileInstallRes['euiccSignPIR']
486 # TODO: use original data, don't re-encode?
487 pird_bin = rsp.asn1.encode('ProfileInstallationResultData', pird)
488 # verify eUICC signature
489 if not self._ecdsa_verify(ss.euicc_cert, profileInstallRes['euiccSignPIR'], pird_bin):
490 print("Unable to verify eUICC signature")
491 print("Profile Installation Final Result: ", pird['finalResult'])
492 # remove session state
493 del self.rss[transactionId]
494 elif pendingNotification[0] == 'otherSignedNotification':
495 # TODO
496 pass
497 else:
498 raise ValueError(pendingNotification)
499
500 #@app.route('/gsma/rsp3/es9plus/handleDeviceChangeRequest, methods=['POST']')
501 #@rsp_api_wrapper
502 #"""See ES9+ ConfirmDeviceChange in SGP.22 Section 5.6.6"""
503 # TODO: implement this
504
505 @app.route('/gsma/rsp2/es9plus/cancelSession', methods=['POST'])
506 @rsp_api_wrapper
507 def cancelSession(self, request: IRequest, content: dict) -> dict:
508 """See ES9+ CancelSession in SGP.22 Section 5.6.5"""
509 print("Rx JSON: %s" % content)
510 transactionId = content['transactionId']
511
512 # Verify that the received transactionId is known and relates to an ongoing RSP session
513 ss = self.rss.get(transactionId, None)
514 if ss is None:
515 raise ApiError('8.10.1', '3.9', 'The RSP session identified by the transactionId is unknown')
516
517 cancelSessionResponse_bin = b64decode(content['cancelSessionResponse'])
518 cancelSessionResponse = rsp.asn1.decode('CancelSessionResponse', cancelSessionResponse_bin)
519 print("Rx %s: %s" % cancelSessionResponse)
520
521 if cancelSessionResponse[0] == 'cancelSessionResponseError':
522 # FIXME: print some error
523 return
524 cancelSessionResponseOk = cancelSessionResponse[1]
525 # TODO: use original data, don't re-encode?
526 ecsr = cancelSessionResponseOk['euiccCancelSessionSigned']
527 ecsr_bin = rsp.asn1.encode('EuiccCancelSessionSigned', ecsr)
528 # Verify the eUICC signature (euiccCancelSessionSignature) using the PK.EUICC.SIG attached to the ongoing RSP session
529 if not self._ecdsa_verify(ss.euicc_cert, cancelSessionResponseOk['euiccCancelSessionSignature'], ecsr_bin):
530 raise ApiError('8.1', '6.1', 'eUICC signature is invalid')
531
532 # Verify that the received smdpOid corresponds to the one in SM-DP+ CERT.DPauth.SIG
533 subj_alt_name = self.dp_auth.get_subject_alt_name()
534 if x509.ObjectIdentifier(ecsr['smdpOid']) != subj_alt_name.oid:
535 raise ApiError('8.8', '3.10', 'The provided SM-DP+ OID is invalid.')
536
537 if ecsr['transactionId'] != h2b(transactionId):
538 raise ApiError('8.10.1', '3.9', 'The signed transactionId != outer transactionId')
539
540 # TODO: 1. Notify the Operator using the function "ES2+.HandleNotification" function
541 # TODO: 2. Terminate the corresponding pending download process.
542 # TODO: 3. If required, execute the SM-DS Event Deletion procedure described in section 3.6.3.
543
544 # delete actual session data
545 del self.rss[transactionId]
546 return { 'transactionId': transactionId }
547
548
549def main(argv):
550 parser = argparse.ArgumentParser()
551 #parser.add_argument("-H", "--host", help="Host/IP to bind HTTP to", default="localhost")
552 #parser.add_argument("-p", "--port", help="TCP port to bind HTTP to", default=8000)
553 #parser.add_argument("-v", "--verbose", help="increase output verbosity", action='count', default=0)
554
555 args = parser.parse_args()
556
557 hs = SmDppHttpServer(HOSTNAME, os.path.join(DATA_DIR, 'certs', 'CertificateIssuer'), use_brainpool=True)
558 #hs.app.run(endpoint_description="ssl:port=8000:dhParameters=dh_param_2048.pem")
559 hs.app.run("localhost", 8000)
560
561if __name__ == "__main__":
562 main(sys.argv)