首页标签分类
11Flink
2022-12-19 · 更新 2026-03-03约 19 分钟 · 5212 字
大数据杂文记
000

目录

Flink
Flink的部署模式
Flink的基本架构和任务提交流程
Flink的基本架构
Flink的任务提交流程
常用算子及比较
合流算子
算子链
Slot的共享
Watermark
Flink的时间语义
Watermark的理解
窗口
Barrier对齐和不对齐
Task的重试策略
数据一致性
Flink的一致性保障
端到端一致性
Flink两阶段提交流程
CEP编程
Flink的优化
FlinkSQL优化
Flink资源优化
大状态调优
Flink数据倾斜
数据源数据倾斜
keyBy前单表聚合数据倾斜
keyBy后的窗口聚合计算数据倾斜
Flink的反压
反压可能原因
反压的一般现象
定位手段
Flink SQL
Flink的Join
Stream API
SQL API
Flink和Spark Streaming比较
FlinkSQL的工作机制
Flink内存模型
Taskmanager内存模型
杂记
Flink算子的并行度如何设置?
Flink可以设置算子并行度地方:
如何合理设置并行度
Flink算子的最大并行度
实验
原因讲解
Flink数据物理分区策略
rescale
rebalance
shuffle
forward
broadcast
keyBy

Flink

Flink的部署模式

  1. standlone模式
    flink独立集群模式,需要单独搭建
  2. yarn模式
    • session
      启动一个Flink集群,向Yarn申请资源,多个Job共用一个App,需要手动提前启动flink集群。application任务常驻在yarn中,除非手动停止。
    • per-job(项目使用模式)
      一个job就是一个yarn的application,job之间独立,提交任务时启动集群,任务执行结束释放资源关闭 application。
    • application
      和per-job模式比较,主要在于main方法的执行位置不同,此时main在client执行,将用户编写的streamgraph转化为jobgraph;application模式下,main在JobMaster中执行,相比于 per-job 模式,减少了上传从 client 上传资源到 jobmaster 的网络开销。

Flink的基本架构和任务提交流程

Flink的基本架构

  • flink的架构主要包括Flink client,JobManager,TaskManager
  • client负责提交任务参数的解析,比如并行度的设置,TaskManager的slot数指定等,并将StreamGraph转化为JobGraph
  • JobManger主要负责将JobGraph转化为ExecutionGraph,并对Subtask进行调度等
  • TaskManager主要负责task的执行,Checkpoint等

Flink的任务提交流程

主要的提交流程重点:

  • 提交参数的解析
  • streamgraph,jobgraph,executiongraph的转化
  • subtask任务的调度

提交描述:

  • 使用flink的application模式提交flink作业,建立flink任务提交客户端,解析提交任务参数,并把streamgraph转化为jobgraph
  • 在ApplicationMaster中建立JobManager进行,其中有三个重要组件,dispather,jobmaster,resourcemanager。dispather负责接受flink客户端的命令,并转发给jobmater;jobmater主要负责把jobgraph转化为executiongraph;flink中resourcemanager区别于Yarn中的resourcemanager,主要向Yarn的RM申请资源
  • 当首次申请资源完毕后,在指定的NodeManager上开启TaskManager进程,并启动指定数量的Slot
  • jobmanager调度任务给taskmanager

通信组件: akka和netty,akka主要用于组件间的简单通信,比如维持心跳等;而netty主要用于task间的数据传输等大数据量场景

常用算子及比较

  1. source
    readtextfilefromCollection
  2. transformer
    mapflatMapreduceaggeratefiterminBy
  3. sink
    printoutput
  4. 数据分发算子

    8(fink提供) + 1(用户自定义)
    • rebalance(默认)
      下游的task平均分配上游数据,做到不同taskmanager的所有task平均分配
    • rescale
      TaskManager内的数据负载均衡
    • shuffle
      随机分往下游的task
    • broadcast
      下游的所有task都有完整的一份上游数据
    • forward
      将数据发送到本地下游的0号分区
    • keyBy
      相同key的数据发往同一个task
    • globalpartitioner
      将上游的task的所有数据发往下游subtask的id为0 的算子执行

