Skip to content

Commit

Permalink
feat(pgp): Use crypto.py during Egg and Collection verification
Browse files Browse the repository at this point in the history
* Card ID: CCT-131
* Card ID: RHEL-2480
* Card ID: RHEL-2482

Signed-off-by: mhorky <[email protected]>
  • Loading branch information
m-horky committed Jun 25, 2024
1 parent 5a75eca commit 7c46fcd
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 54 deletions.
71 changes: 33 additions & 38 deletions insights/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@
import os
import logging
import tempfile
import shlex
import shutil
import sys
import atexit
from subprocess import Popen, PIPE
from requests import ConnectionError

from .. import package_info
from . import client
from . import crypto
from .constants import InsightsConstants as constants
from .config import InsightsConfig
from .auto_config import try_auto_configuration
Expand Down Expand Up @@ -268,7 +267,7 @@ def update(self):
else:
logger.debug("Egg update disabled")

def verify(self, egg_path, gpg_key=constants.pub_gpg_path):
def verify(self, egg_path):
"""
Verifies the GPG signature of the egg. The signature is assumed to
be in the same directory as the egg and named the same as the egg
Expand All @@ -283,46 +282,42 @@ def verify(self, egg_path, gpg_key=constants.pub_gpg_path):
if egg_path and not os.path.isfile(egg_path):
the_message = "Provided egg path %s does not exist, cannot verify." % (egg_path)
logger.debug(the_message)
return {'gpg': False,
'stderr': the_message,
'stdout': the_message,
'rc': 1,
'message': the_message}
if self.config.gpg and gpg_key and not os.path.isfile(gpg_key):
the_message = ("Running in GPG mode but cannot find "
"file %s to verify against." % (gpg_key))
logger.debug(the_message)
return {'gpg': False,
'stderr': the_message,
'stdout': the_message,
'rc': 1,
'message': the_message}
return {
'gpg': False,
'stderr': the_message,
'stdout': the_message,
'rc': 1,
'message': the_message,
}

# if we are running in no_gpg or not gpg mode then return true
if not self.config.gpg:
return {'gpg': True,
'stderr': None,
'stdout': None,
'rc': 0}
return {
'gpg': True,
'stderr': None,
'stdout': None,
'rc': 0,
}

# if a valid egg path and gpg were received do the verification
if egg_path and gpg_key:
cmd_template = '/usr/bin/gpg --verify --keyring %s %s %s'
cmd = cmd_template % (gpg_key, egg_path + '.asc', egg_path)
logger.debug(cmd)
process = Popen(shlex.split(cmd), stdout=PIPE, stderr=PIPE)
stdout, stderr = process.communicate()
rc = process.returncode
logger.debug("GPG return code: %s" % rc)
return {'gpg': True if rc == 0 else False,
'stderr': stderr,
'stdout': stdout,
'rc': rc}
else:
return {'gpg': False,
'stderr': 'Must specify a valid core and gpg key.',
'stdout': 'Must specify a valid core and gpg key.',
'rc': 1}
if egg_path:
result = crypto.verify_gpg_signed_file(
file=egg_path, signature=egg_path + ".asc",
key=constants.pub_gpg_path,
)
return {
'gpg': result.ok,
'rc': result.return_code,
'stdout': result.stdout,
'stderr': result.stderr,
}

return {
'gpg': False,
'stderr': 'Must specify a valid core and gpg key.',
'stdout': 'Must specify a valid core and gpg key.',
'rc': 1,
}

def install(self, new_egg, new_egg_gpg_sig):
"""
Expand Down
24 changes: 8 additions & 16 deletions insights/client/collection_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@
import json
import logging
import six
import shlex
import os
import requests
import yaml
import stat
from six.moves import configparser as ConfigParser

from subprocess import Popen, PIPE, STDOUT
from tempfile import NamedTemporaryFile
from . import crypto
from .constants import InsightsConstants as constants

APP_NAME = constants.app_name
Expand Down Expand Up @@ -139,22 +138,15 @@ def validate_gpg_sig(self, path, sig=None):
Validate the collection rules
"""
logger.debug("Verifying GPG signature of Insights configuration")

if sig is None:
sig = path + ".asc"
command = ("/usr/bin/gpg --no-default-keyring "
"--keyring " + constants.pub_gpg_path +
" --verify " + sig + " " + path)
if not six.PY3:
command = command.encode('utf-8', 'ignore')
args = shlex.split(command)
logger.debug("Executing: %s", args)
proc = Popen(
args, shell=False, stdout=PIPE, stderr=STDOUT, close_fds=True)
stdout, stderr = proc.communicate()
logger.debug("STDOUT: %s", stdout)
logger.debug("STDERR: %s", stderr)
logger.debug("Status: %s", proc.returncode)
if proc.returncode:

result = crypto.verify_gpg_signed_file(
file=path, signature=sig,
key=constants.pub_gpg_path,
)
if not result.ok:
logger.error("ERROR: Unable to validate GPG signature: %s", path)
return False
else:
Expand Down

0 comments on commit 7c46fcd

Please sign in to comment.