| """GSMA eSIM RSP ES2+ interface according to SGP.22 v2.5""" |
| |
| # (C) 2024 by Harald Welte <laforge@osmocom.org> |
| # |
| # This program is free software: you can redistribute it and/or modify |
| # it under the terms of the GNU Affero General Public License as published by |
| # the Free Software Foundation, either version 3 of the License, or |
| # (at your option) any later version. |
| # |
| # This program is distributed in the hope that it will be useful, |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| # GNU Affero General Public License for more details. |
| # |
| # You should have received a copy of the GNU Affero General Public License |
| # along with this program. If not, see <http://www.gnu.org/licenses/>. |
| |
| import abc |
| import requests |
| import logging |
| import json |
| from datetime import datetime |
| import time |
| import base64 |
| |
| logger = logging.getLogger(__name__) |
| logger.setLevel(logging.DEBUG) |
| |
| class ApiParam(abc.ABC): |
| """A class reprsenting a single parameter in the ES2+ API.""" |
| @classmethod |
| def verify_decoded(cls, data): |
| """Verify the decoded reprsentation of a value. Should raise an exception if somthing is odd.""" |
| pass |
| |
| @classmethod |
| def verify_encoded(cls, data): |
| """Verify the encoded reprsentation of a value. Should raise an exception if somthing is odd.""" |
| pass |
| |
| @classmethod |
| def encode(cls, data): |
| """[Validate and] Encode the given value.""" |
| cls.verify_decoded(data) |
| encoded = cls._encode(data) |
| cls.verify_decoded(encoded) |
| return encoded |
| |
| @classmethod |
| def _encode(cls, data): |
| """encoder function, typically [but not always] overridden by derived class.""" |
| return data |
| |
| @classmethod |
| def decode(cls, data): |
| """[Validate and] Decode the given value.""" |
| cls.verify_encoded(data) |
| decoded = cls._decode(data) |
| cls.verify_decoded(decoded) |
| return decoded |
| |
| @classmethod |
| def _decode(cls, data): |
| """decoder function, typically [but not always] overridden by derived class.""" |
| return data |
| |
| class ApiParamString(ApiParam): |
| """Base class representing an API parameter of 'string' type.""" |
| pass |
| |
| |
| class ApiParamInteger(ApiParam): |
| """Base class representing an API parameter of 'integer' type.""" |
| @classmethod |
| def _decode(cls, data): |
| return int(data) |
| |
| @classmethod |
| def _encode(cls, data): |
| return str(data) |
| |
| @classmethod |
| def verify_decoded(cls, data): |
| if not isinstance(data, int): |
| raise TypeError('Expected an integer input data type') |
| |
| @classmethod |
| def verify_encoded(cls, data): |
| if isinstance(data, int): |
| return |
| if not data.isdecimal(): |
| raise ValueError('integer (%s) contains non-decimal characters' % data) |
| assert str(int(data)) == data |
| |
| class ApiParamBoolean(ApiParam): |
| """Base class representing an API parameter of 'boolean' type.""" |
| @classmethod |
| def _encode(cls, data): |
| return bool(data) |
| |
| class ApiParamFqdn(ApiParam): |
| """String, as a list of domain labels concatenated using the full stop (dot, period) character as |
| separator between labels. Labels are restricted to the Alphanumeric mode character set defined in table 5 |
| of ISO/IEC 18004""" |
| @classmethod |
| def verify_encoded(cls, data): |
| # FIXME |
| pass |
| |
| class param: |
| class Iccid(ApiParamString): |
| """String representation of 19 or 20 digits, where the 20th digit MAY optionally be the padding |
| character F.""" |
| @classmethod |
| def _encode(cls, data): |
| data = str(data) |
| # SGP.22 version prior to 2.2 do not require support for 19-digit ICCIDs, so let's always |
| # encode it with padding F at the end. |
| if len(data) == 19: |
| data += 'F' |
| return data |
| |
| @classmethod |
| def verify_encoded(cls, data): |
| if len(data) not in [19, 20]: |
| raise ValueError('ICCID (%s) length (%u) invalid' % (data, len(data))) |
| |
| @classmethod |
| def _decode(cls, data): |
| # strip trailing padding (if it's 20 digits) |
| if len(data) == 20 and data[-1] in ['F', 'f']: |
| data = data[:-1] |
| return data |
| |
| @classmethod |
| def verify_decoded(cls, data): |
| data = str(data) |
| if len(data) not in [19, 20]: |
| raise ValueError('ICCID (%s) length (%u) invalid' % (data, len(data))) |
| if len(data) == 19: |
| decimal_part = data |
| else: |
| decimal_part = data[:-1] |
| final_part = data[-1:] |
| if final_part not in ['F', 'f'] and not final_part.isdecimal(): |
| raise ValueError('ICCID (%s) contains non-decimal characters' % data) |
| if not decimal_part.isdecimal(): |
| raise ValueError('ICCID (%s) contains non-decimal characters' % data) |
| |
| |
| class Eid(ApiParamString): |
| """String of 32 decimal characters""" |
| @classmethod |
| def verify_encoded(cls, data): |
| if len(data) != 32: |
| raise ValueError('EID length invalid: "%s" (%u)' % (data, len(data))) |
| |
| @classmethod |
| def verify_decoded(cls, data): |
| if not data.isdecimal(): |
| raise ValueError('EID (%s) contains non-decimal characters' % data) |
| |
| class ProfileType(ApiParamString): |
| pass |
| |
| class MatchingId(ApiParamString): |
| pass |
| |
| class ConfirmationCode(ApiParamString): |
| pass |
| |
| class SmdsAddress(ApiParamFqdn): |
| pass |
| |
| class SmdpAddress(ApiParamFqdn): |
| pass |
| |
| class ReleaseFlag(ApiParamBoolean): |
| pass |
| |
| class FinalProfileStatusIndicator(ApiParamString): |
| pass |
| |
| class Timestamp(ApiParamString): |
| """String format as specified by W3C: YYYY-MM-DDThh:mm:ssTZD""" |
| @classmethod |
| def _decode(cls, data): |
| return datetime.fromisoformat(data) |
| |
| @classmethod |
| def _encode(cls, data): |
| return datetime.toisoformat(data) |
| |
| class NotificationPointId(ApiParamInteger): |
| pass |
| |
| class NotificationPointStatus(ApiParam): |
| pass |
| |
| class ResultData(ApiParam): |
| @classmethod |
| def _decode(cls, data): |
| return base64.b64decode(data) |
| |
| @classmethod |
| def _encode(cls, data): |
| return base64.b64encode(data) |
| |
| class JsonResponseHeader(ApiParam): |
| """SGP.22 section 6.5.1.4.""" |
| @classmethod |
| def verify_decoded(cls, data): |
| fe_status = data.get('functionExecutionStatus') |
| if not fe_status: |
| raise ValueError('Missing mandatory functionExecutionStatus in header') |
| status = fe_status.get('status') |
| if not status: |
| raise ValueError('Missing mandatory status in header functionExecutionStatus') |
| if status not in ['Executed-Success', 'Executed-WithWarning', 'Failed', 'Expired']: |
| raise ValueError('Unknown/unspecified status "%s"' % status) |
| |
| |
| class HttpStatusError(Exception): |
| pass |
| |
| class HttpHeaderError(Exception): |
| pass |
| |
| class Es2PlusApiError(Exception): |
| """Exception representing an error at the ES2+ API level (status != Executed).""" |
| def __init__(self, func_ex_status: dict): |
| self.status = func_ex_status['status'] |
| sec = { |
| 'subjectCode': None, |
| 'reasonCode': None, |
| 'subjectIdentifier': None, |
| 'message': None, |
| } |
| actual_sec = func_ex_status.get('statusCodeData', None) |
| sec.update(actual_sec) |
| self.subject_code = sec['subjectCode'] |
| self.reason_code = sec['reasonCode'] |
| self.subject_id = sec['subjectIdentifier'] |
| self.message = sec['message'] |
| |
| def __str__(self): |
| return f'{self.status}("{self.subject_code}","{self.reason_code}","{self.subject_id}","{self.message}")' |
| |
| class Es2PlusApiFunction(abc.ABC): |
| """Base classs for representing an ES2+ API Function.""" |
| # the below class variables are expected to be overridden in derived classes |
| |
| path = None |
| # dictionary of input parameters. key is parameter name, value is ApiParam class |
| input_params = {} |
| # list of mandatory input parameters |
| input_mandatory = [] |
| # dictionary of output parameters. key is parameter name, value is ApiParam class |
| output_params = {} |
| # list of mandatory output parameters (for successful response) |
| output_mandatory = [] |
| # expected HTTP status code of the response |
| expected_http_status = 200 |
| # the HTTP method used (GET, OPTIONS, HEAD, POST, PUT, PATCH or DELETE) |
| http_method = 'POST' |
| |
| def __init__(self, url_prefix: str, func_req_id: str, session): |
| self.url_prefix = url_prefix |
| self.func_req_id = func_req_id |
| self.session = session |
| |
| def encode(self, data: dict, func_call_id: str) -> dict: |
| """Validate an encode input dict into JSON-serializable dict for request body.""" |
| output = { |
| 'header': { |
| 'functionRequesterIdentifier': self.func_req_id, |
| 'functionCallIdentifier': func_call_id |
| } |
| } |
| for p in self.input_mandatory: |
| if not p in data: |
| raise ValueError('Mandatory input parameter %s missing' % p) |
| for p, v in data.items(): |
| p_class = self.input_params.get(p) |
| if not p_class: |
| logger.warning('Unexpected/unsupported input parameter %s=%s', p, v) |
| output[p] = v |
| else: |
| output[p] = p_class.encode(v) |
| return output |
| |
| |
| def decode(self, data: dict) -> dict: |
| """[further] Decode and validate the JSON-Dict of the respnse body.""" |
| output = {} |
| # let's first do the header, it's special |
| if not 'header' in data: |
| raise ValueError('Mandatory output parameter "header" missing') |
| hdr_class = self.output_params.get('header') |
| output['header'] = hdr_class.decode(data['header']) |
| |
| if output['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']: |
| raise Es2PlusApiError(output['header']['functionExecutionStatus']) |
| # we can only expect mandatory parameters to be present in case of successful execution |
| for p in self.output_mandatory: |
| if p == 'header': |
| continue |
| if not p in data: |
| raise ValueError('Mandatory output parameter "%s" missing' % p) |
| for p, v in data.items(): |
| p_class = self.output_params.get(p) |
| if not p_class: |
| logger.warning('Unexpected/unsupported output parameter "%s"="%s"', p, v) |
| output[p] = v |
| else: |
| output[p] = p_class.decode(v) |
| return output |
| |
| def call(self, data: dict, func_call_id:str, timeout=10) -> dict: |
| """Make an API call to the ES2+ API endpoint represented by this object. |
| Input data is passed in `data` as json-serializable dict. Output data |
| is returned as json-deserialized dict.""" |
| url = self.url_prefix + self.path |
| encoded = json.dumps(self.encode(data, func_call_id)) |
| headers = { |
| 'Content-Type': 'application/json', |
| 'X-Admin-Protocol': 'gsma/rsp/v2.5.0', |
| } |
| |
| logger.debug("HTTP REQ %s - '%s'" % (url, encoded)) |
| response = self.session.request(self.http_method, url, data=encoded, headers=headers, timeout=timeout) |
| logger.debug("HTTP RSP-STS: [%u] hdr: %s" % (response.status_code, response.headers)) |
| logger.debug("HTTP RSP: %s" % (response.content)) |
| |
| if response.status_code != self.expected_http_status: |
| raise HttpStatusError(response) |
| if not response.headers.get('Content-Type').startswith(headers['Content-Type']): |
| raise HttpHeaderError(response) |
| if not response.headers.get('X-Admin-Protocol', 'gsma/rsp/v2.unknown').startswith('gsma/rsp/v2.'): |
| raise HttpHeaderError(response) |
| |
| return self.decode(response.json()) |
| |
| |
| # ES2+ DownloadOrder function (SGP.22 section 5.3.1) |
| class DownloadOrder(Es2PlusApiFunction): |
| path = '/gsma/rsp2/es2plus/downloadOrder' |
| input_params = { |
| 'eid': param.Eid, |
| 'iccid': param.Iccid, |
| 'profileType': param.ProfileType |
| } |
| output_params = { |
| 'header': param.JsonResponseHeader, |
| 'iccid': param.Iccid, |
| } |
| output_mandatory = ['header', 'iccid'] |
| |
| # ES2+ ConfirmOrder function (SGP.22 section 5.3.2) |
| class ConfirmOrder(Es2PlusApiFunction): |
| path = '/gsma/rsp2/es2plus/confirmOrder' |
| input_params = { |
| 'iccid': param.Iccid, |
| 'eid': param.Eid, |
| 'matchingId': param.MatchingId, |
| 'confirmationCode': param.ConfirmationCode, |
| 'smdsAddress': param.SmdsAddress, |
| 'releaseFlag': param.ReleaseFlag, |
| } |
| input_mandatory = ['iccid', 'releaseFlag'] |
| output_params = { |
| 'header': param.JsonResponseHeader, |
| 'eid': param.Eid, |
| 'matchingId': param.MatchingId, |
| 'smdpAddress': param.SmdpAddress, |
| } |
| output_mandatory = ['header', 'matchingId'] |
| |
| # ES2+ CancelOrder function (SGP.22 section 5.3.3) |
| class CancelOrder(Es2PlusApiFunction): |
| path = '/gsma/rsp2/es2plus/cancelOrder' |
| input_params = { |
| 'iccid': param.Iccid, |
| 'eid': param.Eid, |
| 'matchingId': param.MatchingId, |
| 'finalProfileStatusIndicator': param.FinalProfileStatusIndicator, |
| } |
| input_mandatory = ['finalProfileStatusIndicator', 'iccid'] |
| output_params = { |
| 'header': param.JsonResponseHeader, |
| } |
| output_mandatory = ['header'] |
| |
| # ES2+ ReleaseProfile function (SGP.22 section 5.3.4) |
| class ReleaseProfile(Es2PlusApiFunction): |
| path = '/gsma/rsp2/es2plus/releaseProfile' |
| input_params = { |
| 'iccid': param.Iccid, |
| } |
| input_mandatory = ['iccid'] |
| output_params = { |
| 'header': param.JsonResponseHeader, |
| } |
| output_mandatory = ['header'] |
| |
| # ES2+ HandleDownloadProgress function (SGP.22 section 5.3.5) |
| class HandleDownloadProgressInfo(Es2PlusApiFunction): |
| path = '/gsma/rsp2/es2plus/handleDownloadProgressInfo' |
| input_params = { |
| 'eid': param.Eid, |
| 'iccid': param.Iccid, |
| 'profileType': param.ProfileType, |
| 'timestamp': param.Timestamp, |
| 'notificationPointId': param.NotificationPointId, |
| 'notificationPointStatus': param.NotificationPointStatus, |
| 'resultData': param.ResultData, |
| } |
| input_mandatory = ['iccid', 'profileType', 'timestamp', 'notificationPointId', 'notificationPointStatus'] |
| expected_http_status = 204 |
| |
| |
| class Es2pApiClient: |
| """Main class representing a full ES2+ API client. Has one method for each API function.""" |
| def __init__(self, url_prefix:str, func_req_id:str, server_cert_verify: str = None, client_cert: str = None): |
| self.func_id = 0 |
| self.session = requests.Session() |
| if server_cert_verify: |
| self.session.verify = server_cert_verify |
| if client_cert: |
| self.session.cert = client_cert |
| |
| self.downloadOrder = DownloadOrder(url_prefix, func_req_id, self.session) |
| self.confirmOrder = ConfirmOrder(url_prefix, func_req_id, self.session) |
| self.cancelOrder = CancelOrder(url_prefix, func_req_id, self.session) |
| self.releaseProfile = ReleaseProfile(url_prefix, func_req_id, self.session) |
| self.handleDownloadProgressInfo = HandleDownloadProgressInfo(url_prefix, func_req_id, self.session) |
| |
| def _gen_func_id(self) -> str: |
| """Generate the next function call id.""" |
| self.func_id += 1 |
| return 'FCI-%u-%u' % (time.time(), self.func_id) |
| |
| |
| def call_downloadOrder(self, data: dict) -> dict: |
| """Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1).""" |
| return self.downloadOrder.call(data, self._gen_func_id()) |
| |
| def call_confirmOrder(self, data: dict) -> dict: |
| """Perform ES2+ ConfirmOrder function (SGP.22 section 5.3.2).""" |
| return self.confirmOrder.call(data, self._gen_func_id()) |
| |
| def call_cancelOrder(self, data: dict) -> dict: |
| """Perform ES2+ CancelOrder function (SGP.22 section 5.3.3).""" |
| return self.cancelOrder.call(data, self._gen_func_id()) |
| |
| def call_releaseProfile(self, data: dict) -> dict: |
| """Perform ES2+ CancelOrder function (SGP.22 section 5.3.4).""" |
| return self.releaseProfile.call(data, self._gen_func_id()) |
| |
| def call_handleDownloadProgressInfo(self, data: dict) -> dict: |
| """Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5).""" |
| return self.handleDownloadProgressInfo.call(data, self._gen_func_id()) |