Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions core/src/main/scala/app/softnetwork/elastic/client/repl/Repl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,64 @@ class Repl(
private var running = true
private var multilineBuffer = new StringBuilder()

// ==================== Story 15.2 -- daily product-instance telemetry ping ====================

/** Wall-clock start of this REPL session, for `session_duration_seconds`. */
private val sessionStart: java.time.Instant = java.time.Instant.now()

/** Count of user-typed SQL statements executed this session (success + fail), EXCLUDING
* meta/backslash commands and empty lines (OQ-3/P3). Incremented in `executeStatement`
* (interactive) and `executeBatch` (batch mode) -- NOT in the shared `executeStatementDirect`.
*/
@volatile private var commandsExecuted: Int = 0

/** Guards against a double session-end ping (e.g. a >24h daily tick racing the loop-exit tail).
*/
private val pingEmitted = new java.util.concurrent.atomic.AtomicBoolean(false)

/** Optional >24h daily ping timer (a long-lived interactive session pings each day). */
@volatile private var dailyPingTimer: Option[akka.actor.Cancellable] = None

private def sessionDurationSeconds: Long =
java.time.Duration.between(sessionStart, java.time.Instant.now()).getSeconds

/** Best-effort daily ping for a session that survives > 24h. Fire-and-forget; never blocks. */
private def emitDailyPing(): Unit =
Try {
executor.licenseRefreshStrategy.emitInstancePing(
product = "repl",
version = version,
sessionDurationSeconds = Some(sessionDurationSeconds),
commandsExecuted = Some(commandsExecuted)
)
}

/** Session-end ping (P1): fire-and-forget on a future with a SHORT bounded await (~2s) so a quick
* exit usually lands the ping but NEVER blocks teardown beyond the cap. Idempotent.
*/
private def emitSessionEndPing(): Unit =
if (pingEmitted.compareAndSet(false, true)) {
dailyPingTimer.foreach(_.cancel())
val f = scala.concurrent.Future {
executor.licenseRefreshStrategy.emitInstancePing(
product = "repl",
version = version,
sessionDurationSeconds = Some(sessionDurationSeconds),
commandsExecuted = Some(commandsExecuted)
)
}
Try(Await.result(f, 2.seconds))
()
}

def start(): Unit = {
printWelcomeBanner()

// A session that survives > 24h pings daily while it runs (P1 / AC-2.8).
dailyPingTimer = Some(
system.scheduler.scheduleWithFixedDelay(24.hours, 24.hours)(() => emitDailyPing())
)

while (running) {
try {
val prompt = if (multilineBuffer.isEmpty) {
Expand Down Expand Up @@ -120,6 +175,8 @@ class Repl(
}

printGoodbyeBanner()
// Story 15.2 (AC-2.8) -- session-end daily ping (best-effort, bounded await), before teardown.
emitSessionEndPing()
terminal.close()
}

Expand Down Expand Up @@ -163,6 +220,9 @@ class Repl(
private def executeStatement(sql: String): Unit = {
val cleanSql = sql.stripSuffix(";").trim
if (cleanSql.isEmpty) return
// Story 15.2 (OQ-3/P3) -- count user-typed SQL only (the SQL-only choke-point); meta/backslash
// commands reach executeStatementDirect by a different path and must NOT bump this counter.
commandsExecuted += 1
executeStatementDirect(cleanSql)
}

Expand Down Expand Up @@ -200,6 +260,7 @@ class Repl(
} match {
case Success(sql) =>
executeBatch(sql)
emitSessionEndPing() // Story 15.2 (AC-2.8) -- one ping at end of a batch run
0 // Success

case Failure(ex) =>
Expand All @@ -211,6 +272,7 @@ class Repl(
/** Execute single SQL command (batch mode) */
def executeCommand(sql: String): Int = {
executeBatch(sql)
emitSessionEndPing() // Story 15.2 (AC-2.8) -- one ping at end of a batch run
0
}

Expand All @@ -219,6 +281,8 @@ class Repl(

statements.foreach { stmt =>
println(s"\n${cyan("=>")} ${gray(stmt)}")
// Story 15.2 (OQ-3/P3) -- batch statements ARE user SQL; count each non-empty one here.
commandsExecuted += 1
executeStatementDirect(stmt)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package app.softnetwork.elastic.client.repl

import akka.actor.ActorSystem
import app.softnetwork.elastic.client.GatewayApi
import app.softnetwork.elastic.client.ElasticClientApi
import app.softnetwork.elastic.client.result.{
ElasticError,
ElasticFailure,
Expand All @@ -28,7 +28,23 @@ import app.softnetwork.elastic.client.result.{
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}

class ReplExecutor(gateway: GatewayApi)(implicit system: ActorSystem, ec: ExecutionContext) {
// Story 15.2 -- the executor takes an `ElasticClientApi` (which IS a `GatewayApi` AND an
// `ExtensionApi`) rather than the narrower `GatewayApi`, so the REPL can reach
// `licenseRefreshStrategy`/`licenseManager` (ExtensionApi members) to emit the daily ping.
class ReplExecutor(gateway: ElasticClientApi)(implicit system: ActorSystem, ec: ExecutionContext) {

/** Story 15.2 -- expose the gateway's license refresh strategy so the REPL can emit the daily
* product-instance telemetry ping (the strategy carries instance_id / uptime / tier /
* join_query_count and the ELv2 ping transport). The Repl reaches it through the executor.
*/
def licenseRefreshStrategy: app.softnetwork.elastic.licensing.LicenseRefreshStrategy =
gateway.licenseRefreshStrategy

/** Story 15.2 -- the resolved license tier (displayName, e.g. "Community"), for the ping
* `license_tier`.
*/
def licenseType: app.softnetwork.elastic.licensing.LicenseType =
gateway.licenseManager.licenseType

/** Execute SQL and return formatted result */
def execute(sql: String): Future[ExecutionResult] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package app.softnetwork.elastic.client.repl
import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import app.softnetwork.elastic.client.GatewayApi
import app.softnetwork.elastic.client.ElasticClientApi
import app.softnetwork.elastic.client.result.{
ElasticError,
ElasticFailure,
Expand All @@ -36,7 +36,7 @@ import scala.concurrent.duration._

/** Extended executor with stream support
*/
class StreamingReplExecutor(gateway: GatewayApi)(implicit
class StreamingReplExecutor(gateway: ElasticClientApi)(implicit
system: ActorSystem,
ec: ExecutionContext,
materializer: Materializer
Expand Down
9 changes: 9 additions & 0 deletions licensing/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,13 @@ softclient4es {

cache-dir = ${user.home}/.softclient4es
}

# Story 15.2 (A8) -- the daily product-instance telemetry ping opt-out switch (all five surfaces,
# all tiers including Community). DISTINCT from `softclient4es.license.telemetry.enabled` above
# (which gates only detailed metrics inside the license-refresh body). Set to false to disable the
# daily ping; read by TelemetryConfig.load, NOT by LicenseConfig.
telemetry {
enabled = true
enabled = ${?SOFTCLIENT4ES_TELEMETRY_ENABLED}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,39 @@ trait LicenseRefreshStrategy {

/** Shutdown background resources (scheduler, etc.). Default is no-op. */
def shutdown(): Unit = ()

/** Story 15.2 (A15) -- emit ONE daily product-instance telemetry ping for the given surface.
*
* Best-effort, fire-and-forget: never throws to the caller, gated on the NEW
* `softclient4es.telemetry.enabled` opt-out ([[TelemetryConfig]]). This trait lives in the
* Apache `licensing` module (visible to `core`/REPL), but the actual ELv2 ping transport
* (`InstancePingClient`) lives downstream in the extensions module — so this is the seam the
* REPL (and any surface holding only a `LicenseRefreshStrategy`) calls; the extensions
* strategies override it to build the surface-specific report and POST it. The strategy itself
* supplies the stable `instance_id`, `uptime_seconds`, `license_tier` and `join_query_count`
* (from [[telemetryCollector]] / [[licenseManager]]); the caller supplies only the
* surface-specific fields. The default implementation is a no-op (Community/Apache build with no
* extension wired).
*
* @param product
* the surface discriminator (`ProductType.displayName`, e.g. `"repl"`)
* @param version
* the surface's own build version
* @param sessionDurationSeconds
* REPL session duration (None for non-REPL surfaces)
* @param commandsExecuted
* REPL executed-statement count (None for non-REPL surfaces)
* @param clusterName
* sidecar single-cluster name (None where N/A)
* @param clusterCount
* federation connected-cluster count (None where N/A)
*/
def emitInstancePing(
product: String,
version: String,
sessionDurationSeconds: Option[Long] = None,
commandsExecuted: Option[Int] = None,
clusterName: Option[String] = None,
clusterCount: Option[Int] = None
): Unit = ()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2025 SOFTNETWORK
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package app.softnetwork.elastic.licensing

import com.typesafe.config.{Config, ConfigFactory}

/** Story 15.2 (A8) -- reads the NEW top-level `softclient4es.telemetry.enabled` opt-out switch for
* the daily product-instance telemetry ping (all five surfaces, all tiers including Community).
*
* This is DELIBERATELY DISTINCT from `softclient4es.license.telemetry.enabled`
* ([[LicenseConfig.telemetryEnabled]]), which gates ONLY the detailed operation/index metrics
* inside the license-REFRESH body (and is semantically meaningless for Community, which has no
* refresh). The daily ping reads THIS key, never `LicenseConfig`. See the privacy doc (Story 15.6)
* two-key table.
*
* Default is `true` (opt-out, not opt-in) for ALL tiers -- supplied by the `reference.conf`
* `softclient4es.telemetry { enabled = true }` block.
*/
case class TelemetryConfig(enabled: Boolean)

object TelemetryConfig {

/** The HOCON key the daily ping gates on. The endpoint is NOT a config key (A9 anti-tamper). */
val EnabledKey: String = "softclient4es.telemetry.enabled"

def load(): TelemetryConfig = load(ConfigFactory.load())

def load(config: Config): TelemetryConfig =
TelemetryConfig(enabled =
if (config.hasPath(EnabledKey)) config.getBoolean(EnabledKey) else true
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ package object licensing {
case object Community extends LicenseType // Gratuit
case object Pro extends LicenseType // Payant
case object Enterprise extends LicenseType // Payant + support

/** Story 15.2 (A11) -- allowlist source for the daily-ping `license_tier` validation
* (`validTiers = LicenseType.values.map(_.displayName)`).
*/
def values: Seq[LicenseType] = Seq(Community, Pro, Enterprise)

def upgradeTo(licenseType: LicenseType): LicenseType = licenseType match {
case Community => Pro
case Pro => Enterprise
Expand All @@ -53,43 +59,67 @@ package object licensing {

sealed trait Feature

/** Story 15.2 (A11) -- marker for product-surface features. The daily product-instance ping's
* `product` discriminator is `ProductType.displayName` (the snake feature name), and the
* license-server `InstancePingDecorator.validate()` allowlist is
* `ProductType.values.map(_.displayName)`.
*/
sealed trait ProductType extends Feature {
def displayName: String = Feature.toSnakeCase(this)
}

object ProductType {
def values: Seq[ProductType] =
Seq(
Feature.JdbcDriver,
Feature.FlightSql,
Feature.AdbcDriver,
Feature.Repl,
Feature.Federation
)
}

object Feature {
case object MaterializedViews extends Feature
case object JdbcDriver extends Feature
case object OdbcDriver extends Feature
case object JdbcDriver extends ProductType
case object AdbcDriver extends ProductType // Story 15.2 (A11) -- replaces OdbcDriver
case object UnlimitedResults extends Feature
case object AdvancedAggregations extends Feature
case object FlightSql extends Feature
case object Federation extends Feature
case object FlightSql extends ProductType
case object Federation extends ProductType
case object Repl extends ProductType // Story 15.2 (A11) -- NEW
def values: Seq[Feature] = Seq(
MaterializedViews,
JdbcDriver,
OdbcDriver,
AdbcDriver,
UnlimitedResults,
AdvancedAggregations,
FlightSql,
Federation
Federation,
Repl
)

def fromString(s: String): Option[Feature] = s.trim.toLowerCase match {
case "materialized_views" => Some(MaterializedViews)
case "jdbc_driver" => Some(JdbcDriver)
case "odbc_driver" => Some(OdbcDriver)
case "adbc_driver" => Some(AdbcDriver)
case "unlimited_results" => Some(UnlimitedResults)
case "advanced_aggregations" => Some(AdvancedAggregations)
case "flight_sql" => Some(FlightSql)
case "federation" => Some(Federation)
case "repl" => Some(Repl)
case _ => None
}

def toSnakeCase(f: Feature): String = f match {
case MaterializedViews => "materialized_views"
case JdbcDriver => "jdbc_driver"
case OdbcDriver => "odbc_driver"
case AdbcDriver => "adbc_driver"
case UnlimitedResults => "unlimited_results"
case AdvancedAggregations => "advanced_aggregations"
case FlightSql => "flight_sql"
case Federation => "federation"
case Repl => "repl"
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ class FeatureFromStringSpec extends AnyFlatSpec with Matchers {
Feature.fromString("jdbc_driver") shouldBe Some(Feature.JdbcDriver)
}

it should "map odbc_driver" in {
Feature.fromString("odbc_driver") shouldBe Some(Feature.OdbcDriver)
it should "map adbc_driver" in {
Feature.fromString("adbc_driver") shouldBe Some(Feature.AdbcDriver)
}

it should "map unlimited_results" in {
Expand All @@ -49,6 +49,10 @@ class FeatureFromStringSpec extends AnyFlatSpec with Matchers {
Feature.fromString("federation") shouldBe Some(Feature.Federation)
}

it should "map repl" in {
Feature.fromString("repl") shouldBe Some(Feature.Repl)
}

it should "return None for unknown string" in {
Feature.fromString("warp_drive") shouldBe None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ class FeatureSpec extends AnyFlatSpec with Matchers {
Feature.Federation shouldBe a[Feature]
}

"Feature.values" should "contain all 7 features" in {
Feature.values should have size 7
"Feature.values" should "contain all 8 features" in {
Feature.values should have size 8
}

it should "contain FlightSql and Federation" in {
Expand All @@ -42,11 +42,12 @@ class FeatureSpec extends AnyFlatSpec with Matchers {
Feature.values shouldBe Seq(
Feature.MaterializedViews,
Feature.JdbcDriver,
Feature.OdbcDriver,
Feature.AdbcDriver,
Feature.UnlimitedResults,
Feature.AdvancedAggregations,
Feature.FlightSql,
Feature.Federation
Feature.Federation,
Feature.Repl
)
}
}
Loading
Loading