首页标签分类
01MapReduce和Yarn
2024-06-03 · 更新 2026-03-03约 4 分钟 · 883 字
大数据杂文记
000

目录

shuffle流程和优化
第一阶段:输入与 Map
第二阶段:Shuffle & Sort (核心)
第三阶段:Reduce 与输出
hdfs的读写流程
yarn的任务提交流程
yarn的调度器
hdfs的小文件问题
hadoop的压缩选择
hadoop的HA
hdfs的HA
yarn的HA

shuffle流程和优化

graph TD %% ------------------- STAGE 1: INPUT & MAP ------------------- subgraph "第一阶段: 输入与 Map" direction LR Input1["<b>输入分片 1</b><br/>'Hello World Bye World'"] Input2["<b>输入分片 2</b><br/>'Hello Hadoop Goodbye Hadoop'"] subgraph "Mapper 1 节点" M1_Out["map() 输出<br/>(Hello,1), (World,1)<br/>(Bye,1), (World,1)"] end subgraph "Mapper 2 节点" M2_Out["map() 输出<br/>(Hadoop,1), (Hello,1)<br/>(Goodbye,1), (Hadoop,1)"] end Input1 --> M1_Out Input2 --> M2_Out end %% ------------------- STAGE 2: SHUFFLE (MAP SIDE) ------------------- subgraph "第二阶段: Shuffle (Map 端)" direction LR subgraph "Mapper 1 本地输出" style M1_P0 fill:#cde4ff style M1_P1 fill:#cde4ff M1_P0["<b>分区 0</b><br/>(Hello, 1)"] M1_P1["<b>分区 1</b><br/>(Bye, 1)<br/>(World, 2)"] end subgraph "Mapper 2 本地输出" style M2_P0 fill:#cde4ff M2_P0["<b>分区 0</b><br/>(Goodbye, 1)<br/>(Hadoop, 2)<br/>(Hello, 1)"] end end M1_Out -- "分区、排序、合并" --> M1_P0 & M1_P1 M2_Out -- "分区、排序、合并" --> M2_P0 %% ------------------- STAGE 3: SHUFFLE (REDUCE SIDE) ------------------- subgraph "第三阶段: Shuffle (Reduce 端)" subgraph "Reducer 0 节点" style R0_Merged fill:#d5e8d4 R0_Merged["<b>拉取并归并排序后</b><br/>(Goodbye, 1)<br/>(Hadoop, 2)<br/>(Hello, 1)<br/>(Hello, 1)"] end subgraph "Reducer 1 节点" style R1_Merged fill:#d5e8d4 R1_Merged["<b>拉取并归并排序后</b><br/>(Bye, 1)<br/>(World, 2)"] end end M1_P0 -.-> R0_Merged M2_P0 -. "网络拷贝 (Copy)" .-> R0_Merged M1_P1 -.-> R1_Merged %% ------------------- STAGE 4: REDUCE & OUTPUT ------------------- subgraph "第四阶段: Reduce 与输出" R0_Grouped["<b>分组 (Group)</b><br/>(Goodbye, <1>)<br/>(Hadoop, <2>)<br/>(Hello, <1, 1>)"] R1_Grouped["<b>分组 (Group)</b><br/>(Bye, <1>)<br/>(World, <2>)"] R0_Out["<b>reduce() 输出</b><br/>(Goodbye, 1)<br/>(Hadoop, 2)<br/>(Hello, 2)"] R1_Out["<b>reduce() 输出</b><br/>(Bye, 1)<br/>(World, 2)"] end R0_Merged -- "按Key分组" --> R0_Grouped -- "执行reduce()" --> R0_Out R1_Merged -- "按Key分组" --> R1_Grouped -- "执行reduce()" --> R1_Out subgraph "最终输出 (HDFS)" direction LR FinalOut0["part-r-00000"] FinalOut1["part-r-00001"] end R0_Out --> FinalOut0 R1_Out --> FinalOut1

第一阶段:输入与 Map

  1. Input Data: 原始数据文件被逻辑切分成两个输入分片(Input Splits),分别送给两个 Mapper。

    • Input Split 1: "Hello World Bye World"
    • Input Split 2: "Hello Hadoop Goodbye Hadoop"
  2. Mapping: 每个 Mapper 将其输入分片中的文本行转换为 <Key, Value> 对,即 <word, 1>

    • Mapper 1 输出: (Hello, 1), (World, 1), (Bye, 1), (World, 1)
    • Mapper 2 输出: (Hello, 1), (Hadoop, 1), (Goodbye, 1), (Hadoop, 1)

第二阶段:Shuffle & Sort (核心)

这是连接 Map 和 Reduce 的桥梁,也是最复杂的部分。

Map 端 Shuffle:

  1. Partitioning: 在每个 Mapper 内部,输出的 <Key, Value> 对根据 Key 的哈希值被分配到不同的分区(Partition)。这里我们有两个 Reducer,所以有两个分区 (Partition 0, Partition 1)。

    • Mapper 1:
      • Hello -> Partition 0
      • World, Bye -> Partition 1
    • Mapper 2:
      • Hello, Hadoop, Goodbye -> Partition 0
  2. Sort & Combine (in Memory Buffer): 在每个分区内部,数据按 Key 进行排序。如果配置了 Combiner,相同 Key 的值会被预先合并。

    • Mapper 1:
      • Partition 0: (Hello, 1)
      • Partition 1: (Bye, 1), (World, 1), (World, 1) -> 合并后变为 (Bye, 1), (World, 2)
    • Mapper 2:
      • Partition 0: (Goodbye, 1), (Hadoop, 1), (Hadoop, 1), (Hello, 1) -> 合并后变为 (Goodbye, 1), (Hadoop, 2), (Hello, 1)

    此时,每个 Mapper 的本地磁盘上都生成了一个包含多个分区的、有序的输出文件。

Reduce 端 Shuffle:

  1. Copy: 每个 Reducer 通过网络从所有 Mapper 拉取(Copy)属于自己分区的数据。

    • Reducer 0 拉取所有 Mapper 的 Partition 0 数据。
    • Reducer 1 拉取所有 Mapper 的 Partition 1 数据。
  2. Merge & Sort: Reducer 将从不同 Mapper 拉取来的数据片段进行归并排序,形成一个单一的、全局有序的数据列表。

    • Reducer 0's Merged Data: (Goodbye, 1), (Hadoop, 2), (Hello, 1), (Hello, 1)
    • Reducer 1's Merged Data: (Bye, 1), (World, 2)
  3. Grouping: 框架自动对排好序的数据进行分组,将相同 Key 的所有 Value 收集到一个迭代器中,准备送入 reduce() 方法。

    • Input for Reducer 0: (Goodbye, <1>), (Hadoop, <2>), (Hello, <1, 1>)
    • Input for Reducer 1: (Bye, <1>), (World, <2>)

第三阶段:Reduce 与输出

  1. Reducing: reduce() 方法被调用,对每个 Key 和其对应的 Value 列表进行处理(求和)。

    • Reducer 0 调用三次:
      • reduce("Goodbye", <1>) -> 输出 (Goodbye, 1)
      • reduce("Hadoop", <2>) -> 输出 (Hadoop, 2)
      • reduce("Hello", <1, 1>) -> 输出 (Hello, 2)
    • Reducer 1 调用两次:
      • reduce("Bye", <1>) -> 输出 (Bye, 1)
      • reduce("World", <2>) -> 输出 (World, 2)
  2. Final Output: 每个 Reducer 将其结果写入 HDFS 上的输出文件。通常每个 Reducer 会生成一个文件,如 part-r-00000part-r-00001

    • Output File 0:
      plaintext
      自动换行:关
      放大阅读
      展开代码
      Goodbye 1 Hadoop 2 Hello 2
    • Output File 1:
      plaintext
      自动换行:关
      放大阅读
      展开代码
      Bye 1 World 2

hdfs的读写流程

yarn的任务提交流程

yarn的调度器

hdfs的小文件问题

hadoop的压缩选择

  1. 主要的压缩指标 解压缩速率 压缩比
  2. 主要的格式
    • gzip 不支持切片 压缩率高
    • snappy 不支持切片,解压缩速率高
    • lzo 支持切片,解压缩速率快,压缩比一般
  3. mr任务中压缩场景
    • 读取文件是否使用压缩 主要考虑解压缩速率
    • maptask落盘是否压缩 主要考虑解压缩速率
    • reuduce后是否压缩 具体分析

hadoop的HA

hdfs的HA

HA组件主要功能
journalnode:负责多个ha节点间的数据传输
zkfc:和zookeeper保持联系,更新集群的状态 故障转移原理(自动转移模式)

  1. nn假死,zkfc进程检测到并更新zookeeper的注册信息
  2. standBy的zkfc通信获得active的假死状态,并对其进行kill
  3. standby的节点通过zookeeper的选举机制产生新的active namenode 搭建大概流程
  4. 修改core-site.xml文件,注册ha的集群
  5. 修改hdf-site.xml文件,启动nn的自动故障转移
  6. 分发修改后的配置文件
  7. 关闭hdfs的服务
  8. 启动zookeeper的集群,之后使用 hdfs zkfc --formatZK初始化HA再zk中的状态

yarn的HA

搭建大概流程

  1. 主要修改yarn-site.xml配置文件,其中主要指定开启rm的HA,并分别指定多台rm的配置
  2. 启动yarn即可

本文作者:hedeoer

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!