package patternx
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
)
// Production constants
const (
MaxWorkersLimitPool = 1000
MinWorkersLimitPool = 1
MaxQueueSizeLimitPool = 10000
MinQueueSizeLimitPool = 1
MaxIdleTimeoutLimitPool = 10 * time.Minute
MinIdleTimeoutLimitPool = 1 * time.Second
MaxScaleUpThresholdLimitPool = 100
MinScaleUpThresholdLimitPool = 1
MaxScaleDownThresholdLimitPool = 50
MinScaleDownThresholdLimitPool = 1
MaxScaleUpCooldownLimitPool = 1 * time.Minute
MinScaleUpCooldownLimitPool = 100 * time.Millisecond
MaxScaleDownCooldownLimitPool = 5 * time.Minute
MinScaleDownCooldownLimitPool = 1 * time.Second
MaxJobTimeoutLimitPool = 1 * time.Hour
MinJobTimeoutLimitPool = 1 * time.Millisecond
DefaultJobTimeoutPool = 30 * time.Second
GracefulShutdownTimeoutPool = 30 * time.Second
AutoScaleIntervalPool = 5 * time.Second
MetricsCollectionIntervalPool = 1 * time.Second
)
// WorkerPool provides a configurable worker pool with backpressure for batch operations
type WorkerPool struct {
// Configuration
config ConfigPool
// Worker management
workers []*Worker
workerPool chan *Worker
mu sync.RWMutex
// Job management
jobQueue chan JobPool
jobResults chan JobResultPool
// Statistics - all atomic for thread safety
stats *PoolStatsPool
// Lifecycle
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
closed int32
// Health monitoring
healthStatus atomic.Value // *HealthStatus
lastHealthCheck time.Time
healthMu sync.RWMutex
}
// ConfigPool defines worker pool configuration
type ConfigPool struct {
// Pool size configuration
MinWorkers int `json:"min_workers"`
MaxWorkers int `json:"max_workers"`
QueueSize int `json:"queue_size"`
IdleTimeout time.Duration `json:"idle_timeout"`
// Performance tuning
ScaleUpThreshold int `json:"scale_up_threshold"`
ScaleDownThreshold int `json:"scale_down_threshold"`
ScaleUpCooldown time.Duration `json:"scale_up_cooldown"`
ScaleDownCooldown time.Duration `json:"scale_down_cooldown"`
// Monitoring
EnableMetrics bool `json:"enable_metrics"`
}
// DefaultConfigPool returns a default configuration
func DefaultConfigPool() ConfigPool {
return ConfigPool{
MinWorkers: 2,
MaxWorkers: 10,
QueueSize: 100,
IdleTimeout: 30 * time.Second,
ScaleUpThreshold: 5,
ScaleDownThreshold: 2,
ScaleUpCooldown: 5 * time.Second,
ScaleDownCooldown: 10 * time.Second,
EnableMetrics: true,
}
}
// HighPerformanceConfigPool returns a configuration optimized for high throughput
func HighPerformanceConfigPool() ConfigPool {
return ConfigPool{
MinWorkers: 5,
MaxWorkers: 50,
QueueSize: 1000,
IdleTimeout: 60 * time.Second,
ScaleUpThreshold: 10,
ScaleDownThreshold: 3,
ScaleUpCooldown: 2 * time.Second,
ScaleDownCooldown: 15 * time.Second,
EnableMetrics: true,
}
}
// ResourceConstrainedConfigPool returns a configuration for resource-constrained environments
func ResourceConstrainedConfigPool() ConfigPool {
return ConfigPool{
MinWorkers: 1,
MaxWorkers: 5,
QueueSize: 50,
IdleTimeout: 15 * time.Second,
ScaleUpThreshold: 3,
ScaleDownThreshold: 1,
ScaleUpCooldown: 10 * time.Second,
ScaleDownCooldown: 5 * time.Second,
EnableMetrics: false,
}
}
// EnterpriseConfigPool returns a configuration for enterprise environments
func EnterpriseConfigPool() ConfigPool {
return ConfigPool{
MinWorkers: 10,
MaxWorkers: 200,
QueueSize: 5000,
IdleTimeout: 5 * time.Minute,
ScaleUpThreshold: 20,
ScaleDownThreshold: 5,
ScaleUpCooldown: 1 * time.Second,
ScaleDownCooldown: 30 * time.Second,
EnableMetrics: true,
}
}
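// A minimal sketch of choosing a preset (hypothetical caller code): start
// from a preset and tweak individual fields; NewPool validates the result.
//
//	cfg := HighPerformanceConfigPool()
//	cfg.QueueSize = 2000 // must stay within Min/MaxQueueSizeLimitPool
//	pool, err := NewPool(cfg)
//	if err != nil {
//		// cfg rejected by validateConfigPool
//	}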
// JobPool represents a task to be executed by a worker
type JobPool struct {
ID string `json:"id"`
Task func() (interface{}, error) `json:"-"`
Priority int `json:"priority"`
Timeout time.Duration `json:"timeout"`
Created time.Time `json:"created"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
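// A minimal sketch of constructing a JobPool (hypothetical caller code;
// doWork stands in for the caller's own function). ID, Created, and Timeout
// may be left zero: Submit fills in defaults.
//
//	job := JobPool{
//		ID:   "resize-42", // optional; auto-generated when empty
//		Task: func() (interface{}, error) { return doWork() },
//	}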
// JobResultPool represents the result of a job execution
type JobResultPool struct {
JobID string `json:"job_id"`
Result interface{} `json:"result"`
Error error `json:"error"`
Duration time.Duration `json:"duration"`
WorkerID int `json:"worker_id"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
// PoolStatsPool holds worker pool statistics - all counters are atomic for thread safety
type PoolStatsPool struct {
ActiveWorkers atomic.Int64
IdleWorkers atomic.Int64
TotalWorkers atomic.Int64
QueuedJobs atomic.Int64
CompletedJobs atomic.Int64
FailedJobs atomic.Int64
TotalJobTime atomic.Int64 // nanoseconds
AverageJobTime atomic.Int64 // nanoseconds
ScaleUpCount atomic.Int64
ScaleDownCount atomic.Int64
LastScaleUp atomic.Value // time.Time
LastScaleDown atomic.Value // time.Time
JobsPerSecond atomic.Value // float64
QueueUtilization atomic.Value // float64
}
// HealthStatus represents the health status of the worker pool
type HealthStatus struct {
IsHealthy bool `json:"is_healthy"`
IsClosed bool `json:"is_closed"`
LastCheck time.Time `json:"last_check"`
ErrorCount int64 `json:"error_count"`
WorkerCount int64 `json:"worker_count"`
QueueSize int64 `json:"queue_size"`
Utilization float64 `json:"utilization"`
Issues []string `json:"issues,omitempty"`
}
// NewPool creates a new worker pool with the given configuration
func NewPool(config ConfigPool) (*WorkerPool, error) {
if err := validateConfigPool(config); err != nil {
return nil, fmt.Errorf("%w: %v", ErrPoolInvalidConfigPool, err)
}
ctx, cancel := context.WithCancel(context.Background())
pool := &WorkerPool{
config: config,
jobQueue: make(chan JobPool, config.QueueSize),
jobResults: make(chan JobResultPool, config.QueueSize),
stats: &PoolStatsPool{},
ctx: ctx,
cancel: cancel,
lastHealthCheck: time.Now(), // baseline for the jobs-per-second calculation
}
// Initialize health status
pool.healthStatus.Store(&HealthStatus{
IsHealthy: true,
LastCheck: time.Now(),
})
// Initialize workers
if err := pool.initializeWorkers(); err != nil {
cancel()
return nil, fmt.Errorf("%w: %v", ErrPoolWorkerCreationFailed, err)
}
// Start background processes
pool.startBackgroundProcesses()
return pool, nil
}
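// A minimal lifecycle sketch (hypothetical caller code): construct the pool,
// hand it work, and release its goroutines with Close when done.
//
//	pool, err := NewPool(DefaultConfigPool())
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer pool.Close()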
// validateConfigPool validates the pool configuration
func validateConfigPool(config ConfigPool) error {
if config.MinWorkers < MinWorkersLimitPool || config.MinWorkers > MaxWorkersLimitPool {
return fmt.Errorf("min workers must be between %d and %d, got %d", MinWorkersLimitPool, MaxWorkersLimitPool, config.MinWorkers)
}
if config.MaxWorkers < MinWorkersLimitPool || config.MaxWorkers > MaxWorkersLimitPool {
return fmt.Errorf("max workers must be between %d and %d, got %d", MinWorkersLimitPool, MaxWorkersLimitPool, config.MaxWorkers)
}
if config.MinWorkers > config.MaxWorkers {
return fmt.Errorf("min workers (%d) cannot exceed max workers (%d)", config.MinWorkers, config.MaxWorkers)
}
if config.QueueSize < MinQueueSizeLimitPool || config.QueueSize > MaxQueueSizeLimitPool {
return fmt.Errorf("queue size must be between %d and %d, got %d", MinQueueSizeLimitPool, MaxQueueSizeLimitPool, config.QueueSize)
}
if config.IdleTimeout < MinIdleTimeoutLimitPool || config.IdleTimeout > MaxIdleTimeoutLimitPool {
return fmt.Errorf("idle timeout must be between %v and %v, got %v", MinIdleTimeoutLimitPool, MaxIdleTimeoutLimitPool, config.IdleTimeout)
}
if config.ScaleUpThreshold < MinScaleUpThresholdLimitPool || config.ScaleUpThreshold > MaxScaleUpThresholdLimitPool {
return fmt.Errorf("scale up threshold must be between %d and %d, got %d", MinScaleUpThresholdLimitPool, MaxScaleUpThresholdLimitPool, config.ScaleUpThreshold)
}
if config.ScaleDownThreshold < MinScaleDownThresholdLimitPool || config.ScaleDownThreshold > MaxScaleDownThresholdLimitPool {
return fmt.Errorf("scale down threshold must be between %d and %d, got %d", MinScaleDownThresholdLimitPool, MaxScaleDownThresholdLimitPool, config.ScaleDownThreshold)
}
if config.ScaleUpCooldown < MinScaleUpCooldownLimitPool || config.ScaleUpCooldown > MaxScaleUpCooldownLimitPool {
return fmt.Errorf("scale up cooldown must be between %v and %v, got %v", MinScaleUpCooldownLimitPool, MaxScaleUpCooldownLimitPool, config.ScaleUpCooldown)
}
if config.ScaleDownCooldown < MinScaleDownCooldownLimitPool || config.ScaleDownCooldown > MaxScaleDownCooldownLimitPool {
return fmt.Errorf("scale down cooldown must be between %v and %v, got %v", MinScaleDownCooldownLimitPool, MaxScaleDownCooldownLimitPool, config.ScaleDownCooldown)
}
return nil
}
// validateJobPool validates a job before submission
func validateJobPool(job JobPool) error {
if job.Task == nil {
return fmt.Errorf("job task function cannot be nil")
}
if job.Timeout < 0 {
return fmt.Errorf("job timeout cannot be negative")
}
if job.Timeout > 0 && (job.Timeout < MinJobTimeoutLimitPool || job.Timeout > MaxJobTimeoutLimitPool) {
return fmt.Errorf("job timeout must be between %v and %v, got %v", MinJobTimeoutLimitPool, MaxJobTimeoutLimitPool, job.Timeout)
}
return nil
}
// initializeWorkers creates the initial set of workers
func (wp *WorkerPool) initializeWorkers() error {
wp.workerPool = make(chan *Worker, wp.config.MaxWorkers)
wp.workers = make([]*Worker, 0, wp.config.MaxWorkers)
// Create minimum number of workers
for i := 0; i < wp.config.MinWorkers; i++ {
worker := wp.createWorker(i)
if worker == nil {
return fmt.Errorf("failed to create worker %d", i)
}
wp.workers = append(wp.workers, worker)
wp.workerPool <- worker
wp.stats.TotalWorkers.Add(1)
wp.stats.IdleWorkers.Add(1)
}
return nil
}
// createWorker creates a new worker
func (wp *WorkerPool) createWorker(id int) *Worker {
worker := &Worker{
id: id,
pool: wp,
jobChan: make(chan JobPool, 1),
stopChan: make(chan struct{}, 1), // Buffered to prevent blocking
stats: &WorkerStats{},
active: 0,
}
wp.wg.Add(1)
go worker.start()
return worker
}
// startBackgroundProcesses starts background monitoring and scaling processes
func (wp *WorkerPool) startBackgroundProcesses() {
// Start job result processor
go wp.processJobResults()
// Start auto-scaling when there is scaling headroom (MaxWorkers > MinWorkers) and metrics are enabled
if wp.config.MaxWorkers > wp.config.MinWorkers && wp.config.EnableMetrics {
go wp.autoScale()
}
// Start metrics collection if enabled
if wp.config.EnableMetrics {
go wp.collectMetrics()
}
// Start health monitoring
go wp.monitorHealth()
}
// Submit submits a job to the worker pool
func (wp *WorkerPool) Submit(job JobPool) error {
if atomic.LoadInt32(&wp.closed) == 1 {
return ErrPoolClosed
}
// Validate job
if err := validateJobPool(job); err != nil {
return fmt.Errorf("%w: %v", ErrPoolInvalidJob, err)
}
// Set default values
if job.ID == "" {
job.ID = fmt.Sprintf("job-%d", time.Now().UnixNano())
}
if job.Created.IsZero() {
job.Created = time.Now()
}
if job.Timeout == 0 {
job.Timeout = DefaultJobTimeoutPool
}
// Try to submit job (this provides backpressure)
select {
case wp.jobQueue <- job:
wp.stats.QueuedJobs.Add(1)
return nil
default:
return ErrPoolJobQueueFull
}
}
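// A minimal backpressure sketch (hypothetical caller code): on a full queue
// Submit fails fast with ErrPoolJobQueueFull instead of blocking, so the
// caller decides whether to retry, drop, or shed load.
//
//	for {
//		err := pool.Submit(job)
//		if !errors.Is(err, ErrPoolJobQueueFull) {
//			break // submitted, or a non-retryable error
//		}
//		time.Sleep(10 * time.Millisecond) // back off and retry
//	}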
// SubmitWithContext submits a job with context support
func (wp *WorkerPool) SubmitWithContext(ctx context.Context, job JobPool) error {
if atomic.LoadInt32(&wp.closed) == 1 {
return ErrPoolClosed
}
// Validate context
if ctx == nil {
return fmt.Errorf("%w: context cannot be nil", ErrPoolInvalidJob)
}
// Validate job
if err := validateJobPool(job); err != nil {
return fmt.Errorf("%w: %v", ErrPoolInvalidJob, err)
}
// Set default values
if job.ID == "" {
job.ID = fmt.Sprintf("job-%d", time.Now().UnixNano())
}
if job.Created.IsZero() {
job.Created = time.Now()
}
if job.Timeout == 0 {
job.Timeout = DefaultJobTimeoutPool
}
// Try to submit job with context
select {
case wp.jobQueue <- job:
wp.stats.QueuedJobs.Add(1)
return nil
case <-ctx.Done():
return fmt.Errorf("%w: %v", ErrPoolContextCancelled, ctx.Err())
default:
return ErrPoolJobQueueFull
}
}
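// A bounded-wait sketch (hypothetical caller code). Note that the select
// above keeps a default case, so SubmitWithContext still fails fast on a
// full queue; the context mainly guards an already-cancelled caller.
//
//	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
//	defer cancel()
//	if err := pool.SubmitWithContext(ctx, job); err != nil {
//		// ErrPoolJobQueueFull, ErrPoolContextCancelled, or a validation error
//	}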
// SubmitBatch submits multiple jobs as a batch
func (wp *WorkerPool) SubmitBatch(jobs []JobPool) ([]error, error) {
if atomic.LoadInt32(&wp.closed) == 1 {
return nil, ErrPoolClosed
}
if len(jobs) == 0 {
return nil, fmt.Errorf("%w: batch cannot be empty", ErrPoolInvalidJob)
}
errors := make([]error, len(jobs))
failedCount := 0
for i, job := range jobs {
if err := wp.Submit(job); err != nil { // job is already a JobPool; no conversion needed
errors[i] = err
failedCount++
}
}
var batchError error
if failedCount > 0 {
// Build the summary error after the loop so the failure count is accurate
batchError = fmt.Errorf("%w: %d/%d jobs failed", ErrPoolInvalidJob, failedCount, len(jobs))
}
return errors, batchError
}
// GetResult retrieves a job result
func (wp *WorkerPool) GetResult() (JobResultPool, error) {
select {
case result := <-wp.jobResults:
return result, nil
case <-wp.ctx.Done():
return JobResultPool{}, ErrPoolClosed
default:
return JobResultPool{}, ErrPoolNoResultsAvailable
}
}
// GetResultWithTimeout retrieves a job result with timeout
func (wp *WorkerPool) GetResultWithTimeout(timeout time.Duration) (JobResultPool, error) {
if timeout <= 0 {
return JobResultPool{}, fmt.Errorf("%w: timeout must be positive", ErrPoolTimeout)
}
select {
case result := <-wp.jobResults:
return result, nil
case <-time.After(timeout):
return JobResultPool{}, ErrPoolTimeout
case <-wp.ctx.Done():
return JobResultPool{}, ErrPoolClosed
}
}
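// A minimal result-draining sketch (hypothetical caller code; handle stands
// in for the caller's own function): poll with a timeout so the reader
// neither spins nor blocks forever.
//
//	res, err := pool.GetResultWithTimeout(100 * time.Millisecond)
//	switch {
//	case err == nil:
//		handle(res)
//	case errors.Is(err, ErrPoolTimeout):
//		// nothing arrived in the window; try again later
//	case errors.Is(err, ErrPoolClosed):
//		// pool shut down; stop reading
//	}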
// GetStats returns current pool statistics
func (wp *WorkerPool) GetStats() *PoolStatsPool {
// Calculate derived metrics
completedJobs := wp.stats.CompletedJobs.Load()
totalJobTime := wp.stats.TotalJobTime.Load()
if completedJobs > 0 {
wp.stats.AverageJobTime.Store(totalJobTime / completedJobs)
}
// Calculate jobs per second (rolling average)
wp.calculateJobsPerSecond()
// Calculate queue utilization
queuedJobs := wp.stats.QueuedJobs.Load()
queueSize := int64(wp.config.QueueSize)
if queueSize > 0 {
utilization := float64(queuedJobs) / float64(queueSize)
wp.stats.QueueUtilization.Store(utilization)
}
return wp.stats
}
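// A minimal monitoring sketch (hypothetical caller code). GetStats returns
// the pool's live stats struct, so read fields through their atomic Load
// methods rather than copying the struct.
//
//	stats := pool.GetStats()
//	fmt.Printf("workers=%d queued=%d avg=%v\n",
//		stats.TotalWorkers.Load(),
//		stats.QueuedJobs.Load(),
//		time.Duration(stats.AverageJobTime.Load()))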
// calculateJobsPerSecond calculates the jobs-per-second rate.
// This is a simplified calculation; production code would track time windows
// for a true rolling average.
func (wp *WorkerPool) calculateJobsPerSecond() {
completedJobs := wp.stats.CompletedJobs.Load()
if completedJobs == 0 {
return
}
// Read lastHealthCheck under the health lock; it is written under healthMu
wp.healthMu.RLock()
elapsed := time.Since(wp.lastHealthCheck).Seconds()
wp.healthMu.RUnlock()
if elapsed > 0 {
wp.stats.JobsPerSecond.Store(float64(completedJobs) / elapsed)
}
}
// ScaleUp adds workers to the pool
func (wp *WorkerPool) ScaleUp(count int) error {
if count <= 0 {
return fmt.Errorf("%w: scale up count must be positive", ErrPoolScalingFailed)
}
wp.mu.Lock()
defer wp.mu.Unlock()
currentWorkers := len(wp.workers)
maxNewWorkers := wp.config.MaxWorkers - currentWorkers
if maxNewWorkers <= 0 {
return fmt.Errorf("%w: cannot scale up - already at maximum workers (%d)", ErrPoolScalingFailed, wp.config.MaxWorkers)
}
if count > maxNewWorkers {
count = maxNewWorkers
}
for i := 0; i < count; i++ {
workerID := currentWorkers + i
worker := wp.createWorker(workerID)
if worker == nil {
return fmt.Errorf("%w: failed to create worker %d", ErrPoolScalingFailed, workerID)
}
wp.workers = append(wp.workers, worker)
wp.workerPool <- worker
wp.stats.TotalWorkers.Add(1)
wp.stats.IdleWorkers.Add(1)
}
wp.stats.ScaleUpCount.Add(1)
wp.stats.LastScaleUp.Store(time.Now())
return nil
}
// ScaleDown removes workers from the pool
func (wp *WorkerPool) ScaleDown(count int) error {
if count <= 0 {
return fmt.Errorf("%w: scale down count must be positive", ErrPoolScalingFailed)
}
wp.mu.Lock()
defer wp.mu.Unlock()
currentWorkers := len(wp.workers)
minWorkers := wp.config.MinWorkers
if currentWorkers <= minWorkers {
return fmt.Errorf("%w: cannot scale down - already at minimum workers (%d)", ErrPoolScalingFailed, minWorkers)
}
maxRemovable := currentWorkers - minWorkers
if count > maxRemovable {
count = maxRemovable
}
// Stop workers from the end of the list
for i := 0; i < count; i++ {
workerIndex := currentWorkers - 1 - i
worker := wp.workers[workerIndex]
worker.stop()
}
// Remove workers from slice
wp.workers = wp.workers[:currentWorkers-count]
// Update worker counts
wp.stats.TotalWorkers.Add(-int64(count))
wp.stats.IdleWorkers.Add(-int64(count))
wp.stats.ScaleDownCount.Add(1)
wp.stats.LastScaleDown.Store(time.Now())
return nil
}
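// A manual-scaling sketch (hypothetical caller code). Both methods clamp the
// requested count to the configured bounds, so the only hard failure is
// already being at the limit.
//
//	if err := pool.ScaleUp(4); err != nil {
//		// already at MaxWorkers
//	}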
// Close gracefully shuts down the worker pool
func (wp *WorkerPool) Close() error {
if !atomic.CompareAndSwapInt32(&wp.closed, 0, 1) {
return fmt.Errorf("worker pool already closed")
}
// Cancel context to stop background processes
wp.cancel()
// Wait a bit for background processes to stop
time.Sleep(100 * time.Millisecond)
// Stop all workers
workers := make([]*Worker, 0)
wp.mu.RLock()
workers = append(workers, wp.workers...)
wp.mu.RUnlock()
for _, worker := range workers {
worker.stop()
}
// Wait for all workers to finish with timeout
done := make(chan struct{})
go func() {
wp.wg.Wait()
close(done)
}()
select {
case <-done:
// All workers finished successfully
case <-time.After(GracefulShutdownTimeoutPool):
// Force shutdown after timeout
}
// Close channels
close(wp.jobQueue)
close(wp.jobResults)
close(wp.workerPool)
// Update health status
wp.updateHealthStatus(false, []string{"pool closed"})
return nil
}
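// A shutdown sketch (hypothetical caller code). A second call to Close
// returns an error rather than panicking, so a plain deferred Close is safe.
//
//	if err := pool.Close(); err != nil {
//		// pool was already closed
//	}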
// IsHealthy returns true if the worker pool is healthy
func (wp *WorkerPool) IsHealthy() bool {
health := wp.healthStatus.Load().(*HealthStatus)
return health.IsHealthy
}
// GetHealthStatus returns detailed health information
func (wp *WorkerPool) GetHealthStatus() *HealthStatus {
wp.healthMu.RLock()
defer wp.healthMu.RUnlock()
health := wp.healthStatus.Load().(*HealthStatus)
return health
}
// updateHealthStatus updates the health status
func (wp *WorkerPool) updateHealthStatus(isHealthy bool, issues []string) {
// Snapshot stats before taking the write lock: GetStats reads
// lastHealthCheck under healthMu.RLock via calculateJobsPerSecond, and
// calling it while holding healthMu.Lock would self-deadlock.
stats := wp.GetStats()
utilization, _ := stats.QueueUtilization.Load().(float64) // comma-ok guards against a nil value before the first store
wp.healthMu.Lock()
defer wp.healthMu.Unlock()
health := &HealthStatus{
IsHealthy: isHealthy,
IsClosed: atomic.LoadInt32(&wp.closed) == 1,
LastCheck: time.Now(),
ErrorCount: stats.FailedJobs.Load(),
WorkerCount: stats.TotalWorkers.Load(),
QueueSize: stats.QueuedJobs.Load(),
Utilization: utilization,
Issues: issues,
}
wp.healthStatus.Store(health)
wp.lastHealthCheck = time.Now()
}
// processJobResults processes completed job results
func (wp *WorkerPool) processJobResults() {
for {
select {
case result := <-wp.jobResults:
wp.stats.QueuedJobs.Add(-1)
if result.Error != nil {
wp.stats.FailedJobs.Add(1)
} else {
wp.stats.CompletedJobs.Add(1)
}
// Update total job time atomically
wp.stats.TotalJobTime.Add(int64(result.Duration))
case <-wp.ctx.Done():
return
}
}
}
// autoScale automatically scales the worker pool based on load
func (wp *WorkerPool) autoScale() {
ticker := time.NewTicker(AutoScaleIntervalPool)
defer ticker.Stop()
var lastScaleUp, lastScaleDown time.Time
for {
select {
case <-ticker.C:
// Check if pool is closed before attempting to scale
if atomic.LoadInt32(&wp.closed) == 1 {
return
}
stats := wp.GetStats()
// Check if we should scale up
if stats.QueuedJobs.Load() >= int64(wp.config.ScaleUpThreshold) &&
time.Since(lastScaleUp) >= wp.config.ScaleUpCooldown &&
stats.TotalWorkers.Load() < int64(wp.config.MaxWorkers) {
scaleUpCount := 1
if stats.QueuedJobs.Load() >= int64(wp.config.ScaleUpThreshold*2) {
scaleUpCount = 2
}
// Scale up directly to avoid deadlocks
if err := wp.ScaleUp(scaleUpCount); err == nil {
lastScaleUp = time.Now()
}
}
// Check if we should scale down
if stats.IdleWorkers.Load() >= int64(wp.config.ScaleDownThreshold) &&
time.Since(lastScaleDown) >= wp.config.ScaleDownCooldown &&
stats.TotalWorkers.Load() > int64(wp.config.MinWorkers) {
scaleDownCount := 1
if stats.IdleWorkers.Load() >= int64(wp.config.ScaleDownThreshold*2) {
scaleDownCount = 2
}
// Scale down directly to avoid deadlocks
if err := wp.ScaleDown(scaleDownCount); err == nil {
lastScaleDown = time.Now()
}
}
case <-wp.ctx.Done():
return
}
}
}
// collectMetrics collects detailed metrics if enabled
func (wp *WorkerPool) collectMetrics() {
ticker := time.NewTicker(MetricsCollectionIntervalPool)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// Update health status
wp.updateHealthStatus(true, nil)
case <-wp.ctx.Done():
return
}
}
}
// monitorHealth monitors the health of the worker pool
func (wp *WorkerPool) monitorHealth() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
stats := wp.GetStats()
var issues []string
// Check for potential issues
if stats.FailedJobs.Load() > stats.CompletedJobs.Load()/2 {
issues = append(issues, "high failure rate")
}
if util, ok := stats.QueueUtilization.Load().(float64); ok && util > 0.9 {
issues = append(issues, "high queue utilization")
}
if stats.TotalWorkers.Load() == 0 {
issues = append(issues, "no workers available")
}
isHealthy := len(issues) == 0
wp.updateHealthStatus(isHealthy, issues)
case <-wp.ctx.Done():
return
}
}
}