Harald Welte | 95873a9 | 2024-02-06 19:42:19 +0100 | [diff] [blame] | 1 | """GSMA eSIM RSP ES2+ interface according to SGP.22 v2.5""" |
| 2 | |
| 3 | # (C) 2024 by Harald Welte <laforge@osmocom.org> |
| 4 | # |
| 5 | # This program is free software: you can redistribute it and/or modify |
| 6 | # it under the terms of the GNU Affero General Public License as published by |
| 7 | # the Free Software Foundation, either version 3 of the License, or |
| 8 | # (at your option) any later version. |
| 9 | # |
| 10 | # This program is distributed in the hope that it will be useful, |
| 11 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 12 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 13 | # GNU Affero General Public License for more details. |
| 14 | # |
| 15 | # You should have received a copy of the GNU Affero General Public License |
| 16 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
| 17 | |
| 18 | import abc |
| 19 | import requests |
| 20 | import logging |
| 21 | import json |
| 22 | from datetime import datetime |
| 23 | import time |
| 24 | import base64 |
| 25 | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# NOTE(review): the level of this logger is forced to DEBUG at import time, which takes
# precedence over any level inherited from parent loggers - confirm this is intended.
logger.setLevel(logging.DEBUG)
| 28 | |
class ApiParam(abc.ABC):
    """A class representing a single parameter in the ES2+ API.

    A parameter exists in two representations: the *decoded* one (the python-side value) and the
    *encoded* one (the value as it is placed into / taken from the JSON body).  Derived classes
    override `_encode`/`_decode` for the conversion and `verify_decoded`/`verify_encoded` for
    validation of the respective representation; all four default to "no conversion" and
    "no validation".
    """

    @classmethod
    def verify_decoded(cls, data):
        """Verify the decoded representation of a value. Should raise an exception if something is odd."""

    @classmethod
    def verify_encoded(cls, data):
        """Verify the encoded representation of a value. Should raise an exception if something is odd."""

    @classmethod
    def encode(cls, data):
        """[Validate and] Encode the given value.

        The input is checked with `verify_decoded`, converted by `_encode`, and the result is
        checked with `verify_encoded`.  Returns the encoded value; any exception raised by the
        verification hooks is propagated.
        """
        cls.verify_decoded(data)
        encoded = cls._encode(data)
        # The result of _encode() is in *encoded* form, so it has to be checked by the encoded-form
        # verifier (checking it with verify_decoded() rejects every value whose encoding changes
        # its type, such as int -> str).
        cls.verify_encoded(encoded)
        return encoded

    @classmethod
    def _encode(cls, data):
        """encoder function, typically [but not always] overridden by derived class."""
        return data

    @classmethod
    def decode(cls, data):
        """[Validate and] Decode the given value.

        The input is checked with `verify_encoded`, converted by `_decode`, and the result is
        checked with `verify_decoded`.  Returns the decoded value; any exception raised by the
        verification hooks is propagated.
        """
        cls.verify_encoded(data)
        decoded = cls._decode(data)
        cls.verify_decoded(decoded)
        return decoded

    @classmethod
    def _decode(cls, data):
        """decoder function, typically [but not always] overridden by derived class."""
        return data
| 66 | |
class ApiParamString(ApiParam):
    """Base class representing an API parameter of 'string' type.

    Adds nothing on top of ApiParam: values pass through without conversion or validation.
    """
