前言:马上要过年了,祝大家新年快乐!在过年回家前分享一篇关于Zookeeper的文章,我们都知道现在微服务盛行,大数据、分布式系统中经常会使用到Zookeeper,它是微服务、分布式系统中必不可少的分布式协调框架。它的作用体现在分布式系统中解决了配置中心的问题,以及解决了在分布式环境中不同进程之间争夺资源的问题,也就是分布式锁的功能以及分布式消息队列功能等等。所以在微服务的环境中Zookeeper是现在很多公司首选的分布式协调框架,包括我之前的公司也在使用Zookeeper。说了这么多,没别的就是想说一下Zookeeper的重要性,废话不多说,进入正题。本篇博客只是演示在.Net
Core
环境中如何使用Zookeeper组件进行基本的增删改查和一些注意的要点,如果对Zookeeper还不是太了解的话,建议认认真真、仔仔细细地阅读该文章:http://www.cnblogs.com/sunddenly/p/4033574.html 
 否则可能下面演示的你会看不懂。

 

一、Zookeeper基本概念快速介绍

概念:

Zookeeper是一个开源的分布式协调框架,它具有高性能
、高可用的特点,同时具有严格的顺序访问控制能力(主要是写操作的严格顺序性),基于对ZAB(Zookeeper原子消息广播协议)的实现,它能够很好的保证分布式环境下的数据一致性。也正是基于这样的特征,使得Zookeeper称为解决分布式数据一致性问题的利器,
Zookeeper由两部分组成:Zookeeper服务端和客户端。

特点:

* 全局一致性:每个server保存一份相同的数据副本,client无论链接哪个server,展示的数据都是一致的,这是最重要的特征。
* 可靠性:如果消息其中一台服务器接受,那么将被所有的服务器接受。
*
顺序性:包括全局有序性和偏序两种:全局有序是指如果在一台服务器上消息a在消息b前发布,则在所有server上消息a都将在消息b前被发布;偏序是指如果一个消息b在消息a后被同一个发送者发布,a必将排在b前面。
* 数据更新原子性:一次数据更新要么成功,要么失败,不存在中间状态。
* 实时性:Zookeeper保证客户端将在一个时间间隔范围内获得服务器的更新信息,或者服务器失败的信息。
数据结构:



图片来源:(https://www.cnblogs.com/xums/p/7074008.html)

*
Zookeeper的数据结构模型采用类似于文件系统的树结构。树上的每个节点称为ZNode,而每个节点都可能有一个或者多个子节点。ZNode的节点路径标识方式是由一系列斜杠"/"进行分割的路径表示,必须是绝对路径。既可以向ZNode节点写入、修改和读取数据,也可以创建、删除ZNode节点或ZNode节点下的子节点。
*
值的注意的是,Zookeeper的设计目标不是传统的数据库存储或大数据对象存储,而是协同数据的存储,因此在实现的时候,ZNode存储的数据大小不应该超过1MB。另外,每一个节点都有一个ACL(访问控制列表),据此控制该节点的访问权限。
* ZNode数据节点是有生命周期的,其生命周期的长短取决于数据节点的节点类型。节点类型共有四种:持久节点、持久顺序节点、临时节点、临时顺序节点
 


好了,基本的概念就聊到这里,先有一个印象,如果需要详细的学习,建议认认真真阅读这篇博客:http://www.cnblogs.com/sunddenly/p/4033574.html,下面就开始演示基本的api操作。

 

二、ASP.Net Core 中使用ZooKeeper

 首先,添加下面的依赖包:

 

新建一个.Net Core的控制台应用:




Zookeeper的服务端使用的是张辉清老师新书《中小研发团队架构实践》里面的服务,我这里不再安装Zookeeper服务端,只是介绍一下Zookeeper的目录结构

* Zookeeper目录介绍
(1)bin:主要的一些运行命令

(2)conf:存放配置文件,其中我们需要修改zk.cfg

(3)contrib:附加的一些功能

(4)dist-maven:mvn编译后的目录

(5)docs:文档

(6)lib:需要依赖的jar包

配置文件zk.cfg文件内容介绍(单机版)

(1)trickTime:用于计算的时间单元,比如session超时:N*trickTime

(2)initLimit:用于集群,允许从节点链接并同步到master节点的初始化链接时间,以trickTime的倍数来表示

(3)syncLimit:用于集群,master主节点与从节点之间发送消息,请求和应答时间长度(心跳机制)

(4)dataDir:必须配置

(5)dataLogDir:日志目录,如果不配置会和dataDir公用

(6)clientPort:链接服务器的端口,默认是2181

好了就介绍到这里,下面让我会演示关于Zookeeper  API的各种操作。

* 如何连接Zookeeper的服务端
(1)代码如下:
using org.apache.zookeeper; using org.apache.zookeeper.data; using System;
using System.Collections.Generic; using System.Text; using System.Threading;
using System.Threading.Tasks; using static org.apache.zookeeper.Watcher.Event;
namespace ZookeeperNetCore { public class ZookeeperClient { public ZooKeeper ZK
{get; set; } // 配置项 public string QueryPath { get; set; }= "/Configuration"; //
节点状态信息 public Stat Stat { get; set; } // 配置数据 public byte[] ConfigData { get;
set; } = null; public ZookeeperClient(string serviceAddress, int timeout) { ZK
= new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher(this));
Console.WriteLine("客户端开始连接zookeeper服务器..."); Console.WriteLine($"
连接状态:{ZK.getState()}"); Thread.Sleep(1000);//注意:为什么要加上这行代码,如果不加会出现什么问题
Console.WriteLine($"连接状态:{ZK.getState()}"); } // 读取节点的配置数据 public async Task<
string> ReadConfigDataAsync() { if (this.ZK == null) { return string.Empty; }
var stat = await ZK.existsAsync(QueryPath, true); if (stat == null) { return
string.Empty; } this.Stat = stat; var dataResult = await
ZK.getDataAsync(QueryPath,true); return
Encoding.UTF8.GetString(dataResult.Data); }public class ConfigServiceWatcher :
Watcher { private ZookeeperClient _cs = null; public
ConfigServiceWatcher(ZookeeperClient cs) { _cs= cs; } public override async
Task process(WatchedEvent @event) { Console.WriteLine($"
Zookeeper链接成功:{@event.getState() == KeeperState.SyncConnected}"); if
(@event.get_Type() == EventType.NodeDataChanged) { var data = await
_cs.ReadConfigDataAsync(); Console.WriteLine("{0}收到修改此节点【{1}】值的通知,其值已被改为【{2}】。"
, Environment.NewLine, _cs.QueryPath, data); } } } } }


解释:

 首先,我们来看看创建Zookeeper对象时,应该注意的问题:



Zookeeper的构造函数参数解释如下:




客户端和zk服务端链接是一个异步的过程,当连接成功后后,客户端会收的一个watch通知,就是调用回调函数:ConfigServiceWatcher.process(WatchedEvent
@event)注意这个类ConfigServiceWatcher必须要继承Watcher,重写 process(WatchedEvent
@event),所以就打印出了。关于Zookeeper的watcher后面会详细介绍,不明白的不要紧,后面会通过代码给大家演示。

(1)connectString:连接服务器的ip字符串,比如:
"192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181"可以是一个ip,也可以是多个ip,一个ip代表单机,多个ip代表集群,也可以在ip后加路径。

(2)sessionTimeout:超时时间,心跳收不到了,那就超时

(3)watcher:通知事件,如果有对应的事件触发,则会收到一个通知;如果不需要,那就设置为null,在上面的演示中,我们设置了一个watcher。


(4)canBeReadOnly:可读,当这个物理机节点断开后,还是可以读到数据的,只是不能写,此时数据被读取到的可能是旧数据,此处建议设置为false,不推荐使用。

(5)sessionId:会话的id

(6)sessionPasswd:会话密码 当会话丢失后,可以依据 sessionId 和 sessionPasswd 重新获取会话。

好了,基本的参数已经介绍完毕,那么,来解释一下为什么在创建Zookeeper对象时添加下面这句代码:



其实上面我已经解释了,由于客户端和zk服务端链接是一个异步的过程,需要一定的时间间隔,所以,如果不添加效果这样:



 

(2)zookeeper 恢复之前的会话连接演示
using org.apache.zookeeper; using org.apache.zookeeper.data; using System;
using System.Collections.Generic; using System.Text; using System.Threading;
using System.Threading.Tasks; using static org.apache.zookeeper.Watcher.Event;
namespace ZookeeperNetCore { public class ZookeeperClient { public ZooKeeper ZK
{get; set; } // 配置项 public string QueryPath { get; set; }= "/Configuration"; //
节点状态信息 public Stat Stat { get; set; } // 配置数据 public byte[] ConfigData { get;
set; } = null; public ZookeeperClient(string serviceAddress, int timeout) { ZK =
new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher(this)); } public
ZookeeperClient(string serviceAddress, int timeout, long sessionId, byte[]
sessionPasswd) { ZK= new ZooKeeper(serviceAddress, timeout, new
ConfigServiceWatcher2(this), sessionId, sessionPasswd); } // 读取节点的配置数据 public
async Task<string> ReadConfigDataAsync() { if (this.ZK == null) { return string
.Empty; }var stat = await ZK.existsAsync(QueryPath, true); if (stat == null) {
return string.Empty; } this.Stat = stat; var dataResult = await
ZK.getDataAsync(QueryPath,true); return
Encoding.UTF8.GetString(dataResult.Data); }public class ConfigServiceWatcher :
Watcher {private ZookeeperClient _cs = null; public
ConfigServiceWatcher(ZookeeperClient cs) { _cs= cs; } public override async
Task process(WatchedEvent @event) { Console.WriteLine($"
Zookeeper链接成功:{@event.getState() == KeeperState.SyncConnected}"); if
(@event.get_Type() == EventType.NodeDataChanged) { var data = await
_cs.ReadConfigDataAsync(); Console.WriteLine("{0}收到修改此节点【{1}】值的通知,其值已被改为【{2}】。"
, Environment.NewLine, _cs.QueryPath, data); } } }public class
ConfigServiceWatcher2 : Watcher {private ZookeeperClient _cs = null; public
ConfigServiceWatcher2(ZookeeperClient cs) { _cs= cs; } public override async
Task process(WatchedEvent @event) { Console.WriteLine($"
Zookeeper链接成功:{@event.getState() == KeeperState.SyncConnected}"); if
(@event.get_Type() == EventType.NodeDataChanged) { var data = await
_cs.ReadConfigDataAsync(); Console.WriteLine("{0}收到修改此节点【{1}】值的通知,其值已被改为【{2}】。"
, Environment.NewLine, _cs.QueryPath, data); } } } } }




 

*  ZNode创建删除修改查询
代码:
using org.apache.zookeeper; using org.apache.zookeeper.data; using System;
using System.Collections.Generic; using System.Text; using System.Threading;
using System.Threading.Tasks; using static org.apache.zookeeper.Watcher.Event;
using static org.apache.zookeeper.ZooDefs; namespace ZookeeperNetCore { public
class ZookeeperClient { public ZooKeeper ZK { get; set; } // 配置项 public string
QueryPath {get; set; }= "/Configuration"; //节点状态信息 public Stat Stat { get; set;
}// 配置数据 public byte[] ConfigData { get; set; } = null; public ZookeeperClient(
string serviceAddress, int timeout) { ZK = new ZooKeeper(serviceAddress,
timeout,new ConfigServiceWatcher(this)); } public ZookeeperClient(string
serviceAddress,int timeout, long sessionId, byte[] sessionPasswd) { ZK = new
ZooKeeper(serviceAddress, timeout,new ConfigServiceWatcher2(this), sessionId,
sessionPasswd); }// 读取节点的配置数据 public async Task<string> ReadConfigDataAsync() {
if (this.ZK == null) { return string.Empty; } var stat = await
ZK.existsAsync(QueryPath,true); if (stat == null) { return string.Empty; } this
.Stat = stat; var dataResult = await ZK.getDataAsync(QueryPath, true); return
Encoding.UTF8.GetString(dataResult.Data); }public class ConfigServiceWatcher :
Watcher {private ZookeeperClient _cs = null; public
ConfigServiceWatcher(ZookeeperClient cs) { _cs= cs; } public override async
Task process(WatchedEvent @event) { Console.WriteLine($"
Zookeeper链接成功:{@event.getState() == KeeperState.SyncConnected}"); if
(@event.get_Type() == EventType.NodeDataChanged) { var data = await
_cs.ReadConfigDataAsync(); Console.WriteLine("{0}收到修改此节点【{1}】值的通知,其值已被改为【{2}】。"
, Environment.NewLine, _cs.QueryPath, data); } } }public class
ConfigServiceWatcher2 : Watcher {private ZookeeperClient _cs = null; public
ConfigServiceWatcher2(ZookeeperClient cs) { _cs= cs; } public override async
Task process(WatchedEvent @event) { Console.WriteLine($"
Zookeeper链接成功:{@event.getState() == KeeperState.SyncConnected}"); if
(@event.get_Type() == EventType.NodeDataChanged) { var data = await
_cs.ReadConfigDataAsync(); Console.WriteLine("{0}收到修改此节点【{1}】值的通知,其值已被改为【{2}】。"
, Environment.NewLine, _cs.QueryPath, data); } } }// 关闭ZooKeeper连接 // 释放资源
public async Task Close() { if (this.ZK != null) { await ZK.closeAsync(); } this
.ZK =null; } } } using org.apache.zookeeper; using System; using System.Text;
using System.Threading; using System.Threading.Tasks; using static
org.apache.zookeeper.ZooDefs;namespace ZookeeperNetCore { class Program { public
const int timeout = 5000; static async Task Main(string[] args) { var conf = new
ZookeeperClient("", timeout); try { conf.QueryPath = "/UserName";
Console.WriteLine("客户端开始连接zookeeper服务器..."); Console.WriteLine($"
连接状态:{conf.ZK.getState()}"); Thread.Sleep(1000);//注意:为什么要加上这行代码,如果不加会出现什么问题
Console.WriteLine($"连接状态:{conf.ZK.getState()}"); if (await
conf.ZK.existsAsync(conf.QueryPath,false) == null) { conf.ConfigData =
Encoding.Default.GetBytes("guozheng"); await
conf.ZK.createAsync(conf.QueryPath, conf.ConfigData, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT); }string configData = await conf.ReadConfigDataAsync();
Console.WriteLine("节点【{0}】目前的值为【{1}】。", conf.QueryPath, configData);
Console.ReadLine(); Random random= new Random((int)DateTime.Now.Ticks &
0x0000FFFF); conf.ConfigData = Encoding.UTF8.GetBytes(string.Format("Mike_{0}",
random.Next(100))); await conf.ZK.setDataAsync(conf.QueryPath, conf.ConfigData,
-1); Console.WriteLine("节点【{0}】的值已被修改为【{1}】。", conf.QueryPath,
Encoding.UTF8.GetString(conf.ConfigData)); Console.ReadLine();if (await
conf.ZK.existsAsync(conf.QueryPath,false) != null) { await
conf.ZK.deleteAsync(conf.QueryPath, -1); Console.WriteLine("已删除此【{0}】节点。{1}",
conf.QueryPath, Environment.NewLine); } }catch (Exception ex) { if (conf.ZK ==
null) { Console.WriteLine("已关闭ZooKeeper的连接。"); Console.ReadLine(); return; }
Console.WriteLine("抛出异常:{0}【{1}】。", Environment.NewLine, ex.ToString()); }
finally { await conf.Close(); Console.WriteLine("已关闭ZooKeeper的连接。");
Console.ReadLine(); }////开始会话重连 //Console.WriteLine("开始会话重连..."); //var conf2 =
new ZookeeperClient("", timeout, sessionId, sessionPassword);//
Console.WriteLine(conf2.ZK.getSessionId());//Console.WriteLine(
Encoding.UTF8.GetString(conf2.ZK.getSessionPasswd()));//
Console.WriteLine($"重新连接状态zkSession:{conf2.ZK.getState()}");//
Thread.Sleep(1000);//注意:为什么要加上这行代码,如果不加会出现什么问题 //
Console.WriteLine($"重新连接状态zkSession:{conf2.ZK.getState()}");
Console.ReadKey(); } } }
 



 解释:

关于异步创建节点的方法,是不支持子节点的递归创建,参数介绍:

(1)path:创建的路径

(2)data:存储的数据的byte[]

(3)acl:控制权限策略   Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa   
  CREATOR_ALL_ACL --> auth:user:password:cdrwa

(4)createMode: 节点类型, 是一个枚举    PERSISTENT:持久节点   PERSISTENT_SEQUENTIAL:持久顺序节点 
 EPHEMERAL:临时节点   EPHEMERAL_SEQUENTIAL:临时顺序节点

 关于上面参数引出来的知识点,需要几章来讲解,本篇文章先不介绍,后面会介绍。好了,关于.Net
Core中使用Zookeeper的介绍就到这里,关于上面演示的结果,我先抛出一个问题,大家可以思考一下:为什么“Zookeeper链接成功:True”会输出多次?也就是我们下节要讨论的Zookeeper的watcher机制。时间到了,收拾行李,准备一下回家啦,先写到这里,祝大家新年快乐!希望对你有帮助,过完年来见!

 

三、总结

 可能有些地方解释的不是太清楚,大家多多见谅,有些的不对的地方,希望能指正出来。

说明:演示代码里面使用的Zookeeper服务过一段时间能用,不能用的话,在评论区留言,后面用阿里云自己搭建一个。

 代码地址:

 https://github.com/guozheng007/ZookeeperNetCoreDemo

 

 

参考资料:

(1)张辉清:《中小研发团队架构实践》

(2) 风间影月:《ZooKeeper分布式专题与Dubbo微服务入门》
<https://www.imooc.com/u/3078817>

 (3)sunddenly:http://www.cnblogs.com/sunddenly/p/4033574.html

作者:郭峥

出处:http://www.cnblogs.com/runningsmallguo/

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文链接。

友情链接
ioDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:ixiaoyang8@qq.com
QQ群:637538335
关注微信