Skip to content

Commit

Permalink
first python API proposition
Browse files Browse the repository at this point in the history
first round-trip tests

feat: made asn1 structures readable

refacto: adapted existing functions accordingly

feat/pkcs12: added symmetric_decrypt

feat: deserialize 3 possible encodings

feat: handling AES-128 & AES-256 CBC

feat: raise error when no recipient is found

feat/pkcs7: added decanonicalize function

feat/asn1: added decode_der_data

feat/pkcs7: added smime_enveloped_decode

tests are the round-trip (encrypt & decrypt)
  • Loading branch information
nitneuqr committed Sep 8, 2024
1 parent 914b1d2 commit e1c4620
Show file tree
Hide file tree
Showing 8 changed files with 497 additions and 29 deletions.
5 changes: 5 additions & 0 deletions src/cryptography/hazmat/bindings/_rust/pkcs7.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ def sign_and_serialize(
encoding: serialization.Encoding,
options: typing.Iterable[pkcs7.PKCS7Options],
) -> bytes: ...
def deserialize_and_decrypt(
decryptor: pkcs7.PKCS7EnvelopeDecryptor,
encoding: serialization.Encoding,
options: typing.Iterable[pkcs7.PKCS7Options],
) -> bytes: ...
def load_pem_pkcs7_certificates(
data: bytes,
) -> list[x509.Certificate]: ...
Expand Down
112 changes: 112 additions & 0 deletions src/cryptography/hazmat/primitives/serialization/pkcs7.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,111 @@ def encrypt(
return rust_pkcs7.encrypt_and_serialize(self, encoding, options)


class PKCS7EnvelopeDecryptor:
def __init__(
self,
*,
_data: bytes | None = None,
_recipient: x509.Certificate | None = None,
_private_key: rsa.RSAPrivateKey | None = None,
):
from cryptography.hazmat.backends.openssl.backend import (
backend as ossl,
)

if not ossl.rsa_encryption_supported(padding=padding.PKCS1v15()):
raise UnsupportedAlgorithm(
"RSA with PKCS1 v1.5 padding is not supported by this version"
" of OpenSSL.",
_Reasons.UNSUPPORTED_PADDING,
)
self._data = _data
self._recipient = _recipient
self._private_key = _private_key

def set_data(self, data: bytes) -> PKCS7EnvelopeDecryptor:
_check_byteslike("data", data)
if self._data is not None:
raise ValueError("data may only be set once")

return PKCS7EnvelopeDecryptor(
_data=data,
_recipient=self._recipient,
_private_key=self._private_key,
)

def set_recipient(
self, certificate: x509.Certificate
) -> PKCS7EnvelopeDecryptor:
if self._recipient is not None:
raise ValueError("recipient may only be set once")

if not isinstance(certificate, x509.Certificate):
raise TypeError("certificate must be a x509.Certificate")

if not isinstance(certificate.public_key(), rsa.RSAPublicKey):
raise TypeError("Only RSA keys are supported at this time.")

return PKCS7EnvelopeDecryptor(
_data=self._data,
_recipient=certificate,
_private_key=self._private_key,
)

def set_private_key(
self, private_key: rsa.RSAPrivateKey
) -> PKCS7EnvelopeDecryptor:
if self._private_key is not None:
raise ValueError("private key may only be set once")

return PKCS7EnvelopeDecryptor(
_data=self._data,
_recipient=self._recipient,
_private_key=private_key,
)

def decrypt(
self,
encoding: serialization.Encoding,
options: typing.Iterable[PKCS7Options],
) -> bytes:
if self._data is None:
raise ValueError("You must add data to decrypt")
if self._recipient is None:
raise ValueError("You must add a recipient to decrypt")
if self._private_key is None:
raise ValueError("You must add a private key to decrypt")
options = list(options)
if not all(isinstance(x, PKCS7Options) for x in options):
raise ValueError("options must be from the PKCS7Options enum")
if encoding not in (
serialization.Encoding.PEM,
serialization.Encoding.DER,
serialization.Encoding.SMIME,
):
raise ValueError(
"Must be PEM, DER, or SMIME from the Encoding enum"
)

# Only allow options that make sense for encryption
if any(
opt not in [PKCS7Options.Text, PKCS7Options.Binary]
for opt in options
):
raise ValueError(
"Only the following options are supported for encryption: "
"Text, Binary"
)
elif PKCS7Options.Text in options and PKCS7Options.Binary in options:
# OpenSSL accepts both options at the same time, but ignores Text.
# We fail defensively to avoid unexpected outputs.
raise ValueError(
"Cannot use Binary and Text options at the same time"
)

return rust_pkcs7.deserialize_and_decrypt(self, encoding, options)


def _smime_signed_encode(
data: bytes, signature: bytes, micalg: str, text_mode: bool
) -> bytes:
Expand Down Expand Up @@ -328,6 +433,13 @@ def _smime_enveloped_encode(data: bytes) -> bytes:
return m.as_bytes(policy=m.policy.clone(linesep="\n", max_line_length=0))


def _smime_enveloped_decode(data: bytes) -> bytes:
m = email.message_from_bytes(data)
if m.get_content_type() != "application/pkcs7-mime":
raise ValueError("Not an S/MIME enveloped message")
return bytes(m.get_payload(decode=True))


class OpenSSLMimePart(email.message.MIMEPart):
# A MIMEPart subclass that replicates OpenSSL's behavior of not including
# a newline if there are no headers.
Expand Down
51 changes: 35 additions & 16 deletions src/rust/cryptography-x509/src/pkcs7.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ pub const PKCS7_SIGNED_DATA_OID: asn1::ObjectIdentifier = asn1::oid!(1, 2, 840,
pub const PKCS7_ENVELOPED_DATA_OID: asn1::ObjectIdentifier = asn1::oid!(1, 2, 840, 113549, 1, 7, 3);
pub const PKCS7_ENCRYPTED_DATA_OID: asn1::ObjectIdentifier = asn1::oid!(1, 2, 840, 113549, 1, 7, 6);

#[derive(asn1::Asn1Write)]
#[derive(asn1::Asn1Write, asn1::Asn1Read)]
pub struct ContentInfo<'a> {
pub _content_type: asn1::DefinedByMarker<asn1::ObjectIdentifier>,

#[defined_by(_content_type)]
pub content: Content<'a>,
}

#[derive(asn1::Asn1DefinedByWrite)]
#[derive(asn1::Asn1DefinedByWrite, asn1::Asn1DefinedByRead)]
pub enum Content<'a> {
#[defined_by(PKCS7_ENVELOPED_DATA_OID)]
EnvelopedData(asn1::Explicit<Box<EnvelopedData<'a>>, 0>),
Expand All @@ -29,22 +29,38 @@ pub enum Content<'a> {
EncryptedData(asn1::Explicit<EncryptedData<'a>, 0>),
}

#[derive(asn1::Asn1Write)]
#[derive(asn1::Asn1Write, asn1::Asn1Read)]
pub struct SignedData<'a> {
pub version: u8,
pub digest_algorithms: asn1::SetOfWriter<'a, common::AlgorithmIdentifier<'a>>,
pub digest_algorithms: common::Asn1ReadableOrWritable<
asn1::SetOf<'a, common::AlgorithmIdentifier<'a>>,
asn1::SetOfWriter<'a, common::AlgorithmIdentifier<'a>>,
>,
pub content_info: ContentInfo<'a>,
#[implicit(0)]
pub certificates: Option<asn1::SetOfWriter<'a, &'a certificate::Certificate<'a>>>,
pub certificates: Option<
common::Asn1ReadableOrWritable<
asn1::SetOf<'a, certificate::Certificate<'a>>,
asn1::SetOfWriter<'a, &'a certificate::Certificate<'a>>,
>,
>,

// We don't ever supply any of these, so for now, don't fill out the fields.
#[implicit(1)]
pub crls: Option<asn1::SetOfWriter<'a, asn1::Sequence<'a>>>,

pub signer_infos: asn1::SetOfWriter<'a, SignerInfo<'a>>,
pub crls: Option<
common::Asn1ReadableOrWritable<
asn1::SetOf<'a, asn1::Sequence<'a>>,
asn1::SetOfWriter<'a, asn1::Sequence<'a>>,
>,
>,

pub signer_infos: common::Asn1ReadableOrWritable<
asn1::SetOf<'a, SignerInfo<'a>>,
asn1::SetOfWriter<'a, SignerInfo<'a>>,
>,
}

#[derive(asn1::Asn1Write)]
#[derive(asn1::Asn1Write, asn1::Asn1Read)]
pub struct SignerInfo<'a> {
pub version: u8,
pub issuer_and_serial_number: IssuerAndSerialNumber<'a>,
Expand All @@ -59,42 +75,45 @@ pub struct SignerInfo<'a> {
pub unauthenticated_attributes: Option<csr::Attributes<'a>>,
}

