forked from VrayoSystems/vtrunkd
-
Notifications
You must be signed in to change notification settings - Fork 0
/
linkfd.c
8552 lines (7819 loc) · 428 KB
/
linkfd.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
vtrunkd - Virtual Tunnel Trunking over TCP/IP network.
Copyright (C) 2011-2016 Vrayo Systems Ltd. team
Vtrunkd has been derived from VTUN package by Maxim Krasnyansky.
vtun Copyright (C) 1998-2000 Maxim Krasnyansky <[email protected]>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
*/
/*
* linkfd.c,v 1.4.2.15.2.2 2006/11/16 04:03:23 mtbishop Exp
*/
/*
* To fully utilize all capabilities you need linux kernel of at least >=2.6.25
*/
/*
* TODO:
* - collect LOSS stats: Packets Between Loss (PBL); Packets Sequentially Lost (PSL)
* - overcome rtt,rtt2 < 1ms limitation(s)
* - dynamic buffer: fixed size in MB (e.g. 5MB), dynamic packet list (Start-Byte-Rel; End-Byte-Rel)
* - stable channels with stabilizing weights
*
*/
#define _GNU_SOURCE
#include "config.h"
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <signal.h>
#include <strings.h>
#include <string.h>
#include <errno.h>
#include <sys/time.h>
#include <syslog.h>
#include <time.h>
#include <semaphore.h>
#include <stdint.h>
#include <arpa/inet.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>
#include <inttypes.h>
#include <math.h>
#include <sys/mman.h>
#ifdef PROF
#include <dlfcn.h>
#endif
#ifdef HAVE_SYS_RESOURCE_H
#include <sys/resource.h>
#endif
#ifdef HAVE_SCHED_H
#include <sched.h>
#endif
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif
#ifdef HAVE_NETINET_TCP_H
#include <netinet/tcp.h>
#include <netinet/udp.h>
#include <netinet/ip_icmp.h>
#endif
#include "udp_states.h"
#include "vtun.h"
#include "linkfd.h"
#include "lib.h"
#include "log.h"
#include "driver.h"
#include "net_structs.h"
#include "netlib.h"
#include "netlink_socket_info.h"
#include "speed_algo.h"
#include "timer.h"
#include "pid.h"
#ifdef TESTING
#include "testing.h"
#endif
#include "packet_code.h"
#include <stdarg.h>
/*
 * Local description of an IP packet header, field by field.
 * The helper macros defined between the fields operate on a pointer to this
 * struct (IP_V, IP_HL) or are bit masks for the ip_off field (IP_DF, IP_MF,
 * IP_OFFMASK). Being preprocessor macros they are visible file-wide, not
 * scoped to the struct.
 */
