diff --git a/core/src/main/scala/app/softnetwork/elastic/client/extensions/CoreDqlExtension.scala b/core/src/main/scala/app/softnetwork/elastic/client/extensions/CoreDqlExtension.scala index 264c36c2..60e8161e 100644 --- a/core/src/main/scala/app/softnetwork/elastic/client/extensions/CoreDqlExtension.scala +++ b/core/src/main/scala/app/softnetwork/elastic/client/extensions/CoreDqlExtension.scala @@ -60,6 +60,15 @@ class CoreDqlExtension extends ExtensionSpi { protected var licenseManager: Option[LicenseManager] = None protected val logger = org.slf4j.LoggerFactory.getLogger(getClass) + /** Story P0.6 -- the cap-hit collector captured at `initialize`. It is the SAME per-strategy + * `TelemetryCollector` that `GatewayApi.run` increments for `queriesTotal`, so a `QueryResults` + * cap-hit recorded here rides the existing `InstancePing` delta to the license-server. Defaults + * to `Noop` until initialized (or if no strategy carries a real collector). The closed + * `EnforcedDqlExtension` (priority < 100) extends this class and inherits the increment via + * the shared `capOrReject`, so the paid enforcement path counts cap-hits too. + */ + protected var capHitCollector: TelemetryCollector = TelemetryCollector.Noop + override def extensionId: String = "core-dql" override def extensionName: String = "Core DQL Quotas" override def version: String = "0.1.0" @@ -70,6 +79,7 @@ class CoreDqlExtension extends ExtensionSpi { ): Either[String, Unit] = { logger.info("๐Ÿ”Œ Initializing Core DQL extension") licenseManager = Some(licenseRefreshStrategy.licenseManager) + capHitCollector = licenseRefreshStrategy.telemetryCollector Right(()) } @@ -172,6 +182,9 @@ class CoreDqlExtension extends ExtensionSpi { (single.limit, quota.maxQueryResults) match { // (1) Explicit LIMIT over a finite quota โ†’ hard 402 (UNCHANGED). case (Some(l), Some(max)) if max < l.limit => + // Story P0.6 โ€” the meter bit: record a QueryResults cap-hit BEFORE building the 402 + // (side-effect only; the reject itself is unchanged โ€” AC 8). + capHitCollector.incrementCapHit(TelemetryCollector.CapHitKind.QueryResults) logger.warn( s"โš ๏ธ Query result limit (${l.limit}) exceeds license quota ($max)" ) @@ -198,6 +211,12 @@ class CoreDqlExtension extends ExtensionSpi { // and must NOT be re-routed to scroll (it would mishandle aggregation buckets). JOIN legs // (ResultCapContext.isSuppressed) also skip the cap so the join input is not truncated. case (None, Some(max)) if single.fields.nonEmpty && !ResultCapContext.isSuppressed => + // Story P0.6 (OQ-2) โ€” the no-LIMIT truncation IS the meter biting (non-fatally): count it + // as a QueryResults cap-hit, the SAME kind as the explicit-LIMIT 402 above. The cap is + // suppressed for JOIN legs (the `!isSuppressed` guard), so a per-leg input truncation is + // never counted โ€” only a genuine single-index result cap. (JOIN-output truncation is + // counted on the Joins axis downstream, not here โ€” no double-count.) + capHitCollector.incrementCapHit(TelemetryCollector.CapHitKind.QueryResults) logger.info( s"โ„น๏ธ No LIMIT on single-index scroll query; capping the stream at license quota ($max rows) and flagging truncation" ) diff --git a/core/src/test/scala/app/softnetwork/elastic/client/extensions/CoreDqlExtensionSpec.scala b/core/src/test/scala/app/softnetwork/elastic/client/extensions/CoreDqlExtensionSpec.scala index 9fe16da9..c5b509eb 100644 --- a/core/src/test/scala/app/softnetwork/elastic/client/extensions/CoreDqlExtensionSpec.scala +++ b/core/src/test/scala/app/softnetwork/elastic/client/extensions/CoreDqlExtensionSpec.scala @@ -23,6 +23,7 @@ import app.softnetwork.elastic.client._ import app.softnetwork.elastic.client.result._ import app.softnetwork.elastic.client.scroll.{ScrollConfig, ScrollMetrics} import app.softnetwork.elastic.licensing._ +import app.softnetwork.elastic.licensing.metrics.MetricsApi import app.softnetwork.elastic.sql.parser.Parser import app.softnetwork.elastic.sql.query.{SearchStatement, SelectStatement, SingleSearch} import com.typesafe.config.ConfigFactory @@ -63,11 +64,15 @@ class CoreDqlExtensionSpec extends AnyFlatSpec with Matchers { override def licenseType: LicenseType = tier } - private def strategy(mgr: LicenseManager): LicenseRefreshStrategy = + private def strategy( + mgr: LicenseManager, + collector: TelemetryCollector = TelemetryCollector.Noop + ): LicenseRefreshStrategy = new LicenseRefreshStrategy { override def initialize(): LicenseKey = LicenseKey.Community override def refresh(): Either[LicenseError, LicenseKey] = Left(RefreshNotSupported) override def licenseManager: LicenseManager = mgr + override def telemetryCollector: TelemetryCollector = collector } /** Records every statement forwarded to the scroll / searchAsync seam plus the scroll config, so @@ -283,4 +288,78 @@ class CoreDqlExtensionSpec extends AnyFlatSpec with Matchers { client.scrolledConfig.get().maxDocuments shouldBe None truncationOf(res) shouldBe None } + + // ---- Story P0.6: QueryResults cap-hit recorded on BOTH reject branches ---- + + behavior of "CoreDqlExtension cap-hit instrumentation (P0.6)" + + private def runWithCollector( + sql: String, + quota: Quota, + tier: LicenseType, + suppress: Boolean = false + ): (TelemetryCollector, ElasticResult[QueryResult]) = { + val parsed = Parser(sql) match { + case Right(s) => s + case Left(e) => fail(s"parse failed: ${e.msg}") + } + val collector = new TelemetryCollector + val client = new RecordingClient() + val ext = new CoreDqlExtension() + ext.initialize(ConfigFactory.empty(), strategy(managerWithQuota(quota, tier), collector)) + val res = + if (suppress) + ResultCapContext.suppressed(Await.result(ext.execute(parsed, client), 5.seconds)) + else Await.result(ext.execute(parsed, client), 5.seconds) + (collector, res) + } + + private def capHits(c: TelemetryCollector): Map[String, Long] = + c.collect(MetricsApi.Noop).capHitsByKind + + it should "increment the QueryResults cap-hit on the explicit-LIMIT 402 branch" in { + val (collector, res) = + runWithCollector("SELECT a FROM idx LIMIT 20000", Quota.Community, LicenseType.Community) + res shouldBe a[ElasticFailure] + res.asInstanceOf[ElasticFailure].elasticError.statusCode shouldBe Some(402) + capHits(collector)("max_query_results") shouldBe 1L + // no other bucket bumped + capHits(collector)("max_joins") shouldBe 0L + capHits(collector)("max_clusters") shouldBe 0L + capHits(collector)("max_materialized_views") shouldBe 0L + } + + it should "increment the QueryResults cap-hit on the P0.5 no-LIMIT truncation branch (OQ-2)" in { + val (collector, res) = + runWithCollector("SELECT a, b FROM idx", Quota.Community, LicenseType.Community) + res shouldBe a[ElasticSuccess[_]] + truncationOf(res).map(_.truncated) shouldBe Some(true) + capHits(collector)("max_query_results") shouldBe 1L + } + + it should "NOT increment any cap-hit when an explicit LIMIT is within quota" in { + val (collector, res) = + runWithCollector("SELECT a FROM idx LIMIT 50", Quota.Community, LicenseType.Community) + res shouldBe a[ElasticSuccess[_]] + capHits(collector).values.toSet shouldBe Set(0L) + } + + it should "NOT increment a cap-hit when a no-LIMIT query is a suppressed JOIN leg" in { + val (collector, res) = + runWithCollector( + "SELECT a, b FROM idx", + Quota.Community, + LicenseType.Community, + suppress = true + ) + res shouldBe a[ElasticSuccess[_]] + capHits(collector).values.toSet shouldBe Set(0L) + } + + it should "NOT increment a cap-hit for an unlimited (Enterprise) no-LIMIT query" in { + val (collector, res) = + runWithCollector("SELECT a, b FROM idx", Quota.Enterprise, LicenseType.Enterprise) + res shouldBe a[ElasticSuccess[_]] + capHits(collector).values.toSet shouldBe Set(0L) + } } diff --git a/licensing/src/main/scala/app/softnetwork/elastic/licensing/TelemetryCollector.scala b/licensing/src/main/scala/app/softnetwork/elastic/licensing/TelemetryCollector.scala index 16712dab..f356878f 100644 --- a/licensing/src/main/scala/app/softnetwork/elastic/licensing/TelemetryCollector.scala +++ b/licensing/src/main/scala/app/softnetwork/elastic/licensing/TelemetryCollector.scala @@ -38,6 +38,16 @@ import java.util.concurrent.atomic.AtomicLong * `Map.empty` default below applies only to the bare/no-op construction, never to a `collect()` / * `collectAndReset()` result. Story 15.2 reads `joinQueryCount` into the `InstancePing` proto * `join_query_count` field. + * + * Story P0.6 -- `capHitsByKind` is the per-quota-type cap-hit DELTA, with the SAME per-interval + * semantics as `joinQueryByRow` (NOT cumulative like `queriesTotal`): it counts how many times a + * license quota REJECTED an operation in the interval, keyed by the four quota types + * (`max_materialized_views`, `max_query_results`, `max_joins`, `max_clusters`). This is the launch + * "is the meter biting?" leading indicator (PRD ยง15.1 cap-hits). Like the JOIN buckets it ALWAYS + * carries all four keys on a COLLECTED snapshot (a zero is an explicit `0`, never an absent entry) + * so the wire shape stays stable for the daily ping; the `Map.empty` default applies only to the + * bare/no-op construction. Story P0.6 reads these into the `InstancePing` proto `capHitMax*` + * fields (14-17). */ case class TelemetryData( queriesTotal: Long = 0, @@ -48,7 +58,8 @@ case class TelemetryData( clusterVersion: Option[String] = None, aggregatedMetrics: AggregatedMetrics = AggregatedMetrics.empty, joinQueryCount: Long = 0, - joinQueryByRow: Map[String, Long] = Map.empty + joinQueryByRow: Map[String, Long] = Map.empty, + capHitsByKind: Map[String, Long] = Map.empty ) /** Mutable telemetry collector with atomic counters. @@ -70,6 +81,12 @@ class TelemetryCollector { private val _joinPassthrough = new AtomicLong(0L) private val _joinCrossCluster = new AtomicLong(0L) private val _joinCoordinator = new AtomicLong(0L) + // Story P0.6 -- per-interval cap-hit DELTA buckets, one per quota type. Same lock-free pattern as + // the JOIN buckets: independent AtomicLong so the increment never contends with collect/reset. + private val _capMv = new AtomicLong(0L) + private val _capResults = new AtomicLong(0L) + private val _capJoins = new AtomicLong(0L) + private val _capClusters = new AtomicLong(0L) private val clusterInfoLock = new AnyRef private var _mvsActive: Int = 0 private var _clustersConnected: Int = 0 @@ -106,6 +123,36 @@ class TelemetryCollector { (p + x + c, Map("passthrough" -> p, "cross_cluster" -> x, "coordinator" -> c)) } + /** Record one license cap-hit (a quota REJECTED an operation), attributed to its quota type + * (Story P0.6). Real-time, lock-free (AtomicLong); the four buckets are independent so the + * increment never contends with collect/collectAndReset. Called at each of the four reject sites + * (CoreDqlExtension / MaterializedViewExtension / JoinPlanner / JoinLicenseGuard), regardless of + * whether that reject surfaces as an HTTP 402, a `Left(String)`, or a startup exit -- this is a + * SEMANTIC cap-hit, not an HTTP-status filter (PRD ยง15.1 / P0.6 OQ-1). + */ + def incrementCapHit(kind: TelemetryCollector.CapHitKind): Unit = { + val _ = (kind match { + case TelemetryCollector.CapHitKind.MaterializedViews => _capMv + case TelemetryCollector.CapHitKind.QueryResults => _capResults + case TelemetryCollector.CapHitKind.Joins => _capJoins + case TelemetryCollector.CapHitKind.Clusters => _capClusters + }).incrementAndGet() + } + + /** Snapshot+reset ONLY the cap-hit delta (read+zero with `getAndSet`). Used by the scheduled tick + * when `config.telemetryEnabled` is false (so the buckets still reset, mirroring AC 5a) and by + * the clean-shutdown flush (mirroring AC 5b / P0.6 OQ-8 -- the `maxClusters` cap-hit happens on + * a startup CrashLoop, so its delta MUST be flushed before `sys.exit`). ALWAYS returns all four + * keys (an explicit `0`, never an absent entry) so the wire shape stays stable for the daily + * ping, exactly like `collectAndResetJoinCounts`. + */ + def collectAndResetCapHits(): Map[String, Long] = Map( + "max_materialized_views" -> _capMv.getAndSet(0L), + "max_query_results" -> _capResults.getAndSet(0L), + "max_joins" -> _capJoins.getAndSet(0L), + "max_clusters" -> _capClusters.getAndSet(0L) + ) + def setMvsActive(count: Int): Unit = clusterInfoLock.synchronized { _mvsActive = count } def setClustersConnected(count: Int): Unit = clusterInfoLock.synchronized { @@ -124,6 +171,14 @@ class TelemetryCollector { // --- Read methods (called by AutoRefreshStrategy.doScheduleRefresh) --- + /** Read-only snapshot of the cap-hit buckets (all four keys, NO reset). Used by `collect`. */ + private def readCapHits(): Map[String, Long] = Map( + "max_materialized_views" -> _capMv.get(), + "max_query_results" -> _capResults.get(), + "max_joins" -> _capJoins.get(), + "max_clusters" -> _capClusters.get() + ) + def collect(metrics: MetricsApi): TelemetryData = clusterInfoLock.synchronized { // JOIN buckets: read with .get() -- collect NEVER resets (interval read without flush). val p = _joinPassthrough.get() @@ -138,7 +193,9 @@ class TelemetryCollector { clusterVersion = _clusterVersion, aggregatedMetrics = metrics.getAggregatedMetrics, joinQueryCount = p + x + c, - joinQueryByRow = Map("passthrough" -> p, "cross_cluster" -> x, "coordinator" -> c) + joinQueryByRow = Map("passthrough" -> p, "cross_cluster" -> x, "coordinator" -> c), + // Cap-hit buckets: read with .get() -- collect NEVER resets (interval read without flush). + capHitsByKind = readCapHits() ) } @@ -160,7 +217,9 @@ class TelemetryCollector { clusterVersion = _clusterVersion, aggregatedMetrics = metrics.collectAndResetAggregatedMetrics, joinQueryCount = p + x + c, - joinQueryByRow = Map("passthrough" -> p, "cross_cluster" -> x, "coordinator" -> c) + joinQueryByRow = Map("passthrough" -> p, "cross_cluster" -> x, "coordinator" -> c), + // Cap-hit buckets: read+zero with .getAndSet(0L) -- per-interval delta; this IS the flush. + capHitsByKind = collectAndResetCapHits() ) } } @@ -189,6 +248,37 @@ object TelemetryCollector { case object Coordinator extends JoinRow } + /** The license quota type a cap-hit is attributed to (Story P0.6). Each rejected operation + * increments exactly one bucket. The four types map 1:1 to the four enforced quotas; the `key` + * is the snake-case wire/column name carried to the license-server `instance_ping` cap-hit + * columns. + */ + sealed trait CapHitKind { def key: String } + object CapHitKind { + + /** `maxMaterializedViews` -- `MaterializedViewExtension` CREATE over quota (the quota-exceeded + * 402 branch ONLY; NOT the feature-absent path -- P0.6 OQ-7). + */ + case object MaterializedViews extends CapHitKind { val key = "max_materialized_views" } + + /** `maxQueryResults` -- `CoreDqlExtension` single-index result boundary: BOTH the + * explicit-LIMIT 402 reject AND the P0.5 no-LIMIT truncation/cappedScroll bite (P0.6 OQ-2). + */ + case object QueryResults extends CapHitKind { val key = "max_query_results" } + + /** `maxJoins` -- `JoinPlanner.plan()` cross-index JOIN-count reject (`Left(String)`, NOT a + * 402). + */ + case object Joins extends CapHitKind { val key = "max_joins" } + + /** `maxClusters` -- `JoinLicenseGuard` federation cluster-count reject (startup `Left`/exit or + * `checkClusterLimit` `Left(QuotaExceeded)`, NOT a 402). + */ + case object Clusters extends CapHitKind { val key = "max_clusters" } + + val values: Seq[CapHitKind] = Seq(MaterializedViews, QueryResults, Joins, Clusters) + } + /** Default collector returning zero-valued data. Used when telemetry is disabled or no runtime * wires a real collector. Write methods are no-ops to prevent accidental mutation of the shared * singleton. @@ -198,6 +288,9 @@ object TelemetryCollector { override def incrementJoin(row: JoinRow): Unit = () override def collectAndResetJoinCounts(): (Long, Map[String, Long]) = (0L, Map("passthrough" -> 0L, "cross_cluster" -> 0L, "coordinator" -> 0L)) + override def incrementCapHit(kind: CapHitKind): Unit = () + override def collectAndResetCapHits(): Map[String, Long] = + CapHitKind.values.map(_.key -> 0L).toMap override def setMvsActive(count: Int): Unit = () override def setClustersConnected(count: Int): Unit = () override def setClusterInfo( diff --git a/licensing/src/test/scala/app/softnetwork/elastic/licensing/TelemetryCollectorSpec.scala b/licensing/src/test/scala/app/softnetwork/elastic/licensing/TelemetryCollectorSpec.scala index e4c5647a..2915a67b 100644 --- a/licensing/src/test/scala/app/softnetwork/elastic/licensing/TelemetryCollectorSpec.scala +++ b/licensing/src/test/scala/app/softnetwork/elastic/licensing/TelemetryCollectorSpec.scala @@ -238,4 +238,115 @@ class TelemetryCollectorSpec extends AnyFlatSpec with Matchers { total shouldBe 0 byRow shouldBe Map("passthrough" -> 0L, "cross_cluster" -> 0L, "coordinator" -> 0L) } + + // --- Story P0.6: cap-hit counter --- + + private val allCapKeys = + Set("max_materialized_views", "max_query_results", "max_joins", "max_clusters") + + "TelemetryCollector" should "increment ONLY the bucket for the given cap-hit kind" in { + val collector = new TelemetryCollector + collector.incrementCapHit(TelemetryCollector.CapHitKind.MaterializedViews) + collector.incrementCapHit(TelemetryCollector.CapHitKind.QueryResults) + collector.incrementCapHit(TelemetryCollector.CapHitKind.QueryResults) + collector.incrementCapHit(TelemetryCollector.CapHitKind.Joins) + collector.incrementCapHit(TelemetryCollector.CapHitKind.Clusters) + collector.incrementCapHit(TelemetryCollector.CapHitKind.Clusters) + collector.incrementCapHit(TelemetryCollector.CapHitKind.Clusters) + val data = collector.collect(noop) + data.capHitsByKind("max_materialized_views") shouldBe 1 + data.capHitsByKind("max_query_results") shouldBe 2 + data.capHitsByKind("max_joins") shouldBe 1 + data.capHitsByKind("max_clusters") shouldBe 3 + } + + it should "ALWAYS carry all four cap-hit keys even when their buckets are 0" in { + val collector = new TelemetryCollector + val data = collector.collect(noop) + data.capHitsByKind.keySet shouldBe allCapKeys + data.capHitsByKind.values.toSet shouldBe Set(0L) + // and the same on a reset read + val reset = collector.collectAndReset(noop) + reset.capHitsByKind.keySet shouldBe allCapKeys + reset.capHitsByKind.values.toSet shouldBe Set(0L) + } + + it should "NOT reset the cap-hit counters on collect (interval read without flush)" in { + val collector = new TelemetryCollector + collector.incrementCapHit(TelemetryCollector.CapHitKind.Joins) + collector.collect(noop).capHitsByKind("max_joins") shouldBe 1 + collector.collect(noop).capHitsByKind("max_joins") shouldBe 1 // still 1 -- collect never resets + } + + it should "reset the cap-hit counters on collectAndReset (per-interval delta; shutdown flush)" in { + val collector = new TelemetryCollector + collector.incrementCapHit(TelemetryCollector.CapHitKind.Clusters) + collector.collectAndReset(noop).capHitsByKind("max_clusters") shouldBe 1 + // delta reset after the interval + collector.collectAndReset(noop).capHitsByKind("max_clusters") shouldBe 0 + } + + it should "snapshot+reset only the cap-hit delta via collectAndResetCapHits (all 4 keys)" in { + val collector = new TelemetryCollector + collector.incrementCapHit(TelemetryCollector.CapHitKind.MaterializedViews) + collector.incrementCapHit(TelemetryCollector.CapHitKind.Clusters) + val first = collector.collectAndResetCapHits() + first shouldBe Map( + "max_materialized_views" -> 1L, + "max_query_results" -> 0L, + "max_joins" -> 0L, + "max_clusters" -> 1L + ) + // reset: a second snapshot returns zeros, all 4 keys present + val second = collector.collectAndResetCapHits() + second shouldBe Map( + "max_materialized_views" -> 0L, + "max_query_results" -> 0L, + "max_joins" -> 0L, + "max_clusters" -> 0L + ) + // the cap-hit flush does NOT affect the JOIN buckets or queriesTotal + collector.incrementQueries() + collector.collect(noop).queriesTotal shouldBe 1 + collector.collect(noop).joinQueryCount shouldBe 0 + } + + it should "increment the cap-hit counters concurrently without loss" in { + val collector = new TelemetryCollector + val threads = (1 to 10).map { _ => + new Thread(() => { + (1 to 1000).foreach { _ => + collector.incrementCapHit(TelemetryCollector.CapHitKind.QueryResults) + } + }) + } + threads.foreach(_.start()) + threads.foreach(_.join()) + collector.collect(noop).capHitsByKind("max_query_results") shouldBe 10000 + } + + it should "expose exactly the four CapHitKind values with their snake-case keys" in { + TelemetryCollector.CapHitKind.values should have size 4 + TelemetryCollector.CapHitKind.values.map(_.key).toSet shouldBe allCapKeys + TelemetryCollector.CapHitKind.MaterializedViews.key shouldBe "max_materialized_views" + TelemetryCollector.CapHitKind.QueryResults.key shouldBe "max_query_results" + TelemetryCollector.CapHitKind.Joins.key shouldBe "max_joins" + TelemetryCollector.CapHitKind.Clusters.key shouldBe "max_clusters" + } + + "TelemetryCollector.Noop" should "no-op incrementCapHit and zero-out collectAndResetCapHits" in { + TelemetryCollector.Noop.incrementCapHit(TelemetryCollector.CapHitKind.Joins) + TelemetryCollector.Noop.collect(noop).capHitsByKind shouldBe Map( + "max_materialized_views" -> 0L, + "max_query_results" -> 0L, + "max_joins" -> 0L, + "max_clusters" -> 0L + ) + TelemetryCollector.Noop.collectAndResetCapHits() shouldBe Map( + "max_materialized_views" -> 0L, + "max_query_results" -> 0L, + "max_joins" -> 0L, + "max_clusters" -> 0L + ) + } }