blob: 1f4311da0b4ae529f6223340b4935d7921fba7d0 [file] [log] [blame]
Harald Welte5bbb1442023-12-11 12:46:47 +01001#!/usr/bin/env python3
2
3# Early proof-of-concept towards a SM-DP+ HTTP service for GSMA consumer eSIM RSP
4#
5# (C) 2023-2024 by Harald Welte <laforge@osmocom.org>
6#
7# This program is free software: you can redistribute it and/or modify
8# it under the terms of the GNU Affero General Public License as published by
9# the Free Software Foundation, either version 3 of the License, or
10# (at your option) any later version.
11#
12# This program is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15# GNU Affero General Public License for more details.
16#
17# You should have received a copy of the GNU Affero General Public License
18# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20import json
21import sys
22import argparse
23import uuid
24import os
25import functools
26from typing import Optional, Dict, List
27from pprint import pprint as pp
28
29import base64
30from base64 import b64decode
31from klein import Klein
32from twisted.web.iweb import IRequest
33import asn1tools
34
35from pySim.utils import h2b, b2h, swap_nibbles
36
37import pySim.esim.rsp as rsp
Harald Welte8449b142024-02-20 23:50:28 +010038from pySim.esim import saip
Harald Welte5bbb1442023-12-11 12:46:47 +010039from pySim.esim.es8p import *
Harald Welteaf87cd52024-01-19 21:29:06 +010040from pySim.esim.x509_cert import oid, cert_policy_has_oid, cert_get_auth_key_id
41from pySim.esim.x509_cert import CertAndPrivkey, CertificateSet, cert_get_subject_key_id, VerifyError
Harald Welte5bbb1442023-12-11 12:46:47 +010042
43# HACK: make this configurable
44DATA_DIR = './smdpp-data'
45HOSTNAME = 'testsmdpplus1.example.com' # must match certificates!
46
47
48def b64encode2str(req: bytes) -> str:
49 """Encode given input bytes as base64 and return result as string."""
50 return base64.b64encode(req).decode('ascii')
51
52def set_headers(request: IRequest):
53 """Set the request headers as mandatory by GSMA eSIM RSP."""
54 request.setHeader('Content-Type', 'application/json;charset=UTF-8')
55 request.setHeader('X-Admin-Protocol', 'gsma/rsp/v2.1.0')
56
57def build_status_code(subject_code: str, reason_code: str, subject_id: Optional[str], message: Optional[str]) -> Dict:
58 r = {'subjectCode': subject_code, 'reasonCode': reason_code }
59 if subject_id:
60 r['subjectIdentifier'] = subject_id
61 if message:
62 r['message'] = message
63 return r
64
65def build_resp_header(js: dict, status: str = 'Executed-Success', status_code_data = None) -> None:
66 # SGP.22 v3.0 6.5.1.4
67 js['header'] = {
68 'functionExecutionStatus': {
69 'status': status,
70 }
71 }
72 if status_code_data:
73 js['header']['functionExecutionStatus']['statusCodeData'] = status_code_data
74
75from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature, encode_dss_signature
Harald Welte45b7dc92024-01-19 19:28:15 +010076from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat, PrivateFormat, NoEncryption
Harald Welte5bbb1442023-12-11 12:46:47 +010077from cryptography.hazmat.primitives.asymmetric import ec
78from cryptography.hazmat.primitives import hashes
79from cryptography.exceptions import InvalidSignature
80from cryptography import x509
81
Harald Welte5bbb1442023-12-11 12:46:47 +010082def ecdsa_tr03111_to_dss(sig: bytes) -> bytes:
83 """convert an ECDSA signature from BSI TR-03111 format to DER: first get long integers; then encode those."""
84 assert len(sig) == 64
85 r = int.from_bytes(sig[0:32], 'big')
86 s = int.from_bytes(sig[32:32*2], 'big')
87 return encode_dss_signature(r, s)
88
89
Harald Welte5bbb1442023-12-11 12:46:47 +010090class ApiError(Exception):
91 def __init__(self, subject_code: str, reason_code: str, message: Optional[str] = None,
92 subject_id: Optional[str] = None):
93 self.status_code = build_status_code(subject_code, reason_code, subject_id, message)
94
95 def encode(self) -> str:
96 """Encode the API Error into a responseHeader string."""
97 js = {}
98 build_resp_header(js, 'Failed', self.status_code)
99 return json.dumps(js)
100
Harald Welte5bbb1442023-12-11 12:46:47 +0100101class SmDppHttpServer:
102 app = Klein()
103
104 @staticmethod
105 def load_certs_from_path(path: str) -> List[x509.Certificate]:
106 """Load all DER + PEM files from given directory path and return them as list of x509.Certificate
107 instances."""
108 certs = []
109 for dirpath, dirnames, filenames in os.walk(path):
110 for filename in filenames:
111 cert = None
112 if filename.endswith('.der'):
113 with open(os.path.join(dirpath, filename), 'rb') as f:
114 cert = x509.load_der_x509_certificate(f.read())
115 elif filename.endswith('.pem'):
116 with open(os.path.join(dirpath, filename), 'rb') as f:
117 cert = x509.load_pem_x509_certificate(f.read())
118 if cert:
119 # verify it is a CI certificate (keyCertSign + i-rspRole-ci)
120 if not cert_policy_has_oid(cert, oid.id_rspRole_ci):
121 raise ValueError("alleged CI certificate %s doesn't have CI policy" % filename)
122 certs.append(cert)
123 return certs
124
125 def ci_get_cert_for_pkid(self, ci_pkid: bytes) -> Optional[x509.Certificate]:
126 """Find CI certificate for given key identifier."""
127 for cert in self.ci_certs:
128 print("cert: %s" % cert)
129 subject_exts = list(filter(lambda x: isinstance(x.value, x509.SubjectKeyIdentifier), cert.extensions))
130 print(subject_exts)
131 subject_pkid = subject_exts[0].value
132 print(subject_pkid)
133 if subject_pkid and subject_pkid.key_identifier == ci_pkid:
134 return cert
135 return None
136
137 def __init__(self, server_hostname: str, ci_certs_path: str, use_brainpool: bool = False):
138 self.server_hostname = server_hostname
Harald Welte8a39d002024-01-30 21:01:01 +0100139 self.upp_dir = os.path.realpath(os.path.join(DATA_DIR, 'upp'))
Harald Welte5bbb1442023-12-11 12:46:47 +0100140 self.ci_certs = self.load_certs_from_path(ci_certs_path)
141 # load DPauth cert + key
142 self.dp_auth = CertAndPrivkey(oid.id_rspRole_dp_auth_v2)
143 cert_dir = os.path.join(DATA_DIR, 'certs')
144 if use_brainpool:
145 self.dp_auth.cert_from_der_file(os.path.join(cert_dir, 'DPauth', 'CERT_S_SM_DPauth_ECDSA_BRP.der'))
146 self.dp_auth.privkey_from_pem_file(os.path.join(cert_dir, 'DPauth', 'SK_S_SM_DPauth_ECDSA_BRP.pem'))
147 else:
148 self.dp_auth.cert_from_der_file(os.path.join(cert_dir, 'DPauth', 'CERT_S_SM_DPauth_ECDSA_NIST.der'))
149 self.dp_auth.privkey_from_pem_file(os.path.join(cert_dir, 'DPauth', 'SK_S_SM_DPauth_ECDSA_NIST.pem'))
150 # load DPpb cert + key
151 self.dp_pb = CertAndPrivkey(oid.id_rspRole_dp_pb_v2)
152 if use_brainpool:
153 self.dp_pb.cert_from_der_file(os.path.join(cert_dir, 'DPpb', 'CERT_S_SM_DPpb_ECDSA_BRP.der'))
154 self.dp_pb.privkey_from_pem_file(os.path.join(cert_dir, 'DPpb', 'SK_S_SM_DPpb_ECDSA_BRP.pem'))
155 else:
156 self.dp_pb.cert_from_der_file(os.path.join(cert_dir, 'DPpb', 'CERT_S_SM_DPpb_ECDSA_NIST.der'))
157 self.dp_pb.privkey_from_pem_file(os.path.join(cert_dir, 'DPpb', 'SK_S_SM_DPpb_ECDSA_NIST.pem'))
158 self.rss = rsp.RspSessionStore(os.path.join(DATA_DIR, "sm-dp-sessions"))
159
160 @app.handle_errors(ApiError)
161 def handle_apierror(self, request: IRequest, failure):
162 request.setResponseCode(200)
163 pp(failure)
164 return failure.value.encode()
165
166 @staticmethod
167 def _ecdsa_verify(cert: x509.Certificate, signature: bytes, data: bytes) -> bool:
168 pubkey = cert.public_key()
169 dss_sig = ecdsa_tr03111_to_dss(signature)
170 try:
171 pubkey.verify(dss_sig, data, ec.ECDSA(hashes.SHA256()))
172 return True
173 except InvalidSignature:
174 return False
175
176 @staticmethod
177 def rsp_api_wrapper(func):
178 """Wrapper that can be used as decorator in order to perform common REST API endpoint entry/exit
179 functionality, such as JSON decoding/encoding and debug-printing."""
180 @functools.wraps(func)
181 def _api_wrapper(self, request: IRequest):
182 # TODO: evaluate User-Agent + X-Admin-Protocol header
183 # TODO: reject any non-JSON Content-type
184
185 content = json.loads(request.content.read())
Harald Welteebb6f7f2024-01-17 19:36:47 +0100186 print("Rx JSON: %s" % json.dumps(content))
Harald Welte5bbb1442023-12-11 12:46:47 +0100187 set_headers(request)
188
189 output = func(self, request, content) or {}
190
191 build_resp_header(output)
Harald Welteebb6f7f2024-01-17 19:36:47 +0100192 print("Tx JSON: %s" % json.dumps(output))
Harald Welte5bbb1442023-12-11 12:46:47 +0100193 return json.dumps(output)
194 return _api_wrapper
195
196 @app.route('/gsma/rsp2/es9plus/initiateAuthentication', methods=['POST'])
197 @rsp_api_wrapper
198 def initiateAutentication(self, request: IRequest, content: dict) -> dict:
199 """See ES9+ InitiateAuthentication SGP.22 Section 5.6.1"""
200 # Verify that the received address matches its own SM-DP+ address, where the comparison SHALL be
201 # case-insensitive. Otherwise, the SM-DP+ SHALL return a status code "SM-DP+ Address - Refused".
202 if content['smdpAddress'] != self.server_hostname:
203 raise ApiError('8.8.1', '3.8', 'Invalid SM-DP+ Address')
204
205 euiccChallenge = b64decode(content['euiccChallenge'])
206 if len(euiccChallenge) != 16:
207 raise ValueError
208
209 euiccInfo1_bin = b64decode(content['euiccInfo1'])
210 euiccInfo1 = rsp.asn1.decode('EUICCInfo1', euiccInfo1_bin)
211 print("Rx euiccInfo1: %s" % euiccInfo1)
212 #euiccInfo1['svn']
213
214 # TODO: If euiccCiPKIdListForSigningV3 is present ...
215
216 pkid_list = euiccInfo1['euiccCiPKIdListForSigning']
217 if 'euiccCiPKIdListForSigningV3' in euiccInfo1:
218 pkid_list = pkid_list + euiccInfo1['euiccCiPKIdListForSigningV3']
219 # verify it supports one of the keys indicated by euiccCiPKIdListForSigning
Harald Welteaf87cd52024-01-19 21:29:06 +0100220 ci_cert = None
221 for x in pkid_list:
222 ci_cert = self.ci_get_cert_for_pkid(x)
Harald Welte9fd4bbe2024-01-25 15:50:20 +0100223 # we already support multiple CI certificates but only one set of DPauth + DPpb keys. So we must
224 # make sure we choose a CI key-id which has issued both the eUICC as well as our own SM-DP side
225 # certs.
226 if ci_cert and cert_get_subject_key_id(ci_cert) == self.dp_auth.get_authority_key_identifier().key_identifier:
Harald Welteaf87cd52024-01-19 21:29:06 +0100227 break
Harald Welte9fd4bbe2024-01-25 15:50:20 +0100228 else:
229 ci_cert = None
Harald Welteaf87cd52024-01-19 21:29:06 +0100230 if not ci_cert:
Harald Welte5bbb1442023-12-11 12:46:47 +0100231 raise ApiError('8.8.2', '3.1', 'None of the proposed Public Key Identifiers is supported by the SM-DP+')
232
233 # TODO: Determine the set of CERT.DPauth.SIG that satisfy the following criteria:
234 # * Part of a certificate chain ending at one of the eSIM CA RootCA Certificate, whose Public Keys is
235 # supported by the eUICC (indicated by euiccCiPKIdListForVerification).
236 # * Using a certificate chain that the eUICC and the LPA both support:
237 #euiccInfo1['euiccCiPKIdListForVerification']
238 # raise ApiError('8.8.4', '3.7', 'The SM-DP+ has no CERT.DPauth.SIG which chains to one of the eSIM CA Root CA CErtificate with a Public Key supported by the eUICC')
239
240 # Generate a TransactionID which is used to identify the ongoing RSP session. The TransactionID
241 # SHALL be unique within the scope and lifetime of each SM-DP+.
Philipp Maier7b524fa2024-03-15 09:22:28 +0100242 transactionId = uuid.uuid4().hex.upper()
Harald Welte5bbb1442023-12-11 12:46:47 +0100243 assert not transactionId in self.rss
244
245 # Generate a serverChallenge for eUICC authentication attached to the ongoing RSP session.
246 serverChallenge = os.urandom(16)
247
248 # Generate a serverSigned1 data object as expected by the eUICC and described in section 5.7.13 "ES10b.AuthenticateServer". If and only if both eUICC and LPA indicate crlStaplingV3Support, the SM-DP+ SHALL indicate crlStaplingV3Used in sessionContext.
249 serverSigned1 = {
250 'transactionId': h2b(transactionId),
251 'euiccChallenge': euiccChallenge,
252 'serverAddress': self.server_hostname,
253 'serverChallenge': serverChallenge,
254 }
255 print("Tx serverSigned1: %s" % serverSigned1)
256 serverSigned1_bin = rsp.asn1.encode('ServerSigned1', serverSigned1)
257 print("Tx serverSigned1: %s" % rsp.asn1.decode('ServerSigned1', serverSigned1_bin))
258 output = {}
259 output['serverSigned1'] = b64encode2str(serverSigned1_bin)
260
261 # Generate a signature (serverSignature1) as described in section 5.7.13 "ES10b.AuthenticateServer" using the SK related to the selected CERT.DPauth.SIG.
262 # serverSignature1 SHALL be created using the private key associated to the RSP Server Certificate for authentication, and verified by the eUICC using the contained public key as described in section 2.6.9. serverSignature1 SHALL apply on serverSigned1 data object.
263 output['serverSignature1'] = b64encode2str(b'\x5f\x37\x40' + self.dp_auth.ecdsa_sign(serverSigned1_bin))
264
265 output['transactionId'] = transactionId
266 server_cert_aki = self.dp_auth.get_authority_key_identifier()
267 output['euiccCiPKIdToBeUsed'] = b64encode2str(b'\x04\x14' + server_cert_aki.key_identifier)
268 output['serverCertificate'] = b64encode2str(self.dp_auth.get_cert_as_der()) # CERT.DPauth.SIG
269 # FIXME: add those certificate
270 #output['otherCertsInChain'] = b64encode2str()
271
272 # create SessionState and store it in rss
Harald Welteaf87cd52024-01-19 21:29:06 +0100273 self.rss[transactionId] = rsp.RspSessionState(transactionId, serverChallenge,
274 cert_get_subject_key_id(ci_cert))
Harald Welte5bbb1442023-12-11 12:46:47 +0100275
276 return output
277
278 @app.route('/gsma/rsp2/es9plus/authenticateClient', methods=['POST'])
279 @rsp_api_wrapper
280 def authenticateClient(self, request: IRequest, content: dict) -> dict:
281 """See ES9+ AuthenticateClient in SGP.22 Section 5.6.3"""
282 transactionId = content['transactionId']
283
284 authenticateServerResp_bin = b64decode(content['authenticateServerResponse'])
285 authenticateServerResp = rsp.asn1.decode('AuthenticateServerResponse', authenticateServerResp_bin)
286 print("Rx %s: %s" % authenticateServerResp)
287 if authenticateServerResp[0] == 'authenticateResponseError':
288 r_err = authenticateServerResp[1]
289 #r_err['transactionId']
290 #r_err['authenticateErrorCode']
291 raise ValueError("authenticateResponseError %s" % r_err)
292
293 r_ok = authenticateServerResp[1]
294 euiccSigned1 = r_ok['euiccSigned1']
295 # TODO: use original data, don't re-encode?
296 euiccSigned1_bin = rsp.asn1.encode('EuiccSigned1', euiccSigned1)
297 euiccSignature1_bin = r_ok['euiccSignature1']
298 euiccCertificate_dec = r_ok['euiccCertificate']
299 # TODO: use original data, don't re-encode?
300 euiccCertificate_bin = rsp.asn1.encode('Certificate', euiccCertificate_dec)
301 eumCertificate_dec = r_ok['eumCertificate']
302 eumCertificate_bin = rsp.asn1.encode('Certificate', eumCertificate_dec)
303 # TODO v3: otherCertsInChain
304
305 # load certificate
306 euicc_cert = x509.load_der_x509_certificate(euiccCertificate_bin)
307 eum_cert = x509.load_der_x509_certificate(eumCertificate_bin)
308
Harald Welte5bbb1442023-12-11 12:46:47 +0100309 # Verify that the transactionId is known and relates to an ongoing RSP session. Otherwise, the SM-DP+
310 # SHALL return a status code "TransactionId - Unknown"
311 ss = self.rss.get(transactionId, None)
312 if ss is None:
313 raise ApiError('8.10.1', '3.9', 'Unknown')
314 ss.euicc_cert = euicc_cert
Harald Welteaf87cd52024-01-19 21:29:06 +0100315 ss.eum_cert = eum_cert # TODO: do we need this in the state?
Harald Welte5bbb1442023-12-11 12:46:47 +0100316
Harald Welteaf87cd52024-01-19 21:29:06 +0100317 # Verify that the Root Certificate of the eUICC certificate chain corresponds to the
318 # euiccCiPKIdToBeUsed or TODO: euiccCiPKIdToBeUsedV3
319 if cert_get_auth_key_id(eum_cert) != ss.ci_cert_id:
320 raise ApiError('8.11.1', '3.9', 'Unknown')
321
322 # Verify the validity of the eUICC certificate chain
323 cs = CertificateSet(self.ci_get_cert_for_pkid(ss.ci_cert_id))
324 cs.add_intermediate_cert(eum_cert)
325 # TODO v3: otherCertsInChain
326 try:
327 cs.verify_cert_chain(euicc_cert)
328 except VerifyError:
329 raise ApiError('8.1.3', '6.1', 'Verification failed')
330 # raise ApiError('8.1.3', '6.3', 'Expired')
331
332
333 # Verify euiccSignature1 over euiccSigned1 using pubkey from euiccCertificate.
334 # Otherwise, the SM-DP+ SHALL return a status code "eUICC - Verification failed"
335 if not self._ecdsa_verify(euicc_cert, euiccSignature1_bin, euiccSigned1_bin):
336 raise ApiError('8.1', '6.1', 'Verification failed')
337
Harald Welte5bbb1442023-12-11 12:46:47 +0100338 # TODO: verify EID of eUICC cert is within permitted range of EUM cert
339
340 ss.eid = ss.euicc_cert.subject.get_attributes_for_oid(x509.oid.NameOID.SERIAL_NUMBER)[0].value
341 print("EID (from eUICC cert): %s" % ss.eid)
342
343 # Verify that the serverChallenge attached to the ongoing RSP session matches the
344 # serverChallenge returned by the eUICC. Otherwise, the SM-DP+ SHALL return a status code "eUICC -
345 # Verification failed".
346 if euiccSigned1['serverChallenge'] != ss.serverChallenge:
347 raise ApiError('8.1', '6.1', 'Verification failed')
348
Harald Welte8a39d002024-01-30 21:01:01 +0100349 # If ctxParams1 contains a ctxParamsForCommonAuthentication data object, the SM-DP+ Shall [...]
350 # TODO: We really do a very simplistic job here, this needs to be properly implemented later,
351 # considering all the various cases, profile state, etc.
352 if euiccSigned1['ctxParams1'][0] == 'ctxParamsForCommonAuthentication':
353 cpca = euiccSigned1['ctxParams1'][1]
354 matchingId = cpca.get('matchingId', None)
355 if not matchingId:
356 # TODO: check if any pending profile downloads for the EID
357 raise ApiError('8.2.6', '3.8', 'Refused')
358 if matchingId:
359 # look up profile based on matchingID. We simply check if a given file exists for now..
360 path = os.path.join(self.upp_dir, matchingId) + '.der'
361 # prevent directory traversal attack
362 if os.path.commonprefix((os.path.realpath(path),self.upp_dir)) != self.upp_dir:
363 raise ApiError('8.2.6', '3.8', 'Refused')
364 if not os.path.isfile(path) or not os.access(path, os.R_OK):
365 raise ApiError('8.2.6', '3.8', 'Refused')
366 ss.matchingId = matchingId
Harald Welte8449b142024-02-20 23:50:28 +0100367 with open(path, 'rb') as f:
368 pes = saip.ProfileElementSequence.from_der(f.read())
369 iccid_str = b2h(pes.get_pe_for_type('header').decoded['iccid'])
Harald Welte8a39d002024-01-30 21:01:01 +0100370
371 # FIXME: we actually want to perform the profile binding herr, and read the profile metadat from the profile
372
Harald Welte5bbb1442023-12-11 12:46:47 +0100373 # Put together profileMetadata + _bin
Harald Welte8449b142024-02-20 23:50:28 +0100374 ss.profileMetadata = ProfileMetadata(iccid_bin=h2b(swap_nibbles(iccid_str)), spn="OsmocomSPN", profile_name=matchingId)
Harald Welte5bbb1442023-12-11 12:46:47 +0100375 profileMetadata_bin = ss.profileMetadata.gen_store_metadata_request()
376
377 # Put together smdpSigned2 + _bin
378 smdpSigned2 = {
379 'transactionId': h2b(ss.transactionId),
380 'ccRequiredFlag': False, # whether the Confirmation Code is required
381 #'bppEuiccOtpk': None, # whether otPK.EUICC.ECKA already used for binding the BPP, tag '5F49'
382 }
383 smdpSigned2_bin = rsp.asn1.encode('SmdpSigned2', smdpSigned2)
384
385 ss.smdpSignature2_do = b'\x5f\x37\x40' + self.dp_pb.ecdsa_sign(smdpSigned2_bin + b'\x5f\x37\x40' + euiccSignature1_bin)
386
387 # update non-volatile state with updated ss object
388 self.rss[transactionId] = ss
389 return {
390 'transactionId': transactionId,
391 'profileMetadata': b64encode2str(profileMetadata_bin),
392 'smdpSigned2': b64encode2str(smdpSigned2_bin),
393 'smdpSignature2': b64encode2str(ss.smdpSignature2_do),
394 'smdpCertificate': b64encode2str(self.dp_pb.get_cert_as_der()), # CERT.DPpb.SIG
395 }
396
397 @app.route('/gsma/rsp2/es9plus/getBoundProfilePackage', methods=['POST'])
398 @rsp_api_wrapper
399 def getBoundProfilePackage(self, request: IRequest, content: dict) -> dict:
400 """See ES9+ GetBoundProfilePackage SGP.22 Section 5.6.2"""
401 transactionId = content['transactionId']
402
403 # Verify that the received transactionId is known and relates to an ongoing RSP session
404 ss = self.rss.get(transactionId, None)
405 if not ss:
406 raise ApiError('8.10.1', '3.9', 'The RSP session identified by the TransactionID is unknown')
407
408 prepDownloadResp_bin = b64decode(content['prepareDownloadResponse'])
409 prepDownloadResp = rsp.asn1.decode('PrepareDownloadResponse', prepDownloadResp_bin)
410 print("Rx %s: %s" % prepDownloadResp)
411
412 if prepDownloadResp[0] == 'downloadResponseError':
413 r_err = prepDownloadResp[1]
414 #r_err['transactionId']
415 #r_err['downloadErrorCode']
416 raise ValueError("downloadResponseError %s" % r_err)
417
418 r_ok = prepDownloadResp[1]
419
420 # Verify the euiccSignature2 computed over euiccSigned2 and smdpSignature2 using the PK.EUICC.SIG attached to the ongoing RSP session
421 euiccSigned2 = r_ok['euiccSigned2']
422 # TODO: use original data, don't re-encode?
423 euiccSigned2_bin = rsp.asn1.encode('EUICCSigned2', euiccSigned2)
424 if not self._ecdsa_verify(ss.euicc_cert, r_ok['euiccSignature2'], euiccSigned2_bin + ss.smdpSignature2_do):
425 raise ApiError('8.1', '6.1', 'eUICC signature is invalid')
426
427 # not in spec: Verify that signed TransactionID is outer transaction ID
428 if h2b(transactionId) != euiccSigned2['transactionId']:
429 raise ApiError('8.10.1', '3.9', 'The signed transactionId != outer transactionId')
430
431 # store otPK.EUICC.ECKA in session state
432 ss.euicc_otpk = euiccSigned2['euiccOtpk']
433 print("euiccOtpk: %s" % (b2h(ss.euicc_otpk)))
434
435 # Generate a one-time ECKA key pair (ot{PK,SK}.DP.ECKA) using the curve indicated by the Key Parameter
436 # Reference value of CERT.DPpb.ECDDSA
437 print("curve = %s" % self.dp_pb.get_curve())
438 ss.smdp_ot = ec.generate_private_key(self.dp_pb.get_curve())
439 # extract the public key in (hopefully) the right format for the ES8+ interface
440 ss.smdp_otpk = ss.smdp_ot.public_key().public_bytes(Encoding.X962, PublicFormat.UncompressedPoint)
441 print("smdpOtpk: %s" % b2h(ss.smdp_otpk))
442 print("smdpOtsk: %s" % b2h(ss.smdp_ot.private_bytes(Encoding.DER, PrivateFormat.PKCS8, NoEncryption())))
443
444 ss.host_id = b'mahlzeit'
445
446 # Generate Session Keys using the CRT, opPK.eUICC.ECKA and otSK.DP.ECKA according to annex G
447 euicc_public_key = ec.EllipticCurvePublicKey.from_encoded_point(ss.smdp_ot.curve, ss.euicc_otpk)
448 ss.shared_secret = ss.smdp_ot.exchange(ec.ECDH(), euicc_public_key)
449 print("shared_secret: %s" % b2h(ss.shared_secret))
450
451 # TODO: Check if this order requires a Confirmation Code verification
452
453 # Perform actual protection + binding of profile package (or return pre-bound one)
Harald Welte8a39d002024-01-30 21:01:01 +0100454 with open(os.path.join(self.upp_dir, ss.matchingId)+'.der', 'rb') as f:
Harald Welte5bbb1442023-12-11 12:46:47 +0100455 upp = UnprotectedProfilePackage.from_der(f.read(), metadata=ss.profileMetadata)
456 # HACK: Use empty PPP as we're still debuggin the configureISDP step, and we want to avoid
457 # cluttering the log with stuff happening after the failure
458 #upp = UnprotectedProfilePackage.from_der(b'', metadata=ss.profileMetadata)
459 if False:
460 # Use random keys
461 bpp = BoundProfilePackage.from_upp(upp)
462 else:
463 # Use sesssion keys
464 ppp = ProtectedProfilePackage.from_upp(upp, BspInstance(b'\x00'*16, b'\x11'*16, b'\x22'*16))
465 bpp = BoundProfilePackage.from_ppp(ppp)
466
467 # update non-volatile state with updated ss object
468 self.rss[transactionId] = ss
469 return {
470 'transactionId': transactionId,
471 'boundProfilePackage': b64encode2str(bpp.encode(ss, self.dp_pb)),
472 }
473
474 @app.route('/gsma/rsp2/es9plus/handleNotification', methods=['POST'])
475 @rsp_api_wrapper
476 def handleNotification(self, request: IRequest, content: dict) -> dict:
477 """See ES9+ HandleNotification in SGP.22 Section 5.6.4"""
478 pendingNotification_bin = b64decode(content['pendingNotification'])
479 pendingNotification = rsp.asn1.decode('PendingNotification', pendingNotification_bin)
480 print("Rx %s: %s" % pendingNotification)
481 if pendingNotification[0] == 'profileInstallationResult':
482 profileInstallRes = pendingNotification[1]
483 pird = profileInstallRes['profileInstallationResultData']
484 transactionId = b2h(pird['transactionId'])
485 ss = self.rss.get(transactionId, None)
486 if ss is None:
487 print("Unable to find session for transactionId")
488 return
489 profileInstallRes['euiccSignPIR']
490 # TODO: use original data, don't re-encode?
491 pird_bin = rsp.asn1.encode('ProfileInstallationResultData', pird)
492 # verify eUICC signature
493 if not self._ecdsa_verify(ss.euicc_cert, profileInstallRes['euiccSignPIR'], pird_bin):
494 print("Unable to verify eUICC signature")
495 print("Profile Installation Final Result: ", pird['finalResult'])
496 # remove session state
497 del self.rss[transactionId]
498 elif pendingNotification[0] == 'otherSignedNotification':
499 # TODO
500 pass
501 else:
502 raise ValueError(pendingNotification)
503
504 #@app.route('/gsma/rsp3/es9plus/handleDeviceChangeRequest, methods=['POST']')
505 #@rsp_api_wrapper
506 #"""See ES9+ ConfirmDeviceChange in SGP.22 Section 5.6.6"""
507 # TODO: implement this
508
509 @app.route('/gsma/rsp2/es9plus/cancelSession', methods=['POST'])
510 @rsp_api_wrapper
511 def cancelSession(self, request: IRequest, content: dict) -> dict:
512 """See ES9+ CancelSession in SGP.22 Section 5.6.5"""
513 print("Rx JSON: %s" % content)
514 transactionId = content['transactionId']
515
516 # Verify that the received transactionId is known and relates to an ongoing RSP session
517 ss = self.rss.get(transactionId, None)
518 if ss is None:
519 raise ApiError('8.10.1', '3.9', 'The RSP session identified by the transactionId is unknown')
520
521 cancelSessionResponse_bin = b64decode(content['cancelSessionResponse'])
522 cancelSessionResponse = rsp.asn1.decode('CancelSessionResponse', cancelSessionResponse_bin)
523 print("Rx %s: %s" % cancelSessionResponse)
524
525 if cancelSessionResponse[0] == 'cancelSessionResponseError':
526 # FIXME: print some error
527 return
528 cancelSessionResponseOk = cancelSessionResponse[1]
529 # TODO: use original data, don't re-encode?
530 ecsr = cancelSessionResponseOk['euiccCancelSessionSigned']
531 ecsr_bin = rsp.asn1.encode('EuiccCancelSessionSigned', ecsr)
532 # Verify the eUICC signature (euiccCancelSessionSignature) using the PK.EUICC.SIG attached to the ongoing RSP session
533 if not self._ecdsa_verify(ss.euicc_cert, cancelSessionResponseOk['euiccCancelSessionSignature'], ecsr_bin):
534 raise ApiError('8.1', '6.1', 'eUICC signature is invalid')
535
536 # Verify that the received smdpOid corresponds to the one in SM-DP+ CERT.DPauth.SIG
537 subj_alt_name = self.dp_auth.get_subject_alt_name()
538 if x509.ObjectIdentifier(ecsr['smdpOid']) != subj_alt_name.oid:
539 raise ApiError('8.8', '3.10', 'The provided SM-DP+ OID is invalid.')
540
541 if ecsr['transactionId'] != h2b(transactionId):
542 raise ApiError('8.10.1', '3.9', 'The signed transactionId != outer transactionId')
543
544 # TODO: 1. Notify the Operator using the function "ES2+.HandleNotification" function
545 # TODO: 2. Terminate the corresponding pending download process.
546 # TODO: 3. If required, execute the SM-DS Event Deletion procedure described in section 3.6.3.
547
548 # delete actual session data
549 del self.rss[transactionId]
550 return { 'transactionId': transactionId }
551
552
553def main(argv):
554 parser = argparse.ArgumentParser()
555 #parser.add_argument("-H", "--host", help="Host/IP to bind HTTP to", default="localhost")
556 #parser.add_argument("-p", "--port", help="TCP port to bind HTTP to", default=8000)
557 #parser.add_argument("-v", "--verbose", help="increase output verbosity", action='count', default=0)
558
559 args = parser.parse_args()
560
561 hs = SmDppHttpServer(HOSTNAME, os.path.join(DATA_DIR, 'certs', 'CertificateIssuer'), use_brainpool=True)
562 #hs.app.run(endpoint_description="ssl:port=8000:dhParameters=dh_param_2048.pem")
563 hs.app.run("localhost", 8000)
564
565if __name__ == "__main__":
566 main(sys.argv)