Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ All notable changes to this project will be documented in this file.
- Add `OutboundIcmp` target type (`= 2`) to the geolocation onchain program, enabling ICMP-based probing as an alternative to TWAMP for outbound geolocation targets
- Tools
- Add `twamp-debug` diagnostic tool for testing kernel timestamping support on switches; sends real TWAMP probes to verify which SO_TIMESTAMPING modes (RX/TX software/hardware/sched) actually deliver timestamps, and reports RTT statistics comparing userspace vs kernel timestamp sources
- Geolocation
- geoprobe-target can now store LocationOffset messages in ClickHouse

## [v0.15.0](https://github.com/malbeclabs/doublezero/compare/client/v0.14.0...client/v0.15.0) - 2026-03-27

Expand Down
43 changes: 39 additions & 4 deletions controlplane/telemetry/cmd/geoprobe-target/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,27 @@ func main() {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

var chWriter *geoprobe.ClickhouseWriter
if chCfg := geoprobe.ClickhouseConfigFromEnv(); chCfg != nil {
log.Info("clickhouse enabled", "addr", chCfg.Addr, "db", chCfg.Database)

if err := geoprobe.RunMigrations(*chCfg, log); err != nil {
fmt.Fprintf(os.Stderr, "clickhouse migration failed: %v\n", err)
os.Exit(1)
}

chConn, err := geoprobe.NewClickhouseConn(*chCfg)
if err != nil {
fmt.Fprintf(os.Stderr, "clickhouse connect failed: %v\n", err)
os.Exit(1)
}
defer chConn.Close()

chWriter = geoprobe.NewClickhouseWriter(chConn, chCfg.Database, log)
go chWriter.Run(ctx)
log.Info("clickhouse writer started")
}

errCh := make(chan error, 2)

limiter := newRateLimiter(*rateLimit)
Expand All @@ -86,7 +107,7 @@ func main() {
}

go runTWAMPReflector(ctx, log, *twampPort, errCh)
go runUDPListener(ctx, log, *udpPort, *verifySignature, limiter, errCh)
go runUDPListener(ctx, log, *udpPort, *verifySignature, limiter, chWriter, errCh)

select {
case err := <-errCh:
Expand Down Expand Up @@ -214,7 +235,7 @@ func runTWAMPReflector(ctx context.Context, log *slog.Logger, port uint, errCh c
}
}

func runUDPListener(ctx context.Context, log *slog.Logger, port uint, verifySignatures bool, limiter *rateLimiter, errCh chan<- error) {
func runUDPListener(ctx context.Context, log *slog.Logger, port uint, verifySignatures bool, limiter *rateLimiter, chWriter *geoprobe.ClickhouseWriter, errCh chan<- error) {
conn, err := geoprobe.NewUDPListener(int(port))
if err != nil {
errCh <- fmt.Errorf("failed to create UDP listener: %w", err)
Expand Down Expand Up @@ -276,7 +297,7 @@ func runUDPListener(ctx context.Context, log *slog.Logger, port uint, verifySign
continue
}

handleOffset(log, offset, addr, verifySignatures)
handleOffset(log, offset, addr, verifySignatures, chWriter)
}
}

Expand All @@ -294,7 +315,7 @@ func countReferenceDepth(offset *geoprobe.LocationOffset) int {
return maxDepth + 1
}

func handleOffset(log *slog.Logger, offset *geoprobe.LocationOffset, addr *net.UDPAddr, verifySignatures bool) {
func handleOffset(log *slog.Logger, offset *geoprobe.LocationOffset, addr *net.UDPAddr, verifySignatures bool, chWriter *geoprobe.ClickhouseWriter) {
signatureValid := true
var verifyError error

Expand All @@ -304,6 +325,20 @@ func handleOffset(log *slog.Logger, offset *geoprobe.LocationOffset, addr *net.U
log.Debug("signature verification complete", "authority_pubkey", solana.PublicKeyFromBytes(offset.AuthorityPubkey[:]).String(), "valid", signatureValid)
}

if chWriter != nil {
rawBytes, err := offset.Marshal()
if err != nil {
log.Error("failed to marshal offset for clickhouse", "error", err)
} else {
sigErrStr := ""
if verifyError != nil {
sigErrStr = verifyError.Error()
}
row := geoprobe.OffsetRowFromLocationOffset(offset, addr.String(), signatureValid, sigErrStr, rawBytes)
chWriter.Record(row)
}
}

output := formatLocationOffset(offset, addr, signatureValid, verifyError)

if *logFormat == "json" {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-- +goose Up
CREATE TABLE IF NOT EXISTS location_offsets (
received_at DateTime64(3),
source_addr String,
authority_pubkey LowCardinality(String),
sender_pubkey LowCardinality(String),
measurement_slot UInt64,
lat Float64,
lng Float64,
measured_rtt_ns UInt64,
rtt_ns UInt64,
target_ip String,
num_references UInt8,
signature_valid Bool,
signature_error String,
raw_offset String,
ref_authority_pubkeys Array(String),
ref_sender_pubkeys Array(String),
ref_measured_rtt_ns Array(UInt64),
ref_rtt_ns Array(UInt64)
) ENGINE = MergeTree
PARTITION BY toYYYYMM(received_at)
ORDER BY (received_at, sender_pubkey)
TTL toDateTime(received_at) + INTERVAL 90 DAY;

-- +goose Down
DROP TABLE IF EXISTS location_offsets;
6 changes: 6 additions & 0 deletions controlplane/telemetry/db/clickhouse/migrations/embed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package migrations

import "embed"

//go:embed *.sql
var FS embed.FS
215 changes: 215 additions & 0 deletions controlplane/telemetry/internal/geoprobe/clickhouse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
package geoprobe

import (
"context"
"crypto/tls"
"encoding/hex"
"fmt"
"log/slog"
"os"
"sync"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/gagliardetto/solana-go"
)

type ClickhouseConfig struct {
Addr string
Database string
Username string
Password string
Secure bool
}

func ClickhouseConfigFromEnv() *ClickhouseConfig {
addr := os.Getenv("CLICKHOUSE_ADDR")
if addr == "" {
return nil
}
db := os.Getenv("CLICKHOUSE_DB")
if db == "" {
db = "default"
}
user := os.Getenv("CLICKHOUSE_USER")
if user == "" {
user = "default"
}
return &ClickhouseConfig{
Addr: addr,
Database: db,
Username: user,
Password: os.Getenv("CLICKHOUSE_PASS"),
Secure: os.Getenv("CLICKHOUSE_TLS_DISABLED") != "true",
}
}

func NewClickhouseConn(cfg ClickhouseConfig) (driver.Conn, error) {
opts := &clickhouse.Options{
Addr: []string{cfg.Addr},
Auth: clickhouse.Auth{
Database: cfg.Database,
Username: cfg.Username,
Password: cfg.Password,
},
MaxOpenConns: 5,
DialTimeout: 30 * time.Second,
}
if cfg.Secure {
opts.TLS = &tls.Config{}
}

conn, err := clickhouse.Open(opts)
if err != nil {
return nil, fmt.Errorf("clickhouse open: %w", err)
}
if err := conn.Ping(context.Background()); err != nil {
return nil, fmt.Errorf("clickhouse ping: %w", err)
}
return conn, nil
}

type OffsetRow struct {
ReceivedAt time.Time
SourceAddr string
AuthorityPubkey string
SenderPubkey string
MeasurementSlot uint64
Lat float64
Lng float64
MeasuredRttNs uint64
RttNs uint64
TargetIP string
NumReferences uint8
SignatureValid bool
SignatureError string
RawOffset string
RefAuthorityPubkeys []string
RefSenderPubkeys []string
RefMeasuredRttNs []uint64
RefRttNs []uint64
}

func OffsetRowFromLocationOffset(offset *LocationOffset, sourceAddr string, sigValid bool, sigError string, rawBytes []byte) OffsetRow {
row := OffsetRow{
ReceivedAt: time.Now(),
SourceAddr: sourceAddr,
AuthorityPubkey: solana.PublicKeyFromBytes(offset.AuthorityPubkey[:]).String(),
SenderPubkey: solana.PublicKeyFromBytes(offset.SenderPubkey[:]).String(),
MeasurementSlot: offset.MeasurementSlot,
Lat: offset.Lat,
Lng: offset.Lng,
MeasuredRttNs: offset.MeasuredRttNs,
RttNs: offset.RttNs,
TargetIP: FormatTargetIP(offset.TargetIP),
NumReferences: offset.NumReferences,
SignatureValid: sigValid,
SignatureError: sigError,
RawOffset: hex.EncodeToString(rawBytes),
}

for _, ref := range offset.References {
row.RefAuthorityPubkeys = append(row.RefAuthorityPubkeys, solana.PublicKeyFromBytes(ref.AuthorityPubkey[:]).String())
row.RefSenderPubkeys = append(row.RefSenderPubkeys, solana.PublicKeyFromBytes(ref.SenderPubkey[:]).String())
row.RefMeasuredRttNs = append(row.RefMeasuredRttNs, ref.MeasuredRttNs)
row.RefRttNs = append(row.RefRttNs, ref.RttNs)
}

return row
}

type ClickhouseWriter struct {
conn driver.Conn
db string
buf []OffsetRow
mu sync.Mutex
log *slog.Logger
}

func NewClickhouseWriter(conn driver.Conn, db string, log *slog.Logger) *ClickhouseWriter {
return &ClickhouseWriter{
conn: conn,
db: db,
buf: make([]OffsetRow, 0, 64),
log: log,
}
}

func (w *ClickhouseWriter) Record(row OffsetRow) {
w.mu.Lock()
w.buf = append(w.buf, row)
w.mu.Unlock()
}

func (w *ClickhouseWriter) Run(ctx context.Context) {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
w.flush(shutdownCtx)
cancel()
return
case <-ticker.C:
w.flush(ctx)
}
}
}

func (w *ClickhouseWriter) flush(ctx context.Context) {
w.mu.Lock()
if len(w.buf) == 0 {
w.mu.Unlock()
return
}
rows := w.buf
w.buf = make([]OffsetRow, 0, 64)
w.mu.Unlock()

batch, err := w.conn.PrepareBatch(ctx, fmt.Sprintf(
`INSERT INTO "%s".location_offsets`, w.db,
))
if err != nil {
w.log.Error("failed to prepare batch", "error", err, "dropped_rows", len(rows))
return
}

for _, r := range rows {
if err := batch.Append(
r.ReceivedAt,
r.SourceAddr,
r.AuthorityPubkey,
r.SenderPubkey,
r.MeasurementSlot,
r.Lat,
r.Lng,
r.MeasuredRttNs,
r.RttNs,
r.TargetIP,
r.NumReferences,
r.SignatureValid,
r.SignatureError,
r.RawOffset,
r.RefAuthorityPubkeys,
r.RefSenderPubkeys,
r.RefMeasuredRttNs,
r.RefRttNs,
); err != nil {
w.log.Error("failed to append row", "error", err, "dropped_rows", len(rows))
_ = batch.Abort()
return
}
}

if err := batch.Send(); err != nil {
w.log.Error("failed to send batch", "error", err, "dropped_rows", len(rows))
_ = batch.Close()
return
}
_ = batch.Close()

w.log.Debug("flushed offsets to clickhouse", "count", len(rows))
}
Loading