Skip to content

Commit 9a620a0

Browse files
authored
[chore] Small cleanups in the batcher code (#13163)
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent efac6c7 commit 9a620a0

File tree

2 files changed

+35
-45
lines changed

2 files changed

+35
-45
lines changed

exporter/exporterhelper/internal/queuebatch/multi_batcher.go

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,14 @@ import (
1313
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
1414
)
1515

16+
type batcherSettings[T any] struct {
17+
sizerType request.SizerType
18+
sizer request.Sizer[T]
19+
partitioner Partitioner[T]
20+
next sender.SendFunc[T]
21+
maxWorkers int
22+
}
23+
1624
type multiBatcher struct {
1725
cfg BatchConfig
1826
workerPool chan struct{}
@@ -45,15 +53,7 @@ func newMultiBatcher(bCfg BatchConfig, bSet batcherSettings[request.Request]) *m
4553
}
4654

4755
if bSet.partitioner == nil {
48-
mb.singleShard = &shardBatcher{
49-
cfg: bCfg,
50-
workerPool: mb.workerPool,
51-
sizerType: bSet.sizerType,
52-
sizer: bSet.sizer,
53-
consumeFunc: bSet.next,
54-
stopWG: sync.WaitGroup{},
55-
shutdownCh: make(chan struct{}, 1),
56-
}
56+
mb.singleShard = newShard(mb.cfg, mb.sizerType, mb.sizer, mb.workerPool, mb.consumeFunc)
5757
} else {
5858
mb.shards = xsync.NewMapOf[string, *shardBatcher]()
5959
}
@@ -67,15 +67,7 @@ func (mb *multiBatcher) getShard(ctx context.Context, req request.Request) *shar
6767

6868
key := mb.partitioner.GetKey(ctx, req)
6969
result, _ := mb.shards.LoadOrCompute(key, func() *shardBatcher {
70-
s := &shardBatcher{
71-
cfg: mb.cfg,
72-
workerPool: mb.workerPool,
73-
sizerType: mb.sizerType,
74-
sizer: mb.sizer,
75-
consumeFunc: mb.consumeFunc,
76-
stopWG: sync.WaitGroup{},
77-
shutdownCh: make(chan struct{}, 1),
78-
}
70+
s := newShard(mb.cfg, mb.sizerType, mb.sizer, mb.workerPool, mb.consumeFunc)
7971
s.start(ctx, nil)
8072
return s
8173
})

exporter/exporterhelper/internal/queuebatch/shard_batcher.go

Lines changed: 25 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,6 @@ type batch struct {
2121
done multiDone
2222
}
2323

24-
type batcherSettings[T any] struct {
25-
sizerType request.SizerType
26-
sizer request.Sizer[T]
27-
partitioner Partitioner[T]
28-
next sender.SendFunc[T]
29-
maxWorkers int
30-
}
31-
3224
// shardBatcher continuously batch incoming requests and flushes asynchronously if minimum size limit is met or on timeout.
3325
type shardBatcher struct {
3426
cfg BatchConfig
@@ -43,6 +35,17 @@ type shardBatcher struct {
4335
shutdownCh chan struct{}
4436
}
4537

38+
func newShard(cfg BatchConfig, sizerType request.SizerType, sizer request.Sizer[request.Request], workerPool chan struct{}, next sender.SendFunc[request.Request]) *shardBatcher {
39+
return &shardBatcher{
40+
cfg: cfg,
41+
workerPool: workerPool,
42+
sizerType: sizerType,
43+
sizer: sizer,
44+
consumeFunc: next,
45+
shutdownCh: make(chan struct{}, 1),
46+
}
47+
}
48+
4649
func (qb *shardBatcher) resetTimer() {
4750
if qb.cfg.FlushTimeout > 0 {
4851
qb.timer.Reset(qb.cfg.FlushTimeout)
@@ -88,7 +91,7 @@ func (qb *shardBatcher) Consume(ctx context.Context, req request.Request, done D
8891
}
8992

9093
reqList, mergeSplitErr := qb.currentBatch.req.MergeSplit(ctx, int(qb.cfg.MaxSize), qb.sizerType, req)
91-
// If failed to merge signal all Done callbacks from current batch as well as the current request and reset the current batch.
94+
// If failed to merge signal all Done callbacks from the current batch as well as the current request and reset the current batch.
9295
if mergeSplitErr != nil || len(reqList) == 0 {
9396
done.OnDone(mergeSplitErr)
9497
qb.currentBatchMu.Unlock()
@@ -102,7 +105,7 @@ func (qb *shardBatcher) Consume(ctx context.Context, req request.Request, done D
102105

103106
// We have at least one result in the reqList, if more results here is what that means:
104107
// - First result will contain items from the current batch + some results from the current request.
105-
// - All other results except first will contain items only from current request.
108+
// - All other results except first will contain items only from the current request.
106109
// - Last result may not have enough data to be flushed.
107110

108111
// Logic on how to deal with the current batch:
@@ -145,27 +148,22 @@ func (qb *shardBatcher) Consume(ctx context.Context, req request.Request, done D
145148
}
146149
}
147150

148-
// startTimeBasedFlushingGoroutine starts a goroutine that flushes on timeout.
149-
func (qb *shardBatcher) startTimeBasedFlushingGoroutine() {
150-
qb.stopWG.Add(1)
151-
go func() {
152-
defer qb.stopWG.Done()
153-
for {
154-
select {
155-
case <-qb.shutdownCh:
156-
return
157-
case <-qb.timer.C:
158-
qb.flushCurrentBatchIfNecessary()
159-
}
160-
}
161-
}()
162-
}
163-
164151
// Start starts the goroutine that reads from the queue and flushes asynchronously.
165152
func (qb *shardBatcher) start(_ context.Context, _ component.Host) {
166153
if qb.cfg.FlushTimeout > 0 {
167154
qb.timer = time.NewTimer(qb.cfg.FlushTimeout)
168-
qb.startTimeBasedFlushingGoroutine()
155+
qb.stopWG.Add(1)
156+
go func() {
157+
defer qb.stopWG.Done()
158+
for {
159+
select {
160+
case <-qb.shutdownCh:
161+
return
162+
case <-qb.timer.C:
163+
qb.flushCurrentBatchIfNecessary()
164+
}
165+
}
166+
}()
169167
}
170168
}
171169

0 commit comments

Comments
 (0)