合流算子

union:合并多条具有相同数据类型的流
connect:合并两条任意类型的流

算子链

  • 算子链产生条件
    在flink提交任务的客户端,产生jobgraph时会形成算子链条,只有满足数据的分发规则为forward(one to one)并且上下游算子的并行度相同满足才可形成。此时多个task被一个线程执行。
  • 形成算子链的好处
    多个task在一个线程内执行,减少了task间的网络IO。

Slot的共享


slot共享:减少了所需slot的数量,提高了资源的利用率。比如现在有一个2G的slot,在flink中slot是最小的资源分配单位,并且内存资源相互隔离,不同的slot间资源不能借调,但共享cpu资源;现在有task需要1G资源才能运行,假设task并行度为2,如果每个task单独分配slot的话,需要2个2G的slot,而使用slot共享的话,只需要一个2G的slot,资源利用率就提高了。

Watermark

Flink的时间语义

  • 事件时间
  • 处理时间
  • 注入时间

Watermark的理解

  • 评估事件的进展,利用水印可以解决乱序
  • watermark单调不减
  • 水印可以用来触发窗口计算,定时器等
  • 是一条特殊的时间戳,从指定的地方插入,向下游传递
  • 迟到数据: 数据的事件时间 < 当前的WM

Watermark的原理

java
自动换行:关
放大阅读
展开代码
/* *水印单调不减 */ @Override public void onEvent(T event, long eventTimestamp, WatermarkOutput output) { maxTimestamp = Math.max(maxTimestamp, eventTimestamp); } /* *水印周期性发送,默认200ms发送一次 */ @Override public void onPeriodicEmit(WatermarkOutput output) { // 水印为当前的最大时间戳并考虑数据的乱序容忍度 output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1)); }

多并行度下水印的传递

下游总是取当前收到的水印中最小值。

Flink迟到数据的处理

  • 推迟水印的推进<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  • 延迟窗口关闭:假设有窗口参与计算,可以延迟窗口关闭的时间。当窗口关闭时间到达,会触发依次计算,但是窗口不会关闭,只要在延迟窗口关闭的时间内到达,窗口将再次触发单独的以此计算,allowedLateness(Time.seconds(3))
  • 为task赋闲.withIdleness(Duration.ofSeconds(15));, 在多个并行度的算子间watermark传递时,每个并行度(task)只会取收到的最小watermark,当上游的某个task出现异常导致水印无法推进,可能会导致下游的窗口无法触发计算; 此时可以手动为task赋闲。

窗口

  1. 组成
    • 分配器(window assigners)
      把元素分配一个,0个 后者多个窗口
    • 计算函数(window function)
      窗口内的元素处理逻辑,常见的窗口处理函数,分为增量聚合函数(比如reduce,aggerate),和全量聚合函数(process等)
    • 清理器(window evictor)
      evictor可以在window function移除窗口中的元素,一般用于windowAll类窗口
    • 触发器(window trrigger)
      window何时会被求值以及何时发送求值结果,可以用来用来触发窗口计算,比如不用等待 end - 1才触发窗口计算
  2. 分类
    • 按照时间分类
      滚动,滑动,会话
    • 按照元素个数
      滚动,滑动
    • keyby
      keyBy后窗口
  3. 窗口范围
    • 窗口的区间 [start,end), 以滚动窗口为例,start为窗口长度的整数倍并向下取整,end = start + size
    • 窗口左闭右开,窗口中能够存在的最大时间end - 1
    • 窗口触发计算的时机,当时间进展 >= end -1
  4. 窗口的生命周期
    • 窗口创建的时机:当符合窗口的第一条元素到来时,创建窗口,并把窗口对象放在一个单实例的集合中
    • 窗口关闭:时间进展 >= end - 1 - orderness, orderness为乱序容忍度,分发水印时指定的
  5. 累计窗口
    累计窗口是一种table API中提供的特殊TVF函数,可以实现累加的效果。
    sql
    自动换行:关
    放大阅读
    展开代码
    SELECT * FROM TABLE( CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));
    • 实现累计窗口的其他思路 例子:
      java
      自动换行:关
      放大阅读
      展开代码
      public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.socketTextStream("hadoop102",9899) // 监听输入的数据 a 1 转为 Tuple2《a,1》 .map(new MapFunction<String, Tuple2<String,Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { String[] line = value.split(","); return new Tuple2<>(line, Integer.parseInt(line) ); } }) // watermark分发 .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Integer>>forMonotonousTimestamps() .withTimestampAssigner((e,t) -> e.f1 ) ) // 按照 tuple的 f0 ;即元素分组 .keyBy(e -> e.f0) // 滚动20秒的滚动窗口 .window(TumblingEventTimeWindows.of(Time.minutes(20))) // 设定触发器为2秒触发一次计算 .trigger(ContinuousEventTimeTrigger.of(Time.seconds(2))) // 增量累加reduce .reduce(new ReduceFunction<Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { value1.f1 = value1.f1 + value2.f1; return value1; } }) .print(); env.execute(); }

