Zookeeper源码解读 1概述

001_zookeeper概述

zookeeper的基本操作方式

  • ls 列出路径下的所有node,类似于linux的ls命令
  • create {node} {data} 创建节点,并且赋予值
  • get {node} 获取node 信息(包括值,创建时间,修改时间,版本(第n次修改))
  • delete {node} 删除节点

zk不使用相对路径,因此...是不适用的。

zookeeper基本概念

  • znodes 每一个zookeeper中的节点都被称作一个znode

znode 维护一个包含版本号、acl变动的数据结构。数据结构是包括时间戳。版本号也有时间戳,允许zookeeper验证cache并且协调更新。每次znode有数据更新,版本号会增加。因此,每当client获得数据时,它都会获得数据的版本号。当client执行更新或者删除操作时,它也要提供它要更改数据的版本号。如果版本比对失败,更新也会失败。(这个操作也可以是覆盖) 在zookeeper的文档中,znode用来表示数据nodesServers表示执行Zookeeper service的机器,quorum peers表示构成整体的服务(servers),client表示使用Zookeeper service的主机或者是process

znodeszookeeper中,程序主要接触到的实体。有一些特征需要注意:

  • Watches clients可以在znodes上设置watches。znode上的更新会触发watchzookeeper发送一个通知给client

  • DataAccess 每个znode上,命名空间内的数据读写都是原子的。读操作会读取一个znode的所有byte数据,写操作会替换所有的数据。每个node都有一个Access Control List(ACL)访问控制列表来约束操作。

Zookeeper不是为了被设计成一个通用的数据库,或者大的对象存储。它是用来协调数据。数据可以是以配置,状态等。在zk中存放的数据应该是比较小的数据(使用kb来衡量的)。对于大尺寸的数据,会让处理变的很慢。大尺寸的数据,更倾向于使用NFS或者是HDFS

  • Ephemeral Node

Ephemeral Node是指与session生命周期一致的node: 当 session创建时,这些node创建,当session关闭,node删除,因为这种特点,Ephemeral Node不能有child节点。

  • Sequence Node – Unique Naming 当创建节点时,可以让zookeepernode路径使用单调递增计数器创建node。计数器按照格式%010d进行,意味着10位数字的。

  • Container Nodes (3.6.0新增) Container Nodes是特殊用途的nodes,为某些用途比如leaderlock等。当container最后一个child删除了,那么container会被列入候选删除,并且在某个时间点被执行。 考虑到这点,需要应对KeeperException.NoNodeException,当需要在container内部创建child nodes。遇到这样的异常,就需要重新创建Container Node

  • Zookeeper中的时间

    • Zxid 每当zookeeper的状态发生转移,都会以zxid(Zookeeper Tansaction Id)的形式产生时间戳。使用它能确定zookeeper的所有变动。并且时间是绝对有序的。

    • Version numbers node的每一个更改都会增加node的其中一个版本号。每一个版本号均表示更改的次数。 有三个版本号: version(znode的版本号),cversion(znode的child的版本号)

    • Ticks 当使用多server的Zookeeperserver使用ticks来定义事件的事件,比如updatesession timeoutconnection timeouttick time可以间接从最小session timeout(tick time X 2)获得。如果一个client请求一个sessiontimeout比最小的session timeout要下,那么server会告诉clientsession timeout已经是最小的session timeout

    • Real time (真实时间)

    Zookeeper不使用真实时间或者时钟时间,只有两个例外,在创建或者更新znode的时候,放入znode数据结构中使用的时间戳。

  • Zookeeper 状态结构

每一个znode的状态结构由以下字段:

  • czxid

    创建时间关联的zxid, c表示 created。

  • mzxid

                最近的修改时间`zxid`,`m` -> modify
    
  • pzxid

                最近的parent修改时间`zxid`。
    
  • ctime

放一张session的状态图

状态图

在创建连接时,client需要提供一系列的server地址,比如(e.g. “127.0.0.1:4545” or “127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002”)。默认会按照顺序,连接第一个,失败后,尝试后面的,直到连接成功。

默认连接时是定位到 / (root)目录的,可以在连接串后附加位置比如”127.0.0.1:4545/app/a” or “127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a” 。这样,连接的默认位置被定位到连接串所指示位置。

