Skip to content

Commit

Permalink
Fix usage of X509IdentityToken
Browse files Browse the repository at this point in the history
- sign token with algorithm from policy uri
- verify signature in server!
  • Loading branch information
cziebuhr authored and oroulet committed Dec 4, 2024
1 parent 9f73229 commit 3c6317b
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 47 deletions.
21 changes: 7 additions & 14 deletions asyncua/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ async def set_security_string(self, string: str) -> None:
Set SecureConnection mode.
:param string: Mode format ``Policy,Mode,certificate,private_key[,server_certificate]``
where:
- ``Policy`` is ``Basic128Rsa15``, ``Basic256`` or ``Basic256Sha256``
- ``Policy`` is ``Basic256Sha256``, ``Aes128Sha256RsaOaep`` or ``Aes256Sha256RsaPss``
- ``Mode`` is ``Sign`` or ``SignAndEncrypt``
- ``certificate`` and ``server_private_key`` are paths to ``.pem`` or ``.der`` files
- ``certificate`` and ``server_certificate`` are paths to ``.pem`` or ``.der`` files
- ``private_key`` may be a path to a ``.pem`` or ``.der`` file or a conjunction of ``path``::``password`` where
``password`` is the private key password.
Call this before connect()
Expand Down Expand Up @@ -669,18 +669,11 @@ def _add_certificate_auth(self, params, certificate, challenge):
params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(certificate)
# specs part 4, 5.6.3.1: the data to sign is created by appending
# the last serverNonce to the serverCertificate
params.UserTokenSignature = ua.SignatureData()
# use signature algorithm that was used for certificate generation
if certificate.signature_hash_algorithm.name == "sha256":
params.UserIdentityToken.PolicyId = self.server_policy(ua.UserTokenType.Certificate).PolicyId
sig = uacrypto.sign_sha256(self.user_private_key, challenge)
params.UserTokenSignature.Algorithm = "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256"
params.UserTokenSignature.Signature = sig
else:
params.UserIdentityToken.PolicyId = self.server_policy(ua.UserTokenType.Certificate).PolicyId
sig = uacrypto.sign_sha1(self.user_private_key, challenge)
params.UserTokenSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
params.UserTokenSignature.Signature = sig
policy = self.server_policy(ua.UserTokenType.Certificate)
sig, alg = security_policies.sign_asymmetric(self.user_private_key, challenge, policy.SecurityPolicyUri)
params.UserIdentityToken.PolicyId = policy.PolicyId
params.UserTokenSignature.Algorithm = alg
params.UserTokenSignature.Signature = sig

def _add_user_auth(self, params, username: str, password: str):
params.UserIdentityToken = ua.UserNameIdentityToken()
Expand Down
40 changes: 40 additions & 0 deletions asyncua/crypto/security_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,10 @@ class SecurityPolicyAes128Sha256RsaOaep(SecurityPolicy):
def encrypt_asymmetric(pubkey, data):
return uacrypto.encrypt_rsa_oaep(pubkey, data)

@staticmethod
def sign_asymmetric(privkey, data):
return uacrypto.sign_sha256(privkey, data)

def __init__(self, peer_cert, host_cert, host_privkey, mode, permission_ruleset=None):
if isinstance(peer_cert, bytes):
peer_cert = uacrypto.x509_from_der(peer_cert)
Expand Down Expand Up @@ -638,6 +642,10 @@ class SecurityPolicyAes256Sha256RsaPss(SecurityPolicy):
def encrypt_asymmetric(pubkey, data):
return uacrypto.encrypt_rsa_oaep_sha256(pubkey, data)

@staticmethod
def sign_asymmetric(privkey, data):
return uacrypto.sign_pss_sha256(privkey, data)

def __init__(self, peer_cert, host_cert, host_privkey, mode, permission_ruleset=None):
if isinstance(peer_cert, bytes):
peer_cert = uacrypto.x509_from_der(peer_cert)
Expand Down Expand Up @@ -718,6 +726,10 @@ class SecurityPolicyBasic128Rsa15(SecurityPolicy):
def encrypt_asymmetric(pubkey, data):
return uacrypto.encrypt_rsa15(pubkey, data)

@staticmethod
def sign_asymmetric(privkey, data):
return uacrypto.sign_sha1(privkey, data)

