Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,24 @@ import app.softnetwork.elastic.licensing.metrics.{AggregatedMetrics, MetricsApi}

import java.util.concurrent.atomic.AtomicLong

/** Snapshot of runtime telemetry for inclusion in refresh requests. All counters are cumulative
* since server startup (stateless -- reset on restart).
/** Snapshot of runtime telemetry for inclusion in refresh requests.
*
* Backend note: the backend receives cumulative totals per instance_id. To compute per-interval
* deltas, the backend stores the previous snapshot and subtracts. When queries_total drops below
* the previous value for the same instance_id, this indicates a process restart -- start a new
* session, don't compute negative delta.
* `queriesTotal` (and the cluster gauges) are cumulative since server startup (stateless -- reset
* on restart). Backend note: the backend receives cumulative totals per instance_id. To compute
* per-interval deltas, the backend stores the previous snapshot and subtracts. When queries_total
* drops below the previous value for the same instance_id, this indicates a process restart --
* start a new session, don't compute negative delta.
*
* Story 15.3 -- `joinQueryCount` / `joinQueryByRow` are DIFFERENT: they are per-interval DELTAS
* (reset on each `collectAndReset` / `collectAndResetJoinCounts`, flushed on clean shutdown), NOT
* cumulative like `queriesTotal`. `joinQueryCount` is the total of executed classified cross-index
* JOINs in the interval; `joinQueryByRow` disaggregates it into the three execution rows and
* ALWAYS carries all three keys (`passthrough`, `cross_cluster`, `coordinator`) on every COLLECTED
* snapshot (a zero is an explicit `0`, never an absent entry) so the wire shape stays stable for
* the Story-15.2 daily ping. `joinQueryCount == passthrough + cross_cluster + coordinator`. The
* `Map.empty` default below applies only to the bare/no-op construction, never to a `collect()` /
* `collectAndReset()` result. Story 15.2 reads `joinQueryCount` into the `InstancePing` proto
* `join_query_count` field.
*/
case class TelemetryData(
queriesTotal: Long = 0,
Expand All @@ -35,7 +46,9 @@ case class TelemetryData(
clusterId: Option[String] = None,
clusterName: Option[String] = None,
clusterVersion: Option[String] = None,
aggregatedMetrics: AggregatedMetrics = AggregatedMetrics.empty
aggregatedMetrics: AggregatedMetrics = AggregatedMetrics.empty,
joinQueryCount: Long = 0,
joinQueryByRow: Map[String, Long] = Map.empty
)

/** Mutable telemetry collector with atomic counters.
Expand All @@ -52,6 +65,11 @@ case class TelemetryData(
class TelemetryCollector {

private val _queriesTotal = new AtomicLong(0L)
// Story 15.3 -- per-interval JOIN-query DELTA buckets, one per execution row. Lock-free
// (AtomicLong); independent so the increment never contends with collect/collectAndReset.
private val _joinPassthrough = new AtomicLong(0L)
private val _joinCrossCluster = new AtomicLong(0L)
private val _joinCoordinator = new AtomicLong(0L)
private val clusterInfoLock = new AnyRef
private var _mvsActive: Int = 0
private var _clustersConnected: Int = 0
Expand All @@ -63,6 +81,31 @@ class TelemetryCollector {

def incrementQueries(): Unit = { val _ = _queriesTotal.incrementAndGet() }

/** Record one classified, executed cross-index JOIN, attributed to its execution row (Story
* 15.3). Real-time, lock-free (AtomicLong); the three buckets are independent so the increment
* never contends with collect/collectAndReset. Mirrors `incrementQueries`, but keyed by row and
* resettable (per-interval delta, NOT cumulative).
*/
def incrementJoin(row: TelemetryCollector.JoinRow): Unit = {
val _ = (row match {
case TelemetryCollector.JoinRow.Passthrough => _joinPassthrough
case TelemetryCollector.JoinRow.CrossCluster => _joinCrossCluster
case TelemetryCollector.JoinRow.Coordinator => _joinCoordinator
}).incrementAndGet()
}

/** Snapshot+reset ONLY the JOIN delta (read+zero with `getAndSet`). Used by the scheduled tick
* when `config.telemetryEnabled` is false -- so the buckets still reset (AC 5a) -- and by the
* clean-shutdown flush (AC 5b), without going through the full metrics `collectAndReset` (which
* the disabled branch deliberately skips). Returns (total, byRow-with-all-3-keys, even when 0).
*/
def collectAndResetJoinCounts(): (Long, Map[String, Long]) = {
val p = _joinPassthrough.getAndSet(0L)
val x = _joinCrossCluster.getAndSet(0L)
val c = _joinCoordinator.getAndSet(0L)
(p + x + c, Map("passthrough" -> p, "cross_cluster" -> x, "coordinator" -> c))
}

def setMvsActive(count: Int): Unit = clusterInfoLock.synchronized { _mvsActive = count }

def setClustersConnected(count: Int): Unit = clusterInfoLock.synchronized {
Expand All @@ -82,41 +125,79 @@ class TelemetryCollector {
// --- Read methods (called by AutoRefreshStrategy.doScheduleRefresh) ---

def collect(metrics: MetricsApi): TelemetryData = clusterInfoLock.synchronized {
// JOIN buckets: read with .get() -- collect NEVER resets (interval read without flush).
val p = _joinPassthrough.get()
val x = _joinCrossCluster.get()
val c = _joinCoordinator.get()
TelemetryData(
queriesTotal = _queriesTotal.get(),
mvsActive = _mvsActive,
clustersConnected = _clustersConnected,
clusterId = _clusterId,
clusterName = _clusterName,
clusterVersion = _clusterVersion,
aggregatedMetrics = metrics.getAggregatedMetrics
aggregatedMetrics = metrics.getAggregatedMetrics,
joinQueryCount = p + x + c,
joinQueryByRow = Map("passthrough" -> p, "cross_cluster" -> x, "coordinator" -> c)
)
}

/** Collect a snapshot and atomically reset the metrics, ensuring no operations recorded between
* snapshot and reset are lost.
* snapshot and reset are lost. The JOIN buckets are per-interval deltas -- they are read+zeroed
* here (getAndSet) so the next interval starts from zero (Story 15.3 AC 5).
*/
def collectAndReset(metrics: MetricsApi): TelemetryData = clusterInfoLock.synchronized {
// JOIN buckets: read+zero with .getAndSet(0L) -- per-interval delta; this IS the flush.
val p = _joinPassthrough.getAndSet(0L)
val x = _joinCrossCluster.getAndSet(0L)
val c = _joinCoordinator.getAndSet(0L)
TelemetryData(
queriesTotal = _queriesTotal.get(),
mvsActive = _mvsActive,
clustersConnected = _clustersConnected,
clusterId = _clusterId,
clusterName = _clusterName,
clusterVersion = _clusterVersion,
aggregatedMetrics = metrics.collectAndResetAggregatedMetrics
aggregatedMetrics = metrics.collectAndResetAggregatedMetrics,
joinQueryCount = p + x + c,
joinQueryByRow = Map("passthrough" -> p, "cross_cluster" -> x, "coordinator" -> c)
)
}
}

object TelemetryCollector {

/** Execution row a classified cross-index JOIN is attributed to (Story 15.3). Each executed JOIN
* increments exactly one row bucket (no double-counting across rows).
*/
sealed trait JoinRow
object JoinRow {

/** Row 1 -- a single downstream owns the whole join (+ write); sidecar local JOIN; federation
* `WriteWithJoinPassthroughResult`.
*/
case object Passthrough extends JoinRow

/** Row 2 -- federation cross-cluster conveyor (single source, different target;
* `WriteWithJoinCrossClusterResult`).
*/
case object CrossCluster extends JoinRow

/** Row 3 -- federation multi-source coordinator (`WriteWithJoinCoordinatorResult`; also the
* cross-cluster SELECT `FederatedResult` the coordinator runs).
*/
case object Coordinator extends JoinRow
}

/** Default collector returning zero-valued data. Used when telemetry is disabled or no runtime
* wires a real collector. Write methods are no-ops to prevent accidental mutation of the shared
* singleton.
*/
val Noop: TelemetryCollector = new TelemetryCollector {
override def incrementQueries(): Unit = ()
override def incrementJoin(row: JoinRow): Unit = ()
override def collectAndResetJoinCounts(): (Long, Map[String, Long]) =
(0L, Map("passthrough" -> 0L, "cross_cluster" -> 0L, "coordinator" -> 0L))
override def setMvsActive(count: Int): Unit = ()
override def setClustersConnected(count: Int): Unit = ()
override def setClusterInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,93 @@ class TelemetryCollectorSpec extends AnyFlatSpec with Matchers {
val data = collector.collect(noop)
data.queriesTotal shouldBe 10000
}

// --- Story 15.3: JOIN query counter ---

"TelemetryCollector" should "increment the JOIN counter per row and sum to the total" in {
val collector = new TelemetryCollector
collector.incrementJoin(TelemetryCollector.JoinRow.Passthrough)
collector.incrementJoin(TelemetryCollector.JoinRow.CrossCluster)
collector.incrementJoin(TelemetryCollector.JoinRow.CrossCluster)
collector.incrementJoin(TelemetryCollector.JoinRow.Coordinator)
val data = collector.collect(noop)
data.joinQueryByRow("passthrough") shouldBe 1
data.joinQueryByRow("cross_cluster") shouldBe 2
data.joinQueryByRow("coordinator") shouldBe 1
data.joinQueryCount shouldBe 4
}

it should "ALWAYS carry all three JOIN row keys even when their buckets are 0" in {
val collector = new TelemetryCollector
val data = collector.collect(noop)
data.joinQueryByRow.keySet shouldBe Set("passthrough", "cross_cluster", "coordinator")
data.joinQueryByRow.values.toSet shouldBe Set(0L)
data.joinQueryCount shouldBe 0
// and the same on a reset read
val reset = collector.collectAndReset(noop)
reset.joinQueryByRow.keySet shouldBe Set("passthrough", "cross_cluster", "coordinator")
reset.joinQueryCount shouldBe 0
}

it should "NOT reset the JOIN counters on collect (interval read without flush)" in {
val collector = new TelemetryCollector
collector.incrementJoin(TelemetryCollector.JoinRow.Passthrough)
collector.collect(noop).joinQueryCount shouldBe 1
collector.collect(noop).joinQueryCount shouldBe 1 // still 1 -- collect never resets
}

it should "reset the JOIN counters on collectAndReset (per-interval delta; shutdown flush)" in {
val collector = new TelemetryCollector
collector.incrementJoin(TelemetryCollector.JoinRow.Coordinator)
collector.collectAndReset(noop).joinQueryCount shouldBe 1
collector.collectAndReset(noop).joinQueryCount shouldBe 0 // delta reset after the interval
}

it should "snapshot+reset only the JOIN delta via collectAndResetJoinCounts (all 3 keys)" in {
val collector = new TelemetryCollector
collector.incrementJoin(TelemetryCollector.JoinRow.Passthrough)
collector.incrementJoin(TelemetryCollector.JoinRow.Coordinator)
val (total, byRow) = collector.collectAndResetJoinCounts()
total shouldBe 2
byRow shouldBe Map("passthrough" -> 1L, "cross_cluster" -> 0L, "coordinator" -> 1L)
// reset: a second snapshot returns zeros, all 3 keys present
val (total2, byRow2) = collector.collectAndResetJoinCounts()
total2 shouldBe 0
byRow2 shouldBe Map("passthrough" -> 0L, "cross_cluster" -> 0L, "coordinator" -> 0L)
// and queriesTotal is NOT affected by the JOIN flush
collector.incrementQueries()
collector.collect(noop).queriesTotal shouldBe 1
}

it should "increment the JOIN counters concurrently without loss" in {
val collector = new TelemetryCollector
val threads = (1 to 10).map { _ =>
new Thread(() => {
(1 to 1000).foreach { _ =>
collector.incrementJoin(TelemetryCollector.JoinRow.Passthrough)
}
})
}
threads.foreach(_.start())
threads.foreach(_.join())
collector.collect(noop).joinQueryCount shouldBe 10000
}

it should "record ONLY integer JOIN counts (no SQL text / identifiers -- AC 7)" in {
val collector = new TelemetryCollector
collector.incrementJoin(TelemetryCollector.JoinRow.CrossCluster)
val data = collector.collect(noop)
// The carrier exposes a Long total and a Map[String, Long] -- no String payload derived from
// any SQL. The only String keys are the fixed row labels, never query-derived.
data.joinQueryByRow.keySet shouldBe Set("passthrough", "cross_cluster", "coordinator")
data.joinQueryCount shouldBe 1
}

"TelemetryCollector.Noop" should "no-op incrementJoin and zero-out collectAndResetJoinCounts" in {
TelemetryCollector.Noop.incrementJoin(TelemetryCollector.JoinRow.Coordinator)
TelemetryCollector.Noop.collect(noop).joinQueryCount shouldBe 0
val (total, byRow) = TelemetryCollector.Noop.collectAndResetJoinCounts()
total shouldBe 0
byRow shouldBe Map("passthrough" -> 0L, "cross_cluster" -> 0L, "coordinator" -> 0L)
}
}
Loading