client获得Zookeeper service句柄,Zookeeper创建一个Zookeeper session,用64-bit的数据来表示,并且把它给client。如果client连向不同的Zookeeper server,会把session id作为握手连接的一部分。基于安全性考虑,server为每个session id创建了password,每一个Zookeeper server都能进行验证。 当session创建时,password是与session id一起发送给client的。当client需要与新的server重新建立session连接时,它就可以使用已有的session idpassword

创建zookeeper session的一个参数是timeout(以miliseconds)。client发送一个请求的timeout,server回复一个能够给予client的timeout。允许的最小timeout至少是2倍的tickTime,最大是20倍的tickTime。 在api中,client也允许使用负值的timeout。

当client(session)从ZK server集群分离出来,它会搜索当session创建时列出的servers。最终,当client与至少一个servers重新连接成功,session要么重新进入connected(如果重连发生在session尚未timeout)状态,或者进入expired状态(如果重连发生在session timeout之后)。当发生断开连接之后,不建议重新创建一个session object(new一个Zookeeper.class),因为ZK的client会处理重连。只有在session确认超时的情况下,才需要重新创建session。

Session超时是在Zookeeper集群中管理的,而不是客户端。Expirations(过期)发生条件是: cluster在超时时间内收不到client的消息(比如,没有心跳)。当session过期后,cluster会删除所有ephemeral阶段,并且通知所有watch这些节点的client。这时候,那个session timeoutclient是收不到这个通知的,因为它已经断开连接了。当它重新连接后,会收到session expired通知。

我们来总结一下client看到的连接状态机:

  1. ‘connected’: 连接状态,session 简历,并且clientcluster正常通信
  2. …. client 与cluster partitioned
  3. ‘disconnected’ client与cluster失联
  4. … 过了一段时间,当时间超过了cluster的timeout,client看不到任何变化,因为timeout是在cluster管理的,并且,client此时已与cluster失去连接。
  5. … 过了一段时间,client重新获得了与cluster网络级别的连接。(此时TCP通信刚刚能够进行)
  6. ‘expired’: 最终client重新连上了cluster,并且被通知已经过期。

Zookeeper session建立的另一个参数是default watcher。当任何状态发生变动时,client都会被通知。 session 是保持alive的,client不停向session发送请求,如果session空闲,那么client会发送PING 请求来保持session alive。PING request有两个作用,让server知道client活跃,同时,让client验证与server的连接状态。PING的时间足够保守,来确保合理的时间来发现一个dead connection并且重新连接到新的server。

一旦到server的连接建立了(established),基本来说,在执行同步或者异步操作时,有两种情况下会发生connectionloss(java中会抛出异常):

    1. 使用了不存在(或者不可用)的session
    1. server上的操作没结束,client与server丢失连接。比如,有一个等待的异步调用。

TODO 这里暂且停顿下。 > Added in 3.2.0 – SessionMovedException > 文档 file:///Users/yq/project/java/zookeeper/docs/zookeeperProgrammers.html

API的使用

使用zk的API,按照惯例,Zookeeper的应用会拆成两个单元,一个用来维护连接,一个用来监控数据。 在zk的javaExample中,Executor 维护连接,DataMonitor监控Zookeeper的数据树。Executor持有主线程,并且包含异常逻辑,以及交互逻辑(输入,关闭等)

Zookeeper Watches

Zookeeper的watcher,zk中所有的读操作(getData(), getChildren(), exists())都可以设置一个watch。watch有三个重要特征:

    1. 只触发一次 如果一个 client执行了getData(“/znode1”,true ),之后 /znode1更改了,那么client会收到一个watch event。 如果 “/znode1”再次改变,那么不会有watch event,除非client再次read。
    1. 是发送给client 的

这意味着顺序上,它不一定在更改成功操作的返回值之前到达。Watches是异步发送给Watchers的。 Zookeeper可以保证以下的顺序: 当client进行一项更改,它首先接收到的事件是watch event,然后才会拿到更改的返回值。

tickTime可以在zk的配置文件中声明。下面是一段zk的tutorial中给出的配置。 tickTime=2000 dataDir=/var/lib/zookeeper clientPort=2181 initLimit=5 syncLimit=2 server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888

