Skip to content

Commit

Permalink
refactor the overall service
Browse files Browse the repository at this point in the history
Signed-off-by: Jeeva Kandasamy <[email protected]>
  • Loading branch information
jkandasa committed Mar 30, 2024
1 parent a6f8d19 commit 546d5cd
Show file tree
Hide file tree
Showing 28 changed files with 681 additions and 438 deletions.
16 changes: 10 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ You can have more than one adapter configurations
Provider options: `mysensors_v2` and `raw`
```yaml
logger:
mode: development # logger mode: development, production
encoding: console # encoding options: console, json
level: info # log levels: debug, info, warn, error, fatal
mode: development # logger mode: development, production
encoding: console # encoding options: console, json
level: info # log levels: debug, info, warn, error, fatal
enable_stacktrace: false # enable or disable error stack trace

adapters: # you can have more than one adapter
- name: adapter1 # name of the adapter
Expand All @@ -63,6 +64,7 @@ adapters: # you can have more than one adapter
qos: 0 # qos number: 0, 1, 2
transmit_pre_delay: 0s
reconnect_delay: 5s
connection_timeout: 30s # mqtt connection timeout (default 30 seconds)
```
### Source device configuration
Based on the source type the configurations will be different.
Expand Down Expand Up @@ -136,9 +138,10 @@ In `raw` provider we can add script to support custom specification. If we leave
### Configuration file with raw provider and a script support
```yaml
logger:
mode: development # logger mode: development, production
encoding: console # encoding options: console, json
level: info # log levels: debug, info, warn, error, fatal
mode: development # logger mode: development, production
encoding: console # encoding options: console, json
level: info # log levels: debug, info, warn, error, fatal
enable_stacktrace: false # enable or disable error stack trace

adapters: # you can have more than one adapter
- name: adapter1 # name of the adapter
Expand All @@ -160,6 +163,7 @@ adapters: # you can have more than one adapter
qos: 0 # qos number: 0, 1, 2
transmit_pre_delay: 0s
reconnect_delay: 5s
connection_timeout: 30s # mqtt connection timeout (default 30 seconds)
formatter_script: # script used to perform custom formatting
to_mqtt: |
// your multiline javascript
Expand Down
37 changes: 37 additions & 0 deletions cmd/helper/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package helper

import (
"context"

"github.com/mycontroller-org/2mqtt/pkg/version"
"go.uber.org/zap"

contextTY "github.com/mycontroller-org/2mqtt/pkg/types/context"

cfgTY "github.com/mycontroller-org/2mqtt/pkg/types/config"
coreScheduler "github.com/mycontroller-org/server/v2/pkg/service/core_scheduler"
schedulerTY "github.com/mycontroller-org/server/v2/pkg/types/scheduler"
loggerUtils "github.com/mycontroller-org/server/v2/pkg/utils/logger"
)

// loads logger
func loadLogger(ctx context.Context, cfg cfgTY.LoggerConfig) (context.Context, *zap.Logger) {
logger := loggerUtils.GetLogger(cfg.Mode, cfg.Level, cfg.Encoding, false, 0, cfg.EnableStacktrace)
logger.Info("welcome to the 2mqtt adapter server :)")
ver := version.Get()
logger.Info("server information", zap.Any("version", ver), zap.Any("logger", cfg))

// in some places still using "z.L()...", which needs global logger should be enabled
// enabling global logger.
// to fix this, do `grep -rl "zap\.L()"` and fix those manually.
zap.ReplaceGlobals(logger)

return contextTY.LoggerWithContext(ctx, logger), logger
}

// load core scheduler
func loadCoreScheduler(ctx context.Context) (context.Context, schedulerTY.CoreScheduler) {
coreScheduler := coreScheduler.New()
ctx = schedulerTY.WithContext(ctx, coreScheduler)
return ctx, coreScheduler
}
81 changes: 81 additions & 0 deletions cmd/helper/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package helper

import (
"context"
"time"

adapterSVC "github.com/mycontroller-org/2mqtt/pkg/service/adapter"
"github.com/mycontroller-org/2mqtt/pkg/service/scheduler"
"github.com/mycontroller-org/2mqtt/pkg/types/config"
schedulerTY "github.com/mycontroller-org/server/v2/pkg/types/scheduler"
"go.uber.org/zap"
)

type ToMqtt struct {
ctx context.Context
config *config.Config
logger *zap.Logger

// services
coreSchedulerSVC schedulerTY.CoreScheduler // core scheduler, used to execute all the cron jobs
}

func (g *ToMqtt) Start(ctx context.Context, cfg *config.Config) error {
startTime := time.Now()

g.ctx = ctx

// load logger
ctx, logger := loadLogger(ctx, cfg.Logger)

// get core scheduler and inject into context
ctx, coreScheduler := loadCoreScheduler(ctx)
err := coreScheduler.Start()
if err != nil {
logger.Error("error on starting core scheduler", zap.Error(err))
return err
}

// inject custom scheduler into context
customScheduler, err := scheduler.New(ctx)
if err != nil {
logger.Error("error on loading custom scheduler", zap.Error(err))
return err
}
ctx = scheduler.WithContext(ctx, customScheduler)

// add into struct
g.ctx = ctx
g.config = cfg
g.logger = logger
g.coreSchedulerSVC = coreScheduler

// start adapter services
err = adapterSVC.Start(ctx, cfg.Adapters)
if err != nil {
logger.Error("error on starting adapter services", zap.Error(err))
return err
}

logger.Info("services are started", zap.String("timeTaken", time.Since(startTime).String()))

// call shutdown hook
shutdownHook := NewShutdownHook(g.logger, g.stop)
shutdownHook.Start()

return nil
}

func (g *ToMqtt) stop() {
// stop services

// stop adapter services
g.logger.Debug("closing adapter services")
adapterSVC.Close()

g.logger.Debug("closing core scheduler")
// stop core scheduler
if err := g.coreSchedulerSVC.Close(); err != nil {
g.logger.Error("error on closing core scheduler", zap.Error(err))
}
}
77 changes: 77 additions & 0 deletions cmd/helper/shutdown_hook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package helper

import (
"os"
"os/signal"
"syscall"
"time"

"go.uber.org/zap"
)

var (
graceTerminationPeriod = time.Second * 30 // shutdown grace termination period
)

type ShutdownHook struct {
logger *zap.Logger
callbackFunc func()
}

func NewShutdownHook(logger *zap.Logger, callbackFunc func()) *ShutdownHook {
return &ShutdownHook{
logger: logger.Named("shutdown_hook"),
callbackFunc: callbackFunc,
}
}

func (sh *ShutdownHook) Start() {
sh.handleShutdownSignal()
}

// handel process shutdown signal
func (sh *ShutdownHook) handleShutdownSignal() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

// waiting for signal
sig := <-sigs
close(sigs)

sh.logger.Info("shutdown initiated..", zap.Any("signal", sig))
sh.triggerShutdown()
}

func (sh *ShutdownHook) triggerShutdown() {
start := time.Now()

// force termination block
ticker := time.NewTicker(graceTerminationPeriod)
done := make(chan bool)
go func() {
for {
select {
case <-done:
return
case <-ticker.C:
sh.logger.Warn("some services are not terminating on graceful period. Performing force termination", zap.String("gracePeriod", graceTerminationPeriod.String()))
os.Exit(-1)
}
}
}()

// trigger callback function
if sh.callbackFunc != nil {
sh.callbackFunc()
}

// stop force termination ticker
ticker.Stop()
done <- true

sh.logger.Info("closing services completed", zap.String("timeTaken", time.Since(start).String()))
sh.logger.Debug("bye, see you soon :)")

// stop web/api service
os.Exit(0)
}
7 changes: 4 additions & 3 deletions cmd/sub/generate_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ var generateConfigCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
sampleConfig := cfgTY.Config{
Logger: cfgTY.LoggerConfig{
Mode: "development",
Encoding: "console",
Level: "info",
Mode: "development",
Encoding: "console",
Level: "info",
EnableStacktrace: false,
},
Adapters: []cfgTY.AdapterConfig{
{
Expand Down
12 changes: 6 additions & 6 deletions cmd/sub/root.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package sub

import (
"context"
"fmt"
"os"

"github.com/mycontroller-org/2mqtt/pkg/service/start"
"github.com/mycontroller-org/2mqtt/cmd/helper"
cfgTY "github.com/mycontroller-org/2mqtt/pkg/types/config"
"github.com/mycontroller-org/2mqtt/pkg/version"
loggerUtils "github.com/mycontroller-org/server/v2/pkg/utils/logger"
Expand Down Expand Up @@ -50,11 +51,10 @@ var rootCmd = &cobra.Command{
logger.Fatal("failed to parse config file", zap.Error(err))
}

// load actual logger
start.InitLogger(cfg.Logger)

// start services
start.StartServices(cfg)
// start service
ctx := context.Background()
toMqtt := helper.ToMqtt{}
toMqtt.Start(ctx, cfg)

Check failure on line 57 in cmd/sub/root.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `toMqtt.Start` is not checked (errcheck)
},
}

Expand Down
48 changes: 25 additions & 23 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,41 +1,43 @@
module github.com/mycontroller-org/2mqtt

go 1.20
go 1.21

toolchain go1.21.4

require (
github.com/eclipse/paho.mqtt.golang v1.4.2
github.com/mycontroller-org/server/v2 v2.0.0-20230219022220-c22a66475357
github.com/spf13/cobra v1.6.1
github.com/stretchr/testify v1.8.1
github.com/eclipse/paho.mqtt.golang v1.4.3
github.com/mycontroller-org/server/v2 v2.0.1-0.20240330143153-d3837c02560c
github.com/spf13/cobra v1.8.0
github.com/stretchr/testify v1.9.0
github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07
go.uber.org/zap v1.24.0
go.uber.org/zap v1.27.0
gopkg.in/yaml.v3 v3.0.1
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dlclark/regexp2 v1.8.0 // indirect
github.com/dop251/goja v0.0.0-20230216180835-5937a312edda // indirect
github.com/dop251/goja_nodejs v0.0.0-20230207183254-2229640ea097 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dlclark/regexp2 v1.11.0 // indirect
github.com/dop251/goja v0.0.0-20240220182346-e401ed450204 // indirect
github.com/dop251/goja_nodejs v0.0.0-20240221231712-27eeffc9c235 // indirect
github.com/fatih/structs v1.1.0 // indirect
github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/mux v1.8.1 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/mitchellh/mapstructure v1.5.1-0.20231216201459-8508981c8b6c // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/robfig/cron/v3 v3.0.2-0.20210106135023-bc59245fe10e // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.mongodb.org/mongo-driver v1.11.2 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
go.mongodb.org/mongo-driver v1.14.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
)
Loading

0 comments on commit 546d5cd

Please sign in to comment.