diff --git a/admin/admin.go b/admin/admin.go new file mode 100644 index 0000000..ae03c7c --- /dev/null +++ b/admin/admin.go @@ -0,0 +1,47 @@ +package admin + +import ( + "github.com/peter-mount/go-kernel" + "github.com/peter-mount/nre-feeds/bin" + "github.com/peter-mount/nre-feeds/darwind3/client" + "github.com/peter-mount/nre-feeds/util/worker" +) + +type Admin struct { + taskQueue *worker.TaskQueue + config *bin.Config +} + +func (a *Admin) Name() string { + return "Admin" +} + +func (a *Admin) Init(k *kernel.Kernel) error { + service, err := k.AddService(&bin.Config{}) + if err != nil { + return err + } + a.config = service.(*bin.Config) + + service, err = k.AddService(&worker.TaskQueue{}) + if err != nil { + return err + } + a.taskQueue = service.(*worker.TaskQueue) + + return err +} + +func (a *Admin) Start() error { + a.taskQueue.SetContext("d3", &client.DarwinD3Client{Url: a.config.Services.DarwinD3}) + + a.config.RabbitMQ.ConnectionName = "darwin admin" + + if err := a.config.RabbitMQ.Connect(); err != nil { + return err + } + + a.taskQueue.SetContext("mq", &a.config.RabbitMQ) + + return nil +} diff --git a/admin/bin/main.go b/admin/bin/main.go new file mode 100644 index 0000000..5ba04b2 --- /dev/null +++ b/admin/bin/main.go @@ -0,0 +1,16 @@ +package main + +import ( + "github.com/peter-mount/go-kernel" + "github.com/peter-mount/nre-feeds/admin/messages" + "log" +) + +func main() { + err := kernel.Launch( + &messages.Messages{}, + ) + if err != nil { + log.Fatal(err) + } +} diff --git a/admin/messages/getMessages.go b/admin/messages/getMessages.go new file mode 100644 index 0000000..7c001f5 --- /dev/null +++ b/admin/messages/getMessages.go @@ -0,0 +1,33 @@ +package messages + +import ( + "context" + "github.com/peter-mount/nre-feeds/darwind3/client" + "github.com/peter-mount/nre-feeds/util/worker" + "time" +) + +type getStationMessages struct { +} + +func (m *getStationMessages) Name() string { + return "Retrieve all active station messages" +} + +func (m *getStationMessages) Run(ctx context.Context) error { + d3Client := ctx.Value("d3").(*client.DarwinD3Client) + + messages, err := d3Client.GetStationMessages() + if err != nil { + return err + } + + limit := time.Now().Add(-24 * time.Hour) + + for _, m := range messages { + if m.Date.Before(limit) { + worker.AddTask(ctx, &pruneStationMessage{ID: m.ID}) + } + } + return nil +} diff --git a/admin/messages/messages.go b/admin/messages/messages.go new file mode 100644 index 0000000..acf630a --- /dev/null +++ b/admin/messages/messages.go @@ -0,0 +1,31 @@ +package messages + +import ( + "github.com/peter-mount/go-kernel" + "github.com/peter-mount/nre-feeds/admin" + "github.com/peter-mount/nre-feeds/util/worker" +) + +// Messages manages cleaning up station messages +type Messages struct { + taskQueue *worker.TaskQueue +} + +func (m *Messages) Name() string { + return "AdminMessages" +} + +func (m *Messages) Init(k *kernel.Kernel) error { + service, err := k.AddService(&worker.TaskQueue{}) + if err != nil { + return err + } + m.taskQueue = service.(*worker.TaskQueue) + + return k.DependsOn(&admin.Admin{}) +} + +func (m *Messages) Start() error { + m.taskQueue.AddTask(&getStationMessages{}) + return nil +} diff --git a/admin/messages/pruneMessage.go b/admin/messages/pruneMessage.go new file mode 100644 index 0000000..91a9f60 --- /dev/null +++ b/admin/messages/pruneMessage.go @@ -0,0 +1,36 @@ +package messages + +import ( + "context" + "fmt" + "github.com/peter-mount/go-kernel/rabbitmq" + "time" +) + +type pruneStationMessage struct { + ID int64 +} + +func (m *pruneStationMessage) Name() string { + return fmt.Sprintf("Remove Station Message %d", m.ID) +} + +const ( + TIMESTAMP = "2006-01-02T15:04:05Z" +) + +func (m *pruneStationMessage) Run(ctx context.Context) error { + //d3Client := ctx.Value("d3").(*client.DarwinD3Client) + mq := ctx.Value("mq").(*rabbitmq.RabbitMQ) + + mq.Publish( + "nre.darwin.pushport-v16", + []byte(fmt.Sprintf(""+ + ""+ + "Deleted"+ + "", + time.Now().UTC().Format(TIMESTAMP), + m.ID))) + + return nil +}