Flink状态

状态分类

  • 自管理状态
  • 托管状态
    • 键控状态:只能用于keyBy后的流上
    • 算子状态:适用于所有的算子
状态名 状态类型 描述 状态恢复和状态分发
listState 算子状态,键控状态 一组状态的集合 下游的所有task平均分配恢复后的状态
unionListState 算子状态 一组状态的集合 下游的每个task全量拥有恢复的一份状态
broadcastState 算子状态 广播类型的状态 分发给下游的每个算子一份状态
mapState 键控状态 k-v类型状态集合 每个key恢复之前的状态
valueState 算子状态,键控状态 单值类型状态 恢复之前的状态
reduceState 键控状态 聚合类型状态 恢复每个key之前聚合状态
aggregatingState 键控状态 聚合类型状态 恢复每个key之前的聚合状态

状态后端

flink 1.13版本后,hashmap存储在taskmanager的堆内内存中,而rockdb状态后端存储在Rocksdb(task manager的堆外内存的管理内存中)。
checkpoin的位置(比如hdfs上)由checkpoint配置单独指定,区别于1.13版本前的状态后端和checkpoint混合的情况。

容错机制

Checkpoint和Savepoint

Checkpoint (ck)默认flink保存在JobManager的堆内存中,但当JM的进程fail,也会丢失数据;故一般需要指定外部的持久化设备,比如hdfs(可以通过flink的执行环境设置),会周期自动执行ck

Savepoint (sv):为flink提供的手动模式,通过flink命令备份。

shell
自动换行:关
放大阅读
展开代码
bin/flink savepoint jobid 路径

也可通过flink的命令从savepoint恢复数据:

shell
自动换行:关
放大阅读
展开代码
bin/flink run -c 程序主类名 -s hdfs://hadoop102:8020/flink/fs/aa62c578adb3c37056280e0c74f3d689/chk-9 jar包

Checkpoint原理

使用异步的分布式快照算法。

  • 在jobmanager中产生barrier,并发往source算子,每轮的barrier单调递增。
  • 每个task接触到barrier就会触发task的checkpoint,checkpoint完成后会向 CheckpointCordinator汇报checkpoint的信息,比如存储位置等元数据信息。
  • 当本轮所有的task都做完了checkpoint,CheckpointCordinator认为本轮checkpoint完成。

ck异步体现: task在ck时,并不堵塞task执行。
ck快照:根据ck的方式,有hashmap和rockdb两种方式,快照保存有全量和增量之分。

Barrier对齐和不对齐

barrier对齐:算子在接受多个上游的barrier时,可能barrier有先后顺序到达,最先发送barrier给下游的上游算子的数据将会被下游缓存,等待所有上游算子barrier到齐,触发ck。

barrier不对齐:不管barrier是否对齐,下游处理继续处理上游数据,当晚到的barrier到时,触发ck。此次快照保留部分下次ck的数据,当fail回滚时,数据会被重复消费。

Task的重试策略

可以通过配置flink_conf.xml的配置文件对task的重启策略进行配置。当没有开启checkpoint时,task的重启策略失效;当开启了ck,但没有指定重启策略,默认使用Fixed Delay Restart Strategy

