Skip to content

Commit

Permalink
add logging
Browse files Browse the repository at this point in the history
  • Loading branch information
Foxcapades committed Jun 11, 2022
1 parent f2d00a5 commit 6625733
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 6 deletions.
4 changes: 3 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ plugins {
}

group = "org.veupathdb.lib"
version = "1.1.1"
version = "1.2.0"

repositories {
mavenCentral()
Expand All @@ -27,6 +27,8 @@ dependencies {
implementation(kotlin("stdlib"))
implementation(kotlin("stdlib-jdk8"))

implementation("org.slf4j:slf4j-api:1.7.36")

// Jackson and modules (gotta catch em all)
implementation("com.fasterxml.jackson.core:jackson-databind:2.13.3")
implementation("com.fasterxml.jackson.datatype:jackson-datatype-json-org:2.13.3")
Expand Down
23 changes: 21 additions & 2 deletions src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package org.veupathdb.lib.rabbit.jobs

import com.rabbitmq.client.CancelCallback
import com.rabbitmq.client.DeliverCallback
import org.slf4j.LoggerFactory
import org.veupathdb.lib.rabbit.jobs.fn.ErrorHandler
import org.veupathdb.lib.rabbit.jobs.fn.SuccessHandler
import org.veupathdb.lib.rabbit.jobs.model.ErrorNotification
Expand All @@ -15,6 +16,9 @@ import org.veupathdb.lib.rabbit.jobs.serialization.Json
* Job dispatcher.
*/
class QueueDispatcher : QueueWrapper {

private val Log = LoggerFactory.getLogger(javaClass)

private val errorHandlers = ErrorHandlers()

private val successHandlers = SuccessHandlers()
Expand All @@ -29,6 +33,7 @@ class QueueDispatcher : QueueWrapper {
* @param fn Success callback.
*/
fun onSuccess(fn: SuccessHandler) {
Log.debug("registering success handler {}", fn)
successHandlers.register(fn)
}

Expand All @@ -38,6 +43,7 @@ class QueueDispatcher : QueueWrapper {
* @param fn Error callback.
*/
fun onError(fn: ErrorHandler) {
Log.debug("registering error handler {}", fn)
errorHandlers.register(fn)
}

Expand All @@ -47,6 +53,7 @@ class QueueDispatcher : QueueWrapper {
* @param job Job definition.
*/
fun dispatch(job: JobDispatch) {
Log.debug("dispatching job {}", job)
withDispatchQueue { publish(dispatchQueueName, job) }
}

Expand All @@ -56,8 +63,14 @@ class QueueDispatcher : QueueWrapper {
errorQueueName,
false,
DeliverCallback { _, msg ->
Log.debug("handling error message {}", msg.envelope.deliveryTag)
workers.execute {
errorHandlers.execute(ErrorNotification.fromJson(Json.from(msg.body)))
try {
errorHandlers.execute(ErrorNotification.fromJson(Json.from(msg.body)))
} finally {
Log.debug("acknowledging error message {}", msg.envelope.deliveryTag)
basicAck(msg.envelope.deliveryTag, false)
}
}
},
CancelCallback { }
Expand All @@ -69,8 +82,14 @@ class QueueDispatcher : QueueWrapper {
successQueueName,
false,
DeliverCallback { _, msg ->
Log.debug("handling success message {}", msg.envelope.deliveryTag)
workers.execute {
successHandlers.execute(SuccessNotification.fromJson(Json.from(msg.body)))
try {
successHandlers.execute(SuccessNotification.fromJson(Json.from(msg.body)))
} finally {
Log.debug("acknowledging success message {}", msg.envelope.deliveryTag)
basicAck(msg.envelope.deliveryTag, false)
}
}
},
CancelCallback { }
Expand Down
17 changes: 15 additions & 2 deletions src/main/kotlin/org/veupathdb/lib/rabbit/jobs/QueueWorker.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package org.veupathdb.lib.rabbit.jobs

import com.rabbitmq.client.CancelCallback
import com.rabbitmq.client.DeliverCallback
import org.slf4j.LoggerFactory
import org.veupathdb.lib.rabbit.jobs.fn.JobHandler
import org.veupathdb.lib.rabbit.jobs.model.ErrorNotification
import org.veupathdb.lib.rabbit.jobs.model.JobDispatch
Expand All @@ -13,6 +14,9 @@ import org.veupathdb.lib.rabbit.jobs.serialization.Json
* Job executor end of the job queue.
*/
class QueueWorker : QueueWrapper {

private val Log = LoggerFactory.getLogger(javaClass)

private val handlers = JobHandlers()

/**
Expand All @@ -37,6 +41,7 @@ class QueueWorker : QueueWrapper {
* @param fn Job request callback.
*/
fun onJob(fn: JobHandler) {
Log.debug("registering job handler {}", fn)
handlers.register(fn)
}

Expand All @@ -47,6 +52,7 @@ class QueueWorker : QueueWrapper {
* @param err Error notification to send.
*/
fun sendError(err: ErrorNotification) {
Log.debug("sending error notification {}", err)
withErrorQueue { publish(errorQueueName, err) }
}

Expand All @@ -57,6 +63,7 @@ class QueueWorker : QueueWrapper {
* @param msg Success notification to send.
*/
fun sendSuccess(msg: SuccessNotification) {
Log.debug("sending success notification {}", msg)
withSuccessQueue { publish(successQueueName, msg) }
}

Expand All @@ -67,10 +74,16 @@ class QueueWorker : QueueWrapper {
withDispatchQueue {
basicConsume(
dispatchQueueName,
true,
false,
DeliverCallback { _, msg ->
Log.debug("handling job message {}", msg.envelope.deliveryTag)
workers.execute {
handlers.execute(JobDispatch.fromJson(Json.from(msg.body)))
try {
handlers.execute(JobDispatch.fromJson(Json.from(msg.body)))
} finally {
Log.debug("acknowledging job message {}", msg.envelope.deliveryTag)
basicAck(msg.envelope.deliveryTag, false)
}
}
},
CancelCallback { }
Expand Down
2 changes: 1 addition & 1 deletion test/server/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ dependencies {
implementation(kotlin("stdlib-jdk8"))
implementation(rootProject)
implementation("org.veupathdb.lib:hash-id:1.0.2")
implementation("com.fasterxml.jackson.core:jackson-databind:2.13.0")
implementation("com.fasterxml.jackson.core:jackson-databind:2.13.3")
}

kotlin {
Expand Down

0 comments on commit 6625733

Please sign in to comment.