Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
f317d5c
flp-informers and flp-processors
leandroberetta Mar 26, 2026
f71df0c
fix tls cert names issue
leandroberetta Mar 30, 2026
b574512
removed deployment adn replicaset from event handlers
leandroberetta Apr 13, 2026
fd2805f
feedback addressed
leandroberetta Apr 15, 2026
d037e13
fix linter
leandroberetta Apr 15, 2026
25f4697
informers metrics
leandroberetta Apr 24, 2026
a55a1f2
fixes
leandroberetta Apr 24, 2026
7a8cd62
addressing feedback
leandroberetta May 7, 2026
65cb1e1
fix: skip sending resources without IPs to prevent lookup failures
leandroberetta May 27, 2026
f989d8c
fix: skip sending resources without IPs to prevent lookup failures
leandroberetta May 27, 2026
1292365
fix: remove debugging lines
leandroberetta May 27, 2026
21762bc
fix: send initial snapshot for fresh processors
leandroberetta May 29, 2026
bc786d4
fix: set min tls version
leandroberetta May 29, 2026
d939e23
fix: discovery failure errors not surfaced
leandroberetta May 29, 2026
94574dc
fix: add stop method to informers
leandroberetta May 29, 2026
4b49e0e
fix: graceful stop
leandroberetta May 29, 2026
a6bbc71
fix: prometheus typo
leandroberetta May 29, 2026
b9fbc6d
fix: warn users about no mTLS
leandroberetta May 29, 2026
b587a1e
fix: ipv6 and misleading processor id in client
leandroberetta May 29, 2026
9c59736
fix: unexport internal functions
leandroberetta May 29, 2026
4a88181
fix: non reachable return nil removed
leandroberetta May 29, 2026
4065711
fix: force stream teardown on send error path too
leandroberetta May 29, 2026
7dc4eda
fix: skip nil entries
leandroberetta May 29, 2026
7062540
fix: first match wins contract
leandroberetta May 29, 2026
ea63246
fix: proto generated in the wrong path
leandroberetta May 29, 2026
33c473d
test: add tests for first match wins and nil entries
leandroberetta May 29, 2026
f095747
don't bootstrap central informers with an empty Kubernetes config
leandroberetta May 29, 2026
198d577
fix: set TLS MinVersion to prevent downgrade attacks
leandroberetta May 29, 2026
ef21fb3
fix: make these integration tests assert cache state
leandroberetta May 29, 2026
0353125
resync on reconnect
leandroberetta Jun 9, 2026
c455058
Apply suggestions from code review
leandroberetta Jun 17, 2026
8b053e7
Update pkg/pipeline/transform/kubernetes/k8scache/server_test.go
leandroberetta Jun 18, 2026
6c9166e
Update pkg/pipeline/transform/kubernetes/k8scache/server_test.go
leandroberetta Jun 18, 2026
8acf41e
fix code
leandroberetta Jun 18, 2026
2abed7f
fix: resolve rebase conflicts in config.go
leandroberetta Jun 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions .mk/development.mk
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@ deploy-netflow-simulator: ## Deploy netflow simulator
undeploy-netflow-simulator: ## Undeploy netflow simulator
kubectl --ignore-not-found=true delete -f contrib/kubernetes/deployment-netflow-simulator.yaml || true

.PHONY: deploy-flp-informers
deploy-flp-informers: ## Deploy flp-informers (centralized K8s cache pusher)
sed 's|%IMAGE_TAG_BASE%|$(IMAGE_TAG_BASE)|g;s|%VERSION%|$(VERSION)|g;s|%NAMESPACE%|$(NAMESPACE)|g' contrib/kubernetes/deployment-flp-informers.yaml > /tmp/deployment-flp-informers.yaml
kubectl apply -f /tmp/deployment-flp-informers.yaml -n $(NAMESPACE)
kubectl rollout status "deploy/flp-informers" --timeout=600s -n $(NAMESPACE)

