Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions pkg/handler/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/netobserv/network-observability-console-plugin/pkg/config"
"github.com/netobserv/network-observability-console-plugin/pkg/handler/apierrors"
"github.com/netobserv/network-observability-console-plugin/pkg/httpclient"
"github.com/netobserv/network-observability-console-plugin/pkg/loki"
"github.com/netobserv/network-observability-console-plugin/pkg/merger"
"github.com/netobserv/network-observability-console-plugin/pkg/model"
"github.com/netobserv/network-observability-console-plugin/pkg/prometheus"
"github.com/prometheus/client_golang/api"
Expand Down Expand Up @@ -53,7 +53,7 @@ func newLokiClients(cfg *config.Config, requestHeader http.Header, useLokiStatus
return clients{loki: lokiClient}
}

func (c *clients) fetchLokiSingle(logQL string, merger loki.Merger) (int, apierrors.StructuredError) {
func (c *clients) fetchLokiSingle(logQL string, merger merger.Merger) (int, apierrors.StructuredError) {
qr, code, err := fetchLogQL(logQL, c.loki)
if err != nil {
return code, err
Expand All @@ -71,7 +71,7 @@ func (c *clients) getPromClient(isDev bool) api.Client {
return c.promAdmin
}

func (c *clients) fetchPrometheusSingle(ctx context.Context, promQL *prometheus.Query, merger loki.Merger, client api.Client) (int, apierrors.StructuredError) {
func (c *clients) fetchPrometheusSingle(ctx context.Context, promQL *prometheus.Query, merger merger.Merger, client api.Client) (int, apierrors.StructuredError) {
qr, code, err := prometheus.QueryMatrix(ctx, client, promQL)
if err != nil {
return code, apierrors.NewPromClientError(err)
Expand All @@ -82,7 +82,7 @@ func (c *clients) fetchPrometheusSingle(ctx context.Context, promQL *prometheus.
return code, nil
}

func (c *clients) fetchSingle(ctx context.Context, logQL string, promQL *prometheus.Query, merger loki.Merger, isDev bool) (int, apierrors.StructuredError) {
func (c *clients) fetchSingle(ctx context.Context, logQL string, promQL *prometheus.Query, merger merger.Merger, isDev bool) (int, apierrors.StructuredError) {
if promQL != nil {
client := c.getPromClient(isDev)
if client == nil {
Expand All @@ -96,7 +96,7 @@ func (c *clients) fetchSingle(ctx context.Context, logQL string, promQL *prometh
return c.fetchLokiSingle(logQL, merger)
}

func (c *clients) fetchParallel(ctx context.Context, logQL []string, promQL []*prometheus.Query, merger loki.Merger, isDev bool) (int, apierrors.StructuredError) {
func (c *clients) fetchParallel(ctx context.Context, logQL []string, promQL []*prometheus.Query, merger merger.Merger, isDev bool) (int, apierrors.StructuredError) {
type errorWithCode struct {
err apierrors.StructuredError
code int
Expand Down
18 changes: 10 additions & 8 deletions pkg/handler/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import (
"github.com/netobserv/network-observability-console-plugin/pkg/config"
"github.com/netobserv/network-observability-console-plugin/pkg/handler/apierrors"
"github.com/netobserv/network-observability-console-plugin/pkg/loki"
"github.com/netobserv/network-observability-console-plugin/pkg/merger"
"github.com/netobserv/network-observability-console-plugin/pkg/metrics"
"github.com/netobserv/network-observability-console-plugin/pkg/model"
"github.com/netobserv/network-observability-console-plugin/pkg/model/fields"
"github.com/netobserv/network-observability-console-plugin/pkg/model/filters"
"github.com/netobserv/network-observability-console-plugin/pkg/prometheus"
"github.com/netobserv/network-observability-console-plugin/pkg/utils/constants"
"github.com/netobserv/network-observability-console-plugin/pkg/utils/queryparams"

v1 "github.com/prometheus/client_golang/api/prometheus/v1"
)
Expand Down Expand Up @@ -82,8 +84,8 @@ func (h *Handlers) GetTopology(ctx context.Context) func(w http.ResponseWriter,
}
}

func (h *Handlers) extractTopologyQueryParams(params url.Values, ds constants.DataSource) (*loki.TopologyInput, filters.MultiQueries, v1.Range, int, error) {
in := loki.TopologyInput{DataSource: ds}
func (h *Handlers) extractTopologyQueryParams(params url.Values, ds constants.DataSource) (*queryparams.TopologyInput, filters.MultiQueries, v1.Range, int, error) {
in := queryparams.TopologyInput{DataSource: ds}
qr := v1.Range{}
var reqLimit int
var err error
Expand Down Expand Up @@ -161,7 +163,7 @@ func (h *Handlers) getTopologyFlows(ctx context.Context, cl clients, params url.
return nil, http.StatusBadRequest, err
}
isDev := params.Get(namespaceKey) != ""
merger := loki.NewMatrixMerger(reqLimit)
mm := merger.NewMatrixMerger(reqLimit)
if len(filterGroups) > 1 {
// match any, and multiple filters => run in parallel then aggregate
var lokiQ []string
Expand All @@ -179,7 +181,7 @@ func (h *Handlers) getTopologyFlows(ctx context.Context, cl clients, params url.
dataSources[constants.DataSourceLoki] = true
}
}
code, err := cl.fetchParallel(ctx, lokiQ, promQ, merger, isDev)
code, err := cl.fetchParallel(ctx, lokiQ, promQ, mm, isDev)
if err != nil {
return nil, code, err
}
Expand All @@ -199,13 +201,13 @@ func (h *Handlers) getTopologyFlows(ctx context.Context, cl clients, params url.
if promQ != nil {
dataSources[constants.DataSourceProm] = true
}
code, err = cl.fetchSingle(ctx, lokiQ, promQ, merger, isDev)
code, err = cl.fetchSingle(ctx, lokiQ, promQ, mm, isDev)
if err != nil {
return nil, code, err
}
}

qresp := merger.Get()
qresp := mm.Get()
qresp.Stats.DataSources = []constants.DataSource{}
for str, ok := range dataSources {
if ok {
Expand Down Expand Up @@ -272,7 +274,7 @@ func buildTopologyQuery(
cfg *config.Config,
promInventory *prometheus.Inventory,
filters filters.SingleQuery,
in *loki.TopologyInput,
in *queryparams.TopologyInput,
qr *v1.Range,
isDev bool,
) (string, *prometheus.Query, int, error) {
Expand Down Expand Up @@ -310,7 +312,7 @@ func buildTopologyQuery(
return EncodeQuery(qb.Build()), nil, http.StatusOK, nil
}

func getEligiblePromMetric(kl map[string][]string, promInventory *prometheus.Inventory, filters filters.SingleQuery, in *loki.TopologyInput, isDev bool) (*prometheus.SearchResult, string) {
func getEligiblePromMetric(kl map[string][]string, promInventory *prometheus.Inventory, filters filters.SingleQuery, in *queryparams.TopologyInput, isDev bool) (*prometheus.SearchResult, string) {
if in.DataSource != constants.DataSourceAuto && in.DataSource != constants.DataSourceProm {
return nil, ""
}
Expand Down
10 changes: 0 additions & 10 deletions pkg/loki/merger.go

This file was deleted.

1 change: 0 additions & 1 deletion pkg/loki/streams_merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
)

type StreamMerger struct {
Merger
index map[string]indexedStream
merged model.Streams
stats []interface{}
Expand Down
74 changes: 17 additions & 57 deletions pkg/loki/topology_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,43 +7,22 @@ import (

"github.com/netobserv/network-observability-console-plugin/pkg/config"
"github.com/netobserv/network-observability-console-plugin/pkg/utils/constants"
"github.com/netobserv/network-observability-console-plugin/pkg/utils/queryparams"
)

const (
topologyDefaultLimit = "100"
)

type TopologyInput struct {
Start string
End string
Top string
RateInterval string
Step string
DataField string
MetricFunction constants.MetricFunction
RecordType constants.RecordType
DataSource constants.DataSource
PacketLoss constants.PacketLoss
Aggregate string
Groups string
}

func (in *TopologyInput) GetActualDataField() string {
switch in.DataField {
case constants.MetricTypeFlows, constants.MetricTypeDNSFlows, constants.MetricTypeTLSFlows:
return ""
default:
return in.DataField
}
}

// TopologyQueryBuilder builds LogQL metric queries for topology views.
type TopologyQueryBuilder struct {
*FlowQueryBuilder
topology *TopologyInput
topology *queryparams.TopologyInput
aggregateKeyLabels map[string][]string
}

func NewTopologyQuery(cfg *config.Loki, kl map[string][]string, in *TopologyInput) (*TopologyQueryBuilder, error) {
// NewTopologyQuery creates a new TopologyQueryBuilder for the given Loki config and topology input.
func NewTopologyQuery(cfg *config.Loki, kl map[string][]string, in *queryparams.TopologyInput) (*TopologyQueryBuilder, error) {
var rt constants.RecordType
if slices.Contains(constants.AnyConnectionType, string(in.RecordType)) {
rt = "endConnection"
Expand All @@ -59,36 +38,7 @@ func NewTopologyQuery(cfg *config.Loki, kl map[string][]string, in *TopologyInpu
}, nil
}

func GetLabelsAndFilter(kl map[string][]string, aggregate, groups string) ([]string, string) {
var fields []string
var filter string
if fields = kl[aggregate]; fields == nil {
fields = []string{aggregate}
filter = aggregate
}
if groups != "" {
for gr, labels := range kl {
if strings.Contains(groups, gr) {
for _, label := range labels {
if !slices.Contains(fields, label) {
fields = append(fields, label)
}
}
}
}
}
return fields, filter
}

func getFactor(metricType string) string {
switch metricType {
case constants.MetricTypeFlowRTT:
return "/1000000" // nanoseconds to milliseconds
default:
return ""
}
}

// GetFunctionWithQuantile maps a MetricFunction to its LogQL function name and optional quantile value.
func GetFunctionWithQuantile(metricFunction constants.MetricFunction) (string, string) {
switch metricFunction {
case constants.MetricFunctionCount:
Expand All @@ -112,13 +62,23 @@ func GetFunctionWithQuantile(metricFunction constants.MetricFunction) (string, s
}
}

func getFactor(metricType string) string {
switch metricType {
case constants.MetricTypeFlowRTT:
return "/1000000" // nanoseconds to milliseconds
default:
return ""
}
}

// Build constructs the full LogQL topology query URL string.
func (q *TopologyQueryBuilder) Build() string {
top := q.topology.Top
if top == "" {
top = topologyDefaultLimit
}

labels, extraFilter := GetLabelsAndFilter(q.aggregateKeyLabels, q.topology.Aggregate, q.topology.Groups)
labels, extraFilter := queryparams.GetLabelsAndFilter(q.aggregateKeyLabels, q.topology.Aggregate, q.topology.Groups)
if q.config.IsLabel(extraFilter) {
extraFilter = ""
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/loki/topology_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/netobserv/network-observability-console-plugin/pkg/config"
"github.com/netobserv/network-observability-console-plugin/pkg/utils/constants"
"github.com/netobserv/network-observability-console-plugin/pkg/utils/queryparams"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -29,7 +30,7 @@ var aggregateKeyLabels = map[string][]string{
}

func TestBuildTopologyQuery_SimpleAggregate(t *testing.T) {
in := TopologyInput{
in := queryparams.TopologyInput{
Start: "(start)",
End: "",
Top: "50",
Expand All @@ -53,7 +54,7 @@ func TestBuildTopologyQuery_SimpleAggregate(t *testing.T) {
}

func TestBuildTopologyQuery_GroupsAndAggregate(t *testing.T) {
in := TopologyInput{
in := queryparams.TopologyInput{
Start: "(start)",
End: "",
Top: "50",
Expand All @@ -78,7 +79,7 @@ func TestBuildTopologyQuery_GroupsAndAggregate(t *testing.T) {
}

func TestBuildTopologyQuery_CustomAggregate(t *testing.T) {
in := TopologyInput{
in := queryparams.TopologyInput{
Start: "(start)",
End: "",
Top: "50",
Expand All @@ -102,7 +103,7 @@ func TestBuildTopologyQuery_CustomAggregate(t *testing.T) {
}

func TestBuildTopologyQuery_CustomLabelAggregate(t *testing.T) {
in := TopologyInput{
in := queryparams.TopologyInput{
Start: "(start)",
End: "",
Top: "50",
Expand All @@ -126,7 +127,7 @@ func TestBuildTopologyQuery_CustomLabelAggregate(t *testing.T) {
}

func TestBuildTopologyQuery_TlsFlowsOwnerPlusTlsVersionAggregate(t *testing.T) {
in := TopologyInput{
in := queryparams.TopologyInput{
Start: "(start)",
End: "",
Top: "50",
Expand Down
10 changes: 6 additions & 4 deletions pkg/loki/matrix_merger.go → pkg/merger/matrix_merger.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package loki
package merger

import (
"fmt"
Expand All @@ -9,9 +9,8 @@ import (
"github.com/netobserv/network-observability-console-plugin/pkg/model"
)

// MatrixMerger stores a state to build unique Matrix from multiple ones
// MatrixMerger aggregates multiple Matrix results from Loki or Prometheus queries.
type MatrixMerger struct {
Merger
index map[string]indexedSampleStream
merged model.Matrix
stats []interface{}
Expand All @@ -20,6 +19,7 @@ type MatrixMerger struct {
limitReached bool
}

// NewMatrixMerger creates a new MatrixMerger with the given per-query result limit.
func NewMatrixMerger(reqLimit int) *MatrixMerger {
return &MatrixMerger{
reqLimit: reqLimit,
Expand All @@ -35,10 +35,11 @@ type indexedSampleStream struct {
index int
}

// Add merges a new QueryResponseData (Matrix result) into the aggregated result.
func (m *MatrixMerger) Add(from model.QueryResponseData) (model.ResultValue, error) {
matrix, ok := from.Result.(model.Matrix)
if !ok {
return nil, fmt.Errorf("loki returned an unexpected type for MatrixMerger: %T", from)
return nil, fmt.Errorf("unexpected result type for MatrixMerger: %T", from)
}

m.numQueries++
Expand Down Expand Up @@ -75,6 +76,7 @@ func (m *MatrixMerger) Add(from model.QueryResponseData) (model.ResultValue, err
return m.merged, nil
}

// Get returns the final aggregated AggregatedQueryResponse.
func (m *MatrixMerger) Get() *model.AggregatedQueryResponse {
for idx, stream := range m.merged {
skey := stream.Metric.String()
Expand Down
Loading
Loading