Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(GTI): Private URL scanning #117

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,17 @@ def url(self, url: str, relationships: str = ''):
ok_codes=(404, 429, 200)
)

def private_url(self, url: str):
"""
See Also:
https://gtidocs.virustotal.com/reference/get-a-private-url-analysis-report
"""
return self._http_request(
'GET',
f'private/urls/{encode_url_to_base64(url)}',
ok_codes=(404, 429, 200)
)

def domain(self, domain: str, relationships: str = '') -> dict:
"""
See Also:
Expand Down Expand Up @@ -362,7 +373,7 @@ def private_file_scan(self, file_path: str) -> dict:
resp_type='response'
)
demisto.debug(
f'scan_file response:\n'
f'scan_private_file response:\n'
f'{str(response.status_code)=}, {str(response.headers)=}, {str(response.content)}'
)
return response.json()
Expand Down Expand Up @@ -398,6 +409,17 @@ def url_scan(self, url: str) -> dict:
data={'url': url}
)

def private_url_scan(self, url: str) -> dict:
"""
See Also:
https://gtidocs.virustotal.com/reference/private-scan-url
"""
return self._http_request(
'POST',
'/private/urls',
data={'url': url}
)

# endregion

def file_sandbox_report(self, file_hash: dict, limit: int) -> dict:
Expand Down Expand Up @@ -454,14 +476,14 @@ def get_private_analysis(self, analysis_id: str) -> dict:
f'private/analyses/{analysis_id}'
)

def get_private_file_from_analysis(self, analysis_id: str) -> dict:
def get_private_item_from_analysis(self, analysis_id: str) -> dict:
"""
See Also:
https://gtidocs.virustotal.com/reference/analysesidrelationship
"""
return self._http_request(
'GET',
f'private/analyses/{analysis_id}/item?attributes=threat_severity,threat_verdict'
f'private/analyses/{analysis_id}/item'
)

def get_file_sigma_analysis(self, file_hash: str) -> dict:
Expand Down Expand Up @@ -1410,6 +1432,37 @@ def build_url_output(
)


def build_private_url_output(url: str, raw_response: dict) -> CommandResults:
data = raw_response.get('data', {})
attributes = data.get('attributes', {})

last_analysis_stats = attributes.get('last_analysis_stats', {})
positive_detections = last_analysis_stats.get('malicious', 0)
detection_engines = sum(last_analysis_stats.values())

return CommandResults(
outputs_prefix=f'{INTEGRATION_ENTRY_CONTEXT}.URL',
outputs_key_field='id',
readable_output=tableToMarkdown(
f'URL data of "{url}"',
{
**attributes,
'positives': f'{positive_detections}/{detection_engines}',
},
headers=[
'url',
'title',
'last_http_response_content_sha256',
'positives',
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's call this detection_ratio

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are calling this attribute positives in all commands (it appears in all build_<ioc>_output methods).

],
removeNull=True,
headerTransform=string_to_table_header
),
outputs=data,
raw_response=raw_response,
)


def build_ip_output(
client: Client,
score_calculator: ScoreCalculator,
Expand Down Expand Up @@ -1784,7 +1837,7 @@ def private_file_command(client: Client, args: dict) -> List[CommandResults]:
execution_metrics.success += 1
except Exception as exc:
# If anything happens, just keep going
demisto.debug(f'Could not process file: "{file}"\n {str(exc)}')
demisto.debug(f'Could not process private file: "{file}"\n {str(exc)}')
execution_metrics.general_error += 1
results.append(build_error_file_output(client, file))
continue
Expand All @@ -1803,6 +1856,7 @@ def url_command(client: Client, score_calculator: ScoreCalculator, args: dict, r
extended_data = argToBoolean(args.get('extended_data', False))
results: List[CommandResults] = []
execution_metrics = ExecutionMetrics()

for url in urls:
try:
raw_response = client.url(url, relationships)
Expand All @@ -1828,6 +1882,39 @@ def url_command(client: Client, score_calculator: ScoreCalculator, args: dict, r
return results


def private_url_command(client: Client, args: dict) -> List[CommandResults]:
"""
1 API Call
"""
urls = argToList(args['url'])
results: List[CommandResults] = []
execution_metrics = ExecutionMetrics()

for url in urls:
try:
raw_response = client.private_url(url)
if raw_response.get('error', {}).get('code') == 'QuotaExceededError':
execution_metrics.quota_error += 1
results.append(build_quota_exceeded_url_output(client, url))
continue
if raw_response.get('error', {}).get('code') == 'NotFoundError':
results.append(build_unknown_url_output(client, url))
continue
except Exception as exc:
# If anything happens, just keep going
demisto.debug(f'Could not process private URL: "{url}".\n {str(exc)}')
execution_metrics.general_error += 1
results.append(build_error_url_output(client, url))
continue
execution_metrics.success += 1
results.append(build_private_url_output(url, raw_response))
if execution_metrics.is_supported():
_metric_results = execution_metrics.metrics
metric_results = cast(CommandResults, _metric_results)
results.append(metric_results)
return results


def domain_command(client: Client, score_calculator: ScoreCalculator, args: dict, relationships: str) -> List[CommandResults]:
"""
1 API Call for regular
Expand Down Expand Up @@ -2031,7 +2118,6 @@ def file_scan_and_get_analysis(
def private_file_scan_and_get_analysis(client: Client, args: dict):
"""Calls to gti-privatescanning-file-scan and gti-privatescanning-analysis-get."""
interval = int(args.get('interval_in_seconds', 60))
extended = argToBoolean(args.get('extended_data', False))

if not args.get('id'):
command_results = private_file_scan(client, args)
Expand All @@ -2046,7 +2132,6 @@ def private_file_scan_and_get_analysis(client: Client, args: dict):
'entryID': args.get('entryID'),
'id': outputs.get('vtScanID'),
'interval_in_seconds': interval,
'extended_data': extended,
},
timeout_in_seconds=6000,
)
Expand All @@ -2066,7 +2151,6 @@ def private_file_scan_and_get_analysis(client: Client, args: dict):
'entryID': args.get('entryID'),
'id': outputs.get('id'),
'interval_in_seconds': interval,
'extended_data': extended,
},
timeout_in_seconds=6000,
)
Expand Down Expand Up @@ -2122,6 +2206,50 @@ def url_scan_and_get_analysis(
return CommandResults(scheduled_command=scheduled_command)


def private_url_scan_and_get_analysis(
client: Client,
args: dict
):
"""Calls to gti-privatescanning-url-scan and gti-privatescanning-analysis-get."""
interval = int(args.get('interval_in_seconds', 60))

if not args.get('id'):
command_result = private_scan_url_command(client, args)
outputs = command_result.outputs
if not isinstance(outputs, dict):
raise DemistoException('outputs is expected to be a dict')
scheduled_command = ScheduledCommand(
command=f'{COMMAND_PREFIX}-private-url-scan-and-analysis-get',
next_run_in_seconds=interval,
args={
'url': args.get('url'),
'id': outputs.get('vtScanID'),
'interval_in_seconds': interval,
},
timeout_in_seconds=6000,
)
command_result.scheduled_command = scheduled_command
return command_result

command_result = private_get_analysis_command(client, args)
outputs = command_result.outputs
if not isinstance(outputs, dict):
raise DemistoException('outputs is expected to be a dict')
if outputs.get('data', {}).get('attributes', {}).get('status') == 'completed':
return command_result
scheduled_command = ScheduledCommand(
command=f'{COMMAND_PREFIX}-private-url-scan-and-analysis-get',
next_run_in_seconds=interval,
args={
'url': args.get('url'),
'id': outputs.get('id'),
'interval_in_seconds': interval,
},
timeout_in_seconds=6000,
)
return CommandResults(scheduled_command=scheduled_command)


def get_upload_url(client: Client) -> CommandResults:
"""
1 API Call
Expand All @@ -2143,6 +2271,20 @@ def get_upload_url(client: Client) -> CommandResults:


def scan_url_command(client: Client, args: dict) -> CommandResults:
"""
1 API Call
"""
return scan_url(client, args)


def private_scan_url_command(client: Client, args: dict) -> CommandResults:
"""
1 API Call
"""
return scan_url(client, args, True)


def scan_url(client: Client, args: dict, private: bool = False) -> CommandResults:
"""
1 API Call
"""
Expand All @@ -2153,7 +2295,10 @@ def scan_url_command(client: Client, args: dict) -> CommandResults:
headers = ['id', 'url']

try:
raw_response = client.url_scan(url)
if private:
raw_response = client.private_url_scan(url)
else:
raw_response = client.url_scan(url)
data = raw_response['data']

data['url'] = url
Expand Down Expand Up @@ -2507,22 +2652,38 @@ def private_get_analysis_command(client: Client, args: dict) -> CommandResults:
raw_response = client.get_private_analysis(analysis_id)
data = raw_response.get('data', {})
attributes = data.get('attributes', {})
stats = {
'threat_severity_level': '',
'popular_threat_category': '',
'threat_verdict': '',
}

if sha256 := raw_response.get('meta', {}).get('file_info', {}).get('sha256'):
attributes['sha256'] = sha256

if url := raw_response.get('meta', {}).get('url_info', {}).get('url'):
attributes['url'] = url

if attributes.get('status', '') == 'completed':
file_response = client.get_private_file_from_analysis(analysis_id)
file_attributes = file_response.get('data', {}).get('attributes', {})
threat_severity = file_attributes.get('threat_severity', {})
severity_level = threat_severity.get('threat_severity_level', '')
stats['threat_severity_level'] = SEVERITY_LEVELS.get(severity_level, severity_level)
threat_severity_data = threat_severity.get('threat_severity_data', {})
stats['popular_threat_category'] = threat_severity_data.get('popular_threat_category', '')
verdict = file_attributes.get('threat_verdict', '')
stats['threat_verdict'] = VERDICTS.get(verdict, verdict)
attributes.update(stats)
stats = {}
item_response = client.get_private_item_from_analysis(analysis_id)
item_attributes = item_response.get('data', {}).get('attributes', {})

# File attributes
if threat_severity := item_attributes.get('threat_severity'):
if severity_level := threat_severity.get('threat_severity_level'):
stats['threat_severity_level'] = SEVERITY_LEVELS.get(severity_level, severity_level)
if popular_threat_category := threat_severity.get('threat_severity_data', {}).get('popular_threat_category'):
stats['popular_threat_category'] = popular_threat_category
if verdict := item_attributes.get('threat_verdict'):
stats['threat_verdict'] = VERDICTS.get(verdict, verdict)

# URL attributes
if last_analysis_stats := item_attributes.get('last_analysis_stats'):
if detection_engines := sum(last_analysis_stats.values()):
positive_detections = last_analysis_stats.get('malicious', 0)
stats['positives'] = f'{positive_detections}/{detection_engines}'

attributes.update(stats)
for field in ['title', 'last_http_response_content_sha256']:
if value := item_attributes.get(field):
attributes[field] = value

return CommandResults(
f'{INTEGRATION_ENTRY_CONTEXT}.Analysis',
'id',
Expand All @@ -2532,7 +2693,21 @@ def private_get_analysis_command(client: Client, args: dict) -> CommandResults:
**attributes,
'id': analysis_id
},
headers=['id', 'threat_severity_level', 'popular_threat_category', 'threat_verdict', 'status'],
headers=[
# Common headers
'id',
'status',
# File attributes
'sha256'
'threat_severity_level',
'popular_threat_category',
'threat_verdict',
# URL attributes
'url',
'title',
'last_http_response_content_sha256',
'positives',
],
removeNull=True,
headerTransform=string_to_table_header
),
Expand Down Expand Up @@ -2794,6 +2969,10 @@ def main(params: dict, args: dict, command: str):
results = private_file_command(client, args)
elif command == f'{COMMAND_PREFIX}-privatescanning-file-scan':
results = private_file_scan(client, args)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should rename this to private_scan_file_command to keep consistency.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes but URL scan command is called scan_url_command while file scan command is called file_scan. We should rename this command too.

elif command == f'{COMMAND_PREFIX}-privatescanning-url':
results = private_url_command(client, args)
elif command == f'{COMMAND_PREFIX}-privatescanning-url-scan':
results = private_scan_url_command(client, args)
elif command == f'{COMMAND_PREFIX}-privatescanning-analysis-get':
results = private_get_analysis_command(client, args)
elif command == f'{COMMAND_PREFIX}-assessment-get':
Expand All @@ -2804,6 +2983,8 @@ def main(params: dict, args: dict, command: str):
results = private_file_scan_and_get_analysis(client, args)
elif command == f'{COMMAND_PREFIX}-url-scan-and-analysis-get':
results = url_scan_and_get_analysis(client, score_calculator, args, url_relationships)
elif command == f'{COMMAND_PREFIX}-private-url-scan-and-analysis-get':
results = private_url_scan_and_get_analysis(client, args)
elif command == f'{COMMAND_PREFIX}-curated-campaigns-get':
results = get_curated_campaigns_command(client, args)
elif command == f'{COMMAND_PREFIX}-curated-malware-families-get':
Expand Down
Loading
Loading