![Flink内核原理与实现](https://wfqqreader-1252317822.image.myqcloud.com/cover/481/37323481/b_37323481.jpg)
3.5 函数体系
函数在Flink中叫作Function,开发者编写的函数叫作UDF(User Defined Function),当然Flink对于通用场景也内置了大量的预定义的通用UDF来简化开发,如Join、GroupBy、Sum等SQL语义等价的UDF。UDF在Flink的DataStream开发和SQL开发中被广泛使用。开发者使用UDF主要是实现非通用的计算逻辑,一般是业务逻辑。在本书语境中,UDF、Function、用户自定义函数的含义是相同的。
按照输入和输入的不同特点分类,Flink中的UDF大概分为3类(见图3-23)。
1. SourceFunction
无上游Function,SourceFunction直接从外部数据存储读取数据,所以SourceFunction所在的算子是起始,没有上游算子。
![](https://epubservercos.yuewen.com/AB30C5/19773741101350906/epubprivate/OEBPS/Images/62_01.jpg?sign=1738894430-iqLK5KeB3nkzbqxNodFau6dmqnawJGSL-0-526099d320cc43c946e7ea4d55c7119b)
图3-23 Function分类与关系
2. SinkFunction
无下游Function,SinkFunction直接将数据写入外部存储,所以Sink函数所在的算子是作业的重点,没有下游算子。
3. 一般Function
一般的UDF函数用在作业的中间处理步骤中,其接口定义与SourceFunction和SinkFunction不同。一般UDF所在的算子有上游算子,也有下游算子。
Flink的一般UDF有单流输入和双流输入两种,从UDF输入、输出的模型来说,多流输入可以通过多个双流输入串联而成,这种设计比较简单实用,如图3-24所示。
![](https://epubservercos.yuewen.com/AB30C5/19773741101350906/epubprivate/OEBPS/Images/62_02.jpg?sign=1738894430-R0eNE60Tvc9sEBi5Lk0KvpT9NkJmOKna-0-2dd20bd36104619b1fe96d7da8198d7f)
图3-24 多流输入转换为多层双流输入
SourceFunction和SinkFunction主要在Flink中的连接器使用,也会在自定义读取、写出数据的时候使用。其余的大量实现逻辑的函数都属于一般UDF。
3.5.1 函数层次
UDF在DataStream API层使用,Flink提供的函数体系从接口的层级来看,从高阶Function到低阶Function如图3-25所示。
![](https://epubservercos.yuewen.com/AB30C5/19773741101350906/epubprivate/OEBPS/Images/62_03.jpg?sign=1738894430-W7aABn8SZWZvBd8M9dmTKKKuPiCjE28s-0-2e53b4337c2e43858d765fa6a7ef81d5)
图3-25 Function层次
Flink内置的DataStream上的API接口,如DataStream#map、DataStream#flatMap、DataStreamFilter#filter等,使用的都是高阶函数,开发者使用高阶函数的时候,无须关心定时器之类的底层概念,只需要关注业务逻辑即可。低阶函数即ProcessFunction。
无状态Function用来做无状态计算,使用比较简单,如MapFunction。无状态Function和RichFunction是一一对应的,如MapFunction对应RichMapFunction,如代码清单3-5所示。
代码清单3-5 MapFunction代码示例
![](https://epubservercos.yuewen.com/AB30C5/19773741101350906/epubprivate/OEBPS/Images/63_01.jpg?sign=1738894430-INtlK3kHgyVQxlHq2rvTWleNvu3sj0x0-0-547f4b37d84d615b9338f7d26ef60590)
从上边的代码可以看出来,使用MapFunction只需实现Map方法即可,所以无状态Function一般都是直接继承接口,如Map接口,或者通过匿名类实现接口。
RichFunction相比无状态Function,有两方面的增强:
1)增加了open和close方法来管理Function的生命周期,在作业启动时,Function在open方法中执行初始化,在Function停止时,在close方法中执行清理,释放占用的资源等。无状态Function不具备此能力。
2)增加了getRuntimeContext和setRuntimeContext。通过RuntimeContext,RichFunction能够获取到执行时作业级别的参数信息,而无状态Function不具备此能力。
无状态Function天然是容错的,作业失败之后,重新执行即可,但是有状态的Function(RichFunction)需要处理中间结果的保存和恢复,待有了状态的访问能力,也就意味着Function是可以容错的,执行过程中,状态会进行快照然后备份,在作业失败,Function能够从快照中恢复回来。
3.5.2 处理函数
处理函数(ProcessFunction)可以访问流应用程序所有(非循环)基本构建块:
1)事件(数据流元素)。
2)状态(容错和一致性)。
3)定时器(事件时间和处理时间)。
ProcessFunction根据场景不同有几种实现,如图3-26所示。
![](https://epubservercos.yuewen.com/AB30C5/19773741101350906/epubprivate/OEBPS/Images/63_02.jpg?sign=1738894430-ba405Cf1dn9cHu5GylXCoTyDkApqgBjy-0-208408c5fd4f7f67fcf8131cfd5417a8)
图3-26 ProcessFunction类体系
1)ProcessFunction:单流输入函数。
2)CoProcessFunction:双流输入函数。
3)KeyedProcessFunction:单流输入函数。
4)KeyedCoProcessFunction:双流输入函数。
Kyed ProcessFunction与Non-Keyed ProcessFunction的区别是,Keyed ProcessFunction只能用在KeyedStream上。
ProcessFunction和CoProcessFunction的区别是,CoProcessFunction是双流输入,而ProcessFunction是单流输入。
1.双流Join
下面是使用CoProcessFunction实现双流Join的例子。
(1)即时双流Join
其逻辑如下(见图3-27)。
1)创建1个State对象。
2)接收到输入流1事件后更新State。
3)接收到输入流2的事件后遍历State,根据Join条件进行匹配,将匹配后的结果发送到下游。
![](https://epubservercos.yuewen.com/AB30C5/19773741101350906/epubprivate/OEBPS/Images/64_01.jpg?sign=1738894430-Oyg928BwXYXxKR3UvS84WammLjcFftj8-0-9b4fea4be7c57820a7ea9bfdfc6e72b3)
图3-27 双流即时Join
(2)延迟双流Join
在流式数据里,数据可能是乱序的,数据会延迟到达,并且为了提供处理效率,使用小批量计算模式,而不是每个事件触发一次Join计算,如图3-28所示。
![](https://epubservercos.yuewen.com/AB30C5/19773741101350906/epubprivate/OEBPS/Images/64_02.jpg?sign=1738894430-WjZiICS84zmmzYZN27s9wQacmEOwzELG-0-9fc85c64cb7821570311812799741f50)
图3-28 双流延迟Join
其逻辑如下。
1)创建2个State对象,分别缓存输入流1和输入流2的事件。
2)创建1个定时器,等待数据的到达,定时延迟触发Join计算。
3)接收到输入流1事件后更新State。
4)接收到输入流2事件后更新State。
5)定时器遍历State1和State2,根据Join条件进行匹配,将匹配后的结果发送到下游。
2.延迟计算
在上面的延迟Join示例中,使用了计时器来暂存一批数据之后再触发计算,在流计算中这是非常常见的场景。在前面提到的批流合一的关键概念中,关键是Watermark和Window,在Flink中的窗口计算(WindowOperator)就是典型的延迟计算,使用Window暂存数据,使用Watermark触发Window的计算,如图3-29所示。在Blink Table & SQL中也大量使用了定时器。
![](https://epubservercos.yuewen.com/AB30C5/19773741101350906/epubprivate/OEBPS/Images/65_01.jpg?sign=1738894430-sTw6bQX3vFEHTvcQkUzvWdclX0qWSmBi-0-8c6d8006a62e9f6f46503cc88987f2de)
图3-29 延迟计算过程
触发器在算子层面上提供支持,所有支持延迟计算的算子都继承了Triggerable接口。Triggerable接口主要定义了基于事件时间和基于处理时间的两种触发行为,如代码清单3-6所示。
代码清单3-6 Triggerable接口
![](https://epubservercos.yuewen.com/AB30C5/19773741101350906/epubprivate/OEBPS/Images/65_02.jpg?sign=1738894430-ikZQTWasTJMbcEsm5CqPAJWLckn9Pf8V-0-2104c1dc6561d5995430f53a11e70fb7)
3.5.3 广播函数
前边介绍了单输入Function和双输入Function。在Flink 1.5.0版本中引入了广播状态模式,将一个数据流的内容广播到另一个流中,同时也引入了新的函数类型:广播函数。
广播函数的体系如图3-30所示。
在图3-30中可以看到,广播函数有BroadcastProcessFunction和KeyedBroadcastProcess Function,广播函数跟双流输入的处理函数类似,也有两个数据元素处理的接口,processElement()负责处理一般的数据流,processBroadcastElement()负责处理广播数据流。完整定义如代码清单3-7和代码清单3-8所示。
![](https://epubservercos.yuewen.com/AB30C5/19773741101350906/epubprivate/OEBPS/Images/66_01.jpg?sign=1738894430-mN7uWo9JjfFg9ZX7C4pKEWY16uKU8jqH-0-912f015587fa5c39c1a9d7779cf5a942)
图3-30 广播函数体系
代码清单3-7 BroadcastProcessFunction抽象类
![](https://epubservercos.yuewen.com/AB30C5/19773741101350906/epubprivate/OEBPS/Images/66_02.jpg?sign=1738894430-K5lDVlkA0V2POr9LwZ8ONSpJlXhLzKpX-0-ae53e1534feb5dfaa0c65fd5a546fe9a)
代码清单3-8 BroadcastProcessFunction类
![](https://epubservercos.yuewen.com/AB30C5/19773741101350906/epubprivate/OEBPS/Images/66_03.jpg?sign=1738894430-DqOrt0yxEwp4huxnneMhPkkbVxrSQoq4-0-0cbbf4b8155f1c8fb4878dfe5fc0aa4c)
上面的两个广播函数BroadcastProcessFunction和KeyedBroadcastProcessFunction都是抽象类,所以在实际使用中,开发者需要实现其定义的抽象方法。
processElement()方法和processBroadcastElement()方法的区别在于:processElement只能使用只读的上下文ReadOnlyContext,而processBroadcastElement()方法则可以使用支持读写的上下文Context。这么设计看起来很奇怪,但是合理的。广播状态模式下,要求所有算子上的广播状态应完全一致,如果也允许processElement方法更新、删除广播状态中的数据,那么会使得算子之间的广播状态变得不一致,导致系统行为不可预测。在后边会介绍数据分区,数据分区会将数据流进行分流,交给下游的不同算子,那么不同算子接收的数据流就是不同的,如果开发者在processElement方法中更新了广播状态,必然会导致广播状态变得不一致。也许会有人说,在算子更新广播状态的时候,通知其他算子不就可以了吗?但是Flink中的平行算子之间没有通信接口,所以此处的设计强制要求processElement()不能更新广播状态。
注意,只有设计的强制要求还不够,processBroadcastElement()必须确保行为的不可变性,即无论什么时间、在哪个物理机器、广播数据是否乱序,都必须保证执行结果完全相同。比较典型的破坏不可变性的例子包括处理逻辑依赖于当前时间,不同的节点当前时间并不完全一致,而且还要考虑到作业恢复执行的情况,因此跟恢复之前的当前时间更是不可能相同。
3.5.4 异步函数
在介绍异步算子的时候提到了异步函数(AsyncFunction),异步函数就是对Java异步编程框架的封装。
异步函数的类体系如图3-31所示。
![](https://epubservercos.yuewen.com/AB30C5/19773741101350906/epubprivate/OEBPS/Images/67_01.jpg?sign=1738894430-aKumbuOPz3umt4pmNqbCAEbUJYzg7jUw-0-f92299ee4c12bf27baaf4ea03f32fcc5)
图3-31 异步函数类体系
如图3-31所示,异步函数的抽象类RichAsyncFunction实现AsyncFunction接口,继承AbstractRichFunction获得了生命周期管理和FunctionContext的访问能力。
异步函数的接口中定义了两种行为,异步调用行为将调用结果封装到ResultFutrue中,同时提供了调用超时的处理,防止不释放资源,如代码清单3-9所示。
代码清单3-9 AsyncFunction接口
![](https://epubservercos.yuewen.com/AB30C5/19773741101350906/epubprivate/OEBPS/Images/67_02.jpg?sign=1738894430-ZGu26rrwUqyj9fsAGwDE8RYiFZjVMpJ4-0-64b3e65547b7cd56e12db13bc1510498)
![](https://epubservercos.yuewen.com/AB30C5/19773741101350906/epubprivate/OEBPS/Images/68_01.jpg?sign=1738894430-gdWrHVeH1erBA1qjQGj28BrF3ZVrBoPJ-0-44dc4522dd8b86096b376ce4dda4ef19)
3.5.5 数据源函数
数据源函数在Flink中叫作SourceFunction,Flink是一个计算引擎,其需要从外部读取数据,所以在Flink中设计了SourceFunction体系,专门用来从外部存储读取数据。SourceFunction是Flink作业的起点,一个作业中可以有多个起点,即读取多个数据源的数据。
SourceFunction体系如图3-32所示。
![](https://epubservercos.yuewen.com/AB30C5/19773741101350906/epubprivate/OEBPS/Images/68_02.jpg?sign=1738894430-3ihXQXKdWis6XFY6SUN1CNun9StvMk1d-0-37ee9238044204ec1ad30f6a55d93195)
图3-32 SourceFunction体系
SourceFunction接口本身只定义了接口的业务逻辑相关行为,在实际使用中,一般会继承抽象类RichSourceFunction或RichParallelSourceFunction。这两个抽象类通过继承AbstractRichFunction获得了Function的生命周期管理、访问RuntimeContext的能力。
从类的定义上来说,RichSourceFunction和RichParallelSourceFunction的代码完全相同,甚至代码中的注释都基本相同,但是为什么要设计这两个类呢?
其实这两个类的差异在运行层面上,RichSourceFunction是不可并行的,并行度限定为1,超过1则会报错。而RichParallelSourceFunction是可并行的,并行度可以根据需要设定,并没有限制。差异体现在StreamExecutionEnvironment#addSource方法中,其对Function的类型进行了判断,如果是ParallelSourceFunction类型,则是可并行的。如代码清单3-10所示。
代码清单3-10 构造DataStreamSource
![](https://epubservercos.yuewen.com/AB30C5/19773741101350906/epubprivate/OEBPS/Images/68_03.jpg?sign=1738894430-ZTEJt6eOGN6ae7pUHFKReMVImwh9E06V-0-89888e43f1d9e1b3db225081c39628ed)
![](https://epubservercos.yuewen.com/AB30C5/19773741101350906/epubprivate/OEBPS/Images/69_01.jpg?sign=1738894430-RIVnVpAqc9rTNjbG44eiwykv9Lvln0b1-0-f9d73cb57f7efe036e77829daacd679c)
SourceFunction有几个比较关键的行为。
1)生命周期管理:在实际中,一般SourceFunction的实现类会同时继承AbstractRichFunction,所以其生命周期包含open、close、cancle三种方法,在生命周期方法中可以包含相应的初始化、清理等。
2)读取数据: 持续地从外部存储读取数据,不同的外部存储有不同的实现,如从Kafka读取数据依赖于Kafka Producer等。
3)向下游发送数据。
4)发送Watermark:生成Watermark并向下游发送,Watermark的生成参见“时间与窗口”章节。
5)空闲标记:如果读取不到数据,则将该Task标记为空闲,向下游发送Status#Idle,阻止Watermark向下游传递。
SourceFunction接口定义如代码清单3-11所示,SourceFunction中内嵌了SourceContext接口。
代码清单3-11 SourceFunction接口
![](https://epubservercos.yuewen.com/AB30C5/19773741101350906/epubprivate/OEBPS/Images/69_02.jpg?sign=1738894430-zCtMMItjn78vB6RCvjwgBULVuZXEKMke-0-0474a4bebdad06e037ada91e65dc82e7)
上边SourceFunction定义的数据发送、Watermark发送、空闲标记实际上都定义在SourceContext中。
StreamSourceContexts中提供了生成不同类型SourceContext的实例的方法,从总体上按照带不带时间分为两类SourceContext如图3-33所示。
1. NonTimestampContext
NonTimestampContext为所有的元素赋予-1作为时间戳,也就意味着永远不会向下游发送Watermark。
使用Processing Time时使用此Context,使用Processing Time的时候向下游发送Watermark没有意义,在实际处理中,各个计算节点会根据本地时间定义触发器,触发执行Window类计算,而不是根据Watermark来触发。
![](https://epubservercos.yuewen.com/AB30C5/19773741101350906/epubprivate/OEBPS/Images/70_01.jpg?sign=1738894430-lpMxIETEI8gkE1ZzJfdERKli8loW0yEQ-0-3d4b5f2164a4a4f7aa163c2125ec4511)
图3-33 SourceContext类体系
2. WatermarkContext
WatermarkContext定义了与Watermark相关的行为:
1)负责管理当前的StreamStatus,确保StreamStatus向下游传递。
2)负责空闲检测的逻辑,当超过设定的事件间隔而没有收到数据或者Watermark时,认为Task处于空闲状态。
WatermarkContext有两个实现类。
(1)AutomaticWatermarkContext
使用摄取时间(Ingestion Time)的时候,AutomaticWatermarkContext自动生成Watermark。在该Context中,启动WatermarkEmittingTask向下游发送Watermark,使用了一个定时器,其触发时间=(作业启动的时刻+Watermark周期×n),一旦启动之后,WatermarkEmittingTask会持续地自动注册定时器,向下游发送Watermark。
(2)ManualWatermarkContext
使用事件时间(Event Time)的时候,ManualWatermarkContext不会产生Watermark,而是向下游发送透传上游的Watermark。
3.5.6 输出函数
输出函数在Flink中叫作SinkFunction,负责将计算结果写入外部存储中,是作业终点,一个作业可以有多个Sink,即将数据写入不同的外部存储中。
SinkFunction类体系如图3-34所示。
SinkFunction只是单纯地定义了数据写出到外部存储的行为,并没有Function的生命周期管理行为,函数的生命周期定义在AbstractRichFunction中。在Connector中实际实现Sink的时候,基本都是从RickSinkFunction和TwoPhaseCommitSinkFunction继承。
TowPhaseCommitSinkFunction是Flink中实现端到端Exactly-Once的关键函数,提供框架级别的端到端Exactly-Once的支持,其在实现过程中与Flink检查点机制结合,在第13章有详细介绍。
![](https://epubservercos.yuewen.com/AB30C5/19773741101350906/epubprivate/OEBPS/Images/71_01.jpg?sign=1738894430-uMLeXCkUwu22DflTiqxgJHgTwobC3roZ-0-0ff5762ecb3a00a2dab810623c7b5de1)
图3-34 SinkFunction类体系
3.5.7 检查点函数
检查点函数就是在Flink中支持函数级别状态的保存和恢复的函数。为了实现函数级别的State管理,Flink中设计了CheckpointedFunction和ListCheckpointed接口。在检查点函数接口中主要设计了状态快照的备份和恢复两种行为。
CheckpointedFunction虽然已经标记为废弃,但仍然是现在用得最多的接口。当保存状态之后,其snapshotStat()会被调用,用于备份保存状态到外部存储。当恢复状态的时候,其initializeState()方法负责初始化State,执行从上一个检查点恢复状态的逻辑。
CheckpointedFunction接口定义如代码清单3-12所示。
代码清单3-12 CheckpointedFunction接口
![](https://epubservercos.yuewen.com/AB30C5/19773741101350906/epubprivate/OEBPS/Images/71_02.jpg?sign=1738894430-nm38fF8A0BVtCzA04uf0CsAEdrqIJd7O-0-2af320347130d71ec95dd406f0daffef)
ListCheckpointed接口的行为跟Checkpointed行为类似,除了提供状态管理能力之外,修改作业并行度的时候,还提供了状态重分布的支持。ListCheckpointed接口定义如代码清单3-13所示。
代码清单3-13 ListCheckpointed接口
![](https://epubservercos.yuewen.com/AB30C5/19773741101350906/epubprivate/OEBPS/Images/71_03.jpg?sign=1738894430-PLFirpXZSNmo24aVMVBeuEvWBbtnojJV-0-95393fcfe55e89cb0dc4788c874afcc7)