There is a book, *ZooKeeper: Distributed Process Coordination*, though I haven't read it myself 😄.
https://zookeeper.apache.org/documentation.html Once you have the basic concepts down, be sure to read the official documentation. Work top-down: start with a simple overview, then read the docs in detail.
Apache ZooKeeper (ZK for short) is an open-source distributed coordination service framework that grew out of the Apache Hadoop project. It provides efficient, reliable coordination primitives for distributed applications. At its core it is a distributed, filesystem-like store, but it focuses on data consistency and coordination rather than bulk storage. ZK uses the ZAB (ZooKeeper Atomic Broadcast) protocol, a Paxos-family algorithm, to guarantee consistency, high availability, and atomic operations. In short, it acts as a "distributed brain" that helps multiple servers or processes cooperate over a network without descending into chaos.
In the single-machine era, everything was simple. Distributed systems (multiple servers working together) face many challenges: network latency, node crashes, inconsistent data, and so on. ZooKeeper was created to solve these coordination problems by providing a centralized (yet highly available) coordination point, so developers don't have to build complex distributed logic from scratch.
The specific problems it solves include:
In short, ZK is not a tool for storing big data; it is the "glue" that makes distributed systems reliable and fast. It is especially suited to throughput-sensitive scenarios such as big data and microservice architectures.
Real-world engineering examples

In real projects, ZooKeeper is widely used in large distributed systems. Here are a few real cases showing how it solves concrete pain points:
| Project | Problem | How ZooKeeper solves it |
|---|---|---|
| Hadoop/HBase | In HDFS (Hadoop Distributed File System), multiple NameNodes must keep metadata consistent; HBase RegionServers need dynamic load balancing. | ZK acts as the coordination hub, providing leader election and configuration sync so metadata stays consistent and data is neither lost nor corrupted. |
| Apache Kafka | Kafka brokers and partitions need leader election; consumer groups must coordinate partition assignment. | ZK stores broker registrations and leader metadata, and supports fast re-election on failure for highly available partition management. (Note: Kafka 3.3+ has migrated to KRaft; earlier versions depended on ZK.) |
| Dubbo (Alibaba's microservice framework) | Service discovery between microservices is hard; config changes require manual synchronization, complicating deployment. | ZK serves as the registry: providers register addresses and consumers discover them dynamically; it also acts as a config center pushing changes in real time for transparent updates. |
| ShardingSphere (database sharding middleware) | Multiple Sharding-Proxy nodes need unified sharding rules and node management; inconsistent rules cause data skew. | ZK acts as the config center, storing sharding rules and supporting node registration and monitoring for unified cluster management and failover. |
mkdir zookeeper-tutorial && cd zookeeper-tutorial

Set up a 3-node ZooKeeper cluster with Docker Compose.
docker-compose.yml
version: '3.9'
services:
zk1:
image: zookeeper:3.9.4
container_name: zk1
restart: unless-stopped
hostname: zk1
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
ZOO_4LW_COMMANDS_WHITELIST: "*"
volumes:
- zk1_data:/data
- zk1_datalog:/datalog
zk2:
image: zookeeper:3.9.4
container_name: zk2
restart: unless-stopped
hostname: zk2
ports:
- "2182:2181"
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
ZOO_4LW_COMMANDS_WHITELIST: "*"
volumes:
- zk2_data:/data
- zk2_datalog:/datalog
zk3:
image: zookeeper:3.9.4
container_name: zk3
restart: unless-stopped
hostname: zk3
ports:
- "2183:2181"
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
ZOO_4LW_COMMANDS_WHITELIST: "*"
volumes:
- zk3_data:/data
- zk3_datalog:/datalog
volumes:
zk1_data:
zk1_datalog:
zk2_data:
zk2_datalog:
zk3_data:
  zk3_datalog:

Notes on the docker-compose.yml:
Stop and clean up any old cluster (delete the volumes to avoid stale data):

docker-compose down -v

Start the cluster:

docker-compose up -d

Verify startup and check the logs for errors:
docker logs zk1
docker logs zk2
docker logs zk3

Enter a container:
docker exec -it zk3 bash
root@zk3:/apache-zookeeper-3.9.4-bin# ls
bin conf docs lib LICENSE.txt NOTICE.txt README.md README_packaging.md
root@zk3:/apache-zookeeper-3.9.4-bin/bin# ll -lrt
total 88
-rwxr-xr-x 1 zookeeper zookeeper 1385 Aug 19 19:49 zkTxnLogToolkit.sh*
-rwxr-xr-x 1 zookeeper zookeeper 996 Aug 19 19:49 zkTxnLogToolkit.cmd*
-rwxr-xr-x 1 zookeeper zookeeper 1377 Aug 19 19:49 zkSnapShotToolkit.sh*
-rwxr-xr-x 1 zookeeper zookeeper 988 Aug 19 19:49 zkSnapShotToolkit.cmd*
-rwxr-xr-x 1 zookeeper zookeeper 1422 Aug 19 19:49 zkSnapshotRecursiveSummaryToolkit.sh*
-rwxr-xr-x 1 zookeeper zookeeper 995 Aug 19 19:49 zkSnapshotRecursiveSummaryToolkit.cmd*
-rwxr-xr-x 1 zookeeper zookeeper 1374 Aug 19 19:49 zkSnapshotComparer.sh*
-rwxr-xr-x 1 zookeeper zookeeper 987 Aug 19 19:49 zkSnapshotComparer.cmd*
-rwxr-xr-x 1 zookeeper zookeeper 11680 Aug 19 19:49 zkServer.sh*
-rwxr-xr-x 1 zookeeper zookeeper 4559 Aug 19 19:49 zkServer-initialize.sh*
-rwxr-xr-x 1 zookeeper zookeeper 1243 Aug 19 19:49 zkServer.cmd*
-rwxr-xr-x 1 zookeeper zookeeper 3947 Aug 19 19:49 zkEnv.sh*
-rwxr-xr-x 1 zookeeper zookeeper 1810 Aug 19 19:49 zkEnv.cmd*
-rwxr-xr-x 1 zookeeper zookeeper 1576 Aug 19 19:49 zkCli.sh*
-rwxr-xr-x 1 zookeeper zookeeper 1115 Aug 19 19:49 zkCli.cmd*
-rwxr-xr-x 1 zookeeper zookeeper 1978 Aug 19 19:49 zkCleanup.sh*
-rwxr-xr-x 1 zookeeper zookeeper 232 Aug 19 19:49 README.txt*
drwxr-xr-x 2 zookeeper zookeeper 4096 Aug 19 19:49 ./
drwxr-xr-x 6 zookeeper zookeeper 4096 Oct 2 08:52 ../

Under bin there is zkCli.sh:
root@zk3:/apache-zookeeper-3.9.4-bin/bin# ./zkCli.sh
...
...
2025-10-15 07:41:38,559 [myid:localhost:2181] - INFO [main-SendThread(localhost:2181):o.a.z.ClientCnxn$SendThread@1427] - Session establishment complete on server localhost/127.0.0.1:2181, session id = 0x30203aaa8590000, negotiated timeout = 30000
WATCHER::
WatchedEvent state:SyncConnected type:None path:null zxid: -1
[zk: localhost:2181(CONNECTED) 0]

Cluster verification: test consistency. In the current session, run:
[zk: localhost:2181(CONNECTED) 0] create /test-cluster "Cluster OK"
Created /test-cluster
[zk: localhost:2181(CONNECTED) 1]

Now connect to zk2 with zkCli:
root@zk3:/apache-zookeeper-3.9.4-bin/bin# ./zkCli.sh -server zk2:2181
Connecting to zk2:2181
..
..
2025-10-15 07:44:26,765 [myid:zk2:2181] - INFO [main-SendThread(zk2:2181):o.a.z.ClientCnxn$SendThread@1427] - Session establishment complete on server zk2/172.18.0.4:2181, session id = 0x20203aaa89b0000, negotiated timeout = 30000
WATCHER::
WatchedEvent state:SyncConnected type:None path:null zxid: -1
[zk: zk2:2181(CONNECTED) 3] ls /
[test-cluster, zookeeper]
[zk: zk2:2181(CONNECTED) 4] get /test-cluster
Cluster OK
[zk: zk2:2181(CONNECTED) 5]

The node created in the first session is visible from zk2, so the cluster is basically working.
A ZNode is ZooKeeper's core data unit, similar to a file or directory in a filesystem. Each ZNode has a path (such as /app/config), can store a small amount of data (typically under 1 MB), and can have children, forming a tree. ZNodes come in several types:
ZNodes support versioning (the stat structure carries version numbers), atomic operations, and watchers (event listeners).
ZooKeeper does not store big data (it is not HDFS or Cassandra); it stores small, structured metadata and coordination information. Typical contents include:
Data is stored as a byte array (byte[]) and supports ACLs (access control) and watcher notifications on change. ZK is built for high-performance reads and writes (QPS > 10k) on small objects under 1 MB.
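The size ceiling mentioned above comes from the server's jute.maxbuffer setting (about 1 MB by default). A client-side guard is just a byte-length check; a minimal sketch, where the limit constant is an assumption based on the default:

```javascript
// Guard against payloads that would exceed ZooKeeper's default size limit (~1 MB).
const DEFAULT_MAX_BYTES = 1024 * 1024;

function fitsInZnode(value, maxBytes = DEFAULT_MAX_BYTES) {
  const size = Buffer.byteLength(value, 'utf8'); // bytes, not characters
  return { size, ok: size <= maxBytes };
}

console.log(fitsInZnode('jdbc:mysql://host:3306/db')); // a small config string fits
console.log(fitsInZnode('x'.repeat(2 * 1024 * 1024)).ok); // a 2 MB payload does not: false
```

Note that the check counts UTF-8 bytes rather than characters, which matters for non-ASCII config values.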
ZooKeeper's data structure is a hierarchical tree (like a Unix filesystem) rooted at /. Each ZNode is one node in the tree:
/
├── zookeeper (system node, built in)
├── config
│   ├── db
│   │   └── url = "jdbc:mysql://host:3306/db"
│   └── timeout = "30s"
├── services
│   ├── web1 (ephemeral node) = "192.168.1.10:8080"
│   └── web2 (sequential node) = "192.168.1.11:8080"
└── locks
    └── task-0000000001 (ephemeral sequential node) = "" (empty data; the path itself serves as the lock)

This structure supports fast traversal (ls/get) and event-driven programming (watchers on data or child changes). The ZK ensemble replicates the data across the Leader and Follower nodes to keep it consistent.
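The /locks subtree above hints at the classic lock recipe: each contender creates an ephemeral sequential node, the lowest sequence number holds the lock, and every other contender watches the node just ahead of it. The queue logic itself is pure string sorting, so it can be sketched without a server (the node names here are illustrative):

```javascript
// Given the children of a lock ZNode, decide who holds the lock and
// which node each waiter should watch (the one immediately ahead of it).
function lockStatus(children, myNode) {
  // Zero-padded sequence suffixes sort lexicographically in numeric order.
  const sorted = [...children].sort();
  const myIndex = sorted.indexOf(myNode);
  if (myIndex === -1) throw new Error(`${myNode} not among children`);
  return {
    holder: sorted[0],
    iHoldTheLock: myIndex === 0,
    watchTarget: myIndex === 0 ? null : sorted[myIndex - 1],
  };
}

const children = ['task-0000000003', 'task-0000000001', 'task-0000000002'];
console.log(lockStatus(children, 'task-0000000001')); // holds the lock
console.log(lockStatus(children, 'task-0000000003')); // should watch task-0000000002
```

Watching only the predecessor (instead of the whole children list) avoids the "herd effect" where every waiter wakes up on each release.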
A ZNode is ZooKeeper's basic data unit, and every ZNode has four facets: Data, the Children List, the ACL (Access Control List), and Stat (statistics).
Together these form the ZNode's complete attribute model and underpin ZooKeeper's tree structure and distributed coordination features.
Data is the ZNode's actual content, usually a byte array (byte[]), limited to under 1 MB. It holds configuration, service addresses, and other metadata. For example, the ZNode /config/db might hold "jdbc:mysql://host:3306"; read it with the get command.
The Children List is the list of the ZNode's direct child paths, forming the hierarchical tree (rooted at /). It underpins service discovery and naming. For example, /services might have the children ["web1", "web2"], retrieved with ls or getChildren.
The ACL (Access Control List) defines who may read, write, create, delete, or administer the ZNode. It provides security; for example, digest:user:pass:crdwa grants a specific user all permissions. Set it with setACL and read it with getACL.
Stat (statistics) is the ZNode's metadata: version numbers (cversion/dataVersion), transaction ids and timestamps (czxid/mzxid, ctime/mtime), child count, and so on. It enables optimistic locking and change detection; for example, stat /path returns {czxid: 0x100000000, ctime: 169xxxxxx, ...}.
In ZooKeeper, the ZNode is the most basic data unit, similar to a file or directory in a filesystem. A ZNode's Data is its core content: the actual payload, holding whatever metadata or configuration the application needs.

Data is the "carrier" of ZK's coordination service. Common patterns:
- /config/app.json = {"db": "mysql://host"}
- /services/prod/node1 = "ip:port:version"

Data is independent of the other facets but is usually combined with them in practice.
Data is manipulated through client tools such as zkCli.sh, or through APIs such as Node.js's node-zookeeper-client.
Assuming a connected zkCli session:
| Operation | Command | Example | Notes |
|---|---|---|---|
| Set Data at creation | create /path "data" | create /config/db "jdbc:mysql://localhost:3306" | Creates a persistent node with the string as Data. Supports -e (ephemeral) and -s (sequential). |
| Read Data | get /path [watch] | get /config/db | Prints the Data; watch listens for changes. Returns Data + Stat. |
| Modify Data | set /path "newdata" [version] | set /config/db "jdbc:mysql://newhost:3306" 1 | Updates the Data; version is an optimistic lock (-1 ignores it). |
| Get Stat only | stat /path | stat /config/db | Returns the Stat (including dataLength) without the Data. |
| Delete the ZNode (Data goes with it) | delete /path | delete /config/db | Removes the node; its Data is lost. |
Watcher example: run get /config/db watch, then in another terminal run set /config/db "updated"; this triggers an event notification:
WatchedEvent state:SyncConnected type:NodeDataChanged path:/config/db
const zookeeper = require('node-zookeeper-client');
const client = zookeeper.createClient('127.0.0.1:2181');
client.connect();
function testCode() {
  // Create a ZNode with its initial Data
  client.create('/config', Buffer.from('jdbc:mysql://localhost:3306'), zookeeper.CreateMode.PERSISTENT, (err, path) => {
    if (err) console.error('Create error:', err);
    else console.log('Created:', path);
  });
  // Read Data with a watcher (the watcher callback only receives the event)
  client.getData('/config', (event) => {
    console.log('Watcher event:', event);
  }, (err, data, stat) => {
    if (err) console.error('Get error:', err);
    else {
      console.log('Initial Data:', data.toString('utf8'));
      console.log('Stat version:', stat.version);
    }
  });
  // Modify Data with version control (-1 ignores the version)
  client.setData('/config', Buffer.from('jdbc:mysql://newhost:3306'), -1, (err, stat) => {
    if (err) console.error('Set error:', err);
    else console.log('Updated Stat version:', stat.version);
  });
  // Close only after the async callbacks have had a chance to run
  setTimeout(() => client.close(), 1000);
}
client.once('connected', () => {
console.log('Connected to ZooKeeper.');
testCode();
});

Explanation: setData(path, data, version, callback) fails with a BadVersionException when the expected version does not match.

In ZooKeeper (ZK), a ZNode's Children List is another core attribute: the list of path names of the ZNode's direct children, which together form the hierarchical tree. It resembles a directory's file and subdirectory listing in a filesystem, but ZooKeeper adds distributed consistency and event-driven notifications.
For example, ["child1", "child2"] means the children are /parent/child1 and /parent/child2. The root node's Children List contains all top-level ZNodes.

The Children List is the foundation of ZooKeeper's naming and discovery mechanisms. Common patterns:
- Service discovery: /services's Children List is the list of service instances (e.g. ["node1", "node2"]); consumers walk the list to get addresses.
- Config grouping: /config/prod's Children List holds the config subsections (["db", "cache"]).
- Hierarchical namespaces: /domain/subdomain.

The Children List is independent of the other facets but complements them:
| Operation | Command | Example | Notes |
|---|---|---|---|
| Create a child (adds to the list) | create /parent/child "data" | create /services/node1 "ip:8080" | The child automatically joins /services's Children List. Supports -e (ephemeral) and -s (sequential). |
| List children | ls /path [watch] | ls /services | Prints the Children List (e.g. [node1, node2]); watch listens for changes. |
| List children + Stat | ls2 /path | ls2 /services | Prints the list plus the parent's Stat (replaced by ls -s in newer versions). |
| Delete a child (removes it from the list) | delete /parent/child | delete /services/node1 | Removes the child from the list; requires the d permission. |
| Count children | getAllChildrenNumber /path | getAllChildrenNumber / | Total number of children under the given path. |
| Recursive delete (empties the list) | deleteall /path (formerly rmr /path) | deleteall /services | Deletes the ZNode and all of its descendants. |
Watcher example: run ls /services watch, then in another terminal run create /services/node3 "ip:8081"; this triggers: "WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/services".

In practice: use sequential nodes for ordered lists (such as queues); for continuous watching, re-register the watcher after each event; for very large lists, filter by prefix (ZK has no built-in pagination).
const zookeeper = require('node-zookeeper-client');
const client = zookeeper.createClient('127.0.0.1:2181');
client.connect();
function listServices() {
client.getChildren('/services',
function (event) {
if (event) {
console.log('Children changed event:', event);
// Re-attach watcher recursively
listServices();
}
},
function (err, children, stat) {
if (err) {
console.error('GetChildren error:', err);
return;
}
console.log('Children List:', children); // e.g., ['node1']
console.log('Num Children:', stat ? stat.numChildren : 'N/A');
console.log('cVersion:', stat ? stat.cversion : 'N/A');
}
);
}
function testCode() {
// 0. Create persistent parent node (allows ephemeral children)
client.create('/services', Buffer.from('fnode'), zookeeper.CreateMode.PERSISTENT, (err, path) => {
if (err) console.error('services Create error:', err);
else console.log('services created', path);
});
// 1. Create ephemeral child node (adds to list)
client.create('/services/node1', Buffer.from('ip:8080'), zookeeper.CreateMode.EPHEMERAL, (err, path) => {
if (err) console.error('Create error:', err);
else console.log('Added to list:', path);
});
// 2. List children with recursive watcher
listServices();
// 3. Delete child after delay (to allow initial list and watcher setup)
setTimeout(() => {
client.remove('/services/node1', -1, (err) => {
if (err) console.error('Delete error:', err);
else console.log('Removed from list');
});
}, 1000);
// 4. Close after more delay (to see delete effect)
setTimeout(() => {
client.close();
}, 2000);
}
client.once('connected', () => {
console.log('Connected to ZooKeeper.');
testCode();
});
// root@ser745692301841:/dev_dir/zookeeper-tutorial/js# node index.js
// Connected to ZooKeeper.
// services created /services
// Added to list: /services/node1
// Children List: [ 'node1' ]
// Num Children: 1
// cVersion: 1
// Children changed event: Event { type: 4, name: 'NODE_CHILDREN_CHANGED', path: '/services' }
// Removed from list
// Children List: []
// Num Children: 0
// cVersion: 2

Explanation:
getChildren(path, watcher, callback) returns a string array (the Children List) plus a Stat; the watcher is optional. An event.type of zookeeper.Event.NODE_CHILDREN_CHANGED means the list changed, and stat.cversion can serve as the expected value in CAS-style updates.

In ZooKeeper, a ZNode's ACL (Access Control List) is its security facet: it defines who may do what to the ZNode. It provides fine-grained authorization so that, in a distributed environment, only authorized principals can perform specific operations (read, write, delete, and so on). ACLs are the core of ZooKeeper's security model, preventing unauthorized access in multi-user or multi-service deployments.
An ACL entry has the form scheme:id:permissions. For example, world:anyone:cdrwa grants everyone (the world scheme) full permissions. New ZNodes default to world:anyone:cdrwa, so production systems must set their own ACLs.

ACLs are key to securing ZooKeeper. A common pattern is protecting sensitive configuration, e.g. digest:admin:***:rw.

The ACL facet is independent but gates access to the others:
| Operation | Command | Example | Notes |
|---|---|---|---|
| Authenticate (addauth) | addauth scheme id | addauth digest admin:admin123 | Authenticates the session so it can access restricted nodes; digest hashes the password. |
| Set ACL | setAcl /path scheme:id:perms | setAcl /secure world:anyone:r,digest:admin:admin123:cdrwa | Multiple rules separated by commas; replaces the existing ACL. |
| Get ACL | getAcl /path | getAcl /secure | Prints the ACL list, e.g. world:anyone:cdrwa. |
| Set ACL at creation | create /path "data" acl | create /secure "data" world:anyone:cdrwa | Optionally specify the ACL when creating the node. |
| Check ACL version | stat /path | stat /secure | The Stat's aversion field is the ACL version. |
| "Remove" ACL | setAcl /path world:anyone:cdrwa | setAcl /secure world:anyone:cdrwa | Resets to the open default (an ACL can't be deleted outright). |
Example flow:
1. create /secure "sensitive"
2. setAcl /secure digest:admin:dyks0yAxrXfvFZ7G5BN0+ZUcGCE=:crdwa (the digest scheme stores the base64 SHA-1 hash, not the plaintext password)
3. addauth digest admin:admin123
4. get /secure succeeds; without authentication it fails with KeeperErrorCode = NoAuth for /secure

Now modify docker-compose.yml to use a custom zoo.cfg:
version: '3.9'
services:
zk1:
image: zookeeper:3.9.4
container_name: zk1
restart: unless-stopped
hostname: zk1
ports:
- "2181:2181"
networks:
- zk-net
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
ZOO_4LW_COMMANDS_WHITELIST: "*"
volumes:
- zk1_data:/data
- zk1_datalog:/datalog
- ./zoo.cfg:/apache-zookeeper-3.9.4-bin/conf/zoo.cfg
zk2:
image: zookeeper:3.9.4
container_name: zk2
restart: unless-stopped
hostname: zk2
ports:
- "2182:2181"
networks:
- zk-net
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
ZOO_4LW_COMMANDS_WHITELIST: "*"
volumes:
- zk2_data:/data
- zk2_datalog:/datalog
- ./zoo.cfg:/apache-zookeeper-3.9.4-bin/conf/zoo.cfg
zk3:
image: zookeeper:3.9.4
container_name: zk3
restart: unless-stopped
hostname: zk3
ports:
- "2183:2181"
networks:
- zk-net
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
ZOO_4LW_COMMANDS_WHITELIST: "*"
volumes:
- zk3_data:/data
- zk3_datalog:/datalog
- ./zoo.cfg:/apache-zookeeper-3.9.4-bin/conf/zoo.cfg
volumes:
zk1_data:
zk1_datalog:
zk2_data:
zk2_datalog:
zk3_data:
zk3_datalog:
networks:
zk-net:
    driver: bridge

zoo.cfg
# Basic settings
tickTime=2000
initLimit=10
syncLimit=5
# Data and log directories (paths inside the image)
dataDir=/data
dataLogDir=/datalog
# Client port (internal)
clientPort=2181
# Cluster members (container names as hostnames)
server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:3888
# Enable the digest authentication provider
authProvider.1=org.apache.zookeeper.server.auth.DigestAuthenticationProvider
# Super-user digest (hash of admin:admin123). Note: in many ZooKeeper versions this
# only works as a JVM system property
# (-Dzookeeper.DigestAuthenticationProvider.superDigest=...) rather than a zoo.cfg key.
superDigest=admin:dyks0yAxrXfvFZ7G5BN0+ZUcGCE=

[zk: 127.0.0.1:2181(CONNECTED) 0] addauth digest admin:admin123
[zk: 127.0.0.1:2181(CONNECTED) 1] create /test "data"
Created /test
[zk: 127.0.0.1:2181(CONNECTED) 2] ls /
[test, zookeeper]
[zk: 127.0.0.1:2181(CONNECTED) 3] create /secure-test "protected data" digest:admin:dyks0yAxrXfvFZ7G5BN0+ZUcGCE=:cdrwa
Created /secure-test
[zk: 127.0.0.1:2181(CONNECTED) 4] getAcl /secure-test
'digest,'admin:dyks0yAxrXfvFZ7G5BN0+ZUcGCE=
: cdrwa
[zk: 127.0.0.1:2181(CONNECTED) 5] get /secure-test
protected data
[zk: 127.0.0.1:2181(CONNECTED) 6]

Exit the session and start a new zkCli:
root@zk1:/apache-zookeeper-3.9.4-bin/bin# ./zkCli.sh -server 127.0.0.1:2181
[zk: 127.0.0.1:2181(CONNECTED) 1] get /secure-test
Insufficient permission : /secure-test
[zk: 127.0.0.1:2181(CONNECTED) 2]
# No permission, as expected
# Now authenticate
[zk: 127.0.0.1:2181(CONNECTED) 2] addauth digest admin:admin123
[zk: 127.0.0.1:2181(CONNECTED) 3] get /secure-test
protected data
[zk: 127.0.0.1:2181(CONNECTED) 4]

The same flow in Node.js:

const zookeeper = require('node-zookeeper-client');
const client = zookeeper.createClient('127.0.0.1:2181');
client.connect();
function testCode() {
// Define ACLs after auth
const worldAcl = new zookeeper.ACL(zookeeper.Permission.ADMIN, new zookeeper.Id('world', 'anyone'));
const digestAcl = new zookeeper.ACL(zookeeper.Permission.ADMIN, new zookeeper.Id('digest', 'admin:dyks0yAxrXfvFZ7G5BN0+ZUcGCE='));
const acls = [worldAcl, digestAcl];
// 2. Create the node (if absent), then set its ACL
client.create('/secure', Buffer.from('initial data'), zookeeper.CreateMode.EPHEMERAL, (err, path) => {
if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS) {
console.error('Create error:', err);
client.close();
return;
}
console.log('Node /secure created or exists:', path);
// Now set ACL
client.setACL('/secure', acls, -1, (err, stat) => { // -1 ignores version
if (err) {
console.error('SetACL error:', err);
client.close();
return;
}
console.log('ACL set, version:', stat.aversion);
// 3. Get the ACL
client.getACL('/secure', (err, retrievedAcls, stat) => {
if (err) console.error('GetACL error:', err);
else {
console.log('Retrieved ACLs:', retrievedAcls);
console.log('ACL Version:', stat.aversion);
// 4. Create a new node with an ACL (argument order: data, acls, mode)
client.create('/secure-new', Buffer.from('data'), acls, zookeeper.CreateMode.EPHEMERAL, (err, path) => {
if (err) console.error('Create error:', err);
else console.log('Created with ACL:', path);
// 5. Close the connection
setTimeout(() => client.close(), 1000); // Brief delay to see ephemeral nodes
});
}
});
});
});
}
client.once('connected', () => {
console.log('Connected to ZooKeeper.');
// 1. Authenticate (addAuthInfo)
client.addAuthInfo('digest', Buffer.from('admin:admin123'));
console.log('Authentication info added.');
testCode();
});
// root@ser745692301841:/dev_dir/zookeeper-tutorial/js# node index.js
// Connected to ZooKeeper.
// Authentication info added.
// Node /secure created or exists: /secure
// ACL set, version: 1
// Retrieved ACLs: [
// ACL { permission: 16, id: Id { scheme: 'world', id: 'anyone' } },
// ACL {
// permission: 16,
// id: Id { scheme: 'digest', id: 'admin:dyks0yAxrXfvFZ7G5BN0+ZUcGCE=' }
// }
// ]
// ACL Version: 1
// Created with ACL: /secure-new

In ZooKeeper, a ZNode's Stat (statistics) is its metadata facet, recording the node's lifecycle and change history: version numbers, timestamps, and counters.
Stat stores no payload itself; it supports consistency, optimistic locking, and monitoring. It is read-only and consistent cluster-wide (synchronized via the ZAB protocol), and is commonly used to detect changes or diagnose problems.
| Category | Field | Type | Description |
|---|---|---|---|
| ZXID (transaction ids) | czxid | long | ZXID of the transaction that created the ZNode (ZXIDs are globally monotonically increasing) |
| | mzxid | long | ZXID of the last Data modification |
| | pzxid | long | ZXID of the last child-list change (child created or deleted) |
| Timestamps | ctime | long | Creation time (ms since the Unix epoch) |
| | mtime | long | Last Data modification time |
| Versions | cversion | int | Child-list version (incremented on Children List changes) |
| | dataVersion | int | Data version (incremented on setData) |
| | aclVersion | int | ACL version (incremented on setACL) |
| Ephemeral | ephemeralOwner | long | Session id of the owner if the node is ephemeral (0 for persistent nodes) |
| Counters | dataLength | int | Length of the Data in bytes (0 if empty) |
| | numChildren | int | Current number of children |
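A ZXID is a 64-bit value: the high 32 bits are the Leader epoch and the low 32 bits are a counter within that epoch. Decimal values reported by client libraries (such as 4294967301) are easier to read once split; a small sketch using BigInt:

```javascript
// Split a 64-bit ZXID into its leader epoch (high 32 bits)
// and its per-epoch counter (low 32 bits).
function splitZxid(zxid) {
  const z = BigInt(zxid); // accepts numbers and '0x…' strings
  return { epoch: Number(z >> 32n), counter: Number(z & 0xffffffffn) };
}

console.log(splitZxid(4294967301));    // { epoch: 1, counter: 5 }, i.e. 0x100000005
console.log(splitZxid('0x100000004')); // { epoch: 1, counter: 4 }
```

A jump in the epoch therefore indicates that a new Leader election has happened.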
Stat is the "fingerprint" for consistency checking and debugging. Common uses:
Stat summarizes the state of the other facets:
| Operation | Command | Example | Notes |
|---|---|---|---|
| Get Stat | stat /path | stat /config | Prints all Stat fields, e.g. czxid = 0x100000001, dataVersion = 0. |
| Get Data + Stat | get /path [watch] | get /config watch | Prints Data + Stat; watch listens for Data changes (which update the Stat too). |
| Get children + Stat | ls2 /path | ls2 /services | Prints the Children List plus the parent's Stat (numChildren etc.); newer versions use ls -s. |
| Check existence | ls /path | ls /config | Fails if the node does not exist; the exists API returns the Stat (null if absent). |
| Versioned update | set /path "data" version | set /config "new" 1 | Uses dataVersion as the expected version; fails on mismatch. |
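The versioned set in the last row is a compare-and-swap: the write succeeds only if dataVersion still equals what the client last read. The protocol is easy to simulate in memory without a server (FakeZnode here is purely illustrative):

```javascript
// Minimal in-memory model of ZooKeeper's versioned setData.
class FakeZnode {
  constructor(data) { this.data = data; this.dataVersion = 0; }
  setData(data, expectedVersion) {
    // -1 means "ignore the version", just like the real API.
    if (expectedVersion !== -1 && expectedVersion !== this.dataVersion) {
      throw new Error('BadVersion');
    }
    this.data = data;
    this.dataVersion += 1;
    return this.dataVersion;
  }
}

const node = new FakeZnode('v0');
console.log(node.setData('v1', 0));  // 1: expected version 0 matched
console.log(node.setData('v2', -1)); // 2: unconditional write
try { node.setData('v3', 0); } catch (e) { console.log(e.message); } // BadVersion
```

A client that loses the race simply re-reads the node, recomputes its change, and retries with the fresh version.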
[zk: localhost:2181(CONNECTED) 0] ls /
[secure-test, test, zookeeper]
[zk: localhost:2181(CONNECTED) 1] stat /test
cZxid = 0x100000004
ctime = Thu Oct 16 08:33:50 UTC 2025
mZxid = 0x100000004
mtime = Thu Oct 16 08:33:50 UTC 2025
pZxid = 0x100000004
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 0
[zk: localhost:2181(CONNECTED) 2]

const zookeeper = require('node-zookeeper-client');
const client = zookeeper.createClient('127.0.0.1:2181');
client.connect();
// helper: parse fields that may be Buffers into Numbers
function bufToNumber(buf) {
if (!Buffer.isBuffer(buf)) return buf;
// Two common lengths: 8 bytes (int64) or 4 bytes (uint32)
try {
if (buf.length === 8 && typeof buf.readBigInt64BE === 'function') {
const bi = buf.readBigInt64BE(0);
// Converting to Number may lose precision, but zxid/ctime/mtime are normally in range
return Number(bi);
}
if (buf.length === 8 && typeof buf.readBigInt64LE === 'function') {
// Little-endian (uncommon); use LE
const bi = buf.readBigInt64LE(0);
return Number(bi);
}
if (buf.length === 4) {
return buf.readUInt32BE(0);
}
// Last resort: treat the bytes as a hex string
return parseInt(buf.toString('hex'), 16);
} catch (e) {
// If everything fails, return the raw Buffer
return buf;
}
}
// Safely read and format the commonly used Stat fields
function formatStat(stat) {
const get = (name, getterName) => {
try {
// Try the getter first (if implemented)
if (typeof stat[getterName] === 'function') {
return stat[getterName]();
}
// Then try the property directly
if (stat[name] !== undefined) {
return stat[name];
}
// Then try lowercase or alternate spellings
if (stat[name.toLowerCase()] !== undefined) {
return stat[name.toLowerCase()];
}
return undefined;
} catch (e) {
return undefined;
}
};
const czxidRaw = get('czxid', 'getCzxid') ?? stat.czxid;
const mzxidRaw = get('mzxid', 'getMzxid') ?? stat.mzxid;
const pzxidRaw = get('pzxid', 'getPzxid') ?? stat.pzxid;
const ctimeRaw = get('ctime', 'getCtime') ?? stat.ctime;
const mtimeRaw = get('mtime', 'getMtime') ?? stat.mtime;
// data/version fields are named differently across implementations
const versionRaw = get('version', 'getVersion') ?? stat.version ?? stat.dataVersion;
const cversionRaw = get('cversion', 'getCversion') ?? stat.cversion;
const numChildrenRaw = get('numChildren', 'getNumChildren') ?? stat.numChildren;
// Parse Buffer -> Number where needed
const czxid = bufToNumber(czxidRaw);
const mzxid = bufToNumber(mzxidRaw);
const pzxid = bufToNumber(pzxidRaw);
const ctimeNum = bufToNumber(ctimeRaw);
const mtimeNum = bufToNumber(mtimeRaw);
const version = bufToNumber(versionRaw);
const cversion = bufToNumber(cversionRaw);
const numChildren = bufToNumber(numChildrenRaw);
return {
czxid,
mzxid,
pzxid,
ctime: (typeof ctimeNum === 'number' && !Number.isNaN(ctimeNum)) ? new Date(ctimeNum) : ctimeRaw,
mtime: (typeof mtimeNum === 'number' && !Number.isNaN(mtimeNum)) ? new Date(mtimeNum) : mtimeRaw,
version,
cversion,
numChildren
};
}
function testCode() {
// 1. exists -> stat
client.exists('/secure-test', (err, stat) => {
if (err) return console.error('Exists error:', err);
if (!stat) return console.log('Node does not exist');
console.log('Stat:', formatStat(stat));
});
// 2. getData -> data + stat
client.getData('/secure-test', (err, data, stat) => {
if (err) return console.error('GetData error:', err);
console.log('Data:', data ? data.toString('utf8') : null);
console.log('Stat (from getData):', formatStat(stat));
});
// 3. getChildren('/', ...)
client.getChildren('/', (err, children, stat) => {
if (err) return console.error('GetChildren error:', err);
console.log('Children:', children);
console.log('Root stat:', formatStat(stat));
});
// 4. Modify data (with version control)
client.getData('/secure-test', (err, data, stat) => {
if (err) return console.error('GetData error before set:', err);
const fmt = formatStat(stat);
const expectedVersion = typeof fmt.version === 'number' ? fmt.version : -1;
client.setData('/secure-test', Buffer.from('updated'), expectedVersion, (err, newStat) => {
if (err) {
if (err.getCode && err.getCode() === zookeeper.Exception.BAD_VERSION) {
console.error('Version mismatch');
} else {
console.error('SetData error:', err);
}
} else {
console.log('Updated, new stat:', formatStat(newStat));
}
});
});
}
client.once('connected', () => {
console.log('Connected to ZooKeeper.');
client.addAuthInfo('digest', Buffer.from('admin:admin123'));
console.log('Authentication info added.');
testCode();
});
// root@ser745692301841:/dev_dir/zookeeper-tutorial/js# node index.js
// Connected to ZooKeeper.
// Authentication info added.
// Stat: {
// czxid: 4294967301,
// mzxid: 4294967319,
// pzxid: 4294967301,
// ctime: 2025-10-16T08:41:32.407Z,
// mtime: 2025-10-16T17:32:56.581Z,
// version: 1,
// cversion: 0,
// numChildren: 0
// }
// Data: updated
// Stat (from getData): {
// czxid: 4294967301,
// mzxid: 4294967319,
// pzxid: 4294967301,
// ctime: 2025-10-16T08:41:32.407Z,
// mtime: 2025-10-16T17:32:56.581Z,
// version: 1,
// cversion: 0,
// numChildren: 0
// }
// Children: [ 'zookeeper', 'test', 'secure-test' ]
// Root stat: {
// czxid: 0,
// mzxid: 0,
// pzxid: 4294967315,
// ctime: 1970-01-01T00:00:00.000Z,
// mtime: 1970-01-01T00:00:00.000Z,
// version: 0,
// cversion: 7,
// numChildren: 3
// }
// Updated, new stat: {
// czxid: 4294967301,
// mzxid: 4294967328,
// pzxid: 4294967301,
// ctime: 2025-10-16T08:41:32.407Z,
// mtime: 2025-10-16T17:43:33.718Z,
// version: 2,
// cversion: 0,
// numChildren: 0
// }

ZooKeeper's commands fall into two groups: zkCli.sh client commands (interactive ZNode operations such as creating and deleting nodes) and four-letter-word (4LW) commands for server monitoring and administration, sent via telnet or nc.
Note: zkCli commands run inside a zkCli.sh session, while 4LW commands are sent to the server port, e.g.
echo stat | nc localhost 2181
These are the core ZooKeeper CLI (zkCli.sh) commands for managing ZNodes:
| Command | Description | Example |
|---|---|---|
| help | Show help for all commands | help |
| addauth | Add an auth scheme, e.g. digest | addauth digest user:pass |
| close | Close the current connection | close |
| connect | Connect to a given server | connect localhost:2181 |
| config | Show the cluster configuration | config |
| create | Create a ZNode (persistent/ephemeral/sequential) | create /path "data" (-e, -s) |
| delete | Delete a ZNode | delete /path |
| deleteall | Recursively delete a ZNode and its children (replaces the old rmr) | deleteall /path |
| get | Get a ZNode's data | get /path |
| getAcl | Get a ZNode's ACL | getAcl /path |
| getAllChildrenNumber | Total number of children under a path | getAllChildrenNumber / |
| getEphemerals | List ephemeral nodes owned by this session | getEphemerals |
| history | Show command history | history |
| ls | List a ZNode's children | ls /path |
| ls -s | List children with the Stat (replaces the old ls2) | ls -s /path |
| printwatches | Toggle printing of watch events | printwatches on |
| quit | Exit zkCli | quit |
| set | Set a ZNode's data | set /path "newdata" |
| setAcl | Set a ZNode's ACL | setAcl /path world:anyone:cdrwa |
| stat | Get a ZNode's Stat | stat /path |
| sync | Sync the client's view with the leader | sync /path |
Many read commands accept a watch option, e.g. get -w /path (older versions: get /path watch).

The 4LW commands are server-level commands for monitoring and diagnostics, sent over the network. Since ZooKeeper 3.5, only srvr is whitelisted by default; enable others (or all of them) with 4lw.commands.whitelist, e.g. 4lw.commands.whitelist=*.
| Command | Description | Example |
|---|---|---|
| conf | Print server configuration details | echo conf \| nc localhost 2181 |
| cons | List all connections/sessions | echo cons \| nc localhost 2181 |
| crst | Reset connection/session statistics | echo crst \| nc localhost 2181 |
| dirs | Show total size of snapshot and log files | echo dirs \| nc localhost 2181 |
| dump | List sessions and ephemeral nodes (Leader only) | echo dump \| nc localhost 2181 |
| envi | Print serving environment details | echo envi \| nc localhost 2181 |
| gtmk | Get the current trace mask | echo gtmk \| nc localhost 2181 |
| stmk | Set the trace mask | echo stmk \| nc localhost 2181 |
| isro | Check whether the server is read-only (returns ro or rw) | echo isro \| nc localhost 2181 |
| mntr | Output monitoring metrics, one key/value pair per line | echo mntr \| nc localhost 2181 |
| ruok | Liveness check (returns imok) | echo ruok \| nc localhost 2181 |
| srst | Reset server statistics | echo srst \| nc localhost 2181 |
| srvr | Print server details (version, mode, zxid, ...) | echo srvr \| nc localhost 2181 |
| stat | Print server statistics and connected clients | echo stat \| nc localhost 2181 |
| wchs | Brief summary of watches | echo wchs \| nc localhost 2181 |
| wchc | Watches grouped by session (can be expensive; needs whitelisting) | echo wchc \| nc localhost 2181 |
| wchp | Watches grouped by path (can be expensive; needs whitelisting) | echo wchp \| nc localhost 2181 |
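mntr is the most script-friendly of these: it emits one key/value pair per line, separated by a tab. A minimal parser (the sample text is illustrative, not captured from a real server):

```javascript
// Parse `mntr` output (tab-separated key/value lines) into an object,
// converting integer values to Numbers.
function parseMntr(text) {
  const result = {};
  for (const line of text.trim().split('\n')) {
    const [key, value] = line.split('\t');
    if (!key || value === undefined) continue;
    result[key] = /^-?\d+$/.test(value) ? Number(value) : value;
  }
  return result;
}

// Sample lines in the shape mntr emits (values are made up):
const sample = 'zk_version\t3.9.4\nzk_server_state\tleader\nzk_znode_count\t5';
console.log(parseMntr(sample));
```

Feeding this the real output of `echo mntr | nc localhost 2181` yields an object ready for a metrics pipeline.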
Similarities between ZooKeeper and Redis

ZooKeeper and Redis are both popular tools in distributed systems, and they often "feel similar" because they overlap in functionality, architectural design, and use cases, especially around distributed data consistency and coordination. The points below explain where that feeling comes from and briefly contrast the two.

Overlapping core features: distributed locks and data consistency

In-memory residency and performance-oriented design

A shared key-value storage model

Why they "feel" alike yet actually differ

Despite the similarities, ZooKeeper leans toward strong consistency and coordination (leader election, config synchronization): very reliable, somewhat slower. Redis emphasizes throughput and simplicity and excels at caching, but its locks are less rigorous than ZooKeeper's (expiry handling needs extra care). For high-concurrency, efficiency-first workloads, use Redis; for high-reliability coordination, use ZooKeeper.

In short, the resemblance comes from the shared needs of the distributed ecosystem; looked at closely, they are complementary.
ZooKeeper is not a single-machine program; it is normally deployed as a cluster (ensemble) with an odd number of nodes, for example:
| Node | IP | Role |
|---|---|---|
| zk1 | 192.168.1.1 | Leader |
| zk2 | 192.168.1.2 | Follower |
| zk3 | 192.168.1.3 | Follower |
ZAB (ZooKeeper Atomic Broadcast) is ZooKeeper's core protocol for guaranteeing consistency and ordering.
It has roughly two phases:
Phase 1: Leader Election
When the cluster starts or the Leader crashes:
This brings the cluster back to a consistent state.
Phase 2: Atomic Broadcast
While a Leader is in office, write requests propagate through a two-phase commit:
| Step | Action | Notes |
|---|---|---|
| 1 | The Leader accepts the write and generates a proposal | The proposal carries a transaction id (zxid) |
| 2 | The Leader broadcasts the proposal to all Followers | ZAB guarantees ordering |
| 3 | Each Follower appends the proposal to its local transaction log and replies with an ACK | "Accepted" |
| 4 | Once the Leader has ACKs from a majority of nodes, it commits | Applies to memory and acknowledges the client |
| 5 | The Leader broadcasts the commit to all nodes | Every node commits the same data |
Every write in ZooKeeper must be acknowledged by a majority of nodes (> N/2); this quorum rule is the foundation of its consistency guarantee.
Suppose there are 3 machines:
A client sends the request:

set /game/state "RUNNING"

Flow:
Even if zk3 goes down, as long as zk1 + zk2 (a majority) remain, writes still succeed.
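The quorum arithmetic behind this is worth spelling out: an ensemble of N servers needs floor(N/2)+1 acknowledgements, so it tolerates floor((N-1)/2) failures, which is why ensembles use odd sizes. A sketch:

```javascript
// Quorum size and fault tolerance for an ensemble of n servers.
function quorum(n) { return Math.floor(n / 2) + 1; }
function tolerableFailures(n) { return n - quorum(n); }

for (const n of [3, 4, 5]) {
  console.log(`n=${n}: quorum=${quorum(n)}, tolerates ${tolerableFailures(n)} failure(s)`);
}
// A 4-node ensemble still tolerates only 1 failure, the same as 3 nodes,
// so the extra node buys nothing; hence odd sizes.
```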
Each node's zoo.cfg typically looks like this:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
# Cluster membership
server.1=192.168.1.1:2888:3888
server.2=192.168.1.2:2888:3888
server.3=192.168.1.3:2888:3888

Explanation:
A client can list multiple nodes:

const client = zookeeper.createClient('192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181');

The ZooKeeper client will then automatically:
A ZooKeeper cluster is a "strongly consistent replicated state machine with leader election": the instances cooperate via the ZAB protocol, the Leader orders and commits transactions, Followers sync and acknowledge, and a client can connect to any node and still get a linearizable view of the data.

Below is a service-discovery example built with Node.js + ZooKeeper.

server.js
// server.js
const zookeeper = require('node-zookeeper-client');
const os = require("os");
const client = zookeeper.createClient("127.0.0.1:2181");
const SERVICE_PATH = "/services/myapp";
client.once('connected', () => {
console.log("Connected to ZooKeeper");
registerService();
});
client.connect();
function registerService() {
client.exists(SERVICE_PATH, (err, stat) => {
if (err) {
return console.error("exists error:", err);
}
if (!stat) {
client.mkdirp(SERVICE_PATH, (err) => {
if (err) {
return console.error("mkdir error:", err);
}
createEphemeralNode();
});
} else {
createEphemeralNode();
}
});
}
function createEphemeralNode() {
const ip = getLocalIP();
const port = 3000 + Math.floor(Math.random() * 1000);
const data = JSON.stringify({ ip, port });
const nodePath = `${SERVICE_PATH}/node-`;
client.create(nodePath, Buffer.from(data), zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL, (err, path) => {
if (err) return console.error("create error:", err);
console.log(`Service registered at ${path} (${data})`);
});
}
function getLocalIP() {
const interfaces = os.networkInterfaces();
for (const name in interfaces) {
for (const iface of interfaces[name]) {
if (iface.family === 'IPv4' && !iface.internal) return iface.address;
}
}
return '127.0.0.1';
}client.js
// client.js
const zookeeper = require('node-zookeeper-client');
const client = zookeeper.createClient('127.0.0.1:2181');
const SERVICE_PATH = '/services/myapp';
client.once('connected', () => {
console.log('Connected to ZooKeeper');
watchServices();
});
client.connect();
function watchServices() {
client.getChildren(
SERVICE_PATH,
event => {
console.log('Service change event:', event);
watchServices(); // 重新监听
},
(err, children) => {
if (err) {
return console.error('getChildren error:', err);
}
if (!children.length) {
console.log('No Service available');
return;
}
console.log('Found Services: ', children);
children.forEach(child => {
const fullPath = `${SERVICE_PATH}/${child}`;
client.getData(fullPath, (err, data) => {
if (!err && data) {
console.log(`${child}: ${data.toString()}`);
}
});
});
}
);
}server.js
// server.js
const zookeeper = require('node-zookeeper-client');
const os = require("os");
const client = zookeeper.createClient("127.0.0.1:2181");
const SERVICE_PATH = "/services/myapp";
client.once('connected', () => {
console.log("Connected to ZooKeeper");
registerService();
});
client.connect();
function registerService() {
client.exists(SERVICE_PATH, (err, stat) => {
if (err) {
return console.error("exists error:", err);
}
if (!stat) {
client.mkdirp(SERVICE_PATH, (err) => {
if (err) {
return console.error("mkdir error:", err);
}
createEphemeralNode();
});
} else {
createEphemeralNode();
}
});
}
function createEphemeralNode() {
const ip = getLocalIP();
const port = 3000 + Math.floor(Math.random() * 1000);
const data = JSON.stringify({ ip, port });
const nodePath = `${SERVICE_PATH}/node-`;
client.create(nodePath, Buffer.from(data), zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL, (err, path) => {
if (err) return console.error("create error:", err);
console.log(`Service registered at ${path} (${data})`);
});
}
function getLocalIP() {
const interfaces = os.networkInterfaces();
for (const name in interfaces) {
for (const iface of interfaces[name]) {
if (iface.family === 'IPv4' && !iface.internal) return iface.address;
}
}
return '127.0.0.1';
}

服务注册后会在 /services/myapp 下创建一个临时顺序节点。

client.js 服务实现 + 负载均衡端
// client.js
const zookeeper = require('node-zookeeper-client');
const ZK_SERVERS = '127.0.0.1:2181';
const SERVICE_PATH = '/services/myapp';
let services = [];
let currentIndex = 0; // 用于轮询选择
const client = zookeeper.createClient(ZK_SERVERS);
client.once('connected', () => {
console.log('Connected to ZooKeeper');
watchServices();
});
client.connect();
// 监听服务节点变化
function watchServices() {
client.getChildren(
SERVICE_PATH,
event => {
console.log('Service list changed:', event);
watchServices();
},
(err, children) => {
if (err) return console.error('getChildren error:', err);
if (!children.length) {
services = [];
console.log('⚠️ No available services.');
return;
}
// 获取每个节点的数据
const temp = [];
let pending = children.length;
children.forEach(child => {
const fullPath = `${SERVICE_PATH}/${child}`;
client.getData(fullPath, (err, data) => {
if (!err && data) {
temp.push(JSON.parse(data.toString()));
}
if (--pending === 0) {
services = temp;
console.log('🔍 Current available services:', services);
}
});
});
}
);
}
// 监听轮询负载均衡
function getNextService() {
if (services.length === 0) {
console.log('❌ No service available.');
return null;
}
const service = services[currentIndex % services.length];
currentIndex++;
return service;
}
// 模拟客户端调用
setInterval(() => {
const service = getNextService();
if (service) {
console.log(`Sending request to ${service.ip}:${service.port}`);
}
}, 2000);

客户端通过监听 /services/myapp 子节点实现服务发现。这是分布式系统里非常经典的场景,像 Kafka、HBase、etcd 都使用类似机制。
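需要注意,ZooKeeper 的 Watch 是一次性的:触发后必须重新注册,上面 watchServices 的递归调用正是为此。下面用一个纯内存的小例子示意这一语义(OneShotWatchNode 为本文假想的命名,并非 node-zookeeper-client 的 API):

```javascript
// 一次性 Watch 语义的内存版示意(OneShotWatchNode 为假想命名)
class OneShotWatchNode {
  constructor() {
    this.children = [];
    this.watchers = [];
  }
  // 读取子节点的同时注册一个一次性监听
  getChildren(watcher) {
    if (watcher) this.watchers.push(watcher);
    return [...this.children];
  }
  addChild(name) {
    this.children.push(name);
    const fired = this.watchers;
    this.watchers = []; // 触发即清空:不重新注册就收不到后续事件
    fired.forEach(w => w({ type: 'NODE_CHILDREN_CHANGED' }));
  }
}

const node = new OneShotWatchNode();
let events = 0;
// 正确用法:每次触发后重新 getChildren 注册(对应 watchServices 的递归)
const rewatch = () => node.getChildren(() => { events++; rewatch(); });
rewatch();
node.addChild('node-0000000001');
node.addChild('node-0000000002');
console.log(events); // → 2:每次变化都收到了
```

如果 rewatch 里不重新注册,第二次 addChild 就不会触发任何回调,这正是真实代码里必须递归调用 watchServices 的原因。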
目标:实现一个简单的 Leader 选举算法
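选举的核心判断可以抽象成一个纯函数:对子节点排序,序号最小者为 Leader,其余节点各自只监听紧邻的前一个节点。下面是脱离 ZooKeeper 的内存示意,便于理解后面 leader.js 里 checkLeader 的逻辑(electionDecision 为本文假想的命名):

```javascript
// 给定 /election 下的子节点列表和自己的节点名,
// 返回自己是否为 Leader,以及需要监听的前一个节点
function electionDecision(children, myNode) {
  const sorted = [...children].sort(); // 零填充的顺序节点名按字典序即创建顺序
  const myIndex = sorted.indexOf(myNode);
  if (myIndex === 0) return { isLeader: true, watchTarget: null };
  // 只监听紧邻的前一个节点,避免"惊群效应"(所有 Follower 同时被唤醒)
  return { isLeader: false, watchTarget: sorted[myIndex - 1] };
}

console.log(electionDecision(['node-0000000002', 'node-0000000001'], 'node-0000000001'));
// → { isLeader: true, watchTarget: null }
console.log(electionDecision(['node-0000000001', 'node-0000000003'], 'node-0000000003'));
// → { isLeader: false, watchTarget: 'node-0000000001' }
```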
leader.js 实现Leader选举
const zookeeper = require('node-zookeeper-client');
const os = require('os');
const ZK_SERVERS = '127.0.0.1:2181';
const ELECTION_PATH = '/election';
const client = zookeeper.createClient(ZK_SERVERS);
let currentNodePath = null; // 当前节点路径
client.once('connected', () => {
console.log('Connected to ZooKeeper');
startElection();
});
client.connect();
// 创建选举节点
function startElection() {
client.mkdirp(ELECTION_PATH, (err) => {
if (err) {
return console.error('mkdirp error:', err);
}
createElectionNode();
});
}
function createElectionNode() {
const nodePath = `${ELECTION_PATH}/node-`;
const nodeData = Buffer.from(JSON.stringify({
host: getLocalIP(),
pid: process.pid,
}));
client.create(nodePath, nodeData, zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL, (err, path) => {
if (err) return console.error('create node error:', err);
currentNodePath = path;
console.log(`Created election node: ${path}`);
checkLeader();
});
}
// 检查自己是否是 Leader
function checkLeader() {
client.getChildren(ELECTION_PATH, event => {
console.log('Election node changed:', event);
checkLeader(); // 重新检测
}, (err, children) => {
if (err) return console.error('getChildren error:', err);
// 排序节点
const sorted = children.sort();
const myNode = currentNodePath.split('/').pop();
if (sorted[0] === myNode) {
console.log('I am the Leader!');
} else {
const leaderNode = sorted[0];
console.log(`I am a Follower, current Leader: ${leaderNode}`);
// 监听比自己小的那个节点(前一个节点)
const myIndex = sorted.indexOf(myNode);
const watchTarget = `${ELECTION_PATH}/${sorted[myIndex - 1]}`;
client.exists(watchTarget, event => {
console.log('Watched node event:', event);
checkLeader(); // 前一个节点删除后重新检查
}, (err, stat) => {
if (err) return console.error('exist error:', err);
if (!stat) checkLeader(); // 前一个节点被删除时重新检查
});
}
});
}
function getLocalIP() {
const interfaces = os.networkInterfaces();
for (const name in interfaces) {
for (const iface of interfaces[name]) {
if (iface.family === 'IPv4' && !iface.internal) return iface.address;
}
}
return '127.0.0.1';
}

ZooKeeper的分布式锁原理:
/locks/mylock/lock-0000000001
/locks/mylock/lock-0000000002
/locks/mylock/lock-0000000003

fair_lock.js 分布式公平锁实现
const zookeeper = require('node-zookeeper-client');
const ZK_SERVERS = '127.0.0.1:2181';
const LOCK_PATH = '/locks/mylock';
const client = zookeeper.createClient(ZK_SERVERS);
let myNodePath = null;
let lockReleased = false;
let lockAcquired = false; // ✅ 防止重复获得锁
client.once('connected', () => {
console.log('Connected to ZooKeeper');
acquireLock();
});
client.connect();
function acquireLock() {
client.mkdirp(LOCK_PATH, (err) => {
if (err) return console.error('mkdirp error:', err);
createLockNode();
});
}
function createLockNode() {
const nodePath = `${LOCK_PATH}/lock-`;
client.create(
nodePath,
Buffer.from('lock-node'),
zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL,
(err, path) => {
if (err) return console.error('create error:', err);
myNodePath = path;
console.log(`🔒 Created lock node: ${path}`);
checkLock();
}
);
}
function checkLock() {
if (!myNodePath || lockReleased || lockAcquired) return;
client.getChildren(
LOCK_PATH,
event => {
if (!lockReleased && !lockAcquired) {
console.log('📡 Lock node event:', event);
checkLock();
}
},
(err, children) => {
if (err) return console.error('getChildren error:', err);
if (!myNodePath || lockReleased || lockAcquired) return;
const sorted = children.sort();
const myNode = myNodePath.split('/').pop();
const myIndex = sorted.indexOf(myNode);
if (myIndex === 0) {
lockAcquired = true; // ✅ 防止重复进入临界区
console.log('✅ Lock acquired! I am the holder.');
doCriticalSection();
} else {
const watchTarget = `${LOCK_PATH}/${sorted[myIndex - 1]}`;
console.log(`⏳ Waiting for ${watchTarget} to be released...`);
client.exists(
watchTarget,
event => {
if (!lockReleased && !lockAcquired && event.getType() ===
zookeeper.Event.NODE_DELETED) {
console.log('📢 Previous lock released, rechecking...');
checkLock();
}
},
(err, stat) => {
if (err) return console.error('exists error:', err);
if (!stat) checkLock(); // 前一个节点可能刚被删掉
}
);
}
}
);
}
function doCriticalSection() {
console.log('🧠 Doing critical section...');
setTimeout(() => {
console.log('🧹 Releasing lock...');
releaseLock();
}, 5000);
}
function releaseLock() {
if (!myNodePath || lockReleased) return;
lockReleased = true;
client.remove(myNodePath, (err) => {
if (err) {
if (err.code === zookeeper.Exception.NO_NODE) {
console.warn('⚠️ Node already removed, safe to ignore.');
} else {
console.error('remove error:', err);
}
} else {
console.log('🔓 Lock released!');
}
myNodePath = null;
setTimeout(() => client.close(), 1000);
});
});

在ZooKeeper里,每个节点的 dataVersion 都相当于"版本号"。每次修改数据时,ZooKeeper都会让版本号+1。
所谓“乐观锁(Optimistic Lock)”,就是:
我认为我读到的数据在我修改前不会被别人改过,修改时如果版本号不匹配,就说明“别人改了”,我放弃修改或重试。
我们创建一个节点
/optimistic/item,里面存一个库存数,比如“stock=10”。
两个客户端同时读取该节点,然后尝试修改它:
ZooKeeper会检查版本号,如果版本不匹配,修改失败(相当于CAS:Compare And Swap)
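在看完整示例之前,可以先用一个纯内存的小例子感受"带版本写入"的语义(InMemoryNode、decrementStock 均为本文假想的命名,只是模拟 setData(path, data, version) 的行为,并非 node-zookeeper-client 的 API):

```javascript
// 内存版 CAS 示意:用版本号模拟 ZooKeeper 的带版本 setData
class InMemoryNode {
  constructor(data) {
    this.data = data;
    this.version = 0; // 对应 znode 的 dataVersion
  }
  getData() {
    return { data: this.data, version: this.version };
  }
  // 版本匹配才写入,否则返回 false(相当于抛出 BAD_VERSION)
  setData(data, expectedVersion) {
    if (expectedVersion !== this.version) return false;
    this.data = data;
    this.version++;
    return true;
  }
}

// 乐观锁的典型用法:读 → 改 → 带版本写,失败则重读重试
function decrementStock(node, delta, maxRetries = 5) {
  for (let i = 0; i < maxRetries; i++) {
    const { data, version } = node.getData();
    const stock = parseInt(data.split('=')[1], 10);
    if (node.setData(`stock=${stock - delta}`, version)) return true;
  }
  return false;
}

const item = new InMemoryNode('stock=10');
// 模拟两个客户端都基于 version=0 发起写入:第二个会版本冲突
const v0 = item.getData().version;
console.log(item.setData('stock=9', v0)); // → true,version 变为 1
console.log(item.setData('stock=8', v0)); // → false,版本冲突
console.log(decrementStock(item, 2));     // → true,重读后成功
console.log(item.getData().data);         // → stock=7
```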
optimistic_lock.js
// optimistic_lock.js
const zookeeper = require('node-zookeeper-client');
const ZK_SERVERS = '127.0.0.1:2181';
const PATH = '/optimistic/item';
const client = zookeeper.createClient(ZK_SERVERS);
client.once('connected', () => {
console.log('✅ Connected to ZooKeeper.');
// client.addAuthInfo('digest', Buffer.from('admin:admin123'));
initNode(() => simulateClients());
});
client.connect();
// 初始化节点
function initNode(callback) {
client.exists(PATH, (err, stat) => {
if (err) return console.error('exists error:', err);
if (stat) return callback();
client.mkdirp('/optimistic', (err) => {
if (err) return console.error('mkdirp error:', err);
client.create(
PATH,
Buffer.from('stock=10'),
zookeeper.CreateMode.PERSISTENT,
(err) => {
if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS) {
return console.error('create error:', err);
}
console.log('🧱 Initialized node with stock=10');
callback();
}
);
});
});
}
// 模拟两个客户端同时修改库存
function simulateClients() {
readAndUpdate('Client A', -1, () => {
readAndUpdate('Client B', -2, () => {
// 全部结束
setTimeout(() => client.close(), 1000);
});
});
}
// 读取并尝试更新数据(带版本检查)
function readAndUpdate(clientName, delta, callback) {
client.getData(PATH, (err, data, stat) => {
if (err) return console.error(`${clientName} getData error:`, err);
const stockStr = data.toString('utf8');
const stock = parseInt(stockStr.split('=')[1]);
const newStock = stock + delta;
console.log(`${clientName} reads ${stockStr}, version=${stat.version}`);
console.log(`${clientName} wants to update to stock=${newStock}`);
// 尝试更新(使用当前 version)
const newData = Buffer.from(`stock=${newStock}`);
client.setData(PATH, newData, stat.version, (err, newStat) => {
if (err) {
if (err.getCode && err.getCode() === zookeeper.Exception.BAD_VERSION) {
console.error(`❌ ${clientName} update failed (version conflict).`);
} else {
console.error(`${clientName} setData error:`, err);
}
} else {
console.log(`✅ ${clientName} successfully updated! newVersion=${newStat.version}`);
}
callback();
});
});
});

在ZooKeeper里,"悲观锁"常用临时顺序节点(EPHEMERAL_SEQUENTIAL)实现。原理和"公平锁"几乎一样,只是:
“悲观锁会先阻止其他客户端进入临界区,等自己处理完再释放。”
假设我们有一个共享资源节点
/pessimistic/resource,多个客户端要修改它,但不能同时操作。
我们使用 ZooKeeper 的锁节点 /locks/pessimistic:
// pessimistic_lock.js
const zookeeper = require('node-zookeeper-client');
const ZK_SERVERS = '127.0.0.1:2181';
const LOCK_PATH = '/locks/pessimistic';
const RESOURCE_PATH = '/pessimistic/resource';
const client = zookeeper.createClient(ZK_SERVERS);
let myNodePath = null;
client.once('connected', () => {
console.log('✅ Connected to ZooKeeper.');
client.addAuthInfo('digest', Buffer.from('admin:admin123'));
ensurePaths(() => acquireLock());
});
client.connect();
// 确保锁目录和资源存在
function ensurePaths(callback) {
client.mkdirp(LOCK_PATH, (err) => {
if (err) return console.error('mkdirp lock error:', err);
client.exists(RESOURCE_PATH, (err, stat) => {
if (err) return console.error('exists error:', err);
if (stat) return callback();
client.mkdirp('/pessimistic', (err) => {
if (err) return console.error('mkdirp resource error:', err);
client.create(RESOURCE_PATH,
Buffer.from('value=0'),
zookeeper.CreateMode.PERSISTENT,
(err) => {
if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS)
return console.error('create resource error:', err);
console.log('🧱 Initialized resource: value=0');
callback();
});
});
});
});
}
// 获取锁
function acquireLock() {
const nodePrefix = `${LOCK_PATH}/lock-`;
client.create(nodePrefix,
Buffer.from('lock-node'),
zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL,
(err, path) => {
if (err) return console.error('create lock node error:', err);
myNodePath = path;
console.log(`🔒 Created lock node: ${path}`);
checkLock();
});
}
// 检查自己是否获得锁
function checkLock() {
client.getChildren(LOCK_PATH, (err, children) => {
if (err) return console.error('getChildren error:', err);
const sorted = children.sort();
const myNode = myNodePath.split('/').pop();
const myIndex = sorted.indexOf(myNode);
if (myIndex === 0) {
console.log('✅ Lock acquired! Performing exclusive operation...');
doCriticalSection();
} else {
const watchTarget = `${LOCK_PATH}/${sorted[myIndex - 1]}`;
console.log(`⏳ Waiting for ${watchTarget} to be released...`);
client.exists(watchTarget, event => {
if (event.getType() === zookeeper.Event.NODE_DELETED) {
console.log('📢 Previous lock released, rechecking...');
checkLock();
}
}, (err, stat) => {
if (err) return console.error('exists error:', err);
if (!stat) checkLock();
});
}
});
}
// 执行临界区操作
function doCriticalSection() {
client.getData(RESOURCE_PATH, (err, data, stat) => {
if (err) return console.error('getData error:', err);
let value = parseInt(data.toString('utf8').split('=')[1]);
console.log(`🔍 Read value=${value}`);
// 模拟操作(加1)
value++;
const newData = Buffer.from(`value=${value}`);
client.setData(RESOURCE_PATH, newData, stat.version, (err, newStat) => {
if (err) return console.error('setData error:', err);
console.log(`🧠 Updated resource to value=${value} (version ${newStat.version})`);
// 模拟任务耗时
setTimeout(() => {
releaseLock();
}, 3000);
});
});
}
// 释放锁
function releaseLock() {
if (!myNodePath) return;
console.log('🧹 Releasing lock...');
client.remove(myNodePath, (err) => {
if (err) {
if (err.getCode && err.getCode() === zookeeper.Exception.NO_NODE) {
console.warn('⚠️ Lock node already removed.');
} else {
console.error('remove error:', err);
}
} else {
console.log('🔓 Lock released!');
}
client.close();
});
}

在分布式系统中,"可重入锁(Reentrant Lock)"允许同一个线程(或客户端)多次获取同一把锁,而不会被自己阻塞。
实现关键点:
| 特性 | 说明 |
|---|---|
| 可重入性 | 同一客户端多次加锁不会阻塞自己 |
| 基于Ephemeral节点 | ZooKeeper自动清理掉死连接 |
| 分布式安全性 | 即使进程崩溃,锁也能释放 |
| 客户端唯一标识 | 通过CLIENT_ID区分不同实例 |
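可重入的核心只是一个计数器:首次获取才真正去 ZooKeeper 创建节点,最后一次释放才真正删除节点。下面是计数逻辑的内存示意(ReentrantCounter 为本文假想的命名;跨进程互斥仍要靠 ZooKeeper 的临时节点保证):

```javascript
// 可重入计数的最小示意:只演示本地计数逻辑
class ReentrantCounter {
  constructor(clientId) {
    this.clientId = clientId;
    this.count = 0;
  }
  // 返回 true 表示需要真正去 ZooKeeper 创建锁节点(首次获取)
  acquire() {
    this.count++;
    return this.count === 1;
  }
  // 返回 true 表示需要真正删除锁节点(最后一次释放)
  release() {
    if (this.count === 0) throw new Error('release without acquire');
    this.count--;
    return this.count === 0;
  }
}

const lock = new ReentrantCounter('client-A');
console.log(lock.acquire()); // → true:首次获取,需创建 znode
console.log(lock.acquire()); // → false:重入,只加计数
console.log(lock.release()); // → false:还有一层未释放
console.log(lock.release()); // → true:计数归零,删除 znode
```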
const zookeeper = require('node-zookeeper-client');
const ZK_SERVERS = '127.0.0.1:2181';
const LOCK_ROOT = '/reentrant_lock';
const LOCK_NAME = 'mylock';
const CLIENT_ID = 'client-A'; // 模拟当前客户端唯一ID
const client = zookeeper.createClient(ZK_SERVERS);
client.connect();
let lockPath = null;
let reentrantCount = 0;
client.once('connected', async () => {
console.log('✅ Connected to ZooKeeper');
await ensurePath(LOCK_ROOT);
await acquireLock();
await acquireLock(); // 再次获取(测试可重入)
setTimeout(async () => {
await releaseLock();
await releaseLock(); // 第二次释放才真正删除锁节点
process.exit(0);
}, 4000);
});
/**
* 确保根路径存在
*/
function ensurePath(path) {
return new Promise((resolve) => {
client.exists(path, (err, stat) => {
if (stat) return resolve();
client.create(path, (err2) => {
if (err2 && err2.getCode() !== zookeeper.Exception.NODE_EXISTS) {
console.error('❌ create path error:', err2);
}
resolve();
});
});
});
}
/**
* 尝试获取锁
*/
async function acquireLock() {
if (lockPath) {
reentrantCount++;
console.log(`🔁 Re-entered lock (${reentrantCount} times)`);
return;
}
const path = `${LOCK_ROOT}/${LOCK_NAME}`;
try {
lockPath = await createEphemeralNode(path, CLIENT_ID);
reentrantCount = 1;
console.log(`🔒 Lock acquired by ${CLIENT_ID}`);
} catch (err) {
if (err.code === zookeeper.Exception.NODE_EXISTS) {
console.log(`⏳ Lock held by others, waiting...`);
await waitForLock(path);
return acquireLock(); // 重试
} else {
console.error('❌ acquire lock error:', err);
}
}
}
/**
* 释放锁
*/
async function releaseLock() {
if (!lockPath) return;
reentrantCount--;
console.log(`🧹 release lock called (${reentrantCount} remaining)`);
if (reentrantCount === 0) {
await deleteNode(lockPath);
lockPath = null;
console.log('🔓 Lock fully released');
}
}
/**
* 创建临时节点
*/
function createEphemeralNode(path, data) {
return new Promise((resolve, reject) => {
client.create(
path,
Buffer.from(data),
zookeeper.CreateMode.EPHEMERAL,
(err, createdPath) => {
if (err) return reject(err);
resolve(createdPath);
}
);
});
}
/**
* 删除节点
*/
function deleteNode(path) {
return new Promise((resolve) => {
client.remove(path, (err) => {
if (err && err.getCode() !== zookeeper.Exception.NO_NODE) {
console.error('remove error:', err);
}
resolve();
});
});
}
/**
* 等待锁释放(监听删除事件)
*/
function waitForLock(path) {
return new Promise((resolve) => {
const watcher = (event) => {
if (event.getType() === zookeeper.Event.NODE_DELETED) {
console.log('📢 Lock node deleted, can retry.');
resolve();
} else {
client.exists(path, watcher, () => { });
}
};
client.exists(path, watcher, () => { });
});
}

在分布式系统中,读写锁允许:
ZooKeeper实现思路
| 类型 | 节点命名规则 | 可并发情况 |
|---|---|---|
| 读锁 | /locks/read-xxxxxx | 可以与其他读锁共存 |
| 写锁 | /locks/write-xxxxxx | 需要等待前面所有锁释放 |
实现规则:
| 特性 | 说明 |
|---|---|
| 读共享 | 多个读锁可同时持有 |
| 写独占 | 写锁必须等待前面所有锁释放 |
| FIFO顺序 | 使用EPHEMERAL_SEQUENTIAL实现公平排队 |
| 自动释放 | 客户端断开时锁节点自动清除 |
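读写锁的获取判定可以抽象成一个纯函数:按序号排序后,写锁要求前面没有任何节点,读锁只要求前面没有写锁。注意 read-/write- 前缀不同,必须按序号后缀排序,按整个名字排序会打乱 FIFO。下面是这一判定的内存示意(canAcquire 为本文假想的命名):

```javascript
// 根据子节点列表、自己的节点名和锁类型,判断能否获取锁
function canAcquire(children, myNode, type) {
  // 必须按序号排序:'read-' 与 'write-' 按字典序会分成两堆,破坏排队顺序
  const seqOf = (name) => parseInt(name.split('-').pop(), 10);
  const sorted = [...children].sort((a, b) => seqOf(a) - seqOf(b));
  const myIndex = sorted.indexOf(myNode);
  const beforeMe = sorted.slice(0, myIndex);
  if (type === 'write') {
    // 写锁独占:前面有任何节点都要等
    if (beforeMe.length === 0) return { acquired: true };
  } else {
    // 读锁共享:只要前面没有写锁即可
    if (!beforeMe.some(n => n.startsWith('write-'))) return { acquired: true };
  }
  return { acquired: false, watch: sorted[myIndex - 1] };
}

console.log(canAcquire(['read-001', 'read-002'], 'read-002', 'read'));
// → { acquired: true }(两个读锁共存)
console.log(canAcquire(['read-001', 'write-002', 'read-003'], 'read-003', 'read'));
// → { acquired: false, watch: 'write-002' }(读锁被前面的写锁挡住)
```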
const zookeeper = require('node-zookeeper-client');
const ZK_SERVERS = '127.0.0.1:2181';
const LOCK_ROOT = '/rw_locks';
const client = zookeeper.createClient(ZK_SERVERS);
client.connect();
client.once('connected', async () => {
console.log('✅ Connected to ZooKeeper');
await ensurePath(LOCK_ROOT);
// 测试:一个读锁 + 一个写锁
simulateClient('Reader-1', 'read');
setTimeout(() => simulateClient('Writer-1', 'write'), 1000);
setTimeout(() => simulateClient('Reader-2', 'read'), 2000);
});
async function ensurePath(path) {
return new Promise((resolve) => {
client.exists(path, (err, stat) => {
if (stat) return resolve();
client.create(path, (err2) => {
if (err2 && err2.getCode() !== zookeeper.Exception.NODE_EXISTS) {
console.error('❌ create path error:', err2);
}
resolve();
});
});
});
}
/**
* 模拟不同客户端
*/
async function simulateClient(id, type) {
const lock = new ReadWriteLock(client, LOCK_ROOT, id, type);
await lock.acquire();
console.log(`🧠 [${id}] doing ${type} operation...`);
setTimeout(async () => {
await lock.release();
console.log(`🔓 [${id}] released ${type} lock`);
}, 3000);
}
/**
* 读写锁类
*/
class ReadWriteLock {
constructor(client, root, id, type) {
this.client = client;
this.root = root;
this.id = id;
this.type = type; // 'read' | 'write'
this.nodePath = null;
}
async acquire() {
this.nodePath = await this.createLockNode();
console.log(`🔒 [${this.id}] requested ${this.type} lock: ${this.nodePath}`);
while (true) {
const children = await this.getChildren();
// read- 与 write- 前缀不同,按整个名字排序会打乱 FIFO,需按序号排序
const seqOf = (name) => parseInt(name.split('-').pop(), 10);
const sorted = [...children].sort((a, b) => seqOf(a) - seqOf(b));
const myNode = this.nodePath.split('/').pop();
const myIndex = sorted.indexOf(myNode);
if (this.type === 'write') {
// 写锁需等待前面所有节点都释放
if (myIndex === 0) break;
} else {
// 读锁:只需等待前面所有“写锁”
const beforeMe = sorted.slice(0, myIndex);
const hasWriterBefore = beforeMe.some(name => name.startsWith('write-'));
if (!hasWriterBefore) break;
}
const watchNode = `${this.root}/${sorted[myIndex - 1]}`;
console.log(`⏳ [${this.id}] waiting for ${watchNode}...`);
await this.waitForDelete(watchNode);
}
console.log(`✅ [${this.id}] acquired ${this.type} lock`);
}
async release() {
if (this.nodePath) {
await new Promise((resolve) => {
this.client.remove(this.nodePath, (err) => {
if (err && err.getCode() !== zookeeper.Exception.NO_NODE) {
console.error('remove error:', err);
}
resolve();
});
});
this.nodePath = null;
}
}
getChildren() {
return new Promise((resolve, reject) => {
this.client.getChildren(this.root, (err, children) => {
if (err) reject(err);
else resolve(children);
});
});
}
createLockNode() {
const nodePrefix = `${this.root}/${this.type}-`;
return new Promise((resolve, reject) => {
this.client.create(
nodePrefix,
Buffer.from(this.id),
zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL,
(err, path) => {
if (err) reject(err);
else resolve(path);
}
);
});
}
waitForDelete(path) {
return new Promise((resolve) => {
const watcher = (event) => {
if (event.getType() === zookeeper.Event.NODE_DELETED) {
resolve();
} else {
this.client.exists(path, watcher, () => { });
}
};
this.client.exists(path, watcher, () => { });
});
}
}

分布式系统中,如果某个客户端:
这时,其他客户端会一直卡住等待。所以,我们希望加上一个 超时机制(Timeout):
超时锁 = ZooKeeper分布式锁 + 本地计时器(过期自动释放)
| 特性 | 说明 |
|---|---|
| 公平性 | 按顺序号排队,先来先得 |
| 自动释放 | 本地超时+ZooKeeper session失效 双保险 |
| 容错性 | 即使节点断连,ZooKeeper会自动删除临时节点 |
| 安全性 | 不会发生死锁或永久等待 |
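"本地计时器"这一半的逻辑其实就是一个租约到期检查:持锁时记下 deadline,到期就主动释放。下面是这一思路的最小示意(LeaseGuard 为本文假想的命名;now 参数是为了便于测试注入时间,默认取当前时间):

```javascript
// 本地超时的最小示意:持锁时记录 deadline,到期即视为租约失效
class LeaseGuard {
  constructor(timeoutMs, now = Date.now()) {
    this.deadline = now + timeoutMs;
  }
  expired(now = Date.now()) {
    return now >= this.deadline;
  }
}

const guard = new LeaseGuard(5000, 1000000); // 假设当前时刻为 1000000
console.log(guard.expired(1004999)); // → false:租约还有效
console.log(guard.expired(1005000)); // → true:到期,应主动释放锁
```

真实实现里(见下面的 TimeoutLock)用 setTimeout 在到期时自动触发 release,而 ZooKeeper 的 session 失效机制则是第二重保险。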
const zookeeper = require('node-zookeeper-client');
const ZK_SERVERS = '127.0.0.1:2181';
const LOCK_PATH = '/timeout_locks/mylock';
const client = zookeeper.createClient(ZK_SERVERS);
client.connect();
client.once('connected', () => {
console.log('✅ Connected to ZooKeeper');
ensurePath(LOCK_PATH, () => {
simulateClient('Client-A', 5000);
setTimeout(() => simulateClient('Client-B', 1000), 1000);
});
});
// 确保路径存在
function ensurePath(path, cb) {
client.mkdirp(path, (err) => {
if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS)
console.error('mkdirp error:', err);
cb();
});
}
// 模拟不同客户端
function simulateClient(id, timeoutMs) {
const lock = new TimeoutLock(client, LOCK_PATH, id, timeoutMs);
lock.acquire()
.then(() => {
console.log(`🧠 [${id}] doing critical work...`);
// 模拟长任务
setTimeout(() => {
lock.release();
}, 3000);
})
.catch(err => console.error(`[${id}] failed to acquire:`, err));
}
/**
* 超时锁类
*/
class TimeoutLock {
constructor(client, lockPath, id, timeoutMs = 10000) {
this.client = client;
this.lockPath = lockPath;
this.id = id;
this.nodePath = null;
this.timeoutMs = timeoutMs;
this.timer = null;
}
async acquire() {
this.nodePath = await this.createLockNode();
console.log(`🔒 [${this.id}] requested lock: ${this.nodePath}`);
while (true) {
const children = await this.getChildren();
const sorted = children.sort();
const myNode = this.nodePath.split('/').pop();
const myIndex = sorted.indexOf(myNode);
if (myIndex === 0) {
console.log(`✅ [${this.id}] acquired lock`);
this.startTimeout();
return;
}
const prevNode = `${this.lockPath}/${sorted[myIndex - 1]}`;
console.log(`⏳ [${this.id}] waiting for ${prevNode}...`);
await this.waitForDelete(prevNode);
}
}
// 启动超时定时器
startTimeout() {
this.timer = setTimeout(() => {
console.log(`⏰ [${this.id}] lock timeout (${this.timeoutMs}ms)! Releasing...`);
this.release();
}, this.timeoutMs);
}
async release() {
if (this.timer) clearTimeout(this.timer);
if (this.nodePath) {
await new Promise((resolve) => {
this.client.remove(this.nodePath, (err) => {
if (err && err.getCode() !== zookeeper.Exception.NO_NODE) {
console.error(`[${this.id}] remove error:`, err);
} else {
console.log(`🔓 [${this.id}] released lock`);
}
resolve();
});
});
this.nodePath = null;
}
}
createLockNode() {
const prefix = `${this.lockPath}/lock-`;
return new Promise((resolve, reject) => {
this.client.create(
prefix,
Buffer.from(this.id),
zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL,
(err, path) => {
if (err) reject(err);
else resolve(path);
}
);
});
}
getChildren() {
return new Promise((resolve, reject) => {
this.client.getChildren(this.lockPath, (err, children) => {
if (err) reject(err);
else resolve(children);
});
});
}
waitForDelete(path) {
return new Promise((resolve) => {
const watcher = (event) => {
if (event.getType() === zookeeper.Event.NODE_DELETED) {
resolve();
} else {
this.client.exists(path, watcher, () => { });
}
};
this.client.exists(path, watcher, () => { });
});
}
}

后续还可以实现类似的、支持"自动续期"的可续租超时锁(Renewable Timeout Lock)。
“尝试锁”与普通锁的区别在于:
| 类型 | 行为 |
|---|---|
| 普通锁 | 获取不到会一直阻塞等待 |
| 尝试锁TryLock | 尝试获取,如果锁被占用,立即返回失败或等待指定时间后放弃 |
这非常适合做非阻塞操作,比如:某个任务只要资源不空闲就跳过,而不是卡在那里。
基于ZooKeeper的锁机制:
/trylock/mylock

| 特性 | 说明 |
|---|---|
| ✅ 非阻塞 | 无需等待,失败立即返回 |
| ⏱ 可配置超时 | 可指定等待时长,超时放弃 |
| 🔒 安全 | 使用临时节点,断线自动释放 |
| 🧩 应用场景 | 秒杀库存、任务去重、单次调度防重入 |
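TryLock 的语义本质上就是:create 临时节点成功即得锁,NODE_EXISTS 即失败、立即返回。下面用内存版示意这一"失败不等待"的语义(InMemoryLockTable 为本文假想的命名,仅演示语义,真实互斥仍由 ZooKeeper 保证):

```javascript
// TryLock 语义的内存版示意:占用即失败,立即返回
class InMemoryLockTable {
  constructor() { this.held = new Map(); } // path -> ownerId
  tryAcquire(path, ownerId) {
    if (this.held.has(path)) return false; // 相当于 NODE_EXISTS
    this.held.set(path, ownerId);
    return true;
  }
  release(path, ownerId) {
    // 只有持有者才能释放,防止误删别人的锁
    if (this.held.get(path) === ownerId) this.held.delete(path);
  }
}

const table = new InMemoryLockTable();
console.log(table.tryAcquire('/trylock/mylock', 'Client-A')); // → true
console.log(table.tryAcquire('/trylock/mylock', 'Client-B')); // → false,立即返回不阻塞
table.release('/trylock/mylock', 'Client-A');
console.log(table.tryAcquire('/trylock/mylock', 'Client-B')); // → true
```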
const zookeeper = require('node-zookeeper-client');
const ZK_SERVERS = '127.0.0.1:2181';
const LOCK_PATH = '/trylock/mylock';
const client = zookeeper.createClient(ZK_SERVERS);
client.once('connected', async () => {
console.log('✅ Connected to ZooKeeper');
await ensurePath(LOCK_PATH);
// 启动两个客户端模拟
tryLockClient('Client-A', 0); // 立即尝试
setTimeout(() => tryLockClient('Client-B', 3000), 1000); // 等待重试
});
client.connect();
// 确保路径存在
function ensurePath(path) {
return new Promise((resolve) => {
client.mkdirp(path, (err) => {
if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS) {
console.error('mkdirp error:', err);
}
resolve();
});
});
}
// 模拟客户端尝试锁
async function tryLockClient(id, retryDelay = 0) {
const lock = new TryLock(client, LOCK_PATH, id);
const success = await lock.tryAcquire(2000); // 2秒超时模式
if (success) {
console.log(`✅ [${id}] got lock! Doing work...`);
setTimeout(() => lock.release(), 3000);
} else {
console.log(`❌ [${id}] failed to acquire lock.`);
if (retryDelay > 0) {
console.log(`🔁 [${id}] will retry after ${retryDelay}ms`);
setTimeout(() => tryLockClient(id), retryDelay);
}
}
}
/**
* 尝试锁类
*/
class TryLock {
constructor(client, basePath, id) {
this.client = client;
this.lockNode = `${basePath}/lock-node`;
this.id = id;
}
tryAcquire(timeoutMs = 0) {
return new Promise((resolve) => {
const tryCreate = () => {
this.client.create(
this.lockNode,
Buffer.from(this.id),
zookeeper.CreateMode.EPHEMERAL,
(err) => {
if (!err) {
resolve(true); // ✅ 成功获得锁
} else if (err.getCode() === zookeeper.Exception.NODE_EXISTS) {
if (timeoutMs > 0) {
// 等待锁释放
this.waitForLock(timeoutMs).then(resolve);
} else {
resolve(false); // ❌ 立即失败
}
} else {
console.error(`[${this.id}] unexpected error:`, err);
resolve(false);
}
}
);
};
tryCreate();
});
}
waitForLock(timeoutMs) {
return new Promise((resolve) => {
const timer = setTimeout(() => {
console.log(`⏰ [${this.id}] timeout waiting for lock`);
resolve(false);
}, timeoutMs);
const watch = (event) => {
if (event.getType() === zookeeper.Event.NODE_DELETED) {
clearTimeout(timer);
this.tryAcquire(0).then(resolve);
}
};
this.client.exists(this.lockNode, watch, (err, stat) => {
  // 节点在注册监听前已被删除:直接重试,避免白等到超时
  if (!err && !stat) {
    clearTimeout(timer);
    this.tryAcquire(0).then(resolve);
  }
});
});
}
release() {
this.client.remove(this.lockNode, (err) => {
if (err && err.getCode() !== zookeeper.Exception.NO_NODE) {
console.error(`[${this.id}] remove error:`, err);
} else {
console.log(`🔓 [${this.id}] released lock`);
}
});
}
}

还有,带重试次数上限的 TryLock(可重试非阻塞锁)。
分段锁(Segment Lock)是一种用于提高并发性能的锁机制,它将数据划分为多个段(Segment),每个段有自己独立的锁。这样不同段上的操作可以并行执行,而同一段仍然是串行的。
例如:
按 userId % 10 分成10段

我们利用ZooKeeper的临时顺序节点(Ephemeral Sequential Node)实现:
/locks-segment
/locks-segment/segment-<id> 表示某个段的锁目录
/locks-segment/segment-<id>/lock-xxxx

| 特性 | 说明 |
|---|---|
| 锁粒度 | 每个段独立 |
| 并发性能 | 显著提升 |
| 典型应用 | 用户缓存更新、库存更新 |
| ZNode结构 | /locks-segment/segment-N/lock-xxxx |
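分段的前提是有一个稳定的"key → 段号"映射,数字 key 直接取模即可,字符串 key 先做一个简单散列再取模。下面是这一步的示意(segmentFor 为本文假想的命名;散列方式只是示例,生产中可换成任何稳定散列):

```javascript
// 把资源 key 映射到固定数量的段上:同段串行、异段并行
function segmentFor(key, segmentCount) {
  if (typeof key === 'number') return key % segmentCount;
  let hash = 0;
  for (const ch of String(key)) {
    hash = (hash * 31 + ch.charCodeAt(0)) >>> 0; // 无符号,避免负数
  }
  return hash % segmentCount;
}

console.log(segmentFor(12345, 8)); // → 1(数字 key 直接取模)
// 同一个 key 永远落在同一段,对应锁路径 /locks-segment/segment-<id>
console.log(segmentFor('user:101', 8) === segmentFor('user:101', 8)); // → true
```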
// segment_lock.js
const zookeeper = require('node-zookeeper-client');
const ZK_SERVERS = '127.0.0.1:2181';
const BASE_PATH = '/locks-segment';
const SEGMENT_COUNT = 8; // 分成8段
const client = zookeeper.createClient(ZK_SERVERS);
// 防止 Node.js 警告:监听器太多
client.setMaxListeners(1000);
client.once('connected', async () => {
console.log('Connected to ZooKeeper');
await initSegments();
// 模拟多个客户端尝试加锁
simulateClients();
});
client.connect();
async function initSegments() {
for (let i = 0; i < SEGMENT_COUNT; i++) {
await mkdirp(`${BASE_PATH}/segment-${i}`);
}
}
// 确保路径存在
function mkdirp(path) {
return new Promise((resolve, reject) => {
client.mkdirp(path, (err) => (err ? reject(err) : resolve()));
});
}
// 等待节点删除(只监听一次)
function waitForDelete(path) {
return new Promise((resolve) => {
client.exists(
path,
(event) => {
if (event.getType() === zookeeper.Event.NODE_DELETED) {
resolve();
}
},
(err, stat) => {
if (!stat) resolve();
}
);
});
}
// 尝试加锁
async function acquireSegmentLock(segmentId, resourceId) {
const segmentPath = `${BASE_PATH}/segment-${segmentId}`;
const nodePath = await new Promise((resolve, reject) => {
client.create(
`${segmentPath}/lock-`,
Buffer.from(`lock-${resourceId}`),
zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL,
(err, path) => (err ? reject(err) : resolve(path))
);
});
const nodeName = nodePath.split('/').pop();
await tryAcquire(segmentPath, nodeName, segmentId, resourceId);
return nodePath;
}
async function tryAcquire(segmentPath, nodeName, segmentId, resourceId) {
const children = await new Promise((resolve, reject) => {
client.getChildren(segmentPath, (err, ch) => (err ? reject(err) : resolve(ch)));
});
children.sort();
const index = children.indexOf(nodeName);
if (index === 0) {
console.log(`✅ [${resourceId}] Got lock on segment-${segmentId}`);
doWork(segmentPath, nodeName, segmentId, resourceId);
} else {
const prevNode = children[index - 1];
console.log(`⏳ [${resourceId}] Waiting for ${prevNode} on segment-${segmentId}...`);
await waitForDelete(`${segmentPath}/${prevNode}`);
await tryAcquire(segmentPath, nodeName, segmentId, resourceId);
}
}
// 模拟业务逻辑
async function doWork(segmentPath, nodeName, segmentId, resourceId) {
console.log(`🧠 [${resourceId}] Doing work on segment-${segmentId}...`);
await sleep(3000 + Math.random() * 2000);
console.log(`🧹 [${resourceId}] Releasing lock on segment-${segmentId}...`);
await new Promise((resolve) => {
client.remove(`${segmentPath}/${nodeName}`, (err) => {
if (err) console.error('remove error:', err);
else console.log(`🔓 [${resourceId}] Released lock ${segmentPath}/${nodeName}`);
resolve();
});
});
}
// 延迟函数
function sleep(ms) {
return new Promise((r) => setTimeout(r, ms));
}
// 模拟多个客户端争抢不同 segment
function simulateClients() {
const tasks = [101, 202, 303, 404, 505, 606, 707, 808];
for (const id of tasks) {
const segmentId = Math.floor(Math.random() * SEGMENT_COUNT);
acquireSegmentLock(segmentId, id).catch(console.error);
}
}

root@ser745692301841:/dev_dir/zookeeper-tutorial/js# node segment_lock.js
Connected to ZooKeeper
✅ [101] Got lock on segment-4
🧠 [101] Doing work on segment-4...
✅ [202] Got lock on segment-3
🧠 [202] Doing work on segment-3...
✅ [303] Got lock on segment-2
🧠 [303] Doing work on segment-2...
✅ [404] Got lock on segment-7
🧠 [404] Doing work on segment-7...
✅ [505] Got lock on segment-5
🧠 [505] Doing work on segment-5...
⏳ [606] Waiting for lock-0000000003 on segment-2...
⏳ [707] Waiting for lock-0000000003 on segment-4...
✅ [808] Got lock on segment-6
🧠 [808] Doing work on segment-6...
🧹 [505] Releasing lock on segment-5...
🔓 [505] Released lock /locks-segment/segment-5/lock-0000000003
🧹 [101] Releasing lock on segment-4...
🔓 [101] Released lock /locks-segment/segment-4/lock-0000000003
✅ [707] Got lock on segment-4
🧠 [707] Doing work on segment-4...
🧹 [303] Releasing lock on segment-2...
🔓 [303] Released lock /locks-segment/segment-2/lock-0000000003
✅ [606] Got lock on segment-2
🧠 [606] Doing work on segment-2...
🧹 [808] Releasing lock on segment-6...
🔓 [808] Released lock /locks-segment/segment-6/lock-0000000000
🧹 [404] Releasing lock on segment-7...
🔓 [404] Released lock /locks-segment/segment-7/lock-0000000003
🧹 [202] Releasing lock on segment-3...
🔓 [202] Released lock /locks-segment/segment-3/lock-0000000000
🧹 [707] Releasing lock on segment-4...
🔓 [707] Released lock /locks-segment/segment-4/lock-0000000004
🧹 [606] Releasing lock on segment-2...
🔓 [606] Released lock /locks-segment/segment-2/lock-0000000004

这个版本适合用在需要同时锁定多个资源的场景,比如游戏后端中两个玩家的交易、多个库存项的更新等。
在分布式系统中,有时候你要同时锁住多个资源(例如 itemA 和 itemB):
我们可以利用 ZooKeeper 的临时顺序节点 机制来实现公平的多资源锁。
| 功能 | 实现说明 |
|---|---|
| ✅ 多资源加锁 | 一次性锁多个资源,确保顺序一致 |
| ✅ 死锁避免 | 对资源名称排序,保证锁申请顺序相同 |
| ✅ 公平性 | 使用 EPHEMERAL_SEQUENTIAL |
| ✅ 自动释放 | 会话断开自动清除节点 |
| ✅ 无内存泄漏 | 使用一次性 exists 监听 |
| ✅ 可扩展 | 可以在游戏、交易、库存更新等场景中直接使用 |
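死锁避免的关键只有一步:所有客户端都按同一个全序(这里用字典序)申请资源锁,这样就不可能形成"你持有 A 等 B、我持有 B 等 A"的环路。下面单独示意排序这一步(lockOrder 为本文假想的命名,对应后面 acquireMultiLock 开头的排序):

```javascript
// 所有客户端都按同一全序(字典序)申请锁,避免环路死锁
function lockOrder(resources) {
  return [...resources].sort(); // 不修改原数组
}

// C1 与 C4 都涉及 user:101,但对公共资源的申请顺序是一致的
console.log(lockOrder(['user:101', 'item:5'])); // → [ 'item:5', 'user:101' ]
console.log(lockOrder(['item:6', 'user:101'])); // → [ 'item:6', 'user:101' ]
```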
// multi_resource_lock.js
const zookeeper = require('node-zookeeper-client');
const ZK_SERVERS = '127.0.0.1:2181';
const LOCK_ROOT = '/multi-locks';
const client = zookeeper.createClient(ZK_SERVERS);
client.setMaxListeners(1000);
client.once('connected', async () => {
console.log('✅ Connected to ZooKeeper');
await mkdirp(LOCK_ROOT);
simulateClients();
});
client.connect();
// 辅助函数
function mkdirp(path) {
return new Promise((resolve, reject) => {
client.mkdirp(path, (err) => (err ? reject(err) : resolve()));
});
}
function sleep(ms) {
return new Promise((r) => setTimeout(r, ms));
}
/**
* @param {string[]} resources 要加锁的资源名数组,例如 ['user:101', 'user:202']
* @param {string} clientId 客户端ID(打印标识)
*/
async function acquireMultiLock(resources, clientId) {
// 按字典顺序排序,防止死锁
const sortedResources = [...resources].sort();
const lockPaths = [];
try {
for (const res of sortedResources) {
const path = await acquireSingleLock(res, clientId);
lockPaths.push(path);
}
console.log(`🔒 [${clientId}] Acquired locks on: ${sortedResources.join(', ')}`);
await doWork(clientId, sortedResources);
} finally {
// 全部释放
for (const path of lockPaths.reverse()) {
await releaseLock(path, clientId);
}
}
}
/** 获取单个资源锁 */
async function acquireSingleLock(resource, clientId) {
const resPath = `${LOCK_ROOT}/${resource}`;
await mkdirp(resPath);
const nodePath = await new Promise((resolve, reject) => {
client.create(
`${resPath}/lock-`,
Buffer.from(clientId),
zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL,
(err, path) => (err ? reject(err) : resolve(path))
);
});
const nodeName = nodePath.split('/').pop();
await tryAcquire(resPath, nodeName, resource, clientId);
return nodePath;
}
/** 检查是否获得锁,否则监听前一个节点 */
async function tryAcquire(resPath, nodeName, resource, clientId) {
const children = await new Promise((resolve, reject) => {
client.getChildren(resPath, (err, list) => (err ? reject(err) : resolve(list)));
});
children.sort();
const index = children.indexOf(nodeName);
if (index === 0) {
console.log(`✅ [${clientId}] Got lock on ${resource}`);
return;
} else {
const prevNode = children[index - 1];
const prevPath = `${resPath}/${prevNode}`;
await waitForDelete(prevPath);
await tryAcquire(resPath, nodeName, resource, clientId);
}
}
/** 等待前一个节点删除 */
function waitForDelete(path) {
return new Promise((resolve) => {
client.exists(
path,
(event) => {
if (event.getType() === zookeeper.Event.NODE_DELETED) resolve();
},
(err, stat) => {
if (!stat) resolve();
}
);
});
}
/** 释放锁 */
async function releaseLock(nodePath, clientId) {
return new Promise((resolve) => {
client.remove(nodePath, (err) => {
if (err) console.error(`❌ [${clientId}] Release error:`, err);
else console.log(`🔓 [${clientId}] Released ${nodePath}`);
resolve();
});
});
}
/** 模拟临界区任务 */
async function doWork(clientId, resources) {
console.log(`🧠 [${clientId}] Working on ${resources.join(', ')}...`);
await sleep(2000 + Math.random() * 2000);
}
/** 模拟多个客户端 */
function simulateClients() {
const tasks = [
{ id: 'C1', res: ['user:101', 'item:5'] },
{ id: 'C2', res: ['item:5'] },
{ id: 'C3', res: ['user:101'] },
{ id: 'C4', res: ['item:6', 'user:101'] },
];
for (const t of tasks) {
acquireMultiLock(t.res, t.id).catch(console.error);
}
}

| 客户端 ID | 想要加的锁资源 | 说明 |
|---|---|---|
| C1 | `['user:101', 'item:5']` | 同时操作用户101和物品5(例如购买、交易操作) |
| C2 | `['item:5']` | 只操作物品5(例如库存更新) |
| C3 | `['user:101']` | 只操作用户101(例如用户资料更新) |
| C4 | `['item:6', 'user:101']` | 同时操作物品6和用户101(例如新购买) |
假设 ZooKeeper 目录 /multi-locks 结构如下:
/multi-locks/
├── user:101/
├── item:5/
└── item:6/
每个客户端都会先把资源按字典序排序,再依次加锁:
- C1 → ['item:5', 'user:101']
- C4 → ['item:6', 'user:101']
举个运行时例子
执行顺序可能是:
✅ C2 先拿到 item:5
⏳ C1 等待 item:5
✅ C3 拿到 user:101
...
🔓 C2 释放 item:5
✅ C1 拿到 item:5
⏳ C1 等待 user:101(因为 C3 在用)
...
🔓 C3 释放 user:101
✅ C1 拿到 user:101
🧠 C1 开始工作
整个过程保证:
| 功能 | 技术 |
|---|---|
| 锁节点存放 | /multi-locks/{resource}/lock-* |
| 锁唯一标识 | EPHEMERAL_SEQUENTIAL 节点 |
| 死锁避免 | 按字典序加锁 |
| 锁失效清理 | 客户端断开 ZooKeeper 自动删除 |
| 并发安全 | 等待上一个节点删除后重试 |
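其中"按字典序加锁避免死锁"可以用一个不依赖 ZooKeeper 的小例子来体会(示意代码,`lockOrder` 为假设的演示函数名):只要所有客户端对资源采用同一个全局顺序,任意两个客户端对共同资源的申请顺序就一致,不可能形成"互相持有对方所需资源"的循环等待。

```javascript
// 演示:统一按字典序排序资源,消除循环等待的可能
function lockOrder(resources) {
  return [...resources].sort();
}

// C1 与 C4 都涉及 user:101,排序后两者对它的申请位置遵循同一全局顺序
console.log(lockOrder(['user:101', 'item:5'])); // ['item:5', 'user:101']
console.log(lockOrder(['item:6', 'user:101'])); // ['item:6', 'user:101']
```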
在此基础上,还可以扩展出“锁超时自动释放 + 重试机制”的版本。
你现在已经玩到 ZooKeeper 锁的“高级关卡”了 —— 我们要实现一个 “公平 + 可重入 + 读写锁(Fair Reentrant ReadWrite Lock)” 的 Node.js 实践 Demo。
| 特性 | 含义 |
|---|---|
| 公平 (Fair) | 锁的获取顺序严格按照请求顺序(节点序号)执行,谁先排队谁先执行。 |
| 可重入 (Reentrant) | 同一个客户端(同一个 clientId)如果已经获得锁,可以再次进入,不会阻塞自己。 |
| 读写锁 (ReadWrite Lock) | 允许多个读者共享锁,但写者必须独占;写锁优先于后续读锁。 |
实现原理(基于ZooKeeper临时顺序节点)
在 /rw-locks 下为每个资源维护子节点:
/rw-locks/myResource/
├── read-0000000001
├── read-0000000002
├── write-0000000003
└── read-0000000004
规则:
- 读锁(read-):自己前面没有任何 write- 节点即可获得,多个读者可同时持有。
- 写锁(write-):必须排在所有节点最前面(独占)才可获得。
- 未获得锁时,监听阻塞自己的前序节点,待其删除后重试。
fair_reentrant_rwlock.js
// fair_reentrant_rwlock.js
const zookeeper = require('node-zookeeper-client');
const ZK_SERVERS = '127.0.0.1:2181';
const LOCK_ROOT = '/rw-locks';
const client = zookeeper.createClient(ZK_SERVERS);
client.setMaxListeners(1000);
const heldLocks = new Map(); // 记录 clientId 当前持有的锁 {clientId: Set(resources)}
client.once('connected', async () => {
console.log('✅ Connected to ZooKeeper');
await mkdirp(LOCK_ROOT);
simulateClients();
});
client.connect();
function mkdirp(path) {
return new Promise((resolve, reject) => {
client.mkdirp(path, (err) => (err ? reject(err) : resolve()));
});
}
function sleep(ms) {
return new Promise((r) => setTimeout(r, ms));
}
/** 申请读或写锁 */
async function acquireLock(resource, clientId, mode) {
const resPath = `${LOCK_ROOT}/${resource}`;
await mkdirp(resPath);
// ✅ 可重入支持
if (heldLocks.get(clientId)?.has(resource)) {
console.log(`♻️ [${clientId}] Re-entered ${mode} lock on ${resource}`);
return { resPath, nodePath: null };
}
const nodePath = await new Promise((resolve, reject) => {
client.create(
`${resPath}/${mode}-`,
Buffer.from(clientId),
zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL,
(err, path) => (err ? reject(err) : resolve(path))
);
});
const nodeName = nodePath.split('/').pop();
await tryAcquire(resPath, nodeName, clientId, mode);
if (!heldLocks.has(clientId)) heldLocks.set(clientId, new Set());
heldLocks.get(clientId).add(resource);
return { resPath, nodePath };
}
/** 判断是否可以获得锁,否则监听前一个节点 */
async function tryAcquire(resPath, nodeName, clientId, mode) {
const children = await new Promise((resolve, reject) => {
client.getChildren(resPath, (err, list) => (err ? reject(err) : resolve(list)));
});
// 按节点序号排序:节点名形如 read-0000000001 / write-0000000003,
// 直接字典序会把所有 read- 排在 write- 之前,破坏公平性,必须比较序号
const seq = (n) => parseInt(n.slice(n.lastIndexOf('-') + 1), 10);
children.sort((a, b) => seq(a) - seq(b));
const index = children.indexOf(nodeName);
if (mode.startsWith('read')) {
const before = children.slice(0, index);
const blockingWrite = before.find((n) => n.startsWith('write'));
if (!blockingWrite) {
console.log(`📖 [${clientId}] Got READ lock on ${resPath.split('/').pop()}`);
return;
}
await waitForDelete(`${resPath}/${blockingWrite}`);
return tryAcquire(resPath, nodeName, clientId, mode);
} else {
// 写锁必须等所有前面的节点都删除
if (index === 0) {
console.log(`✍️ [${clientId}] Got WRITE lock on ${resPath.split('/').pop()}`);
return;
}
const prev = children[index - 1];
await waitForDelete(`${resPath}/${prev}`);
return tryAcquire(resPath, nodeName, clientId, mode);
}
}
/** 等待某节点被删除 */
function waitForDelete(path) {
return new Promise((resolve) => {
function watch() {
client.exists(
path,
(event) => {
if (event.getType() === zookeeper.Event.NODE_DELETED) resolve();
else watch(); // 其他事件会消耗一次性 watcher,需重新注册
},
(err, stat) => {
if (!stat) resolve(); // 节点已不存在,直接放行
}
);
}
watch();
});
}
/** 释放锁(支持可重入引用计数) */
async function releaseLock(resource, nodePath, clientId) {
const held = heldLocks.get(clientId);
if (held?.has(resource)) {
held.delete(resource);
if (held.size === 0) heldLocks.delete(clientId);
}
// 可重入获取(acquireLock 返回的 nodePath 为 null)只清理本地记录,不删 ZK 节点;
// 真正的 ZK 节点由持有真实 nodePath 的那次 release 删除,避免锁节点泄漏
if (!nodePath) {
console.log(`🔁 [${clientId}] Reentrant release skip for ${resource}`);
return;
}
return new Promise((resolve) => {
client.remove(nodePath, (err) => {
if (err) console.error(`❌ [${clientId}] Release error:`, err);
else console.log(`🔓 [${clientId}] Released lock on ${resource}`);
resolve();
});
});
}
/** 模拟读/写操作 */
async function doWork(clientId, resource, mode) {
console.log(`🧠 [${clientId}] Doing ${mode.toUpperCase()} work on ${resource}...`);
await sleep(2000 + Math.random() * 2000);
}
/** 模拟多个客户端竞争锁 */
async function simulateClients() {
const clients = [
{ id: 'C1', res: 'book-123', mode: 'read' },
{ id: 'C2', res: 'book-123', mode: 'read' },
{ id: 'C3', res: 'book-123', mode: 'write' },
{ id: 'C4', res: 'book-123', mode: 'read' },
{ id: 'C5', res: 'book-123', mode: 'write' },
];
for (const c of clients) {
(async () => {
const { resPath, nodePath } = await acquireLock(c.res, c.id, c.mode);
await doWork(c.id, c.res, c.mode);
await releaseLock(c.res, nodePath, c.id);
})();
}
// 测试可重入性
setTimeout(async () => {
const r1 = await acquireLock('book-123', 'C1', 'read');
const r2 = await acquireLock('book-123', 'C1', 'read');
await doWork('C1', 'book-123', 'read');
await releaseLock('book-123', r2.nodePath, 'C1');
await releaseLock('book-123', r1.nodePath, 'C1');
}, 8000);
}
我们要做的是一个 分布式安全自增计数器:多个客户端同时执行 increment(),但计数结果仍然正确且不会重复。
例如:
- 初始 /counter = 0
- 10 个客户端并发执行 increment()
- 最终 /counter = 10(不会出现丢失或覆盖)
ZooKeeper 并没有原生的“计数器”类型,但我们可以用 CAS 版本号机制 来实现原子自增:
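在看真实实现之前,先用一个纯内存模型体会"读值 + 带版本写入"的 CAS 语义(示意代码,`FakeZNode` 为假设的类名,并非 node-zookeeper-client 的 API):写入时带上读取到的版本号,若节点已被他人修改则抛出冲突,调用方重读重试。

```javascript
// 内存模拟 ZooKeeper 节点的版本号语义:每次成功写入 version 自增
class FakeZNode {
  constructor(value) { this.value = value; this.version = 0; }
  getData() { return { value: this.value, version: this.version }; }
  setData(value, expectedVersion) {
    if (expectedVersion !== this.version) throw new Error('BAD_VERSION');
    this.value = value;
    this.version++;
  }
}

// CAS 自增:版本冲突(BAD_VERSION)时重读重试
function increment(node) {
  for (;;) {
    const { value, version } = node.getData();
    try {
      node.setData(value + 1, version);
      return;
    } catch (e) {
      if (e.message !== 'BAD_VERSION') throw e;
    }
  }
}

const counter = new FakeZNode(0);
for (let i = 0; i < 10; i++) increment(counter);
console.log(counter.getData().value); // 10
```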
const zookeeper = require('node-zookeeper-client');
const ZK_SERVERS = '127.0.0.1:2181';
const COUNTER_PATH = '/distributed_counter';
const client = zookeeper.createClient(ZK_SERVERS);
client.connect();
// 初始化计数器节点
function ensureCounterNode() {
return new Promise((resolve, reject) => {
client.exists(COUNTER_PATH, (err, stat) => {
if (err) return reject(err);
if (stat) return resolve();
client.create(COUNTER_PATH, Buffer.from('0'), (err) => {
if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS) {
return reject(err);
}
console.log('✅ Counter node initialized.');
resolve();
});
});
});
}
// 获取当前计数器值
function getCounterValue() {
return new Promise((resolve, reject) => {
client.getData(COUNTER_PATH, (err, data, stat) => {
if (err) return reject(err);
const value = parseInt(data.toString(), 10);
resolve({ value, stat });
});
});
}
// 使用 CAS(compare-and-set)机制自增计数器
async function incrementCounter(retry = 0) {
try {
const { value, stat } = await getCounterValue();
const newValue = value + 1;
await new Promise((resolve, reject) => {
client.setData(COUNTER_PATH, Buffer.from(String(newValue)), stat.version, (err) => {
if (err) {
if (err.getCode() === zookeeper.Exception.BAD_VERSION) {
if (retry < 5) {
console.log(`🔁 Version conflict, retry ${retry + 1}`);
setTimeout(() => incrementCounter(retry + 1).then(resolve).catch(reject), 50);
} else {
reject(new Error('Too many retries'));
}
} else {
reject(err);
}
} else {
console.log(`✅ Increment success: ${value} → ${newValue}`);
resolve(newValue);
}
});
});
} catch (err) {
console.error('❌ Increment failed:', err.message);
}
}
client.once('connected', async () => {
console.log('🔗 Connected to ZooKeeper.');
await ensureCounterNode();
// 模拟多个客户端并发自增
const tasks = Array.from({ length: 10 }, () => incrementCounter());
await Promise.all(tasks);
const { value } = await getCounterValue();
console.log(`🎯 Final counter value = ${value}`);
client.close();
});
多个服务实例都从 ZooKeeper 读取同一份配置,当配置在 ZooKeeper 中被修改时,所有客户端自动收到更新通知。
✅ 特点:配置集中管理、变更实时推送,客户端无需重启即可感知更新。
zookeeper-config-demo/
├─ config_client.js # 模拟业务服务读取 ZooKeeper 配置
└─ config_updater.js # 模拟管理员更新配置
config_client.js
// config_client.js
const zookeeper = require('node-zookeeper-client');
const ZK_SERVERS = '127.0.0.1:2181';
const CONFIG_PATH = '/app_config';
const client = zookeeper.createClient(ZK_SERVERS);
client.connect();
// 监听配置变化
function watchConfig() {
client.getData(CONFIG_PATH, (event) => {
console.log('⚙️ 配置发生变化,重新加载...');
watchConfig(); // 重新注册 watcher
loadConfig(); // 重新读取新配置
}, (err, data, stat) => {
if (err) {
if (err.getCode() === zookeeper.Exception.NO_NODE) {
console.log('❌ 配置节点不存在,等待创建...');
setTimeout(watchConfig, 2000);
return;
}
return console.error('读取配置失败:', err);
}
try {
const config = JSON.parse(data.toString());
console.log('📦 当前配置:', config);
} catch (e) {
console.error('配置格式错误:', e.message);
}
});
}
function loadConfig() {
client.getData(CONFIG_PATH, (err, data) => {
if (!err && data) {
console.log('🔄 更新后的配置:', JSON.parse(data.toString()));
}
});
}
client.once('connected', () => {
console.log('🔗 已连接到 ZooKeeper');
watchConfig();
});
config_updater.js
// config_updater.js
const zookeeper = require('node-zookeeper-client');
const ZK_SERVERS = '127.0.0.1:2181';
const CONFIG_PATH = '/app_config';
const client = zookeeper.createClient(ZK_SERVERS);
client.connect();
// 初始化配置节点
function ensureConfigNode(defaultConfig) {
return new Promise((resolve, reject) => {
client.exists(CONFIG_PATH, (err, stat) => {
if (err) return reject(err);
if (stat) return resolve();
const data = Buffer.from(JSON.stringify(defaultConfig));
client.create(CONFIG_PATH, data, (err) => {
if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS) {
return reject(err);
}
console.log('✅ 配置节点已创建');
resolve();
});
});
});
}
// 更新配置
function updateConfig(newConfig) {
const data = Buffer.from(JSON.stringify(newConfig, null, 2));
client.setData(CONFIG_PATH, data, (err, stat) => {
if (err) return console.error('❌ 更新失败:', err);
console.log('✅ 配置已更新:', newConfig);
client.close();
});
}
client.once('connected', async () => {
console.log('🔗 已连接到 ZooKeeper');
await ensureConfigNode({ max_players: 100, motd: "Welcome to the Game!" });
// 模拟管理员修改配置
setTimeout(() => {
updateConfig({
max_players: 200,
motd: "⚡ Server upgraded! Enjoy the game!",
});
}, 3000);
});
这种模式常用于 任务分发/消息队列,保证多节点消费者能 公平消费任务,并且任务不会丢失。
/task_queue
├── task-0000000000
├── task-0000000001
└── task-0000000002
- /task_queue 是队列根节点
- 生产者创建 /task_queue/task- 前缀的 顺序节点
- 消费者用 getChildren 获取任务列表并按顺序消费
// producer.js
const zookeeper = require('node-zookeeper-client');
const ZK_SERVERS = '127.0.0.1:2181';
const QUEUE_PATH = '/task_queue';
const client = zookeeper.createClient(ZK_SERVERS);
client.connect();
function ensureQueueNode() {
return new Promise((resolve, reject) => {
client.exists(QUEUE_PATH, (err, stat) => {
if (err) return reject(err);
if (stat) return resolve();
client.create(QUEUE_PATH, (err) => {
if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS) return reject(err);
console.log('✅ 队列节点已创建');
resolve();
});
});
});
}
function produceTask(taskData) {
const data = Buffer.from(JSON.stringify(taskData));
client.create(
`${QUEUE_PATH}/task-`,
data,
zookeeper.CreateMode.PERSISTENT_SEQUENTIAL,
(err, path) => {
if (err) return console.error('❌ 创建任务失败:', err);
console.log('📦 任务已入队:', path);
}
);
}
client.once('connected', async () => {
console.log('🔗 已连接到 ZooKeeper');
await ensureQueueNode();
// 模拟入队任务
produceTask({ id: 1, msg: 'task1' });
produceTask({ id: 2, msg: 'task2' });
produceTask({ id: 3, msg: 'task3' });
setTimeout(() => client.close(), 1000);
});
// consumer.js
const zookeeper = require('node-zookeeper-client');
const ZK_SERVERS = '127.0.0.1:2181';
const QUEUE_PATH = '/task_queue';
const client = zookeeper.createClient(ZK_SERVERS);
client.connect();
function consumeTask() {
client.getChildren(
QUEUE_PATH,
(event) => {
console.log('⚡ 队列变化事件:', event);
consumeTask(); // 重新注册 watcher
},
async (err, children) => {
if (err) return console.error('❌ 获取任务失败:', err);
if (!children || children.length === 0) return;
children.sort(); // 按顺序消费
const taskNode = children[0];
const taskPath = `${QUEUE_PATH}/${taskNode}`;
// 取数据并删除节点,实现任务消费
client.getData(taskPath, (err, data) => {
if (err) return console.error('❌ 读取任务失败:', err);
const task = JSON.parse(data.toString());
// 删除节点即表示任务完成;多个消费者竞争同一任务时,
// 后删除者会收到 NONODE 错误,可视为任务已被他人消费
client.remove(taskPath, (err) => {
if (err) return console.error('❌ 删除任务失败(可能已被其他消费者处理):', err);
console.log('✅ 消费任务:', task);
console.log('🗑 任务完成:', taskNode);
});
});
}
);
}
client.once('connected', () => {
console.log('🔗 已连接到 ZooKeeper,开始消费任务...');
consumeTask();
});
有的也叫 屏障。
这种模式常用于 分布式节点同步启动、任务协同,保证所有节点在到达某个“关卡”之前都阻塞,等到所有节点都准备好了才统一放行。
/barrier
├── node-0000000000
├── node-0000000001
└── node-0000000002
- /barrier 是 Barrier 根节点
- 每个到达的节点在其下创建临时顺序子节点
// barrier.js
const zookeeper = require('node-zookeeper-client');
const ZK_SERVERS = '127.0.0.1:2181';
const BARRIER_PATH = '/barrier';
const NODE_COUNT = 3; // Barrier 阈值
const client = zookeeper.createClient(ZK_SERVERS, {
sessionTimeout: 30000, // 30s session
spinDelay: 1000,
retries: 5
});
client.connect();
// --------------------
// 辅助函数:确保 Barrier 根节点存在
// --------------------
function ensureBarrierNode() {
return new Promise((resolve, reject) => {
client.exists(BARRIER_PATH, (err, stat) => {
if (err) return reject(err);
if (stat) return resolve();
client.create(BARRIER_PATH, (err) => {
if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS) return reject(err);
console.log('✅ Barrier 节点已创建');
resolve();
});
});
});
}
// --------------------
// 获取子节点带重试机制
// --------------------
function getChildrenWithRetry(path, maxRetries = 5) {
return new Promise((resolve, reject) => {
let attempt = 0;
function tryGet() {
client.getChildren(
path,
(event) => {
// watcher 触发时重新注册,但加延迟避免频繁调用
setTimeout(() => tryGet(), 50);
},
(err, children) => {
if (!err) return resolve(children);
if (err.getCode && err.getCode() === zookeeper.Exception.CONNECTION_LOSS && attempt < maxRetries) {
attempt++;
console.log(`🔁 CONNECTION_LOSS, retry ${attempt}`);
setTimeout(tryGet, 100);
} else {
reject(err);
}
}
);
}
tryGet();
});
}
// --------------------
// Barrier 核心逻辑
// --------------------
async function enterBarrier(nodeName) {
// 创建临时顺序节点
const path = await new Promise((resolve, reject) => {
client.create(
`${BARRIER_PATH}/node-`,
Buffer.from(nodeName),
zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL,
(err, path) => {
if (err) return reject(err);
console.log(`🔗 ${nodeName} 已加入 Barrier: ${path}`);
resolve(path);
}
);
});
// 等待所有节点到达
await new Promise(async (resolve) => {
async function checkBarrier() {
try {
const children = await getChildrenWithRetry(BARRIER_PATH);
console.log(`⏳ 当前 Barrier 节点数量: ${children.length}/${NODE_COUNT}`);
if (children.length >= NODE_COUNT) {
resolve();
} else {
// 未达阈值时轮询重查(演示用的简化处理;生产中应由 watcher 事件驱动)
setTimeout(checkBarrier, 200);
}
} catch (err) {
console.error('❌ 检查 Barrier 失败:', err.message);
// 失败后延迟重试
setTimeout(checkBarrier, 200);
}
}
checkBarrier();
});
console.log(`🎉 ${nodeName} Barrier 放行,开始执行任务!`);
// 安全延迟关闭 client
setTimeout(() => {
client.close();
console.log(`🔒 ${nodeName} 客户端已关闭`);
}, 500);
}
// --------------------
// 启动入口
// --------------------
client.once('connected', async () => {
console.log('🔗 已连接到 ZooKeeper');
await ensureBarrierNode();
const nodeName = process.argv[2] || 'node-' + Math.floor(Math.random() * 1000);
await enterBarrier(nodeName);
});