zk的java api使用有一些不习惯的地方以getData为例,如果调用同步接口,返回值是byte[]类型,仅仅返回了node的值,如果需要node的状态信息,需要传入一个Stat入参,非null。zk会改变这个对象的状态。它其实也是返回值。 这一点,与scala的无副作用的设计偏好背离的。

这一段内容主要来自于zookeeperrecipes地址

1 zookeeper的主要概念: Name Service, Configuration, Group Membership

Name Service, ConfigurationZookeeper两个主要的应用。这两个功能通过Zookeeper的API直接提供。

另一个直接提供的功能是group membership。一个group用一个node来表示。group成员在node之下,创建 ephemeral(一年期的)node。 如果检测到成员的node失效,那么node会被移除。

2. Barriers

分布式系统使用barriers来block nodesprocess直到所有的node能够被允许。Barrier通过一个barrier node来指定。barrier node存在的情况下,才会出现barrier。下面是伪代码:

  • 客户端client调用 Zookeeper barrier nodeexists()方法,并且设置watchtrue

Client calls the ZooKeeper API’s exists() function on the barrier node, with watch set to true.

  • 如果existsfalse,那么表明没有barrierclient可以继续执行

  • watch事件触发后(这意味着某些状态发生了改变),client需要重新使用exists()

When the watch event is triggered, the client reissues the exists( ) call, again waiting until the barrier node is removed

Double Barriers

Double barrier使得clients能够同步计算的开始和结束。当barrier中已经有足够的processesprocessess会开始计算,并且完成后立刻离开barrier。下面我们演示如何使用zookeeper的node作为barrier

基本过程描述如下: 用b来表示barrier node。每个client的处理过程(process),在注册时进入barrier node,结束后,进行注销(unregister)。

node按照下面Enter的 步骤,在barrier node中进行注册,它一直等到x client process 注册之后,才会进行计算。(x 是由你的系统指定)

Enter的过程: 1. 创建命名n = b + "/" + p 2. 设置watch: exists(b + "/ready", true) 3. 创建child: create(n, EPHEMERAL) 4. L = getChildren(b, false) 5. 如果L的child比x少,那么等待watch event 6. 否则 create(b + ‘‘/ready’’, REGULAR)

下面的排版稍微有点问题。

Enter Leave
Create a name n = b+“/”+pSet watch: exists(b + ‘‘/ready’’, true)Create child: create( n, EPHEMERAL)L = getChildren(b, false)if fewer children in L than x, wait for watch eventelse create(b + ‘‘/ready’’, REGULAR) L = getChildren(b, false)if no children, exitif p is only process node in L, delete(n) and exitif p is the lowest process node in L, wait on highest process node in Lelse delete(n) if still exists and wait on lowest process node in Lgoto 1

###队列

在节点下,创建EPHEMERAL节点,每一个新元素表明入队时间。使用getChildren来进行可能锁死的出队(类似take)操作。

优先队列其实是在命名上加入优先级标示queue-YY, YY是优先级。在getChildren中进行逻辑判断。

Locks 锁

现在zk提供了锁的recipes,位于目录 src/recipes/lock

加锁的伪代码如下:

  1. 使用create 创建”locknode/lock-“ 节点,并且声明为EPHEMERAL和sequence
  2. getChildren,不进行watch。看看自己创建的是不是最小的。
  3. 如果自己创建的是最小的,那么就持有锁,该干嘛干嘛。如果不是,那么,就调用exists观察比自己小一号的。(这样做是为了避免herd effect)进入等待。
  4. 如果exists是false状态了。那么就说明自己可以持有锁了,转到步骤2。如果exists是true,那么继续等。

解锁的逻辑只要删除节点即可。

SharedLock

共享锁,共享锁的特性主要是读和写的分离。

过程描述如下,获取读锁:

  1. 在lock根节点下创建_read_开头的临时节点,标记创建属性为EPHEMERAL_SEQUENCE。
  2. 调用children,不进行watch,检查是否有比自己编号小的WRITE节点。
  3. 有比自己小的WRITE节点,使用exists观察离自己最近的WRITE节点,设置watch,如果观察到true,进入等待,被唤醒后进入步骤2,观察到false,直接进入步骤2。
  4. 如果没有比自己小的WRITE节点,即持有锁。可进行操作。

