shuffle流程和优化
graph TD
%% ------------------- STAGE 1: INPUT & MAP -------------------
subgraph "第一阶段: 输入与 Map"
direction LR
Input1["<b>输入分片 1</b><br/>'Hello World Bye World'"]
Input2["<b>输入分片 2</b><br/>'Hello Hadoop Goodbye Hadoop'"]
subgraph "Mapper 1 节点"
M1_Out["map() 输出<br/>(Hello,1), (World,1)<br/>(Bye,1), (World,1)"]
end
subgraph "Mapper 2 节点"
M2_Out["map() 输出<br/>(Hadoop,1), (Hello,1)<br/>(Goodbye,1), (Hadoop,1)"]
end
Input1 --> M1_Out
Input2 --> M2_Out
end
%% ------------------- STAGE 2: SHUFFLE (MAP SIDE) -------------------
subgraph "第二阶段: Shuffle (Map 端)"
direction LR
subgraph "Mapper 1 本地输出"
style M1_P0 fill:#cde4ff
style M1_P1 fill:#cde4ff
M1_P0["<b>分区 0</b><br/>(Hello, 1)"]
M1_P1["<b>分区 1</b><br/>(Bye, 1)<br/>(World, 2)"]
end
subgraph "Mapper 2 本地输出"
style M2_P0 fill:#cde4ff
M2_P0["<b>分区 0</b><br/>(Goodbye, 1)<br/>(Hadoop, 2)<br/>(Hello, 1)"]
end
end
M1_Out -- "分区、排序、合并" --> M1_P0 & M1_P1
M2_Out -- "分区、排序、合并" --> M2_P0
%% ------------------- STAGE 3: SHUFFLE (REDUCE SIDE) -------------------
subgraph "第三阶段: Shuffle (Reduce 端)"
subgraph "Reducer 0 节点"
style R0_Merged fill:#d5e8d4
R0_Merged["<b>拉取并归并排序后</b><br/>(Goodbye, 1)<br/>(Hadoop, 2)<br/>(Hello, 1)<br/>(Hello, 1)"]
end
subgraph "Reducer 1 节点"
style R1_Merged fill:#d5e8d4
R1_Merged["<b>拉取并归并排序后</b><br/>(Bye, 1)<br/>(World, 2)"]
end
end
M1_P0 -.-> R0_Merged
M2_P0 -. "网络拷贝 (Copy)" .-> R0_Merged
M1_P1 -.-> R1_Merged
%% ------------------- STAGE 4: REDUCE & OUTPUT -------------------
subgraph "第四阶段: Reduce 与输出"
R0_Grouped["<b>分组 (Group)</b><br/>(Goodbye, <1>)<br/>(Hadoop, <2>)<br/>(Hello, <1, 1>)"]
R1_Grouped["<b>分组 (Group)</b><br/>(Bye, <1>)<br/>(World, <2>)"]
R0_Out["<b>reduce() 输出</b><br/>(Goodbye, 1)<br/>(Hadoop, 2)<br/>(Hello, 2)"]
R1_Out["<b>reduce() 输出</b><br/>(Bye, 1)<br/>(World, 2)"]
end
R0_Merged -- "按Key分组" --> R0_Grouped -- "执行reduce()" --> R0_Out
R1_Merged -- "按Key分组" --> R1_Grouped -- "执行reduce()" --> R1_Out
subgraph "最终输出 (HDFS)"
direction LR
FinalOut0["part-r-00000"]
FinalOut1["part-r-00001"]
end
R0_Out --> FinalOut0
R1_Out --> FinalOut1
第一阶段:输入与 Map
-
Input Data: 原始数据文件被逻辑切分成两个输入分片(Input Splits),分别送给两个 Mapper。
Input Split 1: "Hello World Bye World"
Input Split 2: "Hello Hadoop Goodbye Hadoop"
-
Mapping: 每个 Mapper 将其输入分片中的文本行转换为 <Key, Value> 对,即 <word, 1>。
- Mapper 1 输出:
(Hello, 1), (World, 1), (Bye, 1), (World, 1)
- Mapper 2 输出:
(Hello, 1), (Hadoop, 1), (Goodbye, 1), (Hadoop, 1)
第二阶段:Shuffle & Sort (核心)
这是连接 Map 和 Reduce 的桥梁,也是最复杂的部分。
Map 端 Shuffle:
-
Partitioning: 在每个 Mapper 内部,输出的 <Key, Value> 对根据 Key 的哈希值被分配到不同的分区(Partition)。这里我们有两个 Reducer,所以有两个分区 (Partition 0, Partition 1)。
- Mapper 1:
Hello -> Partition 0
World, Bye -> Partition 1
- Mapper 2:
Hello, Hadoop, Goodbye -> Partition 0
-
Sort & Combine (in Memory Buffer): 在每个分区内部,数据按 Key 进行排序。如果配置了 Combiner,相同 Key 的值会被预先合并。
- Mapper 1:
- Partition 0:
(Hello, 1)
- Partition 1:
(Bye, 1), (World, 1), (World, 1) -> 合并后变为 (Bye, 1), (World, 2)
- Mapper 2:
- Partition 0:
(Goodbye, 1), (Hadoop, 1), (Hadoop, 1), (Hello, 1) -> 合并后变为 (Goodbye, 1), (Hadoop, 2), (Hello, 1)
此时,每个 Mapper 的本地磁盘上都生成了一个包含多个分区的、有序的输出文件。
Reduce 端 Shuffle:
-
Copy: 每个 Reducer 通过网络从所有 Mapper 拉取(Copy)属于自己分区的数据。
- Reducer 0 拉取所有 Mapper 的 Partition 0 数据。
- Reducer 1 拉取所有 Mapper 的 Partition 1 数据。
-
Merge & Sort: Reducer 将从不同 Mapper 拉取来的数据片段进行归并排序,形成一个单一的、全局有序的数据列表。
- Reducer 0's Merged Data:
(Goodbye, 1), (Hadoop, 2), (Hello, 1), (Hello, 1)
- Reducer 1's Merged Data:
(Bye, 1), (World, 2)
-
Grouping: 框架自动对排好序的数据进行分组,将相同 Key 的所有 Value 收集到一个迭代器中,准备送入 reduce() 方法。
- Input for Reducer 0:
(Goodbye, <1>), (Hadoop, <2>), (Hello, <1, 1>)
- Input for Reducer 1:
(Bye, <1>), (World, <2>)
第三阶段:Reduce 与输出
-
Reducing: reduce() 方法被调用,对每个 Key 和其对应的 Value 列表进行处理(求和)。
- Reducer 0 调用三次:
reduce("Goodbye", <1>) -> 输出 (Goodbye, 1)
reduce("Hadoop", <2>) -> 输出 (Hadoop, 2)
reduce("Hello", <1, 1>) -> 输出 (Hello, 2)
- Reducer 1 调用两次:
reduce("Bye", <1>) -> 输出 (Bye, 1)
reduce("World", <2>) -> 输出 (World, 2)
-
Final Output: 每个 Reducer 将其结果写入 HDFS 上的输出文件。通常每个 Reducer 会生成一个文件,如 part-r-00000 和 part-r-00001。
hdfs的读写流程
yarn的任务提交流程
yarn的调度器
hdfs的小文件问题
hadoop的压缩选择
- 主要的压缩指标 解压缩速率 压缩比
- 主要的格式
- gzip 不支持切片 压缩率高
- snappy 不支持切片,解压缩速率高
- lzo 支持切片,解压缩速率快,压缩比一般
- mr任务中压缩场景
- 读取文件是否使用压缩 主要考虑解压缩速率
- maptask落盘是否压缩 主要考虑解压缩速率
- reuduce后是否压缩 具体分析
hadoop的HA
hdfs的HA
HA组件主要功能
journalnode:负责多个ha节点间的数据传输
zkfc:和zookeeper保持联系,更新集群的状态
故障转移原理(自动转移模式)
- nn假死,zkfc进程检测到并更新zookeeper的注册信息
- standBy的zkfc通信获得active的假死状态,并对其进行kill
- standby的节点通过zookeeper的选举机制产生新的active namenode
搭建大概流程
- 修改core-site.xml文件,注册ha的集群
- 修改hdf-site.xml文件,启动nn的自动故障转移
- 分发修改后的配置文件
- 关闭hdfs的服务
- 启动zookeeper的集群,之后使用
hdfs zkfc --formatZK初始化HA再zk中的状态
yarn的HA
搭建大概流程
- 主要修改yarn-site.xml配置文件,其中主要指定开启rm的HA,并分别指定多台rm的配置
- 启动yarn即可