From b54f444705ba474ffbb97b04a0bbbe6c5f8fe7d9 Mon Sep 17 00:00:00 2001 From: Luca Lusso Date: Fri, 21 Sep 2018 17:00:16 +0200 Subject: [PATCH] =?UTF-8?q?Consentire=20avvio=20in=20modalit=C3=A0=20headl?= =?UTF-8?q?ess=20(con=20login=20automatico)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- testenv/parser.py | 9 +- testenv/server.py | 329 ++++++++++++++++++++++++++-------------------- testenv/users.py | 17 +++ 3 files changed, 215 insertions(+), 140 deletions(-) diff --git a/testenv/parser.py b/testenv/parser.py index 7b27f17a..85889543 100644 --- a/testenv/parser.py +++ b/testenv/parser.py @@ -23,7 +23,7 @@ HTTPRedirectRequest = namedtuple( 'HTTPRedirectRequest', - ['saml_request', 'relay_state', 'sig_alg', 'signature', 'signed_data'], + ['saml_request', 'relay_state', 'sig_alg', 'signature', 'signed_data', 'auto_login'], ) @@ -59,6 +59,7 @@ def __init__(self, querystring, request_class=None): self._sig_alg = None self._signature = None self._signed_data = None + self._auto_login = None def parse(self): self._saml_request = self._parse_saml_request() @@ -66,6 +67,7 @@ def parse(self): self._sig_alg = self._parse_sig_alg() self._signature = self._parse_signature() self._signed_data = self._build_signed_data() + self._auto_login = self._build_auto_login() return self._build_request() def _parse_saml_request(self): @@ -121,6 +123,10 @@ def _build_signed_data(self): ) return signed_data.encode('ascii') + def _build_auto_login(self): + auto_login = self._querystring.get('auto_login', None) + return auto_login + def _build_request(self): return self._request_class( self._saml_request, @@ -128,6 +134,7 @@ def _build_request(self): self._sig_alg, self._signature, self._signed_data, + self._auto_login ) diff --git a/testenv/server.py b/testenv/server.py index 1afd4a27..41b2e4f7 100644 --- a/testenv/server.py +++ b/testenv/server.py @@ -24,7 +24,7 @@ AUTH_NO_CONSENT, BINDING_HTTP_POST, BINDING_HTTP_REDIRECT, CHALLENGES_TIMEOUT, SPID_ATTRIBUTES, SPID_LEVELS, STATUS_SUCCESS, ) -from testenv.users import JsonUserManager +from testenv.users import AutoLoginJsonUserManager, JsonUserManager from testenv.utils import Key, Slo, Sso, get_spid_error # FIXME: move to a the parser.py module after metadata refactoring @@ -60,6 +60,7 @@ def __init__(self, app, conf=None, registry=None, *args, **kwargs): # bind Flask app self.app = app self.user_manager = JsonUserManager() + self.auto_login_user_manager = AutoLoginJsonUserManager() # setup self._config = conf or config.params self._registry = registry or spmetadata.registry @@ -293,6 +294,11 @@ def single_sign_on_service(self): key = self._store_request(spid_request.saml_tree) session['request_key'] = key session['relay_state'] = spid_request.data.relay_state or '' + + if spid_request.data.auto_login: + username = spid_request.data.auto_login + return self._auto_login(username), 200 + return redirect(url_for('login')) except RequestParserError as err: self._raise_error(err.args[0]) @@ -303,6 +309,186 @@ def single_sign_on_service(self): except DeserializationError as err: return self._handle_errors(err.initial_data, err.details) + def _auto_login(self, username): + """ + Returns an AuthnResponse for the specific username bypassing the login form + """ + + self.app.logger.debug( + 'Auto login richiesto per l\'utente: {}'.format(username) + ) + + key = from_session('request_key') + relay_state = from_session('relay_state') + self.app.logger.debug('Request key: {}'.format(key)) + if key and key in self.ticket: + authn_request = self.ticket[key] + sp_id = authn_request.issuer.text + destination = self.get_destination(authn_request, sp_id) + authn_context = authn_request.requested_authn_context + spid_level = authn_context.authn_context_class_ref.text + + # verify user credentials + user_id, user = self.auto_login_user_manager.get( + username, + '', # no need for password with auto_login_user_manager + sp_id + ) + if user_id is not None: + rendered_template, response_xmlstr, _identity = self._build_success_response(user, authn_request, spid_level, destination, sp_id, relay_state) + return rendered_template + else: + rendered_template = self._build_failed_response(authn_request, destination, key, relay_state) + return rendered_template + + def _build_success_response(self, user, authn_request, spid_level, destination, sp_id, relay_state): + """ + Build and return a successful response + """ + + identity = user['attrs'].copy() + self.app.logger.debug( + 'Unfiltered data: {}'.format(identity) + ) + atcs_idx = getattr( + authn_request, 'attribute_consuming_service_index', None) + self.app.logger.debug( + 'AttributeConsumingServiceIndex: {}'.format( + atcs_idx + ) + ) + sp_metadata = self._registry.get(sp_id) + required = [] + optional = [] + if atcs_idx and sp_metadata: + attrs = sp_metadata.attributes(atcs_idx) + required = [el for el in attrs.get('required')] + optional = [el for el in attrs.get('optional')] + + for attr_name, val in identity.items(): + _type = self._attribute_type(attr_name) + identity[attr_name] = (_type, val) + + _identity = {} + # TODO: refactor a bit the following snippet + for _key in required: + try: + _identity[_key] = identity[_key] + except KeyError: + _identity[_key] = ( + '', self._attribute_type(_key)) + for _key in optional: + try: + _identity[_key] = identity[_key] + except KeyError: + _identity[_key] = ( + '', self._attribute_type(_key)) + + self.app.logger.debug( + 'Filtered data: {}'.format(_identity) + ) + + response_xmlstr = create_response( + { + 'response': { + 'attrs': { + 'in_response_to': authn_request.id, + 'destination': destination + } + }, + 'issuer': { + 'attrs': { + 'name_qualifier': self._config.entity_id, + }, + 'text': self._config.entity_id + }, + 'name_id': { + 'attrs': { + 'name_qualifier': self._config.entity_id, + } + }, + + 'subject_confirmation_data': { + 'attrs': { + 'recipient': destination + } + }, + 'audience': { + 'text': sp_id + }, + 'authn_context_class_ref': { + 'text': spid_level + } + }, + { + 'status_code': STATUS_SUCCESS + }, + _identity.copy() + ).to_xml() + response = sign_http_post( + response_xmlstr, + self._config.idp_key, + self._config.idp_certificate, + ) + rendered_template = render_template( + 'form_http_post.html', + **{ + 'action': destination, + 'relay_state': relay_state, + 'message': response, + 'message_type': 'SAMLResponse' + } + ) + return rendered_template, response_xmlstr, _identity + + def _build_failed_response(self, authn_request, destination, key, relay_state): + """ + Build and return a failed response + """ + + error_info = get_spid_error( + AUTH_NO_CONSENT + ) + response = create_error_response( + { + 'response': { + 'attrs': { + 'in_response_to': authn_request.id, + 'destination': destination + } + }, + 'issuer': { + 'attrs': { + 'name_qualifier': self._config.entity_id, + }, + 'text': self._config.entity_id + }, + }, + { + 'status_code': error_info[0], + 'status_message': error_info[1] + } + ).to_xml() + self.app.logger.debug( + 'Error response: \n{}'.format(response) + ) + response = sign_http_post( + response, + self._config.idp_key, + self._config.idp_certificate, + ) + del self.ticket[key] + rendered_template = render_template( + 'form_http_post.html', + **{ + 'action': destination, + 'relay_state': relay_state, + 'message': response, + 'message_type': 'SAMLResponse' + } + ) + return rendered_template + @property def _spid_main_fields(self): """ @@ -453,103 +639,8 @@ def login(self): sp_id ) if user_id is not None: - # setup response - identity = user['attrs'].copy() - self.app.logger.debug( - 'Unfiltered data: {}'.format(identity) - ) - atcs_idx = getattr( - authn_request, 'attribute_consuming_service_index', None) - self.app.logger.debug( - 'AttributeConsumingServiceIndex: {}'.format( - atcs_idx - ) - ) - sp_metadata = self._registry.get(sp_id) - required = [] - optional = [] - if atcs_idx and sp_metadata: - attrs = sp_metadata.attributes(atcs_idx) - required = [el for el in attrs.get('required')] - optional = [el for el in attrs.get('optional')] - - for attr_name, val in identity.items(): - _type = self._attribute_type(attr_name) - identity[attr_name] = (_type, val) - - _identity = {} - # TODO: refactor a bit the following snippet - for _key in required: - try: - _identity[_key] = identity[_key] - except KeyError: - _identity[_key] = ( - '', self._attribute_type(_key)) - for _key in optional: - try: - _identity[_key] = identity[_key] - except KeyError: - _identity[_key] = ( - '', self._attribute_type(_key)) - - self.app.logger.debug( - 'Filtered data: {}'.format(_identity) - ) - - response_xmlstr = create_response( - { - 'response': { - 'attrs': { - 'in_response_to': authn_request.id, - 'destination': destination - } - }, - 'issuer': { - 'attrs': { - 'name_qualifier': self._config.entity_id, - }, - 'text': self._config.entity_id - }, - 'name_id': { - 'attrs': { - 'name_qualifier': self._config.entity_id, - } - }, - - 'subject_confirmation_data': { - 'attrs': { - 'recipient': destination - } - }, - 'audience': { - 'text': sp_id - }, - 'authn_context_class_ref': { - 'text': spid_level - } - }, - { - 'status_code': STATUS_SUCCESS - }, - _identity.copy() - ).to_xml() - response = sign_http_post( - response_xmlstr, - self._config.idp_key, - self._config.idp_certificate, - ) - self.app.logger.debug( - 'Response: \n{}'.format(response) - ) - rendered_template = render_template( - 'form_http_post.html', - **{ - 'action': destination, - 'relay_state': relay_state, - 'message': response, - 'message_type': 'SAMLResponse' - } - ) + rendered_template, response_xmlstr, _identity = self._build_success_response(user, authn_request, spid_level, destination, + sp_id, relay_state) self.responses[key] = rendered_template # Setup confirmation page data rendered_response = render_template( @@ -566,47 +657,7 @@ def login(self): ) return rendered_response, 200 elif 'delete' in request.form: - error_info = get_spid_error( - AUTH_NO_CONSENT - ) - response = create_error_response( - { - 'response': { - 'attrs': { - 'in_response_to': authn_request.id, - 'destination': destination - } - }, - 'issuer': { - 'attrs': { - 'name_qualifier': self._config.entity_id, - }, - 'text': self._config.entity_id - }, - }, - { - 'status_code': error_info[0], - 'status_message': error_info[1] - } - ).to_xml() - self.app.logger.debug( - 'Error response: \n{}'.format(response) - ) - response = sign_http_post( - response, - self._config.idp_key, - self._config.idp_certificate, - ) - del self.ticket[key] - rendered_template = render_template( - 'form_http_post.html', - **{ - 'action': destination, - 'relay_state': relay_state, - 'message': response, - 'message_type': 'SAMLResponse' - } - ) + rendered_template = self._build_failed_response(authn_request, destination, key, relay_state) return rendered_template, 200 return render_template('403.html'), 403 diff --git a/testenv/users.py b/testenv/users.py index 1d615411..5b4cbf69 100644 --- a/testenv/users.py +++ b/testenv/users.py @@ -99,3 +99,20 @@ def add(self, uid, pwd, sp_id=None, extra=None): def all(self): return self.users + + +class AutoLoginJsonUserManager(JsonUserManager): + """ + User manager class that bypass the password check + """ + + def __init__(self, *args, **kwargs): + super(AutoLoginJsonUserManager, self).__init__(*args, **kwargs) + + def get(self, uid, pwd, sp_id): + for user, _attrs in self.users.items(): + if user == uid: + if _attrs['sp'] is not None and _attrs['sp'] != sp_id: + return None, None + return user, self.users[user] + return None, None