主要的提交流程重点:
提交描述:
dispather负责接受flink客户端的命令,并转发给jobmater;jobmater主要负责把jobgraph转化为executiongraph;flink中resourcemanager区别于Yarn中的resourcemanager,主要向Yarn的RM申请资源通信组件: akka和netty,akka主要用于组件间的简单通信,比如维持心跳等;而netty主要用于task间的数据传输等大数据量场景
readtextfile,fromCollectionmap,flatMap,reduce,aggerate,fiter,minByprint,outputunion:合并多条具有相同数据类型的流
connect:合并两条任意类型的流
slot共享:减少了所需slot的数量,提高了资源的利用率。比如现在有一个2G的slot,在flink中slot是最小的资源分配单位,并且内存资源相互隔离,不同的slot间资源不能借调,但共享cpu资源;现在有task需要1G资源才能运行,假设task并行度为2,如果每个task单独分配slot的话,需要2个2G的slot,而使用slot共享的话,只需要一个2G的slot,资源利用率就提高了。
java自动换行:关放大阅读展开代码/* *水印单调不减 */ @Override public void onEvent(T event, long eventTimestamp, WatermarkOutput output) { maxTimestamp = Math.max(maxTimestamp, eventTimestamp); } /* *水印周期性发送,默认200ms发送一次 */ @Override public void onPeriodicEmit(WatermarkOutput output) { // 水印为当前的最大时间戳并考虑数据的乱序容忍度 output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1)); }
下游总是取当前收到的水印中最小值。
<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))allowedLateness(Time.seconds(3)).withIdleness(Duration.ofSeconds(15));, 在多个并行度的算子间watermark传递时,每个并行度(task)只会取收到的最小watermark,当上游的某个task出现异常导致水印无法推进,可能会导致下游的窗口无法触发计算; 此时可以手动为task赋闲。end - 1才触发窗口计算[start,end), 以滚动窗口为例,start为窗口长度的整数倍并向下取整,end = start + size时间为end - 1end -1end - 1 - orderness, orderness为乱序容忍度,分发水印时指定的sql自动换行:关放大阅读展开代码SELECT * FROM TABLE( CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));
java自动换行:关放大阅读展开代码public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.socketTextStream("hadoop102",9899) // 监听输入的数据 a 1 转为 Tuple2《a,1》 .map(new MapFunction<String, Tuple2<String,Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { String[] line = value.split(","); return new Tuple2<>(line, Integer.parseInt(line) ); } }) // watermark分发 .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Integer>>forMonotonousTimestamps() .withTimestampAssigner((e,t) -> e.f1 ) ) // 按照 tuple的 f0 ;即元素分组 .keyBy(e -> e.f0) // 滚动20秒的滚动窗口 .window(TumblingEventTimeWindows.of(Time.minutes(20))) // 设定触发器为2秒触发一次计算 .trigger(ContinuousEventTimeTrigger.of(Time.seconds(2))) // 增量累加reduce .reduce(new ReduceFunction<Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { value1.f1 = value1.f1 + value2.f1; return value1; } }) .print(); env.execute(); }
| 状态名 | 状态类型 | 描述 | 状态恢复和状态分发 |
|---|---|---|---|
| listState | 算子状态,键控状态 | 一组状态的集合 | 下游的所有task平均分配恢复后的状态 |
| unionListState | 算子状态 | 一组状态的集合 | 下游的每个task全量拥有恢复的一份状态 |
| broadcastState | 算子状态 | 广播类型的状态 | 分发给下游的每个算子一份状态 |
| mapState | 键控状态 | k-v类型状态集合 | 每个key恢复之前的状态 |
| valueState | 算子状态,键控状态 | 单值类型状态 | 恢复之前的状态 |
| reduceState | 键控状态 | 聚合类型状态 | 恢复每个key之前聚合状态 |
| aggregatingState | 键控状态 | 聚合类型状态 | 恢复每个key之前的聚合状态 |
flink 1.13版本后,hashmap存储在taskmanager的堆内内存中,而rockdb状态后端存储在Rocksdb(task manager的堆外内存的管理内存中)。
checkpoin的位置(比如hdfs上)由checkpoint配置单独指定,区别于1.13版本前的状态后端和checkpoint混合的情况。
Checkpoint (ck):默认flink保存在JobManager的堆内存中,但当JM的进程fail,也会丢失数据;故一般需要指定外部的持久化设备,比如hdfs(可以通过flink的执行环境设置),会周期自动执行ck。
Savepoint (sv):为flink提供的手动模式,通过flink命令备份。
shell自动换行:关放大阅读展开代码bin/flink savepoint jobid 路径
也可通过flink的命令从savepoint恢复数据:
shell自动换行:关放大阅读展开代码bin/flink run -c 程序主类名 -s hdfs://hadoop102:8020/flink/fs/aa62c578adb3c37056280e0c74f3d689/chk-9 jar包
使用异步的分布式快照算法。
ck异步体现: task在ck时,并不堵塞task执行。
ck快照:根据ck的方式,有hashmap和rockdb两种方式,快照保存有全量和增量之分。
barrier对齐:算子在接受多个上游的barrier时,可能barrier有先后顺序到达,最先发送barrier给下游的上游算子的数据将会被下游缓存,等待所有上游算子barrier到齐,触发ck。
barrier不对齐:不管barrier是否对齐,下游处理继续处理上游数据,当晚到的barrier到时,触发ck。此次快照保留部分下次ck的数据,当fail回滚时,数据会被重复消费。
可以通过配置flink_conf.xml的配置文件对task的重启策略进行配置。当没有开启checkpoint时,task的重启策略失效;当开启了ck,但没有指定重启策略,默认使用
Fixed Delay Restart Strategy。
当task执行失败,开启checkpoint后会自动重试,有以下几种重试策略:
开启checkpoint。
以kafka - flink - kafka传输为例:
当第一阶段预提交失败:
回滚到上一条数据预提交成功的状态。当第二次提交失败:
从第一阶段预提交成功的位置,接续进行第二阶段;能够从预提交的位置继续正式提交得益于上一轮ck完成后,jobmanager做了一次完整的ckeckpoint。
实时项目中具体配置
数据热点问题webUI查看:
miniBatch开启
对一些实时要求不高的指标,可以可用微批次功能,细节为使用状态存储数据,当批次达到处理时机才处理。
java自动换行:关放大阅读展开代码// 初始化table environment TableEnvironment tEnv = ... // 获取 tableEnv的配置对象 Configuration configuration = tEnv.getConfig().getConfiguration(); // 设置参数: // 开启miniBatch configuration.setString("table.exec.mini-batch.enabled", "true"); // 批量输出的间隔时间 configuration.setString("table.exec.mini-batch.allow-latency", "5 s"); // 防止OOM设置每个批次最多缓存数据的条数,可以设为2万条 configuration.setString("table.exec.mini-batch.size", "20000");
LocalGlobal(两阶段聚合)开启 针对一些普通聚合场景,比如单独的sum,count等,可能在聚合时,出现数据热点(数据倾斜)。解决手段可以localAgg(第一阶段预聚合),globalAgg(最终聚合)减轻最终聚合task的压力。
java自动换行:关放大阅读展开代码// 初始化table environment TableEnvironment tEnv = ... // 获取 tableEnv的配置对象 Configuration configuration = tEnv.getConfig().getConfiguration(); // 设置参数: // 开启miniBatch *******使用localglobal必须开启minibatch功能 configuration.setString("table.exec.mini-batch.enabled", "true"); // 批量输出的间隔时间 configuration.setString("table.exec.mini-batch.allow-latency", "5 s"); // 防止OOM设置每个批次最多缓存数据的条数,可以设为2万条 configuration.setString("table.exec.mini-batch.size", "20000"); // 开启LocalGlobal configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
SplitDistinct 场景当出现count(distinct)类似需求时,可能使用LocalGlobal效果不明显,则可以使用splitdistinct。
sql自动换行:关放大阅读展开代码-- 按照a分组,统计b个数 -- 数据热点问题写法 SELECT a, COUNT(DISTINCT b) FROM T GROUP BY a; -- 手动sql调整写法 SELECT a, SUM(cnt) FROM ( SELECT a, COUNT(DISTINCT b) as cnt FROM T GROUP BY a, MOD(HASH_CODE(b), 1024) ) GROUP BY a;
java自动换行:关放大阅读展开代码// 初始化table environment TableEnvironment tEnv = ... // 获取 tableEnv的配置对象 Configuration configuration = tEnv.getConfig().getConfiguration(); // 设置参数:(要结合minibatch一起使用) // 开启Split Distinct configuration.setString("table.optimizer.distinct-agg.split.enabled", "true"); // 第一层打散的bucket数目 configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");
shell自动换行:关放大阅读展开代码# flink-conf.yaml jobmanager.memory.process.size: 2048m taskmanager.memory.process.size: 4096m
一般使用kafka作为数据源,并设置source算子的并行度为kafka的topic的分区数,这样可以最大程度的避免数据丢失和数据倾斜问题。
shell自动换行:关放大阅读展开代码bin/flink run \ -d \ -t yarn-per-job \ -p 2 \ -Drest.flamegraph.enabled=true \ -Dtaskmanager.numberOfTaskSlots=2 \ -Dtaskmanager.memory.process.size=4096m \ -c com.atguigu.bigdata.tune.BackpressureApp \ /opt/testjars/flink_tuning-1.0-SNAPSHOT.jar
is_new标识用户是否为新用户,可能出现为老用户但识别我新用户的情况;使用键控状态valueState存储每个用户的首次登录的登录日期,此时出现大状态问题。join 订单表 取下单用户,flink默认join上的数据保留在状态中,时间一长,出现了大状态,有2G左右状态。2)在不影响业务情况下,flinkSQL设置了表的ttl,到期后自动清理状态。yaml自动换行:关放大阅读展开代码#设置为机械硬盘+内存模式 state.backend.rocksdb.predefined-options=SPINNING_DISK_OPTIMIZED_HIGH_MEM
java自动换行:关放大阅读展开代码// 设置ttl 为5s,join上的数据存储在状态后5s清理 tEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(5));
可以使用数据重分发算子比如shuffle(数据随机离散均匀分配),rescale(单个taskmanager的子任务均匀分配),rebalance(整个子任务间数据均匀分配)。
LocalGlobal大概原理:
类似于hadoop中combine和reduce,提前进行一次数据的预聚合,减少后续每个分组global聚合的数据量。
单表聚合倾斜代码
解决思路:
keyBy前的数据倾斜可以从flink的webUI界面的每个Task接受数据大小判断,一般数据相差20倍以上,就称为数据倾斜。
使用了flatmap攒批次再keyBy之后:
第一阶段聚合:key拼接随机数前缀或后缀,进行keyby、开窗、聚和
第二阶段:按照原来的key及windowEnd作keyby、聚合
keyBy后窗口聚合数据倾斜代码
解决思路:
使用了二次聚合之后:
当metrics中floatingBuffersUsage 高、exclusiveBuffersUsage 低为有倾斜,因为少数 channel 占用了大部分的 Floating Buffer
kafka的消费吞吐问题,但是我们使用kafka的消费者消费topic的数据发现能够正常消费到,最终排除kafka问题;手动执行savapoint进行flink作业的存档,并禁用了算子链优化,并开启了火焰图,观察每个算子的一些指标,定位了出现反压的问题代码calcite的动态数据管理框架,提供的sql的解析,验证,根据编写的流处理或者批处理判断使用的优化手段,得到DataStream API Transformation| Component | Description |
|---|---|
| Framework Heap Memory | 框架堆内内存,128M,JVM Heap memory dedicated to Flink framework (advanced option) |
| Task Heap Memory | Task堆内内存,JVM Heap memory dedicated to Flink application to run operators and user code |
| Managed memory | 管理内存,Native memory managed by Flink, reserved for sorting, hash tables, caching of intermediate results and RocksDB state backend |
| Framework Off-heap Memory | 框架堆外内存,128M,Off-heap direct (or native) memory dedicated to Flink framework (advanced option) |
| Task Off-heap Memory | 默认0,表示不使用堆外内存,Off-heap direct (or native) memory dedicated to Flink application to run operators |
| Network Memory | 网络内存,Direct memory reserved for data record exchange between tasks (e.g. buffering for the transfer over the network),This memory is used for allocation of network buffers |
| JVM metastore | JVM元空间256m,Metaspace size of the Flink JVM process |
| JVM Overhead | JVM执行开销,taskmanager memory * 0.1 ,Native memory reserved for other JVM overhead: e.g. thread stacks, code cache, garbage collection space etc |
flink-conf.yaml:服务器配置所有的 flink 程序的默认并行度StreamExecutionEnvironment 设置任务的全局并行度
java自动换行:关放大阅读展开代码StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4);
java自动换行:关放大阅读展开代码public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.fromData("world", "hello", "world","data","data","hello") .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { return new Tuple2<>(value, 1); } }).setParallelism(3) .keyBy(tup -> tup.f0) .reduce(new ReduceFunction<Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> reduce( Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { return Tuple2.of(value1.f0, value1.f1 + value2.f1); } }) .print(); env.execute(); }
shell自动换行:关放大阅读展开代码flink -c xxx.xxx.Main xxx/xxx/xx.jar -p 4
测试程序
java自动换行:关放大阅读展开代码public class MaxParallelism { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); System.setProperty("HADOOP_USER_NAME", "atguigu"); // checkpoint设置 env.enableCheckpointing(1000); // 其他可选配置如下: env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); env.getCheckpointConfig().setCheckpointTimeout(60000); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2); // env.getCheckpointConfig().enableUnalignedCheckpoints(); env.getCheckpointConfig().setExternalizedCheckpointCleanup( org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/flink_tolerance"); env.setMaxParallelism(512); // 初始设置为 512 env.socketTextStream("hadoop102", 9999) .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { return new Tuple2<>(value, 1); } }) .keyBy(tup -> tup.f0) .reduce(new ReduceFunction<Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> reduce( Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { return Tuple2.of(value1.f0, value1.f1 + value2.f1); } }) .print(); env.execute(); } }
测试步骤
此处以 yarn-session 模拟测试,启动 yarn-session 模式
shell自动换行:关放大阅读展开代码./bin/flink run -t yarn-session
打包程序并提交
shell自动换行:关放大阅读展开代码./bin/flink run -t yarn-session -c cn.hedeoer.chapter01.physicalpartitioning.MaxParallelism ./job-jars/flink-1.0-SNAPSHOT.jar
日志打印结果:
创建 savepoint:
shell自动换行:关放大阅读展开代码bin/flink savepoint 6bbe6ab947c53044f68c070eca495c1f hdfs://hadoop102:8020/savepoint -yid application_1727501769142_0001
修改程序算子的最大并行度:
java自动换行:关放大阅读展开代码// 之前为512 env.setMaxParallelism(520);
打包上传并从 savepoint 恢复任务:
报错:关键日志如下
plaintext自动换行:关放大阅读展开代码Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint Checkpoint Metadata. Max parallelism mismatch between checkpoint/savepoint state and new program. Cannot map operator cbc357ccb763df2852fee8c4fc7d55f2 with max parallelism 512 to new program with max parallelism 520. This indicates that the program has been changed in a non-compatible way after the checkpoint/savepoint.
原因:flink 程序回滚恢复 savepoint 数据时,由于 savepoint 前后算子最大并行度的不一致,导致无法读取 checkpoint 的元数据,最终恢复任务失败!
简述:算子的最大并行度在没有用户设置的情况下具有默认值(根据每个算子的并行度计算得出),算子的最大并行度是 flink 的任务在上线后续如果有扩容(指提高算子并行度)的情况下,提供状态容错和恢复的保障。相关源码可以参考类:org.apache.flink.runtime.state.KeyGroupRangeAssignment
启示:算子的最大并行度需要在首次编程时就严格考虑,上线程序后不能更改,区别于算子的并行度。
上游算子每个子任务均匀向下游任务的部分子任务轮询发送数据。保证了部分子任务的数据均衡。是一种优化的重平衡分区,数据按照上下游任务的并行度比例进行分配,使得数据尽量保持本地性(同一个taskmanager)。减少了不同taskmanager间的网络通信传输重新分配数据。
代码:
java自动换行:关放大阅读展开代码public class RescaleExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); env.addSource(new MyAcSource()).rescale() .map(new RichMapFunction<DataModel, DataModel>() { @Override public DataModel map(DataModel value) throws Exception { int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); value.setTransSubTaskName("TransTask" + indexOfThisSubtask); return value; } }).setParallelism(4) .print(); env.execute(); } }
效果:
上游的每个子任务向下游的所有子任务轮询发送数据。将数据均匀地随机分配到所有下游并行子任务中,确保每个子任务处理的数据量相对均衡。
java自动换行:关放大阅读展开代码public class RebalanceExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); env.addSource(new MyAcSource()).rebalance() .map(new RichMapFunction<DataModel, DataModel>() { @Override public DataModel map(DataModel value) throws Exception { int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); value.setTransSubTaskName("TransTask" + indexOfThisSubtask); return value; } }).setParallelism(4) .print(); env.execute(); } }
效果:
实际应用:
数据打乱发送下游。
上游子任务和下游子任务之间的数据传输是一对一的关系。是 flink 算子默认的数据分配策略,如果不满足 forward 的条件,flink 会选择 rebalance 策略。
算子链形成条件:
可以通过调用算子方法实现算子链的断开,或开启新的算子链。
java自动换行:关放大阅读展开代码// 当数据必须保持严格顺序处理,且上下游并行度相同时 public class ForwardExample { public static void main(String[] args) throws Exception { Configuration config = new Configuration(); config.setString("rest.port", "8081"); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(config); env.setParallelism(2); env.addSource(new MyAcSource()) .map(new RichMapFunction<DataModel, DataModel>() { @Override public DataModel map(DataModel value) throws Exception { int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); value.setTransSubTaskName("TransTask" + indexOfThisSubtask); return value; } }) .print().disableChaining(); env.execute(); } }
WebUI 图:
将数据复制并发送到所有下游并行子任务,每个子任务都接收到所有数据。
应用场景:大小流的双流 join 场景。
根据指定的 key 对数据流进行哈希分区,确保相同 key 的数据被发送到同一个下游子任务。
本文作者:hedeoer
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!