当task执行失败,开启checkpoint后会自动重试,有以下几种重试策略:

  • 固定重试次数和延迟的策略( Fixed Delay Restart Strategy):默认策略
  • 固定失败率的策略( Failure Rate Restart Strategy):固定时间段内失败次数固定
  • 无限重试策略( Exponential Delay Restart Strategy):重试无限次,但前后两次重试的间隔逐渐增大,达到最大值,回到初始值
  • 无重试策略(no restart strategy)

数据一致性

Flink的一致性保障

开启checkpoint。

端到端一致性

  1. source支持重复读
  2. flink开启ck
  3. sink外部系统支持事务或者幂等写入

Flink两阶段提交流程

flink两阶段提交

以kafka - flink - kafka传输为例:

  1. 预提交:flink向kafka写数据,第一阶段,每次来一条数据就写入下游的kafka,并标记数据为未提交。
  2. 正式提交:当所有的本轮所有的task完成checkpoint,jobmanager发布通知给sink,同时checkpoint一次完整的ck并修改事务提交状态为已提交。

当第一阶段预提交失败
回滚到上一条数据预提交成功的状态。

当第二次提交失败
从第一阶段预提交成功的位置,接续进行第二阶段;能够从预提交的位置继续正式提交得益于上一轮ck完成后,jobmanager做了一次完整的ckeckpoint。

实时项目中具体配置

  • flink写数据给kafka:flink开启checkpoint,设置精确一次;使用了flink的2pc,当读取下游的kafka数据时,需要读已提交的数据,即事务写入kafka。
  • flink写数据到hbase:利用了hbase rowkey的唯一性,即幂等写入hbase。
  • flink写数据到clickhouse:利用了replacingmergeTree,支持幂等写入clickhouse的表。

CEP编程

  1. 模式序列
  2. cep处理超时数据方式

Flink的优化

FlinkSQL优化

数据热点问题webUI查看

  • miniBatch开启
    对一些实时要求不高的指标,可以可用微批次功能,细节为使用状态存储数据,当批次达到处理时机才处理。

    java
    自动换行:关
    放大阅读
    展开代码
    // 初始化table environment TableEnvironment tEnv = ... // 获取 tableEnv的配置对象 Configuration configuration = tEnv.getConfig().getConfiguration(); // 设置参数: // 开启miniBatch configuration.setString("table.exec.mini-batch.enabled", "true"); // 批量输出的间隔时间 configuration.setString("table.exec.mini-batch.allow-latency", "5 s"); // 防止OOM设置每个批次最多缓存数据的条数,可以设为2万条 configuration.setString("table.exec.mini-batch.size", "20000");
  • LocalGlobal(两阶段聚合)开启 针对一些普通聚合场景,比如单独的sum,count等,可能在聚合时,出现数据热点(数据倾斜)。解决手段可以localAgg(第一阶段预聚合),globalAgg(最终聚合)减轻最终聚合task的压力。

    java
    自动换行:关
    放大阅读
    展开代码
    // 初始化table environment TableEnvironment tEnv = ... // 获取 tableEnv的配置对象 Configuration configuration = tEnv.getConfig().getConfiguration(); // 设置参数: // 开启miniBatch *******使用localglobal必须开启minibatch功能 configuration.setString("table.exec.mini-batch.enabled", "true"); // 批量输出的间隔时间 configuration.setString("table.exec.mini-batch.allow-latency", "5 s"); // 防止OOM设置每个批次最多缓存数据的条数,可以设为2万条 configuration.setString("table.exec.mini-batch.size", "20000"); // 开启LocalGlobal configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
  • SplitDistinct 场景当出现count(distinct)类似需求时,可能使用LocalGlobal效果不明显,则可以使用splitdistinct。

    sql
    自动换行:关
    放大阅读
    展开代码
    -- 按照a分组,统计b个数 -- 数据热点问题写法 SELECT a, COUNT(DISTINCT b) FROM T GROUP BY a; -- 手动sql调整写法 SELECT a, SUM(cnt) FROM ( SELECT a, COUNT(DISTINCT b) as cnt FROM T GROUP BY a, MOD(HASH_CODE(b), 1024) ) GROUP BY a;
    java
    自动换行:关
    放大阅读
    展开代码
    // 初始化table environment TableEnvironment tEnv = ... // 获取 tableEnv的配置对象 Configuration configuration = tEnv.getConfig().getConfiguration(); // 设置参数:(要结合minibatch一起使用) // 开启Split Distinct configuration.setString("table.optimizer.distinct-agg.split.enabled", "true"); // 第一层打散的bucket数目 configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");

    github代码地址

