Skip to content

Commit

Permalink
SECURITY-5161: pull in new test cases, fix tests
Browse files Browse the repository at this point in the history
Co-authored-by: Laszlo Hammerl <[email protected]>
  • Loading branch information
knagy and potato committed Jun 7, 2024
1 parent fd1c980 commit e8f4b38
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 11 deletions.
1 change: 0 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: '3'
services:
escher:
build: .
Expand Down
24 changes: 16 additions & 8 deletions escherauth/escherauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ def __init__(self, api_key, api_secret, credential_scope, options=None):
self.clock_skew = options.get('clock_skew', 300)
self.algo = self.create_algo()
self.algo_id = self.algo_prefix + '-HMAC-' + self.hash_algo
self.debug_info = {}

def sign_request(self, request, headers_to_sign=None):
request = EscherRequest(request)
Expand All @@ -262,13 +263,14 @@ def sign_request(self, request, headers_to_sign=None):
else:
request.add_header(self.date_header_name, self.long_date(current_time))

signature = self.generate_signature(self.api_secret, request, headers_to_sign, current_time)
request.add_header(self.auth_header_name, ", ".join([
self.algo_id + ' Credential=' + self.api_key + '/' + self.short_date(
current_time) + '/' + self.credential_scope,
auth_header_value = self.algo_id + ' ' + ', '.join([
'Credential=' + self.credential(current_time),
'SignedHeaders=' + self.prepare_headers_to_sign(headers_to_sign),
'Signature=' + signature
]))
'Signature=' + self.generate_signature(self.api_secret, request, headers_to_sign, current_time),
])
request.add_header(self.auth_header_name, auth_header_value)
self.debug_info['auth_header_value'] = auth_header_value

return request.request

def presign_url(self, url, expires):
Expand All @@ -279,7 +281,7 @@ def presign_url(self, url, expires):

url_to_sign = url + ('&' if '?' in url else '?') + urlencode({
f'X-{self.vendor_key}-Algorithm': self.algo_id,
f'X-{self.vendor_key}-Credentials': self.api_key + '/' + self.short_date(current_time) + '/' + self.credential_scope,
f'X-{self.vendor_key}-Credentials': self.credential(current_time),
f'X-{self.vendor_key}-Date': self.long_date(current_time),
f'X-{self.vendor_key}-Expires': expires,
f'X-{self.vendor_key}-SignedHeaders': 'host',
Expand Down Expand Up @@ -360,6 +362,9 @@ def generate_signature(self, api_secret, req, headers_to_sign, current_time):
canonicalized_request = self.canonicalize(req, headers_to_sign)
string_to_sign = self.get_string_to_sign(canonicalized_request, current_time)

self.debug_info['canonicalized_request'] = canonicalized_request
self.debug_info['string_to_sign'] = string_to_sign

signing_key = self.hmac_digest(self.algo_prefix + api_secret, self.short_date(current_time))
for data in self.credential_scope.split('/'):
signing_key = self.hmac_digest(signing_key, data)
Expand Down Expand Up @@ -408,7 +413,7 @@ def normalize_white_spaces(self, value):
return '"'.join(value_normalized).strip()

def canonicalize_query(self, query_parts):
safe = "~+!'*"
safe = "!*"
query_list = []
for key, value in query_parts:
if key == 'X-' + self.vendor_key + '-Signature':
Expand All @@ -430,6 +435,9 @@ def create_algo(self):
if self.hash_algo == 'SHA512':
return sha512

def credential(self, time):
return self.api_key + '/' + self.short_date(time) + '/' + self.credential_scope

def header_date(self, time):
return time.strftime('%a, %d %b %Y %H:%M:%S GMT')

Expand Down
2 changes: 1 addition & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
nose2==0.14.1
nose2==0.15.1
requests>=2.0.0,<3.0.0
pycodestyle==2.11.1
15 changes: 15 additions & 0 deletions tests/test_escherauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,21 @@ def test_sign_request(self, test_case: TestCase):

try:
request = escher.sign_request(test_case.request, test_case.headers_to_sign)
if 'canonicalizedRequest' in test_case.expected:
self.assertEqual(
escher.debug_info['canonicalized_request'],
test_case.expected['canonicalizedRequest']
)
if 'stringToSign' in test_case.expected:
self.assertEqual(
escher.debug_info['string_to_sign'],
test_case.expected['stringToSign']
)
if 'authHeader' in test_case.expected:
self.assertEqual(
escher.debug_info['auth_header_value'],
test_case.expected['authHeader']
)
if 'request' in test_case.expected:
self.assertEqual(request, test_case.expected['request'])
else:
Expand Down

0 comments on commit e8f4b38

Please sign in to comment.