Skip to content

Commit f1a2c02

Browse files
committed
Add pg indexes during startup
sync job is now resumable
1 parent 4a71b88 commit f1a2c02

File tree

7 files changed

+173
-78
lines changed

7 files changed

+173
-78
lines changed

daikoku/app/fr/maif/daikoku/controllers/JobsController.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class JobsController(
4343
TenantHelper.withTenant(ctx, env) { tenant =>
4444
ctx.getQueryString("access_key").orElse(ctx.getQueryString("key")) match {
4545
case Some(key) if env.config.rotationJobKey.contains(key) =>
46-
rotationJob.run(tenant = tenant, parallelism = parallelism)
46+
rotationJob.run(tenant = tenant)
4747
.map(_ => Ok(Json.obj("done" -> true)))
4848
case _ => AppError.Unauthorized.renderF()
4949
}
@@ -55,7 +55,7 @@ class JobsController(
5555
TenantHelper.withTenant(ctx, env) { tenant =>
5656
ctx.getQueryString("access_key").orElse(ctx.getQueryString("key")) match {
5757
case Some(key) if env.config.verifierJobKey.contains(key) =>
58-
verifierJob.run(tenant = tenant, parallelism = parallelism)
58+
verifierJob.run(tenant = tenant)
5959
.map(_ => Ok(Json.obj("done" -> true)))
6060
case _ => AppError.Unauthorized.renderF()
6161
}

daikoku/app/fr/maif/daikoku/domain/entities.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ case class JobInformation(
185185
lockedBy: String,
186186
lockedAt: DateTime,
187187
expiresAt: DateTime,
188-
cursor: String,
188+
cursor: Long,
189189
batchSize: Int = 500,
190190
totalProcessed: BigDecimal = BigDecimal(0),
191191
startedAt: DateTime,

daikoku/app/fr/maif/daikoku/domain/json.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4276,7 +4276,7 @@ object json {
42764276
lockedBy = (json \ "lockedBy").as[String],
42774277
lockedAt = (json \ "lockedAt").as(using DateTimeFormat),
42784278
expiresAt = (json \ "expiresAt").as(using DateTimeFormat),
4279-
cursor = (json \ "cursor").as[String],
4279+
cursor = (json \ "cursor").as[Long],
42804280
batchSize = (json \ "batchSize").as[Int],
42814281
totalProcessed = (json \ "totalProcessed").as[BigDecimal],
42824282
startedAt = (json \ "startedAt").as(using DateTimeFormat),

daikoku/app/fr/maif/daikoku/jobs/ApiKeySecretRotationJob.scala

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,23 @@ import play.api.Logger
1313
import play.api.i18n.MessagesApi
1414
import play.api.libs.json.{JsArray, JsNull, JsObject, JsValue, Json}
1515
import cats.implicits.*
16-
import cron4s.Cron
16+
import cron4s._
17+
import cron4s.lib.joda._
18+
import org.apache.pekko.actor.Cancellable
1719

20+
import java.util.concurrent.atomic.AtomicReference
1821
import scala.concurrent.{ExecutionContext, Future}
22+
import scala.concurrent.duration.*
1923

2024
class ApiKeySecretRotationJob(
2125
client: OtoroshiClient,
2226
env: Env,
2327
translator: Translator,
2428
messagesApi: MessagesApi
2529
) {
26-
private val rotationlogger = Logger("APIkey-Rotation-Synchronizer")
30+
private val logger = Logger("APIkey-Rotation-Synchronizer")
31+
32+
private val ref = new AtomicReference[Cancellable]()
2733

2834
implicit val ec: ExecutionContext = env.defaultExecutionContext
2935
implicit val mat: Materializer = env.defaultMaterializer
@@ -32,7 +38,7 @@ class ApiKeySecretRotationJob(
3238
implicit val tr: Translator = translator
3339

3440
def start(): Unit = {
35-
val syncAvalaible = env.config.rotationJobEnabled && env.config.otoroshiSyncMaster //FIXME: use also otoroshiSyncMaster ???
41+
val syncAvalaible = env.config.rotationJobEnabled && env.config.otoroshiSyncMaster //FIXME: use also otoroshiSyncMaster ???
3642

3743
if (
3844
syncAvalaible && ref.get() == null
@@ -51,9 +57,9 @@ class ApiKeySecretRotationJob(
5157
logger.info(s"next cron run scheduled at $nextRun (in ${delay.toSeconds}s)")
5258

5359
ref.set(
54-
env.defaultActorSystem.scheduler.scheduleOnce(delay) { _ =>
60+
env.defaultActorSystem.scheduler.scheduleOnce(delay) {
5561
logger.info(s"cron triggered at $now")
56-
env.dataStore.tenantRepo
62+
val _ = env.dataStore.tenantRepo
5763
.findAllNotDeleted()
5864
.flatMap(tenants =>
5965
Future.sequence(
@@ -67,6 +73,7 @@ class ApiKeySecretRotationJob(
6773
.andThen { case _ =>
6874
scheduleNext()
6975
}
76+
()
7077
}
7178
)
7279

@@ -80,7 +87,7 @@ class ApiKeySecretRotationJob(
8087
case SchedulingMode.Interval =>
8188
ref.set(
8289
env.defaultActorSystem.scheduler
83-
.scheduleAtFixedRate(10.seconds, env.config.rotationJobInterval) { _ =>
90+
.scheduleAtFixedRate(10.seconds, env.config.rotationJobInterval) { () =>
8491
env.dataStore.tenantRepo
8592
.findAllNotDeleted()
8693
.flatMap(tenants =>
@@ -97,7 +104,7 @@ class ApiKeySecretRotationJob(
97104
}
98105
}
99106
}
100-
107+
101108
def stop(): Unit = {
102109
Option(ref.get()).foreach(_.cancel())
103110
}
@@ -242,7 +249,7 @@ class ApiKeySecretRotationJob(
242249
if (
243250
!pendingRotation && otoroshiNextSecret.isDefined && otoroshiActualSecret == daikokuActualSecret
244251
) {
245-
rotationlogger.info(
252+
logger.info(
246253
s"rotation state updated to Pending for ${apk.clientName}"
247254
)
248255
newSubscription = subscription
@@ -274,7 +281,7 @@ class ApiKeySecretRotationJob(
274281
)
275282

276283
} else if (pendingRotation && otoroshiNextSecret.isEmpty) {
277-
rotationlogger.info(
284+
logger.info(
278285
s"rotation state updated to Ended for ${apk.clientName}"
279286
)
280287
notification = Notification(
@@ -342,16 +349,16 @@ class ApiKeySecretRotationJob(
342349
.save(notification)
343350
} yield ()
344351
case (_, _) =>
345-
rotationlogger.info(s"no need to update rotation for ${apk.clientName}")
352+
logger.info(s"no need to update rotation for ${apk.clientName}")
346353
}
347354
}
348355
}
349356
)
350357
}
351358
}
352-
359+
353360
def run(entryPoint: ApiId | UsagePlanId | ApiSubscriptionId | SyncAllSubscription = SyncAllSubscription(), tenant: Tenant): Future[Unit] = {
354-
rotationlogger.info(s"run apikey rotation check with entry point as $entryPoint")
361+
logger.info(s"run apikey rotation check with entry point as $entryPoint")
355362

356363
val query = entryPoint match {
357364
case apiId: ApiId => Json.obj("api" -> apiId.asJson)
@@ -372,7 +379,7 @@ class ApiKeySecretRotationJob(
372379
"status" -> JobStatus.Running.value))
373380
.flatMap {
374381
case Some(_) =>
375-
rotationlogger.info("can't run another ApiKeyRotationVerifier, already one is running")
382+
logger.info("can't run another ApiKeyRotationVerifier, already one is running")
376383
Future.successful(())
377384
case None =>
378385
val jobInfo = JobInformation(
@@ -382,20 +389,20 @@ class ApiKeySecretRotationJob(
382389
lockedBy = "apikey-rotation-verifier-job",
383390
lockedAt = now,
384391
expiresAt = now.plusHours(1),
385-
cursor = "",
392+
cursor = 0,
386393
startedAt = now,
387394
lastBatchAt = now,
388395
status = JobStatus.Running
389396
)
390397
jobRepo.save(jobInfo).flatMap { _ =>
391398
checkRotation(query)
392399
.flatMap { _ =>
393-
rotationlogger.info("verify rotation ended")
400+
logger.info("verify rotation ended")
394401
jobRepo.save(jobInfo.copy(status = JobStatus.Completed, lastBatchAt = DateTime.now()))
395402
.map(_ => ())
396403
}
397404
.recoverWith { case e =>
398-
rotationlogger.error(s"verify rotation failed: ${e.getMessage}", e)
405+
logger.error(s"verify rotation failed: ${e.getMessage}", e)
399406
jobRepo.save(jobInfo.copy(status = JobStatus.Failed, lastBatchAt = DateTime.now()))
400407
.map(_ => ())
401408
}

daikoku/app/fr/maif/daikoku/jobs/OtoroshiEntitiesVerifierJob.scala

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package fr.maif.daikoku.jobs
22

3-
import cron4s.Cron
3+
import cron4s._
4+
import cron4s.lib.joda._
45
import fr.maif.daikoku.audit.JobEvent
56
import fr.maif.daikoku.domain.NotificationAction.{OtoroshiSyncApiError, OtoroshiSyncSubscriptionError}
67
import fr.maif.daikoku.domain.{Api, ApiId, ApiSubscriptionId, AuthorizedEntities, ConsoleMailerSettings, DatastoreId, JobInformation, JobName, JobStatus, Notification, NotificationAction, NotificationId, NotificationStatus, NotificationType, OtoroshiSettings, SchedulingMode, OtoroshiSyncNotificationAction, TeamId, Tenant, TenantId, UsagePlanId, User, UserId}
@@ -17,6 +18,7 @@ import play.api.libs.json.*
1718

1819
import java.util.concurrent.atomic.AtomicReference
1920
import scala.concurrent.{ExecutionContext, Future}
21+
import scala.concurrent.duration.*
2022
import scala.util.{Failure, Success}
2123

2224
class OtoroshiEntitiesVerifierJob(
@@ -26,6 +28,7 @@ class OtoroshiEntitiesVerifierJob(
2628
messagesApi: MessagesApi
2729
) {
2830
private val logger = Logger("otoroshi-entities-verifier")
31+
private val ref = new AtomicReference[Cancellable]()
2932

3033
implicit val ec: ExecutionContext = env.defaultExecutionContext
3134
implicit val mat: Materializer = env.defaultMaterializer
@@ -34,7 +37,7 @@ class OtoroshiEntitiesVerifierJob(
3437
implicit val tr: Translator = translator
3538

3639
def start(): Unit = {
37-
val syncAvalaible = env.config.verifierJobEnabled && env.config.otoroshiSyncMaster //FIXME: use also otoroshiSyncMaster ???
40+
val syncAvalaible = env.config.verifierJobEnabled && env.config.otoroshiSyncMaster //FIXME: use also otoroshiSyncMaster ???
3841

3942
if (
4043
syncAvalaible && ref.get() == null
@@ -53,9 +56,9 @@ class OtoroshiEntitiesVerifierJob(
5356
logger.info(s"next cron run scheduled at $nextRun (in ${delay.toSeconds}s)")
5457

5558
ref.set(
56-
env.defaultActorSystem.scheduler.scheduleOnce(delay) { _ =>
59+
env.defaultActorSystem.scheduler.scheduleOnce(delay) { () =>
5760
logger.info(s"cron triggered at $now")
58-
env.dataStore.tenantRepo
61+
val _ = env.dataStore.tenantRepo
5962
.findAllNotDeleted()
6063
.flatMap(tenants =>
6164
Future.sequence(
@@ -69,6 +72,7 @@ class OtoroshiEntitiesVerifierJob(
6972
.andThen { case _ =>
7073
scheduleNext()
7174
}
75+
()
7276
}
7377
)
7478

@@ -82,7 +86,7 @@ class OtoroshiEntitiesVerifierJob(
8286
case SchedulingMode.Interval =>
8387
ref.set(
8488
env.defaultActorSystem.scheduler
85-
.scheduleAtFixedRate(10.seconds, env.config.verifierJobInterval) { _ =>
89+
.scheduleAtFixedRate(10.seconds, env.config.verifierJobInterval) { () =>
8690
env.dataStore.tenantRepo
8791
.findAllNotDeleted()
8892
.flatMap(tenants =>
@@ -99,7 +103,7 @@ class OtoroshiEntitiesVerifierJob(
99103
}
100104
}
101105
}
102-
106+
103107
def stop(): Unit = {
104108
Option(ref.get()).foreach(_.cancel())
105109
}
@@ -259,7 +263,7 @@ class OtoroshiEntitiesVerifierJob(
259263
lockedBy = "otoroshi-entities-verifier-job",
260264
lockedAt = now,
261265
expiresAt = now.plusHours(1),
262-
cursor = "",
266+
cursor = 0,
263267
startedAt = now,
264268
lastBatchAt = now,
265269
status = JobStatus.Running

0 commit comments

Comments
 (0)