Upload recording feature #787

Open. Wants to merge 13 commits into base: main.
Changes from 3 commits
75 changes: 55 additions & 20 deletions admin/recording_uploader/deploy.py
@@ -1,6 +1,8 @@
"""Entrypoint to deploy the uploader to AWS Lambda."""

import os
import pathlib
import re
import subprocess

from loguru import logger
Expand All @@ -10,31 +12,64 @@
 CURRENT_DIR = pathlib.Path(__file__).parent


-def main(region_name: str = "us-east-1", guided: bool = True) -> None:
+def main(region_name: str = "us-east-1", destroy: bool = False) -> None:
     """Deploy the uploader to AWS Lambda.

     Args:
         region_name (str): The AWS region to deploy the Lambda function to.
-        guided (bool): Whether to use the guided SAM deployment.
+        destroy (bool): Whether to delete the Lambda function.
     """
-    s3 = boto3.client(
-        "s3",
-        region_name=region_name,
-        endpoint_url=f"https://s3.{region_name}.amazonaws.com",
-    )
-    bucket = "openadapt"
-
-    s3.create_bucket(
-        ACL="private",
-        Bucket=bucket,
-    )
-
-    # deploy the code to AWS Lambda
-    commands = ["sam", "deploy"]
-    if guided:
-        commands.append("--guided")
-    subprocess.run(commands, cwd=CURRENT_DIR, check=True)
-    logger.info("Lambda function deployed successfully.")
+    # check if aws credentials are set
+    if os.getenv("AWS_ACCESS_KEY_ID") is None:
+        raise ValueError("AWS_ACCESS_KEY_ID is not set")
+    if os.getenv("AWS_SECRET_ACCESS_KEY") is None:
Member: Why not read this from config?

Contributor Author: This script is not supposed to be part of the OpenAdapt app; it's an admin script that needs to be run by the owner (you) on a machine that has the relevant AWS creds in its environment. From the PR description:

    From the project root, run python -m scripts.recording_uploader.deploy (ensure that you have the necessary AWS creds configured). Once the command completes, note the API URL in the output and paste it into config.py's RECORDING_UPLOAD_URL variable.

Because config.py is more closely related to settings of the app, I didn't think it'd be useful to add these there.

Member: I think we want to read from config.py.

Contributor Author: How would you suggest a user override these settings? The default values will be empty in config.py and config.defaults.json, so the only ways are to manually edit the config.json file in the data folder, or to expose it in the dashboard settings page. A regular user won't need to edit this, and might get confused if it's in the dashboard settings.
+        raise ValueError("AWS_SECRET_ACCESS_KEY is not set")
+    if destroy:
+        commands = ["sam", "delete", "--no-prompts"]
+    else:
+        s3 = boto3.client(
+            "s3",
+            region_name=region_name,
+            endpoint_url=f"https://s3.{region_name}.amazonaws.com",
+        )
+        bucket = "openadapt"
Member: What do you think about defining this in config.py?

Contributor Author: The same reason as above; plus, this is hardcoded because this script will only be run once in a while (only when the Lambda function is changed), and ideally we won't be changing bucket names between runs.


+        s3.create_bucket(
+            ACL="private",
+            Bucket=bucket,
+        )
+        commands = ["sam", "deploy", "--no-fail-on-empty-changeset"]
+    try:
+        std_kwargs = {}
+        if not destroy:
+            std_kwargs["stderr"] = subprocess.PIPE
+            std_kwargs["stdout"] = subprocess.PIPE
+        ret = subprocess.run(
+            commands, cwd=CURRENT_DIR, check=True, shell=True, **std_kwargs
+        )
+        if destroy:
+            logger.info("Lambda function deleted successfully.")
+        else:
+            stdout = ret.stdout.decode("utf-8") if ret.stdout else ""
+            # find the url, which is in the format https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/upload/
+            url_match = re.search(
+                r"https://([^\.]+)\.execute-api\.([^\.]+)\.amazonaws\.com/Prod/upload/",
+                stdout,
+            )
+            if url_match:
+                logger.info(
+                    f"Lambda function deployed successfully. URL: {url_match.group(0)},"
+                    " copy it to your config."
+                )
+            else:
+                logger.error("Lambda function deployed, but failed to find the URL")
+                print(stdout)
+    except subprocess.CalledProcessError as e:
+        if destroy:
+            logger.error("Failed to delete Lambda function")
+        else:
+            logger.error("Failed to deploy Lambda function")
+        raise e


if __name__ == "__main__":
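For reference, a minimal sketch of how this entrypoint is driven. The `python -m scripts.recording_uploader.deploy` invocation and the config step are quoted from the review thread above; the direct `main()` calls and import path are illustrative assumptions, not part of the diff:

    # Run on a machine whose environment has AWS_ACCESS_KEY_ID /
    # AWS_SECRET_ACCESS_KEY exported, per the credential checks above.
    from deploy import main  # hypothetical import; the script is normally run as a module

    main(region_name="us-east-1")                  # create the bucket and run `sam deploy`
    main(region_name="us-east-1", destroy=True)    # tear down via `sam delete --no-prompts`

On success the script logs the Prod /upload/ URL, which is then pasted into config.py's RECORDING_UPLOAD_URL as described in the thread.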
4 changes: 3 additions & 1 deletion admin/recording_uploader/template.yaml
@@ -27,10 +27,12 @@ Resources:
             Method: post
       Policies:
         - Statement:
-            - Sid: S3PutObjectPolicy
+            - Sid: S3GetPutDeleteObjectPolicy
              Effect: Allow
              Action:
                - s3:PutObject
+               - s3:GetObject
+               - s3:DeleteObject
              Resource: !Sub "arn:aws:s3:::openadapt/*"

Outputs:
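With s3:GetObject and s3:DeleteObject now allowed alongside s3:PutObject, the function can presign downloads and deletes as well as uploads. A short sketch of the presigned-GET case (bucket name and region mirror the hardcoded values elsewhere in this PR; the key layout follows the previous revision's recordings/{user_id}/{uuid}.zip scheme and is illustrative):

    import boto3
    from botocore.config import Config

    s3 = boto3.client(
        "s3",
        config=Config(signature_version="s3v4"),
        region_name="us-east-1",
    )
    # Presign a one-hour download link for an uploaded recording.
    url = s3.generate_presigned_url(
        ClientMethod="get_object",
        Params={"Bucket": "openadapt", "Key": "recordings/<user_id>/<uuid>.zip"},
        ExpiresIn=3600,
    )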
85 changes: 65 additions & 20 deletions admin/recording_uploader/uploader/app.py
@@ -14,47 +14,92 @@

 def lambda_handler(event: dict, context: Any) -> dict:
     """Main entry point for the lambda function."""
-    try:
-        user_id = json.loads(event["body"])["user_id"]
-    except Exception as e:
-        print(e)
+    data = json.loads(event["body"])
+    lambda_function = data["lambda_function"]
+    handler = handlers.get(lambda_function)
+    if not handler:
         return {
             "statusCode": 400,
-            "body": json.dumps({"error": "Missing 'user_id' in request body."}),
+            "body": json.dumps(
+                {"error": f"Unknown lambda function: {lambda_function}"}
+            ),
         }
-    return {
-        "statusCode": 200,
-        "body": json.dumps(get_presigned_url(user_id)),
-    }
+    return handler(data)


-def get_presigned_url(
-    user_id: str, bucket: str = DEFAULT_BUCKET, region_name: str = DEFAULT_REGION_NAME
-) -> dict:
+def get_presigned_url(data: dict) -> dict:
     """Generate a presigned URL for uploading a recording to S3.

     Args:
-        bucket (str): The S3 bucket to upload the recording to.
-        region_name (str): The AWS region the bucket is in.
+        data (dict): The data from the request.

     Returns:
         dict: A dictionary containing the presigned URL.
     """
+    try:
+        key = data["key"]
+        client_method = data["client_method"]
+    except Exception as e:
+        print(e)
+        return {
+            "statusCode": 400,
+            "body": json.dumps(
+                {"error": "Missing 'key' or 'client_method' in request body."}
+            ),
+        }
     s3 = boto3.client(
         "s3",
         config=Config(signature_version="s3v4"),
-        region_name=region_name,
-        endpoint_url=f"https://s3.{region_name}.amazonaws.com",
+        region_name=DEFAULT_REGION_NAME,
+        endpoint_url=f"https://s3.{DEFAULT_REGION_NAME}.amazonaws.com",
     )
-    key = f"recordings/{user_id}/{uuid4()}.zip"

     presigned_url = s3.generate_presigned_url(
-        ClientMethod="put_object",
+        ClientMethod=client_method,
         Params={
-            "Bucket": bucket,
+            "Bucket": DEFAULT_BUCKET,
             "Key": key,
         },
         ExpiresIn=ONE_HOUR_IN_SECONDS,
     )

-    return {"url": presigned_url}
+    return {
+        "statusCode": 200,
+        "body": json.dumps({"url": presigned_url}),
+    }


+def delete_object(data: dict) -> dict:
+    """Delete an object from the S3 bucket.
+
+    Args:
+        data (dict): The data from the request.
+
+    Returns:
+        dict: A dictionary containing the deletion status.
+    """
+    try:
+        key = data["key"]
+    except Exception as e:
+        print(e)
+        return {
+            "statusCode": 400,
+            "body": json.dumps({"error": "Missing 'key' in request body."}),
+        }
+
+    s3 = boto3.client(
+        "s3",
+        config=Config(signature_version="s3v4"),
+        region_name=DEFAULT_REGION_NAME,
+        endpoint_url=f"https://s3.{DEFAULT_REGION_NAME}.amazonaws.com",
+    )
+    s3.delete_object(
+        Bucket=DEFAULT_BUCKET,
+        Key=key,
+    )
+    return {"statusCode": 200, "body": json.dumps({"message": "Deleted"})}


+handlers = {"get_presigned_url": get_presigned_url, "delete_object": delete_object}
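Taken together, lambda_handler now dispatches on the "lambda_function" field of the request body. A client-side sketch of the round trip (the endpoint URL is the RECORDING_UPLOAD_URL captured at deploy time; the key and file names are illustrative, and the flat response shape assumes API Gateway's Lambda proxy integration unwraps the returned "body"):

    import requests

    URL = "https://<api-id>.execute-api.us-east-1.amazonaws.com/Prod/upload/"
    key = "recordings/<user_id>/<uuid>.zip"

    # 1. Ask the Lambda for a presigned PUT URL for the key.
    resp = requests.post(
        URL,
        json={
            "lambda_function": "get_presigned_url",
            "key": key,
            "client_method": "put_object",
        },
    )
    presigned_url = resp.json()["url"]

    # 2. Upload the recording archive directly to S3 with the presigned URL.
    with open("recording.zip", "rb") as f:
        requests.put(presigned_url, data=f)

    # 3. Later, remove the object via the delete_object handler.
    requests.post(URL, json={"lambda_function": "delete_object", "key": key})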
64 changes: 64 additions & 0 deletions openadapt/alembic/versions/46d03b666cd4_add_upload_fields.py
@@ -0,0 +1,64 @@
"""add_upload_fields

Revision ID: 46d03b666cd4
Revises: 98505a067995
Create Date: 2024-11-10 23:14:21.187860

"""
from alembic import op
import sqlalchemy as sa

import openadapt

# revision identifiers, used by Alembic.
revision = "46d03b666cd4"
down_revision = "98505a067995"
branch_labels = None
depends_on = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"replay",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"timestamp",
openadapt.models.ForceFloat(precision=10, scale=2, asdecimal=False),
nullable=True,
),
sa.Column("strategy_name", sa.String(), nullable=True),
sa.Column("strategy_args", sa.JSON(), nullable=True),
sa.Column("git_hash", sa.String(), nullable=True),
sa.PrimaryKeyConstraint("id", name=op.f("pk_replay")),
)
with op.batch_alter_table("recording", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"upload_status",
sa.Enum("NOT_UPLOADED", "UPLOADING", "UPLOADED", name="uploadstatus"),
nullable=True,
)
)
batch_op.add_column(sa.Column("uploaded_key", sa.String(), nullable=True))
batch_op.add_column(
sa.Column("uploaded_to_custom_bucket", sa.Boolean(), nullable=True)
)

# update all recordings to not uploaded
op.execute("UPDATE recording SET upload_status = 'NOT_UPLOADED' WHERE 1=1")
# update all recordings to not uploaded to custom bucket
op.execute("UPDATE recording SET uploaded_to_custom_bucket = FALSE WHERE 1=1")

# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("recording", schema=None) as batch_op:
batch_op.drop_column("uploaded_to_custom_bucket")
batch_op.drop_column("uploaded_key")
batch_op.drop_column("upload_status")

op.drop_table("replay")
# ### end Alembic commands ###
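The Enum column above implies a model-side counterpart that this view of the diff does not show. A sketch of what Recording.UploadStatus presumably looks like, inferred from the migration values and from recording.upload_status == Recording.UploadStatus.NOT_UPLOADED in the dashboard code below (the enum is presumably nested on the Recording model; this is an assumption):

    import enum

    class UploadStatus(enum.Enum):
        """Upload lifecycle of a recording, matching the migration above."""
        NOT_UPLOADED = "NOT_UPLOADED"
        UPLOADING = "UPLOADING"
        UPLOADED = "UPLOADED"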
7 changes: 5 additions & 2 deletions openadapt/app/dashboard/api.ts
@@ -1,3 +1,6 @@
-export async function get<T>(url: string, options: Partial<RequestInit> = {}): Promise<T> {
-    return fetch(url, options).then((res) => res.json());
+export async function get<T>(
+    url: string,
+    options: Partial<RequestInit> = {}
+): Promise<T> {
+    return fetch(url, options).then((res) => res.json())
 }
46 changes: 43 additions & 3 deletions openadapt/app/dashboard/api/recordings.py
@@ -3,16 +3,22 @@
 import json

 from fastapi import APIRouter, WebSocket
+from starlette.responses import HTMLResponse, RedirectResponse, Response

-from openadapt.custom_logger import logger
 from openadapt.config import config
+from openadapt.custom_logger import logger
 from openadapt.db import crud
 from openadapt.deprecated.app import cards
 from openadapt.events import get_events
 from openadapt.models import Recording
 from openadapt.plotting import display_event
 from openadapt.share import upload_recording_to_s3
-from openadapt.utils import image2utf8, row2dict
+from openadapt.utils import (
+    delete_uploaded_recording,
+    get_recording_url,
+    image2utf8,
+    row2dict,
+)


class RecordingsAPI:
@@ -32,7 +38,15 @@ def attach_routes(self) -> APIRouter:
         self.app.add_api_route("/stop", self.stop_recording)
         self.app.add_api_route("/status", self.recording_status)
         self.app.add_api_route(
-            "/{recording_id}/upload", self.upload_recording, methods=["POST"]
+            "/cloud/{recording_id}/upload", self.upload_recording, methods=["POST"]
+        )
+        self.app.add_api_route(
+            "/cloud/{recording_id}/view", self.view_recording_on_cloud, methods=["GET"]
+        )
+        self.app.add_api_route(
+            "/cloud/{recording_id}/delete",
+            self.delete_recording_on_cloud,
+            methods=["POST"],
         )
         self.recording_detail_route()
         return self.app
@@ -70,9 +84,35 @@ def recording_status() -> dict[str, bool]:

     def upload_recording(self, recording_id: int) -> dict[str, str]:
         """Upload a recording."""
+        with crud.get_new_session(read_and_write=True) as session:
+            crud.start_uploading_recording(session, recording_id)
         upload_recording_to_s3(config.UNIQUE_USER_ID, recording_id)
         return {"message": "Recording uploaded"}

+    @staticmethod
+    def view_recording_on_cloud(recording_id: int) -> Response:
+        """View a recording from the cloud."""
+        session = crud.get_new_session(read_only=True)
+        recording = crud.get_recording_by_id(session, recording_id)
+        if recording.upload_status == Recording.UploadStatus.NOT_UPLOADED:
+            return HTMLResponse(status_code=404)
+        url = get_recording_url(
+            recording.uploaded_key, recording.uploaded_to_custom_bucket
+        )
+        return RedirectResponse(url)
+
+    @staticmethod
+    def delete_recording_on_cloud(recording_id: int) -> dict[str, bool]:
+        """Delete a recording from the cloud."""
+        session = crud.get_new_session(read_only=True)
+        recording = crud.get_recording_by_id(session, recording_id)
+        if recording.upload_status == Recording.UploadStatus.NOT_UPLOADED:
+            return {"success": True}
+        delete_uploaded_recording(
+            recording_id, recording.uploaded_key, recording.uploaded_to_custom_bucket
+        )
+        return {"success": True}

     def recording_detail_route(self) -> None:
         """Add the recording detail route as a websocket."""

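A quick sketch of exercising the three new cloud routes from a client. The dashboard host and the router's mount prefix are not visible in this diff, so both are assumptions here:

    import requests

    BASE = "http://localhost:8000/api/recordings"  # hypothetical host and prefix
    rid = 1

    requests.post(f"{BASE}/cloud/{rid}/upload")  # marks the recording UPLOADING, then uploads to S3
    view = requests.get(f"{BASE}/cloud/{rid}/view", allow_redirects=False)
    print(view.headers.get("location"))          # presigned URL redirect, or 404 if never uploaded
    requests.post(f"{BASE}/cloud/{rid}/delete")  # removes the uploaded object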
7 changes: 6 additions & 1 deletion openadapt/app/dashboard/api/settings.py
@@ -21,7 +21,12 @@ def attach_routes(self) -> APIRouter:
         return self.app

     Category = Literal[
-        "api_keys", "scrubbing", "record_and_replay", "general", "onboarding"
+        "api_keys",
+        "scrubbing",
+        "record_and_replay",
+        "general",
+        "onboarding",
+        "recording_upload",
     ]

@staticmethod