-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathrmc_sub.h
146 lines (119 loc) · 5 KB
/
rmc_sub.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
// Copyright (C) 2018, Jaguar Land Rover
// This program is licensed under the terms and conditions of the
// Mozilla Public License, version 2.0. The full text of the
// Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
//
// Author: Magnus Feuer ([email protected])
// rmc_sub.h - Handle packets subscribed to.
// This file contains data structures and functions to handle incoming packets
// sent by one or more publishers.
// The functions in here are network agnostic and deal only with
// driving packet meta-data and payload through the maturation
// process described below.
//
#ifndef __REL_MCAST_SUB_H__
#define __REL_MCAST_SUB_H__
#include "reliable_multicast.h"
#include "rmc_list.h"
//
// Packet has been received via multicast or tcp.
// Packet can be in one of the following queues.
//
// publisher->received
// Packet has been received but cannot be processed due to missing packets
// with lower pids, forming holes in the receive stream.
//
// publisher->dispatch_ready.
// Packet is ready to be processed by the caller through sub_get_next_dispatch_ready()
// calls.
//
// publisher->ack_ready
// The packet has been dispatched and is ready to be acknowledged back to
// the publisher.
//
//
// Call sequence to propagate packet is:
//
// sub_packet_received()
// Packet has been received from network and is pending processing.
//
// sub_process_received_packets()
// Move all consecutive packets, ready to be dispatched, from received to dispatch_ready queue.
//
// sub_packet_dispatched()
// Packet has been processed by caller. Move from dispatch_ready to ack_ready queue.
//
// sub_packet_acknowledged()
// Packet has been acknowledged to publisher and can be freed.
// **Please note that the caller still has to free packet->payload**
//
// A single packet received from a publisher, together with the
// bookkeeping that travels with it. The structure does not own the
// payload buffer; see the payload member below.
typedef struct sub_packet {
struct sub_publisher* publisher; // Publisher that sent this packet.
// Packet ID as received from network.
packet_id_t pid;
// Payload data allocation is done by caller and provided via
// rmc_packet_received() call.
// Caller needs to free payload once sub_packet_acknowledged() has
// been called.
// NOTE(review): this header declares sub_packet_received(), which
// takes the payload pointer; confirm whether rmc_packet_received()
// is a wrapper around it or a stale name.
void *payload;
// Payload length, in the units of payload_len_t.
// NOTE(review): the original comment says this is provided by
// pub_queue_packet(); on the subscriber side it is also a parameter
// of sub_packet_received() - confirm which is intended.
payload_len_t payload_len;
// Set to 1 if this packet does not need to be acked.
// Used when we receive packet via tcp.
uint8_t skip_acknowledgement;
// Opaque per-packet user data.
// Provided by sub_packet_received()
// Retrieved by sub_packet_user_data()
user_data_t pkg_user_data;
} sub_packet_t;
// Instantiate a list type holding sub_packet_t pointers.
// RMC_LIST is presumably provided by rmc_list.h (included above); the
// typedefs below show that its first two arguments become the list
// and node type names.
RMC_LIST(sub_packet_list, sub_packet_node, sub_packet_t*)
// Conventional _t aliases for the generated list and node types.
typedef sub_packet_list sub_packet_list_t;
typedef sub_packet_node sub_packet_node_t;
// A range of packet IDs, from first_pid to last_pid, plus the time
// the first of them was received.
// Used by sub_get_ack_sub_pid_intervals
// NOTE(review): whether last_pid is inclusive is not stated in this
// header; confirm against the implementation.
typedef struct sub_pid_interval {
// First packet ID in interval
packet_id_t first_pid;
// Last packet ID in interval
packet_id_t last_pid;
// Timestamp when we received the first pid covered by the
// interval.
usec_timestamp_t receive_ts;
} sub_pid_interval_t;
// Instantiate a list type holding sub_pid_interval_t elements by
// value (note: not pointers, unlike the sub_packet_t* list).
RMC_LIST(sub_pid_interval_list, sub_pid_interval_node, sub_pid_interval_t)
// Conventional _t aliases for the generated list and node types.
typedef sub_pid_interval_list sub_pid_interval_list_t;
typedef sub_pid_interval_node sub_pid_interval_node_t;
// A publisher is a feed from a single packet publisher that
// is being processed. It contains the state necessary to drive
// packets through the cycle described for sub_packet_t above.
//
// NOTE(review): the lifecycle comment earlier in this file refers to
// publisher->received, publisher->dispatch_ready and
// publisher->ack_ready. This struct only holds received_pid and
// received_interval; the dispatch-ready list is passed in separately
// to sub_process_received_packets().
//
typedef struct sub_publisher {
packet_id_t max_pid_ready; // Highest pid that is ready to be dispatched.
packet_id_t max_pid_received; // Maximum PID received.
// Packets received but need additional packets.
// Sorted on ascending pid
sub_packet_list_t received_pid;
// Received packet intervals.
// Filled by sub_packet_received().
// Depleted by rmc_sub_timeout()
//
sub_pid_interval_list received_interval;
} sub_publisher_t;
// Initialize the publisher state pointed to by pub.
// NOTE(review): the implementation is not visible in this header;
// confirm there which initial values the pid counters and lists get.
extern void sub_init_publisher(sub_publisher_t* pub);
// Check whether packet ID pid is a duplicate for publisher pub.
// NOTE(review): return convention is not documented here; presumably
// non-zero means "duplicate" - confirm against the implementation.
extern int sub_packet_is_duplicate(sub_publisher_t* pub,
packet_id_t pid);
// Register a packet that has been received from the network and is
// pending processing.
//
// pub           - Publisher the packet belongs to.
// pid           - Packet ID as received from the network.
// payload       - Payload buffer; allocated by the caller, who also
//                 frees it (see sub_packet_t.payload).
// payload_len   - Length of payload.
// store_receive_interval_data
//               - NOTE(review): presumably selects whether the pid is
//                 recorded in pub->received_interval - confirm.
// current_ts    - NOTE(review): presumably the receive timestamp
//                 stored with the interval - confirm.
// pkg_user_data - Opaque user data stored with the packet; retrieved
//                 via sub_packet_user_data().
//
// NOTE(review): meaning of the int return value is not documented in
// this header.
extern int sub_packet_received(sub_publisher_t* pub,
packet_id_t pid,
void* payload,
payload_len_t payload_len,
char store_receive_interval_data,
usec_timestamp_t current_ts,
user_data_t pkg_user_data);
// Go through all received packets and move those that are ready to
// be dispatched to the ready queue.
// Should be called after one or more calls to sub_packet_received().
//
// pub            - Publisher whose received packets are examined.
// dispatch_ready - Caller-provided list that receives the packets
//                  that are ready to be dispatched.
extern void sub_process_received_packets(sub_publisher_t* pub, sub_packet_list_t* dispatch_ready);
// Reset the state of publisher pub.
//
// pub          - Publisher to reset.
// payload_free - Callback taking a payload pointer, its length and the
//                packet's user data.
//                NOTE(review): presumably invoked for each packet still
//                held by the publisher so the caller can release the
//                payload - confirm against the implementation.
//
// The stray second ';' after the original declaration was removed: an
// empty declaration at file scope is not valid ISO C and is diagnosed
// under -Wpedantic. Parameter names were added to match the other
// prototypes in this header; the function type is unchanged.
extern void sub_reset_publisher(sub_publisher_t* pub,
                                void (*payload_free)(void*, payload_len_t, user_data_t));
// Return a timestamp for the oldest packet from pub that has not yet
// been acknowledged.
// NOTE(review): presumably derived from the receive_ts entries in
// pub->received_interval; the value returned when nothing is pending
// is not documented here - confirm against the implementation.
extern usec_timestamp_t sub_oldest_unacknowledged_packet(sub_publisher_t* pub);
// Retrieve the user data that was supplied to sub_packet_received()
// for packet pack (see sub_packet_t.pkg_user_data).
extern user_data_t sub_packet_user_data(sub_packet_t* pack);
// Record packet ID pid in the received intervals of publisher pub.
// NOTE(review): merge behaviour with adjacent intervals and the
// meaning of the int return value are not documented in this header -
// confirm against the implementation.
extern int sub_packet_add_to_received_interval(sub_publisher_t* pub, packet_id_t pid);
#endif // __REL_MCAST_SUB_H__