Skip to content

Commit

Permalink
fix up tests error with attempts
Browse files Browse the repository at this point in the history
  • Loading branch information
czy006 committed Aug 28, 2024
1 parent 38ad59f commit 47781b0
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ public void testMixedFormatSource(FailoverType failoverType) throws Exception {

@Test
public void testDimTaskManagerFailover() throws Exception {
int restartAttempts = 10;
List<RowData> updated = updateRecords();
writeUpdate(updated);
List<RowData> records = generateRecords(2, 1);
Expand All @@ -242,8 +243,8 @@ public void testDimTaskManagerFailover() throws Exception {
MixedFormatSource<RowData> mixedFormatSource = initMixedFormatDimSource(true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(1000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 0));
env.enableCheckpointing(3000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts, 0));

DataStream<RowData> input =
env.fromSource(
Expand All @@ -262,8 +263,8 @@ public void testDimTaskManagerFailover() throws Exception {
WatermarkAwareFailWrapper::continueProcessing,
miniClusterResource.getMiniCluster());

while (WatermarkAwareFailWrapper.watermarkCounter.get() != PARALLELISM) {
Thread.sleep(1000);
while (WatermarkAwareFailWrapper.watermarkCounter.get() != restartAttempts) {
Thread.sleep(2000);
LOG.info("wait for watermark after failover");
}
Assert.assertEquals(Long.MAX_VALUE, WatermarkAwareFailWrapper.getWatermarkAfterFailover());
Expand Down
Empty file.

0 comments on commit 47781b0

Please sign in to comment.