Skip to content
This repository was archived by the owner on Feb 20, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 185 additions & 52 deletions pilot/filebeat_piloter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@ package pilot
import (
"encoding/json"
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/elastic/go-ucfg"
"github.com/elastic/go-ucfg/yaml"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"sync"
"syscall"
"time"

log "github.com/Sirupsen/logrus"
"github.com/elastic/go-ucfg"
"github.com/elastic/go-ucfg/yaml"
)

// Global variables for FilebeatPiloter
Expand All @@ -29,7 +32,6 @@ const (
ENV_FILEBEAT_OUTPUT = "FILEBEAT_OUTPUT"
)

var filebeat *exec.Cmd
var _ Piloter = (*FilebeatPiloter)(nil)

// FilebeatPiloter for filebeat plugin
Expand All @@ -39,6 +41,10 @@ type FilebeatPiloter struct {
watchDone chan bool
watchDuration time.Duration
watchContainer map[string]string
fbExit chan struct{}
noticeStop chan bool
filebeat *exec.Cmd
mlock sync.Mutex
}

// NewFilebeatPiloter returns a FilebeatPiloter instance
Expand All @@ -48,7 +54,9 @@ func NewFilebeatPiloter(baseDir string) (Piloter, error) {
baseDir: baseDir,
watchDone: make(chan bool),
watchContainer: make(map[string]string, 0),
watchDuration: 60 * time.Second,
watchDuration: 120 * time.Second,
fbExit: make(chan struct{}),
noticeStop: make(chan bool),
}, nil
}

Expand Down Expand Up @@ -84,49 +92,136 @@ func (p *FilebeatPiloter) watch() error {
for {
select {
case <-p.watchDone:
log.Infof("%s watcher stop", p.Name())
return nil
case <-time.After(p.watchDuration):
//log.Debugf("%s watcher scan", p.Name())
err := p.scan()
log.Infof("%s watcher is stopping...", p.Name())
p.noticeStop <- true

//filebeat已经退出
if p.filebeat == nil {
return nil
}

err := p.filebeat.Process.Kill()
if err != nil {
log.Errorf("%s watcher scan error: %v", p.Name(), err)
pgroup := 0 - p.filebeat.Process.Pid
syscall.Kill(pgroup, syscall.SIGKILL)
}
time.Sleep(3 * time.Second) // wait a little
p.fbExit <- struct{}{}
return err
case <-time.After(p.watchDuration):
log.Debugf("%s watcher scan", p.Name())
go func() {
err := p.scan()
if err != nil {
log.Errorf("%s watcher scan error: %v", p.Name(), err)
}
}()
}
}
}

