The dual-stream join on DataStream is a powerful operation in Flink, widely used in data processing and analytics scenarios. The sections below describe the DataStream dual-stream join in detail, covering its underlying implementation, window-based joins, and interval-based joins.
The main difference between a DataStream dual-stream join and a batch join lies in the nature of the data: a batch join operates on bounded data, while a DataStream is unbounded. To join unbounded data, a "bound" must be introduced to limit the range of data under consideration. This bound is expressed in one of two ways: as a window or as an interval.
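To make the two kinds of bound concrete, here is a minimal plain-Java sketch (no Flink dependency; all names are illustrative): a tumbling window lets two elements join only when they fall into the same window bucket, while an interval tests the distance between their timestamps directly.

```java
public class JoinBounds {

    // Tumbling window: the window an element belongs to is determined
    // by truncating its timestamp to the window size.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Two elements can join only if they land in the same window bucket.
    static boolean sameWindow(long leftTs, long rightTs, long windowSizeMs) {
        return windowStart(leftTs, windowSizeMs) == windowStart(rightTs, windowSizeMs);
    }

    // Interval: the right element must lie within
    // [leftTs + lowerBound, leftTs + upperBound].
    static boolean inInterval(long leftTs, long rightTs, long lowerBoundMs, long upperBoundMs) {
        return rightTs >= leftTs + lowerBoundMs && rightTs <= leftTs + upperBoundMs;
    }
}
```

Note the asymmetry this exposes: with a window, two elements just inside two adjacent windows can never join however close they are, whereas the interval bound is always relative to each individual element.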
In Flink, the join is implemented on top of the state mechanism. Elements falling within the defined bound are stored in state; once the join for that bound has completed, the state is cleared. This state management is the core foundation of the join implementation.
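This store-then-clear lifecycle can be sketched without Flink. In the illustrative plain-Java class below (class and method names are assumptions, not Flink API), both sides are buffered per key, joined when the bound closes, and then the buffers are cleared:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowJoinState {
    // Per-key buffers standing in for Flink's keyed state.
    private final Map<String, List<Integer>> leftState = new HashMap<>();
    private final Map<String, List<Integer>> rightState = new HashMap<>();

    void addLeft(String key, int value) {
        leftState.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    void addRight(String key, int value) {
        rightState.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // When the bound (e.g. a window) fires: emit the cross product of the
    // buffered elements for this key, then clear the state.
    List<int[]> fireAndClear(String key) {
        List<int[]> out = new ArrayList<>();
        for (int l : leftState.getOrDefault(key, List.of())) {
            for (int r : rightState.getOrDefault(key, List.of())) {
                out.add(new int[]{l, r});
            }
        }
        leftState.remove(key);
        rightState.remove(key);
        return out;
    }
}
```

The important point is the last two `remove` calls: without clearing, state on an unbounded stream would grow forever.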
Flink provides an inner join out of the box, which can be expressed directly as:
stream.join(otherStream)
    .where(keySelector)
    .equalTo(keySelector)
    .window(windowAssigner)
    .apply(joinFunction);
A concrete example:
DataStream<Tuple2<String, Integer>> result = stream1.join(stream2)
    .where(tuple -> tuple.f0)
    .equalTo(tuple -> tuple.f0)
    .window(TumblingEventTimeWindows.of(Time.seconds(1)))
    .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> join(Tuple2<String, Integer> first,
                                            Tuple2<String, Integer> second) throws Exception {
            return new Tuple2<>(first.f0, first.f1 + second.f1);
        }
    });
Under the hood, Flink's join is implemented via coGroup. The apply method of JoinedStreams collects the elements sharing the same key from the two streams into oneValues and twoValues, then iterates over the two lists with a nested loop so that every pairing of elements is processed; because both complete lists are visible, an empty side can be detected and handled instead of silently producing no output.
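The nested-loop pairing described above can be mimicked in plain Java (names are illustrative): elements with the same key are gathered into oneValues and twoValues, and the double loop emits every combination.

```java
import java.util.ArrayList;
import java.util.List;

public class NestedLoopJoin {
    // Emits one "left,right" pair for every combination of the two buffers,
    // which is the shape of the double loop inside JoinedStreams#apply.
    static List<String> innerJoin(List<String> oneValues, List<String> twoValues) {
        List<String> out = new ArrayList<>();
        for (String left : oneValues) {
            for (String right : twoValues) {
                out.add(left + "," + right);
            }
        }
        return out;
    }
}
```

With pure inner-join semantics, an empty side yields no output at all; this is exactly the gap the coGroup-based outer join below fills by checking for the empty side explicitly.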
Flink's built-in DataStream join operator only supports inner joins. An outer join can be emulated with the coGroup operation, building on the inner-join idea. For example:
DataStream<Tuple3<String, Integer, Integer>> result = stream1.coGroup(stream2)
    .where(keySelector)
    .equalTo(keySelector)
    .window(windowAssigner)
    .apply(new CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>() {
        @Override
        public void coGroup(Iterable<Tuple2<String, Integer>> first,
                            Iterable<Tuple2<String, Integer>> second,
                            Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
            for (Tuple2<String, Integer> left : first) {
                boolean hasRight = false;
                for (Tuple2<String, Integer> right : second) {
                    hasRight = true;
                    out.collect(new Tuple3<>(left.f0, left.f1, right.f1));
                }
                // Left outer join: emit the left element with a null right side
                // when no matching right element exists.
                if (!hasRight) {
                    out.collect(new Tuple3<>(left.f0, left.f1, null));
                }
            }
        }
    });
IntervalJoinOperator is the core operator behind Flink's interval join. It caches the elements of both streams in MapState, where the state key is the element timestamp and the value is the user-defined record type; one timestamp can map to multiple values, stored as a list in a map structure. By overriding onEventTime, the operator clears state when the registered timer fires.
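The MapState layout and the timer-driven cleanup can be sketched with a plain TreeMap (an illustrative stand-in, not the Flink class):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class TimestampBuffer {
    // Key: element timestamp; value: all elements that arrived with that
    // timestamp, mirroring MapState<Long, List<T>> in IntervalJoinOperator.
    private final TreeMap<Long, List<String>> buffer = new TreeMap<>();

    void add(long timestamp, String value) {
        buffer.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
    }

    // Mimics onEventTime cleanup: when the timer fires, drop every bucket
    // whose timestamp is at or before the timer's timestamp.
    void onEventTime(long timerTimestamp) {
        buffer.headMap(timerTimestamp, true).clear();
    }

    int size() {
        return buffer.values().stream().mapToInt(List::size).sum();
    }
}
```

Keying the buffer by timestamp is what makes cleanup cheap: expiring old data is a single range deletion rather than a scan over every buffered element.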
For an inner join, Flink's built-in intervalJoin operation can be used directly:
SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> newDataStream = dataStream1.keyBy(f -> f.f0)
    .intervalJoin(dataStream2.keyBy(t -> t.f0))
    .between(Time.seconds(-1), Time.seconds(1))
    .process(new ProcessJoinFunction<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>>() {
        @Override
        public void processElement(Tuple3<String, Integer, Integer> left,
                                   Tuple3<String, Integer, Integer> right,
                                   Context ctx,
                                   Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
            out.collect(new Tuple3<>(left.f0, left.f1, left.f2 + right.f2));
        }
    });
Since Flink's built-in DataStream join operations only support inner joins, implementing an outer join requires a custom IntervalLeftOuterJoinFunction. It is modeled on Flink's IntervalJoinOperator: it caches the elements of both streams and joins them within the configured time range. A simple implementation follows:
public abstract class IntervalLeftOuterJoinFunction<K, T1, T2, T3>
        extends KeyedCoProcessFunction<K, T1, T2, T3> {

    private static final String LEFT_BUFFER = "LEFT_BUFFER";
    private static final String RIGHT_BUFFER = "RIGHT_BUFFER";

    private final long lowerBound;
    private final long upperBound;

    private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
    private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;

    private final TypeSerializer<T1> leftTypeSerializer;
    private final TypeSerializer<T2> rightTypeSerializer;

    public IntervalLeftOuterJoinFunction(Time lowerBound, Time upperBound,
                                         boolean lowerBoundInclusive, boolean upperBoundInclusive,
                                         TypeSerializer<T1> leftTypeSerializer,
                                         TypeSerializer<T2> rightTypeSerializer) {
        long millLowerBound = lowerBound.toMilliseconds();
        long millUpperBound = upperBound.toMilliseconds();
        Preconditions.checkArgument(millLowerBound <= millUpperBound,
                "lowerBound <= upperBound must be fulfilled");
        this.lowerBound = lowerBoundInclusive ? millLowerBound : millLowerBound + 1L;
        this.upperBound = upperBoundInclusive ? millUpperBound : millUpperBound - 1L;
        this.leftTypeSerializer = leftTypeSerializer;
        this.rightTypeSerializer = rightTypeSerializer;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Bound state size via TTL instead of explicit per-timer cleanup.
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.milliseconds(upperBound))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        MapStateDescriptor<Long, List<BufferEntry<T1>>> leftMapStateDescriptor =
                new MapStateDescriptor<>(
                        LEFT_BUFFER,
                        LongSerializer.INSTANCE,
                        new ListSerializer<>(
                                new IntervalJoinOperator.BufferEntrySerializer<>(leftTypeSerializer)));
        leftMapStateDescriptor.enableTimeToLive(ttlConfig);

        MapStateDescriptor<Long, List<BufferEntry<T2>>> rightMapStateDescriptor =
                new MapStateDescriptor<>(
                        RIGHT_BUFFER,
                        LongSerializer.INSTANCE,
                        new ListSerializer<>(
                                new IntervalJoinOperator.BufferEntrySerializer<>(rightTypeSerializer)));
        rightMapStateDescriptor.enableTimeToLive(ttlConfig);

        this.leftBuffer = getRuntimeContext().getMapState(leftMapStateDescriptor);
        this.rightBuffer = getRuntimeContext().getMapState(rightMapStateDescriptor);
    }

    // Implemented by the user to produce the joined record; the right side is
    // null when a left element found no match (left outer semantics).
    public abstract T3 userProcess(T1 leftValue, T2 rightValue);

    @Override
    public void processElement1(T1 value, Context ctx, Collector<T3> out) throws Exception {
        processElement(new StreamRecord<>(value, ctx.timestamp()),
                leftBuffer, rightBuffer, lowerBound, upperBound, true, out, ctx);
    }

    @Override
    public void processElement2(T2 value, Context ctx, Collector<T3> out) throws Exception {
        processElement(new StreamRecord<>(value, ctx.timestamp()),
                rightBuffer, leftBuffer, -upperBound, -lowerBound, false, out, ctx);
    }

    @SuppressWarnings("unchecked")
    private <OURS, OTHER> void processElement(
            final StreamRecord<OURS> record,
            final MapState<Long, List<BufferEntry<OURS>>> ourBuffer,
            final MapState<Long, List<BufferEntry<OTHER>>> otherBuffer,
            final long relativeLowerBound,
            final long relativeUpperBound,
            final boolean isLeft,
            Collector<T3> out,
            Context ctx) throws Exception {
        final OURS recordValue = record.getValue();
        final long recordTimestamp = record.getTimestamp();
        if (recordTimestamp == Long.MIN_VALUE) {
            throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in "
                    + "interval stream joins need to have meaningful timestamps.");
        }
        if (isLate(recordTimestamp, ctx)) {
            return;
        }
        addToBuffer(ourBuffer, recordValue, recordTimestamp);
        boolean joined = false;
        for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket : otherBuffer.entries()) {
            final long timestamp = bucket.getKey();
            if (timestamp < recordTimestamp + relativeLowerBound
                    || timestamp > recordTimestamp + relativeUpperBound) {
                continue;
            }
            for (BufferEntry<OTHER> entry : bucket.getValue()) {
                joined = true;
                if (isLeft) {
                    out.collect(userProcess((T1) recordValue, (T2) entry.getValue()));
                } else {
                    out.collect(userProcess((T1) entry.getValue(), (T2) recordValue));
                }
            }
        }
        // Left outer join: an unmatched left element is emitted with a null right side.
        if (!joined && isLeft) {
            out.collect(userProcess((T1) recordValue, null));
        }
        long cleanupTime = (relativeUpperBound > 0L)
                ? recordTimestamp + relativeUpperBound
                : recordTimestamp;
        ctx.timerService().registerEventTimeTimer(cleanupTime);
    }

    private boolean isLate(long timestamp, Context ctx) {
        long currentWatermark = ctx.timerService().currentWatermark();
        return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
    }

    private static <T> void addToBuffer(
            final MapState<Long, List<BufferEntry<T>>> buffer,
            final T value,
            final long timestamp) throws Exception {
        List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
        if (elemsInBucket == null) {
            elemsInBucket = new ArrayList<>();
        }
        elemsInBucket.add(new BufferEntry<>(value, false));
        buffer.put(timestamp, elemsInBucket);
    }
}
An example of using IntervalLeftOuterJoinFunction:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

public class IntervalOuterJoinExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        List<Tuple3<String, Integer, Long>> tuple3List1 = Arrays.asList(
                new Tuple3<>("A", 10, 1690700400000L),
                new Tuple3<>("A", 11, 1690700402000L),
                new Tuple3<>("B", 2, 1690700400021L),
                new Tuple3<>("C", 3, 1690700400002L),
                new Tuple3<>("D", 3, 1690700400003L));

        List<Tuple3<String, Integer, Long>> tuple3List2 = Arrays.asList(
                new Tuple3<>("A", 13, 1690700400000L),
                new Tuple3<>("A", 12, 1690700400000L),
                new Tuple3<>("B", 21, 1690700400001L),
                new Tuple3<>("C", 31, 1690700400002L),
                new Tuple3<>("D", 41, 1690700400003L));

        DataStream<Tuple3<String, Integer, Long>> dataStream1 = env.fromCollection(tuple3List1)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, Integer, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, timestamp) -> element.f2));

        DataStream<Tuple3<String, Integer, Long>> dataStream2 = env.fromCollection(tuple3List2)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, Integer, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, timestamp) -> element.f2));

        SingleOutputStreamOperator<Tuple4<String, Integer, Integer, Long>> newDataStream =
                dataStream1.connect(dataStream2)
                        .keyBy(f -> f.f0, t -> t.f0)
                        .process(new IntervalLeftOuterJoinFunction<String,
                                Tuple3<String, Integer, Long>,
                                Tuple3<String, Integer, Long>,
                                Tuple4<String, Integer, Integer, Long>>(
                                Time.milliseconds(-10), Time.milliseconds(10), true, true,
                                TypeInformation.of(new TypeHint<Tuple3<String, Integer, Long>>() {})
                                        .createSerializer(env.getConfig()),
                                TypeInformation.of(new TypeHint<Tuple3<String, Integer, Long>>() {})
                                        .createSerializer(env.getConfig())) {
                            @Override
                            public Tuple4<String, Integer, Integer, Long> userProcess(
                                    Tuple3<String, Integer, Long> left,
                                    Tuple3<String, Integer, Long> right) {
                                // The right side is null for an unmatched left element.
                                return new Tuple4<>(left.f0, left.f1,
                                        right == null ? null : right.f1, left.f2);
                            }
                        });

        newDataStream.print();
        env.execute("Flink Interval Join Example Job");
    }
}

If we want an ordinary inner, left, or right join that is not bounded by a window or an interval, the Interval-Join-based function can be modified so that it no longer checks whether the data on either side has expired. To keep the state from growing without bound, a TTL (time to live) should then be set on the state.
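The TTL idea can be illustrated without Flink: each buffered entry records its write time, and entries past their TTL behave as absent (all names below are illustrative; in a real job StateTtlConfig does this for you):

```java
import java.util.HashMap;
import java.util.Map;

public class TtlBuffer {
    private final long ttlMs;
    private final Map<String, Long> writeTime = new HashMap<>();
    private final Map<String, Integer> values = new HashMap<>();

    TtlBuffer(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    // OnCreateAndWrite semantics: the clock restarts on every write.
    void put(String key, int value, long nowMs) {
        values.put(key, value);
        writeTime.put(key, nowMs);
    }

    // NeverReturnExpired semantics: an expired entry is dropped and reads as null.
    Integer get(String key, long nowMs) {
        Long written = writeTime.get(key);
        if (written == null || nowMs - written > ttlMs) {
            values.remove(key);
            writeTime.remove(key);
            return null;
        }
        return values.get(key);
    }
}
```

This trades exactness for boundedness: a match arriving after the TTL has passed is simply missed, which is the price of an unbounded join with bounded state.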
In summary, the DataStream dual-stream join gives Flink powerful data processing capabilities; with flexible configuration and a rich set of operations it can satisfy a wide range of business requirements.