blob: fa21d2c6aa7cea7a438340d613baa876cea7b819 [file] [log] [blame]
"""GSMA eSIM RSP ES2+ interface according to SGP.22 v2.5"""

# (C) 2024 by Harald Welte <laforge@osmocom.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18import abc
19import requests
20import logging
21import json
22from datetime import datetime
23import time
24import base64
25
# Module-wide logger used by the classes below. Its own level is set to DEBUG,
# so debug records are not filtered out by this logger itself.
logger = logging.getLogger(name=__name__)
logger.setLevel(level=logging.DEBUG)
28
class ApiParam(abc.ABC):
    """A class representing a single parameter in the ES2+ API.

    A parameter has a *decoded* representation (what the Python caller works with) and an
    *encoded* representation (what is placed into / taken from the JSON body).  Derived classes
    override _encode()/_decode() to convert between the two and verify_decoded()/verify_encoded()
    to validate them.  The defaults in this base class pass values through unchanged and accept
    any value.
    """

    @classmethod
    def verify_decoded(cls, data):
        """Verify the decoded representation of a value. Should raise an exception if something is odd."""

    @classmethod
    def verify_encoded(cls, data):
        """Verify the encoded representation of a value. Should raise an exception if something is odd."""

    @classmethod
    def encode(cls, data):
        """[Validate and] Encode the given value.

        The input is checked with verify_decoded(), converted with _encode() and the result is
        checked with verify_encoded().  Returns the encoded value; any exception raised by the
        verification or conversion hooks is propagated.
        """
        cls.verify_decoded(data)
        encoded = cls._encode(data)
        # The result is in *encoded* form, so it must pass the encoded-side check.  Checking it
        # with verify_decoded() would reject every parameter whose two forms differ in type.
        cls.verify_encoded(encoded)
        return encoded

    @classmethod
    def _encode(cls, data):
        """encoder function, typically [but not always] overridden by derived class."""
        return data

    @classmethod
    def decode(cls, data):
        """[Validate and] Decode the given value.

        The input is checked with verify_encoded(), converted with _decode() and the result is
        checked with verify_decoded().  Returns the decoded value; any exception raised by the
        verification or conversion hooks is propagated.
        """
        cls.verify_encoded(data)
        decoded = cls._decode(data)
        cls.verify_decoded(decoded)
        return decoded

    @classmethod
    def _decode(cls, data):
        """decoder function, typically [but not always] overridden by derived class."""
        return data
66
class ApiParamString(ApiParam):
    """Base class for API parameters of 'string' type; adds nothing on top of ApiParam."""
70
71
class ApiParamInteger(ApiParam):
    """Base class representing an API parameter of 'integer' type.

    The decoded form is a Python int, the encoded form produced by this class is its decimal
    string.
    """

    @classmethod
    def _decode(cls, data):
        """Convert the encoded value into an int."""
        return int(data)

    @classmethod
    def _encode(cls, data):
        """Convert the decoded value into its string form."""
        return str(data)

    @classmethod
    def verify_decoded(cls, data):
        """Raise TypeError unless the decoded value is an int."""
        if not isinstance(data, int):
            raise TypeError('Expected an integer input data type')

    @classmethod
    def verify_encoded(cls, data):
        """Check the encoded value.

        A value that already is an int is accepted as-is.  Anything else must be a string that
        consists of decimal characters only and that equals the canonical rendering of its
        integer value; otherwise ValueError is raised.
        """
        if isinstance(data, int):
            return
        if not data.isdecimal():
            raise ValueError('integer (%s) contains non-decimal characters' % data)
        # Raise explicitly instead of using 'assert': assertions are removed when Python runs
        # with -O, which would silently disable this validation.
        if str(int(data)) != data:
            raise ValueError('integer (%s) is not in canonical decimal notation' % data)
94
class ApiParamBoolean(ApiParam):
    """Base class representing an API parameter of 'boolean' type."""

    @classmethod
    def _encode(cls, data):
        """Reduce the given value to True/False according to its truthiness."""
        return True if data else False
100
class ApiParamFqdn(ApiParam):
    """API parameter carrying a fully qualified domain name.

    The value is a string: a list of domain labels concatenated using the full stop (dot, period)
    character as separator between labels. Labels are restricted to the Alphanumeric mode
    character set defined in table 5 of ISO/IEC 18004.
    """

    @classmethod
    def verify_encoded(cls, data):
        """Placeholder check: every value is currently accepted."""
        # FIXME: the restrictions described in the class docstring are not verified yet
        return None
