Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Module ramp rate to google sheet #14868

Merged
merged 2 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 77 additions & 67 deletions abr-testing/abr_testing/data_collection/abr_google_drive.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,77 +43,87 @@ def create_data_dictionary(
file_results = json.load(file)
else:
continue
run_id = file_results.get("run_id", "NaN")
if run_id in runs_to_save:
robot = file_results.get("robot_name")
protocol_name = file_results["protocol"]["metadata"].get("protocolName", "")
software_version = file_results.get("API_Version", "")
left_pipette = file_results.get("left", "")
right_pipette = file_results.get("right", "")
extension = file_results.get("extension", "")
(
num_of_errors,
error_type,
error_code,
error_instrument,
error_level,
) = read_robot_logs.get_error_info(file_results)
if isinstance(file_results, dict):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with something like this, it's usually a good idea to split out these checks and the code inside them by creating some boundary where

  • we do some validity checks before entering the boundary, and therefore
  • once inside the boundary, the data is known to be valid

A common way to do it would be to have a function that takes a dict and only a dict - and doesn't check - and returns some data; and before calling the function you check that it's a dict.

Here though a good intermediate step would be to invert the logic to

if not isinstance(file_results, dict):
    continue
run_id = file_results.get("run_id", "NaN")
...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha that makes sense.

run_id = file_results.get("run_id", "NaN")
if run_id in runs_to_save:
robot = file_results.get("robot_name")
protocol_name = file_results["protocol"]["metadata"].get(
"protocolName", ""
)
software_version = file_results.get("API_Version", "")
left_pipette = file_results.get("left", "")
right_pipette = file_results.get("right", "")
extension = file_results.get("extension", "")
(
num_of_errors,
error_type,
error_code,
error_instrument,
error_level,
) = read_robot_logs.get_error_info(file_results)

all_modules = get_modules(file_results)
all_modules = get_modules(file_results)

start_time_str, complete_time_str, start_date, run_time_min = (
"",
"",
"",
0.0,
)
try:
start_time = datetime.strptime(
file_results.get("startedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z"
)
adjusted_start_time = start_time - timedelta(hours=5)
start_date = str(adjusted_start_time.date())
start_time_str = str(adjusted_start_time).split("+")[0]
complete_time = datetime.strptime(
file_results.get("completedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z"
start_time_str, complete_time_str, start_date, run_time_min = (
"",
"",
"",
0.0,
)
adjusted_complete_time = complete_time - timedelta(hours=5)
complete_time_str = str(adjusted_complete_time).split("+")[0]
run_time = complete_time - start_time
run_time_min = run_time.total_seconds() / 60
except ValueError:
pass # Handle datetime parsing errors if necessary
try:
start_time = datetime.strptime(
file_results.get("startedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z"
)
adjusted_start_time = start_time - timedelta(hours=5)
start_date = str(adjusted_start_time.date())
start_time_str = str(adjusted_start_time).split("+")[0]
complete_time = datetime.strptime(
file_results.get("completedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z"
)
adjusted_complete_time = complete_time - timedelta(hours=5)
complete_time_str = str(adjusted_complete_time).split("+")[0]
run_time = complete_time - start_time
run_time_min = run_time.total_seconds() / 60
except ValueError:
pass # Handle datetime parsing errors if necessary

if run_time_min > 0:
row = {
"Robot": robot,
"Run_ID": run_id,
"Protocol_Name": protocol_name,
"Software Version": software_version,
"Date": start_date,
"Start_Time": start_time_str,
"End_Time": complete_time_str,
"Run_Time (min)": run_time_min,
"Errors": num_of_errors,
"Error_Code": error_code,
"Error_Type": error_type,
"Error_Instrument": error_instrument,
"Error_Level": error_level,
"Left Mount": left_pipette,
"Right Mount": right_pipette,
"Extension": extension,
}
tc_dict = read_robot_logs.thermocycler_commands(file_results)
hs_dict = read_robot_logs.hs_commands(file_results)
tm_dict = read_robot_logs.temperature_module_commands(file_results)
notes = {"Note1": "", "Jira Link": issue_url}
row_2 = {**row, **all_modules, **notes, **hs_dict, **tm_dict, **tc_dict}
headers = list(row_2.keys())
runs_and_robots[run_id] = row_2
else:
continue
# os.remove(file_path)
# print(f"Run ID: {run_id} has a run time of 0 minutes. Run removed.")
if run_time_min > 0:
row = {
"Robot": robot,
"Run_ID": run_id,
"Protocol_Name": protocol_name,
"Software Version": software_version,
"Date": start_date,
"Start_Time": start_time_str,
"End_Time": complete_time_str,
"Run_Time (min)": run_time_min,
"Errors": num_of_errors,
"Error_Code": error_code,
"Error_Type": error_type,
"Error_Instrument": error_instrument,
"Error_Level": error_level,
"Left Mount": left_pipette,
"Right Mount": right_pipette,
"Extension": extension,
}
tc_dict = read_robot_logs.thermocycler_commands(file_results)
hs_dict = read_robot_logs.hs_commands(file_results)
tm_dict = read_robot_logs.temperature_module_commands(file_results)
notes = {"Note1": "", "Jira Link": issue_url}
row_2 = {
**row,
**all_modules,
**notes,
**hs_dict,
**tm_dict,
**tc_dict,
}
headers = list(row_2.keys())
runs_and_robots[run_id] = row_2
else:
continue
# os.remove(file_path)
# print(f"Run ID: {run_id} has a run time of 0 minutes. Run removed.")
return runs_and_robots, headers


Expand Down
20 changes: 9 additions & 11 deletions abr-testing/abr_testing/data_collection/abr_robot_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,6 @@ def get_error_info_from_robot(
nargs=1,
help="Path to long term storage directory for run logs.",
)
parser.add_argument(
"robot_ip",
metavar="ROBOT_IP",
type=str,
nargs=1,
help="IP address of robot as string.",
)
parser.add_argument(
"jira_api_token",
metavar="JIRA_API_TOKEN",
Expand Down Expand Up @@ -130,14 +123,18 @@ def get_error_info_from_robot(
)
args = parser.parse_args()
storage_directory = args.storage_directory[0]
ip = args.robot_ip[0]
ip = str(input("Enter Robot IP: "))
url = "https://opentrons.atlassian.net"
api_token = args.jira_api_token[0]
email = args.email[0]
board_id = args.board_id[0]
reporter_id = args.reporter_id[0]
ticket = jira_tool.JiraTicket(url, api_token, email)
error_runs = get_error_runs_from_robot(ip)
try:
error_runs = get_error_runs_from_robot(ip)
except requests.exceptions.InvalidURL:
print("Invalid IP address.")
sys.exit()
one_run = error_runs[-1] # Most recent run with error.
(
summary,
Expand All @@ -147,7 +144,7 @@ def get_error_info_from_robot(
whole_description_str,
run_log_file_path,
) = get_error_info_from_robot(ip, one_run, storage_directory)
# get calibration data
# Get Calibration Data
saved_file_path_calibration, calibration = read_robot_logs.get_calibration_offsets(
ip, storage_directory
)
Expand All @@ -156,6 +153,7 @@ def get_error_info_from_robot(
# TODO: make argument or see if I can get rid of with using board_id.
project_key = "RABR"
parent_key = project_key + "-" + robot[-1]
# TODO: read board to see if ticket for run id already exists.
# CREATE TICKET
issue_key = ticket.create_ticket(
summary,
Expand All @@ -172,7 +170,7 @@ def get_error_info_from_robot(
issue_url = ticket.open_issue(issue_key)
# MOVE FILES TO ERROR FOLDER.
error_files = [saved_file_path_calibration, run_log_file_path] + file_paths
error_folder_path = os.path.join(storage_directory, str("RABR-238"))
error_folder_path = os.path.join(storage_directory, issue_key)
os.makedirs(error_folder_path, exist_ok=True)
for source_file in error_files:
destination_file = os.path.join(
Expand Down
154 changes: 154 additions & 0 deletions abr-testing/abr_testing/data_collection/module_ramp_rates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
"""Get ramp rates of modules."""
from abr_testing.automation import google_sheets_tool
from abr_testing.data_collection import read_robot_logs
import gspread # type: ignore[import]
import argparse
import os
import sys
import json
from datetime import datetime
from typing import Dict, Any
import requests


def ramp_rate(file_results: Dict[str, Any]) -> Dict[int, float]:
    """Compute approximate module ramp rates (C/s) from a run log.

    Each set-target-temperature command records the target; the following
    wait-for-temperature command's duration is how long the module took to
    reach it. Consecutive (duration, temperature) pairs then yield one ramp
    rate per transition: (temp2 - temp1) / duration2.

    :param file_results: Parsed run-log JSON; must contain a "commands" list.
    :return: Mapping of transition index -> ramp rate in Celsius per second.
    """
    set_commands = (
        "thermocycler/setTargetBlockTemperature",
        "temperatureModule/setTargetTemperature",
        "heaterShaker/setTargetTemperature",
    )
    wait_commands = (
        "thermocycler/waitForBlockTemperature",
        "temperatureModule/waitForTemperature",
        "heaterShaker/waitForTemperature",
    )
    # BUG FIX: was a global defined only in __main__ (NameError when the
    # function is used standalone); a local dict preserves behavior because
    # __main__ reset the global before every call anyway.
    temps_and_durations: Dict[float, float] = {}
    # BUG FIX: default target so a wait command appearing before any set
    # command no longer raises UnboundLocalError.
    target_temp = 0.0
    for command in file_results["commands"]:
        command_type = command["commandType"]
        if command_type in set_commands:
            target_temp = command["params"].get("celsius", 0.0)
        elif command_type in wait_commands:
            start_time = datetime.strptime(
                command.get("startedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z"
            )
            end_time = datetime.strptime(
                command.get("completedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z"
            )
            duration = (end_time - start_time).total_seconds()
            # NOTE(review): keyed by duration, so two waits with identical
            # durations collide and the earlier one is lost — confirm intent.
            temps_and_durations[duration] = target_temp
    ramp_rates: Dict[int, float] = {}
    durations = list(temps_and_durations.keys())
    for idx in range(len(durations) - 1):
        duration_1 = durations[idx]
        duration_2 = durations[idx + 1]
        if duration_2 == 0:
            # BUG FIX: guard against ZeroDivisionError on instantaneous waits.
            continue
        ramp_rates[idx] = (
            temps_and_durations[duration_2] - temps_and_durations[duration_1]
        ) / duration_2
    return ramp_rates


if __name__ == "__main__":
    # SCRIPT ARGUMENTS
    parser = argparse.ArgumentParser(description="Read run logs on google drive.")
    parser.add_argument(
        "storage_directory",
        metavar="STORAGE_DIRECTORY",
        type=str,
        nargs=1,
        help="Path to long term storage directory for run logs.",
    )
    parser.add_argument(
        "google_sheet_name",
        metavar="GOOGLE_SHEET_NAME",
        type=str,
        nargs=1,
        help="Google sheet name.",
    )
    parser.add_argument(
        "email", metavar="EMAIL", type=str, nargs=1, help="opentrons gmail."
    )
    args = parser.parse_args()
    storage_directory = args.storage_directory[0]
    google_sheet_name = args.google_sheet_name[0]
    # FIND CREDENTIALS FILE
    # BUG FIX: os.path.join never raises FileNotFoundError (it does not touch
    # the filesystem), so the old try/except could not fire; check explicitly.
    credentials_path = os.path.join(storage_directory, "credentials.json")
    if not os.path.exists(credentials_path):
        print(f"Add credentials.json file to: {storage_directory}.")
        sys.exit()
    # CONNECT TO GOOGLE SHEET
    try:
        google_sheet = google_sheets_tool.google_sheet(
            credentials_path, google_sheet_name, 1
        )
        print(f"Connected to google sheet: {google_sheet_name}")
    except gspread.exceptions.APIError:
        print("ERROR: Check google sheet name. Check credentials file.")
        sys.exit()
    run_ids_on_sheet = google_sheet.get_column(2)
    runs_and_robots = {}
    for filename in os.listdir(storage_directory):
        file_path = os.path.join(storage_directory, filename)
        if file_path.endswith(".json"):
            with open(file_path) as file:
                file_results = json.load(file)
        else:
            continue
        # CHECK if file is ramp rate run
        run_id = file_results.get("run_id", None)
        # NOTE(review): reset per file because ramp_rate() accumulates into
        # this module-level dict — verify it was not meant to be a local.
        temps_and_durations: Dict[float, float] = dict()
        if run_id is not None and run_id not in run_ids_on_sheet:
            ramp_rates = ramp_rate(file_results)
            protocol_name = file_results["protocol"]["metadata"].get(
                "protocolName", ""
            )
            if "Ramp Rate" in protocol_name:
                # Robot IP is encoded as the leading token of the filename.
                ip = filename.split("_")[0]
                # Need at least two transitions to report heating AND cooling.
                if len(ramp_rates) > 1:
                    cooling_ramp_rate = abs(min(ramp_rates.values()))
                    heating_ramp_rate = abs(max(ramp_rates.values()))
                    start_time = datetime.strptime(
                        file_results.get("startedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z"
                    )
                    start_date = str(start_time.date())
                    module_serial_number = file_results["modules"][0].get(
                        "serialNumber", "NaN"
                    )
                    # Look up the firmware version of the module under test.
                    firmwareVersion = "NaN"
                    try:
                        response = requests.get(
                            f"http://{ip}:31950/modules",
                            headers={"opentrons-version": "3"},
                        )
                        modules = response.json()
                        for module in modules["data"]:
                            if module["serialNumber"] == module_serial_number:
                                firmwareVersion = module["firmwareVersion"]
                                # BUG FIX: the old loop's else-branch reset a
                                # found version to "NaN" whenever a later,
                                # non-matching module followed; stop at match.
                                break
                    except requests.exceptions.ConnectionError:
                        firmwareVersion = "NaN"
                    row = {
                        "Robot": file_results.get("robot_name", ""),
                        "Run_ID": run_id,
                        "Protocol_Name": protocol_name,
                        "Software Version": file_results.get("API_Version", ""),
                        "Firmware Version": firmwareVersion,
                        "Date": start_date,
                        "Serial Number": module_serial_number,
                        "Approx. Average Heating Ramp Rate (C/s)": heating_ramp_rate,
                        "Approx. Average Cooling Ramp Rate (C/s)": cooling_ramp_rate,
                    }
                    headers = list(row.keys())
                    runs_and_robots[run_id] = row
                    read_robot_logs.write_to_local_and_google_sheet(
                        runs_and_robots,
                        storage_directory,
                        google_sheet_name,
                        google_sheet,
                        headers,
                    )
        else:
            # Run already on the sheet (or no run_id): nothing to upload.
            continue
Loading