Skip to content

Commit

Permalink
auth check added (#1968)
Browse files Browse the repository at this point in the history
  • Loading branch information
saravanpa-aot authored Aug 8, 2023
1 parent 8a68a66 commit f796add
Show file tree
Hide file tree
Showing 12 changed files with 224 additions and 50 deletions.
3 changes: 1 addition & 2 deletions met-api/src/met_api/resources/engagement.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ class Engagement(Resource):
def get(engagement_id):
"""Fetch a single engagement matching the provided id."""
try:
user_id = TokenInfo.get_id()
engagement_record = EngagementService().get_engagement(engagement_id, user_id)
engagement_record = EngagementService().get_engagement(engagement_id)

if engagement_record:
return engagement_record, HTTPStatus.OK
Expand Down
1 change: 0 additions & 1 deletion met-api/src/met_api/resources/survey.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ def get(survey_id):
try:
user_id = TokenInfo.get_id()
if user_id:
# authenticated users have access to any survey/engagement status
survey_record = SurveyService().get(survey_id)
else:
survey_record = SurveyService().get_open(survey_id)
Expand Down
36 changes: 21 additions & 15 deletions met-api/src/met_api/services/authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,34 @@
def check_auth(**kwargs):
"""Check if user is authorized to perform action on the service."""
user_from_context: UserContext = kwargs['user_context']
token_roles = user_from_context.roles
permitted_roles = kwargs.get('one_of_roles', [])
has_valid_roles = bool(set(token_roles) & set(permitted_roles))
token_roles = set(user_from_context.roles)
permitted_roles = set(kwargs.get('one_of_roles', []))
has_valid_roles = token_roles & permitted_roles
if has_valid_roles:
return
if MembershipType.TEAM_MEMBER.name in permitted_roles:

team_permitted_roles = {MembershipType.TEAM_MEMBER.name, MembershipType.REVIEWER.name} & permitted_roles

if team_permitted_roles:
# check if he is a member of particular engagement.
is_a_member = _has_team_membership(kwargs, user_from_context)
if is_a_member:
has_valid_team_access = _has_team_membership(kwargs, user_from_context, team_permitted_roles)
if has_valid_team_access:
return

abort(403)


def _has_team_membership(kwargs, user_from_context) -> bool:
eng_id = kwargs.get('engagement_id', None)
external_id = user_from_context.sub
user = StaffUserModel.get_user_by_external_id(external_id)
if not eng_id or not user:
def _has_team_membership(kwargs, user_from_context, team_permitted_roles) -> bool:
eng_id = kwargs.get('engagement_id')

if not eng_id:
return False

user = StaffUserModel.get_user_by_external_id(user_from_context.sub)

if not user:
return False

memberships = MembershipModel.find_by_engagement_and_user_id(eng_id, user.id)
# TODO when multiple memberships are supported , iterate list and check role.
if memberships and memberships[0].type == MembershipType.TEAM_MEMBER:
return True
return False

return any(membership.type.name in team_permitted_roles for membership in memberships)
23 changes: 15 additions & 8 deletions met-api/src/met_api/services/engagement_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,33 @@ class EngagementService:
otherdateformat = '%Y-%m-%d'

@staticmethod
def get_engagement(engagement_id, user_id) -> EngagementSchema:
def get_engagement(engagement_id) -> EngagementSchema:
"""Get Engagement by the id."""
engagement_model: EngagementModel = EngagementModel.find_by_id(engagement_id)

if engagement_model:
if user_id is None \
if TokenInfo.get_id() is None \
and engagement_model.status_id not in (Status.Published.value, Status.Closed.value):
# Non authenticated users only have access to published and closed engagements
return None
if engagement_model.status_id in (Status.Draft.value, Status.Scheduled.value):
one_of_roles = (
MembershipType.TEAM_MEMBER.name,
Role.VIEW_ENGAGEMENT.value
)
authorization.check_auth(one_of_roles=one_of_roles, engagement_id=engagement_id)

engagement = EngagementSchema().dump(engagement_model)
engagement['banner_url'] = ObjectStorageService.get_url(engagement_model.banner_filename)
return engagement

@classmethod
def get_engagements_paginated(
cls,
external_user_id,
pagination_options: PaginationOptions,
search_options=None,
include_banner_url=False,
cls,
external_user_id,
pagination_options: PaginationOptions,
search_options=None,
include_banner_url=False,
):
"""Get engagements paginated."""
user_roles = TokenInfo.get_user_roles()
Expand Down Expand Up @@ -199,7 +206,7 @@ def _save_or_update_eng_block(engagement_id, status_block):
# see if there is one existing for the status ;if not create one
survey_status = survey_block.get('survey_status')
survey_block = survey_block.get('block_text')
status_block: EngagementStatusBlockModel = EngagementStatusBlockModel.\
status_block: EngagementStatusBlockModel = EngagementStatusBlockModel. \
get_by_status(engagement_id, survey_status)
if status_block:
status_block.block_text = survey_block
Expand Down
44 changes: 39 additions & 5 deletions met-api/src/met_api/services/survey_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@
from met_api.schemas.survey import SurveySchema
from met_api.services import authorization
from met_api.services.membership_service import MembershipService
from met_api.services.report_setting_service import ReportSettingService
from met_api.services.object_storage_service import ObjectStorageService
from met_api.services.report_setting_service import ReportSettingService
from met_api.utils.roles import Role
from met_api.utils.token_info import TokenInfo

from ..exceptions.business_exception import BusinessException


Expand All @@ -29,8 +28,32 @@ class SurveyService:

@classmethod
def get(cls, survey_id):
"""Get survey by the id."""
survey_model: SurveyModel = SurveyModel.find_by_id(survey_id)
"""Get survey by the ID."""
survey_model = SurveyModel.find_by_id(survey_id)
eng_id = None
one_of_roles = (Role.VIEW_SURVEYS.value,)
skip_auth = False

if survey_model.is_hidden:
# Only Admins can view hidden surveys.
one_of_roles = (Role.VIEW_ALL_SURVEYS.value,)
elif survey_model.engagement_id:
engagement_model = EngagementModel.find_by_id(survey_model.engagement_id)
if engagement_model:
eng_id = engagement_model.id
if engagement_model.status_id == Status.Published.value:
# Published Engagement anyone can access.
skip_auth = True
else:
one_of_roles = (
MembershipType.TEAM_MEMBER.name,
MembershipType.REVIEWER.name,
Role.VIEW_SURVEYS.value
)

if not skip_auth:
authorization.check_auth(one_of_roles=one_of_roles, engagement_id=eng_id)

survey = SurveySchema().dump(survey_model)
return survey

Expand Down Expand Up @@ -112,6 +135,13 @@ def clone(cls, data, survey_id):

if not survey_to_clone:
raise KeyError('Survey to clone was not found')
eng_id = None
if engagement_id := data.get('engagement_id', None):
engagement_model = EngagementModel.find_by_id(engagement_id)
eng_id = getattr(engagement_model, 'id', None)

authorization.check_auth(one_of_roles=(MembershipType.TEAM_MEMBER.name,
Role.CLONE_SURVEY.value), engagement_id=eng_id)

cloned_survey = SurveyModel.create_survey({
'name': data.get('name'),
Expand Down Expand Up @@ -143,7 +173,7 @@ def update(cls, data: SurveySchema):
engagement_id = survey.get('engagement_id', None)

authorization.check_auth(one_of_roles=(MembershipType.TEAM_MEMBER.name,
Role.EDIT_ALL_SURVEYS.value), engagement_id=engagement_id)
Role.EDIT_SURVEY.value), engagement_id=engagement_id)

# check if user has edit all surveys access to edit template surveys as well
user_roles = TokenInfo.get_user_roles()
Expand Down Expand Up @@ -189,6 +219,8 @@ def validate_create_fields(data):
def link(cls, survey_id, engagement_id):
"""Update survey."""
cls.validate_link_fields(survey_id, engagement_id)
authorization.check_auth(one_of_roles=(MembershipType.TEAM_MEMBER.name,
Role.EDIT_SURVEY.value), engagement_id=engagement_id)
return SurveyModel.link_survey(survey_id, engagement_id)

@classmethod
Expand All @@ -210,6 +242,8 @@ def validate_link_fields(cls, survey_id, engagement_id):
def unlink(cls, survey_id, engagement_id):
"""Unlink survey."""
cls.validate_unlink_fields(survey_id, engagement_id)
authorization.check_auth(one_of_roles=(MembershipType.TEAM_MEMBER.name,
Role.EDIT_SURVEY.value), engagement_id=engagement_id)
return SurveyModel.unlink_survey(survey_id)

@classmethod
Expand Down
3 changes: 2 additions & 1 deletion met-api/src/met_api/utils/roles.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class Role(Enum):
CREATE_ENGAGEMENT = 'create_engagement'
VIEW_SURVEYS = 'view_surveys'
CREATE_SURVEY = 'create_survey'
EDIT_SURVEY = 'edit_survey'
CLONE_SURVEY = 'clone_survey'
PUBLISH_ENGAGEMENT = 'publish_engagement'
VIEW_ENGAGEMENT = 'view_engagement'
Expand All @@ -41,7 +42,7 @@ class Role(Enum):
ACCESS_DASHBOARD = 'access_dashboard'
VIEW_MEMBERS = 'view_members'
EDIT_MEMBERS = 'edit_members'
VIEW_ALL_SURVEYS = 'view_all_surveys'
VIEW_ALL_SURVEYS = 'view_all_surveys' # Super user can view all kind of surveys including hidden
EDIT_ALL_SURVEYS = 'edit_all_surveys'
EDIT_DRAFT_ENGAGEMENT = 'edit_draft_engagement'
EDIT_SCHEDULED_ENGAGEMENT = 'edit_scheduled_engagement'
Expand Down
27 changes: 27 additions & 0 deletions met-api/tests/unit/api/test_engagement.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"""
import copy
import json
from http import HTTPStatus

import pytest
from faker import Faker
Expand Down Expand Up @@ -107,6 +108,32 @@ def test_get_engagements(client, jwt, session, engagement_info): # pylint:disab
assert created_eng.get('content') == rv.json.get('content')


@pytest.mark.parametrize('engagement_info', [TestEngagementInfo.engagement_draft])
def test_get_engagements_reviewer(client, jwt, session, engagement_info): # pylint:disable=unused-argument
"""Assert reviewers access on an engagement."""
headers = factory_auth_header(jwt=jwt, claims=TestJwtClaims.staff_admin_role)
rv = client.post('/api/engagements/', data=json.dumps(engagement_info),
headers=headers, content_type=ContentType.JSON.value)
assert rv.status_code == HTTPStatus.OK.value
created_eng = rv.json
eng_id = created_eng.get('id')
staff_1 = dict(TestUserInfo.user_staff_1)
user = factory_staff_user_model(user_info=staff_1)
claims = copy.deepcopy(TestJwtClaims.reviewer_role.value)
claims['sub'] = str(user.external_id)
headers = factory_auth_header(jwt=jwt, claims=claims)
rv = client.get(f'/api/engagements/{eng_id}',
headers=headers, content_type=ContentType.JSON.value)
assert rv.status_code == HTTPStatus.FORBIDDEN.value

factory_membership_model(user_id=user.id, engagement_id=eng_id, member_type='REVIEWER')

# Reveiwer has no access to draft engagement
rv = client.get(f'/api/engagements/{eng_id}',
headers=headers, content_type=ContentType.JSON.value)
assert rv.status_code == HTTPStatus.FORBIDDEN.value


@pytest.mark.parametrize('engagement_info', [TestEngagementInfo.engagement1])
def test_search_engagements_by_status(client, jwt,
session, engagement_info): # pylint:disable=unused-argument
Expand Down
67 changes: 63 additions & 4 deletions met-api/tests/unit/api/test_survey.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,24 @@
Test-Suite to ensure that the /Engagement endpoint is working as expected.
"""
import copy
import json
from http import HTTPStatus

import pytest
from flask import current_app

from met_api.constants.engagement_status import Status
from met_api.models.engagement import Engagement as EngagementModel
from met_api.models.membership import Membership as MembershipModel
from met_api.models.tenant import Tenant as TenantModel
from met_api.utils.constants import TENANT_ID_HEADER
from met_api.utils.enums import ContentType
from tests.utilities.factory_scenarios import TestJwtClaims, TestSurveyInfo, TestTenantInfo
from met_api.utils.enums import ContentType, MembershipStatus
from tests.utilities.factory_scenarios import TestJwtClaims, TestSurveyInfo, TestTenantInfo, TestUserInfo
from tests.utilities.factory_utils import (
factory_auth_header, factory_engagement_model, factory_survey_model, factory_tenant_model, set_global_tenant)
factory_auth_header, factory_engagement_model, factory_membership_model, factory_staff_user_model,
factory_survey_model, factory_tenant_model, set_global_tenant)