Flink资源优化

  • jobmanager内存,taskmanager内存
    shell
    自动换行:关
    放大阅读
    展开代码
    # flink-conf.yaml jobmanager.memory.process.size: 2048m taskmanager.memory.process.size: 4096m
  • taskmanager的slot个数 和消费的topic的分区数保持一致,项目中为2。

    一般使用kafka作为数据源,并设置source算子的并行度为kafka的topic的分区数,这样可以最大程度的避免数据丢失和数据倾斜问题。

    shell
    自动换行:关
    放大阅读
    展开代码
    bin/flink run \ -d \ -t yarn-per-job \ -p 2 \ -Drest.flamegraph.enabled=true \ -Dtaskmanager.numberOfTaskSlots=2 \ -Dtaskmanager.memory.process.size=4096m \ -c com.atguigu.bigdata.tune.BackpressureApp \ /opt/testjars/flink_tuning-1.0-SNAPSHOT.jar
  • checkpoint的时间间隔
    1min,通过不断调整间隔发现集群1min时能够承受,并且业务没有毫秒级别的指标。
  • watermark的乱序程度
    不同调整水印的乱序程度,使用测流输出,迟到数据,发现为5s时,刚好没有迟到的数据。

大状态调优

  • 场景1:dwd层做页面日志浏览时,做了新老客户的纠正,日志中is_new标识用户是否为新用户,可能出现为老用户但识别我新用户的情况;使用键控状态valueState存储每个用户的首次登录的登录日期,此时出现大状态问题。
  • 场景2:dwd层做订单明细表时,使用了FlinkSQL实现,需要将订单明细表 join 订单表 取下单用户,flink默认join上的数据保留在状态中,时间一长,出现了大状态,有2G左右状态。
  1. 问题:早期使用了hashmap作为状态后端,后来发现job做checkpoint的时间越来越长,最终job直接失败。
  2. 解决: 1)使用了rockdb代替了hashmap。
    同时开启了rockdb的增量备份功能,默认的全量备份,发现每次状态也很大。
    开了rockdb的内存+ 磁盘备份功能。
    yaml
    自动换行:关
    放大阅读
    展开代码
    #设置为机械硬盘+内存模式 state.backend.rocksdb.predefined-options=SPINNING_DISK_OPTIMIZED_HIGH_MEM
    2)在不影响业务情况下,flinkSQL设置了表的ttl,到期后自动清理状态。
    java
    自动换行:关
    放大阅读
    展开代码
    // 设置ttl 为5s,join上的数据存储在状态后5s清理 tEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(5));
  3. 效果:job不会再随着时间推进出现因为checkpoint超时而导致job失败的情况,状态变为了1M左右。

Flink数据倾斜

数据源数据倾斜

可以使用数据重分发算子比如shuffle(数据随机离散均匀分配),rescale(单个taskmanager的子任务均匀分配),rebalance(整个子任务间数据均匀分配)。

keyBy前单表聚合数据倾斜

  • API:利用flatmap攒批、预聚合
  • SQL:开启MiniBatch+LocalGlobal

LocalGlobal大概原理
类似于hadoop中combine和reduce,提前进行一次数据的预聚合,减少后续每个分组global聚合的数据量。 单表聚合倾斜代码
解决思路

keyBy前的数据倾斜可以从flink的webUI界面的每个Task接受数据大小判断,一般数据相差20倍以上,就称为数据倾斜。
使用了flatmap攒批次再keyBy之后:

keyBy后的窗口聚合计算数据倾斜

