Official website: https://flink.apache.org/
I. Key Features of Flink
1) Event-driven
An event-driven application is a stateful application that ingests events from one or more event streams and reacts to incoming events by triggering computations, state updates, or external actions.
Event-driven applications are an evolution of the traditional application design, which has separate compute and data-storage tiers. In that architecture, the application reads data from a remote transactional database and persists its results back to it.
Event-driven applications, in contrast, are based on stateful stream processing. In this design, data and computation are co-located, so data access is local (in memory or on disk). Fault tolerance is achieved by periodically writing checkpoints to remote, durable storage. The figure above illustrates the difference between the traditional application architecture and event-driven applications.
Kafka used as a message queue is a typical example of an event-driven application.
2) Streams vs. batches (stream, micro-batching)
In Spark, everything is made of batches: offline data is one large batch, and real-time data is an endless series of small batches.
In Flink, everything is made of streams: offline data is a bounded stream, and real-time data is an unbounded stream. These are the so-called bounded and unbounded streams.
3) Layered APIs
The higher the layer, the more abstract it is; the highest level of abstraction is SQL. The lower the layer, the more concrete and expressive it is.
II. Using Flink (Word Count)
1. Set up the pom file
Note the dependency settings below: they use Scala 2.12.x and Flink 1.10.1.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.buildworld.flink</groupId>
    <artifactId>FlinkTrain</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.4.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
2. Write the Scala code
1) Batch WordCount

package cn.buildworld.flink

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {
    // Create the batch execution environment
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    // Read text from a file
    val inputPath = "D:\\Java\\project\\Scala\\FlinkTrain\\src\\main\\resources\\hello.txt"
    val dataSet: DataSet[String] = env.readTextFile(inputPath)

    // Split into words, pair each word with 1, group by the word (field 0) and sum the counts (field 1)
    val resultDataSet: DataSet[(String, Int)] = dataSet
      .flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    resultDataSet.print()
  }
}
2) Streaming WordCount. Extremely simple, much simpler than stream processing with Spark Streaming!
import org.apache.flink.streaming.api.scala._

object WordCountByStream {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(6)

    val dataSet: DataStream[String] = env.socketTextStream("192.168.162.102", 7777)

    val resultDataSet = dataSet
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    resultDataSet.print()

    env.execute()
  }
}
Supplement: reading host and port from the program arguments.

import org.apache.flink.api.java.utils.ParameterTool

val parameterTool: ParameterTool = ParameterTool.fromArgs(args)
val host: String = parameterTool.get("host")
val port: Int = parameterTool.getInt("port")
III. Flink Runtime Architecture
1. Flink runtime components
JobManager: the master process that controls the execution of one application; in other words, every application is controlled by its own JobManager. The JobManager first receives the application to be executed, which consists of the JobGraph, the logical dataflow graph, and a JAR packaging all classes, libraries, and other resources. The JobManager converts the JobGraph into a physical dataflow graph called the ExecutionGraph, which contains all tasks that can be executed in parallel. The JobManager then requests the resources needed to run the tasks from the ResourceManager, namely slots on TaskManagers. Once it has acquired enough resources, it distributes the ExecutionGraph to the TaskManagers that actually run the tasks. While the job is running, the JobManager is responsible for everything that requires central coordination, such as coordinating checkpoints.
ResourceManager: mainly responsible for managing TaskManager slots, the unit of processing resources defined in Flink. Flink provides different ResourceManagers for different environments and resource-management tools, such as YARN, Mesos, Kubernetes, and standalone deployments. When a JobManager requests slot resources, the ResourceManager assigns TaskManagers with free slots to it. If the ResourceManager does not have enough slots to satisfy the request, it can also talk to the resource-provider platform to launch containers that start new TaskManager processes. The ResourceManager is also responsible for terminating idle TaskManagers and releasing their compute resources.
TaskManager: the worker process in Flink. There are usually several TaskManagers running in a Flink setup, and each of them contains a certain number of slots. The number of slots limits how many tasks a TaskManager can execute. After starting up, a TaskManager registers its slots with the ResourceManager; when instructed by the ResourceManager, it offers one or more slots to the JobManager, which can then assign tasks to those slots. During execution, a TaskManager can exchange data with other TaskManagers running the same application.
Dispatcher: can run across jobs and provides a REST interface for application submission. When an application is submitted for execution, the Dispatcher starts a JobManager and hands the application over to it. Because the interface is REST-based, the Dispatcher can serve as the cluster's HTTP entry point and is not blocked by firewalls. The Dispatcher also starts a Web UI for conveniently displaying and monitoring information about job execution. Depending on how applications are submitted and run, the Dispatcher is not always required in the architecture.
2. Job submission flow
3. Task scheduling principles
A task slot is a static concept: it describes the concurrent execution capacity a TaskManager has, and it is configured via the parameter taskmanager.numberOfTaskSlots. Parallelism, by contrast, is a dynamic concept: it is the concurrency a TaskManager actually uses when running a program, configured via the parameter parallelism.default.
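For illustration only (the values below are arbitrary examples, not recommendations), the slot count goes into flink-conf.yaml, while the parallelism can also be overridden in code:

# flink-conf.yaml (example values)
taskmanager.numberOfTaskSlots: 4
parallelism.default: 2

// In code (Scala), overriding the configured default for the whole job
env.setParallelism(2)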
IV. Flink Streaming API
1. Three different ways to read data
Kafka commands used to prepare the test topic:

bin/kafka-topics.sh --zookeeper localhost:2181 --list
bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 3 --partitions 1 --topic sensor
bin/kafka-console-producer.sh --broker-list 10.81.1.56:9092 --topic sensor
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic sensor
Example record: sensor_1, 1547718199, 35.8
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SourceTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("auto.offset.reset", "latest")

    val stream3: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

    val value: DataStream[String] = stream3.flatMap(_.split(",")).filter(_.nonEmpty)

    value.print()
    env.execute()
  }
}
2. Custom Source

import scala.util.Random

import org.apache.flink.streaming.api.functions.source.SourceFunction

class MySensorSource extends SourceFunction[SensorReading] {

  // Flag marking whether the source is still running
  var running: Boolean = true
  var num = 0

  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rand = new Random()

    // Initialize ten sensors with a random base temperature
    var curTemp = 1.to(10).map(
      i => ("sensor_" + i, 65 + rand.nextGaussian() * 20)
    )

    while (running) {
      // Randomly drift each temperature
      curTemp = curTemp.map(
        t => (t._1, t._2 + rand.nextGaussian())
      )

      val curTime: Long = System.currentTimeMillis()
      curTemp.foreach(
        t => sourceContext.collect(SensorReading(t._1, curTime, t._2))
      )

      // Stop after emitting five rounds
      num += 1
      if (num == 5) {
        cancel()
      }

      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}
Using the custom source:

val stream4: DataStream[SensorReading] = env.addSource(new MySensorSource())
stream4.print()
3. Transformation operators
map, flatMap, filter, keyBy
Rolling aggregation operators (Rolling Aggregation)
These operators aggregate each sub-stream of a KeyedStream.
sum(), min(), max(), minBy(), maxBy()
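For example, a minimal sketch (assuming the SensorReading case class and dataStream from the examples above) that keeps, per sensor id, the reading with the highest temperature seen so far:

// Hypothetical usage of a rolling aggregation on a keyed stream
val maxTempPerSensor: DataStream[SensorReading] = dataStream
  .keyBy("id")
  .maxBy("temperature")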
Reduce
KeyedStream → DataStream: an aggregation on a keyed stream that combines the current element with the previous aggregation result to produce a new value. The returned stream contains the result of every aggregation step, not only the final result of the last aggregation.
val resultStream = dataStream
  .keyBy("id")
  .reduce((curState, newData) =>
    SensorReading(curState.id, newData.timestamp, curState.temperature.min(newData.temperature))
  )
Split and Select
case class SensorReading(id: String, timestamp: Long, temperature: Double)

val resultStream = dataStream
  .keyBy("id")
  .reduce((curState, newData) =>
    SensorReading(curState.id, newData.timestamp, curState.temperature.min(newData.temperature))
  )

val splitStream = resultStream.split(data => {
  if (data.temperature > 30) Seq("high") else Seq("low")
})

val high = splitStream.select("high")
val low = splitStream.select("low")
val all = splitStream.select("high", "low")
Connect and CoMap
val warning: DataStream[(String, Double)] = high.map(sensorData => (sensorData.id, sensorData.temperature))
val connected: ConnectedStreams[(String, Double), SensorReading] = warning.connect(low)

val coMap: DataStream[Product] = connected.map(
  warningData => (warningData._1, warningData._2, "WARNING"),
  lowData => (lowData.id, lowData.temperature, "SAFE")
)

coMap.print()

Sample output:
10> (sensor_7,6.7,SAFE)
8> (sensor_6,15.4,SAFE)
6> (sensor_10,38.1,WARNING)
7> (sensor_1,30.8,WARNING)
7> (sensor_1,30.8,WARNING)
7> (sensor_1,30.8,WARNING)
7> (sensor_1,30.8,WARNING)
7> (sensor_1,30.8,WARNING)
7> (sensor_1,30.8,WARNING)
Union
The streams being unioned must all have the same type; Connect allows different types, which can then be reconciled in the subsequent coMap.
Connect can only operate on two streams, whereas Union can operate on more than two.
val unionStream: DataStream[SensorReading] = high.union(low)
unionStream.print("union:::")
4. Supported data types

case class Person(name: String, age: Int)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

// Primitive types
val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L)
val res: DataStream[Long] = numbers.map(n => n + 1)
res.print("plus one:")

// Tuples
val persons1: DataStream[(String, Int)] = env.fromElements(
  ("michong", 25),
  ("lili", 15)
)
val res1: DataStream[(String, Int)] = persons1.filter(p => p._2 > 18)

// Case classes
val persons2: DataStream[Person] = env.fromElements(
  Person("MiChong", 25),
  Person("Lili", 15)
)
val res2: DataStream[Person] = persons2.filter(p => p.age > 18)
res2.print("adults: ")

env.execute()
5. Implementing UDFs for finer-grained control of the stream
Function classes (Function Classes)

import org.apache.flink.api.common.functions.FilterFunction

val res2: DataStream[Person] = persons2.filter(new MyFilter)

class MyFilter extends FilterFunction[Person] {
  override def filter(p: Person): Boolean = {
    p.age > 18
  }
}
Rich Functions
A "rich function" is an interface for function classes provided by the DataStream API; every Flink function class has a Rich variant. It differs from a regular function in that it can access the runtime context and has lifecycle methods, which makes more complex functionality possible.
Lifecycle:
open() is the initialization method of a rich function; it is called before an operator such as map or filter starts processing.
close() is the last method called in the lifecycle and performs cleanup work.
getRuntimeContext() provides information from the function's RuntimeContext, such as the parallelism the function runs with, the task name, and state.
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] {

  var subTaskIndex = 0

  override def open(parameters: Configuration): Unit = {
    // Called once before the first record is processed
    subTaskIndex = getRuntimeContext.getIndexOfThisSubtask
  }

  override def flatMap(in: Int, collector: Collector[(Int, Int)]): Unit = {
    if (in % 2 == subTaskIndex) {
      collector.collect((subTaskIndex, in))
    }
  }

  override def close(): Unit = {
    // Called once at the end: perform cleanup work here
  }
}
6. Sink
Flink's output operations to external systems.
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SourceTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "10.12.42.174:9092")
    properties.setProperty("group.id", "consumer-group")

    val stream3: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

    val outputStream: DataStream[String] = stream3.map(
      data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble).toString
      }
    )

    outputStream.addSink(
      new FlinkKafkaProducer011[String]("10.12.42.174:9092", "sensor_res", new SimpleStringSchema()))

    outputStream.print()
    env.execute()
  }
}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
Custom JDBC sink
Data is read from Kafka, transformed as a DataStream, and the results are finally saved in MySQL.
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SourceTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "10.12.42.174:9092")
    properties.setProperty("group.id", "consumer-group")

    val stream3: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

    val outputStream: DataStream[SensorReading] = stream3.map(
      data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      }
    )

    outputStream.addSink(new MyJdbcSink())

    env.execute()
  }
}

class MyJdbcSink() extends RichSinkFunction[SensorReading] {

  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://10.12.42.174/flink", "root", "root")
    insertStmt = conn.prepareStatement("insert into sensor_temp(id, temp) values (?, ?)")
    updateStmt = conn.prepareStatement("update sensor_temp set temp = ? where id = ?")
  }

  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // Try to update an existing row first; if no row was updated, insert a new one
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()

    if (updateStmt.getUpdateCount == 0) {
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}
V. Windows in Flink
A window is a means of slicing an unbounded stream into finite chunks for processing.
1. Window types
CountWindow: generates a window after a specified number of records, independent of time.
TimeWindow: generates windows based on time.
2. Three categories by how the window is formed
1) Tumbling Windows
Slice the data into windows of a fixed length.
Characteristics: time-aligned, fixed window length, no overlap.
2) Sliding Windows
A sliding window is defined by a fixed window length plus a sliding interval.
Characteristics: time-aligned, fixed window length, windows may overlap.
3) Session Windows
Composed of a series of events followed by a timeout gap of a specified length, similar to a session in a web application: when no new data has arrived for a while, a new window is started.
Characteristics: not time-aligned.
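As a sketch (not in the original notes), a session window is applied through its window assigner; the 1-minute gap and the (id, count) mapping below are assumptions, and event time with watermarks must be configured for event-time sessions:

import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical example: close a session window after 1 minute without new data for a key
val sessionCounts = dataStream
  .map(data => (data.id, 1))
  .keyBy(_._1)
  .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
  .sum(1)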
3. Window API
TimeWindow

import org.apache.flink.streaming.api.windowing.time.Time

val outputStream: DataStream[SensorReading] = stream3.map(
  data => {
    val arr = data.split(",")
    SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
  }
)

outputStream
  .map(data => (data.id, data.temperature, data.timestamp))
  .keyBy(_._1)
  // Tumbling time window of 10 seconds
  .timeWindow(Time.seconds(10))
  // Alternative: sliding event-time window of 15 seconds, sliding every 5 seconds
  // .window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(5)))
  .reduce((curRes, newData) => (curRes._1, curRes._2.min(newData._2), newData._3))
CountWindow

outputStream
  .map(data => (data.id, data.temperature, data.timestamp))
  .keyBy(_._1)
  // A window of 5 elements per key
  .countWindow(5)
  .reduce((curRes, newData) => (curRes._1, curRes._2.min(newData._2), newData._3))
4. Window functions
Incremental aggregation functions: each record is aggregated as soon as it arrives, keeping only a small amount of state. Typical incremental aggregation functions are ReduceFunction and AggregateFunction; a sketch follows below.
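A minimal sketch of an incremental AggregateFunction, assuming the (id, temperature) tuple stream from the window examples above and the usual org.apache.flink.streaming.api.scala._ import; it keeps a (sum, count) accumulator and emits the average temperature per window:

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.windowing.time.Time

// Accumulator is (sum of temperatures, count of records)
class AvgTempAgg extends AggregateFunction[(String, Double), (Double, Long), Double] {
  override def createAccumulator(): (Double, Long) = (0.0, 0L)

  override def add(value: (String, Double), acc: (Double, Long)): (Double, Long) =
    (acc._1 + value._2, acc._2 + 1)

  override def getResult(acc: (Double, Long)): Double = acc._1 / acc._2

  override def merge(a: (Double, Long), b: (Double, Long)): (Double, Long) =
    (a._1 + b._1, a._2 + b._2)
}

// Hypothetical usage on a keyed, windowed stream
val avgTempPerWindow: DataStream[Double] = dataStream
  .map(data => (data.id, data.temperature))
  .keyBy(_._1)
  .timeWindow(Time.seconds(10))
  .aggregate(new AvgTempAgg)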
Full window functions: first collect all the data belonging to the window, then iterate over all of it when it is time to compute. ProcessWindowFunction is a full window function; see the sketch after this paragraph.
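A minimal ProcessWindowFunction sketch under the same assumptions; it buffers the window contents and emits (sensor id, window end timestamp, element count):

import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Emits one result per key and window, after seeing all elements of the window
class CountPerWindow extends ProcessWindowFunction[(String, Double), (String, Long, Int), String, TimeWindow] {
  override def process(key: String,
                       context: Context,
                       elements: Iterable[(String, Double)],
                       out: Collector[(String, Long, Int)]): Unit = {
    out.collect((key, context.window.getEnd, elements.size))
  }
}

// Hypothetical usage
val countsPerWindow = dataStream
  .map(data => (data.id, data.temperature))
  .keyBy(_._1)
  .timeWindow(Time.seconds(10))
  .process(new CountPerWindow)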
VI. Time Semantics and Watermarks
In Flink stream processing, the vast majority of business applications use eventTime.

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// Use event time as the time characteristic for this job
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
Watermark
A watermark is a mechanism for measuring the progress of event time.
Watermarks are used to handle out-of-order events; handling such events correctly is usually achieved by combining the watermark mechanism with windows.
A watermark in the data stream declares that all records with timestamps smaller than the watermark have already arrived; window firing is therefore also triggered by watermarks.
A watermark can be understood as a delayed trigger mechanism: we set a watermark delay t, and each time the system checks the maximum event time maxEventTime among the data that has already arrived, it assumes that all records with event time smaller than maxEventTime - t have arrived. If a window's end time equals maxEventTime - t, that window is triggered and evaluated.
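A minimal sketch of assigning timestamps and watermarks on the SensorReading stream from the earlier examples, with an assumed out-of-orderness bound of 2 seconds:

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// Watermark = max event time seen so far minus 2 seconds (assumed bound on out-of-orderness)
val streamWithWatermarks: DataStream[SensorReading] = dataStream
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(2)) {
      override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
    })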
VII. ProcessFunction API (low-level API)
Process functions are used to build event-driven applications and to implement custom business logic that cannot be expressed with the window functions and transformation operators covered earlier.
Flink provides 8 process functions:
• ProcessFunction
• KeyedProcessFunction
• CoProcessFunction
• ProcessJoinFunction
• BroadcastProcessFunction
• KeyedBroadcastProcessFunction
• ProcessWindowFunction
• ProcessAllWindowFunction
TimerService and Timers example
Monitor the temperature readings of sensors; if a sensor's temperature keeps rising for 10 seconds (processing time), raise an alert.
import cn.buildworld.flink.processfunc.bean.SensorReading;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ProcessFunction_App {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);

        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        dataStream.keyBy("id")
                .process(new TempConsIncreWarning(10))
                .print();

        env.execute();
    }

    public static class TempConsIncreWarning extends KeyedProcessFunction<Tuple, SensorReading, String> {

        // Alert interval in seconds
        private Integer interval;
        // Last temperature seen for the current key
        private ValueState<Double> lastTempState;
        // Timestamp of the currently registered timer
        private ValueState<Long> timerTsState;

        public TempConsIncreWarning(Integer interval) {
            this.interval = interval;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            lastTempState = getRuntimeContext().getState(
                    new ValueStateDescriptor<Double>("last-temp", Double.class, Double.MIN_VALUE));
            timerTsState = getRuntimeContext().getState(
                    new ValueStateDescriptor<Long>("time-ts", Long.class));
        }

        @Override
        public void processElement(SensorReading sensorReading, Context context, Collector<String> collector) throws Exception {
            Double lastTemp = lastTempState.value();
            Long timerTs = timerTsState.value();
            lastTempState.update(sensorReading.getTemperature());

            // Temperature rose and no timer is registered yet: register a processing-time timer
            if (sensorReading.getTemperature() > lastTemp && timerTs == null) {
                Long ts = context.timerService().currentProcessingTime() + interval * 1000L;
                context.timerService().registerProcessingTimeTimer(ts);
                timerTsState.update(ts);
                System.out.println("temperature rising");
            } else if (sensorReading.getTemperature() < lastTemp && timerTs != null) {
                // Temperature fell: cancel the pending timer
                context.timerService().deleteProcessingTimeTimer(timerTs);
                timerTsState.clear();
                System.out.println("temperature falling");
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            out.collect("Sensor " + ctx.getCurrentKey().getField(0)
                    + " temperature kept rising for " + interval + " seconds, current temperature: "
                    + lastTempState.value());
            timerTsState.clear();
        }

        @Override
        public void close() throws Exception {
            lastTempState.clear();
        }
    }
}
Side outputs (SideOutput)
Example: monitor sensor temperature readings and route readings below 30 degrees to a side output.
public class ProcessFunction_SideOutputCase {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);

        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        final OutputTag<SensorReading> lowTempTag = new OutputTag<SensorReading>("lowTemp") {
        };

        SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.keyBy("id")
                .process(new ProcessFunction<SensorReading, SensorReading>() {
                    @Override
                    public void processElement(SensorReading sensorReading, Context context, Collector<SensorReading> collector) throws Exception {
                        if (sensorReading.getTemperature() < 30) {
                            context.output(lowTempTag, sensorReading);
                        } else {
                            collector.collect(sensorReading);
                        }
                    }
                });

        DataStream<SensorReading> lowTempStream = highTempStream.getSideOutput(lowTempTag);

        highTempStream.print("high");
        lowTempStream.print("low");

        env.execute();
    }
}
VIII. Stateful Programming and Fault Tolerance
Stream computation is either stateful or stateless. Stateless computation observes each event in isolation and produces output based only on that latest event (stateless stream processing transforms one input record at a time and emits output based solely on the most recent record). Stateful computation produces output based on multiple events (stateful stream processing maintains state over all records processed so far and updates it with every new input record, so the output reflects the combined effect of multiple events).
1. Flink's checkpoint algorithm: checkpoint barriers
Flink's checkpoint algorithm uses a special kind of record called a barrier to separate the records in a stream into different checkpoints.
State changes caused by records arriving before a barrier are included in the checkpoint that barrier belongs to; all changes caused by records arriving after the barrier are included in a later checkpoint.
2. Savepoints
Flink also provides a user-triggered snapshot facility: savepoints.
In principle, savepoints are created with exactly the same algorithm as checkpoints, so a savepoint can be thought of as a checkpoint with some extra metadata.
Flink does not create savepoints automatically; the user (or an external scheduler) must explicitly trigger their creation.
Savepoints are a powerful feature. Beyond failure recovery, they can be used for planned manual backups, application upgrades, version migration, pausing and restarting an application, and more; typical CLI interactions are sketched below.
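A sketch of the usual Flink CLI commands (job id, paths, class and jar names are placeholders):

# Trigger a savepoint for a running job
bin/flink savepoint <jobId> [targetDirectory]

# Cancel a job and take a savepoint at the same time
bin/flink cancel -s [targetDirectory] <jobId>

# Resume a job from a savepoint
bin/flink run -s <savepointPath> -c <MainClass> <jarFile>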
3. Fault-tolerance configuration options

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Take a checkpoint every 300 ms
env.enableCheckpointing(300);

// Exactly-once checkpointing mode
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Abort a checkpoint if it takes longer than 60 s
env.getCheckpointConfig().setCheckpointTimeout(60000L);
// At most two checkpoints in flight at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
// Minimum pause between two checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);

// Restart strategy: fixed delay (3 attempts, 10 s apart) ...
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L));
// ... or failure rate (at most 3 failures per 10-minute interval, 1 minute between restarts)
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(10), Time.minutes(1)));
4. Levels of state consistency
A major value of Flink is that it guarantees exactly-once semantics while still delivering low latency and high throughput.
5. End-to-end exactly-once semantics with Flink and Kafka
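The original notes give no code here. As a hedged sketch, the Kafka 0.11 sink from the earlier Scala example can be switched to transactional, exactly-once writes; the topic, broker address, and the outputStream variable are carried over as assumptions, and checkpointing must be enabled for commits to happen:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

val producerProps = new Properties()
producerProps.setProperty("bootstrap.servers", "10.12.42.174:9092")
// Transactions must not time out before the checkpoint that commits them completes
producerProps.setProperty("transaction.timeout.ms", "60000")

// Sketch: transactional producer that commits pending records when a checkpoint completes
val exactlyOnceSink = new FlinkKafkaProducer011[String](
  "sensor_res",
  new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
  producerProps,
  FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)

outputStream.addSink(exactlyOnceSink)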
IX. Table API and Flink SQL
1. Add the pom dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
</dependencies>
2. A simple example

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

DataStreamSource<String> inputStream = env.readTextFile("D:\\Java\\project\\Flink_Java\\src\\main\\resources\\sensor.txt");

DataStream<SensorReading> dataStream = inputStream.map(
        line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        }
);

// Create the table environment and turn the DataStream into a Table
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table dataTable = tableEnv.fromDataStream(dataStream);

// Table API style query
Table resTable = dataTable.select("id, temperature")
        .where("id = 'sensor_1'");

// SQL style query on a temporary view
tableEnv.createTemporaryView("sensor", dataTable);
String sql = "select id, temperature from sensor where id = 'sensor_1'";
Table resultSqlTable = tableEnv.sqlQuery(sql);

tableEnv.toAppendStream(resTable, Row.class).print("result");
tableEnv.toAppendStream(resultSqlTable, Row.class).print("resultSql");

env.execute();
3. Old planner vs. Blink planner, streaming and batch table environments

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// 1.1 Old planner, streaming
EnvironmentSettings oldStreamSettings = EnvironmentSettings.newInstance()
        .useOldPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment oldStreamTableEnv = StreamTableEnvironment.create(env, oldStreamSettings);

// 1.2 Old planner, batch
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment oldBatchTableEnv = BatchTableEnvironment.create(batchEnv);

// 2.1 Blink planner, streaming
EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSettings);

// 2.2 Blink planner, batch
EnvironmentSettings blinkBatchSettings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inBatchMode()
        .build();
TableEnvironment blinkBatchTableEnv = TableEnvironment.create(blinkBatchSettings);

env.execute();
4. Creating a table from an external system

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Register a CSV file as a table
String filePath = "D:\\Java\\project\\Flink_Java\\src\\main\\resources\\sensor.txt";
tableEnv.connect(new FileSystem().path(filePath))
        .withFormat(new Csv())
        .withSchema(new Schema()
                .field("id", DataTypes.STRING())
                .field("timestamp", DataTypes.BIGINT())
                .field("temp", DataTypes.DOUBLE())
        ).createTemporaryTable("inputTable");

Table inputTable = tableEnv.from("inputTable");
inputTable.printSchema();
tableEnv.toAppendStream(inputTable, Row.class).print();

env.execute();
5. Query transformations

// Table API
Table resultTable = inputTable.select("id, temperature")
        .filter("id = 'sensor_6'");

Table aggTable = inputTable.groupBy("id")
        .select("id, id.count as count, temperature.avg as avgTemp");

// SQL
Table sqlTable = tableEnv.sqlQuery("select id, temperature from inputTable where id = 'sensor_6'");
Table sqlQuery = tableEnv.sqlQuery("select id, count(id) as cnt, avg(temperature) as avgTemp from inputTable group by id");

tableEnv.toAppendStream(resultTable, Row.class).print("resultTable");
tableEnv.toRetractStream(aggTable, Row.class).print("aggTable");
tableEnv.toAppendStream(sqlTable, Row.class).print("sqlTable");
tableEnv.toRetractStream(sqlQuery, Row.class).print("sqlQuery");

// Write the result to a CSV file sink
String outputFilePath = "D:\\Java\\project\\Flink_Java\\src\\main\\resources\\output_sensor.txt";
tableEnv.connect(new FileSystem().path(outputFilePath))
        .withFormat(new Csv())
        .withSchema(new Schema()
                .field("id", DataTypes.STRING())
                .field("temperature", DataTypes.DOUBLE()))
        .createTemporaryTable("outputTable");

Table outputTable = tableEnv.from("outputTable");
resultTable.insertInto("outputTable");
6. Table API with Kafka

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Source table reading from the "sensor" topic
tableEnv.connect(new Kafka()
        .version("0.11")
        .topic("sensor")
        .property("zookeeper.connect", "10.12.42.174:2181")
        .property("bootstrap.servers", "10.12.42.174:9092"))
        .withFormat(new Csv())
        .withSchema(new Schema()
                .field("id", DataTypes.STRING())
                .field("timestamp", DataTypes.BIGINT())
                .field("temp", DataTypes.DOUBLE()))
        .createTemporaryTable("inputTable");

Table sensorTable = tableEnv.from("inputTable");

Table resultTable = sensorTable.select("id, temp")
        .filter("id = 'sensor_6'");

Table aggTable = sensorTable.groupBy("id")
        .select("id, id.count as count, temp.avg as avgTemp");

// Sink table writing to the "flink" topic
tableEnv.connect(new Kafka()
        .version("0.11")
        .topic("flink")
        .property("zookeeper.connect", "10.12.42.174:2181")
        .property("bootstrap.servers", "10.12.42.174:9092"))
        .withFormat(new Csv())
        .withSchema(new Schema()
                .field("id", DataTypes.STRING())
                .field("temp", DataTypes.DOUBLE()))
        .createTemporaryTable("outputTable");

resultTable.insertInto("outputTable");

env.execute();
7. Update modes
For streaming queries, you must declare how the conversion between a table and the external connector is performed. The type of messages exchanged with the external system is specified by the update mode (Update Mode): append mode, retract mode, or upsert mode.
8. Writing to MySQL

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
String sinkDDL = "create table jdbcOutputTable (" +
        " id varchar(20) not null, " +
        " avgTemp double not null " +
        ") with (" +
        " 'connector.type' = 'jdbc', " +
        " 'connector.url' = 'jdbc:mysql://localhost:3306/flink', " +
        " 'connector.table' = 'sensor_count', " +
        " 'connector.driver' = 'com.mysql.jdbc.Driver', " +
        " 'connector.username' = 'root', " +
        " 'connector.password' = 'root' )";

tableEnv.sqlUpdate(sinkDDL);
aggTable.insertInto("jdbcOutputTable");
9. Dynamic tables and continuous queries
The stream is converted into a dynamic table.
A continuous query is evaluated on the dynamic table, producing a new dynamic table.
The resulting dynamic table is converted back into a stream.
10. Group Windows
Tumbling windows are defined with the Tumble class.
// Tumbling event-time window
.window(Tumble.over("10.minutes").on("rowtime").as("w"))

// Tumbling processing-time window
.window(Tumble.over("10.minutes").on("proctime").as("w"))

// Tumbling row-count window
.window(Tumble.over("10.rows").on("proctime").as("w"))
Sliding windows are defined with the Slide class.
// Sliding event-time window
.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"))

// Sliding processing-time window
.window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"))

// Sliding row-count window
.window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"))
11. User-defined functions (UDFs)
Scalar Functions
• A user-defined scalar function maps zero, one, or more scalar values to a new scalar value.
• To define a scalar function, extend the base class ScalarFunction in org.apache.flink.table.functions and implement one or more evaluation methods.
• The behavior of a scalar function is determined by its evaluation method, which must be declared public and named eval.
public class ScalarFunctionTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String filePath = "D:\\Java\\project\\Flink_Java\\src\\main\\resources\\sensor.txt";
        DataStream<String> inputStream = env.readTextFile(filePath);

        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        })
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
                    @Override
                    public long extractTimestamp(SensorReading element) {
                        return element.getTimestamp() * 1000L;
                    }
                });

        Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");

        // Register and use the scalar function in the Table API
        HashCode hashCode = new HashCode(21);
        tableEnv.registerFunction("hashCode", hashCode);
        Table resultTable = sensorTable.select("id, ts, hashCode(id)");

        // Use the scalar function in SQL
        tableEnv.createTemporaryView("sensor", sensorTable);
        Table resultSqlTable = tableEnv.sqlQuery("select id, ts, hashCode(id) from sensor");

        tableEnv.toAppendStream(resultTable, Row.class).print("resultTable");
        tableEnv.toRetractStream(resultSqlTable, Row.class).print("resultSqlTable");

        env.execute();
    }

    public static class HashCode extends ScalarFunction {

        private int factor = 13;

        public HashCode(int factor) {
            this.factor = factor;
        }

        // Evaluation method: must be public and named eval
        public int eval(String s) {
            return s.hashCode() * factor;
        }
    }
}
Table Functions
• A user-defined table function also takes zero, one, or more scalar values as input; unlike a scalar function, it can return any number of rows as output instead of a single value.
• To define a table function, extend the base class TableFunction in org.apache.flink.table.functions and implement one or more evaluation methods.
• The behavior of a table function is determined by its evaluation methods, which must be public and named eval.
// Custom table function: splits the id on a separator and emits (word, length) rows
public static class Split extends TableFunction<Tuple2<String, Integer>> {

    private String separator = ",";

    public Split(String separator) {
        this.separator = separator;
    }

    public void eval(String str) {
        for (String s : str.split(separator)) {
            collect(new Tuple2<>(s, s.length()));
        }
    }
}

Split split = new Split("_");
tableEnv.registerFunction("split", split);

// Table API: joinLateral with the table function
Table resultTable = sensorTable
        .joinLateral("split(id) as (word, length)")
        .select("id, ts, word, length");

// SQL: lateral table syntax
tableEnv.createTemporaryView("sensor", sensorTable);
Table resultSqlTable = tableEnv.sqlQuery("select id, ts, word, length " +
        " from sensor, lateral table(split(id)) as splitid(word, length)");
Aggregate Functions
An AggregateFunction works as follows:
• First, it needs an accumulator, the data structure that holds the intermediate result of the aggregation; an empty accumulator is created by calling createAccumulator().
• Then, the function's accumulate() method is called for every input row to update the accumulator.
• After all rows have been processed, getValue() is called to compute and return the final result.
// Custom aggregate function computing an average temperature
// Accumulator: Tuple2<sum of temperatures, count>
public static class AvgTemp extends AggregateFunction<Double, Tuple2<Double, Integer>> {

    @Override
    public Double getValue(Tuple2<Double, Integer> accumulator) {
        return accumulator.f0 / accumulator.f1;
    }

    @Override
    public Tuple2<Double, Integer> createAccumulator() {
        return new Tuple2<>(0.0, 0);
    }

    public void accumulate(Tuple2<Double, Integer> accumulator, Double temp) {
        accumulator.f0 += temp;
        accumulator.f1 += 1;
    }
}

AvgTemp avgTemp = new AvgTemp();
tableEnv.registerFunction("avgTemp", avgTemp);

Table resultTable = sensorTable
        .groupBy("id")
        .aggregate("avgTemp(temp) as avgtemp")
        .select("id, avgtemp");
Table Aggregate Functions
A TableAggregateFunction works as follows:
• First, it also needs an accumulator, the data structure that holds the intermediate result of the aggregation; an empty accumulator is created by calling createAccumulator().
• Then, the function's accumulate() method is called for every input row to update the accumulator.
• After all rows have been processed, emitValue() is called to compute and return the final results.
A sketch of a table aggregate function follows.
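As a hedged sketch (written in Scala, although this part of the notes uses Java): a table aggregate function that emits the top two temperatures per key, used with flatAggregate. The field and alias names are assumptions.

import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.TableAggregateFunction
import org.apache.flink.util.Collector

// Accumulator holding the two highest temperatures seen so far
class Top2Accum {
  var highest: Double = Double.MinValue
  var secondHighest: Double = Double.MinValue
}

// Emits up to two rows per key: (temperature, rank)
class Top2Temp extends TableAggregateFunction[(Double, Int), Top2Accum] {

  override def createAccumulator(): Top2Accum = new Top2Accum

  def accumulate(acc: Top2Accum, temp: Double): Unit = {
    if (temp > acc.highest) {
      acc.secondHighest = acc.highest
      acc.highest = temp
    } else if (temp > acc.secondHighest) {
      acc.secondHighest = temp
    }
  }

  def emitValue(acc: Top2Accum, out: Collector[(Double, Int)]): Unit = {
    if (acc.highest > Double.MinValue) out.collect((acc.highest, 1))
    if (acc.secondHighest > Double.MinValue) out.collect((acc.secondHighest, 2))
  }
}

// Hypothetical usage with the Scala Table API expression DSL
val top2Temp = new Top2Temp
val top2Table = sensorTable
  .groupBy('id)
  .flatAggregate(top2Temp('temp) as ('temp, 'rank))
  .select('id, 'temp, 'rank)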