From 9f9b3897c58dbb3518aded5b8f6f59dd0655ea38 Mon Sep 17 00:00:00 2001 From: lukemun Date: Wed, 22 Jan 2025 10:15:20 -0500 Subject: [PATCH] Add Celery Queue for subscribe send email workflow - Setups a celery queue backed by redis - Updates the subscribe feature in nextjs to do the following depending on error_boundary=true/false -- Send a enqueue request to flask -- Showcase error boundary - Adds enqueue endpoint --- flask/src/main.py | 13 +++++++++- flask/src/queues/__init__.py | 0 flask/src/queues/celery.py | 41 ++++++++++++++++++++++++++++++ flask/src/queues/tasks.py | 17 +++++++++++++ next/src/app/api/enqueue/route.js | 42 +++++++++++++++++++++++++++++++ next/src/app/page.jsx | 4 ++- next/src/ui/Footer.jsx | 41 +++++++++++++++++++++++++++--- 7 files changed, 152 insertions(+), 6 deletions(-) create mode 100644 flask/src/queues/__init__.py create mode 100644 flask/src/queues/celery.py create mode 100644 flask/src/queues/tasks.py create mode 100644 next/src/app/api/enqueue/route.js diff --git a/flask/src/main.py b/flask/src/main.py index 6a08caef8..4ad9ed734 100644 --- a/flask/src/main.py +++ b/flask/src/main.py @@ -11,6 +11,7 @@ import dotenv from .db import get_products, get_products_join, get_inventory from .utils import parseHeaders, get_iterator +from .queues.tasks import sendEmail import sentry_sdk from sentry_sdk.integrations.flask import FlaskIntegration from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration @@ -103,7 +104,7 @@ def __init__(self, import_name, *args, **kwargs): CORS(app) -redis_host = os.environ.get("REDISHOST") +redis_host = os.environ.get("REDISHOST", "localhost") redis_port = int(os.environ.get("REDISPORT")) cache_config = { @@ -120,6 +121,16 @@ def __init__(self, import_name, *args, **kwargs): redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True) +@app.route('/enqueue', methods=['POST']) +def enqueue(): + body = json.loads(request.data) + print(body['email']) + email = body['email'] + with sentry_sdk.start_transaction(name="email-subscribe-task"): + r = sendEmail.apply_async(args=[email]) + print(r.task_id) + return jsonify({"status": "success"}), 200 + @app.route('/suggestion', methods=['GET']) def suggestion(): diff --git a/flask/src/queues/__init__.py b/flask/src/queues/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/flask/src/queues/celery.py b/flask/src/queues/celery.py new file mode 100644 index 000000000..c66168a9a --- /dev/null +++ b/flask/src/queues/celery.py @@ -0,0 +1,41 @@ +from celery import Celery, signals +import sentry_sdk, os, dotenv +import os + +dotenv.load_dotenv() + +redis_host = os.environ.get("REDISHOST", "localhost") +redis_port = int(os.environ.get("REDISPORT")) + +redis_url = f"redis://{redis_host}:{redis_port}/1" + +app = Celery('subscribe', + broker=redis_url, + backend=redis_url, + include=['src.queues.tasks']) + +# Optional configuration, see the application user guide. +app.conf.update( + result_expires=3600, +) + +# Initialize Sentry SDK on Celery startup +@signals.celeryd_init.connect +def init_sentry(**_kwargs): + dotenv.load_dotenv() + RELEASE = os.environ["RELEASE"] + DSN = os.environ["FLASK_APP_DSN"] + ENVIRONMENT = os.environ["FLASK_ENV"] + sentry_sdk.init( + dsn=DSN, + release=RELEASE, + environment=ENVIRONMENT, + traces_sample_rate=1.0, + # Set profiles_sample_rate to 1.0 to profile 100% + # of sampled transactions. + # We recommend adjusting this value in production. + profiles_sample_rate=1.0, + ) + +if __name__ == '__main__': + app.start() diff --git a/flask/src/queues/tasks.py b/flask/src/queues/tasks.py new file mode 100644 index 000000000..98d52a95f --- /dev/null +++ b/flask/src/queues/tasks.py @@ -0,0 +1,17 @@ + +import sentry_sdk +from .celery import app +import time, random + + +@app.task +def sendEmail(email): + try: + time.sleep(random.randrange(5)) # Simulate a delay + if random.randrange(5) % 2 == 0: # Check if the random number is even + raise Exception("sending email error") + else: + print("Sending email to: " + email) + except Exception as e: + # Log the exception to Sentry + sentry_sdk.capture_exception(e) diff --git a/next/src/app/api/enqueue/route.js b/next/src/app/api/enqueue/route.js new file mode 100644 index 000000000..7dcaca156 --- /dev/null +++ b/next/src/app/api/enqueue/route.js @@ -0,0 +1,42 @@ + +import { NextResponse } from "next/server"; +import { getProductsOnly } from '@/lib/data.js'; +import { + determineBackendUrl, +} from '@/src/utils/backendrouter'; + +export async function POST(request) { + try { + console.log("Here in POST"); + + // Parse the incoming request body to extract the email + const { email } = await request.json(); + + if (!email) { + return NextResponse.json({ error: 'Email is required' }, { status: 400 }); + } + + // Determine the backend URL + const backendUrl = determineBackendUrl('flask'); + + // Send the email to the backend + const resp = await fetch(`${backendUrl}/enqueue`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ email }), // Include the extracted email + }); + + if (!resp.ok) { + const errorText = await resp.text(); + console.error('Backend error:', errorText); + return NextResponse.json({ error: 'Failed to enqueue email' }, { status: resp.status }); + } + + const data = await resp.json(); + + return NextResponse.json({ response: data }, { status: 200 }); + } catch (err) { + console.error('Error handling POST request:', err); + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }); + } +} diff --git a/next/src/app/page.jsx b/next/src/app/page.jsx index 4eb9227be..bc1e6b738 100644 --- a/next/src/app/page.jsx +++ b/next/src/app/page.jsx @@ -21,7 +21,9 @@ export default function Page(props) { console.log("in home page"); const router = useRouter(); - const { backend, frontendSlowdown } = useSearchParams(); + const searchParams = useSearchParams(); + const backend = searchParams.get('backend'); + const backendType = determineBackendType(backend); const backendUrl = determineBackendUrl(backendType); console.log('backend is ' + backendUrl); diff --git a/next/src/ui/Footer.jsx b/next/src/ui/Footer.jsx index 465bc403b..38b875108 100644 --- a/next/src/ui/Footer.jsx +++ b/next/src/ui/Footer.jsx @@ -1,15 +1,41 @@ "use client" import * as Sentry from '@sentry/nextjs'; + +import { useSearchParams } from 'next/navigation'; import { Fragment } from 'react'; import Link from 'next/link'; import { useState } from 'react'; function Footer() { + const [email, setEmail] = useState(''); const [subscribed, setSubscribed] = useState(false); + const searchParams = useSearchParams(); + const errorBoundary = searchParams.get('error_boundary'); + + const handleSubmit = (event) => { + console.log("error boundary", errorBoundary); + event.preventDefault(); // Prevent form from submitting and reloading the page + console.log('Email:', email); // Access the email state + setSubscribed(true); // Update subscribed state + addToQueue(email); + }; - const handleSubmit = () => { - setSubscribed(true); + + const addToQueue = async (email) => { + try { + const resp = await fetch(`/api/enqueue`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', // Specify JSON content type + }, + body: JSON.stringify({ email }), // Include email in the request body + }); + const data = await resp.json(); // Parse the JSON response + console.log(data); // Log the response for debugging + } catch (err) { + console.error('Error adding to queue:', err); + } }; return ( @@ -32,7 +58,7 @@ function Footer() { }} >
-
+ @@ -40,15 +66,18 @@ function Footer() { type="email" name="email-subscribe" id="email-subscribe" + value={email} // Bind the input value to state + onChange={(e) => setEmail(e.target.value)} // Update state on change >
- {subscribed && } + {subscribed && (errorBoundary === 'true' ? : )}

@@ -60,6 +89,10 @@ function Footer() { } function SubscribedMessage() { + return

You have successfully subscribed!

; +} + +function SubscribedMessageError() { throw new Error('SubscribedMessage error'); }