Skip to content

Commit

Permalink
message: remove the experimental and unused PendingPublish API
Browse files Browse the repository at this point in the history
  • Loading branch information
jgraettinger committed Aug 24, 2024
1 parent aaa6cd8 commit f6ad713
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 211 deletions.
101 changes: 5 additions & 96 deletions message/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ func (p *Publisher) ProducerID() ProducerID { return p.producer }
// its marshaled content, with a UUID sequenced for immediate consumption.
// An error is returned if:
//
// * The Message implements Validator, and it returns an error.
// * The MappingFunc returns an error while mapping the Message to a journal.
// * The journal's Framing returns an error while marshaling the Message,
// or an os.PathError occurs while spooling the frame to a temporary file
// (eg, because local disk is full).
// - The Message implements Validator, and it returns an error.
// - The MappingFunc returns an error while mapping the Message to a journal.
// - The journal's Framing returns an error while marshaling the Message,
// or an os.PathError occurs while spooling the frame to a temporary file
// (eg, because local disk is full).
//
// A particular MappingFunc error to be aware of is ErrEmptyListResponse,
// returned by mapping routines of this package when there are no journals
Expand Down Expand Up @@ -128,97 +128,6 @@ func (p *Publisher) PublishUncommitted(mapping MappingFunc, msg Message) (*clien
return aa, nil
}

// PendingPublish is returned from DeferPublishUncommitted, and allows appending a single message
// that had previously been sequenced.
//
// **This is a new and unstable API, that is subject to breaking changes.**
type PendingPublish struct {
publisher *Publisher
journal pb.Journal
contentType string
uuid UUID
}

// Resolve completes a PendingPublish by appending the finalized content of a message that had
// previously been sequenced. See DeferPublishUncommitted docs for more.
//
// **This is a new and unstable API, that is subject to breaking changes.**
func (pf *PendingPublish) Resolve(msg Message) error {
if pf.publisher == nil {
// Sanity check for if Resolve has already been called, or if PendingPublish is zero-valued
// due to SequenceFutureMessage having returned an error.
panic("Pending publish has already been resolved")
}
if v, ok := msg.(Validator); ok {
if err := v.Validate(); err != nil {
return err
}
}
msg.SetUUID(pf.uuid)

var framing, err = FramingByContentType(pf.contentType)
if err != nil {
return err
}

var aa = pf.publisher.ajc.StartAppend(pb.AppendRequest{Journal: pf.journal}, nil)
aa.Require(framing.Marshal(msg, aa.Writer()))
err = aa.Release()
pf.publisher = nil // so that we can sanity check that Resolve isn't called twice
return err
}

// DeferPublishUncommitted is used to sequence a message that will be published at some future
// point, but before the end of the transaction. It returns a PendingPublish, which can be resolved
// by passing it the actual message to be published. This is used in situations where you need to
// transactionally publish a message when you don't have the content of that message until after the
// ack intents are built. This is an advanced, low level api, and care must be taken to use it
// correctly to avoid corruption of journal content.
//
// The journal and contentType must be known up front, and the acknowledgement Message must also be
// provided by the caller. It's up to the caller to ensure that these things are correct and
// consistent.
//
// The returned PendingPublish does not need to ever be resolved, and can be dropped with no harm
// done. If Resolve is called, then it must be called _before_ the acknowledgements are written.
// Otherwise the resolved message will be ignored by ReadCommitted consumers. Also note that the
// PendingPublish is not safe to Resolve concurrently with other uses of a Publisher.
//
// No other messages should be published to the journal using PublishUncommitted or PublishCommitted
// before the PendingPublish is resolved. It it permissible to publish more than one message using
// DeferPublishUncommitted, as long as all PendingPublish instances are resolved in exactly the
// order in which they were created.
//
// **This is a new and unstable API, that is subject to breaking changes.**
func (p *Publisher) DeferPublishUncommitted(journal pb.Journal, contentType string, ack Message) (fut PendingPublish, err error) {
if p.autoUpdate {
p.clock.Update(time.Now())
}

var framing Framing
if framing, err = FramingByContentType(contentType); err != nil {
return
}

var uuid = BuildUUID(p.producer, p.clock.Tick(), Flag_CONTINUE_TXN)
// Is this the first publish to this journal since our last commit?
if _, ok := p.intentIdx[journal]; !ok {
p.intentIdx[journal] = len(p.intents)
p.intents = append(p.intents, AckIntent{
Journal: journal,
// Call NewAcknowledgement to create the ack, to ensure that each ack message is unique.
msg: ack.NewAcknowledgement(journal),
framing: framing,
})
}
return PendingPublish{
publisher: p,
journal: journal,
contentType: contentType,
uuid: uuid,
}, nil
}