struct my_ip {
u_int8_t ip_vhl; /* header length, version (two 4-bit fields packed in one byte) */
/* IP_V: upper nibble of ip_vhl (version) */
#define IP_V(ip) (((ip)->ip_vhl & 0xf0) >> 4)
/* IP_HL: lower nibble of ip_vhl (header length field) */
#define IP_HL(ip) ((ip)->ip_vhl & 0x0f)
u_int8_t ip_tos; /* type of service */
u_int16_t ip_len; /* total length */
u_int16_t ip_id; /* identification */
u_int16_t ip_off; /* fragment offset field (flags + offset, see masks below) */
#define IP_DF 0x4000 /* don't fragment flag */
#define IP_MF 0x2000 /* more fragments flag */
#define IP_OFFMASK 0x1fff /* mask for fragmenting bits */
u_int8_t ip_ttl; /* time to live */
u_int8_t ip_p; /* protocol */
u_int16_t ip_sum; /* checksum */
struct in_addr ip_src,ip_dst; /* source and dest address */
};
#define CH_THRESH 30
#define CS_THRESH 60
#define SEND_Q_IDLE 7000 // send_q less than this enters idling mode; e.g. head is detected by rtt
#define SEND_Q_LIMIT_MINIMAL 9000 // 7000 seems to work
#define SPEED_MINIMAL 100000.0 // 100kb/s minimal speed
#define SENQ_Q_LIMIT_THRESHOLD_MIN 13000 // the value with which that AG starts
//#define SENQ_Q_LIMIT_THRESHOLD_MULTIPLIER 10 // send_q AG allowed threshold = RSR / SENQ_Q_LIMIT_THRESHOLD_MULTIPLIER
#define RATE_THRESHOLD_MULTIPLIER 5 // cut-off by speed only
#define RTT_THRESHOLD_MULTIPLIER 3 // cut-off by RTT only
#define RTT_THRESHOLD_GOOD 50 // cut-off by RTT ms
#define SEND_Q_EFF_WORK 10000 // value for send_q_eff to detect that channel is in use
#define ACS_NOT_IDLE 50000 // ~50pkts/sec ~= 20ms rtt2 accuracy
#define LOSS_SEND_Q_MAX 1000 // maximum send_q allowed is now 1000 (for head?)
#define LOSS_SEND_Q_UNKNOWN -1 // unknown value
#define MIN_SEND_Q_BESTGUESS_3G_PKT 150 // packets best-guess for 3G
// TODO: use mean send_q value for the following def
#define SEND_Q_AG_ALLOWED_THRESH 25000 // depends on RSR_TOP and chan speed. TODO: refine, Q: understand if we're using more B/W than 1 chan has?
//#define MAX_LATENCY_DROP { 0, 550000 }
#define MAX_NETWORK_STALL_MS 250 // maximum network stall, in ms (currently 250 ms)
#define MAX_NETWORK_STALL { 0, MAX_NETWORK_STALL_MS*1000 } // the same limit written as a { sec, usec } initializer
#define MAX_LATENCY_DROP_USEC 200000 // typ. is 204-250 upto 450 max RTO at CUBIC
#define MAX_LATENCY_DROP_SHIFT 100 // ms. to add to forced_rtt - or use above
//#define MAX_REORDER_LATENCY { 0, 50000 } // is rtt * 2 actually, default. ACTUALLY this should be in compliance with TCP RTO
#define MAX_REORDER_LATENCY_MAX 499999 // usec
#define MAX_REORDER_LATENCY_MIN 200 // usec
#define MAX_REORDER_PERPATH 8// was 4
// TODO HERE: TCP Model requried ---vvv
#define PLP_UNRECOVERABLE_CUTOFF 10000 // in theory about 50 mbit/s at 20ms // was 1000 - 1000 is okay for like rtt 20ms but not for 100ms
#define PSL_RECOVERABLE 2 // we can recover this amount of loss
#define L_PBL_JOIN_EVENTS 50 // join all events within this PBL
#define DROPPING_LOSSING_DETECT_SECONDS 7 // seconds to pass after drop or loss to say we're not lossing or dropping anymore
//#define MAX_BYTE_DELIVERY_DIFF 100000 // what size of write buffer pumping is allowed? -> currently =RSR_TOP
#define SELECT_SLEEP_USEC 50000 // crucial for mean sqe calculation during idle
#define SUPERLOOP_MAX_LAG_USEC 5000 // 5 ms (5000 usec) max superloop lag allowed // cpu lag
#define FCI_P_INTERVAL 3 // interval in packets to send ACK if ACK is not sent via payload packets
#define CUBIC_T_DIV 50
#define TMRTTA 25 // alpha coeff. for RFC6298 for tcp model rtt avg.
#define SKIP_SENDING_CLD_DIV 2
#define MSBL_PUSHDOWN_K 30
#define MSBL_PUSHUP_K 80
#define MAX_STUB_JITTER 1 // maximum packet jitter that we allow on buffer to happen
#define AGAG_MAX 255
#define SLOW_START_MAX_RUN {5, 500000} // max slow_start runtime after idle
#define SLOW_START_IMMUNE {10, 100000} // no SS allowed within this period after previous SS
#define SLOW_START_INCINT 10 // amount of packets to increase MSBL by 1 after
#define TOKENBUF_ADD_BURST 7 // amount of tokens to wait before adding to reduce integer error in add_token
// PLOSS is a "probable loss" event: it occurs if PSL=1or2 for some amount of packets AND we detected probable loss (possible_seq_lost)
// this LOSS detect method uses the fact that we never push the network with 1 or 2 packets; we always push 5+ (TODO: make sure it is true!)
#define PLOSS_PSL 2 // this is '1or2'
#define PLOSS_CHECK_PKTS 15 // how many packets to check for sequential loss to detect PLOSS TODO: find correct value. speed dependent??
#define MAX_SD_W 1700 // stat buf max send_q (0..MAX_SD_W)
#define SD_PARITY 2 // stat buf len = MAX_SD_W / SD_PARITY
#define SLOPE_POINTS 30 // how many points ( / SD_PARITY ) to make linear fit from
#define ZERO_W_THR 2000.0 // ms. when to consider weight of point =0 (value outdated)
#define SPEED_REDETECT_TV {2,0} // timeval (interval) for chan speed redetect
#define HEAD_REDETECT_HYSTERESIS_TV {0,800000} // timeval (interval) for chan speed redetect
#define HEAD_HYSTERESIS_MIN_MS 800 // this replaced the above one
#define HEAD_TRANSITION_DELAY {0, 400}
#define SPEED_REDETECT_IMMUNE_SEC 5 // (interval seconds) before next auto-redetection can occur after PROTUP - added to above timer!
#define LIN_RTT_SLOWDOWN 70 // Grow rtt 40x slower than real-time
#define LIN_FORCE_RTT_GROW 0 // ms // TODO: need to find optimal value for required performance region
#define FORCE_RTT_JITTER_THRESH_MS 30 // ms of jitter to start growing rtt (subbing?)
#define DEAD_RTT 1500 // ms. RTT to consider chan dead
#define DEAD_RSR_USG 40 // %. RSR utilization to consider chan dead if ACS=0
#define DEAD_CHANNEL_RSR 40000 // fixed RSR for dead channel
#define RSR_SMOOTH_GRAN 10 // ms granularity
#define RSR_SMOOTH_FULL 500 // ms for full convergence
#define TRAIN_PKTS 80
#define WRITE_OUT_MAX 30 // write no more than 30 packets at once
//#define NOCONTROL
//#define NO_ACK
#define FAST_PCS_PACKETS_CAN_CALC_SPEED 200 // packets count to calculate PCS speed statistically correct
#define FAST_PCS_MINIMAL_INTERVAL 50 // ms minimal interval
#define RCVBUF_SIZE 1048576
#define WHO_LOST 1
#define WHO_LAGGING 2
// #define TIMEWARP
#ifdef TIMEWARP
#define TW_MAX 10000000
char *timewarp;
int tw_cur;
#endif
#define PUSH_TO_TOP 2 // push Nth packet, 0 to disable
#define get_ds_ts(x) ((x.tv_sec - x.tv_sec / 10000000 * 10000000) * 100 + x.tv_usec / 10000)
#ifdef CPULAGCHK
#define CHKCPU(x) gettimeofday(&cpulag_tmp, NULL);timersub(&cpulag_tmp, &old_time, &tv_tmp_tmp_tmp);if(tv_tmp_tmp_tmp.tv_usec > SUPERLOOP_MAX_LAG_USEC) vlog(LOG_ERR,"WARNING! CPU deficiency detected! Cycle lag: %ld.%06ld place %d", tv_tmp_tmp_tmp.tv_sec, tv_tmp_tmp_tmp.tv_usec, x);
struct timeval cpulag_tmp;
#else
#define CHKCPU {}
#endif
char lossLog[JS_MAX] = { 0 }; // for send_q compressor
int lossLog_cur = 0;
// flags:
uint8_t time_lag_ready;
int ptt_allow_once = 0; // allow to push-to-top single packet
int skip=0;
int forced_rtt_reached=1;
int select_check=0;
sigset_t block_mask, unblock_mask;
char rxmt_mode_request = 0; // flag
long int weight = 0; // bigger weight more time to wait(weight == penalty)
long int weight_cnt = 0;
int acnt = 0; // assert variable
char *out_buf;
uint16_t dirty_seq_num;
int sendbuff;
#define START_SQL 5000
struct udp_stats udp_struct[1];
int drop_packet_flag = 0, drop_counter=0;
int skip_write_flag = 0;
// these are for retransmit mode... to be removed
short retransmit_count = 0;
char channel_mode = MODE_NORMAL;
int hold_mode = 0; // 1 - hold 0 - normal
int force_hold_mode = 1;
int buf_len, incomplete_seq_len = 0;
int16_t my_miss_packets_max = 0; // in ms; calculated here
int16_t miss_packets_max = 0; // get from another side
int proto_err_cnt = 0;
int my_max_send_q_chan_num = 0;
uint32_t my_max_send_q = 0, max_reorder_byte = 0;
uint32_t last_channels_mask = 0;
int32_t send_q_eff = 0;
int max_chan=-1;
uint32_t start_of_train = 0, end_of_train = 0;
struct timeval flood_start_time = { 0, 0 };
char *buf2;
int buf_len_real=0;
int need_send_loss_FCI_flag = 0;
#define WB_1MS_SIZE 500
int wb_1ms[WB_1MS_SIZE] = { 0 };
int wb_1ms_idx = 2, start_print = 0;
char wb_1ms_str[5000] = { '\0' };
/*Variables for the exact way of measuring speed*/
struct timeval send_q_read_time, send_q_read_timer = {0,0}, send_q_read_drop_time = {0, 100000}, send_q_mode_switch_time = {0,0}, net_model_start = {0,0};
int32_t ACK_coming_speed_avg = 0;
int32_t send_q_limit = 7000;
int32_t magic_rtt_avg = 0;
/* Host we are working with.
* Used by signal handlers that's why it is global.
*/
struct vtun_host *lfd_host;
struct conn_info *shm_conn_info;
struct lfd_mod *lfd_mod_head = NULL, *lfd_mod_tail = NULL;
struct channel_info *chan_info = NULL;
struct phisical_status info; /**< We store here all process closed information */
struct {
int packet_sent_ag;
int packet_sent_rmit;
int byte_sent_ag_full;
int byte_sent_rmit_full;
int bytes_rcvd_norm;
int bytes_rcvd_rx;
int pkts_dropped;
int rxmit_req; // outdated: use max_latency_hit + max_reorder_hit
int rxmit_req_rx;
int rxmits; // number of resended packets
int rxmits_notfound; // number of resend packets which not found
int max_latency_hit; // new
int max_reorder_hit; // new
int mode_switches;
int rxm_ntf;
int chok_not;
int max_latency_drops; // new
int bytes_sent_chan[MAX_TCP_LOGICAL_CHANNELS];
int bytes_rcvd_chan[MAX_TCP_LOGICAL_CHANNELS];
int tokens_max;
int maw;
int mar;
int skip_new_h; // skipping and sending new as we are heading
int skip_new_d; // skipping and sending new as we can deliver in time and we have no more packets
int skip_r; // skipping as a result of all computations
int skip_no; // skipping without sending
int skip_l; // skipping by getting last packet
int p_tooold;
int p_expnum;
int p_tooearly;
int web_surf_optimization;
} statb;
struct {
int expiration_ms_fromnow;
int expnum;
} log_tmp;
struct {
int v_min;
int v_avg;
int v_max;
} v_mma;
struct mini_path_desc
{
int process_num;
int rtt;
int packets_between_loss;
};
struct time_lag_info time_lag_info_arr[MAX_TCP_LOGICAL_CHANNELS];
struct time_lag time_lag_local;
struct timeval socket_timeout = { 10, 0 };
struct last_sent_packet last_sent_packet_num[MAX_TCP_LOGICAL_CHANNELS]; // initialized by 0 look for memset(..
fd_set fdset, fdset_w, *pfdset_w;
int delay_acc; // accumulated send delay
int delay_cnt;
uint32_t my_max_speed_chan;
uint32_t my_holded_max_speed;
//uint32_t my_max_send_q;
//
// Declarations.
//
int check_delivery_time_path_unsynced(int pnum, int mld_divider);
int check_delivery_time_unsynced(int mld_divider);
int check_rtt_latency_drop_chan(int chan_num);
int get_rto_usec();
int lost_buf_exists(uint32_t seq_num);
int plp_avg_pbl_unrecoverable(int pnum);
/*
 * Weighted least-squares fit of a straight line  y = c0 + c1 * x.
 *
 * x, w, y      input arrays (abscissa, weight, ordinate); element i of each
 *              array is read at index i * <stride>.
 * n            number of points.
 * c0, c1       out: intercept and slope of the fitted line.
 * cov_00, cov_01, cov_11
 *              out: entries of the covariance matrix of (c0, c1).
 * chisq        out: weighted sum of squared residuals.
 *
 * Points whose weight is not strictly positive are ignored.
 * Always returns 1. No check is made for degenerate input (no usable points,
 * or all usable x identical); in that case the divisions below are by zero.
 */
