Skip to content

Commit

Permalink
Updates to batching code; minor project cleanup (#77)
Browse files Browse the repository at this point in the history
  • Loading branch information
Trevor Rosen authored Feb 3, 2020
1 parent f6aefa2 commit d751233
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 36 deletions.
6 changes: 2 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
.PHONY: build clean doc test vet

excluding_vendor := $(shell go list ./... | grep -v /vendor/)

lib_name := appoptics-go

build:
Expand All @@ -14,10 +12,10 @@ doc:
godoc -http=:8080 -index

test:
go test -v $(excluding_vendor)
go test ./...

live_test:
cd _live-tests && go test -v
cd _live-tests && go test ./...

super_test: test
super_test: live_test
Expand Down
20 changes: 10 additions & 10 deletions measurements_batching.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ type BatchPersister struct {
// batchChan is used to create MeasurementsBatches for persistence to AppOptics
batchChan chan *MeasurementsBatch
// stopBatchingChan is used to cease persisting MeasurementsBatches to AppOptics
stopBatchingChan chan bool
stopBatchingChan chan struct{}
// stopPersistingChan is used to cease persisting MeasurementsBatches to AppOptics
stopPersistingChan chan bool
stopPersistingChan chan struct{}
// stopErrorChan is used to cease the error checking for/select
stopErrorChan chan bool
stopErrorChan chan struct{}
// errorChan is used to tally errors that occur in batching/persisting
errorChan chan error
// maximumPushInterval is the max time (in milliseconds) to wait before pushing a batch whether its length is equal
Expand All @@ -54,9 +54,9 @@ func NewBatchPersister(mc MeasurementsCommunicator, sendStats bool) *BatchPersis
errorLimit: DefaultPersistenceErrorLimit,
prepChan: make(chan []Measurement),
batchChan: make(chan *MeasurementsBatch),
stopBatchingChan: make(chan bool),
stopErrorChan: make(chan bool),
stopPersistingChan: make(chan bool),
stopBatchingChan: make(chan struct{}),
stopErrorChan: make(chan struct{}),
stopPersistingChan: make(chan struct{}),
errorChan: make(chan error),
errors: []error{},
maximumPushInterval: 2000,
Expand All @@ -78,7 +78,7 @@ func (bp *BatchPersister) MeasurementsSink() chan<- []Measurement {
}

// MeasurementsStopBatchingChannel gives calling code write-only access to the Measurements batching control channel
func (bp *BatchPersister) MeasurementsStopBatchingChannel() chan<- bool {
func (bp *BatchPersister) MeasurementsStopBatchingChannel() chan<- struct{} {
return bp.stopBatchingChan
}

Expand Down Expand Up @@ -135,8 +135,8 @@ LOOP:
}
}
close(bp.batchChan)
bp.stopPersistingChan <- true
bp.stopErrorChan <- true
bp.stopPersistingChan <- struct{}{}
bp.stopErrorChan <- struct{}{}
break LOOP
}
}
Expand Down Expand Up @@ -186,7 +186,7 @@ LOOP:
case err := <-bp.errorChan:
bp.errors = append(bp.errors, err)
if len(bp.errors) == bp.errorLimit {
bp.stopBatchingChan <- true
bp.stopBatchingChan <- struct{}{}
break LOOP
}
case <-bp.stopErrorChan:
Expand Down
32 changes: 10 additions & 22 deletions measurements_batching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package appoptics

import (
"fmt"
"github.com/stretchr/testify/assert"
"testing"
"time"
)
Expand All @@ -27,13 +28,11 @@ func TestBatchPersister(t *testing.T) {
for batch := range bp.batchChan {
batchCollection = append(batchCollection, batch)
if len(batchCollection) == batchCount {
bp.stopBatchingChan <- true
bp.stopBatchingChan <- struct{}{}
}
}

if len(batchCollection) != batchCount {
t.Errorf("expected batch count to be %d but was %d", batchCount, len(batchCollection))
}
assert.Equal(t, batchCount, len(batchCollection))
})

t.Run("respects push interval", func(t *testing.T) {
Expand Down Expand Up @@ -61,21 +60,19 @@ func TestBatchPersister(t *testing.T) {
for batch := range bp.batchChan {
batchCollection = append(batchCollection, batch)
if len(batchCollection[0].Measurements) == measurementsCount {
bp.stopBatchingChan <- true
bp.stopBatchingChan <- struct{}{}
}
}

measurementsInBatch := len(batchCollection[0].Measurements)

if measurementsInBatch != measurementsCount {
t.Errorf("expected Measurements in single batch to be %d but was %d", measurementsCount, measurementsInBatch)
}
assert.Equal(t, measurementsCount, measurementsInBatch)
})

t.Run("errors accumulate and stop batching at limit", func(t *testing.T) {
bp := NewBatchPersister(mms, false)
var pSig bool
var eSig bool
var pSig struct{}
var eSig struct{}
go bp.batchMeasurements()
go bp.managePersistenceErrors()

Expand All @@ -86,17 +83,8 @@ func TestBatchPersister(t *testing.T) {
pSig = <-bp.stopPersistingChan
eSig = <-bp.stopErrorChan

if !pSig {
t.Errorf("expected to find a value on the stopPersistingChan")
}

if !eSig {
t.Errorf("expected to find a value on the stopPersistingChan")
}

if len(bp.errors) != bp.errorLimit {
t.Errorf("expected errors count (%d) to equal error limit (%d) but it did not", len(bp.errors), bp.errorLimit)
}

assert.NotNil(t, pSig)
assert.NotNil(t, eSig)
assert.Equal(t, bp.errorLimit, len(bp.errors))
})
}

0 comments on commit d751233

Please sign in to comment.