.PHONY: undeploy-flp-informers
undeploy-flp-informers: ## Undeploy flp-informers
sed 's|%IMAGE_TAG_BASE%|$(IMAGE_TAG_BASE)|g;s|%VERSION%|$(VERSION)|g;s|%NAMESPACE%|$(NAMESPACE)|g' contrib/kubernetes/deployment-flp-informers.yaml > /tmp/deployment-flp-informers.yaml
kubectl --ignore-not-found=true delete -f /tmp/deployment-flp-informers.yaml -n $(NAMESPACE) || true

##@ kind

.PHONY: create-kind-cluster
Expand Down Expand Up @@ -121,6 +132,32 @@ local-cleanup: prereqs-kind local-deployments-cleanup delete-kind-cluster ## Und
.PHONY: local-redeploy
local-redeploy: local-deployments-cleanup local-deployments-deploy ## Redeploy locally (on current kind)

.PHONY: local-deployments-deploy-k8scache
local-deployments-deploy-k8scache: prereqs-kind deploy-prometheus deploy-loki deploy-grafana build-image kind-load-image deploy-k8scache deploy-flp-informers deploy-netflow-simulator
kubectl get pods -n $(NAMESPACE)
kubectl rollout status -w deployment/flowlogs-pipeline -n $(NAMESPACE)
kubectl rollout status -w deployment/flp-informers -n $(NAMESPACE)
kubectl logs -l app=flowlogs-pipeline -n $(NAMESPACE)

.PHONY: deploy-k8scache
deploy-k8scache: ## Deploy FLP with k8scache server enabled
sed 's|%IMAGE_TAG_BASE%|$(IMAGE_TAG_BASE)|g;s|%VERSION%|$(VERSION)|g;s|%NAMESPACE%|$(NAMESPACE)|g' contrib/kubernetes/deployment-k8scache.yaml > /tmp/deployment-k8scache.yaml
kubectl create configmap flowlogs-pipeline-configuration --from-file=flowlogs-pipeline.conf.yaml=$(FLP_CONF_FILE) -n $(NAMESPACE)
kubectl apply -f /tmp/deployment-k8scache.yaml -n $(NAMESPACE)
kubectl rollout status "deploy/flowlogs-pipeline" --timeout=600s -n $(NAMESPACE)

.PHONY: local-deploy-k8scache
local-deploy-k8scache: prereqs-kind local-cleanup-k8scache create-kind-cluster local-deployments-deploy-k8scache ## Deploy locally on kind with k8scache and flp-informers

.PHONY: local-deployments-cleanup-k8scache
local-deployments-cleanup-k8scache: prereqs-kind undeploy-netflow-simulator undeploy undeploy-flp-informers undeploy-grafana undeploy-loki undeploy-prometheus

.PHONY: local-cleanup-k8scache
local-cleanup-k8scache: prereqs-kind local-deployments-cleanup-k8scache delete-kind-cluster ## Undeploy k8scache setup from local kind

.PHONY: local-redeploy-k8scache
local-redeploy-k8scache: local-deployments-cleanup-k8scache local-deployments-deploy-k8scache ## Redeploy locally with k8scache (on current kind)

.PHONY: ocp-deploy
ocp-deploy: ocp-cleanup deploy-prometheus deploy-loki deploy-grafana deploy ## Deploy to OCP
flowlogs_pipeline_svc_ip=$$(kubectl get svc flowlogs-pipeline -o jsonpath='{.spec.clusterIP}'); \
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ IMAGE_TAG_BASE ?= $(IMAGE_REGISTRY)/$(IMAGE_ORG)/flowlogs-pipeline
# Image URL to use all building/pushing image targets
IMAGE ?= $(IMAGE_TAG_BASE):$(VERSION)

# Kubernetes namespace for deployments - defaults to current context namespace or "default"
NAMESPACE ?= $(shell ns=$$(kubectl config view --minify --output 'jsonpath={..namespace}' 2>/dev/null); echo "$${ns:-default}")

