Skip to content

Commit

Permalink
Merge pull request #192 from kyrofa/feature/api-clean-key
Browse files Browse the repository at this point in the history
api: reorganize key API
  • Loading branch information
kyrofa authored Apr 8, 2020
2 parents 98d856a + 5b05fce commit 1b41a2a
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 143 deletions.
130 changes: 4 additions & 126 deletions sros2/sros2/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,12 @@
# limitations under the License.

from collections import namedtuple
import errno
import os
import sys

from cryptography import x509
from cryptography.hazmat.backends import default_backend as cryptography_backend
from cryptography.hazmat.primitives import serialization
from sros2.policy import load_policy

from rclpy.exceptions import InvalidNamespaceException
from rclpy.validate_namespace import validate_namespace

from sros2.policy import (
get_policy_default,
load_policy,
)

from . import _keystore, _permission, _policy, _utilities
from . import _key, _keystore, _permission, _policy

HIDDEN_NODE_PREFIX = '_'

Expand Down Expand Up @@ -77,99 +66,6 @@ def get_client_info(node, node_name):
return get_topics(node_name, node.get_client_names_and_types_by_node)


def is_key_name_valid(name):
# TODO(ivanpauno): Use validate_security_context_name when it's propagated to `rclpy`.
# This is not to bad for the moment.
# Related with https://github.com/ros2/rclpy/issues/528.
try:
return validate_namespace(name)
except InvalidNamespaceException as e:
print(e)
return False


def create_key(keystore_path, identity):
if not _keystore.is_valid_keystore(keystore_path):
print("'%s' is not a valid keystore " % keystore_path)
return False
if not is_key_name_valid(identity):
return False
print("creating key for identity: '%s'" % identity)

relative_path = os.path.normpath(identity.lstrip('/'))
key_dir = os.path.join(_keystore.get_keystore_context_dir(keystore_path), relative_path)
os.makedirs(key_dir, exist_ok=True)

# symlink the CA cert in there
public_certs = ['identity_ca.cert.pem', 'permissions_ca.cert.pem']
for public_cert in public_certs:
dst = os.path.join(key_dir, public_cert)
keystore_ca_cert_path = os.path.join(
_keystore.get_keystore_public_dir(keystore_path), public_cert)
relativepath = os.path.relpath(keystore_ca_cert_path, key_dir)
_utilities.create_symlink(src=relativepath, dst=dst)

# symlink the governance file in there
keystore_governance_path = os.path.join(
_keystore.get_keystore_context_dir(keystore_path), 'governance.p7s')
dest_governance_path = os.path.join(key_dir, 'governance.p7s')
relativepath = os.path.relpath(keystore_governance_path, key_dir)
_utilities.create_symlink(src=relativepath, dst=dest_governance_path)

keystore_identity_ca_cert_path = os.path.join(
_keystore.get_keystore_public_dir(keystore_path), 'identity_ca.cert.pem')
keystore_identity_ca_key_path = os.path.join(
_keystore.get_keystore_private_dir(keystore_path), 'identity_ca.key.pem')

cert_path = os.path.join(key_dir, 'cert.pem')
key_path = os.path.join(key_dir, 'key.pem')
if not os.path.isfile(cert_path) or not os.path.isfile(key_path):
print('creating cert and key')
_create_key_and_cert(
keystore_identity_ca_cert_path,
keystore_identity_ca_key_path,
identity,
cert_path,
key_path
)
else:
print('found cert and key; not creating new ones!')

# create a wildcard permissions file for this node which can be overridden
# later using a policy if desired
policy_file_path = get_policy_default('policy.xml')
policy_element = _policy.get_policy('/', policy_file_path)
context_element = policy_element.find('contexts/context')
context_element.attrib['path'] = identity

permissions_path = os.path.join(key_dir, 'permissions.xml')
_permission.create_permission_file(permissions_path, _utilities.domain_id(), policy_element)

signed_permissions_path = os.path.join(key_dir, 'permissions.p7s')
keystore_permissions_ca_key_path = os.path.join(
_keystore.get_keystore_private_dir(keystore_path), 'permissions_ca.key.pem')
_utilities.create_smime_signed_file(
keystore_ca_cert_path,
keystore_permissions_ca_key_path,
permissions_path,
signed_permissions_path
)

return True


def list_keys(keystore_path):
contexts_path = _keystore.get_keystore_context_dir(keystore_path)
if not os.path.isdir(keystore_path):
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), keystore_path)
if not os.path.isdir(contexts_path):
return True
for name in os.listdir(contexts_path):
if os.path.isdir(os.path.join(contexts_path, name)):
print(name)
return True


