Skip to content

Commit

Permalink
Merge pull request #27 from samcm/disable-txpool
Browse files Browse the repository at this point in the history
feat(consensus): Reduce polling frequency
  • Loading branch information
samcm authored Jun 23, 2022
2 parents 0f416b2 + fbcfddc commit 353eeaa
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 26 deletions.
9 changes: 2 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,15 @@ Naturally this means that the exporter is limited to metrics that are exposed by
## Usage

```
A tool to report the state of ethereum nodes
A tool to export the state of ethereum nodes
Usage:
ethereum-metrics-exporter [flags]
ethereum-metrics-exporter [command]
Available Commands:
completion Generate the autocompletion script for the specified shell
help Help about any command
serve Run a metrics server and poll the configured clients.
Flags:
--config string config file (default is $HOME/.ethereum-metrics-exporter.yaml)
--consensus-url string (optional) URL to the consensus node
--execution-modules strings (optional) execution modules that are enabled on the node
--execution-url string (optional) URL to the execution node
-h, --help help for ethereum-metrics-exporter
--metrics-port int Port to serve Prometheus metrics on (default 9090)
Expand Down
62 changes: 53 additions & 9 deletions pkg/exporter/consensus/jobs/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ func (b *Beacon) Name() string {
func (b *Beacon) Start(ctx context.Context) {
b.tick(ctx)

go b.getInitialData(ctx)

for {
select {
case <-ctx.Done():
Expand All @@ -164,22 +166,30 @@ func (b *Beacon) Start(ctx context.Context) {
}

func (b *Beacon) tick(ctx context.Context) {
for _, id := range []string{"head", "finalized"} {
if err := b.GetFinality(ctx, id); err != nil {
b.log.WithError(err).Error("Failed to get finality")
}

if err := b.GetSignedBeaconBlock(ctx, id); err != nil {
b.log.WithError(err).Error("Failed to get signed beacon block")
}

func (b *Beacon) getInitialData(ctx context.Context) {
for {
if b.client == nil {
time.Sleep(time.Second * 5)
continue
}

b.updateBeaconBlock(ctx)
b.updateFinalizedCheckpoint(ctx)

break
}
}

func (b *Beacon) HandleEvent(ctx context.Context, event *v1.Event) {
if event.Topic == EventTopicBlock {
if err := b.GetSignedBeaconBlock(ctx, "head"); err != nil {
b.log.WithError(err).Error("Failed to get signed beacon block")
}
b.handleBlockEvent(ctx, event)
}

if event.Topic == EventTopicFinalizedCheckpoint {
b.handleFinalizedCheckpointEvent(ctx, event)
}

if event.Topic == EventTopicChainReorg {
Expand All @@ -197,6 +207,40 @@ func (b *Beacon) handleChainReorg(event *v1.Event) {
b.ReOrgDepth.Add(float64(reorg.Depth))
}

func (b *Beacon) handleFinalizedCheckpointEvent(ctx context.Context, event *v1.Event) {
_, ok := event.Data.(*v1.FinalizedCheckpointEvent)
if !ok {
return
}

b.updateFinalizedCheckpoint(ctx)
}

func (b *Beacon) updateFinalizedCheckpoint(ctx context.Context) {
if err := b.GetFinality(ctx, "head"); err != nil {
b.log.WithError(err).Error("Failed to get finality")
}

if err := b.GetSignedBeaconBlock(ctx, "finalized"); err != nil {
b.log.WithError(err).Error("Failed to get signed beacon block")
}
}

func (b *Beacon) updateBeaconBlock(ctx context.Context) {
if err := b.GetSignedBeaconBlock(ctx, "head"); err != nil {
b.log.WithError(err).Error("Failed to get signed beacon block")
}
}

func (b *Beacon) handleBlockEvent(ctx context.Context, event *v1.Event) {
_, ok := event.Data.(*v1.BlockEvent)
if !ok {
return
}

b.updateBeaconBlock(ctx)
}

func (b *Beacon) GetSignedBeaconBlock(ctx context.Context, blockID string) error {
provider, isProvider := b.client.(eth2client.SignedBeaconBlockProvider)
if !isProvider {
Expand Down
35 changes: 32 additions & 3 deletions pkg/exporter/consensus/jobs/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package jobs

import (
"context"
"time"

eth2client "github.com/attestantio/go-eth2-client"
v1 "github.com/attestantio/go-eth2-client/api/v1"
Expand All @@ -12,8 +13,11 @@ import (

// Event reports event counts.
type Event struct {
log logrus.FieldLogger
Count prometheus.CounterVec
log logrus.FieldLogger
Count prometheus.CounterVec
TimeSinceLastEvent prometheus.Gauge

LastEventTime time.Time
}

const (
Expand All @@ -38,15 +42,40 @@ func NewEventJob(client eth2client.Service, ap api.ConsensusClient, log logrus.F
"name",
},
),
TimeSinceLastEvent: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "time_since_last_subscription_event_ms",
Help: "The amount of time since the last subscription event (in milliseconds).",
ConstLabels: constLabels,
},
),
LastEventTime: time.Now(),
}
}

func (b *Event) Name() string {
return NameEvent
}

func (b *Event) Start(ctx context.Context) {}
func (b *Event) Start(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Second * 1):
b.tick(ctx)
}
}
}

//nolint:unparam // ctx will probably be used in the future
func (b *Event) tick(ctx context.Context) {
b.TimeSinceLastEvent.Set(float64(time.Since(b.LastEventTime).Milliseconds()))
}

func (b *Event) HandleEvent(ctx context.Context, event *v1.Event) {
b.Count.WithLabelValues(event.Topic).Inc()
b.LastEventTime = time.Now()
b.TimeSinceLastEvent.Set(0)
}
19 changes: 13 additions & 6 deletions pkg/exporter/consensus/jobs/general.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ import (

// General reports general information about the node.
type General struct {
client eth2client.Service
api api.ConsensusClient
log logrus.FieldLogger
NodeVersion prometheus.GaugeVec
ClientName prometheus.GaugeVec
Peers prometheus.GaugeVec
client eth2client.Service
api api.ConsensusClient
log logrus.FieldLogger
NodeVersion prometheus.GaugeVec
ClientName prometheus.GaugeVec
Peers prometheus.GaugeVec
nodeVersionFetchedAt time.Time
}

const (
Expand Down Expand Up @@ -93,6 +94,10 @@ func (g *General) tick(ctx context.Context) {
}

func (g *General) GetNodeVersion(ctx context.Context) error {
if time.Since(g.nodeVersionFetchedAt) < (30 * time.Minute) {
return nil
}

provider, isProvider := g.client.(eth2client.NodeVersionProvider)
if !isProvider {
return errors.New("client does not implement eth2client.NodeVersionProvider")
Expand All @@ -106,6 +111,8 @@ func (g *General) GetNodeVersion(ctx context.Context) error {
g.NodeVersion.Reset()
g.NodeVersion.WithLabelValues(version).Set(1)

g.nodeVersionFetchedAt = time.Now()

return nil
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/exporter/consensus/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func NewMetrics(client eth2client.Service, ap api.ConsensusClient, log logrus.Fi
prometheus.MustRegister(m.beaconMetrics.ReOrgDepth)

prometheus.MustRegister(m.eventMetrics.Count)
prometheus.MustRegister(m.eventMetrics.TimeSinceLastEvent)

return m
}
Expand All @@ -116,14 +117,18 @@ func (m *metrics) subscriptionLoop(ctx context.Context) {
subscribed := false

for {
if !subscribed {
if !subscribed && m.client != nil {
if err := m.startSubscriptions(ctx); err != nil {
m.log.Errorf("Failed to subscribe to eth2 node: %v", err)
} else {
subscribed = true
}
}

if subscribed && time.Since(m.eventMetrics.LastEventTime) > (2*time.Minute) {
subscribed = false
}

time.Sleep(5 * time.Second)
}
}
Expand Down

0 comments on commit 353eeaa

Please sign in to comment.