Vadim Yanitskiy | 1381ff1 | 2018-10-27 01:48:15 +0700 | [diff] [blame] | 1 | # -*- coding: utf-8 -*- |
| 2 | |
| 3 | """ pySim: Transport Link for Calypso based phones |
| 4 | """ |
| 5 | |
| 6 | # |
| 7 | # Copyright (C) 2018 Vadim Yanitskiy <axilirator@gmail.com> |
| 8 | # |
| 9 | # This program is free software: you can redistribute it and/or modify |
| 10 | # it under the terms of the GNU General Public License as published by |
| 11 | # the Free Software Foundation, either version 2 of the License, or |
| 12 | # (at your option) any later version. |
| 13 | # |
| 14 | # This program is distributed in the hope that it will be useful, |
| 15 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 16 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 17 | # GNU General Public License for more details. |
| 18 | # |
| 19 | # You should have received a copy of the GNU General Public License |
| 20 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
| 21 | # |
| 22 | |
| 23 | from __future__ import absolute_import |
| 24 | |
| 25 | import select |
| 26 | import struct |
| 27 | import socket |
| 28 | import os |
| 29 | |
| 30 | from pySim.transport import LinkBase |
| 31 | from pySim.exceptions import * |
| 32 | from pySim.utils import h2b, b2h |
| 33 | |
class L1CTLMessage(object):
    """Base class for (encoded) L1CTL messages.

    Wire layout of an encoded message:
      - msg_length (2 bytes, network byte order)
      - l1ctl_hdr (packed structure)
        - msg_type (1 byte)
        - flags (1 byte)
        - padding (2 spare bytes)
      - ... payload ...
    """

    def __init__(self, msg_type, flags = 0x00):
        """Start the message body with the packed L1CTL header."""
        header = struct.pack("BBxx", msg_type, flags)
        self.data = header

    def gen_msg(self):
        """Return the message body prefixed with its 16-bit length."""
        body = self.data
        length_prefix = struct.pack("!H", len(body))
        return length_prefix + body
| 50 | |
class L1CTLMessageReset(L1CTLMessage):
    """L1CTL_RESET_REQ message: L1CTL header followed by the reset type."""

    # L1CTL message types
    L1CTL_RESET_IND = 0x07
    L1CTL_RESET_REQ = 0x0d
    L1CTL_RESET_CONF = 0x0e

    # Reset types
    L1CTL_RES_T_BOOT = 0x00
    L1CTL_RES_T_FULL = 0x01
    L1CTL_RES_T_SCHED = 0x02

    def __init__(self, type = L1CTL_RES_T_FULL):
        """Build a reset request of the given reset type (FULL by default)."""
        super(L1CTLMessageReset, self).__init__(self.L1CTL_RESET_REQ)

        # Payload: reset type (1 byte) followed by 3 padding bytes
        payload = struct.pack("Bxxx", type)
        self.data = self.data + payload
| 66 | |
class L1CTLMessageSIM(L1CTLMessage):
    """L1CTL_SIM_REQ message: L1CTL header followed by a raw PDU."""

    # SIM related message types
    L1CTL_SIM_REQ = 0x16
    L1CTL_SIM_CONF = 0x17

    def __init__(self, pdu):
        """Build a SIM request carrying the given (already encoded) PDU."""
        super(L1CTLMessageSIM, self).__init__(self.L1CTL_SIM_REQ)

        # The PDU is appended to the header as is
        self.data = self.data + pdu
