Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cloudinary/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ def __init__(self):

if not self.signature_algorithm:
self.signature_algorithm = utils.SIGNATURE_SHA1
if not self.signature_version:
self.signature_version = 2

def _config_from_parsed_url(self, parsed_url):
if not self._is_url_scheme_valid(parsed_url):
Expand Down Expand Up @@ -280,7 +282,7 @@ def __len__(self):
return len(self.public_id) if self.public_id is not None else 0

def validate(self):
return self.signature == self.get_expected_signature()
return utils.verify_api_response_signature(self.public_id, self.version, self.signature)

def get_prep_value(self):
if None in [self.public_id,
Expand All @@ -301,7 +303,7 @@ def get_presigned(self):

def get_expected_signature(self):
return utils.api_sign_request({"public_id": self.public_id, "version": self.version}, config().api_secret,
config().signature_algorithm)
config().signature_algorithm, signature_version=1)

@property
def url(self):
Expand Down
72 changes: 61 additions & 11 deletions cloudinary/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,24 +619,72 @@ def sign_request(params, options):
if not api_secret:
raise ValueError("Must supply api_secret")
signature_algorithm = options.get("signature_algorithm", cloudinary.config().signature_algorithm)
signature_version = options.get("signature_version", cloudinary.config().signature_version)

params = cleanup_params(params)
params["signature"] = api_sign_request(params, api_secret, signature_algorithm)
params["signature"] = api_sign_request(params, api_secret, signature_algorithm, signature_version)
params["api_key"] = api_key

return params


def api_sign_request(params_to_sign, api_secret, algorithm=SIGNATURE_SHA1):
params = [(k + "=" + (
",".join(v) if isinstance(v, list) else
str(v).lower() if isinstance(v, bool) else
str(v)
)) for k, v in params_to_sign.items() if v]
to_sign = "&".join(sorted(params))
def api_sign_request(params_to_sign, api_secret, algorithm=SIGNATURE_SHA1, signature_version=2):
"""
Signs API request parameters using the specified algorithm and signature version.

:param params_to_sign: Parameters to include in the signature
:param api_secret: API secret key
:param algorithm: Signature algorithm (default: SHA1)
:param signature_version: Signature version (default: 2)
- Version 1: Original behavior without parameter encoding
- Version 2+: Includes parameter encoding to prevent parameter smuggling
:return: Computed signature
"""
to_sign = api_string_to_sign(params_to_sign, signature_version)
return compute_hex_hash(to_sign + api_secret, algorithm)


def api_string_to_sign(params_to_sign, signature_version=2):
"""
Generates a string to be signed for API requests.

:param params_to_sign: Parameters to include in the signature
:param signature_version: Version of signature algorithm to use:
- Version 1: Original behavior without parameter encoding
- Version 2+ (default): Includes parameter encoding to prevent parameter smuggling
:return: String to be signed
"""
params = []
for k, v in params_to_sign.items():
if v:
if isinstance(v, list):
value = ",".join(v)
elif isinstance(v, bool):
value = str(v).lower()
else:
value = str(v)

param_string = k + "=" + value
if signature_version >= 2:
param_string = _encode_param(param_string)
params.append(param_string)

return "&".join(sorted(params))


def _encode_param(value):
"""
Encodes a parameter for safe inclusion in URL query strings.

Specifically replaces "&" characters with their percent-encoded equivalent "%26"
to prevent them from being interpreted as parameter separators in URL query strings.

:param value: The parameter to encode
:return: Encoded parameter
"""
return str(value).replace("&", "%26")


def breakpoint_settings_mapper(breakpoint_settings):
breakpoint_settings = copy.deepcopy(breakpoint_settings)
transformation = breakpoint_settings.get("transformation")
Expand Down Expand Up @@ -1566,7 +1614,7 @@ def verify_api_response_signature(public_id, version, signature, algorithm=None)
:param version: The version of the asset as returned in the API response
:param signature: Actual signature. Can be retrieved from the X-Cld-Signature header
:param algorithm: Name of hashing algorithm to use for calculation of HMACs.
By default uses `cloudinary.config().signature_algorithm`
By default, uses `cloudinary.config().signature_algorithm`

