2020年五月月 发布的文章

[Flink] Flink 时间、窗口和函数等相关原理解析

Flink 的时间属性

在日常的流式数据处理中,很多操作都需要依赖时间属性,Flink根据时间产生位置的差异,将时间分成了三种类型分别是

  • Event Time:事件在设备上发生的时间
  • Processing Time:事件在系统中被处理的时间
  • Ingestion Time:数据进入到流计算处理系统的时间

对于 Event Time 来说,它是一个不可变的属性,它在流计算系统中各个节点流转,始终不变,Processing Time 则依赖于处理该事件机器的本地时钟,在系统中流转,该属性是变化的,而 Ingestion Time 则是数据进入到流计算系统时的时间。

Window

窗口是流计算常用的计算方式之一,Flink用窗口将无限连续的数据流按照一定规则划分成为一个个有限的数据集,我们可以利用窗口对窗口内的数据进行聚合运算,从而得到一定范围内的一些数值统计结果

在Flink中Window只有一个属性,就是maxTimestamp,表示这个窗口的最大边界,它的实现类分别为:

  • GlobalWindow:全局窗口,单例,最大的maxTimestamp为Long的最大值
  • TimeWindow:表示一个时间间隔,拥有[start, end)的开始时间和结束时间,

Evictor

它剔除元素的时机是在触发器触发之后,在窗口被处理(apply windowFunction)之前被触发,它在Flink的实现类为:

  • CountEvictor:保持一定数量的Evictor
  • DeltaEvictor:待了解
  • TimeEvictor:基于保留时间作为剔除规则,将大于保存时间的elements剔除

Windows Assigner

在流式计算的系统中,WindowAssigner负责分配0或者多个Window给一个element,我们可以根据WindowAssigner分配窗口的方式归纳为四类:

  • Tumbling Windows(滚动窗口):根据固定时间或大小划分,窗口不重叠,适合按照固定大小周期统计某一个指标
  • Sliding Windows(滑动窗口):在滚动窗口的基础上,增加滑动属性,允许窗口发生重叠,适合根据用户设定的统计频率计算指定窗口的大小统计指标,例如每隔30s统计最近10min的用户
  • Session Windows(会话窗口):主要将某个时间段活跃较高的数据聚成一个窗口,触发条件是Session Gap,适合非连续数据处理或周期产生的数据
  • Global Windows (全局窗口):将相同的Key分配到单个窗口中计算结果,没有开始时间和结束时间,需要借助Trigger触发,否则数据一直保存在内存中

Windows Trigger

数据进入到窗口之后,窗口是否触发WindowFunction计算,取决于窗口是否满足触发条件,每种类型的窗口都有对应的触发机制,常见的Trigger有:

  • CountTrigger:通过判断接入的数据量是否超过设定的阈值决定是否触发窗口
  • EventTimeTrigger:通过比较WaterMark和窗口的EndTime判断窗口是否触发
  • ProcessingTimeTrigger:通过比较ProcessTime和窗口的EndTime判断窗口是否触发
  • 其他

我们可以看下一般实现一个Trigger需要大约实现哪些方法:

  • onElement:针对每一个进入窗口的element进行触发操作
  • onProcessingTime:系统时间时间定时器触发
  • onEventTime:事件时间定时器触发
  • onMerge:对多个窗口和状态进行合并
  • clear:清除窗口和状态

我们通过Trigger返回的TriggerResult的类型决定是否触发端口:

  • CONTINUE:等待,不触发窗口
  • FIRE_AND_PURGE:触发窗口计算同时清除元素
  • FIRE:触发窗口计算,不清除元素
  • PURGE:清除window中的元素

WindowFunction

说完了Trigger之后,我们先简单的了解下WindowFunction,它可以按照计算原理分成了两大类:

  1. 增量聚合函数:ReduceFuction, AggregateFunction, FlodFunction
  2. 全量窗口函数:ProcessWindowFuction

增量聚合函数基于中间状态计算结果,窗口只维护中间状态,不需要换成原数据,而全量窗口需要维护原始数据,如果接入的数据量或者时间较大,可能会导致程序的性能下降

WaterMark

因为基于 Event Time 的模式下,数据由于网络延时等原因,往往不能准时到达,所以导致数据进入到流计算系统时会出现乱序的情况, 所以需要一种手段去衡量,当前数据处理的进度,所以 WaterMark 就是解决这个问题而设计,它表示了小于 WaterMark 的时间的数据已经到达,如果 WaterMark 的时间超过了 Window 的EndTime,就会触发窗口的计算

WaterMark是如何生成的?

目前Flink支持两种方式指定 Timestamps 和生成 WaterMark:

  1. 在Source Function中定义,SourceContenxt中可以调用, ctx.emitWatermark 方法进行 Watermark 的传递
  2. 自定义Timestamp Assigner 和 Watermark Generator 生成

