当前位置:首页 > 科技  > 软件

聊聊Flink:这次把Flink的触发器(Trigger)、移除器(Evictor)讲透

来源: 责编: 时间:2024-03-27 17:40:10 117观看
导读一、触发器(Trigger)Trigger 决定了一个窗口(由 window assigner 定义)何时可以被 window function 处理。每个 WindowAssigner 都有一个默认的 Trigger。如果默认 trigger 无法满足你的需要,你可以在 trigger(…) 调用

一、触发器(Trigger)

Trigger 决定了一个窗口(由 window assigner 定义)何时可以被 window function 处理。每个 WindowAssigner 都有一个默认的 Trigger。如果默认 trigger 无法满足你的需要,你可以在 trigger(…) 调用中指定自定义的 trigger。yRV28资讯网——每日最新资讯28at.com

1.1 Flink中预置的Trigger

窗口的计算触发依赖于窗口触发器,每种类型的窗口都有对应的窗口触发机制,都有一个默认的窗口触发器,触发器的作用就是去控制什么时候来触发计算。flink内部定义多种触发器,每种触发器对应于不同的WindowAssigner。常见的触发器如下:yRV28资讯网——每日最新资讯28at.com

  • EventTimeTrigger:通过对比EventTime和窗口的Endtime确定是否触发窗口计算,如果EventTime大于Window EndTime则触发,否则不触发,窗口将继续等待。
  • ProcessTimeTrigger:通过对比ProcessTime和窗口EndTme确定是否触发窗口,如果ProcessTime大于EndTime则触发计算,否则窗口继续等待。
  • ProcessingTimeoutTrigger:可以将任何触发器转变为超时触发器。
  • ContinuousEventTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前EndTime触发窗口计算。
  • ContinuousProcessingTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前ProcessTime触发窗口计算。
  • CountTrigger:根据接入数据量是否超过设定的阙值判断是否触发窗口计算。
  • DeltaTrigger:根据接入数据计算出来的Delta指标是否超过指定的Threshold去判断是否触发窗口计算。
  • PurgingTrigger:可以将任意触发器作为参数转换为Purge类型的触发器,计算完成后数据将被清理。
  • NeverTrigger:任何时候都不触发窗口计算

1.2 Trigger的抽象类

Trigger 接口提供了五个方法来响应不同的事件:yRV28资讯网——每日最新资讯28at.com

  • onElement() 方法在每个元素被加入窗口时调用。
  • onEventTime() 方法在注册的 event-time timer 触发时调用。
  • onProcessingTime() 方法在注册的 processing-time timer 触发时调用。
  • canMerge() 方法判断是否可以合并。
  • onMerge() 方法与有状态的 trigger 相关。该方法会在两个窗口合并时, 将窗口对应 trigger 的状态进行合并,比如使用会话窗口时。
  • clear() 方法处理在对应窗口被移除时所需的逻辑。

触发器接口的源码如下:yRV28资讯网——每日最新资讯28at.com

@PublicEvolvingpublic abstract class Trigger<T, W extends Window> implements Serializable {    private static final long serialVersionUID = -4104633972991191369L;    /**     * Called for every element that gets added to a pane. The result of this will determine whether     * the pane is evaluated to emit results.     *     * @param element The element that arrived.     * @param timestamp The timestamp of the element that arrived.     * @param window The window to which the element is being added.     * @param ctx A context object that can be used to register timer callbacks.     */    public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)            throws Exception;    /**     * Called when a processing-time timer that was set using the trigger context fires.     *     * @param time The timestamp at which the timer fired.     * @param window The window for which the timer fired.     * @param ctx A context object that can be used to register timer callbacks.     */    public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)            throws Exception;    /**     * Called when an event-time timer that was set using the trigger context fires.     *     * @param time The timestamp at which the timer fired.     * @param window The window for which the timer fired.     * @param ctx A context object that can be used to register timer callbacks.     */    public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx)            throws Exception;    /**     * Returns true if this trigger supports merging of trigger state and can therefore be used with     * a {@link org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner}.     *     * <p>If this returns {@code true} you must properly implement {@link #onMerge(Window,     * OnMergeContext)}     */    public boolean canMerge() {        return false;    }    /**     * Called when several windows have been merged into one window by the {@link     * org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}.     *     * @param window The new window that results from the merge.     * @param ctx A context object that can be used to register timer callbacks and access state.     */    public void onMerge(W window, OnMergeContext ctx) throws Exception {        throw new UnsupportedOperationException("This trigger does not support merging.");    }    /**     * Clears any state that the trigger might still hold for the given window. This is called when     * a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)} and     * {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as well as     * state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}.     */    public abstract void clear(W window, TriggerContext ctx) throws Exception;    // ------------------------------------------------------------------------    /**     * A context object that is given to {@link Trigger} methods to allow them to register timer     * callbacks and deal with state.     */    public interface TriggerContext {        // ...    }    /**     * Extension of {@link TriggerContext} that is given to {@link Trigger#onMerge(Window,     * OnMergeContext)}.     */    public interface OnMergeContext extends TriggerContext {        <S extends MergingState<?, ?>> void mergePartitionedState(                StateDescriptor<S, ?> stateDescriptor);    }}

