-
Notifications
You must be signed in to change notification settings - Fork 58
/
Copy pathvthelper.py
242 lines (205 loc) · 8.16 KB
/
vthelper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2014-2023 Greenbone AG
#
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Provide functions to handle VT Info."""
from hashlib import sha256
import logging
from typing import Any, Optional, Dict, List, Tuple, Iterator
from itertools import chain
from ospd.cvss import CVSS
from ospd_openvas.nvticache import NVTICache
from ospd_openvas.notus import Notus
logger = logging.getLogger(__name__)
class VtHelper:
    """Build VT dictionaries from the NVTI cache and, optionally, Notus.

    The helper reads raw VT metadata through ``get_nvt_metadata(vt_id)`` and
    the known (filename, oid) pairs through ``get_oids()`` of the given
    cache objects, and turns them into the dictionaries returned by
    ``get_single_vt`` / ``get_vt_iterator``.
    """

    # Optional metadata entries that are moved out of the raw metadata into
    # the VT dictionary, as (metadata key, VT dictionary key) pairs. The
    # order is the order in which the keys are inserted into the result.
    _OPTIONAL_FIELDS = (
        ('summary', 'summary'),
        ('impact', 'impact'),
        ('affected', 'affected'),
        ('insight', 'insight'),
        ('solution', 'solution'),
        ('solution_type', 'solution_type'),
        ('solution_method', 'solution_method'),
        ('vuldetect', 'detection'),
    )

    def __init__(
        self, nvticache: "NVTICache", notus: Optional["Notus"] = None
    ):
        """Store the metadata sources.

        Arguments:
            nvticache: Cache queried for VT metadata and oids.
            notus: Optional second source. When set, it is asked first for
                the metadata of a VT and its oids are listed before the
                ones of ``nvticache``.
        """
        self.nvti = nvticache
        self.notus = notus

    def get_single_vt(self, vt_id: str, oids=None) -> Optional[Dict[str, Any]]:
        """Return the VT dictionary for a single VT.

        Arguments:
            vt_id: Identifier passed to ``get_nvt_metadata`` of the sources.
            oids: Optional mapping of dependency name to oid. When it is
                non-empty, the comma separated ``dependencies`` metadata is
                resolved through it; names without a match are kept as is.
                When it is empty or None, no ``vt_dependencies`` entry is
                produced and ``dependencies`` stays in ``custom``.

        Returns:
            The VT dictionary, or None when no source has metadata for
            ``vt_id`` or when the metadata has no severity vector. All
            metadata entries that are not handled explicitly end up under
            the ``custom`` key.

        Raises:
            KeyError: If the metadata lacks one of the keys ``vt_params``,
                ``refs``, ``name``, ``creation_date`` or
                ``last_modification``.
        """
        # Notus, when configured, takes precedence over the NVTI cache.
        metadata = self.notus.get_nvt_metadata(vt_id) if self.notus else None
        custom = metadata if metadata else self.nvti.get_nvt_metadata(vt_id)
        if not custom:
            return None

        vt_params = custom.pop('vt_params')
        vt_refs = custom.pop('refs')
        name = custom.pop('name')
        vt_creation_time = custom.pop('creation_date')
        vt_modification_time = custom.pop('last_modification')

        vt_dependencies = None
        if oids:
            vt_dependencies = []
            if 'dependencies' in custom:
                # this will bug out on notus since notus does contain
                # multiple oids per advisory; however luckily they don't
                # have dependencies; otherwise it must be treated as a list
                vt_dependencies = [
                    oids.get(dep_name) or dep_name
                    for dep_name in custom.pop('dependencies').split(', ')
                ]

        optional = [
            (vt_key, custom.pop(meta_key, None))
            for meta_key, vt_key in self._OPTIONAL_FIELDS
        ]

        # 'qod_type' takes precedence: when it is present, 'qod' is left
        # untouched inside the remaining metadata.
        qod_t = None
        qod_v = None
        if 'qod_type' in custom:
            qod_t = custom.pop('qod_type')
        elif 'qod' in custom:
            qod_v = custom.pop('qod')

        # 'cvss_base_vector' is only consulted when there is no
        # 'severity_vector' key at all.
        if 'severity_vector' in custom:
            severity_vector = custom.pop('severity_vector')
        else:
            severity_vector = custom.pop('cvss_base_vector', None)

        if not severity_vector:
            logger.warning("no severity_vector in %s found.", vt_id)
            # when there is no severity then we return None; alternatively
            # we could set it to an empty dict and continue
            return None

        if "CVSS:3" in severity_vector:
            severity_type = 'cvss_base_v3'
        else:
            severity_type = 'cvss_base_v2'

        severity = {
            'severity_base_vector': severity_vector,
            'severity_type': severity_type,
            # The creation time of the VT is the fallback severity date.
            'severity_date': custom.pop('severity_date', vt_creation_time),
        }
        if 'severity_origin' in custom:
            severity['severity_origin'] = custom.pop('severity_origin')

        vt = {'name': '' if name is None else name, 'custom': custom}

        fields = [
            ('vt_params', vt_params),
            ('vt_refs', vt_refs),
            ('vt_dependencies', vt_dependencies),
            ('creation_time', vt_creation_time),
            ('modification_time', vt_modification_time),
        ]
        fields.extend(optional)
        for key, value in fields:
            if value is not None:
                vt[key] = value

        if qod_t is not None:
            vt["qod_type"] = qod_t
        elif qod_v is not None:
            vt["qod"] = qod_v

        vt["severities"] = severity

        return vt

    def get_vt_iterator(
        self, vt_selection: Optional[List[str]] = None, details: bool = True
    ) -> Iterator[Tuple[str, Dict]]:
        """Yield the vts from the Redis NVTicache.

        Arguments:
            vt_selection: Ids of the VTs to yield. When empty or None, the
                oids reported by the sources are used. The passed list is
                not modified.
            details: When true, the (filename, oid) pairs of the sources
                are handed to ``get_single_vt`` as dependency lookup.

        Yields:
            ``(vt_id, vt)`` tuples in sorted order of ``vt_id``. VTs for
            which ``get_single_vt`` returns nothing are skipped.
        """
        oids = None
        if not vt_selection or details:
            # notus contains multiple oids per advisory therefore unlike
            # nasl they share the filename
            # The vt collection is taken from both Caches
            if self.notus:
                pairs = chain(self.notus.get_oids(), self.nvti.get_oids())
            else:
                pairs = self.nvti.get_oids()
            # The collection may be a one-shot iterator and is needed up to
            # two times below, so it is materialized exactly once.
            vt_collection = list(pairs)

            if not vt_selection:
                vt_selection = [oid for _, oid in vt_collection]
            if details:
                # luckily notus doesn't have dependency therefore we can
                # treat oids for dependency lookup as a dict
                oids = dict(vt_collection)

        # sorted() instead of list.sort() keeps the caller's list untouched.
        for vt_id in sorted(vt_selection):
            vt = self.get_single_vt(vt_id, oids)
            if vt:
                yield (vt_id, vt)

    def vt_verification_string_iter(self) -> Iterator[bytes]:
        """Yield one UTF-8 encoded verification chunk per VT.

        A chunk is the VT id followed by its modification time and, for
        every non-empty VT parameter in sorted order, the parameter's
        ``id``, ``name`` and ``default`` values.
        """
        # for a reproducible hash calculation
        # the vts must already be sorted in the dictionary.
        for vt_id, vt in self.get_vt_iterator(details=False):
            vt_params = vt.get('vt_params')
            param_chain = ""
            if vt_params:
                param_chain = "".join(
                    param.get('id') + param.get('name') + param.get('default')
                    for _, param in sorted(vt_params.items())
                    if param
                )

            yield (
                (vt_id + vt.get('modification_time')).encode('utf-8')
                + param_chain.encode('utf-8')
            )

    def calculate_vts_collection_hash(self) -> str:
        """Calculate the vts collection sha256 hash.

        Returns:
            The hex digest over all chunks of
            ``vt_verification_string_iter``.
        """
        m = sha256()  # pylint: disable=invalid-name
        for chunk in self.vt_verification_string_iter():
            m.update(chunk)
        return m.hexdigest()

    def get_severity_score(self, vt_aux: dict) -> Optional[float]:
        """Return the severity score for the given VT.

        Arguments:
            vt_aux: VT element from which to get the severity vector

        Returns:
            The calculated cvss base value. None if there is no VT, no
            severity information, no severity vector, or the severity type
            is neither cvss base version 2 nor version 3.
        """
        if not vt_aux:
            return None

        # A VT without 'severities' yields None instead of a KeyError.
        severities = vt_aux.get('severities') or {}
        severity_type = severities.get('severity_type')
        severity_vector = severities.get('severity_base_vector')
        if not severity_vector:
            return None

        if severity_type == "cvss_base_v2":
            return CVSS.cvss_base_v2_value(severity_vector)
        if severity_type == "cvss_base_v3":
            return CVSS.cvss_base_v3_value(severity_vector)

        return None