Skip to content

Commit

Permalink
OK-567 WIP Form ensikertalaisuus-siirtotiedostos in timed-ovara
Browse files Browse the repository at this point in the history
  • Loading branch information
msiukola committed Jul 3, 2024
1 parent 88b4948 commit 2ea4df8
Show file tree
Hide file tree
Showing 4 changed files with 284 additions and 78 deletions.
2 changes: 1 addition & 1 deletion ovara-suoritusrekisteri/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<artifactId>ovara-suoritusrekisteri</artifactId>
<packaging>jar</packaging>

<name>ovara-hakurekisteri</name>
<name>ovara-suoritusrekisteri</name>
<url>http://maven.apache.org</url>

<properties>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@
package fi.vm.sade.hakurekisteri.ovara.ajastus

import akka.actor.ActorSystem
import akka.actor.{ActorSystem, Props}
import org.slf4j.{Logger, LoggerFactory}
import fi.vm.sade.hakurekisteri.Config
import fi.vm.sade.hakurekisteri.ensikertalainen.EnsikertalainenActor
import fi.vm.sade.hakurekisteri.integration.henkilo.PersonOidsWithAliases
import fi.vm.sade.hakurekisteri.ovara.{
OvaraDbRepository,
OvaraDbRepositoryImpl,
OvaraService,
SiirtotiedostoClient,
SiirtotiedostoProcess,
SiirtotiedostoProcessInfo
}
import support.{BareRegisters, BaseKoosteet, DbJournals, Integrations, PersonAliasesProvider}
import fi.vm.sade.hakurekisteri.ovara.{OvaraService, SiirtotiedostoClient}
import support.{BareRegisters, DbJournals, Integrations, OvaraIntegrations, PersonAliasesProvider}

import java.util.UUID
import scala.concurrent.Future