func (p *FilebeatPiloter) scan() error {
if len(p.watchContainer) == 0 {
return nil
}

//wait for other container in the same pod
time.Sleep(3 * time.Second)

states, err := p.getRegsitryState()
if err != nil {
return nil
log.Error("Get registry error: ", err)
return err
}

configPaths := p.loadConfigPaths()
delConfs := make(map[string]string)
delLogs := make(map[string]string)

p.mlock.Lock()
for container := range p.watchContainer {
confPath := p.GetConfPath(container)
if _, err := os.Stat(confPath); err != nil && os.IsNotExist(err) {
log.Infof("log config %s.yml has been removed and ignore", container)
delete(p.watchContainer, container)
} else if p.canRemoveConf(container, states, configPaths) {
log.Infof("try to remove log config %s.yml", container)
if err := os.Remove(confPath); err != nil {
log.Errorf("remove log config %s.yml fail: %v", container, err)
} else {
delete(p.watchContainer, container)
} else if logm, b := p.canRemoveConf(container, states, configPaths); b {
// 在这里加入自定义的补充动作。
// 这里config文件的清理动作做一个调整:
// 不在循环中进行实际的文件删除动作,每次循环只记录要执行删除的container, 在循环结束后统一处理。
delConfs[confPath] = container
for log, c := range logm {
delLogs[log] = c
}
}
}
return nil
p.mlock.Unlock()

if len(delConfs) == 0 {
log.Debugf("No filebeat config will modify, current scan end")
return nil
}

// 对filebeat进行container释放清理操作
p.Stop() //停止filebeat
<-p.fbExit //等待filebeat退出
defer func() {
time.Sleep(2 * time.Second)
p.Start()
time.Sleep(2 * time.Second)
}()

b, _ := ioutil.ReadFile(FILEBEAT_REGISTRY)
origStates := make([]RegistryState, 0)
newStates := make([]RegistryState, 0)
if err := json.Unmarshal(b, &origStates); err != nil {
log.Error("json error: ", err)
return err
}

failDelContainers := make(map[string]bool)
// 删除detroyed container的配置文件
for delConf, container := range delConfs {
log.Debug("start remove conf: ", delConf)
if err := os.Remove(delConf); err != nil {
log.Errorf("remove log config %s.yml fail: %v", container, err)
failDelContainers[container] = true
} else {
log.Infof("%s removed", delConf)
delete(p.watchContainer, container)
}
}

// 更新registry文件
for _, state := range origStates {
if !FileExist(state.Source) {
//当前的文件已经被删除了,可能是未清理的过期配置
log.Debugf("logfile(%s) has been removed, the item could be deleted: %v", state.Source, state)
continue
} else if container, ok := delLogs[state.Source]; !ok {
//当前state不是destroying container的log,需要继续保留
newStates = append(newStates, state)
} else if _, ok := failDelContainers[container]; ok {
//当前state是destroying container的log,但是conf文件删除失败了,也需要继续保留
newStates = append(newStates, state)
}
}
nb, err := json.Marshal(newStates)
if err != nil {
return err
}
err = ioutil.WriteFile(FILEBEAT_REGISTRY, nb, 0600)
return err
}

func (p *FilebeatPiloter) canRemoveConf(container string, registry map[string]RegistryState,
configPaths map[string]string) bool {
configPaths map[string]string) (map[string]string, bool) {
config, err := p.loadConfig(container)
if err != nil {
return false
log.Error(err)
return nil, false
}

delLogs := make(map[string]string)
for _, path := range config.Paths {
autoMount := p.isAutoMountPath(filepath.Dir(path))
logFiles, _ := filepath.Glob(path)
Expand All @@ -140,18 +235,19 @@ func (p *FilebeatPiloter) canRemoveConf(container string, registry map[string]Re
continue
}
if registry[logFile].Offset < info.Size() {
if autoMount { // ephemeral logs
if autoMount {
log.Infof("%s->%s does not finish to read", container, logFile)
return false
return nil, false
} else if _, ok := configPaths[path]; !ok { // host path bind
log.Infof("%s->%s does not finish to read and not exist in other config",
container, logFile)
return false
return nil, false
}
}
delLogs[logFile] = container
}
}
return true
return delLogs, true
}

func (p *FilebeatPiloter) loadConfig(container string) (*Config, error) {
Expand All @@ -163,10 +259,18 @@ func (p *FilebeatPiloter) loadConfig(container string) (*Config, error) {
}

var config Config
if err := c.Unpack(&config); err != nil {

var configs []Config
var paths []string
if err := c.Unpack(&configs); err != nil {
log.Errorf("parse %s.yml log config error: %v", container, err)
return nil, err
}

for _, c := range configs {
paths = append(paths, c.Paths...)
}
config.Paths = paths
return &config, nil
}

Expand Down Expand Up @@ -228,53 +332,82 @@ func (p *FilebeatPiloter) getRegsitryState() (map[string]RegistryState, error) {
}

func (p *FilebeatPiloter) feed(containerID string) error {
p.mlock.Lock()
if _, ok := p.watchContainer[containerID]; !ok {
p.watchContainer[containerID] = containerID
log.Infof("begin to watch log config: %s.yml", containerID)
}
p.mlock.Unlock()
return nil
}

// Start starting and watching filebeat process
func (p *FilebeatPiloter) Start() error {
if filebeat != nil {
pid := filebeat.Process.Pid
log.Infof("filebeat started, pid: %v", pid)
return fmt.Errorf(ERR_ALREADY_STARTED)
}

log.Info("starting filebeat")
filebeat = exec.Command(FILEBEAT_EXEC_CMD, "-c", FILEBEAT_CONF_FILE)
filebeat.Stderr = os.Stderr
filebeat.Stdout = os.Stdout
err := filebeat.Start()
if err != nil {
log.Errorf("filebeat start fail: %v", err)
log.Debug("Start the filebeat piloter")
if err := p.start(); err != nil {
return err
}

go func() {
log.Infof("filebeat started: %v", filebeat.Process.Pid)
err := filebeat.Wait()
if err != nil {
log.Errorf("filebeat exited: %v", err)
if exitError, ok := err.(*exec.ExitError); ok {
processState := exitError.ProcessState
log.Errorf("filebeat exited pid: %v", processState.Pid())
log.Infof("filebeat started: %v", p.filebeat.Process.Pid)
for {
select {
case err := <-Func2Chan(p.filebeat.Wait):
if err != nil {
log.Errorf("filebeat exited: %v", err)
if exitError, ok := err.(*exec.ExitError); ok {
processState := exitError.ProcessState
log.Errorf("filebeat exited pid: %v", processState.Pid())
}
}

// try to restart filebeat
log.Warningf("filebeat exited and try to restart")
if err := p.start(); err != nil {
//启动失败,重启piloter,通知watchDone
p.Stop()
}
case <-p.noticeStop:
return
}
}

// try to restart filebeat
log.Warningf("filebeat exited and try to restart")
filebeat = nil
p.Start()
}()

go p.watch()
return err
return nil
}

// start filebeat process
func (p *FilebeatPiloter) start() error {
if p.filebeat != nil {
pid := p.filebeat.Process.Pid
process, err := os.FindProcess(pid)
if err == nil {
err = process.Signal(syscall.Signal(0))
if err == nil {
log.Infof("filebeat started, pid: %v", pid)
return err
}
}
}

p.filebeat = nil
log.Info("starting filebeat")
cmd := exec.Command(FILEBEAT_EXEC_CMD, "-c", FILEBEAT_CONF_FILE)
cmd.Stderr = os.Stderr
cmd.Stdout = os.Stdout
err := cmd.Start()
if err != nil {
log.Errorf("filebeat start fail: %v", err)
return err
}
p.filebeat = cmd
return nil
}

// Stop log collection
func (p *FilebeatPiloter) Stop() error {
log.Debug("Stop the filebeat piloter")
p.watchDone <- true
return nil
}
Expand Down
20 changes: 20 additions & 0 deletions pilot/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pilot

import (
"io/ioutil"
"os"
"strings"
)

Expand All @@ -14,3 +15,22 @@ func ReadFile(path string, separator string) ([]string, error) {

return strings.Split(string(data), separator), nil
}

func Func2Chan(f func() error) <-chan error {
retChan := make(chan error)
go func(c chan error) {
err := f()
c <- err
}(retChan)

return retChan
}

func FileExist(path string) bool {
if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
return false
}
}
return true
}