Skip to content

Commit

Permalink
Merge pull request #74 from RedHatProductSecurity/adp
Browse files Browse the repository at this point in the history
Add ADP functionality
  • Loading branch information
mprpic committed Jul 13, 2023
2 parents f67e9cc + fa2f970 commit 1ec0c22
Show file tree
Hide file tree
Showing 6 changed files with 679 additions and 24 deletions.
207 changes: 186 additions & 21 deletions cvelib/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,29 +314,32 @@ def publish(
cve_json_file: Optional[TextIO],
print_raw: bool,
) -> None:
"""Publish a CVE record for a reserved (or rejected) CVE ID.
"""Publish a CNA container of a CVE record for a reserved (or rejected) CVE ID.
If the CVE is already published, this action will update its record. A published CVE can only be
moved to the rejected state with an appropriate reject record (see `cve reject`). A published
CVE cannot be moved back to the reserved state.
If the CVE is already published, this action will update the CNA container in its record. A
published CVE can only be moved to the rejected state with an appropriate reject record (see
`cve reject`). A published CVE cannot be moved back to the reserved state.
The CVE record can be specified as a string:
The CNA container can be specified as a string:
cve publish CVE-2022-1234 -j '{"affected": [], "descriptions": [], "references": {}, ...}'
Or passed in a file:
cve publish CVE-2022-1234 -f v5_record.json
cve publish CVE-2022-1234 -f cve.json
For information on the required properties in a given CVE JSON record, see the schema in:\n
For information on the required properties in a given CNA container of a CVE record,
see the schema in:\n
https://github.com/CVEProject/cve-schema/blob/master/schema/v5.0/CVE_JSON_5.0_schema.json
https://cveproject.github.io/cve-schema/schema/v5.0/docs/#oneOf_i0_containers_cna
Because the CVE Services API only expects the cnaPublishedContainer contents of the full record,
the record you pass to this command can specify just that data, and not the full record.
Because the CVE Services API only expects the CNA container contents of the full CVE v5 record,
the data you pass to this command can specify just the attributes defined by the
cnaPublishedContainer or cnaRejectedContainer subschemas, and not the full schema record.
"""
if cve_json_file is not None and cve_json_str is not None:
raise click.BadParameter(
"cannot use both `-f/--cve-json-file` and `-j/--cve-json` to provide a CVE JSON record."
"cannot use both `-f/--cve-json-file` and `-j/--cve-json` to provide a CNA JSON data."
)

try:
Expand All @@ -346,19 +349,19 @@ def publish(
cve_json = json.load(cve_json_file)
else:
raise click.BadParameter(
"must provide CVE JSON record using one of: "
"must provide CNA JSON data using one of: "
"`-f/--cve-json-file` or `-j/--cve-json`."
)
except json.JSONDecodeError as exc:
print_error(msg="CVE data is not valid JSON", details=str(exc))
print_error(msg="CNA data is not valid JSON", details=str(exc))
return

if ctx.obj.interactive:
click.echo("You are about to publish a CVE record for ", nl=False)
click.echo("You are about to publish a CNA container in the CVE record for ", nl=False)
click.secho(cve_id, bold=True, nl=False)
click.echo(" using the following input:\n\n", nl=False)
print_json_data(cve_json)
if not click.confirm("\n\nDo you want to continue?"):
if not click.confirm("\nDo you want to continue?"):
click.echo("Exiting...")
sys.exit(0)
click.echo()
Expand All @@ -378,6 +381,95 @@ def publish(
else:
click.echo("Published the following CVE:\n")
print_cve_record(response_data["created"] if created else response_data["updated"])
click.echo(f'\nAPI response: {response_data["message"]}')


@cli.command()
@click.argument("cve_id", callback=validate_cve)
@click.option(
"-j",
"--adp-json",
"adp_json_str",
type=click.STRING,
help="JSON body of the ADP container to publish.",
)
@click.option(
"-f",
"--adp-json-file",
type=click.File(),
help="File containing JSON body of ADP container to publish.",
)
@click.option("--raw", "print_raw", default=False, is_flag=True, help="Print response JSON.")
@click.pass_context
@handle_cve_api_error
def publish_adp(
ctx: click.Context,
cve_id: str,
adp_json_str: Optional[str],
adp_json_file: Optional[TextIO],
print_raw: bool,
) -> None:
"""Add or update an ADP container in a CVE record for a published CVE ID.
NOTE: a published ADP container cannot be removed, only updated with new data.
The ADP container can be specified as a string:
cve publish-adp CVE-2022-1234 -j '{"affected": [], "descriptions": [], "references": {}, ...}'
Or passed in a file:
cve publish-adp CVE-2022-1234 -f adp.json
For information on the required properties in a given ADP container of a CVE record,
see the schema in:\n
https://github.com/CVEProject/cve-schema/blob/master/schema/v5.0/CVE_JSON_5.0_schema.json
https://cveproject.github.io/cve-schema/schema/v5.0/docs/#oneOf_i0_containers_adp
Because the CVE Services API only expects the ADP container contents of the full CVE v5 record,
the data you pass to this command can specify just the attributes defined by the adpContainer
subschema.
"""
if adp_json_file is not None and adp_json_str is not None:
raise click.BadParameter(
"cannot use both `-f/--adp-json-file` and `-j/--adp-json` to provide ADP JSON data."
)
try:
if adp_json_str is not None:
cve_json = json.loads(adp_json_str)
elif adp_json_file is not None:
cve_json = json.load(adp_json_file)
else:
raise click.BadParameter(
"must provide ADP JSON record using one of: "
"`-f/--adp-json-file` or `-j/--adp-json`."
)
except json.JSONDecodeError as exc:
print_error(msg="ADP data is not valid JSON", details=str(exc))
return

if ctx.obj.interactive:
click.echo("You are about to publish an ADP record for ", nl=False)
click.secho(cve_id, bold=True, nl=False)
click.echo(" using the following input:\n\n", nl=False)
print_json_data(cve_json)
if not click.confirm(
"\nADP containers cannot be removed once published; they can only be updated.\n"
"Do you want to continue?"
):
click.echo("Exiting...")
sys.exit(0)
click.echo()

cve_api = ctx.obj.cve_api
response_data = cve_api.publish_adp(cve_id, cve_json)

if print_raw:
print_json_data(response_data)
else:
click.echo("Published an ADP container for:\n")
print_cve_record(response_data["updated"])
click.echo(f'\nAPI response: {response_data["message"]}')


@cli.command()
Expand Down Expand Up @@ -412,6 +504,8 @@ def reject(
A rejected CVE without a record can be moved to the reserved state. A published CVE can only
be rejected with an accompanying record. Reserved CVEs can be rejected with or without a record.
Note: Rejecting a published CVE removes both the CNA and ADP data of that record.
The CVE reject record can be specified as a string:
cve reject CVE-2022-1234 -j '{"rejectedReasons": [{"lang": "en", "value": "A reason."}]}'
Expand Down Expand Up @@ -483,6 +577,7 @@ def reject(
else:
click.echo("Rejected the following CVE:\n")
print_cve_record(response_data["created"] if created else response_data["updated"])
click.echo(f'\nAPI response: {response_data["message"]}')


@cli.command()
Expand Down Expand Up @@ -586,43 +681,113 @@ def reserve(ctx: click.Context, random: bool, year: str, count: int, print_raw:
"--show-record",
default=False,
is_flag=True,
help="Show full CVE record in JSON v5 format.",
help="Show full CVE v5 record.",
)
@click.option(
"-c",
"--show-cna",
default=False,
is_flag=True,
help="Show only the CNA container of a CVE v5 record.",
)
@click.option(
"-a",
"--show-adp",
multiple=True,
is_flag=False,
flag_value="",
help=(
"Show all ADP containers, or a specific one identified by the owning org (option can "
"be used multiple times)."
),
show_default="all ADP containers",
)
@click.option("--raw", "print_raw", default=False, is_flag=True, help="Print response JSON.")
@click.argument("cve_id", callback=validate_cve)
@click.pass_context
@handle_cve_api_error
def show_cve(ctx: click.Context, show_record: bool, print_raw: bool, cve_id: str) -> None:
def show_cve(
ctx: click.Context,
show_record: bool,
show_cna: bool,
show_adp: str,
print_raw: bool,
cve_id: str,
) -> None:
"""Display a specific CVE ID (and optionally its record) owned by your CNA."""
if [show_record, show_cna, bool(show_adp)].count(True) > 1:
raise click.BadParameter(
"use one of: `-r/--show-record` OR `-c/--show-cna` OR `-a/--show-adp`."
)
cve_api = ctx.obj.cve_api

cve_id_data = cve_api.show_cve_id(cve_id=cve_id)
cve_record_data = {}
if show_record:
if show_record or show_cna or show_adp:
try:
cve_record_data = cve_api.show_cve_record(cve_id=cve_id)
except requests.exceptions.HTTPError as exc:
error_msg = exc.response.json()["error"]
if exc.response.status_code != 404 or error_msg != cve_api.Errors.RECORD_DOES_NOT_EXIST:
raise exc

org_to_adp = {}
# Convert tuple of specified org names to a set; if no orgs were specified but the `-a`
# option was used, the tuple will use the default empty string as the only value in the
# tuple (i.e. `('',)`). A resulting empty set indicates we should print all containers.
orgs = sorted({org for org in show_adp if org})
if show_adp and "adp" in cve_record_data.get("containers", {}):
for adp_container in cve_record_data["containers"]["adp"]:
org = adp_container["providerMetadata"]["shortName"]
if orgs and org not in orgs:
continue
org_to_adp[org] = adp_container

if print_raw:
# Display CVE record data only if we're showing the record. Otherwise, show the CVE ID
# data only.
# Display CVE record data (or parts of it) only. Otherwise, show the CVE ID data only.
if show_record:
print_json_data(cve_record_data)
elif show_cna:
cna_container = (
cve_record_data["containers"]["cna"] if cve_record_data else cve_record_data
)
print_json_data(cna_container)
elif show_adp:
print_json_data(list(org_to_adp.values()))
else:
print_json_data(cve_id_data)
else:
print_cve_id(cve_id_data)
# If we're showing the CVE record, display it as either the raw JSON if it exists or show
# an informational message if it does not.
# If we're showing the CVE record (or any of its parts), display it as either the raw
# JSON if it exists or show an informational message if it does not.
if show_record:
click.secho("-----", bold=True)
if cve_record_data:
print_json_data(cve_record_data)
else:
click.echo("CVE record does not exist.")
elif show_cna:
click.secho("-----", bold=True)
if cve_record_data:
print_json_data(cve_record_data["containers"]["cna"])
else:
click.echo("CVE record does not exist.")
elif show_adp:
click.secho("-----", bold=True)
if org_to_adp:
count = len(org_to_adp)
for idx, (org, data) in enumerate(org_to_adp.items()):
click.secho(f"{org} ADP data:", bold=True)
print_json_data(data)
# Add empty line between ADP containers except after the last one.
if idx + 1 < count:
click.echo()
else:
if orgs:
orgs_text = ", ".join(orgs)
click.echo(f"CVE record does not contain ADP data from org(s): {orgs_text}.")
else:
click.echo("CVE record does not contain any ADP data.")


@cli.command(name="list")
Expand Down
34 changes: 32 additions & 2 deletions cvelib/cve_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,33 @@ def _put(self, path: str, **kwargs) -> requests.Response:

@staticmethod
def _extract_cna_container(cve_json: dict) -> dict:
"""Check if we received a full v5 record and extract just the CNA container part from it.
"""Check if we are processing a full v5 record and extract just the CNA container from it.
A record "looks" like a full one if it has a dataType=CVE_RECORD attribute. Otherwise,
it is assumed to be the CNA container.
it is assumed to be a CNA container already so return it as is.
"""
if cve_json.get("dataType", "") == "CVE_RECORD":
return cve_json["containers"]["cna"]
return cve_json

