14. Flink Dual-Stream Join: DataStream
Published: 2023-04-02


The dual-stream Join on DataStream is a powerful operation in Flink, widely used in data processing and analytics. The following sections describe the DataStream dual-stream Join mechanism in detail from several angles: the implementation mechanism, window-based Joins, and interval-based Joins.

1. Implementation Mechanism

The main difference between a dual-stream Join on DataStream and a batch Join lies in the nature of the data: a batch Join operates on bounded data, while a DataStream is unbounded. To implement a Join over unbounded data, a "bound" must be introduced to limit the range of data considered. This bound is expressed in two main ways: windows and intervals.

In Flink, the Join is implemented on top of the state mechanism: the system buffers the data falling within the defined bound in state, and clears that state once the Join for the bound has completed. This state management is the core foundation of the Join implementation.
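The "buffer within a bound, join, then clear" idea can be illustrated in plain Java, independent of Flink. The class below (`WindowedJoinSketch` is a made-up name for illustration) buffers `(key, timestamp)` elements per tumbling window on each side, joins matching keys within the same window, and clears the fired window's buffer, mirroring what Flink's state does at a much smaller scale:

```java
import java.util.*;

public class WindowedJoinSketch {
    // Map a timestamp to the start of its tumbling window.
    static long windowStart(long ts, long windowSize) {
        return ts - (ts % windowSize);
    }

    // Join two streams of {key, timestamp} pairs: elements match when they
    // share a key AND fall into the same tumbling window.
    static List<String> join(List<long[]> left, List<long[]> right, long windowSize) {
        // window start -> buffered keys; one buffer per side (the "state")
        Map<Long, List<Long>> leftBuf = new HashMap<>(), rightBuf = new HashMap<>();
        for (long[] e : left)
            leftBuf.computeIfAbsent(windowStart(e[1], windowSize), w -> new ArrayList<>()).add(e[0]);
        for (long[] e : right)
            rightBuf.computeIfAbsent(windowStart(e[1], windowSize), w -> new ArrayList<>()).add(e[0]);
        List<String> out = new ArrayList<>();
        for (Long w : leftBuf.keySet()) {
            for (long lk : leftBuf.get(w))
                for (long rk : rightBuf.getOrDefault(w, List.of()))
                    if (lk == rk) out.add(lk + "@" + w);
            rightBuf.remove(w); // window fired: its buffered state is cleared
        }
        return out;
    }
}
```

Real Flink state is keyed, fault-tolerant, and cleared by timers or TTL, but the life cycle is the same: data enters state, the bound closes, results are emitted, state is dropped.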


2. Window-Based Join

2.1 Inner Join

Flink provides a built-in inner join, which can be expressed directly as:

```java
stream.join(otherStream)
    .where(keySelector)
    .equalTo(keySelector)
    .window(windowAssigner)
    .apply(joinFunction);
```

A concrete example:

```java
DataStream<Tuple2<String, Integer>> result = stream1.join(stream2)
    .where(tuple -> tuple.f0)
    .equalTo(tuple -> tuple.f0)
    .window(TumblingEventTimeWindows.of(Time.seconds(1)))
    .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> join(Tuple2<String, Integer> first,
                                            Tuple2<String, Integer> second) throws Exception {
            return new Tuple2<>(first.f0, first.f1 + second.f1);
        }
    });
```

Under the hood, Flink implements the windowed Join via coGroup. The `apply` method of `JoinedStreams` collects the same-keyed elements of the two streams into the buffers `oneValues` and `twoValues`, then traverses them with a nested double loop so that every element of each list is paired with every element of the other. If either buffer is empty for a key, the loop body never executes and that key produces no output, which is exactly the inner-join semantics.
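The double-loop logic can be sketched in isolation (plain Java; `oneValues` and `twoValues` stand in for the buffers that `JoinedStreams` hands to the function, and `CoGroupInnerJoinSketch` is a made-up name for illustration):

```java
import java.util.*;

public class CoGroupInnerJoinSketch {
    // Per key and per window, the runtime supplies two buffers of
    // same-keyed elements; the inner join is simply their cross product.
    static List<String> innerJoin(List<Integer> oneValues, List<Integer> twoValues) {
        List<String> out = new ArrayList<>();
        for (Integer first : oneValues)         // outer loop: left buffer
            for (Integer second : twoValues)    // inner loop: right buffer
                out.add(first + "+" + second);  // stands in for joinFunction.join(first, second)
        return out;                             // empty when either buffer is empty
    }
}
```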

2.2 Outer Join

Flink's built-in windowed `join` only supports inner joins. An outer join can be emulated along the same lines by dropping down to the coGroup operation. For example:

```java
DataStream<Tuple3<String, Integer, Integer>> result = stream1.coGroup(stream2)
    .where(keySelector)
    .equalTo(keySelector)
    .window(windowAssigner)
    .apply(new CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>() {
        @Override
        public void coGroup(Iterable<Tuple2<String, Integer>> first,
                            Iterable<Tuple2<String, Integer>> second,
                            Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
            for (Tuple2<String, Integer> left : first) {
                boolean hasRight = false;
                for (Tuple2<String, Integer> right : second) {
                    hasRight = true;
                    out.collect(new Tuple3<>(left.f0, left.f1, right.f1));
                }
                if (!hasRight) {
                    // left element without a match: pad the right side with null
                    out.collect(new Tuple3<>(left.f0, left.f1, null));
                }
            }
        }
    });
```

3. Interval-Based Join

3.1 Overview of IntervalJoinOperator

IntervalJoinOperator is the core operator behind Flink's Interval Join. It uses MapState to buffer the data of both streams: the state key is the timestamp and the state value is the user-defined data type, so a single timestamp can map to multiple buffered values. The underlying storage is hash-based. By overriding the operator's `onEventTime` callback, state clean-up is triggered when a timer fires at the appropriate time.
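To make the matching concrete, the following plain-Java sketch (independent of Flink; `IntervalMatchSketch` is a made-up name for illustration) mimics how, for a record arriving at `recordTs`, the operator scans the other stream's timestamp-keyed buffer for buckets inside `[recordTs + lowerBound, recordTs + upperBound]`:

```java
import java.util.*;

public class IntervalMatchSketch {
    // rightBuffer plays the role of the operator's MapState:
    // key = event timestamp, value = all records buffered at that timestamp.
    static List<String> matches(Map<Long, List<String>> rightBuffer,
                                long recordTs, long lowerBound, long upperBound) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, List<String>> bucket : rightBuffer.entrySet()) {
            long ts = bucket.getKey();
            // keep only buckets with recordTs + lowerBound <= ts <= recordTs + upperBound
            if (ts < recordTs + lowerBound || ts > recordTs + upperBound) continue;
            out.addAll(bucket.getValue());
        }
        return out;
    }
}
```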

3.2 Inner Join

The inner join can use Flink's built-in intervalJoin operation directly:

```java
SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> newDataStream =
    dataStream1.keyBy(f -> f.f0)
        .intervalJoin(dataStream2.keyBy(t -> t.f0))
        .between(Time.seconds(-1), Time.seconds(1))
        .process(new ProcessJoinFunction<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>>() {
            @Override
            public void processElement(Tuple3<String, Integer, Integer> left,
                                       Tuple3<String, Integer, Integer> right,
                                       Context ctx,
                                       Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
                out.collect(new Tuple3<>(left.f0, left.f1, left.f2 + right.f2));
            }
        });
```

3.3 Outer Join

Since Flink's built-in DataStream join operations only support inner joins, implementing an outer join requires a custom IntervalLeftOuterJoinFunction. It is modeled on Flink's IntervalJoinOperator: it buffers the data of both streams and performs the Join within the given time range. A simple implementation:

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator;
import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.BufferEntry;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public abstract class IntervalLeftOuterJoinFunction<K, T1, T2, T3>
        extends KeyedCoProcessFunction<K, T1, T2, T3> {

    private static final String LEFT_BUFFER = "LEFT_BUFFER";
    private static final String RIGHT_BUFFER = "RIGHT_BUFFER";

    private final long lowerBound;
    private final long upperBound;

    private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
    private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;

    private final TypeSerializer<T1> leftTypeSerializer;
    private final TypeSerializer<T2> rightTypeSerializer;

    public IntervalLeftOuterJoinFunction(Time lowerBound, Time upperBound,
                                         boolean lowerBoundInclusive, boolean upperBoundInclusive,
                                         TypeSerializer<T1> leftTypeSerializer,
                                         TypeSerializer<T2> rightTypeSerializer) {
        long millLowerBound = lowerBound.toMilliseconds();
        long millUpperBound = upperBound.toMilliseconds();
        Preconditions.checkArgument(millLowerBound <= millUpperBound,
                "lowerBound <= upperBound must be fulfilled");
        this.lowerBound = lowerBoundInclusive ? millLowerBound : millLowerBound + 1L;
        this.upperBound = upperBoundInclusive ? millUpperBound : millUpperBound - 1L;
        this.leftTypeSerializer = leftTypeSerializer;
        this.rightTypeSerializer = rightTypeSerializer;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // State expires via TTL rather than an explicit onTimer callback.
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.milliseconds(upperBound))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        MapStateDescriptor<Long, List<BufferEntry<T1>>> leftMapStateDescriptor =
                new MapStateDescriptor<>(
                        LEFT_BUFFER,
                        LongSerializer.INSTANCE,
                        new ListSerializer<>(new IntervalJoinOperator.BufferEntrySerializer<>(leftTypeSerializer)));
        leftMapStateDescriptor.enableTimeToLive(ttlConfig);
        MapStateDescriptor<Long, List<BufferEntry<T2>>> rightMapStateDescriptor =
                new MapStateDescriptor<>(
                        RIGHT_BUFFER,
                        LongSerializer.INSTANCE,
                        new ListSerializer<>(new IntervalJoinOperator.BufferEntrySerializer<>(rightTypeSerializer)));
        rightMapStateDescriptor.enableTimeToLive(ttlConfig);
        this.leftBuffer = getRuntimeContext().getMapState(leftMapStateDescriptor);
        this.rightBuffer = getRuntimeContext().getMapState(rightMapStateDescriptor);
    }

    public abstract T3 userProcess(T1 leftValue, T2 rightValue);

    @Override
    public void processElement1(T1 value, Context ctx, Collector<T3> out) throws Exception {
        processElement(new StreamRecord<>(value, ctx.timestamp()),
                leftBuffer, rightBuffer, lowerBound, upperBound, true, out, ctx);
    }

    @Override
    public void processElement2(T2 value, Context ctx, Collector<T3> out) throws Exception {
        processElement(new StreamRecord<>(value, ctx.timestamp()),
                rightBuffer, leftBuffer, -upperBound, -lowerBound, false, out, ctx);
    }

    @SuppressWarnings("unchecked")
    private <THIS, OTHER> void processElement(
            final StreamRecord<THIS> record,
            final MapState<Long, List<BufferEntry<THIS>>> ourBuffer,
            final MapState<Long, List<BufferEntry<OTHER>>> otherBuffer,
            final long relativeLowerBound,
            final long relativeUpperBound,
            final boolean isLeft,
            Collector<T3> out,
            Context ctx) throws Exception {
        final THIS recordValue = record.getValue();
        final long recordTimestamp = record.getTimestamp();
        if (recordTimestamp == Long.MIN_VALUE) {
            throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in "
                    + "interval stream joins need to have meaningful timestamps.");
        }
        if (isLate(recordTimestamp, ctx)) {
            return;
        }
        addToBuffer(ourBuffer, recordValue, recordTimestamp);
        boolean flag = false;
        for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket : otherBuffer.entries()) {
            final long timestamp = bucket.getKey();
            if (timestamp < recordTimestamp + relativeLowerBound
                    || timestamp > recordTimestamp + relativeUpperBound) {
                continue;
            }
            for (BufferEntry<OTHER> entry : bucket.getValue()) {
                flag = true;
                if (isLeft) {
                    out.collect(userProcess((T1) recordValue, (T2) entry.getValue()));
                } else {
                    out.collect(userProcess((T1) entry.getValue(), (T2) recordValue));
                }
            }
        }
        // Left outer join: emit the left element padded with null when nothing matched.
        if (!flag && isLeft) {
            out.collect(userProcess((T1) recordValue, null));
        }
        long cleanupTime = (relativeUpperBound > 0L)
                ? recordTimestamp + relativeUpperBound : recordTimestamp;
        ctx.timerService().registerEventTimeTimer(cleanupTime);
    }

    private boolean isLate(long timestamp, Context ctx) {
        long currentWatermark = ctx.timerService().currentWatermark();
        return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
    }

    private static <T> void addToBuffer(
            final MapState<Long, List<BufferEntry<T>>> buffer,
            final T value,
            final long timestamp) throws Exception {
        List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
        if (elemsInBucket == null) {
            elemsInBucket = new ArrayList<>();
        }
        elemsInBucket.add(new BufferEntry<>(value, false));
        buffer.put(timestamp, elemsInBucket);
    }
}
```

3.4 Usage

Below is an example of using IntervalLeftOuterJoinFunction; the anonymous subclass supplies a `userProcess` that pads the right side with null when no match falls inside the interval:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

public class IntervalOuterJoinExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        List<Tuple3<String, Integer, Long>> tuple3List1 = Arrays.asList(
                new Tuple3<>("A", 10, 1690700400000L),
                new Tuple3<>("A", 11, 1690700402000L),
                new Tuple3<>("B", 2, 1690700400021L),
                new Tuple3<>("C", 3, 1690700400002L),
                new Tuple3<>("D", 3, 1690700400003L));
        List<Tuple3<String, Integer, Long>> tuple3List2 = Arrays.asList(
                new Tuple3<>("A", 13, 1690700400000L),
                new Tuple3<>("A", 12, 1690700400000L),
                new Tuple3<>("B", 21, 1690700400001L),
                new Tuple3<>("C", 31, 1690700400002L),
                new Tuple3<>("D", 41, 1690700400003L));

        DataStream<Tuple3<String, Integer, Long>> dataStream1 = env.fromCollection(tuple3List1)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, Integer, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, timestamp) -> element.f2));
        DataStream<Tuple3<String, Integer, Long>> dataStream2 = env.fromCollection(tuple3List2)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, Integer, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, timestamp) -> element.f2));

        // The constructor expects TypeSerializers, so the TypeInformation is
        // converted with createSerializer.
        SingleOutputStreamOperator<Tuple4<String, Integer, Integer, Long>> newDataStream =
                dataStream1.connect(dataStream2)
                        .keyBy(f -> f.f0, t -> t.f0)
                        .process(new IntervalLeftOuterJoinFunction<String,
                                Tuple3<String, Integer, Long>,
                                Tuple3<String, Integer, Long>,
                                Tuple4<String, Integer, Integer, Long>>(
                                Time.milliseconds(-10), Time.milliseconds(10), true, true,
                                TypeInformation.of(new TypeHint<Tuple3<String, Integer, Long>>() {}).createSerializer(env.getConfig()),
                                TypeInformation.of(new TypeHint<Tuple3<String, Integer, Long>>() {}).createSerializer(env.getConfig())) {
                            @Override
                            public Tuple4<String, Integer, Integer, Long> userProcess(
                                    Tuple3<String, Integer, Long> left, Tuple3<String, Integer, Long> right) {
                                // right is null when no match falls inside the interval
                                return new Tuple4<>(left.f0, left.f1, right == null ? null : right.f1, left.f2);
                            }
                        });

        newDataStream.print();
        env.execute("Flink Interval Join Example Job");
    }
}
```

4. Regular Join

To implement a plain inner, left, or right join without any window or interval, the interval-based join function above can be modified so that it no longer checks whether data on either side falls outside the time bounds. To keep state from growing without bound, a TTL (time-to-live) should be configured on the state.
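A minimal sketch of such a regular join's state handling (plain Java, no Flink; TTL-based expiry is omitted, and `RegularJoinSketch` is an illustrative name only): each arriving record is buffered by key and joined against everything buffered so far on the other side, with no time bound at all.

```java
import java.util.*;

public class RegularJoinSketch {
    // Per-key buffers for both sides; in Flink these would be keyed state
    // with a TTL so old entries eventually expire.
    final Map<String, List<String>> leftBuf = new HashMap<>();
    final Map<String, List<String>> rightBuf = new HashMap<>();

    // A new left record joins every right record buffered so far for the
    // same key -- inner-join semantics, no window and no interval check.
    public List<String> onLeft(String key, String value) {
        leftBuf.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        List<String> out = new ArrayList<>();
        for (String r : rightBuf.getOrDefault(key, List.of()))
            out.add(value + "|" + r);
        return out;
    }

    public List<String> onRight(String key, String value) {
        rightBuf.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        List<String> out = new ArrayList<>();
        for (String l : leftBuf.getOrDefault(key, List.of()))
            out.add(l + "|" + value);
        return out;
    }
}
```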

In summary, the dual-stream Join on DataStream gives Flink powerful data processing capabilities; with flexible configuration and a rich set of operations, it can meet a wide range of business needs.

Reprinted from: http://mtefk.baihongyu.com/