:return: Boolean result of the validation
"""
Expand All @@ -1576,10 +1624,12 @@ def verify_api_response_signature(public_id, version, signature, algorithm=None)
parameters_to_sign = {'public_id': public_id,
'version': version}

# Use signature version 1 for backward compatibility
return signature == api_sign_request(
parameters_to_sign,
cloudinary.config().api_secret,
algorithm or cloudinary.config().signature_algorithm
algorithm or cloudinary.config().signature_algorithm,
signature_version=1
)


Expand All @@ -1592,7 +1642,7 @@ def verify_notification_signature(body, timestamp, signature, valid_for=7200, al
:param signature: Actual signature. Can be retrieved from the X-Cld-Signature header
:param valid_for: The desired time in seconds for considering the request valid
:param algorithm: Name of hashing algorithm to use for calculation of HMACs.
By default uses `cloudinary.config().signature_algorithm`
By default, uses `cloudinary.config().signature_algorithm`

:return: Boolean result of the validation
"""
Expand Down
147 changes: 142 additions & 5 deletions test/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@

MOCKED_NOW = 1549533574
API_SECRET = 'X7qLTrsES31MzxxkxPPA-pAGGfU'
API_SIGN_REQUEST_TEST_SECRET = "hdcixPpR2iKERPwqvH6sHdK9cyac"
API_SIGN_REQUEST_CLOUD_NAME = "dn6ot3ged"


class TestUtils(unittest.TestCase):
Expand All @@ -76,7 +78,8 @@ def setUp(self):
cname=None, # for these tests without actual upload, we ignore cname
api_key="a", api_secret="b",
secure_distribution=None,
private_cdn=False)
private_cdn=False,
signature_version=2)

def __test_cloudinary_url(self, public_id=TEST_ID, options=None, expected_url=None, expected_options=None):
if expected_options is None:
Expand Down Expand Up @@ -1443,17 +1446,151 @@ def test_support_long_url_signature(self):
expected_url=DEFAULT_UPLOAD_PATH + long_signature + "/" + image_name)

def test_api_sign_request_sha1(self):
params = dict(cloud_name="dn6ot3ged", timestamp=1568810420, username="[email protected]")
signature = api_sign_request(params, "hdcixPpR2iKERPwqvH6sHdK9cyac")
params = dict(cloud_name=API_SIGN_REQUEST_CLOUD_NAME, timestamp=1568810420, username="[email protected]")
signature = api_sign_request(params, API_SIGN_REQUEST_TEST_SECRET)
expected = "14c00ba6d0dfdedbc86b316847d95b9e6cd46d94"
self.assertEqual(expected, signature)

def test_api_sign_request_sha256(self):
params = dict(cloud_name="dn6ot3ged", timestamp=1568810420, username="[email protected]")
signature = api_sign_request(params, "hdcixPpR2iKERPwqvH6sHdK9cyac", cloudinary.utils.SIGNATURE_SHA256)
params = dict(cloud_name=API_SIGN_REQUEST_CLOUD_NAME, timestamp=1568810420, username="[email protected]")
signature = api_sign_request(params, API_SIGN_REQUEST_TEST_SECRET, cloudinary.utils.SIGNATURE_SHA256)
expected = "45ddaa4fa01f0c2826f32f669d2e4514faf275fe6df053f1a150e7beae58a3bd"
self.assertEqual(expected, signature)

def test_api_sign_request_prevents_parameter_smuggling(self):
"""Should prevent parameter smuggling via & characters in parameter values"""
# Test with notification_url containing & characters
params_with_ampersand = {
"cloud_name": API_SIGN_REQUEST_CLOUD_NAME,
"timestamp": 1568810420,
"notification_url": "https://fake.com/callback?a=1&tags=hello,world"
}

signature_with_ampersand = api_sign_request(params_with_ampersand, API_SIGN_REQUEST_TEST_SECRET)

# Test that attempting to smuggle parameters by splitting the notification_url fails
params_smuggled = {
"cloud_name": API_SIGN_REQUEST_CLOUD_NAME,
"timestamp": 1568810420,
"notification_url": "https://fake.com/callback?a=1",
"tags": "hello,world" # This would be smuggled if & encoding didn't work
}

signature_smuggled = api_sign_request(params_smuggled, API_SIGN_REQUEST_TEST_SECRET)

# The signatures should be different, proving that parameter smuggling is prevented
self.assertNotEqual(signature_with_ampersand, signature_smuggled,
"Signatures should be different to prevent parameter smuggling")

# Verify the expected signature for the properly encoded case
expected_signature = "4fdf465dd89451cc1ed8ec5b3e314e8a51695704"
self.assertEqual(expected_signature, signature_with_ampersand)