surveys_url = '/api/surveys/'

Expand Down Expand Up @@ -105,7 +111,7 @@ def test_survey_link(client, jwt, session): # pylint:disable=unused-argument
"""Assert that a survey can be POSTed."""
survey = factory_survey_model()
survey_id = survey.id
headers = factory_auth_header(jwt=jwt, claims=TestJwtClaims.no_role)
headers = factory_auth_header(jwt=jwt, claims=TestJwtClaims.staff_admin_role)

eng = factory_engagement_model()
eng_id = eng.id
Expand Down Expand Up @@ -151,6 +157,59 @@ def test_get_hidden_survey_for_admins(client, jwt, session): # pylint:disable=u
assert rv.json.get('total') == 1


def test_get_survey_for_reviewer(client, jwt, session): # pylint:disable=unused-argument
"""Assert reviewers different permission."""
staff_1 = dict(TestUserInfo.user_staff_1)
user = factory_staff_user_model(user_info=staff_1)
claims = copy.deepcopy(TestJwtClaims.reviewer_role.value)
claims['sub'] = str(user.external_id)
headers = factory_auth_header(jwt=jwt, claims=claims)
set_global_tenant()
survey1 = factory_survey_model(TestSurveyInfo.survey1)

# Attempt to access unlinked survey
rv = client.get(f'{surveys_url}{survey1.id}',
headers=headers, content_type=ContentType.JSON.value)
assert rv.status_code == 403

