From f958ad36998842567526c3a66320fb27724824ec Mon Sep 17 00:00:00 2001 From: lukemun Date: Wed, 22 Jan 2025 10:15:20 -0500 Subject: [PATCH] Add Celery Queue for subscribe send email workflow - Setups a celery queue backed by redis - Updates the subscribe feature in nextjs to do the following depending on error_boundary=true/false -- Send a enqueue request to flask -- Showcase error boundary - Adds enqueue endpoint - Update run.sh to start celery with the flask server --- README.md | 4 +-- deploy.sh | 6 +++-- flask/requirements.txt | 1 + flask/run.sh | 5 ++++ flask/src/main.py | 13 +++++++++- flask/src/queues/__init__.py | 0 flask/src/queues/celery.py | 33 +++++++++++++++++++++++++ flask/src/queues/tasks.py | 16 ++++++++++++ next/src/app/api/enqueue/route.js | 36 +++++++++++++++++++++++++++ next/src/app/page.jsx | 4 ++- next/src/ui/Footer.jsx | 41 ++++++++++++++++++++++++++++--- 11 files changed, 150 insertions(+), 9 deletions(-) create mode 100644 flask/src/queues/__init__.py create mode 100644 flask/src/queues/celery.py create mode 100644 flask/src/queues/tasks.py create mode 100644 next/src/app/api/enqueue/route.js diff --git a/README.md b/README.md index 09a3a8f81..1b526f5c4 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ Query params to be added to the demo app. These query parameters can be stacked - `?userEmail=someemail@example.com` - lets you [pass in a specific user email](https://github.com/sentry-demos/empower/blob/fce289530f72ce47fe2c7482cdbd9aa8bcc13b6e/react/src/index.js#L218-L219) - `?frontendSlowdown=true` - used in the [frontend-only demo flow](https://github.com/sentry-demos/empower/blob/fce289530f72ce47fe2c7482cdbd9aa8bcc13b6e/react/src/index.js#L200-L207), which showcases a frontend slowdown via profiling. - `?rageclick=true` - causes the checkout button to stop working, so you can rageclick it. This will prevent the checkout error from happening. If you want to still demo the checkout error AND a rageclick, you can rageclick manually on the 'Contact Us' button that shows on the page after the Checkout Error occurs. - +- `?error_boundary=true` - enables the error boundary functionality in subscribe instead of putting a message on the queue (NextJS only, react will always use error boundary) ``` # example https://localhost:5000/?se=chris&backend=flask&frontendSlowdown=true @@ -183,6 +183,6 @@ gcloud config list, to display current account 1. Add your OPENAI_API_KEY= to local.env 2. Run next and flask (./deploy.sh --env=local next flask) -3. Get suggestion button should show automatically +3. Get suggestion button should show automatically On main page load, next will check with flask if it has the OPEN_API_KEY and conditionally show the get suggestion input. diff --git a/deploy.sh b/deploy.sh index 29e4301fc..6424beca9 100755 --- a/deploy.sh +++ b/deploy.sh @@ -106,6 +106,8 @@ function cleanup { if [ "$generated_envs" != "" ]; then rm -f $generated_envs # bash only (passed as separate args) fi + # terminate the celery workers + pkill -f "celery worker" } trap cleanup EXIT @@ -145,7 +147,7 @@ for proj in $projects; do # bash only if [ "$proj" == "next" ]; then # Next env variables need to start with NEXT_PUBLIC_* backend_var=$(var_name.sh NEXT_PUBLIC_%s_BACKEND $be_proj) - else + else backend_var=$(var_name.sh %s_APP_%s_BACKEND $proj $be_proj) fi . get_proj_var.sh "%s_LOCAL_PORT" $be_proj # sets $local_port @@ -213,7 +215,7 @@ for proj in $projects; do # bash only if [[ "$proj" =~ ^crons- ]]; then . get_proj_var.sh "%s_DEPLOY_DIR" $proj escaped_deploy_dir=$(echo "$deploy_dir" | sed 's_/_\\/_g') - sed -e 's//'$escaped_deploy_dir'/g' crontab.template > crontab + sed -e 's//'$escaped_deploy_dir'/g' crontab.template > crontab fi ./deploy_project.sh else diff --git a/flask/requirements.txt b/flask/requirements.txt index 4835f87c0..249bef38e 100644 --- a/flask/requirements.txt +++ b/flask/requirements.txt @@ -15,3 +15,4 @@ Werkzeug==3.0.3 flask_caching==2.3.0 openai==1.52.2 tiktoken==0.8.0 +celery==5.4.0 diff --git a/flask/run.sh b/flask/run.sh index 69ae17cf8..5545a1853 100755 --- a/flask/run.sh +++ b/flask/run.sh @@ -24,4 +24,9 @@ if [ -z "$ACTIVE_ACCOUNT" ]; then fi gcloud compute ssh redis-relay --zone=us-central1-a -- -N -L 6379:10.251.35.179:6379 & +# wait for relay to be setup before celery connects +sleep 1 + +celery -A src.queues.celery worker -l INFO & + flask run --port $LOCAL_PORT diff --git a/flask/src/main.py b/flask/src/main.py index 6a08caef8..4ad9ed734 100644 --- a/flask/src/main.py +++ b/flask/src/main.py @@ -11,6 +11,7 @@ import dotenv from .db import get_products, get_products_join, get_inventory from .utils import parseHeaders, get_iterator +from .queues.tasks import sendEmail import sentry_sdk from sentry_sdk.integrations.flask import FlaskIntegration from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration @@ -103,7 +104,7 @@ def __init__(self, import_name, *args, **kwargs): CORS(app) -redis_host = os.environ.get("REDISHOST") +redis_host = os.environ.get("REDISHOST", "localhost") redis_port = int(os.environ.get("REDISPORT")) cache_config = { @@ -120,6 +121,16 @@ def __init__(self, import_name, *args, **kwargs): redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True) +@app.route('/enqueue', methods=['POST']) +def enqueue(): + body = json.loads(request.data) + print(body['email']) + email = body['email'] + with sentry_sdk.start_transaction(name="email-subscribe-task"): + r = sendEmail.apply_async(args=[email]) + print(r.task_id) + return jsonify({"status": "success"}), 200 + @app.route('/suggestion', methods=['GET']) def suggestion(): diff --git a/flask/src/queues/__init__.py b/flask/src/queues/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/flask/src/queues/celery.py b/flask/src/queues/celery.py new file mode 100644 index 000000000..a739fa247 --- /dev/null +++ b/flask/src/queues/celery.py @@ -0,0 +1,33 @@ +from celery import Celery, signals +import sentry_sdk, os, dotenv +import os + +dotenv.load_dotenv() + +redis_host = os.environ.get("REDISHOST", "localhost") +redis_port = int(os.environ.get("REDISPORT")) + +redis_url = f"redis://{redis_host}:{redis_port}/1" + +app = Celery('subscribe', + broker=redis_url, + backend=redis_url, + include=['src.queues.tasks']) + +# Initialize Sentry SDK on Celery startup +@signals.celeryd_init.connect +def init_sentry(**_kwargs): + dotenv.load_dotenv() + RELEASE = os.environ["RELEASE"] + DSN = os.environ["FLASK_APP_DSN"] + ENVIRONMENT = os.environ["FLASK_ENV"] + sentry_sdk.init( + dsn=DSN, + release=RELEASE, + environment=ENVIRONMENT, + traces_sample_rate=1.0, + profiles_sample_rate=1.0, + ) + +if __name__ == '__main__': + app.start() diff --git a/flask/src/queues/tasks.py b/flask/src/queues/tasks.py new file mode 100644 index 000000000..756689dd8 --- /dev/null +++ b/flask/src/queues/tasks.py @@ -0,0 +1,16 @@ +import sentry_sdk +from .celery import app +import time, random + + +@app.task +def sendEmail(email): + try: + time.sleep(random.randrange(5)) # Simulate a delay + if random.randrange(5) % 2 == 0: # Check if the random number is even + raise Exception("sending email error") + else: + print("Sending email to: " + email) + except Exception as e: + # Log the exception to Sentry + sentry_sdk.capture_exception(e) diff --git a/next/src/app/api/enqueue/route.js b/next/src/app/api/enqueue/route.js new file mode 100644 index 000000000..2e62a874b --- /dev/null +++ b/next/src/app/api/enqueue/route.js @@ -0,0 +1,36 @@ +import { NextResponse } from "next/server"; +import { + determineBackendUrl, +} from '@/src/utils/backendrouter'; + +export async function POST(request) { + try { + console.log("Here in POST"); + + + const { email } = await request.json(); + if (!email) { + return NextResponse.json({ error: 'Email is required' }, { status: 400 }); + } + + const backendUrl = determineBackendUrl('flask'); + const resp = await fetch(`${backendUrl}/enqueue`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ email }), + }); + + if (!resp.ok) { + const errorText = await resp.text(); + console.error('Backend error:', errorText); + return NextResponse.json({ error: 'Failed to enqueue email' }, { status: resp.status }); + } + + const data = await resp.json(); + + return NextResponse.json({ response: data }, { status: 200 }); + } catch (err) { + console.error('Error handling POST request:', err); + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }); + } +} diff --git a/next/src/app/page.jsx b/next/src/app/page.jsx index 4eb9227be..bc1e6b738 100644 --- a/next/src/app/page.jsx +++ b/next/src/app/page.jsx @@ -21,7 +21,9 @@ export default function Page(props) { console.log("in home page"); const router = useRouter(); - const { backend, frontendSlowdown } = useSearchParams(); + const searchParams = useSearchParams(); + const backend = searchParams.get('backend'); + const backendType = determineBackendType(backend); const backendUrl = determineBackendUrl(backendType); console.log('backend is ' + backendUrl); diff --git a/next/src/ui/Footer.jsx b/next/src/ui/Footer.jsx index 465bc403b..b4d3f562c 100644 --- a/next/src/ui/Footer.jsx +++ b/next/src/ui/Footer.jsx @@ -1,15 +1,43 @@ "use client" import * as Sentry from '@sentry/nextjs'; + +import { useSearchParams } from 'next/navigation'; import { Fragment } from 'react'; import Link from 'next/link'; import { useState } from 'react'; function Footer() { + const [email, setEmail] = useState(''); const [subscribed, setSubscribed] = useState(false); + const searchParams = useSearchParams(); + const errorBoundary = searchParams.get('error_boundary'); - const handleSubmit = () => { + const handleSubmit = (event) => { + console.log("error boundary = ", errorBoundary); + event.preventDefault(); + console.log('Email:', email); setSubscribed(true); + if (errorBoundary !== 'true') { + addToQueue(email); + } + }; + + + const addToQueue = async (email) => { + try { + const resp = await fetch(`/api/enqueue`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ email }), + }); + const data = await resp.json(); + console.log(data); + } catch (err) { + console.error('Error adding to queue:', err); + } }; return ( @@ -32,7 +60,7 @@ function Footer() { }} >
-
+ @@ -40,15 +68,18 @@ function Footer() { type="email" name="email-subscribe" id="email-subscribe" + value={email} + onChange={(e) => setEmail(e.target.value)} >
- {subscribed && } + {subscribed && (errorBoundary === 'true' ? : )}

@@ -60,6 +91,10 @@ function Footer() { } function SubscribedMessage() { + return

You have successfully subscribed!

; +} + +function SubscribedMessageError() { throw new Error('SubscribedMessage error'); }