Take into account Marker bit when patching RTP stream

On a deployed osmo-mgw with RTP traffic coming from a thirdparty
RTP source, it was usual to see log messages like following one from
time to time:
"The input timestamp has an alignment error of 159 on SSRC"

Doing a quick traffic analysis showed that the above mentioned RTP
source was generating traffic from time to time containing RTP packets
with the Marker (M) bit.

Those messages were logged because the verification & patching funcions
in osmo-mgw were not Marker-bit aware. Hence, this patch implements
support for Marker bit when handling RTP packets.

The Marker bit is usually used as a start of a talkspurt, and has to be
considered a syncrhonization point, where timestamp and relation to real
time don't need to match with last received RTP packet in the stream.

Related: SYS#5498
Change-Id: I1fb449eda49e82607649122b9b9d983a9e5983fa
diff --git a/include/osmocom/mgcp/mgcp_network.h b/include/osmocom/mgcp/mgcp_network.h
index 75b342d..d6d90dd 100644
--- a/include/osmocom/mgcp/mgcp_network.h
+++ b/include/osmocom/mgcp/mgcp_network.h
@@ -176,4 +176,4 @@
 /* internal RTP Annex A counting */
 void mgcp_rtp_annex_count(const struct mgcp_endpoint *endp, struct mgcp_rtp_state *state,
 			const uint16_t seq, const int32_t transit,
-			const uint32_t ssrc);
+			const uint32_t ssrc, const bool marker_bit);
diff --git a/src/libosmo-mgcp/mgcp_network.c b/src/libosmo-mgcp/mgcp_network.c
index d18c7cb..82dc6ec 100644
--- a/src/libosmo-mgcp/mgcp_network.c
+++ b/src/libosmo-mgcp/mgcp_network.c
@@ -285,13 +285,21 @@
 				       struct mgcp_rtp_state *state,
 				       const struct mgcp_rtp_end *rtp_end,
 				       const struct osmo_sockaddr *addr,