WaterMark的生成形式上分为两种类型:

  1. Periodic WaterMarks:根据事件间隔周期性的生成WaterMark,例如DataStream.assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)
  2. Punctuated WaterMarks:依赖于数据流中的特殊元素生成WaterMark,例如DataStream.assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)

对于AssignerWithPunctuatedWatermarks/AssignerWithPeriodicWatermarks,最终都会生成一个TimestampsAndXXXWatermarksOperator加入到transform中,例如:

    public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
            AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {

        // match parallelism to input, otherwise dop=1 sources could lead to some strange
        // behaviour: the watermark will creep along very slowly because the elements
        // from the source go to each extraction operator round robin.
        final int inputParallelism = getTransformation().getParallelism();
        final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

        TimestampsAndPeriodicWatermarksOperator<T> operator =
                new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);

        return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
                .setParallelism(inputParallelism);
    }

WindowOperator

因为关于窗口的操作基本都在WindowOperator中,所以,今天我们通过一个简单的例子:org.apache.flink.streaming.examples.join.WindowJoin 这个例子分析WindowOperator究竟做了什么,这里是主要的代码

public static DataStream<Tuple3<String, Integer, Integer>> runWindowJoin(
        DataStream<Tuple2<String, Integer>> grades,
        DataStream<Tuple2<String, Integer>> salaries,
        long windowSize) {

    return grades.join(salaries)
            .where(new NameKeySelector())
            .equalTo(new NameKeySelector())
            .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
            .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>() {
                @Override
                public Tuple3<String, Integer, Integer> join(
                                Tuple2<String, Integer> first,
                                Tuple2<String, Integer> second) {
                    return new Tuple3<String, Integer, Integer>(first.f0, first.f1, second.f1);
                }
            });
}

通过上面的例子,两个DataStream进行join操作产生JoinedStreams,而JoinedStreams底层的实现实际调用了DataStream1.coGroup(DataStream2)产生CoGroupedStreams,而最终将数据封装成TaggedUnion类型,将两个流进行union,然后转成KeyedStream,这样两个数据流所有的相同的key都会汇聚到相同的window,最后调用window方法,得到WindowedStream,然后传入WindowFunction,进行函数计算

    public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
        //clean the closure
        function = input1.getExecutionEnvironment().clean(function);

        UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());
        UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2);

        DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
                .map(new Input1Tagger<T1, T2>())
                .setParallelism(input1.getParallelism())
                .returns(unionType);
        DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
                .map(new Input2Tagger<T1, T2>())
                .setParallelism(input2.getParallelism())
                .returns(unionType);

        DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);

        // we explicitly create the keyed stream to manually pass the key type information in
        windowedStream =
                new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
                .window(windowAssigner);

        if (trigger != null) {
            windowedStream.trigger(trigger);
        }
        if (evictor != null) {
            windowedStream.evictor(evictor);
        }
        if (allowedLateness != null) {
            windowedStream.allowedLateness(allowedLateness);
        }

        return windowedStream.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
    }

在最后传入的CoGroupWindowFunction中,它最终会被封装为InternalIterableWindowFunction 和 WindowAssigner、trigger一起放入到WindowOperator里面

        ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
            input.getType().createSerializer(getExecutionEnvironment().getConfig()));

        operator =
            new WindowOperator<>(windowAssigner,
                windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                keySel,
                input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                stateDesc,
                function,
                trigger,
                allowedLateness,
                lateDataOutputTag);
    }

    return input.transform(opName, resultType, operator);

我们可以先简单的看下WindowOperator有哪些必要的属性:

  • WindowAssigner:分配element到对应的Window
  • Trigger:触发window计算
  • StateDescriptor(windowStateDescriptor):窗口状态的描述符(window-contents)
  • InternalAppendingState(windowState)
  • InternalTimerService:用于注册定时器

接下来我们简单的看下当一个element来临时,它的处理逻辑:

  1. 通过windowAssigner为element找到对应的窗口集合
  2. 将当前的数据加入到windowState(ListState)中
  3. 构建这个Key的triggerContext,调用onElement的方法,返回TriggerResult
  4. 如果TriggerResult是FIRE类型,则执行窗口计算emitWindowContents,否则将window注册到internalTimerService中,等待触发

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);
        //if element is handled by none of assigned elementWindows
        boolean isSkippedElement = true;
    
        final K key = this.<K>getKeyedStateBackend().getCurrentKey();
    
    ...
            for (W window: elementWindows) {
    
                // drop if the window is already late
                if (isWindowLate(window)) {
                    continue;
                }
                isSkippedElement = false;
    
                windowState.setCurrentNamespace(window);
                windowState.add(element.getValue());
    
                triggerContext.key = key;
                triggerContext.window = window;
    
                TriggerResult triggerResult = triggerContext.onElement(element);
    
                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(window, contents);
                }
    
                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(window);
            }
        }
    

    … }

