0%

MapReduce

编程模型 (Programming Model)

输入与输出 (Input and Output):

  • 接收一组输入键/值对 (input key/value pairs)
  • 生成一组输出键/值对 (output key/value pairs)

用户自定义函数 (User-Defined Functions):

  • 映射函数 (Map Function):
    • 由用户编写。
    • 处理每个输入键/值对。
    • 生成一组中间键/值对 (intermediate key/value pairs)
  • 归约函数 (Reduce Function):
    • 同样由用户编写。
    • 接收一个中间键 I 及其关联的值集合 (set of values)
    • 合并这些值以产生一个更小集合 (smaller set) 的输出值,通常为零个或一个值。

中间数据处理 (Intermediate Data Handling):

  • MapReduce 库 (MapReduce library) 将中间值按其键 (I) 分组,并将它们发送给归约函数。
  • 中间值通过迭代器 (iterator) 提供给归约函数,从而能够高效处理因数据量过大而无法全部放入内存的数据集。

容错性与可扩展性 (Fault Tolerance and Scalability):

  • 通过将任务分解成更小的独立计算单元,MapReduce 确保了即使在大型分布式环境中也能实现可扩展性和容错性。

实现 (Implementation)

img

数据分割与任务分配 (Data Splitting and Task Assignment):

  • 输入数据划分: MapReduce 库自动将输入文件分割成 M 个片段(通常每个片段大小为 16MB 到 64MB,可由用户控制)。
  • 启动程序实例: 在集群中启动多个程序副本。
  • 角色分配: 其中一个程序实例被指定为主节点 (master),其余的作为工作节点 (workers)

任务调度 (Task Scheduling):

  • 主节点的职责: 主节点负责管理 M 个 map 任务和 R 个 reduce 任务。
  • 任务分配: 主节点将空闲的工作节点分配给 map 任务或 reduce 任务。

Map 阶段 (Map Phase):

  • 读取数据: 被分配 map 任务的工作节点读取对应的输入片段。
  • 处理数据: 解析出键/值对,并将其传递给用户定义的 Map 函数。
  • 生成中间结果: Map 函数产生的中间键/值对会存储在本地磁盘中。

中间数据处理 (Intermediate Data Processing):

  • 写入本地磁盘: **缓存的中间结果会定期写入本地磁盘,并根据分区函数划分为 R 个区域 (partitioned into R regions)。
  • 通知主节点: 工作节点将这些中间数据的位置告知主节点,主节点负责将这些信息传递给 reduce 工作节点。

Reduce 阶段准备 (Reduce Phase Preparation):

  • 读取中间数据: reduce 工作节点收到主节点的通知后,通过远程过程调用 (RPC - Remote Procedure Call) 从 map 工作节点的本地磁盘读取中间数据。
  • 排序数据: reduce 工作节点将所有中间数据按键排序,以确保相同的键聚集在一起。如果数据量过大,无法全部加载到内存,会采用外部排序 (external sort)

Reduce 阶段 (Reduce Phase):

  • 执行 Reduce 函数: reduce 工作节点遍历排序后的中间数据,对于每个唯一的中间键,将键和对应的值列表传递给用户定义的 Reduce 函数。
  • 生成最终输出: Reduce 函数的输出被追加到该 reduce 分区的最终输出文件中。

任务完成与结果返回 (Task Completion and Result Retrieval):

  • 任务监控: 当所有的 map 和 reduce 任务都完成后,主节点会唤醒用户程序。
  • 返回结果: 此时,用户程序中的 MapReduce 调用返回,用户可以获取 R 个输出文件(每个 reduce 任务对应一个输出文件)。

额外说明 (Additional Notes):

  • 数据处理链: 通常用户不需要将这 R 个输出文件合并成一个文件,因为这些文件可以直接作为下一个 MapReduce 调用的输入,或者被能够处理多文件输入的分布式应用程序使用。
  • 流程图参考: 上图👆用于展示 MapReduce 操作的整体流程(对应上述 7 个步骤)。

主节点数据结构 (Master Data Structure)

任务状态跟踪 (Task State Tracking):

  • 对于每个 mapreduce 任务,主节点存储:
    • 状态:
      • idle (空闲): 任务尚未分配。
      • in-progress (执行中): 任务正在被执行。
      • completed (已完成): 任务执行完毕。
    • 工作节点标识 (Worker Identity): 处理该任务的工作机器(针对非空闲任务)。

中间数据管理 (Intermediate Data Management):

  • 主节点充当将中间数据从 map 任务传递到 reduce 任务的管道 (conduit)
  • 对于每个已完成的 map 任务:
    • 它记录所生成的 R 个中间文件区域的位置大小
    • 这些数据对于 reduce 任务从相应的 map 工作节点获取中间结果至关重要。

