Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

22 worker class hierarchy needs reworking #35

Merged
merged 3 commits into from
Mar 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 35 additions & 12 deletions src/main/kotlin/com/lowbudgetlcs/Application.kt
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
package com.lowbudgetlcs

import com.lowbudgetlcs.http.RiotApiClient
import com.lowbudgetlcs.repositories.games.AllGamesDatabase
import com.lowbudgetlcs.repositories.players.AllPlayersDatabase
import com.lowbudgetlcs.repositories.riot.MatchRepositoryRiot
import com.lowbudgetlcs.repositories.series.AllSeriesDatabase
import com.lowbudgetlcs.repositories.teams.AllTeamsDatabase
import com.lowbudgetlcs.util.RateLimiter
import com.lowbudgetlcs.workers.StatDaemon
import com.lowbudgetlcs.workers.TournamentEngine
import io.ktor.server.application.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.slf4j.Logger
import org.slf4j.LoggerFactory

private val logger : Logger = LoggerFactory.getLogger(Application::class.java)
private const val NUM_INSTANCES = 3
private const val STAT_DAEMON_QUEUE = "STATS"
private const val TOURNAMENT_ENGINE_QUEUE = "CALLBACK"

fun main(args: Array<String>) = io.ktor.server.netty.EngineMain.main(args)

