module RTP_Emulation {

/* Functionalities that we want this module to imeplement:
 *  * act as a RTP source that generates a RTP Stream
 *  * act asaa RTP sink that consumes a RTP Stream
 *
 * for all of the above, we want to be able to
 *  * specify the payload type
 *  * specify the interval / sample rate
 *  * create drop-outs in the stream
 *  * detect reordered or lost frames
 *  * validate if the size of the frames matches epectations
 *  * play back real audio (at least some tones?)
 *  * enable/disable generation/verification of RTCP
 */

/* (C) 2017-2018 Harald Welte <laforge@gnumonks.org>
 * (C) 2018-2019 sysmocom - s.f.m.c. GmbH
 * All rights reserved.
 *
 * Released under the terms of GNU General Public License, Version 2 or
 * (at your option) any later version.
 *
 * SPDX-License-Identifier: GPL-2.0-or-later
 */


/* Ideas:

* each component consists of transmitter and receiver
* transmitters and receivers can be operated as tuple?
* high-level operation
** set-up config at transmitter + receiver
** transmit sequence of payloads
** verify receiption of those payloads
* can operate full-duplex/bi-directional as needed

* transmitter
** trigger transmission of n number of packets
** transmit them at normal ptime interval
** payload size configurable
** payload contents PRBS or the like

* receiver
** count number of related packets at receiver
** check received payload type
** check received timestamp increments
** check received seq_nr increments
** (optionally) check for SSRC
** (optionally) check for payload size
** (optionally) check for payload contents

* later
** how to test transcoding?
** how to test pure play-out endpoints (rx only)?
** how to test "Rx from wrong IP/port" scenarios?
** how to test RTCP?
** maybe keep ports un-connected to show wrong src -lrt

*/




import from General_Types all;
import from Osmocom_Types all;
import from IPL4asp_Types all;
import from RTP_Types all;
import from RTP_CodecPort all;
import from RTP_CodecPort_CtrlFunct all;

import from IuUP_Types all;
import from IuUP_Emulation all;

type component RTP_Emulation_CT {
	/* down-facing ports for RTP and RTCP codec ports on top of IPL4asp */
	port RTP_CODEC_PT RTP;
	var integer g_rtp_conn_id := -1;
	port RTP_CODEC_PT RTCP;
	var integer g_rtcp_conn_id := -1;

	/* user-facing port for controlling the binding */
	port RTPEM_CTRL_PT CTRL;

	/* configurable by user, should be fixed */
	var RtpemConfig g_cfg := c_RtpemDefaultCfg;

	/* statistics */
	var RtpemStats g_stats_rtp := c_RtpemStatsReset;
	var RtpemStats g_stats_rtcp := c_RtpemStatsReset;

	var HostName g_remote_host;
	var PortNumber g_remote_port;
	var HostName g_local_host;
	var PortNumber g_local_port;

	/* state variables, change over time */
	var boolean g_rx_enabled := false;
	var boolean g_tx_connected := false; /* Set to true after connect() */
	var LIN2_BO_LAST g_tx_next_seq := 0;
	var uint32_t g_tx_next_ts := 0;

	var INT7b g_rx_payload_type := 0;
	var LIN2_BO_LAST g_rx_last_seq;
	var uint32_t g_rx_last_ts;

	var IuUP_Entity g_iuup_ent; // := valueof(t_IuUP_Entity(1));

	var boolean g_conn_refuse_expect := false;
	var boolean g_conn_refuse_received := false;
}

type enumerated RtpemMode {
	RTPEM_MODE_NONE,
	RTPEM_MODE_TXONLY,
	RTPEM_MODE_RXONLY,
	RTPEM_MODE_BIDIR
};

type record RtpemStats {
	/* number of packets transmitted */
	integer num_pkts_tx,
	/* number of RTP payload bytes transmitted */
	integer bytes_payload_tx,

	/* number of packets received */
	integer num_pkts_rx,
	/* number of RTP payload bytes received */
	integer bytes_payload_rx,
	/* number of packets received out-of-sequence */
	integer num_pkts_rx_err_seq,
	/* number of packets received wrong timestamp */
	integer num_pkts_rx_err_ts,
	/* number of packets received wrong payload type */
	integer num_pkts_rx_err_pt,
	/* number of packets received during Rx disable */
	integer num_pkts_rx_err_disabled,
	/* number of packets received with mismatching payload */
	integer num_pkts_rx_err_payload
}

const RtpemStats c_RtpemStatsReset := {
	num_pkts_tx := 0,
	bytes_payload_tx := 0,
	num_pkts_rx := 0,
	bytes_payload_rx := 0,
	num_pkts_rx_err_seq := 0,
	num_pkts_rx_err_ts := 0,
	num_pkts_rx_err_pt := 0,
	num_pkts_rx_err_disabled := 0,
	num_pkts_rx_err_payload := 0
}

type record RtpemConfig {
	INT7b tx_payload_type,
	integer tx_samplerate_hz,
	integer tx_duration_ms,
	BIT32_BO_LAST tx_ssrc,
	octetstring tx_fixed_payload optional,
	octetstring rx_fixed_payload optional,
	boolean iuup_mode,
	boolean iuup_tx_init
};

const RtpemConfig c_RtpemDefaultCfg := {
	tx_payload_type := 0,
	tx_samplerate_hz := 8000,
	tx_duration_ms := 20,
	tx_ssrc := '11011110101011011011111011101111'B,
	tx_fixed_payload := '01020304'O,
	rx_fixed_payload := '01020304'O,
	iuup_mode := false,
	iuup_tx_init := true
}

signature RTPEM_bind(in HostName local_host, inout PortNumber local_port);
signature RTPEM_connect(in HostName remote_host, in PortNumber remote_port);
signature RTPEM_mode(in RtpemMode mode);
signature RTPEM_configure(in RtpemConfig cfg);
signature RTPEM_stats_get(out RtpemStats stats, in boolean rtcp);
signature RTPEM_conn_refuse_expect(in boolean expect);
signature RTPEM_conn_refuse_received(out boolean received);

type port RTPEM_CTRL_PT procedure {
	inout RTPEM_bind, RTPEM_connect, RTPEM_mode, RTPEM_configure, RTPEM_stats_get, RTPEM_conn_refuse_expect,
	      RTPEM_conn_refuse_received;
} with { extension "internal" };

function f_rtpem_bind(RTPEM_CTRL_PT pt, in HostName local_host, inout PortNumber local_port) {
	pt.call(RTPEM_bind:{local_host, local_port}) {
		[] pt.getreply(RTPEM_bind:{local_host, ?}) -> param (local_port) {};
	}
}
function f_rtpem_connect(RTPEM_CTRL_PT pt, in HostName remote_host, in PortNumber remote_port) {
	pt.call(RTPEM_connect:{remote_host, remote_port}) {
		[] pt.getreply(RTPEM_connect:{remote_host, remote_port}) {};
	}
}
function f_rtpem_mode(RTPEM_CTRL_PT pt, in RtpemMode mode) {
	pt.call(RTPEM_mode:{mode}) {
		[] pt.getreply(RTPEM_mode:{mode}) {};
	}
}
function f_rtpem_configure(RTPEM_CTRL_PT pt, in RtpemConfig cfg) {
	pt.call(RTPEM_configure:{cfg}) {
		[] pt.getreply(RTPEM_configure:{cfg}) {};
	}
}
function f_rtpem_stats_get(RTPEM_CTRL_PT pt, boolean rtcp := false) return RtpemStats {
	var RtpemStats stats;
	pt.call(RTPEM_stats_get:{-, rtcp}) {
		[] pt.getreply(RTPEM_stats_get:{?, rtcp}) -> param(stats) {};
	}
	return stats;
}

function f_rtpem_stats_compare_value(integer a, integer b, integer tolerance := 0) return boolean {
	var integer temp;

	temp := (a - b)
	if (temp < 0) {
		temp := -temp;
	}

	if (temp > tolerance) {
		return false;
	}

	return true;
}

/* Cross-compare two rtpem-statistics. The transmission statistics on the a side
 * must match the reception statistics on the other side and vice versa. The
 * user may also supply a tolerance value (number of packets) when deviations
 * are acceptable */
function f_rtpem_stats_compare(RtpemStats a, RtpemStats b, integer tolerance := 0) return boolean {
	var integer plen;

	log("stats A: ", a);
	log("stats B: ", b);
	log("tolerance: ", tolerance, " packets");

	if (f_rtpem_stats_compare_value(a.num_pkts_tx, b.num_pkts_rx, tolerance) == false) {
		return false;
	}

	if (f_rtpem_stats_compare_value(a.num_pkts_rx, b.num_pkts_tx, tolerance) == false) {
		return false;
	}

	if(a.num_pkts_tx > 0) {
		plen := a.bytes_payload_tx / a.num_pkts_tx;
	} else {
		plen := 0;
	}

	if (f_rtpem_stats_compare_value(a.bytes_payload_tx, b.bytes_payload_rx, tolerance * plen) == false) {
		return false;
	}

	if (f_rtpem_stats_compare_value(a.bytes_payload_rx, b.bytes_payload_tx, tolerance * plen) == false) {
		return false;
	}

	return true;
}

/* Check the statistics for general signs of errors. This is a basic general
 * check that will fit most situations and  is intended to be executed by
 * the testcases as as needed. */
function f_rtpem_stats_err_check(RtpemStats s) {
	log("stats: ", s);

	/* Check if there was some activity at either on the RX or on the
	 * TX side, but complete silence would indicate some problem */
	if (s.num_pkts_tx < 1 and s.num_pkts_rx < 1) {
		setverdict(fail, "no RTP packet activity detected (packets)");
		mtc.stop;
	}
	if (s.bytes_payload_tx < 1 and s.bytes_payload_rx < 1) {
		setverdict(fail, "no RTP packet activity detected (bytes)");
		mtc.stop;
	}

	/* Check error counters */
	if (s.num_pkts_rx_err_seq != 0) {
		setverdict(fail, "RTP packet sequence number errors occurred");
		mtc.stop;
	}
	if (s.num_pkts_rx_err_ts != 0) {
		setverdict(fail, "RTP packet timestamp errors occurred");
		mtc.stop;
	}
	if (s.num_pkts_rx_err_pt != 0) {
		setverdict(fail, "RTP packet payload type errors occurred");
		mtc.stop;
	}
	if (s.num_pkts_rx_err_disabled != 0) {
		setverdict(fail, "RTP packets received while RX was disabled");
		mtc.stop;
	}
	if (s.num_pkts_rx_err_payload != 0) {
		setverdict(fail, "RTP packets with mismatching payload received");
		mtc.stop;
	}
}

function f_rtpem_conn_refuse_expect(RTPEM_CTRL_PT pt) {
	pt.call(RTPEM_conn_refuse_expect:{true}) {
		[] pt.getreply(RTPEM_conn_refuse_expect:{true}) {};
	}
}

function f_rtpem_conn_refuse_verify(RTPEM_CTRL_PT pt) {
	pt.call(RTPEM_conn_refuse_received:{?}) {
		[] pt.getreply(RTPEM_conn_refuse_received:{true}) {};
		[] pt.getreply(RTPEM_conn_refuse_received:{false}) {
			setverdict(fail, "Expected to receive connection refused");
		};
	}
}

template PDU_RTP ts_RTP(BIT32_BO_LAST ssrc, INT7b pt, LIN2_BO_LAST seq, uint32_t ts,
			octetstring payload, BIT1 marker := '0'B) := {
	version := 2,
	padding_ind := '0'B,
	extension_ind := '0'B,
	CSRC_count := 0,
	marker_bit := marker,
	payload_type := pt,
	sequence_number := seq,
	time_stamp := int2bit(ts, 32),
	SSRC_id := ssrc,
	CSRCs := omit,
	ext_header := omit,
	data := payload
}

private function f_tx_rtp(octetstring payload, BIT1 marker := '0'B) runs on RTP_Emulation_CT {
	if (g_cfg.iuup_mode) {
		payload := f_IuUP_Em_tx_encap(g_iuup_ent, payload);
	}
	var PDU_RTP rtp := valueof(ts_RTP(g_cfg.tx_ssrc, g_cfg.tx_payload_type, g_tx_next_seq,
					  g_tx_next_ts, payload, marker));
	RTP.send(t_RTP_Send(g_rtp_conn_id, RTP_messages_union:{rtp:=rtp}));
	/* increment sequence + timestamp for next transmit */
	g_tx_next_seq := g_tx_next_seq + 1;
	g_tx_next_ts := g_tx_next_ts + (g_cfg.tx_samplerate_hz / (1000 / g_cfg.tx_duration_ms));
}

function f_main() runs on RTP_Emulation_CT
{
	var Result res;
	var boolean is_rtcp

	timer T_transmit := int2float(g_cfg.tx_duration_ms)/1000.0;
	var RTP_RecvFrom rx_rtp;
	var RtpemConfig cfg;
	var template RTP_RecvFrom tr := {
		connId := ?,
		remName := ?,
		remPort := ?,
		locName := ?,
		locPort := ?,
		msg := ?
	};
	var template RTP_RecvFrom tr_rtp := tr;
	var template RTP_RecvFrom tr_rtcp := tr;
	tr_rtp.msg := { rtp := ? };
	tr_rtcp.msg := { rtcp := ? };

	var template ASP_Event tr_conn_refuse := {result := { errorCode := ERROR_SOCKET,
							      connId := ?,
							      os_error_code := 111,
							      os_error_text := ? /* "Connection refused" */}};

	g_iuup_ent := valueof(t_IuUP_Entity(g_cfg.iuup_tx_init));

	while (true) {
	alt {
		/* control procedures (calls) from the user */
		[] CTRL.getcall(RTPEM_bind:{?,?}) -> param(g_local_host, g_local_port) {
			if (g_local_port rem 2 == 1) {
				//CTRL.raise(RTPEM_bind, "Local Port is not an even port number!");
				log("Local Port is not an even port number!");
				continue;
			}

			g_tx_connected := false; /* will set it back to true upon next connect() call */

			if (g_rtp_conn_id != -1) {
				res := RTP_CodecPort_CtrlFunct.f_IPL4_close(RTP, g_rtp_conn_id, {udp := {}});
				g_rtp_conn_id := -1;
			}
			res := RTP_CodecPort_CtrlFunct.f_IPL4_listen(RTP, g_local_host,
								g_local_port, {udp:={}});
			if (not ispresent(res.connId)) {
				setverdict(fail, "Could not listen on RTP socket, check your configuration");
				mtc.stop;
			}
			g_rtp_conn_id := res.connId;
			tr_rtp.connId := g_rtp_conn_id;

			if (g_rtcp_conn_id != -1) {
				res := RTP_CodecPort_CtrlFunct.f_IPL4_close(RTCP, g_rtcp_conn_id, {udp := {}});
				g_rtcp_conn_id := -1;
			}
			res := RTP_CodecPort_CtrlFunct.f_IPL4_listen(RTCP, g_local_host,
								g_local_port+1, {udp:={}});
			if (not ispresent(res.connId)) {
				setverdict(fail, "Could not listen on RTCP socket, check your configuration");
				mtc.stop;
			}
			g_rtcp_conn_id := res.connId;
			tr_rtcp.connId := g_rtcp_conn_id;
			CTRL.reply(RTPEM_bind:{g_local_host, g_local_port});
		}
		[] CTRL.getcall(RTPEM_connect:{?,?}) -> param (g_remote_host, g_remote_port) {
			if (g_remote_port rem 2 == 1) {
				//CTRL.raise(RTPEM_connect, "Remote Port is not an even number!");
				log("Remote Port is not an even number!");
				continue;
			}
			res := RTP_CodecPort_CtrlFunct.f_IPL4_connect(RTP, g_remote_host,
								g_remote_port,
								g_local_host, g_local_port,
								g_rtp_conn_id, {udp:={}});
			if (not ispresent(res.connId)) {
				setverdict(fail, "Could not connect to RTP socket, check your configuration");
				mtc.stop;
			}
			res := RTP_CodecPort_CtrlFunct.f_IPL4_connect(RTCP, g_remote_host,
								g_remote_port+1,
								g_local_host, g_local_port+1,
								g_rtcp_conn_id, {udp:={}});
			if (not ispresent(res.connId)) {
				setverdict(fail, "Could not connect to RTCP socket, check your configuration");
				mtc.stop;
			}
			g_tx_connected := true;
			CTRL.reply(RTPEM_connect:{g_remote_host, g_remote_port});
		}
		[] CTRL.getcall(RTPEM_mode:{RTPEM_MODE_NONE}) {
			T_transmit.stop;
			g_rx_enabled := false;
			CTRL.reply(RTPEM_mode:{RTPEM_MODE_NONE});
		}
		[] CTRL.getcall(RTPEM_mode:{RTPEM_MODE_TXONLY}) {
			/* start transmit timer */
			T_transmit.start;
			g_rx_enabled := false;
			CTRL.reply(RTPEM_mode:{RTPEM_MODE_TXONLY});
		}
		[] CTRL.getcall(RTPEM_mode:{RTPEM_MODE_RXONLY}) {

			T_transmit.stop;
			if (g_rx_enabled == false) {
				/* flush queues */
				RTP.clear;
				RTCP.clear;
				g_rx_enabled := true;
			}
			CTRL.reply(RTPEM_mode:{RTPEM_MODE_RXONLY});
		}
		[] CTRL.getcall(RTPEM_mode:{RTPEM_MODE_BIDIR}) {
			T_transmit.start;
			if (g_rx_enabled == false) {
				/* flush queues */
				RTP.clear;
				RTCP.clear;
				g_rx_enabled := true;
			}
			CTRL.reply(RTPEM_mode:{RTPEM_MODE_BIDIR});
		}
		[] CTRL.getcall(RTPEM_configure:{?}) -> param (cfg) {
			g_cfg := cfg;
			g_iuup_ent.cfg.active_init := g_cfg.iuup_tx_init;
			CTRL.reply(RTPEM_configure:{cfg});
		}
		[] CTRL.getcall(RTPEM_stats_get:{?, ?}) -> param (is_rtcp) {
			if (is_rtcp) {
				CTRL.reply(RTPEM_stats_get:{g_stats_rtcp, is_rtcp});
			} else {
				CTRL.reply(RTPEM_stats_get:{g_stats_rtp, is_rtcp});
			}
		}
		[] CTRL.getcall(RTPEM_conn_refuse_expect:{?}) -> param(g_conn_refuse_expect) {
			CTRL.reply(RTPEM_conn_refuse_expect:{g_conn_refuse_expect});
		}
		[] CTRL.getcall(RTPEM_conn_refuse_received:{?}) {
			CTRL.reply(RTPEM_conn_refuse_received:{g_conn_refuse_received});
		}


		/* simply ignore any RTTP/RTP if receiver not enabled */
		[g_rx_enabled==false] RTP.receive(tr_rtp) {
			g_stats_rtp.num_pkts_rx_err_disabled := g_stats_rtp.num_pkts_rx_err_disabled+1;
			}
		[g_rx_enabled==false] RTCP.receive(tr_rtcp) {
			g_stats_rtcp.num_pkts_rx_err_disabled := g_stats_rtcp.num_pkts_rx_err_disabled+1;
			}

		/* process received RTCP/RTP if receiver enabled */
		[g_rx_enabled] RTP.receive(tr_rtp) -> value rx_rtp {
			/* increment counters */
			if (rx_rtp.msg.rtp.payload_type != g_cfg.tx_payload_type) {
				g_stats_rtp.num_pkts_rx_err_pt := g_stats_rtp.num_pkts_rx_err_pt+1;
			}
			g_stats_rtp.num_pkts_rx := g_stats_rtp.num_pkts_rx+1;
			g_stats_rtp.bytes_payload_rx := g_stats_rtp.bytes_payload_rx +
								lengthof(rx_rtp.msg.rtp.data);
			if (ispresent(g_cfg.rx_fixed_payload) and rx_rtp.msg.rtp.data != g_cfg.rx_fixed_payload) {
				g_stats_rtp.num_pkts_rx_err_payload := g_stats_rtp.num_pkts_rx_err_payload + 1;
			}
			if (g_cfg.iuup_mode) {
				rx_rtp.msg.rtp.data := f_IuUP_Em_rx_decaps(g_iuup_ent, rx_rtp.msg.rtp.data);
			}
		}
		[g_rx_enabled] RTCP.receive(tr_rtcp) -> value rx_rtp {
			//log("RX RTCP: ", rx_rtp);
			g_stats_rtcp.num_pkts_rx := g_stats_rtcp.num_pkts_rx+1;
		}

		/* transmit if timer has expired */
		[g_tx_connected] T_transmit.timeout {
			/* send one RTP frame, re-start timer */
			f_tx_rtp(g_cfg.tx_fixed_payload);
			T_transmit.start;
			/* update counters */
			g_stats_rtp.num_pkts_tx := g_stats_rtp.num_pkts_tx+1;
			g_stats_rtp.bytes_payload_tx := g_stats_rtp.bytes_payload_tx +
								lengthof(g_cfg.tx_fixed_payload);
		}

		/* connection refused */
		[g_conn_refuse_expect] RTP.receive(tr_conn_refuse) {
			log("Connection refused (expected)");
			g_conn_refuse_received := true;
		}
		[not g_conn_refuse_expect] RTP.receive(tr_conn_refuse) {
			setverdict(fail, "Connection refused (unexpected)");
			mtc.stop;
		}

		/* fail on any unexpected messages */
		[] RTP.receive {
			setverdict(fail, "Received unexpected type from RTP");
			mtc.stop;
		}
		[] RTCP.receive {
			setverdict(fail, "Received unexpected type from RTCP");
			mtc.stop;
		}
	}
	}
}


}
