Skip to content

Commit

Permalink
Merge pull request #597 from Opetushallitus/OY-4836
Browse files Browse the repository at this point in the history
OY-4836 Use separate thread pools instead of global for hakemukset an…
  • Loading branch information
msiukola authored May 16, 2024
2 parents e8266f4 + 6977252 commit 7d093ec
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ suoritusrekisteri.koski.update.cronJob={{ suoritusrekisteri_koski_update_cronjob
suoritusrekisteri.koski.update.kkHaut={{ suoritusrekisteri_koski_update_kkHaut | default('false') }}
suoritusrekisteri.koski.update.toisenAsteenHaut={{ suoritusrekisteri_koski_update_toisenAsteenHaut | default('false') }}
suoritusrekisteri.koski.update.jatkuvatHaut={{ suoritusrekisteri_koski_update_jatkuvatHaut | default('false') }}
suoritusrekisteri.modifiedhakemukset.backtrack.days={{ suoritusrekisteri_modifiedhakemukset_backtrack_days | default('2')}}
suoritusrekisteri.modifiedhakemukset.backtrack.hours={{ suoritusrekisteri_modifiedhakemukset_backtrack_hours | default('2')}}
suoritusrekisteri.oppijanumerorekisteri-service.max-connections={{ suoritusrekisteri_oppijanumerorekisteriservice_max_connections | default('50')}}
suoritusrekisteri.oppijanumerorekisteri-service.max-connection-queue-ms={{ suoritusrekisteri_oppijanumerorekisteriservice_max_connection_queue_ms | default('60000')}}
suoritusrekisteri.oppijanumerorekisteri-service.max.oppijat.batch.size={{ suoritusrekisteri_oppijanumerorekisteriservice_max_oppijat_batch_size | default('5000')}}
Expand Down
8 changes: 8 additions & 0 deletions src/main/scala/fi/vm/sade/hakurekisteri/audit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ case object YTLSyncForPerson extends Operation {
def name: String = "REQUEST_YTL_SYNC_FOR_PERSON"
}

case object YTLSyncForPersons extends Operation {
def name: String = "REQUEST_YTL_SYNC_FOR_PERSONS"
}

case object YTLSyncForHaku extends Operation {
def name: String = "REQUEST_YTL_SYNC_FOR_HAKU"
}

case object HenkilonTiedotVirrasta extends Operation {
def name: String = "READ_VIRTA_TIEDOT"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ import fi.vm.sade.hakurekisteri.integration.kouta.{
KoutaInternalHakukohde
}
import fi.vm.sade.hakurekisteri.integration.organisaatio.{Organisaatio, OrganisaatioActorRef}
import fi.vm.sade.hakurekisteri.integration.{OphUrlProperties, ServiceConfig, VirkailijaRestClient}
import fi.vm.sade.hakurekisteri.integration.{
ExecutorUtil,
OphUrlProperties,
ServiceConfig,
VirkailijaRestClient
}
import fi.vm.sade.hakurekisteri.rest.support.{HakurekisteriJsonSupport, Query}
import fi.vm.sade.properties.OphProperties
import org.joda.time.{DateTimeZone, LocalDate}
Expand All @@ -42,8 +47,7 @@ import java.text.SimpleDateFormat
import java.util.Date
import java.util.concurrent.TimeUnit
import scala.compat.Platform
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

Expand Down Expand Up @@ -192,6 +196,11 @@ class HakemusService(
)(implicit val system: ActorSystem)
extends IHakemusService {

implicit val ec: ExecutionContext = ExecutorUtil.createExecutor(
config.integrations.asyncOperationThreadPoolSize,
getClass.getSimpleName
)

case class SearchParams(
aoOids: Seq[String] = null,
asId: String = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package fi.vm.sade.hakurekisteri.integration.henkilo
import akka.actor.ActorSystem
import akka.event.Logging
import fi.vm.sade.hakurekisteri.Config
import fi.vm.sade.hakurekisteri.integration.VirkailijaRestClient
import fi.vm.sade.hakurekisteri.integration.{ExecutorUtil, VirkailijaRestClient}
import fi.vm.sade.hakurekisteri.integration.hakemus.HakemusHenkilotiedot
import fi.vm.sade.hakurekisteri.integration.mocks.HenkiloMock
import org.apache.commons.httpclient.HttpStatus
Expand All @@ -12,8 +12,7 @@ import org.json4s.{DefaultFormats, _}
import support.PersonAliasesProvider

import scala.collection.Iterator
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

/**
Expand All @@ -35,12 +34,7 @@ case class LinkedHenkiloOids(

trait IOppijaNumeroRekisteri {
def fetchLinkedHenkiloOidsMap(henkiloOids: Set[String]): Future[LinkedHenkiloOids]

def enrichWithAliases(henkiloOids: Set[String]): Future[PersonOidsWithAliases] = {
fetchLinkedHenkiloOidsMap(henkiloOids)
.map(_.oidToLinkedOids)
.map(PersonOidsWithAliases(henkiloOids, _))
}
def enrichWithAliases(henkiloOids: Set[String]): Future[PersonOidsWithAliases]

def getByHetu(hetu: String): Future[Henkilo]
def fetchHenkilotInBatches(henkiloOids: Set[String]): Future[Map[String, Henkilo]]
Expand All @@ -64,6 +58,11 @@ class OppijaNumeroRekisteri(client: VirkailijaRestClient, val system: ActorSyste
extends IOppijaNumeroRekisteri {
private val logger = Logging.getLogger(system, this)

implicit val ec: ExecutionContext = ExecutorUtil.createExecutor(
config.integrations.asyncOperationThreadPoolSize,
getClass.getSimpleName
)

def fetchInBatches(henkiloOids: Set[String], batchSize: Int) = {
val started = System.currentTimeMillis()
val batches = henkiloOids.grouped(batchSize).zipWithIndex.toList
Expand All @@ -86,6 +85,12 @@ class OppijaNumeroRekisteri(client: VirkailijaRestClient, val system: ActorSyste
}
}

override def enrichWithAliases(henkiloOids: Set[String]): Future[PersonOidsWithAliases] = {
fetchLinkedHenkiloOidsMap(henkiloOids)
.map(_.oidToLinkedOids)
.map(PersonOidsWithAliases(henkiloOids, _))
}

override def fetchLinkedHenkiloOidsMap(henkiloOids: Set[String]): Future[LinkedHenkiloOids] = {

if (henkiloOids.isEmpty) {
Expand Down Expand Up @@ -183,6 +188,17 @@ object MockOppijaNumeroRekisteri extends IOppijaNumeroRekisteri {
val henkiloOid = "1.2.246.562.24.58099330694"
val linkedTestPersonOids = Seq(henkiloOid, masterOid)

implicit val ec: ExecutionContext = ExecutorUtil.createExecutor(
1,
getClass.getSimpleName
)

override def enrichWithAliases(henkiloOids: Set[String]): Future[PersonOidsWithAliases] = {
fetchLinkedHenkiloOidsMap(henkiloOids)
.map(_.oidToLinkedOids)
.map(PersonOidsWithAliases(henkiloOids, _))
}

def fetchLinkedHenkiloOidsMap(henkiloOids: Set[String]): Future[LinkedHenkiloOids] = {
Future.successful({
val oidToLinkedOids = henkiloOids.map { queriedOid =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package fi.vm.sade.hakurekisteri.integration.ytl

import akka.actor.{Actor, ActorLogging, ActorRef}
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem}
import akka.pattern.pipe
import fi.vm.sade.hakurekisteri.Config
import fi.vm.sade.hakurekisteri.integration.ExecutorUtil
Expand Down Expand Up @@ -52,7 +52,7 @@ class YtlFetchActor(
val minIntervalBetween = 1000 * 60 * 60 * 22 //At least 22 hours between nightly syncs

implicit val ec: ExecutionContext = ExecutorUtil.createExecutor(
config.integrations.asyncOperationThreadPoolSize,
20,
getClass.getSimpleName
)

Expand Down Expand Up @@ -99,7 +99,7 @@ class YtlFetchActor(
log.error(t, s"($tunniste) Manual sync for haku ${s.hakuOid} failed...")
}
log.info(s"Ytl-sync käynnistetty haulle ${s.hakuOid} tunnisteella $tunniste")
resultF pipeTo sender
sender ! tunniste
case s: YtlSyncSingle =>
if (s.needsToBeActiveKkHakuOid.forall(oid => activeKKHakuOids.get().contains(oid))) {
val tunniste = s.tunniste
Expand All @@ -110,7 +110,7 @@ class YtlFetchActor(
case Failure(t) =>
log.error(t, s"($tunniste) Manual sync for person ${s.personOid} failed...")
}
log.info(s"Ytl-sync käynnistetty haulle ${s.personOid} tunnisteella $tunniste")
log.info(s"Ytl-sync käynnistetty oidille ${s.personOid} tunnisteella $tunniste")
resultF pipeTo sender
} else {
val infoStr = s"Not ytl-syncing $s because the haku is not an active kk-haku"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@ import _root_.akka.event.{Logging, LoggingAdapter}
import akka.pattern.ask
import akka.util.Timeout
import fi.vm.sade.auditlog.{Changes, Target}
import fi.vm.sade.hakurekisteri.{AuditUtil, YTLSyncForAll, YTLSyncForPerson}
import fi.vm.sade.hakurekisteri.{
AuditUtil,
YTLSyncForAll,
YTLSyncForHaku,
YTLSyncForPerson,
YTLSyncForPersons
}
import fi.vm.sade.hakurekisteri.integration.ytl.{
Kokelas,
YtlFetchActorRef,
Expand Down Expand Up @@ -46,24 +52,26 @@ class YtlResource(ytlFetchActor: YtlFetchActorRef)(implicit

def shouldBeAdmin = if (!currentUser.exists(_.isAdmin)) throw UserNotAuthorized("not authorized")

post("/http_request") {
post("/http_request", operation(syncAll)) {
shouldBeAdmin
logger.info("Fetching YTL data for everybody")
audit.log(auditUser, YTLSyncForAll, new Target.Builder().build, Changes.EMPTY)
val tunniste = "manual_sync_for_all_hakus" + System.currentTimeMillis()
ytlFetchActor.actor ! YtlSyncAllHaut(tunniste)
Accepted(s"YTL sync started, tunniste $tunniste")
}
get("/http_request_byhaku/:hakuOid") {

get("/http_request_byhaku/:hakuOid", operation(syncHaku)) {
shouldBeAdmin
val hakuOid = params("hakuOid")
logger.info(s"Syncing YTL data for haku $hakuOid")
audit.log(auditUser, YTLSyncForAll, new Target.Builder().build, Changes.EMPTY)
audit.log(auditUser, YTLSyncForHaku, AuditUtil.targetFromParams(params).build, Changes.EMPTY)
val tunniste = "manual_sync_for_haku_" + hakuOid
ytlFetchActor.actor ! YtlSyncHaku(hakuOid, tunniste)
logger.info(s"Returning tunniste $tunniste to caller")
Accepted(s"YTL sync started for haku $hakuOid, tunniste $tunniste")
}

get("/http_request/:personOid", operation(syncPerson)) {
implicit val to: Timeout = Timeout(30.seconds)
shouldBeAdmin
Expand Down Expand Up @@ -91,4 +99,24 @@ class YtlResource(ytlFetchActor: YtlFetchActorRef)(implicit
InternalServerError(errorStr)
}
}

post("/http_request/persons", operation(syncPersons)) {
implicit val to: Timeout = Timeout(30.seconds)
shouldBeAdmin
val personOids = parse(request.body).extract[Set[String]]
logger.info(s"Fetching YTL data for multiple person OIDs $personOids")
audit.log(
auditUser,
YTLSyncForPersons,
AuditUtil.targetFromParams(params).setField("oppijaOids", personOids.toString()).build(),
Changes.EMPTY
)
personOids.foreach(personOid => {
ytlFetchActor.actor ? YtlSyncSingle(
personOid,
tunniste = s"manual_sync_for_person_${personOid}"
)
})
Accepted(s"YTL sync started for persons $personOids")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,31 @@ trait YtlSwaggerApi extends SwaggerSupport {
.required
)
.tags("Ytl-resource")

val syncHaku = apiOperation[Any]("paivitaHaunOpiskelijatYTLTiedot")
.summary("Päivittää haun henkilöiden YTL-tiedot Suoritusrekisteriin.")
.description("Päivittää haunt YTL-tiedot Suoritusrekisteriin.")
.parameter(
pathParam[String]("hakuOid")
.description("hakuOid")
.required
)
.tags("Ytl-resource")

val syncPersons = apiOperation[Any]("paivitaOpiskelijatYTLTiedot")
.summary("Päivittää annetun oppijalistan tiedot YTl:stä Suoritusrekisteriin")
.description("Päivittää annetun oppijalistan tiedot YTl:stä Suoritusrekisteriin")
.parameter(
bodyParam[String]("oppijaoids")
.description(
s"""lista oppijanumeroista (esim ["1.2.246.562.24.00000000001", "1.2.246.562.24.00000000002"]"""
)
.required
)
.tags("Ytl-resource")

val syncAll = apiOperation[Any]("paivitaKaikkiYTLTiedot")
.summary("Päivittää aktiivisten kk-hakujen hakijoiden tiedot YTl:stä Suoritusrekisteriin")
.description("Päivittää aktiivisten kk-hakujen hakijoiden tiedot YTl:stä Suoritusrekisteriin")
.tags("Ytl-resource")
}
6 changes: 3 additions & 3 deletions src/main/scala/support/Integrations.scala
Original file line number Diff line number Diff line change
Expand Up @@ -550,11 +550,11 @@ class BaseIntegrations(rekisterit: Registers, system: ActorSystem, config: Confi
hakemusService.addTrigger(arvosanaTrigger)
hakemusService.addTrigger(ytlTrigger)

val daysToBacktrack: Int =
OphUrlProperties.getProperty("suoritusrekisteri.modifiedhakemukset.backtrack.days").toInt
val hoursToBacktrack: Int =
OphUrlProperties.getProperty("suoritusrekisteri.modifiedhakemukset.backtrack.hours").toInt
implicit val scheduler = system.scheduler
hakemusService.processModifiedHakemukset(modifiedAfter =
new Date(Platform.currentTime - TimeUnit.DAYS.toMillis(daysToBacktrack))
new Date(Platform.currentTime - TimeUnit.HOURS.toMillis(hoursToBacktrack))
)

val quartzScheduler = StdSchedulerFactory.getDefaultScheduler()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import fi.vm.sade.hakurekisteri.integration.henkilo.{
Henkilo,
IOppijaNumeroRekisteri,
LinkedHenkiloOids,
MockPersonAliasesProvider
MockPersonAliasesProvider,
PersonOidsWithAliases
}
import fi.vm.sade.hakurekisteri.integration.tarjonta._
import fi.vm.sade.hakurekisteri.integration.valintarekisteri.{
Expand Down Expand Up @@ -91,6 +92,13 @@ class OppijaResourceSpec
)
)
}

override def enrichWithAliases(henkiloOids: Set[String]): Future[PersonOidsWithAliases] = {
fetchLinkedHenkiloOidsMap(henkiloOids)
.map(_.oidToLinkedOids)
.map(PersonOidsWithAliases(henkiloOids, _))
}

override def getByHetu(hetu: String): Future[Henkilo] = {
throw new UnsupportedOperationException("Not implemented")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import fi.vm.sade.hakurekisteri.integration.hakemus.{
import fi.vm.sade.hakurekisteri.integration.henkilo.{
Henkilo,
IOppijaNumeroRekisteri,
LinkedHenkiloOids
LinkedHenkiloOids,
PersonOidsWithAliases
}
import fi.vm.sade.hakurekisteri.integration.virta.{
VirtaClient,
Expand Down Expand Up @@ -143,6 +144,12 @@ class VirtaSuoritusResourceSpec extends ScalatraFunSuite with DispatchSupport wi
}
}

override def enrichWithAliases(henkiloOids: Set[String]): Future[PersonOidsWithAliases] = {
fetchLinkedHenkiloOidsMap(henkiloOids)
.map(_.oidToLinkedOids)
.map(PersonOidsWithAliases(henkiloOids, _))
}

override def getByOids(oids: Set[String]): Future[Map[String, Henkilo]] = Future.successful(
Map(
(
Expand Down

0 comments on commit 7d093ec

Please sign in to comment.