| 76 | |
class CalypsoSimLink(LinkBase):
    """Transport link exchanging L1CTL messages over a UNIX socket.

    The socket is expected to be provided by osmocon; L1CTL_RESET_REQ is
    used to (re)initialize the PHY and L1CTL_SIM_REQ to exchange APDUs.
    """

    # Time (in seconds) to wait for incoming data
    _RSP_TIMEOUT = 3.0

    def __init__(self, sock_path = "/tmp/osmocom_l2"):
        """Connect to the UNIX socket at sock_path.

        Raises ReaderError if sock_path does not exist; errors raised by
        socket.connect() are propagated unchanged.
        """
        # Define the attribute up front, so that __del__() stays safe
        # even if one of the steps below raises
        self.sock = None

        # Make sure that a given socket path exists
        if not os.path.exists(sock_path):
            raise ReaderError("There is no such ('%s') UNIX socket" % sock_path)

        print("Connecting to osmocon at '%s'..." % sock_path)

        # Establish a client connection
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.connect(sock_path)
        except Exception:
            # Do not leak the descriptor on a failed connection attempt
            sock.close()
            raise
        self.sock = sock

    def __del__(self):
        # __init__() may have failed before the socket was created
        sock = getattr(self, "sock", None)
        if sock is not None:
            sock.close()

    def wait_for_rsp(self, exp_len = 128):
        """Wait for incoming data and return up to exp_len bytes of it.

        A single recv() is performed, so fewer than exp_len bytes (or an
        empty string, if the peer has closed the connection) may be
        returned. Raises ReaderError if nothing arrives within the timeout.
        """
        # Wait for incoming data (timeout is 3 seconds)
        s, _, _ = select.select([self.sock], [], [], self._RSP_TIMEOUT)
        if not s:
            raise ReaderError("Timeout waiting for card response")

        # Receive (at most) the expected amount of bytes from osmocon
        return self.sock.recv(exp_len)

    def _recv_exact(self, length):
        """Receive exactly length bytes, waiting for each chunk.

        A stream socket may deliver a message in several pieces, so keep
        reading until everything has arrived. Raises ReaderError on
        timeout or if the peer closes the connection.
        """
        chunks = []
        remaining = length
        while remaining > 0:
            chunk = self.wait_for_rsp(remaining)
            if not chunk:
                raise ReaderError("Connection closed by osmocon")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def reset_card(self):
        """Request a FULL reset and wait for L1CTL_RESET_CONF.

        Raises ReaderError on timeout, or if the response is too short
        or is not L1CTL_RESET_CONF.
        """
        # Request FULL reset
        req_msg = L1CTLMessageReset()
        self.sock.sendall(req_msg.gen_msg())

        # Wait for confirmation
        rsp = self.wait_for_rsp()

        # The response must contain at least msg_length and msg_type
        if len(rsp) < struct.calcsize("!HB"):
            raise ReaderError("Failed to reset Calypso PHY")

        rsp_msg = struct.unpack_from("!HB", rsp)
        if rsp_msg[1] != L1CTLMessageReset.L1CTL_RESET_CONF:
            raise ReaderError("Failed to reset Calypso PHY")

    def connect(self):
        """Connecting boils down to resetting the PHY."""
        self.reset_card()

    def disconnect(self):
        pass # Nothing to do really ...

    def wait_for_card(self, timeout = None, newcardonly = False):
        pass # Nothing to do really ...

    def send_apdu_raw(self, pdu):
        """see LinkBase.send_apdu_raw

        The pdu is converted with h2b() and sent as L1CTL_SIM_REQ; the
        payload of the L1CTL_SIM_CONF response is split into data and SW
        (the last two bytes), both returned converted with b2h().

        Raises ReaderError on timeout, closed connection, or unexpected
        message; ProtocolError if the response carries no payload.
        """

        # Send the PDU wrapped into L1CTL_SIM_REQ
        req_msg = L1CTLMessageSIM(h2b(pdu))
        self.sock.sendall(req_msg.gen_msg())

        # Read message length first
        len_size = struct.calcsize("!H")
        rsp = self._recv_exact(len_size)
        msg_len = struct.unpack_from("!H", rsp)[0]

        offset = struct.calcsize("BBxx")
        if msg_len < offset:
            raise ReaderError("Missing L1CTL header for L1CTL_SIM_CONF")

        # Read the whole message then
        rsp = self._recv_exact(msg_len)

        # Verify L1CTL header
        hdr = struct.unpack_from("BBxx", rsp)
        if hdr[0] != L1CTLMessageSIM.L1CTL_SIM_CONF:
            raise ReaderError("Unexpected L1CTL message received")

        # Verify the payload length
        if len(rsp) <= offset:
            raise ProtocolError("Empty response from SIM?!?")

        # Omit L1CTL header
        rsp = rsp[offset:]

        # Unpack data and SW
        data = rsp[:-2]
        sw = rsp[-2:]

        return b2h(data), b2h(sw)