精华内容
下载资源
问答
  • jetcd error

    2020-11-20 21:55:34
    I am trying to use jetcd. However, it needs a lot of dependency jar files. I have downloaded/included all slf4j-1.7.25, protobuf-java-3.2.0, grpc-java-1.0.0,netty-4.1.9.Final JAR files. But, still ...
  • <div><p>This enables users to specify a namespace when building a jetcd client. <p>Fixes #472</p><p>该提问来源于开源项目:etcd-io/jetcd</p></div>
  • Jetcd on android

    2020-12-04 12:43:40
    <div><p>Hello! It appears the grpc-netty is hardcoded at core, but grpc-...Is there a chance to use jetcd on Android? Could anyone make it work?</p><p>该提问来源于开源项目:etcd-io/jetcd</p></div>
  • jetcd locking problem

    2020-11-20 21:59:15
    m using jetcd in my project to do lock/unlock of etcd key before doing an update of the value. Verified from the code and logs that everytime a lock is done, an unlock of the key also is done. ...
  • project name about jetcd

    2020-11-20 21:55:29
    <div><p>There is another java project named jetcd on github and it is a java client for etcd v2. <p>Do we change to another name? like jetcd3?</p><p>该提问来源于开源项目:etcd-io/jetcd</p></div>
  • jetcd-0.2.0.zip

    2019-10-13 17:02:56
    jetcd.zip,Java客户端for ETCDJava客户端for ETCD
  • <p>And does jetcd use Travis? <p>We've moved all etcd projects to https://travis-ci.com/etcd-io.</p> <p>Please let me know if you need access to Travis CI.</p><p>该提问来源于开源项目:etcd-...
  • <div><p>Could anyone tell me jetcd support etcd version limit ? I use etcd version v3.3.8, and use jetcd v0.0.2, connect failed</p><p>该提问来源于开源项目:etcd-io/jetcd</p></div>
  • discuss: jetcd 0.0.1 release

    2020-11-20 21:55:43
    <div><p>I want to discuss what&#...s needed be done for jetcd 0.0.1 release? Maven Central? Github Release? Uber Jar somewhere? and etc? <p>cc/ </p><p>该提问来源于开源项目:etcd-io/jetcd</p></div>
  • But I found that the jetcd code does not allow users to extend. <p><code>io.etcd.jetcd.ClientConnectionManager#defaultChannelBuilder</code>: <pre><code>java protected ManagedChannelBuilder>...
  • <div><p>will the kvclient and watchclient will reconnect after etcd server reboot?</p><p>该提问来源于开源项目:etcd-io/jetcd</p></div>
  • Jetcd 是 etcd 的简单 Java 客户端开发包。etcd 是 CoreOS 中的高可用 Key/Value 存储和 示例代码: EtcdClient client = new EtcdClient(URI.create("http://127.0.0.1:4001/")); String key = "/watch"; ...
  • Missing com.coreos.jetcd.api

    2020-11-20 21:55:35
    <div><p>Importing jetcd to intellij IDEA cause missing com.coreos.jetcd.api Anybody faced same issue?</p><p>该提问来源于开源项目:etcd-io/jetcd</p></div>
  • m trying to get jetcd working in a poject that runs in Karaf. I'm currently working primarily with the 0.0.1 build and I'm hoping somebody can help me out. Basically, in Karaf, the jetcd ...
  • <div><p>com.coreos.jetcd.Watch.Watcher#listen will block thread, I have many watch to listen , How to support callback on jetcd0.0.1 ?</p><p>该提问来源于开源项目:etcd-io/jetcd</p></div>
  • <div><p>add jetcd-simple-ctl example to illustrate how to build a simple ctl of GET, PUT, and WATCH using jetcd client.</p><p>该提问来源于开源项目:etcd-io/jetcd</p></div>
  • <div><p>I use jetcd with etcd version 3.2.1. Here is my scala code snippet: <pre><code> val host = "http://10.110.27.6:2280" //a member of etcd cluster val client = ClientBuilder....
  • 本文与您一起,用jetcd实现对etcd的基本操作

    欢迎访问我的GitHub

    这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos

    系列文章链接

    1. jetcd实战之一:极速体验
    2. jetcd实战之二:基本操作
    3. jetcd实战之三:进阶操作(事务、监听、租约)

    本篇概览

    本文是《jetcd实战系列》的第二篇,经过前面的准备,我们有了可用的etcd集群环境和gradle父工程,并且写了个helloworld程序连接etcd简单体验了一番,今天的实战咱们聚焦那些常用的etcd操作,例如写、读、删除等,这些操作可以覆盖到日常大部分场景,本文主要有以下几部分组成:

    1. 编写接口类EtcdService.java,定义常用的etcd操作;
    2. 编写接口类的实现EtcdServiceImpl.java,这里面主要是调用jetcd提供的API来完成具体的etcd操作;
    3. 编写单元测试类EtcdServiceImplTest.java,这里面有很多测试方法,来演示如何使用EtcdService的接口来实现各种复杂的操作;

    源码下载

    • 本篇实战中的完整源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):
    名称 链接 备注
    项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
    git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
    git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议
    • 这个git项目中有多个文件夹,kubebuilder相关的应用在jetcd-tutorials文件夹下,如下图红框所示:
      在这里插入图片描述

    • jetcd-tutorials文件夹下有多个子项目,本篇的是base-operate
      在这里插入图片描述

    新建子模块base-operate

    • 在父工程jetcd-tutorials下新增名为base-operate的Gradle子模块,其build.gradle文件内容如下:
    plugins {
        id 'java-library'
    }
    
    // 子模块自己的依赖
    dependencies {
        api 'io.etcd:jetcd-core'
        api 'org.projectlombok:lombok'
        // annotationProcessor不会传递,使用了lombok生成代码的模块,需要自己声明annotationProcessor
        annotationProcessor 'org.projectlombok:lombok'
        // slf4j的包自己用就行了,不要继承到其他工程中去,否则容易和其他日志包起冲突
        implementation 'org.slf4j:slf4j-log4j12'
        testImplementation('org.junit.jupiter:junit-jupiter')
    }
    
    test {
        useJUnitPlatform()
    }
    
    • 新增接口EtcdService.java,这里面定义了常用的etcd操作:
    package com.bolingcavalry.dao;
    
    import io.etcd.jetcd.Response.Header;
    import io.etcd.jetcd.kv.GetResponse;
    import io.etcd.jetcd.options.DeleteOption;
    import io.etcd.jetcd.options.GetOption;
    
    /**
     * @Description: Etcd操作服务的接口
     * @author: willzhao E-mail: zq2599@gmail.com
     * @date: 2021/3/30 7:55
     */
    public interface EtcdService {
    
        /**
         * 写入
         * @param key
         * @param value
         */
        Header put(String key, String value) throws Exception;
    
        /**
         * 读取
         * @param key
         * @return
         */
        String getSingle(String key) throws Exception;
    
    
        /**
         * 带额外条件的查询操作,例如前缀、结果排序等
         * @param key
         * @param getOption
         * @return
         */
        GetResponse getRange(String key, GetOption getOption) throws Exception;
    
        /**
         * 单个删除
         * @param key
         * @return
         */
        long deleteSingle(String key) throws Exception;
    
        /**
         * 范围删除
         * @param key
         * @param deleteOption
         * @return
         */
        long deleteRange(String key, DeleteOption deleteOption) throws Exception;
    
        /**
         * 关闭,释放资源
         */
        void close();
    }
    
    
    • 新增上述接口对应的实现类,可见大多数是直接调用jetcd提供的API:
    package com.bolingcavalry.dao.impl;
    
    import com.bolingcavalry.dao.EtcdService;
    import io.etcd.jetcd.ByteSequence;
    import io.etcd.jetcd.Client;
    import io.etcd.jetcd.KV;
    import io.etcd.jetcd.Response;
    import io.etcd.jetcd.kv.GetResponse;
    import io.etcd.jetcd.options.DeleteOption;
    import io.etcd.jetcd.options.GetOption;
    
    import static com.google.common.base.Charsets.UTF_8;
    
    /**
     * @Description: etcd服务的实现类
     * @author: willzhao E-mail: zq2599@gmail.com
     * @date: 2021/3/30 8:28
     */
    public class EtcdServiceImpl implements EtcdService {
    
    
    
        private Client client;
    
        private String endpoints;
    
        private Object lock = new Object();
    
        public EtcdServiceImpl(String endpoints) {
            super();
            this.endpoints = endpoints;
        }
    
        /**
         * 将字符串转为客户端所需的ByteSequence实例
         * @param val
         * @return
         */
        public static ByteSequence bytesOf(String val) {
            return ByteSequence.from(val, UTF_8);
        }
    
        /**
         * 新建key-value客户端实例
         * @return
         */
        private KV getKVClient(){
    
            if (null==client) {
                synchronized (lock) {
                    if (null==client) {
    
                        client = Client.builder().endpoints(endpoints.split(",")).build();
                    }
                }
            }
    
            return client.getKVClient();
        }
    
        @Override
        public void close() {
            client.close();
            client = null;
        }
    
        @Override
        public Response.Header put(String key, String value) throws Exception {
            return getKVClient().put(bytesOf(key), bytesOf(value)).get().getHeader();
        }
    
        @Override
        public String getSingle(String key) throws Exception {
            GetResponse getResponse = getKVClient().get(bytesOf(key)).get();
    
            return getResponse.getCount()>0 ?
                   getResponse.getKvs().get(0).getValue().toString(UTF_8) :
                   null;
        }
    
        @Override
        public GetResponse getRange(String key, GetOption getOption) throws Exception {
            return getKVClient().get(bytesOf(key), getOption).get();
        }
    
        @Override
        public long deleteSingle(String key) throws Exception {
            return getKVClient().delete(bytesOf(key)).get().getDeleted();
        }
    
        @Override
        public long deleteRange(String key, DeleteOption deleteOption) throws Exception {
            return getKVClient().delete(bytesOf(key), deleteOption).get().getDeleted();
        }
    }
    
    • 看到这里,您一定觉得太easy了,确实,调用上述方法就能轻松完成常用的读写操作,但很多时候咱们的操作并非对指定的key做读写那么简单,例如按前缀查询、只返回数量不返回数据、批量删除直到指定的key出现为止,其实只要用好EtcdService提供的那几个接口,上述复杂操作都能轻松完成;
    • 接下来咱们通过单元测试来逐一体验EtcdService提供的那几个接口,并尝试完成各种复杂操作;

    编写单元测试用例

    • 新增单元测试类EtcdServiceImplTest,如下图所示,为了让其内部的方法按我们指定的顺序执行,记得给类添加注解@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
      在这里插入图片描述
    • 如下图红框,默认使用Gradle作为测试工具,这里请改成红框中的IntelliJ IDEA,这样单元测试代码中的Order、DisplayName等注解才能生效:
      在这里插入图片描述
    • 接下来开始在EtcdServiceImplTest中写代码,先写个key方法,这里面用当前时间和输入的字符串拼接成一个独一无二的字符串,可以作为后面的测试是的key(或者key前缀):
    	private static String key(String name) {
            return "/EtcdServiceImplTest/" + name + "-" + System.currentTimeMillis();
        }
    
    • 定义EtcdServiceImp实例作为静态变量,后面的测试中都会用到,另外还要在测试结束时关闭客户端连接:
    	private static EtcdService etcdService = new EtcdServiceImpl();
    
        @AfterAll
        static void close() {
            etcdService.close();
        }
    
    • 接下来开始体验etcd的基本操作;

    基本写操作

    • 写操作非常简单,就是调用put方法传入key和value,至于验证,在开始读操作之前先简单点,确认header非空即可:
        @Test
        @Order(1)
        @DisplayName("基本写操作")
        void put() throws Exception {
            Response.Header header = etcdService.put(key("put"), "123");
            assertNotNull(header);
        }
    

    读操作

    • 先测试最基本的读操作,用getSingle方法可以返回单个结果:
        @Test
        @Order(2)
        @DisplayName("基本读操作")
        void getSingle() throws Exception {
            String key = key("getSingle");
            String value = String.valueOf(System.currentTimeMillis());
    
            // 先写入
            etcdService.put(key, value);
    
            // 再读取
            String queryRlt = etcdService.getSingle(key);
    
            assertEquals(value, queryRlt);
        }
    
    • 接下来借助GetOption对象,我们可以进行跟多复杂的读操作,先看如何通过前缀查询多个键值对:
        @Test
        @Order(3)
        @DisplayName("读操作(指定前缀)")
        void getWithPrefix() throws Exception {
            String prefix = key("getWithPrefix");
    
            // 先写入十条
            int num = 10;
    
            for (int i=0;i<num;i++) {
                // 写入,每个key都不同
                etcdService.put(prefix + i, String.valueOf(i));
            }
    
            // 带前缀的方式查询,注意要入参key和prefix是同一个值
            GetOption getOption = GetOption.newBuilder().withPrefix(EtcdServiceImpl.bytesOf(prefix)).build();
            GetResponse getResponse = etcdService.getRange(prefix, getOption);
    
            // 总数应该是十个
            assertEquals(num, getResponse.getCount());
        }
    
    • 假设总共有十条结果,还可以控制只返回五条记录(不过总数字段还是十):
        @Test
        @Order(4)
        @DisplayName("读操作(指定KeyValue结果数量)")
        void getWithLimit() throws Exception {
            String prefix = key("getWithLimit");
    
            // 先写入十条
            int num = 10;
            int limit = num/2;
    
            for (int i=0;i<num;i++) {
                // 写入,每个key都不同
                etcdService.put(prefix + i, String.valueOf(i));
            }
    
            // 带前缀的方式查询,查出来应该是十个,再加上数量限制为五个
            GetOption getOption = GetOption.newBuilder()
                    .withPrefix(EtcdServiceImpl.bytesOf(prefix))
                    .withLimit(limit)
                    .build();
    
            GetResponse getResponse = etcdService.getRange(prefix, getOption);
    
            // 总数还是十个
            assertEquals(num, getResponse.getCount());
            // 结果的数量和limit有关,是5个
            assertEquals(limit, getResponse.getKvs().size());
        }
    
    • revision字段是etcd的全局版本号,每次写入都会对应一个revision值,可以用该revision值作为查询条件,查到指定key过往的某个版本的值:
        @Test
        @Order(5)
        @DisplayName("读操作(指定revision)")
        void getWithRevision() throws Exception {
            String key = key("getWithRevision");
    
            // 先写入十条
            int num = 10;
            int limit = num/2;
    
            // 第一次写入时的revision
            long firstRevision = 0L;
    
            // 第一次写入的value
            String firstValue = null;
    
            // 最后一次写入的value
            String lastValue = null;
    
            for (int i=0;i<num;i++) {
                // 用同一个key写十次,每次的value都不同
                String value = String.valueOf(i);
                // 注意,key一直没有变化
                Response.Header header = etcdService.put(key, value);
    
                // 第一次写入的revision和value都保存下来,后面用revision取出值,和value对比应该相等
                if (0==i) {
                    firstRevision = header.getRevision();
                    firstValue = value;
                } else if ((num-1)==i) {
                    // 将最后一次写入的value记录下来
                    lastValue = value;
                }
            }
    
    
            // 记录下来的第一次写入的值和最后一次写入的值应该不等
            assertNotEquals(firstValue, lastValue);
    
            // 如果不带其他条件只用key查找,查出的值应该等于最后一次写入的
            assertEquals(lastValue, etcdService.getSingle(key));
    
            // 查询条件中指定第一次写入的revision
            GetOption getOption = GetOption.newBuilder()
                                  .withRevision(firstRevision)
                                  .build();
    
            GetResponse getResponse = etcdService.getRange(key, getOption);
    
            // 总数是一个
            assertEquals(1, getResponse.getCount());
    
            // 结果的value应该和前面记录的第一次写入的值相等
            assertEquals(firstValue, getResponse.getKvs().get(0).getValue().toString(UTF_8));
        }
    
    • 当前查询结果有多个时,还可以对结果进行排序,key或者value都能用作排序字段,并且可以选择升序还是降序:
        @Test
        @Order(6)
        @DisplayName("读操作(结果排序)")
        void getWithOrder() throws Exception {
            String prefix = key("getWithOrder");
    
            // 先写入十条,每一条的key都不同,value也不同
            int num = 10;
    
            // 第一次写的key
            String firstKey = null;
            // 第一次写的value
            String firstValue = null;
            // 最后一次写的key
            String lastKey = null;
            // 最后一次写的value
            String lastValue = null;
    
            for (int i=0;i<num;i++) {
                String key = prefix + i;
                String value = String.valueOf(i);
                // 写入,每个key都不同
                etcdService.put(key, value);
    
                // 把第一次写的key、value,最后一次写的key、value保存到对应的变量中
                if(0==i) {
                    firstKey = key;
                    firstValue = value;
                } else if((num-1)==i) {
                    lastKey = key;
                    lastValue = value;
                }
            }
    
    
            // 第一次查询,结果用key排序,从大到小
            GetOption getOption = GetOption.newBuilder()
                                    .withPrefix(EtcdServiceImpl.bytesOf(prefix))
                                    .withSortField(GetOption.SortTarget.KEY)
                                    .withSortOrder(GetOption.SortOrder.DESCEND)
                                    .build();
    
            GetResponse getResponse = etcdService.getRange(prefix, getOption);
    
            // 总数还是十个
            assertEquals(num, getResponse.getCount());
    
            // 取查询结果的第一条
            KeyValue firstResult = getResponse.getKvs().get(0);
    
            // 因为是从大到小,查询结果的第一条应该是最后一次写入的(key是lastKey,value是lastValue)
            assertEquals(lastKey, firstResult.getKey().toString(UTF_8));
            assertEquals(lastValue, firstResult.getValue().toString(UTF_8));
    
    
            // 第二次查询,结果用key排序,从小到大
            getOption = GetOption.newBuilder()
                        .withPrefix(EtcdServiceImpl.bytesOf(prefix))
                        .withSortField(GetOption.SortTarget.KEY)
                        .withSortOrder(GetOption.SortOrder.ASCEND)
                        .build();
    
            getResponse = etcdService.getRange(prefix, getOption);
    
            // 总数还是十个
            assertEquals(num, getResponse.getCount());
    
            // 取查询结果的第一条
            firstResult = getResponse.getKvs().get(0);
    
            // 因为是从小到大,查询结果的第一条应该是第一次写入的(key是firstKey,value是firstValue)
            assertEquals(firstKey, firstResult.getKey().toString(UTF_8));
            assertEquals(firstValue, firstResult.getValue().toString(UTF_8));
        }
    
    • 指定返回结果中只有key没有value:
        @Test
        @Order(7)
        @DisplayName("读操作(只返回key)")
        void getOnlyKey() throws Exception {
            String key = key("getOnlyKey");
            // 写入一条记录
            etcdService.put(key, String.valueOf(System.currentTimeMillis()));
    
            // 查询条件中指定只返回key
            GetOption getOption = GetOption.newBuilder()
                                .withKeysOnly(true)
                                .build();
    
            GetResponse getResponse = etcdService.getRange(key, getOption);
    
            assertEquals(1, getResponse.getCount());
    
            KeyValue keyValue = getResponse.getKvs().get(0);
    
            assertNotNull(keyValue);
    
            assertEquals(key, keyValue.getKey().toString(UTF_8));
    
            // value应该是空的
            assertTrue(keyValue.getValue().isEmpty());
        }
    
    • 返回的结果中只有数量,不包含key和value:
        @Test
        @Order(8)
        @DisplayName("读操作(只返回数量)")
        void getOnlyCount() throws Exception {
            String key = key("getOnlyCount");
            // 写入一条记录
            etcdService.put(key, String.valueOf(System.currentTimeMillis()));
    
            // 查询条件中指定只返回key
            GetOption getOption = GetOption.newBuilder()
                                .withCountOnly(true)
                                .build();
    
            GetResponse getResponse = etcdService.getRange(key, getOption);
    
            // 数量应该是1
            assertEquals(1, getResponse.getCount());
    
            // KeyValue应该是空的
            assertTrue(getResponse.getKvs().isEmpty());
        }
    
    • 假设etcd有三个key:a1、a2、a3,那么通过前缀a可以将这三个key都查出来,与此同时还可以再加个endKey查询条件,假设endKey等于a2,那么查找工作在查到a2时就会停止并返回,而返回值中只有a1,不包含a2,换言之endKey之前的值才会被返回
        @Test
        @Order(9)
        @DisplayName("读操作(查到指定key就结束)")
        void getWithEndKey() throws Exception {
            String prefix = key("getWithEndKey");
            String endKey = null;
    
            int num = 10;
    
            for (int i=0;i<num;i++) {
                String key = prefix + i;
                // 写入,每个key都不同
                etcdService.put(key, String.valueOf(i));
    
                // 总共写入十条记录,把第九条的key作为endKey保存
                if ((num-2)==i) {
                    endKey = key;
                }
            }
    
            // 查询条件中指定了endKey是上面写入的第九条记录的key
            // 注意,查询结果中不包含endKey那条记录,也就是说只返回前八条
            GetOption getOption = GetOption.newBuilder()
                    .withRange(EtcdServiceImpl.bytesOf(endKey))
                    .build();
    
            GetResponse getResponse = etcdService.getRange(prefix, getOption);
    
            // 注意,查询结果中不包含endKey那条记录,也就是说只返回前八条
            assertEquals(num-2, getResponse.getCount());
        }
    
    • 以上就是读操作的常见用法,接下来看删除;

    删除操作

    • 最基本的删除就是调用deleteSingle方法:
        @Test
        @Order(10)
        @DisplayName("单个删除")
        void deleteSingle() throws Exception {
            String key = key("deleteSingle");
    
            // 写入一条记录
            etcdService.put(key, String.valueOf(System.currentTimeMillis()));
    
            // 此时应该能查到
            assertNotNull(etcdService.getSingle(key));
    
            // 删除
            etcdService.deleteSingle(key);
    
            // 此时应该查不到了
            assertNull(etcdService.getSingle(key));
        }
    
    • 借助DeleteOption对象,可以实现更多类型的删除,下面是删除指定前缀的所有记录:
       @Test
        @Order(11)
        @DisplayName("删除(指定前缀)")
        void deleteWithPrefix() throws Exception {
            String prefix = key("deleteWithPrefix");
    
            int num = 10;
    
            // 写入,每个key都不同,但是有相同的前缀
            for (int i=0;i<num;i++) {
                etcdService.put(prefix + i, String.valueOf(i));
            }
    
            GetOption getOption = GetOption.newBuilder()
                                    .withPrefix(EtcdServiceImpl.bytesOf(prefix))
                                    .build();
    
            // 此时总数应该是十
            assertEquals(num, etcdService.getRange(prefix, getOption).getCount());
    
            // 删除条件是指定前缀
            DeleteOption deleteOption = DeleteOption.newBuilder()
                                        .withPrefix(EtcdServiceImpl.bytesOf(prefix))
                                        .build();
    
            // 删除
            etcdService.deleteRange(prefix, deleteOption);
    
            // 删除后再查,总数应该是0
            assertEquals(0, etcdService.getRange(prefix, getOption).getCount());
        }
    
    • 与读操作的endKey类似,删除操作也有endKey参数,假设etcd有三个key:a1、a2、a3,那么通过前缀a可以将这三个key都删除,与此同时还可以再加个endKey删除条件,假设endKey等于a2,那么删除工作在查到a2时就会停止并返回,被删除的记录只有a1,不包含a2,换言之endKey之前的记录才会被删除
        @Test
        @Order(11)
        @DisplayName("删除(删到指定key就结束)")
        void deleteWithEndKey() throws Exception {
            String prefix = key("deleteWithEndKey");
    
            int num = 10;
            String endKey = null;
    
            // 写入,每个key都不同,但是有相同的前缀
            for (int i=0;i<num;i++) {
                String key = prefix + i;
    
                etcdService.put(key, String.valueOf(i));
    
                // 把第九条记录的key保存在endKey变量中
                if((num-2)==i) {
                    endKey = key;
                }
            }
    
            GetOption getOption = GetOption.newBuilder()
                    .withPrefix(EtcdServiceImpl.bytesOf(prefix))
                    .build();
    
            // 此时总数应该是十
            assertEquals(num, etcdService.getRange(prefix, getOption).getCount());
    
            // 删除条件是指定前缀,并且遇到第九条记录的key就停止删除操作,此时第九条和第十条都不会被删除
            DeleteOption deleteOption = DeleteOption.newBuilder()
                    .withPrefix(EtcdServiceImpl.bytesOf(prefix))
                    .withRange(EtcdServiceImpl.bytesOf(endKey))
                    .build();
    
            // 删除
            etcdService.deleteRange(prefix, deleteOption);
    
            // 删除后再查,总数应该是二
    
            assertEquals(2, etcdService.getRange(prefix, getOption).getCount());
        }
    
    • 至此,编码结束,执行单元测试试;

    执行单元测试

    • 点击下图红框中的按钮,在弹出的菜单中点击Run EtcdServiceImplTest,即可开始单元测试:
      在这里插入图片描述
    • 如下图,单元测试通过:
      在这里插入图片描述
    • 至此,使用jetcd对etcd进行基本操作的实战已经完成,希望能给您的开发带来一些参考,接下来的章节,咱们去操作一些etcd的特性,包括事务、监听、租约;

    你不孤单,欣宸原创一路相伴

    1. Java系列
    2. Spring系列
    3. Docker系列
    4. kubernetes系列
    5. 数据库+中间件系列
    6. DevOps系列

    欢迎关注公众号:程序员欣宸

    微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界…

    展开全文
  • <p>My question is how to use java gRPC and jetcd to discover the list of three service nodes and do rpc call with round robin load balancing. I've tried many ways only to find the NameResolver ...
  • <div><p>same with title</p><p>该提问来源于开源项目:etcd-io/jetcd</p></div>
  • We use jetcd to connect to the Etcd server. After recompiling the code with JDK 11 using jetcd 0.0.2 version jar, it is failing to connect to the Etcd server with the following error. <pre><code> ...
  • <div><p>Why so many errors when I downlod the project and then compile it. Error:(19, 28) java: package com.coreos.jetcd.api do not exists like this: public class UnlockResponse...etcd-io/jetcd</p></div>
  • m using jetcd (version 0.0.1), How to use api: <code>client.Watch("\x02/servers/", clientv3.WithPrefix()) ; <code>client.Watch("\x03/servers/server-a/", clientv3.WithPrefix()) ?</code>...
  • 项目马上要上线了,突然发现预发布环境的 etcd 连不上了。虽然别人催的很紧急,但出于保存现场的目的...结论是:jetcd 在创建 watcher 之后,如果要关掉它,不管是Client.close()、还是watcher.close(),都无法马上...

    项目马上要上线了,突然发现预发布环境的 etcd 连不上了。虽然别人催的很紧急,但出于保存现场的目的,还是详查了一下为啥 etcd 连不上。

    排查结果

    • 排查过程就不详细列举了,就是一个抽丝剥茧的过程。很艰难,其实最后发现真相才觉得过程都不值一提。
    • 结论是:jetcd 在创建 watcher 之后,如果要关掉它,不管是Client.close()、还是watcher.close(),都无法马上关掉连接,然后连接会越来越多,最后把连接数占满。通过 lsof -i:2379 可以看到答案。
    • 于是乎,我去提了一个 ISSUES,截至发稿时,没有人回复。
    • 但是马上要上线了,必须想办法解决一下这个问题。首先在开发的时候,就已经搭建了 jetcd 的环境,随时可以改源码自己出包。咨询了一下熟悉 grpc 的同事,很明显问题是 ManagedChannel 在 shutdown 之后,关不掉这个连接;同事建议我用 shutdownNow,遂改源码试了一下,果然可以。不过为什么 shutdown 之后关不掉,仍然需要详细排查。

    2019-06-01 补充

    • 通过抓包分析,发现使用 shutdown 时,客户端不会主动发起 FIN 请求断开连接,而 shutdownNow 就会。

    结束语

    • etcd 的学习还需要继续努力。
    展开全文
  • <div><p>io.grpc.ManagedChannel#notifyWhenStateChanged not implemention yet on jetcd0.0.1 , but I fount was implemention on master branch, When release next version ?</p><p>该提问来源于开源项目:...
  • <div><p>I have just tried to import this project into Eclipse with M2E (Maven integration) to make a small contribution, and found it to be quite a PITA. Therefore opening this ...etcd-io/jetcd</p></div>
  • <div><p>Need to add apache 2.0 license to all jetcd related files. Do this after all pending 0.0.1 milestone prs have merged.</p><p>该提问来源于开源项目:etcd-io/jetcd</p></div>
  • <div><p>TestNG is removed to unify stack used for testing, <code>ByteSequence.from(<str>, UTF_8)</code> is replaced with more concise <code>bytesOf(<...etcd-io/jetcd</p></div>
  • 软件介绍Jetcd 是 etcd 的简单 Java 客户端开发包。etcd 是 CoreOS 中的高可用 Key/Value 存储和示例代码:EtcdClient client = new EtcdClient(URI.create("http://127.0.0.1:4001/"));String key = "/watch";...

    软件介绍

    Jetcd 是 etcd 的简单 Java 客户端开发包。etcd 是 CoreOS 中的高可用 Key/Value 存储和

    示例代码:

    EtcdClient client = new EtcdClient(URI.create("http://127.0.0.1:4001/"));

    String key = "/watch";

    EtcdResult result = this.client.set(key, "hello");

    Assert.assertEquals("hello", result.value);

    result = this.client.get(key);

    Assert.assertEquals("hello", result.value);

    ListenableFuture watchFuture = this.client.watch(key, result.index + 1);

    Assert.assertFalse(watchFuture.isDone());

    result = this.client.set(key, "world");

    Assert.assertEquals("world", result.value);

    EtcdResult watchResult = watchFuture.get(100, TimeUnit.MILLISECONDS);

    Assert.assertNotNull(result);

    Assert.assertEquals("world", result.value);

    为你推荐:

    展开全文
  • 接着上篇我们来分析下jetcd的其他模块和raft算法如图:我们一个个看首先jetcd-common :都是公共类jetcd-examples:可以作为学习用,里边的代码都是main方法启动的,可以直接跑(看源码的以后也可以这么看—)jetcd-...

    接着上篇我们来分析下jetcd的其他模块和raft算法

    如图:

    我们一个个看

    首先jetcd-common :

    f8d64ea485374af9525fbe859be43322.png

    都是公共类

    jetcd-examples:

    47ec3a89546d4bcef6fc2a7e26549012.png

    可以作为学习用,里边的代码都是main方法启动的,可以直接跑(看源码的以后也可以这么看—)

    jetcd-launcher:

    66f43bb756a8fe22660dfb6a09b5c9d8.png

    里边提供了一个已编程插件的方式启动etcd 服务器,核心代码如下:

    import org.apache.maven.plugins.annotations.Mojo;import org.apache.maven.plugins.annotations.Parameter;@Mojo(name = "start", requiresProject = false, defaultPhase = LifecyclePhase.PRE_INTEGRATION_TEST)public class StartMojo extends AbstractMojo {    private static final Logger LOG = LoggerFactory.getLogger(StartMojo.class);    private static final String[] EMPTY = new String[0];    @Parameter(required = true, defaultValue = "target/jetcd-launcher-maven-plugin/endpoint")    private File endpointFile;    /**     * Additional     * arguments to pass to etcd.     */    @Parameter(required = false)    private String[] additionalArguments;    public StartMojo() {    }    @Override    public void execute() throws MojoExecutionException, MojoFailureException {        Singleton.etcd = EtcdClusterFactory.buildCluster("maven", 1, false,            additionalArguments != null ? additionalArguments : EMPTY);        Singleton.etcd.start();        URI endpoint = Singleton.etcd.getClientEndpoints().get(0);        try {            endpointFile.getParentFile().mkdirs();            Files.asCharSink(endpointFile, US_ASCII).write(endpoint.toString());            LOG.info("{} = {}", endpointFile, endpoint);        } catch (IOException e) {            throw new MojoFailureException("writing file failed", e);        }    }}

    通过apache maven 插件的api 编写指定插件,我们可以看到具体使用来自xxx-test子项目:

     <plugin>                <groupId>${project.groupId}groupId>                <artifactId>jetcd-launcher-maven-pluginartifactId>                <version>${project.version}version>                <configuration>                  <additionalArguments>                    <additionalArgument>--max-txn-opsadditionalArgument>                    <additionalArgument>1024additionalArgument>                  additionalArguments>                configuration>                <executions>                  <execution>                    <id>start-etcdid>                    <phase>process-test-classesphase>                    <goals>                      <goal>startgoal>                    goals>                  execution>                  <execution>                    <id>stop-etcdid>                    <phase>prepare-packagephase>                    <goals>                      <goal>stopgoal>                    goals>                  execution>                executions>            plugin>

    jetcd-resolver:

    f6d87660cac4cf0afb01488112839c96.png

    这个主要是为了处理名称服务的解析,解析uri和对uri的和地址的缓存映射,

    题外话

    单元测试有用到

     await().atMost(15, TimeUnit.SECONDS).untilAsserted(() -> assertThat(ref.get()).isTrue());

    用的是org.awaitility的包,此包主要用来做异步场景下的测试

    此外在jetcd-launcher项目中我们可以看到源码如下:

     public static EtcdCluster buildCluster(        @NonNull String clusterName,        int nodes,        boolean ssl,        @NonNull String image,        List additionalArgs) {        final Network network = Network.builder().id(clusterName).build();        final CountDownLatch latch = new CountDownLatch(nodes);        final AtomicBoolean failedToStart = new AtomicBoolean(false);        final EtcdContainer.LifecycleListener listener = new EtcdContainer.LifecycleListener() {            @Override            public void started(EtcdContainer container) {                latch.countDown();            }            @Override            public void failedToStart(EtcdContainer container, Exception exception) {                LOGGER.error("Exception while starting etcd container: ", exception);                failedToStart.set(true);                latch.countDown();            }            @Override            public void stopped(EtcdContainer container) {            }        };

    之前有说过该项目主要是用来提供继承测试适应云场景,所以可以发现以下的虚拟容器提供etcd继承测试服务端,当然你也可以实现其他的服务端虚拟容器用到自己的项目中

    import org.testcontainers.containers.Network;

    raft:

    log replication:

    这里笔者做一个简单介绍,当你使用的是一个单体应用的时候,你是无需进行数据同步的如图:

    5b213474a62722f803d1e5e05d764d41.png

    这个时候客户端发送请求服务端处理存储即可,那么如果服务端集群模式,那么服务端如何同步,类似如下:

    031407931cd4e9ad4beee652eb28cf82.png

    假设中间的节点是leader,数据发送到leader节点,leader节点需要在第一次性跳时间后发送数据同步请求其他的节点

    97b156f0ebd6ba714f7c79e8d1630072.png

    实际上这里的同步涉及到多数法人的原则,也leader是不会直接提交到本地数据的,而是在第一次心跳间隔之后发送数据同步请求,接收到请求的节点保存数据,返回给leader,当leader接收到了超过半数的成功返回则提交本地事务。否则不提交(当脑裂出现的时候,基数节点的多个leader因为没有多数其他节点的commit 该脑裂节点也不会提交数据,这也就是raft处理脑裂的核心如下图)

    622dc2b025779a1d59fa53dccf432ea0.png

    lead election:

    首先了解一下几个基本的角色:

    leader

    follower

    candidate

    首先所有的节点都是follower的角色,通过选举超时时间之后随机成为候选人

    candidate,然后第一个成为候选人的节点发起请求(投票给自己)给其他节点,如果多数节点返回投票响应,当前follower成为candidate。这里有一个规则,当集群中自己没有成为candidate之前,所有来自其他节点的投票请求都会成功返回,换句话说就是当自己没有发起投票请求之前都会主动投票给其他节点

    整个流程还是比较复杂的,涉及到多leader分片(多个follower同时成为candidate发起投票的情况,raft如何处理),以及如何通过修改election timout 动态控制选举流程等等,因为篇幅限制这里不赘述,日后有时间会给一篇专门分析,实现可以去看nacos的raftCore的流程,这里贴一段初始化代码:

     @PostConstruct    public void init() throws Exception {        Loggers.RAFT.info("initializing Raft sub-system");        executor.submit(notifier);        final long start = System.currentTimeMillis();        raftStore.loadDatums(notifier, datums);        setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));        Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());        while (true) {            if (notifier.tasks.size() <= 0) {                break;            }            Thread.sleep(1000L);        }        initialized = true;        Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));        GlobalExecutor.registerMasterElection(new MasterElection());        GlobalExecutor.registerHeartbeat(new HeartBeat());        Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",            GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);    }

    核心流程在registerMasterElection里边。

    敬请期待后续.....

    展开全文

空空如也

空空如也

1 2 3 4 5 ... 15
收藏数 300
精华内容 120
关键字:

Jetcd