Skip to content

Commit

Permalink
add l2blockend to DS (#3751)
Browse files Browse the repository at this point in the history
* add l2blockend to DS

* move l2blockend creation
  • Loading branch information
ToniRamirezM authored Aug 6, 2024
1 parent 8735f06 commit 45a7413
Show file tree
Hide file tree
Showing 9 changed files with 329 additions and 127 deletions.
6 changes: 6 additions & 0 deletions proto/src/proto/datastream/v1/datastream.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ message L2Block {
Debug debug = 14;
}

message L2BlockEnd {
uint64 number = 1;
}

message Transaction {
uint64 l2block_number = 1;
uint64 index = 2;
Expand Down Expand Up @@ -79,11 +83,13 @@ enum EntryType {
ENTRY_TYPE_TRANSACTION = 3;
ENTRY_TYPE_BATCH_END = 4;
ENTRY_TYPE_UPDATE_GER = 5;
ENTRY_TYPE_L2_BLOCK_END = 6;
}

enum BatchType {
BATCH_TYPE_UNSPECIFIED = 0;
BATCH_TYPE_REGULAR = 1;
BATCH_TYPE_FORCED = 2;
BATCH_TYPE_INJECTED = 3;
BATCH_TYPE_INVALID = 4;
}
24 changes: 21 additions & 3 deletions sequencer/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (s *Sequencer) Start(ctx context.Context) {
}

if s.streamServer != nil {
go s.sendDataToStreamer(s.cfg.StreamServer.ChainID)
go s.sendDataToStreamer(s.cfg.StreamServer.ChainID, s.cfg.StreamServer.Version)
}

s.workerReadyTxsCond = newTimeoutCond(&sync.Mutex{})
Expand Down Expand Up @@ -129,7 +129,7 @@ func (s *Sequencer) checkStateInconsistency(ctx context.Context) {
}

func (s *Sequencer) updateDataStreamerFile(ctx context.Context, chainID uint64) {
err := state.GenerateDataStreamFile(ctx, s.streamServer, s.stateIntf, true, nil, chainID, s.cfg.StreamServer.UpgradeEtrogBatchNumber)
err := state.GenerateDataStreamFile(ctx, s.streamServer, s.stateIntf, true, nil, chainID, s.cfg.StreamServer.UpgradeEtrogBatchNumber, s.cfg.StreamServer.Version)
if err != nil {
log.Fatalf("failed to generate data streamer file, error: %v", err)
}
Expand Down Expand Up @@ -241,7 +241,7 @@ func (s *Sequencer) addTxToWorker(ctx context.Context, tx pool.Transaction) erro
}

// sendDataToStreamer sends data to the data stream server
func (s *Sequencer) sendDataToStreamer(chainID uint64) {
func (s *Sequencer) sendDataToStreamer(chainID uint64, version uint8) {
var err error
for {
// Read error from previous iteration
Expand Down Expand Up @@ -369,6 +369,24 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) {
}
}

if version >= state.DSVersion4 {
streamL2BlockEnd := &datastream.L2BlockEnd{
Number: l2Block.L2BlockNumber,
}

marshalledL2BlockEnd, err := proto.Marshal(streamL2BlockEnd)
if err != nil {
log.Errorf("failed to marshal l2block %d, error: %v", l2Block.L2BlockNumber, err)
continue
}

_, err = s.streamServer.AddStreamEntry(datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_L2_BLOCK_END), marshalledL2BlockEnd)
if err != nil {
log.Errorf("failed to add stream entry for l2blockEnd %d, error: %v", l2Block.L2BlockNumber, err)
continue
}
}

err = s.streamServer.CommitAtomicOp()
if err != nil {
log.Errorf("failed to commit atomic op for l2block %d, error: %v ", l2Block.L2BlockNumber, err)
Expand Down
75 changes: 74 additions & 1 deletion state/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ const (
SystemSC = "0x000000000000000000000000000000005ca1ab1e"
// posConstant is the constant used to compute the position of the intermediate state root
posConstant = 1
// DSVersion3 is the first protobuf version
DSVersion3 uint8 = 3
// DSVersion4 is the second protobuf version, includes l2BlockEnd
DSVersion4 uint8 = 4
)

// DSBatch represents a data stream batch
Expand Down Expand Up @@ -87,7 +91,7 @@ type DSState interface {
}

// GenerateDataStreamFile generates or resumes a data stream file
func GenerateDataStreamFile(ctx context.Context, streamServer *datastreamer.StreamServer, stateDB DSState, readWIPBatch bool, imStateRoots *map[uint64][]byte, chainID uint64, upgradeEtrogBatchNumber uint64) error {
func GenerateDataStreamFile(ctx context.Context, streamServer *datastreamer.StreamServer, stateDB DSState, readWIPBatch bool, imStateRoots *map[uint64][]byte, chainID uint64, upgradeEtrogBatchNumber uint64, version uint8) error {
header := streamServer.GetHeader()

var currentBatchNumber uint64 = 0
Expand Down Expand Up @@ -177,6 +181,22 @@ func GenerateDataStreamFile(ctx context.Context, streamServer *datastreamer.Stre
return err
}

if version >= DSVersion4 {
genesisBlockEnd := &datastream.L2BlockEnd{
Number: genesisL2Block.L2BlockNumber,
}

marshalledGenesisBlockEnd, err := proto.Marshal(genesisBlockEnd)
if err != nil {
return err
}

_, err = streamServer.AddStreamEntry(datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_L2_BLOCK_END), marshalledGenesisBlockEnd)
if err != nil {
return err
}
}

genesisBatchEnd := &datastream.BatchEnd{
Number: genesisL2Block.BatchNumber,
LocalExitRoot: common.Hash{}.Bytes(),
Expand Down Expand Up @@ -249,6 +269,43 @@ func GenerateDataStreamFile(ctx context.Context, streamServer *datastreamer.Stre
currentBatchNumber = l2Block.BatchNumber
previousTimestamp = l2Block.Timestamp
lastAddedL2BlockNumber = currentL2BlockNumber

case datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_L2_BLOCK_END):
log.Info("Latest entry type is L2BlockEnd")

l2BlockEnd := &datastream.L2BlockEnd{}
if err := proto.Unmarshal(latestEntry.Data, l2BlockEnd); err != nil {
return err
}

currentL2BlockNumber := l2BlockEnd.Number

// Getting the l2 block is needed in order to get the batch number and the timestamp
bookMark := &datastream.BookMark{
Type: datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK,
Value: currentL2BlockNumber,
}

marshalledBookMark, err := proto.Marshal(bookMark)
if err != nil {
return err
}

l2BlockEntry, err := streamServer.GetFirstEventAfterBookmark(marshalledBookMark)
if err != nil {
return err
}

l2Block := &datastream.L2Block{}

if err := proto.Unmarshal(l2BlockEntry.Data, l2Block); err != nil {
return err
}

currentBatchNumber = l2Block.BatchNumber
previousTimestamp = l2Block.Timestamp
lastAddedL2BlockNumber = currentL2BlockNumber

case datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_TRANSACTION):
log.Info("Latest entry type is Transaction")

Expand Down Expand Up @@ -626,6 +683,22 @@ func GenerateDataStreamFile(ctx context.Context, streamServer *datastreamer.Stre
}

currentGER = l2Block.GlobalExitRoot

if version >= DSVersion4 {
streamL2BlockEnd := &datastream.L2BlockEnd{
Number: l2Block.L2BlockNumber,
}

marshalledL2BlockEnd, err := proto.Marshal(streamL2BlockEnd)
if err != nil {
return err
}

_, err = streamServer.AddStreamEntry(datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_L2_BLOCK_END), marshalledL2BlockEnd)
if err != nil {
return err
}
}
}
}

Expand Down
Loading

0 comments on commit 45a7413

Please sign in to comment.