Skip to content

Commit 223938e

Browse files
committed
Add support for sp:SignedElements
1 parent 758dd29 commit 223938e

File tree

4 files changed

+83
-4
lines changed

4 files changed

+83
-4
lines changed

src/zeep/wsdl/definitions.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ def __init__(self, wsdl, name, port_name):
137137
self._operations = {}
138138
self.signatures = {
139139
"header": [], # Parts of header, that should be signed
140+
"elements": [], # Arbitrary XPath elements that should be signed
140141
"body": False, # If body should be signed
141142
"everything": False, # If every header should be signed
142143
}

src/zeep/wsdl/wsdl.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,39 @@ def parse_binding(
476476
# If we didn't set "everything" to True, update the headers
477477
if not binding.signatures.get("everything", False):
478478
binding.signatures["header"] = [dict(header) for header in all_headers]
479+
480+
# Begin parsing SignedElements assertions
481+
signed_elements = doc.xpath(
482+
'wsp:Policy[@wsu:Id="{}"]//sp:SignedElements'.format(binding_policy),
483+
namespaces=NSMAP,
484+
)
485+
486+
for signed_element in signed_elements:
487+
xpath_version = signed_element.get('XPathVersion', 'http://www.w3.org/TR/1999/REC-xpath-19991116') # Default to XPath 1.0 if not specified
488+
489+
xpath_expressions = signed_element.xpath('sp:XPath', namespaces=NSMAP)
490+
491+
for xpath in xpath_expressions:
492+
xpath_string = xpath.text
493+
if xpath_string:
494+
# Store the XPath expression and its version
495+
binding.signatures.setdefault('elements', []).append({
496+
'xpath': xpath_string,
497+
'xpath_version': xpath_version
498+
})
499+
500+
# If you want to merge multiple SignedElements assertions as per the specification
501+
if 'elements' in binding.signatures:
502+
# Remove duplicates while preserving order
503+
unique_elements = []
504+
seen = set()
505+
for element in binding.signatures['elements']:
506+
element_tuple = (element['xpath'], element['xpath_version'])
507+
if element_tuple not in seen:
508+
seen.add(element_tuple)
509+
unique_elements.append(element)
510+
binding.signatures['elements'] = unique_elements
511+
479512
logger.debug("Adding binding: %s", binding.name.text)
480513
result[binding.name.text] = binding
481514
break

src/zeep/wsse/signature.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,17 @@ def _signature_prepare(envelope, key, signature_method, digest_method, signature
272272
header.find(QName(node["Namespace"], node["Name"])),
273273
digest_method,
274274
)
275+
# Sign elements specified by XPath expressions
276+
for element in signatures.get("elements", []):
277+
_sign_node_by_xpath(
278+
ctx,
279+
signature,
280+
envelope,
281+
element["xpath"],
282+
element["xpath_version"],
283+
digest_method
284+
)
285+
275286
ctx.sign(signature)
276287

277288
# Place the X509 data inside a WSSE SecurityTokenReference within
@@ -281,6 +292,20 @@ def _signature_prepare(envelope, key, signature_method, digest_method, signature
281292
sec_token_ref = etree.SubElement(key_info, QName(ns.WSSE, "SecurityTokenReference"))
282293
return security, sec_token_ref, x509_data
283294

295+
def _sign_node_by_xpath(ctx, signature, envelope, xpath, xpath_version, digest_method):
296+
# Create an XPath evaluator with the appropriate version
297+
if xpath_version == '1.0':
298+
evaluator = etree.XPath(xpath, namespaces=envelope.nsmap)
299+
else:
300+
evaluator = etree.XPath(xpath, namespaces=envelope.nsmap, extension={('http://www.w3.org/TR/1999/REC-xpath-19991116', 'version'): xpath_version})
301+
302+
# Evaluate the XPath expression
303+
nodes = evaluator(envelope)
304+
305+
# Sign each node found by the XPath expression
306+
for node in nodes:
307+
_sign_node(ctx, signature, node, digest_method)
308+
284309

285310
def _sign_envelope_with_key(
286311
envelope, key, signature_method, digest_method, signatures=None

tests/test_wsdl.py

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1377,7 +1377,7 @@ def test_parse_bindings_signed_unknown():
13771377
document = wsdl.Document(content, None)
13781378
assert document.bindings[
13791379
"{http://tests.python-zeep.org/xsd-main}TestBinding"
1380-
].signatures == {"body": False, "everything": False, "header": []}
1380+
].signatures == {"body": False, "everything": False, "header": [], "elements": []}
13811381

13821382
def test_parse_bindings_signed_body():
13831383
policy = """
@@ -1391,7 +1391,7 @@ def test_parse_bindings_signed_body():
13911391
document = wsdl.Document(content, None)
13921392
assert document.bindings[
13931393
"{http://tests.python-zeep.org/xsd-main}TestBinding"
1394-
].signatures == {"body": True, "everything": False, "header": []}
1394+
].signatures == {"body": True, "everything": False, "header": [], "elements": []}
13951395

13961396

13971397
def test_parse_bindings_signed_everything():
@@ -1404,7 +1404,7 @@ def test_parse_bindings_signed_everything():
14041404
document = wsdl.Document(content, None)
14051405
assert document.bindings[
14061406
"{http://tests.python-zeep.org/xsd-main}TestBinding"
1407-
].signatures == {"body": True, "everything": True, "header": []}
1407+
].signatures == {"body": True, "everything": True, "header": [], "elements": []}
14081408

14091409

14101410
def test_parse_bindings_signed_headers():
@@ -1423,12 +1423,32 @@ def test_parse_bindings_signed_headers():
14231423
"body": False,
14241424
"everything": False,
14251425
"header": [{"Name": "To", "Namespace": "http://www.w3.org/2005/08/addressing"}],
1426+
"elements": []
14261427
}
14271428

1429+
def test_parse_bindings_signed_elements():
1430+
policy = """
1431+
<wsp:Policy wsu:Id="TestBinding_policy">
1432+
<sp:SignedElements>
1433+
<sp:XPath>//wsse:Security/wsu:Timestamp</sp:XPath>
1434+
</sp:SignedElements>
1435+
</wsp:Policy>
1436+
"""
1437+
content = StringIO(BASE_WSDL.format(policy=policy).strip())
1438+
document = wsdl.Document(content, None)
1439+
assert document.bindings[
1440+
"{http://tests.python-zeep.org/xsd-main}TestBinding"
1441+
].signatures == {
1442+
"body": False,
1443+
"everything": False,
1444+
"header": [],
1445+
"elements": [{"xpath": "//wsse:Security/wsu:Timestamp", "xpath_version": "http://www.w3.org/TR/1999/REC-xpath-19991116"}]
1446+
}
1447+
14281448

14291449
def test_parse_bindings_signed_nothing():
14301450
content = StringIO(BASE_WSDL.format(policy="").strip())
14311451
document = wsdl.Document(content, None)
14321452
assert document.bindings[
14331453
"{http://tests.python-zeep.org/xsd-main}TestBinding"
1434-
].signatures == {"body": False, "everything": False, "header": []}
1454+
].signatures == {"body": False, "everything": False, "header": [], "elements": []}

0 commit comments

Comments
 (0)