动态更新 (Dynamic Updates):

  • 随着 map 任务完成,主节点持续更新其记录的中间文件位置和大小。
  • 这些更新会增量式地推送给当前正在执行中的 reduce 工作节点。

容错机制 (Fault Tolerance)

工作节点故障 (Worker Failure)

  • 故障检测 (Failure Detection):
    • 主节点定期向每个工作节点发送 ping
    • 如果工作节点在特定时间窗口内未响应,主节点将其标记为故障
  • 任务重新调度 (Task Rescheduling):
    • Map 任务 (Map Tasks):
      • 已完成的 Map 任务 (Completed Map Tasks):
        • 如果故障工作节点已完成 map 任务,其输出将变得不可访问(存储在故障机器的本地磁盘上)。
        • 这些任务被重置为空闲状态 (idle state) 并在其他工作节点上重新执行。
      • 执行中的 Map 任务 (In-Progress Map Tasks):
        • 类似地,执行中的任务被标记为空闲并重新分配给可用的工作节点。
    • Reduce 任务 (Reduce Tasks):
      • 已完成的 Reduce 任务 (Completed Reduce Tasks):
        • 这些任务不需要重新执行,因为它们的输出存储在全局文件系统 (global file system) 中,即使发生故障也仍然可访问。
      • 执行中的 Reduce 任务 (In-Progress Reduce Tasks):
        • 如果某些 reduce 工作节点尚未读取中间数据,它们会从新的(重新执行的 map 任务的)结果中读取数据。
    • 数据协调 (Data Coordination):
      • 当 map 任务在新的工作节点上重新执行时:
        • 通知 (Notification): 所有 reduce 工作节点会被告知该重新执行。
        • 数据重定向 (Data Redirection): 尚未从故障工作节点获取中间数据的 reduce 工作节点将改为从新的工作节点获取数据。

主节点故障 (Master Failure)

  • 可以很方便地让主节点定期将上述主节点数据结构写入检查点 (checkpoints)。如果主节点任务终止,可以从最后一个检查点状态启动一个新的副本。
  • 然而,考虑到只有一个主节点,其故障的可能性很低;因此我们当前的实现在主节点故障时会中止 MapReduce 计算。 客户端可以检测到此情况,并在需要时重试 MapReduce 操作。

数据本地化 (Locality)

  • 存储设计: 数据存储在 Google 文件系统 (GFS - Google File System) 中。GFS 将每个文件分割为 64 MB 的块 (blocks),并在不同的机器上保存多个副本(通常是 3 个)。
  • 任务调度优先级:
    • 优先本地化调度: 主节点优先将 map 任务分配给包含对应数据块副本的同一台机器上的工作节点。
    • 次优调度: 如果本地调度不可行(例如,拥有数据块副本的工作节点繁忙),主节点将任务分配给靠近副本的机器,例如同一机架 (rack) 或数据中心 (data center) 内的机器。
  • 实际效果: 在运行大型 MapReduce 操作时,大部分输入数据会从本地磁盘读取。因为数据本地化,减少了跨网络传输的数据量,从而节省网络带宽。

任务粒度 (Task Granularity)

  1. Map 和 Reduce 阶段的划分
    • 任务数量 (M 和 R): Map 阶段被划分为 M 个任务。Reduce 阶段被划分为 R 个任务。
    • 划分原则: 理想情况下,M 和 R 的数量应该远大于工作节点的数量(即机器的数量)。
  2. 多任务划分的好处
    • 动态负载均衡: 每个工作节点可执行多个任务,这样可以动态调整任务分配,避免某些节点过载或闲置。
    • 故障恢复加速: 如果某个工作节点失败,其已完成的多个任务可以分散到其他节点重新执行,恢复速度更快。
  3. 任务划分的实际限制
    • 调度开销: 主节点需要进行 O(M + R) 次调度决策,且需要存储 O(M × R) 的状态信息。虽然每对 map/reduce 任务对仅占用约 1 字节内存,但过多任务会增加内存需求和调度复杂性。
    • 输出文件限制: R 的大小往往受到用户需求限制,因为每个 reduce 任务会生成一个独立的输出文件。输出文件过多会导致文件管理复杂。
  4. 实际任务大小选择
    • Map 阶段: 每个 map 任务通常处理 16 MB 到 64 MB 的输入数据。这样的任务大小可以充分利用数据本地化优化(即尽量从本地磁盘读取数据)。
    • Reduce 阶段: R 通常是工作节点数量的几倍,以充分利用并行能力。在一个典型的大规模 MapReduce 计算中:
      • M = 200,000(Map 阶段任务数)。
      • R = 5,000(Reduce 阶段任务数)。
      • 工作节点 = 2,000(机器数量)。