第一阶段聚合:key拼接随机数前缀或后缀,进行keyby、开窗、聚和
第二阶段:按照原来的key及windowEnd作keyby、聚合

keyBy后窗口聚合数据倾斜代码

解决思路

使用了二次聚合之后:

Flink的反压

Flink反压资料

反压可能原因

  1. subtask间的数据倾斜

    当metrics中floatingBuffersUsage 高、exclusiveBuffersUsage 低为有倾斜,因为少数 channel 占用了大部分的 Floating Buffer

  2. 用户的代码不够优化或者逻辑有误
  3. taskmanager的资源分配不合适
  4. 外部系统的数据速率与flink不匹配(数据源或者sink的外部系统)

反压的一般现象

  1. 状态过大导致的checkpoint超时,进而导致 flink 作业的失败
  2. flink 作业时间过长

定位手段

  1. 使用Flink提供的Web UI界面TaskManager中查看作业的运行情况
  2. 使用flink的savepoint命令手动存档,之后禁用算子链优化
  3. 在web ui界面查看metrics查看 task的参数
    • inpoolusage
    • outpoolusage
    • floatingBuffersUsage(一个taskmanager中的所有task共同使用Network BufferPool)
    • exclusiveBuffersUsage
  4. 火焰图功能开启
    火焰图中横向标识task中各个方法被采样的频率,可以视为方法的执行时间;纵向标识方法的调用顺序;可以寻找火焰图中的“大坪顶”,定位task具体具体的问题代码
  5. 具体场景
    • 在一个job中,由于对业务的理解不到位,在算子中写了一个死循环,循环最终不断累加了一个值,并发给了下游算子
    • 问题:使用的是yarn的per-job模式,主要通过flink的webUI观察job流图,发现出现了反压
    • 定位: 1)观察数据链路,kafka - flink - kafka,通过查看 kafka监控工具eagle(数据条数和消费offset之差lag),发现某个topic分区出现积压,以为是kafka的消费吞吐问题,但是我们使用kafka的消费者消费topic的数据发现能够正常消费到,最终排除kafka问题;
      2)同时也排除了网络的故障问题可能导致的问题,之后怀疑是下游的flink作业出现了问题,通过flink的提供的web ui界面,发现出现大量的反压算子,并且每个算子的收发数据量都比较均衡,排除了数据倾斜问题;
      3)最有可能是flink的反压造成的,之后通过手动执行savapoint进行flink作业的存档,并禁用了算子链优化,并开启了火焰图,观察每个算子的一些指标,定位了出现反压的问题代码
    • 解决:修改代码死循环
    • 效果:job正常执行

Flink SQL

Flink的Join

Stream API

flink StreamAPI join

  1. window join (tumbling window,sliding window,session window)
  2. interval join(间隔join)

SQL API

flink sql join

  1. regular join(inner join, left join,full join)
  2. temporary join(保留了动态表的一些join历史,多版本的)
  3. interval join(间隔join的sql用法)
  4. lookup join(特殊的版本表join,比如join维度时可以使用)

Flink和Spark Streaming比较

  1. flink的状态编程
  2. flink的水印策略
  3. flink是真正的流处理,sparkStreaming为微批次处理

FlinkSQL的工作机制

  1. 用户通过TableAPI或者FlinkSQL编写代码之后,由calcite的动态数据管理框架,提供的sql的解析,验证,根据编写的流处理或者批处理判断使用的优化手段,得到DataStream API Transformation
  2. 如果使用的是基于yarn的per-job模式,在client,进行算子链的合并,对符合条件的多个算子合并为一个算子,减少了task之间的网络传输,得到jobGraph
  3. 在jobmaster中将按照task的并行度展开,获得ExcutionGraph,最后提交yarn执行

Flink内存模型

Taskmanager内存模型