-				       int16_t delta_seq, uint32_t in_timestamp)
+				       int16_t delta_seq, uint32_t in_timestamp,
+				       bool marker_bit)
 {
 	int32_t tsdelta = state->packet_duration;
 	int timestamp_offset;
 	uint32_t out_timestamp;
 	char ipbuf[INET6_ADDRSTRLEN];
 
+	if (marker_bit) {
+		/* If RTP pkt contains marker bit, the timestamps are not longer
+		 * in sync, so we can erase timestamp offset patching. */
+		state->patch.timestamp_offset = 0;
+		return 0;
+	}
+
 	if (tsdelta == 0) {
 		tsdelta = state->out_stream.last_tsdelta;
 		if (tsdelta != 0) {
@@ -335,13 +343,19 @@
 				      struct mgcp_rtp_state *state,
 				      const struct mgcp_rtp_end *rtp_end,
 				      const struct osmo_sockaddr *addr,
-				      uint32_t timestamp)
+				      uint32_t timestamp, bool marker_bit)
 {
 	char ipbuf[INET6_ADDRSTRLEN];
 	int ts_error = 0;
 	int ts_check = 0;
 	int ptime = state->packet_duration;
 
+	if (marker_bit) {
+		/* If RTP pkt contains marker bit, the timestamps are not longer
+		 * in sync, so no alignment is needed. */
+		return 0;
+	}
+
 	/* Align according to: T + Toffs - Tlast = k * Tptime */
 
 	ts_error = ts_alignment_error(&state->out_stream, ptime,
@@ -414,12 +428,13 @@
 
 void mgcp_rtp_annex_count(const struct mgcp_endpoint *endp,
 			  struct mgcp_rtp_state *state, const uint16_t seq,
-			  const int32_t transit, const uint32_t ssrc)
+			  const int32_t transit, const uint32_t ssrc,
+			  const bool marker_bit)
 {
 	int32_t d;
 
 	/* initialize or re-initialize */
-	if (!state->stats.initialized || state->stats.ssrc != ssrc) {
+	if (!state->stats.initialized || state->stats.ssrc != ssrc || marker_bit) {
 		state->stats.initialized = 1;
 		state->stats.base_seq = seq;
 		state->stats.max_seq = seq - 1;
@@ -503,6 +518,7 @@
 	int32_t transit;
 	uint16_t seq;
 	uint32_t timestamp, ssrc;
+	bool marker_bit;
 	struct rtp_hdr *rtp_hdr;
 	int payload = rtp_end->codec->payload_type;
 	unsigned int len = msgb_length(msg);
@@ -515,9 +531,10 @@
 	timestamp = ntohl(rtp_hdr->timestamp);
 	arrival_time = get_current_ts(rtp_end->codec->rate);
 	ssrc = ntohl(rtp_hdr->ssrc);
+	marker_bit = !!rtp_hdr->marker;
 	transit = arrival_time - timestamp;
 
-	mgcp_rtp_annex_count(endp, state, seq, transit, ssrc);
+	mgcp_rtp_annex_count(endp, state, seq, transit, ssrc, marker_bit);
 
 	if (!state->initialized) {
 		state->initialized = 1;
@@ -571,7 +588,7 @@
 			    state->packet_duration;
 
 			adjust_rtp_timestamp_offset(endp, state, rtp_end, addr,
-						    delta_seq, timestamp);
+						    delta_seq, timestamp, marker_bit);
 
 			state->patch.patch_ssrc = true;
 			ssrc = state->patch.orig_ssrc;
@@ -589,10 +606,14 @@
 
 		state->in_stream.last_tsdelta = 0;
 	} else {
-		/* Compute current per-packet timestamp delta */
-		check_rtp_timestamp(endp, state, &state->in_stream, rtp_end,
-				    addr, seq, timestamp, "input",
-				    &state->in_stream.last_tsdelta);
+		if (!marker_bit) {
+			/* Compute current per-packet timestamp delta */
+			check_rtp_timestamp(endp, state, &state->in_stream, rtp_end,
+					    addr, seq, timestamp, "input",
+					    &state->in_stream.last_tsdelta);
+		} else {
+			state->in_stream.last_tsdelta = 0;
+		}
 
 		if (state->patch.patch_ssrc)
 			ssrc = state->patch.orig_ssrc;
@@ -607,7 +628,7 @@
 	    state->out_stream.ssrc == ssrc && state->packet_duration)
 		/* Align the timestamp offset */
 		align_rtp_timestamp_offset(endp, state, rtp_end, addr,
-					   timestamp);
+					   timestamp, marker_bit);
 
 	/* Store the updated SSRC back to the packet */
 	if (state->patch.patch_ssrc)
@@ -622,10 +643,14 @@
 	rtp_hdr->timestamp = htonl(timestamp);
 
 	/* Check again, whether the timestamps are still valid */
-	if (state->out_stream.ssrc == ssrc)
-		check_rtp_timestamp(endp, state, &state->out_stream, rtp_end,
-				    addr, seq, timestamp, "output",
-				    &state->out_stream.last_tsdelta);
+	if (!marker_bit) {
+		if (state->out_stream.ssrc == ssrc)
+			check_rtp_timestamp(endp, state, &state->out_stream, rtp_end,
+					    addr, seq, timestamp, "output",
+					    &state->out_stream.last_tsdelta);
+	} else {
+		state->out_stream.last_tsdelta = 0;
+	}
 
 	/* Save output values */
 	state->out_stream.last_seq = seq;
diff --git a/tests/mgcp/mgcp_test.c b/tests/mgcp/mgcp_test.c
index 6978b1d..7397f5c 100644
--- a/tests/mgcp/mgcp_test.c
+++ b/tests/mgcp/mgcp_test.c
@@ -1282,6 +1282,15 @@
 	/* RTP: SeqNo=1002, TS=160320 */
 	{2.040000, 20, "\x80\x62\x03\xEA\x00\x02\x72\x40\x50\x60\x70\x80"
 	 "\x01\x23\x45\x67\x89\xAB\xCD\xEF"},
+	/* RTP: SeqNo=1003, TS=180320, Marker */
+	{2.060000, 20, "\x80\xE2\x03\xEB\x00\x02\xC0\x60\x50\x60\x70\x80"
+	 "\x01\x23\x45\x67\x89\xAB\xCD\xEF"},
+	 /* RTP: SeqNo=1004, TS=180480 */
+	{2.080000, 20, "\x80\x62\x03\xEC\x00\x02\xC1\x00\x50\x60\x70\x80"
+	 "\x01\x23\x45\x67\x89\xAB\xCD\xEF"},
+	 /* RTP: SeqNo=1005, TS=180480, 10ms too late */
+	{2.110000, 20, "\x80\x62\x03\xED\x00\x02\xC1\xA0\x50\x60\x70\x80"
+	 "\x01\x23\x45\x67\x89\xAB\xCD\xEF"},
 };
 
 static void test_packet_error_detection(int patch_ssrc, int patch_ts)
@@ -1566,24 +1575,24 @@
 
 	OSMO_ASSERT(conn->state.stats.initialized == 0);
 
-	mgcp_rtp_annex_count(endp, &conn->state, 0, 0, 2342);
+	mgcp_rtp_annex_count(endp, &conn->state, 0, 0, 2342, false);
 	OSMO_ASSERT(conn->state.stats.initialized == 1);
 	OSMO_ASSERT(conn->state.stats.cycles == 0);
 	OSMO_ASSERT(conn->state.stats.max_seq == 0);
 
-	mgcp_rtp_annex_count(endp, &conn->state, 1, 0, 2342);
+	mgcp_rtp_annex_count(endp, &conn->state, 1, 0, 2342, false);
 	OSMO_ASSERT(conn->state.stats.initialized == 1);
 	OSMO_ASSERT(conn->state.stats.cycles == 0);
 	OSMO_ASSERT(conn->state.stats.max_seq == 1);
 
 	/* now jump.. */
-	mgcp_rtp_annex_count(endp, &conn->state, UINT16_MAX, 0, 2342);
+	mgcp_rtp_annex_count(endp, &conn->state, UINT16_MAX, 0, 2342, false);
 	OSMO_ASSERT(conn->state.stats.initialized == 1);
 	OSMO_ASSERT(conn->state.stats.cycles == 0);
 	OSMO_ASSERT(conn->state.stats.max_seq == UINT16_MAX);
 
 	/* and wrap */
-	mgcp_rtp_annex_count(endp, &conn->state, 0, 0, 2342);
+	mgcp_rtp_annex_count(endp, &conn->state, 0, 0, 2342, false);
 	OSMO_ASSERT(conn->state.stats.initialized == 1);
 	OSMO_ASSERT(conn->state.stats.cycles == UINT16_MAX + 1);
 	OSMO_ASSERT(conn->state.stats.max_seq == 0);
diff --git a/tests/mgcp/mgcp_test.ok b/tests/mgcp/mgcp_test.ok
index 575fd83..94fada3 100644
--- a/tests/mgcp/mgcp_test.ok
+++ b/tests/mgcp/mgcp_test.ok
Binary files differ