Skip to content

Commit 27b8079

Browse files
committed
added context checks
1 parent dc88fa0 commit 27b8079

2 files changed

Lines changed: 48 additions & 24 deletions

File tree

config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func DefaultConfig() *Config {
3636
return &Config{
3737
ChunkSize: int(1e6), // 1M
3838
NumWorkers: 2,
39-
ChanBuffSize: 1,
39+
ChanBuffSize: 2,
4040
SortedChanBuffSize: 1000,
4141
TempFilesDir: "",
4242
}

sort_generic.go

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -256,8 +256,13 @@ func (s *GenericSorter[E]) buildChunks() error {
256256
break
257257
}
258258

259+
select {
259260
// chunk is now full
260-
s.chunkChan <- c
261+
case s.chunkChan <- c:
262+
case <-s.buildSortCtx.Done():
263+
s.putChunk(c) // Return unused chunk to pool
264+
return s.buildSortCtx.Err()
265+
}
261266
}
262267

263268
return nil
@@ -345,19 +350,31 @@ func (s *GenericSorter[E]) outputSingleChunk(ctx context.Context) {
345350
// For single chunk: stores it in memory to avoid disk I/O
346351
// For multiple chunks: saves all chunks to disk normally
347352
func (s *GenericSorter[E]) saveChunksOptimized() error {
348-
// Get the first chunk
349-
firstChunk, ok := <-s.saveChunkChan
350-
if !ok {
351-
// No chunks at all
352-
return nil
353-
}
354-
355-
// Try to get a second chunk
356-
secondChunk, ok := <-s.saveChunkChan
357-
if !ok {
358-
// Only one chunk - single chunk optimization
359-
s.singleChunk = firstChunk
360-
return nil
353+
// Get the first chunk with context checking
354+
var firstChunk *genericChunk[E]
355+
var ok bool
356+
select {
357+
case firstChunk, ok = <-s.saveChunkChan:
358+
if !ok {
359+
// Channel closed, no chunks at all
360+
return nil
361+
}
362+
case <-s.saveCtx.Done():
363+
return s.saveCtx.Err()
364+
}
365+
366+
// Try to get a second chunk with context checking
367+
var secondChunk *genericChunk[E]
368+
select {
369+
case secondChunk, ok = <-s.saveChunkChan:
370+
if !ok {
371+
// Channel closed after first chunk - single chunk optimization
372+
s.singleChunk = firstChunk
373+
return nil
374+
}
375+
case <-s.saveCtx.Done():
376+
s.putChunk(firstChunk) // Return to pool before exiting
377+
return s.saveCtx.Err()
361378
}
362379

363380
// We have at least 2 chunks - use multi-chunk path
@@ -372,17 +389,24 @@ func (s *GenericSorter[E]) saveChunksOptimized() error {
372389
return err
373390
}
374391

375-
// Continue saving any remaining chunks
376-
for chunk := range s.saveChunkChan {
377-
if err := s.saveChunk(chunk); err != nil {
378-
return err
392+
// Continue saving any remaining chunks with context checking
393+
for {
394+
select {
395+
case chunk, ok := <-s.saveChunkChan:
396+
if !ok {
397+
// Channel closed, we're done
398+
// Finalize the temp writer and save it for reading
399+
var err error
400+
s.tempReader, err = s.tempWriter.Save()
401+
return err
402+
}
403+
if err := s.saveChunk(chunk); err != nil {
404+
return err
405+
}
406+
case <-s.saveCtx.Done():
407+
return s.saveCtx.Err()
379408
}
380409
}
381-
382-
// Finalize the temp writer and save it for reading
383-
var err error
384-
s.tempReader, err = s.tempWriter.Save()
385-
return err
386410
}
387411

388412
// saveChunk processes a single chunk

0 commit comments

Comments
 (0)