109
class param:
    """Namespace holding the concrete parameter classes referenced by the ES2+ function definitions."""

    class Iccid(ApiParamString):
        """String representation of 19 or 20 digits, where the 20th digit MAY optionally be the padding
        character F."""

        @classmethod
        def _encode(cls, data):
            """Return the ICCID as string; a 19-character value gets a trailing 'F' appended."""
            data = str(data)
            # SGP.22 version prior to 2.2 do not require support for 19-digit ICCIDs, so let's always
            # encode it with padding F at the end.
            if len(data) == 19:
                data += 'F'
            return data

        @classmethod
        def verify_encoded(cls, data):
            """Raise ValueError unless the encoded value is 19 or 20 characters long."""
            if len(data) not in (19, 20):
                raise ValueError('ICCID (%s) length (%u) invalid' % (data, len(data)))

        @classmethod
        def _decode(cls, data):
            """Return the ICCID with a trailing padding 'F'/'f' removed."""
            # strip trailing padding (if it's 20 digits)
            if len(data) == 20 and data[-1] in ('F', 'f'):
                data = data[:-1]
            return data

        @classmethod
        def verify_decoded(cls, data):
            """Raise ValueError unless the value is 19 decimal digits, optionally followed by one
            more decimal digit or the padding character F/f."""
            data = str(data)
            if len(data) not in (19, 20):
                raise ValueError('ICCID (%s) length (%u) invalid' % (data, len(data)))
            if len(data) == 19:
                decimal_part = data
            else:
                decimal_part = data[:-1]
                final_part = data[-1:]
                # the 20th character may be either a digit or the padding character
                if final_part not in ('F', 'f') and not final_part.isdecimal():
                    raise ValueError('ICCID (%s) contains non-decimal characters' % data)
            if not decimal_part.isdecimal():
                raise ValueError('ICCID (%s) contains non-decimal characters' % data)

    class Eid(ApiParamString):
        """String of 32 decimal characters"""

        @classmethod
        def verify_encoded(cls, data):
            """Raise ValueError unless the value is exactly 32 characters long."""
            if len(data) != 32:
                raise ValueError('EID length invalid: "%s" (%u)' % (data, len(data)))

        @classmethod
        def verify_decoded(cls, data):
            """Raise ValueError if the value contains anything but decimal characters."""
            if not data.isdecimal():
                raise ValueError('EID (%s) contains non-decimal characters' % data)

    class ProfileType(ApiParamString):
        """String parameter without additional validation."""

    class MatchingId(ApiParamString):
        """String parameter without additional validation."""

    class ConfirmationCode(ApiParamString):
        """String parameter without additional validation."""

    class SmdsAddress(ApiParamFqdn):
        """FQDN parameter without additional validation."""

    class SmdpAddress(ApiParamFqdn):
        """FQDN parameter without additional validation."""

    class ReleaseFlag(ApiParamBoolean):
        """Boolean parameter without additional validation."""

    class FinalProfileStatusIndicator(ApiParamString):
        """String parameter without additional validation."""

    class Timestamp(ApiParamString):
        """String format as specified by W3C: YYYY-MM-DDThh:mm:ssTZD"""

        @classmethod
        def _decode(cls, data):
            """Parse the ISO format string into a datetime object."""
            return datetime.fromisoformat(data)

        @classmethod
        def _encode(cls, data):
            """Render the given datetime object as ISO format string."""
            # datetime offers isoformat(); there is no 'toisoformat', so the previous call
            # raised AttributeError for every input.
            return data.isoformat()

    class NotificationPointId(ApiParamInteger):
        """Integer parameter without additional validation."""

    class NotificationPointStatus(ApiParam):
        """Parameter passed through without conversion or validation."""

    class ResultData(ApiParam):
        """Binary data, carried in base64 encoding."""

        @classmethod
        def _decode(cls, data):
            """Return the bytes represented by the base64 input."""
            return base64.b64decode(data)

        @classmethod
        def _encode(cls, data):
            """Return the base64 representation of the given bytes as str."""
            # b64encode() returns bytes, which json.dumps() cannot serialize; base64 output is
            # pure ASCII, so converting it to str is lossless.
            return base64.b64encode(data).decode('ascii')

    class JsonResponseHeader(ApiParam):
        """SGP.22 section 6.5.1.4."""

        @classmethod
        def verify_decoded(cls, data):
            """Raise ValueError unless the header has a functionExecutionStatus with a known status."""
            fe_status = data.get('functionExecutionStatus')
            if not fe_status:
                raise ValueError('Missing mandatory functionExecutionStatus in header')
            status = fe_status.get('status')
            if not status:
                raise ValueError('Missing mandatory status in header functionExecutionStatus')
            if status not in ('Executed-Success', 'Executed-WithWarning', 'Failed', 'Expired'):
                raise ValueError('Unknown/unspecified status "%s"' % status)
