feature/switch-to-S3-user-sync #3264

Open · wants to merge 1 commit into base: master
4 changes: 0 additions & 4 deletions .envs/sample.env
@@ -142,10 +142,6 @@ REMOTE_PDB_HOST=0.0.0.0
REMOTE_PDB_PORT=4444
GEVENT_SUPPORT=True

ACTIVITY_STREAM_BASE_URL=https://url.to.activity.stream/
ACTIVITY_STREAM_HAWK_CREDENTIALS_ID=some-id
ACTIVITY_STREAM_HAWK_CREDENTIALS_KEY=some-key

PYTHONUNBUFFERED=TRUE

PGAUDIT_LOG_SCOPES='ALL, -MISC'
4 changes: 0 additions & 4 deletions .envs/test-e2e.env
@@ -130,10 +130,6 @@ REMOTE_PDB_HOST=0.0.0.0
REMOTE_PDB_PORT=4444
GEVENT_SUPPORT=True

ACTIVITY_STREAM_BASE_URL=https://url.to.activity.stream/
ACTIVITY_STREAM_HAWK_CREDENTIALS_ID=some-id
ACTIVITY_STREAM_HAWK_CREDENTIALS_KEY=some-key

PYTHONUNBUFFERED=TRUE

PGAUDIT_LOG_SCOPES='ALL, -MISC'
226 changes: 0 additions & 226 deletions dataworkspace/dataworkspace/apps/applications/utils.py
@@ -26,7 +26,6 @@
import gevent
from psycopg2 import connect, sql
import requests
from mohawk import Sender
from pytz import utc
from smart_open import open as smart_open

@@ -1129,31 +1128,6 @@ def create_user_from_sso(
return user


def hawk_request(method, url, body):
hawk_id = settings.ACTIVITY_STREAM_HAWK_CREDENTIALS_ID
hawk_key = settings.ACTIVITY_STREAM_HAWK_CREDENTIALS_KEY

if not hawk_id or not hawk_key:
raise HawkException("Hawk id or key not configured")

content_type = "application/json"
header = Sender(
{"id": hawk_id, "key": hawk_key, "algorithm": "sha256"},
url,
method,
content=body,
content_type=content_type,
).request_header

response = requests.request(
method,
url,
data=body,
headers={"Authorization": header, "Content-Type": content_type},
)
return response.status_code, response.content


@celery_app.task(autoretry_for=(redis.exceptions.LockError,))
@close_all_connections_if_not_in_atomic_block
def create_tools_access_iam_role_task(user_id):
@@ -1180,119 +1154,6 @@ def _do_create_tools_access_iam_role(user_id):
gevent.sleep(1)


@celery_app.task(autoretry_for=(redis.exceptions.LockError,))
@close_all_connections_if_not_in_atomic_block
def sync_activity_stream_sso_users():
try:
with cache.lock("sso_sync_last_published_lock", blocking_timeout=0, timeout=1800):
_do_sync_activity_stream_sso_users()
except redis.exceptions.LockError:
logger.info("sync_activity_stream_sso_users: Unable to acquire lock. Not running")


def _do_sync_activity_stream_sso_users(page_size=1000):
last_published = cache.get(
"activity_stream_sync_last_published", datetime.datetime.utcfromtimestamp(0)
)
logger.info(
"sync_activity_stream_sso_users: Starting with last published date of %s", last_published
)

endpoint = f"{settings.ACTIVITY_STREAM_BASE_URL}/v3/activities/_search"
ten_seconds_before_last_published = last_published - datetime.timedelta(seconds=10)

query = {
"size": page_size,
"query": {
"bool": {
"filter": [
{"term": {"object.type": "dit:StaffSSO:User"}},
{
"range": {
"published": {
"gte": f"{ten_seconds_before_last_published.strftime('%Y-%m-%dT%H:%M:%S')}"
}
}
},
]
}
},
"sort": [{"published": "asc"}, {"id": "asc"}],
}

while True:
try:
logger.info(
"sync_activity_stream_sso_users: Calling activity stream with query %s",
json.dumps(query),
)
status_code, response = hawk_request(
"GET",
endpoint,
json.dumps(query),
)
except HawkException as e:
logger.error(
"sync_activity_stream_sso_users: Failed to call activity stream with error %s", e
)
break

if status_code != 200:
raise Exception(f"Failed to fetch SSO users: {response}")

response_json = json.loads(response)

if "failures" in response_json["_shards"]:
raise Exception(
f"Failed to fetch SSO users: {json.dumps(response_json['_shards']['failures'])}"
)

records = response_json["hits"]["hits"]

if not records:
break

logger.info(
"sync_activity_stream_sso_users: Fetched %d record(s) from activity stream",
len(records),
)

for record in records:
obj = record["_source"]["object"]

user_id = obj["dit:StaffSSO:User:userId"]
emails = obj["dit:emailAddress"]
primary_email = obj["dit:StaffSSO:User:contactEmailAddress"] or emails[0]
logger.info("sync_activity_stream_sso_users: processing user %s", primary_email)

try:
create_user_from_sso(
user_id,
primary_email,
obj["dit:firstName"],
obj["dit:lastName"],
obj["dit:StaffSSO:User:status"],
check_tools_access_if_user_exists=True,
)
except IntegrityError:
logger.exception("sync_activity_stream_sso_users: Failed to create user record")

last_published_str = records[-1]["_source"]["published"]
last_published = datetime.datetime.strptime(last_published_str, "%Y-%m-%dT%H:%M:%S.%fZ")

if len(records) < page_size:
break

# paginate to next batch of records
query["search_after"] = records[-1]["sort"]

cache.set("activity_stream_sync_last_published", last_published)
logger.info(
"sync_activity_stream_sso_users: Finished with new last published date of %s",
last_published,
)


@celery_app.task(autoretry_for=(redis.exceptions.LockError,))
@close_all_connections_if_not_in_atomic_block
def sync_s3_sso_users():
@@ -1982,93 +1843,6 @@ def duplicate_tools_monitor():
logger.info("duplicate_tools_alert: Unable to acquire lock to monitor for duplicate tools")


@celery_app.task()
@close_all_connections_if_not_in_atomic_block
def sync_all_sso_users():
with cache.lock("sso_sync_last_published_lock", blocking_timeout=0, timeout=3600):
user_model = get_user_model()
all_users = user_model.objects.all()
seen_user_ids = []
query = {
"size": 1000,
"query": {
"bool": {
"filter": [
{"term": {"object.type": "dit:StaffSSO:User"}},
]
}
},
"sort": [{"published": "asc"}, {"id": "asc"}],
}
while True:
try:
logger.info("Calling activity stream with query %s", json.dumps(query))
status_code, response = hawk_request(
"GET",
f"{settings.ACTIVITY_STREAM_BASE_URL}/v3/activities/_search",
json.dumps(query),
)
except HawkException as e:
logger.error("Failed to call activity stream with error %s", e)
break

if status_code != 200:
raise Exception(f"Failed to fetch SSO users: {response}")

response_json = json.loads(response)

if "failures" in response_json["_shards"]:
raise Exception(
f"Failed to fetch SSO users: {json.dumps(response_json['_shards']['failures'])}"
)

records = response_json["hits"]["hits"]

if not records:
break

logger.info("Fetched %d record(s) from activity stream", len(records))

for record in records:
obj = record["_source"]["object"]
sso_id = obj["dit:StaffSSO:User:userId"]
logger.info("Syncing SSO record for user %s", sso_id)

try:
user = all_users.get(username=sso_id)
except user_model.DoesNotExist:
continue

changed = False

if user.first_name != obj["dit:firstName"]:
changed = True
user.first_name = obj["dit:firstName"]

if user.last_name != obj["dit:lastName"]:
changed = True
user.last_name = obj["dit:lastName"]

if user.profile.sso_status != obj["dit:StaffSSO:User:status"]:
changed = True
user.profile.sso_status = obj["dit:StaffSSO:User:status"]

if changed:
user.save()

seen_user_ids.append(user.id)
query["search_after"] = records[-1]["sort"]

unseen_user_profiles = Profile.objects.exclude(user_id__in=seen_user_ids).filter(
sso_status="active"
)
logger.info(
"%s active users exist locally but not in SSO. Marking as inactive",
unseen_user_profiles.count(),
)
unseen_user_profiles.update(sso_status="inactive")


def _run_orphaned_tools_monitor():
"""
Find and stop any running application instances that meet one of:
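Note: the Hawk-authenticated Activity Stream sync removed above is superseded by the sync_s3_sso_users task, which is referenced in the remaining beat schedule but collapsed in this diff. As a rough illustration only (not this PR's implementation), an S3-sourced sync could iterate exported user records with boto3 along the lines below; the SSO_USER_SYNC_BUCKET / SSO_USER_SYNC_PREFIX setting names and the JSON Lines object format are assumptions.

import json

import boto3
from django.conf import settings


def _do_sync_s3_sso_users_sketch():
    # Hypothetical sketch: the bucket/prefix setting names and the JSON Lines
    # record format are assumptions, not values added by this PR.
    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")
    pages = paginator.paginate(
        Bucket=settings.SSO_USER_SYNC_BUCKET,
        Prefix=settings.SSO_USER_SYNC_PREFIX,
    )

    for page in pages:
        for summary in page.get("Contents", []):
            body = s3.get_object(
                Bucket=settings.SSO_USER_SYNC_BUCKET, Key=summary["Key"]
            )["Body"]
            for line in body.iter_lines():
                record = json.loads(line)
                # create_user_from_sso is the existing helper in this module,
                # called with the same fields the removed sync passed through.
                create_user_from_sso(
                    record["dit:StaffSSO:User:userId"],
                    record["dit:StaffSSO:User:contactEmailAddress"]
                    or record["dit:emailAddress"][0],
                    record["dit:firstName"],
                    record["dit:lastName"],
                    record["dit:StaffSSO:User:status"],
                    check_tools_access_if_user_exists=True,
                )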
13 changes: 0 additions & 13 deletions dataworkspace/dataworkspace/settings/base.py
@@ -382,11 +382,6 @@ def aws_fargate_private_ip():
"schedule": crontab(minute=0, hour=0),
"args": (),
},
"sync-sso-users-from-activity-stream": {
"task": "dataworkspace.apps.applications.utils.sync_activity_stream_sso_users",
"schedule": 60 * 2,
"args": (),
},
"sync-sso-users-from-s3": {
"task": "dataworkspace.apps.applications.utils.sync_s3_sso_users",
"schedule": crontab(minute="*/15"), # Run every 15 minutes
@@ -447,11 +442,6 @@ def aws_fargate_private_ip():
"schedule": 60 * 10,
"args": (),
},
"full-sso-user-sync": {
"task": "dataworkspace.apps.applications.utils.sync_all_sso_users",
"schedule": crontab(minute=0, hour=1),
"args": (),
},
"orphaned-tools-monitor": {
"task": "dataworkspace.apps.applications.utils.orphaned_tools_monitor",
"schedule": 60 * 60 * 2,
@@ -674,9 +664,6 @@ def sort_database_config(database_list):
("json", "dataworkspace.apps.explorer.exporters.JSONExporter"),
]

ACTIVITY_STREAM_BASE_URL = env.get("ACTIVITY_STREAM_BASE_URL")
ACTIVITY_STREAM_HAWK_CREDENTIALS_ID = env.get("ACTIVITY_STREAM_HAWK_CREDENTIALS_ID")
ACTIVITY_STREAM_HAWK_CREDENTIALS_KEY = env.get("ACTIVITY_STREAM_HAWK_CREDENTIALS_KEY")

DATASETS_DB_INSTANCE_ID = env.get("DATASETS_DB_INSTANCE_ID", "analysisworkspace-dev-test-1-aurora")
PGAUDIT_LOG_SCOPES = env.get("PGAUDIT_LOG_SCOPES")
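If the S3-based sync needs configuration of its own, it would presumably follow the same env.get pattern as the settings removed above. The names below are hypothetical placeholders matching the sketch earlier, not settings introduced by this diff.

# Hypothetical placeholder settings, following the existing env.get pattern;
# the actual configuration (if any) is not visible in this diff.
SSO_USER_SYNC_BUCKET = env.get("SSO_USER_SYNC_BUCKET", "")
SSO_USER_SYNC_PREFIX = env.get("SSO_USER_SYNC_PREFIX", "")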