编程模型 (Programming Model)
输入与输出 (Input and Output):
- 接收一组输入键/值对 (input key/value pairs)。
- 生成一组输出键/值对 (output key/value pairs)。
用户自定义函数 (User-Defined Functions):
- 映射函数 (Map Function):
- 由用户编写。
- 处理每个输入键/值对。
- 生成一组中间键/值对 (intermediate key/value pairs)。
- 归约函数 (Reduce Function):
- 同样由用户编写。
- 接收一个中间键
I
及其关联的值集合 (set of values)。 - 合并这些值以产生一个更小集合 (smaller set) 的输出值,通常为零个或一个值。
中间数据处理 (Intermediate Data Handling):
- MapReduce 库 (MapReduce library) 将中间值按其键 (
I
) 分组,并将它们发送给归约函数。 - 中间值通过迭代器 (iterator) 提供给归约函数,从而能够高效处理因数据量过大而无法全部放入内存的数据集。
容错性与可扩展性 (Fault Tolerance and Scalability):
- 通过将任务分解成更小的独立计算单元,MapReduce 确保了即使在大型分布式环境中也能实现可扩展性和容错性。
实现 (Implementation)
数据分割与任务分配 (Data Splitting and Task Assignment):
- 输入数据划分: MapReduce 库自动将输入文件分割成 M 个片段(通常每个片段大小为 16MB 到 64MB,可由用户控制)。
- 启动程序实例: 在集群中启动多个程序副本。
- 角色分配: 其中一个程序实例被指定为主节点 (master),其余的作为工作节点 (workers)。
任务调度 (Task Scheduling):
- 主节点的职责: 主节点负责管理 M 个 map 任务和 R 个 reduce 任务。
- 任务分配: 主节点将空闲的工作节点分配给 map 任务或 reduce 任务。
Map 阶段 (Map Phase):
- 读取数据: 被分配 map 任务的工作节点读取对应的输入片段。
- 处理数据: 解析出键/值对,并将其传递给用户定义的 Map 函数。
- 生成中间结果: Map 函数产生的中间键/值对会存储在本地磁盘中。
中间数据处理 (Intermediate Data Processing):
- 写入本地磁盘: **缓存的中间结果会定期写入本地磁盘,并根据分区函数划分为 R 个区域 (partitioned into R regions)。
- 通知主节点: 工作节点将这些中间数据的位置告知主节点,主节点负责将这些信息传递给 reduce 工作节点。
Reduce 阶段准备 (Reduce Phase Preparation):
- 读取中间数据: reduce 工作节点收到主节点的通知后,通过远程过程调用 (RPC - Remote Procedure Call) 从 map 工作节点的本地磁盘读取中间数据。
- 排序数据: reduce 工作节点将所有中间数据按键排序,以确保相同的键聚集在一起。如果数据量过大,无法全部加载到内存,会采用外部排序 (external sort)。
Reduce 阶段 (Reduce Phase):
- 执行 Reduce 函数: reduce 工作节点遍历排序后的中间数据,对于每个唯一的中间键,将键和对应的值列表传递给用户定义的 Reduce 函数。
- 生成最终输出: Reduce 函数的输出被追加到该 reduce 分区的最终输出文件中。
任务完成与结果返回 (Task Completion and Result Retrieval):
- 任务监控: 当所有的 map 和 reduce 任务都完成后,主节点会唤醒用户程序。
- 返回结果: 此时,用户程序中的 MapReduce 调用返回,用户可以获取 R 个输出文件(每个 reduce 任务对应一个输出文件)。
额外说明 (Additional Notes):
- 数据处理链: 通常用户不需要将这 R 个输出文件合并成一个文件,因为这些文件可以直接作为下一个 MapReduce 调用的输入,或者被能够处理多文件输入的分布式应用程序使用。
- 流程图参考: 上图👆用于展示 MapReduce 操作的整体流程(对应上述 7 个步骤)。
主节点数据结构 (Master Data Structure)
任务状态跟踪 (Task State Tracking):
- 对于每个 map 和 reduce 任务,主节点存储:
- 状态:
idle
(空闲): 任务尚未分配。in-progress
(执行中): 任务正在被执行。completed
(已完成): 任务执行完毕。
- 工作节点标识 (Worker Identity): 处理该任务的工作机器(针对非空闲任务)。
- 状态:
中间数据管理 (Intermediate Data Management):
- 主节点充当将中间数据从 map 任务传递到 reduce 任务的管道 (conduit)。
- 对于每个已完成的 map 任务:
- 它记录所生成的
R
个中间文件区域的位置 和大小。 - 这些数据对于 reduce 任务从相应的 map 工作节点获取中间结果至关重要。
- 它记录所生成的
动态更新 (Dynamic Updates):
- 随着 map 任务完成,主节点持续更新其记录的中间文件位置和大小。
- 这些更新会增量式地推送给当前正在执行中的 reduce 工作节点。
容错机制 (Fault Tolerance)
工作节点故障 (Worker Failure)
- 故障检测 (Failure Detection):
- 主节点定期向每个工作节点发送 ping。
- 如果工作节点在特定时间窗口内未响应,主节点将其标记为故障。
- 任务重新调度 (Task Rescheduling):
- Map 任务 (Map Tasks):
- 已完成的 Map 任务 (Completed Map Tasks):
- 如果故障工作节点已完成 map 任务,其输出将变得不可访问(存储在故障机器的本地磁盘上)。
- 这些任务被重置为空闲状态 (idle state) 并在其他工作节点上重新执行。
- 执行中的 Map 任务 (In-Progress Map Tasks):
- 类似地,执行中的任务被标记为空闲并重新分配给可用的工作节点。
- 已完成的 Map 任务 (Completed Map Tasks):
- Reduce 任务 (Reduce Tasks):
- 已完成的 Reduce 任务 (Completed Reduce Tasks):
- 这些任务不需要重新执行,因为它们的输出存储在全局文件系统 (global file system) 中,即使发生故障也仍然可访问。
- 执行中的 Reduce 任务 (In-Progress Reduce Tasks):
- 如果某些 reduce 工作节点尚未读取中间数据,它们会从新的(重新执行的 map 任务的)结果中读取数据。
- 已完成的 Reduce 任务 (Completed Reduce Tasks):
- 数据协调 (Data Coordination):
- 当 map 任务在新的工作节点上重新执行时:
- 通知 (Notification): 所有 reduce 工作节点会被告知该重新执行。
- 数据重定向 (Data Redirection): 尚未从故障工作节点获取中间数据的 reduce 工作节点将改为从新的工作节点获取数据。
- 当 map 任务在新的工作节点上重新执行时:
- Map 任务 (Map Tasks):
主节点故障 (Master Failure)
- 可以很方便地让主节点定期将上述主节点数据结构写入检查点 (checkpoints)。如果主节点任务终止,可以从最后一个检查点状态启动一个新的副本。
- 然而,考虑到只有一个主节点,其故障的可能性很低;因此我们当前的实现在主节点故障时会中止 MapReduce 计算。 客户端可以检测到此情况,并在需要时重试 MapReduce 操作。
数据本地化 (Locality)
- 存储设计: 数据存储在 Google 文件系统 (GFS - Google File System) 中。GFS 将每个文件分割为 64 MB 的块 (blocks),并在不同的机器上保存多个副本(通常是 3 个)。
- 任务调度优先级:
- 优先本地化调度: 主节点优先将 map 任务分配给包含对应数据块副本的同一台机器上的工作节点。
- 次优调度: 如果本地调度不可行(例如,拥有数据块副本的工作节点繁忙),主节点将任务分配给靠近副本的机器,例如同一机架 (rack) 或数据中心 (data center) 内的机器。
- 实际效果: 在运行大型 MapReduce 操作时,大部分输入数据会从本地磁盘读取。因为数据本地化,减少了跨网络传输的数据量,从而节省网络带宽。
任务粒度 (Task Granularity)
- Map 和 Reduce 阶段的划分
- 任务数量 (M 和 R): Map 阶段被划分为 M 个任务。Reduce 阶段被划分为 R 个任务。
- 划分原则: 理想情况下,M 和 R 的数量应该远大于工作节点的数量(即机器的数量)。
- 多任务划分的好处
- 动态负载均衡: 每个工作节点可执行多个任务,这样可以动态调整任务分配,避免某些节点过载或闲置。
- 故障恢复加速: 如果某个工作节点失败,其已完成的多个任务可以分散到其他节点重新执行,恢复速度更快。
- 任务划分的实际限制
- 调度开销: 主节点需要进行 O(M + R) 次调度决策,且需要存储 O(M × R) 的状态信息。虽然每对 map/reduce 任务对仅占用约 1 字节内存,但过多任务会增加内存需求和调度复杂性。
- 输出文件限制: R 的大小往往受到用户需求限制,因为每个 reduce 任务会生成一个独立的输出文件。输出文件过多会导致文件管理复杂。
- 实际任务大小选择
- Map 阶段: 每个 map 任务通常处理 16 MB 到 64 MB 的输入数据。这样的任务大小可以充分利用数据本地化优化(即尽量从本地磁盘读取数据)。
- Reduce 阶段: R 通常是工作节点数量的几倍,以充分利用并行能力。在一个典型的大规模 MapReduce 计算中:
- M = 200,000(Map 阶段任务数)。
- R = 5,000(Reduce 阶段任务数)。
- 工作节点 = 2,000(机器数量)。
备份任务 (Backup Tasks)
- 什么是拖后腿的任务(Straggler Tasks)?
- 定义: 拖后腿任务指的是 MapReduce 作业中运行速度远慢于其他任务的任务(map 或 reduce),从而延迟整个作业的完成。
- 解决方法:
- 备份任务机制: 当 MapReduce 计算接近完成时,主节点会为未完成的任务安排备份执行 (Backup Executions)。同一任务的多个副本在不同的工作节点上同时运行。只要其中一个副本完成,任务即被标记为完成。
- 资源开销: 调整后的机制只增加少量(通常是几个百分点)的计算资源使用。通过备份执行,能够显著缩短总执行时间。
优化与增强 (Refinement)
分区函数 (Partitioning Function)
- Reduce 任务与分区
- 用户通过设置 R 来指定需要的 reduce 任务数或输出文件数。
- 数据在这些 reduce 任务之间分区,分区方式取决于分区函数。
- 默认分区方式
- 默认使用哈希函数。
- 分区规则:
hash(key) mod R
。 - 优势: 通常能实现较为均衡的分区(即数据均匀分布到不同 reduce 任务中)。
- 自定义分区方式
- 有时默认的哈希分区不满足实际需求,需要根据特定逻辑对数据进行分区。例如:数据的键是 URL,用户希望所有来自同一主机 (host) 的条目存储在同一个输出文件中。
- 解决方案: 用户可以定义自己的分区函数,例如:
hash(Hostname(urlkey)) mod R
:根据 URL 的主机名 (hostname) 分区。这样,来自同一主机的所有条目会被分配到相同的 reduce 任务中。
排序保证 (Ordering Guarantees)
- 排序保证
- 在 MapReduce 的每个分区内,中间的键/值对(key/value pairs)会按照键的递增顺序 进行处理。
- 目标: 确保每个分区的输出文件是有序的。
- 排序的作用
- 生成有序输出文件: 每个 reduce 任务生成的输出文件是按键排序的,直接支持有序数据的存储。
- 支持高效随机访问: 有序数据便于通过键值实现高效的随机访问。
- 用户便利: 用户使用这些输出文件时,通常不需要额外排序。
合并函数 (Combiner Function)
- 问题背景
- 在某些情况下,中间键重复率较高,每个 map 任务可能会生成大量重复的中间键记录。示例: 在单词计数任务中(例如
<the, 1>
),常见单词(如 “the”)会频繁出现。 - 结果: 这些重复记录需要通过网络传输到同一个 reduce 任务,增加了网络负载。
- 在某些情况下,中间键重复率较高,每个 map 任务可能会生成大量重复的中间键记录。示例: 在单词计数任务中(例如
- Combiner 函数的解决方案
- 定义: Combiner 是一个可选的、局部的聚合函数,用于在 map 任务所在机器上对中间数据进行部分合并。
- 工作原理:
- 执行位置: Combiner 在 map 任务的机器上运行。
- 功能: 对重复键的中间结果进行局部汇总,减少需要传输的数据量。
- 例如: 将
<the, 1>
、<the, 1>
、<the, 1>
合并为<the, 3>
。
- Combiner 和 Reduce 的区别
- 相同点: 通常,Combiner 的代码与 Reduce 函数的代码相同。都用于对数据进行聚合处理。
- 不同点:
- Combiner: 输出的是中间结果,数据会继续传递给 Reduce 任务。
- Reduce: 输出的是最终结果,数据写入最终的输出文件。
- 优化效果
- 减少网络传输量: 通过提前合并数据,Combiner 显著减少了从 map 任务到 reduce 任务的数据量。例如,不传输 1000 条
<the, 1>
,而是只传输 1 条<the, 1000>
。 - 提升性能: 对于重复率高的任务,Combiner 能显著加快 MapReduce 操作的速度。
- 减少网络传输量: 通过提前合并数据,Combiner 显著减少了从 map 任务到 reduce 任务的数据量。例如,不传输 1000 条
输入与输出类型 (Input and Output Types)
- 输入数据格式的支持
- 预定义格式:
- 文本模式: 每行数据被视为一个键/值对。
- 键:文件中该行的偏移量。
- 值:该行的内容。
- 排序键/值对模式 (sorted key/value mode): 存储的键/值对按键排序,便于按范围处理。
- 文本模式: 每行数据被视为一个键/值对。
- 自动分割范围: 每种输入格式都有分割机制,可将输入数据划分为适合 map 任务处理的范围。例如,文本模式会确保分割发生在行边界,而不是行中间,保证数据的完整性。
- 用户自定义格式: 用户可以通过实现简单的读取接口 (reader interface),支持新的输入类型。
- 非文件输入: 数据可以来自其他来源,如数据库或内存中的数据结构,而不一定是文件。
- 预定义格式:
- 输出数据格式的支持
- 类似输入格式,MapReduce 也支持多种输出格式:
- 预定义格式: 提供了一些常用的输出格式。
- 自定义格式: 用户可以通过实现新的接口定义输出数据格式。
- 类似输入格式,MapReduce 也支持多种输出格式:
跳过错误记录 (Skipping Bad Records)
- 问题背景
- 用户代码缺陷: Map 或 Reduce 函数中可能存在错误(如某些记录引发崩溃)。
- 确定性崩溃: 对特定记录,每次处理都会发生崩溃。
- 问题影响: 这类错误可能阻止整个 MapReduce 操作完成。
- 无法修复的情况: 错误可能在第三方库中,用户无法访问源代码。
- MapReduce 提供的解决方案
- 跳过问题记录: MapReduce 允许系统检测引发崩溃的记录,并跳过这些记录以继续操作。
- 实现机制:
- 信号处理: 每个工作节点安装信号处理器,捕获段错误 (segmentation violations) 和总线错误 (bus errors)。
- 记录错误序号: 在调用用户的 Map 或 Reduce 函数之前,系统将参数的序列号 (sequence number) 存储在全局变量中。
- 发送错误报告: 如果用户代码触发错误,信号处理器会发送一个 “最后的喘息” (last gasp) UDP 数据包,包含引发错误的记录序号,通知主节点。
- 主节点决策: 如果一条记录多次导致失败,主节点指示在下次重试该任务时跳过 (skip) 这条记录。
本地执行 (Local Execution)
- 分布式调试的挑战
- 复杂性: Map 和 Reduce 函数的实际计算是在分布式系统上完成,涉及数千台机器。主节点动态分配任务,调试难以直接定位问题。
- 常见问题: 分布式环境下的日志、任务状态和数据流使得问题排查更加困难。
- 本地执行模式的设计
- 功能: MapReduce 提供了一种本地执行的替代实现,在单台机器上顺序执行整个 MapReduce 操作。
- 特点: 所有任务按顺序运行,无需分布式调度。用户可以限制计算范围,仅调试特定的 map 任务。
计数器 (Counter)
- 计数器用于跟踪 MapReduce 操作期间特定事件的发生次数,例如:
- 用户定义的自定义事件(例如,单词计数、检测特定模式)。
- 系统定义的指标,如处理的输入/输出键值对数量。
计数器工作原理 (How Counters Work)
- 传播到主节点 (Propagation to the Master): 来自各个工作节点的计数器值通过 ping 响应 发送到主节点。
- 聚合 (Aggregation):
- 主节点聚合所有已完成任务的计数器值。
- 它通过忽略重复的任务执行(例如,由于重新执行或备份任务)来确保没有重复计数。
监控与报告 (Monitoring and Reporting)
- 实时监控 (Real-Time Monitoring): 当前的计数器值显示在主节点状态页面 上,允许用户观察计算的进度。
- 最终报告 (Final Reporting): 当 MapReduce 作业完成时,聚合后的计数器值返回给用户程序。
问题 (Questions)
假设 M=10 且 R=20,映射器 (mappers) 产生的文件总数是多少?
总文件数 = M × R = 10 × 20 = 200
为什么 MapReduce 将 Reduce 的输出存储在 Google 文件系统 (GFS) 中?
- 高可用性 (High Availability): GFS 通过在多个机器上复制数据 (replicating data) 提供容错能力。这确保了即使一台机器故障,输出也不会丢失。
- 可扩展性 (Scalability): GFS 专为处理大规模数据存储而设计,适用于 MapReduce 作业产生的大量输出。
拖后腿任务 (straggler) 的目的是什么?
- “拖后腿任务 (Straggler)” 指的是运行缓慢的任务,通常是 map 或 reduce 任务,它们会显著延迟 MapReduce 作业的完成。
- 解决方法:
- 备份执行 (Backup Execution): 主节点在其它可用工作节点上为拖后腿任务安排备份执行。
判断对错:可以在没有模式 (schema) 的情况下,对 CSV 数据文件使用 SQL++。
正确 (True): SQL++ 可以操作半结构化数据,包括 CSV 文件,而不需要预定义的模式。
在 SQL++ 中,pivot 和 unpivot 有什么区别?
Pivot (透视):
- 目的: 将行 (rows) 转换为属性 (attributes) / 列 (columns)。
- 示例:
- 输入:
[ { "symbol": "amzn", "price": 1900 }, { "symbol": "goog", "price": 1120 }, { "symbol": "fb", "price": 180 } ]
- 查询:
PIVOT sp.price AT sp.symbol FROM today_stock_prices sp;
- 输出:
{ "amzn": 1900, "goog": 1120, "fb": 180 }
Unpivot (逆透视):
- 目的: 将属性 (attributes) / 列 (columns) 转换为行 (rows)。
- 示例:
- 输入:
{ "date": "4/1/2019", "amzn": 1900, "goog": 1120, "fb": 180 }
- 查询:
UNPIVOT c AS price AT sym FROM closing_prices c WHERE sym != 'date';
- 输出:
[ { "date": "4/1/2019", "symbol": "amzn", "price": 1900 }, { "date": "4/1/2019", "symbol": "goog", "price": 1120 }, { "date": "4/1/2019", "symbol": "fb", "price": 180 } ]
使用 BG ,可以通过其 SoAR (Satisfaction of Agreement Ratio) 来总结数据存储的性能。计算数据存储 SoAR 的 BG 输入是什么?
1. SLA 规范 (SLA Specifications)
服务等级协议 (SLA) 定义了计算 SoAR 的条件。SLA 包括:
- α: 必须观察到响应时间小于或等于 β 的请求百分比(例如,95%)。
- β: 最大可接受响应时间(例如,100 毫秒)。
- τ: 观察到不可预测(过时或不一致)数据的请求的最大允许百分比(例如,0.01%)。
- Δ: SLA 必须被满足的持续时间(例如,10 分钟)。
2. 数据库配置 (Database Configuration)
关于被测数据存储的详细信息:
- 逻辑模式 (Logical Schema): 数据存储使用的数据模型(例如,关系模式、NoSQL 的类 JSON 模式)。
- 物理设置 (Physical Setup): 硬件配置,包括:
- 节点数量。
- 存储和内存资源。
- 网络能力。
- 数据量大小 (Population Size):
- M: 数据库中的成员数量。
- ϕ: 每个成员的关注者/朋友数量。
- ρ: 每个成员的资源数量。
3. 工作负载参数 (Workload Parameters)
工作负载指定了 BG 将模拟的操作的性质和强度:
- 操作混合比例 (Mix of Actions):
- 社交网络操作的类型(例如,查看个人资料、列出朋友、查看好友请求)。
- 每种操作类型的百分比(读密集型、写密集型或混合工作负载)。
- 思考时间 (ϵ - Think Time): 单个线程执行连续操作之间的延迟。
- 到达间隔时间 (ψ - Inter-Arrival Time): 新用户会话之间的延迟。
4. 环境参数 (Environmental Parameters)
关于 BG 如何生成和管理工作负载的详细信息:
- BGClients 数量 (N): 负责生成请求的实例数。
- 线程数量 (T): 并发级别(每个 BGClient 的线程数)。
- D-Zipfian 分布参数 (θ): 定义访问模式(例如,热门数据与冷门数据的访问频率)。
考虑键值对优先级 (priority) 的以下二进制表示:00101001。其精度为 4 的 CAMP 舍入 (CAMP rounding) 结果是什么?
00101000
什么是惊群效应 (thundering herd)?IQ 框架如何防止它导致持久化数据存储成为瓶颈?
惊群效应问题 (Thundering Herd Problem):
- 当一个键值对在键值存储 (KVS) 中未找到(发生 KVS 未命中 (KVS miss))时,多个读取会话可能会同时查询关系数据库管理系统 (RDBMS) 以获取该值。
- 这可能在高并发情况下使 RDBMS 过载并导致性能下降。
IQ 框架的解决方案:
- 当第一个读取会话遇到 KVS 未命中时,它会为该键请求一个 I 租约 (I lease)。
- 一旦 I 租约被授予,KVS 会阻止其他读取会话为同一个键查询 RDBMS。
- 所有其他读取会话必须 “回退 (back off)” 并等待持有 I 租约的会话将值更新到 KVS 中。
(补充解释) 惊群效应发生在特定键经历大量读写活动时。
- 写入操作重复地使缓存失效 (invalidate the cache)。
- 所有读取操作都被迫查询数据库。
I 租约解决了这个问题:
- 对特定键的第一次读取被授予 I 租约。
- 所有其他读取观察到未命中并回退。
- 持有 I 租约的读取查询 RDBMS,计算缺失的值,并将该值填充 (populate) 到缓存中。
- 所有其他读取随后会观察到缓存命中 (cache hit)。
参考: https://static.googleusercontent.com/media/research.google.com/zh-CN//archive/mapreduce-osdi04.pdf