备份任务 (Backup Tasks)

  1. 什么是拖后腿的任务(Straggler Tasks)?
    • 定义: 拖后腿任务指的是 MapReduce 作业中运行速度远慢于其他任务的任务(map 或 reduce),从而延迟整个作业的完成
  2. 解决方法:
    • 备份任务机制: 当 MapReduce 计算接近完成时,主节点会为未完成的任务安排备份执行 (Backup Executions)。同一任务的多个副本在不同的工作节点上同时运行。只要其中一个副本完成,任务即被标记为完成。
    • 资源开销: 调整后的机制只增加少量(通常是几个百分点)的计算资源使用。通过备份执行,能够显著缩短总执行时间。

优化与增强 (Refinement)

分区函数 (Partitioning Function)

  1. Reduce 任务与分区
    • 用户通过设置 R 来指定需要的 reduce 任务数或输出文件数。
    • 数据在这些 reduce 任务之间分区,分区方式取决于分区函数
  2. 默认分区方式
    • 默认使用哈希函数
    • 分区规则: hash(key) mod R
    • 优势: 通常能实现较为均衡的分区(即数据均匀分布到不同 reduce 任务中)。
  3. 自定义分区方式
    • 有时默认的哈希分区不满足实际需求,需要根据特定逻辑对数据进行分区。例如:数据的键是 URL,用户希望所有来自同一主机 (host) 的条目存储在同一个输出文件中。
    • 解决方案: 用户可以定义自己的分区函数,例如:hash(Hostname(urlkey)) mod R:根据 URL 的主机名 (hostname) 分区。这样,来自同一主机的所有条目会被分配到相同的 reduce 任务中。

排序保证 (Ordering Guarantees)

  1. 排序保证
    • 在 MapReduce 的每个分区内,中间的键/值对(key/value pairs)会按照键的递增顺序 进行处理。
    • 目标: 确保每个分区的输出文件是有序的
  2. 排序的作用
    • 生成有序输出文件: 每个 reduce 任务生成的输出文件是按键排序的,直接支持有序数据的存储。
    • 支持高效随机访问: 有序数据便于通过键值实现高效的随机访问。
    • 用户便利: 用户使用这些输出文件时,通常不需要额外排序。

合并函数 (Combiner Function)

  1. 问题背景
    • 在某些情况下,中间键重复率较高,每个 map 任务可能会生成大量重复的中间键记录。示例: 在单词计数任务中(例如 <the, 1>),常见单词(如 “the”)会频繁出现。
    • 结果: 这些重复记录需要通过网络传输到同一个 reduce 任务,增加了网络负载。
  2. Combiner 函数的解决方案
    • 定义: Combiner 是一个可选的、局部的聚合函数用于在 map 任务所在机器上对中间数据进行部分合并。
    • 工作原理:
      • 执行位置: Combiner 在 map 任务的机器上运行。
      • 功能: 对重复键的中间结果进行局部汇总,减少需要传输的数据量。
      • 例如: 将 <the, 1><the, 1><the, 1> 合并为 <the, 3>
  3. Combiner 和 Reduce 的区别
    • 相同点: 通常,Combiner 的代码与 Reduce 函数的代码相同。都用于对数据进行聚合处理
    • 不同点:
      • Combiner: 输出的是中间结果,数据会继续传递给 Reduce 任务。
      • Reduce: 输出的是最终结果,数据写入最终的输出文件。
  4. 优化效果
    • 减少网络传输量: 通过提前合并数据,Combiner 显著减少了从 map 任务到 reduce 任务的数据量。例如,不传输 1000 条 <the, 1>,而是只传输 1 条 <the, 1000>
    • 提升性能: 对于重复率高的任务,Combiner 能显著加快 MapReduce 操作的速度。

输入与输出类型 (Input and Output Types)

  1. 输入数据格式的支持
    • 预定义格式:
      • 文本模式: 每行数据被视为一个键/值对。
        • 键:文件中该行的偏移量
        • 值:该行的内容
      • 排序键/值对模式 (sorted key/value mode): 存储的键/值对按键排序,便于按范围处理。
    • 自动分割范围: 每种输入格式都有分割机制,可将输入数据划分为适合 map 任务处理的范围。例如,文本模式会确保分割发生在行边界,而不是行中间,保证数据的完整性。
    • 用户自定义格式: 用户可以通过实现简单的读取接口 (reader interface),支持新的输入类型。
      • 非文件输入: 数据可以来自其他来源,如数据库或内存中的数据结构,而不一定是文件。
  2. 输出数据格式的支持
    • 类似输入格式,MapReduce 也支持多种输出格式:
      • 预定义格式: 提供了一些常用的输出格式。
      • 自定义格式: 用户可以通过实现新的接口定义输出数据格式。