int fit_wlinear (const double *x, const size_t xstride,
                 const double *w, const size_t wstride,
                 const double *y, const size_t ystride,
                 const size_t n,
                 double *c0, double *c1,
                 double *cov_00, double *cov_01, double *cov_11,
                 double *chisq)
{
    double sum_w = 0;    /* running total of accepted weights */
    double mean_x = 0;   /* weighted mean of x */
    double mean_y = 0;   /* weighted mean of y */
    double var_x = 0;    /* weighted mean of (x - mean_x)^2 */
    double cov_xy = 0;   /* weighted mean of (x - mean_x) * (y - mean_y) */
    double resid2 = 0;   /* weighted sum of squared residuals */
    double slope, intercept;
    size_t k;

    /* Pass 1: weighted means, updated incrementally. */
    for (k = 0; k < n; k++) {
        const double wk = w[k * wstride];

        if (!(wk > 0)) {
            continue;
        }
        sum_w += wk;
        mean_x += (x[k * xstride] - mean_x) * (wk / sum_w);
        mean_y += (y[k * ystride] - mean_y) * (wk / sum_w);
    }

    /* Pass 2: weighted second moments around the means. */
    sum_w = 0; /* the total weight is rebuilt during this pass */
    for (k = 0; k < n; k++) {
        const double wk = w[k * wstride];

        if (!(wk > 0)) {
            continue;
        }
        {
            const double dx = x[k * xstride] - mean_x;
            const double dy = y[k * ystride] - mean_y;

            sum_w += wk;
            var_x += (dx * dx - var_x) * (wk / sum_w);
            cov_xy += (dx * dy - cov_xy) * (wk / sum_w);
        }
    }

    /* Line parameters, in terms of y = intercept + slope * x. */
    slope = cov_xy / var_x;
    intercept = mean_y - mean_x * slope;

    *c0 = intercept;
    *c1 = slope;

    *cov_00 = (1 / sum_w) * (1 + mean_x * mean_x / var_x);
    *cov_11 = 1 / (sum_w * var_x);
    *cov_01 = -mean_x / (sum_w * var_x);

    /* Pass 3: chi^2 = sum_i w_i * (y_i - (intercept + slope * x_i))^2 */
    for (k = 0; k < n; k++) {
        const double wk = w[k * wstride];

        if (!(wk > 0)) {
            continue;
        }
        {
            const double dx = x[k * xstride] - mean_x;
            const double dy = y[k * ystride] - mean_y;
            const double d = dy - slope * dx;

            resid2 += wk * d * d;
        }
    }
    *chisq = resid2;

    return 1;
}
/*
 * Tell whether A and B differ by at most `percent` percent of their mean.
 *
 * Returns 1 when the values are considered equal, 0 otherwise:
 *  - a difference above 10000000 is never equal;
 *  - if both values are below 2 they are always considered equal;
 *  - otherwise the difference is compared, in percent, against
 *    A/2 + B/2 (integer halves, so the mean is rounded down).
 *
 * The difference is computed in a wider type so that operands of opposite
 * sign cannot overflow. A non-positive mean (only possible when one operand
 * is negative) used to divide by zero or yield a negative percentage that
 * compared as "equal"; such pairs are now reported as not equal.
 */
int percent_delta_equal(int A, int B, int percent) {
    long long delta = (long long)A - (long long)B;
    if (delta < 0) {
        delta = -delta;
    }
    if (delta > 10000000) {
        return 0;
    }
    if (A < 2 && B < 2) {
        return 1;
    }

    int mean = A / 2 + B / 2;
    if (mean <= 0) {
        return 0; /* mixed-sign operands: no meaningful relative difference */
    }

    long long dp = delta * 100 / mean;
    return (dp <= percent) ? 1 : 0;
}
/*
 * Work out which packets of the range [packet_sum->start_seq,
 * packet_sum->stop_seq] are absent from two frame lists.
 *
 * l          list searched second (logged as "wb").
 * l_jw       list searched first (logged as "jwb").
 * flist      frame storage both lists index into via rel_head / rel_next.
 * packet_sum in: start_seq / stop_seq; out: lostAmount is set to the size of
 *            the range and decremented once for every frame found inside it.
 *
 * Returns the running "first missing" sequence number: it starts at start_seq
 * and is advanced each time a frame carrying exactly that number is met.
 * The scan of a list stops, and the function returns, at the first frame
 * whose seq_num is above stop_seq.
 */