Operator中的internalTimerService,主要由InternalTimeServiceManager管理,InternalTimeServiceManager的主要职责有:

  • 维护了一个timerServices的HashMap,例如这里的(window-timers,InternalTimerServiceImpl)
  • 当Operator进行Snapshot时,将数据序列化到外界存储中
  • 当Operator进行Restore时,恢复相关的数据结构
  • 当WaterMark到来时,遍历执行InternalTimerServiceImpl的advanceWatermark方法,从而触发Triggerable实现类的onEventTime的实现逻辑

InternalTimerServiceImpl(internalTimerService)内部维护了两个优先级队列eventTimeTimersQueue,processingTimeTimersQueue 用来存储对应的Timer,对于Event time模式来说,当Watermark到来时,会找出队列中已经排好序的timer调用Triggerable.onEventTime(WindowOperator也实现了Triggerable接口),接下来通过Trigger.onEventTime去判断TriggerResult去决定触发窗口

OneInputStreamTask.emitWatermark
    -> AbstractStreamOperator.processWatermark
        -> InternalTimeServiceManager.advanceWatermark
            -> InternalTimerServiceImpl.advanceWatermark
                -> triggerTarget.onEventTime(WindowOperator.onEventTime)
                    -> triggerContext.onEventTime
                        -> EventTimeTrigger.onEventTime

而对于ProcessingTime模式来说,不需要外界触发,通过内部的一个ScheduledThreadPoolExecutor,在registerTimer时,将定时任务的callback函数注册到定时器中,到时间则触发,最终也是调用Triggerable.onProcessingTime方法,触发窗口计算

总 结

这边文章是一直以来的一些笔记总结,从Flink的时间概念开始,到和时间相关的一些特性,例如窗口,和触发器,最终通过WindowJoin这个例子简单的了解了WindowOperator在不同的时间类型下是如何工作的,进一步的加深对Flink的理解

[Flink] Flink Kafka Connector 实现机制简析

一、FlinkKafkaConnector 能做什么?

Flink 主要由 FlinkKafkaConsumer、FlinkKafkaProducer 两部分组成,它为 Flink 应用提供了用来读取/写入数据到 Kafka 能力,并在 Kafka Consumer 中结合了自身的提供的 Checkpointing 机制为 Flink 应用提供了 EXACTLY_ONCE 的语义,它并不完全依赖 Kafka 的 Consumer group 来对 Offset进行记录,而是在内部利用 Checkpoint 中的状态记录了这些的偏移量来完成这个功能

二、FlinkKafkaConsumer 实现分析

2.1 FlinkKafkaConsumer 的运行机制

FlinkKafkaConsumer 是一个不断从 Kafka 获取的一个流式数据的数据源,它可以同时的在不同的并行实例中运行,从一个或者多个 Kafka partition 中拉取数据

我们可以先简单的从类图的关系了解下 FlinkKafkaConsumer 的实现,上面是 FlinkKafkaConsumer 的之间的关系,不同 Kafka 版本实现的 FlinkKafkaConsumer 都基于 FlinkKafkaConsumerBase 这个抽象类。在 FlinkKafkaConsumerBase 中,它的主要负责初始化一些列对象来构建KafkaFetcher,用于获取 Kafka 上面的数据,如果开启了 Partition Discoverer 功能,则启动 DiscoveryLoopThread 定期获取 KafkaTopicPartition 信息。并将这些信息更新到 KafkaFetcher 中,从而获取新的 Topic 或 Partition 数据

在 KafkaFetcher 里,它并没有直接使用 KafkaClient 进行直接的数据读取,它通过创建了一条 KafkaConsumerThread 线程定时 ( flink.poll-timeout, default 100ms) 的从 Kafka 拉取数据, 并调用 Handover.produce 将获取到的 Records 放到 Handover中,对于 Handover 来说,它相当于一个阻塞对列,KafkaFetcher 线程和 KafkaConsumerThread 线程通过 Handover 进行通信,KafkaFetcher 线程的 run 方法中轮询 handover.pollNext 获取来自KafkaConsumerThread 的数据,否则则阻塞,如果获取到数据,则通过 SourceContext 将数据 emit 到下游

2.2 FlinkKafkaConsumer 的状态管理

