module OSMUX_Emulation {

/* Functionalities that we want this module to implement:
 *  * act as an Osmux source that generates an Osmux Stream
 *  * act as an Osmux sink that consumes an Osmux Stream
 *
 * for all of the above, we want to be able to
 *  * specify the payload type
 *  * specify the interval / sample rate
 *  * create drop-outs in the stream
 *  * detect reordered or lost frames
 *  * validate if the size of the frames matches expectations
 *  * play back real audio (at least some tones?)
 *  * enable/disable generation/verification of RTCP
 */

/* Ideas:

* each component consists of transmitter and receiver
* transmitters and receivers can be operated as tuple?
* high-level operation
** set-up config at transmitter + receiver
** transmit sequence of payloads
** verify reception of those payloads
* can operate full-duplex/bi-directional as needed

* transmitter
** trigger transmission of n number of packets
** transmit them at normal ptime interval
** payload size configurable
** payload contents PRBS or the like

* receiver
** count number of related packets at receiver
** check received payload type
** check received timestamp increments
** check received seq_nr increments
** (optionally) check for SSRC
** (optionally) check for payload size
** (optionally) check for payload contents

* later
** how to test transcoding?
** how to test pure play-out endpoints (rx only)?
** how to test "Rx from wrong IP/port" scenarios?
** how to test RTCP?
** maybe keep ports un-connected to show wrong src -lrt

*/




import from General_Types all;
import from Osmocom_Types all;
import from IPL4asp_Types all;
import from Misc_Helpers all;
import from AMR_Types all;
import from OSMUX_Types all;
import from OSMUX_CodecPort all;
import from OSMUX_CodecPort_CtrlFunct all;

/* Component type of the Osmux emulation. It holds the Osmux codec port, the
 * user-facing control and data ports, and all state used by f_main(). */
type component OSMUX_Emulation_CT {
	/* down-facing ports for Osmux on top of IPL4asp */
	port OSMUX_CODEC_PT OSMUX;
	/* connection id of the listening socket; -1 as long as nothing is bound */
	var integer g_osmux_conn_id := -1;

	/* user-facing port for controlling the binding */
	port OsmuxEM_CTRL_PT CTRL;
	/* user-facing port for sniffing Osmux frames */
	port OsmuxEM_DATA_PT DATA;

	/* configurable by user, should be fixed */
	var OsmuxemConfig g_cfg := c_OsmuxemDefaultCfg;

	/* statistics */
	var OsmuxemStats g_stat := c_OsmuxemStatsReset;

	/* remote address, stored by the OsmuxEM_connect procedure */
	var HostName g_remote_host;
	var PortNumber g_remote_port;
	/* local address, stored by the OsmuxEM_bind procedure */
	var HostName g_local_host;
	var PortNumber g_local_port;

	/* state variables, change over time */
	var boolean g_rx_enabled := false;
	var boolean g_tx_connected := false; /* Set to true after connect() */

	/* NOTE(review): the following three variables are not referenced by any
	 * code visible in this module - confirm whether they are still needed */
	var INT7b g_rx_payload_type := 0;
	var LIN2_BO_LAST g_rx_last_seq;
	var uint32_t g_rx_last_ts;

	/* registered receive handles; a slot is free as long as its cid is unbound */
	var RxHandleTableRec RxHandleTable[16];
	/* registered transmit handles; a slot is free as long as it is unbound */
	var OsmuxTxHandle TxHandleList[16];
}

/* One slot of the receive handle table: the CID used as lookup key and the
 * receive handle registered for it. */
type record RxHandleTableRec {
	OsmuxCID cid,
	OsmuxRxHandle vc_conn
};

/* Receive-side state of one Osmux circuit (CID). */
type record OsmuxRxHandle {
	/* circuit id this handle belongs to */
	OsmuxCID cid,
	/* true once at least one frame has been received on this CID */
	boolean first_seq_seen,
	/* sequence number of the most recently received frame */
	INT1 last_seq_ack
};

/* Initial receive handle: CID 0 and no frame seen yet. Users are expected to
 * override the cid before registering the handle. */
const OsmuxRxHandle c_OsmuxemDefaultRxHandle := {
	cid := 0,
	first_seq_seen := false,
	last_seq_ack := 0
}

/* Transmit-side state of one Osmux circuit (CID). Apart from ft, the fields
 * are copied into the header of every frame sent by f_tx_osmux(). */
type record OsmuxTxHandle {
	/* Osmux frame type; NOTE(review): not read by f_tx_osmux(), the send
	 * template hard-codes its own ft value */
	INT2b ft,
	BIT1 amr_f,
	BIT1 amr_q,
	/* sequence number to be used for the next transmitted frame */
	INT1 seq,
	OsmuxCID cid,
	INT4b amr_ft,
	INT4b amr_cmr
};

/* Transmit handle for the given CID, preset for AMR 5.90 frames.
 * The sequence number starts at the fixed value 12. */
template OsmuxTxHandle t_TxHandleAMR590(OsmuxCID cid) := {
	ft := 1,//enum2int(OsmuxFT:OSMUX_FT_AMR)
	amr_f := '0'B, /* this frame is the last frame in this payload */
	amr_q := '1'B, /* frame not damaged */
	seq := 12,
	cid := cid,
	amr_ft := 2, /* AMR 5.90 */
	amr_cmr := 0
};

/* Operating mode of the emulation, selected through the OsmuxEM_mode procedure. */
type enumerated OsmuxemMode {
	/* transmit timer stopped, receiver disabled */
	OSMUXEM_MODE_NONE,
	/* transmit timer running, receiver disabled */
	OSMUXEM_MODE_TXONLY,
	/* transmit timer stopped, receiver enabled */
	OSMUXEM_MODE_RXONLY,
	/* transmit timer running, receiver enabled */
	OSMUXEM_MODE_BIDIR
};

/* Counters maintained by the emulation and returned by OsmuxEM_stats_get. */
type record OsmuxemStats {
	/* number of packets transmitted */
	integer num_pkts_tx,
	/* number of Osmux payload bytes transmitted */
	integer bytes_payload_tx,

	/* number of packets received */
	integer num_pkts_rx,
	/* number of Osmux payload bytes received */
	integer bytes_payload_rx,
	/* number of packets received out-of-sequence */
	integer num_pkts_rx_err_seq,
	/* number of packets received during Rx disable */
	integer num_pkts_rx_err_disabled,
	/* number of packets received with mismatching payload */
	integer num_pkts_rx_err_payload
}

/* Statistics record with every counter at zero; initial value of g_stat. */
const OsmuxemStats c_OsmuxemStatsReset := {
	num_pkts_tx := 0,
	bytes_payload_tx := 0,
	num_pkts_rx := 0,
	bytes_payload_rx := 0,
	num_pkts_rx_err_seq := 0,
	num_pkts_rx_err_disabled := 0,
	num_pkts_rx_err_payload := 0
}

/* User configuration of the emulation, applied through OsmuxEM_configure. */
type record OsmuxemConfig {
	/* number of AMR payloads batched into one Osmux frame; the transmitted
	 * header counter is batch_size - 1 */
	INT3b batch_size,
	/* interval of the transmit timer in milliseconds */
	integer tx_duration_ms,
	/* pattern from which the transmitted (and expected) payload is cut */
	octetstring tx_fixed_payload optional,
	/* when present, received payloads are verified and mismatches counted */
	octetstring rx_fixed_payload optional
};

/* Default configuration: batches of 4 payloads, sent every 80 ms, with the
 * same 24-octet pattern for the transmit and receive direction. */
const OsmuxemConfig c_OsmuxemDefaultCfg := {
	batch_size := 4,
	tx_duration_ms := 20 * 4, /* 4 is batch_size */
	tx_fixed_payload := '010203040102030401020304010203040102030401020304'O,
	rx_fixed_payload := '010203040102030401020304010203040102030401020304'O
}

/* Procedure signatures of the control interface (OsmuxEM_CTRL_PT). */
/* bind the local UDP socket; local_port is handed back in the reply */
signature OsmuxEM_bind(in HostName local_host, inout PortNumber local_port);
/* connect the bound socket to the remote peer */
signature OsmuxEM_connect(in HostName remote_host, in PortNumber remote_port);
/* select the operating mode (none / tx only / rx only / bidirectional) */
signature OsmuxEM_mode(in OsmuxemMode mode);
/* replace the current configuration */
signature OsmuxEM_configure(in OsmuxemConfig cfg);
/* retrieve the current statistics */
signature OsmuxEM_stats_get(out OsmuxemStats stats);
/* register a receive handle (one per CID) */
signature OsmuxEM_register_rxhandle(in OsmuxRxHandle hdl);
/* register a transmit handle (one per CID) */
signature OsmuxEM_register_txhandle(in OsmuxTxHandle hdl);

/* Procedure-based control port between the user and the emulation component. */
type port OsmuxEM_CTRL_PT procedure {
	inout OsmuxEM_bind, OsmuxEM_connect, OsmuxEM_mode, OsmuxEM_configure,
	      OsmuxEM_stats_get, OsmuxEM_register_rxhandle, OsmuxEM_register_txhandle;
} with { extension "internal" };

/* Message-based port on which the emulation forwards received Osmux PDUs to
 * the user (only while the port is connected). */
type port OsmuxEM_DATA_PT message {
	inout OSMUX_PDU;
} with { extension "internal" };

/* Ask the emulation behind pt to bind to local_host/local_port and wait for
 * the reply. local_port is overwritten with the port value carried in the reply. */
function f_osmuxem_bind(OsmuxEM_CTRL_PT pt, in HostName local_host, inout PortNumber local_port) {
	pt.call(OsmuxEM_bind:{local_host, local_port}) {
		[] pt.getreply(OsmuxEM_bind:{local_host, ?}) -> param (local_port) {};
	}
}
/* Ask the emulation behind pt to connect to remote_host/remote_port and wait
 * for the matching reply. */
function f_osmuxem_connect(OsmuxEM_CTRL_PT pt, in HostName remote_host, in PortNumber remote_port) {
	pt.call(OsmuxEM_connect:{remote_host, remote_port}) {
		[] pt.getreply(OsmuxEM_connect:{remote_host, remote_port}) {};
	}
}
/* Switch the emulation behind pt to the given operating mode and wait for the
 * matching reply. */
function f_osmuxem_mode(OsmuxEM_CTRL_PT pt, in OsmuxemMode mode) {
	pt.call(OsmuxEM_mode:{mode}) {
		[] pt.getreply(OsmuxEM_mode:{mode}) {};
	}
}
/* Hand a new configuration to the emulation behind pt and wait for the
 * matching reply. */
function f_osmuxem_configure(OsmuxEM_CTRL_PT pt, in OsmuxemConfig cfg) {
	pt.call(OsmuxEM_configure:{cfg}) {
		[] pt.getreply(OsmuxEM_configure:{cfg}) {};
	}
}
/* Query the emulation behind pt for its statistics and return them. */
function f_osmuxem_stats_get(OsmuxEM_CTRL_PT pt) return OsmuxemStats {
	var OsmuxemStats stats;
	/* the only parameter is an out parameter, hence "-" in the call */
	pt.call(OsmuxEM_stats_get:{-}) {
		[] pt.getreply(OsmuxEM_stats_get:{?}) -> param(stats) {};
	}
	return stats;
}

/* Register a receive handle with the emulation behind pt and wait for the
 * matching reply. */
function f_osmuxem_register_rxhandle(OsmuxEM_CTRL_PT pt, OsmuxRxHandle hdl) {
	pt.call(OsmuxEM_register_rxhandle:{hdl}) {
		[] pt.getreply(OsmuxEM_register_rxhandle:{hdl}) {};
	}
}

/* Register a transmit handle with the emulation behind pt and wait for the
 * matching reply. */
function f_osmuxem_register_txhandle(OsmuxEM_CTRL_PT pt, OsmuxTxHandle hdl) {
	pt.call(OsmuxEM_register_txhandle:{hdl}) {
		[] pt.getreply(OsmuxEM_register_txhandle:{hdl}) {};
	}
}


/* Compare two counter values.
 * Returns true when the absolute difference between a and b does not exceed
 * tolerance, false otherwise. */
function f_osmuxem_stats_compare_value(integer a, integer b, integer tolerance := 0) return boolean {
	var integer delta := a - b;

	/* work on the absolute difference */
	if (delta < 0) {
		delta := -delta;
	}

	return delta <= tolerance;
}

/* Cross-compare two osmuxem-statistics. The transmission statistics on the a
 * side must match the reception statistics on the b side and vice versa.
 * The user may also supply a tolerance value (number of packets) when
 * deviations are acceptable. For the byte counters the tolerance is scaled by
 * the average payload length of the side that transmitted in the respective
 * direction.
 * Returns true if all four comparisons are within tolerance. */
function f_osmuxem_stats_compare(OsmuxemStats a, OsmuxemStats b, integer tolerance := 0) return boolean {
	/* average payload bytes per packet sent by a resp. b (0 if nothing sent) */
	var integer plen_a := 0;
	var integer plen_b := 0;

	log("stats A: ", a);
	log("stats B: ", b);
	log("tolerance: ", tolerance, " packets");

	if (not f_osmuxem_stats_compare_value(a.num_pkts_tx, b.num_pkts_rx, tolerance)) {
		return false;
	}

	if (not f_osmuxem_stats_compare_value(a.num_pkts_rx, b.num_pkts_tx, tolerance)) {
		return false;
	}

	if (a.num_pkts_tx > 0) {
		plen_a := a.bytes_payload_tx / a.num_pkts_tx;
	}
	/* The b->a direction must be scaled by what b transmitted; otherwise the
	 * byte tolerance degenerates to 0 whenever a did not send anything. */
	if (b.num_pkts_tx > 0) {
		plen_b := b.bytes_payload_tx / b.num_pkts_tx;
	}

	if (not f_osmuxem_stats_compare_value(a.bytes_payload_tx, b.bytes_payload_rx, tolerance * plen_a)) {
		return false;
	}

	if (not f_osmuxem_stats_compare_value(a.bytes_payload_rx, b.bytes_payload_tx, tolerance * plen_b)) {
		return false;
	}

	return true;
}

/* Generic sanity check of a statistics record. It fits most situations and is
 * meant to be called by the testcases as needed. Every violated condition is
 * reported through Misc_Helpers.f_shutdown() with verdict fail. */
function f_osmuxem_stats_err_check(OsmuxemStats s) {
	log("stats: ", s);

	/* There has to be some traffic in at least one direction; total
	 * silence points to a problem. Checked for packets and for bytes. */
	if (not (s.num_pkts_tx >= 1 or s.num_pkts_rx >= 1)) {
		Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
					"no Osmux packet activity detected (packets)");
	}
	if (not (s.bytes_payload_tx >= 1 or s.bytes_payload_rx >= 1)) {
		Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
					"no Osmux packet activity detected (bytes)");
	}

	/* None of the error counters may have been incremented */
	if (not (s.num_pkts_rx_err_seq == 0)) {
		Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
					"Osmux packet sequence number errors occurred");
	}
	if (not (s.num_pkts_rx_err_disabled == 0)) {
		Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
					"Osmux packets received while RX was disabled");
	}
	if (not (s.num_pkts_rx_err_payload == 0)) {
		Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
					"Osmux packets with mismatching payload received");
	}
}

