Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
mprpic committed Jun 23, 2023
1 parent 7cf225b commit feb27ea
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 1 deletion.
65 changes: 65 additions & 0 deletions cvelib/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,71 @@ def publish(
print_cve_record(response_data["created"] if created else response_data["updated"])


@cli.command()
@click.option(
"-j",
"--adp-json",
"adp_json_str",
type=click.STRING,
help="JSON body of the ADP container to publish.",
)
@click.option(
"-f",
"--adp-json-file",
type=click.File(),
help="File containing JSON body of ADP container to publish.",
)
@click.option("--raw", "print_raw", default=False, is_flag=True, help="Print response JSON.")
@click.pass_context
@handle_cve_api_error
def publish_adp(
ctx: click.Context,
cve_id: str,
adp_json_str: Optional[str],
adp_json_file: Optional[TextIO],
print_raw: bool,
) -> None:
"""Publish an ADP container.
"""
if adp_json_file is not None and adp_json_str is not None:
raise click.BadParameter(
"cannot use both `-f/--adp-json-file` and `-j/--adp-json` to provide ADP JSON data."
)
try:
if adp_json_str is not None:
cve_json = json.loads(adp_json_str)
elif adp_json_file is not None:
cve_json = json.load(adp_json_file)
else:
raise click.BadParameter(
"must provide ADP JSON record using one of: "
"`-f/--adp-json-file` or `-j/--adp-json`."
)
except json.JSONDecodeError as exc:
print_error(msg="ADP data is not valid JSON", details=str(exc))
return

if ctx.obj.interactive:
click.echo("You are about to publish an ADP record for ", nl=False)
click.secho(cve_id, bold=True, nl=False)
click.echo(" using the following input:\n\n", nl=False)
print_json_data(cve_json)
if not click.confirm("\n\nDo you want to continue?"):
click.echo("Exiting...")
sys.exit(0)
click.echo()

cve_api = ctx.obj.cve_api
response_data = cve_api.publish_adp(cve_id, cve_json)

if print_raw:
print_json_data(response_data)
else:
click.echo(" the following ADP:\n")
print_cve_record(response_data["updated"])


@cli.command()
@click.argument("cve_id", callback=validate_cve)
@click.option(
Expand Down
29 changes: 28 additions & 1 deletion cvelib/cve_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,27 @@ def _extract_cna_container(cve_json: dict) -> dict:
"""Check if we received a full v5 record and extract just the CNA container part from it.
A record "looks" like a full one if it has a dataType=CVE_RECORD attribute. Otherwise,
it is assumed to be the CNA container.
it is assumed to be a CNA container already so return it as is.
"""
if cve_json.get("dataType", "") == "CVE_RECORD":
return cve_json["containers"]["cna"]
return cve_json

@staticmethod
def _extract_adp_container(cve_json: dict) -> dict:
"""Check if we received a full v5 record and extract just the ADP container for our org.
A record "looks" like a full one if it has a dataType=CVE_RECORD attribute. Otherwise,
it is assumed to be an ADP container already so return it as is.
"""
if cve_json.get("dataType", "") == "CVE_RECORD":
if len(cve_json["containers"]["adp"]) > 1:
raise RuntimeError(
"Cannot extract ADP container if multiple are present in CVE record"
)
return cve_json["containers"]["adp"][0]
return cve_json

def _add_provider_metadata(self, cve_json: dict) -> dict:
"""Add the providerMetadata objects to a CNA container if one is not present.
Expand Down Expand Up @@ -184,6 +199,18 @@ def update_published(self, cve_id: str, cve_json: dict, validate: bool = True) -
response.raise_for_status()
return response.json()

def publish_adp(self, cve_id: str, cve_json: dict, validate: bool = True) -> dict:
"""Add or update an ADP container from a JSON object representing the ADP container data."""
cve_json = self._extract_adp_container(cve_json)
cve_json = self._add_provider_metadata(cve_json)
if validate:
CveRecord.validate(cve_json, CveRecord.Schemas.ADP)

cve_json = {"adpContainer": cve_json}
response = self._put(f"cve/{cve_id}/adp", json=cve_json)
response.raise_for_status()
return response.json()

def reject(self, cve_id: str, cve_json: dict, validate: bool = True) -> dict:
"""Reject a CVE from a JSON object representing the CNA container data."""
cve_json = self._extract_cna_container(cve_json)
Expand Down

0 comments on commit feb27ea

Please sign in to comment.