221
222
class HttpStatusError(Exception):
    """Error signalling an HTTP response whose status code is not the expected one."""
225
class HttpHeaderError(Exception):
    """Error signalling an HTTP response with an unacceptable header."""
228
class Es2PlusApiError(Exception):
    """Exception representing an error at the ES2+ API level (status != Executed).

    The instance exposes `status` plus `subject_code`, `reason_code`, `subject_id` and `message`,
    which are taken from the 'statusCodeData' member of the functionExecutionStatus and are None
    when the respective item is not present.
    """

    def __init__(self, func_ex_status: dict):
        """Build the exception from a functionExecutionStatus dict.

        Raises KeyError if the mandatory 'status' key is missing.
        """
        self.status = func_ex_status['status']
        sec = {
            'subjectCode': None,
            'reasonCode': None,
            'subjectIdentifier': None,
            'message': None,
        }
        # 'statusCodeData' may be absent (or null); dict.update(None) would raise TypeError and
        # hide the actual API error, so fall back to an empty dict.
        sec.update(func_ex_status.get('statusCodeData') or {})
        self.subject_code = sec['subjectCode']
        self.reason_code = sec['reasonCode']
        self.subject_id = sec['subjectIdentifier']
        self.message = sec['message']

    def __str__(self):
        return f'{self.status}("{self.subject_code}","{self.reason_code}","{self.subject_id}","{self.message}")'