def __init__(self, peer_cert, host_cert, host_privkey, mode, permission_ruleset=None):
_logger.warning("DEPRECATED! Do not use SecurityPolicyBasic128Rsa15 anymore!")

Expand Down Expand Up @@ -798,6 +810,10 @@ class SecurityPolicyBasic256(SecurityPolicy):
def encrypt_asymmetric(pubkey, data):
return uacrypto.encrypt_rsa_oaep(pubkey, data)

@staticmethod
def sign_asymmetric(privkey, data):
return uacrypto.sign_sha1(privkey, data)

def __init__(self, peer_cert, host_cert, host_privkey, mode, permission_ruleset=None):
_logger.warning("DEPRECATED! Do not use SecurityPolicyBasic256 anymore!")

Expand Down Expand Up @@ -878,6 +894,10 @@ class SecurityPolicyBasic256Sha256(SecurityPolicy):
def encrypt_asymmetric(pubkey, data):
return uacrypto.encrypt_rsa_oaep(pubkey, data)

@staticmethod
def sign_asymmetric(privkey, data):
return uacrypto.sign_sha256(privkey, data)

def __init__(self, peer_cert, host_cert, host_privkey, mode, permission_ruleset=None):
if isinstance(peer_cert, bytes):
peer_cert = uacrypto.x509_from_der(peer_cert)
Expand Down Expand Up @@ -961,6 +981,26 @@ def create(self, peer_certificate):
)


def sign_asymmetric(privkey, data, policy_uri):
"""
Sign data with privkey using an asymmetric algorithm.
The algorithm is selected by policy_uri.
Returns a tuple (signature, algorithm_uri)
"""
for cls in [
SecurityPolicyBasic256Sha256,
SecurityPolicyBasic256,
SecurityPolicyBasic128Rsa15,
SecurityPolicyAes128Sha256RsaOaep,
SecurityPolicyAes256Sha256RsaPss,
]:
if policy_uri == cls.URI:
return (cls.sign_asymmetric(privkey, data), cls.AsymmetricSignatureURI)
if not policy_uri or policy_uri == SecurityPolicyNone.URI:
return data, ""
raise UaError(f"Unsupported security policy `{policy_uri}`")