// BuildAckIntents returns the []AckIntents which acknowledge all pending
// Messages published since its last invocation. It's the caller's job to
// actually append the intents to their respective journals, and only *after*
Expand Down
115 changes: 0 additions & 115 deletions message/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,121 +215,6 @@ func TestIntegrationOfPublisherWithSequencerAndReader(t *testing.T) {
require.NoError(t, bk.Tasks.Wait())
}

func TestDeferPublishUncommitted(t *testing.T) {
var etcd = etcdtest.TestClient()
defer etcdtest.Cleanup()

var (
clock Clock
ctx = context.Background()
spec = newTestMsgSpec("a/journal")
bk = brokertest.NewBroker(t, etcd, "local", "broker")
ajc = client.NewAppendService(ctx, bk.Client())
)
brokertest.CreateJournals(t, bk, spec)

// Start a long-lived RetryReader of |spec|.
var rr = client.NewRetryReader(ctx, bk.Client(), pb.ReadRequest{
Journal: spec.Name,
Block: true,
})
var r = NewReadUncommittedIter(rr, newTestMsg)

var seq = NewSequencer(nil, nil, 5)

var seqPump = func() (out []testMsg) {
var env, err = r.Next()
require.NoError(t, err)

if seq.QueueUncommitted(env) == QueueAckCommitReplay {
// The sequencer buffer is large enough that we should never need to replay for this
// test.
panic("unexpected need to replay")
}
for {
if err := seq.Step(); err == io.EOF {
return
}
require.NoError(t, err)
out = append(out, *seq.Dequeued.Message.(*testMsg))
}
}

var mapping = func(Mappable) (pb.Journal, string, error) {
return spec.Name, labels.ContentType_JSONLines, nil
}
var pub = NewPublisher(ajc, &clock)

// Happy path: An uncommitted message can be written before a deferred one, and should get
// sequenced normally with respect to the deferred message, since the deferred publish is
// started after.
var _, err = pub.PublishUncommitted(mapping, &testMsg{Str: "one"})
require.NoError(t, err)
require.Equal(t, []testMsg(nil), seqPump())

fut, err := pub.DeferPublishUncommitted(spec.Name, labels.ContentType_JSONLines, new(testMsg))
require.NoError(t, err)

intents, err := pub.BuildAckIntents()
require.NoError(t, err)

require.NoError(t, fut.Resolve(&testMsg{Str: "two"}))
require.Equal(t, []testMsg(nil), seqPump())

writeIntents(t, ajc, intents)

var actual = seqPump()
require.Equal(t, 3, len(actual))
require.Equal(t, "one", actual[0].Str)
require.Equal(t, "two", actual[1].Str)
require.Equal(t, "", actual[2].Str)

// Sad path cases:
// The deferred publish message will not be seen because it sequences before "three"
fut, err = pub.DeferPublishUncommitted(spec.Name, labels.ContentType_JSONLines, new(testMsg))
require.NoError(t, err)

_, err = pub.PublishUncommitted(mapping, &testMsg{Str: "three"})
require.NoError(t, err)
require.Equal(t, []testMsg(nil), seqPump())
intents, err = pub.BuildAckIntents()
require.NoError(t, err)
require.NoError(t, fut.Resolve(&testMsg{Str: "wont see four"}))
require.Equal(t, []testMsg(nil), seqPump())

writeIntents(t, ajc, intents)
actual = seqPump()
require.Equal(t, 2, len(actual))
require.Equal(t, "three", actual[0].Str)
require.Equal(t, "", actual[1].Str)

// The deferred publish isn't resolved until after the acks were written, so will not be seen.
_, err = pub.PublishUncommitted(mapping, &testMsg{Str: "five"})
require.NoError(t, err)
require.Equal(t, []testMsg(nil), seqPump())

fut, err = pub.DeferPublishUncommitted(spec.Name, labels.ContentType_JSONLines, new(testMsg))
require.NoError(t, err)

intents, err = pub.BuildAckIntents()
require.NoError(t, err)
writeIntents(t, ajc, intents)

actual = seqPump()
require.Equal(t, 2, len(actual))
require.Equal(t, "five", actual[0].Str)
require.Equal(t, "", actual[1].Str)

require.NoError(t, fut.Resolve(&testMsg{Str: "wont see six"}))
require.Equal(t, []testMsg(nil), seqPump())

_, err = pub.PublishCommitted(mapping, &testMsg{Str: "seven"})
require.NoError(t, err)
actual = seqPump()
require.Equal(t, 1, len(actual))
require.Equal(t, "seven", actual[0].Str)
}

func readAllMsgs(t require.TestingT, bk *brokertest.Broker, spec *pb.JournalSpec) (out []testMsg) {
var rr = client.NewRetryReader(context.Background(), bk.Client(), pb.ReadRequest{Journal: spec.Name})
var r = NewReadUncommittedIter(rr, newTestMsg)
Expand Down

0 comments on commit f6ad713

Please sign in to comment.