Skip to content

Commit 3010bac

Browse files
committed
Sink service: Add a maximum number of DL packets in parallel
This an optional feature that is by default unset
1 parent 75a76bc commit 3010bac

File tree

3 files changed

+79
-9
lines changed

3 files changed

+79
-9
lines changed

sink_service/source/data.c

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,21 +29,39 @@ static char * m_interface = NULL;
2929
/** Bus slot used to register the Vtable */
3030
static sd_bus_slot * m_slot = NULL;
3131

32+
/** Max mtu size of sink */
33+
static size_t m_max_mtu;
34+
35+
/* Max number of downlink packet being sent in parallel */
36+
static size_t m_downlink_limit;
37+
3238
/**********************************************************************
3339
* DBUS Methods implementation *
3440
**********************************************************************/
41+
42+
static uint8_t m_message_queued_in_sink = 0;
43+
44+
static void on_data_sent_cb(uint16_t pduid, uint32_t buffering_delay, uint8_t result)
45+
{
46+
m_message_queued_in_sink -= (uint8_t) (pduid >> 8);
47+
LOGD("Message sent %d, Message_queued: %d\n", pduid, m_message_queued_in_sink);
48+
}
49+
3550
/**
3651
* \brief Send a message handler
3752
* \param ... (from sd_bus function signature)
3853
*/
3954
static int send_message(sd_bus_message * m, void * userdata, sd_bus_error * error)
4055
{
56+
static uint8_t m_pdu_id = 0;
57+
4158
app_message_t message;
4259
app_res_e res;
4360
const void * data;
4461
size_t n;
4562
int r;
4663
uint8_t qos;
64+
uint8_t weight;
4765

4866
/* Read the parameters */
4967
r = sd_bus_message_read(m,
@@ -77,21 +95,46 @@ static int send_message(sd_bus_message * m, void * userdata, sd_bus_error * erro
7795
message.bytes = data;
7896
message.num_bytes = n;
7997

98+
if (m_downlink_limit > 0)
99+
{
100+
/* Check if message can be queued */
101+
weight = (n + m_max_mtu - 1) / m_max_mtu;
102+
if (m_message_queued_in_sink + weight > m_downlink_limit)
103+
{
104+
// No point to try sending data, queue is already full
105+
return sd_bus_reply_method_return(m, "u", APP_RES_OUT_OF_MEMORY);
106+
}
107+
108+
/* Keep track of packet queued on the sink */
109+
/* Encode weight in ID */
110+
message.pdu_id = weight << 8 | m_pdu_id++;
111+
message.on_data_sent_cb = on_data_sent_cb;
112+
}
113+
else
114+
{
115+
message.pdu_id = 0;
116+
message.on_data_sent_cb = NULL;
117+
118+
}
119+
80120
LOGD("Message to send on EP %d from EP %d to 0x%x size = %d\n",
81121
message.dst_ep,
82122
message.src_ep,
83123
message.dst_addr,
84124
message.num_bytes);
85125

86-
/* Send packet. For now, packets are not tracked to keep behavior simpler */
87-
message.pdu_id = 0;
88-
message.on_data_sent_cb = NULL;
126+
89127

90128
res = WPC_send_data_with_options(&message);
91129
if (res != APP_RES_OK)
92130
{
93131
LOGE("Cannot send data: %d\n", res);
94132
}
133+
else if (m_downlink_limit > 0)
134+
{
135+
m_message_queued_in_sink += weight;
136+
LOGI("Message_queued: %d\n", m_message_queued_in_sink);
137+
}
95138

96139
return sd_bus_reply_method_return(m, "u", res);
97140
}
@@ -203,17 +246,24 @@ static const sd_bus_vtable data_vtable[] = {
203246

204247
SD_BUS_VTABLE_END};
205248

206-
int Data_Init(sd_bus * bus, char * object, char * interface)
249+
int Data_Init(sd_bus * bus, char * object, char * interface, size_t downlink_limit)
207250
{
208251
int ret;
209252

210253
m_bus = bus;
211254
m_object = object;
212255
m_interface = interface;
256+
m_downlink_limit = downlink_limit;
213257

214258
/* Register for all data */
215259
WPC_register_for_data(onDataReceived);
216260

261+
if (WPC_get_mtu((uint8_t *) &m_max_mtu) != APP_RES_OK)
262+
{
263+
LOGW("Cannot read max mtu from node");
264+
m_max_mtu = 102;
265+
}
266+
217267
/* Install the data vtable */
218268
ret = sd_bus_add_object_vtable(bus, &m_slot, object, interface, data_vtable, NULL);
219269
if (ret < 0)

sink_service/source/data.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
* The sd_bus instance to publish the config interface
1616
*\param object
1717
*\param interface
18+
*\param downlink_limit
19+
If > 0, max number of downlink messages being queued in parallel
1820
* \return 0 if initialization succeed, an error code otherwise
1921
* \note Connection with sink must be ready before calling this module
2022
*/
21-
int Data_Init(sd_bus * bus, char * object, char * interface);
23+
int Data_Init(sd_bus * bus, char * object, char * interface, size_t downlink_limit);
2224

2325
void Data_Close();
2426

sink_service/source/main.c

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,15 @@ static bool get_service_name(char service_name[MAX_SIZE_SERVICE_NAME], unsigned
7474
* Pointer where to store max_poll_fail_duration value (if any)
7575
* \param fragment_max_duration_s
7676
* Pointer where to store fragment_max_duration_s value (if any)
77+
* \param downlink_limit
78+
* Pointer where to store downlink_limit value (if any)
7779
*/
7880
static void get_env_parameters(unsigned long * baudrate,
7981
char ** port_name,
8082
unsigned int * sink_id,
8183
unsigned int * max_poll_fail_duration,
82-
unsigned int * fragment_max_duration_s)
84+
unsigned int * fragment_max_duration_s,
85+
unsigned int * downlink_limit)
8386
{
8487
char * ptr;
8588

@@ -110,6 +113,11 @@ static void get_env_parameters(unsigned long * baudrate,
110113
*fragment_max_duration_s = strtoul(ptr, NULL, 0);
111114
LOGI("WM_GW_SINK_MAX_FRAGMENT_DURATION_S: %lu\n", *fragment_max_duration_s);
112115
}
116+
if ((ptr = getenv("WM_GW_SINK_DOWNLINK_LIMIT")) != NULL)
117+
{
118+
*downlink_limit = strtoul(ptr, NULL, 0);
119+
LOGI("WM_GW_SINK_DOWNLINK_LIMIT: %lu\n", *downlink_limit);
120+
}
113121
}
114122

115123
// Usual baudrate to test in automatic mode
@@ -147,13 +155,14 @@ int main(int argc, char * argv[])
147155
unsigned int sink_id = 0;
148156
unsigned int max_poll_fail_duration = UNDEFINED_MAX_POLL_FAIL_DURATION;
149157
unsigned int fragment_max_duration_s = DEFAULT_FRAGMENT_MAX_DURATION_S;
158+
unsigned int downlink_limit = true;
150159

151160
/* Acquires environment parameters */
152161
get_env_parameters(&baudrate, &port_name, &sink_id, &max_poll_fail_duration,
153-
&fragment_max_duration_s);
162+
&fragment_max_duration_s, &downlink_limit);
154163

155164
/* Parse command line arguments - take precedence over environmental ones */
156-
while ((c = getopt(argc, argv, "b:p:i:d:f:")) != -1)
165+
while ((c = getopt(argc, argv, "b:p:i:d:f:l:")) != -1)
157166
{
158167
switch (c)
159168
{
@@ -176,6 +185,9 @@ int main(int argc, char * argv[])
176185
case 'f':
177186
fragment_max_duration_s = strtoul(optarg, NULL, 0);
178187
break;
188+
case 'l':
189+
downlink_limit = strtoul(optarg, NULL, 0);
190+
break;
179191
case '?':
180192
default:
181193
LOGE("Error in argument parsing\n");
@@ -184,6 +196,12 @@ int main(int argc, char * argv[])
184196
}
185197
}
186198

199+
if (downlink_limit > 16)
200+
{
201+
LOGE("Max downlink limit is 16 (%d)\n", downlink_limit);
202+
return EXIT_FAILURE;
203+
}
204+
187205
/* Generate full service name */
188206
if (!get_service_name(full_service_name, sink_id))
189207
{
@@ -263,7 +281,7 @@ int main(int argc, char * argv[])
263281
goto finish;
264282
}
265283

266-
if (Data_Init(m_bus, "/com/wirepas/sink", "com.wirepas.sink.data1") < 0)
284+
if (Data_Init(m_bus, "/com/wirepas/sink", "com.wirepas.sink.data1", downlink_limit) < 0)
267285
{
268286
LOGE("Cannot initialize data module\n");
269287
r = -1;

0 commit comments

Comments
 (0)