248
class Es2PlusApiFunction(abc.ABC):
    """Base class for representing an ES2+ API Function.

    A derived class describes one function by overriding the class variables below; an instance
    binds that description to a URL prefix, a function requester identifier and an HTTP session.
    """
    # the below class variables are expected to be overridden in derived classes

    path = None
    # dictionary of input parameters. key is parameter name, value is ApiParam class
    input_params = {}
    # list of mandatory input parameters
    input_mandatory = []
    # dictionary of output parameters. key is parameter name, value is ApiParam class
    output_params = {}
    # list of mandatory output parameters (for successful response)
    output_mandatory = []
    # expected HTTP status code of the response
    expected_http_status = 200
    # the HTTP method used (GET, OPTIONS, HEAD, POST, PUT, PATCH or DELETE)
    http_method = 'POST'

    def __init__(self, url_prefix: str, func_req_id: str, session):
        """Store the URL prefix, the function requester identifier and the session object whose
        request() method is used to perform the HTTP exchange."""
        self.url_prefix = url_prefix
        self.func_req_id = func_req_id
        self.session = session

    def encode(self, data: dict, func_call_id: str) -> dict:
        """Validate and encode input dict into JSON-serializable dict for request body.

        Raises ValueError if a mandatory input parameter is missing.  Parameters not listed in
        `input_params` are logged and passed through unmodified.
        """
        output = {
            'header': {
                'functionRequesterIdentifier': self.func_req_id,
                'functionCallIdentifier': func_call_id
            }
        }
        for p in self.input_mandatory:
            if p not in data:
                raise ValueError('Mandatory input parameter %s missing' % p)
        for p, v in data.items():
            p_class = self.input_params.get(p)
            if not p_class:
                logger.warning('Unexpected/unsupported input parameter %s=%s', p, v)
                output[p] = v
            else:
                output[p] = p_class.encode(v)
        return output

    def decode(self, data: dict) -> dict:
        """[further] Decode and validate the JSON-Dict of the response body.

        Raises ValueError if the header or (for a successful execution) another mandatory output
        parameter is missing, and Es2PlusApiError if the header reports a status other than
        'Executed-Success' / 'Executed-WithWarning'.
        """
        output = {}
        # let's first do the header, it's special
        if 'header' not in data:
            raise ValueError('Mandatory output parameter "header" missing')
        hdr_class = self.output_params.get('header')
        output['header'] = hdr_class.decode(data['header'])

        fe_status = output['header']['functionExecutionStatus']
        if fe_status['status'] not in ['Executed-Success', 'Executed-WithWarning']:
            raise Es2PlusApiError(fe_status)
        # we can only expect mandatory parameters to be present in case of successful execution
        for p in self.output_mandatory:
            if p == 'header':
                continue
            if p not in data:
                raise ValueError('Mandatory output parameter "%s" missing' % p)
        for p, v in data.items():
            if p == 'header':
                # already decoded above; no need to do it a second time
                continue
            p_class = self.output_params.get(p)
            if not p_class:
                logger.warning('Unexpected/unsupported output parameter "%s"="%s"', p, v)
                output[p] = v
            else:
                output[p] = p_class.decode(v)
        return output

    def call(self, data: dict, func_call_id: str, timeout=10) -> dict:
        """Make an API call to the ES2+ API endpoint represented by this object.
        Input data is passed in `data` as json-serializable dict. Output data
        is returned as json-deserialized dict.

        An HTTP 204 (No Content) response carries no body; in that case an empty dict is returned.

        Raises HttpStatusError if the HTTP status differs from `expected_http_status`,
        HttpHeaderError if the Content-Type or X-Admin-Protocol response header is not acceptable,
        plus whatever encode() / decode() raise.
        """
        url = self.url_prefix + self.path
        encoded = json.dumps(self.encode(data, func_call_id))
        headers = {
            'Content-Type': 'application/json',
            'X-Admin-Protocol': 'gsma/rsp/v2.5.0',
        }

        logger.debug("HTTP REQ %s - '%s'", url, encoded)
        response = self.session.request(self.http_method, url, data=encoded, headers=headers, timeout=timeout)
        logger.debug("HTTP RSP-STS: [%u] hdr: %s", response.status_code, response.headers)
        logger.debug("HTTP RSP: %s", response.content)

        if response.status_code != self.expected_http_status:
            raise HttpStatusError(response)
        # 204 means "No Content": there is no body, hence neither a Content-Type to check nor
        # any JSON to decode.
        no_content = response.status_code == 204
        if not no_content:
            # a missing Content-Type header is reported as header error instead of crashing
            # with AttributeError on None
            content_type = response.headers.get('Content-Type', '')
            if not content_type.startswith(headers['Content-Type']):
                raise HttpHeaderError(response)
        if not response.headers.get('X-Admin-Protocol', 'gsma/rsp/v2.unknown').startswith('gsma/rsp/v2.'):
            raise HttpHeaderError(response)

        if no_content:
            return {}
        return self.decode(response.json())
343
344
# ES2+ DownloadOrder function (SGP.22 section 5.3.1)
class DownloadOrder(Es2PlusApiFunction):
    """Parameter declaration of the downloadOrder endpoint; no input parameter is mandatory."""
    path = '/gsma/rsp2/es2plus/downloadOrder'
    input_params = dict(
        eid=param.Eid,
        iccid=param.Iccid,
        profileType=param.ProfileType,
    )
    output_params = dict(
        header=param.JsonResponseHeader,
        iccid=param.Iccid,
    )
    output_mandatory = ['header', 'iccid']
358
# ES2+ ConfirmOrder function (SGP.22 section 5.3.2)
class ConfirmOrder(Es2PlusApiFunction):
    """Parameter declaration of the confirmOrder endpoint."""
    path = '/gsma/rsp2/es2plus/confirmOrder'
    input_params = dict(
        iccid=param.Iccid,
        eid=param.Eid,
        matchingId=param.MatchingId,
        confirmationCode=param.ConfirmationCode,
        smdsAddress=param.SmdsAddress,
        releaseFlag=param.ReleaseFlag,
    )
    input_mandatory = ['iccid', 'releaseFlag']
    output_params = dict(
        header=param.JsonResponseHeader,
        eid=param.Eid,
        matchingId=param.MatchingId,
        smdpAddress=param.SmdpAddress,
    )
    output_mandatory = ['header', 'matchingId']
