这本书写的还是很有诚意的,读完后非常惊叹于作者知识的广博,这本书也不仅仅是综述,作者在某些章节也提出了一些一些自己的观点。总之,值得推荐一读。
从社会发展的角度,很明显大数据会是目前肉眼可及的范围里能看到的最大的趋势之一。我们不能盲目跟风不断追逐切换热点,但是忽视趋势的力量同样也不是一个理性选择。多年以后,当大数据应用已经无处不在地影响我们的生活的时候,我们准会想起那个面红耳赤讨论大数据泡沫合适破灭的下午。
海量的数据可广泛获得,所稀缺的是如何从中挖掘出智慧观点。
数据分片与路由
数据分片与数据路由是密不可分的两个概念,对于海量数据,通过数据分片实现系统的水平拓展,而通过数据复制来保证数据的高可用性。
分片的两种主要方式:哈希分片和范围分片。对于哈希分片,主要通过哈希函数来简历 key-partition 的关系,所以只支持点查询,无法支持范围查询。
哈希分片
主要有三种方法:
- Round Robin / 哈希取模法
- 虚拟桶:虚拟桶层位于存储记录和物理机之间。所有记录首先通过哈希函数映射到对应的虚拟桶,记录和虚拟桶是多对一的映射关系,即一个虚拟桶包含多条记录信息;第二层映射是虚拟桶和物理机之间的映射关系,同样也是多对一映射,一个物理机可以容纳多个虚拟桶,其具体实现方式是通过查表来实现的。
- 一致性哈希
范围分片
范围分片首先将所有的记录进行排序,然后在排好序的主键空间内将记录划分成数据分片,每个数据分片存储有序的主键空间内的所有记录。在实现具体存储系统时,往往保持一个数据分片的映射表(BigTable 中,映射表被组织成了类似 B+ 树的层次结构,这样可容纳的数据分片个数获得极大提升),记录表每一项记载数据分片的最小主键及其对应的物理机地址。
数据复制和一致性
基本原则
CAP
一般认为,传统的关系型数据库在三要素中选择 CA
两个因素,即强一致性、高可用性,但是可拓展性和容错性差。而 NoSQL
系统往往更关注 AP
因素,即高可用和高拓展,但是往往以弱一致性为代价,这与现实世界应用密不可分,因为高可用直接涉及用户体验。
Eric Brewer 指出,实践过程中应用 CAP
理论时不得不在三要素中选择两个具有误导性:
- 网络分区(
P
)出现的概率很小,不应该为了容忍这种小概率事件而在设计之初九选择放弃A
或者C
; - 在
AC
之间做取舍的时候,不应该是粗粒度地在整个系统级别进行取舍,而是应该考虑系统中存在不同的子系统,甚至应该在不同的系统运行时或不同的数据间进行灵活的差异化细粒度取舍; CAP
三者并非是绝对二元式地有或没有,而是应该将其看作连续变量,即可以看作在一定程度上的有或者没有,比如可用性指标中(A
)延时长度多少算可用。
ACID
即原子性、一致性、隔离性、持久性。
需要注意的是 ACID
中的 C
和 CAP
中的 C
具有不同的含义,CAP
中的 C
指的是多副本的数据一致性,ACID
中的 C
应理解为一致性约束条件。CAP
中的 C
是 ACID
中的 C
所涵盖语义的子集。
BASE
- 基本可用(Basically Available):绝大多数时间可用,运行失败;
- 软状态后者柔性状态(Soft State):数据状态不邀请在任意时刻都完全保持同步,即可以处于有状态(State)和无状态(Stateless)之间。
- 最终一致性(Eventual Consistency):在给定时间窗口内数据会达到一致,属于弱一致性。
BASE
原则和 ACID
原则不同,前者牺牲了强一致性来达到高可用性,同时 NoSQL 和云存储系统在发展过程中正在向逐步提供局部 ACID
特性发展,即全局满足 BASE
原则,但是局部支持 ACID
原则。
CAP/BASE/ACID的关系
ACID
和 BASE
原则是在明确提出 CAP
理论之前关于如何对待可用性和强一致性的两种不同的设计哲学。ACID
更强调数据的一致性,这是传统数据库设计的思路,BASE
更强调可用性,弱化数据一致性的概念。
一致性模型分类
副本更新策略
- 同时更新
- 主从式更新
- 同步方式:主副本等所有从副本更新成功才确认操作完成,缺点是存在请求时延;
- 异步方式:在从副本更新前即可确认操作完成,同时记录更新操作记录,避免主副本崩溃;
- 如果所有读请求都走主副本(eg. Chubby):一致性可得到保障但是请求时延增加;
- 如果读请求可以走从副本(eg. ZooKeeper):会存在读结果不一致的问题。
- 混合方式:主副本更新部分从副本数据,然后即可以确认更新操作完成,Kafka 在维护数据副本一致性时即采取此种方案。
- 任何节点更新
一致性协议
- 两阶段提交(
2PC
) - 三阶段提交(
3PC
) - 向量时钟
RWN
Paxos
Raft
一些注意:
- 两阶段协议更多是作为实现数据更新原子性手段出现,比如
Raft
中就在内部使用了两阶段提交协议来保证信息更新的原则性;在具体实施2PC
时,需要协调者和参与者将自身状态或者消息写入本地 Log 文件,这样即便是崩溃了,在重新启动时也可以根据 Log 进行状态恢复; 3PC
将2PC
中的提交阶段再次细分为预提交和提交阶段,但是3PC
在实际系统中很少使用,一方面是2PC
中的阻塞情况很少发生,其次是 3PC 的效率过低;Paxos
协议在实现时需要考虑很多问题,如如何保证不同进程所采纳的倡议编号全局唯一、且递增增长、异常处理、状态持久化等,这些在不同的系统实现时都可能采取各异的策略;Paxos
的难理解性在于是什么因素导致协议以此种方式呈现以及其正确性证明过程,而非最终协议内容本身;- Raft 协议的目标有两个:首先是可理解性(相较于
Paxos
);其次是实现实际系统的确定性,保证每个技术细节的清晰界定与描述。整个协议分为:Leader 选举,Log 复制与安全性三部分。 Raft
协议将Paxos
的 P2P 模式改造成了 Master-slave 模式,Paxos
的复杂性很大原因是由于其完全的 P2P 模式决定的,即多个并发进程之间无主次关系。Raft
的Master-Slave
模式一定程度上简化了一致性维护问题,Log 的一致性维护以及安全性要求都由 Leader 来完成。
大数据常用算法与数据结构
布隆过滤器
具有很好的空间和时间效率,空间效率极高,常被用来检测某个元素是否是巨量数据集合的成员。存在误判,但不会漏判。
把布隆过滤器中的一个比特位拓展为多个比特,信息位可从只能保存 0/1 被拓展到可进行计数,从而就解决了原生布隆过滤器无法删除子成员的问题。
SkipList
SkipList 依靠随机生成树以一定概率来保持数据的平衡分布,最坏情况下其效率要低于平衡树,插入、删除、查找数据的时间复杂度均为 O(log(N))
,被用在很多大数据系统中,维护有序列表高效读/写,如 Redis
中的 Sorted Set
。
LSM 树
LSM(Log-structured Merge-tree) 树的本质是将大量的随机写转换为批量的序列写,非常适合对写效率有高要求的应用场景,比如 BigTable 中的单机数据存储引擎本质上就是 LSM树。
Merkel 哈希树
Merkel Hash Tree 中的叶子节点存储数据项或者数据项的Hash值,中间节点存储其所有子节点 Hash 值的 Hash 值,直至根节点。主要用来在海量数据下快速定位少量变化的数据内容,比如比特币中用 Merkel 树对交易进行验证。
Snappy 和 LZSS 算法
LZSS是一种动态词典编码,词典编码的思想是:文本中的词用它在词典中表示位置的号码代替的无损数据压缩方法,一般分为静态词典和动态词典方法。
Snappy 是基于 LZSS 压缩方案的,其目标并不是最高的数据压缩率,而是在合理的压缩率的基础上追求尽可能快的压缩和解压缩速度,同时,Snappy 相较于其他压缩方案占用 CPU 时间更少。
Cuckoo 哈希
Cuckoo Hash(布谷鸟散列)是为了解决哈希冲突问题而提出,利用较少的计算换取较大的空间。Cuckoo Hash 使用多个哈希函数进行映射:
使用 HashA、HashB 计算对应的 keyA 位置:
- 两个位置均为空,则任选一个插入 keyA;
- 两个位置中一个为空,则插入到空的那个位置;
- 两个位置均不为空,则踢出一个位置后插入keyA,被踢出的 keyB 再执行该算法找其另一个位置,循环直到插入成功;
- 如果被踢出的次数达到一定的阈值,则认为哈希表已满,进行扩容操作;
对 Cuckoo Hash 的优化一般有增加哈希函数个数和增加桶中槽(Slot)的个数两种,这两种变体都是为了提高哈希空间的桶利用率。
集群资源管理和调度
静态资源划分:将集群中所有的资源做出静态划分,将划分后的固定硬件资源指定给固定的计算框架用,各个框架之间各行其是,互不干扰。静态资源划分的优点是简便易行,缺点是资源的整体利用率不高,经常会出现集群中有些计算系统资源不足,但是有些计算系统存在大量闲置资源的情况。
使用独立的资源管理和调度系统的优点:
- 集群的整体资源利用率高
- 可增加数据共享能力
调度系统解决的问题:
- 资源异质性和工作负载的异质性:可以简单理解为服务的功能特性各异和机器的配置不同;
- 数据局部性:移送计算代码到数据所在地,而不是移动数据到计算任务所在地。节点局部性(Node Locality) 优于 机架局部性(Rack Locality) 优于 全局局部性(Global Locality)。
- 抢占式和非抢占式调度;
- 资源分配粒度:群体分配、全分或不分等;
- 饿死和死锁问题:饿死指的计算任务长时间无法获得开始执行所需要的最小资源导致一直处于等待的状态;死锁指的是由于资源调度不当导致整个调度系统无法继续正常执行。
- 资源隔离方法:Linux容器(Linux Container,LXC),轻量级的内核虚拟化技术。
资源管理与调度系统范型
- 集中式调度器(Monolithic Scheduler):整个系统中只运行一个全局的中央调度器实例,所有之上的框架或计算任务的资源请求全部由中央调度器来满足。
- 两级调度器(Two-Level Scheduler):整个系统的调度工作可以分为中央调度器和框架调度器两个级别,例如 Mesos;
- 中央调度器:可以看到集群中所有机器的可用资源并管理其状态,它可以按照一定策略将集群中的所有资源分配给各个计算框架,中央调度器级别的资源调度是一种粗粒度的资源调度方式;
- 框架调度器:计算框架在接收到所需资源后,可以根据自身计算任务的特性,使用自身的调度策略来进一步细粒度地分配从中央调度器获得的各种资源。
- 状态共享调度器(Shared-State Scheduler):每个计算框架可以看到整个集群中的所有资源,并采用相互竞争(乐观并发控制)的方式去获取自己所需的资源,例如 Google Omega。
本书成书于2016年,因此近期在资源调度领域一统天下的 kubernetes 在本书中并未提及。因此本章
分布式协调系统
协调的内容?
- 动态配置
- 服务发现
- 动态选主
- 健康检查
- 分布式锁
- 路障同步
- ……
Chubby 服务
Chubby 是针对分布式系统协调管理的粗粒度锁服务,所谓的粗粒度指的是持有的锁时间较长,反之如果锁持有的时间较短(秒级别)则称为细粒度锁,粗粒度锁对服务器的请求负载较低,可以支撑更高的并发度。一个 Chubby 实例可以负责1万台4核 CPU 机器相互之间对资源的协同管理。这种锁服务的主要功能是让众多客户端程序进行相互之间的同步,并对系统环境或者资源达成一致认知。
Chubby 的理论基础是 Paxos
(完全分布式的),同时处于效率考虑,增加了一些中心管理策略,在达到同一目标的情况下改善了系统效率。
Chubby 中所有的读/写操作都由主控服务器完成,从属服务器只是为了提高整个协调系统的可用性。
ZooKeeper 服务
ZooKeeper 的理论基础是 ZAB
协议,集群中的一台通过 ZAB 原子广播协议选举作为主控服务器,其他作为从服务器。ZooKeep 通过 重放日志(Replay Log
,更新操作体现在内存前先写入外存日志) 和模糊快照(Fuzzy Snapshot
,周期性通过深度遍历而不是加锁的方式对内存数据做数据快照)来对服务器进行容错。
和 Chubby 不同的是,ZooKeeper 的任何一台服务器都可以响应客户端的读操作,这是为何其吞吐量高的原因。潜在问题是客户端可能会读到过期的数据,当然 ZooKeeper 的接口 API 中提供了 Sync
操作,接收到 Sync
指令的从服务器从主控服务器同步状态信息。
ZooKeeper 提供的 API中,可以实现 Watch
的三个接口:getData、exists、getChildren
。
分布式通信
大数据系统中3种常见的通信机制:
- 序列化与远程过程调用;
- 消息队列;
- 多播通信。
序列化与远程过程调用
PB
是在Google内部广泛使用的序列化与 RPC 框架,是几乎所有Google服务的黏合剂,与JSON、XML 及 Thrift
等相比,PB
对数据的压缩率是最高的。
PB
和 Thrift
在使用流程方面大致相同。其流程一般如下:
- 首先,使用
IDL
定义消息体以及 RPC 函数调用接口,IDL
是与具体编程语言无关的接口描述语言,使用它可以定义调用方和被调用方都一致遵循的数据结构与接口; - 使用工具根据上步的
IDL
定义文件生成指定编程语言的代码; - 在应用程序中链接使用上一步生成的代码;
如果追求序列化的高效但不使用 RPC,可以优先考虑 PB
;如果需要内建的便捷 RPC 支持,可以优先考虑 Thrift
;如果需要和动态语言方便地集成,则可以优先考虑 Avro
。
消息队列
实际测试表明,连续并发发送1KB大小的消息,从性能的角度看,ZeroMQ性能最优,可达10万TPS(Transaction Per Second,每秒事务处理量)以上;Kafka次之,可达4万TPS左右;RabbitMQ再次,大约1万TPS左右;ActiveMQ最次,性能大约在6 000 TPS左右。
- ActiveMQ 和 RabbitMQ 相对来说算是重量级系统,其遵循AMQP协议,具有较强的功能和相对广泛的适用场景,但也因此导致其性能较低和扩展性较差;
- ZeroMQ 相对特殊,也是其中最轻量级的系统,严格来讲,其是介于会话层之上应用层之下的网络通信库,适用于高并发低延迟的场景,比如金融行业数据传输;但是其不支持消息持久化,而是将消息全部保存在内存传递,所以在消息送达保证方面存在潜在问题;
- Kafka 较于 ActiveMQ 和 RabbitMQ 而言,算是轻量级的消息系统,同时其提供了消息持久化保证,支持消息“至少送达一次”语义,在性能方面表现优异,除此之外,其在高可用性及可扩展性方面也很出色。
Kafka
Kafka通过消息副本机制提供了高可用的消息服务,其副本管理单位不是Topic消息队列,而是Topic的数据分片(Partition),在配置文件里可以指定数据分片的副本个数。在多个副本里,其中一个作为主副本(Leader),其他作为次级副本(Slave)。
Kafka并未使用类似Zab或者Paxos协议的多数投票机制来保证主备数据的一致性,而是提出了一种被称为 ISR(In-Sync Replicas)
的机制来保证数据一致性,多数投票机制要求在消息写入的时候同时同步至少2N个数据,效率太低。
ISR的运行机制如下:将所有次级副本数据分到两个集合,其中一个被称为ISR集合,这个集合备份数据的特点是即时和主副本数据保持一致,而另外一个集合的备份数据允许其消息队列落后于主副本的数据。在做主备切换时,只允许从ISR集合中选择候选主副本,这样即可保证切换后新的主副本数据状态和老的主副本保持一致。在数据分片进行消息写入时,只有ISR集合内所有备份都写成功才能认为这次写入操作成功。在具体实现时,Kafka利用ZooKeeper来保存每个ISR集合的信息,当ISR集合内成员变化时,相关构件也便于通知。通过这种方式,如果设定ISR集合大小为f+1,那么可以最多允许f个副本故障,而对于多数投票机制来说,则需要2f+1个副本才能达到相同的容错性。
使用磁盘读/写根本且普适的原则是:尽可能避免随机读/写,同时尽可能利用顺序读/写,即连续读/写整块数据。现代操作系统也针对磁盘这种特性做出了优化:预读(Read-Ahead)和迟写(Write-Behind)。即预先将整块数据读入操作系统内核空间的页缓存以及将若干较少的逻辑写操作拼成一个较多的物理写操作。
Kafka是一个基于文件系统的消息系统,能够高效处理大批量消息的一个重要原因就是将读/写操作尽可能转换为顺序读/写,比如类似于Log文件方式的文件尾部追加写。另外,Kafka涉及将文件内容通过网络进行传输,为了提升效率,Kafka采用了Linux操作系统的SendFile调用。
多播通信
Gossip协议是常用的应用层多播通信协议,与其他多播协议相比,其在信息传递的强壮性和传播效率这两方面有较好的折中效果,使得其在大数据领域广泛使用。
Gossip协议用来尽快地将本地更新数据通知到网络中的所有其他节点。其具体更新模型又可以分为3种:全部通知模型(Best Effort或Direct Mail)、反熵模型(Anti-Entropy)和散布谣言模型(Rumor Mongering)。其中反熵模型是最常用的。
数据通道
Log数据收集
Chukwa
Chukwa是用于针对大规模分布式系统Log收集与分析用途的Apache开源项目,其建立在Hadoop之上,每台机器节点都部署Chukwa代理程序(Agent),其负责收集应用产生的Log数据并通过HTTP协议传给Chukwa收集器(Collectors)。
使用MR任务直接来收集Log数据是不合适的,因为单机Log数据具有量小且渐增的特性,而MR更适合处理大文件数据块。因此Chukwa的基本策略是首先收集大量单机的Log增量文件,将其汇总后形成大文件,之后再利用MR任务来对其进行加工处理。
Chukwa的设计思路希望集成数据收集和数据分析,这造成其设计思路不够单一和明确,因为数据收集和数据分析这两种任务各自优化目标大不相同甚至有些矛盾,因此Chukwa整个系统无特色、无明显优势,与其他类似系统相比其发展前景堪忧。
Scribe
应用程序作为Thrift客户端来和Scribe服务器通信,将本地Log信息及其信息分类发送到Scribe服务器。Scribe服务器可以是单机也可以是集群,其内部维护了消息队列,队列内容即各个客户端发送的信息。Scribe服务器后端可以将队列内容消费传达中央存储区(HDFS、NFS或者另外的Scribe服务器),如果中央存储区不可用,Scribe将信息先存入本地磁盘,待其可用时再转发过去,这样整个系统就具备较好的容错能力。
数据总线
数据总线的作用就是能够形成数据变化通知通道,当集中存储的数据源(往往是关系型数据库)的数据发生变化时,能尽快通知对数据变化敏感的相关应用或者系统构件,使得它们能尽快捕获这种数据变化。一般而言,设计数据总线系统时要关注以下3个特性:
- 近实时性:变化通知机制越快越好;
- 数据回溯能力:应用可以重新获取指定时刻的历史数据变化情况;
- 主题订阅能力:支持应用灵活地订阅其关心的数据变化情况。
现实中可以有两种不同的实现思路:应用双写(Dual Write)或者数据库日志挖掘。
应用双写,是指应用将数据变化同时写入数据库以及某个Pub-Sub消息系统中,关注数据变化的应用可以订阅Pub-Sub消息系统中自己关心的主题,以此来获得数据通知,即数据库的归数据库,应用的归消息系统。这种思路的好处是实现简捷,但是存在潜在的数据不一致问题。
数据库日志挖掘的思路是:应用先将数据变更写入数据库,数据总线从数据库的日志中挖掘出数据变化信息,然后通知给关心数据变化的各类应用。这样做可以保证数据的一致性,但是实现起来相对复杂,因为需要解析Oracle或者MySQL的日志格式。目前比较常见的做法还是数据库日志挖掘的方式,LinkedIn 的 Databus 和 Facebook 的 Wormhole数据总线,两者都是以数据库日志挖掘的思路来实现的
数据导入/导出
Sqoop是专门在Hadoop和其他关系型数据库或者NoSQL数据库之间进行相互之间数据导入和导出的开源工具(见图7-9)。在其内部实现时,具体的导入/导出工作是通过可以连接并操作数据库的MR任务完成的。
分布式文件系统
GFS
GFS文件系统主要由3个组成部分构成:唯一的“主控服务器”(Master)、众多的“Chunk服务器”和“GFS客户端”。“主控服务器”主要做管理工作,“Chunk服务器”负责实际的数据存储并响应“GFS客户端”的读/写请求。
Colossus是Google的下一代GFS分布式文件系统,其对GFS的改进集中在以下几个方面:
- 主控服务器:GFS 采取单一主控服务器架构,可扩展性很差,能够容纳的文件数量相对有限;Colossus将单一主控服务器改造为多主控服务器构成的集群。
- Chunk服务器:为了保证数据的高可用性,GFS通过默认将Chunk数据保持3个备份的方式来达到此要求,Colossus使用了Reed-Solomon纠删码算法来实现这一点;
- 客户端:Colossus增加了客户端的灵活性,使得客户端可以管理备份数据的存储地点;
HDFS
HDFS是Hadoop中的大规模分布式文件系统,HDFS的整体架构如下图所示,其由NameNode、DataNode、Secondary NameNode以及客户端构成。NameNode类似于GFS的“主控服务器”,DataNode类似于GFS的“Chunk服务器”。HDFS适合存储大文件并为之提供高吞吐量的顺序读/写访问,不太适合大量随机读的应用场景,也不适合存储大量小文件等应用场景。
GFS架构中的单一“主控服务器”设计是存在较大问题的,其中最主要的两个问题是:单点失效和水平扩展性不佳。针对这两个问题,从Hadoop 2.0开始提出了统一的解决方案,即高可用方案(High Availability,HA)和NameNode联盟(NameNode Federation)。其中HA是为了解决单点失效问题,而NameNode联盟则是为了解决整个系统的水平扩展问题。
HA 方案
可以简单描述如下:“主控服务器”由Active NameNode(简称ANN)和Standby NameNode(简称SNN)一主一从两台服务器构成,ANN是当前响应客户端请求的服务器,SNN作为冷备份或者热备份机。SNN的所有元数据需要与ANN的元数据保持一致,HA方案通过以下两点来保证这一要求:
- 使用第三方共享存储(NAS+NFS)来保存目录文件等命名空间元数据,其本质是将NN的单点失效问题转换成为第三方存储的单点失效问题;
- 所有DataNode同时将心跳信息发送给ANN和SNN;
- 使用 ZooKeeper 在此用作 SNN 和 ANN 的“领导者选举”;
- 为了防止在故障切换过程中出现脑裂(Brain-Split)现象,需要在多处进行隔离措施(Fencing),如需要保证同一时刻只能有一个NN能够对客户端请求发出正确响应;
Hadoop发行版中提供了基于 QJM(Quorum Journal Manager)的HA方案:利用Paxos协议在多台备份机之间选举“主控服务器”,QJM在2F+1个JournalNode中存储NN的editlog,每次写入操作如果有F台服务器返回成功即可认为成功写入,通过Paxos协议保证数据的一致性,QJM最多可以容忍F个JournalNode同时发生故障而系统仍然可以正常运行。
NameNode联盟
即将一个大的命名空间切割成若干子命名空间,每个子命名空间由单独的NN来负责管理,NN之间独立,相互之间无须做任何协调工作。所有的DataNode被多个NN共享,仍然充当实际数据块的存储场所。而子命名空间和DataNode之间则由数据块管理层作为中介建立映射关系,数据块管理层由若干数据块池(Pool)构成,每个数据块唯一属于某个固定的数据块池,而一个子命名空间可以对应多个数据块池。
HayStack
HayStack是Facebook公司设计开发的一种“对象存储系统”,这里的“对象”主要是指用户上传的图片数据,典型的特征是:一次写入,多次读取,从不更改,很少删除。
一般读取一张图片需要有两次磁盘读操作,首先从磁盘中获得图片的“元数据”,根据“元数据”从磁盘中读出图片内容。为了增加读取速度,“元数据”从内存读取,图片数据从磁盘读取。
HayStack存储系统由很多PC构成,每个机器的磁盘存储若干“物理卷”,这里的“物理卷”就是存储多个图片数据对应的某个文件,一般一个“物理卷”文件大小为100GB,可以存储上百万张图片数据。不同机器上的若干“物理卷”共同构成一个“逻辑卷”,在HayStack的存储操作过程中,是以“逻辑卷”为单位的,对于一个待存储的图片,会同时将这个图片数据追加到某个“逻辑卷”对应的多个“物理卷”文件末尾,之所以要这么做,主要是从数据冗余的角度考虑的,即使某台机器宕机,或者因为其他原因不可用,还可以从其他机器的“物理卷”中读出图片信息,这种数据的冗余是海量存储系统必须考虑的。
读取图片:HayStack缓存系统会向存储系统提供图片的“逻辑卷”ID编号以及图片ID(由Key和辅助Key构成),当存储系统接收到请求后,会在内存中的“物理卷”映射表中查找图片ID,如果找到,则根据映射表保存的信息可以获取其在对应“物理卷”中的文件起始位置和文件大小,如此就可以读到这个图片的内容。
上传图片:HayStack存储系统根据Web服务器传过来的图片“逻辑卷”ID编号以及图片ID和图片数据,将这个图片信息追加到对应的“物理卷”文件末尾,同时在内存的映射表中增加相应的映射信息。
更改了图片:如果用户更改了图片的内容后再次上传,HayStack存储系统不允许覆盖原先图片信息这种操作,因为这种操作严重影响系统效率,而是将这个修改的图片当作一个新的图片追加到“物理卷”的文件末尾,不过这个图片的ID是不变的。
删除了图片:如果用户删除某张图片,HayStack系统的操作也很直观,只要在内存映射表和“物理卷”中在相应的“删除标记位置”上做出标记即可。系统会在适当的时机回收这些被删除的图片数据空间。
文件存储布局
行式存储
行式存储广泛使用在主流关系型数据库及HDFS文件系统中,每条记录的各个字段连续存储在一起,而对于文件中的各个记录也是连续存储在数据块中的。
行式存储布局的优势体现在按行遍历或者查找数据,缺陷是:
- 对于很多SQL查询来说,其所需读取的记录可能只涉及整个记录所有字段中的部分字段,而若是行式存储布局,即使如此也要将整个记录全部读出后才能读取到所需的字段;
- 在存储时可以使用数据压缩模式,但是对于记录的所有字段只能统一采用同一种压缩算法,这样的压缩模式导致数据压缩率不高,所以磁盘利用率不是很高。
列式存储
列式存储布局在实际存储数据时,按照列对所有记录进行垂直划分,将同一列的内容连续存放在一起。在各种应用场景中,记录数据的格式有简单的和复杂的两种。简单的记录数据格式类似于传统数据库的(记录-字段)这种平面型(Flat)数据结构,而复杂的记录格式则可能是嵌套(Nested)的记录结构,即字段内容可能是另外一个有结构的记录体,比如JSON格式就是这种支持嵌套表达的复杂记录格式。
同一列的所有数据连续存储在一起。这样做有两个好处:
- 对于上层的大数据分析系统来说,如果SQL查询只涉及记录的个别列,则只需读取对应的列内容即可,其他无关字段不需要进行读取操作,这样可以增加I/O效率;
- 因为数据按列存储,所以可以针对每列数据的类型采取具有针对性的数据压缩算法,不同的字段可以采用不同的压缩算法,这样整体压缩效果会有极大的提升。
混合式存储
混合式存储布局融合了行式和列式存储各自的优点,首先其将记录表按照行进行分组,若干行划分为一组,而对于每组内的所有记录,在实际存储时按照列将同一列内容连续存储在一起。这种存储布局,一方面可以像行式存储一样,保证同一行的记录字段一定是在同一台机器节点上的,避免拼合记录的网络传输问题;另外一方面可以像列式存储布局一样按列存储,这样不同列可以采用不同的压缩算法,同时也可以避免读取无关列的数据。
典型的混合式存储方案包括RCFile、ORCFile和Parquet。
纠删码
纠删码通过对原始数据进行校验并保留校验数据,以增加冗余的方式来保证数据的可恢复性。
一种常见的做法是:对于热点数据,在大规模存储系统中仍然保留3个备份,而对于冷数据,则只保留1份数据,通过纠删码来保证数据的可靠性。之所以不对所有数据都采用纠删码的方式,是因为备份数据除了能够增加数据的可用性外,还可以提升数据的并发读取效率,所以对于热点数据用多备份的方式比较合适。
内存 KV 存储
对于内存KV数据库来说,极高的数据读/写速度和请求吞吐量是其题中应有之义。从系统设计角度来看,主要需要考虑的是:数据存储成本与系统高可用性如何均衡与选择的问题。
RAMCloud
RAMCloud的由一组存储服务器和一个协调器组成,存储服务器由高速网络连接,每台存储服务器包含两个构件:Master和Backup。Master负责内存KV数据的存储并响应客户端读/写请求,Backup负责在外存存储管理其他服务器节点内存数据的数据备份。协调器记载集群中的一些配置信息,比如各个存储服务器的IP地址等,另外还负责维护存储对象和存储服务器的映射关系,即某个存储对象是放在哪台服务器的。
为了能够支持快速数据持久化以及故障时快速数据恢复,RAMCloud在内存和外存存储数据时都统一采用了LSM树方案,其对应的Log结构被切割为8MB大小的数据片段(Segment)。
当RAMCloud接收到写数据请求时,首先将其追加进入内存中的Log结构中,然后更新哈希表以记载记录在内存中的存储位置,这里之所以会需要哈希表,是因为内存数据采取LSM树结构后,是由若干个Log片段构成的,所以需要记载记录所在Log片段的位置信息。之后,RAMCloud的主数据服务器将新数据转发给其他备份服务器,备份服务器将新数据追加到内存中Log片段后即通知主数据服务器返回,当备份服务器用于备份的Log片段写满时将其写入外存的LSM结构中。
Redis
系统中唯一的Master负责数据的读/写操作,可以有多个Slave来保存数据副本,副本数据只能读不能做数据更新操作。当Slave初次启动时,从Master获取数据,在数据复制过程中,Master是非阻塞的,即同时可以支持读/写操作。Master采用快照加增量的异步方式完成数据复制过程,首先在时刻T将内存数据写入本地快照文件,同时在内存记录从T时刻起新增的数据操作,当快照文件生成结束后,Master将文件传给Slave,Slave先保存为本地文件,然后将其加载入内存。之后,Master将T时刻后的数据变更操作以命令流的形式传给Slave,Slave顺序执行命令流,这样就达到数据和Master保持同步。
Master和Slave都记载上次复制时的命令流地址(Offset),当Slave重新连接Master时,Master可以根据地址偏移量将增量更新传递给Slave。
由于Redis的主从复制采用异步方式,所以Master接收到数据更新操作与Slave接收到数据副本有一个时间差,这样如果Master发生故障可能会导致数据丢失。另外,因为Redis并未支持主从自动切换,如果Master故障,很明显此时系统对外表现为只读不能写入。
Redis Cluster是专注于解决多机集群问题的版本,但是其整个投票机制复杂且不够优雅,其实在这里如果引入ZooKeeper是能简单而优雅地实现主备自动切换的。
Redis的在现有Redis版本下如何自助实现系统高可用呢?一种常见的解决思路是使用Keepalived结合虚拟IP来实现Redis的HA方案。
- 首先,在两台服务器(或者多台,机制类似)分别安装Redis并设置成一主一备。
- 其次,Keepalived配置虚拟IP和两台Redis服务器IP的映射关系,这样,对外统一采用虚拟IP,而虚拟IP和真实IP的映射关系及故障切换由Keepalived来负责。
- 当Redis服务器都正常时,数据请求由Master负责,Salve只需从Master同步数据;当Master发生故障时,Slave接管数据请求,同时关闭主从复制功能以避免Master再次启动后Slave数据被清掉;当发生故障的Master恢复正常后,首先从Slave同步数据以获得最新的数据情况,然后关闭主从复制功能并恢复Master身份,与此同时Slave恢复其Slave身份。
MemBase
已更名为CouchBase。MemBase通过“虚拟桶”的方式对数据进行分片,其将所有数据的主键空间映射到4096个虚拟桶中,并在“虚拟桶映射表”中记载每个虚拟桶主数据及副本数据的机器地址,MemBase对“虚拟桶映射表”的更改采用两阶段提交协议来保证其原子性。
MemBase中的所有服务器都是地位平等的,并不存在一个专门进行管理功能的Master服务器,但是其数据副本管理采用了Master-Slave模式。每个虚拟桶有一台服务器作为主数据存储地,这台服务器负责响应客户端请求,副本存放在其他服务器内存中,其副本个数可以通过配置来指定。
客户端在本地缓存一份“虚拟桶映射表”,所以通过哈希函数以及这个映射表可以直接找到主数据及副本数据的机器地址。如果是读请求,则主数据服务器直接可以响应请求。如果是写请求,则主数据服务器以同步的方式将写请求转发给所有备份数据服务器,
列式数据库
列式数据库兼具NoSQL数据库和传统数据库的一些优点,其具备NoSQL数据库很强的水平扩展能力、极强的容错性以及极高的数据承载能力,同时也有接近于传统关系型数据库的数据模型,在数据表达能力上强于简单的Key-Value数据库。
BigTable
BigTable本质上是一个三维的映射表,其最基础的存储单元是由(行主键、列主键、时间)三维主键(Key)所定位的。BigTable要求每行的主键一定是字符串的形式,而且在存储表格数据时,按照行主键的字母大小顺序排序存储。
BigTable的整体结构示意图,其中主要包含:主控服务器(Master Server)、子表服务器(Tablet Server)和客户端程序(Client)。每个表格将若干连续的行数据划分为一个子表(Tablet),这样,表格的数据就会被分解为一些子表。“子表服务器”主要负责子表的数据存储和管理,同时需要响应客户端程序的读/写请求,其负责管理的子表以GFS文件的形式存在,BigTable内部将这种文件称之为SSTable,一个子表就是由“子表服务器”磁盘中存储的若干个SSTable文件组成的。“主控服务器”负责整个系统的管理工作,包括子表的分配、子表服务器的负载均衡、子表服务器失效检测等。“客户端程序”则是具体应用的接口程序,直接和“子表服务器”进行交互通信,来读/写某个子表对应的数据。
Spanner
BigTable尽管有很多适合的使用场景,但是其在复杂或者不断演化的数据模式或者有跨行跨表的强一致性需求等应用场景下表现不佳。
Spanner是Google开发的可在全球范围部署的具有极强可扩展性的列式数据库系统,其可以将千亿规模的数据自动部署到世界范围数百个数据中心中的百万台服务器中,通过细粒度的数据备份机制极大地提高了数据的可用性以及地理分布上的数据局部性。
Spanner可以细粒度地自主控制数据备份策略,包括备份数目、在不同数据中心的存储配置、备份数据距离用户的物理距离远近、备份数据之间的距离远近,Spanner还具备传统分布式数据库系统所不具备的优点:其可提供外部一致(Externally Consistent)的读/写能力,以及跨数据库的全局一致性读能力。
Spanner之所以能够提供上述诸种优点,很重要的原因是通过TrueTime机制为分布式事务打上具有全局比较意义的时间戳,这个时间戳由于跨数据中心全局可比,所以可以将其作为事务序列化顺序(Serialization Order)的依据。而且这种序列化顺序还满足如下的外部一致性:如果事务T1的提交时间早于事务T2的开始时间,那么T1的提交时间戳要小于T2的提交时间戳,即可以依据提交时间戳的大小顺序来将分布式事务全局序列化。Spanner是第一个能够在全局范围提供此种能力的分布式存储系统。
TrueTime具体实现时综合了GPS和原子时钟来共同决定准确的时间,每个数据中心部署一些时间服务器(Time Master),其中一些机器通过GPS确定时间,另外一些通过原子时钟确定时间,其他所有机器都安装一个时间监控进程。时间服务器之间通过定时互询来修正各自的准确时间,以避免单台时间服务器的时间和真实时间偏离太远,时间监控进程定期从多个数据中心的多台时间服务器拉取它们各自的时间标准,通过一定算法保证每个服务器自己的时间落在真实时间的一定精度范围内。
TrueTime提供了全局可比时间服务,再结合Paxos算法,Spanner可以提供读/写事务、只读事务以及快照读事务等各种分布式事务支持,并提供外部一致性读/写能力,这为上层应用开发带来了很大的灵活性和便利性。
大规模批处理系统
大数据计算中一类最常见的计算任务即为批处理,现代批处理计算系统的设计目标一般包括数据的高吞吐量、系统灵活水平扩展、能处理极大规模数据、系统具有极强的容错性、应用表达的便捷性和灵活性等,而非流式计算系统强调的处理的实时性等特性。
从发展趋势看,相对复杂的任务转换为MapReduce任务开发效率还是不够高,所以其有逐步被封装到下层的趋势,即在上层系统提供更为简洁方便的应用开发接口,在底层由系统自动转换为大量MapReduce任务,这一点值得读者关注。
MapReduce
Google的MapReduce计算框架
- 被分配到Map任务的Worker读取对应的数据块内容,Map函数输出的中间结果Key/Value数据在内存中进行缓存,缓存的Map函数产生的中间结果周期性地被写入本地磁盘,每个Map函数的中间结果在写入磁盘前被分割函数(Partitioner)切割成R份,R是Reduce的个数;
- 只有所有Map任务都完成时Reduce任务才能启动,也即MapReduce计算模型中在Map阶段有一个所有Map任务同步的过程,只有同步完成才能进入Reduce阶段
- Reduce任务从Map任务获取中间数据时采用拉取方式而非由Map任务将中间数据推送(Push)给Reduce任务,这样做的好处是可以支持细粒度容错。
Hadoop的MapReduce计算框架
MapReduce计算模型和框架具有很多优点。首先,其具有极强的可扩展性,可以在数千台机器上并发执行。其次,其具有很好的容错性,即使集群机器发生故障,一般情况下也不会影响任务的正常执行。另外,其具有简单性,用户只需要完成Map和Reduce函数即可完成大规模数据的并行处理。一般认为MapReduce的缺点包括:无高层抽象数据操作语言、数据无Schema及索引、单节点效率低下、任务流描述方法单一等。
MapReduce运算机制的优势是数据的高吞吐量、支持海量数据处理的大规模并行处理、细粒度的容错,但是并不适合对时效性要求较高的应用场景,比如交互式查询或者流式计算,也不适合迭代运算类的机器学习及数据挖掘类应用,主要原因有以下两点。
- 其Map和Reduce任务启动时间较长;
- 在一次应用任务执行过程中,MapReduce计算模型存在多处的磁盘读/写及网络传输过程。
DAG计算模型
DAG是有向无环图(Directed Acyclic Graph)的简称,在大数据处理领域,DAG计算模型往往是指将计算任务在内部分解成为若干个子任务,这些子任务之间由逻辑关系或运行先后顺序等因素被构建成有向无环图结构。MapReduce计算模型,在本质上是DAG的一种特例。DAG计算系统比较通用的由上到下三层结构:
- 最上层是应用表达层,即通过一定手段将计算任务分解成由若干子任务形成的DAG结构,这层的核心是表达的便捷性,主要目的是方便应用开发者快速描述或者构建应用。
- 中间层是DAG执行引擎层,其主要目的是将上层以特殊方式表达的DAG计算任务通过转换和映射,将其部署到下层的物理机集群中来真正运行;
- 最下层是物理机集群,即由大量物理机器搭建的分布式计算环境,这是计算任务最终执行的场所。
流式计算
批处理计算系统、图计算系统等相比,流式计算系统有其独特性。优秀的流式计算系统应该具备以下特点:
- 记录处理低延迟(毫秒级)
- 极佳的系统容错性
- 极强的系统扩展能力
- 灵活强大的应用逻辑表达能力
常见的流式计算系统架构分为两种:主从模式(Master-Slave)和P2P模式。大多数系统架构遵循主从模式,主要是因为主控节点做全局管理比较简洁,P2P架构因为无中心控制节点,所以系统管理方面相对较复杂,使用该类架构的系统较少,S4是一个典型例子。
DAG结构中最常见的基本拓扑结构包含:流水线、乱序分组、定向分组和广播模式。
对于流式计算系统的DAG任务结构来说,流数据进入系统后经过多个计算节点的不断变换,最终到达输出节点形成计算结果。如何保证流数据正确地从上游节点送达下游节点是非常重要的问题,这个问题的解决方案一般被称作数据的“送达保证机制”。可以定义如下三种送达可能:
- 至少送达一次(At-Least Once Delivery)
- 至多送达一次(At-Most Once Delivery)
- 恰好送达一次(Exact-Once Delivery)
Storm在系统级提供“恰好送达一次”语义,这是通过“送达保证机制”和“事务拓扑”(Transaction Topology)联合完成的。“送达保证机制”能够实现“至少送达一次”语义(通过将数据ID进行不断XOR,判断最终是否为0),而“事务拓扑”则保证不会出现多次送达的情形。
流式计算任务系统容错有三种模式:
- 备用服务(Standby Service):计算任务的某个计算节点N在另外一台物理机上设置其对应的备份服务S,计算框架定时通过心跳或者ZooKeeper来及时捕获服务状态,当节点N发生故障时,启动备份服务S来接替计算节点N的功能,但是这只适合计算节点属于无状态(Stateless)类型的服务;
- 热备(Hot Standby):热备机制的计算节点N和其备用节点S同时运行相同的功能,上游节点将数据流同时发往下游的计算节点N及其备用节点S;
- 检查点(Checkpointing):计算节点N周期性地将其状态信息通过检查点的方式在其他地方进行备份,当计算框架侦测到计算节点N发生故障时,则启动备用节点S,并从Log中将对应的状态信息进行恢复。
目前主流的流式计算系统都采用检查点的容错机制。
交互式数据分析
对目前的各种SQL-On-Hadoop系统进行归类梳理,根据其整体技术框架和技术路线的差异,将其分为以下四类:
- Hive系。Hive是直接构建在Hadoop之上的早期提出的数据仓库系统;
- Shark系。如果将Hive理解为在Hadoop基础上的交互式数据分析系统,那么可以将Shark理解为在Spark系统之上的数据仓库系统;
- Dremel系。严格地说,将很多归于此类的数据仓库统称为“Dremel系”是不够严谨的,因为很多系统不仅参考了Dremel的设计思路,在很大程度上也融合了MPP并行数据库的设计思想;
- 混合系。混合系是直接将传统的关系数据库系统和Hadoop进行有机混合而构造出的大规模数据仓库,其中,HadoopDB是最具代表性的。
图数据库:架构与算法
在处理图数据时,其内部存储结构往往采用邻接矩阵或邻接表的方式,在大规模并行图数据库场景下,邻接表的方式更加常用,大部分图数据库和处理框架都采用了这一存储结构。
图数据的数据局部性很差,相互之间有很密切的关联,具体体现就是图节点所展现出的边,其表征着数据之间的关联。数据局部性差意味着数据分布到集群中的机器时存在潜在的数据分布不均匀或者计算中需要极高的网络通信量等问题。
图数据库可以分为两类:一类是在线查询类,另一类是离线挖掘类。
在线查询类图数据库
如Facebook的TAO图数据库、Neo4j。
在线查询类图数据库的主要目的往往是给具体应用提供在线数据读写服务,其中尤其关注数据查询类服务,所以更强调系统的高可用性和读写的低延迟。其体系结构一般由底向上可以划分为三层:分布式存储引擎层、图数据管理层和最上端的图操作API层。
底层的存储引擎层采用MySQL数据库居多,主要是可以利用成熟数据库的很多特有功能,比如事务等,这一层只负责数据的存储。
离线挖掘类图数据库
离线挖掘类图计算范型包括同步模型和异步模型。Pregel和Giraph采用了典型的同步模型,GraphChi采用了典型的异步模型,而PowerGraph则可以看成是混合模型的代表,即其既可以模拟同步模型,也可以模型异步模型。
同步执行模型是相对于异步执行模型而言的。我们知道,图计算往往需要经过多轮迭代过程,在以节点为中心的图编程模型下,在每轮迭代过程中对图节点会调用用户自定义函数Function(vertex),这个函数会更改vertex节点及其对应边的状态,如果节点的这种状态变化在本轮迭代过程中就可以被其他节点看到并使用,也就是说变化立即可见,那么这种模式被称为异步执行模型;如果所有的状态变化只有等到下一轮迭代才可见并允许使用,那么这种模式被称为同步执行模型。
离线挖掘数据分片
由于图数据记录之间的强耦合性,如果数据分片不合理,不仅会造成机器之间负载不均衡,还会大量增加机器之间的网络通信。,再考虑到图挖掘算法往往具有多轮迭代运行的特性,这样会明显放大数据切片不合理的影响,严重拖慢系统整体的运行效率,所以合理切分图数据对于离线挖掘类型图应用的运行效率来说非常重要,但是这也是至今尚未得到很好解决的一个潜在问题。衡量图数据切片是否合理主要考虑两个因素:机器负载均衡以及网络通信总量。
机器学习:范式和架构
很多机器学习算法都有迭代运算的特点,这主要是在损失函数最小化的训练过程中,需要在巨大的参数空间中通过迭代方式寻找最优解,比如主题模型、回归、矩阵分解、SVM以及深度学习等都是如此。但是将迭代式机器学习程序改造为并行架构下运行也面临一些挑战:
- 单机版通过共享内存获取的全局参数此时需要并发程序通过网络来存取,而网络的通信效率会比内存存取效率低很多;
- 在分布式环境下,运行在不同机器上的并发程序可能因为各种原因(机器负载高或者硬件故障等)造成执行速度不统一,这对快速完成整个任务也有负面影响;
- 较强的容错性,当集群中的机器发生故障时,如何进行调度使整个任务能够顺利完成,并保证程序运行的正确也是很重要的问题。
将算法改造为在分布式环境下执行,直观感觉上会认为运行速度一定会大大快于单机环境,但是鉴于以上各种挑战,如果不能合理设计系统架构和算法,有时候分布式算法的执行效率甚至不如单机版。
典型的监督学习任务分别是分类(Classification)与回归(Regression),对于非监督学习来说,最典型的问题就是聚类问题。
Google内部使用深度神经网络构建图片识别系统的参数规模已经达到了十亿到百亿级别(天哪!!!)。
构建大规模分布式机器学习系统时经常采用的两种并行措施分别为数据并行和模型并行:
- 所谓“数据并行”,指的是将训练数据划分成若干子集合,每个子集合都运行相同的学习算法来进行并行训练过程,这样分别得到局部训练模型,在机器学习语境下,往往会有一个融合局部训练模型为全局训练模型的过程(显式地独立融合过程或者通过参数服务器同步或者异步方式融合等不同的策略);
- 所谓“模型并行”,指的是在模型参数巨大的情形下,单机往往没有能力单独完成整个机器学习算法的建模过程,此时必须将整个机器学习模型分布到多台机器上联合完成训练过程。
分布式机器学习范型
对于迭代式计算任务,根据并发进程或线程之间是否需要进行数据同步,可以将其划分为同步范型和异步范型。考虑到同步范型又可以细分为严格同步和部分同步两种,所以在此列出三种范型:同步范型、异步范型及部分同步范型。
- MapReduce迭代计算模型
- BSP计算模型
- SSP模型
- MPI
- GPU
分布式机器学习架构
常见的三类分布式机器学习架构:MapReduce系列架构、Spark及其上的MLBase架构和参数服务器架构。
MapReduce系列
将机器学习系统直接构建在Hadoop平台之上:HDFS作为训练数据和机器学习模型的存储场所,机器学习框架往往采用三层结构,底层是Hadoop提供的MapReduce计算机制,在其上构建一个处于中间层的常用机器学习算法库,最上层往往分为模型训练和在线服务两类功能模块。
其运行流程为:首先将训练数据存入HDFS指定的目录下,计算层根据配置文件内容读取训练数据,然后使用特定参数配置的机器学习算法学习模型,并将习得的计算模型存入HDFS指定的目录下,这样即可完成训练过程。服务层加载习得的模型即可对外提供在线预测功能。
Hadoop平台改造的计算框架有Twister和Haloop。
Spark及MLBase
Spark在本质上仍然是一个DAG批处理计算系统,在其提出之前已经有Dryad等通用DAG计算系统,考虑到很多迭代类机器学习问题在迭代计算过程中会反复重用很多中间数据,这些数据被称为工作集数据(Working Set),而Dryad等系统并未明确提出针对工作集数据的优化处理策略,在数据处理过程中需要反复将工作集数据输入/输出到磁盘,所以传统的DAG系统处理这类问题的效率并不太高。Spark针对工作集数据提出了基于内存的分布式存储抽象模型:可恢复分布式数据集(Resilient Distributed Datasets,简称RDD),这样工作集数据可以有选择性地被加载并常驻在内存中,有利于后续迭代计算过程中大大提升此类问题的处理效率。由此可看出,Spark是集成了RDD模型的DAG批处理系统,它在RDD增加数据复用与系统处理速度的优势基础上,同时还具备传统DAG系统很强的容错性、数据局部性感知的调度策略以及高可扩展性,其处理迭代式机器学习任务的效率比MapReduce原始方式快20倍左右。
MLBase提供了相对统一的机器学习架构,既方便普通应用者快速开发机器学习相关的应用,又能够方便算法研究者不用考虑架构问题而集中精力在改进算法本身。
普通用户使用MLBase提供的任务声明语言来描述机器学习任务,并将请求提交给MLBase主控服务器(Master)。MLBase将用户请求解析为逻辑学习计划(Logical Learning Plan,LLP),其描述了机器学习任务的一般工作流。MLBase将优化的逻辑计划进一步转化为物理学习计划(Physical Learning Plan,PLP)以供实际执行,PLP由若干机器学习操作符构成。MLBase将物理学习计划分配给各个工作服务器(Slaves)来并行执行,并把执行结果返回给用户,执行结果往往包含从训练数据习得的机器学习模型和重要特征等,这便于用户使用这些信息来进行预测。
参数服务器(Parameter Server)
从本质上讲,可以将参数服务器看作是传统的共享内存方式在网络环境下的并行扩展版本。
Petuum是CMU提出的通用参数服务器架构,其由众多并发执行的客户端和由多台参数服务器构成的参数服务器集群构成,其中一台参数服务器充当主控服务器(Name-Node)的作用,并负责数据路由以及数据分片在不同的服务器间分配等工作。参数服务器集群是一个类似于分布式共享内存的分布式KV存储池,用于存储机器学习任务中各个并行客户端共享的全局参数,不同应用的全局参数可以放置在参数服务器集群不同的表格中。Petuum采取部分同步范型,不同的表格可以绑定不同的部分同步参数设置。客户端可以在合适的时机更新参数服务器中对应表格的全局参数,当其更新参数服务器对应的参数后,这个更新后的参数即对其他客户端可见。每个客户端还可以在本地缓存部分参数服务器里的参数值,一般情况下,客户端直接从本地缓存存取参数值,只有当达到一定条件时才会通过网络访问参数服务器集群内对应的参数,这样可以减少数据同步操作次数及网络通信数量,有效地加快整个任务的执行效率。
机器学习:分布式算法
这部分的笔记不太好整理,略过。
增量计算
增量计算是一种比较节约系统资源的思路:每次只对新增内容以及对其影响到的旧计算结果进行重新计算,原先的大部分计算结果可以复用。按照其整体计算思路的不同,将现有增量计算技术划分为“变化传播”与“结果缓存复用”两种模式。
“变化传播模式”在设计技术方案时,更多地从新数据和受影响的旧数据这个角度来考虑如何设计系统,其基本思路往往是:首先计算新数据的结果,然后判断直接受到影响的旧数据有哪些,并重新计算其结果,接着将这些变化的结果通过数据之间的结构传播出去,再考虑又有哪些其他旧的计算结果会进一步受到影响,如果影响足够大,那么需要重新计算,如此不断循环往复,即可完成整个增量的计算过程。
“结果缓存复用模式”在设计技术方案时,更多地从哪些旧数据的计算结果没有发生变化的角度考虑,并在此基础上对数据或者计算流程进行组织,尽可能最大化地复用没有变化的旧的结果,其复用方式往往采用结果缓存,将可复用的旧数据计算结果缓存在内存或者外存文件中。
Hadoop平台下增量计算的一般模式
首次运行时,增量计算与普通的Hadoop运算过程是一致的,区别体现在后续的增量迭代运行过程中。在后续的增量迭代中,增量计算系统首先要区分哪些数据是新增数据,哪些数据是已经有计算结果的旧数据,并尽可能将新增数据独立出来。对于新增数据,需要进行完整的Map和Reduce两阶段的运算,对于旧数据,则可以免去Map阶段的运算,只进行Reduce阶段的运算,即这是一种对Map阶段输出中间结果的复用,所以增量计算系统需要将上一轮计算中Map阶段的输出缓存到文件中,以供后续增量迭代重用。
Incoop和IncMR是两个典型的基于Hadoop平台的增量计算系统,都采取了“结果缓存复用模式”。这种模式对结果复用的大部分只能重用旧数据Map阶段的中间结果,在Reduce阶段,即使有很多旧数据结果没有受到影响,但是受制于MR的运算机制,也很难对此加以区分,所以必须完整地运行所有数据的Reduce阶段逻辑流程,其复用效果并不是特别突出。
实验结果表明,基于Hadoop改造的增量计算与全量更新相比,大多数应用性能的提升只是在10%以内。
Percolator
Percolator的本质是构建在Bigtable上的一种与MapReduce计算方式互补的增量计算模式,主要用来对搜索引擎的索引系统进行快速增量更新。如果是全局性的统计工作,还是比较适合用MapReduce来做,而对于局部性的更新则比较适合使用Percolator系统来处理。Percolator的目标:
- 针对海量数据处理
- 提供强一致性支持
- 局部增量更新计算
Bigtable提供了对数据行的事务支持,Percolator充分利用这一点,为表中每列数据增加管理数据,提供了支持ACID“快照隔离”语义的跨行跨表事务。
Percolater采用了“观察/通知”的机制来将应用程序串接起来形成一个整体,这样就形成了变化传播的增量计算模式,
Kineograph
Kineograph是一个支持增量计算的分布式准实时流式图挖掘系统。其同时具备的三个特点:
- 首先,Kineograph是一个图挖掘系统;
- 其次,它是准实时的流式挖掘系统;
- 最后,Kineograph是一个支持增量挖掘的系统,当连续不断的数据流入系统后,无须每次都对历史的所有数据进行重新计算,而是采用增量计算的方式使得挖掘结果进行更新。
Kineograph的增量计算采取了典型的变化传播模式。
DryadInc
DryadInc是建立在DAG批处理系统Dryad之上的增量计算系统,这是一种典型的“结果缓存复用模式”增量计算机制。其基本思想是将上一轮计算中部分可复用的结果存储到缓存服务器中,下次计算时尽可能从缓存服务器重用原先的计算结果,而非重新计算。其整体架构由“重执行逻辑”(Rerun Logic)和“缓存服务器”(Cache Server)构成。“重执行逻辑”是扩展版本的Dryad任务管理器,用于检测任务DAG中可复用的计算部分,并对DAG进行改写;“缓存服务器”则是通过网络访问的数据缓存。