def distribute_key(source_keystore_path, taget_keystore_path):
raise NotImplementedError()

Expand All @@ -193,35 +89,17 @@ def generate_artifacts(keystore_path=None, identity_names=[], policy_files=[]):

# create keys for all provided identities
for identity in identity_names:
if not create_key(keystore_path, identity):
if not _key.create_key(keystore_path, identity):
return False
for policy_file in policy_files:
policy_tree = load_policy(policy_file)
contexts_element = policy_tree.find('contexts')
for context in contexts_element:
identity_name = context.get('path')
if identity_name not in identity_names:
if not create_key(keystore_path, identity_name):
if not _key.create_key(keystore_path, identity_name):
return False
policy_element = _policy.get_policy_from_tree(identity_name, policy_tree)
_permission.create_permissions_from_policy_element(
keystore_path, identity_name, policy_element)
return True


def _create_key_and_cert(
keystore_ca_cert_path, keystore_ca_key_path, identity, cert_path, key_path):
# Load the CA cert and key from disk
with open(keystore_ca_cert_path, 'rb') as f:
ca_cert = x509.load_pem_x509_certificate(f.read(), cryptography_backend())

with open(keystore_ca_key_path, 'rb') as f:
ca_key = serialization.load_pem_private_key(f.read(), None, cryptography_backend())

cert, private_key = _utilities.build_key_and_cert(
x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, identity)]),
issuer_name=ca_cert.subject,
ca_key=ca_key)

_utilities.write_key(private_key, key_path)
_utilities.write_cert(cert, cert_path)
139 changes: 139 additions & 0 deletions sros2/sros2/api/_key.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# Copyright 2019-2020 Canonical Ltd
# Copyright 2016-2019 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import errno
import os

from cryptography import x509
from cryptography.hazmat.backends import default_backend as cryptography_backend
from cryptography.hazmat.primitives import serialization

from rclpy.exceptions import InvalidNamespaceException
from rclpy.validate_namespace import validate_namespace

from sros2.policy import get_policy_default

from . import _keystore, _permission, _policy, _utilities


def create_key(keystore_path, identity):
if not _keystore.is_valid_keystore(keystore_path):
print("'%s' is not a valid keystore " % keystore_path)
return False
if not _is_key_name_valid(identity):
return False
print("creating key for identity: '%s'" % identity)

relative_path = os.path.normpath(identity.lstrip('/'))
key_dir = os.path.join(_keystore.get_keystore_context_dir(keystore_path), relative_path)
os.makedirs(key_dir, exist_ok=True)

# symlink the CA cert in there
public_certs = ['identity_ca.cert.pem', 'permissions_ca.cert.pem']
for public_cert in public_certs:
dst = os.path.join(key_dir, public_cert)
keystore_ca_cert_path = os.path.join(
_keystore.get_keystore_public_dir(keystore_path), public_cert)
relativepath = os.path.relpath(keystore_ca_cert_path, key_dir)
_utilities.create_symlink(src=relativepath, dst=dst)

# symlink the governance file in there
keystore_governance_path = os.path.join(
_keystore.get_keystore_context_dir(keystore_path), 'governance.p7s')
dest_governance_path = os.path.join(key_dir, 'governance.p7s')
relativepath = os.path.relpath(keystore_governance_path, key_dir)
_utilities.create_symlink(src=relativepath, dst=dest_governance_path)

keystore_identity_ca_cert_path = os.path.join(
_keystore.get_keystore_public_dir(keystore_path), 'identity_ca.cert.pem')
keystore_identity_ca_key_path = os.path.join(
_keystore.get_keystore_private_dir(keystore_path), 'identity_ca.key.pem')

cert_path = os.path.join(key_dir, 'cert.pem')
key_path = os.path.join(key_dir, 'key.pem')
if not os.path.isfile(cert_path) or not os.path.isfile(key_path):
print('creating cert and key')
_create_key_and_cert(
keystore_identity_ca_cert_path,
keystore_identity_ca_key_path,
identity,
cert_path,
key_path
)
else:
print('found cert and key; not creating new ones!')

# create a wildcard permissions file for this node which can be overridden
# later using a policy if desired
policy_file_path = get_policy_default('policy.xml')
policy_element = _policy.get_policy('/', policy_file_path)
context_element = policy_element.find('contexts/context')
context_element.attrib['path'] = identity

permissions_path = os.path.join(key_dir, 'permissions.xml')
_permission.create_permission_file(permissions_path, _utilities.domain_id(), policy_element)