/* Send template for an Osmux AMR PDU: all header fields are taken from the
 * parameters, except the frame type which is fixed to 1. payload becomes
 * the data part of the PDU. */
template PDU_Osmux_AMR ts_OsmuxAMR(BIT1 marker, INT3b ctr, BIT1 amr_f, BIT1 amr_q, INT1 seq,
				   OsmuxCID cid, INT4b amr_ft, INT4b amr_cmr,
				   octetstring payload) := {
	header := {
		marker := marker,
		ft := 1,
		ctr := ctr,
		amr_f := amr_f,
		amr_q := amr_q,
		seq := seq,
		cid := cid,
		amr_ft := amr_ft,
		amr_cmr := amr_cmr
	},
	data := payload
}

/* Look up the receive handle registered for cid and return a copy of it.
 * Shuts the test down with verdict fail if no handle is registered. */
private function f_rxhandle_get_by_cid(OsmuxCID cid) runs on OSMUX_Emulation_CT return OsmuxRxHandle {
	var integer slot := 0;

	while (slot < sizeof(RxHandleTable)) {
		/* unbound cid marks an unused slot */
		if (isbound(RxHandleTable[slot].cid)) {
			if (RxHandleTable[slot].cid == cid) {
				return RxHandleTable[slot].vc_conn;
			}
		}
		slot := slot + 1;
	}

	Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
				log2str("No Component for CID ", cid));
	/* not reached, only present to satisfy the compiler */
	return RxHandleTable[0].vc_conn;
}

