分布式协调 - CSDN
精华内容
参与话题
  • 分布式协调技术主要用来解决分布式环境当中多个进程之间的同步控制,让他们有序的去访问某种临界资源,防止造成"脏数据"的后果。 在这图中有三台机器,每台机器各跑一个应用程序。然后我们将这三台机器通过网络将...

     

    分布式协调技术主要用来解决分布式环境当中多个进程之间的同步控制,让他们有序的去访问某种临界资源,防止造成"脏数据"的后果。

    在这图中有三台机器,每台机器各跑一个应用程序。然后我们将这三台机器通过网络将其连接起来,构成一个系统来为用户提供服务,对用户来说这个系统的架构是透明的,他感觉不到我这个系统是一个什么样的架构。那么我们就可以把这种系统称作一个分布式系统

    在这个分布式系统中如何对进程进行调度,我假设在第一台机器上挂载了一个资源,然后这三个物理分布的进程都要竞争这个资源,但我们又不希望他们同时进行访问,这时候我们就需要一个协调器,来让他们有序的来访问这个资源。这个协调器就是我们经常提到的那个,比如说"进程-1"在使用该资源的时候,会先去获得锁,"进程1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。这个分布式锁也就是我们分布式协调技术实现的核心内容。

     

    什么是分布式锁?

    请看我的另外一篇博客:https://blog.csdn.net/qq_40722827/article/details/102993655

     

    如有其它问题,欢迎大家留言,我们一起讨论,一起学习,一起进步 

     

     

     

    展开全文
  • Zookeeper 分布式协调服务介绍

    千次阅读 2019-01-21 23:02:04
    公众号原文:Zookeeper 分布式协调服务介绍 博客原文:Zookeeper 分布式协调服务介绍 分布式系统 分布式系统的简单定义:分布式系统是一个硬件或软件组件分布在不同的网络计算机上,彼此之间仅仅通过消息传递进行...

    公众号原文:Zookeeper 分布式协调服务介绍
    博客原文:Zookeeper 分布式协调服务介绍

    分布式系统

    分布式系统的简单定义:分布式系统是一个硬件或软件组件分布在不同的网络计算机上,彼此之间仅仅通过消息传递进行通信和协调的系统。

    分布式系统的特征:

    • 分布性:系统中的计算机在空间上随意分布和随时变动
    • 对等性:系统中的计算机是对等的,没有主从之分
    • 并发性:并发性操作是非常常见的行为
    • 缺乏全局时钟:系统中的计算机具有明显的分布性,且缺乏一个全局的时钟序列控制,所以很难比较两个事件的先后
    • 故障总是会发生:任何在设计阶段考虑到的异常情况,一定会在系统实际运行中发生,并且还会遇到很多在设计时未考虑到的异常故障

    随着分布式架构的出现,越来越多的分布式应用会面临数据一致性问题。

    选择Zookeeper

    Zookeeper是一个典型的分布式数据一致性解决方案,分布式应用程序可以基于它实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、master选举、分布式锁和分布式队列等功能。

    Zookeeper致力于提供一个高性能、高可用,具有严格的顺序访问控制能力的分布式协调服务;其主要的设计目标是简单的数据模型、可以构建集群、顺序访问、高性能。Zookeeper已经成为很多大型分布式项目譬如Hadoop、HBase、Storm、Solr等中的核心组件,用于分布式协调。

    Zookeeper可以保证如下分布式一致性特性

    • 顺序一致性:从同一个客户端发起的事务请求,最终将会严格地按照其发起顺序被应用到Zookeeper中去
    • 原子性:所有事务请求的处理结果在整个集群中所有的机器上的应用情况是一致的
    • 单一视图:无论客户端连接的是哪个Zookeeper服务器,其看到的服务器数据模型都是一致的
    • 可靠性:一旦服务端成功地应用了一个事务,并完成对客户端的响应,那么该事务所引起的服务端状态变更将会被一直保留下来,除非有另一个事务又对其进行了变更
    • 实时性:在一定的时间内,客户端最终一定能够从服务端上读取到最新的数据状态

    Zookeeper服务架构图

    Zookeeper 基本概念

    集群角色

    • Leader:客户端提供读和写服务
    • Follower:提供读服务,所有写服务都需要转交给Leader角色,参与选举
    • Observer:提供读服务,不参与选举过程,一般是为了增强Zookeeper集群的读请求并发能力

    会话 (session)

    • Zk的客户端与zk的服务端之间的连接
    • 通过心跳检测保持客户端连接的存活
    • 接收来自服务端的watch事件通知
    • 可以设置超时时间

    ZNode 节点

    ZNode 是Zookeeper中数据的最小单元,每个ZNode上可以保存数据(byte[]类型),同时可以挂在子节点,因此构成了一个层次化的命名空间,我们称之为树

    Zookeeper数据模型

    节点是有生命周期的,生命周期由节点类型决定:

    • 持久节点(PERSISTENT):节点创建后就一直存在于Zookeeper服务器上,直到有删除操作主动将其删除
    • 持久顺序节点(PERSISTENT_SEQUENTIAL):基本特性与持久节点一致,额外的特性在于Zookeeper会记录其子节点创建的先后顺序
    • 临时节点(EPHEMERAL):声明周期与客户端的会话绑定,客户端会话失效时节点将被自动清除
    • 临时顺序节点(EPHEMERAL_SEQUENTIAL):基本特性与临时节点一致,但添加了顺序的特性

    每个节点都有状态信息,抽象为 Stat 对象,状态属性如下:

    • czxid:节点被创建时的事务ID
    • mzxid:节点最后一个被更新时的事务ID
    • ctime:节点创建时间
    • mtime:节点最后一个被更新时间
    • version:节点版本号
    • cversion:子节点版本号
    • aversion:节点的ACL版本号
    • ephemeralOwner:创建该临时节点的会话的sessionID,若为持久节点则为0
    • dataLength:数据内容长度
    • numChildren:子节点数量

    权限控制ACL (Access Control Lists)

    • CREATE:创建子节点的权限
    • READ:获取节点数据和子节点列表的权限
    • WRITE:更新节点数据的权限
    • DELETE:删除子节点的权限
    • ADMIN:设置节点ACL的权限

    watcher机制

    Zookeeper 引入watcher机制来实现发布/订阅功能,能够让多个订阅者同时监听某一个节点对象,当这个节点对象状态发生变化时,会通知所有订阅者。

    Zookeeper的watcher机制主要包括客户端线程、客户端WatchManager、Zookeeper服务器三个部分。其工作流程简单来说:客户端在向Zookeeper服务器注册Watcher的同时,会将Watcher对象存储在客户端的WatchManager中;当Zookeeper服务器端触发Watcher事件后,会向客户端发送通知,客户端线程从WatchManager中取出对应的Watcher对象来执行回调逻辑

    Zookeeper Watcher机制概述

    可以设置的两种 Watcher

    NodeCache

    • 监听数据节点的内容变更
    • 监听节点的创建,即如果指定的节点不存在,则节点创建后,会触发这个监听

    PathChildrenCache

    • 监听指定节点的子节点变化情况
    • 包括新增子节点、子节点数据变更和子节点删除

    客户端命令

    Zookeeper的安装可参考官方文档

    # 启动客户端,默认为 localhost:2181
    bin/zkCli.sh     
    # 启动客户端,指定连接的Zookeeper地址
    bin/zkCli.sh -server ip:port 
    # create 创建一个节点,路径为 /test,内容为 some test data 
    create /test "some test data"
    # ls 列出指定节点下的所有子节点 
    ls /
    # get 获取指定节点的数据内容和属性
    get /test
    # set 更新指定节点的数据内容
    set /test "new test data"
    # delete 删除节点
    delete /test
    # rmr 删除非空节点
    rmr /test
    # stat 输出节点的状态信息
    stat /test
    

    四字命令

    四字命令可以查看Zookeeper服务器的一些信息,可以通过 telnet 和 nc 等方式执行四字命令,以执行 conf 命令为例

    # telnet 方式 执行Zookeeper的 conf 命令
    telnet localhost 2181
    conf
    # nc 方式 执行Zookeeper的 conf 命令
    echo conf | nc localhost 2181
    

    四字命令介绍

    • conf 命令用于输出Zookeeper服务器运行时的基本配置信息
    • cons 命令用于输出这台服务器上所有客户端连接的详细信息
    • crst 命令用于重置所有客户端的连接统计信息
    • dump 命令用于输出当前集群的所有会话信息
    • envi 命令用于输出Zookeeper所在服务器的运行时信息
    • ruok 命令用于输出当前Zookeeper服务器是否正在运行
    • stat 命令用于获取Zookeeper服务器的运行状态信息
    • srvr 命令与stat命令功能一致,但仅输出服务器自身的信息
    • srst 命令用于重置所有服务器的统计信息
    • wchs 命令用于输出当前服务器上管理的 watcher 的概要信息
    • wchc 命令用于输出当前服务器上管理的 watcher 的详细信息
    • wchp 命令与wchc功能非常相似,但输出信息以节点路径为单位进行归组
    • mntr 命令用于输出比stat命令更为详细的服务器统计信息

    Curator 客户端代码实例

    Curator 是 Apache 基金会的顶级项目之一,解决了很多Zookeeper客户端非常底层的细节开发工作,包括会话超时重连、反复注册Watcher、NodeExistsException异常等,提供了一套易用性和可读性更强的Fluent风格的客户端API框架,除此之外,curator还提供了Zookeeper各种应用场景(Recipe,如共享锁服务、master选举、分布式计数器等)的抽象封装。

    Zookeeper 的核心提交者 Patrick Hunt 对 Curator 的高度评价:

    Guava is to Java what Curator is to Zookeeper

    示例 - 增删查改

    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.imps.CuratorFrameworkState;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.data.Stat;
    
    public class CuratorCrud {
        // 集群模式则是多个ip
        private static final String zkServerIps = "master:2181,hadoop2:2181";
    
        public static void main(String[] args) throws Exception {
            // 创建节点
            String nodePath = "/testZK"; // 节点路径
            byte[] data = "this is a test data".getBytes(); // 节点数据
            byte[] newData = "new test data".getBytes(); // 节点数据
    
            // 设置重连策略ExponentialBackoffRetry, baseSleepTimeMs:初始sleep的时间,maxRetries:最大重试次数,maxSleepMs:最大重试时间
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 5);
            //(推荐)curator链接zookeeper的策略:RetryNTimes n:重试的次数 sleepMsBetweenRetries:每次重试间隔的时间
            // RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
            // (不推荐) curator链接zookeeper的策略:RetryOneTime sleepMsBetweenRetry:每次重试间隔的时间,这个策略只会重试一次
            // RetryPolicy retryPolicy2 = new RetryOneTime(3000);
            // 永远重试,不推荐使用
            // RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs)
            // curator链接zookeeper的策略:RetryUntilElapsed maxElapsedTimeMs:最大重试时间 sleepMsBetweenRetries:每次重试间隔 重试时间超过maxElapsedTimeMs后,就不再重试
            // RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);
    
            // Curator客户端
            CuratorFramework client = null;
            // 实例化Curator客户端,Curator的编程风格可以让我们使用方法链的形式完成客户端的实例化
            client = CuratorFrameworkFactory.builder()  // 使用工厂类来建造客户端的实例对象
                    .connectString(zkServerIps) // 放入zookeeper服务器ip
                    .sessionTimeoutMs(10000).retryPolicy(retryPolicy)  // 设定会话时间以及重连策略
                    // .namespace("testApp")    // 隔离的命名空间
                    .build(); // 建立连接通道
            // 启动Curator客户端
            client.start();
    
            boolean isZkCuratorStarted = client.getState().equals(CuratorFrameworkState.STARTED);
            System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
            try {
                // 检查节点是否存在
                Stat s = client.checkExists().forPath(nodePath);
                if (s == null) {
                    System.out.println("节点不存在,创建节点");
                    // 创建节点
                    String result = client.create().creatingParentsIfNeeded()    // 创建父节点,也就是会递归创建
                            .withMode(CreateMode.PERSISTENT) // 节点类型
                            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) // 节点的ACL权限
                            .forPath(nodePath, data);
                    System.out.println(result + "节点,创建成功...");
                } else {
                    System.out.println("节点已存在," + s);
                }
                getData(client, nodePath);  // 输出节点信息
    
                // 更新指定节点的数据
                int version = s == null ? 0 : s.getVersion();  // 版本不一致时的异常:KeeperErrorCode = BadVersion
                Stat resultStat = client.setData().withVersion(version)   // 指定数据版本
                        .forPath(nodePath, newData);    // 需要修改的节点路径以及新数据
                System.out.println("更新节点数据成功");
                getData(client, nodePath);  // 输出节点信息
    
                // 删除节点
                client.delete().guaranteed()    // 如果删除失败,那么在后端还是会继续删除,直到成功
                        .deletingChildrenIfNeeded() // 子节点也一并删除,也就是会递归删除
                        .withVersion(resultStat.getVersion())
                        .forPath(nodePath);
                System.out.println("删除节点:" + nodePath);
                Thread.sleep(1000);
            } finally {
                // 关闭客户端
                if (!client.getState().equals(CuratorFrameworkState.STOPPED)) {
                    System.out.println("关闭客户端.....");
                    client.close();
                }
                isZkCuratorStarted = client.getState().equals(CuratorFrameworkState.STARTED);
                System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..."));
            }
        }
    
        /**
         * 读取节点的数据
         */
        private static byte[] getData(CuratorFramework client, String nodePath) throws Exception {
            Stat stat = new Stat();
            byte[] nodeData = client.getData().storingStatIn(stat).forPath(nodePath);
            System.out.println("节点 " + nodePath + " 的数据为:" + new String(nodeData));
            System.out.println("该节点的数据版本号为:" + stat.getVersion() + "\n");
            return nodeData;
        }
    }
    
    // 输出
    当前客户端的状态:连接中...
    节点不存在,创建节点
    /testZK节点,创建成功...
    节点 /testZK 的数据为:this is a test data
    该节点的数据版本号为:0
    更新节点数据成功
    节点 /testZK 的数据为:new test data
    该节点的数据版本号为:1
    删除节点:/testZK
    关闭客户端.....
    当前客户端的状态:已关闭...
    

    示例 - 异步接口

    public class CuratorBackGround {
        private static final String zkServerIps = "master:2181,hadoop2:2181";
    
        public static void main(String[] args) throws Exception {
            CountDownLatch samphore = new CountDownLatch(2);
            ExecutorService tp = Executors.newFixedThreadPool(2);   // 线程池
            String nodePath = "/testZK";
            byte[] data = "this is a test data".getBytes();
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 5);
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString(zkServerIps)
                    .sessionTimeoutMs(10000).retryPolicy(retryPolicy).build();
            client.start();
    
            // 异步创建节点,传入 ExecutorService,这样比较复杂的就会放到线程池中执行
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                    .inBackground(new BackgroundCallback() {
                        @Override
                        public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                            System.out.println("event[code: " + curatorEvent.getResultCode() + ", type: " + curatorEvent.getType() + "]");
                            System.out.println("当前线程:" + Thread.currentThread().getName());
                            samphore.countDown();
                        }
                    }, tp).forPath(nodePath, data); // 此处传入 ExecutorService tp
    
            // 异步创建节点
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                    .inBackground(new BackgroundCallback() {
                        @Override
                        public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                            System.out.println("event[code: " + curatorEvent.getResultCode() + ", type: " + curatorEvent.getType() + "]");
                            System.out.println("当前线程:" + Thread.currentThread().getName());
                            samphore.countDown();
                        }
                    }).forPath(nodePath, data); // 此处没有传入 ExecutorService tp
    
            samphore.await();
            tp.shutdown();
        }
    }
    
    // 输出
    event[code: -110, type: CREATE]
    当前线程:main-EventThread
    event[code: 0, type: CREATE]
    当前线程:pool-1-thread-1
    

    示例 - watcher 事件监听 - NodeCache

    public class CuratorWatcher {
        private static final String zkServerIps = "master:2181,hadoop2:2181";
    
        public static void main(String[] args) throws Exception {
            final String nodePath = "/testZK";
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 5);
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString(zkServerIps)
                    .sessionTimeoutMs(10000).retryPolicy(retryPolicy).build();
            try {
                client.start();
                client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(nodePath, "this is a test data".getBytes());
    
                final NodeCache cacheNode = new NodeCache(client, nodePath, false);
                cacheNode.start(true);  // true 表示启动时立即从Zookeeper上获取节点
                cacheNode.getListenable().addListener(new NodeCacheListener() {
                    @Override
                    public void nodeChanged() throws Exception {
                        System.out.println("节点数据更新,新的内容是: " + new String(cacheNode.getCurrentData().getData()));
                    }
                });
                for (int i = 0; i < 5; i++) {
                    client.setData().forPath(nodePath, ("new test data " + i).getBytes());
                    Thread.sleep(1000);
                }
                Thread.sleep(10000); // 等待100秒,手动在 zkCli 客户端操作节点,触发事件
            } finally {
                client.delete().deletingChildrenIfNeeded().forPath(nodePath);
                client.close();
                System.out.println("客户端关闭......");
            }
        }
    }
    

    示例 - watcher 事件监听 - PathChildrenCache

    public class CuratorPCWatcher {
        private static final String zkServerIps = "master:2181,hadoop2:2181";
    
        public static void main(String[] args) throws Exception {
            final String nodePath = "/testZK";
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 5);
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString(zkServerIps)
                    .sessionTimeoutMs(10000).retryPolicy(retryPolicy).build();
            client.start();
            try {
                // 为子节点添加watcher,PathChildrenCache: 监听数据节点的增删改,可以设置触发的事件
                final PathChildrenCache childrenCache = new PathChildrenCache(client, nodePath, true);
    
                /**
                 * StartMode: 初始化方式
                 *  - POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件
                 *  - NORMAL:异步初始化
                 *  - BUILD_INITIAL_CACHE:同步初始化
                 */
                childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
    
                // 列出子节点数据列表,需要使用BUILD_INITIAL_CACHE同步初始化模式才能获得,异步是获取不到的
                List<ChildData> childDataList = childrenCache.getCurrentData();
                System.out.println("当前节点的子节点详细数据列表:");
                for (ChildData childData : childDataList) {
                    System.out.println("\t* 子节点路径:" + new String(childData.getPath()) + ",该节点的数据为:" + new String(childData.getData()));
                }
    
                // 添加事件监听器
                childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
                    @Override
                    public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                        // 通过判断event type的方式来实现不同事件的触发
                        if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {  // 子节点初始化时触发
                            System.out.println("子节点初始化成功");
                        } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {  // 添加子节点时触发
                            System.out.print("子节点:" + event.getData().getPath() + " 添加成功,");
                            System.out.println("该子节点的数据为:" + new String(event.getData().getData()));
                        } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {  // 删除子节点时触发
                            System.out.println("子节点:" + event.getData().getPath() + " 删除成功");
                        } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {  // 修改子节点数据时触发
                            System.out.print("子节点:" + event.getData().getPath() + " 数据更新成功,");
                            System.out.println("子节点:" + event.getData().getPath() + " 新的数据为:" + new String(event.getData().getData()));
                        }
                    }
                });
                Thread.sleep(100000); // sleep 100秒,在 zkCli.sh 操作子节点,注意查看控制台的输出
            } finally {
                client.close();
            }
        }
    }
    
    // 输出
    当前节点的子节点详细数据列表:
    	* 子节点路径:/testZK/node1,该节点的数据为:hello world
    子节点:/testZK/node2 添加成功,该子节点的数据为:hello node2
    子节点:/testZK/node2 数据更新成功,子节点:/testZK/node2 新的数据为:hello zookeeper
    子节点:/testZK/node2 删除成功
    

    Zookeeper的典型应用场景

    下一篇文章将使用 Curator 客户端来实现 Zookeeper 的典型应用场景的示例,这里简单概括一下Zookeeper的典型应用场景:

    • 数据发布/订阅,即所谓的配置中心
    • 负载均衡
    • 命名服务
    • 分布式协调/通知
    • 集群管理
    • master 选举
    • 分布式锁
    • 分布式队列

    参考:
    《从Paxos到Zookeeper分布式一致性原理与实践》

    关注_小旋锋_微信公众号

    展开全文
  • 点击上方“Java之间”,选择“置顶或者星标”你关注的就是我关心的!作者:林湾村龙猫微信公众号:林湾村龙猫(rudy_tan_home)随着互联网技术的发展,大型网站需要...
        

    点击上方Java之间”,选择“置顶或者星标”

    你关注的就是我关心的!

    640?wx_fmt=jpeg

    作者:林湾村龙猫

    微信公众号:林湾村龙猫(rudy_tan_home)

    随着互联网技术的发展,大型网站需要的计算能力和存储能力越来越高,网站架构逐渐从集中式转变成分布式系统。

    虽然分布式相对于集中式系统有比较多的优势,比如更高更强的计算、存储、处理能力等。但同时也引入了其他一些问题,比如如何在分布式系统中保证数据的一致性和可用性。

    在日常中,如果两个员工或用户对某件事产生了分歧,通常我们的做法是找上级,去做数据和信息的同步。

    那么对于我们的服务呢,多个节点之间数据不同步如何处理?

    在单机发展到集群、分布式服务的过程中,每一件技术或工具都走向了系统化,专一化的道路。举个例子,在单体应用中,如果多个线程想对同一个变量进行修改,我们通常的做法是对要修改的变量或资源加锁。那么对于集群来说,并没有这样的东西,我们应该怎么做?

    对于分布式集群来说,这个时候,我们通常需要一个能够在各个服务或节点之间进行协调的服务或中间人

    架构设计中,没有一个问题不能通过一层抽象层来解决,如果有,那就是两层。

    640?wx_fmt=png

    我们可以一起看看,协调服务中的佼佼者 --ZooKeeper

    zookeeper 起源

    640?wx_fmt=png

    最初,在 Hadoop 生态中,会存在很多的服务或组件(比如 hive、pig 等),每个服务或组件之间进行协调处理是很麻烦的一件事情,急需一种高可用高性能数据强一致性的协调框架。因此雅虎的工程师们创造了这个中间程序,但中间程序的命名却愁死了开发人员,突然想到 hadoop 中的大多是动物名字,似乎缺乏一个管理员,这个程序的功能有是如此的相似。因此 zookeeper 诞生。

    zookeeper 提供了哪些特性,以便于能够很好的完成协调能力的处理呢?

    功能与特性

    数据存储

    zookeeper 提供了类似 Linux 文件系统一样的数据结构。每一个节点对应一个 Znode 节点,每一个 Znode 节点都可以存储 1MB(默认)的数据。

    客户端对 zk 的操作就是对 Znode 节点的操作。

    640?wx_fmt=png

    • Znode: 包含 ACL 权限控制、修改 / 访问时间、最后一次操作的事务 Id(zxid) 等等

    • 说有数据存储在内存中,在内存中维护这么一颗树。

    • 每次对 Znode 节点修改都是保证顺序和原子性的操作。写操作是原子性操作。

    举个例子,在注册中心中,可以通过路径 "/fsof / 服务名 1/providers" 找到 "服务 1" 的所有提供者。

    每一个 Znode 节点又根据节点的生命周期与类型分为 4 种节点。

    640?wx_fmt=png

    • 生命周期:当客户端会话结束的时候,是否清理掉这个会话创建的节点。持久 - 不清理,临时 - 清理。

    • 类型:每一个会话,创建单独的节点(例子:正常节点:rudytan, 顺序编号节点:rudytan001,rudytan002 等等)

    监听机制

    zookeeper 除了提供对 Znode 节点的处理能力,还提供了对节点的变更进行监听通知的能力。

    640?wx_fmt=png

    监听机制的步骤如下:

    1. 任何 session(session1,session2) 都可以对自己感兴趣的 znode 监听。

    2. 当 znode 通过 session1 对节点进行了修改。

    3. session1,session2 都会收到 znode 的变更事件通知。

    节点常见的事件通知有:

    • session 建立成功事件

    • 节点添加

    • 节点删除

    • 节点变更

    • 子节点列表变化

    需要特别说明的是:

    一次监听事件,只会被触发一次,如果想要监听到 znode 的第二次变更,需要重新注册监听。

    到这里,我们了解到 zookeeper 提供的能力,那我们在哪些场景可以使用它?如何使用它呢?

    应用场景

    zookeeper 用得比较多的地方可能是,微服务的集群管理与服务注册与发现。

    注册中心

    640?wx_fmt=png

    • 依赖于临时节点

    • 消费者启动的时候,会先去注册中心中全量拉取服务的注册列表。

    • 当某个服务节点有变化的时候,通过监听机制做数据更新。

    • zookeeper 挂了,不影响消费者的服务调用。

    目前还有个比较流行的服务 Eureka 也可以做注册中心,他们有什么优势和劣势呢?留个疑问,哈哈哈。

    分布式锁

    640?wx_fmt=png

    • 依赖于临时顺序节点

    • 判断当前 client 的顺序号是否是最小的,如果是获取到锁。

    • 没有获取到锁的节点监听最小节点的删除事件(比如 lockkey001)

    • 锁释放,最小节点删除,剩余节点重新开始获取锁。

    • 重复步骤二到四。

    redis 和 db 也能创建分布式锁,哪有什么异同呢?留个疑问,哈哈哈。

    集群管理与 master 选举

    640?wx_fmt=png

    • 依赖于临时节点

    • zookeeper 保证无法重复创建一个已存在的数据节点,创建成功的 client 为 master。

    • 非 master,在已经创建的节点上注册节点删除事件监听。

    • 当 master 挂掉后,其他集群节点收到节点删除事件,进行重新选举

    • 重复步骤二到四

    当然还有其他应用场景,不一一列举了。

    有人说,zookeeper 可以做分布式配置中心、分布式消息队列,看到这里的小伙伴们,你们觉得合适么?

    到这里,可以基本上满足基于 zk 应用开发的理论知识储备。对原理或有更强求知欲的小伙伴可以继续往下看,接下来聊聊 zookeeper 如何做到高性能高可用强一致性的。

    高性能高可用强一致性保障

    高性能 - 分布式集群

    640?wx_fmt=png

    高性能,我们通常想到的是通过集群部署来突破单机的性能瓶颈。对于 zk 来说,就是通过部署多个节点共同对外提供服务,来提供读的高性能。

    • Master/Slave 模式。

    • 在 zookeeper 中部署多台节点对外提供服务,客户端可以连接到任意一个节点。

    • 每个节点的数据都是一样的。

    • 节点根据角色分为 Leader 节点与 Learner 节点(包括 Follower 节点与 Observer 节点)。

    • 集群中,只有一个 Leader 节点,完成所有的写请求处理。

    • 每次写请求都会生成一个全局的唯一的 64 位整型的事务 ID(可以理解为全局的数据的版本号)。

    • Learner 节点可以有很多,每个 Leaner 可以独自处理读请求,转写请求到 Leader 节点。

    • 当 Leader 节点挂掉后,会从 Follower 节点中通过选举方式选出一个 Leader 提供对外服务。

    • Follower 节点与 Observer 节点区别在于不参与选举和提议的事务过半处理。

    • 集群通常是按照奇数个节点进行部署(偶然太对容灾没啥影响,浪费机器)。

    数据一致性(zab 协议 - 原子广播协议)

    通过集群的部署,根据 CAP 原理,这样,可能导致同一个数据在不同节点上的数据不一致。zookeeper 通过 zab 原子广播协议来保证数据在每一个节点上的一致性。原子广播协议(类似 2PC 提交协议)大概分为 3 个步骤。

    640?wx_fmt=png

    • Leader 包装写请求,生成唯一 zxid,发起提议,广播给所有 Follower。

    • Follower 收到提议后,写入本地事务日志,根据自身情况,是否同意该事务的提交。

    • Leader 收到过半的 Follower 同意,自己先添加事务。然后对所有的 Learner 节点发送提交事务请求。

    需要说明的是,zookeeper 对数据一致性的要求是:

    • 顺序一致性:严格按照事务发起的顺序执行写操作。

    • 原子性:所有事务请求的结果在集群中的所有节点上的应用情况是一致的。

    • 单一视图:客户端访问任何一个节点,看到的数据模型都是一致的。

    • 实时性:保证在极小一段时间客户端最终可以从服务读取最新数据状态(如果要实时,需要客户端调用 syn 方法)。

    可用性 - leader 选举(zab 协议 - 崩溃恢复协议)

    在整个集群中,写请求都集中在一个 Leader 节点上,如果 Leader 节点挂了咋办呢?

    640?wx_fmt=png

    当集群初始化或 Follower 无法联系上 Leader 节点的时候,每个 Follower 开始进入选举模式。选举步骤如下:

    1. Follower 节点第一次投票先投自己,然后将自己的选票广播给剩余的 Follower 节点。

    2. Follower 节点接收到其他的选票。

    3. 选票比较:比较自己的与接收的选票的投票更有。

    4. 如果资金的选票不是最优选票,变更自己的选票,投最优选票的节点。

    5. 统计自己收到的选票,如果某个节点获得了过半的节点的投票。确认该节点为新的 Leader 节点。

    6. 确认 Leader 节点后,每个节点变更自己的角色。完成投票选举。

    选举原则:谁的数据最新,谁就有优先被选为 Leader 的资格。

    举个例子,假如现在 zk 集群有 5 个节点,然后挂掉了 2 个节点。剩余节点 S3,S4,S6 开始进行选举,他们的最大事务 ID 分别是 6,2,6。定义投票结构为(投票的节点 ID,被投节点 ID,被投节点最大事务 ID)。

    640?wx_fmt=png

    1. 初始状态,S3,S4,S5 分别投自己,并带上自己的最大事务 ID。

    2. S3,S4,S5 分别对自己收到的 2 票与自己的 1 票做比较。

    3. S5 发现自己的是最优投票,不变更投票,S3,S4 发现 S5 的投票是最优解,更改投票。

    4. S3,S4 广播自己变更的投票。

    5. 最后大家都确认了 S5 是 Leader,S5 节点状态变更为 Leader 节点,S3,S4 变更为 Follower 节点。

    到这里,就是选举的主要过程。

    数据的持久化

    640?wx_fmt=png

    • zookeeper 所有数据都存在内存中。

    • zookeeper 会定期将内存 dump 到磁盘中,形成数据快照。

    • zookeeper 每次的事务请求,都会先接入到磁盘中,形成事务日志。

    • 全量数据 = 数据快照 + 事务日志。

    比较

    前面也提到过,不同的应用场景似乎有不少替代品,我们就大概做个简单的比较。

    注册中心的比较(与 Netflix 的 Eureka 方案)

    640?wx_fmt=png

    通过上面的架构图,可以发现 Eureka 不同于 zk 中的节点,Eureka 中的节点每一个节点对等。是个 AP 系统,而不是 zk 的 CP 系统。在注册中心的应用场景下,相对于与强数据一致性,更加关心可用性。

    分布式锁(与 redis、mysql 比较)

    具体差异比较:

    • Redis:简单,可靠性不高

    • DB:稳定、性能有所不足

    • ZK: 比较高的性能和完整的锁的实现,但实现复杂度高,并发不高。

    通过这些比较,我们为什么还需要或在什么场景下使用 zookeeper 呢。我觉得就一句话:

    zookeeper,一种通用的分布式协调服务解决方案。

    感悟

    最后,说说在整个学习和使用 zk 过程中的一个感悟吧。

    • 没有银弹,每一种技术或方案都有其优点和缺点。

    • 做一件事情很简单,做好一件事件很难。

    640?wx_fmt=png

    最近热文阅读:

    1、讲讲亿级PV的负载均衡架构!

    2、Spring Boot中如何干掉if else

    3、一篇读懂聚集索引、非聚集索引、覆盖索引的工作原理!

    4、Maven 的这 7 个问题你思考过没有?

    5、最易懂的数据库异地多活方案

    6、分布式事务常见的5种解决方案

    7、为什么你的Intellij没别人的好用?

    8、秒杀系统流量削峰这事应该怎么做?

    640?wx_fmt=jpeg

    关注公众号,你想要的Java都在这里

    展开全文
  • 1、什么是分布式协调服务 分布式协调技术主要用来解决分布式环境当中的多个进程之间的同步控制,让他们有序的去访问某种临界资源,防止造成脏数据的后果 2、什么是分布式锁 为了防止分布式系统中的多个进程之间相互...

    1、什么是分布式协调服务

    分布式协调技术主要用来解决分布式环境当中的多个进程之间的同步控制,让他们有序的去访问某种临界资源,防止造成脏数据的后果
    在这里插入图片描述

    2、什么是分布式锁

    为了防止分布式系统中的多个进程之间相互干扰,需要一种分布式协调技术来对这些进程进行调度,而这个分布式协调技术的核心就是实现分布式锁。

    2.1、分布式锁应该具备哪些条件

    • 分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行
    • 高可用的获取锁和释放锁(一直可以获取锁,一直可以释放锁 )
    • 高性能的获取锁和释放锁(获取锁和释放锁的速度一定要快)
    • 具备可重入特性(可以理解为重新进入,有多个任务并发使用,而不用担心数据错误)
    • 具备锁失效机制,防止死锁(为了防止一些在获取锁后服务突然崩溃,而释放不了锁的情况)
    • 具备非阻塞锁特性,即没有获取到锁,将直接返回获取锁失败

    2.2、分布式锁的实现有哪些

    • Redis:利用redis的setnx命令,此命令是原子性操作,只有当key不存在的时候,才可以set成功,也就相当于添加了锁
    • Zookeeper:利用zookeeper的顺序临时节点,来实现分布式锁和等待队列,zookeeper设计的初衷,就是为实现分布式锁服务的。

    2.3、 通过Redis实现分布式锁的过程和原理

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    2.3、 通过Zookeeper实现分布式锁的过程和原理

    2.3.1、什么是Zookeeper

    zookeeper是一种分布式协调服务,用于管理大型主机,在分布式环境中协调和管理服务是一个复杂的过程,zookeeper通过其简单的架构和API解决了这个问题,zookeeper允许开发人员专注于核心应用程序逻辑,而不用担心应用程序的分布式特性(比如像redis的加锁,解锁和锁超时等步骤)。

    2.3.1、Zookeeper的数据模型

    zookeeper是一个树型的目录服务,zookeeper的数据存储基于节点,这种节点叫Znode,Znode的引用方式是路径引用,类似于文件路径,比如下图中的 /动物/猫,这样的层级结构,让每一个Znode节点拥有唯一的路径,就像命名空间一样对不同的信息作出清晰的隔离。模型如下图所示:
    在这里插入图片描述

    2.3.2、Znode包含哪些元素

    在这里插入图片描述

    • data:Znode存储的数据信息
    • ACL:记录Znode的访问权限,即哪些人或哪些IP可以访问本节点
    • stat:包含Znode的各种元数据(数据的数据,比如数据的时间、大小等等),比如事务ID、版本号、时间戳、大小等。
    • child:当前节点的子节点引用

    注意:Zookeeper是为读多写少的场景所设计,Znode并不是用来存储大规模的业务数据,而是用于存储少量的状态和配置信息,每个节点的数据最大不能超过1MB。

    2.3.3 Zookeeper的基本操作

    创建节点:create
    删除节点:delete
    判断节点是否存在:exists
    获得一个节点的数据:getData
    设置一个节点的数据:setData
    获取节点下的所有子节点:getChildren
    其中,exists、getData、setData属于读操作,Zookeeper客户端在请求读操作的时候,可以选择是否选择Watch(观察),也就是因为这,Zookeeper本身就具备观察者模式。

    2.3.4、Zookeeper的事件通知

    我们可以把Watch理解成是注册在Znode上的触发器,当这个Znode发生改变,也就是调用了create、delete和setData方法的时候,将会触发Znode上注册的对应事件,请求Watch的客户端会接收到异步通知。
    客户端调用getData方法,Watch参数是true,服务端接到请求,返回节点数据,并且在对应的哈希表里插入被Watch的Znode路径,以及Watch列表
    在这里插入图片描述
    当被Watch的Znode已删除,服务端会查找哈希表,找到该Znode对应的所有Watcher,异步通知客户端,并且删除哈希表中对应的key-value
    在这里插入图片描述
    在这里插入图片描述

    2.3.5、Zookeeper实现分布式锁

    什么是临时顺序节点?
    Znode分为四种类型:

    • 持久节点 (PERSISTENT) 默认的节点类型。创建节点的客户端与zookeeper断开连接后,该节点依旧存在 。
    • 持久节点顺序节点(PERSISTENT_SEQUENTIAL)
      所谓顺序节点,就是在创建节点时,Zookeeper根据创建的时间顺序给该节点名称进行编号:
    • 临时节点(EPHEMERAL) 和持久节点相反,当创建节点的客户端与zookeeper断开连接后,临时节点会被删除:
    • 临时顺序节点(EPHEMERAL_SEQUENTIAL)
      顾名思义,临时顺序节点结合和临时节点和顺序节点的特点:在创建节点时,Zookeeper根据创建的时间顺序给该节点名称进行编号;当创建节点的客户端与zookeeper断开连接后,临时节点会被删除。

    Zookeeper分布式锁恰恰应用了临时顺序节点。详细步骤如下:
    首先,在Zookeeper当中创建一个持久节点ParentLock。当第一个客户端想要获得锁时,需要在ParentLock这个节点下面创建一个临时顺序节点 Lock1。
    之后,Client1查找ParentLock下面所有的临时顺序节点并排序,判断自己所创建的节点Lock1是不是顺序最靠前的一个。如果是第一个节点,则成功获得锁。
    这时候,如果再有一个客户端 Client2 前来获取锁,则在ParentLock下载再创建一个临时顺序节点Lock2。Client2查找ParentLock下面所有的临时顺序节点并排序,判断自己所创建的节点Lock2是不是顺序最靠前的一个,结果发现节点Lock2并不是最小的。于是,Client2向排序仅比它靠前的节点Lock1注册Watcher,用于监听Lock1节点是否存在。这意味着Client2抢锁失败,进入了等待状态。
    这时候,如果又有一个客户端Client3前来获取锁,则在ParentLock下载再创建一个临时顺序节点Lock3。Client3查找ParentLock下面所有的临时顺序节点并排序,判断自己所创建的节点Lock3是不是顺序最靠前的一个,结果同样发现节点Lock3并不是最小的。
    于是,Client3向排序仅比它靠前的节点Lock2注册Watcher,用于监听Lock2节点是否存在。这意味着Client3同样抢锁失败,进入了等待状态
    这样一来,Client1得到了锁,Client2监听了Lock1,Client3监听了Lock2。这恰恰形成了一个等待队列,很像是Java当中ReentrantLock所依赖的AQS(AbstractQueuedSynchronizer)。
    在这里插入图片描述

    2.3.6、redis锁与zookeeper锁的区别:

    Redis分布式锁,需要自己不断去尝试获取锁,比较消耗性能
    ZooKeeper分布式锁,获取不到锁,注册个监听器即可,不需要不断主动尝试获取锁,性能开销较小
    如果Redis获取锁的那个客户端挂了,那么只能等待超时时间之后才能释放锁
    而对于ZooKeeper,因为创建的是临时znode,只要客户端挂了,znode就没了,此时就自动释放锁

    展开全文
  • 分布式协调技术

    千次阅读 2018-04-16 10:47:11
    一、分布式协调技术在给大家介绍ZooKeeper之前先来给大家介绍一种技术——分布式协调技术。那么什么是分布式协调技术?那么我来告诉大家,其实分布式协调技术 主要用来解决分布式环境当中多个进程之间的同步控制,让...
  • 分布式环境的特点 分布性 并发性 程序运行过程中,并发性操作是很常见的,比如同一个分布式系统中的多个节点,同时访问一个共享资源,数据库、分布式存储 无序性 进程直接的消息通信,会出现顺序不一致的问题 ...
  • 博文中的内容来源《从Paxos到Zookeeper 分布式一致性原理与实践》这一本书,感激不尽。
  • 分布式节点协调实现方式

    千次阅读 2017-08-15 15:35:39
    分布式系统是多个节点通过网络连接再一起并通过消息的传递来进行协调的系统。通常节点间的协调与控制主要是通过以下几种方式完成的。   一、硬件负载均衡   这是一个远程通讯过程,请求发起方和请求...
  • 分布式协调与同步算法分布式技术的发展:分布式系统的指标集中式算法分布式算法令牌环算法 分布式技术的发展: 单机模式 所有应用程序和数据均部署在一台电脑或服务器上,由一台计算机完成所有的处理。模式的好处...
  • 在 2006 年,Google 发表了一篇名为 The Chubby lock service for loosely-coupled distributed systems 的论文,其中描述了一个分布式锁服务 Chubby 的设计理念和实现原理;作为 Google 内部的一个基础服务,虽然 ...
  • 分布式协调/通知 对于一个在多台机器上部署运行的应用而言,通常需要一个协调者(Coordinator)来控制整个系统的运行流程。ZooKeeper 中特有的 Watcher 注册与异步通知机制,能够很好地实现分布式环境下不同机器。...
  • 什么是分布式协调技术 分布式协调技术主要用来解决分布式环境中多个进程之间的同步控制, 让他们有序的去访问某种临界资源, 防止造成脏数据的产生 在这图中有三台机器, 每台机器各跑一个应用程序, 然后我们将这...
  • zookeeper是一个分布式的,开放源码的协调服务,是Hadoop和Hbase重要组件,它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护,域名服务,分布式同步,组服务等。 zookeeper下的角色 zookeeper下...
  • 目录zookeeper zookeeper 一个分布式的中间件, 待续…突然有事
  • ClickHouse需要依赖ZooKeeper使用,本文对ZooKeeper做简单介绍。 一、ZooKeeper定义 ZooKeeper译名为“动物园管理员”。...ZooKeeper是一个开源的分布式协调服务。分布式应用程序可以基于 ZooKeeper实现如数据发布...
  • zookeeper分布式协调服务的使用一

    千次阅读 2016-04-01 15:20:38
    Zookeeper是一个高性能,分布式的应用协调服务。 提供服务: 1、集群成员的管理(Group Membership) 2、分布式锁(Locking) 3、选主(Leader Election) 4、同步(Synchronization) 5、发布/订阅(Publisher/...
  • Zookeeper的数据模型 Zookeeper的数据模型是什么样子呢?它很像数据结构当中的树,也很像文件系统的目录。 树是由节点所组成,Zookeeper的数据存储也同样是基于节点,这种节点叫做Znode。 ...
  • 文章目录分布式协调服务的存在意义leader 选举负载均衡ZooKeeper 数据模型层级命名空间WatcherSessionZooKeeper 基本架构ZooKeeper 应用leader 选举分布式队列负载均衡 在分布式系统中,服务之间的协调是非常重要的...
  • zookeeper则是在Hadoop中大放光彩的分布式协调服务,提供了分布式锁,数据同步,等服务。 从功能上看,二者都可以很好的完成集群中分布式中遇到的同步,配置问题,但是不可否认,这2种服务在设计的时候的目的不同...
  • 分布式协调器ZooKeeper3.4—概述

    千次阅读 2013-02-01 09:08:42
    【ZooKeeper是Apache Hadoop下的开源软件,是一个分布式协调器,本文来自于Zookeeper的官方网站,地址为:http://zookeeper.apache.org/doc/r3.4.5/zookeeperOver.html)】 ZooKeeper: A Distributed ...
1 2 3 4 5 ... 20
收藏数 99,403
精华内容 39,761
热门标签
关键字:

分布式协调