Remove the activity stream sync, and use an S3 file
chopkinsmade committed Aug 13, 2024
1 parent 7825143 commit ac36d45
Showing 5 changed files with 34 additions and 761 deletions.
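
For context: this commit deletes the Hawk-signed activity-stream polling (hawk_request, sync_activity_stream_sso_users and sync_all_sso_users in the diff below) and leaves the S3 export consumed by sync_s3_sso_users as the only source of Staff SSO users. The sketch below shows roughly the kind of newline-delimited JSON record the S3 path appears to parse; the exact export layout is not visible in this diff, so the record shape is assumed from the dit:StaffSSO fields used by the removed activity-stream code, and parse_export is an illustrative helper rather than repository code.

import datetime
import json

# Hypothetical single-record export; the real files are written to S3 by the SSO service.
SAMPLE_EXPORT = json.dumps(
    {
        "published": "2024-08-12T10:00:00.000000Z",
        "object": {
            "dit:StaffSSO:User:userId": "0000-aaaa",
            "dit:StaffSSO:User:contactEmailAddress": "jane@example.com",
            "dit:emailAddress": ["jane@example.com"],
            "dit:firstName": "Jane",
            "dit:lastName": "Doe",
            "dit:StaffSSO:User:status": "active",
        },
    }
)


def parse_export(lines, last_processed):
    # Yield one tuple per record published after the cached watermark.
    for line in lines:
        record = json.loads(line)
        published = datetime.datetime.strptime(record["published"], "%Y-%m-%dT%H:%M:%S.%fZ")
        if published <= last_processed:
            continue
        obj = record["object"]
        yield (
            obj["dit:StaffSSO:User:userId"],
            obj.get("dit:StaffSSO:User:contactEmailAddress") or obj["dit:emailAddress"][0],
            obj["dit:firstName"],
            obj["dit:lastName"],
            obj["dit:StaffSSO:User:status"],
            published,
        )


for row in parse_export(SAMPLE_EXPORT.splitlines(), datetime.datetime(1970, 1, 1)):
    print(row)

In the real task the lines come from smart_open over the S3 object and each tuple feeds create_user_from_sso, as shown in the utils.py hunks below.
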
4 changes: 0 additions & 4 deletions .envs/sample.env
@@ -136,10 +136,6 @@ REMOTE_PDB_HOST=0.0.0.0
REMOTE_PDB_PORT=4444
GEVENT_SUPPORT=True

ACTIVITY_STREAM_BASE_URL=https://url.to.activity.stream/
ACTIVITY_STREAM_HAWK_CREDENTIALS_ID=some-id
ACTIVITY_STREAM_HAWK_CREDENTIALS_KEY=some-key

PYTHONUNBUFFERED=TRUE

PGAUDIT_LOG_SCOPES='ALL, -MISC'
4 changes: 0 additions & 4 deletions .envs/test-e2e.env
@@ -130,10 +130,6 @@ REMOTE_PDB_HOST=0.0.0.0
REMOTE_PDB_PORT=4444
GEVENT_SUPPORT=True

ACTIVITY_STREAM_BASE_URL=https://url.to.activity.stream/
ACTIVITY_STREAM_HAWK_CREDENTIALS_ID=some-id
ACTIVITY_STREAM_HAWK_CREDENTIALS_KEY=some-key

PYTHONUNBUFFERED=TRUE

PGAUDIT_LOG_SCOPES='ALL, -MISC'
292 changes: 33 additions & 259 deletions dataworkspace/dataworkspace/apps/applications/utils.py
@@ -26,7 +26,6 @@
import gevent
from psycopg2 import connect, sql
import requests
from mohawk import Sender
from pytz import utc
from smart_open import open as smart_open

@@ -1127,31 +1126,6 @@ def create_user_from_sso(
return user


def hawk_request(method, url, body):
hawk_id = settings.ACTIVITY_STREAM_HAWK_CREDENTIALS_ID
hawk_key = settings.ACTIVITY_STREAM_HAWK_CREDENTIALS_KEY

if not hawk_id or not hawk_key:
raise HawkException("Hawk id or key not configured")

content_type = "application/json"
header = Sender(
{"id": hawk_id, "key": hawk_key, "algorithm": "sha256"},
url,
method,
content=body,
content_type=content_type,
).request_header

response = requests.request(
method,
url,
data=body,
headers={"Authorization": header, "Content-Type": content_type},
)
return response.status_code, response.content


@celery_app.task(autoretry_for=(redis.exceptions.LockError,))
@close_all_connections_if_not_in_atomic_block
def create_tools_access_iam_role_task(user_id):
@@ -1178,119 +1152,6 @@ def _do_create_tools_access_iam_role(user_id):
gevent.sleep(1)


@celery_app.task(autoretry_for=(redis.exceptions.LockError,))
@close_all_connections_if_not_in_atomic_block
def sync_activity_stream_sso_users():
try:
with cache.lock("sso_sync_last_published_lock", blocking_timeout=0, timeout=1800):
_do_sync_activity_stream_sso_users()
except redis.exceptions.LockError:
logger.info("sync_activity_stream_sso_users: Unable to acquire lock. Not running")


def _do_sync_activity_stream_sso_users(page_size=1000):
last_published = cache.get(
"activity_stream_sync_last_published", datetime.datetime.utcfromtimestamp(0)
)
logger.info(
"sync_activity_stream_sso_users: Starting with last published date of %s", last_published
)

endpoint = f"{settings.ACTIVITY_STREAM_BASE_URL}/v3/activities/_search"
ten_seconds_before_last_published = last_published - datetime.timedelta(seconds=10)

query = {
"size": page_size,
"query": {
"bool": {
"filter": [
{"term": {"object.type": "dit:StaffSSO:User"}},
{
"range": {
"published": {
"gte": f"{ten_seconds_before_last_published.strftime('%Y-%m-%dT%H:%M:%S')}"
}
}
},
]
}
},
"sort": [{"published": "asc"}, {"id": "asc"}],
}

while True:
try:
logger.info(
"sync_activity_stream_sso_users: Calling activity stream with query %s",
json.dumps(query),
)
status_code, response = hawk_request(
"GET",
endpoint,
json.dumps(query),
)
except HawkException as e:
logger.error(
"sync_activity_stream_sso_users: Failed to call activity stream with error %s", e
)
break

if status_code != 200:
raise Exception(f"Failed to fetch SSO users: {response}")

response_json = json.loads(response)

if "failures" in response_json["_shards"]:
raise Exception(
f"Failed to fetch SSO users: {json.dumps(response_json['_shards']['failures'])}"
)

records = response_json["hits"]["hits"]

if not records:
break

logger.info(
"sync_activity_stream_sso_users: Fetched %d record(s) from activity stream",
len(records),
)

for record in records:
obj = record["_source"]["object"]

user_id = obj["dit:StaffSSO:User:userId"]
emails = obj["dit:emailAddress"]
primary_email = obj["dit:StaffSSO:User:contactEmailAddress"] or emails[0]
logger.info("sync_activity_stream_sso_users: processing user %s", primary_email)

try:
create_user_from_sso(
user_id,
primary_email,
obj["dit:firstName"],
obj["dit:lastName"],
obj["dit:StaffSSO:User:status"],
check_tools_access_if_user_exists=True,
)
except IntegrityError:
logger.exception("sync_activity_stream_sso_users: Failed to create user record")

last_published_str = records[-1]["_source"]["published"]
last_published = datetime.datetime.strptime(last_published_str, "%Y-%m-%dT%H:%M:%S.%fZ")

if len(records) < page_size:
break

# paginate to next batch of records
query["search_after"] = records[-1]["sort"]

cache.set("activity_stream_sync_last_published", last_published)
logger.info(
"sync_activity_stream_sso_users: Finished with new last published date of %s",
last_published,
)
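
The function above (and sync_all_sso_users further down) paged through the activity stream with Elasticsearch-style search_after pagination: results are sorted by published and id, and the sort values of the last hit of each page become the cursor for the next request. A rough, self-contained sketch of that pattern, with an in-memory stand-in for the Hawk-signed HTTP call (run_search, fake_search and the sample data are illustrative, not repository code):

def paginate(run_search, base_query, page_size=1000):
    # Yield pages of hits, feeding each page's last "sort" values back as the cursor.
    query = dict(base_query, size=page_size, sort=[{"published": "asc"}, {"id": "asc"}])
    while True:
        hits = run_search(query)  # the removed task made a Hawk-signed request here
        if not hits:
            return
        yield hits
        query["search_after"] = hits[-1]["sort"]


# In-memory stand-in so the sketch runs without Elasticsearch or Hawk credentials.
DATA = [{"_source": {"id": i}, "sort": [i]} for i in range(5)]


def fake_search(query):
    after = query.get("search_after", [-1])[0]
    return [hit for hit in DATA if hit["sort"][0] > after][: query["size"]]


for page in paginate(fake_search, {}, page_size=2):
    print([hit["sort"][0] for hit in page])  # [0, 1] then [2, 3] then [4]
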


@celery_app.task(autoretry_for=(redis.exceptions.LockError,))
@close_all_connections_if_not_in_atomic_block
def sync_s3_sso_users():
@@ -1351,34 +1212,28 @@ def _process_staff_sso_file(
            last_name = user_obj.get("dit:lastName")
            status = user_obj.get("dit:StaffSSO:User:status")

-            if settings.S3_SSO_IMPORT_ENABLED:
-                logger.info(
-                    "sync_s3_sso_users: User id %s published date %s is after previous date %s, creating the user from sso",
-                    user_id,
-                    published_date,
-                    last_processed_datetime,
-                )
-                try:
-                    user = create_user_from_sso(
-                        user_id,
-                        primary_email,
-                        first_name,
-                        last_name,
-                        status,
-                        check_tools_access_if_user_exists=True,
-                    )
-
-                    if published_date > new_last_processed_datetime:
-                        new_last_processed_datetime = published_date
-
-                    seen_user_ids.append(user_id)
-                except IntegrityError:
-                    logger.exception("sync_s3_sso_users: Failed to create user record")
-            else:
-                logger.info(
-                    "sync_s3_sso_users: S3_SSO_IMPORT_ENABLED is disabled, user %s will not be added",
-                    user_id,
-                )
+            logger.info(
+                "sync_s3_sso_users: User id %s published date %s is after previous date %s, creating the user from sso",
+                user_id,
+                published_date,
+                last_processed_datetime,
+            )
+            try:
+                user = create_user_from_sso(
+                    user_id,
+                    primary_email,
+                    first_name,
+                    last_name,
+                    status,
+                    check_tools_access_if_user_exists=True,
+                )
+
+                if published_date > new_last_processed_datetime:
+                    new_last_processed_datetime = published_date
+
+                seen_user_ids.append(user_id)
+            except IntegrityError:
+                logger.exception("sync_s3_sso_users: Failed to create user record")

    return seen_user_ids, new_last_processed_datetime

@@ -1421,24 +1276,30 @@ def _do_sync_s3_sso_users():
            user__username__in=seen_result[0]
        ).filter(sso_status="active")

-        logger.info(
-            "sync_s3_sso_users: %s active users exist locally but not in SSO. Marking as inactive",
-            unseen_user_profiles.count(),
-        )
-        unseen_user_profiles.update(sso_status="inactive")
-
-        logger.info("sync_s3_sso_users: New last_published date for cache %s", new_last_processed)
+        if settings.S3_SSO_IMPORT_ENABLED:
+            logger.info(
+                "sync_s3_sso_users: %s active users exist locally but not in SSO. Marking as inactive",
+                unseen_user_profiles.count(),
+            )
+            unseen_user_profiles.update(sso_status="inactive")
+        else:
+            logger.info(
+                "sync_s3_sso_users: %s active users exist locally but not in SSO. S3_SSO_IMPORT_ENABLED is FALSE",
+                unseen_user_profiles.count(),
+            )

        # At the end of the loop, delete all loaded files
        delete_keys = [{"Key": file.key} for file in files]
        logger.info("sync_s3_sso_users: Deleting keys %s", delete_keys)
        bucket.delete_objects(Delete={"Objects": delete_keys})

-        # At the end of the loop set the cache
-        cache.set("s3_sso_sync_last_published", new_last_processed)
    else:
        logger.info("sync_s3_sso_users: No files to process")

+    # Always reset the cache
+    logger.info("sync_s3_sso_users: New last_published date for cache %s", new_last_processed)
+    cache.set("s3_sso_sync_last_published", new_last_processed, timeout=30 * 60)  # 30 minutes


def fetch_visualisation_log_events(log_group, log_stream):
client = boto3.client("logs")
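
One note on the _do_sync_s3_sso_users hunk above: the s3_sso_sync_last_published watermark is now written on every run and with a TTL. Django's cache.set takes its timeout in seconds, so 30 * 60 keeps the key for 30 minutes; if it expires, the next run presumably falls back to whatever default the task uses (that fallback is outside this diff). A tiny illustration with a local-memory cache rather than the project's real backend:

import django
from django.conf import settings

# Stand-alone cache config for illustration only; the project configures its own backend.
settings.configure(
    CACHES={"default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}}
)
django.setup()

from django.core.cache import cache

# The third argument is a timeout in seconds: 30 * 60 == 1800 seconds == 30 minutes.
cache.set("s3_sso_sync_last_published", "2024-08-13T00:00:00", timeout=30 * 60)
print(cache.get("s3_sso_sync_last_published"))
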
@@ -1967,93 +1828,6 @@ def duplicate_tools_monitor():
logger.info("duplicate_tools_alert: Unable to acquire lock to monitor for duplicate tools")


@celery_app.task()
@close_all_connections_if_not_in_atomic_block
def sync_all_sso_users():
with cache.lock("sso_sync_last_published_lock", blocking_timeout=0, timeout=3600):
user_model = get_user_model()
all_users = user_model.objects.all()
seen_user_ids = []
query = {
"size": 1000,
"query": {
"bool": {
"filter": [
{"term": {"object.type": "dit:StaffSSO:User"}},
]
}
},
"sort": [{"published": "asc"}, {"id": "asc"}],
}
while True:
try:
logger.info("Calling activity stream with query %s", json.dumps(query))
status_code, response = hawk_request(
"GET",
f"{settings.ACTIVITY_STREAM_BASE_URL}/v3/activities/_search",
json.dumps(query),
)
except HawkException as e:
logger.error("Failed to call activity stream with error %s", e)
break

if status_code != 200:
raise Exception(f"Failed to fetch SSO users: {response}")

response_json = json.loads(response)

if "failures" in response_json["_shards"]:
raise Exception(
f"Failed to fetch SSO users: {json.dumps(response_json['_shards']['failures'])}"
)

records = response_json["hits"]["hits"]

if not records:
break

logger.info("Fetched %d record(s) from activity stream", len(records))

for record in records:
obj = record["_source"]["object"]
sso_id = obj["dit:StaffSSO:User:userId"]
logger.info("Syncing SSO record for user %s", sso_id)

try:
user = all_users.get(username=sso_id)
except user_model.DoesNotExist:
continue

changed = False

if user.first_name != obj["dit:firstName"]:
changed = True
user.first_name = obj["dit:firstName"]

if user.last_name != obj["dit:firstName"]:
changed = True
user.last_name = obj["dit:lastName"]

if user.profile.sso_status != obj["dit:StaffSSO:User:status"]:
changed = True
user.profile.sso_status = obj["dit:StaffSSO:User:status"]

if changed:
user.save()

seen_user_ids.append(user.id)
query["search_after"] = records[-1]["sort"]

unseen_user_profiles = Profile.objects.exclude(user_id__in=seen_user_ids).filter(
sso_status="active"
)
logger.info(
"%s active users exist locally but not in SSO. Marking as inactive",
unseen_user_profiles.count(),
)
unseen_user_profiles.update(sso_status="inactive")


def _run_orphaned_tools_monitor():
"""
Find and stop any running application instances that meet one of:
