-
-
Notifications
You must be signed in to change notification settings - Fork 4
/
PoisonIvory.py
100 lines (92 loc) · 3.6 KB
/
PoisonIvory.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# ############# .__ ################## .__ ######################## #
# ______ ____ |__| __________ ____ |__| __ _____________ ___.__. #
# \____ \ / _ \| |/ ___/ _ \ / \ | \ \/ / _ \_ __ < | | #
# | |_> > <_> ) |\___ ( <_> ) | \ | |\ ( <_> ) | \/\___ | #
# | __/ \____/|__/____ >____/|___| / |__| \_/ \____/|__| / ____| #
# |__| lite v.30.5.23 \/ \/ © 2008-2023 Volkan Sah \/ #
# https://github.com/VolkanSah/PoisonIvory-lite/
import re

import requests
from scapy.all import *
from stem import CircStatus, Signal
from stem.control import Controller, EventType
# Hidden service to watch; replace the placeholder with the real .onion address.
onion_address = "INSERT .ONION ADDRESS HERE"

# Connect to the local Tor control port (9051) and authenticate once.  The
# connection is closed again as soon as the ``with`` block is left.
with Controller.from_port(port=9051) as controller:
    controller.authenticate()
# Stream listener
class MaliciousTrafficListener:
    """Report Tor streams whose target matches any configured keyword.

    stem delivers events to plain callables registered through
    ``Controller.add_event_listener(callback, EventType.STREAM)``; it does not
    provide a ``StreamListener`` base class, so this is a standalone class.
    Either the instance itself or its ``stream_new`` bound method can be
    registered as the callback.

    Args:
        keywords: Iterable of regular-expression patterns that are searched
            for in the stream's target host.
    """

    def __init__(self, keywords):
        self.keywords = keywords

    def __call__(self, stream):
        """Let the instance itself be used as the event callback."""
        return self.stream_new(stream)

    def stream_new(self, stream):
        """Check one stream event and print a warning when it matches.

        Args:
            stream: Event object describing the stream.  The host is read from
                ``target_host`` when the object has it, otherwise from
                ``target_address`` (the attribute stem's stream events carry).

        Returns:
            True if a keyword matched (and the warning was printed), False
            otherwise.
        """
        host = getattr(stream, "target_host", None)
        if host is None:
            host = getattr(stream, "target_address", None)
        if not host:
            # No target on this event: nothing to match, and re.search() would
            # raise TypeError on None.
            return False
        if any(re.search(keyword, host) for keyword in self.keywords):
            print(f"Malicious traffic detected to {host}")
            return True
        return False
# Register the stream listener and keep the control connection open until the
# user presses Enter (events are only delivered while the connection is open).
with Controller.from_port(port=9051) as controller:
    controller.authenticate()
    listener = MaliciousTrafficListener(["abuse", "child"])
    # add_event_listener() subscribes the callback only to the event types
    # listed after it; without EventType.STREAM no stream events would ever
    # reach the listener.  The bound method is passed because the callback
    # must be callable.
    controller.add_event_listener(listener.stream_new, EventType.STREAM)
    input("Press Enter to exit")
def check_if_tor_traffic(packet):
    """Check if the packet belongs to Tor traffic.

    A packet counts as Tor traffic when it is TCP over IPv4 with destination
    port 443 and its destination address equals the address reported by the
    local Tor control port (``GETINFO address``).

    Args:
        packet: The scapy packet to be checked.

    Returns:
        True if the packet belongs to Tor traffic, False otherwise.
    """
    # packet[IP] raises IndexError when the packet has no IPv4 layer (for
    # example IPv6 traffic), so require both layers before indexing.
    if not (packet.haslayer(TCP) and packet.haslayer(IP)):
        return False
    if packet[TCP].dport != 443:
        return False

    # NOTE(review): a new control connection is opened for every candidate
    # packet; consider caching the address if this runs on a busy interface.
    with Controller.from_port(port=9051) as controller:
        controller.authenticate()
        # The default makes get_info() return None instead of raising when Tor
        # has not determined its address.
        tor_address = controller.get_info("address", None)
    return tor_address is not None and tor_address == packet[IP].dst