| 70 | |
| 71 | |
class ApiParamInteger(ApiParam):
    """Base class representing an API parameter of 'integer' type.

    The decoded representation is a python `int`; the encoded representation is its decimal
    string (an `int` is also tolerated on the encoded side).
    """

    @classmethod
    def _decode(cls, data):
        """Convert the encoded value into an `int`."""
        return int(data)

    @classmethod
    def _encode(cls, data):
        """Convert the integer into its decimal string."""
        return str(data)

    @classmethod
    def verify_decoded(cls, data):
        """Raise TypeError unless the decoded value is an `int`."""
        if not isinstance(data, int):
            raise TypeError('Expected an integer input data type')

    @classmethod
    def verify_encoded(cls, data):
        """Verify the encoded value.

        Accepts an `int` as-is.  A string must consist of decimal characters only and must be
        in canonical form (i.e. it round-trips through `int()` unchanged).

        Raises:
            TypeError: the value is neither `int` nor `str`.
            ValueError: the string is not a canonical decimal integer.
        """
        if isinstance(data, int):
            return
        if not isinstance(data, str):
            raise TypeError('Expected an integer or decimal string, got %s' % type(data).__name__)
        if not data.isdecimal():
            raise ValueError('integer (%s) contains non-decimal characters' % data)
        # Explicit check instead of 'assert': assertions are stripped when running with -O, which
        # would silently disable this validation (leading zeros, non-ASCII decimal digits).
        if str(int(data)) != data:
            raise ValueError('integer (%s) is not in canonical decimal form' % data)
| 94 | |
class ApiParamBoolean(ApiParam):
    """Base class representing an API parameter of 'boolean' type."""

    @classmethod
    def _encode(cls, data):
        """Reduce the given value to a plain `bool` according to its truthiness."""
        return True if data else False
| 100 | |
class ApiParamFqdn(ApiParam):
    """String, as a list of domain labels concatenated using the full stop (dot, period) character as
    separator between labels. Labels are restricted to the Alphanumeric mode character set defined in table 5
    of ISO/IEC 18004"""

    @classmethod
    def verify_encoded(cls, data):
        """Placeholder: the encoded FQDN is currently accepted without any check."""
        # FIXME: no validation of the label syntax is implemented yet
        return None
