Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flask, next] Add Celery Queue for subscribe send email workflow #666

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Query params to be added to the demo app. These query parameters can be stacked
- `[email protected]` - lets you [pass in a specific user email](https://github.com/sentry-demos/empower/blob/fce289530f72ce47fe2c7482cdbd9aa8bcc13b6e/react/src/index.js#L218-L219)
- `?frontendSlowdown=true` - used in the [frontend-only demo flow](https://github.com/sentry-demos/empower/blob/fce289530f72ce47fe2c7482cdbd9aa8bcc13b6e/react/src/index.js#L200-L207), which showcases a frontend slowdown via profiling.
- `?rageclick=true` - causes the checkout button to stop working, so you can rageclick it. This will prevent the checkout error from happening. If you want to still demo the checkout error AND a rageclick, you can rageclick manually on the 'Contact Us' button that shows on the page after the Checkout Error occurs.

- `?error_boundary=true` - enables the error boundary functionality in subscribe instead of putting a message on the queue (NextJS only, react will always use error boundary)
```
# example
https://localhost:5000/?se=chris&backend=flask&frontendSlowdown=true
Expand Down Expand Up @@ -183,6 +183,6 @@ gcloud config list, to display current account

1. Add your OPENAI_API_KEY= to local.env
2. Run next and flask (./deploy.sh --env=local next flask)
3. Get suggestion button should show automatically
3. Get suggestion button should show automatically

On main page load, next will check with flask if it has the OPEN_API_KEY and conditionally show the get suggestion input.
6 changes: 4 additions & 2 deletions deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ function cleanup {
if [ "$generated_envs" != "" ]; then
rm -f $generated_envs # bash only (passed as separate args)
fi
# terminate the celery workers
pkill -f "celery worker"
}
trap cleanup EXIT

Expand Down Expand Up @@ -145,7 +147,7 @@ for proj in $projects; do # bash only
if [ "$proj" == "next" ]; then
# Next env variables need to start with NEXT_PUBLIC_*
backend_var=$(var_name.sh NEXT_PUBLIC_%s_BACKEND $be_proj)
else
else
backend_var=$(var_name.sh %s_APP_%s_BACKEND $proj $be_proj)
fi
. get_proj_var.sh "%s_LOCAL_PORT" $be_proj # sets $local_port
Expand Down Expand Up @@ -213,7 +215,7 @@ for proj in $projects; do # bash only
if [[ "$proj" =~ ^crons- ]]; then
. get_proj_var.sh "%s_DEPLOY_DIR" $proj
escaped_deploy_dir=$(echo "$deploy_dir" | sed 's_/_\\/_g')
sed -e 's/<CRONSPYTHON_DEPLOY_DIR>/'$escaped_deploy_dir'/g' crontab.template > crontab
sed -e 's/<CRONSPYTHON_DEPLOY_DIR>/'$escaped_deploy_dir'/g' crontab.template > crontab
fi
./deploy_project.sh
else
Expand Down
3 changes: 1 addition & 2 deletions flask/app.yaml.template
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
service: <SERVICE>
runtime: python312
entrypoint: gunicorn -b :$PORT -w 2 --timeout 60 src.main:app
entrypoint: ./start.sh

env_variables:
REDISHOST: 10.251.35.179
REDISPORT: 6379

vpc_access_connector:
name: projects/sales-engineering-sf/locations/us-central1/connectors/empower-plant-connector

1 change: 1 addition & 0 deletions flask/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ Werkzeug==3.0.3
flask_caching==2.3.0
openai==1.52.2
tiktoken==0.8.0
celery==5.4.0
5 changes: 5 additions & 0 deletions flask/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,9 @@ if [ -z "$ACTIVE_ACCOUNT" ]; then
fi
gcloud compute ssh redis-relay --zone=us-central1-a -- -N -L 6379:10.251.35.179:6379 &

# wait for relay to be setup before celery connects
sleep 1

celery -A src.queues.celery worker -l INFO &

flask run --port $LOCAL_PORT
17 changes: 14 additions & 3 deletions flask/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import dotenv
from .db import get_products, get_products_join, get_inventory
from .utils import parseHeaders, get_iterator
from .queues.tasks import sendEmail
import sentry_sdk
from sentry_sdk.integrations.flask import FlaskIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
Expand Down Expand Up @@ -103,12 +104,12 @@ def __init__(self, import_name, *args, **kwargs):
CORS(app)


redis_host = os.environ.get("REDISHOST")
redis_host = os.environ.get("REDISHOST", "localhost")
redis_port = int(os.environ.get("REDISPORT"))

cache_config = {
"DEBUG": True, # some Flask specific configs
"CACHE_TYPE": "RedisCache", # Flask-Caching related configs
"DEBUG": True,
"CACHE_TYPE": "RedisCache",
"CACHE_DEFAULT_TIMEOUT": 300,
"CACHE_REDIS_HOST": redis_host,
"CACHE_REDIS_PORT": redis_port,
Expand All @@ -120,6 +121,16 @@ def __init__(self, import_name, *args, **kwargs):

redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)

@app.route('/enqueue', methods=['POST'])
def enqueue():
body = json.loads(request.data)
print(body['email'])
email = body['email']
with sentry_sdk.start_transaction(name="email-subscribe-task"):
r = sendEmail.apply_async(args=[email])
print(r.task_id)
return jsonify({"status": "success"}), 200


@app.route('/suggestion', methods=['GET'])
def suggestion():
Expand Down
Empty file added flask/src/queues/__init__.py
Empty file.
34 changes: 34 additions & 0 deletions flask/src/queues/celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from celery import Celery, signals
import sentry_sdk, os, dotenv
import os

dotenv.load_dotenv()

redis_host = os.environ.get("REDISHOST", "localhost")
redis_port = int(os.environ.get("REDISPORT"))

redis_url = f"redis://{redis_host}:{redis_port}/1"

app = Celery('subscribe',
broker=redis_url,
backend=redis_url,
include=['src.queues.tasks'],
broker_connection_retry_on_startup=True)

# Initialize Sentry SDK on Celery startup
@signals.celeryd_init.connect
def init_sentry(**_kwargs):
dotenv.load_dotenv()
RELEASE = os.environ["RELEASE"]
DSN = os.environ["FLASK_APP_DSN"]
ENVIRONMENT = os.environ["FLASK_ENV"]
sentry_sdk.init(
dsn=DSN,
release=RELEASE,
environment=ENVIRONMENT,
traces_sample_rate=1.0,
profiles_sample_rate=1.0,
)

if __name__ == '__main__':
app.start()
10 changes: 10 additions & 0 deletions flask/src/queues/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from .celery import app
import time, random


@app.task
def sendEmail(email):
time.sleep(random.randrange(5))
print("Sending email to: " + email)
raise Exception("sending email error")

7 changes: 7 additions & 0 deletions flask/start.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/bash

# Start the Gunicorn server for Flask
gunicorn -b :$PORT -w 2 --timeout 60 src.main:app &

# Start the Celery worker
celery -A src.queues.celery worker -l INFO
36 changes: 36 additions & 0 deletions next/src/app/api/enqueue/route.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { NextResponse } from "next/server";
import {
determineBackendUrl,
} from '@/src/utils/backendrouter';

export async function POST(request) {
try {
console.log("Here in POST");


const { email } = await request.json();
if (!email) {
return NextResponse.json({ error: 'Email is required' }, { status: 400 });
}

const backendUrl = determineBackendUrl('flask');
const resp = await fetch(`${backendUrl}/enqueue`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ email }),
});

if (!resp.ok) {
const errorText = await resp.text();
console.error('Backend error:', errorText);
return NextResponse.json({ error: 'Failed to enqueue email' }, { status: resp.status });
}

const data = await resp.json();

return NextResponse.json({ response: data }, { status: 200 });
} catch (err) {
console.error('Error handling POST request:', err);
return NextResponse.json({ error: 'Internal server error' }, { status: 500 });
}
}
4 changes: 3 additions & 1 deletion next/src/app/page.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ export default function Page(props) {

console.log("in home page");
const router = useRouter();
const { backend, frontendSlowdown } = useSearchParams();
const searchParams = useSearchParams();
lukemun marked this conversation as resolved.
Show resolved Hide resolved
const backend = searchParams.get('backend');

const backendType = determineBackendType(backend);
const backendUrl = determineBackendUrl(backendType);
console.log('backend is ' + backendUrl);
Expand Down
41 changes: 38 additions & 3 deletions next/src/ui/Footer.jsx
Original file line number Diff line number Diff line change
@@ -1,15 +1,43 @@
"use client"

import * as Sentry from '@sentry/nextjs';

import { useSearchParams } from 'next/navigation';
import { Fragment } from 'react';
import Link from 'next/link';
import { useState } from 'react';

function Footer() {
const [email, setEmail] = useState('');
const [subscribed, setSubscribed] = useState(false);
const searchParams = useSearchParams();
const errorBoundary = searchParams.get('error_boundary');

const handleSubmit = () => {
const handleSubmit = (event) => {
console.log("error boundary = ", errorBoundary);
event.preventDefault();
console.log('Email:', email);
setSubscribed(true);
if (errorBoundary !== 'true') {
addToQueue(email);
}
};


const addToQueue = async (email) => {
try {
const resp = await fetch(`/api/enqueue`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ email }),
});
const data = await resp.json();
console.log(data);
} catch (err) {
console.error('Error adding to queue:', err);
}
};

return (
Expand All @@ -32,23 +60,26 @@ function Footer() {
}}
>
<div className="formContainer">
<form onSubmitCapture={handleSubmit}>
<form action={handleSubmit}>
<label htmlFor="email-subscribe" className="sentry-unmask">
Email
</label>
<input
type="email"
name="email-subscribe"
id="email-subscribe"
value={email}
onChange={(e) => setEmail(e.target.value)}
></input>
</form>
<input
name="email"
type="submit"
value="Subscribe"
className="sentry-unmask"
onClick={handleSubmit}
/>
{subscribed && <SubscribedMessage />}
{subscribed && (errorBoundary === 'true' ? <SubscribedMessageError /> : <SubscribedMessage />)}
</div>
</Sentry.ErrorBoundary>
<p className="sentry-unmask">
Expand All @@ -60,6 +91,10 @@ function Footer() {
}

function SubscribedMessage() {
return <p>You have successfully subscribed!</p>;
}

function SubscribedMessageError() {
throw new Error('SubscribedMessage error');
}

Expand Down
Loading