Component Description
Framework Heap Memory 框架堆内内存,128M,JVM Heap memory dedicated to Flink framework (advanced option)
Task Heap Memory Task堆内内存,JVM Heap memory dedicated to Flink application to run operators and user code
Managed memory 管理内存,Native memory managed by Flink, reserved for sorting, hash tables, caching of intermediate results and RocksDB state backend
Framework Off-heap Memory 框架堆外内存,128M,Off-heap direct (or native) memory dedicated to Flink framework (advanced option)
Task Off-heap Memory 默认0,表示不使用堆外内存,Off-heap direct (or native) memory dedicated to Flink application to run operators
Network Memory 网络内存,Direct memory reserved for data record exchange between tasks (e.g. buffering for the transfer over the network),This memory is used for allocation of network buffers
JVM metastore JVM元空间256m,Metaspace size of the Flink JVM process
JVM Overhead JVM执行开销,taskmanager memory * 0.1 ,Native memory reserved for other JVM overhead: e.g. thread stacks, code cache, garbage collection space etc

杂记

Flink算子的并行度如何设置?

Flink可以设置算子并行度地方:

  1. flink-conf.yaml:服务器配置所有的 flink 程序的默认并行度
    image-20250819111024149
  2. Job程序中:调用 StreamExecutionEnvironment 设置任务的全局并行度
    java
    自动换行:关
    放大阅读
    展开代码
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4);
  3. 每个算子单独设置
    java
    自动换行:关
    放大阅读
    展开代码
    public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.fromData("world", "hello", "world","data","data","hello") .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { return new Tuple2<>(value, 1); } }).setParallelism(3) .keyBy(tup -> tup.f0) .reduce(new ReduceFunction<Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> reduce( Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { return Tuple2.of(value1.f0, value1.f1 + value2.f1); } }) .print(); env.execute(); }
  4. 启动时传参
    shell
    自动换行:关
    放大阅读
    展开代码
    flink -c xxx.xxx.Main xxx/xxx/xx.jar -p 4

如何合理设置并行度

  1. 对于 source 算子和 sink 算子
    如果外面连接存储有分区的概念,建议并行度设置与外部存储分区成整数倍关系。以读写 kafka 为例:
    image-20250819111121066
  2. 对于 transform 算子 推荐使用 "启动 flink 任务的时候通过传参设置任务的全局并行度"。

Flink算子的最大并行度

实验

  1. 测试程序

    java
    自动换行:关
    放大阅读
    展开代码
    public class MaxParallelism { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); System.setProperty("HADOOP_USER_NAME", "atguigu"); // checkpoint设置 env.enableCheckpointing(1000); // 其他可选配置如下: env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); env.getCheckpointConfig().setCheckpointTimeout(60000); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2); // env.getCheckpointConfig().enableUnalignedCheckpoints(); env.getCheckpointConfig().setExternalizedCheckpointCleanup( org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/flink_tolerance"); env.setMaxParallelism(512); // 初始设置为 512 env.socketTextStream("hadoop102", 9999) .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { return new Tuple2<>(value, 1); } }) .keyBy(tup -> tup.f0) .reduce(new ReduceFunction<Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> reduce( Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { return Tuple2.of(value1.f0, value1.f1 + value2.f1); } }) .print(); env.execute(); } }
  2. 测试步骤

    • 先设置 flink 程序的全局算子最大并行度为 512。
    • 通过 flink 的 savepoint,手动保存检查点,手动取消任务。
    • 修改 flink 程序的算子最大并行度为 520,并上传 jar,提交任务时指定从上次的 savepoint 路径恢复任务。
    • 观察提交日志。

    此处以 yarn-session 模拟测试,启动 yarn-session 模式

    shell
    自动换行:关
    放大阅读
    展开代码
    ./bin/flink run -t yarn-session

    打包程序并提交

    shell
    自动换行:关
    放大阅读
    展开代码
    ./bin/flink run -t yarn-session -c cn.hedeoer.chapter01.physicalpartitioning.MaxParallelism ./job-jars/flink-1.0-SNAPSHOT.jar

    image-20250819111151140 日志打印结果: image-20250819111215378

    创建 savepoint:

    shell
    自动换行:关
    放大阅读
    展开代码
    bin/flink savepoint 6bbe6ab947c53044f68c070eca495c1f hdfs://hadoop102:8020/savepoint -yid application_1727501769142_0001

    image-20250819111231364

    修改程序算子的最大并行度:

    java
    自动换行:关
    放大阅读
    展开代码
    // 之前为512 env.setMaxParallelism(520);

    打包上传并从 savepoint 恢复任务: image-20250819111245299

    报错:关键日志如下

    plaintext
    自动换行:关
    放大阅读
    展开代码
    Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint Checkpoint Metadata. Max parallelism mismatch between checkpoint/savepoint state and new program. Cannot map operator cbc357ccb763df2852fee8c4fc7d55f2 with max parallelism 512 to new program with max parallelism 520. This indicates that the program has been changed in a non-compatible way after the checkpoint/savepoint.

    原因:flink 程序回滚恢复 savepoint 数据时,由于 savepoint 前后算子最大并行度的不一致,导致无法读取 checkpoint 的元数据,最终恢复任务失败!