#[derive(asn1::Asn1Write)]
#[derive(asn1::Asn1Write, asn1::Asn1Read)]
pub struct EnvelopedData<'a> {
pub version: u8,
pub recipient_infos: asn1::SetOfWriter<'a, RecipientInfo<'a>>,
pub recipient_infos: common::Asn1ReadableOrWritable<
asn1::SetOf<'a, RecipientInfo<'a>>,
asn1::SetOfWriter<'a, RecipientInfo<'a>>,
>,
pub encrypted_content_info: EncryptedContentInfo<'a>,
}

#[derive(asn1::Asn1Write)]
#[derive(asn1::Asn1Write, asn1::Asn1Read)]
pub struct RecipientInfo<'a> {
pub version: u8,
pub issuer_and_serial_number: IssuerAndSerialNumber<'a>,
pub key_encryption_algorithm: common::AlgorithmIdentifier<'a>,
pub encrypted_key: &'a [u8],
}

#[derive(asn1::Asn1Write)]
#[derive(asn1::Asn1Write, asn1::Asn1Read)]
pub struct IssuerAndSerialNumber<'a> {
pub issuer: name::Name<'a>,
pub serial_number: asn1::BigInt<'a>,
}

#[derive(asn1::Asn1Write)]
#[derive(asn1::Asn1Write, asn1::Asn1Read)]
pub struct EncryptedData<'a> {
pub version: u8,
pub encrypted_content_info: EncryptedContentInfo<'a>,
}

