Skip to content

Commit

Permalink
Add Celery Queue for subscribe send email workflow
Browse files Browse the repository at this point in the history
- Setups a celery queue backed by redis
- Updates the subscribe feature in nextjs to do the following depending on error_boundary=true/false
-- Send a enqueue request to flask
-- Showcase error boundary
- Adds enqueue endpoint
  • Loading branch information
lukemun committed Jan 22, 2025
1 parent 76731e7 commit 9f9b389
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 6 deletions.
13 changes: 12 additions & 1 deletion flask/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import dotenv
from .db import get_products, get_products_join, get_inventory
from .utils import parseHeaders, get_iterator
from .queues.tasks import sendEmail
import sentry_sdk
from sentry_sdk.integrations.flask import FlaskIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
Expand Down Expand Up @@ -103,7 +104,7 @@ def __init__(self, import_name, *args, **kwargs):
CORS(app)


redis_host = os.environ.get("REDISHOST")
redis_host = os.environ.get("REDISHOST", "localhost")
redis_port = int(os.environ.get("REDISPORT"))

cache_config = {
Expand All @@ -120,6 +121,16 @@ def __init__(self, import_name, *args, **kwargs):

redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)

@app.route('/enqueue', methods=['POST'])
def enqueue():
body = json.loads(request.data)
print(body['email'])
email = body['email']
with sentry_sdk.start_transaction(name="email-subscribe-task"):
r = sendEmail.apply_async(args=[email])
print(r.task_id)
return jsonify({"status": "success"}), 200


@app.route('/suggestion', methods=['GET'])
def suggestion():
Expand Down
Empty file added flask/src/queues/__init__.py
Empty file.
41 changes: 41 additions & 0 deletions flask/src/queues/celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from celery import Celery, signals
import sentry_sdk, os, dotenv
import os

dotenv.load_dotenv()

redis_host = os.environ.get("REDISHOST", "localhost")
redis_port = int(os.environ.get("REDISPORT"))

redis_url = f"redis://{redis_host}:{redis_port}/1"

app = Celery('subscribe',
broker=redis_url,
backend=redis_url,
include=['src.queues.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
result_expires=3600,
)

# Initialize Sentry SDK on Celery startup
@signals.celeryd_init.connect
def init_sentry(**_kwargs):
dotenv.load_dotenv()
RELEASE = os.environ["RELEASE"]
DSN = os.environ["FLASK_APP_DSN"]
ENVIRONMENT = os.environ["FLASK_ENV"]
sentry_sdk.init(
dsn=DSN,
release=RELEASE,
environment=ENVIRONMENT,
traces_sample_rate=1.0,
# Set profiles_sample_rate to 1.0 to profile 100%
# of sampled transactions.
# We recommend adjusting this value in production.
profiles_sample_rate=1.0,
)

if __name__ == '__main__':
app.start()
17 changes: 17 additions & 0 deletions flask/src/queues/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@

import sentry_sdk
from .celery import app
import time, random


@app.task
def sendEmail(email):
try:
time.sleep(random.randrange(5)) # Simulate a delay
if random.randrange(5) % 2 == 0: # Check if the random number is even
raise Exception("sending email error")
else:
print("Sending email to: " + email)
except Exception as e:
# Log the exception to Sentry
sentry_sdk.capture_exception(e)
42 changes: 42 additions & 0 deletions next/src/app/api/enqueue/route.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@

import { NextResponse } from "next/server";
import { getProductsOnly } from '@/lib/data.js';
import {
determineBackendUrl,
} from '@/src/utils/backendrouter';

export async function POST(request) {
try {
console.log("Here in POST");

// Parse the incoming request body to extract the email
const { email } = await request.json();

if (!email) {
return NextResponse.json({ error: 'Email is required' }, { status: 400 });
}

// Determine the backend URL
const backendUrl = determineBackendUrl('flask');

// Send the email to the backend
const resp = await fetch(`${backendUrl}/enqueue`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ email }), // Include the extracted email
});

if (!resp.ok) {
const errorText = await resp.text();
console.error('Backend error:', errorText);
return NextResponse.json({ error: 'Failed to enqueue email' }, { status: resp.status });
}

const data = await resp.json();

return NextResponse.json({ response: data }, { status: 200 });
} catch (err) {
console.error('Error handling POST request:', err);
return NextResponse.json({ error: 'Internal server error' }, { status: 500 });
}
}
4 changes: 3 additions & 1 deletion next/src/app/page.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ export default function Page(props) {

console.log("in home page");
const router = useRouter();
const { backend, frontendSlowdown } = useSearchParams();
const searchParams = useSearchParams();
const backend = searchParams.get('backend');

const backendType = determineBackendType(backend);
const backendUrl = determineBackendUrl(backendType);
console.log('backend is ' + backendUrl);
Expand Down
41 changes: 37 additions & 4 deletions next/src/ui/Footer.jsx
Original file line number Diff line number Diff line change
@@ -1,15 +1,41 @@
"use client"

import * as Sentry from '@sentry/nextjs';

import { useSearchParams } from 'next/navigation';
import { Fragment } from 'react';
import Link from 'next/link';
import { useState } from 'react';

function Footer() {
const [email, setEmail] = useState('');
const [subscribed, setSubscribed] = useState(false);
const searchParams = useSearchParams();
const errorBoundary = searchParams.get('error_boundary');

const handleSubmit = (event) => {
console.log("error boundary", errorBoundary);
event.preventDefault(); // Prevent form from submitting and reloading the page
console.log('Email:', email); // Access the email state
setSubscribed(true); // Update subscribed state
addToQueue(email);
};

const handleSubmit = () => {
setSubscribed(true);

const addToQueue = async (email) => {
try {
const resp = await fetch(`/api/enqueue`, {
method: 'POST',
headers: {
'Content-Type': 'application/json', // Specify JSON content type
},
body: JSON.stringify({ email }), // Include email in the request body
});
const data = await resp.json(); // Parse the JSON response
console.log(data); // Log the response for debugging
} catch (err) {
console.error('Error adding to queue:', err);
}
};

return (
Expand All @@ -32,23 +58,26 @@ function Footer() {
}}
>
<div className="formContainer">
<form onSubmitCapture={handleSubmit}>
<form action={handleSubmit}>
<label htmlFor="email-subscribe" className="sentry-unmask">
Email
</label>
<input
type="email"
name="email-subscribe"
id="email-subscribe"
value={email} // Bind the input value to state
onChange={(e) => setEmail(e.target.value)} // Update state on change
></input>
</form>
<input
name="email"
type="submit"
value="Subscribe"
className="sentry-unmask"
onClick={handleSubmit}
/>
{subscribed && <SubscribedMessage />}
{subscribed && (errorBoundary === 'true' ? <SubscribedMessageError /> : <SubscribedMessage />)}
</div>
</Sentry.ErrorBoundary>
<p className="sentry-unmask">
Expand All @@ -60,6 +89,10 @@ function Footer() {
}

function SubscribedMessage() {
return <p>You have successfully subscribed!</p>;
}

function SubscribedMessageError() {
throw new Error('SubscribedMessage error');
}

Expand Down

0 comments on commit 9f9b389

Please sign in to comment.