# Verify the expected signature for the smuggled parameters case
expected_smuggled_signature = "7b4e3a539ff1fa6e6700c41b3a2ee77586a025f9"
self.assertEqual(expected_smuggled_signature, signature_smuggled)

def test_api_sign_request_signature_versions(self):
"""Should use signature version 1 (without parameter encoding) for backward compatibility"""
public_id_with_ampersand = 'tests/logo&version=2'
test_version = 1

expected_signature_v1 = api_sign_request(
{'public_id': public_id_with_ampersand, 'version': test_version},
API_SIGN_REQUEST_TEST_SECRET,
cloudinary.utils.SIGNATURE_SHA1,
signature_version=1
)

expected_signature_v2 = api_sign_request(
{'public_id': public_id_with_ampersand, 'version': test_version},
API_SIGN_REQUEST_TEST_SECRET,
cloudinary.utils.SIGNATURE_SHA1,
signature_version=2
)

self.assertNotEqual(expected_signature_v1, expected_signature_v2)

# verify_api_response_signature should use version 1 for backward compatibility
with patch('cloudinary.config', return_value=cloudinary.config(api_secret=API_SIGN_REQUEST_TEST_SECRET)):
self.assertTrue(
verify_api_response_signature(
public_id_with_ampersand,
test_version,
expected_signature_v1
)
)

self.assertFalse(
verify_api_response_signature(
public_id_with_ampersand,
test_version,
expected_signature_v2
)
)

def test_signature_version_config_support(self):
"""Should use signature_version from config and produce different signatures for v1 vs v2"""
# Use params with & characters to show the encoding difference between versions
params = {'public_id': 'test&image', 'notification_url': 'https://example.com/callback?param=value&other=data'}

# Test with config signature_version = 1
cloudinary.config().signature_version = 1

# Test sign_request function uses config values
options_with_config = {'api_key': 'test_key', 'api_secret': API_SIGN_REQUEST_TEST_SECRET}
signed_params_config_v1 = cloudinary.utils.sign_request(params.copy(), options_with_config)

# Test explicit signature version
options_explicit_v1 = options_with_config.copy()
options_explicit_v1['signature_version'] = 1
signed_params_explicit_v1 = cloudinary.utils.sign_request(params.copy(), options_explicit_v1)

self.assertEqual(signed_params_config_v1['signature'], signed_params_explicit_v1['signature'])

# Test with config signature_version = 2
cloudinary.config().signature_version = 2

signed_params_config_v2 = cloudinary.utils.sign_request(params.copy(), options_with_config)

options_explicit_v2 = options_with_config.copy()
options_explicit_v2['signature_version'] = 2
signed_params_explicit_v2 = cloudinary.utils.sign_request(params.copy(), options_explicit_v2)

self.assertEqual(signed_params_config_v2['signature'], signed_params_explicit_v2['signature'])

# Verify that v1 and v2 actually produce different signatures due to parameter encoding
self.assertNotEqual(signed_params_config_v1['signature'], signed_params_config_v2['signature'],
"Signature v1 and v2 should be different for parameters with & characters")

def test_sign_request_with_signature_version(self):
"""Should support signature_version parameter in sign_request function"""
params = {'public_id': 'test_image', 'version': 1234}
options = {'api_key': 'test_key', 'api_secret': API_SIGN_REQUEST_TEST_SECRET}

# Test with signature_version in options
options_v1 = options.copy()
options_v1['signature_version'] = 1
signed_params_v1 = cloudinary.utils.sign_request(params.copy(), options_v1)

options_v2 = options.copy()
options_v2['signature_version'] = 2
signed_params_v2 = cloudinary.utils.sign_request(params.copy(), options_v2)

# The signatures should be different for different versions (for params with & characters)
# For these simple params without & they might be the same, but let's test the structure
self.assertIn('signature', signed_params_v1)
self.assertIn('signature', signed_params_v2)
self.assertIn('api_key', signed_params_v1)
self.assertIn('api_key', signed_params_v2)

# Test that signature_version is passed through correctly
expected_sig_v1 = api_sign_request(params, API_SIGN_REQUEST_TEST_SECRET, cloudinary.utils.SIGNATURE_SHA1, 1)
expected_sig_v2 = api_sign_request(params, API_SIGN_REQUEST_TEST_SECRET, cloudinary.utils.SIGNATURE_SHA1, 2)

self.assertEqual(signed_params_v1['signature'], expected_sig_v1)
self.assertEqual(signed_params_v2['signature'], expected_sig_v2)


if __name__ == '__main__':
unittest.main()