#[derive(asn1::Asn1Write)]
#[derive(asn1::Asn1Write, asn1::Asn1Read)]
pub struct EncryptedContentInfo<'a> {
pub content_type: asn1::ObjectIdentifier,
pub content_encryption_algorithm: common::AlgorithmIdentifier<'a>,
#[implicit(0)]
pub encrypted_content: Option<&'a [u8]>,
}

#[derive(asn1::Asn1Write)]
#[derive(asn1::Asn1Write, asn1::Asn1Read)]
pub struct DigestInfo<'a> {
pub algorithm: common::AlgorithmIdentifier<'a>,
pub digest: &'a [u8],
Expand Down
25 changes: 25 additions & 0 deletions src/rust/src/asn1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,31 @@ pub(crate) fn encode_der_data<'p>(
}
}

pub(crate) fn decode_der_data<'p>(
py: pyo3::Python<'p>,
pem_tag: String,
data: Vec<u8>,
encoding: &pyo3::Bound<'p, pyo3::PyAny>,
) -> CryptographyResult<pyo3::Bound<'p, pyo3::types::PyBytes>> {
if encoding.is(&types::ENCODING_DER.get(py)?) {
Ok(pyo3::types::PyBytes::new_bound(py, &data))
} else if encoding.is(&types::ENCODING_PEM.get(py)?) {
let pem_str = std::str::from_utf8(&data)
.map_err(|_| pyo3::exceptions::PyValueError::new_err("Invalid PEM data"))?;
let pem = pem::parse(pem_str)
.map_err(|_| pyo3::exceptions::PyValueError::new_err("Failed to parse PEM data"))?;
if pem.tag() != pem_tag {
return Err(pyo3::exceptions::PyValueError::new_err("PEM tag mismatch").into());
}
Ok(pyo3::types::PyBytes::new_bound(py, pem.contents()))
} else {
Err(
pyo3::exceptions::PyTypeError::new_err("encoding must be Encoding.DER or Encoding.PEM")
.into(),
)
}
}

#[pyo3::pyfunction]
fn encode_dss_signature<'p>(
py: pyo3::Python<'p>,
Expand Down
36 changes: 35 additions & 1 deletion src/rust/src/pkcs12.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use crate::backend::{ciphers, hashes, hmac, kdf, keys};
use crate::buf::CffiBuf;
use crate::error::{CryptographyError, CryptographyResult};
use crate::padding::PKCS7PaddingContext;
use crate::padding::{PKCS7PaddingContext, PKCS7UnpaddingContext};
use crate::x509::certificate::Certificate;
use crate::{types, x509};
use cryptography_x509::common::Utf8StoredBMPString;
Expand Down Expand Up @@ -107,6 +107,40 @@ pub(crate) fn symmetric_encrypt(
Ok(ciphertext)
}

pub(crate) fn symmetric_decrypt(
py: pyo3::Python<'_>,
algorithm: pyo3::Bound<'_, pyo3::PyAny>,
mode: pyo3::Bound<'_, pyo3::PyAny>,
data: &[u8],
) -> CryptographyResult<Vec<u8>> {
let block_size = algorithm
.getattr(pyo3::intern!(py, "block_size"))?
.extract()?;

let mut cipher =
ciphers::CipherContext::new(py, algorithm, mode, openssl::symm::Mode::Decrypt)?;

// Decrypt the data
let mut decrypted_data = vec![0; data.len() + (block_size / 8)];
let count = cipher.update_into(py, data, &mut decrypted_data)?;
let final_block = cipher.finalize(py)?;
assert!(final_block.as_bytes().is_empty());
decrypted_data.truncate(count);

// Unpad the data
let mut unpadder = PKCS7UnpaddingContext::new(block_size);
let unpadded_first_blocks = unpadder.update(py, CffiBuf::from_bytes(py, &decrypted_data))?;
let unpadded_last_block = unpadder.finalize(py)?;

let unpadded_data = [
unpadded_first_blocks.as_bytes(),
unpadded_last_block.as_bytes(),
]
.concat();

Ok(unpadded_data)
}

enum EncryptionAlgorithm {
PBESv1SHA1And3KeyTripleDESCBC,
PBESv2SHA256AndAES256CBC,
Expand Down
Loading

0 comments on commit e1c4620

Please sign in to comment.