Skip to content

Commit

Permalink
Merge pull request #6 from zero-gravity-labs/test-sync-chunks
Browse files Browse the repository at this point in the history
add python test for chunks sync by rpc
  • Loading branch information
peilun-conflux authored Jan 22, 2024
2 parents 613c1a8 + 99ead64 commit 77bf2e1
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 39 deletions.
28 changes: 14 additions & 14 deletions node/sync/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,20 +399,20 @@ impl SyncService {
}

// file may be removed, but remote peer still find one from the file location cache
let finalized = self.store.check_tx_completed(request.tx_id.seq).await?;
if !finalized {
info!(%request.tx_id.seq, "Failed to handle chunks request due to tx not finalized");
// FIXME(zz): If remote removes a file, we will also get failure here.
// self.ctx
// .report_peer(peer_id, PeerAction::HighToleranceError, "Tx not finalized");
self.ctx.send(NetworkMessage::SendErrorResponse {
peer_id,
error: RPCResponseErrorCode::InvalidRequest,
reason: "Tx not finalized".into(),
id: request_id,
});
return Ok(());
}
// let finalized = self.store.check_tx_completed(request.tx_id.seq).await?;
// if !finalized {
// info!(%request.tx_id.seq, "Failed to handle chunks request due to tx not finalized");
// // FIXME(zz): If remote removes a file, we will also get failure here.
// // self.ctx
// // .report_peer(peer_id, PeerAction::HighToleranceError, "Tx not finalized");
// self.ctx.send(NetworkMessage::SendErrorResponse {
// peer_id,
// error: RPCResponseErrorCode::InvalidRequest,
// reason: "Tx not finalized".into(),
// id: request_id,
// });
// return Ok(());
// }

let result = self
.store
Expand Down
95 changes: 73 additions & 22 deletions tests/sync_test.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,109 @@
#!/usr/bin/env python3

import random
import time

from test_framework.test_framework import TestFramework
from utility.submission import create_submission
from utility.submission import submit_data
from utility.submission import submit_data, data_to_segments
from utility.utils import (
assert_equal,
wait_until,
)


class SyncTest(TestFramework):
def setup_params(self):
self.num_blockchain_nodes = 2
self.num_nodes = 2
self.__deployed_contracts = 0

def run_test(self):
client1 = self.nodes[0]
client2 = self.nodes[1]

self.stop_storage_node(1)
# By default, auto_sync_enabled and sync_file_on_announcement_enabled are both false,
# and file or chunks sync should be triggered by rpc.
self.__test_sync_file_by_rpc()
self.__test_sync_chunks_by_rpc()

size = 256 * 1024
chunk_data = random.randbytes(size)
def __test_sync_file_by_rpc(self):
self.log.info("Begin to test file sync by rpc")

submissions, data_root = create_submission(chunk_data)
self.log.info("data root: %s, submissions: %s", data_root, submissions)
self.contract.submit(submissions)
client1 = self.nodes[0]
client2 = self.nodes[1]

wait_until(lambda: self.contract.num_submissions() == 1)
# Create submission
chunk_data = random.randbytes(256 * 1024)
data_root = self.__create_submission(chunk_data)

# Ensure log entry sync from blockchain node
wait_until(lambda: client1.zgs_get_file_info(data_root) is not None)
assert_equal(client1.zgs_get_file_info(data_root)["finalized"], False)

# Upload file to storage node
segments = submit_data(client1, chunk_data)
self.log.info(
"segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments]
)