关于上述方法,需要注意三件事:yRV28资讯网——每日最新资讯28at.com

(1)前三个方法返回TriggerResult枚举类型,其包含四个枚举值:yRV28资讯网——每日最新资讯28at.com

  • CONTINUE:表示对窗口不执行任何操作。即不触发窗口计算,也不删除元素。
  • FIRE:触发窗口计算,但是保留窗口元素。
  • PURGE:不触发窗口计算,丢弃窗口,并且删除窗口的元素。
  • FIRE_AND_PURGE:触发窗口计算,输出结果,然后将窗口中的数据和窗口进行清除。

源码如下:yRV28资讯网——每日最新资讯28at.com

public enum TriggerResult {    // 不触发,也不删除元素    CONTINUE(false, false),    // 触发窗口,窗口出发后删除窗口中的元素    FIRE_AND_PURGE(true, true),    // 触发窗口,但是保留窗口元素    FIRE(true, false),    // 不触发窗口,丢弃窗口,并且删除窗口的元素    PURGE(false, true);    // ------------------------------------------------------------------------    private final boolean fire;    private final boolean purge;    TriggerResult(boolean fire, boolean purge) {        this.purge = purge;        this.fire = fire;    }    public boolean isFire() {        return fire;    }    public boolean isPurge() {        return purge;    }}

(2) 每一个窗口分配器都拥有一个属于自己的 Trigger,Trigger上会有定时器,用来决定一个窗口何时能够被计算或清除,当定时器触发后,会调用对应的回调返回,返回TriggerResult。Trigger的返回结果可以是 continue(不做任何操作),fire(处理窗口数据),purge(移除窗口和窗口中的数据),或者 fire + purge。一个Trigger的调用结果只是fire的话,那么会计算窗口并保留窗口原样,也就是说窗口中的数据仍然保留不变,等待下次Trigger fire的时候再次执行计算。一个窗口可以被重复计算多次知道它被 purge 了。在purge之前,窗口会一直占用着内存。yRV28资讯网——每日最新资讯28at.com

1.3 ProcessingTimeTrigger源码分析

@PublicEvolvingpublic class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {    private static final long serialVersionUID = 1L;    private ProcessingTimeTrigger() {}    @Override    public TriggerResult onElement(            Object element, long timestamp, TimeWindow window, TriggerContext ctx) {        ctx.registerProcessingTimeTimer(window.maxTimestamp());        return TriggerResult.CONTINUE;    }    @Override    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx)            throws Exception {        return TriggerResult.CONTINUE;    }    @Override    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {        return TriggerResult.FIRE;    }    @Override    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {        ctx.deleteProcessingTimeTimer(window.maxTimestamp());    }    @Override    public boolean canMerge() {        return true;    }    @Override    public void onMerge(TimeWindow window, OnMergeContext ctx) {        // only register a timer if the time is not yet past the end of the merged window        // this is in line with the logic in onElement(). If the time is past the end of        // the window onElement() will fire and setting a timer here would fire the window twice.        long windowMaxTimestamp = window.maxTimestamp();        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {            ctx.registerProcessingTimeTimer(windowMaxTimestamp);        }    }    @Override    public String toString() {        return "ProcessingTimeTrigger()";    }    /** Creates a new trigger that fires once system time passes the end of the window. */    public static ProcessingTimeTrigger create() {        return new ProcessingTimeTrigger();    }}

在 onElement()方法中,ctx.registerProcessingTimeTimer(window.maxTimestamp())将会注册一个ProcessingTime定时器,时间参数是window.maxTimestamp(),也就是窗口的最终时间,当时间到达这个窗口最终时间,定时器触发并调用 onProcessingTime()方法,在 onProcessingTime() 方法中,return TriggerResult.FIRE 即返回 FIRE,触发窗口中数据的计算,但是会保留窗口元素。yRV28资讯网——每日最新资讯28at.com

需要注意的是ProcessingTimeTrigger类只会在窗口的最终时间到达的时候触发窗口函数的计算,计算完成后并不会清除窗口中的数据,这些数据存储在内存中,除非调用PURGE或FIRE_AND_PURGE,否则数据将一直存在内存中。实际上,Flink中提供的Trigger类,除了PurgingTrigger类,其他的都不会对窗口中的数据进行清除。yRV28资讯网——每日最新资讯28at.com

EventTimeTriggerr在onElement设置的定时器:yRV28资讯网——每日最新资讯28at.com

图片图片yRV28资讯网——每日最新资讯28at.com

EventTime通过registerEventTimeTimer注册定时器,在内部Watermark达到或超过Timer设定的时间戳时触发。yRV28资讯网——每日最新资讯28at.com

二、移除器(Evictor)

2.1 Evictor扮演的角色

图片图片yRV28资讯网——每日最新资讯28at.com

当一个元素进入stream中之后,一般要经历Window(开窗)、Trigger(触发器)、Evitor(移除器)、Windowfunction(窗口计算操作),具体过程如下:yRV28资讯网——每日最新资讯28at.com


yRV28资讯网——每日最新资讯28at.com

