  • Copycat - CopycatServer
    2017-02-24 16:53:00

    A server can be started in two ways.

    Address address = new Address("127.0.0.1", 5000);
    CopycatServer.Builder builder = CopycatServer.builder(address);
    builder.withStateMachine(MapStateMachine::new);

    Bootstrapping a new cluster on its own:

    CompletableFuture<CopycatServer> future = server.bootstrap();
    future.join();

     

    Joining an existing cluster:

    Collection<Address> cluster = Collections.singleton(new Address("127.0.0.1", 8700));
    server.join(cluster).join();

     

    CopycatServer.Builder.build

    /**
         * @throws ConfigurationException if a state machine, members or transport are not configured
         */
        @Override
        public CopycatServer build() {
          if (stateMachineFactory == null)
            throw new ConfigurationException("state machine not configured");
    
          // If the transport is not configured, attempt to use the default Netty transport.
          if (serverTransport == null) {
            try {
              serverTransport = (Transport) Class.forName("io.atomix.catalyst.transport.netty.NettyTransport").newInstance(); // defaults to the Netty transport
            } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
              throw new ConfigurationException("transport not configured");
            }
          }
    
          // If the client transport is not configured, default it to the server transport.
          if (clientTransport == null) {
            clientTransport = serverTransport;
          }
    
          // If no serializer instance was provided, create one.
          if (serializer == null) {
            serializer = new Serializer(new PooledHeapAllocator());
          }
    
          // Resolve serializable request/response and other types.
          serializer.resolve(new ClientRequestTypeResolver());
          serializer.resolve(new ClientResponseTypeResolver());
          serializer.resolve(new ProtocolSerialization());
          serializer.resolve(new ServerSerialization());
          serializer.resolve(new StorageSerialization());
    
          // If the storage is not configured, create a new Storage instance with the configured serializer.
          if (storage == null) {
            storage = new Storage(); // default storage
          }
    
          ConnectionManager connections = new ConnectionManager(serverTransport.client());
          ThreadContext threadContext = new SingleThreadContext(String.format("copycat-server-%s-%s", serverAddress, name), serializer); // a single-threaded ThreadContext, a thin wrapper around a thread, used to execute operations on the state machine
    
          ServerContext context = new ServerContext(name, type, serverAddress, clientAddress, storage, serializer, stateMachineFactory, connections, threadContext); // wrap everything into a ServerContext
          context.setElectionTimeout(electionTimeout)
            .setHeartbeatInterval(heartbeatInterval)
            .setSessionTimeout(sessionTimeout)
            .setGlobalSuspendTimeout(globalSuspendTimeout);
    
          return new CopycatServer(name, clientTransport, serverTransport, context);
        }

     

    CopycatServer.bootstrap

    public CompletableFuture<CopycatServer> bootstrap() {
      return bootstrap(Collections.EMPTY_LIST); // bootstrapping only itself, so the argument is an empty list
    }

     

    public CompletableFuture<CopycatServer> bootstrap(Collection<Address> cluster) {
      return start(() -> cluster().bootstrap(cluster));
    }
     
    This calls start():
    /**
       * Starts the server.
       */
      private CompletableFuture<CopycatServer> start(Supplier<CompletableFuture<Void>> joiner) {
        if (started)
          return CompletableFuture.completedFuture(this);
    
        if (openFuture == null) {
          synchronized (this) {
            if (openFuture == null) {
              Function<Void, CompletionStage<CopycatServer>> completionFunction = state -> {
                CompletableFuture<CopycatServer> future = new CompletableFuture<>();
                openFuture = null;
                joiner.get().whenComplete((result, error) -> { // run the joiner
                  if (error == null) {
                    if (cluster().leader() != null) {
                      started = true;
                      future.complete(this);
                    } else {
                      electionListener = cluster().onLeaderElection(leader -> {
                        if (electionListener != null) {
                          started = true;
                          future.complete(this);
                          electionListener.close();
                          electionListener = null;
                        }
                      });
                    }
                  } else {
                    future.completeExceptionally(error);
                  }
                });
                return future;
              };
    
              if (closeFuture == null) {
                openFuture = listen().thenCompose(completionFunction); // start listening
              } else {
                openFuture = closeFuture.thenCompose(c -> listen().thenCompose(completionFunction));
              }
            }
          }
        }

        return openFuture;
      }

    start() does two main things: it runs the joiner and starts listening.

    The joiner here is cluster().bootstrap(cluster):

    @Override
      public CompletableFuture<Void> bootstrap(Collection<Address> cluster) {
        if (joinFuture != null)
          return joinFuture;
    
        if (configuration == null) {
          if (member.type() != Member.Type.ACTIVE) {
            return Futures.exceptionalFuture(new IllegalStateException("only ACTIVE members can bootstrap the cluster"));
          } else {
            // Create a set of active members.
            Set<Member> activeMembers = cluster.stream()
              .filter(m -> !m.equals(member.serverAddress()))
              .map(m -> new ServerMember(Member.Type.ACTIVE, m, null, member.updated()))
              .collect(Collectors.toSet());
    
            // Add the local member to the set of active members.
            activeMembers.add(member);
    
            // Create a new configuration and store it on disk to ensure the cluster can fall back to the configuration.
            configure(new Configuration(0, 0, member.updated().toEpochMilli(), activeMembers));
          }
        }
        return join();
      }

     

    listen

    /**
       * Starts listening the server.
       */
      private CompletableFuture<Void> listen() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        context.getThreadContext().executor().execute(() -> {
          internalServer.listen(cluster().member().serverAddress(), context::connectServer).whenComplete((internalResult, internalError) -> { // internalServer may be the local or the Netty transport
            if (internalError == null) {
              // If the client address is different than the server address, start a separate client server.
              if (clientServer != null) {
                clientServer.listen(cluster().member().clientAddress(), context::connectClient).whenComplete((clientResult, clientError) -> { // communication with clients may use a different address
                  started = true;
                  future.complete(null);
                });
              } else {
                started = true;
                future.complete(null);
              }
            } else {
              future.completeExceptionally(internalError);
            }
          });
        });
    
        return future;
      }

    ServerContext

    /**
       * Handles a connection from a client.
       */
      public void connectClient(Connection connection) {
        threadContext.checkThread();
    
        // Note we do not use method references here because the "state" variable changes over time.
        // We have to use lambdas to ensure the request handler points to the current state.
        connection.handler(RegisterRequest.class, request -> state.register(request));
        connection.handler(ConnectRequest.class, request -> state.connect(request, connection));
        connection.handler(KeepAliveRequest.class, request -> state.keepAlive(request));
        connection.handler(UnregisterRequest.class, request -> state.unregister(request));
        connection.handler(CommandRequest.class, request -> state.command(request));
        connection.handler(QueryRequest.class, request -> state.query(request));
    
        connection.closeListener(stateMachine.executor().context().sessions()::unregisterConnection);
      }
    
      /**
       * Handles a connection from another server.
       */
      public void connectServer(Connection connection) {
        threadContext.checkThread();
    
        // Handlers for all request types are registered since requests can be proxied between servers.
        // Note we do not use method references here because the "state" variable changes over time.
        // We have to use lambdas to ensure the request handler points to the current state.
        connection.handler(RegisterRequest.class, request -> state.register(request));
        connection.handler(ConnectRequest.class, request -> state.connect(request, connection));
        connection.handler(KeepAliveRequest.class, request -> state.keepAlive(request));
        connection.handler(UnregisterRequest.class, request -> state.unregister(request));
        connection.handler(PublishRequest.class, request -> state.publish(request));
        connection.handler(ConfigureRequest.class, request -> state.configure(request));
        connection.handler(InstallRequest.class, request -> state.install(request));
        connection.handler(JoinRequest.class, request -> state.join(request));
        connection.handler(ReconfigureRequest.class, request -> state.reconfigure(request));
        connection.handler(LeaveRequest.class, request -> state.leave(request));
        connection.handler(AppendRequest.class, request -> state.append(request));
        connection.handler(PollRequest.class, request -> state.poll(request));
        connection.handler(VoteRequest.class, request -> state.vote(request));
        connection.handler(CommandRequest.class, request -> state.command(request));
        connection.handler(QueryRequest.class, request -> state.query(request));
    
        connection.closeListener(stateMachine.executor().context().sessions()::unregisterConnection);
      }

     

    Joining a cluster

    public CompletableFuture<CopycatServer> join(Collection<Address> cluster) {
        return start(() -> cluster().join(cluster));
      }

    ClusterState.join; the logic here is similar to bootstrap:

    @Override
      public synchronized CompletableFuture<Void> join(Collection<Address> cluster) {
    
        // If no configuration was loaded from disk, create a new configuration.
        if (configuration == null) { // when no configuration exists
          // Create a set of cluster members, excluding the local member which is joining a cluster.
          Set<Member> activeMembers = cluster.stream()
            .filter(m -> !m.equals(member.serverAddress())) // filter out the local member
            .map(m -> new ServerMember(Member.Type.ACTIVE, m, null, member.updated())) // create ServerMember objects
            .collect(Collectors.toSet());
    
          // If the set of members in the cluster is empty when the local member is excluded,
          // fail the join.
          if (activeMembers.isEmpty()) { // if the resulting cluster is empty
            return Futures.exceptionalFuture(new IllegalStateException("cannot join empty cluster"));
          }
    
          // Create a new configuration and configure the cluster. Once the cluster is configured, the configuration
          // will be stored on disk to ensure the cluster can fall back to the provided configuration if necessary.
          configure(new Configuration(0, 0, member.updated().toEpochMilli(), activeMembers)); // apply the configuration
        }
        return join();
      }

    join(Collection<Address>) only needs to initialize the configuration; it then calls the private join():

    /**
       * Starts the join to the cluster.
       */
      private synchronized CompletableFuture<Void> join() {
        joinFuture = new CompletableFuture<>();
    
        context.getThreadContext().executor().execute(() -> {
          // Transition the server to the appropriate state for the local member type.
          context.transition(member.type()); // transition the local member to its configured type
    
          // Attempt to join the cluster. If the local member is ACTIVE then failing to join the cluster
          // will result in the member attempting to get elected. This allows initial clusters to form.
          List<MemberState> activeMembers = getActiveMemberStates();
          if (!activeMembers.isEmpty()) {
            join(getActiveMemberStates().iterator()); // join via the other active members
          } else {
            joinFuture.complete(null);
          }
        });
    
        return joinFuture.whenComplete((result, error) -> joinFuture = null);
      }

     

    /**
       * Recursively attempts to join the cluster.
       */
      private void join(Iterator<MemberState> iterator) {
        if (iterator.hasNext()) {
          cancelJoinTimer();
          joinTimeout = context.getThreadContext().schedule(context.getElectionTimeout().multipliedBy(2), () -> {
            join(iterator); // as long as the join has not succeeded, another attempt keeps being scheduled
          });
    
          MemberState member = iterator.next();
          LOGGER.debug("{} - Attempting to join via {}", member().address(), member.getMember().serverAddress());
    
          context.getConnections().getConnection(member.getMember().serverAddress()).thenCompose(connection -> {
            JoinRequest request = JoinRequest.builder()
              .withMember(new ServerMember(member().type(), member().serverAddress(), member().clientAddress(), member().updated()))
              .build();
            return connection.<JoinRequest, JoinResponse>send(request); //发送join request
          }).whenComplete((response, error) -> {
            // Cancel the join timer.
            cancelJoinTimer(); // cancel the join timer first
    
            if (error == null) { // the request completed without error
              if (response.status() == Response.Status.OK) {
                LOGGER.info("{} - Successfully joined via {}", member().address(), member.getMember().serverAddress());
    
                Configuration configuration = new Configuration(response.index(), response.term(), response.timestamp(), response.members());
    
                // Configure the cluster with the join response.
                // Commit the configuration as we know it was committed via the successful join response.
                configure(configuration).commit(); // update the configuration
    
    
              } else if (response.error() == null || response.error() == CopycatError.Type.CONFIGURATION_ERROR) {
                // If the response error is null, that indicates that no error occurred but the leader was
                // in a state that was incapable of handling the join request. Attempt to join the leader
                // again after an election timeout.
                LOGGER.debug("{} - Failed to join {}", member().address(), member.getMember().address());
                resetJoinTimer();
              } else {
                // If the response error was non-null, attempt to join via the next server in the members list.
                LOGGER.debug("{} - Failed to join {}", member().address(), member.getMember().address());
                join(iterator);
              }
            } else {
              LOGGER.debug("{} - Failed to join {}", member().address(), member.getMember().address());
              join(iterator);
            }
          });
        }
        // If join attempts remain, schedule another attempt after two election timeouts. This allows enough time
        // for servers to potentially timeout and elect a leader.
        else {
          LOGGER.debug("{} - Failed to join cluster, retrying...", member.address());
          resetJoinTimer(); // if every member has been tried without success, reset the timer and retry
        }
      }

    Joining successfully via any one member is enough, because a join request is forwarded to the leader no matter which member receives it.

  • Raft Consensus Framework: Copycat Basics (Part 1)

    2017-11-12 13:40:05
    Copycat is a fault-tolerant state machine replication framework. Built
    on the Raft consensus algorithm, it handles replication and
    persistence and enforces strict ordering of inputs and outputs,
    allowing developers to focus on single-threaded application logic. Its
    event-driven model allows for efficient client communication with
    replicated state machines, from simple key-value stores to wait-free
    locks and leader elections. You supply the state machine and Copycat
    takes care of the rest, making it easy to build robust, safe
    distributed systems.

    The paragraph above is quoted from the Copycat website (http://atomix.io/copycat/). Copycat is a programming framework built on the Raft consensus algorithm that provides consistency for state in distributed applications. This article works through the examples from the official Copycat documentation.

    1. First, create a Maven project in your IDE and add the dependencies to the pom file:

    <dependency>
        <groupId>io.atomix.copycat</groupId>
        <artifactId>copycat-server</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>io.atomix.copycat</groupId>
        <artifactId>copycat-client</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>io.atomix.catalyst</groupId>
        <artifactId>catalyst-netty</artifactId>
        <version>1.1.1</version>
    </dependency>

    2. Define a custom StateMachine and the Command/Query classes

    //MapstateMachine extends the framework's StateMachine class and handles operations from clients. In this example it supports two operations: put writes a key/value pair into the map, and get reads a value back.
    public class MapstateMachine extends StateMachine implements Snapshottable {
        //the data structure the copycat-server keeps consistent across the cluster; this example uses a Map
        private Map<Object, Object> map = new HashMap<>();
    
        //the put operation on the map
        public Object put(Commit<PutCommand> commit) {
            try {
                map.put(commit.operation().key(), commit.operation().value());
            } finally {
                commit.close();
            }
            return null;
        }
        //the get operation on the map
        public Object get(Commit<GetQuery> commit) {
            try {
                return map.get(commit.operation().key());
            } finally {
                commit.close();
            }
        }
    
        //The following two methods come from the Snapshottable interface. Implementing it lets the copycat-server compact its local log into a snapshot; after a restart the server can restore its state from the snapshot, and the snapshot can be copied to servers that join later.
        @Override
        public void snapshot(SnapshotWriter writer) {
            writer.writeObject(map);
        }
    
        @Override
        public void install(SnapshotReader reader) {
            map = reader.readObject();
        }
    }

    The GetQuery class:

    package com.xkx.common;
    
    import io.atomix.copycat.Query;
    
    //the query command against MapstateMachine
    public class GetQuery implements Query<Object> {
    
        private final Object key;
    
        public GetQuery(Object key){
            this.key = key;
        }
    
        public Object key(){
            return key;
        }
    }

    The PutCommand class:

    package com.xkx.common;
    
    
    import io.atomix.copycat.Command;
    
    public class PutCommand implements Command<Object> {
    
        private final Object key;
        private final Object value;
    
        public PutCommand(Object key,Object value){
            this.key = key;
            this.value = value;
        }
    
        public Object key(){
            return key;
        }
    
        public Object value(){
            return value;
        }
    
    }

    PutCommand implements the Command interface, and GetQuery implements the Query interface.

    3. Finally, define the server and client sides. We run three servers, copyCat_server-1, copyCat_server-2 and copyCat_server-3, which together form a cluster: copyCat_server-2 and copyCat_server-3 join copyCat_server-1 to form the cluster.

    copyCat_server-1 implementation:

    package com.xkx.myCopycat;
    
    import com.xkx.common.GetQuery;
    import com.xkx.common.MapstateMachine;
    import com.xkx.common.PutCommand;
    import io.atomix.catalyst.transport.Address;
    import io.atomix.catalyst.transport.netty.NettyTransport;
    import io.atomix.copycat.server.CopycatServer;
    import io.atomix.copycat.server.storage.Storage;
    import io.atomix.copycat.server.storage.StorageLevel;
    
    import java.io.File;
    import java.util.concurrent.CompletableFuture;
    
    public class Main {
    
        public static void main(String[] args){
    
            //address and port of server 1
            Address address = new Address("127.0.0.1", 5000);
            //build the copycat server with the fluent builder API
            CopycatServer server = CopycatServer.builder(address)
                    .withStateMachine(MapstateMachine::new)
                    .withTransport(NettyTransport.builder()
                            .withThreads(4)
                            .build())
                    .withStorage(Storage.builder()
                            .withDirectory(new File("logs"))
                            .withStorageLevel(StorageLevel.DISK)
                            .build())
                    .build();
        //register the PutCommand and GetQuery classes
            server.serializer().register(PutCommand.class);
            server.serializer().register(GetQuery.class);
    
        //start the server
            CompletableFuture<CopycatServer> future = server.bootstrap();
            future.join();
    
        }
    
    }

    copyCat_server-2 implementation:

    package com.xkx.myCopycat2;
    
    import com.xkx.common.GetQuery;
    import com.xkx.common.MapstateMachine;
    import com.xkx.common.PutCommand;
    import io.atomix.catalyst.transport.Address;
    import io.atomix.catalyst.transport.netty.NettyTransport;
    import io.atomix.copycat.server.CopycatServer;
    import io.atomix.copycat.server.storage.Storage;
    import io.atomix.copycat.server.storage.StorageLevel;
    
    import java.io.File;
    import java.util.Collection;
    import java.util.Collections;
    
    public class Main2 {
    
        public static void main(String[] args){
    
            Address address = new Address("127.0.0.1", 5001);
    
            CopycatServer server = CopycatServer.builder(address)
                    .withStateMachine(MapstateMachine::new)
                    .withTransport(NettyTransport.builder()
                            .withThreads(4)
                            .build())
                    .withStorage(Storage.builder()
                            .withDirectory(new File("logs"))
                            .withStorageLevel(StorageLevel.DISK)
                            .build())
                    .build();
    
            server.serializer().register(PutCommand.class);
            server.serializer().register(GetQuery.class);
    
        //form the cluster by joining copyCat-server-1
            Collection<Address> cluster = Collections.singleton(new Address("127.0.0.1", 5000));
            server.join(cluster).join();
    
        }
    
    }

    Only copyCat-server-1 and copyCat_server-2 are shown here; copyCat-server-3 is identical to copyCat_server-2 apart from its IP address and port.

    copycat-client implementation:

    package com.xkx.client;
    
    import com.xkx.common.GetQuery;
    import com.xkx.common.PutCommand;
    import io.atomix.catalyst.transport.Address;
    import io.atomix.catalyst.transport.netty.NettyTransport;
    import io.atomix.copycat.client.CopycatClient;
    
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.concurrent.CompletableFuture;
    
    public class ClientMain {
    
        public static void main(String[] args){
    
            CopycatClient.Builder builder = CopycatClient.builder();
    
            builder.withTransport(NettyTransport.builder()
                    .withThreads(2)
                    .build());
    
            CopycatClient client = builder.build();
    
        //register the command classes on the client
            client.serializer().register(PutCommand.class);
            client.serializer().register(GetQuery.class);
    
    
        //addresses and ports of the cluster
            Collection<Address> cluster = Arrays.asList(
                    new Address("127.0.0.1", 5000),
                    new Address("127.0.0.1", 5001),
                    new Address("127.0.0.1", 5002)
            );
    
            CompletableFuture<CopycatClient> future = client.connect(cluster);
            future.join();
    
        //submit three key/value pairs with PutCommand
            CompletableFuture[] futures = new CompletableFuture[3];
            futures[0] = client.submit(new PutCommand("foo", "Hello world!"));
            futures[1] = client.submit(new PutCommand("bar", "Hello world!"));
            futures[2] = client.submit(new PutCommand("baz", "Hello world!"));
    
        //once the cluster has replicated the commands consistently, print a completion message
            CompletableFuture.allOf(futures).thenRun(() -> System.out.println("Commands completed!"));
    
        //the client submits a query
            client.submit(new GetQuery("foo")).thenAccept(result -> {
                System.out.println("foo is: " + result);
            });
        }
    
    }

    Java project structure (screenshot omitted).

    Note that copyCat-server and copyCat-client must use the same GetQuery, MapstateMachine and PutCommand classes, which is why they are placed in the shared common package.
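    Since the screenshot of the project tree is not reproduced, the layout below is a reconstruction consistent with the package declarations shown above:

    ```
    src/main/java/com/xkx/
    ├── common/                    shared by all servers and the client
    │   ├── MapstateMachine.java
    │   ├── PutCommand.java
    │   └── GetQuery.java
    ├── myCopycat/Main.java        copyCat_server-1
    ├── myCopycat2/Main2.java      copyCat_server-2
    └── client/ClientMain.java     copycat-client
    ```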

    Experiment results: the console screenshots for copyCat-server-1, copyCat-server-2, copyCat-server-3 and copyCat-client are omitted.

    The console output shows that copyCat-server-1 was elected Leader and the other two servers became Followers. All requests are handled by copyCat-server-1 and replicated to the other two servers via the Raft algorithm.

  • Copycat - State

    2017-04-06 21:16:00

    Member.Status

    Status transitions are driven by heartbeats.

    A heartbeat is an AppendRequest with empty entries:

    /**
       * Triggers a heartbeat to a majority of the cluster.
       * <p>
       * For followers to which no AppendRequest is currently being sent, a new empty AppendRequest will be
       * created and sent. For followers to which an AppendRequest is already being sent, the appendEntries()
       * call will piggyback on the *next* AppendRequest. Thus, multiple calls to this method will only ever
       * result in a single AppendRequest to each follower at any given time, and the returned future will be
       * shared by all concurrent calls.
       *
       * @return A completable future to be completed the next time a heartbeat is received by a majority of the cluster.
       */
      public CompletableFuture<Long> appendEntries() {
        // If there are no other active members in the cluster, simply complete the append operation.
        if (context.getClusterState().getRemoteMemberStates().isEmpty())
          return CompletableFuture.completedFuture(null);
    
        // If no heartbeat future already exists, that indicates there's no heartbeat currently under way.
        // Create a new heartbeat future and commit to all members in the cluster.
        if (heartbeatFuture == null) {
          CompletableFuture<Long> newHeartbeatFuture = new CompletableFuture<>();
          heartbeatFuture = newHeartbeatFuture;
          heartbeatTime = System.currentTimeMillis();
          for (MemberState member : context.getClusterState().getRemoteMemberStates()) {
            appendEntries(member); // send appendEntries to every member
          }
          return newHeartbeatFuture;
        }

    The heartbeat logic sends a heartbeat to every member returned by getRemoteMemberStates().

     

    AVAILABLE

    At initialization, every ServerMember defaults to Status.AVAILABLE:

    public final class ServerMember implements Member, CatalystSerializable, AutoCloseable {
      private Member.Type type;
      private Status status = Status.AVAILABLE;

     

    LeaderAppender

    @Override
      protected void succeedAttempt(MemberState member) {
        super.succeedAttempt(member);
    
        // If the member is currently marked as UNAVAILABLE, change its status to AVAILABLE and update the configuration.
        if (member.getMember().status() == ServerMember.Status.UNAVAILABLE && !leader.configuring()) {
          member.getMember().update(ServerMember.Status.AVAILABLE, Instant.now());
          leader.configure(context.getCluster().members());
        }
      }

     

    succeedAttempt flips UNAVAILABLE back to AVAILABLE; super.succeedAttempt clears the failure count.

     

    It is called whenever an OK AppendResponse is received:

    protected void handleAppendResponseOk(MemberState member, AppendRequest request, AppendResponse response) {
        // Reset the member failure count and update the member's availability status if necessary.
        succeedAttempt(member);

    The leader's heartbeat is implemented as an AppendRequest with empty entries, so receiving an OK response shows that the member is available.

     

    UNAVAILABLE

    The transition to UNAVAILABLE happens in failAttempt:

    @Override
      protected void failAttempt(MemberState member, Throwable error) {
        super.failAttempt(member, error);
    
        // Verify that the leader has contacted a majority of the cluster within the last two election timeouts.
        // If the leader is not able to contact a majority of the cluster within two election timeouts, assume
        // that a partition occurred and transition back to the FOLLOWER state.
        if (System.currentTimeMillis() - Math.max(heartbeatTime(), leaderTime) > context.getElectionTimeout().toMillis() * 2) {
          LOGGER.warn("{} - Suspected network partition. Stepping down", context.getCluster().member().address());
          context.setLeader(0);
          context.transition(CopycatServer.State.FOLLOWER);
        }
        // If the number of failures has increased above 3 and the member hasn't been marked as UNAVAILABLE, do so.
        else if (member.getFailureCount() >= 3) {
          // If the member is currently marked as AVAILABLE, change its status to UNAVAILABLE and update the configuration.
          if (member.getMember().status() == ServerMember.Status.AVAILABLE && !leader.configuring()) {
            member.getMember().update(ServerMember.Status.UNAVAILABLE, Instant.now());
            leader.configure(context.getCluster().members());
          }
        }
      }

    super.failAttempt resets the connection and increments the failure count:

    member.incrementFailureCount();

     

The first check compares against Math.max(heartbeatTime(), leaderTime).

    heartbeatTime

    /**
       * Returns the last time a majority of the cluster was contacted.
       * <p>
       * This is calculated by sorting the list of active members and getting the last time the majority of
       * the cluster was contacted based on the index of a majority of the members. So, in a list of 3 ACTIVE
       * members, index 1 (the second member) will be used to determine the commit time in a sorted members list.
       */
      private long heartbeatTime() {
        int quorumIndex = quorumIndex();
        if (quorumIndex >= 0) {
          return context.getClusterState().getActiveMemberStates((m1, m2)-> Long.compare(m2.getHeartbeatTime(), m1.getHeartbeatTime())).get(quorumIndex).getHeartbeatTime();
        }
        return System.currentTimeMillis();
      }

This sorts the active members by heartbeat time and takes the heartbeat at quorumIndex, i.e. the oldest heartbeat still within the majority.
If the leader cannot collect valid heartbeats from a majority, a network partition (split brain) has likely occurred.

In that case, the leader steps down to follower.
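quorumIndex() itself is not shown; assuming it returns n/2 for n active members (index 1 for 3 members, index 2 for 5, in a descending sort), the majority-heartbeat check can be sketched as:

```java
import java.util.Arrays;

// Sketch of the heartbeatTime() computation described above.
// Assumption: quorumIndex() == n / 2 for n active members, i.e. the
// oldest heartbeat still covered by a majority in a descending sort.
class HeartbeatQuorum {
  static long heartbeatTime(long[] heartbeats) {
    long[] sorted = heartbeats.clone();
    Arrays.sort(sorted); // ascending
    int quorumIndex = sorted.length / 2;
    // index quorumIndex of the descending order
    // == index (n - 1 - quorumIndex) of the ascending order
    return sorted[sorted.length - 1 - quorumIndex];
  }

  // Partition check: suspect a partition (and step down) if the majority
  // heartbeat is older than two election timeouts.
  static boolean suspectPartition(long now, long majorityHeartbeat, long electionTimeoutMillis) {
    return now - majorityHeartbeat > electionTimeoutMillis * 2;
  }
}
```

For heartbeats {10, 50, 90} on a 3-member cluster, the majority heartbeat is 50: the leader has heard from two members (a majority) at time 50 or later.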

     

The second check: once a member's failure count reaches 3 and it is not already marked, it is marked UNAVAILABLE.

     

failAttempt itself is invoked from the various failure handlers:

    AbstractAppender
    handleAppendRequestFailure,
    handleAppendResponseFailure,
    handleConfigureRequestFailure,
    handleInstallRequestFailure

     

     

    CopycatServer.State

     

    public enum State {
    
        /**
         * Represents the state of an inactive server.
         * <p>
         * All servers start in this state and return to this state when {@link #leave() stopped}.
         */
        INACTIVE,
    
        /**
         * Represents the state of a server that is a reserve member of the cluster.
         * <p>
         * Reserve servers only receive notification of leader, term, and configuration changes.
         */
        RESERVE,
    
        /**
         * Represents the state of a server in the process of catching up its log.
         * <p>
         * Upon successfully joining an existing cluster, the server will transition to the passive state and remain there
         * until the leader determines that the server has caught up enough to be promoted to a full member.
         */
        PASSIVE,
    
        /**
         * Represents the state of a server participating in normal log replication.
         * <p>
         * The follower state is a standard Raft state in which the server receives replicated log entries from the leader.
         */
        FOLLOWER,
    
        /**
         * Represents the state of a server attempting to become the leader.
         * <p>
         * When a server in the follower state fails to receive communication from a valid leader for some time period,
         * the follower will transition to the candidate state. During this period, the candidate requests votes from
         * each of the other servers in the cluster. If the candidate wins the election by receiving votes from a majority
         * of the cluster, it will transition to the leader state.
         */
        CANDIDATE,
    
        /**
         * Represents the state of a server which is actively coordinating and replicating logs with other servers.
         * <p>
         * Leaders are responsible for handling and replicating writes from clients. Note that more than one leader can
         * exist at any given time, but Raft guarantees that no two leaders will exist for the same {@link Cluster#term()}.
         */
        LEADER
    
      }

     

When ServerContext is initialized, the state is INACTIVE:

    public class ServerContext implements AutoCloseable {
      //......
      protected ServerState state = new InactiveState(this);

     

Somewhat confusingly, Member defines its own enum:

    enum Type {
    
        /**
         * Represents an inactive member.
         * <p>
         * The {@code INACTIVE} member type represents a member which does not participate in any communication
         * and is not an active member of the cluster. This is typically the state of a member prior to joining
         * or after leaving a cluster.
         */
        INACTIVE,
    
        /**
         * Represents a member which does not participate in replication.
         * <p>
         * The {@code RESERVE} member type is representative of a member that does not participate in any
         * replication of state but only maintains contact with the cluster leader and is an active member
         * of the {@link Cluster}. Typically, reserve members act as standby nodes which can be
         * {@link #promote() promoted} to a {@link #PASSIVE} or {@link #ACTIVE} role when needed.
         */
        RESERVE,
    
        /**
         * Represents a member which participates in asynchronous replication but does not vote in elections
         * or otherwise participate in the Raft consensus algorithm.
         * <p>
         * The {@code PASSIVE} member type is representative of a member that receives state changes from
         * follower nodes asynchronously. As state changes are committed via the {@link #ACTIVE} Raft nodes,
         * committed state changes are asynchronously replicated by followers to passive members. This allows
         * passive members to maintain nearly up-to-date state with minimal impact on the performance of the
         * Raft algorithm itself, and allows passive members to be quickly promoted to {@link #ACTIVE} voting
         * members if necessary.
         */
        PASSIVE,
    
        /**
         * Represents a full voting member of the Raft cluster which participates fully in leader election
         * and replication algorithms.
         * <p>
         * The {@code ACTIVE} member type represents a full voting member of the Raft cluster. Active members
         * participate in the Raft leader election and replication algorithms and can themselves be elected
         * leaders.
         */
        ACTIVE,
    
      }

Note the difference: Member.Type includes ACTIVE, while CopycatServer.State does not.

The server state is a refinement of the member type.

That is, a member can be INACTIVE, RESERVE, PASSIVE, or ACTIVE.

When the member is INACTIVE, RESERVE, or PASSIVE, the server state matches the member type directly.

When the member is ACTIVE, the server state is one of FOLLOWER, CANDIDATE, or LEADER.

     

In CopycatServer.Builder, the default member type is ACTIVE:

    public static class Builder implements io.atomix.catalyst.util.Builder<CopycatServer> {
      //......
      private Member.Type type = Member.Type.ACTIVE;

     

Note that transition(Member.Type) maps the member type to a server state:

    /**
       * Transitions the server to the base state for the given member type.
       */
      protected void transition(Member.Type type) {
        switch (type) {
          case ACTIVE:
            if (!(state instanceof ActiveState)) {
              transition(CopycatServer.State.FOLLOWER);
            }
            break;
          case PASSIVE:
            if (this.state.type() != CopycatServer.State.PASSIVE) {
              transition(CopycatServer.State.PASSIVE);
            }
            break;
          case RESERVE:
            if (this.state.type() != CopycatServer.State.RESERVE) {
              transition(CopycatServer.State.RESERVE);
            }
            break;
          default:
            if (this.state.type() != CopycatServer.State.INACTIVE) {
              transition(CopycatServer.State.INACTIVE);
            }
            break;
        }
      }

Note the handling of ACTIVE:

when Member.Type is ACTIVE and the current state is not already an ActiveState, the server transitions to FOLLOWER; CANDIDATE and LEADER clearly cannot be reached by a direct transition.

     

As seen above, ServerContext starts in the INACTIVE state.
So when does it become active?

When a server bootstraps or joins a cluster, ClusterState.join is called, which performs the state transition:

    @Override
      public CompletableFuture<Void> bootstrap(Collection<Address> cluster) {
    
        if (configuration == null) {
          if (member.type() != Member.Type.ACTIVE) {
            return Futures.exceptionalFuture(new IllegalStateException("only ACTIVE members can bootstrap the cluster"));
          } else {
            // Create a set of active members.
            Set<Member> activeMembers = cluster.stream()
              .filter(m -> !m.equals(member.serverAddress()))
              .map(m -> new ServerMember(Member.Type.ACTIVE, m, null, member.updated()))
              .collect(Collectors.toSet());
    
            // Add the local member to the set of active members.
            activeMembers.add(member);
    
            // Create a new configuration and store it on disk to ensure the cluster can fall back to the configuration.
            configure(new Configuration(0, 0, member.updated().toEpochMilli(), activeMembers));
          }
        }
        return join();
      }

     

    @Override
      public synchronized CompletableFuture<Void> join(Collection<Address> cluster) {
    
        // If no configuration was loaded from disk, create a new configuration.
        if (configuration == null) {
          // Create a set of cluster members, excluding the local member which is joining a cluster.
          Set<Member> activeMembers = cluster.stream()
            .filter(m -> !m.equals(member.serverAddress()))
            .map(m -> new ServerMember(Member.Type.ACTIVE, m, null, member.updated()))
            .collect(Collectors.toSet());
    
          // Create a new configuration and configure the cluster. Once the cluster is configured, the configuration
          // will be stored on disk to ensure the cluster can fall back to the provided configuration if necessary.
      configure(new Configuration(0, 0, member.updated().toEpochMilli(), activeMembers)); // update the configuration
        }
        return join();
      }
    
      /**
       * Starts the join to the cluster.
       */
      private synchronized CompletableFuture<Void> join() {
        joinFuture = new CompletableFuture<>();
    
        context.getThreadContext().executor().execute(() -> {
          // Transition the server to the appropriate state for the local member type.
          context.transition(member.type()); //transition state
    
          // Attempt to join the cluster. If the local member is ACTIVE then failing to join the cluster
          // will result in the member attempting to get elected. This allows initial clusters to form.
          List<MemberState> activeMembers = getActiveMemberStates();
          if (!activeMembers.isEmpty()) {
            join(getActiveMemberStates().iterator());
          } else {
            joinFuture.complete(null);
          }
        });

     

     

Next, the transition conditions between LEADER, CANDIDATE, and FOLLOWER.

    Leader

A server becomes LEADER only when, as a CANDIDATE, it requests votes and a majority grants them:

    context.transition(CopycatServer.State.LEADER)
    /**
       * Resets the election timer.
       */
      private void sendVoteRequests() {
        //.........
        // Send vote requests to all nodes. The vote request that is sent
        // to this node will be automatically successful.
        // First check if the quorum is null. If the quorum isn't null then that
        // indicates that another vote is already going on.
        final Quorum quorum = new Quorum(context.getClusterState().getQuorum(), (elected) -> {
          complete.set(true);
          if (elected) {
        context.transition(CopycatServer.State.LEADER); // invoked via checkComplete()
          } else {
            context.transition(CopycatServer.State.FOLLOWER);
          }
        });
    
        // Once we got the last log term, iterate through each current member
        // of the cluster and vote each member for a vote.
        for (ServerMember member : votingMembers) {
          LOGGER.debug("{} - Requesting vote from {} for term {}", context.getCluster().member().address(), member, context.getTerm());
          VoteRequest request = VoteRequest.builder()
            .withTerm(context.getTerm())
            .withCandidate(context.getCluster().member().id())
            .withLogIndex(lastIndex)
            .withLogTerm(lastTerm)
            .build();
    
          context.getConnections().getConnection(member.serverAddress()).thenAccept(connection -> {
            connection.<VoteRequest, VoteResponse>send(request).whenCompleteAsync((response, error) -> {
              context.checkThread();
              if (isOpen() && !complete.get()) {
                if (error != null) {
                  LOGGER.warn(error.getMessage());
                  quorum.fail();
                } else {
                    //........
                  } else {
                    LOGGER.debug("{} - Received successful vote from {}", context.getCluster().member().address(), member);
                quorum.succeed(); // vote granted: succeeded++, then checkComplete()
                  }
                }
              }
            }, context.getThreadContext().executor());
          });

     

Candidate

A server becomes CANDIDATE only after, as a FOLLOWER, it sends poll requests and a majority indicates they would vote for it:

      /**
       * Polls all members of the cluster to determine whether this member should transition to the CANDIDATE state.
       */
      private void sendPollRequests() {
       final Quorum quorum = new Quorum(context.getClusterState().getQuorum(), (elected) -> {
          // If a majority of the cluster indicated they would vote for us then transition to candidate.
          complete.set(true);
          if (elected) {
            context.transition(CopycatServer.State.CANDIDATE);
          } else {
            resetHeartbeatTimeout();
          }
        });
        
        //......

     

    Follower

Leader -> Follower

In LeaderAppender, triggered by the heartbeat/append path:

    /**
       * Handles a {@link Response.Status#OK} response.
       */
      protected void handleAppendResponseOk(MemberState member, AppendRequest request, AppendResponse response) {
        //......
        // If we've received a greater term, update the term and transition back to follower.
        else if (response.term() > context.getTerm()) {
          context.setTerm(response.term()).setLeader(0);
          context.transition(CopycatServer.State.FOLLOWER);
        }

If an OK response arrives whose term is greater than ours, we are no longer the legitimate leader,
so we step down to FOLLOWER.

    /**
       * Handles a {@link Response.Status#ERROR} response.
       */
      protected void handleAppendResponseError(MemberState member, AppendRequest request, AppendResponse response) {
        // If we've received a greater term, update the term and transition back to follower.
        if (response.term() > context.getTerm()) {
          context.setTerm(response.term()).setLeader(0);
          context.transition(CopycatServer.State.FOLLOWER);

The same applies to ERROR responses.

    @Override
      protected void failAttempt(MemberState member, Throwable error) {
        super.failAttempt(member, error);
    
        // Verify that the leader has contacted a majority of the cluster within the last two election timeouts.
        // If the leader is not able to contact a majority of the cluster within two election timeouts, assume
        // that a partition occurred and transition back to the FOLLOWER state.
        if (System.currentTimeMillis() - Math.max(heartbeatTime(), leaderTime) > context.getElectionTimeout().toMillis() * 2) {
          LOGGER.warn("{} - Suspected network partition. Stepping down", context.getCluster().member().address());
          context.setLeader(0);
          context.transition(CopycatServer.State.FOLLOWER);
        }

In failAttempt, if no heartbeat from a majority has been received within two election timeouts, a partition is assumed
and the leader steps down to FOLLOWER.

     

In LeaderState, when committing the leader's initial entries fails:

    /**
       * Commits a no-op entry to the log, ensuring any entries from a previous term are committed.
       */
      private CompletableFuture<Void> commitInitialEntries() {
        // The Raft protocol dictates that leaders cannot commit entries from previous terms until
        // at least one entry from their current term has been stored on a majority of servers. Thus,
        // we force entries to be appended up to the leader's no-op entry. The LeaderAppender will ensure
        // that the commitIndex is not increased until the no-op entry (appender.index()) is committed.
        CompletableFuture<Void> future = new CompletableFuture<>();
        appender.appendEntries(appender.index()).whenComplete((resultIndex, error) -> {
          context.checkThread();
          if (isOpen()) {
            if (error == null) {
              context.getStateMachine().apply(resultIndex);
              future.complete(null);
            } else {
              context.setLeader(0);
              context.transition(CopycatServer.State.FOLLOWER);
            }
          }
        });
        return future;
      }

the leader likewise falls back to FOLLOWER.

     

Candidate -> Follower

When the vote fails, the candidate falls back to FOLLOWER:

    /**
       * Resets the election timer.
       */
      private void sendVoteRequests() {
        //......
        // Send vote requests to all nodes. The vote request that is sent
        // to this node will be automatically successful.
        // First check if the quorum is null. If the quorum isn't null then that
        // indicates that another vote is already going on.
        final Quorum quorum = new Quorum(context.getClusterState().getQuorum(), (elected) -> {
          complete.set(true);
          if (elected) {
            context.transition(CopycatServer.State.LEADER);
          } else {
        context.transition(CopycatServer.State.FOLLOWER); // not elected
          }
        });

     

ActiveState -> Follower

This covers LeaderState and CandidateState: when handling vote and append requests, both run the following logic:

        // If the request indicates a term that is greater than the current term then
        // assign that term and leader to the current context and transition to follower.
        boolean transition = updateTermAndLeader(request.term(), request.leader());
        
        // If a transition is required then transition back to the follower state.
        // If the node is already a follower then the transition will be ignored.
        if (transition) {
          context.transition(CopycatServer.State.FOLLOWER);
        }

     

    /**
       * Updates the term and leader.
       */
      protected boolean updateTermAndLeader(long term, int leader) {
        // If the request indicates a term that is greater than the current term or no leader has been
        // set for the current term, update leader and term.
        if (term > context.getTerm() || (term == context.getTerm() && context.getLeader() == null && leader != 0)) {
          context.setTerm(term);
          context.setLeader(leader);
    
          // Reset the current cluster configuration to the last committed configuration when a leader change occurs.
          context.getClusterState().reset();
          return true;
        }
        return false;
      }
Copycat - MemberShip

    2017-04-06 21:34:00

For ease of implementation, Copycat divides members into three kinds:

active, passive, and reserve members — each of which plays some role in supporting rapid replacement of failed servers.

     

    Active members are full voting members which participate in all aspects of the Raft consensus algorithm. Active servers are always in one of the Raft states — follower, candidate, or leader — at any given time.

Active members cover the follower, candidate, and leader roles of the Raft protocol, i.e. the normal working members.

     

    Passive member

    When a new server is added to a Raft cluster, the server typically must be caught up to within some bound of the leader before it can become a full voting member of the cluster. Adding a new server without first warming up its log will result in some period of decreased availability.

    Systems can maintain servers that are virtually kept in sync with the rest of the cluster at all times. We call these servers passive servers.

Members that stay in sync with the rest of the cluster but take no part in voting are called passive.
The benefit of passive nodes: when an active node must be added, or a failed active node replaced, no catch-up phase is needed, since the passive node can take over directly.
Passive nodes can also serve as read-only replicas.

     

    Passive Replication

Passive nodes do not catch up by receiving AppendEntries RPCs directly from the leader:

    Each follower is responsible for sending AppendEntries RPCs to a subset of passive servers at regular intervals.

    • Each follower sends AppendEntries RPCs only to a subset of passive servers
    • Followers send only committed entries to passive servers
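How a follower picks its subset of passive servers is not shown here; a simple modulo partition (purely illustrative, not the actual Copycat code) conveys the idea that each follower replicates committed entries to a distinct subset:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: divide passive members among followers so that
// each passive server receives committed entries from exactly one follower.
class PassiveAssignment {
  static List<Integer> subsetFor(int followerIndex, int followerCount, int passiveCount) {
    List<Integer> subset = new ArrayList<>();
    for (int p = 0; p < passiveCount; p++) {
      if (p % followerCount == followerIndex) {
        subset.add(p); // this follower is responsible for passive server p
      }
    }
    return subset;
  }
}
```

With 2 followers and 5 passive servers, follower 0 would serve passives {0, 2, 4} and follower 1 would serve {1, 3}, spreading the replication load off the leader.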

     

    Reserve Members

    For large clusters, though, the overhead of maintaining passive servers can by itself become a drain on the cluster’s resources. 
    Each additional passive server imposes the overhead of replicating all committed log entries, and this is significant even if done by followers. Thus, to ease the load on large clusters, we introduce the reserve member type.

For large clusters, maintaining passive members is itself costly; to reduce this overhead, the reserve member type is introduced.

    Reserve members serve as standbys to passive members. 
    Reserve servers do not maintain state machines and need not known about committed entries. 
    However, because reserve servers can be promoted to passive, they do need to have some mechanism for learning about configuration changes.

Reserves are standbys for the passive members, so the cluster need not maintain too many passives: when a passive is promoted to active, a reserve is promoted into the passive pool.

A reserve therefore does not store full state machine state, but it does need to learn about configuration changes.

     

    Member

    public interface Member {
    
      enum Type {
    
        INACTIVE,
    
        RESERVE,
    
        PASSIVE,
    
        ACTIVE,
    
      }
    
      CompletableFuture<Void> promote();
      
      CompletableFuture<Void> demote();
    
    }

This defines the member types and allows a member to be promoted or demoted.

     

    ServerMember

    public final class ServerMember implements Member, CatalystSerializable, AutoCloseable {
    
      @Override
      public CompletableFuture<Void> promote() {
        return configure(Type.values()[type.ordinal() + 1]);
      }
      
      @Override
      public CompletableFuture<Void> demote() {
        return configure(Type.values()[type.ordinal() - 1]);
      }
            
      /**
       * Recursively reconfigures the cluster.
       */
      private void configure(Member.Type type, CompletableFuture<Void> future) {
        // Set a timer to retry the attempt to leave the cluster.
        configureTimeout = cluster.getContext().getThreadContext().schedule(cluster.getContext().getElectionTimeout(), () -> {
          configure(type, future);
        });
    
        // Attempt to leave the cluster by submitting a LeaveRequest directly to the server state.
        // Non-leader states should forward the request to the leader if there is one. Leader states
        // will log, replicate, and commit the reconfiguration.
    cluster.getContext().getServerState().reconfigure(ReconfigureRequest.builder() // submit a reconfigure request to the ServerState
          .withIndex(cluster.getConfiguration().index())
          .withTerm(cluster.getConfiguration().term())
          .withMember(new ServerMember(type, serverAddress(), clientAddress(), updated))
          .build()).whenComplete((response, error) -> {
            //.......
        });
      }

     

Now look at ServerState.

ServerState defines the full set of requests a server can handle:

    public interface ServerState extends Managed<ServerState> {
    
    
      CopycatServer.State type();
    
      CompletableFuture<RegisterResponse> register(RegisterRequest request);
    
      CompletableFuture<ConnectResponse> connect(ConnectRequest request, Connection connection);
    
      CompletableFuture<KeepAliveResponse> keepAlive(KeepAliveRequest request);
    
      CompletableFuture<UnregisterResponse> unregister(UnregisterRequest request);
    
      CompletableFuture<PublishResponse> publish(PublishRequest request);
    
      CompletableFuture<ConfigureResponse> configure(ConfigureRequest request);
    
      CompletableFuture<InstallResponse> install(InstallRequest request);
    
      CompletableFuture<JoinResponse> join(JoinRequest request);
    
      CompletableFuture<ReconfigureResponse> reconfigure(ReconfigureRequest request);
    
      CompletableFuture<LeaveResponse> leave(LeaveRequest request);
    
      CompletableFuture<AppendResponse> append(AppendRequest request);
    
      CompletableFuture<PollResponse> poll(PollRequest request);
    
      CompletableFuture<VoteResponse> vote(VoteRequest request);
    
      CompletableFuture<CommandResponse> command(CommandRequest request);
    
      CompletableFuture<QueryResponse> query(QueryRequest request);
    
    }

     

    AbstractState

    public abstract class AbstractState implements ServerState {
    
      /**
       * Forwards the given request to the leader if possible.
       */
      protected <T extends Request, U extends Response> CompletableFuture<U> forward(T request) {
        CompletableFuture<U> future = new CompletableFuture<>();
        context.getConnections().getConnection(context.getLeader().serverAddress()).whenComplete((connection, connectError) -> {
          if (connectError == null) {
            connection.<T, U>send(request).whenComplete((response, responseError) -> {
              if (responseError == null) {
                future.complete(response);
              } else {
                future.completeExceptionally(responseError);
              }
            });
          } else {
            future.completeExceptionally(connectError);
          }
        });
        return future;
      }
    
      /**
       * Updates the term and leader.
       */
      protected boolean updateTermAndLeader(long term, int leader) {
        // If the request indicates a term that is greater than the current term or no leader has been
        // set for the current term, update leader and term.
        if (term > context.getTerm() || (term == context.getTerm() && context.getLeader() == null && leader != 0)) {
          context.setTerm(term);
          context.setLeader(leader);
    
          // Reset the current cluster configuration to the last committed configuration when a leader change occurs.
          context.getClusterState().reset();
          return true;
        }
        return false;
      }
    
    }

This mainly adds some utility methods.

     

    InactiveState

An InactiveState only responds to configure requests; all other requests return an error.

    class InactiveState extends AbstractState {
    
      @Override
      public CompletableFuture<ConfigureResponse> configure(ConfigureRequest request) {
    
        updateTermAndLeader(request.term(), request.leader());
    
        Configuration configuration = new Configuration(request.index(), request.term(), request.timestamp(), request.members());
    
        // Configure the cluster membership. This will cause this server to transition to the
        // appropriate state if its type has changed.
    context.getClusterState().configure(configuration); // update this server's state in the cluster state
    
        // If the configuration is already committed, commit it to disk.
        // Check against the actual cluster Configuration rather than the received configuration in
        // case the received configuration was an older configuration that was not applied.
        if (context.getCommitIndex() >= context.getClusterState().getConfiguration().index()) {
          context.getClusterState().commit();
        }
    
        return CompletableFuture.completedFuture(logResponse(ConfigureResponse.builder()
          .withStatus(Response.Status.OK)
          .build()));
      }

context.getClusterState().configure

First, what is ClusterState?

Manages the persistent state of the Copycat cluster from the perspective of a single server

That is, every server keeps a ClusterState to track the situation of the whole cluster.

ClusterState.member represents the local server itself.

ClusterState.members records the state of every member in the cluster.

The configure logic mainly updates member and members according to the configuration passed in.
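Conceptually (a sketch based on the description above, not the real implementation; the class and field names below are assumptions), configure replaces the members view and adjusts the local member accordingly:

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual sketch of ClusterState.configure(...): rebuild the view of
// all cluster members and update the local member's type. Names are
// assumptions for illustration, not the actual Copycat classes.
class ClusterStateSketch {
  final Map<String, String> members = new HashMap<>(); // address -> member type
  final String localAddress;
  String localType = "INACTIVE";

  ClusterStateSketch(String localAddress) {
    this.localAddress = localAddress;
  }

  void configure(Map<String, String> configuration) {
    members.clear();
    members.putAll(configuration);
    // Adopt the local member's type from the new configuration,
    // or fall back to INACTIVE if this server was removed.
    localType = configuration.getOrDefault(localAddress, "INACTIVE");
  }
}
```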

     

context.getClusterState().commit()

The core logic commits the updated configuration to disk:

    if (context.getMetaStore().loadConfiguration().index() < configuration.index()) {
          context.getMetaStore().storeConfiguration(configuration);
    }

     

    ReserveState

It responds to append requests:

    class ReserveState extends InactiveState {
      @Override
      public CompletableFuture<AppendResponse> append(AppendRequest request) {
        context.checkThread();
        logRequest(request);
        updateTermAndLeader(request.term(), request.leader());
    
        // Update the local commitIndex and globalIndex.
        context.setCommitIndex(request.commitIndex());
        context.setGlobalIndex(request.globalIndex()); //Sets the maximum compaction index for major compaction
        return CompletableFuture.completedFuture(logResponse(AppendResponse.builder()
          .withStatus(Response.Status.OK)
          .withTerm(context.getTerm())
          .withSucceeded(true)
          .withLogIndex(0)
          .build()));
      }
    AbstractState.updateTermAndLeader
    protected boolean updateTermAndLeader(long term, int leader) {
        // If the request indicates a term that is greater than the current term or no leader has been
        // set for the current term, update leader and term.
        if (term > context.getTerm() || (term == context.getTerm() && context.getLeader() == null && leader != 0)) {
      context.setTerm(term); // if the term/leader is newer, update the context
          context.setLeader(leader);
    
          // Reset the current cluster configuration to the last committed configuration when a leader change occurs.
      context.getClusterState().reset(); // reset to the last committed configuration to avoid stale data from the old leader
          return true;
        }
        return false;
      }

    context.setCommitIndex

    ServerContext setCommitIndex(long commitIndex) {
    
        long previousCommitIndex = this.commitIndex;
        if (commitIndex > previousCommitIndex) {
          this.commitIndex = commitIndex;
      log.commit(Math.min(commitIndex, log.lastIndex())); // commit the log up to this index
          long configurationIndex = cluster.getConfiguration().index();
          if (configurationIndex > previousCommitIndex && configurationIndex <= commitIndex) {
        cluster.commit(); // commit the cluster configuration
          }
        }
        return this;
      }

     

Command, query, register, keep-alive, unregister, join, leave, and reconfigure requests are all forwarded to the leader.

In other words, every state from RESERVE upward at least forwards these requests to the leader.

     

    PassiveState

The passive state's main job is staying in sync with the other members.

     

On open, uncommitted log entries are truncated first; every state derived from PassiveState performs this:

    @Override
      public CompletableFuture<ServerState> open() {
        return super.open()
          .thenRun(this::truncateUncommittedEntries)
          .thenApply(v -> this);
      }
    private void truncateUncommittedEntries() {
        if (type() == CopycatServer.State.PASSIVE) {
          context.getLog().truncate(Math.min(context.getCommitIndex(), context.getLog().lastIndex()));
        }
      }

When is open invoked? In ServerContext.transition, during a state change:

        // Close the old state.
        try {
          this.state.close().get();
        } catch (InterruptedException | ExecutionException e) {
          throw new IllegalStateException("failed to close Raft state", e);
        }
    
        // Force state transitions to occur synchronously in order to prevent race conditions.
        try {
          this.state = createState(state);
          this.state.open().get();
        } catch (InterruptedException | ExecutionException e) {
          throw new IllegalStateException("failed to initialize Raft state", e);
        }

This closes the old state and opens the new one.
Note the trailing get() calls: the transition executes synchronously, blocking to prevent race conditions.

     

It responds to connect requests by registering the connection:

    context.getStateMachine().executor().context().sessions().registerConnection(request.client(), connection);

     

Handling append requests: the passive append logic is more involved than reserve's, because reserve only needs to sync configuration, while passive also syncs state machine data.
    @Override
      public CompletableFuture<AppendResponse> append(final AppendRequest request) {
        context.checkThread();
        logRequest(request);
        updateTermAndLeader(request.term(), request.leader());
    
        return CompletableFuture.completedFuture(logResponse(handleAppend(request)));
      }

    handleAppend

    protected AppendResponse handleAppend(AppendRequest request) {
        // If the request term is less than the current term then immediately
        // reply false and return our current term. The leader will receive
        // the updated term and step down.
        if (request.term() < context.getTerm()) {
          LOGGER.debug("{} - Rejected {}: request term is less than the current term ({})", context.getCluster().member().address(), request, context.getTerm());
          return AppendResponse.builder()
            .withStatus(Response.Status.OK)
            .withTerm(context.getTerm())
            .withSucceeded(false)
            .withLogIndex(context.getLog().lastIndex())
            .build();
        } else {
          return checkGlobalIndex(request);
        }
      }

If the request carries an older term, the append is rejected immediately: that leader is stale and needs to step down.

    checkGlobalIndex then calls appendEntries:

    /**
       * Appends entries to the local log.
       */
      protected AppendResponse appendEntries(AppendRequest request) {
        // Get the last entry index or default to the request log index.
        long lastEntryIndex = request.logIndex();
        if (!request.entries().isEmpty()) {
          lastEntryIndex = request.entries().get(request.entries().size() - 1).getIndex();
        }
    
        // Ensure the commitIndex is not increased beyond the index of the last entry in the request.
        long commitIndex = Math.max(context.getCommitIndex(), Math.min(request.commitIndex(), lastEntryIndex));
    
        // Append entries to the log starting at the last log index.
        for (Entry entry : request.entries()) {
          // If the entry index is greater than the last index and less than the commit index, append the entry.
          // We perform no additional consistency checks here since passive members may only receive committed entries.
          if (context.getLog().lastIndex() < entry.getIndex() && entry.getIndex() <= commitIndex) {
            context.getLog().skip(entry.getIndex() - context.getLog().lastIndex() - 1).append(entry);
            LOGGER.debug("{} - Appended {} to log at index {}", context.getCluster().member().address(), entry, entry.getIndex());
          }
        }
    
        // Update the context commit and global indices.
        context.setCommitIndex(commitIndex);
        context.setGlobalIndex(request.globalIndex());
    
        // Apply commits to the state machine in batch.
        context.getStateMachine().applyAll(context.getCommitIndex());
    
        return AppendResponse.builder()
          .withStatus(Response.Status.OK)
          .withTerm(context.getTerm())
          .withSucceeded(true)
          .withLogIndex(context.getLog().lastIndex())
          .build();
      }

The key is the comment in the middle:

    an entry is appended only when its index is greater than the last index and no greater than the commit index.
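The passive condition reduces to a single predicate; a sketch with assumed names, not the Copycat API:

```java
// Passive append rule: append an entry only if it lies past the local
// last index and no further than the commit index. Names are illustrative.
public class PassiveAppendRule {
    public static boolean shouldAppend(long lastIndex, long commitIndex, long entryIndex) {
        return lastIndex < entryIndex && entryIndex <= commitIndex;
    }
}
```

This is safe without further consistency checks because a passive member only ever receives committed entries.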

     

Handling query requests: under certain conditions a passive member can serve queries locally.
    @Override
      public CompletableFuture<QueryResponse> query(QueryRequest request) {
        context.checkThread();
        logRequest(request);
    
        // If the query was submitted with RYW or monotonic read consistency, attempt to apply the query to the local state machine.
        if (request.query().consistency() == Query.ConsistencyLevel.SEQUENTIAL) {
    
          // If this server has not yet applied entries up to the client's session ID, forward the
          // query to the leader. This ensures that a follower does not tell the client its session
          // doesn't exist if the follower hasn't had a chance to see the session's registration entry.
          if (context.getStateMachine().getLastApplied() < request.session()) {
        LOGGER.debug("{} - State out of sync, forwarding query to leader", context.getCluster().member().address());
            return queryForward(request);
          }
    
          // If the commit index is not in the log then we've fallen too far behind the leader to perform a local query.
          // Forward the request to the leader.
          if (context.getLog().lastIndex() < context.getCommitIndex()) {
        LOGGER.debug("{} - State out of sync, forwarding query to leader", context.getCluster().member().address());
            return queryForward(request);
          }
    
          QueryEntry entry = context.getLog().create(QueryEntry.class)
            .setIndex(request.index())
            .setTerm(context.getTerm())
            .setTimestamp(System.currentTimeMillis())
            .setSession(request.session())
            .setSequence(request.sequence())
            .setQuery(request.query());
    
      return queryLocal(entry); // query local data
        } else {
          return queryForward(request);
        }
      }

     

     

    ActiveState

    abstract class ActiveState extends PassiveState
    ActiveState is abstract because an active member is always one of follower, candidate, or leader.

    As a full member, an active member must also handle poll, vote, and append requests.

     

First, how does append handling differ from passive?
    @Override
      public CompletableFuture<AppendResponse> append(final AppendRequest request) {
        context.checkThread();
        logRequest(request);
    
        // If the request indicates a term that is greater than the current term then
        // assign that term and leader to the current context and transition to follower.
    boolean transition = updateTermAndLeader(request.term(), request.leader()); // if the request term is greater than ours, this server certainly is not the leader
    
        CompletableFuture<AppendResponse> future = CompletableFuture.completedFuture(logResponse(handleAppend(request)));
    
        // If a transition is required then transition back to the follower state.
        // If the node is already a follower then the transition will be ignored.
        if (transition) {
      context.transition(CopycatServer.State.FOLLOWER); // transition to follower
        }
        return future;
      }

handleAppend ultimately still calls appendEntries:
    // Iterate through request entries and append them to the log.
        for (Entry entry : request.entries()) {
          // If the entry index is greater than the last log index, skip missing entries.
          if (context.getLog().lastIndex() < entry.getIndex()) {
            context.getLog().skip(entry.getIndex() - context.getLog().lastIndex() - 1).append(entry);
            LOGGER.debug("{} - Appended {} to log at index {}", context.getCluster().member().address(), entry, entry.getIndex());
          } else if (context.getCommitIndex() >= entry.getIndex()) {
            continue;
          } else {
            // Compare the term of the received entry with the matching entry in the log.
            long term = context.getLog().term(entry.getIndex());
            if (term != 0) {
              if (entry.getTerm() != term) {
                // We found an invalid entry in the log. Remove the invalid entry and append the new entry.
                // If appending to the log fails, apply commits and reply false to the append request.
                LOGGER.debug("{} - Appended entry term does not match local log, removing incorrect entries", context.getCluster().member().address());
                context.getLog().truncate(entry.getIndex() - 1).append(entry);
                LOGGER.debug("{} - Appended {} to log at index {}", context.getCluster().member().address(), entry, entry.getIndex());
              }
            } else {
              context.getLog().truncate(entry.getIndex() - 1).append(entry);
              LOGGER.debug("{} - Appended {} to log at index {}", context.getCluster().member().address(), entry, entry.getIndex());
            }
          }
        }

This is where the logic differs from passive:

    If entry index > last index, the entry is appended directly, regardless of the commit index.

    If entry index <= last index and entry index <= commit index, the entry is skipped.

    If entry index <= last index and entry index > commit index, the local term at that index is compared with the received entry's term; on a mismatch the log holds stale data, so it is truncated to entry.getIndex() - 1 and the new entry is appended.
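The three cases can be summarized as a small decision function (a hypothetical restatement, not Copycat API; the CHECK_TERM branch stands for the term comparison that decides whether to truncate):

```java
// Active append rule: unlike a passive member, an active member may receive
// uncommitted entries, so overlapping indices require a term check.
public class ActiveAppendRule {
    public enum Action { APPEND, SKIP, CHECK_TERM }

    public static Action decide(long lastIndex, long commitIndex, long entryIndex) {
        if (lastIndex < entryIndex) {
            return Action.APPEND;     // past the end of the log: append directly
        }
        if (commitIndex >= entryIndex) {
            return Action.SKIP;       // already committed locally: ignore
        }
        return Action.CHECK_TERM;     // overlap above the commit index: compare terms, maybe truncate
    }
}
```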

     

Handling poll requests:
    /**
       * Handles a poll request.
       */
      protected PollResponse handlePoll(PollRequest request) {
        // If the request term is not as great as the current context term then don't
        // vote for the candidate. We want to vote for candidates that are at least
        // as up to date as us.
        if (request.term() < context.getTerm()) {
          LOGGER.debug("{} - Rejected {}: candidate's term is less than the current term", context.getCluster().member().address(), request);
          return PollResponse.builder()
            .withStatus(Response.Status.OK)
            .withTerm(context.getTerm())
            .withAccepted(false) // reject
            .build();
        } else if (isLogUpToDate(request.logIndex(), request.logTerm(), request)) {
          return PollResponse.builder()
            .withStatus(Response.Status.OK)
            .withTerm(context.getTerm())
            .withAccepted(true) // accept
            .build();
        } else {
          return PollResponse.builder()
            .withStatus(Response.Status.OK)
            .withTerm(context.getTerm())
            .withAccepted(false)
            .build();
        }
      }

    isLogUpToDate

    boolean isLogUpToDate(long lastIndex, long lastTerm, Request request) {
        // If the log is empty then vote for the candidate.
        if (context.getLog().isEmpty()) {
          LOGGER.debug("{} - Accepted {}: candidate's log is up-to-date", context.getCluster().member().address(), request);
          return true;
        }
    
        // Read the last entry index and term from the log.
        long localLastIndex = context.getLog().lastIndex();
        long localLastTerm = context.getLog().term(localLastIndex);
    
        // If the candidate's last log term is lower than the local log's last entry term, reject the request.
        if (lastTerm < localLastTerm) {
          LOGGER.debug("{} - Rejected {}: candidate's last log entry ({}) is at a lower term than the local log ({})", context.getCluster().member().address(), request, lastTerm, localLastTerm);
          return false;
        }
    
        // If the candidate's last term is equal to the local log's last entry term, reject the request if the
        // candidate's last index is less than the local log's last index. If the candidate's last log term is
        // greater than the local log's last term then it's considered up to date, and if both have the same term
        // then the candidate's last index must be greater than the local log's last index.
        if (lastTerm == localLastTerm && lastIndex < localLastIndex) {
          LOGGER.debug("{} - Rejected {}: candidate's last log entry ({}) is at a lower index than the local log ({})", context.getCluster().member().address(), request, lastIndex, localLastIndex);
          return false;
        }
    
        // If we made it this far, the candidate's last term is greater than or equal to the local log's last
        // term, and if equal to the local log's last term, the candidate's last index is equal to or greater
        // than the local log's last index.
        LOGGER.debug("{} - Accepted {}: candidate's log is up-to-date", context.getCluster().member().address(), request);
        return true;
      }

The rules for accepting a poll request:

    If the local log is empty, accept.

    If the candidate's last log term is less than the local last log term, reject.

    If the terms are equal but the candidate's last index is less than the local last index, reject.

    In short, a member only endorses candidates whose logs are at least as up to date as its own.
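Stripped of logging and the empty-log shortcut, the comparison amounts to the following self-contained restatement (class and parameter names are assumed):

```java
// Raft log comparison: a candidate is "up to date" if its last log term is
// greater than ours, or the terms are equal and its last index is at least
// as large as ours.
public class LogComparison {
    public static boolean isUpToDate(long candidateLastIndex, long candidateLastTerm,
                                     long localLastIndex, long localLastTerm) {
        if (candidateLastTerm < localLastTerm) {
            return false; // older last term: reject
        }
        if (candidateLastTerm == localLastTerm && candidateLastIndex < localLastIndex) {
            return false; // same term but shorter log: reject
        }
        return true;      // at least as up to date as the local log
    }
}
```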

     

Handling vote requests:
    /**
       * Handles a vote request.
       */
      protected VoteResponse handleVote(VoteRequest request) {
        // If the request term is not as great as the current context term then don't
        // vote for the candidate. We want to vote for candidates that are at least
        // as up to date as us.
    if (request.term() < context.getTerm()) { // the request term is stale
          LOGGER.debug("{} - Rejected {}: candidate's term is less than the current term", context.getCluster().member().address(), request);
          return VoteResponse.builder()
            .withStatus(Response.Status.OK)
            .withTerm(context.getTerm())
            .withVoted(false)
            .build();
        }
        // If a leader was already determined for this term then reject the request.
        else if (context.getLeader() != null) { // a leader already exists
          LOGGER.debug("{} - Rejected {}: leader already exists", context.getCluster().member().address(), request);
          return VoteResponse.builder()
            .withStatus(Response.Status.OK)
            .withTerm(context.getTerm())
            .withVoted(false)
            .build();
        }
        // If the requesting candidate is not a known member of the cluster (to this
        // node) then don't vote for it. Only vote for candidates that we know about.
        else if (!context.getClusterState().getRemoteMemberStates().stream().<Integer>map(m -> m.getMember().id()).collect(Collectors.toSet()).contains(request.candidate())) {
          LOGGER.debug("{} - Rejected {}: candidate is not known to the local member", context.getCluster().member().address(), request);
          return VoteResponse.builder()
            .withStatus(Response.Status.OK)
            .withTerm(context.getTerm())
            .withVoted(false)
            .build();
        }
        // If no vote has been cast, check the log and cast a vote if necessary.
        else if (context.getLastVotedFor() == 0) { // we have not yet voted for any candidate
          if (isLogUpToDate(request.logIndex(), request.logTerm(), request)) { // the candidate's log is up to date
            context.setLastVotedFor(request.candidate());
            return VoteResponse.builder()
              .withStatus(Response.Status.OK)
              .withTerm(context.getTerm())
              .withVoted(true) // accept
              .build();
          } else {
            return VoteResponse.builder()
              .withStatus(Response.Status.OK)
              .withTerm(context.getTerm())
              .withVoted(false)
              .build();
          }
        }
        // If we already voted for the requesting server, respond successfully.
        else if (context.getLastVotedFor() == request.candidate()) { // we already voted for this same candidate
          LOGGER.debug("{} - Accepted {}: already voted for {}", context.getCluster().member().address(), request, context.getCluster().member(context.getLastVotedFor()).address());
          return VoteResponse.builder()
            .withStatus(Response.Status.OK)
            .withTerm(context.getTerm())
            .withVoted(true) // accept
            .build();
        }
        // In this case, we've already voted for someone else.
        else {
          LOGGER.debug("{} - Rejected {}: already voted for {}", context.getCluster().member().address(), request, context.getCluster().member(context.getLastVotedFor()).address());
          return VoteResponse.builder()
            .withStatus(Response.Status.OK)
            .withTerm(context.getTerm())
            .withVoted(false)
            .build();
        }
      }

     

FollowerState

    The key logic: wait for the heartbeat timeout to fire, then try to become a candidate.

    public synchronized CompletableFuture<ServerState> open() {
        return super.open().thenRun(this::startHeartbeatTimeout).thenApply(v -> this);
      }

On open this runs asynchronously; thenRun means the action does not consume the previous stage's result.

    startHeartbeatTimeout -> resetHeartbeatTimeout
    private void resetHeartbeatTimeout() {
    
        // Set the election timeout in a semi-random fashion with the random range
        // being election timeout and 2 * election timeout.
    Duration delay = context.getElectionTimeout().plus(Duration.ofMillis(random.nextInt((int) context.getElectionTimeout().toMillis()))); // pick a random delay
    heartbeatTimer = context.getThreadContext().schedule(delay, () -> { // when the delay expires
      heartbeatTimer = null;
      if (isOpen()) {
        context.setLeader(0); // clear the leader
        sendPollRequests(); // send poll requests
      }
          }
        });
      }
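The computed delay therefore falls in [electionTimeout, 2 × electionTimeout). A standalone sketch of just that computation (the class name is assumed):

```java
import java.time.Duration;
import java.util.Random;

// Sketch of the randomized election delay: the base timeout plus a random
// offset strictly less than one more timeout.
public class ElectionDelay {
    public static Duration randomDelay(Duration electionTimeout, Random random) {
        return electionTimeout.plus(
            Duration.ofMillis(random.nextInt((int) electionTimeout.toMillis())));
    }
}
```

The randomization spreads members' timeouts apart so they rarely start competing elections at the same moment.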

When the timeout fires, can this member immediately stand as a candidate? Raft's election restriction means only a member whose log is at least as up to date as a majority's can become leader. How does a member know whether its log qualifies?

    It sends poll requests: if a majority accepts, its log is at least as up to date as most of the cluster's, so it may proceed to candidacy.

    Note that resetHeartbeatTimeout is called not only here but from essentially every request handler in FollowerState; any interaction with the leader restarts the timer.

    Only when the leader's heartbeats stop arriving does sendPollRequests run, attempting the transition to candidate.

    /**
       * Polls all members of the cluster to determine whether this member should transition to the CANDIDATE state.
       */
      private void sendPollRequests() {
        // Create a quorum that will track the number of nodes that have responded to the poll request.
        final AtomicBoolean complete = new AtomicBoolean();
        final Set<ServerMember> votingMembers = new HashSet<>(context.getClusterState().getActiveMemberStates().stream().map(MemberState::getMember).collect(Collectors.toList()));
    
        // collect all active members and build a Quorum over them
        final Quorum quorum = new Quorum(context.getClusterState().getQuorum(), (elected) -> {
          // If a majority of the cluster indicated they would vote for us then transition to candidate.
          complete.set(true);
          if (elected) { // if a majority accepted
            context.transition(CopycatServer.State.CANDIDATE); // transition to candidate
          } else {
            resetHeartbeatTimeout();
          }
        });
    
        // Once we got the last log term, iterate through each current member
        // of the cluster and vote each member for a vote.
        for (ServerMember member : votingMembers) {
          LOGGER.debug("{} - Polling {} for next term {}", context.getCluster().member().address(), member, context.getTerm() + 1);
          PollRequest request = PollRequest.builder()
            .withTerm(context.getTerm())
            .withCandidate(context.getCluster().member().id())
            .withLogIndex(lastIndex) // my current last index
            .withLogTerm(lastTerm) // my current last term; members use index and term to decide whether to accept the poll
            .build();
          context.getConnections().getConnection(member.serverAddress()).thenAccept(connection -> {
            connection.<PollRequest, PollResponse>send(request).whenCompleteAsync((response, error) -> { // send the request asynchronously with a callback
              context.checkThread();
              if (isOpen() && !complete.get()) {
                if (error != null) {
                  LOGGER.warn("{} - {}", context.getCluster().member().address(), error.getMessage());
                  quorum.fail();
                } else {
                  if (response.term() > context.getTerm()) {
                    context.setTerm(response.term());
                  }
    
                  if (!response.accepted()) {
                    LOGGER.debug("{} - Received rejected poll from {}", context.getCluster().member().address(), member);
                    quorum.fail();
                  } else if (response.term() != context.getTerm()) {
                    LOGGER.debug("{} - Received accepted poll for a different term from {}", context.getCluster().member().address(), member);
                    quorum.fail();
                  } else {
                    LOGGER.debug("{} - Received accepted poll from {}", context.getCluster().member().address(), member);
                    quorum.succeed(); // of all the branches, only this one counts as an accepted poll
                  }
                }
              }
            }, context.getThreadContext().executor());
          });
        }
      }

The Quorum implementation is simple:
    public class Quorum {
      private final int quorum;
      private int succeeded = 1;
      private int failed;
      private Consumer<Boolean> callback;
      private boolean complete;
    
      public Quorum(int quorum, Consumer<Boolean> callback) {
        this.quorum = quorum;
        this.callback = callback;
      }
    
      private void checkComplete() {
        if (!complete && callback != null) {
          if (succeeded >= quorum) {
            complete = true;
            callback.accept(true);
          } else if (failed >= quorum) {
            complete = true;
            callback.accept(false);
          }
        }
      }
    
      /**
       * Indicates that a call in the quorum succeeded.
       */
      public Quorum succeed() {
        succeeded++;
        checkComplete();
        return this;
      }

    succeed() increments the counter and runs checkComplete; once the success count reaches the quorum size, the callback fires with true.
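A usage sketch of this tally (a self-contained mini version with assumed names, mirroring the real succeed()/fail() pair): with quorum size 3, the local vote counts as the initial success, so two accepted remote responses complete the election.

```java
import java.util.function.Consumer;

// Mini reimplementation of the Quorum tally for illustration (not the
// Copycat class itself): succeeded starts at 1 because the local member
// implicitly votes for itself.
public class QuorumDemo {
    public static final class MiniQuorum {
        private final int quorum;
        private int succeeded = 1;
        private int failed;
        private final Consumer<Boolean> callback;
        private boolean complete;

        public MiniQuorum(int quorum, Consumer<Boolean> callback) {
            this.quorum = quorum;
            this.callback = callback;
        }

        public void succeed() { succeeded++; checkComplete(); }
        public void fail()    { failed++;    checkComplete(); }

        private void checkComplete() {
            if (!complete && callback != null) {
                if (succeeded >= quorum) {
                    complete = true;
                    callback.accept(true);   // a majority accepted
                } else if (failed >= quorum) {
                    complete = true;
                    callback.accept(false);  // a majority rejected or failed
                }
            }
        }
    }
}
```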

FollowerState's append handling:
    @Override
      public CompletableFuture<AppendResponse> append(AppendRequest request) {
        CompletableFuture<AppendResponse> future = super.append(request);
    
        // Reset the heartbeat timeout.
        resetHeartbeatTimeout();
    
        // Send AppendEntries requests to passive members if necessary.
        appender.appendEntries();
        return future;
      }

Besides calling super.append and resetting the heartbeat timeout, it also calls appender.appendEntries():

    a follower takes on the responsibility of replicating data to its assigned passive members.

    final class FollowerAppender extends AbstractAppender {
    
      public FollowerAppender(ServerContext context) {
        super(context);
      }
    
      /**
       * Sends append requests to assigned passive members.
       */
      public void appendEntries() {
        if (open) {
          for (MemberState member : context.getClusterState().getAssignedPassiveMemberStates()) {
            appendEntries(member);
          }
        }
      }
    The logic: send appendEntries to every assigned passive member.

     

CandidateState

    A candidate's job is to win a vote and become the leader.

    public synchronized CompletableFuture<ServerState> open() {
        return super.open().thenRun(this::startElection).thenApply(v -> this);
      }

startElection -> sendVoteRequests
    /**
       * Resets the election timer.
       */
      private void sendVoteRequests() {
    
        // When the election timer is reset, increment the current term and
        // restart the election.
    context.setTerm(context.getTerm() + 1).setLastVotedFor(context.getCluster().member().id());
    // a fresh election, so increment the term; setLastVotedFor(self) casts a vote for ourselves first
    
        final AtomicBoolean complete = new AtomicBoolean();
        final Set<ServerMember> votingMembers = new HashSet<>(context.getClusterState().getActiveMemberStates().stream().map(MemberState::getMember).collect(Collectors.toList()));
    
        // Send vote requests to all nodes. The vote request that is sent
        // to this node will be automatically successful.
        // First check if the quorum is null. If the quorum isn't null then that
        // indicates that another vote is already going on.
        final Quorum quorum = new Quorum(context.getClusterState().getQuorum(), (elected) -> {
          complete.set(true);
          if (elected) {
        context.transition(CopycatServer.State.LEADER); // the vote succeeded: become leader
          } else {
            context.transition(CopycatServer.State.FOLLOWER);
          }
        });
    
        // Once we got the last log term, iterate through each current member
        // of the cluster and vote each member for a vote.
        for (ServerMember member : votingMembers) {
          LOGGER.debug("{} - Requesting vote from {} for term {}", context.getCluster().member().address(), member, context.getTerm());
          VoteRequest request = VoteRequest.builder()
            .withTerm(context.getTerm())
            .withCandidate(context.getCluster().member().id())
            .withLogIndex(lastIndex)
            .withLogTerm(lastTerm)
            .build();
    
          context.getConnections().getConnection(member.serverAddress()).thenAccept(connection -> {
            connection.<VoteRequest, VoteResponse>send(request).whenCompleteAsync((response, error) -> {
              context.checkThread();
              if (isOpen() && !complete.get()) {
                if (error != null) {
                  LOGGER.warn(error.getMessage());
                  quorum.fail();
                } else {
                  if (response.term() > context.getTerm()) {
                    LOGGER.debug("{} - Received greater term from {}", context.getCluster().member().address(), member);
                    context.setTerm(response.term());
                    complete.set(true);
                    context.transition(CopycatServer.State.FOLLOWER);
                  } else if (!response.voted()) {
                    LOGGER.debug("{} - Received rejected vote from {}", context.getCluster().member().address(), member);
                    quorum.fail();
                  } else if (response.term() != context.getTerm()) {
                    LOGGER.debug("{} - Received successful vote for a different term from {}", context.getCluster().member().address(), member);
                    quorum.fail();
                  } else {
                    LOGGER.debug("{} - Received successful vote from {}", context.getCluster().member().address(), member);
                    quorum.succeed();
                  }
                }
              }
            }, context.getThreadContext().executor());
          });
        }
      }

Handling append requests:
    public CompletableFuture<AppendResponse> append(AppendRequest request) {
        context.checkThread();
    
        // If the request indicates a term that is greater than the current term then
        // assign that term and leader to the current context and step down as a candidate.
    if (request.term() >= context.getTerm()) { // a term at least as great as ours means a leader has been elected
          context.setTerm(request.term());
      context.transition(CopycatServer.State.FOLLOWER); // step down to follower
        }
        return super.append(request);
      }

Handling vote requests:
    @Override
      public CompletableFuture<VoteResponse> vote(VoteRequest request) {
        context.checkThread();
        logRequest(request);
    
        // If the request indicates a term that is greater than the current term then
        // assign that term and leader to the current context and step down as a candidate.
    if (updateTermAndLeader(request.term(), 0)) { // a greater request term means our candidacy is stale
          CompletableFuture<VoteResponse> future = super.vote(request);
      context.transition(CopycatServer.State.FOLLOWER); // step down to follower
          return future;
        }
    
        // If the vote request is not for this candidate then reject the vote.
    if (request.candidate() == context.getCluster().member().id()) { // otherwise, vote yes only if the candidate is this server itself
          return CompletableFuture.completedFuture(logResponse(VoteResponse.builder()
            .withStatus(Response.Status.OK)
            .withTerm(context.getTerm())
            .withVoted(true)
            .build()));
    } else { // a candidate never votes for another candidate
          return CompletableFuture.completedFuture(logResponse(VoteResponse.builder()
            .withStatus(Response.Status.OK)
            .withTerm(context.getTerm())
            .withVoted(false)
            .build()));
        }
      }

     

LeaderState

    LeaderState is complex enough to warrant its own post.
