Skip to content

Commit

Permalink
feat(app): Update Custom Trigger Example (#81)
Browse files Browse the repository at this point in the history
Use new MessageReceived function on trigger config to hand off to
pipeline WITHOUT context construction to take advantage of multiple
pipelines configured if needed

Signed-off-by: Alex Ullrich <[email protected]>
  • Loading branch information
AlexCuse authored Dec 29, 2021
1 parent d4c782e commit c20f36e
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 9 deletions.
1 change: 1 addition & 0 deletions application-services/custom/custom-trigger/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ require (
github.com/edgexfoundry/app-functions-sdk-go/v2 v2.1.0
github.com/edgexfoundry/go-mod-bootstrap/v2 v2.1.0
github.com/edgexfoundry/go-mod-messaging/v2 v2.1.0
github.com/google/uuid v1.3.0
)
73 changes: 64 additions & 9 deletions application-services/custom/custom-trigger/main.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//
// Copyright (c) 2020 Technotects
// Copyright (c) 2021 One Track Consulting
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -20,6 +21,7 @@ import (
"bufio"
"context"
"fmt"
"github.com/google/uuid"
"os"
"strings"
"sync"
Expand Down Expand Up @@ -53,13 +55,14 @@ func (t *stdinTrigger) Initialize(_ *sync.WaitGroup, ctx context.Context, _ <-ch
rdr := bufio.NewReader(os.Stdin)
for receiveMessage {
s, err := rdr.ReadString('\n')
s = strings.TrimRight(s, "\n")

if err != nil {
t.tc.Logger.Error(err.Error())
continue
}

s = strings.TrimRight(s, "\n")

msgs <- []byte(s)
}
}()
Expand All @@ -71,13 +74,21 @@ func (t *stdinTrigger) Initialize(_ *sync.WaitGroup, ctx context.Context, _ <-ch
receiveMessage = false
case m := <-msgs:
go func() {
spoofTopic := "even"

if len(m)%2 == 1 {
spoofTopic = "odd"
}

env := types.MessageEnvelope{
Payload: m,
CorrelationID: uuid.NewString(),
Payload: m,
ReceivedTopic: spoofTopic,
}

ctx := t.tc.ContextBuilder(env)
t.tc.Logger.Tracef("sending message to runtime %+v", env)

err := t.tc.MessageProcessor(ctx, env)
err := t.tc.MessageReceived(nil, env, nil)

if err != nil {
t.tc.Logger.Error(err.Error())
Expand Down Expand Up @@ -108,22 +119,66 @@ func main() {
}, nil
})

service.SetFunctionsPipeline(
printToConsole,
var err error

//use this to process using default pipeline only
//err = service.SetDefaultFunctionsPipeline(printLowerToConsole)
//if err != nil {
// service.LoggingClient().Errorf("SetDefaultFunctionsPipeline returned error: %s", err.Error())
// os.Exit(-1)
//}

//use this to process using varied pipelines by topic (odd/even string length)
err = service.AddFunctionsPipelineForTopics("odd", []string{"odd"},
printLowerToConsole,
)

if err != nil {
service.LoggingClient().Errorf("AddFunctionsPipelineForTopic returned error: %s", err.Error())
os.Exit(-1)
}

err = service.AddFunctionsPipelineForTopics("even", []string{"even"},
printUpperToConsole,
)

if err != nil {
service.LoggingClient().Errorf("AddFunctionsPipelineForTopic returned error: %s", err.Error())
os.Exit(-1)
}

// Lastly, we'll go ahead and tell the SDK to "start" and begin listening for events
err := service.MakeItRun()
err = service.MakeItRun()
if err != nil {
service.LoggingClient().Error("MakeItRun returned error: ", err.Error())
os.Exit(-1)
}

service.LoggingClient().Info("Exiting service")
// Do any required cleanup here
os.Exit(0)
}

func printToConsole(appContext interfaces.AppFunctionContext, data interface{}) (bool, interface{}) {
func printLowerToConsole(appContext interfaces.AppFunctionContext, data interface{}) (bool, interface{}) {
input, err := util.CoerceType(data)

if err != nil {
appContext.LoggingClient().Error(err.Error())
return false, err
}

wait := time.Millisecond * time.Duration(len(input))

time.Sleep(wait)

appContext.LoggingClient().Info("PrintToConsole")

os.Stdout.WriteString(fmt.Sprintf("'%s' received %s ago\n>", strings.ToLower(string(input)), wait.String()))

return false, nil
}

func printUpperToConsole(appContext interfaces.AppFunctionContext, data interface{}) (bool, interface{}) {
input, err := util.CoerceType(data)

if err != nil {
Expand All @@ -137,7 +192,7 @@ func printToConsole(appContext interfaces.AppFunctionContext, data interface{})

appContext.LoggingClient().Info("PrintToConsole")

os.Stdout.WriteString(fmt.Sprintf("'%s' received %s ago\n>", string(input), wait.String()))
os.Stdout.WriteString(fmt.Sprintf("'%s' received %s ago\n>", strings.ToUpper(string(input)), wait.String()))

return false, nil
}

0 comments on commit c20f36e

Please sign in to comment.