@staticmethod
def _extract_adp_container(cve_json: dict) -> dict:
"""Check if we are processing a full v5 record and extract just the ADP container it.
A record "looks" like a full one if it has a dataType=CVE_RECORD attribute. Otherwise,
it is assumed to be an ADP container already so return it as is.
If multiple ADP containers are present in the record, return an error since we don't know
which ADP container is the one that we want to use.
"""
if cve_json.get("dataType", "") == "CVE_RECORD":
if len(cve_json["containers"]["adp"]) > 1:
raise RuntimeError(
"Cannot extract ADP container if multiple are present in CVE record"
)
return cve_json["containers"]["adp"][0]
return cve_json

def _add_provider_metadata(self, cve_json: dict) -> dict:
"""Add the providerMetadata objects to a CNA container if one is not present.
Expand Down Expand Up @@ -184,6 +202,18 @@ def update_published(self, cve_id: str, cve_json: dict, validate: bool = True) -
response.raise_for_status()
return response.json()

def publish_adp(self, cve_id: str, cve_json: dict, validate: bool = True) -> dict:
"""Add or update an ADP container from a JSON object representing the ADP container data."""
cve_json = self._extract_adp_container(cve_json)
cve_json = self._add_provider_metadata(cve_json)
if validate:
CveRecord.validate(cve_json, CveRecord.Schemas.ADP)

cve_json = {"adpContainer": cve_json}
response = self._put(f"cve/{cve_id}/adp", json=cve_json)
response.raise_for_status()
return response.json()

def reject(self, cve_id: str, cve_json: dict, validate: bool = True) -> dict:
"""Reject a CVE from a JSON object representing the CNA container data."""
cve_json = self._extract_cna_container(cve_json)
Expand Down
8 changes: 8 additions & 0 deletions tests/data/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,11 @@ The example CVE records (`CVEv5_*.json`) in this directory are copied verbatim f
https://github.com/CVEProject/cve-schema/tree/v5.0.0/schema/v5.0/docs

They should be kept up to date with the schema versions in `cvelib/schemas/`.

The `container-*` files are standalone CNA or ADP (the two differ only in the set of required attributes) containers
that were extracted from their respective full CVE record files:

```shell
jq .containers.cna CVEv5_advanced-example.json > container-advanced-example.json
jq .containers.cna CVEv5_basic-example.json > container-basic-example.json
```
Loading

0 comments on commit 1ec0c22

Please sign in to comment.