Skip to content

Commit 17b66a0

Browse files
committed
OY-4784 Use ytlFetchActor for nightly sync, use correct tunniste
1 parent 8d082b4 commit 17b66a0

File tree

4 files changed

+42
-37
lines changed

4 files changed

+42
-37
lines changed

src/main/scala/fi/vm/sade/hakurekisteri/integration/ytl/YtlFetchActor.scala

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,18 @@ import fi.vm.sade.properties.OphProperties
1414
import org.apache.commons.io.IOUtils
1515
import scalaz.concurrent.Task
1616
import support.TypedActorRef
17-
import scala.concurrent.duration._
1817

19-
import java.util.UUID
18+
import java.time.LocalDate
19+
import scala.concurrent.duration._
20+
import java.util.{Date, UUID}
2021
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
2122
import scala.concurrent.{Await, ExecutionContext, Future}
2223
import scala.util.{Failure, Success, Try}
2324

2425
case class YtlSyncHaku(hakuOid: String, tunniste: String)
2526

2627
case class YtlSyncAllHaut(tunniste: String)
28+
case class YtlSyncAllHautNightly(tunniste: String)
2729
case class YtlSyncSingle(personOid: String, tunniste: String)
2830
case class ActiveKkHakuOids(hakuOids: Set[String])
2931
case class YtlFetchActorRef(actor: ActorRef) extends TypedActorRef
@@ -40,15 +42,37 @@ class YtlFetchActor(
4042

4143
val activeKKHakuOids = new AtomicReference[Set[String]](Set.empty)
4244

45+
//val lastSyncStart = new AtomicReference[Option[LocalDate]](None)
46+
val lastSyncStart = new AtomicReference[Long](0)
47+
val minIntervalBetween = 1000 * 60 * 22 //At least 22 hours between nightly syncs
48+
4349
implicit val ec: ExecutionContext = ExecutorUtil.createExecutor(
4450
config.integrations.asyncOperationThreadPoolSize,
4551
getClass.getSimpleName
4652
)
4753

4854
def setAktiivisetKKHaut(hakuOids: Set[String]): Unit = activeKKHakuOids.set(hakuOids)
4955
override def receive: Receive = {
56+
case ah: YtlSyncAllHautNightly =>
57+
val tunniste = ah.tunniste
58+
val now = System.currentTimeMillis()
59+
val lss = lastSyncStart.get()
60+
val timeToStartNewSync = (lss + minIntervalBetween) < now
61+
if (timeToStartNewSync) {
62+
log.info(s"Starting nightly sync for all hakus. Previous run was $lss")
63+
lastSyncStart.set(now)
64+
val resultF = syncAllOneHakuAtATime(tunniste)
65+
resultF.onComplete {
66+
case Success(_) =>
67+
log.info(s"($tunniste) Nightly sync for all hakus success!")
68+
case Failure(t) =>
69+
log.error(t, s"($tunniste) Nightly sync for all hakus failed...")
70+
}
71+
} else {
72+
log.warning(s"Not starting nightly sync for all hakus as the previous run was on $lss")
73+
}
5074
case ah: YtlSyncAllHaut =>
51-
val tunniste = "manual_sync_for_all_hakus_" + System.currentTimeMillis()
75+
val tunniste = ah.tunniste
5276
val resultF = syncAllOneHakuAtATime(tunniste)
5377
resultF.onComplete {
5478
case Success(_) =>
@@ -58,7 +82,7 @@ class YtlFetchActor(
5882
}
5983
sender ! tunniste
6084
case s: YtlSyncHaku =>
61-
val tunniste = System.currentTimeMillis() + "_manual_sync_for_haku_" + s.hakuOid
85+
val tunniste = s.tunniste
6286
val resultF = fetchAndHandleHakemuksetForSingleHakuF(hakuOid = s.hakuOid, s.tunniste)
6387
resultF.onComplete {
6488
case Success(_) =>
@@ -69,7 +93,7 @@ class YtlFetchActor(
6993
log.info(s"Ytl-sync käynnistetty haulle ${s.hakuOid} tunnisteella $tunniste")
7094
resultF pipeTo sender
7195
case s: YtlSyncSingle =>
72-
val tunniste = System.currentTimeMillis() + "_manual_sync_for_person_" + s.personOid
96+
val tunniste = s.tunniste
7397
val resultF = syncSingle(s.personOid)
7498
resultF.onComplete {
7599
case Success(_) =>

src/main/scala/fi/vm/sade/hakurekisteri/integration/ytl/YtlRerunPolicy.scala

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,35 +10,14 @@ import org.slf4j.LoggerFactory
1010
object YtlRerunPolicy {
1111
private val logger = LoggerFactory.getLogger(YtlRerunPolicy.getClass)
1212

13-
def rerunPolicy(expression: String, ytlIntegration: YtlIntegration): () => Unit = {
13+
def rerunPolicy(expression: String, ytlIntegrationActor: YtlFetchActorRef): () => Unit = {
1414
def nextTimestamp(expression: String, d: Date) = new SimpleDateFormat("dd.MM.yyyy HH:mm")
1515
.format(new CronExpression(expression).getNextValidTimeAfter(d))
1616
logger.info(s"First YTL fetch at '${nextTimestamp(expression, new Date())}'")
1717

1818
() => {
19-
val fetchStatus = ytlIntegration.AtomicStatus.getLastStatus
20-
val isRunning = fetchStatus.exists(_.inProgress)
21-
if (isRunning) {
22-
logger.info(
23-
s"Scheduled to make YTL fetch but fetch is already running! Next try will be ${nextTimestamp(expression, new Date())}"
24-
)
25-
} else {
26-
val isYesterday =
27-
fetchStatus.exists(status => !DateUtils.isSameDay(status.start, new Date()))
28-
val isSucceeded = !(fetchStatus.flatMap(_.hasFailures).getOrElse(true))
29-
if ((isSucceeded && isYesterday) || (!isSucceeded)) {
30-
logger.info(
31-
s"Starting new YTL fetch because: last run was yesterday=$isYesterday and that run succeeded=$isSucceeded"
32-
)
33-
ytlIntegration.syncAll()
34-
} else {
35-
logger.info(
36-
s"Scheduled to make YTL fetch but not running because: " +
37-
s"last run was yesterday=$isYesterday and that run succeeded=$isSucceeded! " +
38-
s"Next try will be ${nextTimestamp(expression, new Date())}"
39-
)
40-
}
41-
}
19+
logger.info("Calling YtlFetchActor to start nightly YLT sync")
20+
ytlIntegrationActor.actor ! YtlSyncAllHautNightly("Nightly YTL sync")
4221
}
4322
}
4423

src/main/scala/fi/vm/sade/hakurekisteri/web/integration/ytl/YtlResource.scala

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class YtlResource(ytlIntegration: YtlIntegration, ytlFetchActor: YtlFetchActorRe
6262
shouldBeAdmin
6363
logger.info("Fetching YTL data for everybody")
6464
audit.log(auditUser, YTLSyncForAll, new Target.Builder().build, Changes.EMPTY)
65-
val tunniste = "manual_sync_for_all_hakus_" + System.currentTimeMillis()
65+
val tunniste = "manual_sync_for_all_hakus" + System.currentTimeMillis()
6666
ytlFetchActor.actor ! YtlSyncAllHaut(tunniste)
6767
Accepted(s"YTL sync started, tunniste $tunniste")
6868
}
@@ -83,12 +83,14 @@ class YtlResource(ytlIntegration: YtlIntegration, ytlFetchActor: YtlFetchActorRe
8383
logger.info(s"Fetching YTL data for person OID $personOid")
8484
audit.log(auditUser, YTLSyncForPerson, AuditUtil.targetFromParams(params).build, Changes.EMPTY)
8585
try {
86-
val resultF = ytlFetchActor.actor ? YtlSyncSingle(personOid, tunniste = "sync") recoverWith {
87-
case t: Throwable =>
88-
logger.error(t, s"Error while ytl-syncing $personOid")
89-
Future.failed(
90-
new RuntimeException(s"Error while ytl-syncing $personOid: ${t.getMessage}")
91-
)
86+
val resultF = ytlFetchActor.actor ? YtlSyncSingle(
87+
personOid,
88+
tunniste = s"manual_sync_for_person_${personOid}"
89+
) recoverWith { case t: Throwable =>
90+
logger.error(t, s"Error while ytl-syncing $personOid")
91+
Future.failed(
92+
new RuntimeException(s"Error while ytl-syncing $personOid: ${t.getMessage}")
93+
)
9294
}
9395
logger.info(s"Waiting for result for YTL data for person OID $personOid")
9496
val result = Await.result(resultF, 30.seconds)

src/main/scala/support/Integrations.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -584,7 +584,7 @@ class BaseIntegrations(rekisterit: Registers, system: ActorSystem, config: Confi
584584

585585
val ytlSyncAllEnabled = OphUrlProperties.getProperty("ytl.http.syncAllEnabled").toBoolean
586586
val syncAllCronExpression = OphUrlProperties.getProperty("ytl.http.syncAllCronJob")
587-
val rerunSync = YtlRerunPolicy.rerunPolicy(syncAllCronExpression, ytlIntegration)
587+
val rerunSync = YtlRerunPolicy.rerunPolicy(syncAllCronExpression, ytlFetchActor)
588588
if (ytlSyncAllEnabled) {
589589
quartzScheduler.scheduleJob(
590590
lambdaJob(rerunSync),

0 commit comments

Comments
 (0)