Skip to content

Commit

Permalink
OK-567 Also report ensikertalaistieto amounts
Browse files Browse the repository at this point in the history
  • Loading branch information
msiukola committed Jul 4, 2024
1 parent 7a7ead0 commit 8dfbbe4
Showing 1 changed file with 51 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@ import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.DurationInt

/**
* Muodostetaan siirtotiedostot kaikille neljälle tyypille. Jos dataa on aikavälillä paljon, muodostuu useita tiedostoja per tyyppi.
* Tiedostot tallennetaan s3:seen.
*/
* Muodostetaan siirtotiedostot kaikille neljälle tyypille. Jos dataa on aikavälillä paljon, muodostuu useita tiedostoja per tyyppi.
* Tiedostot tallennetaan s3:seen.
*/
trait IOvaraService {
def muodostaSeuraavaSiirtotiedosto(): SiirtotiedostoProcess
def formSiirtotiedostotPaged(process: SiirtotiedostoProcess): SiirtotiedostoProcess
def formEnsikertalainenSiirtotiedostoForHakus(hakuOids: Seq[String]): String
def formEnsikertalainenSiirtotiedostoForHakus(
hakuOids: Seq[String]
): Seq[HaunEnsikertalaisetResult]
}

case class HaunEnsikertalaisetResult(hakuOid: String, total: Int, error: Option[Throwable])

class OvaraService(
db: OvaraDbRepository,
s3Client: SiirtotiedostoClient,
Expand Down Expand Up @@ -93,7 +97,7 @@ class OvaraService(
)

//Haetaan hakujen oidit ja synkataan ensikertalaiset näille
def triggerEnsikertalaiset(vainAktiiviset: Boolean) = {
def triggerEnsikertalaiset(vainAktiiviset: Boolean): Seq[HaunEnsikertalaisetResult] = {
implicit val to: Timeout = Timeout(5.minutes)

val MILLIS_TO_WAIT = 5000
Expand Down Expand Up @@ -130,15 +134,18 @@ class OvaraService(
} catch {
case t: Throwable =>
logger.error(s"Ensikertalaisten siirtotiedostojen muodostaminen epäonnistui: ", t)
Seq.empty
}
}

//Ensivaiheessa ajetaan tämä kaikille kk-hauille kerran, myöhemmin riittää synkata kerran päivässä aktiivisten kk-hakujen tiedot
def formEnsikertalainenSiirtotiedostoForHakus(hakuOids: Seq[String]): String = {
def formEnsikertalainenSiirtotiedostoForHakus(
hakuOids: Seq[String]
): Seq[HaunEnsikertalaisetResult] = {
val executionId = UUID.randomUUID().toString
val fileCounter = new AtomicReference[Int](0)
val results = new AtomicReference[List[(String, Option[Throwable])]](List.empty)
def formEnsikertalainenSiirtotiedostoForHaku(hakuOid: String) = {
val results = new AtomicReference[List[HaunEnsikertalaisetResult]](List.empty)
def formEnsikertalainenSiirtotiedostoForHaku(hakuOid: String): Future[(String, Int)] = {
implicit val to: Timeout = Timeout(30.minutes)
logger.info(s"($executionId) Ei löytynyt lainkaan ensikertalaisuustietoja haulle $hakuOid")
val ensikertalaiset: Future[Seq[Ensikertalainen]] =
Expand All @@ -163,12 +170,13 @@ class OvaraService(
executionId,
fileCounter.updateAndGet(c => c + 1)
)
(hakuOid, ensikertalaiset.size)
} else {
logger.info(
s"($executionId) Ei löytynyt lainkaan ensikertalaisuustietoja haulle $hakuOid"
)
(hakuOid, 0)
}

})
}

Expand All @@ -179,15 +187,15 @@ class OvaraService(
.foreach(hakuOid => {
val start = System.currentTimeMillis()
try {
val result = formEnsikertalainenSiirtotiedostoForHaku(hakuOid)
Await.result(result, 45.minutes)
val result = Await.result(formEnsikertalainenSiirtotiedostoForHaku(hakuOid), 45.minutes)
logger.info(
s"($executionId) Valmista haulle $hakuOid, kesto ${System.currentTimeMillis() - start} ms"
)
val totalProcessed = results.updateAndGet(r => (hakuOid, None) :: r)
val totalProcessed =
results.updateAndGet(r => HaunEnsikertalaisetResult(result._1, result._2, None) :: r)
logger.info(
s"Valmiina ${totalProcessed.size} / ${hakuOids.size}, onnistuneita ${totalProcessed
.count(_._2.isEmpty)} ja epäonnistuneita ${totalProcessed.count(_._2.nonEmpty)}"
.count(_.error.isEmpty)} ja epäonnistuneita ${totalProcessed.count(_.error.nonEmpty)}"
)
} catch {
case t: Throwable =>
Expand All @@ -196,20 +204,23 @@ class OvaraService(
s"($executionId) (kesto ${System.currentTimeMillis() - start} ms) Siirtotiedoston muodostaminen haun $hakuOid ensikertalaisista epäonnistui:",
t
)
val totalProcessed = results.updateAndGet(r => (hakuOid, Some(t)) :: r)
val totalProcessed =
results.updateAndGet(r => HaunEnsikertalaisetResult(hakuOid, 0, Some(t)) :: r)
logger.info(
s"Valmiina ${totalProcessed.size} / ${hakuOids.size}, onnistuneita ${totalProcessed
.count(_._2.isEmpty)} ja epäonnistuneita ${totalProcessed.count(_._2.nonEmpty)}"
.count(_.error.isEmpty)} ja epäonnistuneita ${totalProcessed.count(_.error.nonEmpty)}"
)
}
})
val failed = results.get().filter(_._2.isDefined)
val finalResults = results.get()
val failed = finalResults.filter(_.error.isDefined)
failed.foreach(result =>
logger.error(
s"Ei saatu muodostettua ensikertalaisten siirtotiedostoa haulle ${result._1}: ${result._2}"
s"Ei saatu muodostettua ensikertalaisten siirtotiedostoa haulle ${result.hakuOid}: ${result.error}"
)
)
s"Onnistuneita ${hakuOids.size - failed.size}, epäonnistuneita ${failed.size}"
logger.info(s"Onnistuneita ${hakuOids.size - failed.size}, epäonnistuneita ${failed.size}")
finalResults
}