/* Store hdl in the first unused slot of RxHandleTable. Shuts the test down
 * with verdict fail if the table is full. */
private function f_rxhandle_cid_add(OsmuxRxHandle hdl) runs on OSMUX_Emulation_CT {
	var integer slot;

	for (slot := 0; slot < sizeof(RxHandleTable); slot := slot + 1) {
		if (isbound(RxHandleTable[slot].cid)) {
			/* slot already in use */
			continue;
		}
		RxHandleTable[slot].cid := hdl.cid;
		RxHandleTable[slot].vc_conn := hdl;
		return;
	}

	Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
				log2str("No Space in RxHandleTable for ", hdl.cid));
}

/* Store hdl in the first unused slot of TxHandleList. Shuts the test down
 * with verdict fail if the list is full. */
private function f_txhandle_cid_add(OsmuxTxHandle hdl) runs on OSMUX_Emulation_CT {
	var integer slot;

	for (slot := 0; slot < sizeof(TxHandleList); slot := slot + 1) {
		if (isbound(TxHandleList[slot])) {
			/* slot already in use */
			continue;
		}
		TxHandleList[slot] := hdl;
		return;
	}

	Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
				log2str("No Space in TxHandleList for ", hdl.cid));
}

/* Return the leading part of tx_fixed_payload, cut to the length that
 * f_amrft_payload_len() reports for the AMR frame type amr_ft.
 * NOTE(review): substr() requires tx_fixed_payload to be at least that long;
 * a shorter pattern presumably ends in a runtime error - confirm with callers. */