| 109 | |
class param:
    """Namespace collecting the concrete parameter types used by the ES2+ API functions."""

    class Iccid(ApiParamString):
        """String representation of 19 or 20 digits, where the 20th digit MAY optionally be the padding
        character F."""
        @classmethod
        def _encode(cls, data):
            """Return the ICCID as string, padded to 20 characters with a trailing 'F' if needed."""
            data = str(data)
            # SGP.22 version prior to 2.2 do not require support for 19-digit ICCIDs, so let's always
            # encode it with padding F at the end.
            if len(data) == 19:
                data += 'F'
            return data

        @classmethod
        def verify_encoded(cls, data):
            """Raise ValueError unless the encoded ICCID is 19 or 20 characters long."""
            if len(data) not in (19, 20):
                raise ValueError('ICCID (%s) length (%u) invalid' % (data, len(data)))

        @classmethod
        def _decode(cls, data):
            """Return the ICCID with a trailing padding 'F' (of a 20-character value) removed."""
            # strip trailing padding (if it's 20 digits)
            if len(data) == 20 and data[-1] in ('F', 'f'):
                data = data[:-1]
            return data

        @classmethod
        def verify_decoded(cls, data):
            """Raise ValueError unless the value is 19 digits, or 19 digits plus a digit or 'F'."""
            data = str(data)
            if len(data) not in (19, 20):
                raise ValueError('ICCID (%s) length (%u) invalid' % (data, len(data)))
            if len(data) == 19:
                decimal_part = data
            else:
                # only the 20th character is allowed to be the padding character
                decimal_part = data[:-1]
                final_part = data[-1:]
                if final_part not in ('F', 'f') and not final_part.isdecimal():
                    raise ValueError('ICCID (%s) contains non-decimal characters' % data)
            if not decimal_part.isdecimal():
                raise ValueError('ICCID (%s) contains non-decimal characters' % data)


    class Eid(ApiParamString):
        """String of 32 decimal characters"""
        @classmethod
        def verify_encoded(cls, data):
            """Raise ValueError unless the encoded EID is exactly 32 characters long."""
            if len(data) != 32:
                raise ValueError('EID length invalid: "%s" (%u)' % (data, len(data)))

        @classmethod
        def verify_decoded(cls, data):
            """Raise ValueError if the EID contains anything but decimal characters."""
            if not data.isdecimal():
                raise ValueError('EID (%s) contains non-decimal characters' % data)

    class ProfileType(ApiParamString):
        """'profileType' parameter; plain string without additional validation."""

    class MatchingId(ApiParamString):
        """'matchingId' parameter; plain string without additional validation."""

    class ConfirmationCode(ApiParamString):
        """'confirmationCode' parameter; plain string without additional validation."""

    class SmdsAddress(ApiParamFqdn):
        """'smdsAddress' parameter; an FQDN."""

    class SmdpAddress(ApiParamFqdn):
        """'smdpAddress' parameter; an FQDN."""

    class ReleaseFlag(ApiParamBoolean):
        """'releaseFlag' parameter; a boolean."""

    class FinalProfileStatusIndicator(ApiParamString):
        """'finalProfileStatusIndicator' parameter; plain string without additional validation."""

    class Timestamp(ApiParamString):
        """String format as specified by W3C: YYYY-MM-DDThh:mm:ssTZD"""
        @classmethod
        def _decode(cls, data):
            """Parse the ISO 8601 timestamp string into a `datetime` object."""
            # datetime.fromisoformat() only understands the 'Z' (UTC) designator from
            # python 3.11 onwards; map it to the equivalent numeric offset for older versions.
            if isinstance(data, str) and data.endswith('Z'):
                data = data[:-1] + '+00:00'
            return datetime.fromisoformat(data)

        @classmethod
        def _encode(cls, data):
            """Format the `datetime` object as ISO 8601 string with a resolution of seconds."""
            # timespec='seconds' keeps the output within the documented hh:mm:ss format even if
            # the datetime object carries microseconds.
            return data.isoformat(timespec='seconds')

    class NotificationPointId(ApiParamInteger):
        """'notificationPointId' parameter; an integer."""

    class NotificationPointStatus(ApiParam):
        """'notificationPointStatus' parameter; passed through without conversion or validation."""

    class ResultData(ApiParam):
        """'resultData' parameter; binary data which is base64-encoded on the JSON side."""
        @classmethod
        def _decode(cls, data):
            """Return the raw bytes represented by the base64 input."""
            return base64.b64decode(data)

        @classmethod
        def _encode(cls, data):
            """Return the base64 representation of the given bytes as `str`."""
            # b64encode() returns bytes, which json.dumps() cannot serialize; the base64 alphabet
            # is plain ASCII, so decoding to str is lossless.
            return base64.b64encode(data).decode('ascii')

    class JsonResponseHeader(ApiParam):
        """SGP.22 section 6.5.1.4."""
        @classmethod
        def verify_decoded(cls, data):
            """Raise ValueError unless the header contains a functionExecutionStatus with a known status."""
            fe_status = data.get('functionExecutionStatus')
            if not fe_status:
                raise ValueError('Missing mandatory functionExecutionStatus in header')
            status = fe_status.get('status')
            if not status:
                raise ValueError('Missing mandatory status in header functionExecutionStatus')
            if status not in ('Executed-Success', 'Executed-WithWarning', 'Failed', 'Expired'):
                raise ValueError('Unknown/unspecified status "%s"' % status)
| 221 | |
| 222 | |
class HttpStatusError(Exception):
    """The HTTP response carried a status code other than the one expected for the API function."""
| 225 | |
class HttpHeaderError(Exception):
    """The HTTP response carried an unexpected Content-Type or X-Admin-Protocol header."""
