Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statsd: aggregate metrics from a chunk in batch #146

Open
wants to merge 1 commit into
base: aiven-release-v1.32.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 108 additions & 100 deletions plugins/inputs/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,8 @@ func (s *Statsd) udpListen(conn *net.UDPConn) error {
case s.in <- input{
Buffer: b,
Time: time.Now(),
Addr: addr.IP.String()}:
Addr: addr.IP.String(),
}:
s.PendingMessages.Set(int64(len(s.in)))
default:
s.UDPPacketsDrop.Incr(1)
Expand Down Expand Up @@ -541,6 +542,8 @@ func (s *Statsd) parser() error {
start := time.Now()
lines := strings.Split(in.Buffer.String(), "\n")
s.bufPool.Put(in.Buffer)
// Each parsed line might pruduce multiple metrics.
metrics := make([]metric, 0)
for _, line := range lines {
line = strings.TrimSpace(line)
switch {
Expand All @@ -554,15 +557,18 @@ func (s *Statsd) parser() error {
s.Log.Debugf(" line was: %s", line)
}
default:
if err := s.parseStatsdLine(line); err != nil {
ms, err := s.parseStatsdLine(line)
if err != nil {
if !errors.Is(err, errParsing) {
// Ignore parsing errors but error out on
// everything else...
return err
}
}
metrics = append(metrics, ms...)
}
}
s.aggregate(metrics...)
elapsed := time.Since(start)
s.ParseTimeNS.Set(elapsed.Nanoseconds())
}
Expand All @@ -571,7 +577,7 @@ func (s *Statsd) parser() error {

// parseStatsdLine will parse the given statsd line, validating it as it goes.
// If the line is valid, it will be cached for the next call to Gather()
func (s *Statsd) parseStatsdLine(line string) error {
func (s *Statsd) parseStatsdLine(line string) ([]metric, error) {
lineTags := make(map[string]string)
if s.DataDogExtensions {
recombinedSegments := make([]string, 0)
Expand Down Expand Up @@ -601,12 +607,13 @@ func (s *Statsd) parseStatsdLine(line string) error {
bits := strings.Split(line, ":")
if len(bits) < 2 {
s.Log.Errorf("Splitting ':', unable to parse metric: %s", line)
return errParsing
return nil, errParsing
}

// Extract bucket name from individual metric bits
bucketName, bits := bits[0], bits[1:]

metrics := make([]metric, 0)
// Add a metric for each bit available
for _, bit := range bits {
m := metric{}
Expand All @@ -617,7 +624,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
pipesplit := strings.Split(bit, "|")
if len(pipesplit) < 2 {
s.Log.Errorf("Splitting '|', unable to parse metric: %s", line)
return errParsing
return nil, errParsing
} else if len(pipesplit) > 2 {
sr := pipesplit[2]

Expand All @@ -641,14 +648,14 @@ func (s *Statsd) parseStatsdLine(line string) error {
m.mtype = pipesplit[1]
default:
s.Log.Errorf("Metric type %q unsupported", pipesplit[1])
return errParsing
return nil, errParsing
}

// Parse the value
if strings.HasPrefix(pipesplit[0], "-") || strings.HasPrefix(pipesplit[0], "+") {
if m.mtype != "g" && m.mtype != "c" {
s.Log.Errorf("+- values are only supported for gauges & counters, unable to parse metric: %s", line)
return errParsing
return nil, errParsing
}
m.additive = true
}
Expand All @@ -658,7 +665,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
v, err := strconv.ParseFloat(pipesplit[0], 64)
if err != nil {
s.Log.Errorf("Parsing value to float64, unable to parse metric: %s", line)
return errParsing
return nil, errParsing
}
m.floatvalue = v
case "c":
Expand All @@ -668,7 +675,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
v2, err2 := strconv.ParseFloat(pipesplit[0], 64)
if err2 != nil {
s.Log.Errorf("Parsing value to int64, unable to parse metric: %s", line)
return errParsing
return nil, errParsing
}
v = int64(v2)
}
Expand Down Expand Up @@ -719,11 +726,10 @@ func (s *Statsd) parseStatsdLine(line string) error {
sort.Strings(tg)
tg = append(tg, m.name)
m.hash = strings.Join(tg, "")

s.aggregate(m)
metrics = append(metrics, m)
}

return nil
return metrics, nil
}

// parseName parses the given bucket name with the list of bucket maps in the
Expand Down Expand Up @@ -808,107 +814,109 @@ func parseKeyValue(keyValue string) (key string, val string) {
// aggregate takes in a metric. It then
// aggregates and caches the current value(s). It does not deal with the
// Delete* options, because those are dealt with in the Gather function.
func (s *Statsd) aggregate(m metric) {
func (s *Statsd) aggregate(metrics ...metric) {
s.Lock()
defer s.Unlock()

switch m.mtype {
case "d":
if s.DataDogExtensions && s.DataDogDistributions {
cached := cacheddistributions{
name: m.name,
value: m.floatvalue,
tags: m.tags,
for _, m := range metrics {
switch m.mtype {
case "d":
if s.DataDogExtensions && s.DataDogDistributions {
cached := cacheddistributions{
name: m.name,
value: m.floatvalue,
tags: m.tags,
}
s.distributions = append(s.distributions, cached)
}
s.distributions = append(s.distributions, cached)
}
case "ms", "h":
// Check if the measurement exists
cached, ok := s.timings[m.hash]
if !ok {
cached = cachedtimings{
name: m.name,
fields: make(map[string]RunningStats),
tags: m.tags,
case "ms", "h":
// Check if the measurement exists
cached, ok := s.timings[m.hash]
if !ok {
cached = cachedtimings{
name: m.name,
fields: make(map[string]RunningStats),
tags: m.tags,
}
}
}
// Check if the field exists. If we've not enabled multiple fields per timer
// this will be the default field name, eg. "value"
field, ok := cached.fields[m.field]
if !ok {
field = RunningStats{
PercLimit: s.PercentileLimit,
// Check if the field exists. If we've not enabled multiple fields per timer
// this will be the default field name, eg. "value"
field, ok := cached.fields[m.field]
if !ok {
field = RunningStats{
PercLimit: s.PercentileLimit,
}
}
}
if m.samplerate > 0 {
for i := 0; i < int(1.0/m.samplerate); i++ {
if m.samplerate > 0 {
for i := 0; i < int(1.0/m.samplerate); i++ {
field.AddValue(m.floatvalue)
}
} else {
field.AddValue(m.floatvalue)
}
} else {
field.AddValue(m.floatvalue)
}
cached.fields[m.field] = field
cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
s.timings[m.hash] = cached
case "c":
// check if the measurement exists
cached, ok := s.counters[m.hash]
if !ok {
cached = cachedcounter{
name: m.name,
fields: make(map[string]interface{}),
tags: m.tags,
cached.fields[m.field] = field
cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
s.timings[m.hash] = cached
case "c":
// check if the measurement exists
cached, ok := s.counters[m.hash]
if !ok {
cached = cachedcounter{
name: m.name,
fields: make(map[string]interface{}),
tags: m.tags,
}
}
}
// check if the field exists
_, ok = cached.fields[m.field]
if !ok {
cached.fields[m.field] = int64(0)
}
cached.fields[m.field] = cached.fields[m.field].(int64) + m.intvalue
cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
s.counters[m.hash] = cached
case "g":
// check if the measurement exists
cached, ok := s.gauges[m.hash]
if !ok {
cached = cachedgauge{
name: m.name,
fields: make(map[string]interface{}),
tags: m.tags,
// check if the field exists
_, ok = cached.fields[m.field]
if !ok {
cached.fields[m.field] = int64(0)
}
cached.fields[m.field] = cached.fields[m.field].(int64) + m.intvalue
cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
s.counters[m.hash] = cached
case "g":
// check if the measurement exists
cached, ok := s.gauges[m.hash]
if !ok {
cached = cachedgauge{
name: m.name,
fields: make(map[string]interface{}),
tags: m.tags,
}
}
// check if the field exists
_, ok = cached.fields[m.field]
if !ok {
cached.fields[m.field] = float64(0)
}
if m.additive {
cached.fields[m.field] = cached.fields[m.field].(float64) + m.floatvalue
} else {
cached.fields[m.field] = m.floatvalue
}
}
// check if the field exists
_, ok = cached.fields[m.field]
if !ok {
cached.fields[m.field] = float64(0)
}
if m.additive {
cached.fields[m.field] = cached.fields[m.field].(float64) + m.floatvalue
} else {
cached.fields[m.field] = m.floatvalue
}

cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
s.gauges[m.hash] = cached
case "s":
// check if the measurement exists
cached, ok := s.sets[m.hash]
if !ok {
cached = cachedset{
name: m.name,
fields: make(map[string]map[string]bool),
tags: m.tags,
cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
s.gauges[m.hash] = cached
case "s":
// check if the measurement exists
cached, ok := s.sets[m.hash]
if !ok {
cached = cachedset{
name: m.name,
fields: make(map[string]map[string]bool),
tags: m.tags,
}
}
// check if the field exists
_, ok = cached.fields[m.field]
if !ok {
cached.fields[m.field] = make(map[string]bool)
}
cached.fields[m.field][m.strvalue] = true
cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
s.sets[m.hash] = cached
}
// check if the field exists
_, ok = cached.fields[m.field]
if !ok {
cached.fields[m.field] = make(map[string]bool)
}
cached.fields[m.field][m.strvalue] = true
cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
s.sets[m.hash] = cached
}
}

Expand Down
Loading
Loading