Skip to content

Commit

Permalink
validate results
Browse files Browse the repository at this point in the history
  • Loading branch information
lsgrep committed Oct 11, 2024
1 parent 6b2b4cb commit 7d48eec
Showing 1 changed file with 37 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ protected <K, T, U, W extends Window> KeyedStream<Tuple2<Long, Pair<K, U>>, K> c
if (w instanceof EventTimeBasedSlidingWindow) {
final EventTimeBasedSlidingWindow sw = (EventTimeBasedSlidingWindow) w;
return new KeyedStream<>(ks.process(new KeyedProcessFunction<K, Tuple2<Long, Pair<K, T>>, Tuple2<Long, Pair<K, U>>>() {
private transient MapState<Long, T> elementMap;
private transient ListState<Tuple2<Long, T>> sortedElements;

public void open(Configuration parameters) throws Exception {
long ttl = sw.size().toSeconds() + sw.slide().toSeconds() * 14;
Expand All @@ -259,48 +259,55 @@ public void open(Configuration parameters) throws Exception {
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupIncrementally(10, false)
.build();
MapStateDescriptor<Long, T> descriptor = new MapStateDescriptor<>("elements", BasicTypeInfo.LONG_TYPE_INFO, typeInfo(clz));
ListStateDescriptor<Tuple2<Long, T>> descriptor = new ListStateDescriptor<>("elements", tuple2TypeInfo(clz));
descriptor.enableTimeToLive(ttlConfig);
elementMap = getRuntimeContext().getMapState(descriptor);
sortedElements = getRuntimeContext().getListState(descriptor);
}

private void updateMap(Long timestamp, T value) throws Exception {
long lowerBoundInclusive = timestamp - sw.size().toMillis() - sw.slide().toMillis() * 14;
// two things:
// 1. insert t into sorted, while maintaining the order
// 2. remove elements in sorted that are too old
private void update(List<Tuple2<Long, T>> sorted, Tuple2<Long, T> t) {
// keep a buffer of 14 times the slide size in case we see a late event
// TODO: revisit this. Ideally, we should be able to change the multiplier
// through a config
long lowerBoundInclusive = t.f0 - sw.size().toMillis() - sw.slide().toMillis() * 14;
while (!sorted.isEmpty() && sorted.get(0).f0 < lowerBoundInclusive) {
sorted.remove(0);
}

// Remove old elements
Iterator<Map.Entry<Long, T>> iterator = elementMap.entries().iterator();
while (iterator.hasNext()) {
Map.Entry<Long, T> entry = iterator.next();
if (entry.getKey() < lowerBoundInclusive) {
iterator.remove();
int i = sorted.size() - 1;
for (; i >= 0; i--) {
Tuple2<Long, T> e = sorted.get(i);
if (e.f0 <= t.f0) {
break;
}
}

// Add new element
elementMap.put(timestamp, value);
sorted.add(i + 1, t);
}

@Override
public void processElement(Tuple2<Long, Pair<K, T>> tuple, Context context, Collector<Tuple2<Long, Pair<K, U>>> collector) throws Exception {
Long timestamp = tuple.f0;
T value = tuple.f1.r;

updateMap(timestamp, value);

List<T> windowElements = new ArrayList<>();
long windowStart = timestamp - sw.size().toMillis();

for (Map.Entry<Long, T> entry : elementMap.entries()) {
if (entry.getKey() >= windowStart && entry.getKey() <= timestamp) {
windowElements.add(entry.getValue());
List<Tuple2<Long, T>> elements = Lists.newArrayList(sortedElements.get());
Tuple2<Long, T> e = new Tuple2<>(tuple.f0, tuple.f1.r);
update(elements, e);

List<T> ts = new ArrayList<>();
long lowerBoundInclusive = e.f0 - sw.size().toMillis();
long upperBoundInclusive = e.f0;
for (Tuple2<Long, T> t : elements) {
if (t.f0 >= lowerBoundInclusive && t.f0 <= upperBoundInclusive) {
ts.add(t.f1);
}
}

List<U> results = Lists.newArrayList(f.apply(windowElements));
if (!results.isEmpty()) {
U last = results.get(results.size() - 1);
collector.collect(new Tuple2<>(timestamp, new Pair<>(tuple.f1.l, last)));
List<U> us = Lists.newArrayList(f.apply(ts));
if (!us.isEmpty()) {
U last = us.get(us.size() - 1);
// we only need the last one
collector.collect(new Tuple2<>(e.f0, new Pair<>(tuple.f1.l, last)));
}

sortedElements.update(elements);
}
}, new TupleTypeInfo<>(BasicTypeInfo.LONG_TYPE_INFO, new PairTypeInfo<>(typeInfo(stream.keyClass()), typeInfo(stream.uc)))), t -> t.f1.l);
}
Expand Down

0 comments on commit 7d48eec

Please sign in to comment.