Skip to content

Commit 8827f32

Browse files
committed
Add Celery Queue for subscribe send email workflow
- Setups a celery queue backed by redis - Updates the subscribe feature in nextjs to do the following depending on error_boundary=true/false -- Send a enqueue request to flask -- Showcase error boundary - Adds enqueue endpoint - Update run.sh to start celery with the flask server - Update run.sh to start celery with the flask server - Update run.sh to start celery with the flask server - Update run.sh to start celery with the flask server - Update run.sh to start celery with the flask server - Update run.sh to start celery with the flask server - Update run.sh to start celery with the flask server - Update run.sh to start celery with the flask server - Update run.sh to start celery with the flask server
1 parent 76731e7 commit 8827f32

File tree

10 files changed

+160
-8
lines changed

10 files changed

+160
-8
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ Query params to be added to the demo app. These query parameters can be stacked
2929
- `[email protected]` - lets you [pass in a specific user email](https://github.com/sentry-demos/empower/blob/fce289530f72ce47fe2c7482cdbd9aa8bcc13b6e/react/src/index.js#L218-L219)
3030
- `?frontendSlowdown=true` - used in the [frontend-only demo flow](https://github.com/sentry-demos/empower/blob/fce289530f72ce47fe2c7482cdbd9aa8bcc13b6e/react/src/index.js#L200-L207), which showcases a frontend slowdown via profiling.
3131
- `?rageclick=true` - causes the checkout button to stop working, so you can rageclick it. This will prevent the checkout error from happening. If you want to still demo the checkout error AND a rageclick, you can rageclick manually on the 'Contact Us' button that shows on the page after the Checkout Error occurs.
32-
32+
- `?error_boundary=true` - enables the error boundary functionality in subscribe instead of putting a message on the queue (NextJS only, react will always use error boundary)
3333
```
3434
# example
3535
https://localhost:5000/?se=chris&backend=flask&frontendSlowdown=true
@@ -183,6 +183,6 @@ gcloud config list, to display current account
183183

184184
1. Add your OPENAI_API_KEY= to local.env
185185
2. Run next and flask (./deploy.sh --env=local next flask)
186-
3. Get suggestion button should show automatically
186+
3. Get suggestion button should show automatically
187187

188188
On main page load, next will check with flask if it has the OPEN_API_KEY and conditionally show the get suggestion input.

flask/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@ Werkzeug==3.0.3
1515
flask_caching==2.3.0
1616
openai==1.52.2
1717
tiktoken==0.8.0
18+
celery==5.4.0

flask/run.sh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,9 @@ if [ -z "$ACTIVE_ACCOUNT" ]; then
2424
fi
2525
gcloud compute ssh redis-relay --zone=us-central1-a -- -N -L 6379:10.251.35.179:6379 &
2626

27+
# wait for relay to be setup before celery connects
28+
sleep 1
29+
30+
celery -A src.queues.celery worker -l INFO &
31+
2732
flask run --port $LOCAL_PORT

flask/src/main.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import dotenv
1212
from .db import get_products, get_products_join, get_inventory
1313
from .utils import parseHeaders, get_iterator
14+
from .queues.tasks import sendEmail
1415
import sentry_sdk
1516
from sentry_sdk.integrations.flask import FlaskIntegration
1617
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
@@ -103,7 +104,7 @@ def __init__(self, import_name, *args, **kwargs):
103104
CORS(app)
104105

105106

106-
redis_host = os.environ.get("REDISHOST")
107+
redis_host = os.environ.get("REDISHOST", "localhost")
107108
redis_port = int(os.environ.get("REDISPORT"))
108109

109110
cache_config = {
@@ -120,6 +121,16 @@ def __init__(self, import_name, *args, **kwargs):
120121

121122
redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
122123

124+
@app.route('/enqueue', methods=['POST'])
125+
def enqueue():
126+
body = json.loads(request.data)
127+
print(body['email'])
128+
email = body['email']
129+
with sentry_sdk.start_transaction(name="email-subscribe-task"):
130+
r = sendEmail.apply_async(args=[email])
131+
print(r.task_id)
132+
return jsonify({"status": "success"}), 200
133+
123134

124135
@app.route('/suggestion', methods=['GET'])
125136
def suggestion():

flask/src/queues/__init__.py

Whitespace-only changes.

flask/src/queues/celery.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from celery import Celery, signals
2+
import sentry_sdk, os, dotenv
3+
import os
4+
5+
dotenv.load_dotenv()
6+
7+
redis_host = os.environ.get("REDISHOST", "localhost")
8+
redis_port = int(os.environ.get("REDISPORT"))
9+
10+
redis_url = f"redis://{redis_host}:{redis_port}/1"
11+
12+
app = Celery('subscribe',
13+
broker=redis_url,
14+
backend=redis_url,
15+
include=['src.queues.tasks'])
16+
17+
# Optional configuration, see the application user guide.
18+
app.conf.update(
19+
result_expires=3600,
20+
)
21+
22+
# Initialize Sentry SDK on Celery startup
23+
@signals.celeryd_init.connect
24+
def init_sentry(**_kwargs):
25+
dotenv.load_dotenv()
26+
RELEASE = os.environ["RELEASE"]
27+
DSN = os.environ["FLASK_APP_DSN"]
28+
ENVIRONMENT = os.environ["FLASK_ENV"]
29+
sentry_sdk.init(
30+
dsn=DSN,
31+
release=RELEASE,
32+
environment=ENVIRONMENT,
33+
traces_sample_rate=1.0,
34+
# Set profiles_sample_rate to 1.0 to profile 100%
35+
# of sampled transactions.
36+
# We recommend adjusting this value in production.
37+
profiles_sample_rate=1.0,
38+
)
39+
40+
if __name__ == '__main__':
41+
app.start()

flask/src/queues/tasks.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
2+
import sentry_sdk
3+
from .celery import app
4+
import time, random
5+
6+
7+
@app.task
8+
def sendEmail(email):
9+
try:
10+
time.sleep(random.randrange(5)) # Simulate a delay
11+
if random.randrange(5) % 2 == 0: # Check if the random number is even
12+
raise Exception("sending email error")
13+
else:
14+
print("Sending email to: " + email)
15+
except Exception as e:
16+
# Log the exception to Sentry
17+
sentry_sdk.capture_exception(e)

next/src/app/api/enqueue/route.js

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
2+
import { NextResponse } from "next/server";
3+
import { getProductsOnly } from '@/lib/data.js';
4+
import {
5+
determineBackendUrl,
6+
} from '@/src/utils/backendrouter';
7+
8+
export async function POST(request) {
9+
try {
10+
console.log("Here in POST");
11+
12+
// Parse the incoming request body to extract the email
13+
const { email } = await request.json();
14+
15+
if (!email) {
16+
return NextResponse.json({ error: 'Email is required' }, { status: 400 });
17+
}
18+
19+
// Determine the backend URL
20+
const backendUrl = determineBackendUrl('flask');
21+
22+
// Send the email to the backend
23+
const resp = await fetch(`${backendUrl}/enqueue`, {
24+
method: 'POST',
25+
headers: { 'Content-Type': 'application/json' },
26+
body: JSON.stringify({ email }), // Include the extracted email
27+
});
28+
29+
if (!resp.ok) {
30+
const errorText = await resp.text();
31+
console.error('Backend error:', errorText);
32+
return NextResponse.json({ error: 'Failed to enqueue email' }, { status: resp.status });
33+
}
34+
35+
const data = await resp.json();
36+
37+
return NextResponse.json({ response: data }, { status: 200 });
38+
} catch (err) {
39+
console.error('Error handling POST request:', err);
40+
return NextResponse.json({ error: 'Internal server error' }, { status: 500 });
41+
}
42+
}

next/src/app/page.jsx

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ export default function Page(props) {
2121

2222
console.log("in home page");
2323
const router = useRouter();
24-
const { backend, frontendSlowdown } = useSearchParams();
24+
const searchParams = useSearchParams();
25+
const backend = searchParams.get('backend');
26+
2527
const backendType = determineBackendType(backend);
2628
const backendUrl = determineBackendUrl(backendType);
2729
console.log('backend is ' + backendUrl);

next/src/ui/Footer.jsx

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,41 @@
11
"use client"
22

33
import * as Sentry from '@sentry/nextjs';
4+
5+
import { useSearchParams } from 'next/navigation';
46
import { Fragment } from 'react';
57
import Link from 'next/link';
68
import { useState } from 'react';
79

810
function Footer() {
11+
const [email, setEmail] = useState('');
912
const [subscribed, setSubscribed] = useState(false);
13+
const searchParams = useSearchParams();
14+
const errorBoundary = searchParams.get('error_boundary');
15+
16+
const handleSubmit = (event) => {
17+
console.log("error boundary", errorBoundary);
18+
event.preventDefault(); // Prevent form from submitting and reloading the page
19+
console.log('Email:', email); // Access the email state
20+
setSubscribed(true); // Update subscribed state
21+
addToQueue(email);
22+
};
1023

11-
const handleSubmit = () => {
12-
setSubscribed(true);
24+
25+
const addToQueue = async (email) => {
26+
try {
27+
const resp = await fetch(`/api/enqueue`, {
28+
method: 'POST',
29+
headers: {
30+
'Content-Type': 'application/json', // Specify JSON content type
31+
},
32+
body: JSON.stringify({ email }), // Include email in the request body
33+
});
34+
const data = await resp.json(); // Parse the JSON response
35+
console.log(data); // Log the response for debugging
36+
} catch (err) {
37+
console.error('Error adding to queue:', err);
38+
}
1339
};
1440

1541
return (
@@ -32,23 +58,26 @@ function Footer() {
3258
}}
3359
>
3460
<div className="formContainer">
35-
<form onSubmitCapture={handleSubmit}>
61+
<form action={handleSubmit}>
3662
<label htmlFor="email-subscribe" className="sentry-unmask">
3763
Email
3864
</label>
3965
<input
4066
type="email"
4167
name="email-subscribe"
4268
id="email-subscribe"
69+
value={email} // Bind the input value to state
70+
onChange={(e) => setEmail(e.target.value)} // Update state on change
4371
></input>
4472
</form>
4573
<input
74+
name="email"
4675
type="submit"
4776
value="Subscribe"
4877
className="sentry-unmask"
4978
onClick={handleSubmit}
5079
/>
51-
{subscribed && <SubscribedMessage />}
80+
{subscribed && (errorBoundary === 'true' ? <SubscribedMessageError /> : <SubscribedMessage />)}
5281
</div>
5382
</Sentry.ErrorBoundary>
5483
<p className="sentry-unmask">
@@ -60,6 +89,10 @@ function Footer() {
6089
}
6190

6291
function SubscribedMessage() {
92+
return <p>You have successfully subscribed!</p>;
93+
}
94+
95+
function SubscribedMessageError() {
6396
throw new Error('SubscribedMessage error');
6497
}
6598

0 commit comments

Comments
 (0)