From 43b3360428bc87f3ac585afc498352d163376a25 Mon Sep 17 00:00:00 2001 From: Cyrill Troxler Date: Wed, 6 May 2026 14:57:51 +0800 Subject: [PATCH] feat: skip shim managed attachments in manager The manager tries to attach the BPF programs before the shim completes its own attachment. The shim now creates a marker directory to signal to the manager that the attachment is managed by the shim, so the manager can skip it. --- activator/activator.go | 7 +++--- activator/activator_test.go | 1 + activator/bpf.go | 42 +++++++++++++++++++++++++++++----- cmd/shim/main.go | 1 + manager/redirector_attacher.go | 14 +++++++++++- 5 files changed, 54 insertions(+), 11 deletions(-) diff --git a/activator/activator.go b/activator/activator.go index e30409e..4dfb4f3 100644 --- a/activator/activator.go +++ b/activator/activator.go @@ -55,9 +55,6 @@ func NewServer(ctx context.Context, nn ns.NetNS) (*Server, error) { ns: nn, sandboxPid: parsePidFromNetNS(nn), } - if err := os.MkdirAll(PinPath(s.sandboxPid), os.ModePerm); err != nil { - return nil, err - } return s, nil } @@ -211,7 +208,9 @@ func (s *Server) Stop(ctx context.Context) { } log.G(ctx).Debugf("removing %s", PinPath(s.sandboxPid)) - _ = os.RemoveAll(PinPath(s.sandboxPid)) + if err := cleanPinPath(s.sandboxPid); err != nil { + log.G(ctx).WithError(err).Error("cleaning pin path") + } s.wg.Wait() log.G(ctx).Debug("activator stopped") diff --git a/activator/activator_test.go b/activator/activator_test.go index ded8f58..d365527 100644 --- a/activator/activator_test.go +++ b/activator/activator_test.go @@ -173,6 +173,7 @@ func TestActivator(t *testing.T) { // disable pinning for this test since this is flaky on some // systems (gh actions mostly) DisablePinning(), + ShimManaged(), ) require.NoError(t, err) require.NoError(t, bpf.AttachRedirector("lo")) diff --git a/activator/bpf.go b/activator/bpf.go index 0b33f58..088c05d 100644 --- a/activator/bpf.go +++ b/activator/bpf.go @@ -33,6 +33,7 @@ const ( taskCommOffsetVariable = "task_comm_offset" 
tcxIngressPinName = "tcx_ingress" tcxEgressPinName = "tcx_egress" + ManagedByShimSuffix = "_managed_by_shim" ) type BPF struct { @@ -50,6 +51,7 @@ type BPFConfig struct { probeBinaryName string trackerIgnoreLocalhost bool disablePinning bool + managedByShim bool } type BPFOpts func(cfg *BPFConfig) @@ -78,6 +80,12 @@ func DisablePinning() BPFOpts { } } +func ShimManaged() BPFOpts { + return func(cfg *BPFConfig) { + cfg.managedByShim = true + } +} + func InitBPF(pid int, log *slog.Logger, opts ...BPFOpts) (*BPF, error) { cfg := &BPFConfig{ mapSizes: map[string]uint32{ @@ -94,8 +102,13 @@ func InitBPF(pid int, log *slog.Logger, opts ...BPFOpts) (*BPF, error) { // as a single shim process can host multiple pods, we store the map in a // directory per sandbox pid. - path := PinPath(pid) - if err := os.MkdirAll(path, os.ModePerm); err != nil { + pinPath := PinPath(pid) + if cfg.managedByShim { + if err := os.MkdirAll(pinPath+ManagedByShimSuffix, os.ModePerm); err != nil { + return nil, fmt.Errorf("failed to create bpf fs subpath: %w", err) + } + } + if err := os.MkdirAll(pinPath, os.ModePerm); err != nil { return nil, fmt.Errorf("failed to create bpf fs subpath: %w", err) } @@ -137,7 +150,7 @@ func InitBPF(pid int, log *slog.Logger, opts ...BPFOpts) (*BPF, error) { objs := bpfObjects{} if err := spec.LoadAndAssign(&objs, &ebpf.CollectionOptions{ Maps: ebpf.MapOptions{ - PinPath: path, + PinPath: pinPath, }, }); err != nil { return nil, fmt.Errorf("loading objects: %w", err) @@ -146,6 +159,14 @@ func InitBPF(pid int, log *slog.Logger, opts ...BPFOpts) (*BPF, error) { return &BPF{pid: pid, log: log, objs: &objs, noPin: cfg.disablePinning}, nil } +// ManagedByShim returns true if loading/pinning is managed by the shim itself. +func ManagedByShim(pid int) bool { + if _, err := os.Stat(PinPath(pid) + ManagedByShimSuffix); err == nil { + return true + } + return false +} + // TCXPinned returns true if all TCX programs for the pid are pinned. 
func TCXPinned(pid int, ifaces ...string) bool { for _, iface := range ifaces { @@ -193,8 +214,10 @@ func (bpf *BPF) Cleanup() error { errs = append(errs, fmt.Errorf("closing link: %w", err)) } } - if err := bpf.objs.Close(); err != nil { - errs = append(errs, fmt.Errorf("unable to close bpf objects: %w", err)) + if bpf.objs != nil { + if err := bpf.objs.Close(); err != nil { + errs = append(errs, fmt.Errorf("unable to close bpf objects: %w", err)) + } } for _, qdisc := range bpf.qdiscs { if err := netlink.QdiscDel(qdisc); !os.IsNotExist(err) { @@ -208,10 +231,17 @@ func (bpf *BPF) Cleanup() error { } bpf.log.Info("deleting", "path", PinPath(bpf.pid)) - errs = append(errs, os.RemoveAll(PinPath(bpf.pid))) + errs = append(errs, cleanPinPath(bpf.pid)) return errors.Join(errs...) } +func cleanPinPath(pid int) error { + return errors.Join( + os.RemoveAll(PinPath(pid)), + os.RemoveAll(PinPath(pid)+ManagedByShimSuffix), + ) +} + func (bpf *BPF) AttachInNetNS(pid int, ifaces ...string) error { netNS, err := ns.GetNS(netNSPath(pid)) if err != nil { diff --git a/cmd/shim/main.go b/cmd/shim/main.go index 31d1969..db9b2b0 100644 --- a/cmd/shim/main.go +++ b/cmd/shim/main.go @@ -87,6 +87,7 @@ func attachActivator() bool { pid, log, activator.ProbeBinaryName(cfg.ProbeBinaryName), activator.TrackerIgnoreLocalhost(cfg.TrackerIgnoreLocalhost), + activator.ShimManaged(), ) if err != nil { log.Error("unable to initialize BPF", "error", err) diff --git a/manager/redirector_attacher.go b/manager/redirector_attacher.go index e298399..5d47a60 100644 --- a/manager/redirector_attacher.go +++ b/manager/redirector_attacher.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "strconv" + "strings" "sync" "github.com/containernetworking/plugins/pkg/ns" @@ -64,6 +65,11 @@ func AttachRedirectors(ctx context.Context, log *slog.Logger, activatorOpts ...a continue } + if activator.ManagedByShim(pid) { + r.log.Debug("skipping shim managed attach", "pid", pid) + continue + } + if activator.TCXPinned(pid) 
{ r.log.Debug("skipping already pinned attach", "pid", pid) continue @@ -101,6 +107,11 @@ func (r *Redirector) watchForSandboxPids(ctx context.Context) error { continue } + if activator.ManagedByShim(pid) { + r.log.Debug("skipping shim managed attach", "pid", pid) + continue + } + if activator.TCXPinned(pid, activator.DefaultIfaces...) { r.log.Debug("skipping already pinned attach", "pid", pid) continue @@ -212,7 +223,8 @@ func (r *Redirector) getSandboxPids() ([]int, error) { func ignoredDir(dir string) bool { return dir == activator.SocketTrackerMap || dir == activator.PodKubeletAddrsMapv4 || - dir == activator.PodKubeletAddrsMapv6 + dir == activator.PodKubeletAddrsMapv6 || + strings.HasSuffix(dir, activator.ManagedByShimSuffix) } func (sb sandbox) Remove() error {