-
Notifications
You must be signed in to change notification settings - Fork 0
/
pingsweep.py
135 lines (109 loc) Β· 3.39 KB
/
pingsweep.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
from argparse import ArgumentParser
from asyncio import Queue, create_task, gather, run
from datetime import datetime
from ipaddress import IPv4Network
from logging import StreamHandler, basicConfig, getLogger, info
from os import cpu_count
from icmplib import async_ping
def time():
return datetime.now().replace(microsecond=0)
def arguments():
tasks = int(cpu_count() * 2.5)
parser = ArgumentParser()
parser.add_argument(
"range",
help="network id and subnet bits, e.g. 192.168.1.0/24",
type=IPv4Network,
)
parser.add_argument(
"-q",
"--quiet",
action="store_true",
help="only print hosts to screen",
)
parser.add_argument(
"-w",
"--wait",
type=float,
metavar="sec",
default=1,
help="timeout in seconds to wait for each reply (default: 1s)",
)
parser.add_argument(
"-t",
"--tasks",
type=int,
metavar="num",
default=tasks,
help=f"number of concurrent tasks (default: {tasks})",
)
parser.add_argument(
"-o",
"--output",
type=str,
metavar="path",
help="output to a file",
)
args = parser.parse_args()
return args
class PingSweeper():
def __init__(self):
self.tasks = []
self.success = 0
self.failed = 0
self.args = arguments()
self.queue = Queue()
basicConfig(
filename=self.args.output,
filemode="w",
format="%(message)s",
level=20, # INFO
)
if self.args.output is not None:
# Re-enable the logger to logg to the screen if output path is given.
getLogger().addHandler(StreamHandler())
async def sweeper(self, queue: Queue, wait: float):
while True:
host = await async_ping(await queue.get(), count=1, timeout=wait)
if not host.is_alive:
self.failed += 1
else:
self.success += 1
info(host.address)
queue.task_done()
async def run(self):
if not self.args.quiet:
info(
f"\n> IP Range: {self.args.range}"
f"\n> Tasks: {self.args.tasks}"
f"\n> Quiet: {self.args.quiet}"
f"\n> Wait: {self.args.wait}s\n",
)
# Generate a number of hosts from the given network id and subnet bits.
for host in self.args.range.hosts():
# And append each host to the queue.
self.queue.put_nowait(host.exploded)
start = time()
for _ in range(self.args.tasks):
task = create_task(self.sweeper(self.queue, self.args.wait))
self.tasks.append(task)
await self.queue.join()
for task in self.tasks:
task.cancel()
await gather(*self.tasks, return_exceptions=True)
end = time()
if not self.args.quiet:
info(
f"\nFinished in: {end - start} | "
f"Success: {self.success}, "
f"Failed: {self.failed}, "
f"Total: {self.success + self.failed}",
)
async def main():
pingSweeper = PingSweeper()
await pingSweeper.run()
if __name__ == "__main__":
try:
run(main())
except KeyboardInterrupt:
print("Keyboard Interrupt")