-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(GTI): Private URL scanning #117
Changes from 7 commits
fabd756
ae36ed6
8ba853d
b201ea4
207e453
f09316a
8b9e8a5
d0bfd33
f257336
95a8f1a
b04ae4d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -170,6 +170,17 @@ def url(self, url: str, relationships: str = ''): | |
ok_codes=(404, 429, 200) | ||
) | ||
|
||
def private_url(self, url: str): | ||
""" | ||
See Also: | ||
https://gtidocs.virustotal.com/reference/get-a-private-url-analysis-report | ||
""" | ||
return self._http_request( | ||
'GET', | ||
f'private/urls/{encode_url_to_base64(url)}', | ||
ok_codes=(404, 429, 200) | ||
) | ||
|
||
def domain(self, domain: str, relationships: str = '') -> dict: | ||
""" | ||
See Also: | ||
|
@@ -362,7 +373,7 @@ def private_file_scan(self, file_path: str) -> dict: | |
resp_type='response' | ||
) | ||
demisto.debug( | ||
f'scan_file response:\n' | ||
f'scan_private_file response:\n' | ||
f'{str(response.status_code)=}, {str(response.headers)=}, {str(response.content)}' | ||
) | ||
return response.json() | ||
|
@@ -398,6 +409,17 @@ def url_scan(self, url: str) -> dict: | |
data={'url': url} | ||
) | ||
|
||
def private_url_scan(self, url: str) -> dict: | ||
""" | ||
See Also: | ||
https://gtidocs.virustotal.com/reference/private-scan-url | ||
""" | ||
return self._http_request( | ||
'POST', | ||
'/private/urls', | ||
data={'url': url} | ||
) | ||
|
||
# endregion | ||
|
||
def file_sandbox_report(self, file_hash: dict, limit: int) -> dict: | ||
|
@@ -454,14 +476,14 @@ def get_private_analysis(self, analysis_id: str) -> dict: | |
f'private/analyses/{analysis_id}' | ||
) | ||
|
||
def get_private_file_from_analysis(self, analysis_id: str) -> dict: | ||
def get_private_item_from_analysis(self, analysis_id: str) -> dict: | ||
""" | ||
See Also: | ||
https://gtidocs.virustotal.com/reference/analysesidrelationship | ||
""" | ||
return self._http_request( | ||
'GET', | ||
f'private/analyses/{analysis_id}/item?attributes=threat_severity,threat_verdict' | ||
f'private/analyses/{analysis_id}/item' | ||
) | ||
|
||
def get_file_sigma_analysis(self, file_hash: str) -> dict: | ||
|
@@ -1410,6 +1432,37 @@ def build_url_output( | |
) | ||
|
||
|
||
def build_private_url_output(url: str, raw_response: dict) -> CommandResults: | ||
data = raw_response.get('data', {}) | ||
attributes = data.get('attributes', {}) | ||
|
||
last_analysis_stats = attributes.get('last_analysis_stats', {}) | ||
positive_detections = last_analysis_stats.get('malicious', 0) | ||
detection_engines = sum(last_analysis_stats.values()) | ||
|
||
return CommandResults( | ||
outputs_prefix=f'{INTEGRATION_ENTRY_CONTEXT}.URL', | ||
outputs_key_field='id', | ||
readable_output=tableToMarkdown( | ||
f'URL data of "{url}"', | ||
{ | ||
**attributes, | ||
'positives': f'{positive_detections}/{detection_engines}', | ||
}, | ||
headers=[ | ||
'url', | ||
'title', | ||
'last_http_response_content_sha256', | ||
'positives', | ||
], | ||
removeNull=True, | ||
headerTransform=string_to_table_header | ||
), | ||
outputs=data, | ||
raw_response=raw_response, | ||
) | ||
|
||
|
||
def build_ip_output( | ||
client: Client, | ||
score_calculator: ScoreCalculator, | ||
|
@@ -1784,7 +1837,7 @@ def private_file_command(client: Client, args: dict) -> List[CommandResults]: | |
execution_metrics.success += 1 | ||
except Exception as exc: | ||
# If anything happens, just keep going | ||
demisto.debug(f'Could not process file: "{file}"\n {str(exc)}') | ||
demisto.debug(f'Could not process private file: "{file}"\n {str(exc)}') | ||
execution_metrics.general_error += 1 | ||
results.append(build_error_file_output(client, file)) | ||
continue | ||
|
@@ -1803,6 +1856,7 @@ def url_command(client: Client, score_calculator: ScoreCalculator, args: dict, r | |
extended_data = argToBoolean(args.get('extended_data', False)) | ||
results: List[CommandResults] = [] | ||
execution_metrics = ExecutionMetrics() | ||
|
||
for url in urls: | ||
try: | ||
raw_response = client.url(url, relationships) | ||
|
@@ -1828,6 +1882,39 @@ def url_command(client: Client, score_calculator: ScoreCalculator, args: dict, r | |
return results | ||
|
||
|
||
def private_url_command(client: Client, args: dict) -> List[CommandResults]: | ||
""" | ||
1 API Call | ||
""" | ||
urls = argToList(args['url']) | ||
results: List[CommandResults] = [] | ||
execution_metrics = ExecutionMetrics() | ||
|
||
for url in urls: | ||
try: | ||
raw_response = client.private_url(url) | ||
if raw_response.get('error', {}).get('code') == 'QuotaExceededError': | ||
execution_metrics.quota_error += 1 | ||
results.append(build_quota_exceeded_url_output(client, url)) | ||
continue | ||
if raw_response.get('error', {}).get('code') == 'NotFoundError': | ||
results.append(build_unknown_url_output(client, url)) | ||
continue | ||
except Exception as exc: | ||
# If anything happens, just keep going | ||
demisto.debug(f'Could not process private URL: "{url}".\n {str(exc)}') | ||
execution_metrics.general_error += 1 | ||
results.append(build_error_url_output(client, url)) | ||
continue | ||
execution_metrics.success += 1 | ||
results.append(build_private_url_output(url, raw_response)) | ||
if execution_metrics.is_supported(): | ||
_metric_results = execution_metrics.metrics | ||
metric_results = cast(CommandResults, _metric_results) | ||
results.append(metric_results) | ||
return results | ||
|
||
|
||
def domain_command(client: Client, score_calculator: ScoreCalculator, args: dict, relationships: str) -> List[CommandResults]: | ||
""" | ||
1 API Call for regular | ||
|
@@ -2031,7 +2118,6 @@ def file_scan_and_get_analysis( | |
def private_file_scan_and_get_analysis(client: Client, args: dict): | ||
"""Calls to gti-privatescanning-file-scan and gti-privatescanning-analysis-get.""" | ||
interval = int(args.get('interval_in_seconds', 60)) | ||
extended = argToBoolean(args.get('extended_data', False)) | ||
|
||
if not args.get('id'): | ||
command_results = private_file_scan(client, args) | ||
|
@@ -2046,7 +2132,6 @@ def private_file_scan_and_get_analysis(client: Client, args: dict): | |
'entryID': args.get('entryID'), | ||
'id': outputs.get('vtScanID'), | ||
'interval_in_seconds': interval, | ||
'extended_data': extended, | ||
}, | ||
timeout_in_seconds=6000, | ||
) | ||
|
@@ -2066,7 +2151,6 @@ def private_file_scan_and_get_analysis(client: Client, args: dict): | |
'entryID': args.get('entryID'), | ||
'id': outputs.get('id'), | ||
'interval_in_seconds': interval, | ||
'extended_data': extended, | ||
}, | ||
timeout_in_seconds=6000, | ||
) | ||
|
@@ -2122,6 +2206,50 @@ def url_scan_and_get_analysis( | |
return CommandResults(scheduled_command=scheduled_command) | ||
|
||
|
||
def private_url_scan_and_get_analysis( | ||
client: Client, | ||
args: dict | ||
): | ||
"""Calls to gti-privatescanning-url-scan and gti-privatescanning-analysis-get.""" | ||
interval = int(args.get('interval_in_seconds', 60)) | ||
|
||
if not args.get('id'): | ||
command_result = private_scan_url_command(client, args) | ||
outputs = command_result.outputs | ||
if not isinstance(outputs, dict): | ||
raise DemistoException('outputs is expected to be a dict') | ||
scheduled_command = ScheduledCommand( | ||
command=f'{COMMAND_PREFIX}-private-url-scan-and-analysis-get', | ||
next_run_in_seconds=interval, | ||
args={ | ||
'url': args.get('url'), | ||
'id': outputs.get('vtScanID'), | ||
'interval_in_seconds': interval, | ||
}, | ||
timeout_in_seconds=6000, | ||
) | ||
command_result.scheduled_command = scheduled_command | ||
return command_result | ||
|
||
command_result = private_get_analysis_command(client, args) | ||
outputs = command_result.outputs | ||
if not isinstance(outputs, dict): | ||
raise DemistoException('outputs is expected to be a dict') | ||
if outputs.get('data', {}).get('attributes', {}).get('status') == 'completed': | ||
return command_result | ||
scheduled_command = ScheduledCommand( | ||
command=f'{COMMAND_PREFIX}-private-url-scan-and-analysis-get', | ||
next_run_in_seconds=interval, | ||
args={ | ||
'url': args.get('url'), | ||
'id': outputs.get('id'), | ||
'interval_in_seconds': interval, | ||
}, | ||
timeout_in_seconds=6000, | ||
) | ||
return CommandResults(scheduled_command=scheduled_command) | ||
|
||
|
||
def get_upload_url(client: Client) -> CommandResults: | ||
""" | ||
1 API Call | ||
|
@@ -2143,6 +2271,20 @@ def get_upload_url(client: Client) -> CommandResults: | |
|
||
|
||
def scan_url_command(client: Client, args: dict) -> CommandResults: | ||
""" | ||
1 API Call | ||
""" | ||
return scan_url(client, args) | ||
|
||
|
||
def private_scan_url_command(client: Client, args: dict) -> CommandResults: | ||
""" | ||
1 API Call | ||
""" | ||
return scan_url(client, args, True) | ||
|
||
|
||
def scan_url(client: Client, args: dict, private: bool = False) -> CommandResults: | ||
""" | ||
1 API Call | ||
""" | ||
|
@@ -2153,7 +2295,10 @@ def scan_url_command(client: Client, args: dict) -> CommandResults: | |
headers = ['id', 'url'] | ||
|
||
try: | ||
raw_response = client.url_scan(url) | ||
if private: | ||
raw_response = client.private_url_scan(url) | ||
else: | ||
raw_response = client.url_scan(url) | ||
data = raw_response['data'] | ||
|
||
data['url'] = url | ||
|
@@ -2507,22 +2652,38 @@ def private_get_analysis_command(client: Client, args: dict) -> CommandResults: | |
raw_response = client.get_private_analysis(analysis_id) | ||
data = raw_response.get('data', {}) | ||
attributes = data.get('attributes', {}) | ||
stats = { | ||
'threat_severity_level': '', | ||
'popular_threat_category': '', | ||
'threat_verdict': '', | ||
} | ||
|
||
if sha256 := raw_response.get('meta', {}).get('file_info', {}).get('sha256'): | ||
attributes['sha256'] = sha256 | ||
|
||
if url := raw_response.get('meta', {}).get('url_info', {}).get('url'): | ||
attributes['url'] = url | ||
|
||
if attributes.get('status', '') == 'completed': | ||
file_response = client.get_private_file_from_analysis(analysis_id) | ||
file_attributes = file_response.get('data', {}).get('attributes', {}) | ||
threat_severity = file_attributes.get('threat_severity', {}) | ||
severity_level = threat_severity.get('threat_severity_level', '') | ||
stats['threat_severity_level'] = SEVERITY_LEVELS.get(severity_level, severity_level) | ||
threat_severity_data = threat_severity.get('threat_severity_data', {}) | ||
stats['popular_threat_category'] = threat_severity_data.get('popular_threat_category', '') | ||
verdict = file_attributes.get('threat_verdict', '') | ||
stats['threat_verdict'] = VERDICTS.get(verdict, verdict) | ||
attributes.update(stats) | ||
stats = {} | ||
item_response = client.get_private_item_from_analysis(analysis_id) | ||
item_attributes = item_response.get('data', {}).get('attributes', {}) | ||
|
||
# File attributes | ||
if threat_severity := item_attributes.get('threat_severity'): | ||
if severity_level := threat_severity.get('threat_severity_level'): | ||
stats['threat_severity_level'] = SEVERITY_LEVELS.get(severity_level, severity_level) | ||
if popular_threat_category := threat_severity.get('threat_severity_data', {}).get('popular_threat_category'): | ||
stats['popular_threat_category'] = popular_threat_category | ||
if verdict := item_attributes.get('threat_verdict'): | ||
stats['threat_verdict'] = VERDICTS.get(verdict, verdict) | ||
|
||
# URL attributes | ||
if last_analysis_stats := item_attributes.get('last_analysis_stats'): | ||
if detection_engines := sum(last_analysis_stats.values()): | ||
positive_detections = last_analysis_stats.get('malicious', 0) | ||
stats['positives'] = f'{positive_detections}/{detection_engines}' | ||
|
||
attributes.update(stats) | ||
for field in ['title', 'last_http_response_content_sha256']: | ||
if value := item_attributes.get(field): | ||
attributes[field] = value | ||
|
||
return CommandResults( | ||
f'{INTEGRATION_ENTRY_CONTEXT}.Analysis', | ||
'id', | ||
|
@@ -2532,7 +2693,21 @@ def private_get_analysis_command(client: Client, args: dict) -> CommandResults: | |
**attributes, | ||
'id': analysis_id | ||
}, | ||
headers=['id', 'threat_severity_level', 'popular_threat_category', 'threat_verdict', 'status'], | ||
headers=[ | ||
# Common headers | ||
'id', | ||
'status', | ||
# File attributes | ||
'sha256' | ||
'threat_severity_level', | ||
'popular_threat_category', | ||
'threat_verdict', | ||
# URL attributes | ||
'url', | ||
'title', | ||
'last_http_response_content_sha256', | ||
'positives', | ||
], | ||
removeNull=True, | ||
headerTransform=string_to_table_header | ||
), | ||
|
@@ -2794,6 +2969,10 @@ def main(params: dict, args: dict, command: str): | |
results = private_file_command(client, args) | ||
elif command == f'{COMMAND_PREFIX}-privatescanning-file-scan': | ||
results = private_file_scan(client, args) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should rename this to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes but URL scan command is called |
||
elif command == f'{COMMAND_PREFIX}-privatescanning-url': | ||
results = private_url_command(client, args) | ||
elif command == f'{COMMAND_PREFIX}-privatescanning-url-scan': | ||
results = private_scan_url_command(client, args) | ||
elif command == f'{COMMAND_PREFIX}-privatescanning-analysis-get': | ||
results = private_get_analysis_command(client, args) | ||
elif command == f'{COMMAND_PREFIX}-assessment-get': | ||
|
@@ -2804,6 +2983,8 @@ def main(params: dict, args: dict, command: str): | |
results = private_file_scan_and_get_analysis(client, args) | ||
elif command == f'{COMMAND_PREFIX}-url-scan-and-analysis-get': | ||
results = url_scan_and_get_analysis(client, score_calculator, args, url_relationships) | ||
elif command == f'{COMMAND_PREFIX}-private-url-scan-and-analysis-get': | ||
results = private_url_scan_and_get_analysis(client, args) | ||
elif command == f'{COMMAND_PREFIX}-curated-campaigns-get': | ||
results = get_curated_campaigns_command(client, args) | ||
elif command == f'{COMMAND_PREFIX}-curated-malware-families-get': | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's call this
detection_ratio
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are calling this attribute
positives
in all commands (it appears in allbuild_<ioc>_output
methods).