Skip to content

Commit

Permalink
Grace interval for outdated schedules (#34)
Browse files Browse the repository at this point in the history
Sometimes you want to trigger schedules with an epoch set to now. But the message can take some time
to reach the kafka server and the scheduler. The scheduler will flag these schedules as outdated and will ignore them.
This change introduces a new configuration SCHEDULE_GRACE_INTERVAL that allows specifying a grace interval or window.

For example:
if SCHEDULE_GRACE_INTERVAL=3 and a schedule arrives with an epoch = now -1s, it will be accepted and triggered immediately.

Fixes #31 #28

Co-authored-by: fkarakas <[email protected]>
  • Loading branch information
fkarakas and fkarakas authored Jun 10, 2022
1 parent 10e210a commit 564c0f3
Show file tree
Hide file tree
Showing 16 changed files with 297 additions and 205 deletions.
23 changes: 14 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,20 @@ The scheduler is exposing metrics on a specific port (8001 by default) at the UR

The scheduler can be configured with environment variables:

| Env. Variable | Default | Description |
|----------------------|------------------|----------------------------------------------------------------------------------------------|
| `CONFIGURATION_FILE` | | Optional path to a YAML configuration file (see below) |
| `BOOTSTRAP_SERVERS` | `localhost:9092` | Kafka bootstrap servers list separated by comma |
| `SCHEDULES_TOPICS` | `schedules` | Topic list for incoming schedules separated by comma |
| `SINCE_DELTA` | `0` | Number of days to go back for considering missed schedules (0:today, -1: yesterday, etc ...) |
| `GROUP_ID` | `scheduler-cg` | Consumer group id for the scheduler consumer |
| `METRICS_HTTP_ADDR` | `:8001` | HTTP address where prometheus metrics will be exposed (URI /metrics) |
| `HISTORY_TOPIC` | `history` | Topic name where a copy of triggered schedules will be kept for auditing |
| Env. Variable | Default | Description |
|---------------------------|------------------|----------------------------------------------------------------------------------------------|
| `CONFIGURATION_FILE` | | Optional path to a YAML configuration file (see below) |
| `BOOTSTRAP_SERVERS` | `localhost:9092` | Kafka bootstrap servers list separated by comma |
| `SCHEDULES_TOPICS` | `schedules` | Topic list for incoming schedules separated by comma |
| `SINCE_DELTA` | `0` | Number of days to go back for considering missed schedules (0:today, -1: yesterday, etc ...) |
| `GROUP_ID` | `scheduler-cg` | Consumer group id for the scheduler consumer |
| `METRICS_HTTP_ADDR` | `:8001` | HTTP address where prometheus metrics will be exposed (URI /metrics) |
| `HISTORY_TOPIC` | `history` | Topic name where a copy of triggered schedules will be kept for auditing |
| `SCHEDULE_GRACE_INTERVAL` | `0` | Grace interval in seconds: schedules whose epoch falls between now - grace interval and now are still accepted |

## Schedule grace interval
A number of seconds, 0 or greater. For example, to accept outdated schedules whose epoch is up to 3 seconds in the past (that is, between now-3s and now), set the value
to 3. The default value is 0, which means no grace interval.

## YAML configuration file

Expand Down
2 changes: 1 addition & 1 deletion apiserver/rest/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

func newServer() (srv rest.Server, closeFunc func()) {
sch := scheduler.New(hmap.New(), hmapcoll.New())
sch := scheduler.New(hmap.New(), hmapcoll.New(), nil)
sch.Start(scheduler.StartOfToday())

srv = rest.New(&sch)
Expand Down
7 changes: 5 additions & 2 deletions apiserver/rest/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ func TestServer_schedules_found(t *testing.T) {

now := time.Now()
epoch := now.Add(10 * time.Second)
s.Add(simple.NewSchedule("1", epoch, now))
err := s.Add(simple.NewSchedule("1", epoch, now))
if err != nil {
t.Errorf("unexpected error: %v", err)
}

ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelFunc()
Expand All @@ -117,7 +120,7 @@ func TestServer_schedules_found(t *testing.T) {
}

list := []schedule{}
err := json.Unmarshal(response.Body.Bytes(), &list)
err = json.Unmarshal(response.Body.Bytes(), &list)
if err != nil {
t.Fatalf("unable to unmarshall json body: %v - %s", err, response.Body.Bytes())
}
Expand Down
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,7 @@ func SchedulesTopics() []string {
// HistoryTopic returns the name of the topic configured through the
// HISTORY_TOPIC setting, with "history" supplied as the default.
func HistoryTopic() string {
	const (
		key      = "HISTORY_TOPIC"
		fallback = "history"
	)
	return getString(key, fallback)
}

// ScheduleGraceInterval returns the grace interval, in seconds, configured
// through SCHEDULE_GRACE_INTERVAL, with 0 supplied as the default (no grace
// interval).
//
// Negative values are reported as 0: callers convert the result to an unsigned
// integer, and a negative number would otherwise wrap around to a huge
// interval instead of disabling the feature.
func ScheduleGraceInterval() int {
	interval := getInt("SCHEDULE_GRACE_INTERVAL", 0)
	if interval < 0 {
		return 0
	}
	return interval
}
6 changes: 3 additions & 3 deletions internal/store/hmap/hmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ func (h Hmap) DeleteByFunc(f func(sch schedule.Schedule) bool) {

func (h Hmap) Add(s schedule.Schedule) {
sch := s
errs := schedule.CheckSchedule(s)
if len(errs) > 0 {
err := schedule.CheckSchedule(s)
if err != nil {
sch = schedule.InvalidSchedule{
Schedule: s,
Errors: errs,
Error: err,
}
}
h.events <- sch
Expand Down
10 changes: 5 additions & 5 deletions internal/timers/timers.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,17 @@ func New() Timers {
return timers
}

func (ts Timers) Add(s schedule.Schedule) []error {
func (ts Timers) Add(s schedule.Schedule) error {
ts.mutex.Lock()
defer ts.mutex.Unlock()

errors := schedule.CheckSchedule(s)
if len(errors) > 0 {
return errors
err := schedule.CheckSchedule(s)
if err != nil {
return err
}

if s.Epoch() < time.Now().Unix() {
return []error{fmt.Errorf("%w : %+v", schedule.ErrOutdatedScheduleEpoch, s)}
return fmt.Errorf("%w : %+v", schedule.ErrOutdatedScheduleEpoch, s)
}

// if found, we stop the existing timer
Expand Down
2 changes: 1 addition & 1 deletion runner/kafka/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (k EventHandler) produceTargetMessage(msg kafka.Schedule) error {
func (k EventHandler) Handle(event scheduler.Event) {
switch evt := event.(type) {
case schedule.InvalidSchedule:
log.Printf("received an InvalidSchedule event: %T %+v errors=%v", evt, evt, evt.Errors)
log.Printf("received an InvalidSchedule event: %T %+v error=%v", evt, evt, evt.Error)
// when receiving an InvalidSchedule we should delete it from the topic, so it will not be
// triggered if the scheduler restarts
msg, ok := evt.Schedule.(kafka.Schedule)
Expand Down
43 changes: 30 additions & 13 deletions runner/kafka/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ import (
)

type Config struct {
FilePath string
BootstrapServers string
HistoryTopic string
GroupID string
SessionTimeout int
SchedulesTopics []string
FilePath string
BootstrapServers string
HistoryTopic string
GroupID string
SessionTimeout int
SchedulesTopics []string
ScheduleGraceInterval uint
}

func (c Config) String() string {
Expand All @@ -44,6 +45,7 @@ type Runner struct {
config Config
since time.Time
stopChan chan bool
exitChan chan bool
collector instrument.Collector
}

Expand All @@ -53,12 +55,13 @@ func DefaultCollector() prometheus.Collector {

func DefaultConfig() Config {
return Config{
FilePath: config.ConfigurationFile(),
BootstrapServers: config.BootstrapServers(),
GroupID: config.GroupID(),
SchedulesTopics: config.SchedulesTopics(),
SessionTimeout: config.SessionTimeout(),
HistoryTopic: config.HistoryTopic(),
FilePath: config.ConfigurationFile(),
BootstrapServers: config.BootstrapServers(),
GroupID: config.GroupID(),
SchedulesTopics: config.SchedulesTopics(),
SessionTimeout: config.SessionTimeout(),
HistoryTopic: config.HistoryTopic(),
ScheduleGraceInterval: uint(config.ScheduleGraceInterval()),
}
}

Expand All @@ -79,12 +82,15 @@ func NewRunner(c Config, since time.Time, collector instrument.Collector) *Runne
config: c,
since: since,
stopChan: make(chan bool),
exitChan: make(chan bool),
collector: collector,
}
}

// Close asks the runner to stop and waits until Start has exited.
//
// Start signals its exit by sending on exitChan from a deferred function,
// including when it returns before reaching its event loop. In that situation
// nothing reads stopChan anymore, so Close also accepts the exit signal
// directly instead of blocking forever on the stop request.
//
// It always returns nil.
func (r Runner) Close() error {
	select {
	case r.stopChan <- true:
		// Start's event loop received the stop request; wait for it to finish.
		<-r.exitChan
	case <-r.exitChan:
		// Start was already exiting; its exit signal is consumed here.
	}
	log.Printf("kafka runner closed")
	return nil
}

Expand All @@ -94,6 +100,10 @@ type closer interface {
}

func (r *Runner) Start() error {
defer func() {
r.exitChan <- true
}()

var configFile config.File
var err error
if r.config.FilePath != "" {
Expand Down Expand Up @@ -129,7 +139,13 @@ func (r *Runner) Start() error {
}
defer store.Close()

sch := scheduler.New(store, r.collector)
// Only build a grace-interval strategy when an interval was configured;
// otherwise the strategy is left at its zero value.
var outdatedStrategy scheduler.OutdatedScheduleStrategy
if i := r.config.ScheduleGraceInterval; i != 0 {
	log.Printf("schedule grace interval: %ds", i)
	outdatedStrategy = scheduler.NewOutdatedScheduleStrategyBySecond(i)
}

sch := scheduler.New(store, r.collector, outdatedStrategy)
sch.Start(r.since)

srv := rest.New(&sch)
Expand All @@ -146,6 +162,7 @@ loop:
}
handler.Handle(event)
case <-r.stopChan:
log.Printf("closing kafka runner")
err := srv.Stop()
if err != nil {
log.Errorf("error when stopping api server: %v", err)
Expand Down
Loading

0 comments on commit 564c0f3

Please sign in to comment.