int frame_llist_getLostPacket_byRange(struct frame_llist *l, struct frame_llist *l_jw, struct frame_seq *flist, struct packet_sum *packet_sum) {
    int pos = l_jw->rel_head;
    uint32_t first_missing;

    packet_sum->lostAmount = packet_sum->stop_seq - packet_sum->start_seq + 1;
#ifdef CODE_LOG
    vlog(LOG_INFO, "jwb %d wb %d",l_jw->length, l->length);
#endif
    first_missing = packet_sum->start_seq;

    /* pass 1: frames in l_jw */
    for (; pos > -1; pos = flist[pos].rel_next) {
#ifdef CODE_LOG
        vlog(LOG_INFO, "jwb iterate idx %d lost amount %d seq_num %"PRIu32" lost seq %"PRIu32"", pos, packet_sum->lostAmount, flist[pos].seq_num, first_missing);
#endif
        if ((flist[pos].seq_num >= packet_sum->start_seq) && (flist[pos].seq_num <= packet_sum->stop_seq)) {
            packet_sum->lostAmount--;
            if (flist[pos].seq_num == first_missing) {
                first_missing++;
            }
            continue;
        }
        if (flist[pos].seq_num > packet_sum->stop_seq) {
            return first_missing;
        }
    }

    /* pass 2: frames in l */
    for (pos = l->rel_head; pos > -1; pos = flist[pos].rel_next) {
#ifdef CODE_LOG
        vlog(LOG_INFO, "wb iterate idx %d lost amount %d seq_num %"PRIu32" lost seq %"PRIu32"", pos, packet_sum->lostAmount, flist[pos].seq_num, first_missing);
#endif
        if ((flist[pos].seq_num >= packet_sum->start_seq) && (flist[pos].seq_num <= packet_sum->stop_seq)) {
            packet_sum->lostAmount--;
            if (flist[pos].seq_num == first_missing) {
                first_missing++;
            }
            continue;
        }
        if (flist[pos].seq_num > packet_sum->stop_seq) {
            return first_missing;
        }
    }

    return first_missing;
}
/*
 * Range check for a frame index.
 * Returns 0 when 0 <= index < memory_size, and 1 when the index is out of
 * range (note: non-zero means failure).
 */
int frame_llist_check_index_range(int index, int memory_size) {
    const int in_range = (index >= 0) && (index < memory_size);

    return in_range ? 0 : 1;
}
/*
 * IF_WRITE_CONDITION expands to several statements followed by an `if (...)`
 * header WITHOUT a body; the statement written right after the macro becomes
 * the body of that `if`.
 *
 * Names it expects in the enclosing scope: logical_channel, packet_wait_tv,
 * since_write_tv, cond_flag, max_latency_drop (plus the file-level buf_len,
 * forced_rtt_reached, info, shm_conn_info and lfd_host).
 *
 * Statements executed before the test:
 *   packet_wait_tv     = current_time - time_stamp of the frame at rel_head
 *   since_write_tv     = current_time - last_write_time of the write buffer
 *   forced_rtt_reached = check_tokens(logical_channel)
 *   cond_flag          = 1 if the head frame is exactly last_written_seq + 1
 *   buf_len            = seq_num of the tail frame - last_written_seq
 *
 * The `if` is true when is_single_channel is set, or when forced_rtt_reached
 * is non-zero and at least one of these holds:
 *   - cond_flag (head frame is the next one in sequence)
 *   - buf_len exceeds lfd_host->MAX_ALLOWED_BUF_LEN
 *   - the head frame has waited at least MAX_PACKET_WAIT
 *   - the head frame has waited at least max_latency_drop AND nothing was
 *     written for at least shm_conn_info->max_network_stall
 *   - the head frame's seq_num is below seq_num_unrecoverable_loss
 *
 * NOTE(review): frames.rel_head / rel_tail are used as array indices without
 * a check for -1 here - presumably callers guarantee a non-empty buffer;
 * confirm at the call sites.
 */
#define IF_WRITE_CONDITION timersub(&info.current_time, &shm_conn_info->frames_buf[shm_conn_info->write_buf[logical_channel].frames.rel_head].time_stamp, &packet_wait_tv); \
timersub(&info.current_time, &shm_conn_info->write_buf[logical_channel].last_write_time, &since_write_tv); \
forced_rtt_reached=check_tokens(logical_channel); \
cond_flag = ((shm_conn_info->frames_buf[shm_conn_info->write_buf[logical_channel].frames.rel_head].seq_num == (shm_conn_info->write_buf[logical_channel].last_written_seq + 1))) ? 1 : 0; \
buf_len = shm_conn_info->frames_buf[shm_conn_info->write_buf[logical_channel].frames.rel_tail].seq_num - shm_conn_info->write_buf[logical_channel].last_written_seq; \
if ( shm_conn_info->is_single_channel \
|| (forced_rtt_reached && ( \
cond_flag \
|| (buf_len > lfd_host->MAX_ALLOWED_BUF_LEN) \
|| timercmp(&packet_wait_tv, &((struct timeval) MAX_PACKET_WAIT), >=) \
|| ( timercmp(&packet_wait_tv, &max_latency_drop, >=) \
&& timercmp(&since_write_tv, &shm_conn_info->max_network_stall, >=) ) \
|| (shm_conn_info->frames_buf[shm_conn_info->write_buf[logical_channel].frames.rel_head].seq_num < shm_conn_info->seq_num_unrecoverable_loss) \
)) \
)
/*
 * Account for the sequence-number gap that is skipped when frame `fprev`
 * is flushed on `logical_channel`.
 *
 * The gap is frames_buf[fprev].seq_num - (last_written_seq + 1). If the
 * previous write was also a flush (shm_conn_info->prev_flushed set) the gap
 * is added to info.flush_sequential, otherwise info.flush_sequential is
 * restarted from this gap. prev_flushed is then set to 1.
 *
 * Returns 0. (The function is declared int but previously fell off the end
 * without returning a value, which is undefined if a caller reads it.)
 */