function f_osmux_gen_expected_rx_rtp_payload(INT4b amr_ft, octetstring tx_fixed_payload) return octetstring {
	var integer payload_len := f_amrft_payload_len(amr_ft);

	return substr(tx_fixed_payload, 0, payload_len);
}

/* Build the data part of one Osmux frame: ctr + 1 copies of the configured
 * transmit pattern, each truncated to the payload length of amr_ft. */
private function f_osmux_gen_payload(INT3b ctr, INT4b amr_ft) runs on OSMUX_Emulation_CT return octetstring {
	var octetstring batch := ''O;
	var integer remaining := ctr + 1;

	while (remaining > 0) {
		batch := batch & f_osmux_gen_expected_rx_rtp_payload(amr_ft, g_cfg.tx_fixed_payload);
		remaining := remaining - 1;
	}
	return batch;
}

/* Transmit one Osmux AMR frame for the transmit handle stored at index i of
 * TxHandleList.
 *  i: index into TxHandleList (the slot must be bound)
 *  ctr: value of the header counter field
 *  payload: data part of the frame
 *  marker: value of the header marker bit
 * Afterwards the handle's sequence number is advanced and the transmit
 * statistics are updated. */
private function f_tx_osmux(integer i, INT3b ctr, octetstring payload, BIT1 marker := '0'B) runs on OSMUX_Emulation_CT {
	var OsmuxTxHandle hdl := TxHandleList[i];
	var PDU_Osmux_AMR osmux_amr := valueof(ts_OsmuxAMR(marker, ctr, hdl.amr_f,
					  hdl.amr_q, hdl.seq, hdl.cid, hdl.amr_ft,
					  hdl.amr_cmr, payload));
	OSMUX.send(t_Osmux_Send(g_osmux_conn_id, OSMUX_PDU:{osmux_amr:=osmux_amr}));

	/* Advance the sequence number for the next transmit. The field is a
	 * one-octet INT1, so it has to wrap from 255 back to 0 instead of
	 * leaving the valid range. */
	TxHandleList[i].seq := (TxHandleList[i].seq + 1) mod 256;

	/* update counters */
	g_stat.num_pkts_tx := g_stat.num_pkts_tx+1;
	g_stat.bytes_payload_tx := g_stat.bytes_payload_tx +
						lengthof(payload);
}