signed_permissions_path = os.path.join(key_dir, 'permissions.p7s')
keystore_permissions_ca_key_path = os.path.join(
_keystore.get_keystore_private_dir(keystore_path), 'permissions_ca.key.pem')
_utilities.create_smime_signed_file(
keystore_ca_cert_path,
keystore_permissions_ca_key_path,
permissions_path,
signed_permissions_path
)

return True


def list_keys(keystore_path):
contexts_path = _keystore.get_keystore_context_dir(keystore_path)
if not os.path.isdir(keystore_path):
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), keystore_path)
if not os.path.isdir(contexts_path):
return True
for name in os.listdir(contexts_path):
if os.path.isdir(os.path.join(contexts_path, name)):
print(name)
return True


def _is_key_name_valid(name):
# TODO(ivanpauno): Use validate_security_context_name when it's propagated to `rclpy`.
# This is not to bad for the moment.
# Related with https://github.com/ros2/rclpy/issues/528.
try:
return validate_namespace(name)
except InvalidNamespaceException as e:
print(e)
return False


def _create_key_and_cert(
keystore_ca_cert_path, keystore_ca_key_path, identity, cert_path, key_path):
# Load the CA cert and key from disk
with open(keystore_ca_cert_path, 'rb') as f:
ca_cert = x509.load_pem_x509_certificate(f.read(), cryptography_backend())

with open(keystore_ca_key_path, 'rb') as f:
ca_key = serialization.load_pem_private_key(f.read(), None, cryptography_backend())

cert, private_key = _utilities.build_key_and_cert(
x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, identity)]),
issuer_name=ca_cert.subject,
ca_key=ca_key)

_utilities.write_key(private_key, key_path)
_utilities.write_cert(cert, cert_path)
4 changes: 2 additions & 2 deletions sros2/sros2/verb/create_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
def DirectoriesCompleter():
return None

from sros2.api import create_key
from sros2.api import _key
from sros2.verb import VerbExtension


Expand All @@ -31,5 +31,5 @@ def add_arguments(self, parser, cli_name):
parser.add_argument('NAME', help='key name, aka ROS security context name')

def main(self, *, args):
success = create_key(args.ROOT, args.NAME)
success = _key.create_key(args.ROOT, args.NAME)
return 0 if success else 1
4 changes: 2 additions & 2 deletions sros2/sros2/verb/list_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def DirectoriesCompleter():

import sys

from sros2.api import list_keys
from sros2.api import _key
from sros2.verb import VerbExtension


Expand All @@ -33,7 +33,7 @@ def add_arguments(self, parser, cli_name):

def main(self, *, args):
try:
if list_keys(args.ROOT):
if _key.list_keys(args.ROOT):
return 0
except FileNotFoundError as e:
print('No such file or directory: {!r}'.format(e.filename), file=sys.stderr)
Expand Down
22 changes: 11 additions & 11 deletions sros2/test/sros2/test_api.py → sros2/test/sros2/api/test_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from sros2.api import is_key_name_valid
from sros2.api import _key


def test_is_key_name_valid():
# Valid cases
assert is_key_name_valid('/foo')
assert is_key_name_valid('/foo/bar')
assert is_key_name_valid('/foo/bar123/_/baz_')
assert _key._is_key_name_valid('/foo')
assert _key._is_key_name_valid('/foo/bar')
assert _key._is_key_name_valid('/foo/bar123/_/baz_')

# Invalid cases
assert not is_key_name_valid('')
assert not is_key_name_valid(' ')
assert not is_key_name_valid('//')
assert not is_key_name_valid('foo')
assert not is_key_name_valid('foo/bar')
assert not is_key_name_valid('/42foo')
assert not is_key_name_valid('/foo/42bar')
assert not _key._is_key_name_valid('')
assert not _key._is_key_name_valid(' ')
assert not _key._is_key_name_valid('//')
assert not _key._is_key_name_valid('foo')
assert not _key._is_key_name_valid('foo/bar')
assert not _key._is_key_name_valid('/42foo')
assert not _key._is_key_name_valid('/foo/42bar')
4 changes: 2 additions & 2 deletions sros2/test/sros2/commands/security/verbs/test_list_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import tempfile

from ros2cli import cli
from sros2.api import _keystore, create_key
from sros2.api import _key, _keystore


def test_list_keys(capsys):
Expand All @@ -26,7 +26,7 @@ def test_list_keys(capsys):
assert _keystore.create_keystore(keystore_dir)

# Now using that keystore, create a keypair
assert create_key(keystore_dir, '/test_context')
assert _key.create_key(keystore_dir, '/test_context')

# Now verify that the key we just created is included in the list
assert cli.main(argv=['security', 'list_keys', keystore_dir]) == 0
Expand Down

0 comments on commit 1b41a2a

Please sign in to comment.