int update_prev_flushed(int logical_channel, int fprev) {
    if(shm_conn_info->prev_flushed) {
        /* consecutive flush: extend the running count */
        info.flush_sequential +=
            shm_conn_info->frames_buf[fprev].seq_num - (shm_conn_info->write_buf[logical_channel].last_written_seq + 1);
    } else {
        // TODO: write avg stats here?
        /* first flush of a run: start counting from this gap */
        info.flush_sequential =
            shm_conn_info->frames_buf[fprev].seq_num - (shm_conn_info->write_buf[logical_channel].last_written_seq + 1);
    }
    shm_conn_info->prev_flushed = 1;
    return 0;
}
// return who is lagging.
// NOTE: Need to ensure that we have a missing packet at LWS+1 prior to calling this!
//
// Looks for the physical channel responsible for the missing packet
// last_written_seq + 1 on `logical_channel`.
//
// status        WHO_LAGGING: consider channels whose last_received_seq is
//               still below the missing seq_num;
//               WHO_LOST: consider channels whose possible_seq_lost is at or
//               above the missing seq_num.
// pname         out: name of the selected channel is strcpy'd here (in the
//               WHO_LOST fallback modes it is prefixed with 'L' or 'p').
// chan_mask     bit i set => physical channel i takes part.
// who_lost_pnum out: index of the selected channel, or -1 if none selected.
//
// Returns the number of channels counted as matching (see the passes below).
//
// NOTE(review): pname is written with unbounded strcpy, and in the fallback
// modes only pname[0] is written when no channel matches, so the buffer is
// not NUL-terminated by this function in that case - presumably callers
// pre-initialize it; confirm.
int flush_reason_chan(int status, int logical_channel, char *pname, int chan_mask, int *who_lost_pnum) {
// we let that next seq_num to LWS is lost
uint32_t lost_seq_num = shm_conn_info->write_buf[logical_channel].last_written_seq + 1;
int lrq = 0; // best (highest) sequence value seen so far in pass 1
int lagging = 0; // number of matching channels; this is the return value
*who_lost_pnum = -1;
// find possible processes
// pass 1: pick ONE channel - only channels that are in the mask, not marked
// dead and accepted by check_rtt_latency_drop_chan() are eligible
for (int i = 0; i < MAX_TCP_PHYSICAL_CHANNELS; i++) {
if (chan_mask & (1 << i) && (!shm_conn_info->stats[i].channel_dead) && check_rtt_latency_drop_chan(i)) {
if( (status == WHO_LAGGING) && ( (shm_conn_info->write_buf[logical_channel].last_received_seq[i]) < lost_seq_num)) {
if( (shm_conn_info->write_buf[logical_channel].last_received_seq[i]) > lrq) { // we find the most recent one that fulfills the conditions
strcpy(pname, shm_conn_info->stats[i].name);
*who_lost_pnum = i;
lrq = shm_conn_info->write_buf[logical_channel].last_received_seq[i];
}
}
if( (status == WHO_LOST) && (lost_seq_num <= shm_conn_info->write_buf[logical_channel].possible_seq_lost[i])) {
if(shm_conn_info->write_buf[logical_channel].possible_seq_lost[i] > lrq) {
strcpy(pname, shm_conn_info->stats[i].name);
*who_lost_pnum = i;
lrq = shm_conn_info->write_buf[logical_channel].possible_seq_lost[i];
}
}
}
}
// now count only
// pass 2: count every channel in the mask that satisfies the status
// condition (the dead / latency filters of pass 1 are not applied here)
for (int i = 0; i < MAX_TCP_PHYSICAL_CHANNELS; i++) {
if (chan_mask & (1 << i)) {
if( (status == WHO_LAGGING) && ( (shm_conn_info->write_buf[logical_channel].last_received_seq[i]) < lost_seq_num)) {
lagging++;
}
if( (status == WHO_LOST) && (lost_seq_num <= shm_conn_info->write_buf[logical_channel].possible_seq_lost[i])) {
lagging++;
}
}
}
if(lagging == 0 && status == WHO_LOST) { // fixing WHO_LOST only
// could not detect who lost directly(for example, no seq_num has arrived yet on lossing chan [loss detected by FCI]), doing 'possible' mode
// fallback 1: any channel with packet_lost_state set; the last matching
// channel wins, and its name is stored after the 'L' marker
pname[0]='L';
for (int i = 0; i < MAX_TCP_PHYSICAL_CHANNELS; i++) {
if (chan_mask & (1 << i)) {
if( (status == WHO_LOST) && shm_conn_info->write_buf[logical_channel].packet_lost_state[i]) {
strcpy(pname+1, shm_conn_info->stats[i].name);
*who_lost_pnum = i;
lagging++;
}
}
}
}
if(lagging == 0 && status == WHO_LOST) { // fixing WHO_LOST only
// now find last one who lost by possible_seq_lost
// fallback 2: channel with the highest non-zero possible_seq_lost; note
// that lagging is incremented every time a new maximum is found, so the
// returned count here is not a count of distinct lossy channels
pname[0]='p';
unsigned int highest_psl = 0;
for (int i = 0; i < MAX_TCP_PHYSICAL_CHANNELS; i++) {
if (chan_mask & (1 << i)) {
if(shm_conn_info->write_buf[logical_channel].possible_seq_lost[i] > highest_psl) {
highest_psl = shm_conn_info->write_buf[logical_channel].possible_seq_lost[i];
strcpy(pname+1, shm_conn_info->stats[i].name);
*who_lost_pnum = i;
lagging++;
}
}
}
}
return lagging;
}
/********** Linker *************/
/* Termination flag */
static volatile sig_atomic_t linker_term;
/*
 * SA_SIGINFO-style handler: prints the faulting address taken from
 * si->si_addr and returns without terminating the process.
 * `signal` and `arg` are not used.
 */
void segfault_sigaction(int signal, siginfo_t *si, void *arg)
{
    void *fault_addr = si->si_addr;

    (void)signal;
    (void)arg;

    printf("CRITICAL ERROR Caught mem-free segfault at address %p; will continue anyway since we are USS 1408 Enterprise !! q:-)\\-<\n", fault_addr);
    /* deliberately no exit(): execution continues after the handler */
}
/*
 * Termination signal handler: cancels pending I/O via io_cancel() and
 * records VTUN_SIG_TERM in the linker_term flag. Nothing is logged here.
 */
static void sig_term(int sig)
{
    (void)sig; /* signal number is not needed */

    io_cancel();
    linker_term = VTUN_SIG_TERM;
}
/*
 * Hang-up signal handler: logs that the connection is being re-established,
 * cancels pending I/O via io_cancel() and records VTUN_SIG_HUP in the
 * linker_term flag.
 */
static void sig_hup(int sig)
{
    (void)sig; /* signal number is not needed */

    vlog(LOG_INFO, "Get sig_hup");
    vlog(LOG_INFO, "Reestablishing connection");

    io_cancel();
    linker_term = VTUN_SIG_HUP;
}
/*
 * SIGUSR1 handler. It performs no action besides a sanity check: when the
 * global select_check flag is zero at delivery time an assertion message is
 * logged.
 */
static void sig_usr1(int sig)
{
    (void)sig; /* signal number is not needed */

    if (select_check) {
        return; /* flag set: nothing to report */
    }
    vlog(LOG_ERR, "ASSERT FAILED! SIGUSR1 not in select region!");
}
/*
 * Send SIGUSR1 to another process of this connection so that it leaves hold.
 *
 * Every physical channel that is set in shm_conn_info->channels_mask and is
 * not this process is examined; its pid is read under stats_sem. The signal
 * is sent only when the pid is non-zero, the channel is the current
 * shm_conn_info->max_chan and its `hold` flag is set.
 */