378
# ES2+ CancelOrder function (SGP.22 section 5.3.3)
class CancelOrder(Es2PlusApiFunction):
    """Parameter declaration of the cancelOrder endpoint; the response carries only the header."""
    path = '/gsma/rsp2/es2plus/cancelOrder'
    input_params = dict(
        iccid=param.Iccid,
        eid=param.Eid,
        matchingId=param.MatchingId,
        finalProfileStatusIndicator=param.FinalProfileStatusIndicator,
    )
    input_mandatory = ['finalProfileStatusIndicator', 'iccid']
    output_params = dict(
        header=param.JsonResponseHeader,
    )
    output_mandatory = ['header']
393
# ES2+ ReleaseProfile function (SGP.22 section 5.3.4)
class ReleaseProfile(Es2PlusApiFunction):
    """Parameter declaration of the releaseProfile endpoint: ICCID in, header out."""
    path = '/gsma/rsp2/es2plus/releaseProfile'
    input_params = dict(
        iccid=param.Iccid,
    )
    input_mandatory = ['iccid']
    output_params = dict(
        header=param.JsonResponseHeader,
    )
    output_mandatory = ['header']
405
# ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5)
class HandleDownloadProgressInfo(Es2PlusApiFunction):
    """Parameter declaration of the handleDownloadProgressInfo endpoint.

    No output parameters are declared and the expected HTTP status is 204.
    """
    path = '/gsma/rsp2/es2plus/handleDownloadProgressInfo'
    input_params = dict(
        eid=param.Eid,
        iccid=param.Iccid,
        profileType=param.ProfileType,
        timestamp=param.Timestamp,
        notificationPointId=param.NotificationPointId,
        notificationPointStatus=param.NotificationPointStatus,
        resultData=param.ResultData,
    )
    input_mandatory = ['iccid', 'profileType', 'timestamp', 'notificationPointId', 'notificationPointStatus']
    expected_http_status = 204
420
421
class Es2pApiClient:
    """Main class representing a full ES2+ API client. Has one method for each API function."""

    def __init__(self, url_prefix:str, func_req_id:str, server_cert_verify: str = None, client_cert: str = None):
        """Create the HTTP session and one function object per ES2+ function.

        `server_cert_verify` and `client_cert`, when given, are assigned to the session's
        `verify` and `cert` attributes respectively.
        """
        self.func_id = 0
        self.session = requests.Session()
        if server_cert_verify:
            self.session.verify = server_cert_verify
        if client_cert:
            self.session.cert = client_cert

        # every function object is constructed from the same three arguments
        common = (url_prefix, func_req_id, self.session)
        self.downloadOrder = DownloadOrder(*common)
        self.confirmOrder = ConfirmOrder(*common)
        self.cancelOrder = CancelOrder(*common)
        self.releaseProfile = ReleaseProfile(*common)
        self.handleDownloadProgressInfo = HandleDownloadProgressInfo(*common)

    def _gen_func_id(self) -> str:
        """Generate the next function call id."""
        self.func_id += 1
        # whole seconds of the current time, followed by the per-client call counter
        now = int(time.time())
        return f'FCI-{now}-{self.func_id}'

    def call_downloadOrder(self, data: dict) -> dict:
        """Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1)."""
        call_id = self._gen_func_id()
        return self.downloadOrder.call(data, call_id)

    def call_confirmOrder(self, data: dict) -> dict:
        """Perform ES2+ ConfirmOrder function (SGP.22 section 5.3.2)."""
        call_id = self._gen_func_id()
        return self.confirmOrder.call(data, call_id)

    def call_cancelOrder(self, data: dict) -> dict:
        """Perform ES2+ CancelOrder function (SGP.22 section 5.3.3)."""
        call_id = self._gen_func_id()
        return self.cancelOrder.call(data, call_id)

    def call_releaseProfile(self, data: dict) -> dict:
        """Perform ES2+ ReleaseProfile function (SGP.22 section 5.3.4)."""
        call_id = self._gen_func_id()
        return self.releaseProfile.call(data, call_id)

    def call_handleDownloadProgressInfo(self, data: dict) -> dict:
        """Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5)."""
        call_id = self._gen_func_id()
        return self.handleDownloadProgressInfo.call(data, call_id)