Support streaming(push) #334

Open
defool opened this issue Mar 3, 2022 · 1 comment

defool commented Mar 3, 2022

Draft 1: Support a streaming API modeled on gRPC server-side streaming: https://grpc.io/docs/languages/go/basics/#server-side-streaming-rpc

proto.tars:

module ServerTest {
	interface RPCServant {  // RPC API
	    string Echo(string in); 
	};

	stream PushServant { // Stream API
	    string Message(string in);
	};
};

Client:

streamCli := &ServerTest.PushServant{}
comm.StringToProxy("App.ServerTest.PushServantObj", streamCli)

msg1, err := streamCli.RecvMessage() // blocks until a message arrives
msg2, err := streamCli.RecvMessage() // blocks until a message arrives
err = streamCli.SendMessage("abc")   // blocks until the message is sent

Server Start:

ServerTest.AddStreamServant("App.ServerTest.PushServantObj", serverImp)

Server Implement:

type serverImp struct{}

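func (s *serverImp) Message(ctx context.Context, stream ServerTest.MessageStream) error {
	for {
		// Blocks until the client sends a message; the payload is ignored here.
		if _, err := stream.RecvMessage(); err != nil {
			return err
		}
		// Blocks until the reply has been sent.
		if err := stream.SendMessage("hello"); err != nil {
			return err
		}
	}
}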
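
To make the blocking semantics of Draft 1 concrete, here is a minimal in-memory sketch. It is not TarsGo code: MessageStream, NewPipe, serve and roundTrip are hypothetical names, and a pair of unbuffered channels stands in for the network connection. It only illustrates that RecvMessage blocks until the peer sends and that SendMessage blocks until the peer receives.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrClosed is returned by RecvMessage once the peer has closed its side.
var ErrClosed = errors.New("stream closed")

// MessageStream is a hypothetical stand-in for the proposed generated stream type.
type MessageStream struct {
	in  <-chan string
	out chan<- string
}

// RecvMessage blocks until a message arrives or the peer closes the stream.
func (s *MessageStream) RecvMessage() (string, error) {
	msg, ok := <-s.in
	if !ok {
		return "", ErrClosed
	}
	return msg, nil
}

// SendMessage blocks until the peer receives the message (unbuffered channel).
func (s *MessageStream) SendMessage(msg string) error {
	s.out <- msg
	return nil
}

// NewPipe returns two connected stream ends and a function that closes the
// client-to-server direction.
func NewPipe() (client, server *MessageStream, closeClient func()) {
	c2s := make(chan string)
	s2c := make(chan string)
	client = &MessageStream{in: s2c, out: c2s}
	server = &MessageStream{in: c2s, out: s2c}
	return client, server, func() { close(c2s) }
}

// serve mirrors the server loop from the draft: receive, then reply.
func serve(stream *MessageStream) error {
	for {
		msg, err := stream.RecvMessage()
		if err != nil {
			return err
		}
		if err := stream.SendMessage("hello " + msg); err != nil {
			return err
		}
	}
}

// roundTrip sends one message through the pipe and returns the server's reply.
func roundTrip(msg string) (string, error) {
	client, server, closeClient := NewPipe()
	done := make(chan error, 1)
	go func() { done <- serve(server) }()

	if err := client.SendMessage(msg); err != nil {
		return "", err
	}
	reply, err := client.RecvMessage()
	closeClient() // the server loop then exits with ErrClosed
	<-done
	return reply, err
}

func main() {
	reply, err := roundTrip("abc")
	fmt.Println(reply, err)
}
```

A real implementation would also need timeouts and cancellation through ctx, which this sketch leaves out.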

defool commented Mar 4, 2022

Draft 2: Support a low-level push API similar to the TarsCpp PushDemo: https://github.com/TarsCloud/TarsCpp/tree/master/examples/PushDemo

client:

package main

import (
	"fmt"
	"time"

	"github.com/TarsCloud/TarsGo/tars"
	"github.com/TarsCloud/TarsGo/tars/protocol/push"
)

func callback(data []byte) {
	fmt.Println("recv message:", string(data))
}

func main() {
	comm := tars.NewCommunicator()
	obj := "TestApp.PushServer.MessageObj@tcp -h 127.0.0.1 -p 10015 -t 60000"
	client := push.NewClient()
	comm.StringToProxy(obj, client)
	err := client.Connect("pushFunc", []byte("hello"), callback)
	if err != nil {
		panic(err)
	}
	// Wait for pushed messages to arrive
	time.Sleep(time.Second * 1)
}

server:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/TarsCloud/TarsGo/tars"
	"github.com/TarsCloud/TarsGo/tars/protocol/push"
	"github.com/TarsCloud/TarsGo/tars/util/current"
)

var (
	logger = tars.GetLogger("")
)

type pushImp struct{}

func (p *pushImp) OnConnect(ctx context.Context, req []byte) []byte {
	ip, _ := current.GetClientIPFromContext(ctx)
	logger.Debugf("push client: %s", ip)
	go func() {
		for i := 0; i < 3; i++ {
			time.Sleep(time.Millisecond * 100)
			push.Send(ctx, []byte("msg"+fmt.Sprint(i)))
		}
	}()
	return req
}

func main() {
	cfg := tars.GetServerConfig()
	proto := push.NewServer(&pushImp{})
	tars.AddServantWithProtocol(proto, cfg.App+"."+cfg.Server+".MessageObj")
	tars.Run()
}