# policy, mode, security_level
SECURITY_POLICY_TYPE_MAP = {
SecurityPolicyType.NoSecurity: [SecurityPolicyNone, MessageSecurityMode.None_, 0],
Expand Down
31 changes: 31 additions & 0 deletions asyncua/server/internal_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,3 +409,34 @@ def decrypt_user_token(self, isession, token):
password = password.decode("utf-8")

return user_name, password

def verify_x509_token(self, isession, token, signature):
"""
verify certificate signature
"""
cert = uacrypto.x509_from_der(token.CertificateData)
alg = signature.Algorithm
sig = signature.Signature

# TODO check if algorithm is allowed, throw BadSecurityPolicyRejected if not

challenge = b""
if self.certificate is not None:
challenge += uacrypto.der_from_x509(self.certificate)
if isession.nonce is not None:
challenge += isession.nonce

if not (alg and sig):
raise ValueError("No signature")

if alg == "http://www.w3.org/2000/09/xmldsig#rsa-sha1":
uacrypto.verify_sha1(cert, challenge, sig)
elif alg == "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256":
uacrypto.verify_sha256(cert, challenge, sig)
elif alg == "http://opcfoundation.org/UA/security/rsa-pss-sha2-256":
uacrypto.verify_pss_sha256(cert, challenge, sig)
else:
self.logger.warning("Unknown certificate signature algorithm %s", alg)
raise ValueError("Unknown algorithm")

return token.CertificateData
3 changes: 1 addition & 2 deletions asyncua/server/internal_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,7 @@ def activate_session(self, params, peer_certificate):
if isinstance(id_token, ua.UserNameIdentityToken):
username, password = self.iserver.decrypt_user_token(self, id_token)
elif isinstance(id_token, ua.X509IdentityToken):
# TODO implement verify_x509_token
peer_certificate = id_token.CertificateData
peer_certificate = self.iserver.verify_x509_token(self, id_token, params.UserTokenSignature)
username, password = None, None
else:
username, password = None, None
Expand Down
36 changes: 23 additions & 13 deletions asyncua/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import asyncio
import logging
import math
from cryptography import x509
from datetime import timedelta, datetime
import socket
from urllib.parse import urlparse
Expand Down Expand Up @@ -113,7 +112,6 @@ def __init__(self, iserver: InternalServer = None, user_manager=None):
]
# allow all certificates by default
self._permission_ruleset = SimpleRoleRuleset()
self.certificate: Optional[x509.Certificate] = None
# Use acceptable limits
buffer_sz = 65535
max_msg_sz = 100 * 1024 * 1024 # 100mb
Expand Down Expand Up @@ -233,7 +231,7 @@ async def load_certificate(self, path_or_content: Union[str, bytes, Path], forma
"""
load server certificate from file, either pem or der
"""
self.certificate = await uacrypto.load_certificate(path_or_content, format)
self.iserver.certificate = await uacrypto.load_certificate(path_or_content, format)

async def load_private_key(self, path_or_content: Union[str, Path, bytes], password=None, format=None):
self.iserver.private_key = await uacrypto.load_private_key(path_or_content, password, format)
Expand Down Expand Up @@ -385,7 +383,7 @@ async def _setup_server_nodes(self):
for policy_type in self._security_policy:
policy, mode, level = security_policies.SECURITY_POLICY_TYPE_MAP[policy_type]
if policy is not security_policies.SecurityPolicyNone and not (
self.certificate and self.iserver.private_key
self.iserver.certificate and self.iserver.private_key
):
no_cert = True
continue
Expand All @@ -394,7 +392,7 @@ async def _setup_server_nodes(self):
security_policies.SecurityPolicyFactory(
policy,
mode,
self.certificate,
self.iserver.certificate,
self.iserver.private_key,
permission_ruleset=self._permission_ruleset,
)
Expand All @@ -417,9 +415,21 @@ def _set_endpoints(self, policy, mode, level):
idtoken = ua.UserTokenPolicy()
idtoken.PolicyId = "certificate"
idtoken.TokenType = ua.UserTokenType.Certificate
idtoken.SecurityPolicyUri = policy.URI
# TODO request signing if mode == ua.MessageSecurityMode.None_ (also need to verify signature then)
idtokens.append(idtoken)
# always request signing
if mode == ua.MessageSecurityMode.None_:
# find first policy with signing
for token_policy_type in self._security_policy:
token_policy, token_mode, _ = security_policies.SECURITY_POLICY_TYPE_MAP[token_policy_type]
if token_mode == ua.MessageSecurityMode.None_:
continue
idtoken.SecurityPolicyUri = token_policy.URI
idtokens.append(idtoken)
break
else:
_logger.warning("No signing policy available, user certificate cannot get verified")
else:
idtoken.SecurityPolicyUri = policy.URI
idtokens.append(idtoken)

if ua.UserNameIdentityToken in tokens:
idtoken = ua.UserTokenPolicy()
Expand All @@ -432,7 +442,7 @@ def _set_endpoints(self, policy, mode, level):
# use same policy for encryption
idtoken.SecurityPolicyUri = policy.URI
# try to avoid plaintext password, find first policy with encryption
elif self.certificate and self.iserver.private_key:
elif self.iserver.certificate and self.iserver.private_key:
for token_policy_type in self._security_policy:
token_policy, token_mode, _ = security_policies.SECURITY_POLICY_TYPE_MAP[token_policy_type]
if token_mode != ua.MessageSecurityMode.SignAndEncrypt:
Expand All @@ -457,8 +467,8 @@ def _set_endpoints(self, policy, mode, level):
edp = ua.EndpointDescription()
edp.EndpointUrl = self.endpoint.geturl()
edp.Server = appdesc
if self.certificate:
edp.ServerCertificate = uacrypto.der_from_x509(self.certificate)
if self.iserver.certificate:
edp.ServerCertificate = uacrypto.der_from_x509(self.iserver.certificate)
edp.SecurityMode = mode
edp.SecurityPolicyUri = policy.URI
edp.UserIdentityTokens = idtokens
Expand All @@ -473,9 +483,9 @@ async def start(self):
"""
Start to listen on network
"""
if self.certificate is not None:
if self.iserver.certificate is not None:
# Log warnings about the certificate
uacrypto.check_certificate(self.certificate, self._application_uri, socket.gethostname())
uacrypto.check_certificate(self.iserver.certificate, self._application_uri, socket.gethostname())
await self._setup_server_nodes()
await self.iserver.start()
try:
Expand Down
Loading

0 comments on commit 3c6317b

Please sign in to comment.