void sig_send1() {
    const uint32_t active_mask = shm_conn_info->channels_mask;
    int chan;

    for (chan = 0; chan < MAX_TCP_PHYSICAL_CHANNELS; chan++) {
        pid_t target;

        if ((chan != info.process_num) && (active_mask & (1 << chan))) {
            /* the pid is read under the stats semaphore */
            sem_wait(&(shm_conn_info->stats_sem));
            target = shm_conn_info->stats[chan].pid;
            sem_post(&(shm_conn_info->stats_sem));

            if (target == 0) {
                continue;
            }
            if (shm_conn_info->max_chan != chan) {
                continue;
            }
            if (!shm_conn_info->stats[chan].hold) {
                continue;
            }
            vlog(LOG_INFO, "Sending signal to unhold HEAD");
            kill(target, SIGUSR1);
        }
    }
}
/**
 * Collect the sequence numbers that are missing from a channel's write buffer.
 *
 * Missing numbers are (a) everything between last_written_seq and the first
 * buffered frame and (b) every gap between two consecutive buffered frames.
 *
 * @param chan_num  index into shm_conn_info->write_buf
 * @param buf       out: the missing sequence numbers; at most FRAME_BUF_SIZE
 *                  entries are written
 * @param buf_len   out: number of buffered frames that were walked
 * @param seq_limit the walk stops before a frame whose seq_num is above this
 * @return number of entries stored in buf (0 for an empty write buffer)
 *
 * Side effect: when the distance between last_written_seq and the head frame
 * is FRAME_BUF_SIZE or more, last_written_seq is reset to head seq_num - 1
 * and a warning is logged.
 */
int missing_resend_buffer (int chan_num, uint32_t buf[], int *buf_len, uint32_t seq_limit) {
    int i = shm_conn_info->write_buf[chan_num].frames.rel_head, n;
    uint32_t isq,nsq, k;
    int idx=0;
    int blen=0, lws, chs;

    if(i == -1) {
        *buf_len = 0;
        return 0;
    }

    lws = shm_conn_info->write_buf[chan_num].last_written_seq;
    chs = shm_conn_info->frames_buf[i].seq_num;

    /*
     * The comment on this `if` used to end in a backslash, which spliced the
     * following vlog() line into the comment, so the warning was never
     * emitted. The message also now reads the head seq_num from
     * shm_conn_info->frames_buf, like the MRB2 warning below does.
     */
    if( ( (chs - lws) >= FRAME_BUF_SIZE) || ( (lws - chs) >= FRAME_BUF_SIZE)) { // this one is not expected to happen
        vlog(LOG_ERR, "WARNING: frame difference too high: last w seq: %"PRIu32" fbhead: %"PRIu32" . FIXED. chs %d<->%d lws cn %d", shm_conn_info->write_buf[chan_num].last_written_seq, shm_conn_info->frames_buf[i].seq_num, chs, lws, chan_num);
        shm_conn_info->write_buf[chan_num].last_written_seq = shm_conn_info->frames_buf[i].seq_num-1;
    }

    // fix for diff btw start: numbers between last_written_seq and the head frame
    for(k=1; k<(shm_conn_info->frames_buf[i].seq_num - shm_conn_info->write_buf[chan_num].last_written_seq); k++) {
        buf[idx] = shm_conn_info->write_buf[chan_num].last_written_seq + k;
        idx++;
        //vlog(LOG_INFO, "MRB: found in start : tot %d", idx);
        if(idx >= FRAME_BUF_SIZE) {
            /* gap larger than the output buffer: resync and start over */
            vlog(LOG_ERR, "WARNING: MRB2 frame difference too high: last w seq: %"PRIu32" fbhead: %"PRIu32" . FIXED. chs %d<->%d lws ch %d", shm_conn_info->write_buf[chan_num].last_written_seq, shm_conn_info->frames_buf[i].seq_num, chs, lws, chan_num);
            shm_conn_info->write_buf[chan_num].last_written_seq = shm_conn_info->frames_buf[i].seq_num-1;
            idx=0;
            break;
        }
    }

    /* walk the buffered frames and record every gap between neighbours */
    while(i > -1) {
        n = shm_conn_info->frames_buf[i].rel_next;
        //vlog(LOG_INFO, "MRB: scan1");
        if( n > -1 ) {
            isq = shm_conn_info->frames_buf[i].seq_num;
            nsq = shm_conn_info->frames_buf[n].seq_num;
            if(nsq > seq_limit) {
                break;
            }
            //vlog(LOG_INFO, "MRB: scan2 %"PRIu32" > %"PRIu32" +1 ?", nsq, isq);
            if(nsq > (isq+1)) {
                //vlog(LOG_INFO, "MRB: scan2 yes!");
                for(k=1; k<=(nsq-(isq+1)); k++) {
                    if(idx >= FRAME_BUF_SIZE) {
                        vlog(LOG_ERR, "WARNING: frame seq_num diff in write_buf > FRAME_BUF_SIZE");
                        *buf_len = blen;
                        return idx;
                    }
                    buf[idx] = isq+k;
                    idx++;
                    //vlog(LOG_INFO, "MRB: found in middle : tot %d", idx);
                }
            }
        }
        i = n;
        blen++;
    }
    //vlog(LOG_INFO, "missing_resend_buf called and returning %d %d ", idx, blen);
    *buf_len = blen;
    return idx;
}
/*
 * Drop frames from the head of a channel's write buffer until the head frame
 * has seq_num >= stop_sqn. Each dropped frame is pulled from the write
 * buffer list, appended to shm_conn_info->wb_free_frames, and the
 * w_stream_pkts counter selected by the frame's shash is decremented.
 *
 * Returns  0 on success (including when nothing had to be dropped),
 *         -1 when the write buffer is empty or a pull from it fails,
 *         -2 when the buffer ran out before stop_sqn was reached.
 *
 * Changes from the previous version: an empty write buffer (rel_head == -1)
 * is rejected up front instead of indexing frames_buf[-1], and the log
 * messages print the uint32_t sequence numbers with PRIu32 instead of %ld.
 */
int discard_packets(int chan_num, uint32_t stop_sqn) {
    int idx = shm_conn_info->write_buf[chan_num].frames.rel_head;
    int n_idx, cnt=0;
    uint32_t sqn;

    if(idx < 0) {
        vlog(LOG_ERR, "WARNING! discard_packets called on empty write_buf!");
        return -1;
    }
    sqn = shm_conn_info->frames_buf[idx].seq_num;

    while(sqn < stop_sqn) {
        // now just discard the packet
        cnt++;
        vlog(LOG_INFO, "Discarding seq_num %"PRIu32" cnt %d", sqn, cnt);
        shm_conn_info->w_stream_pkts[shm_conn_info->frames_buf[idx].shash % W_STREAMS_AMT]--;
        /* remember the successor before the frame is unlinked */
        n_idx = shm_conn_info->frames_buf[idx].rel_next;
        if(frame_llist_pull(&shm_conn_info->write_buf[chan_num].frames, shm_conn_info->frames_buf, &idx) < 0) {
            vlog(LOG_ERR, "WARNING! discard_packets tried to pull from empty write_buf 2!");
            return -1;
        }
        frame_llist_append(&shm_conn_info->wb_free_frames, idx, shm_conn_info->frames_buf);
        idx = n_idx;
        if(idx == -1) {
            vlog(LOG_ERR, "ASSERT FAILED: discard_packets did not find stop_sqn in buffer! %"PRIu32"", stop_sqn);
            return -2;
        }
        sqn = shm_conn_info->frames_buf[idx].seq_num;
    }
    return 0;
}
/**
 * get how many percent to push every 50ms
 *
 * Computes max * percent_fill^3 / 100000 (integer division, truncating
 * toward zero).
 *
 * The product is evaluated in 64 bits: in plain int it overflows as soon as
 * max * percent_fill^3 exceeds INT_MAX (e.g. max = 100000, percent_fill =
 * 100), which is undefined behaviour. The quotient is converted back to int
 * for the caller.
 */