def intercept_api_requests(request):
    """Intercept API requests and execute an external script.

    The script is only fetched when ``sniff_packets()`` returns a non-empty
    capture.  Any failure to download it is reported with a printed message
    and does not raise.

    Args:
        request: The intercepted API request.  It is not inspected here.
    """
    if not sniff_packets():
        return

    # This is where you would load and execute your external script.
    # e.g external_script_url = "/opt/your_folder/master_script.py"
    external_script_url = "external_script.py"
    try:
        # The timeout keeps a stalled server from hanging this function.
        response = requests.get(external_script_url, timeout=10)
    except requests.RequestException:
        # Covers connection errors, timeouts and malformed URLs; the
        # placeholder above has no scheme, which requests rejects.
        print("Failed to load the external script.")
        return

    if response.status_code == 200:
        # SECURITY: exec() runs whatever the server returned with the full
        # privileges of this process.  Only point external_script_url at a
        # source you control, and prefer verifying the content (for example
        # against a pinned hash) before executing it.
        exec(response.text)
    else:
        print("Failed to load the external script.")
def sniff_packets(count=0, timeout=None):
    """Sniff TCP packets on ports 9050 and 9051.

    With the default arguments the capture is unbounded, exactly as before;
    pass ``count`` and/or ``timeout`` to make the call return on its own.

    Args:
        count: Stop after this many packets; 0 means no limit.
        timeout: Stop after this many seconds; None means no limit.

    Returns:
        The sniffed packets.
    """
    # NOTE(review): ``prn`` is scapy's per-packet callback, so the boolean
    # from check_if_tor_traffic is displayed rather than used to drop
    # packets.  If filtering was intended, ``lfilter=`` is the hook for it —
    # confirm before changing, since the port filter here (9050/9051) and the
    # port 443 test in check_if_tor_traffic would then select nothing.
    packets = sniff(
        filter="tcp and (port 9050 or port 9051)",
        prn=check_if_tor_traffic,
        count=count,
        timeout=timeout,
    )
    return packets
def get_circuit_hops():
    """Get the hops of a currently built circuit.

    stem's ``Controller`` has no ``get_circuit_id()``; Tor can hold several
    circuits at once, so the first one in the BUILT state is used.

    Returns:
        The circuit's path as reported by stem (a list of
        ``(fingerprint, nickname)`` tuples), or an empty list when no built
        circuit exists.
    """
    with Controller.from_port(port=9051) as controller:
        controller.authenticate()
        for circuit in controller.get_circuits():
            # Skip circuits that are still being built or have no hops yet.
            if circuit.status == CircStatus.BUILT and circuit.path:
                return circuit.path
    return []
def exclude_malicious_relays():
    """Exclude known-malicious relays found in the current circuit as exits.

    Every hop returned by ``get_circuit_hops()`` whose fingerprint is listed
    in ``malicious_relays`` is written to Tor's ``ExcludeExitNodes`` option.
    Nothing is changed when no hop matches.
    """
    # Add the fingerprints of malicious relays
    malicious_relays = {"FINGERPRINT1", "FINGERPRINT2"}

    hops = get_circuit_hops() or []
    # stem reports each hop as a (fingerprint, nickname) tuple.
    flagged = [
        fingerprint
        for fingerprint, _nickname in hops
        if fingerprint in malicious_relays
    ]
    if not flagged:
        return

    with Controller.from_port(port=9051) as controller:
        controller.authenticate()
        # set_conf() takes the option name and its value separately, and each
        # call replaces the previous value, so send all fingerprints at once
        # as one comma-separated list.
        controller.set_conf("ExcludeExitNodes", ",".join(flagged))
# Example usage: this call sits at module level, so it is executed whenever
# the file is run or imported.
exclude_malicious_relays()