diff --git a/core/attach.go b/core/attach.go
index 1558fe4..5fa9baa 100644
--- a/core/attach.go
+++ b/core/attach.go
@@ -124,6 +124,10 @@ func (b *Bgpipe) AttachStages() error {
})
}
+ if err := b.attachHTTPStages(); err != nil {
+ return err
+ }
+
return nil
}
diff --git a/core/bgpipe.go b/core/bgpipe.go
index 3ccd118..b68672a 100644
--- a/core/bgpipe.go
+++ b/core/bgpipe.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
+ "net/http"
"os"
"os/signal"
"slices"
@@ -16,6 +17,7 @@ import (
"github.com/bgpfix/bgpfix/dir"
"github.com/bgpfix/bgpfix/msg"
"github.com/bgpfix/bgpfix/pipe"
+ "github.com/go-chi/chi/v5"
"github.com/knadh/koanf/v2"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
@@ -34,8 +36,11 @@ type Bgpipe struct {
K *koanf.Koanf // global config
Pipe *pipe.Pipe // bgpfix pipe
Stages []*StageBase // pipe stages
+ HTTP *http.Server // optional shared HTTP server
+ StartTime time.Time // when the pipeline started
- repo map[string]NewStage // maps cmd to new stage func
+ repo map[string]NewStage // maps cmd to new stage func
+ httpmux *chi.Mux // shared HTTP routes
wg_lwrite sync.WaitGroup // stages that write to pipe L
wg_lread sync.WaitGroup // stages that read from pipe L
@@ -102,6 +107,14 @@ func (b *Bgpipe) Run() error {
// attach our b.Start
b.Pipe.Options.OnStart(b.onStart)
+ // record start time and start optional HTTP API
+ b.StartTime = time.Now()
+ if err := b.startHTTP(); err != nil {
+ b.Error().Err(err).Msg("could not start HTTP API")
+ return err
+ }
+ defer b.stopHTTP()
+
// handle signals
go b.handleSignals()
diff --git a/core/config.go b/core/config.go
index 8e67b0c..edb7554 100644
--- a/core/config.go
+++ b/core/config.go
@@ -7,9 +7,6 @@ import (
"slices"
"strings"
- "net/http"
- _ "net/http/pprof"
-
"github.com/bgpfix/bgpfix/filter"
"github.com/knadh/koanf/providers/posflag"
"github.com/rs/zerolog"
@@ -33,11 +30,8 @@ func (b *Bgpipe) Configure() error {
zerolog.SetGlobalLevel(lvl)
}
- // pprof?
- if v := k.String("pprof"); len(v) > 0 {
- go func() {
- b.Fatal().Err(http.ListenAndServe(v, nil)).Msg("pprof failed")
- }()
+ if err := b.configureHTTP(); err != nil {
+ return err
}
// capabilities?
@@ -69,7 +63,8 @@ func (b *Bgpipe) addFlags() {
f.BoolP("version", "v", false, "print detailed version info and quit")
f.BoolP("explain", "n", false, "print the pipeline as configured and quit")
f.StringP("log", "l", "info", "log level (debug/info/warn/error/disabled)")
- f.String("pprof", "", "bind pprof to given listen address")
+ f.String("http", "", "bind HTTP API + Prometheus /metrics to given address")
+ f.Bool("pprof", false, "enable pprof at /debug/pprof/ (requires --http)")
f.StringSliceP("events", "e", []string{"PARSE", "ESTABLISHED", "EOR"}, "log given events (\"all\" means all events)")
f.StringSliceP("kill", "k", nil, "kill session on any of these events")
f.BoolP("stdin", "i", false, "read JSON from stdin")
diff --git a/core/http.go b/core/http.go
new file mode 100644
index 0000000..e85b12b
--- /dev/null
+++ b/core/http.go
@@ -0,0 +1,265 @@
+package core
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "html"
+ "net"
+ "net/http"
+ "net/http/pprof"
+ "strings"
+ "time"
+
+ vmmetrics "github.com/VictoriaMetrics/metrics"
+ "github.com/go-chi/chi/v5"
+)
+
+func (b *Bgpipe) configureHTTP() error {
+ addr := strings.TrimSpace(b.K.String("http"))
+ if addr == "" {
+ b.HTTP = nil
+ b.httpmux = nil
+ return nil
+ }
+
+ m := chi.NewRouter()
+ b.httpmux = m
+ b.HTTP = &http.Server{
+ Addr: addr,
+ Handler: m,
+ ReadHeaderTimeout: 5 * time.Second,
+ }
+
+ return nil
+}
+
+func (b *Bgpipe) startHTTP() error {
+ if b.HTTP == nil {
+ return nil
+ }
+
+ ln, err := net.Listen("tcp", b.HTTP.Addr)
+ if err != nil {
+ return fmt.Errorf("could not bind --http %s: %w", b.HTTP.Addr, err)
+ }
+
+ go func() {
+ err := b.HTTP.Serve(ln)
+ if err == nil || errors.Is(err, http.ErrServerClosed) {
+ return
+ }
+ b.Cancel(fmt.Errorf("http server failed: %w", err))
+ }()
+
+ b.Info().Str("addr", ln.Addr().String()).Msg("HTTP API listening")
+ return nil
+}
+
+func (b *Bgpipe) stopHTTP() {
+ if b.HTTP == nil {
+ return
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+ defer cancel()
+ if err := b.HTTP.Shutdown(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) {
+ b.Warn().Err(err).Msg("HTTP API shutdown error")
+ }
+}
+
+func (b *Bgpipe) attachHTTPStages() error {
+ if b.httpmux == nil {
+ return nil
+ }
+
+ m := b.httpmux
+ used := make(map[string]struct{})
+
+ // mount per-stage routes
+ for _, s := range b.Stages {
+ if s == nil {
+ continue
+ }
+
+ r := chi.NewRouter()
+ if err := s.Stage.RouteHTTP(r); err != nil {
+ return s.Errorf("could not register HTTP API: %w", err)
+ }
+ if len(r.Routes()) == 0 {
+ continue
+ }
+
+ base := s.HTTPSlug()
+ if _, exists := used[base]; exists {
+ base = fmt.Sprintf("%s-%d", base, s.Index)
+ }
+ used[base] = struct{}{}
+
+ s.HTTPPath = "/stage/" + base
+ m.Mount(s.HTTPPath, r)
+
+ s.Info().Str("http", s.HTTPPath).Msg("stage HTTP API mounted")
+ }
+
+ // GET /metrics — Prometheus
+ m.Get("/metrics", func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
+ vmmetrics.WritePrometheus(w, true)
+ })
+
+ // GET /hc — k8s health check
+ m.Get("/hc", func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "application/json")
+ json.NewEncoder(w).Encode(map[string]any{
+ "status": "ok",
+ "version": b.Version,
+ "stages": b.StageCount(),
+ "uptime": time.Since(b.StartTime).Truncate(time.Second).String(),
+ })
+ })
+
+ // GET / — web dashboard
+ m.Get("/", b.httpDashboard)
+
+ // pprof?
+ if b.K.Bool("pprof") {
+ m.HandleFunc("/debug/pprof/", pprof.Index)
+ m.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
+ m.HandleFunc("/debug/pprof/profile", pprof.Profile)
+ m.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
+ m.HandleFunc("/debug/pprof/trace", pprof.Trace)
+ b.Info().Msg("pprof enabled at /debug/pprof/")
+ }
+
+ return nil
+}
+
+func (b *Bgpipe) httpDashboard(w http.ResponseWriter, r *http.Request) {
+ uptime := time.Since(b.StartTime).Truncate(time.Second)
+
+ // collect stage info
+ type stageInfo struct {
+ Index int
+ Name string
+ Cmd string
+ Dir string
+ HTTPPath string
+ }
+ var stages []stageInfo
+ for _, s := range b.Stages {
+ if s == nil {
+ continue
+ }
+ stages = append(stages, stageInfo{
+ Index: s.Index,
+ Name: s.Name,
+ Cmd: s.Cmd,
+ Dir: s.StringLR(),
+ HTTPPath: s.HTTPPath,
+ })
+ }
+
+ // render pipeline text (like --explain)
+ var pipeR, pipeL bytes.Buffer
+ b.StageDump(1, &pipeR) // DIR_R = 1
+ b.StageDump(2, &pipeL) // DIR_L = 2
+
+ var buf bytes.Buffer
+ fmt.Fprintf(&buf, `
+
+
+
+
+bgpipe %s
+
+
+
+
+
bgpipe dashboard
+
BGP pipeline processor
+
+
+
+
Pipeline
+
`, html.EscapeString(b.Version),
+ html.EscapeString(b.Version),
+ html.EscapeString(uptime.String()),
+ b.StageCount())
+
+ fmt.Fprintf(&buf, "--> Messages flowing right -->\n%s\n<-- Messages flowing left <--\n%s",
+ html.EscapeString(pipeR.String()),
+ html.EscapeString(pipeL.String()))
+
+ fmt.Fprintf(&buf, `
+
+
Stages
+
+ | # | Name | Command | Direction | HTTP |
`)
+
+ for _, s := range stages {
+ httpCol := "-"
+ if s.HTTPPath != "" {
+ httpCol = fmt.Sprintf(`%s/`, s.HTTPPath, s.HTTPPath)
+ }
+ fmt.Fprintf(&buf, "\n | %d | %s | %s | %s | %s |
",
+ s.Index,
+ html.EscapeString(s.Name),
+ html.EscapeString(s.Cmd),
+ html.EscapeString(s.Dir),
+ httpCol)
+ }
+
+ fmt.Fprintf(&buf, `
+
+
+
Links
+
+
+
+`)
+
+ w.Header().Set("Content-Type", "text/html; charset=utf-8")
+ w.Write(buf.Bytes())
+}
diff --git a/core/stage.go b/core/stage.go
index 1ad9a28..05f9054 100644
--- a/core/stage.go
+++ b/core/stage.go
@@ -3,6 +3,7 @@ package core
import (
"context"
"fmt"
+ "regexp"
"strings"
"sync/atomic"
"time"
@@ -10,6 +11,7 @@ import (
"github.com/bgpfix/bgpfix/dir"
"github.com/bgpfix/bgpfix/filter"
"github.com/bgpfix/bgpfix/pipe"
+ "github.com/go-chi/chi/v5"
"github.com/knadh/koanf/v2"
"github.com/rs/zerolog"
"github.com/spf13/pflag"
@@ -39,6 +41,10 @@ type Stage interface {
// or after Run() exits (in order to clean-up).
// It should safely finish all I/O and make Run return if it's still running.
Stop() error
+
+ // RouteHTTP can register optional HTTP routes for this stage under //*.
+ // A stage may leave this empty.
+ RouteHTTP(r chi.Router) error
}
// StageOptions describe high-level settings of a stage
@@ -78,12 +84,13 @@ type StageBase struct {
P *pipe.Pipe // bgpfix pipe
K *koanf.Koanf // integrated config (args / config file / etc)
- Index int // stage index (zero means internal)
- Cmd string // stage command name
- Name string // human-friendly stage name
- Flags []string // consumed flags
- Args []string // consumed args
- Options StageOptions // stage options
+ Index int // stage index (zero means internal)
+ Cmd string // stage command name
+ Name string // human-friendly stage name
+ HTTPPath string // mounted HTTP path (eg. /metrics), if any
+ Flags []string // consumed flags
+ Args []string // consumed args
+ Options StageOptions // stage options
FilterIn []*filter.Filter // message filter for pipe callbacks (input to stage)
FilterOut []*filter.Filter // message filter for pipe inputs (output from stage)
@@ -124,6 +131,11 @@ func (s *StageBase) Stop() error {
return nil
}
+// RouteHTTP is the default Stage implementation that does nothing.
+func (s *StageBase) RouteHTTP(r chi.Router) error {
+ return nil
+}
+
// NewStage returns a new Stage for given parent base. It should modify base.Options.
type NewStage func(base *StageBase) Stage
@@ -222,6 +234,59 @@ func (s *StageBase) String() string {
}
}
+var reMetricUnsafe = regexp.MustCompile(`[^a-z0-9_]+`)
+
+// SanitizeMetricLabel converts a string to a valid Prometheus label [a-z0-9_]
+func SanitizeMetricLabel(s string) string {
+ s = strings.ToLower(s)
+ s = reMetricUnsafe.ReplaceAllString(s, "_")
+ s = strings.Trim(s, "_")
+ if s == "" {
+ s = "unknown"
+ }
+ return s
+}
+
+// MetricPrefix returns a sanitized Prometheus metric prefix for this stage,
+// e.g. "bgpipe_metrics_" or "bgpipe_my_counters_".
+func (s *StageBase) MetricPrefix() string {
+ name := SanitizeMetricLabel(s.Name)
+ if name == "unknown" {
+ name = fmt.Sprintf("stage_%d", s.Index)
+ }
+ return "bgpipe_" + name + "_"
+}
+
+// HTTPSlug returns a URL-safe path component for this stage,
+// e.g. "metrics" or "my-counters".
+func (s *StageBase) HTTPSlug() string {
+ name := strings.TrimPrefix(strings.TrimSpace(s.Name), "@")
+ if name == "" {
+ name = s.Cmd
+ }
+
+ name = strings.Map(func(r rune) rune {
+ switch {
+ case r >= 'a' && r <= 'z':
+ return r
+ case r >= 'A' && r <= 'Z':
+ return r
+ case r >= '0' && r <= '9':
+ return r
+ case r == '-', r == '_', r == '.':
+ return r
+ default:
+ return '-'
+ }
+ }, name)
+
+ name = strings.Trim(name, "-_.")
+ if name == "" {
+ return fmt.Sprintf("stage-%d", s.Index)
+ }
+ return name
+}
+
// StringDir returns eg. "-LR [FIRST]" depending on stage direction
func (s *StageBase) StringLR() string {
var str strings.Builder
diff --git a/docs/stages/index.md b/docs/stages/index.md
index fcb8562..d838656 100644
--- a/docs/stages/index.md
+++ b/docs/stages/index.md
@@ -19,6 +19,7 @@ bgpipe [OPTIONS] [--] STAGE1 [OPTIONS] [ARGS] [--] STAGE2 [OPTIONS] [ARGS] ...
| [head](head.md) | Stop the pipeline after N messages |
| [limit](limit.md) | Enforce prefix length and count limits |
| [listen](listen.md) | Accept an incoming BGP connection over TCP |
+| [metrics](metrics.md) | Count messages matching filters (Prometheus metrics) |
| [pipe](pipe.md) | Exchange messages through a named pipe (FIFO) |
| [read](read.md) | Read messages from a file or URL |
| [ris-live](ris-live.md) | Stream BGP updates from RIPE RIS Live |
diff --git a/docs/stages/metrics.md b/docs/stages/metrics.md
new file mode 100644
index 0000000..dd76d13
--- /dev/null
+++ b/docs/stages/metrics.md
@@ -0,0 +1,136 @@
+# metrics
+
+Count messages matching filter expressions and expose Prometheus metrics.
+
+## Synopsis
+
+```
+bgpipe --http :8080 [...] -- metrics [OPTIONS] [-A] [LABEL: FILTER] ...
+```
+
+## Description
+
+The **metrics** stage counts BGP messages flowing through the pipeline
+and exposes them as Prometheus counters. It always tracks generic counters
+(total messages, by direction, by message type), and optionally evaluates
+user-defined filter expressions with labeled counters.
+
+Requires `--http` to expose metrics via HTTP. The global `/metrics` endpoint
+serves all Prometheus metrics in text format. The stage also mounts a JSON
+summary at `/stage/metrics/`.
+
+### Filter arguments
+
+Each positional argument defines a filter rule in the format `LABEL: FILTER`.
+If no `: ` (colon-space) separator is present, the argument text is used as
+both label and filter expression. Labels are sanitized to `[a-z0-9_]` for
+Prometheus compatibility.
+
+```bash
+bgpipe --http :8080 \
+ -- connect 192.0.2.1 \
+ -- metrics -LR -A \
+ ipv4 \
+ 'v6: ipv6' \
+ 'google: as_origin == 15169' \
+ 'long_path: aspath_len > 5' \
+ -- connect 10.0.0.1
+```
+
+This creates counters like:
+
+```
+bgpipe_metrics_match{filter="ipv4"} 42
+bgpipe_metrics_match{filter="v6"} 18
+bgpipe_metrics_match{filter="google"} 5
+bgpipe_metrics_match{filter="long_path"} 3
+```
+
+### Batch mode
+
+For offline analysis (e.g., measuring an MRT file), use `--output` to write
+the final counter values to a file when the pipeline exits. This captures
+metrics that would otherwise be lost because the pipeline finishes before
+a scraper can read `/metrics`.
+
+Place the `metrics` stage **after** the source stage so it sees the messages:
+
+```bash
+bgpipe \
+ -- read updates.mrt \
+ -- metrics --output results.prom -A ipv4 ipv6
+grep bgpipe results.prom
+```
+
+## Options
+
+| Option | Type | Default | Description |
+|--------|------|---------|-------------|
+| `--output` | string | | Write final metrics to file on exit (Prometheus text format) |
+
+## Generic counters
+
+These are always created (prefix derived from stage name):
+
+| Counter | Description |
+|---------|-------------|
+| `bgpipe_metrics_messages_total` | Total messages seen (all directions/types) |
+| `bgpipe_metrics_messages_total{dir="left",type="update"}` | UPDATE messages flowing left |
+| `bgpipe_metrics_messages_total{dir="right",type="update"}` | UPDATE messages flowing right |
+| `bgpipe_metrics_messages_total{dir="left",type="open"}` | OPEN messages flowing left |
+| `bgpipe_metrics_messages_total{dir="right",type="keepalive"}` | KEEPALIVE messages flowing right |
+| *(all dir × type combinations)* | One series per direction+type pair |
+
+Each `(dir, type)` combination is a separate Prometheus series, enabling PromQL
+aggregations like `sum by (dir)`, `sum by (type)`, or filtering on
+`{dir="right",type="update"}`.
+
+## HTTP endpoints
+
+When `--http` is set:
+
+| Endpoint | Description |
+|----------|-------------|
+| `/` | Web dashboard with pipeline info, stages, uptime, and links |
+| `/metrics` | Global Prometheus metrics (all stages, all Go runtime) |
+| `/hc` | Health check (JSON, k8s-compatible) |
+| `/stage/metrics/` | JSON summary of this stage's counters |
+| `/debug/pprof/` | Go pprof (when `--pprof` is enabled) |
+
+## Examples
+
+Monitor a live BGP session with Prometheus scraping:
+
+```bash
+bgpipe --http :9090 \
+ -- connect 192.0.2.1 \
+ -- metrics -LR -A ipv4 ipv6 'bogon: as_origin > 64512' \
+ -- connect 10.0.0.1
+```
+
+Measure an MRT dump offline and save results:
+
+```bash
+bgpipe \
+ -- read updates.20230301.0000.mrt \
+ -- metrics --output stats.prom -A \
+ ipv4 ipv6 'google: as_origin == 15169' \
+ -- write updates.20230301.json
+```
+
+Named stage with custom metric prefix:
+
+```bash
+bgpipe --http :8080 \
+ -- connect 192.0.2.1 \
+ -- @my_counters metrics -LR -A ipv4 ipv6 \
+ -- connect 10.0.0.1
+# produces: bgpipe_my_counters_messages_total, bgpipe_my_counters_match{filter="ipv4"}, etc.
+```
+
+## See Also
+
+[Message Filters](../filters.md),
+[grep](grep.md),
+[rpki](rpki.md),
+[Stages overview](index.md)
diff --git a/go.mod b/go.mod
index 3406b7e..b8e138f 100644
--- a/go.mod
+++ b/go.mod
@@ -5,18 +5,20 @@ go 1.24.0
toolchain go1.26.0
require (
+ github.com/VictoriaMetrics/metrics v1.41.2
github.com/bgp/stayrtr v0.6.3
github.com/bgpfix/bgpfix v0.18.0
github.com/buger/jsonparser v1.1.1
github.com/dsnet/compress v0.0.2-0.20230904184137-39efe44ab707
+ github.com/go-chi/chi/v5 v5.2.5
github.com/gorilla/websocket v1.5.3
github.com/klauspost/compress v1.18.4
github.com/knadh/koanf/providers/posflag v1.0.1
- github.com/knadh/koanf/v2 v2.3.2
+ github.com/knadh/koanf/v2 v2.3.3
github.com/puzpuzpuz/xsync/v4 v4.4.0
github.com/rs/zerolog v1.34.0
github.com/spf13/pflag v1.0.10
- github.com/twmb/franz-go v1.20.6
+ github.com/twmb/franz-go v1.20.7
github.com/twmb/franz-go/pkg/kadm v1.17.2
github.com/valyala/bytebufferpool v1.0.0
golang.org/x/sys v0.41.0
@@ -31,8 +33,10 @@ require (
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
- github.com/pierrec/lz4/v4 v4.1.25 // indirect
+ github.com/pierrec/lz4/v4 v4.1.26 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.12.0 // indirect
+ github.com/valyala/fastrand v1.1.0 // indirect
+ github.com/valyala/histogram v1.2.0 // indirect
golang.org/x/crypto v0.48.0 // indirect
golang.org/x/sync v0.19.0 // indirect
)
diff --git a/go.sum b/go.sum
index d367ffb..b932337 100644
--- a/go.sum
+++ b/go.sum
@@ -1,3 +1,5 @@
+github.com/VictoriaMetrics/metrics v1.41.2 h1:pLQ4Mw9TqXFq3ZsZVJkz88JHpjL9LY5NHTY3v2gBNAw=
+github.com/VictoriaMetrics/metrics v1.41.2/go.mod h1:xDM82ULLYCYdFRgQ2JBxi8Uf1+8En1So9YUwlGTOqTc=
github.com/bgp/stayrtr v0.6.3 h1:d3MW7FV7yV9bP46aEZhggCcxK8tgooB5XEwa6dYVXM4=
github.com/bgp/stayrtr v0.6.3/go.mod h1:gLXsFU0j2wTKYxDQqXHl9R34FpD1IH59KHwhgFdI3r8=
github.com/bgpfix/bgpfix v0.18.0 h1:UJVxpeJXyPfo+2p4i/LBxkRtTQKonU9tnpYWcUhJoY0=
@@ -10,6 +12,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/dsnet/compress v0.0.2-0.20230904184137-39efe44ab707 h1:2tV76y6Q9BB+NEBasnqvs7e49aEBFI8ejC89PSnWH+4=
github.com/dsnet/compress v0.0.2-0.20230904184137-39efe44ab707/go.mod h1:qssHWj60/X5sZFNxpG4HBPDHVqxNm4DfnCKgrbZOT+s=
github.com/dsnet/golib v0.0.0-20171103203638-1ea166775780/go.mod h1:Lj+Z9rebOhdfkVLjJ8T6VcRQv3SXugXy999NBtR9aFY=
+github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug=
+github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0=
github.com/go-viper/mapstructure/v2 v2.5.0 h1:vM5IJoUAy3d7zRSVtIwQgBj7BiWtMPfmPEgAXnvj1Ro=
github.com/go-viper/mapstructure/v2 v2.5.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
@@ -28,8 +32,8 @@ github.com/knadh/koanf/maps v0.1.2 h1:RBfmAW5CnZT+PJ1CVc1QSJKf4Xu9kxfQgYVQSu8hpb
github.com/knadh/koanf/maps v0.1.2/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI=
github.com/knadh/koanf/providers/posflag v1.0.1 h1:EnMxHSrPkYCFnKgBUl5KBgrjed8gVFrcXDzaW4l/C6Y=
github.com/knadh/koanf/providers/posflag v1.0.1/go.mod h1:3Wn3+YG3f4ljzRyCUgIwH7G0sZ1pMjCOsNBovrbKmAk=
-github.com/knadh/koanf/v2 v2.3.2 h1:Ee6tuzQYFwcZXQpc2MiVeC6qHMandf5SMUJJNoFp/c4=
-github.com/knadh/koanf/v2 v2.3.2/go.mod h1:gRb40VRAbd4iJMYYD5IxZ6hfuopFcXBpc9bbQpZwo28=
+github.com/knadh/koanf/v2 v2.3.3 h1:jLJC8XCRfLC7n4F+ZKKdBsbq1bfXTpuFhf4L7t94D94=
+github.com/knadh/koanf/v2 v2.3.3/go.mod h1:gRb40VRAbd4iJMYYD5IxZ6hfuopFcXBpc9bbQpZwo28=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE=
github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8=
@@ -41,8 +45,8 @@ github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa1
github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
-github.com/pierrec/lz4/v4 v4.1.25 h1:kocOqRffaIbU5djlIBr7Wh+cx82C0vtFb0fOurZHqD0=
-github.com/pierrec/lz4/v4 v4.1.25/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4=
+github.com/pierrec/lz4/v4 v4.1.26 h1:GrpZw1gZttORinvzBdXPUXATeqlJjqUG/D87TKMnhjY=
+github.com/pierrec/lz4/v4 v4.1.26/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
@@ -55,8 +59,8 @@ github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk=
github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
-github.com/twmb/franz-go v1.20.6 h1:TpQTt4QcixJ1cHEmQGPOERvTzo99s8jAutmS7rbSD6w=
-github.com/twmb/franz-go v1.20.6/go.mod h1:u+FzH2sInp7b9HNVv2cZN8AxdXy6y/AQ1Bkptu4c0FM=
+github.com/twmb/franz-go v1.20.7 h1:P4MGSXJjjAPP3NRGPCks/Lrq+j+twWMVl1qYCVgNmWY=
+github.com/twmb/franz-go v1.20.7/go.mod h1:0bRX9HZVaoueqFWhPZNi2ODnJL7DNa6mK0HeCrC2bNU=
github.com/twmb/franz-go/pkg/kadm v1.17.2 h1:g5f1sAxnTkYC6G96pV5u715HWhxd66hWaDZUAQ8xHY8=
github.com/twmb/franz-go/pkg/kadm v1.17.2/go.mod h1:ST55zUB+sUS+0y+GcKY/Tf1XxgVilaFpB9I19UubLmU=
github.com/twmb/franz-go/pkg/kmsg v1.12.0 h1:CbatD7ers1KzDNgJqPbKOq0Bz/WLBdsTH75wgzeVaPc=
@@ -64,6 +68,10 @@ github.com/twmb/franz-go/pkg/kmsg v1.12.0/go.mod h1:+DPt4NC8RmI6hqb8G09+3giKObE6
github.com/ulikunitz/xz v0.5.8/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
+github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8=
+github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
+github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ=
+github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY=
golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts=
golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos=
golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
diff --git a/mkdocs.yml b/mkdocs.yml
index d5b2f0f..0087a81 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -12,6 +12,7 @@ nav:
- head: stages/head.md
- limit: stages/limit.md
- listen: stages/listen.md
+ - metrics: stages/metrics.md
- pipe: stages/pipe.md
- read: stages/read.md
- ris-live: stages/ris-live.md
diff --git a/stages/metrics.go b/stages/metrics.go
new file mode 100644
index 0000000..8a37857
--- /dev/null
+++ b/stages/metrics.go
@@ -0,0 +1,167 @@
+package stages
+
+import (
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "os"
+ "strings"
+
+ "github.com/VictoriaMetrics/metrics"
+ "github.com/bgpfix/bgpfix/dir"
+ "github.com/bgpfix/bgpfix/filter"
+ "github.com/bgpfix/bgpfix/msg"
+ "github.com/bgpfix/bgpfix/pipe"
+ "github.com/bgpfix/bgpipe/core"
+ "github.com/go-chi/chi/v5"
+)
+
+// msgKey identifies a (direction, type) pair for per-combination counters
+type msgKey struct {
+ d dir.Dir
+ t msg.Type
+}
+
+type Metrics struct {
+ *core.StageBase
+ eval *filter.Eval
+ rules []metricsRule
+ output string // --output file path
+
+ // generic labeled counters
+ cTotal *metrics.Counter // messages_total (no labels)
+ cDirType map[msgKey]*metrics.Counter // messages_total{dir=...,type=...}
+}
+
+type metricsRule struct {
+ filter *filter.Filter
+ label string
+ counter *metrics.Counter
+}
+
+func NewMetrics(parent *core.StageBase) core.Stage {
+ s := &Metrics{StageBase: parent}
+ s.eval = filter.NewEval(false)
+
+ o := &s.Options
+ o.Descr = "count messages matching filters (Prometheus metrics)"
+ o.Bidir = true
+ o.FilterIn = true
+
+ f := o.Flags
+ f.String("output", "", "write final metrics to file on exit (Prometheus text format)")
+
+ return s
+}
+
+func (s *Metrics) Attach() error {
+ s.output = s.K.String("output")
+ prefix := s.MetricPrefix()
+
+ // total (no labels) + per-(dir,type) labeled counters
+ s.cTotal = metrics.GetOrCreateCounter(prefix + "messages_total")
+
+ dirNames := map[dir.Dir]string{
+ dir.DIR_L: "left",
+ dir.DIR_R: "right",
+ }
+ s.cDirType = make(map[msgKey]*metrics.Counter, len(dirNames)*len(msg.TypeValues()))
+ for d, dname := range dirNames {
+ for _, t := range msg.TypeValues() {
+ tname := strings.ToLower(t.String())
+ s.cDirType[msgKey{d, t}] = metrics.GetOrCreateCounter(
+ fmt.Sprintf(`%smessages_total{dir=%q,type=%q}`, prefix, dname, tname),
+ )
+ }
+ }
+
+ // parse positional args as [LABEL: ]FILTER
+ args := s.K.Strings("args")
+ for _, arg := range args {
+ var label, expr string
+ if l, e, ok := strings.Cut(arg, ": "); ok {
+ label = l
+ expr = e
+ } else {
+ label = arg
+ expr = arg
+ }
+
+ label = core.SanitizeMetricLabel(label)
+
+ f, err := filter.NewFilter(expr)
+ if err != nil {
+ return fmt.Errorf("invalid filter %q: %w", arg, err)
+ }
+
+ counter := metrics.GetOrCreateCounter(
+ fmt.Sprintf(`%smatch{filter=%q}`, prefix, label),
+ )
+
+ s.rules = append(s.rules, metricsRule{
+ filter: f,
+ label: label,
+ counter: counter,
+ })
+ }
+
+ s.P.OnMsg(s.onMsg, s.Dir)
+ return nil
+}
+
+func (s *Metrics) onMsg(m *msg.Msg) bool {
+ s.cTotal.Inc()
+
+ if c := s.cDirType[msgKey{m.Dir, m.Type}]; c != nil {
+ c.Inc()
+ }
+
+ // evaluate filter rules
+ if len(s.rules) > 0 {
+ mx := pipe.UseContext(m)
+ s.eval.Set(m, mx.Pipe.KV, mx.Pipe.Caps, mx.GetTags())
+ for i := range s.rules {
+ if s.eval.Run(s.rules[i].filter) {
+ s.rules[i].counter.Inc()
+ }
+ }
+ }
+
+ return true
+}
+
+func (s *Metrics) Run() error {
+ <-s.Ctx.Done()
+
+ // write final metrics to file?
+ if s.output != "" {
+ f, err := os.Create(s.output)
+ if err != nil {
+ s.Err(err).Str("file", s.output).Msg("could not write final metrics")
+ } else {
+ metrics.WritePrometheus(f, false)
+ f.Close()
+ s.Info().Str("file", s.output).Msg("final metrics written")
+ }
+ }
+
+ return nil
+}
+
+func (s *Metrics) RouteHTTP(r chi.Router) error {
+ r.Get("/", func(w http.ResponseWriter, req *http.Request) {
+ summary := map[string]any{
+ "total": s.cTotal.Get(),
+ }
+ if len(s.rules) > 0 {
+ rules := make(map[string]uint64, len(s.rules))
+ for i := range s.rules {
+ rules[s.rules[i].label] = s.rules[i].counter.Get()
+ }
+ summary["rules"] = rules
+ }
+ w.Header().Set("Content-Type", "application/json")
+ json.NewEncoder(w).Encode(summary)
+ })
+ return nil
+}
diff --git a/stages/repo.go b/stages/repo.go
index 1015161..5887457 100644
--- a/stages/repo.go
+++ b/stages/repo.go
@@ -14,6 +14,7 @@ var Repo = map[string]core.NewStage{
"head": NewHead,
"limit": NewLimit,
"listen": NewListen,
+ "metrics": NewMetrics,
"pipe": NewPipe,
"read": NewRead,
"ris-live": NewRisLive,
diff --git a/stages/rpki/rpki.go b/stages/rpki/rpki.go
index e30a891..8e982a9 100644
--- a/stages/rpki/rpki.go
+++ b/stages/rpki/rpki.go
@@ -1,8 +1,10 @@
package rpki
import (
+ "encoding/json"
"fmt"
"net"
+ "net/http"
"net/netip"
"strings"
"sync"
@@ -10,9 +12,11 @@ import (
"time"
rtrlib "github.com/bgp/stayrtr/lib"
+ "github.com/VictoriaMetrics/metrics"
"github.com/bgpfix/bgpfix/msg"
"github.com/bgpfix/bgpfix/pipe"
"github.com/bgpfix/bgpipe/core"
+ "github.com/go-chi/chi/v5"
)
const (
@@ -65,6 +69,12 @@ type Rpki struct {
next4 ROA // next roa4 (pending apply)
next6 ROA // next roa6 (pending apply)
+ // prometheus metrics
+ cMessages *metrics.Counter // bgpipe_rpki_messages_total
+ cValid *metrics.Counter // bgpipe_rpki_valid_total
+ cInvalid *metrics.Counter // bgpipe_rpki_invalid_total
+ cNotFound *metrics.Counter // bgpipe_rpki_not_found_total
+
// file watcher state
file_mod time.Time // last modification time
file_hash [32]byte // last file hash
@@ -116,6 +126,25 @@ func NewRpki(parent *core.StageBase) core.Stage {
func (s *Rpki) Attach() error {
k := s.K
+ // create prometheus counters and gauges
+ prefix := s.MetricPrefix()
+ s.cMessages = metrics.GetOrCreateCounter(prefix + "messages_total")
+ s.cValid = metrics.GetOrCreateCounter(prefix + "valid_total")
+ s.cInvalid = metrics.GetOrCreateCounter(prefix + "invalid_total")
+ s.cNotFound = metrics.GetOrCreateCounter(prefix + "not_found_total")
+ metrics.NewGauge(prefix+"roa4_prefixes", func() float64 {
+ if r4 := s.roa4.Load(); r4 != nil {
+ return float64(len(*r4))
+ }
+ return 0
+ })
+ metrics.NewGauge(prefix+"roa6_prefixes", func() float64 {
+ if r6 := s.roa6.Load(); r6 != nil {
+ return float64(len(*r6))
+ }
+ return 0
+ })
+
// Parse invalid action
switch strings.ToLower(k.String("invalid")) {
case "withdraw":
@@ -183,3 +212,34 @@ func (s *Rpki) Stop() error {
s.rtr_mu.Unlock()
return nil
}
+
+func (s *Rpki) RouteHTTP(r chi.Router) error {
+ r.Get("/", func(w http.ResponseWriter, req *http.Request) {
+ var roa4size, roa6size int
+ if r4 := s.roa4.Load(); r4 != nil {
+ roa4size = len(*r4)
+ }
+ if r6 := s.roa6.Load(); r6 != nil {
+ roa6size = len(*r6)
+ }
+
+ source := "rtr"
+ if s.file != "" {
+ source = "file"
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ json.NewEncoder(w).Encode(map[string]any{
+ "source": source,
+ "roa4": roa4size,
+ "roa6": roa6size,
+ "metrics": map[string]uint64{
+ "messages": s.cMessages.Get(),
+ "valid": s.cValid.Get(),
+ "invalid": s.cInvalid.Get(),
+ "not_found": s.cNotFound.Get(),
+ },
+ })
+ })
+ return nil
+}
diff --git a/stages/rpki/validate.go b/stages/rpki/validate.go
index b5d76e5..4b94ad6 100644
--- a/stages/rpki/validate.go
+++ b/stages/rpki/validate.go
@@ -55,6 +55,8 @@ func (s *Rpki) validatePrefix(roa4, roa6 ROA, p netip.Prefix, origin uint32) int
// validateMsg is the callback for UPDATE messages
func (s *Rpki) validateMsg(m *msg.Msg) bool {
+ s.cMessages.Inc()
+
u := &m.Update
tags := pipe.UseTags(m)
@@ -70,6 +72,7 @@ func (s *Rpki) validateMsg(m *msg.Msg) bool {
check_delete := func(p nlri.Prefix) bool {
switch s.validatePrefix(roa4, roa6, p.Prefix, origin) {
case rpki_valid:
+ s.cValid.Inc()
valid = append(valid, p)
if s.tag {
tags["rpki/"+p.String()] = "VALID"
@@ -77,6 +80,7 @@ func (s *Rpki) validateMsg(m *msg.Msg) bool {
return false // keep prefix
case rpki_not_found:
+ s.cNotFound.Inc()
not_found = append(not_found, p)
if s.tag {
tags["rpki/"+p.String()] = "NOT_FOUND"
@@ -84,6 +88,7 @@ func (s *Rpki) validateMsg(m *msg.Msg) bool {
return false // keep prefix
case rpki_invalid:
+ s.cInvalid.Inc()
invalid = append(invalid, p)
return invalid_delete // drop prefix iff requested
}