def muodostaSeuraavaSiirtotiedosto = {
Expand All @@ -232,18 +243,29 @@ class OvaraService(
logger.info(s"Luotiin ja persistoitiin tieto luodusta: $newProcessInfo")

try {
if (OvaraUtil.shouldFormEnsikertalaiset()) {
logger.info(s"${newProcessInfo.executionId} Muodostetaan ensikertalaisuudet")
triggerEnsikertalaiset(true)
}

val processResult: SiirtotiedostoProcess = formSiirtotiedostotPaged(
val mainResults: SiirtotiedostoProcess = formSiirtotiedostotPaged(
newProcessInfo
)

logger.info(s"Siirtotiedostojen muodostus valmistui, persistoidaan tulokset: $processResult")
db.persistFinishedProcess(processResult)
processResult
val ensikertalaisetResults =
if (true)
triggerEnsikertalaiset(true)
else Seq.empty

val combinedInfo = SiirtotiedostoProcessInfo(
mainResults.info.entityTotals ++ ensikertalaisetResults
.map(r => "ek_" + r.hakuOid -> r.total.toLong)
.toMap
)

val combinedResults = mainResults.copy(info = combinedInfo)

logger.info(
s"Siirtotiedostojen muodostus valmistui, persistoidaan tulokset: $combinedResults"
)
db.persistFinishedProcess(combinedResults)
combinedResults
} catch {
case t: Throwable =>
logger.error(
Expand Down Expand Up @@ -302,7 +324,9 @@ class OvaraService(
class OvaraServiceMock extends IOvaraService {
override def formSiirtotiedostotPaged(process: SiirtotiedostoProcess) = ???

override def formEnsikertalainenSiirtotiedostoForHakus(hakuOids: Seq[String]): String = ???
override def formEnsikertalainenSiirtotiedostoForHakus(
hakuOids: Seq[String]
): Seq[HaunEnsikertalaisetResult] = ???

override def muodostaSeuraavaSiirtotiedosto(): SiirtotiedostoProcess = ???
}

0 comments on commit 8dfbbe4

Please sign in to comment.