Expand All @@ -17,19 +27,32 @@ fun Application.module() {

configureRouting()

// Start Tournament Engine and Stat Daemons- this is a perfect opportunity for a builer/factory pattern.
logger.info("🏁 Starting background workers...")

launch {
logger.info("📊 Launching StatDaemon instances ($NUM_INSTANCES)...")
StatDaemon.createInstance("STATS").launchInstances(NUM_INSTANCES)
logger.info("✅ StatDaemon instances are running.")
val statDaemon = StatDaemon(
queue = STAT_DAEMON_QUEUE,
gamesRepository = AllGamesDatabase(),
playersRepository = AllPlayersDatabase(),
teamsRepository = AllTeamsDatabase(),
matchRepository = MatchRepositoryRiot(RiotApiClient(), RateLimiter())
)

val tournamentEngine = TournamentEngine(
queue = TOURNAMENT_ENGINE_QUEUE,
gamesRepository = AllGamesDatabase(),
seriesRepository = AllSeriesDatabase(),
playersRepository = AllPlayersDatabase(),
matchRepository = MatchRepositoryRiot(RiotApiClient(), RateLimiter())
)

CoroutineScope(Dispatchers.IO).launch {
logger.info("📊 Launching StatDaemon...")
statDaemon.start()
}
launch {
logger.info("🎮 Launching TournamentEngine instances ($NUM_INSTANCES)...")
TournamentEngine.createInstance("CALLBACK").launchInstances(NUM_INSTANCES)
logger.info("✅ TournamentEngine instances are running.")

CoroutineScope(Dispatchers.IO).launch {
logger.info("🎮 Launching TournamentEngine...")
tournamentEngine.start()
}

logger.info("🍽️ Denny's is open! Ready to serve requests. 🚀")
}

2 changes: 1 addition & 1 deletion src/main/kotlin/com/lowbudgetlcs/bridges/RabbitMQBridge.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class RabbitMQBridge(private val queue: String) {
private val config =
ConfigLoaderBuilder.default().withExplicitSealedTypes().addResourceSource("/rabbitmq.yaml").build()
.loadConfigOrThrow<RabbitMQConfig>()
private val logger : Logger = LoggerFactory.getLogger(RabbitMQBridge::class.java)
private val logger: Logger = LoggerFactory.getLogger(RabbitMQBridge::class.java)
private val factory by lazy {
ConnectionFactory().apply {
host = config.host
Expand Down
43 changes: 0 additions & 43 deletions src/main/kotlin/com/lowbudgetlcs/workers/AbstractWorker.kt

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import com.rabbitmq.client.Delivery
/**
* Defines properties and behavior for service workers listening on message queues.
*/
interface IMessageQListener {
interface IMessageQueueListener {
/**
* Name of the queue emitted/listened on.
*/
Expand Down
18 changes: 0 additions & 18 deletions src/main/kotlin/com/lowbudgetlcs/workers/IWorker.kt

This file was deleted.

183 changes: 84 additions & 99 deletions src/main/kotlin/com/lowbudgetlcs/workers/StatDaemon.kt
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
package com.lowbudgetlcs.workers

import com.lowbudgetlcs.bridges.RabbitMQBridge
import com.lowbudgetlcs.http.RiotApiClient
import com.lowbudgetlcs.models.*
import com.lowbudgetlcs.models.match.MatchParticipant
import com.lowbudgetlcs.models.match.MatchTeam
import com.lowbudgetlcs.models.match.TeamType
import com.lowbudgetlcs.repositories.games.AllGamesDatabase
import com.lowbudgetlcs.repositories.games.IGameRepository
import com.lowbudgetlcs.repositories.games.ShortcodeCriteria
import com.lowbudgetlcs.repositories.players.AllPlayersDatabase
import com.lowbudgetlcs.repositories.players.IPlayerRepository
import com.lowbudgetlcs.repositories.riot.MatchRepositoryRiot
import com.lowbudgetlcs.repositories.teams.AllTeamsDatabase
import com.lowbudgetlcs.repositories.riot.IMatchRepository
import com.lowbudgetlcs.repositories.teams.ITeamRepository
import com.lowbudgetlcs.routes.riot.RiotCallback
import com.lowbudgetlcs.util.RateLimiter
import com.rabbitmq.client.Delivery
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
Expand All @@ -29,38 +24,22 @@ import org.slf4j.LoggerFactory
* This service worker consumes [RiotCallback]s off of [queue] and saves player
* and team data into storage.
*/
class StatDaemon private constructor(
class StatDaemon(
override val queue: String,
private val gamesR: IGameRepository,
private val playersR: IPlayerRepository,
private val teamsR: ITeamRepository,
private val riotMatchRepository: MatchRepositoryRiot
) : AbstractWorker(), IMessageQListener {
private val gamesRepository: IGameRepository,
private val playersRepository: IPlayerRepository,
private val teamsRepository: ITeamRepository,
private val matchRepository: IMatchRepository
) : IMessageQueueListener {

private val logger: Logger = LoggerFactory.getLogger(StatDaemon::class.java)
private val messageq = RabbitMQBridge(queue)
private val messageQueue = RabbitMQBridge(queue)
private val scope = CoroutineScope(Dispatchers.IO)

/**
* Private constructor and companion object prevent direct instantiation.
*
* This behavior is deprecated and will be removed in future versions.
*/
companion object {
fun createInstance(queue: String): StatDaemon = StatDaemon(
queue,
AllGamesDatabase(),
AllPlayersDatabase(),
AllTeamsDatabase(),
MatchRepositoryRiot(RiotApiClient(), RateLimiter())
)
}

override fun createInstance(instanceId: Int): StatDaemon = Companion.createInstance(queue)

override fun start() {
fun start() {
logger.info("🚀 StatDaemon is running...")
logger.debug("📡 Listening on queue: `$queue`")
messageq.listen { _, delivery ->
messageQueue.listen { _, delivery ->
processMessage(delivery)
}
}
Expand All @@ -75,10 +54,10 @@ class StatDaemon private constructor(
try {
val callback = Json.decodeFromString<RiotCallback>(message)
logger.debug("✅ Successfully decoded RiotCallback for game ID: ${callback.gameId}")
CoroutineScope(Dispatchers.IO).launch {
scope.launch {
processRiotCallback(callback)
}
messageq.channel.basicAck(delivery.envelope.deliveryTag, false)
messageQueue.channel.basicAck(delivery.envelope.deliveryTag, false)
} catch (e: SerializationException) {
logger.error("❌ Failed to decode message: $message", e)
} catch (e: IllegalArgumentException) {
Expand All @@ -94,9 +73,9 @@ class StatDaemon private constructor(
private suspend fun processRiotCallback(callback: RiotCallback) {
logger.info("🔍 Fetching match details for game ID: ${callback.gameId}")

val tournamentMatch = riotMatchRepository.getMatch(callback.gameId)
val tournamentMatch = matchRepository.getMatch(callback.gameId)
tournamentMatch?.let { match ->
gamesR.readByCriteria(ShortcodeCriteria(callback.shortCode)).firstOrNull()?.let { game ->
gamesRepository.readByCriteria(ShortcodeCriteria(callback.shortCode)).firstOrNull()?.let { game ->
logger.info("🎮 Processing match `${match.matchInfo.gameId}` for shortcode `${game.shortCode}`...")

match.matchInfo.teams.forEach { team ->
Expand All @@ -119,37 +98,38 @@ class StatDaemon private constructor(
* Saves game data for a [team] consisting of [players] from [game]. [length] is the game duration.
*/
private fun processTeam(team: MatchTeam, players: List<MatchParticipant>, game: Game, length: Long) =
playersR.fetchTeamId(players)?.let { teamId ->
playersRepository.fetchTeamId(players)?.let { teamId ->
logDebugMessage("📝 Saving game data for", teamId.toString(), game.shortCode, "...")
teamsR.readById(teamId)?.let { t ->
teamsRepository.readById(teamId)?.let { t ->
try {
val side = if (team.teamId == TeamType.BLUE.code) RiftSide.BLUE else RiftSide.RED
if (teamsR.saveTeamData(
t, game, TeamGameData(
team.win, side, players.sumOf { it.goldEarned }, length, kills = Objective(
kills = team.objectives.champion?.kills ?: 0,
first = team.objectives.champion?.firstTaken == true
), barons = Objective(
kills = team.objectives.baron?.kills ?: 0,
first = team.objectives.baron?.firstTaken == true
), grubs = Objective(
kills = team.objectives.horde?.kills ?: 0,
first = team.objectives.horde?.firstTaken == true
), dragons = Objective(
kills = team.objectives.dragon?.kills ?: 0,
first = team.objectives.dragon?.firstTaken == true
), heralds = Objective(
kills = team.objectives.riftHerald?.kills ?: 0,
first = team.objectives.riftHerald?.firstTaken == true
), towers = Objective(
kills = team.objectives.tower?.kills ?: 0,
first = team.objectives.tower?.firstTaken == true
), inhibitors = Objective(
kills = team.objectives.inhibitor?.kills ?: 0,
first = team.objectives.inhibitor?.firstTaken == true
if (teamsRepository.saveTeamData(
t, game, TeamGameData(
team.win, side, players.sumOf { it.goldEarned }, length, kills = Objective(
kills = team.objectives.champion?.kills ?: 0,
first = team.objectives.champion?.firstTaken == true
), barons = Objective(
kills = team.objectives.baron?.kills ?: 0,
first = team.objectives.baron?.firstTaken == true
), grubs = Objective(
kills = team.objectives.horde?.kills ?: 0,
first = team.objectives.horde?.firstTaken == true
), dragons = Objective(
kills = team.objectives.dragon?.kills ?: 0,
first = team.objectives.dragon?.firstTaken == true
), heralds = Objective(
kills = team.objectives.riftHerald?.kills ?: 0,
first = team.objectives.riftHerald?.firstTaken == true
), towers = Objective(
kills = team.objectives.tower?.kills ?: 0,
first = team.objectives.tower?.firstTaken == true
), inhibitors = Objective(
kills = team.objectives.inhibitor?.kills ?: 0,
first = team.objectives.inhibitor?.firstTaken == true
)
)
)
) != null) logDebugMessage("✅ Saved game data for", t.name, game.shortCode)
) != null
) logDebugMessage("✅ Saved game data for", t.name, game.shortCode)
else logDebugMessage("❌ Failed to save game data for", t.name, game.shortCode)
} catch (e: Throwable) {
logError(e, t.name, game.shortCode)
Expand All @@ -166,42 +146,47 @@ class StatDaemon private constructor(
"📝 Saving game data for", "${player.riotGameName}#${player.riotTagline}", game.shortCode, "..."
)
try {
playersR.readByPuuid(player.playerUniqueUserId)?.let { p ->
if (playersR.savePlayerData(
p, game, PlayerGameData(
player.kills,
player.deaths,
player.assists,
player.championLevel,
player.goldEarned.toLong(),
player.visionScore,
player.totalDamageToChampions,
player.totalHealsOnTeammates,
player.totalDamageShieldedOnTeammates,
player.totalDamageTaken,
player.damageSelfMitigated,
player.damageDealtToTurrets,
player.longestTimeSpentLiving,
player.doubleKills,
player.tripleKills,
player.quadraKills,
player.pentaKills,
player.totalMinionsKilled + player.neutralMinionsKilled,
player.championName,
player.item0,
player.item1,
player.item2,
player.item3,
player.item4,
player.item5,
player.item6,
player.perks.perkStyles[0].style,
player.perks.perkStyles[1].style,
player.summoner1Id,
player.summoner2Id
)
) != null) logDebugMessage("✅ Saved stats for", "${player.riotGameName}#${player.riotTagline}", game.shortCode)
else logDebugMessage("❌ Failed to save stats data for", "${player.riotGameName}#${player.riotTagline}", game.shortCode)
playersRepository.readByPuuid(player.playerUniqueUserId)?.let { p ->
if (playersRepository.savePlayerData(
p, game, PlayerGameData(
player.kills,
player.deaths,
player.assists,
player.championLevel,
player.goldEarned.toLong(),
player.visionScore,
player.totalDamageToChampions,
player.totalHealsOnTeammates,
player.totalDamageShieldedOnTeammates,
player.totalDamageTaken,
player.damageSelfMitigated,
player.damageDealtToTurrets,
player.longestTimeSpentLiving,
player.doubleKills,
player.tripleKills,
player.quadraKills,
player.pentaKills,
player.totalMinionsKilled + player.neutralMinionsKilled,
player.championName,
player.item0,
player.item1,
player.item2,
player.item3,
player.item4,
player.item5,
player.item6,
player.perks.perkStyles[0].style,
player.perks.perkStyles[1].style,
player.summoner1Id,
player.summoner2Id
)
) != null
) logDebugMessage("✅ Saved stats for", "${player.riotGameName}#${player.riotTagline}", game.shortCode)
else logDebugMessage(
"❌ Failed to save stats data for",
"${player.riotGameName}#${player.riotTagline}",
game.shortCode
)
}
} catch (e: Throwable) {
logError(e, "${player.riotGameName}#${player.riotTagline}", game.shortCode)
Expand Down
Loading