-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathsqmfeedback.erl
executable file
·266 lines (220 loc) · 9.43 KB
/
sqmfeedback.erl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
-module(sqmfeedback).
-export([main/0,pingtimes_ms/3,monitor_ifaces/3,monitor_a_site/1,adjuster/1,monitor_delays/2,timer_process/2,monitor_bw/2 ]).
%% Extract the round-trip time, in milliseconds, from one line of ping
%% output.
%%
%% Line is chardata such as
%%   "64 bytes from 8.8.8.8: icmp_seq=1 ttl=57 time=12.3 ms".
%% Returns the time as a float, or `false' when the line has no parsable
%% "time=<number> ms" field (header, summary and blank lines).
pingline_to_ms(Line) ->
    %% Capture the number explicitly rather than matching on the exact
    %% iodata nesting re:replace/3 happens to produce; that shape changes
    %% as soon as any text follows " ms" and the reading would be lost.
    case re:run(Line,".*time=([0-9.]+) ms",[{capture,all_but_first,binary}]) of
        {match,[Num]} -> ms_to_float(Num);
        nomatch -> false
    end.

%% Convert the captured digits to a float. ping prints whole milliseconds
%% without a decimal point (e.g. "time=12 ms"), which binary_to_float/1
%% rejects, so fall back to the integer parser before giving up.
ms_to_float(Num) ->
    try binary_to_float(Num)
    catch
        error:badarg ->
            try float(binary_to_integer(Num))
            catch
                error:badarg -> false
            end
    end.
%% Ping host I a total of N times with S-byte payloads (0.2 s apart) and
%% return the measured round-trip times as an ascending list of floats in
%% milliseconds. Lines of ping output that carry no time are ignored, so
%% the result may hold fewer than N entries (or none at all).
%% The result is also printed for debugging.
%%
%% NOTE(review): I is interpolated into a shell command line; only pass
%% trusted host names.
pingtimes_ms(I,N,S) ->
    Command = io_lib:format("ping -c ~B -i 0.2 -s ~B ~s",[N,S,I]),
    %% the first line of output is ping's banner, never a measurement
    [_Banner|Replies] = string:split(os:cmd(Command),"\n",all),
    Parsed = lists:map(fun pingline_to_ms/1, Replies),
    Times = lists:sort([Ms || Ms <- Parsed, is_float(Ms)]),
    io:format("ping ~s with results: ~w\n",[I,Times]),
    Times.
%% Apply a new bandwidth setting. Cmd is an io_lib format string with a
%% single integer placeholder; it is filled in with NewBW and run through
%% the OS shell, then the same command is echoed to stdout for debugging.
new_bandwidth(Cmd,NewBW) ->
    Args = [NewBW],
    _Output = os:cmd(io_lib:format(Cmd,Args)),
    io:format(Cmd ++"\n",Args).
%% Endless ping loop for a single site.
%%
%%   Rpid      - pid that receives {delay,Name,Delay,UnixSeconds} reports
%%   Name      - host to ping
%%   N         - number of pings per round
%%   Inc       - not used by the loop; only threaded through the recursion
%%   FiveTimes - baseline ping times (ms) carried over from earlier rounds
%%
%% Each round sleeps a random 10 to 30 seconds, pings the site and merges
%% the new times with the baseline. The delay estimate is the slowest
%% merged time minus the third fastest; anything above 20 ms is reported
%% to Rpid. The fastest sample is then thrown away and the next five
%% become the new baseline.
%%
%% While fewer than three samples are available no estimate can be formed,
%% so the samples are simply accumulated. (The body of a `try ... of'
%% clause is not covered by the `catch', so indexing the third element of
%% a shorter list used to crash the process.)
monitor_a_site(Rpid,Name,N,Inc,FiveTimes) ->
    T = rand:uniform()*20+10, % sleep 10 to 30 seconds
    erlang:send_after(round(T*1000),self(),go),
    receive
        _ ->
            try pingtimes_ms(Name,N,50) of
                [] ->
                    %% no replies this round: keep the old baseline
                    monitor_a_site(Rpid,Name,N,Inc,FiveTimes);
                MS ->
                    Times = lists:sort(lists:append(FiveTimes,MS)),
                    case Times of
                        [_,_,Third|_] ->
                            Delay = lists:last(Times) - Third,
                            if
                                Delay > 20.0 ->
                                    Rpid ! {delay,Name,Delay,erlang:system_time(seconds)};
                                true ->
                                    true
                            end,
                            %% throw out the lowest, keep the next 5
                            monitor_a_site(Rpid,Name,N,Inc,lists:sublist(Times,2,5));
                        _ ->
                            %% baseline still too short: keep every sample
                            monitor_a_site(Rpid,Name,N,Inc,Times)
                    end
            catch
                error:_ERR -> monitor_a_site(Rpid,Name,N,Inc,FiveTimes);
                exit:_ -> monitor_a_site(Rpid,Name,N,Inc,FiveTimes);
                throw:_ -> monitor_a_site(Rpid,Name,N,Inc,FiveTimes)
            end
    end.
%% Entry point for a site monitor process. Monitor is {ReportPid,Name}.
%% Takes an initial round of 5 pings to seed the baseline and then enters
%% the monitoring loop; if the initial ping raises an error it is simply
%% tried again.
monitor_a_site(Monitor) ->
    {RPid,Name} = Monitor,
    PingCount = 5,
    try pingtimes_ms(Name,PingCount,50) of
        Baseline -> monitor_a_site(RPid,Name,PingCount,20,Baseline)
    catch
        error:_ -> monitor_a_site(Monitor)
    end.
%% Read the cumulative byte counters of network interface Iface from
%% sysfs. Returns {RxBytes,TxBytes}; crashes with a badmatch if either
%% counter file cannot be opened or does not start with an integer.
read_bw(Iface) ->
    RxPath = lists:append(["/sys/class/net/", Iface, "/statistics/rx_bytes"]),
    TxPath = lists:append(["/sys/class/net/", Iface, "/statistics/tx_bytes"]),
    {ok,RxDev} = file:open(RxPath,read),
    {ok,TxDev} = file:open(TxPath,read),
    {ok,[RxCount]} = io:fread(RxDev,"","~d"),
    {ok,[TxCount]} = io:fread(TxDev,"","~d"),
    file:close(RxDev),
    file:close(TxDev),
    {RxCount,TxCount}.
%% Bandwidth sampling loop.
%%
%%   Iface     - interface whose byte counters are sampled
%%   AdjPid    - pid that receives {bandwidths,RxBW,TxBW,MSecs}
%%   LastMSecs - millisecond stamp of the previous sample
%%   Rxbytes,
%%   Txbytes   - counter values at the previous sample
%%
%% On every {timer,MSecs} tick the counters are re-read and the average
%% rates since the previous sample are reported. 8 * bytes / milliseconds
%% is bits per millisecond, i.e. kilobits per second.
monitor_bw(Iface,AdjPid,LastMSecs,Rxbytes,Txbytes) ->
    receive
        {timer,MSecs} when MSecs > LastMSecs ->
            {NewRx,NewTx} = read_bw(Iface),
            Elapsed = MSecs - LastMSecs,
            RxBW = 8 * (NewRx - Rxbytes) / Elapsed, % kbit/s
            TxBW = 8 * (NewTx - Txbytes) / Elapsed, % kbit/s
            AdjPid ! {bandwidths,RxBW,TxBW,MSecs},
            monitor_bw(Iface,AdjPid,MSecs,NewRx,NewTx);
        {timer,_NotLater} ->
            %% The stamp in a tick is taken when the timer is armed, so the
            %% first tick can carry a time equal to or before our starting
            %% point. Dividing by that interval would raise badarith (zero)
            %% or yield a negative rate, so skip the sample.
            monitor_bw(Iface,AdjPid,LastMSecs,Rxbytes,Txbytes);
        _ ->
            io:format("wth?\n"),
            monitor_bw(Iface,AdjPid,LastMSecs,Rxbytes,Txbytes)
    end.
%% Start the bandwidth monitor for Iface: take an initial counter reading,
%% stamp it with the current wall-clock time in milliseconds and enter the
%% sampling loop, which reports to AdjPid.
monitor_bw(Iface,AdjPid) ->
    {Rx0,Tx0} = read_bw(Iface),
    StartMSecs = erlang:system_time(millisecond),
    monitor_bw(Iface,AdjPid,StartMSecs,Rx0,Tx0).
%% Delay bookkeeping loop. RepPid receives {factor,F} requests; Sites is
%% the list of {Site,Delay,UnixSeconds} reports collected so far.
%%
%% {delay,Site,Delay,Time} messages from the pingers are prepended to the
%% list. On every {timer,_} tick the list is pruned to reports from the
%% last 30 seconds and, with probability 1/20 per tick, a bandwidth change
%% is requested: a factor in [0.85,1.0) when more than three recent
%% reports exist, otherwise a factor in [1.0,1.12).
monitor_delays(RepPid, Sites) ->
    receive
        {delay,Site,Delay,Time} ->
            monitor_delays(RepPid,[{Site,Delay,Time}|Sites]);
        {timer,_Time} ->
            io:format("Checking up on things: ~B\n",[erlang:system_time(seconds)]),
            Cutoff = erlang:system_time(seconds) - 30,
            Recent = [{S,D,When} || {S,D,When} <- Sites, When > Cutoff],
            io:format("Full Delayed Site List: ~w\n",[Sites]),
            io:format("Recent Delayed Site List: ~w\n",[Recent]),
            %% Random scaling factors, chosen so that a decrease followed
            %% by an increase averages slightly below 1 (about .98 in the
            %% author's simulations); this keeps growth in check.
            Roll = rand:uniform(),
            case {Roll < 1.0/20, length(Recent) > 3} of
                {true,true} ->
                    RepPid ! {factor, rand:uniform() * 0.15 + 0.85};
                {true,false} ->
                    %% about every 20 seconds on average
                    RepPid ! {factor, rand:uniform() * 0.12 + 1.0};
                {false,_} ->
                    true
            end,
            monitor_delays(RepPid,Recent)
    end.
%% Bandwidth set-point loop. Tuples is a two-element list of
%% {CmdFormat,Min,Current,Max}; per the original author's note the first
%% entry is the upload shaper and the second the download shaper.
%% Bandwidths are integer Kbps, so new set-points are rounded.
%%
%% {factor,F} multiplies both current set-points by F, clamps them to
%% their [Min,Max] range, applies them via new_bandwidth/2 and continues
%% with the new values.
%%
%% {bandwidths,RxBW,TxBW,MSecs} is a usage report. If usage is below half
%% of a set-point that is still above its minimum then, with probability
%% 1/20 per report, the process sends itself {factor,0.85}. This decays
%% the set-points while the link is idle so that a sudden burst does not
%% meet an overly high limit.
adjuster(Tuples) ->
    [{UpCmd,UpMin,UpCur,UpMax},{DownCmd,DownMin,DownCur,DownMax}] = Tuples,
    receive
        {bandwidths,RxBW,TxBW,MSecs} ->
            io:format("Received bandwidth report: Rx: ~f, Tx: ~f at ~w\n",[RxBW,TxBW,MSecs]),
            Roll = rand:uniform(),
            if
                (Roll < 1.0/20) and
                (((DownCur > DownMin) and (RxBW < DownCur/2)) or
                 ((UpCur > UpMin) and (TxBW < UpCur/2))) ->
                    self() ! {factor,0.85};
                true ->
                    true
            end,
            adjuster(Tuples);
        {factor,F} ->
            Scale = fun(Min,Cur,Max) -> round(min(max(Min,Cur*F),Max)) end,
            NewUp = Scale(UpMin,UpCur,UpMax),
            NewDown = Scale(DownMin,DownCur,DownMax),
            new_bandwidth(UpCmd,NewUp),
            new_bandwidth(DownCmd,NewDown),
            adjuster([{UpCmd,UpMin,NewUp,UpMax},{DownCmd,DownMin,NewDown,DownMax}])
    end.
%% One-second ticker. Every round it arms three 1000 ms timers:
%% {timer,UnixSeconds} for P1, {timer,UnixMilliseconds} for P2 and a
%% wake-up message for itself, then waits for any message before starting
%% the next round. Both stamps are taken when the timers are armed, not
%% when they fire. monitor_ifaces starts this process with the delay
%% monitor as P1 and the bandwidth monitor as P2.
timer_process(P1,P2) ->
    Period = 1000,
    erlang:send_after(Period,P1,{timer,erlang:system_time(seconds)}),
    erlang:send_after(Period,P2,{timer,erlang:system_time(millisecond)}),
    erlang:send_after(Period,self(),go),
    receive
        _Wakeup -> timer_process(P1,P2)
    end.
%% Supervisory process: starts the adjuster, the delay monitor, the
%% bandwidth monitor, one pinger per site and the timer, all linked to the
%% caller, and then enters the restart loop.
%%
%%   Tuples - shaper descriptions handed to adjuster/1
%%   Sites  - list of host names to ping
%%   Iface  - interface whose byte counters are sampled
monitor_ifaces(Tuples,Sites,Iface) ->
    %% Trap exits before linking to any child; otherwise a child that dies
    %% during start-up would take this process down with it instead of
    %% being reported as an 'EXIT' message.
    process_flag(trap_exit,true),
    AdjPid = proc_lib:spawn_link(sqmfeedback,adjuster,[Tuples]),
    MonPid = proc_lib:spawn_link(sqmfeedback,monitor_delays,[AdjPid,[]]),
    BwMonPid = proc_lib:spawn_link(sqmfeedback,monitor_bw,[Iface,AdjPid]),
    SitePids = [{proc_lib:spawn_link(sqmfeedback,monitor_a_site,[{MonPid,Site}]),Site}
                || Site <- Sites],
    TimerPid = proc_lib:spawn_link(sqmfeedback,timer_process,[MonPid,BwMonPid]),
    monitor_ifaces(Tuples,Sites,SitePids,AdjPid,MonPid,BwMonPid,TimerPid,Iface).
%% Restart loop of the supervisory process. SitePids is a list of
%% {Pid,Site} for the running pingers; the remaining pids are the
%% adjuster, delay monitor, bandwidth monitor and timer.
%%
%% Restart policy on {'EXIT',Pid,_}:
%%   * pinger            - respawned alone, reporting to the delay monitor
%%   * timer             - respawned alone
%%   * bandwidth monitor - respawned (reporting to the adjuster) together
%%                         with the timer, which holds the monitor's pid
%%   * adjuster or delay monitor - every other child holds one of these
%%                         pids, so the whole set is stopped and restarted
%%   * anything else     - ignored (e.g. a child that was already replaced)
%% The loop never returns.
monitor_ifaces(Tuples,Sites,SitePids,AdjPid,MonPid,BwMonPid,TimerPid,Iface) ->
    receive
        {'EXIT',Pid,_Reason} ->
            case lists:keyfind(Pid,1,SitePids) of
                {Pid,Site} ->
                    NewSitePid = proc_lib:spawn_link(sqmfeedback,monitor_a_site,[{MonPid,Site}]),
                    NewSitePids = [{NewSitePid,Site}|lists:keydelete(Pid,1,SitePids)],
                    monitor_ifaces(Tuples,Sites,NewSitePids,
                                   AdjPid,MonPid,BwMonPid,TimerPid,Iface);
                false when Pid =:= TimerPid ->
                    io:format("respawning timer:\n"),
                    NewTimer = proc_lib:spawn_link(sqmfeedback,timer_process,[MonPid,BwMonPid]),
                    monitor_ifaces(Tuples,Sites,SitePids,AdjPid,MonPid,BwMonPid,NewTimer,Iface);
                false when Pid =:= BwMonPid ->
                    io:format("respawning bw usage monitor;\n"),
                    NewBwMon = proc_lib:spawn_link(sqmfeedback,monitor_bw,[Iface,AdjPid]),
                    %% the old timer would keep ticking the dead monitor
                    stop_children([TimerPid]),
                    NewTimer = proc_lib:spawn_link(sqmfeedback,timer_process,[MonPid,NewBwMon]),
                    monitor_ifaces(Tuples,Sites,SitePids,AdjPid,MonPid,NewBwMon,NewTimer,Iface);
                false when Pid =:= AdjPid; Pid =:= MonPid ->
                    stop_children([P || {P,_} <- SitePids] ++
                                  [AdjPid,MonPid,BwMonPid,TimerPid]),
                    monitor_ifaces(Tuples,Sites,Iface);
                false ->
                    monitor_ifaces(Tuples,Sites,SitePids,AdjPid,MonPid,BwMonPid,TimerPid,Iface)
            end;
        _ ->
            monitor_ifaces(Tuples,Sites,SitePids,AdjPid,MonPid,BwMonPid,TimerPid,Iface)
    end.

%% Forcefully stop linked children. Each pid is unlinked first so that the
%% kill does not come back to us as a fresh 'EXIT' message.
stop_children(Pids) ->
    lists:foreach(fun(P) -> unlink(P), exit(P,kill) end, Pids).
%% Entry point. Edit the configuration here:
%%   * each shaper tuple is {CommandFormat,Min,Start,Max} with the three
%%     bandwidths in *integer* Kbps; ~B in the command is replaced by the
%%     bandwidth to apply
%%   * PingSites are the hosts probed for latency
%%   * the last argument is the interface whose traffic is measured
main() ->
    Upload = {"tc qdisc change root dev eth0.2 cake bandwidth ~BKbit diffserv4 dual-srchost overhead 34 ", 4000, 6000, 8000},
    Download = {"tc qdisc change root dev ifb4eth0.2 cake bandwidth ~BKbit diffserv4 dual-dsthost nat overhead 34 ingress",15000,30000,35000},
    PingSites = ["dns.google.com","one.one.one.one","quad9.net","facebook.com",
                 "gstatic.com","cloudflare.com","fbcdn.com","akamai.com","amazon.com"],
    monitor_ifaces([Upload,Download],PingSites,"lan"),
    %% only reached if the supervisor loop ever returns; then block here
    receive
        _ -> true
    end.