self.log.info("segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments])
wait_until(lambda: client1.zgs_get_file_info(data_root)["finalized"])

self.start_storage_node(1)
self.nodes[1].wait_for_rpc_connection()
# File should not be auto sync on node 2
wait_until(lambda: client2.zgs_get_file_info(data_root) is not None)
time.sleep(3)
assert_equal(client2.zgs_get_file_info(data_root)["finalized"], False)

client2.admin_start_sync_file(0)
# Trigger file sync by rpc
assert(client2.admin_start_sync_file(0) is None)
wait_until(lambda: client2.sycn_status_is_completed_or_unknown(0))

wait_until(lambda: client2.zgs_get_file_info(data_root)["finalized"])

# Validate data
assert_equal(
client2.zgs_download_segment(data_root, 0, 1),
client1.zgs_download_segment(data_root, 0, 1),
client2.zgs_download_segment(data_root, 0, 1024),
client1.zgs_download_segment(data_root, 0, 1024),
)

def __test_sync_chunks_by_rpc(self):
self.log.info("Begin to test chunks sync by rpc")

client1 = self.nodes[0]
client2 = self.nodes[1]

# Prepare 3 segments to upload
chunk_data = random.randbytes(256 * 1024 * 3)
data_root = self.__create_submission(chunk_data)

# Ensure log entry sync from blockchain node
wait_until(lambda: client1.zgs_get_file_info(data_root) is not None)
assert_equal(client1.zgs_get_file_info(data_root)["finalized"], False)

# Upload only 2nd segment to storage node
segments = data_to_segments(chunk_data)
self.log.info("segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments])
assert(client1.zgs_upload_segment(segments[1]) is None)

# segment 0 is not able to download
assert(client1.zgs_download_segment_decoded(data_root, 0, 1024) is None)
# segment 1 is available to download
assert_equal(client1.zgs_download_segment_decoded(data_root, 1024, 2048), chunk_data[1024*256:2048*256])
# segment 2 is not able to download
assert(client1.zgs_download_segment_decoded(data_root, 2048, 3072) is None)

# Segment 1 should not be able to download on node 2
wait_until(lambda: client2.zgs_get_file_info(data_root) is not None)
assert_equal(client2.zgs_get_file_info(data_root)["finalized"], False)
assert(client2.zgs_download_segment_decoded(data_root, 1024, 2048) is None)

# Trigger chunks sync by rpc
assert(client2.admin_start_sync_chunks(1, 1024, 2048) is None)
wait_until(lambda: client2.sycn_status_is_completed_or_unknown(1))
wait_until(lambda: client2.zgs_download_segment_decoded(data_root, 1024, 2048) is not None)

# Validate data
assert_equal(client2.zgs_download_segment_decoded(data_root, 1024, 2048), chunk_data[1024*256:2048*256])

def __create_submission(self, chunk_data: bytes) -> str:
submissions, data_root = create_submission(chunk_data)
self.contract.submit(submissions)
self.__deployed_contracts += 1
wait_until(lambda: self.contract.num_submissions() == self.__deployed_contracts)
self.log.info("Submission created, data root: %s, submissions(%s) = %s", data_root, len(submissions), submissions)
return data_root

if __name__ == "__main__":
SyncTest().main()
5 changes: 3 additions & 2 deletions tests/test_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

__file_path__ = os.path.dirname(os.path.realpath(__file__))

CONFLUX_BINARY = "conflux.exe" if is_windows_platform() else "conflux"

def run_single_test(py, script, test_dir, index, port_min, port_max):
try:
Expand Down Expand Up @@ -61,7 +62,7 @@ def run():
if not os.path.exists(dir_name):
os.makedirs(dir_name, exist_ok=True)

conflux_path = os.path.join(dir_name, "conflux")
conflux_path = os.path.join(dir_name, CONFLUX_BINARY)
if not os.path.exists(conflux_path):
build_conflux(conflux_path)

Expand Down Expand Up @@ -155,7 +156,7 @@ def build_conflux(conflux_path):
os.chdir(destination_path)
os.system("cargo build --release --bin conflux")

path = os.path.join(destination_path, "target", "release", "conflux")
path = os.path.join(destination_path, "target", "release", CONFLUX_BINARY)
shutil.copyfile(path, conflux_path)

if not is_windows_platform():
Expand Down
9 changes: 8 additions & 1 deletion tests/test_framework/zgs_node.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
import shutil
import sys
import base64

from config.node_config import ZGS_CONFIG
from test_framework.blockchain_node import NodeType, TestNode
Expand Down Expand Up @@ -85,6 +85,10 @@ def zgs_upload_segment(self, segment):

def zgs_download_segment(self, data_root, start_index, end_index):
return self.rpc.zgs_downloadSegment([data_root, start_index, end_index])

def zgs_download_segment_decoded(self, data_root: str, start_chunk_index: int, end_chunk_index: int) -> bytes:
encodedSegment = self.rpc.zgs_downloadSegment([data_root, start_chunk_index, end_chunk_index])
return None if encodedSegment is None else base64.b64decode(encodedSegment)

def zgs_get_file_info(self, data_root):
return self.rpc.zgs_getFileInfo([data_root])
Expand All @@ -98,6 +102,9 @@ def shutdown(self):

def admin_start_sync_file(self, tx_seq):
return self.rpc.admin_startSyncFile([tx_seq])

def admin_start_sync_chunks(self, tx_seq: int, start_chunk_index: int, end_chunk_index: int):
return self.rpc.admin_startSyncChunks([tx_seq, start_chunk_index, end_chunk_index])

def admin_get_sync_status(self, tx_seq):
return self.rpc.admin_getSyncStatus([tx_seq])
Expand Down

0 comments on commit 77bf2e1

Please sign in to comment.