-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathnet-traffic
executable file
·114 lines (94 loc) · 3.79 KB
/
net-traffic
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#!/usr/bin/python3
# Copyright 2023 Robert Krawitz/Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pathlib import Path
import argparse
import fcntl
import os
import socket
import struct
import sys
import time
# Directory holding one entry per network interface.
NETPATH = '/sys/class/net'
# Counters read from <NETPATH>/<interface>/statistics/ for every report.
STATS = ['rx_bytes', 'rx_packets', 'tx_bytes', 'tx_packets']
# Seconds between reports when no interval is given on the command line.
DEFAULT_INTERVAL = 5
# The sampling loop runs until the count reaches exactly zero, so a negative
# starting value means "keep going until interrupted".
DEFAULT_COUNT = -1
# Last raw counter value seen, keyed by interface name and then by metric.
stat_last_values = {}
# Datagram socket whose file descriptor is used for the SIOCGIFADDR ioctl
# in get_ip_address(); no traffic is sent on it by this script.
SOCKET = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

parser = argparse.ArgumentParser(description='Analyze network traffic')
parser.add_argument('-a', '--all', action='store_true', help='Monitor all interfaces')
parser.add_argument('-p', '--physical', '--physical-only', action='store_true', help='Monitor physical interfaces')
parser.add_argument('-v', '--virtual', '--virtual-only', action='store_true', help='Monitor virtual interfaces')
parser.add_argument('-i', '--interface', action='append', help='Monitor specific interface(s)')
# type= lets argparse reject non-numeric values with a usage message instead
# of the script dying with a ValueError traceback after parsing.
parser.add_argument('interval', nargs='?', type=float,
                    help=f'Seconds between reports (default {DEFAULT_INTERVAL})')
parser.add_argument('count', nargs='?', type=int,
                    help='Number of reports to print after the initial one (default: run until interrupted)')
args = parser.parse_args()

interval = args.interval if args.interval is not None else DEFAULT_INTERVAL
# One extra pass is added to the requested count; presumably because the
# first report shows raw counter totals rather than per-second rates.
count = args.count + 1 if args.count is not None else DEFAULT_COUNT
def net_is_physical(net: str):
    """Return True unless the interface's sysfs entry points at a 'virtual' path.

    The entry NETPATH/<net> is examined; when it is a symbolic link, the link
    target is inspected instead of the entry path itself.  The interface is
    reported as physical when the inspected path does not contain the
    substring 'virtual'.
    """
    entry = os.path.join(NETPATH, net)
    inspected = os.readlink(entry) if Path(entry).is_symlink() else entry
    return not ('virtual' in inspected)
def get_ip_address(ifname: str):
    """Return the IPv4 address of *ifname* in dotted-quad form, or None.

    The address is queried with an ioctl on the module-level SOCKET.  Any
    failure (including a name that cannot be encoded as ASCII) yields None;
    KeyboardInterrupt and BrokenPipeError terminate the script instead.
    """
    try:
        # Request buffer: the interface name, cut to 15 bytes and padded
        # with zero bytes to 256 bytes by the '256s' format.
        request = struct.pack('256s', bytes(ifname, 'ascii')[:15])
        # 0x8915 is SIOCGIFADDR
        reply = fcntl.ioctl(SOCKET.fileno(), 0x8915, request)
        # Bytes 20..23 of the reply are the packed address handed to inet_ntoa.
        return socket.inet_ntoa(reply[20:24])
    except (KeyboardInterrupt, BrokenPipeError):
        sys.exit()
    except Exception:
        return None
def net_is_desired(net: 'os.DirEntry | str'):
    """Decide whether an interface should be included in the report.

    *net* is either an os.DirEntry (as produced by os.scandir in
    get_networks()) or a plain interface name.

    Selection rules, in order:
      * without --all, an interface with no IPv4 address is rejected;
      * if --interface was given, only the named interfaces are accepted;
      * --all accepts everything that got this far;
      * --virtual accepts non-physical interfaces, --physical accepts
        physical ones.
    NOTE(review): when none of -a/-p/-v/-i is given every interface is
    rejected, so the report is empty; confirm that this default is intended.
    """
    # Compare and look up by name: a DirEntry never compares equal to the
    # strings collected from --interface, which made -i match nothing.
    name = getattr(net, 'name', net)
    if not args.all and not get_ip_address(name):
        return False
    if args.interface:
        return name in args.interface
    if args.all:
        return True
    if not (args.virtual or args.physical):
        return False
    physical = net_is_physical(name)
    return bool(args.physical if physical else args.virtual)
def get_networks():
    """Return the names of the entries under NETPATH accepted by net_is_desired()."""
    selected = []
    with os.scandir(NETPATH) as entries:
        for entry in entries:
            if net_is_desired(entry):
                selected.append(entry.name)
    return selected
def print_stat(metric, net: str = None, interval: float = None):
    """Return the rate of change of one interface counter.

    Despite its name this function prints nothing.  It reads the integer in
    NETPATH/<net>/statistics/<metric>, remembers it in stat_last_values, and
    returns:
      * the raw counter value (as a float) the first time this
        (net, metric) pair is seen;
      * otherwise the difference from the previously stored value divided
        by *interval* (the caller passes the elapsed time in seconds).
    """
    counter_file = os.path.join(NETPATH, net, 'statistics', metric)
    with open(counter_file, 'r') as f:
        current = int(f.read())
    history = stat_last_values.setdefault(net, {})
    if metric in history:
        rate = (current - history[metric]) / interval
    else:
        # No earlier sample to diff against: report the counter itself.
        rate = float(current)
    history[metric] = current
    return rate
# Main sampling loop: print a header, then one table of per-interface rates
# per pass until the pass counter reaches zero or the user interrupts.
try:
    print(f'{"Device":<15}' + '\t'.join(f'{metric + "/s":>15}' for metric in STATS))
    last_time = 0.0
    while count != 0:
        # The list of networks might change over time.
        networks_to_scan = get_networks()
        # A monotonic clock keeps the elapsed time positive even when the
        # wall clock is stepped, so rates cannot go negative or divide by
        # zero because of a time adjustment.  On the first pass no rate is
        # computed (every counter is new), so the initial 0.0 is harmless.
        cur_time = time.monotonic()
        elapsed = cur_time - last_time
        rows = []
        for net in networks_to_scan:
            # print_stat returns a float; it is truncated to an integer for display.
            cells = '\t'.join(f'{int(print_stat(metric, net, elapsed)):15d}'
                              for metric in STATS)
            rows.append(f'{net:<15}' + cells)
        # Flush so each report appears immediately even when stdout is a
        # pipe or file (block-buffered) rather than a terminal.
        print('\n'.join(rows) + '\n', flush=True)
        count -= 1
        if count != 0:
            # The former "interval - min(0, elapsed)" was equal to interval
            # whenever time moved forward; sleep for the interval directly.
            time.sleep(interval)
        last_time = cur_time
except (KeyboardInterrupt, BrokenPipeError):
    sys.exit()