ZooKeeper

Related books

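There is a book called ZooKeeper: Distributed Process Coordination (Chinese edition: 《ZooKeeper分布式过程协同技术详解》), though I haven't read it myself 😄.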

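The official documentation is the best tutorial

https://zookeeper.apache.org/documentation.html Once you have the basic concepts down, be sure to read the official documentation to deepen your understanding. Work top-down: start with the simple overview material, then read the documentation in detail.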

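What is ZooKeeper?

Apache ZooKeeper (ZK for short) is an open-source distributed coordination service. It began as a Hadoop subproject, later became a top-level Apache project, and gives distributed applications an efficient, reliable coordination mechanism. At its core it is a replicated store with a file-system-like namespace, but it is built for data consistency and coordination rather than bulk data storage. ZK uses the ZAB (ZooKeeper Atomic Broadcast) protocol, which is related to Paxos but designed specifically for ZooKeeper, to keep replicas consistent, stay available when a minority of servers fail, and apply updates atomically. Put simply, it acts as a "distributed brain" that helps multiple servers or processes cooperate over a network without descending into chaos.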

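Why ZooKeeper? What problems does it solve?

In the single-machine era everything was simple. Distributed systems, where many servers work together, face challenges such as network latency, node crashes and inconsistent data. ZooKeeper was created to solve these coordination problems: it provides a centralized (yet highly available) coordination point, so developers don't have to build complex distributed logic from scratch.

Specifically, it addresses problems such as:
  1. Configuration management: distributing configuration changes to every node consistently.
  2. Naming and service discovery: tracking which service instances are alive and where they are.
  3. Distributed locks and queues: coordinating exclusive access and ordering across processes.
  4. Leader election: choosing exactly one active master among several candidates.
  5. Cluster membership: detecting node failures through sessions and ephemeral nodes.

In short, ZK is not a tool for storing large data; it is the "glue" that makes distributed systems more reliable. It is especially suited to read-heavy coordination workloads, such as those found in big-data and microservice architectures.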

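Real-world examples

ZooKeeper is widely used in large distributed systems. Here are a few real cases showing how it addresses concrete pain points:

Project | Problem | How ZooKeeper helps
Hadoop/HBase | HDFS high availability needs exactly one active NameNode plus automatic failover; HBase needs Master election and must track which RegionServers are alive. | HDFS's ZKFailoverController uses ZK to elect the active NameNode; HBase keeps the active Master address and RegionServer ephemeral nodes in ZK, so failures are detected and handled automatically.
Apache Kafka | Brokers must register themselves, a single controller must be elected, and topic/partition metadata must be shared; early consumer clients also coordinated groups through ZK. | ZK stored broker registrations and cluster metadata and elected the controller, which in turn chooses partition leaders and fails them over quickly. (Note: KRaft has been production-ready since Kafka 3.3 and Kafka 4.0 removed ZooKeeper mode; earlier versions depend on ZK.)
Dubbo (Alibaba's microservice framework) | Service discovery between microservices is hard, and configuration changes must be synchronized by hand, which complicates deployment. | ZK acts as the registry: providers register their addresses and consumers discover them dynamically; it can also serve as a configuration center that pushes changes in real time, so updates take effect without restarts.
ShardingSphere (database sharding middleware) | Multiple Sharding-Proxy nodes need unified sharding rules and node management; inconsistent rules would route data incorrectly and skew it. | ZK serves as the configuration center, storing sharding rules and node registrations, enabling unified cluster management and failover.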

Setting up a ZooKeeper cluster with Docker Compose

mkdir zookeeper-tutorial && cd zookeeper-tutorial

Use Docker Compose to build a 3-node ZooKeeper cluster:

docker-compose.yml

version: '3.9'

services:
  zk1:
    image: zookeeper:3.9.4
    container_name: zk1
    restart: unless-stopped
    hostname: zk1
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
      ZOO_4LW_COMMANDS_WHITELIST: "*"
    volumes:
      - zk1_data:/data
      - zk1_datalog:/datalog

  zk2:
    image: zookeeper:3.9.4
    container_name: zk2
    restart: unless-stopped
    hostname: zk2
    ports:
      - "2182:2181"
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
      ZOO_4LW_COMMANDS_WHITELIST: "*"
    volumes:
      - zk2_data:/data
      - zk2_datalog:/datalog

  zk3:
    image: zookeeper:3.9.4
    container_name: zk3
    restart: unless-stopped
    hostname: zk3
    ports:
      - "2183:2181"
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
      ZOO_4LW_COMMANDS_WHITELIST: "*"
    volumes:
      - zk3_data:/data
      - zk3_datalog:/datalog

volumes:
  zk1_data:
  zk1_datalog:
  zk2_data:
  zk2_datalog:
  zk3_data:
  zk3_datalog:

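Notes on docker-compose.yml:
  1. ZOO_MY_ID is the server's id; the image writes it to the myid file in /data, and it must match the N in server.N.
  2. ZOO_SERVERS lists the ensemble as server.N=host:quorumPort:electionPort;clientPort (2888 for Follower-to-Leader traffic, 3888 for leader election, 2181 for clients).
  3. ZOO_4LW_COMMANDS_WHITELIST: "*" enables all four-letter-word commands (ruok, mntr, ...).
  4. Each container's client port 2181 is published on host ports 2181, 2182 and 2183, and the named volumes keep /data (snapshots and myid) and /datalog (transaction logs) across restarts.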
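The ZOO_SERVERS value follows a fixed pattern, server.<myid>=<host>:<quorum port>:<election port>;<client port>, separated by spaces. As a minimal sketch, assuming the zk1..zkN hostnames and default ports used in this compose file, a hypothetical helper buildZooServers (not part of ZooKeeper or the image) can generate it for any cluster size:

```javascript
// Hypothetical helper: builds the ZOO_SERVERS string used by the official
// zookeeper image, one entry per node, space-separated.
function buildZooServers(count, {
    hostPrefix = 'zk',     // assumption: hosts are named zk1, zk2, ...
    quorumPort = 2888,     // followers connect to the leader on this port
    electionPort = 3888,   // leader election traffic
    clientPort = 2181,     // client connections
} = {}) {
    const entries = [];
    for (let id = 1; id <= count; id++) {
        entries.push(`server.${id}=${hostPrefix}${id}:${quorumPort}:${electionPort};${clientPort}`);
    }
    return entries.join(' ');
}

console.log(buildZooServers(3));
```

Generating the string avoids copy-paste mistakes when scaling to 5 nodes, where every service must carry the identical list.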

Stop the cluster and clean up any old one (remove the volumes so no stale data is left behind):

docker-compose down -v

Start the cluster:

docker-compose up -d

Verify startup by checking the logs for errors:

docker logs zk1
docker logs zk2
docker logs zk3

Enter a container (the transcripts below were taken in zk3):

docker exec -it zk3 bash

root@zk3:/apache-zookeeper-3.9.4-bin# ls
bin  conf  docs  lib  LICENSE.txt  NOTICE.txt  README.md  README_packaging.md

root@zk3:/apache-zookeeper-3.9.4-bin/bin# ll -lrt
total 88
-rwxr-xr-x 1 zookeeper zookeeper  1385 Aug 19 19:49 zkTxnLogToolkit.sh*
-rwxr-xr-x 1 zookeeper zookeeper   996 Aug 19 19:49 zkTxnLogToolkit.cmd*
-rwxr-xr-x 1 zookeeper zookeeper  1377 Aug 19 19:49 zkSnapShotToolkit.sh*
-rwxr-xr-x 1 zookeeper zookeeper   988 Aug 19 19:49 zkSnapShotToolkit.cmd*
-rwxr-xr-x 1 zookeeper zookeeper  1422 Aug 19 19:49 zkSnapshotRecursiveSummaryToolkit.sh*
-rwxr-xr-x 1 zookeeper zookeeper   995 Aug 19 19:49 zkSnapshotRecursiveSummaryToolkit.cmd*
-rwxr-xr-x 1 zookeeper zookeeper  1374 Aug 19 19:49 zkSnapshotComparer.sh*
-rwxr-xr-x 1 zookeeper zookeeper   987 Aug 19 19:49 zkSnapshotComparer.cmd*
-rwxr-xr-x 1 zookeeper zookeeper 11680 Aug 19 19:49 zkServer.sh*
-rwxr-xr-x 1 zookeeper zookeeper  4559 Aug 19 19:49 zkServer-initialize.sh*
-rwxr-xr-x 1 zookeeper zookeeper  1243 Aug 19 19:49 zkServer.cmd*
-rwxr-xr-x 1 zookeeper zookeeper  3947 Aug 19 19:49 zkEnv.sh*
-rwxr-xr-x 1 zookeeper zookeeper  1810 Aug 19 19:49 zkEnv.cmd*
-rwxr-xr-x 1 zookeeper zookeeper  1576 Aug 19 19:49 zkCli.sh*
-rwxr-xr-x 1 zookeeper zookeeper  1115 Aug 19 19:49 zkCli.cmd*
-rwxr-xr-x 1 zookeeper zookeeper  1978 Aug 19 19:49 zkCleanup.sh*
-rwxr-xr-x 1 zookeeper zookeeper   232 Aug 19 19:49 README.txt*
drwxr-xr-x 2 zookeeper zookeeper  4096 Aug 19 19:49 ./
drwxr-xr-x 6 zookeeper zookeeper  4096 Oct  2 08:52 ../

The bin directory contains zkCli.sh; start it:

root@zk3:/apache-zookeeper-3.9.4-bin/bin# ./zkCli.sh 
...
...
2025-10-15 07:41:38,559 [myid:localhost:2181] - INFO  [main-SendThread(localhost:2181):o.a.z.ClientCnxn$SendThread@1427] - Session establishment complete on server localhost/127.0.0.1:2181, session id = 0x30203aaa8590000, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null zxid: -1
[zk: localhost:2181(CONNECTED) 0] 

Verify the cluster and test consistency

Run in zk3 (the server this zkCli session is connected to):
[zk: localhost:2181(CONNECTED) 0] create /test-cluster "Cluster OK"
Created /test-cluster
[zk: localhost:2181(CONNECTED) 1] 
Still inside zk3, point zkCli at zk2:
root@zk3:/apache-zookeeper-3.9.4-bin/bin# ./zkCli.sh -server zk2:2181
Connecting to zk2:2181
..
..
2025-10-15 07:44:26,765 [myid:zk2:2181] - INFO  [main-SendThread(zk2:2181):o.a.z.ClientCnxn$SendThread@1427] - Session establishment complete on server zk2/172.18.0.4:2181, session id = 0x20203aaa89b0000, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null zxid: -1
[zk: zk2:2181(CONNECTED) 3] ls /
[test-cluster, zookeeper]
[zk: zk2:2181(CONNECTED) 4] get /test-cluster
Cluster OK
[zk: zk2:2181(CONNECTED) 5] 

The node created through one server is visible through another, so the cluster is working.

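Basics

What is a ZNode?

A ZNode is ZooKeeper's core data unit, similar to a file or directory in a file system. Each ZNode has a path (such as /app/config), can store a small amount of data (usually well under 1 MB), and can have children, forming a tree. ZNodes come in several types:
  1. Persistent: stays until it is explicitly deleted.
  2. Ephemeral: deleted automatically when the session that created it ends; it cannot have children.
  3. Sequential (persistent or ephemeral): ZooKeeper appends a monotonically increasing 10-digit counter to the name, e.g. task-0000000001.
  4. Container and TTL nodes (3.5+): a container node is removed automatically once its last child is deleted; a TTL node is removed when it has no children and has not been modified within its TTL (TTL nodes must be enabled explicitly).

ZNodes are versioned (the Stat includes version numbers), which allows conditional atomic updates, and they support watchers (event notifications).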

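What does ZooKeeper store?

ZooKeeper does not store big data (it is not HDFS or Cassandra); it stores small amounts of structured metadata and coordination information. Typical contents include configuration (such as a database URL), service instance addresses, lock and queue nodes, and leader or membership information.

Data is stored as byte arrays (byte[]), with ACLs for access control and watchers for change notification. ZK is optimized for read-heavy workloads and small objects well under 1 MB: any server answers a read from its local replica, while every write goes through the Leader and must reach a quorum.

ZooKeeper's structure

ZooKeeper's data model is a hierarchical tree (like a Unix file system) rooted at /. Each ZNode is one node in the tree: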

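/
├── zookeeper (system node, built in)
├── config
│   ├── db
│   │   └── url = "jdbc:mysql://host:3306/db"
│   └── timeout = "30s"
├── services
│   ├── web1 (ephemeral node) = "192.168.1.10:8080"
│   └── web2 (sequential node) = "192.168.1.11:8080"
└── locks
    └── task-0000000001 (ephemeral sequential node) = "" (empty data; only the path is used for the lock)

This structure supports fast traversal (ls/get) and event-driven notification (watchers on data or children changes). Every server in the ensemble, Leader and Followers alike, holds a full in-memory copy of the tree, kept consistent through replication.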
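ZooKeeper's create does not create missing parents (creating /config/db fails with NoNode if /config is absent), and ephemeral nodes cannot have children; this is why client libraries offer helpers such as mkdirp. The in-memory model below is a hypothetical sketch, not ZooKeeper code, that mirrors those two rules and the ls behaviour:

```javascript
// Hypothetical in-memory model of the ZNode tree (illustration only).
class ZNodeTree {
    constructor() {
        // path -> { data, ephemeral, children: Set of child names }
        this.nodes = new Map([['/', { data: '', ephemeral: false, children: new Set() }]]);
    }

    static parentOf(path) {
        const idx = path.lastIndexOf('/');
        return idx === 0 ? '/' : path.slice(0, idx);
    }

    create(path, data = '', { ephemeral = false } = {}) {
        if (this.nodes.has(path)) throw new Error(`NodeExists: ${path}`);
        const parentPath = ZNodeTree.parentOf(path);
        const parent = this.nodes.get(parentPath);
        // Like ZooKeeper: the parent must already exist...
        if (!parent) throw new Error(`NoNode: ${parentPath}`);
        // ...and ephemeral nodes cannot have children.
        if (parent.ephemeral) throw new Error(`NoChildrenForEphemerals: ${parentPath}`);
        this.nodes.set(path, { data, ephemeral, children: new Set() });
        parent.children.add(path.slice(path.lastIndexOf('/') + 1));
        return path;
    }

    // Like `ls /path`: names of direct children only (sorted here for stable output).
    ls(path) {
        const node = this.nodes.get(path);
        if (!node) throw new Error(`NoNode: ${path}`);
        return [...node.children].sort();
    }
}

const tree = new ZNodeTree();
tree.create('/config');
tree.create('/config/db', 'jdbc:mysql://host:3306/db');
tree.create('/services');
tree.create('/services/web1', '192.168.1.10:8080', { ephemeral: true });
console.log(tree.ls('/')); // [ 'config', 'services' ]
```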

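ZNode data model

A ZNode is ZooKeeper's basic data unit. It has four parts: Data, the Children List, the ACL (Access Control List) and the Stat (statistics).

Together these make up the complete attribute model of a ZNode, underpinning ZooKeeper's tree-shaped data structure and its coordination features.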

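Data: the actual content stored in the ZNode, a byte array (byte[]), limited to under 1 MB by default.

It holds metadata such as configuration and service addresses. For example, the data of ZNode /config/db could be "jdbc:mysql://host:3306". Read it with the get command.

Children list

Children list: the names of a ZNode's direct children, which form the hierarchical tree (rooted at /).

It underpins service discovery and naming. For example, the children list of /services might be ["web1", "web2"], obtained with ls or getChildren.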

ACL

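ACL (Access Control List): defines who may read, write, create, delete, or administer (set the ACL of) a ZNode.

It provides security. For example, digest:user:<hash>:cdrwa grants all permissions to one user, where <hash> is the Base64-encoded SHA-1 of "user:password" (the plain password is never stored in the ACL). Set it with setAcl and read it with getAcl.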
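The hashed part of a digest ACL id is the Base64-encoded SHA-1 of the string user:password; that is how a value like the superDigest in the zoo.cfg later in this article is produced. A minimal sketch with Node's built-in crypto module (the function name digestId is just for illustration):

```javascript
const crypto = require('crypto');

// Builds the id used in digest ACLs: "<user>:" + base64(sha1("<user>:<password>")).
// This mirrors how the server derives the id when a client runs
// `addauth digest user:password`.
function digestId(user, password) {
    const hash = crypto.createHash('sha1').update(`${user}:${password}`, 'utf8').digest('base64');
    return `${user}:${hash}`;
}

const id = digestId('admin', 'admin123');
console.log(`setAcl /secure digest:${id}:cdrwa`);
```

Computing the id locally lets you write digest ACLs (or superDigest) without ever putting the plain password into the ACL itself.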

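Stat

Stat (statistics): the ZNode's metadata, including version numbers (dataVersion, cversion, aclVersion), transaction IDs (cZxid/mZxid/pZxid), timestamps (ctime/mtime) and the number of children.

It is used for optimistic locking and change detection. For example, stat /path prints fields such as cZxid = 0x100000004, ctime = ..., dataVersion = 0.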
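Optimistic locking works by sending the dataVersion you read along with the write; the server rejects the write with BadVersion if someone else updated the node in between. A minimal in-memory sketch of that check (the VersionedNode class is hypothetical; -1 means "skip the check", as in the real setData API):

```javascript
// Hypothetical model of ZooKeeper's conditional setData (illustration only).
class VersionedNode {
    constructor(data) {
        this.data = data;
        this.version = 0; // dataVersion starts at 0 when the node is created
    }

    // expectedVersion === -1 disables the check, as in ZooKeeper's setData
    setData(newData, expectedVersion) {
        if (expectedVersion !== -1 && expectedVersion !== this.version) {
            throw new Error(`BadVersion: expected ${expectedVersion}, actual ${this.version}`);
        }
        this.data = newData;
        this.version += 1; // every successful write bumps dataVersion
        return this.version;
    }
}

// Two clients read the same version, then both try to write.
const node = new VersionedNode('jdbc:mysql://host:3306');
const readByA = node.version;
const readByB = node.version;
node.setData('jdbc:mysql://hostA:3306', readByA);     // succeeds, version -> 1
try {
    node.setData('jdbc:mysql://hostB:3306', readByB); // stale version 0 -> rejected
} catch (e) {
    console.log(e.message); // BadVersion: expected 0, actual 1
}
```

The losing client typically re-reads the node and retries with the new version.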

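ZNode Data in detail

In ZooKeeper, the ZNode is the most basic data unit, similar to a file or directory in a file system. A ZNode's Data is its core content: the actual payload, holding the metadata or configuration an application needs.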

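  1. Definition and properties of Data
  2. Limits and caveats of Data
  3. Uses of Data

Data is the "carrier" of ZK's coordination features; common scenarios include configuration values, service addresses, and leader or lock metadata.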

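  4. Relationship to other ZNode attributes

Data is independent of the other parts, but operations often combine them: every successful write increments dataVersion and updates mzxid/mtime in the Stat, and reading or writing Data requires the READ (r) or WRITE (w) permission in the ACL.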

  5. Related operations

Data is operated on mainly through a client tool such as zkCli.sh or through an API (for example node-zookeeper-client for Node.js).

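Assuming zkCli is already connected (syntax for ZooKeeper 3.5+):

Operation | Command | Example | Notes
Create a ZNode with Data | create [-e] [-s] /path "data" | create /config/db "jdbc:mysql://localhost:3306" | Creates a persistent node whose Data is the string; -e makes it ephemeral, -s sequential. The parent (/config) must already exist.
Read Data | get [-s] [-w] /path | get /config/db | Prints the Data; -s also prints the Stat, -w registers a watcher for changes.
Update Data | set [-v version] /path "newdata" | set -v 1 /config/db "jdbc:mysql://newhost:3306" | -v makes the update conditional on dataVersion (optimistic lock); without -v the version is not checked (the API equivalent is -1).
Get Stat only | stat /path | stat /config/db | Returns only the Stat (including dataLength), not the Data.
Delete a ZNode (its Data goes with it) | delete [-v version] /path | delete /config/db | Deletes the node and its Data; the node must have no children.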

Watcher example: run get -w /config/db (older versions: get /config/db watch), then in another terminal run set /config/db "updated"; the first session receives WatchedEvent state:SyncConnected type:NodeDataChanged path:/config/d
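Standard watches (registered by get -w, ls -w, or the watcher argument of getData/getChildren) are one-shot: they fire once and are then removed, so a client that wants to keep watching must re-register. A small simulation of that rule (WatchRegistry is a hypothetical class, not a ZooKeeper API):

```javascript
// Hypothetical simulation of ZooKeeper's one-shot data watches.
class WatchRegistry {
    constructor() {
        this.dataWatches = new Map(); // path -> array of callbacks
    }

    watch(path, callback) {
        if (!this.dataWatches.has(path)) this.dataWatches.set(path, []);
        this.dataWatches.get(path).push(callback);
    }

    // On a data change, each registered watcher fires once and is removed.
    triggerDataChanged(path) {
        const callbacks = this.dataWatches.get(path) || [];
        this.dataWatches.delete(path); // removed before firing, so callbacks may re-register
        for (const cb of callbacks) cb({ type: 'NodeDataChanged', path });
        return callbacks.length;
    }
}

const registry = new WatchRegistry();
const oneShot = [];
const rewatched = [];

registry.watch('/config/db', (e) => oneShot.push(e.type));

// Re-registering inside the callback keeps watching across changes.
function rewatch(e) {
    rewatched.push(e.type);
    registry.watch('/config/db', rewatch);
}
registry.watch('/config/db', rewatch);

registry.triggerDataChanged('/config/db');
registry.triggerDataChanged('/config/db');
console.log(oneShot.length, rewatched.length); // 1 2
```

Changes that happen between the event and the re-registration are not reported individually, so clients re-read the data after each event.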

  6. Best practices and common issues
  7. Node.js example
const zookeeper = require('node-zookeeper-client');

const client = zookeeper.createClient('127.0.0.1:2181');

client.connect();


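function testCode() {
    // Create the ZNode with its Data (NODE_EXISTS is tolerated so the script can be re-run)
    client.create('/config', Buffer.from('jdbc:mysql://localhost:3306'), zookeeper.CreateMode.PERSISTENT, (err, path) => {
        if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS) {
            console.error('Create error:', err);
            return client.close();
        }
        console.log('Created (or already exists): /config');

        // Read the Data and register a watcher; the watcher receives only the event
        client.getData('/config', (event) => {
            console.log('Watcher event:', event);  // fires once, on the next change to /config
        }, (err, data, stat) => {
            if (err) {
                console.error('Get error:', err);
                return client.close();
            }
            console.log('Initial Data:', data ? data.toString('utf8') : '');  // Buffer -> string
            console.log('Data version:', stat.version);

            // Update the Data with a version check (pass -1 to skip the check)
            client.setData('/config', Buffer.from('jdbc:mysql://newhost:3306'), stat.version, (err, newStat) => {
                if (err) console.error('Set error:', err);  // BAD_VERSION if someone else wrote first
                else console.log('Updated, new version:', newStat.version);
                // Close after a short delay so the watcher event can still arrive
                setTimeout(() => client.close(), 500);
            });
        });
    });
}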

client.once('connected', () => {
    console.log('Connected to ZooKeeper.');
    testCode();
});

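Explanation: node-zookeeper-client passes Data as a Buffer in both directions (Buffer.from(...) to write, data.toString('utf8') to read). The getData watcher is one-shot: it fires at most once, on the next change to that node. Passing -1 as the version to setData skips the version check; passing the version you just read makes the write conditional (optimistic locking).

ZNode Children List in detail

In ZooKeeper (ZK), a ZNode's Children List is another core attribute: the list of names of its direct children, forming a hierarchical tree. It resembles the list of files and subdirectories in a directory, but ZooKeeper emphasizes distributed consistency and event-driven notification.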

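  1. Definition and properties of the Children List
  2. Limits and caveats of the Children List
  3. Uses of the Children List

The Children List is the foundation of ZooKeeper's naming and discovery mechanisms; common scenarios include service discovery (one child per live instance), group membership, queues and locks.

  4. Relationship to other ZNode attributes

The Children List is independent but complementary: creating or deleting a child increments the parent's cversion and updates its pzxid and numChildren; listing children requires READ permission on the parent, creating one requires CREATE, and deleting one requires DELETE on the parent.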

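  5. zkCli operations
Operation | Command | Example | Notes
Create a child (adds it to the list) | create /parent/child "data" | create /services/node1 "ip:8080" | The child joins /services' Children List automatically; -e (ephemeral) and -s (sequential) are supported.
List children | ls [-w] /path | ls /services | Prints the Children List (e.g. [node1, node2]); -w watches for changes.
List children + Stat | ls -s /path (ls2 in old versions) | ls -s /services | Prints the list plus the Stat of the parent node itself, not of each child.
Delete a child (removes it from the list) | delete /parent/child | delete /services/node1 | Requires DELETE (d) permission on the parent.
Count descendants | getAllChildrenNumber /path | getAllChildrenNumber /services | Counts all descendants under the given path, recursively.
Recursive delete (empties the list) | deleteall /path (rmr in old versions) | deleteall /services | Deletes the ZNode and all of its descendants.

Watcher example: run ls -w /services, then in another terminal run create /services/node3 "ip:8081"; this triggers "WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/services"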

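  6. Best practices and common issues

Practice: use sequential nodes to build ordered lists (e.g. queues); watches are one-shot, so re-register after every event (3.6+ also offers persistent watches via addWatch); ZK has no built-in pagination, so keep children lists small and filter large ones by name prefix on the client side.

  7. Node.js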
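Sequential nodes get a 10-digit, zero-padded counter appended to the requested name (for example task-0000000001), maintained by the parent node. Because getChildren returns names in no guaranteed order, queue and lock recipes sort by that suffix themselves. A minimal sketch (sequenceOf, sortBySequence and predecessorOf are hypothetical helpers):

```javascript
// Extract the 10-digit sequence suffix ZooKeeper appends to sequential nodes.
function sequenceOf(name) {
    const match = /(\d{10})$/.exec(name);
    if (!match) throw new Error(`not a sequential node name: ${name}`);
    return Number(match[1]);
}

// Sort children by sequence number (getChildren order is not guaranteed).
function sortBySequence(children) {
    return [...children].sort((a, b) => sequenceOf(a) - sequenceOf(b));
}

// In the lock recipe each waiter watches only the node just before its own,
// which avoids waking every waiter when the lock is released.
function predecessorOf(children, myNode) {
    const sorted = sortBySequence(children);
    const idx = sorted.indexOf(myNode);
    if (idx === -1) throw new Error(`${myNode} is not in the children list`);
    return idx === 0 ? null : sorted[idx - 1]; // null: first in line (holds the lock)
}

const children = ['task-0000000003', 'task-0000000001', 'task-0000000002'];
console.log(sortBySequence(children));
console.log(predecessorOf(children, 'task-0000000003')); // task-0000000002
```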
const zookeeper = require('node-zookeeper-client');

const client = zookeeper.createClient('127.0.0.1:2181');

client.connect();

function listServices() {
    client.getChildren('/services',
        function (event) {
            if (event) {
                console.log('Children changed event:', event);
                // Re-attach watcher recursively
                listServices();
            }
        },
        function (err, children, stat) {
            if (err) {
                console.error('GetChildren error:', err);
                return;
            }
            console.log('Children List:', children);  // e.g., ['node1']
            console.log('Num Children:', stat ? stat.numChildren : 'N/A');
            console.log('cVersion:', stat ? stat.cversion : 'N/A');
        }
    );
}

function testCode() {
    // 0. Create persistent parent node (allows ephemeral children)
    client.create('/services', Buffer.from('fnode'), zookeeper.CreateMode.PERSISTENT, (err, path) => {
        if (err) console.error('services Create error:', err);
        else console.log('services created', path);
    });

    // 1. Create ephemeral child node (adds to list)
    client.create('/services/node1', Buffer.from('ip:8080'), zookeeper.CreateMode.EPHEMERAL, (err, path) => {
        if (err) console.error('Create error:', err);
        else console.log('Added to list:', path);
    });

    // 2. List children with recursive watcher
    listServices();

    // 3. Delete child after delay (to allow initial list and watcher setup)
    setTimeout(() => {
        client.remove('/services/node1', -1, (err) => {
            if (err) console.error('Delete error:', err);
            else console.log('Removed from list');
        });
    }, 1000);

    // 4. Close after more delay (to see delete effect)
    setTimeout(() => {
        client.close();
    }, 2000);
}

client.once('connected', () => {
    console.log('Connected to ZooKeeper.');
    testCode();
});

// root@ser745692301841:/dev_dir/zookeeper-tutorial/js# node index.js
// Connected to ZooKeeper.
// services created /services
// Added to list: /services/node1
// Children List: [ 'node1' ]
// Num Children: 1
// cVersion: 1
// Children changed event: Event { type: 4, name: 'NODE_CHILDREN_CHANGED', path: '/services' }
// Removed from list
// Children List: []
// Num Children: 0
// cVersion: 2

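Explanation: the getChildren watcher is one-shot, so listServices() re-registers it each time it fires. Deleting /services/node1 triggers NODE_CHILDREN_CHANGED on /services, and the re-read shows the empty list with cversion incremented from 1 to 2.

ZNode ACL in detail

In ZooKeeper, a ZNode's ACL (Access Control List) is its security attribute, defining who may access it. It provides fine-grained authorization so that, in a distributed environment, only authorized principals can perform specific operations (read, write, delete, and so on). ACLs are the core of ZooKeeper's security model and help prevent unauthorized access, especially when multiple users or services share an ensemble.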

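  1. Definition and properties of ACLs
  2. Limits and caveats of ACLs
  3. Uses of ACLs

ACLs are key to ZooKeeper's security; common scenarios include isolating the subtrees of different users or services and protecting critical configuration from unauthorized changes.

  4. Relationship to other ZNode attributes

ACLs are independent but govern access to the other parts: READ covers getting Data and the Children List, WRITE covers setData, CREATE and DELETE cover creating and deleting children, and ADMIN covers setACL; each setACL increments aclVersion in the Stat. ACLs are not inherited: a child does not receive its parent's ACL.

  5. Best practices and common issues
  6. zkCli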
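Operation | Command | Example | Notes
Authenticate | addauth scheme auth | addauth digest admin:admin123 | Authenticates the session so it can access restricted nodes; the server derives the digest id by hashing user:password.
Set ACL | setAcl /path scheme:id:perms[,...] | setAcl /secure world:anyone:r,digest:admin:dyks0yAxrXfvFZ7G5BN0+ZUcGCE=:cdrwa | Rules are comma-separated and replace the existing ACL; a digest id must use the hashed form user:base64(sha1(user:password)).
Get ACL | getAcl /path | getAcl /secure | Prints each entry as scheme, id and permissions (e.g. cdrwa).
Set ACL at creation | create /path "data" acl | create /secure "data" world:anyone:cdrwa | Optionally sets the ACL when the node is created.
Show ACL version | stat /path | stat /secure | Shown as aclVersion in the Stat.
Reopen a node | setAcl /path world:anyone:cdrwa | setAcl /secure world:anyone:cdrwa | Makes the node fully open again (an ACL cannot be removed, only replaced).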
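Behind the cdrwa letters is a bitmask: READ=1, WRITE=2, CREATE=4, DELETE=8, ADMIN=16 (ALL=31). That is why the Node.js ACL example later in this article prints permission: 16 for an ADMIN-only rule. A small sketch converting between the two forms (permsToMask and maskToPerms are hypothetical names):

```javascript
// ZooKeeper permission bits (same values as ZooDefs.Perms in the Java client).
const PERMS = { r: 1, w: 2, c: 4, d: 8, a: 16 };

// "cdrwa" -> 31
function permsToMask(perms) {
    let mask = 0;
    for (const ch of perms) {
        if (!(ch in PERMS)) throw new Error(`unknown permission letter: ${ch}`);
        mask |= PERMS[ch];
    }
    return mask;
}

// 31 -> "cdrwa", rendered in the same letter order as getAcl output
function maskToPerms(mask) {
    return ['c', 'd', 'r', 'w', 'a'].filter((ch) => mask & PERMS[ch]).join('');
}

console.log(permsToMask('cdrwa')); // 31
console.log(permsToMask('a'));     // 16
console.log(maskToPerms(5));       // cr
```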

Example flow:

  7. Node.js

Next, modify docker-compose.yml to use a custom zoo.cfg:

version: '3.9'

services:
  zk1:
    image: zookeeper:3.9.4
    container_name: zk1
    restart: unless-stopped
    hostname: zk1
    ports:
      - "2181:2181"
    networks:
      - zk-net
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
      ZOO_4LW_COMMANDS_WHITELIST: "*"
    volumes:
      - zk1_data:/data
      - zk1_datalog:/datalog
      - ./zoo.cfg:/conf/zoo.cfg  # the official image reads zoo.cfg from /conf (ZOO_CONF_DIR)

  zk2:
    image: zookeeper:3.9.4
    container_name: zk2
    restart: unless-stopped
    hostname: zk2
    ports:
      - "2182:2181"
    networks:
      - zk-net
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
      ZOO_4LW_COMMANDS_WHITELIST: "*"
    volumes:
      - zk2_data:/data
      - zk2_datalog:/datalog
      - ./zoo.cfg:/conf/zoo.cfg

  zk3:
    image: zookeeper:3.9.4
    container_name: zk3
    restart: unless-stopped
    hostname: zk3
    ports:
      - "2183:2181"
    networks:
      - zk-net
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
      ZOO_4LW_COMMANDS_WHITELIST: "*"
    volumes:
      - zk3_data:/data
      - zk3_datalog:/datalog
      - ./zoo.cfg:/conf/zoo.cfg

volumes:
  zk1_data:
  zk1_datalog:
  zk2_data:
  zk2_datalog:
  zk3_data:
  zk3_datalog:

networks:
  zk-net:
    driver: bridge

zoo.cfg

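# Basic settings
tickTime=2000
initLimit=10
syncLimit=5

# Data and transaction log directories (paths inside the image)
dataDir=/data
dataLogDir=/datalog

# Client port (inside the container)
clientPort=2181

# Ensemble members (container names are used as hostnames)
server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:3888

# With a custom zoo.cfg the image does not apply ZOO_SERVERS / ZOO_4LW_COMMANDS_WHITELIST, so enable 4LW here
4lw.commands.whitelist=*

# Enable the Digest authentication provider
authProvider.1=org.apache.zookeeper.server.auth.DigestAuthenticationProvider

# Super-user digest (hash of admin:admin123); read as the system property
# zookeeper.DigestAuthenticationProvider.superDigest
DigestAuthenticationProvider.superDigest=admin:dyks0yAxrXfvFZ7G5BN0+ZUcGCE=

Then test in zkCli: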
[zk: 127.0.0.1:2181(CONNECTED) 0] addauth digest admin:admin123
[zk: 127.0.0.1:2181(CONNECTED) 1] create /test "data"
Created /test
[zk: 127.0.0.1:2181(CONNECTED) 2] ls /
[test, zookeeper]
[zk: 127.0.0.1:2181(CONNECTED) 3] create /secure-test "protected data" digest:admin:dyks0yAxrXfvFZ7G5BN0+ZUcGCE=:cdrwa
Created /secure-test
[zk: 127.0.0.1:2181(CONNECTED) 4] getAcl /secure-test
'digest,'admin:dyks0yAxrXfvFZ7G5BN0+ZUcGCE=
: cdrwa
[zk: 127.0.0.1:2181(CONNECTED) 5] get /secure-test
protected data
[zk: 127.0.0.1:2181(CONNECTED) 6] 

Exit the session and start zkCli again:

root@zk1:/apache-zookeeper-3.9.4-bin/bin# ./zkCli.sh -server 127.0.0.1:2181
[zk: 127.0.0.1:2181(CONNECTED) 1] get /secure-test
Insufficient permission : /secure-test
[zk: 127.0.0.1:2181(CONNECTED) 2] 
# Permission denied, as expected
# Authenticate
[zk: 127.0.0.1:2181(CONNECTED) 2] addauth digest admin:admin123
[zk: 127.0.0.1:2181(CONNECTED) 3] get /secure-test
protected data
[zk: 127.0.0.1:2181(CONNECTED) 4] 

The same flow in Node.js:
const zookeeper = require('node-zookeeper-client');

const client = zookeeper.createClient('127.0.0.1:2181');

client.connect();

function testCode() {
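    // Define ACLs after auth. Permission.ADMIN (16) only allows setACL; use e.g.
    // Permission.ALL (31) to also allow read/write/create/delete. If admin is the
    // superDigest user configured in zoo.cfg, this session bypasses ACL checks anyway.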
    const worldAcl = new zookeeper.ACL(zookeeper.Permission.ADMIN, new zookeeper.Id('world', 'anyone'));
    const digestAcl = new zookeeper.ACL(zookeeper.Permission.ADMIN, new zookeeper.Id('digest', 'admin:dyks0yAxrXfvFZ7G5BN0+ZUcGCE='));
    const acls = [worldAcl, digestAcl];

    // 2. Create the node (unless it already exists), then set its ACL
    client.create('/secure', Buffer.from('initial data'), zookeeper.CreateMode.EPHEMERAL, (err, path) => {
        if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS) {
            console.error('Create error:', err);
            client.close();
            return;
        }
        console.log('Node /secure created or exists:', path);

        // Now set ACL
        client.setACL('/secure', acls, -1, (err, stat) => {  // -1 ignores version
            if (err) {
                console.error('SetACL error:', err);
                client.close();
                return;
            }
            console.log('ACL set, version:', stat.aversion);

            // 3. Read the ACL back
            client.getACL('/secure', (err, retrievedAcls, stat) => {
                if (err) console.error('GetACL error:', err);
                else {
                    console.log('Retrieved ACLs:', retrievedAcls);
                    console.log('ACL Version:', stat.aversion);

                    // 4. Set the ACL at creation time (argument order: path, data, acls, mode, callback)
                    client.create('/secure-new', Buffer.from('data'), acls, zookeeper.CreateMode.EPHEMERAL, (err, path) => {
                        if (err) console.error('Create error:', err);
                        else console.log('Created with ACL:', path);

                        // 5. Close the connection
                        setTimeout(() => client.close(), 1000);  // Brief delay to see ephemeral nodes
                    });
                }
            });
        });
    });
}

client.once('connected', () => {
    console.log('Connected to ZooKeeper.');

    // 1. Authenticate (addAuthInfo)
    client.addAuthInfo('digest', Buffer.from('admin:admin123'));

    console.log('Authentication info added.');

    testCode();
});

// root@ser745692301841:/dev_dir/zookeeper-tutorial/js# node index.js
// Connected to ZooKeeper.
// Authentication info added.
// Node /secure created or exists: /secure
// ACL set, version: 1
// Retrieved ACLs: [
//   ACL { permission: 16, id: Id { scheme: 'world', id: 'anyone' } },
//   ACL {
//     permission: 16,
//     id: Id { scheme: 'digest', id: 'admin:dyks0yAxrXfvFZ7G5BN0+ZUcGCE=' }
//   }
// ]
// ACL Version: 1
// Created with ACL: /secure-new

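ZNode Stat in detail

In ZooKeeper, a ZNode's Stat (statistics) is its metadata: it records the node's life cycle and change history, providing version numbers, transaction IDs, timestamps and counts.

Stat does not store application data; it supports other operations in achieving consistency, optimistic locking and monitoring. Stat is read-only and consistent across the ensemble (replicated through the ZAB protocol), and is commonly used to detect changes or diagnose problems.

  1. Definition and properties of Stat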
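Category | Field | Type | Description
ZXID (transaction ID) | czxid | long | zxid of the transaction that created the ZNode (zxids increase monotonically across the ensemble)
ZXID | mzxid | long | zxid of the last change to the Data
ZXID | pzxid | long | zxid of the last change to the Children List (a child created or deleted)
Timestamp | ctime | long | Creation time (milliseconds since the Unix epoch)
Timestamp | mtime | long | Time of the last Data change
Version | cversion | int | Number of changes to the Children List
Version | dataVersion | int | Number of changes to the Data (incremented by setData); called version in the API
Version | aclVersion | int | Number of changes to the ACL (incremented by setACL); called aversion in the API
Ephemeral | ephemeralOwner | long | Session ID of the owner if the node is ephemeral, 0 otherwise
Statistics | dataLength | int | Length of the Data in bytes (0 if empty)
Statistics | numChildren | int | Current number of children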
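A zxid is a 64-bit number: the high 32 bits are the leader epoch (incremented each time a new leader is elected) and the low 32 bits are a counter that restarts at 0 in each epoch. Splitting it makes values such as cZxid = 0x100000004 easier to read (epoch 1, counter 4). A minimal sketch using BigInt (splitZxid is a hypothetical helper):

```javascript
// Split a 64-bit zxid into (epoch, counter). Accepts a BigInt, a number,
// or a hex string such as the "0x100000004" printed by `stat`.
function splitZxid(zxid) {
    const value = BigInt(zxid); // BigInt('0x...') parses hex strings
    return {
        epoch: Number(value >> 32n),          // leader epoch
        counter: Number(value & 0xffffffffn), // transaction counter within the epoch
    };
}

console.log(splitZxid('0x100000004')); // { epoch: 1, counter: 4 }
console.log(splitZxid(4294967301));    // { epoch: 1, counter: 5 }
```

The second call decodes the czxid printed by the Node.js Stat example below.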
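  2. Limits and caveats of Stat
  3. Uses of Stat

Stat is the "fingerprint" for ZooKeeper consistency and debugging; common scenarios include optimistic locking with dataVersion, detecting changes by comparing mzxid/pzxid, and finding the session that owns an ephemeral node via ephemeralOwner.

  4. Relationship to other ZNode attributes

Stat summarizes the state of the other parts: dataVersion, mzxid, mtime and dataLength track the Data; cversion, pzxid and numChildren track the Children List; aclVersion tracks the ACL.

  5. Best practices and common issues
  6. zkCli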
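Operation | Command | Example | Notes
Get Stat | stat /path | stat /config | Prints all Stat fields, e.g. cZxid = 0x100000001, dataVersion = 0.
Get Data + Stat | get -s [-w] /path | get -s -w /config | Prints Data and Stat; -w watches for Data changes (which also update the Stat).
Get Children + Stat | ls -s /path | ls -s /services | Prints the Children List plus the parent's Stat (numChildren, etc.).
Check existence | stat /path | stat /config | Reports that the node does not exist if it is missing, otherwise prints its Stat.
Conditional update | set -v version /path "data" | set -v 1 /config "new" | Uses dataVersion as the expected version; fails on mismatch.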
[zk: localhost:2181(CONNECTED) 0] ls /
[secure-test, test, zookeeper]
[zk: localhost:2181(CONNECTED) 1] stat /test
cZxid = 0x100000004
ctime = Thu Oct 16 08:33:50 UTC 2025
mZxid = 0x100000004
mtime = Thu Oct 16 08:33:50 UTC 2025
pZxid = 0x100000004
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 0
[zk: localhost:2181(CONNECTED) 2] 
  7. Node.js
const zookeeper = require('node-zookeeper-client');

const client = zookeeper.createClient('127.0.0.1:2181');
client.connect();

// Helper: convert a field that may be a Buffer into a Number
function bufToNumber(buf) {
    if (!Buffer.isBuffer(buf)) return buf;

    // Two common lengths: 8 bytes (int64, big-endian as sent by the server) or 4 bytes (uint32)
    try {
        if (buf.length === 8) {
            // Converting to Number can lose precision above 2^53, but zxid/ctime/mtime normally fit
            return Number(buf.readBigInt64BE(0));
        }
        if (buf.length === 4) {
            return buf.readUInt32BE(0);
        }
        // Fallback: interpret the bytes as a hex number
        return parseInt(buf.toString('hex'), 16);
    } catch (e) {
        // If everything fails, return the raw Buffer
        return buf;
    }
}

// Safely read the commonly used stat fields and format them
function formatStat(stat) {
    const get = (name, getterName) => {
        try {
            // Try a getter first (if the implementation provides one)
            if (typeof stat[getterName] === 'function') {
                return stat[getterName]();
            }
            // Then try the plain property
            if (stat[name] !== undefined) {
                return stat[name];
            }
            // Then try a lower-case spelling of the property
            if (stat[name.toLowerCase()] !== undefined) {
                return stat[name.toLowerCase()];
            }
            return undefined;
        } catch (e) {
            return undefined;
        }
    };

    const czxidRaw = get('czxid', 'getCzxid') ?? stat.czxid;
    const mzxidRaw = get('mzxid', 'getMzxid') ?? stat.mzxid;
    const pzxidRaw = get('pzxid', 'getPzxid') ?? stat.pzxid;
    const ctimeRaw = get('ctime', 'getCtime') ?? stat.ctime;
    const mtimeRaw = get('mtime', 'getMtime') ?? stat.mtime;
    // The data version field has different names in different implementations
    const versionRaw = get('version', 'getVersion') ?? stat.version ?? stat.dataVersion;
    const cversionRaw = get('cversion', 'getCversion') ?? stat.cversion;
    const numChildrenRaw = get('numChildren', 'getNumChildren') ?? stat.numChildren;

    // Convert Buffer -> Number where needed
    const czxid = bufToNumber(czxidRaw);
    const mzxid = bufToNumber(mzxidRaw);
    const pzxid = bufToNumber(pzxidRaw);
    const ctimeNum = bufToNumber(ctimeRaw);
    const mtimeNum = bufToNumber(mtimeRaw);
    const version = bufToNumber(versionRaw);
    const cversion = bufToNumber(cversionRaw);
    const numChildren = bufToNumber(numChildrenRaw);

    return {
        czxid,
        mzxid,
        pzxid,
        ctime: (typeof ctimeNum === 'number' && !Number.isNaN(ctimeNum)) ? new Date(ctimeNum) : ctimeRaw,
        mtime: (typeof mtimeNum === 'number' && !Number.isNaN(mtimeNum)) ? new Date(mtimeNum) : mtimeRaw,
        version,
        cversion,
        numChildren
    };
}

function testCode() {
    // 1. exists -> stat
    client.exists('/secure-test', (err, stat) => {
        if (err) return console.error('Exists error:', err);
        if (!stat) return console.log('Node does not exist');

        console.log('Stat:', formatStat(stat));
    });

    // 2. getData -> data + stat
    client.getData('/secure-test', (err, data, stat) => {
        if (err) return console.error('GetData error:', err);

        console.log('Data:', data ? data.toString('utf8') : null);
        console.log('Stat (from getData):', formatStat(stat));
    });

    // 3. getChildren('/', ...)
    client.getChildren('/', (err, children, stat) => {
        if (err) return console.error('GetChildren error:', err);

        console.log('Children:', children);
        console.log('Root stat:', formatStat(stat));
    });

    // 4. Update the data (with version control)
    client.getData('/secure-test', (err, data, stat) => {
        if (err) return console.error('GetData error before set:', err);

        const fmt = formatStat(stat);
        const expectedVersion = typeof fmt.version === 'number' ? fmt.version : -1;

        client.setData('/secure-test', Buffer.from('updated'), expectedVersion, (err, newStat) => {
            if (err) {
                if (err.getCode && err.getCode() === zookeeper.Exception.BAD_VERSION) {
                    console.error('Version mismatch');
                } else {
                    console.error('SetData error:', err);
                }
            } else {
                console.log('Updated, new stat:', formatStat(newStat));
            }
        });
    });
}

client.once('connected', () => {
    console.log('Connected to ZooKeeper.');
    client.addAuthInfo('digest', Buffer.from('admin:admin123'));
    console.log('Authentication info added.');
    testCode();
});

// root@ser745692301841:/dev_dir/zookeeper-tutorial/js# node index.js
// Connected to ZooKeeper.
// Authentication info added.
// Stat: {
//   czxid: 4294967301,
//   mzxid: 4294967319,
//   pzxid: 4294967301,
//   ctime: 2025-10-16T08:41:32.407Z,
//   mtime: 2025-10-16T17:32:56.581Z,
//   version: 1,
//   cversion: 0,
//   numChildren: 0
// }
// Data: updated
// Stat (from getData): {
//   czxid: 4294967301,
//   mzxid: 4294967319,
//   pzxid: 4294967301,
//   ctime: 2025-10-16T08:41:32.407Z,
//   mtime: 2025-10-16T17:32:56.581Z,
//   version: 1,
//   cversion: 0,
//   numChildren: 0
// }
// Children: [ 'zookeeper', 'test', 'secure-test' ]
// Root stat: {
//   czxid: 0,
//   mzxid: 0,
//   pzxid: 4294967315,
//   ctime: 1970-01-01T00:00:00.000Z,
//   mtime: 1970-01-01T00:00:00.000Z,
//   version: 0,
//   cversion: 7,
//   numChildren: 3
// }
// Updated, new stat: {
//   czxid: 4294967301,
//   mzxid: 4294967328,
//   pzxid: 4294967301,
//   ctime: 2025-10-16T08:41:32.407Z,
//   mtime: 2025-10-16T17:43:33.718Z,
//   version: 2,
//   cversion: 0,
//   numChildren: 0
// }

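Basic commands

ZooKeeper commands fall into two groups: zkCli.sh client commands (for working with ZNodes interactively, e.g. creating and deleting nodes) and four-letter-word commands (4LW commands, for server monitoring and administration, sent over the client port with tools such as telnet or nc).

Note: zkCli commands run inside a zkCli.sh session; 4LW commands are sent to the server's client port, e.g. echo stat | nc localhost 2181.

zkCli.sh client commands

These are the core commands of the ZooKeeper CLI (zkCli.sh) for managing ZNodes (ZooKeeper's data nodes). The commonly used ones, with ZooKeeper 3.5+ syntax: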

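Command | Description | Example
help | Show help for all commands | help
addauth | Add authentication info for the session (e.g. digest) | addauth digest user:pass
close | Close the current session | close
connect | Connect to a server | connect localhost:2181
config | Show the current ensemble configuration | config
create | Create a ZNode (-e ephemeral, -s sequential; default persistent) | create -e -s /path "data"
delete | Delete a ZNode (it must have no children) | delete /path
deleteall | Recursively delete a ZNode and its descendants | deleteall /path
get | Get ZNode data (-s adds the Stat, -w sets a watch) | get /path
getAcl | Get a ZNode's ACL | getAcl /path
getAllChildrenNumber | Count all descendants under a path | getAllChildrenNumber /path
history | Show command history | history
ls | List child ZNodes (-s adds the Stat, -w sets a watch) | ls /path
printwatches | Turn printing of watch events on or off | printwatches on
quit | Exit zkCli | quit
set | Set ZNode data (-v adds a version check) | set /path "newdata"
setAcl | Set a ZNode's ACL | setAcl /path world:anyone:cdrwa
stat | Show a ZNode's Stat | stat /path
sync | Make the connected server catch up with the Leader for a path | sync /path
ls2, rmr | Deprecated since 3.5 (use ls -s and deleteall instead) | ls -s /path

Read commands take a -w flag to register a watcher, e.g. get -w /path (versions before 3.5 used a trailing watch argument instead).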

Four-letter-word commands (4LW commands)

These are server-level commands for monitoring and diagnostics, sent with a network tool as echo <command> | nc localhost 2181. Since 3.5.3 only srvr is whitelisted by default; enable others through 4lw.commands.whitelist in zoo.cfg (4lw.commands.whitelist=* enables all of them, which is what ZOO_4LW_COMMANDS_WHITELIST: "*" does in the docker-compose.yml above).

Command | Description
conf | Print the server's configuration
cons | List all connections/sessions to this server with details
crst | Reset connection/session statistics for all connections
dump | List outstanding sessions and ephemeral nodes
envi | Print details of the server environment
ruok | Check that the server is running in a non-error state (replies imok)
srst | Reset server statistics
srvr | Show full server details, including its mode (leader/follower)
stat | Show brief server details and connected clients
wchs | Show summary information about watches
wchc | List watches by session (potentially expensive on busy servers)
wchp | List watches by path (potentially expensive on busy servers)
dirs | Show the total size of snapshot and transaction log files, in bytes
mntr | Print monitoring metrics as tab-separated key/value lines
isro | Check whether the server is in read-only mode (replies ro or rw)
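mntr prints one metric per line as a tab-separated key/value pair (not JSON), so it is easy to feed into a monitoring script. A minimal parser sketch; parseMntr is a hypothetical helper, and the sample text is illustrative rather than captured from the cluster above:

```javascript
// Parse `echo mntr | nc host 2181` output: one "key<TAB>value" pair per line.
function parseMntr(text) {
    const metrics = {};
    for (const line of text.split('\n')) {
        const tab = line.indexOf('\t');
        if (tab === -1) continue; // skip blank or malformed lines
        const key = line.slice(0, tab).trim();
        const raw = line.slice(tab + 1).trim();
        // Numeric values become numbers; everything else (version, state) stays a string
        metrics[key] = raw !== '' && !Number.isNaN(Number(raw)) ? Number(raw) : raw;
    }
    return metrics;
}

// Illustrative sample, not real output from this cluster
const sample = [
    'zk_version\t3.9.4',
    'zk_server_state\tleader',
    'zk_znode_count\t8',
    'zk_avg_latency\t0.5',
].join('\n');

const m = parseMntr(sample);
console.log(m.zk_server_state, m.zk_znode_count); // leader 8
```

Checking zk_server_state on each node is a quick way to see which server is currently the leader.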

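Why do ZooKeeper and Redis feel so similar?

Similarities between ZooKeeper and Redis

ZooKeeper and Redis are both popular tools in distributed systems, and people often find them "very similar". This is mainly because their functionality, architecture and use cases overlap, especially around distributed data consistency and coordination. The following explains where this impression comes from and briefly contrasts the differences.

Overlapping core features: distributed locks and data consistency

In-memory data and high-performance design

Similar key-value data model

Why they "feel alike" but still differ

Despite the similarities, ZooKeeper focuses on strong consistency and coordination (leader election, configuration sync) and is highly reliable but comparatively slower; Redis emphasizes high throughput and simplicity and suits caching, but its locks are less rigorous than ZooKeeper's (you must handle lock expiry yourself, and replication is asynchronous). If your workload is highly concurrent and efficiency comes first, use Redis; if you need highly reliable coordination, use ZooKeeper.

In short, the resemblance comes from needs common across the distributed ecosystem, but on closer inspection the two are complementary.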

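How ZooKeeper instances work together

ZooKeeper cluster structure

ZooKeeper is not a single-machine program: it is normally deployed as a cluster (an ensemble) with an odd number of nodes, because writes need a majority and an extra even-numbered node adds no fault tolerance. For example:

Node | IP | Role
zk1 | 192.168.1.1 | Leader
zk2 | 192.168.1.2 | Follower
zk3 | 192.168.1.3 | Follower

Roles of the instances

  1. Leader: processes every write, assigns zxids, and broadcasts proposals and commits.
  2. Followers: serve reads from their local replica, forward writes to the Leader, acknowledge proposals, and vote in leader elections.
  3. Observer (optional): like a Follower but without a vote, so it adds read capacity without enlarging the quorum.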

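How instances cooperate: the ZAB protocol

ZAB (ZooKeeper Atomic Broadcast) is ZooKeeper's core algorithm for guaranteeing data consistency and ordering.

It works in roughly two phases:

Phase 1: Leader Election

When the cluster starts or the Leader crashes:

  1. All nodes enter the election (LOOKING) state.
  2. Each node broadcasts its vote, which includes its server id (myid) and the largest transaction id (zxid) it has seen.
  3. The node with the most recent data (highest epoch, then highest zxid, ties broken by the larger myid) wins once a majority votes for it, and becomes Leader.
  4. The other nodes become Followers.
  5. The Followers sync to the Leader's latest state.

This brings the cluster back to a consistent state.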
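The vote ordering in step 3 can be written as a comparison function. This is a simplified sketch: isBetterVote, electLeader and the vote shape are illustrative, and the real FastLeaderElection code also tracks election rounds and server states and waits for a quorum of matching votes:

```javascript
// A vote proposes a leader: { id: myid, epoch, zxid } of the proposed server.
// Ordering: higher epoch, then higher zxid, then larger myid as the tie-breaker.
function isBetterVote(candidate, current) {
    if (candidate.epoch !== current.epoch) return candidate.epoch > current.epoch;
    if (candidate.zxid !== current.zxid) return candidate.zxid > current.zxid;
    return candidate.id > current.id;
}

// Once all votes have been exchanged, every server converges on the maximum.
function electLeader(servers) {
    let best = servers[0];
    for (const s of servers.slice(1)) {
        if (isBetterVote(s, best)) best = s;
    }
    return best.id;
}

const servers = [
    { id: 1, epoch: 1, zxid: 0x100000005 },
    { id: 2, epoch: 1, zxid: 0x100000007 }, // most up-to-date log
    { id: 3, epoch: 1, zxid: 0x100000007 },
];
console.log(electLeader(servers)); // 3: same zxid as server 2, larger myid
```

Preferring the highest zxid ensures the new Leader already holds every committed transaction.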

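Phase 2: Atomic Broadcast

While a Leader is running, write requests propagate through a process similar to two-phase commit:

Step | Action | Notes
1 | The Leader receives a write request and creates a proposal | The proposal carries a new zxid
2 | The Leader broadcasts the proposal to all Followers | ZAB delivers proposals in zxid order
3 | Each Follower writes the proposal to its transaction log and replies with an ACK | The ACK means "accepted", not yet applied
4 | Once a majority (the Leader's own ACK included) has ACKed, the Leader commits | The change is applied to the in-memory tree
5 | The Leader broadcasts a COMMIT to all Followers | Every server applies the same change in the same order; the server the client is connected to then replies with success

Every write in ZooKeeper must be acknowledged by a majority of the servers (more than N/2); this is the basis of its consistency guarantee.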
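The majority rule also explains why ensembles usually have an odd number of servers: a quorum is floor(N/2) + 1, so 4 servers tolerate no more failures than 3. A quick sketch of that arithmetic (the function names are illustrative):

```javascript
// Smallest majority of an ensemble of n voting servers.
function quorumSize(n) {
    return Math.floor(n / 2) + 1;
}

// How many servers can fail while a quorum is still reachable.
function tolerableFailures(n) {
    return n - quorumSize(n);
}

// A proposal can commit once ACKs (the leader's own included) reach a quorum.
function canCommit(acks, n) {
    return acks >= quorumSize(n);
}

for (const n of [3, 4, 5]) {
    console.log(`${n} servers: quorum ${quorumSize(n)}, tolerates ${tolerableFailures(n)} failure(s)`);
}
// 3 servers: quorum 2, tolerates 1 failure(s)
// 4 servers: quorum 3, tolerates 1 failure(s)
// 5 servers: quorum 3, tolerates 2 failure(s)
```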

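A simple example

Assume three machines: zk1 is the Leader, and zk2 and zk3 are Followers, as in the table above.

A client sends the request: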

set /game/state "RUNNING"

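Flow:

  1. zk2 receives the request (the client is connected to it) and forwards it to zk1.
  2. zk1 creates proposal P1 (zxid=0x0001).
  3. zk1 sends the proposal to zk2 and zk3.
  4. zk2 and zk3 write it to their logs and reply with ACKs.
  5. With its own ACK plus at least one Follower's, zk1 has a majority (2 of 3) and commits.
  6. zk1 broadcasts the COMMIT to zk2 and zk3.
  7. zk2 and zk3 apply the change as well.
  8. zk2, the server the client is connected to, returns success to the client.

Even if zk3 goes down, zk1 and zk2 still form a majority (2 of 3), so writes keep succeeding.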

How this shows up in the configuration

Each node's zoo.cfg typically looks like this:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181

# Ensemble members
server.1=192.168.1.1:2888:3888
server.2=192.168.1.2:2888:3888
server.3=192.168.1.3:2888:3888

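Explanation:
  1. tickTime is the basic time unit in milliseconds; initLimit (time for a Follower to connect and sync to the Leader) and syncLimit (how far a Follower may lag before it is dropped) are counted in ticks.
  2. server.N=host:port1:port2 declares ensemble member N, where N must match the number in that server's dataDir/myid file; port1 (2888) is used by Followers to connect to the Leader, and port2 (3888) is used for leader election.
  3. Every node carries the same server.N list, so all members agree on who belongs to the ensemble.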
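The tick-based settings in this zoo.cfg translate into wall-clock timeouts by multiplying by tickTime. By default the server also clamps each client's requested session timeout into [2 × tickTime, 20 × tickTime] (minSessionTimeout/maxSessionTimeout). A small sketch of that arithmetic; deriveTimeouts and negotiateSessionTimeout are hypothetical helpers, and the clamp assumes min/maxSessionTimeout are not set explicitly:

```javascript
// Derive wall-clock timeouts (ms) from zoo.cfg tick-based settings.
// Assumes minSessionTimeout/maxSessionTimeout are left at their defaults.
function deriveTimeouts({ tickTime, initLimit, syncLimit }) {
    return {
        initTimeoutMs: initLimit * tickTime, // follower connect + initial sync
        syncTimeoutMs: syncLimit * tickTime, // allowed follower lag
        minSessionMs: 2 * tickTime,
        maxSessionMs: 20 * tickTime,
    };
}

// The server clamps the session timeout a client asks for into [min, max].
function negotiateSessionTimeout(requestedMs, timeouts) {
    return Math.min(Math.max(requestedMs, timeouts.minSessionMs), timeouts.maxSessionMs);
}

const t = deriveTimeouts({ tickTime: 2000, initLimit: 10, syncLimit: 5 });
console.log(t.initTimeoutMs, t.syncTimeoutMs);  // 20000 10000
console.log(negotiateSessionTimeout(30000, t)); // 30000
```

The 30000 ms result matches the "negotiated timeout = 30000" in the zkCli logs earlier, since it falls inside the [4000, 40000] range.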

How clients work with the cluster

A client can list several servers:

const client = zookeeper.createClient('192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181');

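The ZooKeeper client then automatically:
  1. connects to one server from the list;
  2. fails over to another server when the connection breaks, keeping the same session as long as it reconnects within the session timeout.

A ZooKeeper cluster is a "strongly consistent replicated state machine with leader election": instances cooperate through the ZAB protocol, the Leader orders and commits transactions, and Followers replicate and acknowledge them. Clients can connect to any node transparently; writes are linearizable and every client sees updates in the same order, though a read served by a lagging Follower may be slightly stale (call sync first when you need the latest value).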
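The connect string passed to createClient above is a comma-separated host:port list, optionally followed by a chroot path (e.g. .../myapp) that is prepended to every path the client uses. A hypothetical sketch of parsing it and picking a random server to start with, roughly what clients do so that sessions spread across the ensemble (the Java client, for instance, shuffles its server list); it handles hostnames and IPv4 addresses only, and assumes port 2181 when none is given:

```javascript
// Parse a ZooKeeper connect string: "host:port,host:port[/chroot]".
function parseConnectString(connectString) {
    const slash = connectString.indexOf('/');
    const hostPart = slash === -1 ? connectString : connectString.slice(0, slash);
    const chroot = slash === -1 ? null : connectString.slice(slash);
    const servers = hostPart.split(',').map((entry) => {
        const [host, port = '2181'] = entry.trim().split(':'); // assumed default port
        return { host, port: Number(port) };
    });
    return { servers, chroot };
}

// Start from a random server so that clients spread their sessions.
function pickServer(servers, random = Math.random) {
    return servers[Math.floor(random() * servers.length)];
}

const parsed = parseConnectString('192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181/myapp');
console.log(parsed.chroot); // /myapp
console.log(pickServer(parsed.servers));
```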

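Engineering practice

Service discovery

Below is a service discovery example built with Node.js and ZooKeeper:

  1. Service instances register themselves automatically.
  2. Clients discover available service nodes in real time (and automatically notice nodes joining and leaving).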

server.js

// server.js
const zookeeper = require('node-zookeeper-client');
const os = require("os");

const client = zookeeper.createClient("127.0.0.1:2181");
const SERVICE_PATH = "/services/myapp";

client.once('connected', () => {
    console.log("Connected to ZooKeeper");
    registerService();
});

client.connect();

function registerService() {
    client.exists(SERVICE_PATH, (err, stat) => {
        if (err) {
            return console.error("exists error:", err);
        }
        if (!stat) {
            client.mkdirp(SERVICE_PATH, (err) => {
                if (err) {
                    return console.error("mkdir error:", err);
                }
                createEphemeralNode();
            });
        } else {
            createEphemeralNode();
        }
    });
}

function createEphemeralNode() {
    const ip = getLocalIP();
    const port = 3000 + Math.floor(Math.random() * 1000);
    const data = JSON.stringify({ ip, port });
    const nodePath = `${SERVICE_PATH}/node-`;

    client.create(nodePath, Buffer.from(data), zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL, (err, path) => {
        if (err) return console.error("create error:", err);
        console.log(`Service registered at ${path} (${data})`);
    });
}

function getLocalIP() {
    const interfaces = os.networkInterfaces();
    for (const name in interfaces) {
        for (const iface of interfaces[name]) {
            if (iface.family === 'IPv4' && !iface.internal) return iface.address;
        }
    }
    return '127.0.0.1';
}

client.js

// client.js
const zookeeper = require('node-zookeeper-client');

const client = zookeeper.createClient('127.0.0.1:2181');
const SERVICE_PATH = '/services/myapp';

client.once('connected', () => {
    console.log('Connected to ZooKeeper');
    watchServices();
});

client.connect();

function watchServices() {
    client.getChildren(
        SERVICE_PATH,
        event => {
            console.log('Service change event:', event);
            watchServices(); // re-register the watch (ZooKeeper watches fire only once)
        },
        (err, children) => {
            if (err) {
                return console.error('getChildren error:', err);
            }
            if (!children.length) {
                console.log('No Service available');
                return;
            }

            console.log('Found Services: ', children);

            children.forEach(child => {
                const fullPath = `${SERVICE_PATH}/${child}`;
                client.getData(fullPath, (err, data) => {
                    if (!err && data) {
                        console.log(`${child}: ${data.toString()}`);
                    }
                });
            });
        }
    );
}
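client.js re-reads the whole child list on every change. If the application needs to know *which* instances came or went, one option is to diff successive snapshots. Below is a minimal sketch. `diffChildren` and `onChildren` are hypothetical helpers; `onChildren` would be called from the `getChildren` callback with the `children` array.

```javascript
// Hypothetical helper: compare two snapshots of /services/myapp children.
function diffChildren(prev, next) {
    const before = new Set(prev);
    const after = new Set(next);
    return {
        added: next.filter((name) => !before.has(name)),   // newly registered
        removed: prev.filter((name) => !after.has(name)),  // gone (ephemeral node deleted)
    };
}

let lastChildren = [];
function onChildren(children) {
    const { added, removed } = diffChildren(lastChildren, children);
    added.forEach((n) => console.log('online :', n));
    removed.forEach((n) => console.log('offline:', n));
    lastChildren = children;
}

onChildren(['node-0000000000', 'node-0000000001']);
onChildren(['node-0000000001', 'node-0000000002']); // node-...2 online, node-...0 offline
```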

Simple load balancing

server.js is the same as the server.js in the service discovery section above (each instance registers an EPHEMERAL_SEQUENTIAL node under /services/myapp), so it is not repeated here.

client.js: the service consumer with round-robin load balancing

// client.js
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const SERVICE_PATH = '/services/myapp';
let services = [];
let currentIndex = 0; // used for round-robin selection

const client = zookeeper.createClient(ZK_SERVERS);

client.once('connected', () => {
    console.log('Connected to ZooKeeper');
    watchServices();
});

client.connect();

// 监听服务节点变化
function watchServices() {
    client.getChildren(
        SERVICE_PATH,
        event => {
            console.log('Service list changed:', event);
            watchServices();
        },
        (err, children) => {
            if (err) return console.error('getChildren error:', err);
            if (!children.length) {
                services = [];
                console.log('⚠️ No available services.');
                return;
            }

            // Fetch each node's data
            const temp = [];
            let pending = children.length;

            children.forEach(child => {
                const fullPath = `${SERVICE_PATH}/${child}`;
                client.getData(fullPath, (err, data) => {
                    if (!err && data) {
                        temp.push(JSON.parse(data.toString()));
                    }
                    if (--pending === 0) {
                        services = temp;
                        console.log('🔍 Current available services:', services);
                    }
                });
            });

        }
    );
}

// Round-robin load balancing
function getNextService() {
    if (services.length === 0) {
        console.log('❌ No service available.');
        return null;
    }
    const service = services[currentIndex % services.length];
    currentIndex++;
    return service;
}

// Simulate client calls
setInterval(() => {
    const service = getNextService();
    if (service) {
        console.log(`Sending request to ${service.ip}:${service.port}`);
    }
}, 2000);

Leader election

This is a classic distributed-systems scenario; Kafka, HBase and etcd all use similar mechanisms.

Goal: implement a simple leader election algorithm.

leader.js: leader election

const zookeeper = require('node-zookeeper-client');
const os = require('os');

const ZK_SERVERS = '127.0.0.1:2181';
const ELECTION_PATH = '/election';

const client = zookeeper.createClient(ZK_SERVERS);
let currentNodePath = null; // path of this instance's election node

client.once('connected', () => {
    console.log('Connected to ZooKeeper');
    startElection();
});

client.connect();

// Create the election node
function startElection() {
    client.mkdirp(ELECTION_PATH, (err) => {
        if (err) {
            return console.error('mkdirp error:', err);
        }
        createElectionNode();
    });
}

function createElectionNode() {
    const nodePath = `${ELECTION_PATH}/node-`;
    const nodeData = Buffer.from(JSON.stringify({
        host: getLocalIP(),
        pid: process.pid,
    }));

    client.create(nodePath, nodeData, zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL, (err, path) => {
        if (err) return console.error('create node error:', err);

        currentNodePath = path;
        console.log(`Created election node: ${path}`);
        checkLeader();
    });
}

// Check whether this instance is the Leader
function checkLeader() {
    // No watcher on getChildren: a child watch would wake every instance on every
    // change (herd effect) and pile up duplicate watchers; only the predecessor is watched.
    client.getChildren(ELECTION_PATH, (err, children) => {
        if (err) return console.error('getChildren error:', err);

        // Sort the nodes ("node-" + zero-padded sequence, so lexical order is numeric order)
        const sorted = children.sort();
        const myNode = currentNodePath.split('/').pop();
        const myIndex = sorted.indexOf(myNode);

        if (myIndex === -1) {
            // Our ephemeral node is gone (e.g. the session expired)
            return console.error('Election node lost, need to re-register');
        }

        if (myIndex === 0) {
            console.log('I am the Leader!');
        } else {
            const leaderNode = sorted[0];
            console.log(`I am a Follower, current Leader: ${leaderNode}`);

            // Watch the node just before ours (our predecessor)
            const watchTarget = `${ELECTION_PATH}/${sorted[myIndex - 1]}`;

            client.exists(watchTarget, event => {
                console.log(`Watched node event: ${event}`);
                checkLeader(); // predecessor changed (normally deleted): check again
            }, (err, stat) => {
                if (err) return console.error('exists error:', err);
                if (!stat) checkLeader(); // predecessor already gone: check again now
            });
        }
    });
}

function getLocalIP() {
    const interfaces = os.networkInterfaces();
    for (const name in interfaces) {
        for (const iface of interfaces[name]) {
            if (iface.family === 'IPv4' && !iface.internal) return iface.address;
        }
    }
    return '127.0.0.1';
}
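The decision made inside checkLeader can be pulled out into a pure function, which makes the recipe easy to test without a running ensemble. The sketch assumes child names of the form `node-` followed by ZooKeeper's 10-digit zero-padded sequence number. `electionDecision` is a hypothetical name.

```javascript
// Hypothetical pure version of the leader-election check.
// children: names under /election, e.g. "node-0000000007"; myNode: our own name.
function electionDecision(children, myNode) {
    const seq = (name) => parseInt(name.slice(-10), 10); // 10-digit sequence suffix
    const sorted = [...children].sort((a, b) => seq(a) - seq(b));
    const myIndex = sorted.indexOf(myNode);
    if (myIndex === -1) {
        // Our ephemeral node is gone (e.g. session expired): we must re-register.
        return { role: 'lost' };
    }
    if (myIndex === 0) {
        return { role: 'leader' };
    }
    // Follower: watch only the node immediately before ours (avoids the herd effect).
    return { role: 'follower', leader: sorted[0], watch: sorted[myIndex - 1] };
}

console.log(electionDecision(['node-0000000003', 'node-0000000001', 'node-0000000002'], 'node-0000000003'));
// { role: 'follower', leader: 'node-0000000001', watch: 'node-0000000002' }
```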

Fair lock

How a ZooKeeper distributed lock works:

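  1. Every client creates a sequential node (EPHEMERAL_SEQUENTIAL) under a common directory such as /locks/mylock;
  2. ZooKeeper names the nodes with increasing sequence numbers, for example:
     /locks/mylock/lock-0000000001
     /locks/mylock/lock-0000000002
     /locks/mylock/lock-0000000003
  3. The client whose node has the smallest sequence number holds the lock
  4. Every other client watches for the deletion of the node immediately before its own
  5. When the holder releases the lock (by deleting its node, or when its session ends), the next client in line gets the lock
  6. Because ZooKeeper assigns the sequence numbers in a single order, the lock is fair (FIFO)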
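The rules above keep the lock fair and avoid the herd effect. Each waiter watches exactly one node, so a release wakes exactly one client. Below is a small sketch that takes a snapshot of lock nodes and computes who holds the lock and which node each waiter watches. `lockQueue` is a hypothetical helper, and the names follow the `lock-<10-digit sequence>` pattern shown above.

```javascript
// Hypothetical helper: who holds the lock and who watches whom.
function lockQueue(children) {
    // Same "lock-" prefix + zero-padded sequence, so lexical order = sequence order
    const sorted = [...children].sort();
    const watches = {};
    for (let i = 1; i < sorted.length; i++) {
        watches[sorted[i]] = sorted[i - 1]; // each waiter watches its predecessor
    }
    return { holder: sorted.length ? sorted[0] : null, watches };
}

const q = lockQueue(['lock-0000000003', 'lock-0000000001', 'lock-0000000002']);
console.log(q.holder);  // lock-0000000001
console.log(q.watches); // { 'lock-0000000002': 'lock-0000000001', 'lock-0000000003': 'lock-0000000002' }
```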

fair_lock.js: a distributed fair lock

const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const LOCK_PATH = '/locks/mylock';

const client = zookeeper.createClient(ZK_SERVERS);
let myNodePath = null;
let lockReleased = false;
let lockAcquired = false; // ✅ prevents acquiring the lock twice

client.once('connected', () => {
    console.log('Connected to ZooKeeper');
    acquireLock();
});

client.connect();

function acquireLock() {
    client.mkdirp(LOCK_PATH, (err) => {
        if (err) return console.error('mkdirp error:', err);
        createLockNode();
    });
}

function createLockNode() {
    const nodePath = `${LOCK_PATH}/lock-`;
    client.create(
        nodePath,
        Buffer.from('lock-node'),
        zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL,
        (err, path) => {
            if (err) return console.error('create error:', err);
            myNodePath = path;
            console.log(`🔒 Created lock node: ${path}`);
            checkLock();
        }
    );
}

function checkLock() {
    if (!myNodePath || lockReleased || lockAcquired) return;

    // Only the predecessor is watched (below); a watcher on getChildren would
    // wake every waiter on every change (herd effect).
    client.getChildren(
        LOCK_PATH,
        (err, children) => {
            if (err) return console.error('getChildren error:', err);
            if (!myNodePath || lockReleased || lockAcquired) return;

            const sorted = children.sort();
            const myNode = myNodePath.split('/').pop();
            const myIndex = sorted.indexOf(myNode);

            if (myIndex === 0) {
                lockAcquired = true; // ✅ prevents entering the critical section twice
                console.log('✅ Lock acquired! I am the holder.');
                doCriticalSection();
            } else {
                const watchTarget = `${LOCK_PATH}/${sorted[myIndex - 1]}`;
                console.log(`⏳ Waiting for ${watchTarget} to be released...`);
                client.exists(
                    watchTarget,
                    event => {
                        if (!lockReleased && !lockAcquired && event.getType() ===
                            zookeeper.Event.NODE_DELETED) {
                            console.log('📢 Previous lock released, rechecking...');
                            checkLock();
                        }
                    },
                    (err, stat) => {
                        if (err) return console.error('exists error:', err);
                        if (!stat) checkLock(); // the previous node may have just been deleted
                    }
                );
            }
        }
    );
}

function doCriticalSection() {
    console.log('🧠 Doing critical section...');
    setTimeout(() => {
        console.log('🧹 Releasing lock...');
        releaseLock();
    }, 5000);
}

function releaseLock() {
    if (!myNodePath || lockReleased) return;
    lockReleased = true;

    client.remove(myNodePath, (err) => {
        if (err) {
            if (err.code === zookeeper.Exception.NO_NODE) {
                console.warn('⚠️ Node already removed, safe to ignore.');
            } else {
                console.error('remove error:', err);
            }
        } else {
            console.log('🔓 Lock released!');
        }

        myNodePath = null;
        setTimeout(() => client.close(), 1000);
    });
}

Optimistic lock

  1. How it works (in plain terms):

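In ZooKeeper, each node's dataVersion acts as a version number: every time the node's data is modified, ZooKeeper increments it by 1. node-zookeeper-client exposes it as stat.version.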

An "optimistic lock" works like this:

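I assume that nobody changes the data between my read and my write. When I write, if the version no longer matches, someone else has changed it, so I give up or retry.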

  2. Example scenario

We create a node /optimistic/item that stores a stock count, for example "stock=10".

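Two clients read the node at the same time (both see version 0) and then try to modify it:

  1. Client A writes stock=9 with version 0: it succeeds and the version becomes 1
  2. Client B writes stock=8 with version 0: the version is now 1, so the write is rejected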

ZooKeeper checks the version number and rejects the write if it does not match (the same idea as CAS, Compare-And-Swap).
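The version check can be modeled in a few lines. This is an in-memory sketch of the semantics of `setData(path, data, version)`, not a ZooKeeper client. `VersionedNode` and `updateWithRetry` are hypothetical names. As in ZooKeeper, a new node starts at version 0, and passing version `-1` skips the check.

```javascript
// In-memory model of a znode's data + dataVersion (hypothetical, for illustration).
class VersionedNode {
    constructor(data) {
        this.data = data;
        this.version = 0; // a new znode starts at dataVersion 0
    }
    getData() {
        return { data: this.data, version: this.version };
    }
    // Mirrors setData semantics: fails unless expectedVersion matches (-1 = any).
    setData(data, expectedVersion) {
        if (expectedVersion !== -1 && expectedVersion !== this.version) {
            return { ok: false, error: 'BADVERSION' };
        }
        this.data = data;
        this.version += 1;
        return { ok: true, version: this.version };
    }
}

// Optimistic update: read, compute, write with the read version; retry on conflict.
function updateWithRetry(node, fn, maxRetries = 3) {
    for (let attempt = 0; attempt <= maxRetries; attempt++) {
        const { data, version } = node.getData();
        const res = node.setData(fn(data), version);
        if (res.ok) return res;
    }
    return { ok: false, error: 'TOO_MANY_CONFLICTS' };
}

const item = new VersionedNode(10);
const a = item.getData();            // Client A reads stock=10, version=0
const b = item.getData();            // Client B reads stock=10, version=0
console.log(item.setData(a.data - 1, a.version)); // { ok: true, version: 1 }
console.log(item.setData(b.data - 2, b.version)); // { ok: false, error: 'BADVERSION' }
console.log(updateWithRetry(item, (s) => s - 2));  // B retries: { ok: true, version: 2 }
```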

optimistic_lock.js

// optimistic_lock.js
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const PATH = '/optimistic/item';
const client = zookeeper.createClient(ZK_SERVERS);

client.once('connected', () => {
    console.log('✅ Connected to ZooKeeper.');
    // client.addAuthInfo('digest', Buffer.from('admin:admin123'));
    initNode(() => simulateClients());
});

client.connect();

// Initialize the node
function initNode(callback) {
    client.exists(PATH, (err, stat) => {
        if (err) return console.error('exists error:', err);
        if (stat) return callback();

        client.mkdirp('/optimistic', (err) => {
            if (err) return console.error('mkdirp error:', err);

            client.create(
                PATH,
                Buffer.from('stock=10'),
                zookeeper.CreateMode.PERSISTENT,
                (err) => {
                    if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS) {
                        return console.error('create error:', err);
                    }
                    console.log('🧱 Initialized node with stock=10');
                    callback();
                }
            );
        });
    });
}

// Simulate two clients modifying the stock concurrently: both reads are sent
// before either write, so both clients see the same version and one must lose.
function simulateClients() {
    let pending = 2;
    const done = () => {
        // Close the connection once both clients are finished
        if (--pending === 0) setTimeout(() => client.close(), 1000);
    };
    readAndUpdate('Client A', -1, done);
    readAndUpdate('Client B', -2, done);
}

// Read, then try to update the data (with a version check)
function readAndUpdate(clientName, delta, callback) {
    client.getData(PATH, (err, data, stat) => {
        if (err) {
            console.error(`${clientName} getData error:`, err);
            return callback();
        }

        const stockStr = data.toString('utf8');
        const stock = parseInt(stockStr.split('=')[1], 10);
        const newStock = stock + delta;

        console.log(`${clientName} reads ${stockStr}, version=${stat.version}`);
        console.log(`${clientName} wants to update to stock=${newStock}`);

        // Try to update using the version we read
        const newData = Buffer.from(`stock=${newStock}`);
        client.setData(PATH, newData, stat.version, (err, newStat) => {
            if (err) {
                if (err.getCode && err.getCode() === zookeeper.Exception.BADVERSION) {
                    console.error(`❌ ${clientName} update failed (version conflict).`);
                } else {
                    console.error(`${clientName} setData error:`, err);
                }
            } else {
                console.log(`✅ ${clientName} successfully updated! newVersion=${newStat.version}`);
            }
            callback();
        });
    });
}

Pessimistic lock

  1. Concept

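In ZooKeeper, a "pessimistic lock" is usually implemented with ephemeral sequential nodes (EPHEMERAL_SEQUENTIAL). The mechanism is almost identical to the fair lock; the difference is: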

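"A pessimistic lock first keeps all other clients out of the critical section, and releases the lock only after it has finished its own work."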

  2. Design scenario

Suppose there is a shared resource node /pessimistic/resource that several clients need to modify, but never at the same time.

We use the ZooKeeper lock node /locks/pessimistic:

  3. pessimistic_lock.js
// pessimistic_lock.js
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const LOCK_PATH = '/locks/pessimistic';
const RESOURCE_PATH = '/pessimistic/resource';

const client = zookeeper.createClient(ZK_SERVERS);
let myNodePath = null;

client.once('connected', () => {
    console.log('✅ Connected to ZooKeeper.');
    client.addAuthInfo('digest', Buffer.from('admin:admin123')); // only needed if the znodes are protected by a digest ACL
    ensurePaths(() => acquireLock());
});

client.connect();

// Make sure the lock directory and the resource node exist
function ensurePaths(callback) {
    client.mkdirp(LOCK_PATH, (err) => {
        if (err) return console.error('mkdirp lock error:', err);

        client.exists(RESOURCE_PATH, (err, stat) => {
            if (err) return console.error('exists error:', err);
            if (stat) return callback();

            client.mkdirp('/pessimistic', (err) => {
                if (err) return console.error('mkdirp resource error:', err);
                client.create(RESOURCE_PATH,
                    Buffer.from('value=0'),
                    zookeeper.CreateMode.PERSISTENT,
                    (err) => {
                        if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS)
                            return console.error('create resource error:', err);
                        console.log('🧱 Initialized resource: value=0');
                        callback();
                    });
            });
        });
    });
}

// Acquire the lock
function acquireLock() {
    const nodePrefix = `${LOCK_PATH}/lock-`;
    client.create(nodePrefix,
        Buffer.from('lock-node'),
        zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL,
        (err, path) => {
            if (err) return console.error('create lock node error:', err);
            myNodePath = path;
            console.log(`🔒 Created lock node: ${path}`);
            checkLock();
        });
}

// Check whether we hold the lock
function checkLock() {
    client.getChildren(LOCK_PATH, (err, children) => {
        if (err) return console.error('getChildren error:', err);
        const sorted = children.sort();
        const myNode = myNodePath.split('/').pop();
        const myIndex = sorted.indexOf(myNode);

        if (myIndex === 0) {
            console.log('✅ Lock acquired! Performing exclusive operation...');
            doCriticalSection();
        } else {
            const watchTarget = `${LOCK_PATH}/${sorted[myIndex - 1]}`;
            console.log(`⏳ Waiting for ${watchTarget} to be released...`);
            client.exists(watchTarget, event => {
                if (event.getType() === zookeeper.Event.NODE_DELETED) {
                    console.log('📢 Previous lock released, rechecking...');
                    checkLock();
                }
            }, (err, stat) => {
                if (err) return console.error('exists error:', err);
                if (!stat) checkLock();
            });
        }
    });
}

// Run the critical section
function doCriticalSection() {
    client.getData(RESOURCE_PATH, (err, data, stat) => {
        if (err) return console.error('getData error:', err);

        let value = parseInt(data.toString('utf8').split('=')[1]);
        console.log(`🔍 Read value=${value}`);

        // Simulated operation (increment by 1)
        value++;
        const newData = Buffer.from(`value=${value}`);

        client.setData(RESOURCE_PATH, newData, stat.version, (err, newStat) => {
            if (err) return console.error('setData error:', err);
            console.log(`🧠 Updated resource to value=${value} (version ${newStat.version})`);

            // Simulate a time-consuming task
            setTimeout(() => {
                releaseLock();
            }, 3000);
        });
    });
}

// Release the lock
function releaseLock() {
    if (!myNodePath) return;
    console.log('🧹 Releasing lock...');
    client.remove(myNodePath, (err) => {
        if (err) {
            if (err.getCode && err.getCode() === zookeeper.Exception.NO_NODE) {
                console.warn('⚠️ Lock node already removed.');
            } else {
                console.error('remove error:', err);
            }
        } else {
            console.log('🔓 Lock released!');
        }
        client.close();
    });
}

Reentrant lock

  1. How a reentrant lock works

In a distributed system, a reentrant lock lets the same thread (or client) acquire the same lock several times without blocking on itself.

Key implementation points:

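| Property | Description |
|---|---|
| Reentrancy | The same client can lock repeatedly without blocking itself |
| Ephemeral node | ZooKeeper automatically removes the nodes of dead sessions |
| Distributed safety | The lock is released even if the process crashes |
| Unique client ID | CLIENT_ID identifies the instance and is stored as the lock node's data; the re-entry count itself is kept in process memory |
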
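In reentrant_lock.js below, the hold count lives in a local variable, so reentrancy works only within one process. The "unique client ID" point can be made explicit by keying the count on an owner ID, for example the one stored as the lock node's data. Below is a minimal in-memory sketch that makes no ZooKeeper calls. `ReentrantCounter` and its `acquire`/`release` methods are hypothetical names.

```javascript
// Hypothetical in-memory model of reentrant ownership keyed by a client ID.
class ReentrantCounter {
    constructor() {
        this.owner = null; // e.g. the CLIENT_ID stored as the lock node's data
        this.count = 0;
    }
    // Returns true if `id` now holds the lock (first acquisition or re-entry).
    acquire(id) {
        if (this.owner === null) {
            this.owner = id;        // corresponds to creating the ephemeral node
            this.count = 1;
            return true;
        }
        if (this.owner === id) {
            this.count += 1;        // re-entry: no new node, just bump the count
            return true;
        }
        return false;               // held by another client: caller must wait
    }
    // Returns true when the lock is fully released (the node would be deleted).
    release(id) {
        if (this.owner !== id) throw new Error(`${id} does not hold the lock`);
        this.count -= 1;
        if (this.count === 0) {
            this.owner = null;
            return true;
        }
        return false;
    }
}

const lock = new ReentrantCounter();
console.log(lock.acquire('client-A'), lock.acquire('client-A'), lock.acquire('client-B')); // true true false
console.log(lock.release('client-A'), lock.release('client-A')); // false true
```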
  2. reentrant_lock.js
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const LOCK_ROOT = '/reentrant_lock';
const LOCK_NAME = 'mylock';
const CLIENT_ID = 'client-A';  // simulated unique ID of this client

const client = zookeeper.createClient(ZK_SERVERS);
client.connect();

let lockPath = null;
let reentrantCount = 0;

client.once('connected', async () => {
    console.log('✅ Connected to ZooKeeper');

    await ensurePath(LOCK_ROOT);
    await acquireLock();
    await acquireLock(); // acquire again (tests reentrancy)

    setTimeout(async () => {
        await releaseLock();
        await releaseLock(); // only the second release actually deletes the lock node
        process.exit(0);
    }, 4000);
});

/**
 * Make sure the root path exists
 */
function ensurePath(path) {
    return new Promise((resolve) => {
        client.exists(path, (err, stat) => {
            if (stat) return resolve();
            client.create(path, (err2) => {
                if (err2 && err2.getCode() !== zookeeper.Exception.NODE_EXISTS) {
                    console.error('❌ create path error:', err2);
                }
                resolve();
            });
        });
    });
}

/**
 * Try to acquire the lock
 */
async function acquireLock() {
    if (lockPath) {
        reentrantCount++;
        console.log(`🔁 Re-entered lock (${reentrantCount} times)`);
        return;
    }

    const path = `${LOCK_ROOT}/${LOCK_NAME}`;
    try {
        lockPath = await createEphemeralNode(path, CLIENT_ID);
        reentrantCount = 1;
        console.log(`🔒 Lock acquired by ${CLIENT_ID}`);
    } catch (err) {
        if (err.code === zookeeper.Exception.NODE_EXISTS) {
            console.log(`⏳ Lock held by others, waiting...`);
            await waitForLock(path);
            return acquireLock(); // retry
        } else {
            console.error('❌ acquire lock error:', err);
        }
    }
}

/**
 * Release the lock
 */
async function releaseLock() {
    if (!lockPath) return;
    reentrantCount--;
    console.log(`🧹 release lock called (${reentrantCount} remaining)`);

    if (reentrantCount === 0) {
        await deleteNode(lockPath);
        lockPath = null;
        console.log('🔓 Lock fully released');
    }
}

/**
 * Create an ephemeral node
 */
function createEphemeralNode(path, data) {
    return new Promise((resolve, reject) => {
        client.create(
            path,
            Buffer.from(data),
            zookeeper.CreateMode.EPHEMERAL,
            (err, createdPath) => {
                if (err) return reject(err);
                resolve(createdPath);
            }
        );
    });
}

/**
 * Delete a node
 */
function deleteNode(path) {
    return new Promise((resolve) => {
        client.remove(path, (err) => {
            if (err && err.getCode() !== zookeeper.Exception.NO_NODE) {
                console.error('remove error:', err);
            }
            resolve();
        });
    });
}

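/**
 * Wait for the lock to be released (watch for the node's deletion)
 */
function waitForLock(path) {
    return new Promise((resolve) => {
        // If the node is already gone when we check, stop waiting right away;
        // otherwise the watch would wait for a creation that may never come.
        const onExists = (err, stat) => {
            if (err) return console.error('exists error:', err);
            if (!stat) resolve();
        };
        const watcher = (event) => {
            if (event.getType() === zookeeper.Event.NODE_DELETED) {
                console.log('📢 Lock node deleted, can retry.');
                resolve();
            } else {
                client.exists(path, watcher, onExists); // watches are one-shot: re-register
            }
        };
        client.exists(path, watcher, onExists);
    });
}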

Read-write lock

  1. How a read-write lock works

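In a distributed system, a read-write lock allows:

  1. many readers to hold the lock at the same time
  2. only one writer at a time, with no readers while the writer holds it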

How to implement it with ZooKeeper

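| Type | Node naming | Concurrency |
|---|---|---|
| Read lock | /rw_locks/read-xxxxxxxxxx | Can be held together with other read locks; waits only for earlier write locks |
| Write lock | /rw_locks/write-xxxxxxxxxx | Waits until every earlier lock is released |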

Rules:

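| Property | Description |
|---|---|
| Shared reads | Multiple read locks can be held at the same time |
| Exclusive writes | A write lock must wait until all earlier locks are released |
| FIFO order | EPHEMERAL_SEQUENTIAL nodes give fair, first-come-first-served queuing |
| Automatic release | A client's lock nodes are removed automatically when its session ends |
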
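The rules can be written as one pure function. Given the current children of the lock root, it returns the node a request must wait for, or `null` if the request may proceed. The sketch assumes names of the form `read-<seq>` / `write-<seq>`, where `<seq>` is the 10-digit counter ZooKeeper appends. That counter belongs to the parent node and is shared by both prefixes, so sorting must use the numeric suffix; a plain `sort()` would put every `read-` node ahead of every `write-` node. `rwBlocker` and `seqOf` are hypothetical names.

```javascript
// Hypothetical pure helper for the read-write lock rules.
const seqOf = (name) => parseInt(name.slice(-10), 10); // 10-digit sequence suffix

function rwBlocker(children, myNode) {
    const sorted = [...children].sort((a, b) => seqOf(a) - seqOf(b));
    const myIndex = sorted.indexOf(myNode);
    if (myIndex === -1) throw new Error(`${myNode} is not in the lock queue`);
    const before = sorted.slice(0, myIndex);
    if (myNode.startsWith('write-')) {
        // Exclusive: wait for the node immediately ahead, whatever its type.
        return before.length ? before[before.length - 1] : null;
    }
    // Shared: only an earlier writer blocks a reader; wait for the closest one.
    const writers = before.filter((name) => name.startsWith('write-'));
    return writers.length ? writers[writers.length - 1] : null;
}

const children = ['read-0000000000', 'write-0000000001', 'read-0000000002', 'read-0000000003'];
for (const node of children) {
    console.log(node, '->', rwBlocker(children, node) || 'proceed');
}
```

In this snapshot the first reader proceeds and the writer waits for it. Both later readers wait for the writer, so readers that arrive after a writer cannot overtake it.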
  2. rw_lock.js
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const LOCK_ROOT = '/rw_locks';
const client = zookeeper.createClient(ZK_SERVERS);
client.connect();

client.once('connected', async () => {
    console.log('✅ Connected to ZooKeeper');
    await ensurePath(LOCK_ROOT);

    // Test: a reader, then a writer 1 s later, then another reader 2 s later
    simulateClient('Reader-1', 'read');
    setTimeout(() => simulateClient('Writer-1', 'write'), 1000);
    setTimeout(() => simulateClient('Reader-2', 'read'), 2000);
});

async function ensurePath(path) {
    return new Promise((resolve) => {
        client.exists(path, (err, stat) => {
            if (stat) return resolve();
            client.create(path, (err2) => {
                if (err2 && err2.getCode() !== zookeeper.Exception.NODE_EXISTS) {
                    console.error('❌ create path error:', err2);
                }
                resolve();
            });
        });
    });
}

/**
 * Simulate different clients
 */
async function simulateClient(id, type) {
    const lock = new ReadWriteLock(client, LOCK_ROOT, id, type);
    await lock.acquire();
    console.log(`🧠 [${id}] doing ${type} operation...`);
    setTimeout(async () => {
        await lock.release();
        console.log(`🔓 [${id}] released ${type} lock`);
    }, 3000);
}

/**
 * Read-write lock class
 */
class ReadWriteLock {
    constructor(client, root, id, type) {
        this.client = client;
        this.root = root;
        this.id = id;
        this.type = type; // 'read' | 'write'
        this.nodePath = null;
    }

    async acquire() {
        this.nodePath = await this.createLockNode();
        console.log(`🔒 [${this.id}] requested ${this.type} lock: ${this.nodePath}`);

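        // Sort by the numeric sequence suffix: "read-" and "write-" nodes share one
        // counter, and a plain sort() would put every read- node before every write- node.
        const seqOf = (name) => parseInt(name.slice(-10), 10);

        while (true) {
            const children = await this.getChildren();
            const sorted = children.sort((a, b) => seqOf(a) - seqOf(b));
            const myNode = this.nodePath.split('/').pop();
            const myIndex = sorted.indexOf(myNode);
            if (myIndex === -1) throw new Error(`[${this.id}] lock node disappeared`);
            const beforeMe = sorted.slice(0, myIndex);

            let blocker;
            if (this.type === 'write') {
                // Write lock: wait until no earlier node remains (watch the closest one)
                blocker = beforeMe[beforeMe.length - 1];
            } else {
                // Read lock: only earlier write locks block it (watch the closest writer)
                blocker = beforeMe.filter(name => name.startsWith('write-')).pop();
            }
            if (!blocker) break;

            const watchNode = `${this.root}/${blocker}`;
            console.log(`⏳ [${this.id}] waiting for ${watchNode}...`);
            await this.waitForDelete(watchNode);
        }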

        console.log(`✅ [${this.id}] acquired ${this.type} lock`);
    }

    async release() {
        if (this.nodePath) {
            await new Promise((resolve) => {
                this.client.remove(this.nodePath, (err) => {
                    if (err && err.getCode() !== zookeeper.Exception.NO_NODE) {
                        console.error('remove error:', err);
                    }
                    resolve();
                });
            });
            this.nodePath = null;
        }
    }

    getChildren() {
        return new Promise((resolve, reject) => {
            this.client.getChildren(this.root, (err, children) => {
                if (err) reject(err);
                else resolve(children);
            });
        });
    }

    createLockNode() {
        const nodePrefix = `${this.root}/${this.type}-`;
        return new Promise((resolve, reject) => {
            this.client.create(
                nodePrefix,
                Buffer.from(this.id),
                zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL,
                (err, path) => {
                    if (err) reject(err);
                    else resolve(path);
                }
            );
        });
    }

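    waitForDelete(path) {
        return new Promise((resolve) => {
            // If the node is already gone when we check, stop waiting right away
            const onExists = (err, stat) => {
                if (err) return console.error('exists error:', err);
                if (!stat) resolve();
            };
            const watcher = (event) => {
                if (event.getType() === zookeeper.Event.NODE_DELETED) {
                    resolve();
                } else {
                    this.client.exists(path, watcher, onExists); // re-register the one-shot watch
                }
            };
            this.client.exists(path, watcher, onExists);
        });
    }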
}

Timeout lock

  1. What is a "timeout lock"

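In a distributed system, suppose a client:

  1. acquires a lock, then hangs or runs far longer than expected
  2. never releases the lock, while its session stays alive, so ZooKeeper does not remove its ephemeral node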

Then every other client is stuck waiting. So we want to add a timeout mechanism:

Timeout lock = ZooKeeper distributed lock + local timer (released automatically on expiry)

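  2. Implementation approach

| Property | Description |
|---|---|
| Fairness | Clients queue by sequence number: first come, first served |
| Automatic release | Two safeguards: the local timeout and ZooKeeper session expiry |
| Fault tolerance | If a client disconnects and its session expires, ZooKeeper deletes its ephemeral node automatically |
| Safety | No deadlock and no waiting forever; note, however, that when the timer fires the holder may still be inside its critical section, so the protected work should tolerate that (for example by being idempotent) |
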
  3. timeout_lock.js
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const LOCK_PATH = '/timeout_locks/mylock';
const client = zookeeper.createClient(ZK_SERVERS);

client.connect();

client.once('connected', () => {
    console.log('✅ Connected to ZooKeeper');
    ensurePath(LOCK_PATH, () => {
        simulateClient('Client-A', 5000);
        setTimeout(() => simulateClient('Client-B', 1000), 1000);
    });
});

// Make sure the path exists
function ensurePath(path, cb) {
    client.mkdirp(path, (err) => {
        if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS)
            console.error('mkdirp error:', err);
        cb();
    });
}

// Simulate different clients
function simulateClient(id, timeoutMs) {
    const lock = new TimeoutLock(client, LOCK_PATH, id, timeoutMs);
    lock.acquire()
        .then(() => {
            console.log(`🧠 [${id}] doing critical work...`);
            // Simulate a long-running task
            setTimeout(() => {
                lock.release();
            }, 3000);
        })
        .catch(err => console.error(`[${id}] failed to acquire:`, err));
}

/**
 * Timeout lock class
 */
class TimeoutLock {
    constructor(client, lockPath, id, timeoutMs = 10000) {
        this.client = client;
        this.lockPath = lockPath;
        this.id = id;
        this.nodePath = null;
        this.timeoutMs = timeoutMs;
        this.timer = null;
    }

    async acquire() {
        this.nodePath = await this.createLockNode();
        console.log(`🔒 [${this.id}] requested lock: ${this.nodePath}`);

        while (true) {
            const children = await this.getChildren();
            const sorted = children.sort();
            const myNode = this.nodePath.split('/').pop();
            const myIndex = sorted.indexOf(myNode);

            if (myIndex === 0) {
                console.log(`✅ [${this.id}] acquired lock`);
                this.startTimeout();
                return;
            }

            const prevNode = `${this.lockPath}/${sorted[myIndex - 1]}`;
            console.log(`⏳ [${this.id}] waiting for ${prevNode}...`);
            await this.waitForDelete(prevNode);
        }
    }

    // Start the timeout timer
    startTimeout() {
        this.timer = setTimeout(() => {
            console.log(`⏰ [${this.id}] lock timeout (${this.timeoutMs}ms)! Releasing...`);
            this.release();
        }, this.timeoutMs);
    }

    async release() {
        if (this.timer) clearTimeout(this.timer);
        if (this.nodePath) {
            await new Promise((resolve) => {
                this.client.remove(this.nodePath, (err) => {
                    if (err && err.getCode() !== zookeeper.Exception.NO_NODE) {
                        console.error(`[${this.id}] remove error:`, err);
                    } else {
                        console.log(`🔓 [${this.id}] released lock`);
                    }
                    resolve();
                });
            });
            this.nodePath = null;
        }
    }

    createLockNode() {
        const prefix = `${this.lockPath}/lock-`;
        return new Promise((resolve, reject) => {
            this.client.create(
                prefix,
                Buffer.from(this.id),
                zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL,
                (err, path) => {
                    if (err) reject(err);
                    else resolve(path);
                }
            );
        });
    }

    getChildren() {
        return new Promise((resolve, reject) => {
            this.client.getChildren(this.lockPath, (err, children) => {
                if (err) reject(err);
                else resolve(children);
            });
        });
    }

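    waitForDelete(path) {
        return new Promise((resolve) => {
            // If the node is already gone when we check, stop waiting right away
            const onExists = (err, stat) => {
                if (err) return console.error(`[${this.id}] exists error:`, err);
                if (!stat) resolve();
            };
            const watcher = (event) => {
                if (event.getType() === zookeeper.Event.NODE_DELETED) {
                    resolve();
                } else {
                    this.client.exists(path, watcher, onExists); // re-register the one-shot watch
                }
            };
            this.client.exists(path, watcher, onExists);
        });
    }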
}

There is also a similar variant that supports automatic renewal: the renewable timeout lock (Renewable Timeout Lock).
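A renewable timeout lock keeps extending the local deadline while the holder is still making progress, so a fixed timer does not fire in the middle of slow but healthy work. The minimal sketch below covers only the deadline bookkeeping. Time is passed in explicitly so the logic can be tested without timers. `RenewableLease`, `ttlMs` and `maxRenewals` are hypothetical names. Wiring it to the ZooKeeper node (removing the node once `expired()` turns true) is left to code like TimeoutLock above.

```javascript
// Hypothetical deadline bookkeeping for a renewable timeout lock.
// Time is passed in (ms) so the logic is deterministic and testable.
class RenewableLease {
    constructor(ttlMs, maxRenewals, now) {
        this.ttlMs = ttlMs;
        this.maxRenewals = maxRenewals; // cap so a stuck holder cannot renew forever
        this.renewals = 0;
        this.deadline = now + ttlMs;
    }
    expired(now) {
        return now >= this.deadline;
    }
    // Extend the deadline to now + ttlMs; refused once expired or capped.
    renew(now) {
        if (this.expired(now) || this.renewals >= this.maxRenewals) return false;
        this.renewals += 1;
        this.deadline = now + this.ttlMs;
        return true;
    }
}

const lease = new RenewableLease(1000, 2, 0); // 1 s TTL, at most 2 renewals
console.log(lease.renew(800));    // true, deadline becomes 1800
console.log(lease.renew(1600));   // true, deadline becomes 2600
console.log(lease.renew(2400));   // false, renewal cap reached
console.log(lease.expired(2600)); // true
```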

Try lock

  1. What is a "try lock" (TryLock)

How a try lock differs from an ordinary lock:

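| Type | Behavior |
|---|---|
| Ordinary lock | Blocks and keeps waiting until the lock is acquired |
| TryLock | Tries to acquire the lock; if it is taken, returns failure immediately or gives up after waiting a given time |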

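This is a good fit for non-blocking operations, for example a task that should simply be skipped when the resource is busy instead of getting stuck waiting.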

  2. Implementation approach

Built on ZooKeeper's locking mechanism:

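| Property | Description |
|---|---|
| ✅ Non-blocking | No waiting: failure is returned immediately |
| ⏱ Configurable timeout | A maximum wait time can be given; the attempt is abandoned when it runs out |
| 🔒 Safety | Uses an ephemeral node, so the lock is released automatically when the client's session ends |
| 🧩 Use cases | Flash-sale inventory, task deduplication, preventing re-entry of one-off scheduled jobs |
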
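At its core, the non-blocking path is "create the node if it is absent, otherwise fail immediately". ZooKeeper makes this atomic by rejecting a second `create` on the same path with `NODE_EXISTS`. Below is a minimal in-memory model of that decision. `MemoryZnodes` and `tryAcquire` are hypothetical names, and a `Map` stands in for the ephemeral node.

```javascript
// In-memory stand-in for "create an EPHEMERAL node at a fixed path" (hypothetical).
class MemoryZnodes {
    constructor() {
        this.nodes = new Map(); // path -> owner id
    }
    // Mirrors create(): fails with NODE_EXISTS if the path is already taken.
    create(path, owner) {
        if (this.nodes.has(path)) return 'NODE_EXISTS';
        this.nodes.set(path, owner);
        return 'OK';
    }
    remove(path) {
        this.nodes.delete(path);
    }
}

// Non-blocking attempt: true if we got the lock, false right away otherwise.
function tryAcquire(store, lockNode, id) {
    return store.create(lockNode, id) === 'OK';
}

const store = new MemoryZnodes();
const LOCK_NODE = '/trylock/mylock/lock-node';
console.log(tryAcquire(store, LOCK_NODE, 'Client-A')); // true
console.log(tryAcquire(store, LOCK_NODE, 'Client-B')); // false, no waiting
store.remove(LOCK_NODE);                               // A releases (or its session ends)
console.log(tryAcquire(store, LOCK_NODE, 'Client-B')); // true
```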
  3. try_lock.js
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const LOCK_PATH = '/trylock/mylock';
const client = zookeeper.createClient(ZK_SERVERS);

client.once('connected', async () => {
    console.log('✅ Connected to ZooKeeper');
    await ensurePath(LOCK_PATH);

    // Start two simulated clients
    tryLockClient('Client-A', 0); // try now, no retry on failure
    setTimeout(() => tryLockClient('Client-B', 3000), 1000); // starts 1 s later; retries once after 3 s on failure
});

client.connect();

// Make sure the path exists
function ensurePath(path) {
    return new Promise((resolve) => {
        client.mkdirp(path, (err) => {
            if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS) {
                console.error('mkdirp error:', err);
            }
            resolve();
        });
    });
}

// Simulate a client trying the lock
async function tryLockClient(id, retryDelay = 0) {
    const lock = new TryLock(client, LOCK_PATH, id);
    const success = await lock.tryAcquire(2000); // wait up to 2 s for the lock
    if (success) {
        console.log(`✅ [${id}] got lock! Doing work...`);
        setTimeout(() => lock.release(), 3000);
    } else {
        console.log(`❌ [${id}] failed to acquire lock.`);
        if (retryDelay > 0) {
            console.log(`🔁 [${id}] will retry after ${retryDelay}ms`);
            setTimeout(() => tryLockClient(id), retryDelay);
        }
    }
}

/**
 * TryLock: a single ephemeral node acts as the lock
 */
class TryLock {
    constructor(client, basePath, id) {
        this.client = client;
        this.lockNode = `${basePath}/lock-node`;
        this.id = id;
    }

    tryAcquire(timeoutMs = 0) {
        return new Promise((resolve) => {
            const tryCreate = () => {
                this.client.create(
                    this.lockNode,
                    Buffer.from(this.id),
                    zookeeper.CreateMode.EPHEMERAL,
                    (err) => {
                        if (!err) {
                            resolve(true); // ✅ lock acquired
                        } else if (err.getCode() === zookeeper.Exception.NODE_EXISTS) {
                            if (timeoutMs > 0) {
                                // Wait for the current holder to release the lock
                                this.waitForLock(timeoutMs).then(resolve);
                            } else {
                                resolve(false); // ❌ fail immediately
                            }
                        } else {
                            console.error(`[${this.id}] unexpected error:`, err);
                            resolve(false);
                        }
                    }
                );
            };

            tryCreate();
        });
    }

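    waitForLock(timeoutMs) {
        return new Promise((resolve) => {
            const deadline = Date.now() + timeoutMs;
            // Once we time out or hand over to a retry, ignore later watcher events;
            // otherwise a late NODE_DELETED could grab the lock after we reported failure
            let settled = false;

            const timer = setTimeout(() => {
                if (settled) return;
                settled = true;
                console.log(`⏰ [${this.id}] timeout waiting for lock`);
                resolve(false);
            }, timeoutMs);

            // The lock node is gone: try again with whatever time is left
            const retry = () => {
                if (settled) return;
                settled = true;
                clearTimeout(timer);
                this.tryAcquire(Math.max(deadline - Date.now(), 0)).then(resolve);
            };

            // Node deleted between our failed create() and this exists(): retry right away
            const onExists = (err, stat) => {
                if (!err && !stat) retry();
            };

            const watch = (event) => {
                if (event.getType() === zookeeper.Event.NODE_DELETED) retry();
                else if (!settled) this.client.exists(this.lockNode, watch, onExists); // one-shot: re-arm
            };

            this.client.exists(this.lockNode, watch, onExists);
        });
    }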

    release() {
        this.client.remove(this.lockNode, (err) => {
            if (err && err.getCode() !== zookeeper.Exception.NO_NODE) {
                console.error(`[${this.id}] remove error:`, err);
            } else {
                console.log(`🔓 [${this.id}] released lock`);
            }
        });
    }
}

Another variant is a TryLock with an upper limit on the number of retries (a retryable non-blocking lock).
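Here is a minimal sketch of that bounded-retry variant. It works with any async `tryAcquire(timeoutMs)` that resolves to `true` or `false`, such as `TryLock#tryAcquire` in try_lock.js. The helper name `tryLockWithRetries`, its options, and the linear backoff policy are assumptions made for illustration. The test drives it with a fake lock instead of ZooKeeper.

```javascript
// Hypothetical helper: retry a non-blocking lock a bounded number of times.
// tryAcquire: async (timeoutMs) => boolean, e.g. (ms) => lock.tryAcquire(ms)
function sleep(ms) {
    return new Promise((resolve) => setTimeout(resolve, ms));
}

async function tryLockWithRetries(tryAcquire, { maxAttempts = 3, waitMs = 0, backoffMs = 100 } = {}) {
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
        if (await tryAcquire(waitMs)) {
            return { acquired: true, attempts: attempt };
        }
        if (attempt < maxAttempts) {
            // Linear backoff between attempts (assumed policy; exponential backoff also works)
            await sleep(backoffMs * attempt);
        }
    }
    // Give up: the caller decides whether to skip the task or report an error
    return { acquired: false, attempts: maxAttempts };
}
```

With the TryLock class, the call would be `tryLockWithRetries((ms) => lock.tryAcquire(ms), { maxAttempts: 5, waitMs: 500 })`. The arrow function keeps `this` bound to the lock.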

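Segment Lock

  1. What is a segment lock

A segment lock is a locking scheme for improving concurrency. The data is divided into several segments, each protected by its own independent lock. Operations on different segments can run in parallel, while operations on the same segment are still serialized.

For example, with the 8 segments used in segment_lock.js, two tasks that fall into different segments run at the same time, while two tasks that fall into the same segment run one after the other.

  2. Implementation with ZooKeeper

We build it on ZooKeeper ephemeral sequential nodes (Ephemeral Sequential Node). Each segment is an independent fair lock, and the client whose node has the smallest sequence number in that segment holds it.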

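Feature | Description
Lock granularity | One independent lock per segment
Concurrency | Higher than with a single global lock, since different segments can be locked in parallel
Typical uses | User cache updates, inventory updates
ZNode layout | /locks-segment/segment-N/lock-xxxx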
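In segment_lock.js the segment is picked with `Math.random()`, which is only good for a simulation. Two operations on the same resource must always map to the same segment, or they would not exclude each other. Below is a minimal sketch of a deterministic mapping. It assumes a 32-bit FNV-1a hash, though any stable hash would do. `fnv1a32`, `segmentFor`, and `segmentPath` are hypothetical helpers.

```javascript
// Hypothetical helpers: map a resource key to a fixed segment in [0, segmentCount).
// 32-bit FNV-1a over UTF-16 code units; the same key always gives the same hash.
function fnv1a32(str) {
    let hash = 0x811c9dc5; // FNV offset basis
    for (let i = 0; i < str.length; i++) {
        hash ^= str.charCodeAt(i);
        hash = Math.imul(hash, 0x01000193) >>> 0; // multiply by the FNV prime, keep 32 bits
    }
    return hash >>> 0;
}

function segmentFor(resourceKey, segmentCount) {
    return fnv1a32(String(resourceKey)) % segmentCount;
}

// Build the parent path under which lock nodes for this resource are created
function segmentPath(basePath, resourceKey, segmentCount) {
    return `${basePath}/segment-${segmentFor(resourceKey, segmentCount)}`;
}
```

In the demo, `acquireSegmentLock(segmentFor(id, SEGMENT_COUNT), id)` would replace the random choice.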
  3. segment_lock.js
// segment_lock.js
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const BASE_PATH = '/locks-segment';
const SEGMENT_COUNT = 8; // split into 8 segments
const client = zookeeper.createClient(ZK_SERVERS);

// Raise the listener limit to avoid Node.js MaxListenersExceeded warnings when many watchers are registered
client.setMaxListeners(1000);

client.once('connected', async () => {
    console.log('Connected to ZooKeeper');
    await initSegments();
    // Simulate several clients trying to lock
    simulateClients();
});

client.connect();

async function initSegments() {
    for (let i = 0; i < SEGMENT_COUNT; i++) {
        await mkdirp(`${BASE_PATH}/segment-${i}`);
    }
}

// Make sure the path exists
function mkdirp(path) {
    return new Promise((resolve, reject) => {
        client.mkdirp(path, (err) => (err ? reject(err) : resolve()));
    });
}

// Wait until a node is deleted (one-shot watch)
function waitForDelete(path) {
    return new Promise((resolve) => {
        client.exists(
            path,
            (event) => {
                if (event.getType() === zookeeper.Event.NODE_DELETED) {
                    resolve();
                }
            },
            (err, stat) => {
                if (!stat) resolve();
            }
        );
    });
}

// Acquire the lock on one segment (the work starts inside tryAcquire once the lock is held)
async function acquireSegmentLock(segmentId, resourceId) {
    const segmentPath = `${BASE_PATH}/segment-${segmentId}`;
    const nodePath = await new Promise((resolve, reject) => {
        client.create(
            `${segmentPath}/lock-`,
            Buffer.from(`lock-${resourceId}`),
            zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL,
            (err, path) => (err ? reject(err) : resolve(path))
        );
    });

    const nodeName = nodePath.split('/').pop();
    await tryAcquire(segmentPath, nodeName, segmentId, resourceId);
    return nodePath;
}

async function tryAcquire(segmentPath, nodeName, segmentId, resourceId) {
    const children = await new Promise((resolve, reject) => {
        client.getChildren(segmentPath, (err, ch) => (err ? reject(err) : resolve(ch)));
    });

    children.sort();
    const index = children.indexOf(nodeName);

    if (index === 0) {
        console.log(`✅ [${resourceId}] Got lock on segment-${segmentId}`);
        doWork(segmentPath, nodeName, segmentId, resourceId);
    } else {
        const prevNode = children[index - 1];
        console.log(`⏳ [${resourceId}] Waiting for ${prevNode} on segment-${segmentId}...`);
        await waitForDelete(`${segmentPath}/${prevNode}`);
        await tryAcquire(segmentPath, nodeName, segmentId, resourceId);
    }
}

// Simulated business logic; releases the lock when done
async function doWork(segmentPath, nodeName, segmentId, resourceId) {
    console.log(`🧠 [${resourceId}] Doing work on segment-${segmentId}...`);
    await sleep(3000 + Math.random() * 2000);

    console.log(`🧹 [${resourceId}] Releasing lock on segment-${segmentId}...`);
    await new Promise((resolve) => {
        client.remove(`${segmentPath}/${nodeName}`, (err) => {
            if (err) console.error('remove error:', err);
            else console.log(`🔓 [${resourceId}] Released lock ${segmentPath}/${nodeName}`);
            resolve();
        });
    });
}

// Delay helper
function sleep(ms) {
    return new Promise((r) => setTimeout(r, ms));
}

// Simulate several clients competing for randomly chosen segments
function simulateClients() {
    const tasks = [101, 202, 303, 404, 505, 606, 707, 808];
    for (const id of tasks) {
        const segmentId = Math.floor(Math.random() * SEGMENT_COUNT);
        acquireSegmentLock(segmentId, id).catch(console.error);
    }
}
Sample run:

root@ser745692301841:/dev_dir/zookeeper-tutorial/js# node segment_lock.js
Connected to ZooKeeper
✅ [101] Got lock on segment-4
🧠 [101] Doing work on segment-4...
✅ [202] Got lock on segment-3
🧠 [202] Doing work on segment-3...
✅ [303] Got lock on segment-2
🧠 [303] Doing work on segment-2...
✅ [404] Got lock on segment-7
🧠 [404] Doing work on segment-7...
✅ [505] Got lock on segment-5
🧠 [505] Doing work on segment-5...
⏳ [606] Waiting for lock-0000000003 on segment-2...
⏳ [707] Waiting for lock-0000000003 on segment-4...
✅ [808] Got lock on segment-6
🧠 [808] Doing work on segment-6...
🧹 [505] Releasing lock on segment-5...
🔓 [505] Released lock /locks-segment/segment-5/lock-0000000003
🧹 [101] Releasing lock on segment-4...
🔓 [101] Released lock /locks-segment/segment-4/lock-0000000003
✅ [707] Got lock on segment-4
🧠 [707] Doing work on segment-4...
🧹 [303] Releasing lock on segment-2...
🔓 [303] Released lock /locks-segment/segment-2/lock-0000000003
✅ [606] Got lock on segment-2
🧠 [606] Doing work on segment-2...
🧹 [808] Releasing lock on segment-6...
🔓 [808] Released lock /locks-segment/segment-6/lock-0000000000
🧹 [404] Releasing lock on segment-7...
🔓 [404] Released lock /locks-segment/segment-7/lock-0000000003
🧹 [202] Releasing lock on segment-3...
🔓 [202] Released lock /locks-segment/segment-3/lock-0000000000
🧹 [707] Releasing lock on segment-4...
🔓 [707] Released lock /locks-segment/segment-4/lock-0000000004
🧹 [606] Releasing lock on segment-2...
🔓 [606] Released lock /locks-segment/segment-2/lock-0000000004

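Multi-Resource Lock

This version fits scenarios that must lock several resources at once, such as a trade between two players in a game backend or an update that touches several inventory items.

  1. What is a multi-resource lock

In a distributed system you sometimes need to lock several resources at the same time (for example itemA and itemB). If two clients take them in opposite orders, each can end up holding one lock while waiting forever for the other. That is a deadlock.

We can use ZooKeeper's ephemeral sequential nodes to build a fair multi-resource lock.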

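Feature | How it is implemented
✅ Multi-resource locking | Locks several resources in one call, always in the same order
✅ Deadlock avoidance | Resource names are sorted, so every client requests its locks in the same order
✅ Fairness | Uses EPHEMERAL_SEQUENTIAL nodes (first come, first served)
✅ Automatic release | Nodes are removed automatically when the session ends
✅ No memory leaks | Uses one-shot exists watches
✅ Reusable | Can be used directly for games, trades, inventory updates, and similar scenarios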
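Deadlock avoidance comes down to one rule: every client acquires its locks in a single global order. If C1 took user:101 and then item:5 while another client took item:5 and then user:101, each could hold one lock and wait for the other forever. Below is a minimal sketch of that normalization step. `lockOrder` and `releaseOrder` are hypothetical helpers. The de-duplication is an extra precaution added in this sketch: without it, a client that asks twice for the same non-reentrant lock would wait on its own node.

```javascript
// Hypothetical helpers: normalize a lock request into the global acquisition order.
// Sorting gives every client the same order; de-duplicating prevents a client
// from queueing twice on the same non-reentrant lock and waiting on itself.
function lockOrder(resources) {
    return [...new Set(resources)].sort();
}

// Locks are released in the reverse order of acquisition
function releaseOrder(resources) {
    return lockOrder(resources).reverse();
}
```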
  2. multi_resource_lock.js
// multi_resource_lock.js
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const LOCK_ROOT = '/multi-locks';

const client = zookeeper.createClient(ZK_SERVERS);
client.setMaxListeners(1000);

client.once('connected', async () => {
    console.log('✅ Connected to ZooKeeper');
    await mkdirp(LOCK_ROOT);
    simulateClients();
});

client.connect();

// Helpers
function mkdirp(path) {
    return new Promise((resolve, reject) => {
        client.mkdirp(path, (err) => (err ? reject(err) : resolve()));
    });
}

function sleep(ms) {
    return new Promise((r) => setTimeout(r, ms));
}

/**
 * @param {string[]} resources Names of the resources to lock, e.g. ['user:101', 'user:202']
 * @param {string} clientId Client ID (used as a label in the logs)
 */
async function acquireMultiLock(resources, clientId) {
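    // Sort lexicographically so every client locks in the same order (avoids deadlock);
    // duplicates are dropped, otherwise a client would wait on its own lock node
    const sortedResources = [...new Set(resources)].sort();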
    const lockPaths = [];

    try {
        for (const res of sortedResources) {
            const path = await acquireSingleLock(res, clientId);
            lockPaths.push(path);
        }

        console.log(`🔒 [${clientId}] Acquired locks on: ${sortedResources.join(', ')}`);
        await doWork(clientId, sortedResources);

    } finally {
        // Release every lock that was acquired, in reverse order
        for (const path of lockPaths.reverse()) {
            await releaseLock(path, clientId);
        }
    }
}

/** Acquire the lock for a single resource */
async function acquireSingleLock(resource, clientId) {
    const resPath = `${LOCK_ROOT}/${resource}`;
    await mkdirp(resPath);

    const nodePath = await new Promise((resolve, reject) => {
        client.create(
            `${resPath}/lock-`,
            Buffer.from(clientId),
            zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL,
            (err, path) => (err ? reject(err) : resolve(path))
        );
    });

    const nodeName = nodePath.split('/').pop();
    await tryAcquire(resPath, nodeName, resource, clientId);
    return nodePath;
}

/** Check whether we hold the lock; otherwise watch the previous node */
async function tryAcquire(resPath, nodeName, resource, clientId) {
    const children = await new Promise((resolve, reject) => {
        client.getChildren(resPath, (err, list) => (err ? reject(err) : resolve(list)));
    });

    children.sort();
    const index = children.indexOf(nodeName);

    if (index === 0) {
        console.log(`✅ [${clientId}] Got lock on ${resource}`);
        return;
    } else {
        const prevNode = children[index - 1];
        const prevPath = `${resPath}/${prevNode}`;
        await waitForDelete(prevPath);
        await tryAcquire(resPath, nodeName, resource, clientId);
    }
}

/** Wait for the previous node to be deleted */
function waitForDelete(path) {
    return new Promise((resolve) => {
        client.exists(
            path,
            (event) => {
                if (event.getType() === zookeeper.Event.NODE_DELETED) resolve();
            },
            (err, stat) => {
                if (!stat) resolve();
            }
        );
    });
}

/** Release a lock */
async function releaseLock(nodePath, clientId) {
    return new Promise((resolve) => {
        client.remove(nodePath, (err) => {
            if (err) console.error(`❌ [${clientId}] Release error:`, err);
            else console.log(`🔓 [${clientId}] Released ${nodePath}`);
            resolve();
        });
    });
}

/** Simulated critical-section work */
async function doWork(clientId, resources) {
    console.log(`🧠 [${clientId}] Working on ${resources.join(', ')}...`);
    await sleep(2000 + Math.random() * 2000);
}

/** Simulate several clients */
function simulateClients() {
    const tasks = [
        { id: 'C1', res: ['user:101', 'item:5'] },
        { id: 'C2', res: ['item:5'] },
        { id: 'C3', res: ['user:101'] },
        { id: 'C4', res: ['item:6', 'user:101'] },
    ];

    for (const t of tasks) {
        acquireMultiLock(t.res, t.id).catch(console.error);
    }
}
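Client ID | Resources to lock | Description
C1 | ['user:101', 'item:5'] | Operates on user 101 and item 5 together (e.g. a purchase or trade)
C2 | ['item:5'] | Operates only on item 5 (e.g. an inventory update)
C3 | ['user:101'] | Operates only on user 101 (e.g. a profile update)
C4 | ['item:6', 'user:101'] | Operates on item 6 and user 101 together (e.g. a new purchase)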

Assume the /multi-locks tree in ZooKeeper looks like this:

/multi-locks/
 ├── user:101/
 ├── item:5/
 └── item:6/

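Each client:

1. sorts its resource names lexicographically;
2. for each resource in that order, creates an EPHEMERAL_SEQUENTIAL lock- node under /multi-locks/{resource} and waits until its node has the smallest sequence number;
3. does its work once it holds all of its locks;
4. releases the locks in reverse order.

A runtime example

One possible execution order:

C2 gets item:5 first
C1 waits for item:5
C3 gets user:101
...
🔓 C2 releases item:5
C1 gets item:5
C1 waits for user:101 (C3 is using it)
...
🔓 C3 releases user:101
C1 gets user:101
🧠 C1 starts working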

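The whole process relies on:

Concern | Technique
Lock node location | /multi-locks/{resource}/lock-*
Unique lock identity | EPHEMERAL_SEQUENTIAL nodes
Deadlock avoidance | Locks are taken in lexicographic order
Cleanup of stale locks | ZooKeeper deletes the nodes when the client's session ends
Concurrency safety | Wait for the previous node to be deleted, then re-check

A further variant adds lock timeouts with automatic release plus a retry mechanism.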

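Fair Reentrant Read-Write Lock

This is the "advanced level" of ZooKeeper locks: a Node.js demo of a fair, reentrant read-write lock (Fair Reentrant ReadWrite Lock).

Property | Meaning
Fair | Locks are granted strictly in request order (node sequence number): first in line, first served.
Reentrant | A client (same clientId) that already holds the lock can acquire it again without blocking on itself.
Read-write lock | Multiple readers may share the lock, but a writer needs exclusive access; a queued writer goes ahead of readers that arrive after it.

How it works (ZooKeeper ephemeral sequential nodes)

Under /rw-locks, each resource has its own node, and every lock request is a child of it: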

/rw-locks/myResource/
  ├── read-0000000001
  ├── read-0000000002
  ├── write-0000000003
  └── read-0000000004

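Rules:

1. Requests are ordered by the sequence number ZooKeeper appends to each node (the trailing 10 digits). read- and write- nodes share one counter, so sorting by the full node name would be wrong.
2. A read request is granted when no write node comes before it; otherwise it waits for that write node to be deleted and checks again.
3. A write request is granted only when its node is first; otherwise it waits for the node immediately before it to be deleted and checks again.
4. Reentrancy is tracked on the client side: a client that already holds the lock does not create another node.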
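The grant decision can be written as a pure function that needs no server. Requests are sorted by their sequence suffix. A reader is blocked only by an earlier writer, and a writer is blocked by any earlier node. This is a minimal sketch, and `checkRwLock` and `sequenceOf` are hypothetical helpers. One deliberate difference from the demo: a blocked reader here watches the closest earlier write node rather than the first one. Both approaches re-check after a deletion, so either is correct.

```javascript
// Hypothetical pure helpers for the read/write lock decision.
// ZooKeeper appends a 10-digit, zero-padded counter to sequential nodes.
function sequenceOf(name) {
    return parseInt(name.slice(-10), 10);
}

// children: node names under /rw-locks/<resource>; myNode: our own node name.
// Returns { granted: true } or { granted: false, watch: '<node to wait for>' }.
function checkRwLock(children, myNode) {
    const sorted = [...children].sort((a, b) => sequenceOf(a) - sequenceOf(b));
    const index = sorted.indexOf(myNode);
    if (index === -1) throw new Error(`node ${myNode} not found (session expired?)`);
    const before = sorted.slice(0, index);

    if (myNode.startsWith('read-')) {
        // A reader only has to wait for write requests queued before it
        const writes = before.filter((n) => n.startsWith('write-'));
        return writes.length === 0
            ? { granted: true }
            : { granted: false, watch: writes[writes.length - 1] };
    }
    // A writer waits for the node immediately before it, whatever its type
    return index === 0 ? { granted: true } : { granted: false, watch: before[index - 1] };
}
```

With the example tree, sorting by full name would place read-0000000004 before write-0000000003. The sequence sort keeps the request order.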

fair_reentrant_rwlock.js

// fair_reentrant_rwlock.js
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const LOCK_ROOT = '/rw-locks';
const client = zookeeper.createClient(ZK_SERVERS);
client.setMaxListeners(1000);

// Reentrancy bookkeeping: clientId -> Map(resource -> { mode, count, nodePath })
const heldLocks = new Map();

client.once('connected', async () => {
    console.log('✅ Connected to ZooKeeper');
    await mkdirp(LOCK_ROOT);
    simulateClients();
});
client.connect();

function mkdirp(path) {
    return new Promise((resolve, reject) => {
        client.mkdirp(path, (err) => (err ? reject(err) : resolve()));
    });
}

function sleep(ms) {
    return new Promise((r) => setTimeout(r, ms));
}

// ZooKeeper appends a 10-digit, zero-padded sequence number to sequential nodes.
// read-* and write-* nodes share one counter per parent, so order by that number:
// a plain name sort would put every read-* before every write-*.
function sequenceOf(name) {
    return parseInt(name.slice(-10), 10);
}

/** Request a read or write lock */
async function acquireLock(resource, clientId, mode) {
    const resPath = `${LOCK_ROOT}/${resource}`;

    // ✅ Reentrancy: if this client already holds the resource, bump the hold count
    const held = heldLocks.get(clientId)?.get(resource);
    if (held) {
        if (held.mode === 'read' && mode === 'write') {
            // A read -> write upgrade would wait for our own read node forever
            throw new Error(`[${clientId}] read->write upgrade on ${resource} is not supported`);
        }
        held.count++;
        console.log(`♻️ [${clientId}] Re-entered ${held.mode} lock on ${resource} (count=${held.count})`);
        return { resPath, nodePath: held.nodePath };
    }

    await mkdirp(resPath);
    const nodePath = await new Promise((resolve, reject) => {
        client.create(
            `${resPath}/${mode}-`,
            Buffer.from(clientId),
            zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL,
            (err, path) => (err ? reject(err) : resolve(path))
        );
    });

    const nodeName = nodePath.split('/').pop();
    await tryAcquire(resPath, nodeName, clientId, mode);
    if (!heldLocks.has(clientId)) heldLocks.set(clientId, new Map());
    heldLocks.get(clientId).set(resource, { mode, count: 1, nodePath });

    return { resPath, nodePath };
}

/** Check whether the lock can be granted; otherwise watch the blocking node */
async function tryAcquire(resPath, nodeName, clientId, mode) {
    const children = await new Promise((resolve, reject) => {
        client.getChildren(resPath, (err, list) => (err ? reject(err) : resolve(list)));
    });

    children.sort((a, b) => sequenceOf(a) - sequenceOf(b));
    const index = children.indexOf(nodeName);

    if (mode.startsWith('read')) {
        // A reader only waits for write requests queued before it
        const before = children.slice(0, index);
        const blockingWrite = before.find((n) => n.startsWith('write'));
        if (!blockingWrite) {
            console.log(`📖 [${clientId}] Got READ lock on ${resPath.split('/').pop()}`);
            return;
        }
        await waitForDelete(`${resPath}/${blockingWrite}`);
        return tryAcquire(resPath, nodeName, clientId, mode);
    } else {
        // A writer must wait until every node before it has been deleted
        if (index === 0) {
            console.log(`✍️ [${clientId}] Got WRITE lock on ${resPath.split('/').pop()}`);
            return;
        }
        const prev = children[index - 1];
        await waitForDelete(`${resPath}/${prev}`);
        return tryAcquire(resPath, nodeName, clientId, mode);
    }
}

/** Wait until a node is deleted */
function waitForDelete(path) {
    return new Promise((resolve) => {
        client.exists(
            path,
            (event) => {
                if (event.getType() === zookeeper.Event.NODE_DELETED) resolve();
            },
            (err, stat) => {
                if (!stat) resolve();
            }
        );
    });
}

/** Release a lock (reference-counted for reentrancy) */
async function releaseLock(resource, clientId) {
    const locks = heldLocks.get(clientId);
    const held = locks?.get(resource);
    if (!held) return;

    held.count--;
    if (held.count > 0) {
        console.log(`🔁 [${clientId}] Reentrant release on ${resource} (count=${held.count})`);
        return;
    }
    locks.delete(resource);
    if (locks.size === 0) heldLocks.delete(clientId);

    // Last release: delete our node so the next waiter can proceed
    return new Promise((resolve) => {
        client.remove(held.nodePath, (err) => {
            if (err) console.error(`❌ [${clientId}] Release error:`, err);
            else console.log(`🔓 [${clientId}] Released lock on ${resource}`);
            resolve();
        });
    });
}

/** Simulated read/write work */
async function doWork(clientId, resource, mode) {
    console.log(`🧠 [${clientId}] Doing ${mode.toUpperCase()} work on ${resource}...`);
    await sleep(2000 + Math.random() * 2000);
}

/** Simulate several clients competing for the lock */
async function simulateClients() {
    const clients = [
        { id: 'C1', res: 'book-123', mode: 'read' },
        { id: 'C2', res: 'book-123', mode: 'read' },
        { id: 'C3', res: 'book-123', mode: 'write' },
        { id: 'C4', res: 'book-123', mode: 'read' },
        { id: 'C5', res: 'book-123', mode: 'write' },
    ];

    for (const c of clients) {
        (async () => {
            await acquireLock(c.res, c.id, c.mode);
            await doWork(c.id, c.res, c.mode);
            await releaseLock(c.res, c.id);
        })().catch(console.error);
    }

    // Reentrancy test: C1 takes the read lock twice, so it must release twice
    setTimeout(() => {
        (async () => {
            await acquireLock('book-123', 'C1', 'read');
            await acquireLock('book-123', 'C1', 'read'); // re-entry: no new node is created
            await doWork('C1', 'book-123', 'read');
            await releaseLock('book-123', 'C1'); // count 2 -> 1, node kept
            await releaseLock('book-123', 'C1'); // count 1 -> 0, node removed
        })().catch(console.error);
    }, 8000);
}

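Distributed Counter

  1. Goal

We want a safe distributed auto-increment counter. Several clients call increment() concurrently, and the final count is still correct, with no increment lost or applied twice.

For example, if 10 increment() calls run concurrently against a counter that starts at 0, the final value must be 10.

  2. How it works

ZooKeeper has no native counter type, but an atomic increment can be built on the node's version number (CAS):

1. Read the value together with its version (stat.version).
2. Write value + 1 with setData, passing that version as the expected version.
3. If ZooKeeper rejects the write with BAD_VERSION because another client updated the node first, read again and retry.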
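The CAS loop can be shown without a server. Below is a minimal in-memory sketch. `VersionedStore` is a hypothetical stand-in for one znode that mimics ZooKeeper's rule: a conditional write succeeds only when the expected version equals the current version, and each successful write bumps the version by one. ZooKeeper's "any version" value, -1, is not modeled. `casIncrement` and its `beforeWrite` hook are also hypothetical. The hook exists only so the test can inject a concurrent writer.

```javascript
// Hypothetical in-memory stand-in for one znode with a data version.
class VersionedStore {
    constructor(value) {
        this.value = value;
        this.version = 0;
    }
    get() {
        return { value: this.value, version: this.version };
    }
    // Conditional write: fails (like BAD_VERSION) when the version has moved on
    set(value, expectedVersion) {
        if (expectedVersion !== this.version) return false;
        this.value = value;
        this.version += 1;
        return true;
    }
}

// CAS increment: read value + version, write value + 1 at that version, retry on conflict
function casIncrement(store, maxRetries = 10, beforeWrite = () => {}) {
    for (let attempt = 0; attempt <= maxRetries; attempt++) {
        const { value, version } = store.get();
        beforeWrite(attempt); // test hook: simulate another client writing in between
        if (store.set(value + 1, version)) return { value: value + 1, attempts: attempt + 1 };
    }
    throw new Error('Too many retries');
}
```

The conflicting write is never overwritten. The loser re-reads and builds on the newer value, which is why no increment is lost.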

  3. distributed_counter.js
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const COUNTER_PATH = '/distributed_counter';
const client = zookeeper.createClient(ZK_SERVERS);
client.connect();

// Create the counter node if it does not exist yet
function ensureCounterNode() {
    return new Promise((resolve, reject) => {
        client.exists(COUNTER_PATH, (err, stat) => {
            if (err) return reject(err);
            if (stat) return resolve();

            client.create(COUNTER_PATH, Buffer.from('0'), (err) => {
                if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS) {
                    return reject(err);
                }
                console.log('✅ Counter node initialized.');
                resolve();
            });
        });
    });
}

// Read the current value together with its Stat (stat.version is used for CAS)
function getCounterValue() {
    return new Promise((resolve, reject) => {
        client.getData(COUNTER_PATH, (err, data, stat) => {
            if (err) return reject(err);
            const value = parseInt(data.toString(), 10);
            resolve({ value, stat });
        });
    });
}

// Write newValue only if the node's version is still expectedVersion (checked atomically by ZooKeeper)
function setCounterIfVersion(newValue, expectedVersion) {
    return new Promise((resolve, reject) => {
        client.setData(COUNTER_PATH, Buffer.from(String(newValue)), expectedVersion, (err) =>
            err ? reject(err) : resolve()
        );
    });
}

// Increment via CAS (compare-and-set). A fixed retry delay makes conflicting callers
// collide again in lockstep, so back off with a random delay and allow enough retries.
async function incrementCounter(maxRetries = 20) {
    for (let attempt = 0; attempt <= maxRetries; attempt++) {
        const { value, stat } = await getCounterValue();
        const newValue = value + 1;
        try {
            await setCounterIfVersion(newValue, stat.version);
            console.log(`✅ Increment success: ${value}${newValue}`);
            return newValue;
        } catch (err) {
            if (err.getCode && err.getCode() === zookeeper.Exception.BAD_VERSION) {
                console.log(`🔁 Version conflict, retry ${attempt + 1}`);
                await new Promise((r) => setTimeout(r, 20 + Math.random() * 80));
                continue;
            }
            throw err;
        }
    }
    throw new Error('Too many retries');
}

client.once('connected', async () => {
    console.log('🔗 Connected to ZooKeeper.');
    await ensureCounterNode();

    // Simulate several clients incrementing concurrently
    const results = await Promise.allSettled(Array.from({ length: 10 }, () => incrementCounter()));
    results
        .filter((r) => r.status === 'rejected')
        .forEach((r) => console.error('❌ Increment failed:', r.reason.message));

    const { value } = await getCounterValue();
    console.log(`🎯 Final counter value = ${value}`);

    client.close();
});

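Configuration Management

  1. Goal: a centralized configuration center

Multiple service instances read the same configuration from ZooKeeper, and whenever the configuration is changed in ZooKeeper, every client is notified automatically.

✅ Features:

- A single source of truth: the configuration is stored as JSON in one znode (/app_config).
- Push instead of polling: each client sets a watch with getData and reloads when the node changes.
- Watches are one-shot, so the client registers a new watch every time it reads the configuration.

zookeeper-config-demo/
 ├─ config_client.js   # simulated business service that reads its config from ZooKeeper
 └─ config_updater.js  # simulated administrator who updates the config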
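A watch only says that the node changed. The client still has to decide what to do with the new bytes. Below is a minimal sketch that assumes the payload is JSON, as written by config_updater.js. The hypothetical `applyConfigUpdate` keeps the last valid config when an update is malformed and reports whether anything actually changed. config_client.js keeps no such state and only logs the parse error.

```javascript
// Hypothetical helper: turn the raw bytes from getData into a config object,
// keeping the last good config when the new payload is missing or not valid JSON.
function applyConfigUpdate(current, data) {
    if (!data) return { config: current, changed: false, error: 'empty node data' };
    try {
        const next = JSON.parse(data.toString('utf8'));
        // Cheap, key-order-sensitive comparison; good enough to skip no-op reloads
        const changed = JSON.stringify(next) !== JSON.stringify(current);
        return { config: next, changed, error: null };
    } catch (e) {
        return { config: current, changed: false, error: e.message };
    }
}
```

In the getData callback, the client would store `result.config` and apply it only when `result.changed` is true.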

config_client.js

// config_client.js
const zookeeper = require('node-zookeeper-client');
const ZK_SERVERS = '127.0.0.1:2181';
const CONFIG_PATH = '/app_config';
const client = zookeeper.createClient(ZK_SERVERS);

client.connect();

// Read the config and set a watch on it; the watch is one-shot, so re-register on every change
function watchConfig() {
    client.getData(CONFIG_PATH, (event) => {
        console.log('⚙️ Config changed, reloading...');
        watchConfig(); // reads the new data and registers a fresh watch
    }, (err, data, stat) => {
        if (err) {
            if (err.getCode() === zookeeper.Exception.NO_NODE) {
                // getData leaves no watch on a missing node, so poll until it is created
                console.log('❌ Config node does not exist yet, waiting for it to be created...');
                setTimeout(watchConfig, 2000);
                return;
            }
            return console.error('Failed to read config:', err);
        }

        try {
            const config = JSON.parse(data.toString());
            console.log('📦 Current config:', config);
        } catch (e) {
            console.error('Invalid config format:', e.message);
        }
    });
}

client.once('connected', () => {
    console.log('🔗 Connected to ZooKeeper');
    watchConfig();
});

config_updater.js

// config_updater.js
const zookeeper = require('node-zookeeper-client');
const ZK_SERVERS = '127.0.0.1:2181';
const CONFIG_PATH = '/app_config';
const client = zookeeper.createClient(ZK_SERVERS);

client.connect();

// Create the config node with a default value if it does not exist yet
function ensureConfigNode(defaultConfig) {
    return new Promise((resolve, reject) => {
        client.exists(CONFIG_PATH, (err, stat) => {
            if (err) return reject(err);
            if (stat) return resolve();

            const data = Buffer.from(JSON.stringify(defaultConfig));
            client.create(CONFIG_PATH, data, (err) => {
                if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS) {
                    return reject(err);
                }
                console.log('✅ Config node created');
                resolve();
            });
        });
    });
}

// Update the config (any client watching the node is notified)
function updateConfig(newConfig) {
    const data = Buffer.from(JSON.stringify(newConfig, null, 2));
    client.setData(CONFIG_PATH, data, (err, stat) => {
        if (err) return console.error('❌ Update failed:', err);
        console.log('✅ Config updated:', newConfig);
        client.close();
    });
}

client.once('connected', async () => {
    console.log('🔗 Connected to ZooKeeper');
    await ensureConfigNode({ max_players: 100, motd: "Welcome to the Game!" });

    // Simulate an administrator changing the config after 3 s
    setTimeout(() => {
        updateConfig({
            max_players: 200,
            motd: "⚡ Server upgraded! Enjoy the game!",
        });
    }, 3000);
});

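Distributed Queue / Work Queue

This pattern is common for task distribution and message queues. Consumers on several nodes take tasks fairly, in FIFO order. Because each task is stored as a persistent node, it stays in ZooKeeper until a consumer claims it.

  1. Goal: producers append tasks to one queue node; any number of consumers take them in order, and each task is claimed by only one consumer.
  2. ZooKeeper queue design: each task is a PERSISTENT_SEQUENTIAL child of /task_queue, and its sequence number gives the FIFO order.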
/task_queue
    ├── task-0000000000
    ├── task-0000000001
    └── task-0000000002
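The consumer's core idea is "claim by delete". Take the lowest-numbered task, and treat it as yours only if your `remove()` succeeds. A consumer that loses the race moves on. Below is a minimal in-memory sketch of that logic. `FakeQueue` and `claimNext` are hypothetical and stand in for /task_queue and the consumer. One design consequence worth knowing: claim-by-delete gives at-most-once processing. If a consumer crashes after deleting a node but before finishing the task, that task is gone.

```javascript
// Hypothetical in-memory model of /task_queue. remove() returns false when the
// node is already gone, mirroring ZooKeeper's NO_NODE error on delete.
class FakeQueue {
    constructor(tasks) {
        this.nodes = new Map(tasks.map((task, i) => [`task-${String(i).padStart(10, '0')}`, task]));
    }
    children() {
        return [...this.nodes.keys()];
    }
    getData(name) {
        return this.nodes.get(name);
    }
    remove(name) {
        return this.nodes.delete(name);
    }
}

// Claim the lowest-numbered task; only the consumer whose remove() succeeds owns it
function claimNext(queue) {
    // Same 'task-' prefix + zero-padded sequence number: lexicographic order == FIFO order
    for (const name of queue.children().sort()) {
        const task = queue.getData(name);
        if (task === undefined) continue; // already claimed by someone else
        if (queue.remove(name)) return { name, task }; // claim won
        // claim lost between getData and remove: try the next task
    }
    return null; // nothing left to claim
}
```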
  3. producer.js + consumer.js
// producer.js
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const QUEUE_PATH = '/task_queue';
const client = zookeeper.createClient(ZK_SERVERS);

client.connect();

function ensureQueueNode() {
    return new Promise((resolve, reject) => {
        client.exists(QUEUE_PATH, (err, stat) => {
            if (err) return reject(err);
            if (stat) return resolve();
            client.create(QUEUE_PATH, (err) => {
                if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS) return reject(err);
                console.log('✅ Queue node created');
                resolve();
            });
        });
    });
}

function produceTask(taskData) {
    const data = Buffer.from(JSON.stringify(taskData));
    client.create(
        `${QUEUE_PATH}/task-`,
        data,
        zookeeper.CreateMode.PERSISTENT_SEQUENTIAL,
        (err, path) => {
            if (err) return console.error('❌ Failed to create task:', err);
            console.log('📦 Task enqueued:', path);
        }
    );
}

client.once('connected', async () => {
    console.log('🔗 Connected to ZooKeeper');
    await ensureQueueNode();

    // Enqueue a few sample tasks
    produceTask({ id: 1, msg: 'task1' });
    produceTask({ id: 2, msg: 'task2' });
    produceTask({ id: 3, msg: 'task3' });

    setTimeout(() => client.close(), 1000);
});
// consumer.js
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const QUEUE_PATH = '/task_queue';
const client = zookeeper.createClient(ZK_SERVERS);

client.connect();

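function consumeTask() {
    client.getChildren(
        QUEUE_PATH,
        (event) => {
            console.log('⚡ Queue change event:', event);
            consumeTask(); // watches are one-shot: re-register
        },
        (err, children) => {
            if (err) return console.error('❌ Failed to list tasks:', err);
            if (!children || children.length === 0) return;

            children.sort(); // same prefix + zero-padded sequence number, so this is FIFO order
            const taskNode = children[0];
            const taskPath = `${QUEUE_PATH}/${taskNode}`;

            // Read the task, then claim it by deleting its node
            client.getData(taskPath, (err, data) => {
                if (err) return console.error('❌ Failed to read task:', err);
                const task = JSON.parse(data.toString());

                // Whoever deletes the node first owns the task
                client.remove(taskPath, (err) => {
                    if (err) {
                        if (err.getCode() === zookeeper.Exception.NO_NODE) {
                            // Another consumer won; its delete also fires our watcher, so we move on
                            return console.log('↪️ Task already taken by another consumer:', taskNode);
                        }
                        return console.error('❌ Failed to delete task:', err);
                    }

                    console.log('✅ Consumed task:', task);
                    console.log('🗑 Task done:', taskNode);
                });
            });
        }
    );
}

client.once('connected', () => {
    console.log('🔗 Connected to ZooKeeper, consuming tasks...');
    consumeTask();
});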

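Barrier (Synchronization Barrier)

Also known as a synchronization fence.

This pattern is commonly used to start distributed nodes in sync and to coordinate tasks. Every node blocks at a checkpoint until all nodes have arrived, and then all of them are released together.

  1. Goal: block each participant until NODE_COUNT participants have joined /barrier, then release them all.
  2. ZooKeeper node design: each participant creates an EPHEMERAL_SEQUENTIAL child under /barrier and waits until the number of children reaches the threshold.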
/barrier
    ├── node-0000000000
    ├── node-0000000001
    ├── node-0000000002
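The key property is that nobody passes until N participants are in, and then everybody passes. It can be checked without a server. Below is a minimal in-process analogue, where the hypothetical `Barrier` class plays the role of /barrier. `arrive()` corresponds to creating a node-XXXX child, and the release corresponds to every waiting client seeing `children.length >= NODE_COUNT`. In ZooKeeper each waiting client must re-register its getChildren watch after every notification, because watches are one-shot. This sketch does not model that detail.

```javascript
// Hypothetical in-process analogue of the ZooKeeper barrier.
class Barrier {
    constructor(size) {
        this.size = size;     // like NODE_COUNT
        this.arrived = [];    // like the children of /barrier
        this.waiters = [];    // participants blocked at the barrier
    }

    // Register a participant; resolves once `size` participants have arrived
    arrive(name) {
        this.arrived.push(name);
        return new Promise((resolve) => {
            this.waiters.push(resolve);
            if (this.arrived.length >= this.size) {
                // Release everyone at once, like a child-change watch firing on every client
                for (const release of this.waiters.splice(0)) release([...this.arrived]);
            }
        });
    }
}
```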
// barrier.js
const zookeeper = require('node-zookeeper-client');

const ZK_SERVERS = '127.0.0.1:2181';
const BARRIER_PATH = '/barrier';
const NODE_COUNT = 3; // barrier threshold (number of participants)
const client = zookeeper.createClient(ZK_SERVERS, {
    sessionTimeout: 30000, // 30s session
    spinDelay: 1000,
    retries: 5
});

client.connect();

// --------------------
// Helper: make sure the barrier root node exists
// --------------------
function ensureBarrierNode() {
    return new Promise((resolve, reject) => {
        client.exists(BARRIER_PATH, (err, stat) => {
            if (err) return reject(err);
            if (stat) return resolve();
            client.create(BARRIER_PATH, (err) => {
                if (err && err.getCode() !== zookeeper.Exception.NODE_EXISTS) return reject(err);
                console.log('✅ Barrier node created');
                resolve();
            });
        });
    });
}

// --------------------
// Wait until at least NODE_COUNT children exist.
// Watches are one-shot, so every check registers a new one; any child change triggers a re-check.
// (Resolving once and ignoring later watch events would release only the last arrival.)
// --------------------
function waitForBarrier(maxRetries = 5) {
    return new Promise((resolve, reject) => {
        let released = false;
        let lossRetries = 0;

        function check() {
            if (released) return;
            client.getChildren(
                BARRIER_PATH,
                () => check(), // children changed: count again
                (err, children) => {
                    if (released) return;
                    if (err) {
                        if (err.getCode && err.getCode() === zookeeper.Exception.CONNECTION_LOSS && lossRetries < maxRetries) {
                            lossRetries++;
                            console.log(`🔁 CONNECTION_LOSS, retry ${lossRetries}`);
                            setTimeout(check, 100);
                            return;
                        }
                        released = true;
                        return reject(err);
                    }
                    console.log(`⏳ Current barrier node count: ${children.length}/${NODE_COUNT}`);
                    if (children.length >= NODE_COUNT) {
                        released = true;
                        resolve();
                    }
                }
            );
        }

        check();
    });
}

// --------------------
// Barrier core logic
// --------------------
async function enterBarrier(nodeName) {
    // Create an ephemeral sequential node to announce our arrival
    await new Promise((resolve, reject) => {
        client.create(
            `${BARRIER_PATH}/node-`,
            Buffer.from(nodeName),
            zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL,
            (err, path) => {
                if (err) return reject(err);
                console.log(`🔗 ${nodeName} joined the barrier: ${path}`);
                resolve(path);
            }
        );
    });

    // Block until every participant has arrived
    await waitForBarrier();

    console.log(`🎉 ${nodeName} passed the barrier, starting work!`);

    // Close the client after a short delay. Closing deletes our ephemeral node, so a
    // participant that has not re-counted yet could miss the release; the delay narrows
    // (but does not remove) that race.
    setTimeout(() => {
        client.close();
        console.log(`🔒 ${nodeName} client closed`);
    }, 500);
}

// --------------------
// Entry point
// --------------------
client.once('connected', async () => {
    console.log('🔗 Connected to ZooKeeper');
    await ensureBarrierNode();

    const nodeName = process.argv[2] || 'node-' + Math.floor(Math.random() * 1000);
    await enterBarrier(nodeName);
});