# Link to a draft engagement
eng: EngagementModel = factory_engagement_model(status=Status.Draft.value)
survey1.engagement_id = eng.id
survey1.commit()

# Attempt to access survey linked to draft engagement
rv = client.get(f'{surveys_url}{survey1.id}',
headers=headers, content_type=ContentType.JSON.value)
assert rv.status_code == 403

# Add user as a reviewer in the team
factory_membership_model(user_id=user.id, engagement_id=eng.id, member_type='REVIEWER')

# Assert Reviewer can see the survey since he is added to the team.
rv = client.get(f'{surveys_url}{survey1.id}',
headers=headers, content_type=ContentType.JSON.value)
assert rv.status_code == 200

# Deactivate membership
membership_model: MembershipModel = MembershipModel.find_by_engagement_and_user_id(eng.id, user.id)
membership_model[0].status = MembershipStatus.INACTIVE.value
membership_model[0].commit()

rv = client.get(f'{surveys_url}{survey1.id}',
headers=headers, content_type=ContentType.JSON.value)
# Verify reviewer lost access after being removed from the team
assert rv.status_code == 403

# Publish the engagement
eng.status_id = Status.Published.value
eng.commit()
rv = client.get(f'{surveys_url}{survey1.id}',
headers=headers, content_type=ContentType.JSON.value)

# Assert user can access the survey even when he is removed from the team since its published.
assert rv.status_code == 200


def test_get_hidden_survey_for_team_member(client, jwt, session): # pylint:disable=unused-argument
"""Assert that a hidden survey cannot be fetched by team members."""
headers = factory_auth_header(jwt=jwt, claims=TestJwtClaims.team_member_role)
Expand Down
Loading

0 comments on commit f796add

Please sign in to comment.