和大多数有状态的算子一样,为了容错和更容易的进行水平扩展,它也实现了 CheckpointedFunction,CheckpointListener 接口,当 checkpoint 开启时,snapshotState 方法会被周期性的调用,就会将当前 fetcher 提交的 KafkaTopicPartition 信息和 Offset 定期的存储到状态存储 ListState 上,并在所有任务都成功 checkpoint 时,notifyCheckpointComplete 方法将会被调用,并将等待提交的 offset,从 pendingOffsetsToCommit 取出,并提交到 Kafka 上

三、FlinkKafkaProducer 实现分析

3.1 FlinkKafkaProducer 的运行机制

FlinkKafkaProducer实现了Flink Sink的接口,能够将上游的数据写入到Kafka对应的Topic中,在默认情况下,使用AT_LEAST_ONCE语义

通过类图的关系看出,FlinkKafkaProducer 是 TwoPhaseCommitSinkFunction 的一个实现类,在 Flink 中,TwoPhaseCommitSinkFunction 被推荐为Sink实现 EXACTLY_ONCE 语义的基类,它也需要配合 CheckpointedFunction,CheckpointListener 使用

这里简单的说说什么是2PC-二阶段提交协议?

二阶段提交(Two-phaseCommit)是指,在计算机网络以及数据库领域内,为了使基于分布式系统架构下的所有节点在进行事务提交时保持一致性而设计的一种算法(Algorithm)。通常,二阶段提交也被称为是一种协议(Protocol))。

在分布式系统中,每个节点虽然可以知晓自己的操作时成功或者失败,却无法知道其他节点的操作的成功或失败。

当一个事务跨越多个节点时,为了保持事务的ACID特性,需要引入一个作为协调者的组件来统一掌控所有节点(称作参与者)的操作结果并最终指示这些节点是否要把操作结果进行真正的提交(比如将更新后的数据写入磁盘等等)。

因此,二阶段提交的算法思路可以概括为: 参与者将操作成败通知协调者,再由协调者根据所有参与者的反馈情报决定各参与者是否要提交操作还是中止操作。 所谓的两个阶段是指:

第一阶段:voting phase 投票阶段

第二阶段:commit phase 提交阶段

对于Flink的实现来说,第一阶段就是 preCommit 预提交,第二个阶段是提交阶段,协调者就是JM

所以,我们先看看 TwoPhaseCommitSinkFunction 的主要抽象方法:

  • invoke
  • beginTransaction
  • preCommit
  • commit
  • recoverAndCommit
  • abort
  • recoverAndAbort

和上面的 FlinkKafkaConsumer 类似,假设应用开启了 Checkpoint,那么这个 Operator 就会定时的收到来自上游的 barrier,就会调用TwoPhaseCommitSinkFunction 的 snapshotState 的方法,将当前的状态进行预提交( flush 数据到 Kafka ),并将这次 TransactionHolder 放到pendingCommitTransactions,开启新的 transaction,并将状态保存到 ListState 中

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // this is like the pre-commit of a 2-phase-commit transaction
    // we are ready to commit and remember the transaction

    checkState(currentTransactionHolder != null, "bug: no transaction object when performing state snapshot");

    long checkpointId = context.getCheckpointId();
    LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'", name(), context.getCheckpointId(), currentTransactionHolder);

            // 进行预提交
    preCommit(currentTransactionHolder.handle);
            // 将flush后的currentTransactionHolder放入到pendingCommitTransactions中等待完成
    pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
    LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);

    currentTransactionHolder = beginTransactionInternal();
    LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);

    state.clear();
    state.add(new State<>(
        this.currentTransactionHolder,
        new ArrayList<>(pendingCommitTransactions.values()),
        userContext));
}

当所有的任务都 Checkpoint 完成,notifyCheckpointComplete 方法就会被调用,这时后就会调用 TwoPhaseCommitSinkFunction.commit 方法,从pendingCommitTransactions 中,找出小于这次 checkpointId 的 transactionHolder 执行提交,完成事务

说完了 preCommit 和 commit,在两次 Checkpoint 发生的间隔当中,会持续的执行 invoke 方法将数据的写到 Kafka,直到 snapshotState 方法被调用,旧的数据被进行预提交,同时生成新的事务,数据继续写入,直到所有任务Checkpoint完成,收到通知,对完成的checkpointId把事务进行正式的提交

四、总 结

Flink 在 Source 对 Kafka 进行消费拉取 Records,记录状态并定时 snapshot 对应的 topic,partition,offset 到 Flink 管理的状态中,Sink 持续发送数据,进行预提交,直到本次的 snapshot 都完成,Source 提交 Offset 到 Kafka,Sink 完成事务,完成整个流程的确认,当任务发生异常时,从状态中恢复或者从提交的 Kafka 的 Offset 进行恢复,能够恢复当时算子的一个 Snapshot 状态,从而实现端到端的只算一次。