Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 38 additions & 19 deletions src/go/cpu/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package cpu
import (
"context"
"fmt"
"io"
"log"
"os"
"strconv"
"strings"

Expand Down Expand Up @@ -37,16 +35,16 @@ type CPU struct {
elasticEndpoint string
elasticIndex string
lokiEndpoint string
metricsEnabled bool
metricsEndpoint string

pusher *msgbus.Pusher

logFile io.Writer
}

func New(name string) *CPU {
return &CPU{
name: name,
logFile: os.Stdout,
name: name,
metricsEndpoint: "127.0.0.1:9100",
}
}

Expand Down Expand Up @@ -117,6 +115,20 @@ func (this *CPU) Configure(e *etree.Element) error {
this.lokiEndpoint = child.Text()
}
}
case "metrics":
enabled, err := strconv.ParseBool(child.SelectAttrValue("enabled", "false"))
if err != nil {
return fmt.Errorf("parsing 'enabled' attribute for metrics: %w", err)
}

this.metricsEnabled = enabled

for _, child := range child.ChildElements() {
switch child.Tag {
case "endpoint":
this.metricsEndpoint = child.Text()
}
}
case "module":
mod := &module{
name: child.SelectAttrValue("name", child.Text()),
Expand Down Expand Up @@ -168,38 +180,45 @@ func (this *CPU) Run(ctx context.Context, pubEndpoint, pullEndpoint string) erro
}

var (
logErrors = make(chan error)
healthErrors = make(chan error)
metricsErrors = make(chan error)
)

var (
logHandlers = []MsgBusHandler{logger}
healthHandlers = []MsgBusHandler{metricsHandler}
runtimeHandlers = []MsgBusHandler{logger}
runtimeErrors = make(chan error)

logHandlers = []MsgBusHandler{logger}
logErrors = make(chan error)

healthHandlers []MsgBusHandler
healthErrors chan error
)

if this.metricsEnabled {
healthHandlers = append(healthHandlers, metricsHandler)
healthErrors = make(chan error)

startMetricsServer(this.metricsEndpoint)
}

go func() {
for {
select {
case <-ctx.Done():
return
case err := <-runtimeErrors:
log.Printf("[CPU] [ERROR] error processing runtime: %v\n", err)
go MonitorMsgBusChannel(ctx, pubEndpoint, "RUNTIME", runtimeHandlers, runtimeErrors)
case err := <-logErrors:
log.Printf("[CPU] [ERROR] processing logs: %v\n", err)
go MonitorMsgBusChannel(ctx, pubEndpoint, "LOG", logHandlers, logErrors)
case err := <-healthErrors:
log.Printf("[CPU] [ERROR] processing health updates: %v\n", err)
log.Printf("[CPU] [ERROR] processing metrics: %v\n", err)
go MonitorMsgBusChannel(ctx, pubEndpoint, "HEALTH", healthHandlers, healthErrors)
case err := <-metricsErrors:
log.Printf("[CPU] [ERROR] error processing metrics: %v\n", err)
go MonitorMsgBusChannel(ctx, pubEndpoint, "RUNTIME", runtimeHandlers, metricsErrors)
}
}
}()

go MonitorMsgBusChannel(ctx, pubEndpoint, "RUNTIME", runtimeHandlers, runtimeErrors)
go MonitorMsgBusChannel(ctx, pubEndpoint, "LOG", logHandlers, logErrors)
go MonitorMsgBusChannel(ctx, pubEndpoint, "HEALTH", healthHandlers, healthErrors)
go MonitorMsgBusChannel(ctx, pubEndpoint, "RUNTIME", runtimeHandlers, metricsErrors)

go MonitorMsgBusChannel(ctx, pubEndpoint, "INTERNAL", []MsgBusHandler{this.internalHandler}, nil)

return nil
Expand Down
84 changes: 84 additions & 0 deletions src/go/cpu/cpu_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package cpu

import (
"strings"
"testing"

"github.com/beevik/etree"
)

func parseCPUConfig(t *testing.T, contents string) *etree.Element {
t.Helper()

doc := etree.NewDocument()
if err := doc.ReadFromString(contents); err != nil {
t.Fatalf("failed to parse config: %v", err)
}

return doc.Root()
}

func TestCPUMetricsDisabledByDefault(t *testing.T) {
cpu := New("cpu")

if err := cpu.Configure(parseCPUConfig(t, `<cpu></cpu>`)); err != nil {
t.Fatalf("failed to configure cpu: %v", err)
}

if cpu.metricsEnabled {
t.Fatal("expected metrics to be disabled by default")
}

if cpu.metricsEndpoint != "127.0.0.1:9100" {
t.Fatalf("unexpected metrics endpoint: %s", cpu.metricsEndpoint)
}
}

func TestCPUMetricsCanBeEnabled(t *testing.T) {
cpu := New("cpu")

if err := cpu.Configure(parseCPUConfig(t, `<cpu><metrics enabled="true"></metrics></cpu>`)); err != nil {
t.Fatalf("failed to configure cpu: %v", err)
}

if !cpu.metricsEnabled {
t.Fatal("expected metrics to be enabled")
}
}

func TestCPUMetricsEndpointCanBeConfigured(t *testing.T) {
cpu := New("cpu")

if err := cpu.Configure(parseCPUConfig(t, `<cpu><metrics enabled="true"><endpoint>127.0.0.1:9200</endpoint></metrics></cpu>`)); err != nil {
t.Fatalf("failed to configure cpu: %v", err)
}

if cpu.metricsEndpoint != "127.0.0.1:9200" {
t.Fatalf("unexpected metrics endpoint: %s", cpu.metricsEndpoint)
}
}

func TestCPUMetricsCanBeDisabled(t *testing.T) {
cpu := New("cpu")

if err := cpu.Configure(parseCPUConfig(t, `<cpu><metrics enabled="false"></metrics></cpu>`)); err != nil {
t.Fatalf("failed to configure cpu: %v", err)
}

if cpu.metricsEnabled {
t.Fatal("expected metrics to be disabled")
}
}

func TestCPUMetricsInvalidEnabled(t *testing.T) {
cpu := New("cpu")

err := cpu.Configure(parseCPUConfig(t, `<cpu><metrics enabled="nope"></metrics></cpu>`))
if err == nil {
t.Fatal("expected configure error")
}

if !strings.Contains(err.Error(), "parsing 'enabled' attribute for metrics") {
t.Fatalf("unexpected error: %v", err)
}
}
13 changes: 10 additions & 3 deletions src/go/cpu/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cpu
import (
"errors"
"fmt"
"log"
"net/http"
"regexp"

Expand Down Expand Up @@ -67,10 +68,16 @@ func metricsHandler(topic, msg string) error {
return nil
}

func init() {
func startMetricsServer(endpoint string) {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())

server := http.Server{Addr: ":9100", Handler: mux}
go server.ListenAndServe()
server := http.Server{Addr: endpoint, Handler: mux}
go func() {
log.Printf("[CPU] starting metrics server at %s/metrics\n", server.Addr)

if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Printf("[CPU] [ERROR] starting metrics server: %v\n", err)
}
}()
}
Loading