Skip to content

Commit

Permalink
feat: added deduplication database table (#4206)
Browse files Browse the repository at this point in the history
* feat: added deduplication database table
* feat: removed a test for 'FAIL-PKG-INFO'
* feat: changed logic of handling products with 'UNKNOWN' vendors

Signed-off-by: Meet Soni <[email protected]>
  • Loading branch information
inosmeet committed Jun 24, 2024
1 parent a8210b9 commit b95b8be
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 31 deletions.
12 changes: 12 additions & 0 deletions cve_bin_tool/cvedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ class CVEDB:
PRIMARY KEY(metrics_id)
)
""",
"deduplication": """
CREATE TABLE IF NOT EXISTS deduplication (
purl TEXT,
vendor TEXT,
PRIMARY KEY (purl, vendor)
)
""",
}

EMPTY_SELECT_QUERIES = {
Expand Down Expand Up @@ -392,20 +399,25 @@ def init_database(self) -> None:
exploit_table_create,
cve_metrics_table_create,
metrics_table_create,
deduplication,
) = (
self.TABLE_SCHEMAS["cve_severity"],
self.TABLE_SCHEMAS["cve_range"],
self.TABLE_SCHEMAS["cve_exploited"],
self.TABLE_SCHEMAS["cve_metrics"],
self.TABLE_SCHEMAS["metrics"],
self.TABLE_SCHEMAS["deduplication"],
)
index_range = "CREATE INDEX IF NOT EXISTS product_index ON cve_range (cve_number, vendor, product)"
index_purl = "CREATE INDEX IF NOT EXISTS purl_index ON deduplication (purl)"
cursor.execute(cve_data_create)
cursor.execute(version_range_create)
cursor.execute(exploit_table_create)
cursor.execute(cve_metrics_table_create)
cursor.execute(metrics_table_create)
cursor.execute(deduplication)
cursor.execute(index_range)
cursor.execute(index_purl)

(
severity_schema,
Expand Down
72 changes: 60 additions & 12 deletions cve_bin_tool/parsers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def __init__(self, cve_db, logger):
self.logger = logger
self.filename = ""
self.purl_pkg_type = "default"
self.connections = {}

def run_checker(self, filename):
"""
Expand Down Expand Up @@ -117,7 +118,7 @@ def find_vendor_from_purl(self, purl, ver) -> tuple[list[ScanInfo], bool]:
UNION
SELECT cpe from purl2cpe WHERE purl LIKE ?
"""
cursor = self.db_open_and_get_cursor()
cursor = self.db_open_and_get_cursor("purl2cpe/purl2cpe.db")
cursor.execute(query, (param1, param2))
cpeList = cursor.fetchall()
vendorlist: list[ScanInfo] = []
Expand Down Expand Up @@ -147,22 +148,69 @@ def find_vendor_from_purl(self, purl, ver) -> tuple[list[ScanInfo], bool]:

return vendorlist, True
except Exception as e:
self.logger.error(f"Error occurred: {e}")
self.logger.debug(f"Error occurred: {e}")
self.logger.error("Unable to access purl2cpe database.")
return [], False

def db_open_and_get_cursor(self) -> sqlite3.Cursor:
"""Opens connection to sqlite database, returns cursor object."""
def deduplication(self, purl, vendorlist) -> list[ScanInfo]:
"""
Filters out invalid vendors associated with a given PURL using the deduplication database.
dbpath = (
Path("~").expanduser() / ".cache" / "cve-bin-tool" / "purl2cpe/purl2cpe.db"
)
connection = sqlite3.connect(dbpath)
It queries the database for vendors recorded as invalid for the PURL and filters the input
'vendorlist' accordingly:
- If a vendor from 'vendorlist' is not found in the database (valid vendor), its entry is added
directly to 'vendorlist_filtered'.
- If filtering leaves 'vendorlist_filtered' empty (no valid vendor remains), a new ScanInfo
object is created with the vendor marked as 'UNKNOWN' and added to 'vendorlist_filtered'.
"""
try:
purl = purl.to_dict()
param = f"pkg:{purl['type']}/{purl['name']}"
query = """
SELECT vendor FROM deduplication WHERE purl LIKE ?
"""
vendorlist_filtered: list[ScanInfo] = []
cursor = self.db_open_and_get_cursor("cve.db")
cursor.execute(query, (param,))

invalidVendorList = [i[0] for i in cursor.fetchall()]

for item in vendorlist:
if item.product_info.vendor not in invalidVendorList:
vendorlist_filtered.append(item)

if len(vendorlist_filtered) == 0:
vendorlist_filtered.append(
ScanInfo(
ProductInfo(
"UNKNOWN",
item.product_info.product,
item.product_info.version,
item.file_path,
item.product_info.purl,
),
item.file_path,
)
)
return vendorlist_filtered
except Exception as e:
self.logger.debug(f"error: {e}")
self.logger.error("Unable to access deduplication database.")
return vendorlist

def db_open_and_get_cursor(self, dbname) -> sqlite3.Cursor:
"""Opens connection to sqlite database, returns cursor object."""

if connection is not None:
cursor = connection.cursor()
if cursor is None:
dbpath = Path("~").expanduser() / ".cache" / "cve-bin-tool" / dbname
if dbname not in self.connections:
self.connections[dbname] = sqlite3.connect(dbpath)
connection = self.connections[dbname]
if connection.cursor() is None:
self.logger.error("Database cursor does not exist")
raise CVEDBError
return cursor
return connection.cursor()

def decode_cpe23(self, cpe23) -> tuple[str, str, str]:
"""
Expand Down
19 changes: 4 additions & 15 deletions cve_bin_tool/parsers/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

from cve_bin_tool.parsers import Parser
from cve_bin_tool.strings import parse_strings
from cve_bin_tool.util import ProductInfo, ScanInfo


class PythonRequirementsParser(Parser):
Expand Down Expand Up @@ -106,6 +105,7 @@ def run_checker(self, filename):
if not result:
vendor = self.find_vendor(product, version)

vendor = self.deduplication(purl, vendor)
if vendor is not None:
yield from vendor
self.logger.debug(f"Done scanning file: {self.filename}")
Expand Down Expand Up @@ -159,23 +159,12 @@ def run_checker(self, filename):
purl = self.generate_purl(product)
vendor, result = self.find_vendor_from_purl(purl, version)

if not result:
vendor = self.find_vendor(product, version)

if vendor is not None:
yield from vendor

if not result:
vendor_package_pair = self.cve_db.get_vendor_product_pairs(product)
if vendor_package_pair != []:
for pair in vendor_package_pair:
vendor = pair["vendor"]
location = pair.get("location", self.filename)
file_path = self.filename
self.logger.debug(
f"{file_path} is {vendor}.{product} {version}"
)
yield ScanInfo(
ProductInfo(vendor, product, version, location), file_path
)

# There are packages with a METADATA file in them containing different data from what the tool expects
except AttributeError:
self.logger.debug(f"{filename} is an invalid METADATA/PKG-INFO")
Expand Down
3 changes: 0 additions & 3 deletions test/language_data/FAIL-PKG-INFO

This file was deleted.

1 change: 0 additions & 1 deletion test/test_language_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ def test_javascript_package_none_found(self, filename: str) -> None:
@pytest.mark.parametrize(
"filename",
[
(str(TEST_FILE_PATH / "FAIL-PKG-INFO")),
(str(TEST_FILE_PATH / "fail_pom.xml")),
],
)
Expand Down

0 comments on commit b95b8be

Please sign in to comment.