| 228 | |
class Es2PlusApiError(Exception):
    """Exception representing an error at the ES2+ API level (status != Executed).

    Attributes:
        status: the 'status' member of the functionExecutionStatus.
        subject_code, reason_code, subject_id, message: the 'subjectCode', 'reasonCode',
            'subjectIdentifier' and 'message' members of its 'statusCodeData'; each is None
            if not present.
    """
    def __init__(self, func_ex_status: dict):
        """Build the exception from the 'functionExecutionStatus' dict of a response header.

        Raises:
            KeyError: `func_ex_status` has no 'status' member.
        """
        self.status = func_ex_status['status']
        sec = {
            'subjectCode': None,
            'reasonCode': None,
            'subjectIdentifier': None,
            'message': None,
        }
        # statusCodeData may be absent (or null); dict.update(None) would raise a TypeError and
        # thereby mask the actual API error we are trying to report.
        actual_sec = func_ex_status.get('statusCodeData') or {}
        sec.update(actual_sec)
        self.subject_code = sec['subjectCode']
        self.reason_code = sec['reasonCode']
        self.subject_id = sec['subjectIdentifier']
        self.message = sec['message']

    def __str__(self):
        return f'{self.status}("{self.subject_code}","{self.reason_code}","{self.subject_id}","{self.message}")'