跳过错误记录 (Skipping Bad Records)

  1. 问题背景
    • 用户代码缺陷: Map 或 Reduce 函数中可能存在错误(如某些记录引发崩溃)。
    • 确定性崩溃: 对特定记录,每次处理都会发生崩溃。
    • 问题影响: 这类错误可能阻止整个 MapReduce 操作完成
    • 无法修复的情况: 错误可能在第三方库中,用户无法访问源代码。
  2. MapReduce 提供的解决方案
    • 跳过问题记录: MapReduce 允许系统检测引发崩溃的记录,并跳过这些记录以继续操作。
    • 实现机制:
      • 信号处理: 每个工作节点安装信号处理器,捕获段错误 (segmentation violations)总线错误 (bus errors)
      • 记录错误序号: 在调用用户的 Map 或 Reduce 函数之前,系统将参数的序列号 (sequence number) 存储在全局变量中。
      • 发送错误报告: 如果用户代码触发错误,信号处理器会发送一个 “最后的喘息” (last gasp) UDP 数据包,包含引发错误的记录序号,通知主节点。
      • 主节点决策: 如果一条记录多次导致失败,主节点指示在下次重试该任务时跳过 (skip) 这条记录。

本地执行 (Local Execution)

  1. 分布式调试的挑战
    • 复杂性: Map 和 Reduce 函数的实际计算是在分布式系统上完成,涉及数千台机器。主节点动态分配任务,调试难以直接定位问题。
    • 常见问题: 分布式环境下的日志、任务状态和数据流使得问题排查更加困难。
  2. 本地执行模式的设计
    • 功能: MapReduce 提供了一种本地执行的替代实现,在单台机器顺序执行整个 MapReduce 操作。
    • 特点: 所有任务按顺序运行,无需分布式调度。用户可以限制计算范围,仅调试特定的 map 任务。

计数器 (Counter)

  • 计数器用于跟踪 MapReduce 操作期间特定事件的发生次数,例如:
    • 用户定义的自定义事件(例如,单词计数、检测特定模式)。
    • 系统定义的指标,如处理的输入/输出键值对数量。

计数器工作原理 (How Counters Work)

  • 传播到主节点 (Propagation to the Master): 来自各个工作节点的计数器值通过 ping 响应 发送到主节点
  • 聚合 (Aggregation):
    • 主节点聚合所有已完成任务的计数器值。
    • 它通过忽略重复的任务执行(例如,由于重新执行或备份任务)来确保没有重复计数

监控与报告 (Monitoring and Reporting)

  • 实时监控 (Real-Time Monitoring): 当前的计数器值显示在主节点状态页面 上,允许用户观察计算的进度。
  • 最终报告 (Final Reporting): 当 MapReduce 作业完成时,聚合后的计数器值返回给用户程序。

问题 (Questions)

假设 M=10 且 R=20,映射器 (mappers) 产生的文件总数是多少?

总文件数 = M × R = 10 × 20 = 200

为什么 MapReduce 将 Reduce 的输出存储在 Google 文件系统 (GFS) 中?

  • 高可用性 (High Availability): GFS 通过在多个机器上复制数据 (replicating data) 提供容错能力。这确保了即使一台机器故障,输出也不会丢失。
  • 可扩展性 (Scalability): GFS 专为处理大规模数据存储而设计,适用于 MapReduce 作业产生的大量输出。

拖后腿任务 (straggler) 的目的是什么?

  • “拖后腿任务 (Straggler)” 指的是运行缓慢的任务,通常是 map 或 reduce 任务,它们会显著延迟 MapReduce 作业的完成。
  • 解决方法:
    • 备份执行 (Backup Execution): 主节点在其它可用工作节点上为拖后腿任务安排备份执行。

判断对错:可以在没有模式 (schema) 的情况下,对 CSV 数据文件使用 SQL++。

正确 (True): SQL++ 可以操作半结构化数据,包括 CSV 文件,而不需要预定义的模式。

在 SQL++ 中,pivot 和 unpivot 有什么区别?

Pivot (透视):

  • 目的: 将行 (rows) 转换为属性 (attributes) / 列 (columns)
  • 示例:
    • 输入: [ { "symbol": "amzn", "price": 1900 }, { "symbol": "goog", "price": 1120 }, { "symbol": "fb", "price": 180 } ]
    • 查询: PIVOT sp.price AT sp.symbol FROM today_stock_prices sp;
    • 输出: { "amzn": 1900, "goog": 1120, "fb": 180 }