# Image building tool (docker / podman) - docker is preferred in CI
OCI_BIN_PATH = $(shell which docker 2>/dev/null || which podman)
OCI_BIN ?= $(shell basename ${OCI_BIN_PATH})
Expand Down
26 changes: 19 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,20 @@ Usage:
flowlogs-pipeline [flags]

Flags:
--config string config file (default is $HOME/.flowlogs-pipeline)
--dynamicParameters string json of configmap location for dynamic parameters
--config string config file (default is $HOME/.flowlogs-pipeline)
--dynamicParameters string json of configmap location for dynamic parameters
--healthAddr string Health server address such as ':8080' (default: disabled)
-h, --help help for flowlogs-pipeline
--log-level string Log level: debug, info, warning, error (default "error")
--metricsSettings string json for global metrics settings
--parameters string json of config file parameters field
--pipeline string json of config file pipeline field
-h, --help help for flowlogs-pipeline
--k8scache.address string K8s cache sync server address (default "0.0.0.0")
--k8scache.port int K8s cache sync server port (default: disabled)
--k8scache.tls-ca-path string Path to TLS CA certificate for client verification
--k8scache.tls-cert-path string Path to TLS server certificate
--k8scache.tls-enabled Enable TLS for K8s cache sync server
--k8scache.tls-key-path string Path to TLS server private key
--log-level string Log level: debug, info, warning, error (default "error")
--metricsSettings string json for global metrics settings
--parameters string json of config file parameters field
--pipeline string json of config file pipeline field
--pprofAddr string Go pprof address such as '127.0.0.1:6060', used for profiling (default: disabled). Do not expose publicly.
```
<!---END-AUTO-flowlogs-pipeline_help--->
Expand Down Expand Up @@ -974,6 +980,8 @@ kubernetes
undeploy-grafana Undeploy grafana
deploy-netflow-simulator Deploy netflow simulator
undeploy-netflow-simulator Undeploy netflow simulator
deploy-flp-informers Deploy flp-informers (centralized K8s cache pusher)
undeploy-flp-informers Undeploy flp-informers

kind
create-kind-cluster Create cluster
Expand All @@ -987,6 +995,10 @@ End2End
local-deploy Deploy locally on kind (with simulated flowlogs)
local-cleanup Undeploy from local kind
local-redeploy Redeploy locally (on current kind)
deploy-k8scache Deploy FLP with k8scache server enabled
local-deploy-k8scache Deploy locally on kind with k8scache and flp-informers
local-cleanup-k8scache Undeploy k8scache setup from local kind
local-redeploy-k8scache Redeploy locally with k8scache (on current kind)
ocp-deploy Deploy to OCP
ocp-cleanup Undeploy from OCP
dev-local-deploy Deploy locally with simulated netflows
Expand Down
157 changes: 157 additions & 0 deletions cmd/flowlogs-pipeline/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ package main

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"net"
"net/http"
"os"
"path/filepath"
Expand All @@ -33,13 +36,19 @@ import (
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/datasource"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/k8scache"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/netobserv/flowlogs-pipeline/pkg/prometheus"
"github.com/netobserv/flowlogs-pipeline/pkg/server"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
)

var (
Expand Down Expand Up @@ -145,6 +154,12 @@ func initFlags() {
rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "error", "Log level: debug, info, warning, error")
rootCmd.PersistentFlags().StringVar(&opts.HealthAddr, "healthAddr", "", "Health server address such as ':8080' (default: disabled)")
rootCmd.PersistentFlags().StringVar(&opts.PprofAddr, "pprofAddr", "", "Go pprof address such as '127.0.0.1:6060', used for profiling (default: disabled). Do not expose publicly.")
rootCmd.PersistentFlags().StringVar(&opts.K8sCacheServer.Address, "k8scache.address", "0.0.0.0", "K8s cache sync server address")
rootCmd.PersistentFlags().IntVar(&opts.K8sCacheServer.Port, "k8scache.port", 0, "K8s cache sync server port (default: disabled)")
rootCmd.PersistentFlags().BoolVar(&opts.K8sCacheServer.TLSEnabled, "k8scache.tls-enabled", false, "Enable TLS for K8s cache sync server")
rootCmd.PersistentFlags().StringVar(&opts.K8sCacheServer.TLSCertPath, "k8scache.tls-cert-path", "", "Path to TLS server certificate")
rootCmd.PersistentFlags().StringVar(&opts.K8sCacheServer.TLSKeyPath, "k8scache.tls-key-path", "", "Path to TLS server private key")
rootCmd.PersistentFlags().StringVar(&opts.K8sCacheServer.TLSCAPath, "k8scache.tls-ca-path", "", "Path to TLS CA certificate for client verification")
rootCmd.PersistentFlags().StringVar(&opts.PipeLine, "pipeline", "", "json of config file pipeline field")
rootCmd.PersistentFlags().StringVar(&opts.Parameters, "parameters", "", "json of config file parameters field")
rootCmd.PersistentFlags().StringVar(&opts.DynamicParameters, "dynamicParameters", "", "json of configmap location for dynamic parameters")
Expand Down Expand Up @@ -183,6 +198,11 @@ func run() {
utils.SetupElegantExit()
promServer := prometheus.InitializePrometheus(&cfg.MetricsSettings)

// Enable k8scache mode if configured (disables local informers to save resources)
if opts.K8sCacheServer.Port > 0 {
kubernetes.SetK8sCacheEnabled(true)
}

// Create new flows pipeline
mainPipeline, err = pipeline.NewPipeline(&cfg)
if err != nil {
Expand All @@ -205,6 +225,12 @@ func run() {
healthServer = operational.NewHealthServer(&opts, mainPipeline.IsAlive, mainPipeline.IsReady)
}

// Start K8s cache server
var grpcServer *grpc.Server
if opts.K8sCacheServer.Port > 0 {
grpcServer = startK8sCacheServer(&opts.K8sCacheServer)
}

// Starts the flows pipeline
mainPipeline.Run()

Expand All @@ -214,9 +240,140 @@ func run() {
if healthServer != nil {
_ = healthServer.Shutdown(context.Background())
}
if grpcServer != nil {
log.Info("stopping K8s cache sync server")
// GracefulStop can hang indefinitely if StreamUpdates connections are still active
// Use a timeout and fall back to force-stop if needed
stopped := make(chan struct{})
go func() {
grpcServer.GracefulStop()
close(stopped)
}()
select {
case <-stopped:
log.Info("K8s cache sync server stopped gracefully")
case <-time.After(5 * time.Second):
log.Warn("timed out waiting for K8s cache sync streams to drain; forcing stop")
grpcServer.Stop()
}
}

// Give all threads a chance to exit and then exit the process
time.Sleep(time.Second)
log.Debugf("exiting main run")
os.Exit(0)
}

// startK8sCacheServer initializes and starts the gRPC server for K8s cache synchronization
// Returns nil if the datasource is not available (e.g., no kubernetes enrichment configured)
func startK8sCacheServer(cfg *config.K8sCacheServer) *grpc.Server {
// Check if kubernetes datasource is available
ds := kubernetes.GetDatasource()
if ds == nil {
log.Warn("K8s cache server requested but kubernetes datasource not initialized. " +
"Make sure kubernetes enrichment is configured in the pipeline.")
return nil
}

// Attach a Kubernetes store so the cache server can apply received updates; enrichment will use it for lookups
ds.SetKubernetesStore(datasource.NewKubernetesStore())
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// Create cache server
cacheServer := k8scache.NewKubernetesCacheServer(ds)

// Configure keepalive and resource limits to prevent resource exhaustion
// These settings protect against misbehaving clients and ensure graceful connection management
kaPolicy := keepalive.EnforcementPolicy{
MinTime: 5 * time.Second, // Minimum time between client pings
PermitWithoutStream: false, // Require active stream for keepalive
}
kaParams := keepalive.ServerParameters{
MaxConnectionIdle: 15 * time.Minute, // Close idle connections
MaxConnectionAge: 30 * time.Minute, // Max connection lifetime
MaxConnectionAgeGrace: 5 * time.Second, // Grace period before forcing close
Time: 30 * time.Second, // Ping interval when idle
Timeout: 10 * time.Second, // Ping timeout
}

// Base server options (applied to both TLS and non-TLS)
serverOpts := []grpc.ServerOption{
grpc.MaxConcurrentStreams(100), // Limit concurrent streams per connection
grpc.KeepaliveParams(kaParams), // Configure keepalive behavior
grpc.KeepaliveEnforcementPolicy(kaPolicy), // Enforce keepalive policy
grpc.MaxRecvMsgSize(50 * 1024 * 1024), // 50MB max message size
}

// Create gRPC server with optional TLS
var grpcServer *grpc.Server
if cfg.TLSEnabled {
tlsConfig, err := createServerTLSConfig(cfg)
if err != nil {
log.WithError(err).Fatal("failed to configure TLS for K8s cache server")
}
Comment thread
leandroberetta marked this conversation as resolved.
grpcServer = grpc.NewServer(append(serverOpts, grpc.Creds(tlsConfig))...)
log.Info("K8s cache server TLS enabled")
} else {
grpcServer = grpc.NewServer(serverOpts...)
log.Warn("K8s cache server TLS disabled - connections are insecure (not recommended for production)")
}
k8scache.RegisterKubernetesCacheServiceServer(grpcServer, cacheServer)

// Start listening
// Use net.JoinHostPort to properly handle IPv6 addresses (adds brackets when needed)
address := net.JoinHostPort(cfg.Address, fmt.Sprintf("%d", cfg.Port))
listener, err := net.Listen("tcp", address)
if err != nil {
log.WithError(err).WithField("address", address).Fatal("failed to start K8s cache server")
}
Comment thread
leandroberetta marked this conversation as resolved.

// Start server in background
go func() {
log.WithField("address", address).Info("starting K8s cache sync server")
if err := grpcServer.Serve(listener); err != nil {
log.WithError(err).Error("K8s cache sync server stopped with error")
}
}()

return grpcServer
}

// createServerTLSConfig creates TLS credentials for the gRPC server
func createServerTLSConfig(cfg *config.K8sCacheServer) (credentials.TransportCredentials, error) {
// Load server certificate and private key
if cfg.TLSCertPath == "" || cfg.TLSKeyPath == "" {
return nil, fmt.Errorf("TLS enabled but cert/key paths not provided")
}

cert, err := tls.LoadX509KeyPair(cfg.TLSCertPath, cfg.TLSKeyPath)
if err != nil {
return nil, fmt.Errorf("failed to load server cert/key: %w", err)
}

tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
ClientAuth: tls.NoClientCert, // Default: no client cert required
Comment thread
leandroberetta marked this conversation as resolved.
MinVersion: tls.VersionTLS13, // Enforce TLS 1.3+ to prevent downgrade attacks
}
Comment thread
leandroberetta marked this conversation as resolved.

// If CA is provided, require and verify client certificates
if cfg.TLSCAPath != "" {
caCert, err := os.ReadFile(cfg.TLSCAPath)
if err != nil {
return nil, fmt.Errorf("failed to read CA cert: %w", err)
}

caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(caCert) {
return nil, fmt.Errorf("failed to append CA cert")
}

tlsConfig.ClientCAs = caCertPool
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
log.Info("K8s cache server: mutual TLS enabled (client certificates required)")
} else {
log.Warn("K8s cache server: TLS enabled but no client certificate verification (no CA provided). " +
"Any client with TLS can connect. Use --k8scache.tls-ca-path for mTLS or ensure network policies restrict access.")
}
Comment thread
leandroberetta marked this conversation as resolved.

return credentials.NewTLS(tlsConfig), nil
}
Loading
Loading