object SiirtotiedostoApp {
Expand All @@ -25,35 +18,44 @@ object SiirtotiedostoApp {
implicit val actorSystem: ActorSystem = system
val journals = new DbJournals(config)

val noAliasesProvider = new PersonAliasesProvider {
override def enrichWithAliases(henkiloOids: Set[String]): Future[PersonOidsWithAliases] = {
Future.successful(PersonOidsWithAliases(henkiloOids, Map()))
}
val ovaraIntegrations = new OvaraIntegrations(system, config)

val personAliasesProvider = new PersonAliasesProvider {
override def enrichWithAliases(henkiloOids: Set[String]): Future[PersonOidsWithAliases] =
ovaraIntegrations.oppijaNumeroRekisteri.enrichWithAliases(henkiloOids)
}

//Todo, saadaanko jotenkin helposti käsiin ensikertalaisActor ja hakuActor ilman että initialisoidaan koko himmeli,
//vrt. ovaraDbRepository. HakuActoria ei tarvita, jos haetaan aktiiviset kk-haut jollain muulla tavalla.
//val registers =
// new BareRegisters(system, journals, journals.database, noAliasesProvider, config)
//val integrations = Integrations(registers, system, config)
val registers =
new BareRegisters(system, journals, journals.database, personAliasesProvider, config)

val ovaraDbRepository: OvaraDbRepository = new OvaraDbRepositoryImpl(journals.database)
//val koosteet = new BaseKoosteet(system, integrations, registers, config)
val ensikertalainenActor = system.actorOf(
Props(
new EnsikertalainenActor(
registers.suoritusRekisteri,
registers.opiskeluoikeusRekisteri,
ovaraIntegrations.valintarekisteri,
ovaraIntegrations.tarjonta,
ovaraIntegrations.haut,
ovaraIntegrations.hakemusService,
ovaraIntegrations.oppijaNumeroRekisteri,
config
)
),
"ovara-ensikertalainen"
)

new OvaraService(
ovaraDbRepository,
registers.ovaraDbRepository,
new SiirtotiedostoClient(config.siirtotiedostoClientConfig),
null, //fixme
null, //fixme
ensikertalainenActor,
ovaraIntegrations.haut,
config.siirtotiedostoPageSize
)
}

def main(args: Array[String]): Unit = {
implicit val system: ActorSystem = ActorSystem("ovara-suoritusrekisteri")

try {
println("Hello, ovara-suoritusrekisteri world!")

logger.info(s"Hello, ovara-suoritusrekisteri world!")

val config = Config.fromString("default")
Expand All @@ -64,14 +66,16 @@ object SiirtotiedostoApp {
val ovaraService = createOvaraService(config, system)

//Todo implement ajastus db logic and instantiate OvaraService etc
val result = ovaraService.formNextSiirtotiedosto
logger.info(s"Operation result: $result")
ovaraService.muodostaSeuraavaSiirtotiedosto
system.terminate()
} catch {
case t: Throwable =>
println(s"Virhe siirtotiedoston muodostamisessa: ${t.getMessage}")
logger.error(s"Virhe siirtotiedoston muodostamisessa: ${t.getMessage}", t)
logger.error(s"Siirtotiedoston muodostaminen epäonnistui, lopetetaan: ${t.getMessage}", t)
system.terminate()
Thread.sleep(5000)
System.exit(
1
) //Fixme, juuri nyt tämä on tarpeellinen että suoritus saadaan katki, mutta siistimmin voisi yrittää. Joku service/actor ilmeisesti jää ajoon myös system.terminaten jölkeen.
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,8 @@ import fi.vm.sade.hakurekisteri.ensikertalainen.{
MenettamisenPeruste
}
import fi.vm.sade.hakurekisteri.integration.ExecutorUtil
import fi.vm.sade.hakurekisteri.integration.haku.{
AllHaut,
GetHautQuery,
Haku,
HakuRequest,
RestHaku,
RestHakuResult
}
import fi.vm.sade.hakurekisteri.integration.kouta.KoutaInternalActorRef
import fi.vm.sade.hakurekisteri.rest.support.HakurekisteriJsonSupport
import fi.vm.sade.hakurekisteri.integration.haku.{AllHaut, Haku, HakuRequest}

import fi.vm.sade.utils.slf4j.Logging

import java.util.UUID
Expand All @@ -29,9 +21,9 @@ import scala.concurrent.duration.DurationInt
trait IOvaraService {
//Muodostetaan siirtotiedostot kaikille neljälle tyypille. Jos dataa on aikavälillä paljon, muodostuu useita tiedostoja per tyyppi.
//Tiedostot tallennetaan s3:seen.
def formNextSiirtotiedosto(): SiirtotiedostoProcess
def muodostaSeuraavaSiirtotiedosto(): SiirtotiedostoProcess
def formSiirtotiedostotPaged(process: SiirtotiedostoProcess): SiirtotiedostoProcess
def formEnsikertalainenSiirtotiedostoForHakus(hakuOids: Seq[String])
def formEnsikertalainenSiirtotiedostoForHakus(hakuOids: Seq[String]): String
}

class OvaraService(
Expand All @@ -44,7 +36,7 @@ class OvaraService(
with Logging {

implicit val ec: ExecutionContext = ExecutorUtil.createExecutor(
2,
6,
getClass.getSimpleName
)

Expand Down Expand Up @@ -101,23 +93,48 @@ class OvaraService(
//Haetaan hakujen oidit ja synkataan ensikertalaiset näille
def triggerEnsikertalaiset(vainAktiiviset: Boolean) = {
implicit val to: Timeout = Timeout(5.minutes)
val allHaut: Future[AllHaut] =
(hakuActor ? HakuRequest)
.mapTo[AllHaut] //fixme, tää on ongelma koska palautuu 0 hakua jos kaikkia hakuja ei oo vielä haettu actorin päässä.
val hakuResult = Await.result(allHaut, 5.minutes)
val kiinnostavat =
hakuResult.haut.filter(haku => (!vainAktiiviset || haku.isActive) && haku.kkHaku).map(_.oid)
logger.info(
s"Löydettiin ${kiinnostavat.size} kiinnostavaa hakua yhteensä ${hakuResult.haut.size} hausta. Vain aktiiviset: $vainAktiiviset"
)
formEnsikertalainenSiirtotiedostoForHakus(kiinnostavat)

val MILLIS_TO_WAIT = 5000
//Odotetaan, että HakuActor saa haut ladattua cacheen.
//Haut tarvitaan cacheen myös sitä varten, että EnsikertalaisActor kutsuu myöhemmin HakuActoria.
def waitForHautCache(millisToWaitLeft: Long = 600 * 1000): Seq[Haku] = {
if (millisToWaitLeft > 0) {
val allHaut: Future[AllHaut] =
(hakuActor ? HakuRequest)
.mapTo[AllHaut]
val hakuResult: AllHaut = Await.result(allHaut, 10.seconds)
if (hakuResult.haut.nonEmpty) {
hakuResult.haut
} else {
logger.info(s"HakuCache ei vielä valmis, odotetaan $MILLIS_TO_WAIT ms")
Thread.sleep(MILLIS_TO_WAIT)
waitForHautCache(millisToWaitLeft - MILLIS_TO_WAIT)
}
} else {
throw new RuntimeException(s"Hakuja ei saatu ladattua")
}
}

logger.info(s"Muodostetaan ensikertalaisuudet, vain aktiiviset: $vainAktiiviset")
try {
val haut = waitForHautCache(600 * 1000)
val kiinnostavat =
haut.filter(haku => (!vainAktiiviset || haku.isActive) && haku.kkHaku).map(_.oid)
logger.info(
s"Löydettiin ${kiinnostavat.size} kiinnostavaa hakua yhteensä ${haut.size} hausta. Vain aktiiviset: $vainAktiiviset"
)
formEnsikertalainenSiirtotiedostoForHakus(kiinnostavat)
} catch {
case t: Throwable =>
logger.error(s"Ensikertalaisten siirtotiedostojen muodostaminen epäonnistui: ", t)
}
}

//Ensivaiheessa ajetaan tämä kaikille kk-hauille kerran, myöhemmin riittää synkata kerran päivässä aktiivisten kk-hakujen tiedot
def formEnsikertalainenSiirtotiedostoForHakus(hakuOids: Seq[String]) = {
def formEnsikertalainenSiirtotiedostoForHakus(hakuOids: Seq[String]): String = {
val executionId = UUID.randomUUID().toString
var fileNumber = 1
def formSiirtotiedostoForHaku(hakuOid: String) = {
def formEnsikertalainenSiirtotiedostoForHaku(hakuOid: String) = {
implicit val to: Timeout = Timeout(30.minutes)

val ensikertalaiset: Future[Seq[Ensikertalainen]] =
Expand Down Expand Up @@ -148,34 +165,39 @@ class OvaraService(
if (hakuOids.size <= 5) s"hauille ${hakuOids.toString}" else s"${hakuOids.size} haulle."
logger.info(s"($executionId) Muodostetaan siirtotiedostot $infoStr")
val resultsByHaku = hakuOids.map(hakuOid => {
val start = System.currentTimeMillis()
try {
val result = formSiirtotiedostoForHaku(hakuOid)
//Todo, muu toteutus tälle? Mikä on riittävä timeout, mitä jos jäädään jumiin? Käsiteltäviä hakuja voi olla paljon,
//kaikkea ei voi tehdä rinnakkain. Muutaman kerrallaan varmaan voisi.
val result = formEnsikertalainenSiirtotiedostoForHaku(hakuOid)
Await.result(result, 45.minutes)
logger.info(s"($executionId) Valmista haulle $hakuOid")
logger.info(
s"($executionId) Valmista haulle $hakuOid, kesto ${System.currentTimeMillis() - start} ms"
)
fileNumber += 1
(hakuOid, None)
} catch {
case t: Throwable =>
logger
.error(
s"($executionId) Siirtotiedoston muodostaminen haun $hakuOid ensikertalaisista epäonnistui:",
t
s"($executionId) (${System.currentTimeMillis() - start} ms) Siirtotiedoston muodostaminen haun $hakuOid ensikertalaisista epäonnistui:",
t.getMessage
)
(
hakuOid,
Some(t.getMessage)
) //Todo, retry? Voisi olla järkevää, jos muodostetaan siirtotiedosto kymmenille hauille ja yksi satunnaissepäonnistuu.
)
}

})
val failed = resultsByHaku.filter(_._2.isDefined)
logger.error(s"($executionId) Failed: $failed")
failed.foreach(result =>
logger.error(
s"Ei saatu muodostettua ensikertalaisten siirtotiedostoa haulle ${result._1}: ${result._2}"
)
)
s"Onnistuneita ${hakuOids.size - failed.size}, epäonnistuneita ${failed.size}"
}

def formNextSiirtotiedosto = {
def muodostaSeuraavaSiirtotiedosto = {
val executionId = UUID.randomUUID().toString
val latestProcessInfo: Option[SiirtotiedostoProcess] =
db.getLatestProcessInfo
Expand All @@ -189,17 +211,35 @@ class OvaraService(

val windowEnd = System.currentTimeMillis()

val newProcessInfo =
val newProcessInfo: SiirtotiedostoProcess =
db.createNewProcess(executionId, windowStart, windowEnd)
.getOrElse(throw new RuntimeException("Siirtotiedosto process does not exist!"))
logger.info(s"Luotiin ja persistoitiin tieto luodusta: $newProcessInfo")

//Todo, jonkinlainen mekanismi sille että muodostetaan ensikertalaisuudet kerran päivässä, muulloin vain muut muutokset.
val result = formSiirtotiedostotPaged(
newProcessInfo.getOrElse(throw new RuntimeException("Siirtotiedosto process does not exist!"))
)
logger.info(s"Siirtotiedostojen muodostus valmistui, persistoidaan tulokset: $result")
db.persistFinishedProcess(result)
result
try {
//Todo, only do this once a day, eg. when hour == 0
if (true) {
logger.info(s"${newProcessInfo.executionId} Muodostetaan ensikertalaisuudet")
triggerEnsikertalaiset(true)
}

val processResult: SiirtotiedostoProcess = formSiirtotiedostotPaged(
newProcessInfo
)

logger.info(s"Siirtotiedostojen muodostus valmistui, persistoidaan tulokset: $processResult")
db.persistFinishedProcess(processResult)
processResult
} catch {
case t: Throwable =>
logger.error(
s"Virhe siirtotiedoston muodostamisessa tai persistoinnissa, merkitään virhe kantaan...",
t
)
db.persistFinishedProcess(newProcessInfo.copy(errorMessage = Some(t.getMessage)))
throw t
}

}

def formSiirtotiedostotPaged(process: SiirtotiedostoProcess): SiirtotiedostoProcess = {
Expand Down Expand Up @@ -240,15 +280,15 @@ class OvaraService(
"opiskeluoikeus" -> opiskeluoikeusResult
)
logger.info(s"(${process.executionId}) Siirtotiedostot muodostettu, tuloksia: $resultCounts")
process.copy(info = SiirtotiedostoProcessInfo(resultCounts))
process.copy(info = SiirtotiedostoProcessInfo(resultCounts), finishedSuccessfully = true)
}

}

class OvaraServiceMock extends IOvaraService {
override def formSiirtotiedostotPaged(process: SiirtotiedostoProcess) = ???

override def formEnsikertalainenSiirtotiedostoForHakus(hakuOids: Seq[String]): Unit = ???
override def formEnsikertalainenSiirtotiedostoForHakus(hakuOids: Seq[String]): String = ???

override def formNextSiirtotiedosto(): SiirtotiedostoProcess = ???
override def muodostaSeuraavaSiirtotiedosto(): SiirtotiedostoProcess = ???
}
Loading

0 comments on commit 2ea4df8

Please sign in to comment.