  • Window中的WindowAssigner(窗口分配器)定义了数据应该被分配到哪个窗口中,每一个 WindowAssigner都会有一个默认的Trigger,如果用户在代码中指定了窗口的trigger,默认的 trigger 将会被覆盖。
  • Trigger上会有定时器,用来决定一个窗口何时能够被计算或清除。Trigger的返回结果可以是 continue(不做任何操作),fire(处理窗口数据),purge(移除窗口和窗口中的数据),或者 fire + purge。一个Trigger的调用结果只是fire的话,那么会计算窗口并保留窗口原样,也就是说窗口中的数据仍然保留不变,等待下次Trigger fire的时候再次执行计算。一个窗口可以被重复计算多次知道它被 purge 了。在purge之前,窗口会一直占用着内存。
  • 当Trigger fire了,窗口中的元素集合就会交给Evictor(如果指定了的话)。Evictor 主要用来遍历窗口中的元素列表,并决定最先进入窗口的多少个元素需要被移除。剩余的元素会交给用户指定的函数进行窗口的计算。如果没有 Evictor 的话,窗口中的所有元素会一起交给WindowFunction进行计算。
  • WindowFunction收到了窗口的元素(可能经过了 Evictor 的过滤),并计算出窗口的结果值,并发送给下游。窗口计算操作有很多,比如预定义的sum(),min(),max(),还有 ReduceFunction,WindowFunction。WindowFunction 是最通用的计算函数,其他的预定义的函数基本都是基于该函数实现的。

现在,大致了解了Evitor(移除器)扮演的角色和移除器在流中的哪个位置,让我们继续看为何使用Evictor。yRV28资讯网——每日最新资讯28at.com

Evictor接口定义如下:yRV28资讯网——每日最新资讯28at.com

图片图片yRV28资讯网——每日最新资讯28at.com

evictBefore()包含要在窗口函数之前应用的清除逻辑,而evictAfter()包含要在窗口函数之后应用的清除逻辑。应用窗口函数之前清除的元素将不会被窗口函数处理。yRV28资讯网——每日最新资讯28at.com

窗格是具有相同Key和相同窗口的元素组成的桶,即同一个窗口中相同Key的元素一定属于同一个窗格。一个元素可以在多个窗格中(当一个元素被分配给多个窗口时),这些窗格都有自己的清除器实例。yRV28资讯网——每日最新资讯28at.com

注:window默认没有evictor,一旦把window指定Evictor,该window会由EvictWindowOperator类来负责操作。yRV28资讯网——每日最新资讯28at.com

2.2 Flink内置的EvitoryRV28资讯网——每日最新资讯28at.com