int calculate_hsqs_percents(int max, int percent_fill) {
    int64_t scaled = (int64_t)max * percent_fill * percent_fill * percent_fill;

    return (int)(scaled / 100000);
}
/*
 * Count the packets lost in one contiguous run at the head of a channel's
 * write buffer.
 *
 * The run is the gap between last_written_seq and the first buffered frame.
 * Return value:
 *   - the gap itself, immediately, when it is larger than PLOSS_PSL;
 *   - -1 when a further gap is found among the next buffered frames (loss is
 *     not one sequential run), or when fewer than PLOSS_CHECK_PKTS frames
 *     could be inspected - in both cases the caller has to wait;
 *   - the gap otherwise.
 *
 * Must only be invoked when a packet is actually missing after
 * last_written_seq; a zero gap is logged as an assertion failure.
 * "unsync": no semaphore is taken in here.
 * TODO: can be optimized by using stored buf_len counter and not running this
 * until buf_len reaches PLOSS_CHECK_PKTS
 */
int count_sequential_loss_unsync(int chan_num) {
    int cur = shm_conn_info->write_buf[chan_num].frames.rel_head;
    int inspected = 0;
    int head_gap;

    /* packets missing between last_written_seq and the head frame */
    head_gap = shm_conn_info->frames_buf[cur].seq_num - shm_conn_info->write_buf[chan_num].last_written_seq-1;
    if (head_gap > PLOSS_PSL) {
        return head_gap; /* already too many lost: no need to look further */
    }
    if (head_gap == 0) {
        vlog(LOG_ERR, "ASSERT FAILED! beg_lost == 0: should never happen; invoke with packet loss only!");
    }

    /* the following frames must be gap-free */
    for (; (cur > -1) && (inspected < PLOSS_CHECK_PKTS); inspected++) {
        const int next = shm_conn_info->frames_buf[cur].rel_next;

        if (next > -1) {
            const int cur_seq = shm_conn_info->frames_buf[cur].seq_num;
            const int next_seq = shm_conn_info->frames_buf[next].seq_num;

            if (next_seq > (cur_seq + 1)) {
                return -1; /* second gap: loss is not sequential */
            }
        }
        cur = next;
    }

    /* not enough frames inspected yet to decide: wait further */
    return (inspected < PLOSS_CHECK_PKTS) ? -1 : head_gap;
}
/*
 * Signal an event to whoever waits on shm_conn_info->event_sem.
 *
 * Increments the caller's counter *ecount, sets this process's bit
 * (info.process_num) in shm_conn_info->event_mask and posts event_sem.
 * The mask update is not protected by a lock here.
 *
 * Returns 0. (The function is declared int but previously fell off the end
 * without returning a value, which is undefined if a caller reads it.)
 */
int fire_event(int *ecount) {
    (*ecount)++;
    shm_conn_info->event_mask |= (1 << info.process_num); // warning! some events may not be fired immediately as we may race here
    sem_post(&(shm_conn_info->event_sem));
    return 0;
}
/*
 * Find the smallest time_stamp among frames of write_buf[1].
 *
 * *min_tv starts as info.current_time and is lowered to the time_stamp of
 * any inspected frame that is earlier. At most 100 list steps are made.
 * Always returns 0. "unsync": no semaphore is taken in here.
 *
 * NOTE(review): only the successor of each visited frame is compared, so the
 * head frame's own time_stamp is never looked at - confirm whether that is
 * intended.
 */
int get_wb_oldest_ts_unsync(struct timeval *min_tv) {
    int cur = shm_conn_info->write_buf[1].frames.rel_head;
    int steps;

    *min_tv = info.current_time;

    for (steps = 0; (cur > -1) && (steps < 100); steps++) {
        const int next = shm_conn_info->frames_buf[cur].rel_next;

        if (next > -1) {
            if (timercmp(&shm_conn_info->frames_buf[next].time_stamp, min_tv, <)) {
                *min_tv = shm_conn_info->frames_buf[next].time_stamp;
            }
        }
        cur = next;
    }
    return 0;
}
/*
 * Check if we are allowed to drop a packet again.
 *
 * Returns 1 when the time elapsed since shm_conn_info->drop_time is at least
 * DROP_TIME_IMMUNE/1000 milliseconds (as converted by ms2tv), 0 otherwise.
 * "unsync": no semaphore is taken in here.
 */
int check_drop_period_unsync() {
    struct timeval since_drop;
    struct timeval immune_period;

    timersub(&info.current_time, &shm_conn_info->drop_time, &since_drop);
    ms2tv(&immune_period, DROP_TIME_IMMUNE/1000); // TODO: unnecessary calculation, the period is constant

    return timercmp(&since_drop, &immune_period, >=) ? 1 : 0;
}
/*
 * Check if the packet sent right now will be delivered in time.
 *
 * When info.head_channel is set the answer is always 1 and no lock is taken
 * (this shortcut is required - beware when refactoring). Otherwise the check
 * is delegated to check_delivery_time_unsynced() while holding stats_sem.
 */
int check_delivery_time(int mld_divider) {
    int in_time = 1;

    if (!info.head_channel) {
        sem_wait(&(shm_conn_info->stats_sem));
        in_time = check_delivery_time_unsynced(mld_divider);
        sem_post(&(shm_conn_info->stats_sem));
    }
    return in_time;
}
// This method is crucial as it controls AG/R_MODE operation while in R_MODE.
// It runs the delivery-time check for this process's own path
// (info.process_num) without taking any semaphore itself.
int check_delivery_time_unsynced(int mld_divider) {
    const int own_path = info.process_num;

    return check_delivery_time_path_unsynced(own_path, mld_divider);
}
/*
 * Decide whether channel `pnum` can still deliver a packet in time.
 *
 * pnum        - index into shm_conn_info->stats[] of the channel to judge.
 * mld_divider - divisor applied to tv2ms(info.max_latency_drop); must be
 *               non-zero since it is used in a division.
 *
 * Returns 0 when:
 *   - pnum is not shm_conn_info->max_chan and its channel_dead flag is set;
 *   - pnum is not shm_conn_info->max_chan and either info.rsr or
 *     info.send_q_limit_cubic is below info.send_q_limit_threshold
 *     (also logged at LOG_INFO);
 *   - (exact_rtt + rttvar) of pnum exceeds the exact_rtt of max_chan by more
 *     than max_latency_drop/mld_divider + forced_rtt.
 * Returns 1 otherwise. Takes no semaphore itself.
 */
