Skip to content

Commit

Permalink
add successful orderes to kafka queue (#13)
Browse files Browse the repository at this point in the history
* add kafka broker to configuration file.
* send the created order to kafka producer on successfull payment.
* add 2 sec deplay for `checkPayment()`.
* add `deplay()` to utility.
  • Loading branch information
satyajitnayk committed Dec 23, 2023
1 parent 12f9b2f commit 3236e65
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 19 deletions.
3 changes: 2 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
POSTGRES_CONNECTION_STRING=postgres://postgres:1234@localhost/postgres
POSTGRES_CONNECTION_STRING=postgres://postgres:1234@localhost/uptimemonitor
PORT=3000
KAFKA_BROKER=localhost:9093
9 changes: 9 additions & 0 deletions config/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
const database = require('./db.js');
const kafka = require('./kafka.js');
const payment = require('./payment.js');

module.exports = {
database,
kafka,
payment,
};
39 changes: 39 additions & 0 deletions config/kafka.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
const { Kafka } = require('kafkajs');

const kafkaProducer = new Kafka({
clientId: 'order-producer',
brokers: [process.env.KAFKA_BROKER],
}).producer();

const kafkaConsumer = new Kafka({
clientId: 'order-consumer',
brokers: ['localhost:9093'],
}).consumer({ groupId: 'order-processing-group' });

const initializeKafka = async () => {
await kafkaProducer.connect();
await kafkaConsumer.connect();
await kafkaConsumer.subscribe({ topic: 'order-queue-topic', fromBeginning: true });
};

const produceOrder = async (order) => {
await kafkaProducer.send({
topic: 'order-queue-topic',
messages: [{ value: JSON.stringify(order) }],
});
};

const consumeOrders = async (processOrderCallback) => {
await kafkaConsumer.run({
eachMessage: async ({ message }) => {
const order = JSON.parse(message.value.toString());
await processOrderCallback(order);
},
});
};

module.exports = {
initializeKafka,
produceOrder,
consumeOrders,
};
6 changes: 5 additions & 1 deletion config/payment.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
const {delay} = require("../utility");

/**
* it returns success 70% of time
* @returns {boolean}
*/
export function checkPayment({txnId, orderId}) {
async function checkPayment({ txnId, orderId }) {
await delay(2000)
return Math.random() < 0.7;
}
module.exports = {checkPayment}
51 changes: 38 additions & 13 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,48 @@
const Fastify = require('fastify');
const { setupDatabase } = require('./config/db.js');
const { userRoutes } = require('./routes/users.js');
const { kafka } = require('./config');
const {checkPayment} = require("./config/payment");

const fastify = Fastify({
const fastify = require('fastify')({
logger: true,
});

setupDatabase(fastify);
userRoutes(fastify);
async function runServer() {
try {
setupDatabase(fastify);
userRoutes(fastify);

fastify.get('/health', async (request, reply) => {
reply.type('application/json').code(200);
return { status: 'ok', uptime: process.uptime() };
});
// Initialize Kafka
await kafka.initializeKafka();

fastify.get('/health', async (request, reply) => {
reply.type('application/json').code(200);
return { status: 'ok', uptime: process.uptime() };
});

fastify.post('/orders', async (request, reply) => {
try {
const order = request.body;
const {orderId= "1234", txnId= "BEY23DD2"} = order;
const isPaymentSuccess = await checkPayment({orderId,txnId})
if(isPaymentSuccess) {
await kafka.produceOrder(order);
reply.send({ success:true, message: `order with orderId ${orderId} placed successfully` });
} else {
reply.status(400).send({success:false, message: "payment failed"});
}
} catch (error) {
reply.status(500).send({ message: 'Error producing order' });
}
});

fastify.listen({ port: process.env.PORT }, (err, address) => {
if (err) {
fastify.log.error(err);
// Use await or return a promise for the listen operation
await fastify.listen({ port: process.env.PORT });
console.log(`Server listening on ${fastify.server.address().port}`);
} catch (error) {
console.error('Error starting the server:', error);
process.exit(1);
}
fastify.log.info(`Server listening on ${address}`);
});
}

runServer();
4 changes: 2 additions & 2 deletions routes/users.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ const userRoutes = (fastify) => {
const { userId, name, email, createdAt, updatedAt } = user;
reply.send({ userId, name, email, createdAt, updatedAt });
} else {
reply.send({ error: 'User not found' });
reply.send({ message: 'User not found' });
}
} catch (err) {
reply.send(err);
reply.send({message:err});
}
});
};
Expand Down
3 changes: 1 addition & 2 deletions setup-kafka/check_kafka.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ async function runTest() {
} catch (error) {
console.error('Error:', error);
} finally {
// Uncomment the following line if you want to stop the producer after the test
// await producer.disconnect();
await producer.disconnect();
}
}

Expand Down
3 changes: 3 additions & 0 deletions utility.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
const delay = async (ms) => await new Promise(resolve => setTimeout(resolve, ms));

module.exports = {delay}

0 comments on commit 3236e65

Please sign in to comment.