/* Send one Osmux frame for every registered transmit handle. Each frame
 * carries a full batch (g_cfg.batch_size payloads) and the given marker bit. */
private function f_tx_osmux_all_cid(BIT1 marker := '0'B) runs on OSMUX_Emulation_CT {
	/* TODO: append all in one UDP packet and send together */
	/* the header counter is the number of batched payloads minus one */
	var INT3b ctr := g_cfg.batch_size - 1;
	var integer slot := 0;

	while (slot < sizeof(TxHandleList)) {
		/* unbound slots hold no handle */
		if (isbound(TxHandleList[slot])) {
			f_tx_osmux(slot, ctr,
				   f_osmux_gen_payload(ctr, TxHandleList[slot].amr_ft),
				   marker);
		}
		slot := slot + 1;
	}
}

/* Write an updated copy of a receive handle back into RxHandleTable.
 * Records are copied by value, so changes made to a handle obtained from
 * f_rxhandle_get_by_cid() are lost unless they are stored again. */
private function f_rxhandle_update(OsmuxRxHandle hdl) runs on OSMUX_Emulation_CT {
	var integer i;
	for (i := 0; i < sizeof(RxHandleTable); i := i+1) {
		if (isbound(RxHandleTable[i].cid) and RxHandleTable[i].cid == hdl.cid) {
			RxHandleTable[i].vc_conn := hdl;
			return;
		}
	}
}

