发布时间:2024-01-18 17:30
zookeeper是一个分布式的, 开源的分布式应用程序协调服务,是对Google的Chubby组件的开源实现,为Hadoop和HBase的运行提供了相应的服务.他是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护(使得集群中的机器可以共享配置信息中的那些公共的部分);命名服务(是指通过指定的名字来获取资环或者服务的地址,以及提供者的信息,利用zookeeper可以很容易的创建一个全局的路径,而这个路径就可以作为一个名字,可以指向集群中的机器,提供的服务的地址,远程对象等);分布式同步(可以使得集群中各个节点具有相同的系统状态);组服务(通过zookeeper的短暂节点的特征,来监控每个应用程序的上线和下线)等。他是用java语言来编写,通过Zab协议来保证节点的一致性。zookeeper的目标就是封装好复杂容易出错的关键服务,将简单易用的接口和性能高效,功能稳定的提供给用户。
zookeeper服务自身组成一个集群(2n+1个节点,允许哪个节点失效)。zookeeper服务具有三个角色,一个是Leader,一个是Follower,一个是Observer。
Leader :为客户端提供读写服务,并维护集群状态,它是由集群选举所产生的;
Follower :为客户端提供读写服务,并定期向 Leader 汇报自己的节点状态。同时也参与写操作“过半写成功”的策略和 Leader 的选举;
Observer :为客户端提供读写服务,并定期向 Leader 汇报自己的节点状态,但不参与写操作“过半写成功”的策略和 Leader 的选举,因此 Observer 可以在不影响写性能的情况下提升集群的读性能。
客户端(Client)可以选择连接到zookeeper集群中的每个服务端(server),并且每个服务端的数据完全相同,每个从节点都需要与主节点进行通信,并同步主节点上更新的数据。
对于zookeeper集群来说,zookeeper只要超过一般数量的服务端可用,那么zookeeper整体服务就是可用的。
1.4:zookeeper的工作原理:
zookeeper的核心原理就是原子广播,该原子广播就是对zookeeper集群上所有主机数据包,通过这个机制保证了各个服务端之前的数据同步,那么实现这个机制在zookeeper中有一个内部协议(Zab协议),这个协议有两种模式,一个是恢复模式,一个是广播模式。
当服务启动或者是在主节点崩溃之后,Zab协议就进入了恢复模式,当主节点再次被选举出来,且大多数服务端完成了和主节点的状态同步之后,恢复模式就结束了,状态同步保证了主节点和服务端具有相同的系统状态,一旦主节点已经和多数的从节点(也就是服务节点)进行了状态同步之后,他就可以开始广播消息即进入到了广播模式。
在广播模式下,服务端会接收客户端的请求,所有的写请求都被转发给leader,再有leader将更新广播给follower节点,当半数以上的follower完成数据写请求之后,leader才会提交这个更新,然后客户端才会收到一个更新成功的响应。
Zookeeper集群搭建指的是ZooKeeper分布式模式安装。通常由2n+1台server组成。这是因为为了保证Leader选举(基于Paxos算法的实现)能过得到多数的支持,所以ZooKeeper集群的数量一般为奇数。
Zookeeper运行需要java环境,所以需要提前安装jdk。对于安装leader+follower模式的集群,大致过程如下:
如果要想使用Observer模式,可在对应节点的配置文件添加如下配置:
peerType=observer
其次,必须在配置文件指定哪些节点被指定为Observer,如:
server.1:node1:2181:3181:observer
这里,我们安装的是leader+follower模式
主机规划:
服务器IP | 主机名 | myid的值 |
---|---|---|
192.168.88.161 | node1 | 1 |
192.168.88.162 | node2 | 2 |
192.168.88.163 | node3 | 3 |
地址规划:
软件名 | 地址 |
---|---|
zookeeper-3.4.6.tar.gz | /export/softwate |
zookeeper-3.4.6 | /export/servers |
zookeeper数据路径 | /export/servers/zookeeper-3.4.6/zkdatas |
下载网址如下http://archive.apache.org/dist/zookeeper/
我们在这个网址下载我们使用的zk版本为3.4.6
下载完成之后,上传到我们的linux的/export/software路径下准备进行安装
在node1主机上,解压zookeeper的压缩包到/export/server路径下去,然后准备进行安装
tar -zxvf zookeeper-3.4.6.tar.gz -C /export/servers/
在node1主机上,修改配置文件
cd /export/servers/zookeeper-3.4.6/conf/
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
修改以下内容
# The number of milliseconds of each tick
# 这个时间是zookeeper服务器之间或者服务器与客户端之间维持心跳的时间机制
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
# 配置zookeeper接收客户端初始化连接时最长能忍受多少个心跳时间间隔数
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
# leader和follower之间发送消息,请求和应答时间长度
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
# 数据目录,需要提前创建
dataDir=/export/servers/zookeeper-3.4.6/zkdatas
# 日志目录,也需要提前创建
dataLogDir=/export/servers/zookeeper-3.4.6/zkLogs
# the port at which the clients will connect
# 访问端口号
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
# 在dataDir中保留快照的数量
autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to \"0\" to disable auto purge feature
# 设置清除任务间隔
autopurge.purgeInterval=1
# zookeeper cluster properties
# 设置每个节点上的服务
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888
在node1主机的/export/server/zookeeper-3.4.6/zkdatas/这个路径下创建一个文件,文件名为myid ,文件内容为1
echo 1 > /export/servers/zookeeper-3.4.6/zkdatas/myid
在node1主机上,将安装包分发到其他机器
第一台机器上面执行以下两个命令
scp -r /export/servers/zookeeper-3.4.6 node2:$PWD
scp -r /export/servers/zookeeper-3.4.6 node3:$PWD
第二台机器上修改myid的值为2
echo 2 > /export/servers/zookeeper-3.4.6/zkdatas/myid
第三台机器上修改myid的值为3
echo 3 > /export/servers/zookeeper-3.4.6/zkdatas/myid
三台机器分别启动zookeeper服务
这个命令三台机器都要执行
/export/servers/zookeeper-3.4.6/bin/zkServer.sh start
三台主机分别查看启动状态
/export/servers/zookeeper-3.4.6/bin/zkServer.sh status
zookeeper维护这一个树状层次结构,树中的节点被称之为znode,znode可以用来存储数据,并且有一个与之相关联的ACL(Access Control List:访问控制列表,用于控制资源的访问权限)。zookeeper被设计用来实现协调服务(通常使用小数据文件),而不适用于大容量数据存储,因此一个znode能存储的数据被限制在1MB以内,znode的树状结构如下图所示:
图中的每个节点称为一个Znode。 每个Znode由3部分组成:
ZooKeeper的数据模型,在结构上和标准文件系统的非常相似,拥有一个层次的命名空间,都是采用树形层次结构,ZooKeeper树中的每个节点被称为—Znode。和文件系统的目录树一样,ZooKeeper树中的每个节点可以拥有子节点。但也有不同之处:
Znode兼具文件和目录两种特点,既像文件一样维护着数据、元信息、ACL、时间戳等数据结构,又像目录一样可以作为路径标识的一部分,并可以具有子znode。用户对znode具有增、删、改、查等操作(权限允许的情况下)。
Znode具有原子性操作,读操作将获取与节点相关的所有数据,写操作也将替换掉节点的所有数据。另外,每一个节点都拥有自己的ACL(访问控制列表),这个列表规定了用户的权限,即限定了特定用户对目标节点可以执行的操作。
Znode存储数据大小有限制,ZooKeeper虽然可以关联一些数据,但并没有被设计为常规的数据库或者大数据存储,相反的是,它用来管理调度数据,比如分布式应用中的配置文件信息、状态信息、汇集位置等等。这些数据的共同特性就是它们都是很小的数据,通常以KB为大小单位。ZooKeeper的服务器和客户端都被设计为严格检查并限制每个Znode的数据大小至多1M,当时常规使用中应该远小于此值。
Znode通过路径引用,如同Unix中的文件路径。路径必须是绝对的,因此他们必须由斜杠字符来开头。除此以外,他们必须是唯一的,也就是说每一个路径只有一个表示,因此这些路径不能改变。在ZooKeeper中,路径由Unicode字符串组成,并且有一些限制。字符串\"/zookeeper\"用以保存管理信息,比如关键配额信息。
① stat:此为状态信息, 描述该Znode的版本, 权限等信息
② data:与该Znode关联的数据
③ children:该Znode下的子节点
znode是客户端访问的zookeeper的主要实体。
Znode有两种,分别为临时节点和永久节点。
节点的类型在创建时即被确定,并且不能改变。
Znode还有一个序列化的特性,如果创建的时候指定的话,该Znode的名字后面会自动追加一个不断增加的序列号。序列号对于此节点的父节点来说是唯一的,这样便会记录每个子节点创建的先后顺序。它的格式为“%10d”(10位数字,没有数值的数位用0补充,例如“0000000001”)。
这样便会存在四种类型的Znode节点,分别对应:
PERSISTENT:永久节点
EPHEMERAL:临时节点
PERSISTENT_SEQUENTIAL:永久节点、序列化
EPHEMERAL_SEQUENTIAL:临时节点、序列化
ZooKeeper提供了分布式数据发布/订阅功能,一个典型的发布/订阅模型系统定义了一种一对多的订阅关系,能让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态变化时,会通知所有订阅者,使他们能够做出相应的处理。
ZooKeeper中,引入了Watcher机制来实现这种分布式的通知功能。ZooKeeper允许客户端向服务端注册一个Watcher监听,当服务端的一些事件触发了这个Watcher,那么就会向指定客户端发送一个事件通知来实现分布式的通知功能。
触发事件种类很多,如:节点创建,节点删除,节点改变,子节点改变等。
总的来说可以概括Watcher为以下三个过程:客户端向服务端注册Watcher、服务端事件发生触发Watcher、客户端回调Watcher得到触发事件情况
事件发生触发监听,一个watcher event就会被发送到设置监听的客户端,这种效果是一次性的,后续再次发生同样的事件,不会再次触发。
ZooKeeper使用WatchedEvent对象来封装服务端事件并传递。
WatchedEvent包含了每一个事件的三个基本属性:
通知状态(keeperState),事件类型(EventType)和节点路径(path)
event 异步发送
watcher的通知事件从服务端发送到客户端是异步的。
Zookeeper中的watch机制,必须客户端先去服务端注册监听,这样事件发送才会触发监听,通知给客户端。
同一个事件类型在不同的通知状态中代表的含义有所不同,下表列举了常见的通知状态和事件类型。
事件封装: Watcher
得到的事件是被封装过的, 包括三个内容 keeperState, eventType, path
KeeperState (通知状态) | EventType (事件类型) | 触发条件 | 说明 |
---|---|---|---|
None | 连接成功 | ||
SyncConnected | NodeCreated | Znode被创建 | 此时处于连接状态 |
SyncConnected | NodeDeleted | Znode被删除 | 此时处于连接状态 |
SyncConnected | NodeDataChanged | Znode数据被改变 | 此时处于连接状态 |
SyncConnected | NodeChildChanged | Znode的子Znode数据被改变 | 此时处于连接状态 |
Disconnected | None | 客户端和服务端断开连接 | 此时客户端和服务器处于断开连接状态 |
Expired | None | 会话超时 | 会收到一个SessionExpiredExceptio |
AuthFailed | None | 权限验证失败 | 会收到一个AuthFailedException |
其中连接状态事件(type=None, path=null)不需要客户端注册,客户端只要有需要直接处理就行了。
设置节点数据变动监听:
通过另一个客户端更改节点数据:
此时设置监听的节点收到通知:
运行 zkCli.sh –server ip 进入命令行工具。
cd /export/servers/zookeeper-3.4.6/bin
./zkCli.sh -server node1:2181
格式:
create [-s][-e] path data acl
其中,-s 表示创建一个序列化节点
-e 表示创建一个临时节点
啥也不加表示创建了一个永久节点
acl:权限控制
创建永久节点:
create /aaa 123456
创建临时节点
create -e /linshijiedian 123456
创建永久序列化节点
create -s /abc 123
创建临时序列化节点
create -s -e /fff 123345
与读取相关的命令有 ls 命令和 get 命令,ls和ls2 命令可以列出 Zookeeper 指定节点下的所有子节点,只能查看指定节点下的第一级的所有子节点;get 命令可以获取 Zookeeper 指定节点的数据内容和属性信息。
格式:
ls path [watch]
get path [watch]
ls2 path [watch]
dataVersion:数据版本号,每次对节点进行set操作,dataVersion的值都会增加1(即使设置的是相同的数据),可有效避免了数据更新时出现的先后顺序问题。
cversion :子节点的版本号。当znode的子节点有变化时,cversion 的值就会增加1。
cZxid :Znode创建的事务id。
mZxid :Znode被修改的事务id,即每次对znode的修改都会更新mZxid。
对于zk来说,每次的变化都会产生一个唯一的事务id,zxid(ZooKeeper Transaction Id)。通过zxid,可以确定更新操作的先后顺序。例如,如果zxid1小于zxid2,说明zxid1操作先于zxid2发生,zxid对于整个zk都是唯一的,即使操作的是不同的znode。
ctime:节点创建时的时间戳.
mtime:节点最新一次更新发生时的时间戳.
ephemeralOwner:如果该节点为临时节点, ephemeralOwner值表示与该节点绑定的session id. 如果不是, ephemeralOwner值为0.
在client和server通信之前,首先需要建立连接,该连接称为session。连接建立后,如果发生连接超时、授权失败,或者显式关闭连接,连接便处于CLOSED状态, 此时session结束。
格式:
set path data [version]
data 就是要更新的新内容,version 表示数据版本
格式:
delete path [version]
若删除节点存在子节点,那么无法删除该节点,必须先删除子节点,再删除父节点
rmr path: 可以递归删除节点。
格式1:
setquota -n|-b val path
-n:表示子节点的最大个数
-b:表示数据值的最大长度
val:子节点最大个数或数据值的最大长度
path:节点路径
格式2: listquota path : 列出指定节点的 quota
注意: 在实际操作的时候, 虽然设置了最大的节点数后,依然可以在整个节点下添加多个子节点, 只是会在zookeeper中的日志文件中记录一下警告信息
格式3: delquota [-n|-b] path : 删除 quota
history: 列出命令历史
redo:该命令可以重新执行指定命令编号的历史命令,命令编号可以通过
这里操作Zookeeper的JavaAPI使用的是一套zookeeper客户端框架 Curator ,解决了很多Zookeeper客户端非常底层的细节开发工作 。
Curator包含了几个包:
curator-framework:对zookeeper的底层api的一些封装
curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器等
Maven依赖(使用curator的版本:2.12.0,对应Zookeeper的版本为:3.4.x,如果跨版本会有兼容性问题,很有可能导致节点操作失败):
<dependencies>
<dependency>
<groupId>org.apache.curatorgroupId>
<artifactId>curator-frameworkartifactId>
<version>2.12.0version>
dependency>
<dependency>
<groupId>org.apache.curatorgroupId>
<artifactId>curator-recipesartifactId>
<version>2.12.0version>
dependency>
<dependency>
<groupId>com.google.collectionsgroupId>
<artifactId>google-collectionsartifactId>
<version>1.0version>
dependency>
<dependency>
<groupId>junitgroupId>
<artifactId>junitartifactId>
<version>RELEASEversion>
dependency>
<dependency>
<groupId>org.slf4jgroupId>
<artifactId>slf4j-simpleartifactId>
<version>1.7.25version>
dependency>
dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.pluginsgroupId>
<artifactId>maven-compiler-pluginartifactId>
<version>3.2version>
<configuration>
<source>1.8source>
<target>1.8target>
<encoding>UTF-8encoding>
configuration>
plugin>
plugins>
build>
java 操作zookeeper的相关操作:
package com.lixufei.zookeeper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.Test;
/**
* @Author lixufei
* @Date 2021/12/12 15:03
* @Version 1.0
*/
public class ZookeeperTest{
/**
* 使用java操纵zookeeper,创建节点
*/
@Test
public void createZnode() throws Exception {
// 创建java操作zookeeper的客户端对象
String connectString = \"192.168.88.161:2181,192.168.88.162:2181,192.168.88.163:2181\";
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework zookeeperClient = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
// 启动客户端
zookeeperClient.start();
// 执行相关的操作
/**
* CreateMode.PERSISTENT:永久节点
* CreateMode.PERSISTENT_SEQUENTIAL:永久序列化节点
* CreateMode.EPHEMERAL:临时节点
* CreateMode.EPHEMERAL_SEQUENTIAL:临时序列化节点
*/
zookeeperClient.create().withMode(CreateMode.PERSISTENT).forPath(\"/test04\",\"123456\".getBytes());
// 释放资源
zookeeperClient.close();
}
/**
* 使用java操纵zookeeper,修改节点
*/
@Test
public void updataZnode() throws Exception {
// 创建java操作zookeeper的客户端对象
String connectString = \"192.168.88.161:2181,192.168.88.162:2181,192.168.88.163:2181\";
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework zookeeperClient = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
// 启动客户端
zookeeperClient.start();
// 执行相关的操作
zookeeperClient.setData().forPath(\"/test03\",\"lixufei123456\".getBytes());
// 释放资源
zookeeperClient.close();
}
/**
* 使用java操纵zookeeper,删除节点
*/
@Test
public void deleteZnode() throws Exception {
// 创建java操作zookeeper的客户端对象
String connectString = \"192.168.88.161:2181,192.168.88.162:2181,192.168.88.163:2181\";
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework zookeeperClient = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
// 启动客户端
zookeeperClient.start();
// 执行相关的操作
zookeeperClient.delete().forPath(\"/test04\");
// 释放资源
zookeeperClient.close();
}
/**
* 使用java操纵zookeeper,查询节点
*/
@Test
public void selectZnode() throws Exception {
// 创建java操作zookeeper的客户端对象
String connectString = \"192.168.88.161:2181,192.168.88.162:2181,192.168.88.163:2181\";
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework zookeeperClient = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
// 启动客户端
zookeeperClient.start();
// 执行相关的操作
byte[] bytes = zookeeperClient.getData().forPath(\"/test03\");
System.out.println(new String(bytes));
// 释放资源
zookeeperClient.close();
}
}
zookeeper默认的算法是FastLeaderElection,采用投票数大于半数则胜出的逻辑。
服务器ID
比如有三台服务器,编号分别是1,2,3。
编号越大在选择算法中的权重越大。
选举状态
LOOKING,竞选状态。
FOLLOWING,随从状态,同步leader状态,参与投票。
OBSERVING,观察状态,同步leader状态,不参与投票。
LEADING,领导者状态。
数据ID
服务器中存放的最新数据version。
值越大说明数据越新,在选举算法中数据越新权重越大。
逻辑时钟
也叫投票的次数,同一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加,然后与接收到的其它服务器返回的投票信息中的数值相比,根据不同的值做出不同的判断。
假设目前有5台服务器,每台服务器均没有数据,它们的编号分别是1,2,3,4,5,按编号依次启动,它们的选择举过程如下:
服务器1启动自己,给投票,然后发投票信息,由于其它机器还没有启动所以它收不到反馈信息,服务器1的状态一直属于Looking。
服务器2启动,给自己投票,同时与之前启动的服务器1交换结果,由于服务器2的编号大所以服务器2胜出,但此时投票数没有大于半数,所以两个服务器的状态依然是LOOKING。
服务器3启动,给自己投票,同时与之前启动的服务器1,2交换信息,由于服务器3的编号最大所以服务器3胜出,此时投票数正好大于半数,所以服务器3成为领导者,服务器1,2成为小弟。
服务器4启动,给自己投票,同时与之前启动的服务器1,2,3交换信息,尽管服务器4的编号大,但之前服务器3已经胜出,所以服务器4只能成为小弟。
服务器5启动,后面的逻辑同服务器4成为小弟。
对于运行正常的zookeeper集群,中途有机器down掉,需要重新选举时,选举过程就需要加入数据ID、服务器ID和逻辑时钟。
数据ID:数据新的version就大,数据每次更新都会更新version。
服务器ID:就是我们配置的myid中的值,每个机器一个。
逻辑时钟:这个值从0开始递增,每次选举对应一个值。 如果在同一次选举中,这个值是一致的。
这样选举的标准就变成:
1、逻辑时钟小的选举结果被忽略,重新投票;
2、统一逻辑时钟后,数据id大的胜出;
3、数据id相同的情况下,服务器id大的胜出;
根据这个规则选出leader。
ZAB 协议是 Zookeeper 专门设计的一种支持崩溃恢复的原子广播协议。通过该协议,Zookeepe 基于主从模式的系统架构来保持集群中各个副本之间数据的一致性。具体如下:
Zookeeper 使用一个单一的主进程来接收并处理客户端的所有事务请求,并采用原子广播协议将数据状态的变更以事务 Proposal 的形式广播到所有的副本进程上去。如下图:
具体流程如下:
所有的事务请求必须由唯一的 Leader 服务来处理,Leader 服务将事务请求转换为事务 Proposal,并将该 Proposal 分发给集群中所有的 Follower 服务。如果有半数的 Follower 服务进行了正确的反馈,那么 Leader 就会再次向所有的 Follower 发出 Commit 消息,要求将前一个 Proposal 进行提交。
ZAB 协议包括两种基本的模式,分别是崩溃恢复和消息广播:
当整个服务框架在启动过程中,或者当 Leader 服务器出现异常时,ZAB 协议就会进入恢复模式,通过过半选举机制产生新的 Leader,之后其他机器将从新的 Leader 上同步状态,当有过半机器完成状态同步后,就退出恢复模式,进入消息广播模式。
ZAB 协议的消息广播过程使用的是原子广播协议。在整个消息的广播过程中,Leader 服务器会每个事物请求生成对应的 Proposal,并为其分配一个全局唯一的递增的事务 ID(ZXID),之后再对其进行广播。具体过程如下:
Leader 服务会为每一个 Follower 服务器分配一个单独的队列,然后将事务 Proposal 依次放入队列中,并根据 FIFO(先进先出) 的策略进行消息发送。Follower 服务在接收到 Proposal 后,会将其以事务日志的形式写入本地磁盘中,并在写入成功后反馈给 Leader 一个 Ack 响应。当 Leader 接收到超过半数 Follower 的 Ack 响应后,就会广播一个 Commit 消息给所有的 Follower 以通知其进行事务提交,之后 Leader 自身也会完成对事务的提交。而每一个 Follower 则在接收到 Commit 消息后,完成事务的提交。
数据的发布/订阅系统,通常也用作配置中心。在分布式系统中,你可能有成千上万个服务节点,如果想要对所有服务的某项配置进行更改,由于数据节点过多,你不可逐台进行修改,而应该在设计时采用统一的配置中心。之后发布者只需要将新的配置发送到配置中心,所有服务节点即可自动下载并进行更新,从而实现配置的集中管理和动态更新。
Zookeeper 通过 Watcher 机制可以实现数据的发布和订阅。分布式系统的所有的服务节点可以对某个 ZNode 注册监听,之后只需要将新的配置写入该 ZNode,所有服务节点都会收到该事件。
在分布式系统中,通常需要一个全局唯一的名字,如生成全局唯一的订单号等,Zookeeper 可以通过顺序节点的特性来生成全局唯一 ID,从而可以对分布式系统提供命名服务。
分布式系统一个重要的模式就是主从模式 (Master/Salves),Zookeeper 可以用于该模式下的 Matser 选举。可以让所有服务节点去竞争性地创建同一个 ZNode,由于 Zookeeper 不能有路径相同的 ZNode,必然只有一个服务节点能够创建成功,这样该服务节点就可以成为 Master 节点。
可以通过 Zookeeper 的临时节点和 Watcher 机制来实现分布式锁,这里以排它锁为例进行说明:
分布式系统的所有服务节点可以竞争性地去创建同一个临时 ZNode,由于 Zookeeper 不能有路径相同的 ZNode,必然只有一个服务节点能够创建成功,此时可以认为该节点获得了锁。其他没有获得锁的服务节点通过在该 ZNode 上注册监听,从而当锁释放时再去竞争获得锁。锁的释放情况有以下两种:
当锁被释放后,其他服务节点则再次去竞争性地进行创建,但每次都只有一个服务节点能够获取到锁,这就是排他锁。
Zookeeper 还能解决大多数分布式系统中的问题:
为了避免存储在 Zookeeper 上的数据被其他程序或者人为误修改,Zookeeper 提供了 ACL(Access Control Lists) 进行权限控制。只有拥有对应权限的用户才可以对节点进行增删改查等操作。下文分别介绍使用原生的 Shell 命令和 Apache Curator 客户端进行权限设置。
想要给某个节点设置权限 (ACL),有以下两个可选的命令:
# 1.给已有节点赋予权限
setAcl path acl
# 2.在创建节点时候指定权限
create [-s] [-e] path data acl
查看指定节点的权限命令如下:
getAcl path
Zookeeper 的权限由[scheme : id :permissions]三部分组成,其中 Schemes 和 Permissions 内置的可选项分别如下:
Permissions 可选项:
Schemes 可选项:
world:anyone:[permissons]
;auth:user:password:[permissons]
,使用这种模式时,你需要先进行登录,之后采用 auth 模式设置权限时,user
和 password
都将使用登录的用户名和密码;auth:user:BASE64(SHA1(password)):[permissons]
,这种形式下的密码必须通过 SHA1 和 BASE64 进行双重加密;ip:182.168.0.168:[permissions]
;可以使用如下所示的命令为当前 Session 添加用户认证信息,等价于登录操作。
# 格式
addauth scheme auth
#示例:添加用户名为heibai,密码为root的用户认证信息
addauth digest heibai:root
world 是一种默认的模式,即创建时如果不指定权限,则默认的权限就是 world。
[zk: localhost:2181(CONNECTED) 32] create /hadoop 123
Created /hadoop
[zk: localhost:2181(CONNECTED) 33] getAcl /hadoop
\'world,\'anyone #默认的权限
: cdrwa
[zk: localhost:2181(CONNECTED) 34] setAcl /hadoop world:anyone:cwda # 修改节点,不允许所有客户端读
....
[zk: localhost:2181(CONNECTED) 35] get /hadoop
Authentication is not valid : /hadoop # 权限不足
[zk: localhost:2181(CONNECTED) 36] addauth digest heibai:heibai # 登录
[zk: localhost:2181(CONNECTED) 37] setAcl /hadoop auth::cdrwa # 设置权限
[zk: localhost:2181(CONNECTED) 38] getAcl /hadoop # 获取权限
\'digest,\'heibai:sCxtVJ1gPG8UW/jzFHR0A1ZKY5s= #用户名和密码 (密码经过加密处理),注意返回的权限类型是 digest
: cdrwa
#用户名和密码都是使用登录的用户名和密码,即使你在创建权限时候进行指定也是无效的
[zk: localhost:2181(CONNECTED) 39] setAcl /hadoop auth:root:root:cdrwa #指定用户名和密码为 root
[zk: localhost:2181(CONNECTED) 40] getAcl /hadoop
\'digest,\'heibai:sCxtVJ1gPG8UW/jzFHR0A1ZKY5s= #无效,使用的用户名和密码依然还是 heibai
: cdrwa
[zk:44] create /spark \"spark\" digest:heibai:sCxtVJ1gPG8UW/jzFHR0A1ZKY5s=:cdrwa #指定用户名和加密后的密码
[zk:45] getAcl /spark #获取权限
\'digest,\'heibai:sCxtVJ1gPG8UW/jzFHR0A1ZKY5s= # 返回的权限类型是 digest
: cdrwa
到这里你可以发现使用 auth
模式设置的权限和使用 digest
模式设置的权限,在最终结果上,得到的权限模式都是 digest
。某种程度上,你可以把 auth
模式理解成是 digest
模式的一种简便实现。因为在 digest
模式下,每次设置都需要书写用户名和加密后的密码,这是比较繁琐的,采用 auth
模式就可以避免这种麻烦。
限定只有特定的 ip 才能访问。
[zk: localhost:2181(CONNECTED) 46] create /hive \"hive\" ip:192.168.0.108:cdrwa
[zk: localhost:2181(CONNECTED) 47] get /hive
Authentication is not valid : /hive # 当前主机已经不能访问
这里可以看到当前主机已经不能访问,想要能够再次访问,可以使用对应 IP 的客户端,或使用下面介绍的 super
模式。
需要修改启动脚本 zkServer.sh
,并在指定位置添加超级管理员账户和密码信息:
\"-Dzookeeper.DigestAuthenticationProvider.superDigest=heibai:sCxtVJ1gPG8UW/jzFHR0A1ZKY5s=\"
修改完成后需要使用 zkServer.sh restart
重启服务,此时再次访问限制 IP 的节点:
[zk: localhost:2181(CONNECTED) 0] get /hive #访问受限
Authentication is not valid : /hive
[zk: localhost:2181(CONNECTED) 1] addauth digest heibai:heibai # 登录 (添加认证信息)
[zk: localhost:2181(CONNECTED) 2] get /hive #成功访问
hive
cZxid = 0x158
ctime = Sat May 25 09:11:29 CST 2019
mZxid = 0x158
mtime = Sat May 25 09:11:29 CST 2019
pZxid = 0x158
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 0
这里以 Apache Curator 为例,使用前需要导入相关依赖,完整依赖如下:
<dependencies>
<dependency>
<groupId>org.apache.curatorgroupId>
<artifactId>curator-frameworkartifactId>
<version>4.0.0version>
dependency>
<dependency>
<groupId>org.apache.curatorgroupId>
<artifactId>curator-recipesartifactId>
<version>4.0.0version>
dependency>
<dependency>
<groupId>org.apache.zookeepergroupId>
<artifactId>zookeeperartifactId>
<version>3.4.13version>
dependency>
<dependency>
<groupId>junitgroupId>
<artifactId>junitartifactId>
<version>4.12version>
dependency>
dependencies>
Apache Curator 权限设置的示例如下:
public class AclOperation {
private CuratorFramework client = null;
private static final String zkServerPath = \"192.168.0.226:2181\";
private static final String nodePath = \"/hadoop/hdfs\";
@Before
public void prepare() {
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder()
.authorization(\"digest\", \"heibai:123456\".getBytes()) //等价于 addauth 命令
.connectString(zkServerPath)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace(\"workspace\").build();
client.start();
}
/**
* 新建节点并赋予权限
*/
@Test
public void createNodesWithAcl() throws Exception {
List<ACL> aclList = new ArrayList<>();
// 对密码进行加密
String digest1 = DigestAuthenticationProvider.generateDigest(\"heibai:123456\");
String digest2 = DigestAuthenticationProvider.generateDigest(\"ying:123456\");
Id user01 = new Id(\"digest\", digest1);
Id user02 = new Id(\"digest\", digest2);
// 指定所有权限
aclList.add(new ACL(Perms.ALL, user01));
// 如果想要指定权限的组合,中间需要使用 | ,这里的|代表的是位运算中的 按位或
aclList.add(new ACL(Perms.DELETE | Perms.CREATE, user02));
// 创建节点
byte[] data = \"abc\".getBytes();
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(aclList, true)
.forPath(nodePath, data);
}
/**
* 给已有节点设置权限,注意这会删除所有原来节点上已有的权限设置
*/
@Test
public void SetAcl() throws Exception {
String digest = DigestAuthenticationProvider.generateDigest(\"admin:admin\");
Id user = new Id(\"digest\", digest);
client.setACL()
.withACL(Collections.singletonList(new ACL(Perms.READ | Perms.DELETE, user)))
.forPath(nodePath);
}
/**
* 获取权限
*/
@Test
public void getAcl() throws Exception {
List<ACL> aclList = client.getACL().forPath(nodePath);
ACL acl = aclList.get(0);
System.out.println(acl.getId().getId()
+ \"是否有删读权限:\" + (acl.getPerms() == (Perms.READ | Perms.DELETE)));
}
@After
public void destroy() {
if (client != null) {
client.close();
}
}
}