| 248 | |
class Es2PlusApiFunction(abc.ABC):
    """Base class for representing an ES2+ API Function.

    A derived class describes one API function declaratively via the class variables below;
    this base class implements encoding of the request, the HTTP exchange and decoding of the
    response.
    """
    # the below class variables are expected to be overridden in derived classes

    # path of the API function, appended to the URL prefix
    path = None
    # dictionary of input parameters. key is parameter name, value is ApiParam class
    input_params = {}
    # list of mandatory input parameters
    input_mandatory = []
    # dictionary of output parameters. key is parameter name, value is ApiParam class
    output_params = {}
    # list of mandatory output parameters (for successful response)
    output_mandatory = []
    # expected HTTP status code of the response
    expected_http_status = 200
    # the HTTP method used (GET, OPTIONS, HEAD, POST, PUT, PATCH or DELETE)
    http_method = 'POST'

    def __init__(self, url_prefix: str, func_req_id: str, session):
        """
        Args:
            url_prefix: URL prefix to which the `path` of the function is appended.
            func_req_id: value used as 'functionRequesterIdentifier' in the request header.
            session: object whose `request()` method performs the HTTP exchange.
        """
        self.url_prefix = url_prefix
        self.func_req_id = func_req_id
        self.session = session

    def encode(self, data: dict, func_call_id: str) -> dict:
        """Validate and encode input dict into JSON-serializable dict for request body.

        Parameters without an entry in `input_params` are logged and passed through unmodified.

        Raises:
            ValueError: a parameter listed in `input_mandatory` is missing from `data`.
        """
        output = {
            'header': {
                'functionRequesterIdentifier': self.func_req_id,
                'functionCallIdentifier': func_call_id
            }
        }
        for p in self.input_mandatory:
            if p not in data:
                raise ValueError('Mandatory input parameter %s missing' % p)
        for p, v in data.items():
            p_class = self.input_params.get(p)
            if not p_class:
                logger.warning('Unexpected/unsupported input parameter %s=%s', p, v)
                output[p] = v
            else:
                output[p] = p_class.encode(v)
        return output

    def decode(self, data: dict) -> dict:
        """[further] Decode and validate the JSON-Dict of the response body.

        Parameters without an entry in `output_params` are logged and passed through unmodified.

        Raises:
            ValueError: the header or a parameter listed in `output_mandatory` is missing.
            Es2PlusApiError: the header reports a status other than successful execution.
        """
        output = {}
        # let's first do the header, it's special
        if 'header' not in data:
            raise ValueError('Mandatory output parameter "header" missing')
        hdr_class = self.output_params.get('header')
        output['header'] = hdr_class.decode(data['header'])

        if output['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
            raise Es2PlusApiError(output['header']['functionExecutionStatus'])
        # we can only expect mandatory parameters to be present in case of successful execution
        for p in self.output_mandatory:
            if p == 'header':
                continue
            if p not in data:
                raise ValueError('Mandatory output parameter "%s" missing' % p)
        for p, v in data.items():
            if p == 'header':
                # already decoded above; no need to decode + validate it a second time
                continue
            p_class = self.output_params.get(p)
            if not p_class:
                logger.warning('Unexpected/unsupported output parameter "%s"="%s"', p, v)
                output[p] = v
            else:
                output[p] = p_class.decode(v)
        return output

    def call(self, data: dict, func_call_id:str, timeout=10) -> dict:
        """Make an API call to the ES2+ API endpoint represented by this object.
        Input data is passed in `data` as json-serializable dict.  Output data
        is returned as json-deserialized dict.

        If the response has no body and the function declares no mandatory output parameters
        (e.g. a function answered with HTTP 204), an empty dict is returned.

        Raises:
            HttpStatusError: the HTTP status code differs from `expected_http_status`.
            HttpHeaderError: unexpected Content-Type or X-Admin-Protocol response header.
            ValueError, Es2PlusApiError: see `encode` and `decode`.
        """
        url = self.url_prefix + self.path
        encoded = json.dumps(self.encode(data, func_call_id))
        headers = {
            'Content-Type': 'application/json',
            'X-Admin-Protocol': 'gsma/rsp/v2.5.0',
        }

        logger.debug("HTTP REQ %s - '%s'", url, encoded)
        response = self.session.request(self.http_method, url, data=encoded, headers=headers, timeout=timeout)
        logger.debug("HTTP RSP-STS: [%u] hdr: %s", response.status_code, response.headers)
        logger.debug("HTTP RSP: %s", response.content)

        if response.status_code != self.expected_http_status:
            raise HttpStatusError(response)
        # A response without any body has nothing that could be decoded, and it cannot be
        # expected to announce a Content-Type either.
        bodyless = not response.content and not self.output_mandatory
        if not bodyless:
            # a missing Content-Type header must result in HttpHeaderError, not AttributeError
            content_type = response.headers.get('Content-Type') or ''
            if not content_type.startswith(headers['Content-Type']):
                raise HttpHeaderError(response)
        if not response.headers.get('X-Admin-Protocol', 'gsma/rsp/v2.unknown').startswith('gsma/rsp/v2.'):
            raise HttpHeaderError(response)

        if bodyless:
            return {}
        return self.decode(response.json())
| 343 | |
| 344 | |
class DownloadOrder(Es2PlusApiFunction):
    """ES2+ DownloadOrder function (SGP.22 section 5.3.1)."""
    path = '/gsma/rsp2/es2plus/downloadOrder'
    # request side: no parameter is declared mandatory
    input_params = dict(
        eid=param.Eid,
        iccid=param.Iccid,
        profileType=param.ProfileType,
    )
    # response side
    output_mandatory = ['header', 'iccid']
    output_params = dict(
        header=param.JsonResponseHeader,
        iccid=param.Iccid,
    )
| 358 | |
class ConfirmOrder(Es2PlusApiFunction):
    """ES2+ ConfirmOrder function (SGP.22 section 5.3.2)."""
    path = '/gsma/rsp2/es2plus/confirmOrder'
    # request side
    input_mandatory = ['iccid', 'releaseFlag']
    input_params = dict(
        iccid=param.Iccid,
        eid=param.Eid,
        matchingId=param.MatchingId,
        confirmationCode=param.ConfirmationCode,
        smdsAddress=param.SmdsAddress,
        releaseFlag=param.ReleaseFlag,
    )
    # response side
    output_mandatory = ['header', 'matchingId']
    output_params = dict(
        header=param.JsonResponseHeader,
        eid=param.Eid,
        matchingId=param.MatchingId,
        smdpAddress=param.SmdpAddress,
    )
| 378 | |
class CancelOrder(Es2PlusApiFunction):
    """ES2+ CancelOrder function (SGP.22 section 5.3.3)."""
    path = '/gsma/rsp2/es2plus/cancelOrder'
    # request side
    input_mandatory = ['finalProfileStatusIndicator', 'iccid']
    input_params = dict(
        iccid=param.Iccid,
        eid=param.Eid,
        matchingId=param.MatchingId,
        finalProfileStatusIndicator=param.FinalProfileStatusIndicator,
    )
    # response side: nothing but the header
    output_mandatory = ['header']
    output_params = dict(header=param.JsonResponseHeader)
| 393 | |
class ReleaseProfile(Es2PlusApiFunction):
    """ES2+ ReleaseProfile function (SGP.22 section 5.3.4)."""
    path = '/gsma/rsp2/es2plus/releaseProfile'
    # request side: the ICCID is the only parameter
    input_mandatory = ['iccid']
    input_params = dict(iccid=param.Iccid)
    # response side: nothing but the header
    output_mandatory = ['header']
    output_params = dict(header=param.JsonResponseHeader)
| 405 | |
class HandleDownloadProgressInfo(Es2PlusApiFunction):
    """ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5)."""
    path = '/gsma/rsp2/es2plus/handleDownloadProgressInfo'
    # no output parameters are declared; the response is expected to be HTTP 204
    expected_http_status = 204
    input_mandatory = ['iccid', 'profileType', 'timestamp', 'notificationPointId', 'notificationPointStatus']
    input_params = dict(
        eid=param.Eid,
        iccid=param.Iccid,
        profileType=param.ProfileType,
        timestamp=param.Timestamp,
        notificationPointId=param.NotificationPointId,
        notificationPointStatus=param.NotificationPointStatus,
        resultData=param.ResultData,
    )
| 420 | |
| 421 | |
class Es2pApiClient:
    """Main class representing a full ES2+ API client.  Has one method for each API function.

    All API functions share one `requests.Session`; every call gets a freshly generated
    function call identifier.
    """
    def __init__(self, url_prefix:str, func_req_id:str, server_cert_verify: str = None, client_cert: str = None):
        """
        Args:
            url_prefix: URL prefix handed to every API function object.
            func_req_id: function requester identifier handed to every API function object.
            server_cert_verify: if set, assigned to the `verify` attribute of the session.
            client_cert: if set, assigned to the `cert` attribute of the session.
        """
        self.func_id = 0
        session = requests.Session()
        if server_cert_verify:
            session.verify = server_cert_verify
        if client_cert:
            session.cert = client_cert
        self.session = session

        # every API function object is constructed from the same three arguments
        common = (url_prefix, func_req_id, session)
        self.downloadOrder = DownloadOrder(*common)
        self.confirmOrder = ConfirmOrder(*common)
        self.cancelOrder = CancelOrder(*common)
        self.releaseProfile = ReleaseProfile(*common)
        self.handleDownloadProgressInfo = HandleDownloadProgressInfo(*common)

    def _gen_func_id(self) -> str:
        """Generate the next function call id (current time plus a per-client counter)."""
        self.func_id = self.func_id + 1
        now = time.time()
        return 'FCI-%u-%u' % (now, self.func_id)

    def call_downloadOrder(self, data: dict) -> dict:
        """Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1)."""
        func_call_id = self._gen_func_id()
        return self.downloadOrder.call(data, func_call_id)

    def call_confirmOrder(self, data: dict) -> dict:
        """Perform ES2+ ConfirmOrder function (SGP.22 section 5.3.2)."""
        func_call_id = self._gen_func_id()
        return self.confirmOrder.call(data, func_call_id)

    def call_cancelOrder(self, data: dict) -> dict:
        """Perform ES2+ CancelOrder function (SGP.22 section 5.3.3)."""
        func_call_id = self._gen_func_id()
        return self.cancelOrder.call(data, func_call_id)

    def call_releaseProfile(self, data: dict) -> dict:
        """Perform ES2+ ReleaseProfile function (SGP.22 section 5.3.4)."""
        func_call_id = self._gen_func_id()
        return self.releaseProfile.call(data, func_call_id)

    def call_handleDownloadProgressInfo(self, data: dict) -> dict:
        """Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5)."""
        func_call_id = self._gen_func_id()
        return self.handleDownloadProgressInfo.call(data, func_call_id)