获取写锁:

  1. 在lock根节点下创建_write_开头的临时节点,标记创建属性为EPHEMERAL_SEQUENCE。
  2. 调用children,不进行watch,检查是否有比自己小的编号(read或者write)。
  3. 如果有比自己小的节点,使用exists观察李自己最近的节点(可以是read或者write),设置watch,如果观察到true,进入等待,被唤醒后进入步骤2,观察到false,直接进入步骤2。
  4. 如果没有比自己小的节点,则持有锁,进行操作。

关于herd effect,

共享锁的释放比较简单,删除节点。

共享锁的进阶,Recoverable Shared Lock。可回收共享锁。

可回收锁在共享锁的基础上做微小改动,当create临时节点之后,对当前创建成功的节点调用getData,设置watch flag。如果收到创建的通知,那么再次watch。当观察到getData中写入了unlock之后,其实是在提示释放锁,如果同意释放,那么删除。

Two-phased Commit 两阶段提交

两阶段提交协议是一种算法,用来通知分布式系统中的client要么提交一个事务,要么终止。

在Zk中,可以实现两阶段提交,通过一个coordinator创建事务节点,比如”/app/Tx”,每一个参与的site使用child node,比如”/app/Tx/s_i”。当coordinator创建了child node,content设置为undefined。每当涉及到事务的site收到coordinator的事务,site读取每一个child node并且设置watch。每一个site处理查询并且在对应的node中投票给”commit”或者”abort”。一旦写结束,那么其他sites会被通知到,并且一旦所有的sites投票结束,他们能够决定到底是”abort”或者是”commit”。 一个”node”是可以在提前决定”abort”,一旦有些站点投了”abort”。

比较有意思的一点是,coordiantor的唯一角色是决定站点组,创建Zookeeper节点,广播事务给对应的sites。实际上,甚至广播事务也能通过在事务节点上写操作进行。

其实,上面描述的方式有两个缺点。1. 消息复杂,O(n^2)。 因为没有一个变化都要通知到其他节点。 2. 无法使用ephemeral nodes来监测到失效站点。(因为创建节点是由cordinator来创建的),为了解决这个问题,需要site自己去创建节点。

解决第一个问题,只能由coordinator去通知事务的变化,当coordinator达成决定后,由cordinator通知站点。注意,这种方法是有办法提高的,但是它也更慢,因为要求所有的通信都从coodinator进行。

解决第二个问题,需要sites自己去创建ephemeral node。让 coordinator负责广播事务。

上面过程描述的只是两阶段提交的主要过程,具体广播的事务,以及执行,需要使用额外的通信(比如TCP,或者写node)来进行。

选举Leader

最简单的选举leader的方式,是使用SEQUNCE EPHEMRAL的方式创建选举节点,最小的即为leader。比如每一个参与者创建”/election/n_“节点,递增方式。

但是问题没那么简单,检查leader的失败,也是很重要的,这样的话,就能够重新推举一个leader。一个比较直接的方式,所有的node都watch那个最小编号的节点(这个节点是leader),当这个leader节点出问题(被删除),这样的话,判断谁是新的节点。但这种做法是具有herd effect的: 在当前leader失效后,所有的进程需要接收通知,并且在”/election”节点上执行getChildren。如果clients的数据很大,会出现zookeeper server处理次数的激增。为了避免集群效应,因此,需要watch比自己小一号的。这样的话,就避免了watch同一个节点导致的问题。

不过问题依然没有结束,当一个leader出问题,它的后继者是能够知道的,并且后继者知道自己已经成为了新的leader。但是其他节点并不知道,因此,需要一个额外的进程,通知其他所有的clients。leader已经发生了变化。

自己动手写Zk的感受

  1. zk只是提供了底层的原子性,可以类比CPU的原子操作。高层的功能实现还是需要自己挨个来。比如上面的Barrier,Queue功能。
  2. 该加锁的地方还是要加锁,除非本地不使用并发。
  3. zk的watch是一次性的!!!因此,如果需要获得正常的数据。比较好的写法还是使用wile(true),在循环中getData,获取不到时,使用mutex 阻塞当前操作,直到数据发生变更,通知过来再唤醒。
  4. getChildren。

在使用zk时,加锁的地方需要注意。我们都知道,锁的 wait和notify(notifyAll)方法必须在synchronized里面调用。在scala中,如果不这么写,是不会抛出异常的。。这点非常不合适。

Published: September 14 2016

blog comments powered by Disqus