diff --git a/src/go/cpu/cpu.go b/src/go/cpu/cpu.go index 5f5ff40..22acb91 100644 --- a/src/go/cpu/cpu.go +++ b/src/go/cpu/cpu.go @@ -3,9 +3,7 @@ package cpu import ( "context" "fmt" - "io" "log" - "os" "strconv" "strings" @@ -37,16 +35,16 @@ type CPU struct { elasticEndpoint string elasticIndex string lokiEndpoint string + metricsEnabled bool + metricsEndpoint string pusher *msgbus.Pusher - - logFile io.Writer } func New(name string) *CPU { return &CPU{ - name: name, - logFile: os.Stdout, + name: name, + metricsEndpoint: "127.0.0.1:9100", } } @@ -117,6 +115,20 @@ func (this *CPU) Configure(e *etree.Element) error { this.lokiEndpoint = child.Text() } } + case "metrics": + enabled, err := strconv.ParseBool(child.SelectAttrValue("enabled", "false")) + if err != nil { + return fmt.Errorf("parsing 'enabled' attribute for metrics: %w", err) + } + + this.metricsEnabled = enabled + + for _, child := range child.ChildElements() { + switch child.Tag { + case "endpoint": + this.metricsEndpoint = child.Text() + } + } case "module": mod := &module{ name: child.SelectAttrValue("name", child.Text()), @@ -168,38 +180,45 @@ func (this *CPU) Run(ctx context.Context, pubEndpoint, pullEndpoint string) erro } var ( - logErrors = make(chan error) - healthErrors = make(chan error) - metricsErrors = make(chan error) - ) - - var ( - logHandlers = []MsgBusHandler{logger} - healthHandlers = []MsgBusHandler{metricsHandler} runtimeHandlers = []MsgBusHandler{logger} + runtimeErrors = make(chan error) + + logHandlers = []MsgBusHandler{logger} + logErrors = make(chan error) + + healthHandlers []MsgBusHandler + healthErrors chan error ) + if this.metricsEnabled { + healthHandlers = append(healthHandlers, metricsHandler) + healthErrors = make(chan error) + + startMetricsServer(this.metricsEndpoint) + } + go func() { for { select { case <-ctx.Done(): return + case err := <-runtimeErrors: + log.Printf("[CPU] [ERROR] error processing runtime: %v\n", err) + go MonitorMsgBusChannel(ctx, pubEndpoint, "RUNTIME", runtimeHandlers, runtimeErrors) case err := <-logErrors: log.Printf("[CPU] [ERROR] processing logs: %v\n", err) go MonitorMsgBusChannel(ctx, pubEndpoint, "LOG", logHandlers, logErrors) case err := <-healthErrors: - log.Printf("[CPU] [ERROR] processing health updates: %v\n", err) + log.Printf("[CPU] [ERROR] processing metrics: %v\n", err) go MonitorMsgBusChannel(ctx, pubEndpoint, "HEALTH", healthHandlers, healthErrors) - case err := <-metricsErrors: - log.Printf("[CPU] [ERROR] error processing metrics: %v\n", err) - go MonitorMsgBusChannel(ctx, pubEndpoint, "RUNTIME", runtimeHandlers, metricsErrors) } } }() + go MonitorMsgBusChannel(ctx, pubEndpoint, "RUNTIME", runtimeHandlers, runtimeErrors) go MonitorMsgBusChannel(ctx, pubEndpoint, "LOG", logHandlers, logErrors) go MonitorMsgBusChannel(ctx, pubEndpoint, "HEALTH", healthHandlers, healthErrors) - go MonitorMsgBusChannel(ctx, pubEndpoint, "RUNTIME", runtimeHandlers, metricsErrors) + go MonitorMsgBusChannel(ctx, pubEndpoint, "INTERNAL", []MsgBusHandler{this.internalHandler}, nil) return nil diff --git a/src/go/cpu/cpu_test.go b/src/go/cpu/cpu_test.go new file mode 100644 index 0000000..9f5f554 --- /dev/null +++ b/src/go/cpu/cpu_test.go @@ -0,0 +1,84 @@ +package cpu + +import ( + "strings" + "testing" + + "github.com/beevik/etree" +) + +func parseCPUConfig(t *testing.T, contents string) *etree.Element { + t.Helper() + + doc := etree.NewDocument() + if err := doc.ReadFromString(contents); err != nil { + t.Fatalf("failed to parse config: %v", err) + } + + return doc.Root() +} + +func TestCPUMetricsDisabledByDefault(t *testing.T) { + cpu := New("cpu") + + if err := cpu.Configure(parseCPUConfig(t, ``)); err != nil { + t.Fatalf("failed to configure cpu: %v", err) + } + + if cpu.metricsEnabled { + t.Fatal("expected metrics to be disabled by default") + } + + if cpu.metricsEndpoint != "127.0.0.1:9100" { + t.Fatalf("unexpected metrics endpoint: %s", cpu.metricsEndpoint) + } +} + +func TestCPUMetricsCanBeEnabled(t *testing.T) { + cpu := New("cpu") + + if err := cpu.Configure(parseCPUConfig(t, ``)); err != nil { + t.Fatalf("failed to configure cpu: %v", err) + } + + if !cpu.metricsEnabled { + t.Fatal("expected metrics to be enabled") + } +} + +func TestCPUMetricsEndpointCanBeConfigured(t *testing.T) { + cpu := New("cpu") + + if err := cpu.Configure(parseCPUConfig(t, `127.0.0.1:9200`)); err != nil { + t.Fatalf("failed to configure cpu: %v", err) + } + + if cpu.metricsEndpoint != "127.0.0.1:9200" { + t.Fatalf("unexpected metrics endpoint: %s", cpu.metricsEndpoint) + } +} + +func TestCPUMetricsCanBeDisabled(t *testing.T) { + cpu := New("cpu") + + if err := cpu.Configure(parseCPUConfig(t, ``)); err != nil { + t.Fatalf("failed to configure cpu: %v", err) + } + + if cpu.metricsEnabled { + t.Fatal("expected metrics to be disabled") + } +} + +func TestCPUMetricsInvalidEnabled(t *testing.T) { + cpu := New("cpu") + + err := cpu.Configure(parseCPUConfig(t, ``)) + if err == nil { + t.Fatal("expected configure error") + } + + if !strings.Contains(err.Error(), "parsing 'enabled' attribute for metrics") { + t.Fatalf("unexpected error: %v", err) + } +} diff --git a/src/go/cpu/metrics.go b/src/go/cpu/metrics.go index 2ca8024..7b56355 100644 --- a/src/go/cpu/metrics.go +++ b/src/go/cpu/metrics.go @@ -3,6 +3,7 @@ package cpu import ( "errors" "fmt" + "log" "net/http" "regexp" @@ -67,10 +68,16 @@ func metricsHandler(topic, msg string) error { return nil } -func init() { +func startMetricsServer(endpoint string) { mux := http.NewServeMux() mux.Handle("/metrics", promhttp.Handler()) - server := http.Server{Addr: ":9100", Handler: mux} - go server.ListenAndServe() + server := http.Server{Addr: endpoint, Handler: mux} + go func() { + log.Printf("[CPU] starting metrics server at %s/metrics\n", server.Addr) + + if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Printf("[CPU] [ERROR] starting metrics server: %v\n", err) + } + }() }