Skip to content

Commit

Permalink
Add multithreading for Sentinel API requests. Update REMOTE_URL on di…
Browse files Browse the repository at this point in the history
…fference in API and DB. Add check to only update changed nodes. Fix str typecast from int.
  • Loading branch information
freQniK committed Feb 18, 2024
1 parent 961d45e commit 70e0519
Showing 1 changed file with 86 additions and 41 deletions.
127 changes: 86 additions & 41 deletions meile_intelligence.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@

APIKEYS = scrtsxx.IP_REGISTRY_API_KEYS

VERSION = 20240121.2251
VERSION = 20240217.1659
APIURL = 'https://api.sentinel.mathnodes.com'

IPREGISTRY_URL = "https://api.ipregistry.co/%s?key=%s"

class UpdateNodeType():
NodeAPIurl = {}

def connDB(self):
db = pymysql.connect(host=scrtsxx.HOST,
Expand All @@ -41,47 +42,55 @@ def get_node_type_table(self,db):


def get_ip_of_node(self, db, NodeData):
NodeDBIP = {}
NodeIP = {}
NodeIPURLChanged = {}

c = db.cursor()

self.api_rurl_multithread(NodeData)

for n in NodeData:
address = n['node_address']
endpoint = APIURL + '/sentinel/nodes/' + address


# Retrieve remote_url from the table for nodes that have it stored
query = f"SELECT remote_url FROM node_uptime WHERE node_address = '{address}';"
c.execute(query)
result = c.fetchone()
#print(result['remote_url'])
if not result['remote_url'] or result['remote_url'] == '':

endpoint = APIURL + '/nodes/' + address
remote_url = result['remote_url'].split('//')[-1].split(':')[0]
#print(f"Getting remote_url of: {address}", end=":")

try:
r = requests.get(endpoint)
remote_url = r.json()['node']['remote_url'].split('//')[-1].split(':')[0]
except Exception as e:
print(str(e))
continue
#print(f"{remote_url}")
else:
remote_url = result['remote_url'].split('//')[-1].split(':')[0]
try:
db_rurl = result['remote_url']
except:
db_rurl = ""

NodeIP[n['node_address']] = remote_url
try:
NodeIP[n['node_address']] = ipaddress.ip_address(remote_url)
except ValueError:
try:
NodeIP[n['node_address']] = socket.gethostbyname(remote_url)
except socket.gaierror:
continue
#print(f"{n['node_address']},{NodeIP[n['node_address']]}")
#print(NodeRemoteURL)

return NodeIP
NodeDBIP[address] = db_rurl
try:
if NodeDBIP[address] != self.NodeAPIurl[address]:
self.__UpdateUptimeTable(db, address, self.NodeAPIurl[address])
remote_url = self.NodeAPIurl[address].split('//')[-1].split(':')[0]
try:
NodeIPURLChanged[address] = ipaddress.ip_address(remote_url)
except ValueError:
try:
NodeIPURLChanged[address] = socket.gethostbyname(remote_url)
except socket.gaierror:
continue

else:
remote_url = NodeDBIP[address].split('//')[-1].split(':')[0]
try:
NodeIP[address] = ipaddress.ip_address(remote_url)
except ValueError:
try:
NodeIPURLChanged[address] = socket.gethostbyname(remote_url)
except socket.gaierror:
continue
except Exception as e:
print(f"{n}:{str(e)}")
continue

return NodeIP, NodeIPURLChanged


def check_asn_null(self, db, node_address):
Expand All @@ -93,18 +102,40 @@ def check_asn_null(self, db, node_address):
return True
return False

def api_rurl_multithread(self, NodeData):
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
# Submit tasks in batches of 3
futures = [executor.submit(self.__api_url_worker, node['node_address']) for node in NodeData]

# Wait for all tasks to complete
concurrent.futures.wait(futures)

def ip_registry_multithread(self, db, NodeIP):
def ip_registry_multithread(self, db, NodeIP, changed):
print(NodeIP)
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# Submit tasks in batches of 3
futures = [executor.submit(self.__ip_registry_worker, node, ip, db) for node, ip in NodeIP.items()]
futures = [executor.submit(self.__ip_registry_worker, node, ip, db, changed) for node, ip in NodeIP.items()]

# Wait for all tasks to complete
concurrent.futures.wait(futures)

def __ip_registry_worker(self, node, ip, db):
if not self.check_asn_null(db, node):
return

def __api_url_worker(self, address):
endpoint = APIURL + '/sentinel/nodes/' + address

try:
r = requests.get(endpoint)
api_rurl = r.json()['node']['remote_url']
except Exception as e:
print(f"API URL WORKER ERROR: {str(e)}")
api_rurl = ""

self.NodeAPIurl[address] = api_rurl


def __ip_registry_worker(self, node, ip, db, changed):
if not changed:
if not self.check_asn_null(db, node):
return

N = random.randint(0,len(APIKEYS)-1)
API_KEY = APIKEYS[N]
Expand All @@ -117,6 +148,9 @@ def __ip_registry_worker(self, node, ip, db):
print(str(e))
return
try:
ASN = "AS" + str(rJSON['connection']['asn'])
ISP = rJSON['connection']['organization']

if rJSON['security']['is_cloud_provider']:
TYPE['hosting'] = True

Expand All @@ -142,24 +176,35 @@ def __ip_registry_worker(self, node, ip, db):

for k,v in TYPE.items():
if v:
self.__UpdateNodeTypeTable(db, node,k)
except KeyError:
self.__UpdateNodeTypeTable(db, node,k, ASN, ISP)
except KeyError as e:
print(str(e))
pass

def __UpdateNodeTypeTable(self, db, node, ntype):
def __UpdateNodeTypeTable(self, db, node, ntype, asn, isp):

query = 'UPDATE node_score SET isp_type = "%s" WHERE node_address = "%s";' % (ntype, node)
query = 'UPDATE node_score SET asn = "%s", isp = "%s", isp_type = "%s" WHERE node_address = "%s";' % (asn, isp, ntype, node)
print(query)
c = db.cursor();
c.execute(query)
db.commit()

def __UpdateUptimeTable(self, db, node, rurl):
query = 'UPDATE node_uptime SET remote_url = "%s" WHERE node_address = "%s";' % (rurl, node)
print(f"Updating node_uptime:\n {query}")

c = db.cursor()
c.execute(query)
db.commit()

if __name__ == "__main__":
NType = UpdateNodeType()
db = NType.connDB()
NodeData = NType.get_node_type_table(db)
NodeIP = NType.get_ip_of_node(db, NodeData)
NType.ip_registry_multithread(db, NodeIP)
NodeIP, URLsChanged = NType.get_ip_of_node(db, NodeData)
NType.ip_registry_multithread(db, NodeIP, False)
print("------------------Computing URLs Changed---------------------------")
NType.ip_registry_multithread(db, URLsChanged, True)



Expand Down

0 comments on commit 70e0519

Please sign in to comment.