原因讲解

flink官网对于算子最大并行度的解释

简述:算子的最大并行度在没有用户设置的情况下具有默认值(根据每个算子的并行度计算得出),算子的最大并行度是 flink 的任务在上线后续如果有扩容(指提高算子并行度)的情况下,提供状态容错和恢复的保障。相关源码可以参考类:org.apache.flink.runtime.state.KeyGroupRangeAssignment

启示:算子的最大并行度需要在首次编程时就严格考虑,上线程序后不能更改,区别于算子的并行度。

Flink数据物理分区策略

rescale

上游算子每个子任务均匀向下游任务的部分子任务轮询发送数据。保证了部分子任务的数据均衡。是一种优化的重平衡分区,数据按照上下游任务的并行度比例进行分配,使得数据尽量保持本地性(同一个taskmanager)。减少了不同taskmanager间的网络通信传输重新分配数据。 image-20250819111303695 代码:

java
自动换行:关
放大阅读
展开代码
public class RescaleExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); env.addSource(new MyAcSource()).rescale() .map(new RichMapFunction<DataModel, DataModel>() { @Override public DataModel map(DataModel value) throws Exception { int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); value.setTransSubTaskName("TransTask" + indexOfThisSubtask); return value; } }).setParallelism(4) .print(); env.execute(); } }

效果:
image-20250819111323195

rebalance

上游的每个子任务向下游的所有子任务轮询发送数据。将数据均匀地随机分配到所有下游并行子任务中,确保每个子任务处理的数据量相对均衡。 image-20250819111337149

java
自动换行:关
放大阅读
展开代码
public class RebalanceExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); env.addSource(new MyAcSource()).rebalance() .map(new RichMapFunction<DataModel, DataModel>() { @Override public DataModel map(DataModel value) throws Exception { int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); value.setTransSubTaskName("TransTask" + indexOfThisSubtask); return value; } }).setParallelism(4) .print(); env.execute(); } }

效果:
image-20250819111402761 实际应用:
image-20250819111505277

shuffle

数据打乱发送下游。

forward

上游子任务和下游子任务之间的数据传输是一对一的关系。是 flink 算子默认的数据分配策略,如果不满足 forward 的条件,flink 会选择 rebalance 策略。

算子链形成条件:

  • 上下游算子并行度相同
  • ChainingStrategy(每个 operate 的可链接策略)为ALWAYS 或者 HEAD

可以通过调用算子方法实现算子链的断开,或开启新的算子链。

java
自动换行:关
放大阅读
展开代码
// 当数据必须保持严格顺序处理,且上下游并行度相同时 public class ForwardExample { public static void main(String[] args) throws Exception { Configuration config = new Configuration(); config.setString("rest.port", "8081"); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(config); env.setParallelism(2); env.addSource(new MyAcSource()) .map(new RichMapFunction<DataModel, DataModel>() { @Override public DataModel map(DataModel value) throws Exception { int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); value.setTransSubTaskName("TransTask" + indexOfThisSubtask); return value; } }) .print().disableChaining(); env.execute(); } }

WebUI 图:

broadcast

将数据复制并发送到所有下游并行子任务,每个子任务都接收到所有数据。 应用场景:大小流的双流 join 场景。

keyBy

根据指定的 key 对数据流进行哈希分区,确保相同 key 的数据被发送到同一个下游子任务。

本文作者:hedeoer

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!