From 1e67fc53ad1b880b8fdd98182636b878d41008de Mon Sep 17 00:00:00 2001 From: Mikko Siukola Date: Tue, 14 May 2024 13:21:11 +0300 Subject: [PATCH 1/3] OY-4836 Use separate thread pools instead of global for hakemukset and onr --- .../integration/hakemus/HakemusService.scala | 15 ++++++-- .../henkilo/oppijaNumeroRekisteri.scala | 34 ++++++++++++++----- .../rest/OppijaResourceSpec.scala | 10 +++++- .../rest/VirtaSuoritusResourceSpec.scala | 9 ++++- 4 files changed, 54 insertions(+), 14 deletions(-) diff --git a/src/main/scala/fi/vm/sade/hakurekisteri/integration/hakemus/HakemusService.scala b/src/main/scala/fi/vm/sade/hakurekisteri/integration/hakemus/HakemusService.scala index ad5324a70..eeabf0b59 100644 --- a/src/main/scala/fi/vm/sade/hakurekisteri/integration/hakemus/HakemusService.scala +++ b/src/main/scala/fi/vm/sade/hakurekisteri/integration/hakemus/HakemusService.scala @@ -30,7 +30,12 @@ import fi.vm.sade.hakurekisteri.integration.kouta.{ KoutaInternalHakukohde } import fi.vm.sade.hakurekisteri.integration.organisaatio.{Organisaatio, OrganisaatioActorRef} -import fi.vm.sade.hakurekisteri.integration.{OphUrlProperties, ServiceConfig, VirkailijaRestClient} +import fi.vm.sade.hakurekisteri.integration.{ + ExecutorUtil, + OphUrlProperties, + ServiceConfig, + VirkailijaRestClient +} import fi.vm.sade.hakurekisteri.rest.support.{HakurekisteriJsonSupport, Query} import fi.vm.sade.properties.OphProperties import org.joda.time.{DateTimeZone, LocalDate} @@ -42,8 +47,7 @@ import java.text.SimpleDateFormat import java.util.Date import java.util.concurrent.TimeUnit import scala.compat.Platform -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ import scala.util.{Failure, Success, Try} @@ -192,6 +196,11 @@ class HakemusService( )(implicit val system: ActorSystem) extends IHakemusService { + implicit val ec: ExecutionContext = ExecutorUtil.createExecutor( + config.integrations.asyncOperationThreadPoolSize, + getClass.getSimpleName + ) + case class SearchParams( aoOids: Seq[String] = null, asId: String = null, diff --git a/src/main/scala/fi/vm/sade/hakurekisteri/integration/henkilo/oppijaNumeroRekisteri.scala b/src/main/scala/fi/vm/sade/hakurekisteri/integration/henkilo/oppijaNumeroRekisteri.scala index 1940b1099..5297ef0f6 100644 --- a/src/main/scala/fi/vm/sade/hakurekisteri/integration/henkilo/oppijaNumeroRekisteri.scala +++ b/src/main/scala/fi/vm/sade/hakurekisteri/integration/henkilo/oppijaNumeroRekisteri.scala @@ -3,7 +3,7 @@ package fi.vm.sade.hakurekisteri.integration.henkilo import akka.actor.ActorSystem import akka.event.Logging import fi.vm.sade.hakurekisteri.Config -import fi.vm.sade.hakurekisteri.integration.VirkailijaRestClient +import fi.vm.sade.hakurekisteri.integration.{ExecutorUtil, VirkailijaRestClient} import fi.vm.sade.hakurekisteri.integration.hakemus.HakemusHenkilotiedot import fi.vm.sade.hakurekisteri.integration.mocks.HenkiloMock import org.apache.commons.httpclient.HttpStatus @@ -12,8 +12,7 @@ import org.json4s.{DefaultFormats, _} import support.PersonAliasesProvider import scala.collection.Iterator -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} /** @@ -35,12 +34,7 @@ case class LinkedHenkiloOids( trait IOppijaNumeroRekisteri { def fetchLinkedHenkiloOidsMap(henkiloOids: Set[String]): Future[LinkedHenkiloOids] - - def enrichWithAliases(henkiloOids: Set[String]): Future[PersonOidsWithAliases] = { - fetchLinkedHenkiloOidsMap(henkiloOids) - .map(_.oidToLinkedOids) - .map(PersonOidsWithAliases(henkiloOids, _)) - } + def enrichWithAliases(henkiloOids: Set[String]): Future[PersonOidsWithAliases] def getByHetu(hetu: String): Future[Henkilo] def fetchHenkilotInBatches(henkiloOids: Set[String]): Future[Map[String, Henkilo]] @@ -64,6 +58,11 @@ class OppijaNumeroRekisteri(client: VirkailijaRestClient, val system: ActorSyste extends IOppijaNumeroRekisteri { private val logger = Logging.getLogger(system, this) + implicit val ec: ExecutionContext = ExecutorUtil.createExecutor( + config.integrations.asyncOperationThreadPoolSize, + getClass.getSimpleName + ) + def fetchInBatches(henkiloOids: Set[String], batchSize: Int) = { val started = System.currentTimeMillis() val batches = henkiloOids.grouped(batchSize).zipWithIndex.toList @@ -86,6 +85,12 @@ class OppijaNumeroRekisteri(client: VirkailijaRestClient, val system: ActorSyste } } + override def enrichWithAliases(henkiloOids: Set[String]): Future[PersonOidsWithAliases] = { + fetchLinkedHenkiloOidsMap(henkiloOids) + .map(_.oidToLinkedOids) + .map(PersonOidsWithAliases(henkiloOids, _)) + } + override def fetchLinkedHenkiloOidsMap(henkiloOids: Set[String]): Future[LinkedHenkiloOids] = { if (henkiloOids.isEmpty) { @@ -183,6 +188,17 @@ object MockOppijaNumeroRekisteri extends IOppijaNumeroRekisteri { val henkiloOid = "1.2.246.562.24.58099330694" val linkedTestPersonOids = Seq(henkiloOid, masterOid) + implicit val ec: ExecutionContext = ExecutorUtil.createExecutor( + 1, + getClass.getSimpleName + ) + + override def enrichWithAliases(henkiloOids: Set[String]): Future[PersonOidsWithAliases] = { + fetchLinkedHenkiloOidsMap(henkiloOids) + .map(_.oidToLinkedOids) + .map(PersonOidsWithAliases(henkiloOids, _)) + } + def fetchLinkedHenkiloOidsMap(henkiloOids: Set[String]): Future[LinkedHenkiloOids] = { Future.successful({ val oidToLinkedOids = henkiloOids.map { queriedOid => diff --git a/src/test/scala/fi/vm/sade/hakurekisteri/rest/OppijaResourceSpec.scala b/src/test/scala/fi/vm/sade/hakurekisteri/rest/OppijaResourceSpec.scala index e9aaba8d0..be3c5792a 100644 --- a/src/test/scala/fi/vm/sade/hakurekisteri/rest/OppijaResourceSpec.scala +++ b/src/test/scala/fi/vm/sade/hakurekisteri/rest/OppijaResourceSpec.scala @@ -16,7 +16,8 @@ import fi.vm.sade.hakurekisteri.integration.henkilo.{ Henkilo, IOppijaNumeroRekisteri, LinkedHenkiloOids, - MockPersonAliasesProvider + MockPersonAliasesProvider, + PersonOidsWithAliases } import fi.vm.sade.hakurekisteri.integration.tarjonta._ import fi.vm.sade.hakurekisteri.integration.valintarekisteri.{ @@ -91,6 +92,13 @@ class OppijaResourceSpec ) ) } + + override def enrichWithAliases(henkiloOids: Set[String]): Future[PersonOidsWithAliases] = { + fetchLinkedHenkiloOidsMap(henkiloOids) + .map(_.oidToLinkedOids) + .map(PersonOidsWithAliases(henkiloOids, _)) + } + override def getByHetu(hetu: String): Future[Henkilo] = { throw new UnsupportedOperationException("Not implemented") } diff --git a/src/test/scala/fi/vm/sade/hakurekisteri/rest/VirtaSuoritusResourceSpec.scala b/src/test/scala/fi/vm/sade/hakurekisteri/rest/VirtaSuoritusResourceSpec.scala index 21eb58f17..9559cb79e 100644 --- a/src/test/scala/fi/vm/sade/hakurekisteri/rest/VirtaSuoritusResourceSpec.scala +++ b/src/test/scala/fi/vm/sade/hakurekisteri/rest/VirtaSuoritusResourceSpec.scala @@ -10,7 +10,8 @@ import fi.vm.sade.hakurekisteri.integration.hakemus.{ import fi.vm.sade.hakurekisteri.integration.henkilo.{ Henkilo, IOppijaNumeroRekisteri, - LinkedHenkiloOids + LinkedHenkiloOids, + PersonOidsWithAliases } import fi.vm.sade.hakurekisteri.integration.virta.{ VirtaClient, @@ -143,6 +144,12 @@ class VirtaSuoritusResourceSpec extends ScalatraFunSuite with DispatchSupport wi } } + override def enrichWithAliases(henkiloOids: Set[String]): Future[PersonOidsWithAliases] = { + fetchLinkedHenkiloOidsMap(henkiloOids) + .map(_.oidToLinkedOids) + .map(PersonOidsWithAliases(henkiloOids, _)) + } + override def getByOids(oids: Set[String]): Future[Map[String, Henkilo]] = Future.successful( Map( ( From d4355031ac997211ce8d50964f9777650b2c8a4c Mon Sep 17 00:00:00 2001 From: Mikko Siukola Date: Tue, 14 May 2024 17:09:06 +0300 Subject: [PATCH 2/3] OY-4836 Backtrack hours instead of days, tweaks to YtlFetchActor --- .../suoritusrekisteri.properties.template | 2 +- .../hakurekisteri/integration/ytl/YtlFetchActor.scala | 8 ++++---- src/main/scala/support/Integrations.scala | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/main/resources/oph-configuration/suoritusrekisteri.properties.template b/src/main/resources/oph-configuration/suoritusrekisteri.properties.template index b4c9aef31..e366a7802 100644 --- a/src/main/resources/oph-configuration/suoritusrekisteri.properties.template +++ b/src/main/resources/oph-configuration/suoritusrekisteri.properties.template @@ -126,7 +126,7 @@ suoritusrekisteri.koski.update.cronJob={{ suoritusrekisteri_koski_update_cronjob suoritusrekisteri.koski.update.kkHaut={{ suoritusrekisteri_koski_update_kkHaut | default('false') }} suoritusrekisteri.koski.update.toisenAsteenHaut={{ suoritusrekisteri_koski_update_toisenAsteenHaut | default('false') }} suoritusrekisteri.koski.update.jatkuvatHaut={{ suoritusrekisteri_koski_update_jatkuvatHaut | default('false') }} -suoritusrekisteri.modifiedhakemukset.backtrack.days={{ suoritusrekisteri_modifiedhakemukset_backtrack_days | default('2')}} +suoritusrekisteri.modifiedhakemukset.backtrack.hours={{ suoritusrekisteri_modifiedhakemukset_backtrack_hours | default('2')}} suoritusrekisteri.oppijanumerorekisteri-service.max-connections={{ suoritusrekisteri_oppijanumerorekisteriservice_max_connections | default('50')}} suoritusrekisteri.oppijanumerorekisteri-service.max-connection-queue-ms={{ suoritusrekisteri_oppijanumerorekisteriservice_max_connection_queue_ms | default('60000')}} suoritusrekisteri.oppijanumerorekisteri-service.max.oppijat.batch.size={{ suoritusrekisteri_oppijanumerorekisteriservice_max_oppijat_batch_size | default('5000')}} diff --git a/src/main/scala/fi/vm/sade/hakurekisteri/integration/ytl/YtlFetchActor.scala b/src/main/scala/fi/vm/sade/hakurekisteri/integration/ytl/YtlFetchActor.scala index 597c574d9..0fb5ae155 100644 --- a/src/main/scala/fi/vm/sade/hakurekisteri/integration/ytl/YtlFetchActor.scala +++ b/src/main/scala/fi/vm/sade/hakurekisteri/integration/ytl/YtlFetchActor.scala @@ -1,6 +1,6 @@ package fi.vm.sade.hakurekisteri.integration.ytl -import akka.actor.{Actor, ActorLogging, ActorRef} +import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem} import akka.pattern.pipe import fi.vm.sade.hakurekisteri.Config import fi.vm.sade.hakurekisteri.integration.ExecutorUtil @@ -52,7 +52,7 @@ class YtlFetchActor( val minIntervalBetween = 1000 * 60 * 60 * 22 //At least 22 hours between nightly syncs implicit val ec: ExecutionContext = ExecutorUtil.createExecutor( - config.integrations.asyncOperationThreadPoolSize, + 20, getClass.getSimpleName ) @@ -99,7 +99,7 @@ class YtlFetchActor( log.error(t, s"($tunniste) Manual sync for haku ${s.hakuOid} failed...") } log.info(s"Ytl-sync käynnistetty haulle ${s.hakuOid} tunnisteella $tunniste") - resultF pipeTo sender + sender ! tunniste case s: YtlSyncSingle => if (s.needsToBeActiveKkHakuOid.forall(oid => activeKKHakuOids.get().contains(oid))) { val tunniste = s.tunniste @@ -110,7 +110,7 @@ class YtlFetchActor( case Failure(t) => log.error(t, s"($tunniste) Manual sync for person ${s.personOid} failed...") } - log.info(s"Ytl-sync käynnistetty haulle ${s.personOid} tunnisteella $tunniste") + log.info(s"Ytl-sync käynnistetty oidille ${s.personOid} tunnisteella $tunniste") resultF pipeTo sender } else { val infoStr = s"Not ytl-syncing $s because the haku is not an active kk-haku" diff --git a/src/main/scala/support/Integrations.scala b/src/main/scala/support/Integrations.scala index 51f20d965..70741f4dd 100644 --- a/src/main/scala/support/Integrations.scala +++ b/src/main/scala/support/Integrations.scala @@ -550,11 +550,11 @@ class BaseIntegrations(rekisterit: Registers, system: ActorSystem, config: Confi hakemusService.addTrigger(arvosanaTrigger) hakemusService.addTrigger(ytlTrigger) - val daysToBacktrack: Int = - OphUrlProperties.getProperty("suoritusrekisteri.modifiedhakemukset.backtrack.days").toInt + val hoursToBacktrack: Int = + OphUrlProperties.getProperty("suoritusrekisteri.modifiedhakemukset.backtrack.hours").toInt implicit val scheduler = system.scheduler hakemusService.processModifiedHakemukset(modifiedAfter = - new Date(Platform.currentTime - TimeUnit.DAYS.toMillis(daysToBacktrack)) + new Date(Platform.currentTime - TimeUnit.HOURS.toMillis(hoursToBacktrack)) ) val quartzScheduler = StdSchedulerFactory.getDefaultScheduler() From 3583e52bbbfc8c5f93b886d41b7750f9505808a7 Mon Sep 17 00:00:00 2001 From: August Kilponen Date: Wed, 15 May 2024 09:05:18 +0300 Subject: [PATCH 3/3] =?UTF-8?q?Lis=C3=A4tty=20monen=20henkil=C3=B6n=20YTL-?= =?UTF-8?q?p=C3=A4ivitysrajapinta?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../fi/vm/sade/hakurekisteri/audit.scala | 8 +++++ .../web/integration/ytl/YtlResource.scala | 36 ++++++++++++++++--- .../web/integration/ytl/YtlSwaggerApi.scala | 27 ++++++++++++++ 3 files changed, 67 insertions(+), 4 deletions(-) diff --git a/src/main/scala/fi/vm/sade/hakurekisteri/audit.scala b/src/main/scala/fi/vm/sade/hakurekisteri/audit.scala index 638775a96..2e3fc6f63 100644 --- a/src/main/scala/fi/vm/sade/hakurekisteri/audit.scala +++ b/src/main/scala/fi/vm/sade/hakurekisteri/audit.scala @@ -46,6 +46,14 @@ case object YTLSyncForPerson extends Operation { def name: String = "REQUEST_YTL_SYNC_FOR_PERSON" } +case object YTLSyncForPersons extends Operation { + def name: String = "REQUEST_YTL_SYNC_FOR_PERSONS" +} + +case object YTLSyncForHaku extends Operation { + def name: String = "REQUEST_YTL_SYNC_FOR_HAKU" +} + case object HenkilonTiedotVirrasta extends Operation { def name: String = "READ_VIRTA_TIEDOT" } diff --git a/src/main/scala/fi/vm/sade/hakurekisteri/web/integration/ytl/YtlResource.scala b/src/main/scala/fi/vm/sade/hakurekisteri/web/integration/ytl/YtlResource.scala index 098ba031e..871e4188e 100644 --- a/src/main/scala/fi/vm/sade/hakurekisteri/web/integration/ytl/YtlResource.scala +++ b/src/main/scala/fi/vm/sade/hakurekisteri/web/integration/ytl/YtlResource.scala @@ -5,7 +5,13 @@ import _root_.akka.event.{Logging, LoggingAdapter} import akka.pattern.ask import akka.util.Timeout import fi.vm.sade.auditlog.{Changes, Target} -import fi.vm.sade.hakurekisteri.{AuditUtil, YTLSyncForAll, YTLSyncForPerson} +import fi.vm.sade.hakurekisteri.{ + AuditUtil, + YTLSyncForAll, + YTLSyncForHaku, + YTLSyncForPerson, + YTLSyncForPersons +} import fi.vm.sade.hakurekisteri.integration.ytl.{ Kokelas, YtlFetchActorRef, @@ -46,7 +52,7 @@ class YtlResource(ytlFetchActor: YtlFetchActorRef)(implicit def shouldBeAdmin = if (!currentUser.exists(_.isAdmin)) throw UserNotAuthorized("not authorized") - post("/http_request") { + post("/http_request", operation(syncAll)) { shouldBeAdmin logger.info("Fetching YTL data for everybody") audit.log(auditUser, YTLSyncForAll, new Target.Builder().build, Changes.EMPTY) @@ -54,16 +60,18 @@ class YtlResource(ytlFetchActor: YtlFetchActorRef)(implicit ytlFetchActor.actor ! YtlSyncAllHaut(tunniste) Accepted(s"YTL sync started, tunniste $tunniste") } - get("/http_request_byhaku/:hakuOid") { + + get("/http_request_byhaku/:hakuOid", operation(syncHaku)) { shouldBeAdmin val hakuOid = params("hakuOid") logger.info(s"Syncing YTL data for haku $hakuOid") - audit.log(auditUser, YTLSyncForAll, new Target.Builder().build, Changes.EMPTY) + audit.log(auditUser, YTLSyncForHaku, AuditUtil.targetFromParams(params).build, Changes.EMPTY) val tunniste = "manual_sync_for_haku_" + hakuOid ytlFetchActor.actor ! YtlSyncHaku(hakuOid, tunniste) logger.info(s"Returning tunniste $tunniste to caller") Accepted(s"YTL sync started for haku $hakuOid, tunniste $tunniste") } + get("/http_request/:personOid", operation(syncPerson)) { implicit val to: Timeout = Timeout(30.seconds) shouldBeAdmin @@ -91,4 +99,24 @@ class YtlResource(ytlFetchActor: YtlFetchActorRef)(implicit InternalServerError(errorStr) } } + + post("/http_request/persons", operation(syncPersons)) { + implicit val to: Timeout = Timeout(30.seconds) + shouldBeAdmin + val personOids = parse(request.body).extract[Set[String]] + logger.info(s"Fetching YTL data for multiple person OIDs $personOids") + audit.log( + auditUser, + YTLSyncForPersons, + AuditUtil.targetFromParams(params).setField("oppijaOids", personOids.toString()).build(), + Changes.EMPTY + ) + personOids.foreach(personOid => { + ytlFetchActor.actor ? YtlSyncSingle( + personOid, + tunniste = s"manual_sync_for_person_${personOid}" + ) + }) + Accepted(s"YTL sync started for persons $personOids") + } } diff --git a/src/main/scala/fi/vm/sade/hakurekisteri/web/integration/ytl/YtlSwaggerApi.scala b/src/main/scala/fi/vm/sade/hakurekisteri/web/integration/ytl/YtlSwaggerApi.scala index 023e50f4c..94afa855d 100644 --- a/src/main/scala/fi/vm/sade/hakurekisteri/web/integration/ytl/YtlSwaggerApi.scala +++ b/src/main/scala/fi/vm/sade/hakurekisteri/web/integration/ytl/YtlSwaggerApi.scala @@ -13,4 +13,31 @@ trait YtlSwaggerApi extends SwaggerSupport { .required ) .tags("Ytl-resource") + + val syncHaku = apiOperation[Any]("paivitaHaunOpiskelijatYTLTiedot") + .summary("Päivittää haun henkilöiden YTL-tiedot Suoritusrekisteriin.") + .description("Päivittää haunt YTL-tiedot Suoritusrekisteriin.") + .parameter( + pathParam[String]("hakuOid") + .description("hakuOid") + .required + ) + .tags("Ytl-resource") + + val syncPersons = apiOperation[Any]("paivitaOpiskelijatYTLTiedot") + .summary("Päivittää annetun oppijalistan tiedot YTl:stä Suoritusrekisteriin") + .description("Päivittää annetun oppijalistan tiedot YTl:stä Suoritusrekisteriin") + .parameter( + bodyParam[String]("oppijaoids") + .description( + s"""lista oppijanumeroista (esim ["1.2.246.562.24.00000000001", "1.2.246.562.24.00000000002"]""" + ) + .required + ) + .tags("Ytl-resource") + + val syncAll = apiOperation[Any]("paivitaKaikkiYTLTiedot") + .summary("Päivittää aktiivisten kk-hakujen hakijoiden tiedot YTl:stä Suoritusrekisteriin") + .description("Päivittää aktiivisten kk-hakujen hakijoiden tiedot YTl:stä Suoritusrekisteriin") + .tags("Ytl-resource") }