diff --git a/config.py b/config.py
index d00d9be..ad37565 100644
--- a/config.py
+++ b/config.py
@@ -33,6 +33,9 @@
 # LOG GROUP INFORMATION:
 LOG_GROUP_NAME = APP_NAME
 
+# MONITORING
+AUTO_MONITOR = 'True'
+
 # CLOUDWATCH DASHBOARD CREATION
 CREATE_DASHBOARD = 'True' # Create a dashboard in Cloudwatch for run
 CLEAN_DASHBOARD = 'True' # Automatically remove dashboard at end of run with Monitor
diff --git a/documentation/DS-documentation/step_1_configuration.md b/documentation/DS-documentation/step_1_configuration.md
index 277958a..24d3bbc 100644
--- a/documentation/DS-documentation/step_1_configuration.md
+++ b/documentation/DS-documentation/step_1_configuration.md
@@ -64,6 +64,11 @@
 This queue will be automatically made if it doesn't exist already.
 
 ***
 
+### MONITORING
+* **AUTO_MONITOR:** Whether to have Auto-Monitor automatically monitor your jobs and clean up the run's infrastructure once the queue empties.
+
+***
+
 ### CLOUDWATCH DASHBOARD CREATION
 
 * **CREATE_DASHBOARD:** Should DS create a Cloudwatch Dashboard that plots run metrics?
diff --git a/documentation/DS-documentation/step_4_monitor.md b/documentation/DS-documentation/step_4_monitor.md
index 74f923e..63df9f1 100644
--- a/documentation/DS-documentation/step_4_monitor.md
+++ b/documentation/DS-documentation/step_4_monitor.md
@@ -2,72 +2,68 @@
 
 Your workflow is now submitted.
 Distributed-Something will keep an eye on a few things for you at this point without you having to do anything else.
-
 * Each instance is labeled with your APP_NAME, so that you can easily find your instances if you want to look at the instance metrics on the Running Instances section of the [EC2 web interface](https://console.aws.amazon.com/ec2/v2/home) to monitor performance.
-
 * You can also look at the whole-cluster CPU and memory usage statistics related to your APP_NAME in the [ECS web interface](https://console.aws.amazon.com/ecs/home).
-
 * Each instance will have an alarm placed on it so that if CPU usage dips below 1% for 15 consecutive minutes (almost always the result of a crashed machine), the instance will be automatically terminated and a new one will take its place.
-
 * Each individual job processed will create a log of the CellProfiler output, and each Docker container will create a log showing CPU, memory, and disk usage.
 
-If you choose to run the monitor script, Distributed-Something can be even more helpful.
-The monitor can be run by entering `python run.py monitor files/APP_NAMESpotFleetRequestId.json`.
+If you choose to run the Monitor script, Distributed-Something can be even more helpful.
 
-(**Note:** You should run the monitor inside [Screen](https://www.gnu.org/software/screen/), [tmux](https://tmux.github.io/), or another comparable service to keep a network disconnection from killing your monitor; this is particularly critical the longer your run takes.)
+## Running Monitor
 
-***
+### Manually running Monitor
+Monitor can be run by entering `python run.py monitor files/APP_NAMESpotFleetRequestId.json`.
+The optimal time to initiate Monitor is as soon as you have triggered a run, so that it can downscale infrastructure as jobs finish; however, you can run Monitor at any point and it will clean up whatever infrastructure remains.
-## Monitor file
+**Note:** You should run the monitor inside [Screen](https://www.gnu.org/software/screen/), [tmux](https://tmux.github.io/), or another comparable service to keep a network disconnection from killing your monitor; this is particularly critical the longer your run takes.
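+
+You can also check on job progress yourself at any time by querying the queue directly; below is a minimal sketch (it assumes the `APP_NAMEQueue` naming used elsewhere in this PR and that your AWS credentials are configured — substitute your actual APP_NAME).
+
+```
+# Optional: see how many jobs are waiting vs. in progress in your queue.
+import boto3
+
+sqs = boto3.client("sqs")
+queue_url = sqs.get_queue_url(QueueName="APP_NAMEQueue")["QueueUrl"]  # your actual APP_NAME here
+attrs = sqs.get_queue_attributes(
+    QueueUrl=queue_url,
+    AttributeNames=[
+        "ApproximateNumberOfMessages",            # jobs waiting to be picked up
+        "ApproximateNumberOfMessagesNotVisible",  # jobs currently being processed
+    ],
+)["Attributes"]
+print("Waiting:", attrs["ApproximateNumberOfMessages"],
+      "In progress:", attrs["ApproximateNumberOfMessagesNotVisible"])
+```
+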
-The JSON monitor file containing all the information Distributed-Something needs will have been automatically created when you sent the instructions to start your cluster in the [previous step](step_3_start_cluster).
-The file itself is quite simple and contains the following information:
-
-```
-{"MONITOR_FLEET_ID" : "sfr-9999ef99-99fc-9d9d-9999-9999999e99ab",
-"MONITOR_APP_NAME" : "2021_12_13_Project_Analysis",
-"MONITOR_ECS_CLUSTER" : "default",
-"MONITOR_QUEUE_NAME" : "2021_12_13_Project_AnalysisQueue",
-"MONITOR_BUCKET_NAME" : "bucket-name",
-"MONITOR_LOG_GROUP_NAME" : "2021_12_13_Project_Analysis",
-"MONITOR_START_TIME" : "1649187798951"}
-```
-
-For any DS run where you have run [`startCluster`](step_3_start_cluster) more than once, the most recent values will overwrite the older values in the monitor file.
-Therefore, if you have started multiple spot fleets (which you might do in different subnets if you are having trouble getting enough capacity in your spot fleet, for example), monitor will only clean up the latest request unless you manually edit the `MONITOR_FLEET_ID` to match the spot fleet you have kept.
-
-***
+### Using Auto-Monitor
+Instead of manually triggering Monitor, you can have a version of Monitor automatically initiate after you [start your cluster](step_3_start_cluster.md) by setting `AUTO_MONITOR = 'True'` in your [config file](step_1_configuration.md).
+Auto-Monitor is an AWS Lambda function that is triggered by alarms placed on the SQS queue.
+Read more about the [SQS Queue](SQS_QUEUE_information.md) to better understand the alarm metrics.
 
 ## Monitor functions
 
 ### While your analysis is running
+* Scales down the spot fleet request to match the number of remaining jobs WITHOUT force-terminating them (see the sketch below).
+This happens every 10 minutes with manual Monitor or, with Auto-Monitor, whenever there are no Visible Messages in your queue.
+* Deletes the alarms for any instances that have been terminated in the last 24 hours (because of spot prices rising above your maximum bid, machine crashes, etc).
+This happens every hour with manual Monitor or, with Auto-Monitor, whenever there are no Visible Messages in your queue.
-* Checks your queue once per minute to see how many jobs are currently processing and how many remain to be processed.
-
-* Once per hour, it deletes the alarms for any instances that have been terminated in the last 24 hours (because of spot prices rising above your maximum bid, machine crashes, etc).
-
-### When the number of jobs in your queue goes to 0
-
+### When your queue is totally empty (there are no Visible or Not Visible messages)
 * Downscales the ECS service associated with your APP_NAME.
-
 * Deletes all the alarms associated with your spot fleet (both the currently running and the previously terminated instances).
-
 * Shuts down your spot fleet to keep you from incurring charges after your analysis is over.
-
 * Gets rid of the queue, service, and task definition created for this analysis.
-
 * Exports all the logs from your analysis onto your S3 bucket.
+* Deletes your Cloudwatch Dashboard if you created it and set `CLEAN_DASHBOARD = 'True'` in your [config file](step_1_configuration.md).
-* Deletes your Cloudwatch Dashboard if you created it and set CLEAN_DASHBOARD to True.
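+
+For reference, the downscaling described above lowers the spot fleet's target capacity while setting its excess-capacity termination policy to `noTermination`, so working machines are never force-terminated; the sketch below shows the idea with boto3 and follows the same pattern as `downscaleSpotFleet` in the Auto-Monitor Lambda added in this PR (the queue name and fleet ID are placeholders).
+
+```
+# Sketch: downscale a spot fleet to the number of in-flight jobs
+# without force-terminating machines that are still working.
+import boto3
+
+sqs = boto3.client("sqs")
+ec2 = boto3.client("ec2")
+
+queue_url = sqs.get_queue_url(QueueName="APP_NAMEQueue")["QueueUrl"]
+in_progress = int(
+    sqs.get_queue_attributes(
+        QueueUrl=queue_url,
+        AttributeNames=["ApproximateNumberOfMessagesNotVisible"],
+    )["Attributes"]["ApproximateNumberOfMessagesNotVisible"]
+)
+fleet_id = "sfr-9999ef99-99fc-9d9d-9999-9999999e99ab"  # placeholder
+active = ec2.describe_spot_fleet_instances(SpotFleetRequestId=fleet_id)
+if in_progress < len(active["ActiveInstances"]):
+    # Lower the target; machines still working are NOT terminated,
+    # they simply are not replaced once they finish or die.
+    ec2.modify_spot_fleet_request(
+        SpotFleetRequestId=fleet_id,
+        TargetCapacity=in_progress,
+        ExcessCapacityTerminationPolicy="noTermination",
+    )
+```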
+
+## Cheapest mode
+
+If you are manually triggering Monitor, you can run the monitor in an optional "cheapest" mode, which will downscale the number of requested machines (but not RUNNING machines) to one machine 15 minutes after the monitor is engaged.
+You can engage cheapest mode by adding `True` as a final configurable parameter when starting the monitor, i.e. `python run.py monitor files/APP_NAMESpotFleetRequestId.json True`.
+
+Cheapest mode is cheapest because machines are not replaced once they crash and/or run out of jobs to do, until only one requested machine remains; this can save you money, particularly with multi-CPU Docker containers running long jobs.
+This mode is optional because running this way involves some inherent risks.
+If machines stall out due to processing errors, they will not be replaced, meaning your job will take longer overall.
+Additionally, if there is limited capacity for your requested configuration when you first start (e.g. you want 200 machines but AWS says it can currently only allocate you 50), more machines will not be added if and when they become available in cheapest mode as they would in normal mode.
 
 ***
 
-## Cheapest mode
-
-You can run the monitor in an optional "cheapest" mode, which will downscale the number of requested machines (but not RUNNING machines) to one 15 minutes after the monitor is engaged.
-You can engage cheapest mode by adding `True` as a final configurable parameter when starting the monitor, aka `python run.py monitor files/APP_NAMESpotFleetRequestId.json True`
+## Monitor file
-
-Cheapest mode is cheapest because it will remove all but 1 machine as soon as that machine crashes and/or runs out of jobs to do; this can save you money, particularly in multi-CPU Dockers running long jobs.
+
+The JSON monitor file containing all the information Distributed-Something needs will have been automatically created when you sent the instructions to start your cluster in the [previous step](step_3_start_cluster).
+The file itself is quite simple and contains the following information:
-
-This mode is optional because running this way involves some inherent risks- if machines stall out due to processing errors, they will not be replaced, meaning your job will take overall longer.
-Additionally, if there is limited capacity for your requested configuration when you first start (e.g. you want 200 machines but AWS says it can currently only allocate you 50), more machines will not be added if and when they become available in cheapest mode as they would in normal mode.
+
+```
+{"MONITOR_FLEET_ID" : "sfr-9999ef99-99fc-9d9d-9999-9999999e99ab",
+"MONITOR_APP_NAME" : "2021_12_13_Project_Analysis",
+"MONITOR_ECS_CLUSTER" : "default",
+"MONITOR_QUEUE_NAME" : "2021_12_13_Project_AnalysisQueue",
+"MONITOR_BUCKET_NAME" : "bucket-name",
+"MONITOR_LOG_GROUP_NAME" : "2021_12_13_Project_Analysis",
+"MONITOR_START_TIME" : "1649187798951"}
+```
-
-For any DS run where you have run [`startCluster`](step_3_start_cluster) more than once, the most recent values will overwrite the older values in the monitor file.
-Therefore, if you have started multiple spot fleets (which you might do in different subnets if you are having trouble getting enough capacity in your spot fleet, for example), monitor will only clean up the latest request unless you manually edit the `MONITOR_FLEET_ID` to match the spot fleet you have kept.
+
+For any DS run where you have run [`startCluster`](step_3_start_cluster) more than once, the most recent values will overwrite the older values in the monitor file.
+Therefore, if you have started multiple spot fleets (which you might do in different subnets if you are having trouble getting enough capacity in your spot fleet, for example), Monitor will only clean up the latest request unless you manually edit the `MONITOR_FLEET_ID` to match the spot fleet you have kept.
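+
+If you are not sure which fleet you kept, a sketch like the one below can list your spot fleet requests and point the monitor file at the right one (the local path follows the `files/APP_NAMESpotFleetRequestId.json` convention used above; the fleet ID shown is a placeholder).
+
+```
+# Sketch: pick the spot fleet you kept and update the monitor file to match.
+import json
+import boto3
+
+ec2 = boto3.client("ec2")
+# List spot fleet requests and their states so you can identify the one you kept
+for fleet in ec2.describe_spot_fleet_requests()["SpotFleetRequestConfigs"]:
+    print(fleet["SpotFleetRequestId"], fleet["SpotFleetRequestState"])
+
+monitor_path = "files/APP_NAMESpotFleetRequestId.json"  # your actual APP_NAME here
+with open(monitor_path) as f:
+    monitor = json.load(f)
+monitor["MONITOR_FLEET_ID"] = "sfr-XXXXXXXX"  # the fleet you are keeping
+with open(monitor_path, "w") as f:
+    json.dump(monitor, f)
+```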
\ No newline at end of file
diff --git a/lambda_function.py b/lambda_function.py
new file mode 100644
index 0000000..e14ba94
--- /dev/null
+++ b/lambda_function.py
@@ -0,0 +1,187 @@
+import boto3
+import datetime
+import botocore
+import json
+import time
+
+s3 = boto3.client("s3")
+ecs = boto3.client("ecs")
+ec2 = boto3.client("ec2")
+cloudwatch = boto3.client("cloudwatch")
+sqs = boto3.client("sqs")
+
+# Set this to the AWS_BUCKET your monitor files are uploaded to
+bucket = "BUCKET_NAME"
+
+
+def killdeadAlarms(fleetId, monitorapp, project):
+    # Delete the alarms of instances terminated today or yesterday
+    checkdates = [
+        datetime.datetime.now().strftime("%Y-%m-%d"),
+        (datetime.datetime.now() - datetime.timedelta(days=1)).strftime("%Y-%m-%d"),
+    ]
+    todel = []
+    for eachdate in checkdates:
+        datedead = ec2.describe_spot_fleet_request_history(
+            SpotFleetRequestId=fleetId, StartTime=eachdate
+        )
+        for eachevent in datedead["HistoryRecords"]:
+            if eachevent["EventType"] == "instanceChange":
+                if eachevent["EventInformation"]["EventSubType"] == "terminated":
+                    todel.append(eachevent["EventInformation"]["InstanceId"])
+    todel = [f"{project}_{x}" for x in todel]
+    cloudwatch.delete_alarms(AlarmNames=todel)
+    print("Old alarms deleted")
+
+
+def seeIfLogExportIsDone(logExportId):
+    # Poll until a CloudWatch Logs export task leaves the PENDING/RUNNING states
+    while True:
+        result = cloudwatch.describe_export_tasks(taskId=logExportId)
+        if result["exportTasks"][0]["status"]["code"] != "PENDING":
+            if result["exportTasks"][0]["status"]["code"] != "RUNNING":
+                print(result["exportTasks"][0]["status"]["code"])
+                break
+        time.sleep(30)
+
+
+def downscaleSpotFleet(queue, spotFleetID):
+    # Lower the fleet's target capacity to the number of in-flight jobs,
+    # without force-terminating machines that are still working
+    response = sqs.get_queue_url(QueueName=queue)
+    queueUrl = response["QueueUrl"]
+    response = sqs.get_queue_attributes(
+        QueueUrl=queueUrl,
+        AttributeNames=[
+            "ApproximateNumberOfMessages",
+            "ApproximateNumberOfMessagesNotVisible",
+        ],
+    )
+    visible = int(response["Attributes"]["ApproximateNumberOfMessages"])
+    nonvisible = int(response["Attributes"]["ApproximateNumberOfMessagesNotVisible"])
+    status = ec2.describe_spot_fleet_instances(SpotFleetRequestId=spotFleetID)
+    if nonvisible < len(status["ActiveInstances"]):
+        result = ec2.modify_spot_fleet_request(
+            ExcessCapacityTerminationPolicy="noTermination",
+            TargetCapacity=str(nonvisible),
+            SpotFleetRequestId=spotFleetID,
+        )
+
+
+def lambda_handler(event, lambda_context):
+    # Triggered any time SQS queue ApproximateNumberOfMessagesVisible = 0
+    # OR ApproximateNumberOfMessagesNotVisible = 0
+    messagestring = event["Records"][0]["Sns"]["Message"]
+    messagedict = json.loads(messagestring)
+    queueId = messagedict["Trigger"]["Dimensions"][0]["value"]
+    project = queueId.rsplit("_", 1)[0]
+
+    # Download monitor file
+    monitor_file_name = f"{queueId.split('Queue')[0]}SpotFleetRequestId.json"
+    monitor_local_name = f"/tmp/{monitor_file_name}"
+    monitor_on_bucket_name = f"monitors/{monitor_file_name}"
+
+    with open(monitor_local_name, "wb") as f:
+        try:
+            s3.download_fileobj(bucket, monitor_on_bucket_name, f)
+        except botocore.exceptions.ClientError:
+            print("Error retrieving monitor file.")
+            return
+    with open(monitor_local_name, "r") as infile:
+        monitorInfo = json.load(infile)
+
+    monitorcluster = monitorInfo["MONITOR_ECS_CLUSTER"]
+    monitorapp = monitorInfo["MONITOR_APP_NAME"]
+    fleetId = monitorInfo["MONITOR_FLEET_ID"]
+    loggroupId = monitorInfo["MONITOR_LOG_GROUP_NAME"]
+    starttime = monitorInfo["MONITOR_START_TIME"]
+    CLEAN_DASHBOARD = monitorInfo["CLEAN_DASHBOARD"]
+    print(f"Monitor triggered for {monitorcluster} {monitorapp} {fleetId} {loggroupId}")
+
+    # If no visible messages, downscale machines
+    if "ApproximateNumberOfMessagesVisible" in event["Records"][0]["Sns"]["Message"]:
event["Records"][0]["Sns"]["Message"]: + print("No visible messages. Tidying as we go.") + killdeadAlarms(fleetId, monitorapp, project) + downscaleSpotFleet(queueId, fleetId) + + # If no messages in progress, cleanup + if "ApproximateNumberOfMessagesNotVisible" in event["Records"][0]["Sns"]["Message"]: + print("No messages in progress. Cleaning up.") + ecs.update_service( + cluster=monitorcluster, + service=f"{monitorapp}Service", + desiredCount=0, + ) + print("Service has been downscaled") + + # Delete the alarms from active machines and machines that have died. + active_dictionary = ec2.describe_spot_fleet_instances( + SpotFleetRequestId=fleetId + ) + active_instances = [] + for instance in active_dictionary["ActiveInstances"]: + active_instances.append(instance["InstanceId"]) + cloudwatch.delete_alarms(AlarmNames=active_instances) + killdeadAlarms(fleetId, monitorapp, project) + + # Read spot fleet id and terminate all EC2 instances + ec2.cancel_spot_fleet_requests( + SpotFleetRequestIds=[fleetId], TerminateInstances=True + ) + print("Fleet shut down.") + + # Remove SQS queue, ECS Task Definition, ECS Service + ECS_TASK_NAME = monitorapp + "Task" + ECS_SERVICE_NAME = monitorapp + "Service" + + print("Deleting existing queue.") + queueoutput = sqs.list_queues(QueueNamePrefix=queueId) + try: + if len(queueoutput["QueueUrls"]) == 1: + queueUrl = queueoutput["QueueUrls"][0] + else: # In case we have "AnalysisQueue" and "AnalysisQueue1" and only want to delete the first of those + for eachUrl in queueoutput["QueueUrls"]: + if eachUrl.split("/")[-1] == queueName: + queueUrl = eachUrl + sqs.delete_queue(QueueUrl=queueUrl) + except KeyError: + print("Can't find queue to delete.") + + print("Deleting service") + try: + ecs.delete_service(cluster=monitorcluster, service=ECS_SERVICE_NAME) + except: + print("Couldn't delete service.") + + print("De-registering task") + taskArns = ecs.list_task_definitions() + for eachtask in taskArns["taskDefinitionArns"]: + fulltaskname = eachtask.split("/")[-1] + ecs.deregister_task_definition(taskDefinition=fulltaskname) + + print("Removing cluster if it's not the default and not otherwise in use") + if monitorcluster != "default": + result = ecs.describe_clusters(clusters=[monitorcluster]) + if ( + sum( + [ + result["clusters"][0]["pendingTasksCount"], + result["clusters"][0]["runningTasksCount"], + result["clusters"][0]["activeServicesCount"], + ] + ) + == 0 + ): + ecs.delete_cluster(cluster=monitorcluster) + + # Remove alarms that triggered monitor + print("Removing alarms that triggered Monitor") + cloudwatch.delete_alarms( + AlarmNames=[ + f"ApproximateNumberOfMessagesVisibleisZero_{monitorapp}", + f"ApproximateNumberOfMessagesNotVisibleisZero_{monitorapp}", + ] + ) + + # Remove Cloudwatch dashboard if created and cleanup desired + if CLEAN_DASHBOARD.lower() == "true": + dashboard_list = cloudwatch.list_dashboards() + for entry in dashboard_list["DashboardEntries"]: + if monitorapp in entry["DashboardName"]: + cloudwatch.delete_dashboards( + DashboardNames=[entry["DashboardName"]] + ) diff --git a/run.py b/run.py index be29383..da32537 100644 --- a/run.py +++ b/run.py @@ -11,6 +11,7 @@ CREATE_DASHBOARD = 'False' CLEAN_DASHBOARD = 'False' +AUTO_MONITOR = 'False' from config import * @@ -591,8 +592,16 @@ def startCluster(): createMonitor.write('"MONITOR_BUCKET_NAME" : "'+AWS_BUCKET+'",\n') createMonitor.write('"MONITOR_LOG_GROUP_NAME" : "'+LOG_GROUP_NAME+'",\n') createMonitor.write('"MONITOR_START_TIME" : "'+ starttime+'"}\n') + 
createMonitor.write('"CLEAN_DASHBOARD" : "'+ CLEAN_DASHBOARD+'"}\n') createMonitor.close() + # Upload monitor file to S3 so it can be read by Auto-Monitor lambda function + if AUTO_MONITOR.lower()=='true': + s3 = boto3.client("s3") + json_on_bucket_name = f'monitors/{APP_NAME}SpotFleetRequestId.json' # match path set in lambda function + with open(monitor_file_name, "rb") as a: + s3.put_object(Body=a, Bucket=AWS_BUCKET, Key=json_on_bucket_name) + # Step 4: Create a log group for this app and date if one does not already exist logclient=boto3.client('logs') loggroupinfo=logclient.describe_log_groups(logGroupNamePrefix=LOG_GROUP_NAME) @@ -642,6 +651,36 @@ def startCluster(): print ("Creating CloudWatch dashboard for run metrics") create_dashboard(requestInfo) + if AUTO_MONITOR.lower()=='true': + # Create alarms that will trigger Monitor based on SQS queue metrics + cloudwatch = boto3.client("cloudwatch") + metricnames = [ + "ApproximateNumberOfMessagesNotVisible", + "ApproximateNumberOfMessagesVisible", + ] + sns = boto3.client("sns") + MonitorARN = sns.create_topic(Name="Monitor")['TopicArn'] # returns ARN since topic already exists + for metric in metricnames: + response = cloudwatch.put_metric_alarm( + AlarmName=f'{metric}isZero_{APP_NAME}', + ActionsEnabled=True, + OKActions=[], + AlarmActions=[MonitorARN], + InsufficientDataActions=[], + MetricName=metric, + Namespace="AWS/SQS", + Statistic="Average", + Dimensions=[ + {"Name": "QueueName", "Value": f'{APP_NAME}Queue'} + ], + Period=300, + EvaluationPeriods=1, + DatapointsToAlarm=1, + Threshold=0, + ComparisonOperator="LessThanOrEqualToThreshold", + TreatMissingData="missing", + ) + ################################# # SERVICE 4: MONITOR JOB #################################