diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..1c7e34c --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,60 @@ +name: Go Lint and Test + +on: + workflow_dispatch: + +# todo +# push: +# tags: +# - 'v*' # match tags that start with v (like v1.0.0) +# branches: +# - release +# - master +# - dev +# paths: +# - 'src/**' +# - .github/workflows/test.yml + +jobs: + lint: + name: lint + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - uses: actions/setup-go@v5 + with: + go-version: '1.24' + cache: true + + - name: golangci-lint + uses: golangci/golangci-lint-action@v6 + with: + version: v1.64 + working-directory: src + +# test: todo +# name: test go +# runs-on: ubuntu-latest +# steps: +# - uses: actions/checkout@v4 +# with: +# fetch-depth: 0 +# +# - uses: actions/setup-go@v5 +# with: +# go-version: '1.24' +# cache: true +# +# # docker setup for running tests +# - name: Set up Docker Buildx +# uses: docker/setup-buildx-action@v3 +# +# - name: Install dependencies +# run: go mod download +# +# - name: Run test +# run: | +# go test ./service/jobs/ -v -run Test500Jobs diff --git a/Dockerfile b/Dockerfile index 2e3154d..132e53d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -40,7 +40,9 @@ WORKDIR /app/ COPY --from=builder /app/leviathan . -ENV IS_DOCKER=true +ENV LEVIATHAN_IS_DOCKER=true +# default level info when running in docker +ENV LEVIATHAN_LOG_LEVEL=info EXPOSE 9221 diff --git a/Justfile b/Justfile index b0f2615..f295446 100644 --- a/Justfile +++ b/Justfile @@ -44,12 +44,22 @@ bdrn: just dk docker run --rm --network=host -v /var/run/docker.sock:/var/run/docker.sock -v appdata:/app/appdata/ {{imageName}} +alias dc := dclean +dclean: + docker rm -f $(docker ps -aq) + docker image prune -ay + dkrn: docker compose up --build post: docker compose --profile post up +# update all go deps +[working-directory: 'src'] +get: + go get -v -u all + # lint go files [working-directory: 'src'] lint: @@ -63,3 +73,13 @@ tidy: [working-directory: 'src'] vet: go vet ./... + +# go build and run +[working-directory: 'src'] +gb: + go build -o ../bin/leviathan.exe + +# go build +gr: + just gb + ./bin/leviathan.exe diff --git a/src/api/api.go b/src/api/api.go index 9e48382..5ea201f 100644 --- a/src/api/api.go +++ b/src/api/api.go @@ -16,13 +16,7 @@ import ( func StartGrpcServer() { mux := setupEndpoints() - log.Info(). - Str("build_date", common.BuildDate). - Str("build_commit", common.CommitInfo). - Str("git_branch", common.Branch). - Str("go_version", common.GoVersion). - Str("build_version", common.Version). - Msg("Leviathan initialized successfully") + log.Info().Msg("Leviathan initialized successfully") srvAddr := fmt.Sprintf(":%s", common.ServerPort.GetStr()) log.Info().Msgf("starting server on %s", srvAddr) diff --git a/src/common/config.go b/src/common/config.go index 706b425..a967dda 100644 --- a/src/common/config.go +++ b/src/common/config.go @@ -4,16 +4,21 @@ import ( "errors" "fmt" "github.com/joho/godotenv" + "github.com/makeopensource/leviathan/models" "github.com/rs/zerolog/log" "github.com/spf13/viper" "os" "path/filepath" + "strconv" ) func InitConfig() { - err := godotenv.Load() - if err != nil { - log.Warn().Err(err).Msg(".env not found. you can safely ignore this warning if you dont have a .env file") + _, ok := os.LookupEnv("LEVIATHAN_IS_DOCKER") + if !ok { + err := godotenv.Load() // load .env file for non docker env + if err != nil { + log.Warn().Err(err).Msg(".env not found. ignore this warning if you did not intend to load a .env file") + } } defer func() { @@ -27,23 +32,46 @@ func InitConfig() { configDir := getConfigDir(baseDir) viper.SetConfigName("config") - viper.SetConfigType("json") + viper.SetConfigType("toml") viper.AddConfigPath(configDir) - setupDefaultOptions(configDir) - loadPostgresOptions() + // ignore any error to setup default vals + _ = viper.ReadInConfig() + + setIfEnvPresentOrDefault( + loglevelKey, + "LEVIATHAN_LOG_LEVEL", + "debug", + ) - submissionFolderPath := getStringEnvOrDefault("TMP_SUBMISSION_FOLDER", fmt.Sprintf("%s/%s", baseDir, "submissions")) - viper.SetDefault(submissionDirKey, submissionFolderPath) + loadPostgresOptions() + setupDefaultOptions(configDir) - outputFolderPath := getStringEnvOrDefault("LOG_OUTPUT_FOLDER", fmt.Sprintf("%s/%s", baseDir, "output")) - viper.SetDefault(outputDirKey, outputFolderPath) + submissionFolderPath := setIfEnvPresentOrDefault( + submissionDirKey, + "TMP_SUBMISSION_DIR", + fmt.Sprintf("%s/%s", baseDir, "submissions"), + ) + outputFolderPath := setIfEnvPresentOrDefault( + outputDirKey, + "SUBMISSION_OUTPUT_DIR", + fmt.Sprintf("%s/%s", baseDir, "output"), + ) err = makeDirectories([]string{submissionFolderPath, outputFolderPath}) + if err != nil { + log.Fatal().Err(err).Msg("unable to make required directories") + } if err := viper.SafeWriteConfig(); err != nil { var configFileAlreadyExistsError viper.ConfigFileAlreadyExistsError - if !errors.As(err, &configFileAlreadyExistsError) { + if errors.As(err, &configFileAlreadyExistsError) { + // merge any new changes + err := viper.WriteConfig() + if err != nil { + log.Fatal().Err(err).Msg("viper could not write to config file") + } + } else { log.Fatal().Err(err).Msg("viper could not write to config file") } } @@ -52,46 +80,23 @@ func InitConfig() { log.Fatal().Err(err).Msg("could not read config file") } - log.Info().Msgf("watching config file at %s", viper.ConfigFileUsed()) - viper.WatchConfig() - - // maybe create viper instance and return from this function - // future setup in case https://github.com/spf13/viper/issues/1855 is accepted + log.Info().Msgf("loaded config from %s", viper.ConfigFileUsed()) } func loadPostgresOptions() { - enablePost := false - if getStringEnvOrDefault("POSTGRES_ENABLE", "false") == "true" { - enablePost = true + setIfEnvPresentOrDefault(postgresHostKey, "POSTGRES_HOST", "localhost") + setIfEnvPresentOrDefault(postgresPortKey, "POSTGRES_PORT", "5432") + setIfEnvPresentOrDefault(postgresUserKey, "POSTGRES_USER", "postgres") + setIfEnvPresentOrDefault(postgresPassKey, "POSTGRES_PASSWORD", "postgres") + setIfEnvPresentOrDefault(postgresDBKey, "POSTGRES_DB", "postgres") + setIfEnvPresentOrDefault(postgresSslKey, "POSTGRES_SSL", "disable") + + val, isDefault := getBoolEnvOrDefault("POSTGRES_ENABLE", false) + if isDefault { + viper.SetDefault(enablePostgresKey, val) + } else { + viper.Set(enablePostgresKey, val) } - viper.SetDefault( - enablePostgresKey, - enablePost, - ) - viper.SetDefault( - postgresHostKey, - getStringEnvOrDefault("POSTGRES_HOST", "localhost"), - ) - viper.SetDefault( - postgresPortKey, - getStringEnvOrDefault("POSTGRES_PORT", "5432"), - ) - viper.SetDefault( - postgresUserKey, - getStringEnvOrDefault("POSTGRES_USER", "postgres"), - ) - viper.SetDefault( - postgresPassKey, - getStringEnvOrDefault("POSTGRES_PASSWORD", ""), - ) - viper.SetDefault( - postgresDBKey, - getStringEnvOrDefault("POSTGRES_DB", "postgres"), - ) - viper.SetDefault( - postgresSslKey, - getStringEnvOrDefault("POSTGRES_SSL", "disable"), - ) } func getConfigDir(baseDir string) string { @@ -103,12 +108,42 @@ func getConfigDir(baseDir string) string { return configDir } -func getStringEnvOrDefault(key, defaultVal string) string { +// uses viper.Set if env var was found, +// +// else uses' viper.SetDefault and uses defaultValue +// +// this allows us to overwrite any new configration changes passed via env vars, +// but ignore if no env were passed +func setIfEnvPresentOrDefault(configKey, envKeyName, defaultValue string) string { + val, isDefault := getStringEnvOrDefault(envKeyName, defaultValue) + if isDefault { + viper.SetDefault(configKey, val) + } else { + // always overwrite with key + viper.Set(configKey, val) + } + + return val +} + +func getStringEnvOrDefault(key, defaultVal string) (finalVal string, isDefault bool) { + value := os.Getenv(key) + if value == "" { + return defaultVal, true + } + return value, false +} + +func getBoolEnvOrDefault(key string, defaultVal bool) (finalVal, isDefault bool) { value := os.Getenv(key) if value == "" { - return defaultVal + return defaultVal, true + } + parseBool, err := strconv.ParseBool(value) + if err != nil { + return defaultVal, true } - return value + return parseBool, false } func setupDefaultOptions(configDir string) { @@ -118,6 +153,15 @@ func setupDefaultOptions(configDir string) { viper.SetDefault(serverPortKey, "9221") viper.SetDefault(enableLocalDockerKey, true) viper.SetDefault(concurrentJobsKey, 50) + viper.SetDefault(ClientSSHKey, map[string]models.MachineOptions{ + "example": { + Enable: false, + Name: "example", + Host: "http://localhost:8080", + User: "test", + Port: 22, + }, + }) } func getBaseDir() (string, error) { diff --git a/src/common/config_keys.go b/src/common/config_keys.go index 327f244..c4775d1 100644 --- a/src/common/config_keys.go +++ b/src/common/config_keys.go @@ -6,10 +6,11 @@ import ( ) const ( - concurrentJobsKey = "concurrent_jobs" + concurrentJobsKey = "jobs.concurrent_jobs" apiKeyKey = "server.apikey" serverPortKey = "server.port" + loglevelKey = "server.log_level" // folders logDirKey = "folder.log_dir" @@ -17,6 +18,7 @@ const ( outputDirKey = "folder.log_output_dir" // docker config enableLocalDockerKey = "clients.enable_local_docker" + ClientSSHKey = "clients.ssh" sqliteDbPathKey = "db.sqlite.db_path" // postgres @@ -31,7 +33,7 @@ const ( var ( // internal use - + LogLevel = Config{loglevelKey} LogDir = Config{logDirKey} SqliteDbPath = Config{sqliteDbPathKey} diff --git a/src/common/info.go b/src/common/info.go index b791ecd..459b36c 100644 --- a/src/common/info.go +++ b/src/common/info.go @@ -1,8 +1,17 @@ package common -import "runtime" +import ( + "fmt" + "math" + "math/rand/v2" + "runtime" + "sort" + "strings" + "time" +) -// build args +// build args to modify these vars +// // go build -ldflags "\ // -X github.com/makeopensource/leviathan/common.Version=0.1.0 \ // -X github.com/makeopensource/leviathan/common.CommitInfo=$(git rev-parse HEAD) \ @@ -13,7 +22,208 @@ import "runtime" // main.go var Version = "dev" -var CommitInfo = "dev" -var BuildDate = "dev" -var Branch = "dev" // Git branch +var CommitInfo = "unknown" +var BuildDate = "unknown" +var Branch = "unknown" // Git branch var GoVersion = runtime.Version() + +func PrintInfo() { + // generated from https://patorjk.com/software/taag/#p=testall&t=leviathan + var headers = []string{ + // contains some characters that mess with multiline strings leave this alone + "\n (`-') _ (`-') _ (`-') _ (`-') (`-').-> (`-') _ <-. (`-')_ \n <-. ( OO).-/ _(OO ) (_) (OO ).-/ ( OO).-> (OO )__ (OO ).-/ \\( OO) )\n ,--. ) (,------.,--.(_/,-.\\ ,-(`-')/ ,---. / '._ ,--. ,'-' / ,---. ,--./ ,--/ \n | (`-') | .---'\\ \\ / (_/ | ( OO)| \\ /`.\\ |'--...__)| | | | | \\ /`.\\ | \\ | | \n | |OO )(| '--. \\ / / | | )'-'|_.' |`--. .--'| `-' | '-'|_.' || . '| |)\n(| '__ | | .--' _ \\ /_)(| |_/(| .-. | | | | .-. |(| .-. || |\\ | \n | |' | `---.\\-'\\ / | |'->| | | | | | | | | | | | | || | \\ | \n `-----' `------' `-' `--' `--' `--' `--' `--' `--' `--' `--'`--' `--' \n", + "\n __ ______ __ __ ________ ________ _________ ___ ___ ________ ___ __ \n/_/\\ /_____/\\ /_/\\ /_/\\ /_______/\\/_______/\\ /________/\\/__/\\ /__/\\ /_______/\\ /__/\\ /__/\\ \n\\:\\ \\ \\::::_\\/_\\:\\ \\\\ \\ \\\\__.::._\\/\\::: _ \\ \\\\__.::.__\\/\\::\\ \\\\ \\ \\\\::: _ \\ \\\\::\\_\\\\ \\ \\ \n \\:\\ \\ \\:\\/___/\\\\:\\ \\\\ \\ \\ \\::\\ \\ \\::(_) \\ \\ \\::\\ \\ \\::\\/_\\ .\\ \\\\::(_) \\ \\\\:. `-\\ \\ \\ \n \\:\\ \\____\\::___\\/_\\:\\_/.:\\ \\ _\\::\\ \\__\\:: __ \\ \\ \\::\\ \\ \\:: ___::\\ \\\\:: __ \\ \\\\:. _ \\ \\ \n \\:\\/___/\\\\:\\____/\\\\ ..::/ //__\\::\\__/\\\\:.\\ \\ \\ \\ \\::\\ \\ \\: \\ \\\\::\\ \\\\:.\\ \\ \\ \\\\. \\`-\\ \\ \\\n \\_____\\/ \\_____\\/ \\___/_( \\________\\/ \\__\\/\\__\\/ \\__\\/ \\__\\/ \\::\\/ \\__\\/\\__\\/ \\__\\/ \\__\\/\n \n", + ` + ___ _______ ___ ___ ___ _________ ___ ___ ________ ________ +|\ \ |\ ___ \ |\ \ / /||\ \ |\___ ___\|\ \|\ \ |\ __ \ |\ ___ \ +\ \ \ \ \ __/| \ \ \ / / /\ \ \\|___ \ \_|\ \ \\\ \\ \ \|\ \\ \ \\ \ \ + \ \ \ \ \ \_|/__\ \ \/ / / \ \ \ \ \ \ \ \ __ \\ \ __ \\ \ \\ \ \ + \ \ \____ \ \ \_|\ \\ \ / / \ \ \ \ \ \ \ \ \ \ \\ \ \ \ \\ \ \\ \ \ + \ \_______\\ \_______\\ \__/ / \ \__\ \ \__\ \ \__\ \__\\ \__\ \__\\ \__\\ \__\ + \|_______| \|_______| \|__|/ \|__| \|__| \|__|\|__| \|__|\|__| \|__| \|__| +`, + ` + ___ ___ ___ ___ ___ ___ ___ ___ + /\__\ /\ \ /\__\ ___ /\ \ /\ \ /\__\ /\ \ /\__\ + /:/ / /::\ \ /:/ / /\ \ /::\ \ \:\ \ /:/ / /::\ \ /::| | + /:/ / /:/\:\ \ /:/ / \:\ \ /:/\:\ \ \:\ \ /:/__/ /:/\:\ \ /:|:| | + /:/ / /::\~\:\ \ /:/__/ ___ /::\__\ /::\~\:\ \ /::\ \ /::\ \ ___ /::\~\:\ \ /:/|:| |__ + /:/__/ /:/\:\ \:\__\ |:| | /\__\ __/:/\/__/ /:/\:\ \:\__\ /:/\:\__\ /:/\:\ /\__\ /:/\:\ \:\__\ /:/ |:| /\__\ + \:\ \ \:\~\:\ \/__/ |:| |/:/ / /\/:/ / \/__\:\/:/ / /:/ \/__/ \/__\:\/:/ / \/__\:\/:/ / \/__|:|/:/ / + \:\ \ \:\ \:\__\ |:|__/:/ / \::/__/ \::/ / /:/ / \::/ / \::/ / |:/:/ / + \:\ \ \:\ \/__/ \::::/__/ \:\__\ /:/ / \/__/ /:/ / /:/ / |::/ / + \:\__\ \:\__\ ~~~~ \/__/ /:/ / /:/ / /:/ / /:/ / + \/__/ \/__/ \/__/ \/__/ \/__/ \/__/ +`, + ` +██╗ ███████╗██╗ ██╗██╗ █████╗ ████████╗██╗ ██╗ █████╗ ███╗ ██╗ +██║ ██╔════╝██║ ██║██║██╔══██╗╚══██╔══╝██║ ██║██╔══██╗████╗ ██║ +██║ █████╗ ██║ ██║██║███████║ ██║ ███████║███████║██╔██╗ ██║ +██║ ██╔══╝ ╚██╗ ██╔╝██║██╔══██║ ██║ ██╔══██║██╔══██║██║╚██╗██║ +███████╗███████╗ ╚████╔╝ ██║██║ ██║ ██║ ██║ ██║██║ ██║██║ ╚████║ +╚══════╝╚══════╝ ╚═══╝ ╚═╝╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═══╝ +`, + ` + _ _______ _________ _______ _________ _______ _ +( \ ( ____ \|\ /|\__ __/( ___ )\__ __/|\ /|( ___ )( ( /| +| ( | ( \/| ) ( | ) ( | ( ) | ) ( | ) ( || ( ) || \ ( | +| | | (__ | | | | | | | (___) | | | | (___) || (___) || \ | | +| | | __) ( ( ) ) | | | ___ | | | | ___ || ___ || (\ \) | +| | | ( \ \_/ / | | | ( ) | | | | ( ) || ( ) || | \ | +| (____/\| (____/\ \ / ___) (___| ) ( | | | | ) ( || ) ( || ) \ | +(_______/(_______/ \_/ \_______/|/ \| )_( |/ \||/ \||/ )_) +`, + } + + const ( + width = 90 + colorReset = "\033[0m" + // Nord color palette ANSI equivalents + nord4 = "\033[38;5;188m" // Snow Storm (darkest) - main text color + nord8 = "\033[38;5;110m" // Frost - light blue + nord9 = "\033[38;5;111m" // Frost - blue + nord10 = "\033[38;5;111m" // Frost - deep blue + nord15 = "\033[38;5;139m" // Aurora - purple + ) + + // Print header + dividerContent := strings.Repeat("=", width) + divider := nord9 + dividerContent + colorReset + + fmt.Println(divider) + fmt.Printf("%s%s %s %s\n", nord15, strings.Repeat(" ", (width-24)/2), headers[rand.IntN(len(headers))], colorReset) + fmt.Println(divider) + + // Print app info with aligned values + printField := func(name, value string) { + fmt.Printf("%s%-15s: %s%s%s\n", nord4, name, nord8, value, colorReset) + } + + printField("Version", Version) + printField("CommitInfo", CommitInfo) + printField("BuildDate", formatTime(BuildDate)) + printField("Branch", Branch) + printField("GoVersion", runtime.Version()) + + if Branch != "unknown" && CommitInfo != "unknown" { + fmt.Println(nord10 + strings.Repeat("-", width) + colorReset) + githubURL := GetGitHubURL(Branch, CommitInfo) + fmt.Printf("%s%s%s\n", nord15, githubURL, colorReset) + } + + fmt.Println(divider) +} + +func GetGitHubURL(branch, commitHash string) string { + const ( + repoOwner = "makeopensource" + repoName = "leviathan" + ) + // For browsing at the specific branch + branchURL := fmt.Sprintf("https://github.com/%s/%s/tree/%s", + repoOwner, repoName, branch) + // For viewing the specific commit + commitURL := fmt.Sprintf("https://github.com/%s/%s/commit/%s", + repoOwner, repoName, commitHash) + + return fmt.Sprintf("Branch: %s\nCommit: %s", branchURL, commitURL) +} + +func formatTime(input string) string { + buildTime, err := time.Parse(time.RFC3339, input) + if err != nil { + //fmt.Printf("Error parsing build time: %v\n", err) + return input + } + // Get the local timezone + localLocation, err := time.LoadLocation("Local") + if err != nil { + return input + } + // Convert the time to the local timezone + localBuildTime := buildTime.In(localLocation) + return fmt.Sprintf("%s (%s)", localBuildTime.Format("2006-01-02 3:04 PM MST"), timeago(localBuildTime)) +} + +// Seconds-based time units +const ( + Day = 24 * time.Hour + Week = 7 * Day + Month = 30 * Day + Year = 12 * Month + LongTime = 37 * Year +) + +// Time formats a time into a relative string. +// +// Time(someT) -> "3 weeks ago" +// +// stolen from -> https://github.com/dustin/go-humanize/blob/master/times.go +func timeago(then time.Time) string { + return RelTime(then, time.Now(), "ago", "from now") +} + +type RelTimeMagnitude struct { + D time.Duration + Format string + DivBy time.Duration +} + +var defaultMagnitudes = []RelTimeMagnitude{ + {time.Second, "now", time.Second}, + {2 * time.Second, "1 second %s", 1}, + {time.Minute, "%d seconds %s", time.Second}, + {2 * time.Minute, "1 minute %s", 1}, + {time.Hour, "%d minutes %s", time.Minute}, + {2 * time.Hour, "1 hour %s", 1}, + {Day, "%d hours %s", time.Hour}, + {2 * Day, "1 day %s", 1}, + {Week, "%d days %s", Day}, + {2 * Week, "1 week %s", 1}, + {Month, "%d weeks %s", Week}, + {2 * Month, "1 month %s", 1}, + {Year, "%d months %s", Month}, + {18 * Month, "1 year %s", 1}, + {2 * Year, "2 years %s", 1}, + {LongTime, "%d years %s", Year}, + {math.MaxInt64, "a long while %s", 1}, +} + +func RelTime(a, b time.Time, albl, blbl string) string { + return CustomRelTime(a, b, albl, blbl, defaultMagnitudes) +} + +func CustomRelTime(a, b time.Time, albl, blbl string, magnitudes []RelTimeMagnitude) string { + lbl := albl + diff := b.Sub(a) + + if a.After(b) { + lbl = blbl + diff = a.Sub(b) + } + + n := sort.Search(len(magnitudes), func(i int) bool { + return magnitudes[i].D > diff + }) + + if n >= len(magnitudes) { + n = len(magnitudes) - 1 + } + mag := magnitudes[n] + var args []interface{} + escaped := false + for _, ch := range mag.Format { + if escaped { + switch ch { + case 's': + args = append(args, lbl) + case 'd': + args = append(args, diff/mag.DivBy) + } + escaped = false + } else { + escaped = ch == '%' + } + } + return fmt.Sprintf(mag.Format, args...) +} diff --git a/src/common/logger.go b/src/common/logger.go index 0c6badf..d33a72c 100644 --- a/src/common/logger.go +++ b/src/common/logger.go @@ -25,12 +25,18 @@ func CreateJobSubLoggerCtx(ctx context.Context, jobID string) context.Context { } func FileConsoleLogger() zerolog.Logger { + level, err := zerolog.ParseLevel(LogLevel.GetStr()) + if err != nil { + log.Fatal().Err(err).Msg("unable to parse log level") + } + log.Info().Msgf("log level is now set to %s, this can be changed by using the LEVIATHAN_LOG_LEVEL env", level) + return baseLogger.Output( zerolog.MultiLevelWriter( GetFileLogger(LogDir.GetStr()), consoleWriter, ), - ) + ).Level(level) } func ConsoleLogger() zerolog.Logger { diff --git a/src/main.go b/src/main.go index 696efb1..2ae237f 100644 --- a/src/main.go +++ b/src/main.go @@ -7,6 +7,7 @@ import ( ) func main() { + common.PrintInfo() log.Logger = common.ConsoleLogger() common.InitConfig() api.StartGrpcServer() diff --git a/src/models/broadcast_channel.go b/src/models/broadcast_channel.go index 56899bf..614a729 100644 --- a/src/models/broadcast_channel.go +++ b/src/models/broadcast_channel.go @@ -5,6 +5,10 @@ import ( "github.com/rs/zerolog/log" ) +type BroadcastChannelKey string + +const BroadcastKey BroadcastChannelKey = "broadcast" + type BroadcastChannel struct { subscribers Map[string, chan *Job] } @@ -13,7 +17,7 @@ func NewBroadcastChannel() (*BroadcastChannel, context.Context) { bc := &BroadcastChannel{ subscribers: Map[string, chan *Job]{}, } - return bc, context.WithValue(context.Background(), "broadcast", bc) + return bc, context.WithValue(context.Background(), BroadcastKey, bc) } func (c *BroadcastChannel) Broadcast(v *Job) { diff --git a/src/models/job.go b/src/models/job.go index ca7dbfa..4b5e541 100644 --- a/src/models/job.go +++ b/src/models/job.go @@ -105,7 +105,7 @@ func (j *Job) VerifyJobLimits() { // AfterUpdate adds hooks for job streaming, updates a go channel everytime a job is updated // the consumer is responsible if it wants to use the job func (j *Job) AfterUpdate(tx *gorm.DB) (err error) { - ch := tx.Statement.Context.Value("broadcast") + ch := tx.Statement.Context.Value(BroadcastKey) if ch == nil { log.Warn().Msg("database broadcast channel is nil") return diff --git a/src/models/machine.go b/src/models/machine.go index 1d9a3f2..f998e38 100644 --- a/src/models/machine.go +++ b/src/models/machine.go @@ -1,8 +1,9 @@ package models type MachineOptions struct { - Name string `mapstructure:"name"` - Host string `mapstructure:"host"` - Port int `mapstructure:"port"` - User string `mapstructure:"user"` + Enable bool `mapstructure:"enable"` + Name string `mapstructure:"name"` + Host string `mapstructure:"host"` + Port int `mapstructure:"port"` + User string `mapstructure:"user"` } diff --git a/src/service/docker/docker_client.go b/src/service/docker/docker_client.go index d79abba..1254350 100644 --- a/src/service/docker/docker_client.go +++ b/src/service/docker/docker_client.go @@ -305,7 +305,7 @@ func (c *DkClient) PruneContainers() error { return nil } -func (c *DkClient) GetContainerStatus(ctx context.Context, contId string) (*types.ContainerJSON, error) { +func (c *DkClient) GetContainerStatus(ctx context.Context, contId string) (*container.InspectResponse, error) { inspect, err := c.Client.ContainerInspect(ctx, contId) if err != nil { return nil, err diff --git a/src/service/docker/docker_manager.go b/src/service/docker/docker_manager.go index 12cd61c..9b814e3 100644 --- a/src/service/docker/docker_manager.go +++ b/src/service/docker/docker_manager.go @@ -43,13 +43,14 @@ func GetClientList() []models.MachineOptions { log.Warn().Err(err).Msgf("Error decoding configuration structure for %s", name) continue } - // Set the name manually since it's not part of the nested structure options.Name = name - - // Append to the list - allMachines = append(allMachines, options) - log.Info().Any("options", options).Msgf("Loaded Machine: %s", name) + if options.Enable { + allMachines = append(allMachines, options) + log.Info().Any("options", options).Msgf("found machine config: %s", name) + } else { + log.Debug().Any("options", options).Msgf("found machine config: %s, but it was disabled", name) + } } return allMachines diff --git a/src/service/jobs/job_queue.go b/src/service/jobs/job_queue.go index 384fc4f..3c0aea1 100644 --- a/src/service/jobs/job_queue.go +++ b/src/service/jobs/job_queue.go @@ -125,7 +125,7 @@ func (q *JobQueue) runJob(job *models.Job) { statusCh, errCh := client.Client.ContainerWait(context.Background(), contId, cont.WaitConditionNotRunning) select { - case _ = <-statusCh: + case <-statusCh: wg.Wait() // for logs to complete writing q.verifyLogs(job) return @@ -142,7 +142,7 @@ func (q *JobQueue) runJob(job *models.Job) { } func (q *JobQueue) writeLogs(client *docker.DkClient, msg *models.Job) { - outputFile, err := os.OpenFile(msg.OutputLogFilePath, os.O_RDWR|os.O_CREATE, 660) + outputFile, err := os.OpenFile(msg.OutputLogFilePath, os.O_RDWR|os.O_CREATE, 0660) if err != nil { q.bigProblem(msg, "unable to open output file", err) return diff --git a/src/service/service.go b/src/service/service.go index c844898..1cbf870 100644 --- a/src/service/service.go +++ b/src/service/service.go @@ -28,8 +28,10 @@ func InitServices() (*docker.DkService, *jobs.JobService) { return dkService, jobService } -// removes any job left in am 'active' state before application start, -// fail any jobs that were running before leviathan was killed (for whatever reason) +// removes any job left in an 'active' state before application start, +// fail any jobs that were running before leviathan was able to process them (for whatever reason) +// +// for example machine running leviathan shutdown unexpectedly or leviathan had an unrecoverable error func cleanupOrphanJobs(db *gorm.DB, dk *docker.DkService) { var orphanJobs []*models.Job res := db. @@ -63,6 +65,7 @@ func cleanupOrphanJobs(db *gorm.DB, dk *docker.DkService) { } orphan.Status = models.Failed + orphan.StatusMessage = "job was unable to be processed due to an internal server error" res = db.Save(orphan) if res.Error == nil { log.Warn().Err(res.Error).Msg("unable to update orphan job status")