Skip to content

Commit

Permalink
Add Celery Queue for subscribe send email workflow
Browse files Browse the repository at this point in the history
- Setups a celery queue backed by redis
- Updates the subscribe feature in nextjs to do the following depending on error_boundary=true/false
-- Send a enqueue request to flask
-- Showcase error boundary
- Adds enqueue endpoint
- Update run.sh to start celery with the flask server
  • Loading branch information
lukemun committed Jan 22, 2025
1 parent 76731e7 commit f958ad3
Show file tree
Hide file tree
Showing 11 changed files with 150 additions and 9 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Query params to be added to the demo app. These query parameters can be stacked
- `[email protected]` - lets you [pass in a specific user email](https://github.com/sentry-demos/empower/blob/fce289530f72ce47fe2c7482cdbd9aa8bcc13b6e/react/src/index.js#L218-L219)
- `?frontendSlowdown=true` - used in the [frontend-only demo flow](https://github.com/sentry-demos/empower/blob/fce289530f72ce47fe2c7482cdbd9aa8bcc13b6e/react/src/index.js#L200-L207), which showcases a frontend slowdown via profiling.
- `?rageclick=true` - causes the checkout button to stop working, so you can rageclick it. This will prevent the checkout error from happening. If you want to still demo the checkout error AND a rageclick, you can rageclick manually on the 'Contact Us' button that shows on the page after the Checkout Error occurs.

- `?error_boundary=true` - enables the error boundary functionality in subscribe instead of putting a message on the queue (NextJS only, react will always use error boundary)
```
# example
https://localhost:5000/?se=chris&backend=flask&frontendSlowdown=true
Expand Down Expand Up @@ -183,6 +183,6 @@ gcloud config list, to display current account

1. Add your OPENAI_API_KEY= to local.env
2. Run next and flask (./deploy.sh --env=local next flask)
3. Get suggestion button should show automatically
3. Get suggestion button should show automatically

On main page load, next will check with flask if it has the OPEN_API_KEY and conditionally show the get suggestion input.
6 changes: 4 additions & 2 deletions deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ function cleanup {
if [ "$generated_envs" != "" ]; then
rm -f $generated_envs # bash only (passed as separate args)
fi
# terminate the celery workers
pkill -f "celery worker"
}
trap cleanup EXIT

Expand Down Expand Up @@ -145,7 +147,7 @@ for proj in $projects; do # bash only
if [ "$proj" == "next" ]; then
# Next env variables need to start with NEXT_PUBLIC_*
backend_var=$(var_name.sh NEXT_PUBLIC_%s_BACKEND $be_proj)
else
else
backend_var=$(var_name.sh %s_APP_%s_BACKEND $proj $be_proj)
fi
. get_proj_var.sh "%s_LOCAL_PORT" $be_proj # sets $local_port
Expand Down Expand Up @@ -213,7 +215,7 @@ for proj in $projects; do # bash only
if [[ "$proj" =~ ^crons- ]]; then
. get_proj_var.sh "%s_DEPLOY_DIR" $proj
escaped_deploy_dir=$(echo "$deploy_dir" | sed 's_/_\\/_g')
sed -e 's/<CRONSPYTHON_DEPLOY_DIR>/'$escaped_deploy_dir'/g' crontab.template > crontab
sed -e 's/<CRONSPYTHON_DEPLOY_DIR>/'$escaped_deploy_dir'/g' crontab.template > crontab
fi
./deploy_project.sh
else
Expand Down
1 change: 1 addition & 0 deletions flask/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ Werkzeug==3.0.3
flask_caching==2.3.0
openai==1.52.2
tiktoken==0.8.0
celery==5.4.0
5 changes: 5 additions & 0 deletions flask/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,9 @@ if [ -z "$ACTIVE_ACCOUNT" ]; then
fi
gcloud compute ssh redis-relay --zone=us-central1-a -- -N -L 6379:10.251.35.179:6379 &

# wait for relay to be setup before celery connects
sleep 1

celery -A src.queues.celery worker -l INFO &

flask run --port $LOCAL_PORT
13 changes: 12 additions & 1 deletion flask/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import dotenv
from .db import get_products, get_products_join, get_inventory
from .utils import parseHeaders, get_iterator
from .queues.tasks import sendEmail
import sentry_sdk
from sentry_sdk.integrations.flask import FlaskIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
Expand Down Expand Up @@ -103,7 +104,7 @@ def __init__(self, import_name, *args, **kwargs):
CORS(app)


redis_host = os.environ.get("REDISHOST")
redis_host = os.environ.get("REDISHOST", "localhost")
redis_port = int(os.environ.get("REDISPORT"))

cache_config = {
Expand All @@ -120,6 +121,16 @@ def __init__(self, import_name, *args, **kwargs):

redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)

@app.route('/enqueue', methods=['POST'])
def enqueue():
body = json.loads(request.data)
print(body['email'])
email = body['email']
with sentry_sdk.start_transaction(name="email-subscribe-task"):
r = sendEmail.apply_async(args=[email])
print(r.task_id)
return jsonify({"status": "success"}), 200


@app.route('/suggestion', methods=['GET'])
def suggestion():
Expand Down
Empty file added flask/src/queues/__init__.py
Empty file.
33 changes: 33 additions & 0 deletions flask/src/queues/celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from celery import Celery, signals
import sentry_sdk, os, dotenv
import os

dotenv.load_dotenv()

redis_host = os.environ.get("REDISHOST", "localhost")
redis_port = int(os.environ.get("REDISPORT"))

redis_url = f"redis://{redis_host}:{redis_port}/1"

app = Celery('subscribe',
broker=redis_url,
backend=redis_url,
include=['src.queues.tasks'])

# Initialize Sentry SDK on Celery startup
@signals.celeryd_init.connect
def init_sentry(**_kwargs):
dotenv.load_dotenv()
RELEASE = os.environ["RELEASE"]
DSN = os.environ["FLASK_APP_DSN"]
ENVIRONMENT = os.environ["FLASK_ENV"]
sentry_sdk.init(
dsn=DSN,
release=RELEASE,
environment=ENVIRONMENT,
traces_sample_rate=1.0,
profiles_sample_rate=1.0,
)

if __name__ == '__main__':
app.start()
16 changes: 16 additions & 0 deletions flask/src/queues/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import sentry_sdk
from .celery import app
import time, random


@app.task
def sendEmail(email):
try:
time.sleep(random.randrange(5)) # Simulate a delay
if random.randrange(5) % 2 == 0: # Check if the random number is even
raise Exception("sending email error")
else:
print("Sending email to: " + email)
except Exception as e:
# Log the exception to Sentry
sentry_sdk.capture_exception(e)
36 changes: 36 additions & 0 deletions next/src/app/api/enqueue/route.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { NextResponse } from "next/server";
import {
determineBackendUrl,
} from '@/src/utils/backendrouter';

export async function POST(request) {
try {
console.log("Here in POST");


const { email } = await request.json();
if (!email) {
return NextResponse.json({ error: 'Email is required' }, { status: 400 });
}

const backendUrl = determineBackendUrl('flask');
const resp = await fetch(`${backendUrl}/enqueue`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ email }),
});

if (!resp.ok) {
const errorText = await resp.text();
console.error('Backend error:', errorText);
return NextResponse.json({ error: 'Failed to enqueue email' }, { status: resp.status });
}

const data = await resp.json();

return NextResponse.json({ response: data }, { status: 200 });
} catch (err) {
console.error('Error handling POST request:', err);
return NextResponse.json({ error: 'Internal server error' }, { status: 500 });
}
}
4 changes: 3 additions & 1 deletion next/src/app/page.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ export default function Page(props) {

console.log("in home page");
const router = useRouter();
const { backend, frontendSlowdown } = useSearchParams();
const searchParams = useSearchParams();
const backend = searchParams.get('backend');

const backendType = determineBackendType(backend);
const backendUrl = determineBackendUrl(backendType);
console.log('backend is ' + backendUrl);
Expand Down
41 changes: 38 additions & 3 deletions next/src/ui/Footer.jsx
Original file line number Diff line number Diff line change
@@ -1,15 +1,43 @@
"use client"

import * as Sentry from '@sentry/nextjs';

import { useSearchParams } from 'next/navigation';
import { Fragment } from 'react';
import Link from 'next/link';
import { useState } from 'react';

function Footer() {
const [email, setEmail] = useState('');
const [subscribed, setSubscribed] = useState(false);
const searchParams = useSearchParams();
const errorBoundary = searchParams.get('error_boundary');

const handleSubmit = () => {
const handleSubmit = (event) => {
console.log("error boundary = ", errorBoundary);
event.preventDefault();
console.log('Email:', email);
setSubscribed(true);
if (errorBoundary !== 'true') {
addToQueue(email);
}
};


const addToQueue = async (email) => {
try {
const resp = await fetch(`/api/enqueue`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ email }),
});
const data = await resp.json();
console.log(data);
} catch (err) {
console.error('Error adding to queue:', err);
}
};

return (
Expand All @@ -32,23 +60,26 @@ function Footer() {
}}
>
<div className="formContainer">
<form onSubmitCapture={handleSubmit}>
<form action={handleSubmit}>
<label htmlFor="email-subscribe" className="sentry-unmask">
Email
</label>
<input
type="email"
name="email-subscribe"
id="email-subscribe"
value={email}
onChange={(e) => setEmail(e.target.value)}
></input>
</form>
<input
name="email"
type="submit"
value="Subscribe"
className="sentry-unmask"
onClick={handleSubmit}
/>
{subscribed && <SubscribedMessage />}
{subscribed && (errorBoundary === 'true' ? <SubscribedMessageError /> : <SubscribedMessage />)}
</div>
</Sentry.ErrorBoundary>
<p className="sentry-unmask">
Expand All @@ -60,6 +91,10 @@ function Footer() {
}

function SubscribedMessage() {
return <p>You have successfully subscribed!</p>;
}

function SubscribedMessageError() {
throw new Error('SubscribedMessage error');
}

Expand Down

0 comments on commit f958ad3

Please sign in to comment.