/* Main loop of the Osmux emulation component. It never returns and serves
 *  * the control procedures arriving on CTRL (bind, connect, mode, configure,
 *    handle registration, statistics),
 *  * Osmux AMR and Osmux Dummy frames arriving on OSMUX, and
 *  * the periodic transmit timer.
 * Unexpected port events or message types on OSMUX shut the test down with
 * verdict fail. */
function f_main() runs on OSMUX_Emulation_CT
{
	var Result res;
	var OsmuxRxHandle rx_hdl;
	var OsmuxTxHandle tx_hdl;
	var octetstring payload_truncated;
	var PortEvent port_event;
	timer T_transmit := int2float(g_cfg.tx_duration_ms)/1000.0;
	var Osmux_RecvFrom rx_osmux;
	var PDU_Osmux_AMR  rx_amr;
	var PDU_Osmux_DUMMY osmux_dummy;
	var OsmuxemConfig cfg;
	/* receive templates; connId is narrowed to our socket once it is bound */
	var template Osmux_RecvFrom tr_osmux_amr := {
		connId := ?,
		remName := ?,
		remPort := ?,
		locName := ?,
		locPort := ?,
		msg := ?
	};
	tr_osmux_amr.msg := { osmux_amr := ? };
	var template Osmux_RecvFrom tr_osmux_dummy := {
		connId := ?,
		remName := ?,
		remPort := ?,
		locName := ?,
		locPort := ?,
		msg := ?
	};
	tr_osmux_dummy.msg := { osmux_dummy := ? };

	while (true) {
	alt {
		/* control procedures (calls) from the user */
		[] CTRL.getcall(OsmuxEM_bind:{?,?}) -> param(g_local_host, g_local_port) {

			g_tx_connected := false; /* will set it back to true upon next connect() call */

			/* re-binding: close the previous socket first */
			if (g_osmux_conn_id != -1) {
				res := OSMUX_CodecPort_CtrlFunct.f_IPL4_close(OSMUX, g_osmux_conn_id, {udp := {}});
				g_osmux_conn_id := -1;
			}
			res := OSMUX_CodecPort_CtrlFunct.f_IPL4_listen(OSMUX, g_local_host,
								g_local_port, {udp:={}});
			if (not ispresent(res.connId)) {
				Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
							"Could not listen on Osmux socket, check your configuration");
			}
			g_osmux_conn_id := res.connId;
			tr_osmux_amr.connId := g_osmux_conn_id;
			tr_osmux_dummy.connId := g_osmux_conn_id;

			CTRL.reply(OsmuxEM_bind:{g_local_host, g_local_port});
		}
		[] CTRL.getcall(OsmuxEM_connect:{?,?}) -> param (g_remote_host, g_remote_port) {
			res := OSMUX_CodecPort_CtrlFunct.f_IPL4_connect(OSMUX, g_remote_host,
								g_remote_port,
								g_local_host, g_local_port,
								g_osmux_conn_id, {udp:={}});
			if (not ispresent(res.connId)) {
				Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
							"Could not connect to Osmux socket, check your configuration");
			}
			g_tx_connected := true;
			CTRL.reply(OsmuxEM_connect:{g_remote_host, g_remote_port});
		}
		[] CTRL.getcall(OsmuxEM_mode:{OSMUXEM_MODE_NONE}) {
			T_transmit.stop;
			g_rx_enabled := false;
			CTRL.reply(OsmuxEM_mode:{OSMUXEM_MODE_NONE});
		}
		[] CTRL.getcall(OsmuxEM_mode:{OSMUXEM_MODE_TXONLY}) {
			/* start transmit timer; the duration is taken from the current
			 * configuration, the timer's default was fixed at declaration */
			T_transmit.start(int2float(g_cfg.tx_duration_ms)/1000.0);
			g_rx_enabled := false;
			CTRL.reply(OsmuxEM_mode:{OSMUXEM_MODE_TXONLY});
		}
		[] CTRL.getcall(OsmuxEM_mode:{OSMUXEM_MODE_RXONLY}) {

			T_transmit.stop;
			if (g_rx_enabled == false) {
				/* flush queues */
				OSMUX.clear;
				g_rx_enabled := true;
			}
			CTRL.reply(OsmuxEM_mode:{OSMUXEM_MODE_RXONLY});
		}
		[] CTRL.getcall(OsmuxEM_mode:{OSMUXEM_MODE_BIDIR}) {
			T_transmit.start(int2float(g_cfg.tx_duration_ms)/1000.0);
			if (g_rx_enabled == false) {
				/* flush queues */
				OSMUX.clear;
				g_rx_enabled := true;
			}
			CTRL.reply(OsmuxEM_mode:{OSMUXEM_MODE_BIDIR});
		}
		[] CTRL.getcall(OsmuxEM_configure:{?}) -> param (cfg) {
			g_cfg := cfg;
			CTRL.reply(OsmuxEM_configure:{cfg});
		}
		[] CTRL.getcall(OsmuxEM_register_txhandle:{?}) -> param (tx_hdl) {
			f_txhandle_cid_add(tx_hdl);
			CTRL.reply(OsmuxEM_register_txhandle:{tx_hdl});
		}
		[] CTRL.getcall(OsmuxEM_register_rxhandle:{?}) -> param (rx_hdl) {
			f_rxhandle_cid_add(rx_hdl);
			CTRL.reply(OsmuxEM_register_rxhandle:{rx_hdl});
		}
		[] CTRL.getcall(OsmuxEM_stats_get:{?}) {
				CTRL.reply(OsmuxEM_stats_get:{g_stat});
		}

		/* Osmux AMR while the receiver is disabled: only count it */
		[g_rx_enabled==false] OSMUX.receive(tr_osmux_amr) {
			g_stat.num_pkts_rx_err_disabled := g_stat.num_pkts_rx_err_disabled+1;
		}
		/* simply ignore any Osmux Dummy if receiver not enabled */
		[g_rx_enabled==false] OSMUX.receive(tr_osmux_dummy) -> value rx_osmux {
			log("Osmux Dummy received on CID ", rx_osmux.msg.osmux_dummy.header.cid, " (rx_disabled)");
		}

		/* process received Osmux AMR if receiver enabled */
		[g_rx_enabled] OSMUX.receive(tr_osmux_amr) -> value rx_osmux {
			/* increment counters */
			g_stat.num_pkts_rx := g_stat.num_pkts_rx+1;
			g_stat.bytes_payload_rx := g_stat.bytes_payload_rx +
								lengthof(rx_osmux.msg.osmux_amr.data);
			rx_hdl := f_rxhandle_get_by_cid(rx_osmux.msg.osmux_amr.header.cid);

			/* The sequence number is a one-octet value, so the successor
			 * of 255 is 0. */
			if (rx_hdl.first_seq_seen and
			    (((rx_hdl.last_seq_ack + 1) mod 256) != rx_osmux.msg.osmux_amr.header.seq)) {
				g_stat.num_pkts_rx_err_seq := g_stat.num_pkts_rx_err_seq + 1;
			}
			rx_hdl.first_seq_seen := true;
			rx_hdl.last_seq_ack := rx_osmux.msg.osmux_amr.header.seq;
			/* rx_hdl is only a copy of the table entry: store it back so
			 * the next frame is checked against this one */
			f_rxhandle_update(rx_hdl);

			/* NOTE(review): the expected payload is generated from
			 * g_cfg.tx_fixed_payload although the check is gated on
			 * g_cfg.rx_fixed_payload - confirm this is intended */
			payload_truncated := f_osmux_gen_payload(rx_osmux.msg.osmux_amr.header.ctr, rx_osmux.msg.osmux_amr.header.amr_ft);
			if (ispresent(g_cfg.rx_fixed_payload) and rx_osmux.msg.osmux_amr.data != payload_truncated) {
				g_stat.num_pkts_rx_err_payload := g_stat.num_pkts_rx_err_payload + 1;
			}
			/* forward to the user only if somebody is listening */
			if (DATA.checkstate("Connected")) {
				DATA.send(rx_osmux.msg);
			}
		}
		/* process received Osmux Dummy if receiver enabled */
		[g_rx_enabled] OSMUX.receive(tr_osmux_dummy) -> value rx_osmux {
			log("Osmux Dummy received on CID", rx_osmux.msg.osmux_dummy.header.cid);
			/* lookup only verifies that the CID has been registered */
			rx_hdl := f_rxhandle_get_by_cid(rx_osmux.msg.osmux_dummy.header.cid);
		}

		/* transmit if timer has expired */
		[g_tx_connected] T_transmit.timeout {
			/* send one Osmux frame, re-start timer */
			f_tx_osmux_all_cid();
			T_transmit.start(int2float(g_cfg.tx_duration_ms)/1000.0);
		}

		[] OSMUX.receive(PortEvent:?) -> value port_event {
			Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
						log2str("Received unexpected port event from Osmux:", port_event));
		}

		/* fail on any unexpected messages */
		[] OSMUX.receive {
			Misc_Helpers.f_shutdown(__BFILE__, __LINE__, fail,
						"Received unexpected msg type from Osmux");
		}
	}
	}
}


}