Unpivot (逆透视):

  • 目的: 将属性 (attributes) / 列 (columns) 转换为行 (rows)
  • 示例:
    • 输入: { "date": "4/1/2019", "amzn": 1900, "goog": 1120, "fb": 180 }
    • 查询: UNPIVOT c AS price AT sym FROM closing_prices c WHERE sym != 'date';
    • 输出: [ { "date": "4/1/2019", "symbol": "amzn", "price": 1900 }, { "date": "4/1/2019", "symbol": "goog", "price": 1120 }, { "date": "4/1/2019", "symbol": "fb", "price": 180 } ]

使用 BG ,可以通过其 SoAR (Satisfaction of Agreement Ratio) 来总结数据存储的性能。计算数据存储 SoAR 的 BG 输入是什么?

1. SLA 规范 (SLA Specifications)

服务等级协议 (SLA) 定义了计算 SoAR 的条件。SLA 包括:

  • α: 必须观察到响应时间小于或等于 β 的请求百分比(例如,95%)。
  • β: 最大可接受响应时间(例如,100 毫秒)。
  • τ: 观察到不可预测(过时或不一致)数据的请求的最大允许百分比(例如,0.01%)。
  • Δ: SLA 必须被满足的持续时间(例如,10 分钟)。

2. 数据库配置 (Database Configuration)

关于被测数据存储的详细信息:

  • 逻辑模式 (Logical Schema): 数据存储使用的数据模型(例如,关系模式、NoSQL 的类 JSON 模式)。
  • 物理设置 (Physical Setup): 硬件配置,包括:
    • 节点数量。
    • 存储和内存资源。
    • 网络能力。
  • 数据量大小 (Population Size):
    • M: 数据库中的成员数量。
    • ϕ: 每个成员的关注者/朋友数量。
    • ρ: 每个成员的资源数量。

3. 工作负载参数 (Workload Parameters)

工作负载指定了 BG 将模拟的操作的性质和强度:

  • 操作混合比例 (Mix of Actions):
    • 社交网络操作的类型(例如,查看个人资料、列出朋友、查看好友请求)。
    • 每种操作类型的百分比(读密集型、写密集型或混合工作负载)。
  • 思考时间 (ϵ - Think Time): 单个线程执行连续操作之间的延迟。
  • 到达间隔时间 (ψ - Inter-Arrival Time): 新用户会话之间的延迟。

4. 环境参数 (Environmental Parameters)

关于 BG 如何生成和管理工作负载的详细信息:

  • BGClients 数量 (N): 负责生成请求的实例数。
  • 线程数量 (T): 并发级别(每个 BGClient 的线程数)。
  • D-Zipfian 分布参数 (θ): 定义访问模式(例如,热门数据与冷门数据的访问频率)。

考虑键值对优先级 (priority) 的以下二进制表示:00101001。其精度为 4 的 CAMP 舍入 (CAMP rounding) 结果是什么?

00101000
img

什么是惊群效应 (thundering herd)?IQ 框架如何防止它导致持久化数据存储成为瓶颈?

惊群效应问题 (Thundering Herd Problem):

  • 当一个键值对在键值存储 (KVS) 中未找到(发生 KVS 未命中 (KVS miss))时,多个读取会话可能会同时查询关系数据库管理系统 (RDBMS) 以获取该值。
  • 这可能在高并发情况下使 RDBMS 过载并导致性能下降。

IQ 框架的解决方案:

  • 第一个读取会话遇到 KVS 未命中时,它会为该键请求一个 I 租约 (I lease)
  • 一旦 I 租约被授予,KVS 会阻止其他读取会话为同一个键查询 RDBMS。
  • 所有其他读取会话必须 “回退 (back off)” 并等待持有 I 租约的会话将值更新到 KVS 中。

(补充解释) 惊群效应发生在特定键经历大量读写活动时。

  • 写入操作重复地使缓存失效 (invalidate the cache)
  • 所有读取操作都被迫查询数据库

I 租约解决了这个问题

  • 对特定键的第一次读取被授予 I 租约。
  • 所有其他读取观察到未命中并回退
  • 持有 I 租约的读取查询 RDBMS,计算缺失的值,并将该值填充 (populate) 到缓存中。
  • 所有其他读取随后会观察到缓存命中 (cache hit)

参考: https://static.googleusercontent.com/media/research.google.com/zh-CN//archive/mapreduce-osdi04.pdf