diff --git a/licensing/src/main/scala/app/softnetwork/elastic/licensing/TelemetryCollector.scala b/licensing/src/main/scala/app/softnetwork/elastic/licensing/TelemetryCollector.scala index 371529de..16712dab 100644 --- a/licensing/src/main/scala/app/softnetwork/elastic/licensing/TelemetryCollector.scala +++ b/licensing/src/main/scala/app/softnetwork/elastic/licensing/TelemetryCollector.scala @@ -20,13 +20,24 @@ import app.softnetwork.elastic.licensing.metrics.{AggregatedMetrics, MetricsApi} import java.util.concurrent.atomic.AtomicLong -/** Snapshot of runtime telemetry for inclusion in refresh requests. All counters are cumulative - * since server startup (stateless -- reset on restart). +/** Snapshot of runtime telemetry for inclusion in refresh requests. * - * Backend note: the backend receives cumulative totals per instance_id. To compute per-interval - * deltas, the backend stores the previous snapshot and subtracts. When queries_total drops below - * the previous value for the same instance_id, this indicates a process restart -- start a new - * session, don't compute negative delta. + * `queriesTotal` (and the cluster gauges) are cumulative since server startup (stateless -- reset + * on restart). Backend note: the backend receives cumulative totals per instance_id. To compute + * per-interval deltas, the backend stores the previous snapshot and subtracts. When queries_total + * drops below the previous value for the same instance_id, this indicates a process restart -- + * start a new session, don't compute negative delta. + * + * Story 15.3 -- `joinQueryCount` / `joinQueryByRow` are DIFFERENT: they are per-interval DELTAS + * (reset on each `collectAndReset` / `collectAndResetJoinCounts`, flushed on clean shutdown), NOT + * cumulative like `queriesTotal`. `joinQueryCount` is the total of executed classified cross-index + * JOINs in the interval; `joinQueryByRow` disaggregates it into the three execution rows and + * ALWAYS carries all three keys (`passthrough`, `cross_cluster`, `coordinator`) on every COLLECTED + * snapshot (a zero is an explicit `0`, never an absent entry) so the wire shape stays stable for + * the Story-15.2 daily ping. `joinQueryCount == passthrough + cross_cluster + coordinator`. The + * `Map.empty` default below applies only to the bare/no-op construction, never to a `collect()` / + * `collectAndReset()` result. Story 15.2 reads `joinQueryCount` into the `InstancePing` proto + * `join_query_count` field. */ case class TelemetryData( queriesTotal: Long = 0, @@ -35,7 +46,9 @@ case class TelemetryData( clusterId: Option[String] = None, clusterName: Option[String] = None, clusterVersion: Option[String] = None, - aggregatedMetrics: AggregatedMetrics = AggregatedMetrics.empty + aggregatedMetrics: AggregatedMetrics = AggregatedMetrics.empty, + joinQueryCount: Long = 0, + joinQueryByRow: Map[String, Long] = Map.empty ) /** Mutable telemetry collector with atomic counters. @@ -52,6 +65,11 @@ case class TelemetryData( class TelemetryCollector { private val _queriesTotal = new AtomicLong(0L) + // Story 15.3 -- per-interval JOIN-query DELTA buckets, one per execution row. Lock-free + // (AtomicLong); independent so the increment never contends with collect/collectAndReset. + private val _joinPassthrough = new AtomicLong(0L) + private val _joinCrossCluster = new AtomicLong(0L) + private val _joinCoordinator = new AtomicLong(0L) private val clusterInfoLock = new AnyRef private var _mvsActive: Int = 0 private var _clustersConnected: Int = 0 @@ -63,6 +81,31 @@ class TelemetryCollector { def incrementQueries(): Unit = { val _ = _queriesTotal.incrementAndGet() } + /** Record one classified, executed cross-index JOIN, attributed to its execution row (Story + * 15.3). Real-time, lock-free (AtomicLong); the three buckets are independent so the increment + * never contends with collect/collectAndReset. Mirrors `incrementQueries`, but keyed by row and + * resettable (per-interval delta, NOT cumulative). + */ + def incrementJoin(row: TelemetryCollector.JoinRow): Unit = { + val _ = (row match { + case TelemetryCollector.JoinRow.Passthrough => _joinPassthrough + case TelemetryCollector.JoinRow.CrossCluster => _joinCrossCluster + case TelemetryCollector.JoinRow.Coordinator => _joinCoordinator + }).incrementAndGet() + } + + /** Snapshot+reset ONLY the JOIN delta (read+zero with `getAndSet`). Used by the scheduled tick + * when `config.telemetryEnabled` is false -- so the buckets still reset (AC 5a) -- and by the + * clean-shutdown flush (AC 5b), without going through the full metrics `collectAndReset` (which + * the disabled branch deliberately skips). Returns (total, byRow-with-all-3-keys, even when 0). + */ + def collectAndResetJoinCounts(): (Long, Map[String, Long]) = { + val p = _joinPassthrough.getAndSet(0L) + val x = _joinCrossCluster.getAndSet(0L) + val c = _joinCoordinator.getAndSet(0L) + (p + x + c, Map("passthrough" -> p, "cross_cluster" -> x, "coordinator" -> c)) + } + def setMvsActive(count: Int): Unit = clusterInfoLock.synchronized { _mvsActive = count } def setClustersConnected(count: Int): Unit = clusterInfoLock.synchronized { @@ -82,6 +125,10 @@ class TelemetryCollector { // --- Read methods (called by AutoRefreshStrategy.doScheduleRefresh) --- def collect(metrics: MetricsApi): TelemetryData = clusterInfoLock.synchronized { + // JOIN buckets: read with .get() -- collect NEVER resets (interval read without flush). + val p = _joinPassthrough.get() + val x = _joinCrossCluster.get() + val c = _joinCoordinator.get() TelemetryData( queriesTotal = _queriesTotal.get(), mvsActive = _mvsActive, @@ -89,14 +136,21 @@ class TelemetryCollector { clusterId = _clusterId, clusterName = _clusterName, clusterVersion = _clusterVersion, - aggregatedMetrics = metrics.getAggregatedMetrics + aggregatedMetrics = metrics.getAggregatedMetrics, + joinQueryCount = p + x + c, + joinQueryByRow = Map("passthrough" -> p, "cross_cluster" -> x, "coordinator" -> c) ) } /** Collect a snapshot and atomically reset the metrics, ensuring no operations recorded between - * snapshot and reset are lost. + * snapshot and reset are lost. The JOIN buckets are per-interval deltas -- they are read+zeroed + * here (getAndSet) so the next interval starts from zero (Story 15.3 AC 5). */ def collectAndReset(metrics: MetricsApi): TelemetryData = clusterInfoLock.synchronized { + // JOIN buckets: read+zero with .getAndSet(0L) -- per-interval delta; this IS the flush. + val p = _joinPassthrough.getAndSet(0L) + val x = _joinCrossCluster.getAndSet(0L) + val c = _joinCoordinator.getAndSet(0L) TelemetryData( queriesTotal = _queriesTotal.get(), mvsActive = _mvsActive, @@ -104,19 +158,46 @@ class TelemetryCollector { clusterId = _clusterId, clusterName = _clusterName, clusterVersion = _clusterVersion, - aggregatedMetrics = metrics.collectAndResetAggregatedMetrics + aggregatedMetrics = metrics.collectAndResetAggregatedMetrics, + joinQueryCount = p + x + c, + joinQueryByRow = Map("passthrough" -> p, "cross_cluster" -> x, "coordinator" -> c) ) } } object TelemetryCollector { + /** Execution row a classified cross-index JOIN is attributed to (Story 15.3). Each executed JOIN + * increments exactly one row bucket (no double-counting across rows). + */ + sealed trait JoinRow + object JoinRow { + + /** Row 1 -- a single downstream owns the whole join (+ write); sidecar local JOIN; federation + * `WriteWithJoinPassthroughResult`. + */ + case object Passthrough extends JoinRow + + /** Row 2 -- federation cross-cluster conveyor (single source, different target; + * `WriteWithJoinCrossClusterResult`). + */ + case object CrossCluster extends JoinRow + + /** Row 3 -- federation multi-source coordinator (`WriteWithJoinCoordinatorResult`; also the + * cross-cluster SELECT `FederatedResult` the coordinator runs). + */ + case object Coordinator extends JoinRow + } + /** Default collector returning zero-valued data. Used when telemetry is disabled or no runtime * wires a real collector. Write methods are no-ops to prevent accidental mutation of the shared * singleton. */ val Noop: TelemetryCollector = new TelemetryCollector { override def incrementQueries(): Unit = () + override def incrementJoin(row: JoinRow): Unit = () + override def collectAndResetJoinCounts(): (Long, Map[String, Long]) = + (0L, Map("passthrough" -> 0L, "cross_cluster" -> 0L, "coordinator" -> 0L)) override def setMvsActive(count: Int): Unit = () override def setClustersConnected(count: Int): Unit = () override def setClusterInfo( diff --git a/licensing/src/test/scala/app/softnetwork/elastic/licensing/TelemetryCollectorSpec.scala b/licensing/src/test/scala/app/softnetwork/elastic/licensing/TelemetryCollectorSpec.scala index b2fb5029..e4c5647a 100644 --- a/licensing/src/test/scala/app/softnetwork/elastic/licensing/TelemetryCollectorSpec.scala +++ b/licensing/src/test/scala/app/softnetwork/elastic/licensing/TelemetryCollectorSpec.scala @@ -149,4 +149,93 @@ class TelemetryCollectorSpec extends AnyFlatSpec with Matchers { val data = collector.collect(noop) data.queriesTotal shouldBe 10000 } + + // --- Story 15.3: JOIN query counter --- + + "TelemetryCollector" should "increment the JOIN counter per row and sum to the total" in { + val collector = new TelemetryCollector + collector.incrementJoin(TelemetryCollector.JoinRow.Passthrough) + collector.incrementJoin(TelemetryCollector.JoinRow.CrossCluster) + collector.incrementJoin(TelemetryCollector.JoinRow.CrossCluster) + collector.incrementJoin(TelemetryCollector.JoinRow.Coordinator) + val data = collector.collect(noop) + data.joinQueryByRow("passthrough") shouldBe 1 + data.joinQueryByRow("cross_cluster") shouldBe 2 + data.joinQueryByRow("coordinator") shouldBe 1 + data.joinQueryCount shouldBe 4 + } + + it should "ALWAYS carry all three JOIN row keys even when their buckets are 0" in { + val collector = new TelemetryCollector + val data = collector.collect(noop) + data.joinQueryByRow.keySet shouldBe Set("passthrough", "cross_cluster", "coordinator") + data.joinQueryByRow.values.toSet shouldBe Set(0L) + data.joinQueryCount shouldBe 0 + // and the same on a reset read + val reset = collector.collectAndReset(noop) + reset.joinQueryByRow.keySet shouldBe Set("passthrough", "cross_cluster", "coordinator") + reset.joinQueryCount shouldBe 0 + } + + it should "NOT reset the JOIN counters on collect (interval read without flush)" in { + val collector = new TelemetryCollector + collector.incrementJoin(TelemetryCollector.JoinRow.Passthrough) + collector.collect(noop).joinQueryCount shouldBe 1 + collector.collect(noop).joinQueryCount shouldBe 1 // still 1 -- collect never resets + } + + it should "reset the JOIN counters on collectAndReset (per-interval delta; shutdown flush)" in { + val collector = new TelemetryCollector + collector.incrementJoin(TelemetryCollector.JoinRow.Coordinator) + collector.collectAndReset(noop).joinQueryCount shouldBe 1 + collector.collectAndReset(noop).joinQueryCount shouldBe 0 // delta reset after the interval + } + + it should "snapshot+reset only the JOIN delta via collectAndResetJoinCounts (all 3 keys)" in { + val collector = new TelemetryCollector + collector.incrementJoin(TelemetryCollector.JoinRow.Passthrough) + collector.incrementJoin(TelemetryCollector.JoinRow.Coordinator) + val (total, byRow) = collector.collectAndResetJoinCounts() + total shouldBe 2 + byRow shouldBe Map("passthrough" -> 1L, "cross_cluster" -> 0L, "coordinator" -> 1L) + // reset: a second snapshot returns zeros, all 3 keys present + val (total2, byRow2) = collector.collectAndResetJoinCounts() + total2 shouldBe 0 + byRow2 shouldBe Map("passthrough" -> 0L, "cross_cluster" -> 0L, "coordinator" -> 0L) + // and queriesTotal is NOT affected by the JOIN flush + collector.incrementQueries() + collector.collect(noop).queriesTotal shouldBe 1 + } + + it should "increment the JOIN counters concurrently without loss" in { + val collector = new TelemetryCollector + val threads = (1 to 10).map { _ => + new Thread(() => { + (1 to 1000).foreach { _ => + collector.incrementJoin(TelemetryCollector.JoinRow.Passthrough) + } + }) + } + threads.foreach(_.start()) + threads.foreach(_.join()) + collector.collect(noop).joinQueryCount shouldBe 10000 + } + + it should "record ONLY integer JOIN counts (no SQL text / identifiers -- AC 7)" in { + val collector = new TelemetryCollector + collector.incrementJoin(TelemetryCollector.JoinRow.CrossCluster) + val data = collector.collect(noop) + // The carrier exposes a Long total and a Map[String, Long] -- no String payload derived from + // any SQL. The only String keys are the fixed row labels, never query-derived. + data.joinQueryByRow.keySet shouldBe Set("passthrough", "cross_cluster", "coordinator") + data.joinQueryCount shouldBe 1 + } + + "TelemetryCollector.Noop" should "no-op incrementJoin and zero-out collectAndResetJoinCounts" in { + TelemetryCollector.Noop.incrementJoin(TelemetryCollector.JoinRow.Coordinator) + TelemetryCollector.Noop.collect(noop).joinQueryCount shouldBe 0 + val (total, byRow) = TelemetryCollector.Noop.collectAndResetJoinCounts() + total shouldBe 0 + byRow shouldBe Map("passthrough" -> 0L, "cross_cluster" -> 0L, "coordinator" -> 0L) + } }