  • CountEvictor:保留窗口中用户指定的元素数量,并丢弃窗口缓冲区剩余的元素。
  • DeltaEvictor:依次计算窗口缓冲区中的最后一个元素与其余每个元素之间的delta值,若delta值大于等于指定的阈值,则该元素会被移除。使用DeltaEvictor清除器需要指定两个参数,一个是double类型的阈值;另一个是DeltaFunction接口的实例,DeltaFunction用于指定具体的delta值计算逻辑。
  • TimeEvictor:传入一个以毫秒为单位的时间间隔参数(例如以size表示),对于给定的窗口,取窗口中元素的最大时间戳(例如以max表示),使用TimeEvictor清除器将删除所有时间戳小于或等于max-size的元素(即清除从窗口开头到指定的截止时间之间的元素)。

2.2.1 CountEvictor

private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {    if (size <= maxCount) {        // 小于最大数量,不做处理        return;    } else {        int evictedCount = 0;        for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){            iterator.next();            evictedCount++;            if (evictedCount > size - maxCount) {                break;            } else {                // 移除前size - maxCount个元素,只剩下最后maxCount个元素                iterator.remove();            }        }    }}

2.2.2 DeltaEvictor

DeltaEvictor通过计算DeltaFunction的值(依次传入每个元素和最后一个元素),并将其与threshold进行对比,如果DeltaFunction计算结果大于等于threshold,则该元素会被移除。DeltaEvictor的实现如下:yRV28资讯网——每日最新资讯28at.com

private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {    // 获取最后一个元素    TimestampedValue<T> lastElement = Iterables.getLast(elements);    for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext();){        TimestampedValue<T> element = iterator.next();        // 依次计算每个元素和最后一个元素的delta值,同时和threshold的值进行比较        // 若计算结果大于threshold值或者是相等,则该元素会被移除        if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) {            iterator.remove();        }    }}

2.2.3 TimeEvictor

TimeEvictor以时间为判断标准,决定元素是否会被移除。TimeEvictor会获取窗口中所有元素的最大时间戳currentTime,currentTime减去窗口大小(windowSize) 可得到能保留最久的元素的时间戳evictCutoff,然后再遍历窗口中的元素,如果元素的时间戳小于evictCutoff,就执行移除操作,否则不移除。具体逻辑如下图所示:yRV28资讯网——每日最新资讯28at.com

TimeEvictor的代码实现如下:yRV28资讯网——每日最新资讯28at.com

private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {    // 如果element没有timestamp,直接返回    if (!hasTimestamp(elements)) {        return;    }    // 获取elements中最大的时间戳(到来最晚的元素的时间)    long currentTime = getMaxTimestamp(elements);    // 截止时间为: 到来最晚的元素的时间 - 窗口大小(可以理解为保留最近的多久的元素)    long evictCutoff = currentTime - windowSize;    for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {        TimestampedValue<Object> record = iterator.next();        // 清除所有时间戳小于截止时间的元素        if (record.getTimestamp() <= evictCutoff) {            iterator.remove();        }    }}


yRV28资讯网——每日最新资讯28at.com

本文链接://www.dmpip.com//www.dmpip.com/showinfo-26-79842-0.html聊聊Flink:这次把Flink的触发器(Trigger)、移除器(Evictor)讲透

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: 「字符串」存在「栈内存」?那我可要杠你了哦!

下一篇: 被问到JVM类加载机制中双亲委派模型是什么,三次被破坏指什么?

标签:
  • 热门焦点
Top
Baidu
map