int check_delivery_time_path_unsynced(int pnum, int mld_divider) {
    struct timeval mld = info.max_latency_drop;
    const int not_max_chan = (shm_conn_info->max_chan != pnum);

    if (not_max_chan) {
        // check for dead channel
        if (shm_conn_info->stats[pnum].channel_dead) {
            // TODO: out-once a warning here (DEAD and not HEAD)
            return 0;
        }

        // TODO: re-think this!
        if ((info.rsr < info.send_q_limit_threshold) || (info.send_q_limit_cubic < info.send_q_limit_threshold)) {
            vlog(LOG_INFO, "WARNING check_delivery_time RSR %d < THR || CUBIC %d < THR=%d", info.rsr, (int32_t)info.send_q_limit_cubic, info.send_q_limit_threshold);
            return 0;
        }
    }

    if (((shm_conn_info->stats[pnum].exact_rtt + shm_conn_info->stats[pnum].rttvar)
            - shm_conn_info->stats[shm_conn_info->max_chan].exact_rtt)
        > ((int32_t)(tv2ms(&mld)/mld_divider + shm_conn_info->forced_rtt))) {
        // no way to deliver in time
        return 0;
    }

    return 1;
}
/*
 * Convenience wrapper: runs check_rtt_latency_drop_chan() for this process'
 * own channel (info.process_num) and returns its result.
 *
 * TODO: remove this dumb method (refactor some code)
 */
int check_rtt_latency_drop() {
    const int own_chan = info.process_num;

    return check_rtt_latency_drop_chan(own_chan);
}
/*
 * This method allows AG or disallows AG based on latency.
 *
 * chan_num - index into shm_conn_info->stats[] of the channel to judge.
 *
 * Returns 1 (allowed) or 0 (disallowed), decided in this order:
 *   1. channel_dead is set and chan_num is not shm_conn_info->max_chan -> 0
 *   2. the channel's exact_rtt is below RTT_THRESHOLD_GOOD               -> 1
 *   3. shm_conn_info->max_allowed_rtt is zero                            -> 0
 *      (a ratio check against RTT_THRESHOLD_MULTIPLIER used to sit here
 *      and is currently disabled)
 *   4. info.exact_rtt exceeds max_allowed_rtt + exact_rtt of max_chan    -> 0
 *   5. otherwise                                                         -> 1
 *
 * NOTE(review): step 4 reads info.exact_rtt (this process' value) rather
 * than shm_conn_info->stats[chan_num].exact_rtt - confirm this is intended
 * when chan_num differs from info.process_num.
 */
int check_rtt_latency_drop_chan(int chan_num) {
    if (shm_conn_info->stats[chan_num].channel_dead && (shm_conn_info->max_chan != chan_num)) {
        return 0;
    }

    if (shm_conn_info->stats[chan_num].exact_rtt < RTT_THRESHOLD_GOOD) {
        return 1;
    }

    if (shm_conn_info->max_allowed_rtt == 0) {
        // unconditional: the min_rtt * RTT_THRESHOLD_MULTIPLIER check is disabled
        return 0;
    }

    if (info.exact_rtt > (shm_conn_info->max_allowed_rtt + shm_conn_info->stats[shm_conn_info->max_chan].exact_rtt)) {
        return 0;
    }

    return 1;
}
static inline int add_tokens(int chan_num, int *next_token_ms) {
//shm_conn_info->tokens_in_out = 0;
//int tokens_in_out = 0;
// TODO: may be sync on write_buf is required??
if(shm_conn_info->tokens < 0) {
shm_conn_info->tokens = 0;
}
if(chan_num != 1) {
return 1; // for all other chans (e.g. 0-service channel) return drop allowed
}
int ms_for_token = 1;
//int full_rtt = ((shm_conn_info->forced_rtt_recv > shm_conn_info->frtt_local_applied) ? shm_conn_info->forced_rtt_recv : shm_conn_info->frtt_local_applied);
//int tail_idx = shm_conn_info->write_buf[chan_num].frames.rel_tail;
//int buf_len = shm_conn_info->frames_buf[tail_idx].seq_num - shm_conn_info->write_buf[chan_num].last_written_seq;
//int buf_len = shm_conn_info->write_buf[chan_num].last_received_seq[shm_conn_info->remote_head_pnum] - shm_conn_info->write_buf[chan_num].last_written_seq;
//int tokens_above_thresh = shm_conn_info->tokenbuf - MAX_STUB_JITTER;
//if(tokens_above_thresh < 0) tokens_above_thresh = 0;
int buf_len_real = shm_conn_info->write_buf[chan_num].frames.length + shm_conn_info->write_buf[chan_num].frames.stub_total;
//buf_len = buf_len_real;
//struct timeval packet_dtv;
//int BPCS = 0;
//int head_idx = shm_conn_info->write_buf[chan_num].frames.rel_head;
//struct timeval packet_wait_tv;
/*
int pktdiff = buf_len_real; // current diff is just the real buf_len
// now check rtt
timersub(&info.current_time, &shm_conn_info->frames_buf[head_idx].time_stamp, &packet_wait_tv);
// detect stuck condition
// stuck means that we are not allowed to drop due to packet not available
// whenever we have no packet to drop - we are stuck - even if it is not the time to drop yet
int max_total_rtt = (shm_conn_info->total_max_rtt+shm_conn_info->total_max_rtt_var) - (shm_conn_info->total_min_rtt - shm_conn_info->total_min_rtt_var);
if (shm_conn_info->frames_buf[shm_conn_info->write_buf[chan_num].frames.rel_head].seq_num
!= (shm_conn_info->write_buf[chan_num].last_written_seq + 1)) {
int packet_lag = tv2ms(&packet_wait_tv);
// TODO: unused as this method resulted to failure
// we should rather use this info to continue to smoothly push up the value
//if(shm_conn_info->max_stuck_rtt < packet_lag && packet_lag < max_total_rtt){
// shm_conn_info->max_stuck_rtt = packet_lag;
//}
//if(shm_conn_info->max_stuck_buf_len < pktdiff) shm_conn_info->max_stuck_buf_len = pktdiff;
}
*/
//int pktdiff = shm_conn_info->frames_buf[shm_conn_info->write_buf[i].frames.rel_tail].seq_num - shm_conn_info->write_buf[i].last_written_seq;
//int packet_rtt = tv2ms(&packet_wait_tv) + shm_conn_info->frames_buf[head_idx].current_rtt;
/*
if(packet_rtt < shm_conn_info->max_stuck_rtt) {
shm_conn_info->tokens = 0;
if(shm_conn_info->max_stuck_buf_len < pktdiff) shm_conn_info->max_stuck_buf_len = pktdiff; // TODO: use unconditoinal set or not??
*next_token_ms = shm_conn_info->max_stuck_rtt - packet_rtt;
return 0;
}
*/
//int max_msbl = max_msrt_mul * rtt_min * smooth_ACPS;
// TOP the MSBL TODO: move out of HERE
int max_msbl = MSBL_LIMIT;
if(shm_conn_info->max_stuck_buf_len > max_msbl) {
shm_conn_info->max_stuck_buf_len = max_msbl;
}
/*
if(buf_len_real >= 10) {
timersub(&shm_conn_info->frames_buf[tail_idx].time_stamp, &shm_conn_info->frames_buf[head_idx].time_stamp, &packet_dtv);
int pdms = tv2ms(&packet_dtv);