精华内容
下载资源
问答
  • netty案例,netty4.1中级拓展篇九《Netty集群部署实现跨服务端通信的落地方案》
    千次阅读
    2019-09-04 09:05:37

    前言介绍

    微信公众号:bugstack虫洞栈

    Netty的性能非常好,在一些小型用户体量的socket服务内,仅部署单台机器就可以满足业务需求。但当遇到一些中大型用户体量的服务时,就需要考虑讲Netty按照集群方式部署,以更好的满足业务诉求。但Netty部署集群后都会遇到跨服务端怎么通信,也就是有集群服务X和Y,用户A链接服务X,用户B链接服务Y,那么他们都不在一个服务内怎么通信?本章节将介绍一种实现方式案例,以满足跨服务之间的用户通信。但实际场景中需要进行一些扩展性改造,案例仅将核心主干思路做以实现,只是一种思路指导,并不能直接使用于业务开发。

    本章知识点

    • 跨服务之间案例采用redis的发布和订阅进行传递消息,如果你是大型服务可以使用zookeeper
    • 用户A在发送消息给用户B时候,需要传递B的channeId,以用于服务端进行查找channeId所属是否自己的服务内
    • 单台机器也可以启动多个Netty服务,程序内会自动寻找可用端口

    环境准备

    1、jdk1.8【jdk1.7以下只能部分支持netty】
    2、Netty4.1.36.Final【netty3.x 4.x 5每次的变化较大,接口类名也随着变化】
    3、NetAssist 网络调试助手,可以从网上下载也可以联系我ÿ

    更多相关内容
  • netty案例,netty4.1中级拓展篇九《Netty集群部署实现跨服务端通信的落地方案》源码 ...
  • 实战Netty集群

    千次阅读 2019-09-29 02:33:23
    1.1 实战Netty集群的理由 Java基础练习中,一个重要的实战练习是: java的聊天程序。基本上,每一个java工程师,都有写过自己的聊天程序。 实现一个Java的分布式的聊天程序的分布式练习,同样非常重要的是。有以下几...

    疯狂创客圈 Java 分布式聊天室【 亿级流量】实战系列之 -25【 博客园 总入口


    1.写在前面

    1.1 实战Netty集群的理由

    Java基础练习中,一个重要的实战练习是: java的聊天程序。基本上,每一个java工程师,都有写过自己的聊天程序。

    实现一个Java的分布式的聊天程序的分布式练习,同样非常重要的是。有以下几个方面的最重要作用:

    1 体验高并发的程序的开发:从研究承载千、万QPS级的流量,拓展能够承载百万级、千万级、亿万级流量

    2 有分布式、高并发的实战经验,面试谈薪水的时候,能提升不少

    3 Netty集群的分布式原理,和大数据的分布式原理,elasticsearch 的分布式原理,和redis集群的分布式原理,和mongodb的分布式原理,很大程度上,都是想通。 Netty集群作为一个实战开发, 是一个非常好的分布式基础练习

    4 更多的理由,请参考机械工业出版社的书籍 《Netty Zookeeper Redis 高并发实战》

    1.2 Netty 集群 实战源码

    本文的代码,来自于开源项目CrazyIm , 项目的地址为
    https://gitee.com/crazymaker/crazy_tourist_circle__im.git

    源码 目前已经完成了基本的通信,在不断迭代中,不少的群友,通过疯狂创客圈的QQ群,沟通迭代过程中的问题。

    2 Netty 集群中,服务节点的注册和发现

    2.1 服务节点的注册和发现

    zookeeper作为注册中心,每一个netty服务启动的时候,把节点的信息比如ip地址+端口号注册到zookeeper上。

    具体的原理,请参见书籍《Netty Zookeeper Redis 高并发实战》

    2.2 节点的POJO

    package com.crazymakercircle.entity;
    
    import lombok.Data;
    
    import java.io.Serializable;
    import java.util.Objects;
    
    /**
     * IM节点的POJO类
     * create by 尼恩 @ 疯狂创客圈
     **/
    @Data
    public class ImNode implements Comparable<ImNode>, Serializable {
    
        private static final long serialVersionUID = -499010884211304846L;
    
    
        //worker 的Id,zookeeper负责生成
        private long id;
    
        //Netty 服务 的连接数
        private Integer balance = 0;
    
        //Netty 服务 IP
        private String host;
    
        //Netty 服务 端口
        private Integer port;
    
        public ImNode() {
        }
    
        public ImNode(String host, Integer port) {
            this.host = host;
            this.port = port;
        }
    
    
        @Override
        public String toString() {
            return "ImNode{" +
                    "id='" + id + '\'' +
                    "host='" + host + '\'' +
                    ", port='" + port + '\'' +
                    ",balance=" + balance +
                    '}';
        }
    
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            ImNode node = (ImNode) o;
    //        return id == node.id &&
            return Objects.equals(host, node.host) &&
                    Objects.equals(port, node.port);
        }
    
        @Override
        public int hashCode() {
            return Objects.hash(id, host, port);
        }
    
        /**
         * 升序排列
         */
        public int compareTo(ImNode o) {
            int weight1 = this.balance;
            int weight2 = o.balance;
            if (weight1 > weight2) {
                return 1;
            } else if (weight1 < weight2) {
                return -1;
            }
            return 0;
        }
    
    
        public void incrementBalance() {
            balance++;
        }
    
        public void decrementBalance() {
            balance--;
        }
    }

    2.3 服务的发现

    利用zk有一个监听机制,就是针对某个节点进行监听,一点这个节点发生了变化就会收到zk的通知。我们就是利用zk的这个watch来进行服务的上线和下线的通知,也就是我们的服务发现功能。

    package com.crazymakercircle.imServer.distributed;
    
    import com.crazymakercircle.constants.ServerConstants;
    import com.crazymakercircle.entity.ImNode;
    import com.crazymakercircle.im.common.bean.msg.ProtoMsg;
    import com.crazymakercircle.imServer.protoBuilder.NotificationMsgBuilder;
    import com.crazymakercircle.util.JsonUtil;
    import com.crazymakercircle.util.ObjectUtil;
    import com.crazymakercircle.zk.ZKclient;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.recipes.cache.ChildData;
    import org.apache.curator.framework.recipes.cache.PathChildrenCache;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
    
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * create by 尼恩 @ 疯狂创客圈
     **/
    @Slf4j
    public class PeerManager {
        //Zk客户端
        private CuratorFramework client = null;
    
        private String pathRegistered = null;
        private ImNode node = null;
    
    
        private static PeerManager singleInstance = null;
        private static final String path = ServerConstants.MANAGE_PATH;
    
        private ConcurrentHashMap<Long, PeerSender> peerMap =
                new ConcurrentHashMap<>();
    
    
        public static PeerManager getInst() {
            if (null == singleInstance) {
                singleInstance = new PeerManager();
                singleInstance.client = ZKclient.instance.getClient();
            }
            return singleInstance;
        }
    
        private PeerManager() {
    
        }
    
    
        /**
         * 初始化节点管理
         */
        public void init() {
            try {
    
                //订阅节点的增加和删除事件
    
                PathChildrenCache childrenCache = new PathChildrenCache(client, path, true);
                PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
    
                    @Override
                    public void childEvent(CuratorFramework client,
                                           PathChildrenCacheEvent event) throws Exception {
                        log.info("开始监听其他的ImWorker子节点:-----");
                        ChildData data = event.getData();
                        switch (event.getType()) {
                            case CHILD_ADDED:
                                log.info("CHILD_ADDED : " + data.getPath() + "  数据:" + data.getData());
                                processNodeAdded(data);
                                break;
                            case CHILD_REMOVED:
                                log.info("CHILD_REMOVED : " + data.getPath() + "  数据:" + data.getData());
                                processNodeRemoved(data);
                                break;
                            case CHILD_UPDATED:
                                log.info("CHILD_UPDATED : " + data.getPath() + "  数据:" + new String(data.getData()));
                                break;
                            default:
                                log.debug("[PathChildrenCache]节点数据为空, path={}", data == null ? "null" : data.getPath());
                                break;
                        }
    
                    }
    
                };
    
                childrenCache.getListenable().addListener(childrenCacheListener);
                System.out.println("Register zk watcher successfully!");
                childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        private void processNodeRemoved(ChildData data) {
    
            byte[] payload = data.getData();
            ImNode n = ObjectUtil.JsonBytes2Object(payload, ImNode.class);
    
            long id = ImWorker.getInst().getIdByPath(data.getPath());
            n.setId(id);
            log.info("[TreeCache]节点删除, path={}, data={}",
                    data.getPath(), JsonUtil.pojoToJson(n));
            PeerSender peerSender = peerMap.get(n.getId());
    
            if (null != peerSender) {
                peerSender.stopConnecting();
                peerMap.remove(n.getId());
            }
        }
    
        private void processNodeAdded(ChildData data) {
            byte[] payload = data.getData();
            ImNode n = ObjectUtil.JsonBytes2Object(payload, ImNode.class);
    
            long id = ImWorker.getInst().getIdByPath(data.getPath());
            n.setId(id);
    
            log.info("[TreeCache]节点更新端口, path={}, data={}",
                    data.getPath(), JsonUtil.pojoToJson(n));
    
            if (n.equals(getLocalNode())) {
                log.info("[TreeCache]本地节点, path={}, data={}",
                        data.getPath(), JsonUtil.pojoToJson(n));
                return;
            }
            PeerSender peerSender = peerMap.get(n.getId());
            if (null != peerSender && peerSender.getNode().equals(n)) {
    
                log.info("[TreeCache]节点重复增加, path={}, data={}",
                        data.getPath(), JsonUtil.pojoToJson(n));
                return;
            }
            if (null != peerSender) {
                //关闭老的连接
                peerSender.stopConnecting();
            }
            peerSender = new PeerSender(n);
            peerSender.doConnect();
    
            peerMap.put(n.getId(), peerSender);
        }
    
    
        public PeerSender getPeerSender(long id) {
            PeerSender peerSender = peerMap.get(id);
            if (null != peerSender) {
                return peerSender;
            }
            return null;
        }
    
    
        public void sendNotification(String json) {
            peerMap.keySet().stream().forEach(
                    key -> {
                        if (!key.equals(getLocalNode().getId())) {
                            PeerSender peerSender = peerMap.get(key);
                            ProtoMsg.Message pkg = NotificationMsgBuilder.buildNotification(json);
                            peerSender.writeAndFlush(pkg);
                        }
                    }
            );
    
        }
    
    
        public ImNode getLocalNode() {
            return ImWorker.getInst().getLocalNodeInfo();
        }
    
        public void remove(ImNode remoteNode) {
            peerMap.remove(remoteNode.getId());
            log.info("[TreeCache]移除远程节点信息,  node={}", JsonUtil.pojoToJson(remoteNode));
        }
    }
    

    2.4 为什么使用临时节点?

    什么是临时节点?服务启动后创建临时节点, 服务断掉后临时节点就不存在了

    正常的思路可能是注册的时候,我们像zk注册一个正常的节点,然后在服务下线的时候删除这个节点,但是这样的话会有一个弊端。比如我们的服务挂机,无法去删除临时节点,那么这个节点就会被我们错误的提供给了客户端。

    另外我们还要考虑持久化的节点创建之后删除之类的问题,问题会更加的复杂化,所以我们使用了临时节点。

    3 负载均衡策略

    3.1 负载均衡策略的基本思路

    在我们解决了服务的注册和发现问题之后,那么我们究竟提供给客户端那台服务呢,这时候就需要我们做出选择,为了让客户端能够均匀的连接到我们的服务器上(比如有个100个客户端,2台服务器,每台就分配50个),我们需要使用一个负载均衡的策略。

    这里我们使用轮询的方式来为每个请求的客户端分配ip。具体的代码实现如下:

    3.2 负载均衡实现源码的示意

    package com.crazymakercircle.Balance;
    
    import com.crazymakercircle.constants.ServerConstants;
    import com.crazymakercircle.entity.ImNode;
    import com.crazymakercircle.util.JsonUtil;
    import com.crazymakercircle.zk.ZKclient;
    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.curator.framework.CuratorFramework;
    import org.springframework.stereotype.Service;
    
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    
    /**
     * create by 尼恩 @ 疯狂创客圈
     **/
    @Data
    @Slf4j
    @Service
    public class ImLoadBalance {
    
        //Zk客户端
        private CuratorFramework client = null;
        private String managerPath;
    
        public ImLoadBalance() {
            this.client = ZKclient.instance.getClient();
    //        managerPath=ServerConstants.MANAGE_PATH+"/";
            managerPath=ServerConstants.MANAGE_PATH;
        }
    
        /**
         * 获取负载最小的IM节点
         *
         * @return
         */
        public ImNode getBestWorker() {
            List<ImNode> workers = getWorkers();
    
            log.info("全部节点如下:");
            workers.stream().forEach(node -> {
                log.info("节点信息:{}", JsonUtil.pojoToJson(node));
            });
            ImNode best = balance(workers);
    
            return best;
        }
    
        /**
         * 按照负载排序
         *
         * @param items 所有的节点
         * @return 负载最小的IM节点
         */
        protected ImNode balance(List<ImNode> items) {
            if (items.size() > 0) {
                // 根据balance值由小到大排序
                Collections.sort(items);
    
                // 返回balance值最小的那个
                ImNode node = items.get(0);
    
                log.info("最佳的节点为:{}", JsonUtil.pojoToJson(node));
                return node;
            } else {
                return null;
            }
        }
    
    
        /**
         * 从zookeeper中拿到所有IM节点
         */
        protected List<ImNode> getWorkers() {
    
            List<ImNode> workers = new ArrayList<ImNode>();
    
            List<String> children = null;
            try {
                children = client.getChildren().forPath(managerPath);
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
    
            for (String child : children) {
                log.info("child:", child);
                byte[] payload = null;
                try {
                    payload = client.getData().forPath(managerPath+"/"+child);
    
                } catch (Exception e) {
                    e.printStackTrace();
                }
                if (null == payload) {
                    continue;
                }
                ImNode worker = JsonUtil.jsonBytes2Object(payload, ImNode.class);
                workers.add(worker);
            }
            return workers;
    
        }
        /**
         * 从zookeeper中删除所有IM节点
         */
        public void removeWorkers() {
    
    
            try {
              client.delete().deletingChildrenIfNeeded().forPath(managerPath);
            } catch (Exception e) {
                e.printStackTrace();
            }
    
    
        }
    
    }

    4 环境的启动

    4.1 启动Zookeeper

    Zookeeper的安装和原理,以及开发的基础知识,请参见书籍《Netty Zookeeper Redis 高并发实战》

    在这里插入图片描述
    启动zookeeper的两个节点,本来有三个,启动二个即可
    在这里插入图片描述

    客户端连接zookeeper集群。命令如下:

    ./zkCli.cmd -server localhost:2181

    在这里插入图片描述

    4.2 启动Redis

    Redis的安装和原理,以及开发的基础知识,请参见请参见书籍《Netty Zookeeper Redis 高并发实战》

    redis 的客户端界面。

    在这里插入图片描述

    5 Netty集群启动

    5.1 启动WEBGate

    使用一个WEBGate,作为负载均衡的服务器,具体的原理,请参见书籍《Netty Zookeeper Redis 高并发实战》

    在这里插入图片描述

    除了负载均衡,从WEBGate还可以从 zookeeper中删除所有IM节点

    连接为: http://localhost:8080/swagger-ui.html

    swagger 的界面如下:

    在这里插入图片描述

    5.2 启动第一个Netty节点

    服务端的端口为7000

    在这里插入图片描述

    5.3 启动第二个Netty节点

    服务端的端口为7001,自动递增的

    在这里插入图片描述

    5.4 启动第一个客户端

    启动后输入登录的信息

    请输入登录信息,格式为:用户名@密码

    z1@1

    启动客户端后,并且登录后,会自动连接一个netty节点, 这里为7001,第二个Netty服务节点。

    在这里插入图片描述

    5.5 启动第二个客户端

    启动后输入登录的信息

    请输入登录信息,格式为:用户名@密码

    z2@1

    启动客户端后,并且登录后,按照负载均衡的机制,会自动连接一个netty节点, 这里为7000,第一个Netty服务节点。

    在这里插入图片描述

    6 不同服务器直接进行IM通信

    ​ 下面演示,不同的客户端,通过各自的服务器节点,进行通信。

    6.1 发送聊天消息

    在第二个客户端(用户为z2),发送消息给第一个客户端(用户为z1),消息的格式为 :“ 内容@用户名”

    请输入聊天信息,格式为:内容@用户名

    hello@z1

    请输入聊天信息,格式为:内容@用户名

    helloworld@z1

    在这里插入图片描述

    6.2 远程客户端接收消息

    通过Netty服务节点的转发,第一个客户端收到的消息如下:

    收到消息 from uid:z2 -> hello

    收到消息 from uid:z2 -> helloworld

    在这里插入图片描述

    7 总结

    7.1 开发的难度

    通过Netty+Zookeep+Redis的架构,整个Netty的集群,具备了服务节点的自动发现,节点之间的消息路由的能力。

    说明一下,整个程序,还是比较复杂的,如果看不懂,建议不要捉急,慢慢来。

    如果能从0到1的自己实现一版,开发的水平,也就不一般了。

    全面的理论基础,请参见 《Netty Zookeeper Redis 高并发实战》 一书

    7.2 Netty集群的最全理论基础

    《Netty Zookeeper Redis 高并发实战》 一书,对Netty 集群的基本原理,进行了详尽的介绍,大致的目录如下:

    12.1 【面试必备】如何支撑亿级流量的高并发IM架构的理论基础

    12.1.1 亿级流量的系统架构的开发实践 338

    12.1.2 高并发架构的技术选型 338

    12.1.3 详解IM消息的序列化协议选型 339

    12.1.4 详解长连接和短连接 339

    12.2 分布式IM的命名服务的实践案例 340

    12.2.1 IM节点的POJO类 341

    12.2.2 IM节点的ImWorker类 342

    12.3 Worker集群的负载均衡之实践案例 345

    12.3.1 ImLoadBalance负载均衡器 346

    12.3.2 与WebGate的整合 348

    12.4 即时通信消息的路由和转发的实践案例 349

    12.4.1 IM路由器WorkerRouter 349

    12.4.2 IM转发器WorkerReSender 352

    12.5 Feign短连接RESTful调用 354

    12.5.1 短连接API的接口准备 355

    12.5.2 声明远程接口的本地代理 355

    12.5.3 远程API的本地调用 356

    12.6 分布式的在线用户统计的实践案例 358

    12.6.1 Curator的分布式计数器 358

    12.6.2 用户上线和下线的统计 360


    疯狂创客圈 Java 死磕系列

    • Java (Netty) 聊天程序【 亿级流量】实战 开源项目实战

    • Netty 源码、原理、JAVA NIO 原理
    • Java 面试题 一网打尽
    • 疯狂创客圈 【 博客园 总入口 】

    转载于:https://www.cnblogs.com/crazymakercircle/p/11470287.html

    展开全文
  • Netty集群:Netty Zookeeper 高并发 实战

    千次阅读 2019-09-05 22:02:53
    写在前面1.1 实战Netty集群的理由1.2 Netty 集群 实战源码2 Netty 集群中,服务节点的注册和发现2.1 服务节点的注册和发现2.2 节点的POJO2.3 服务的发现2.4 为什么使用临时节点?3 负载均衡策略3.1 负载均衡策略的...

    疯狂创客圈 Java 分布式聊天室【 亿级流量】实战 【 博客园 总入口


    1.写在前面

    1.1 实战Netty集群的理由

    Java基础练习中,一个重要的实战练习是: java的聊天程序。基本上,每一个java工程师,都有写过自己的聊天程序。

    实现一个Java的分布式的聊天程序的分布式练习,同样非常重要的是。有以下几个方面的最重要作用:

    1 体验高并发的程序的开发:从研究承载千、万QPS级的流量,拓展能够承载百万级、千万级、亿万级流量

    2 有分布式、高并发的实战经验,面试谈薪水的时候,能提升不少

    3 Netty集群的分布式原理,和大数据的分布式原理,elasticsearch 的分布式原理,和redis集群的分布式原理,和mongodb的分布式原理,很大程度上,都是想通。 Netty集群作为一个实战开发, 是一个非常好的分布式基础练习

    4 更多的理由,请参考机械工业出版社的书籍 《Netty Zookeeper Redis 高并发实战》

    1.2 Netty 集群 实战源码

    本文的代码,来自于开源项目CrazyIm , 项目的地址为
    https://gitee.com/sfasdfasdfsdf/crazy_tourist_circle__im

    源码 目前已经完成了基本的通信,在不断迭代中,不少的群友,通过疯狂创客圈的QQ群,沟通迭代过程中的问题。

    2 Netty 集群中,服务节点的注册和发现

    2.1 服务节点的注册和发现

    zookeeper作为注册中心,每一个netty服务启动的时候,把节点的信息比如ip地址+端口号注册到zookeeper上。

    具体的原理,请参见书籍《Netty Zookeeper Redis 高并发实战》

    2.2 节点的POJO

    package com.crazymakercircle.entity;
    
    import lombok.Data;
    
    import java.io.Serializable;
    import java.util.Objects;
    
    /**
     * IM节点的POJO类
     * create by 尼恩 @ 疯狂创客圈
     **/
    @Data
    public class ImNode implements Comparable<ImNode>, Serializable {
    
        private static final long serialVersionUID = -499010884211304846L;
    
    
        //worker 的Id,zookeeper负责生成
        private long id;
    
        //Netty 服务 的连接数
        private Integer balance = 0;
    
        //Netty 服务 IP
        private String host;
    
        //Netty 服务 端口
        private Integer port;
    
        public ImNode() {
        }
    
        public ImNode(String host, Integer port) {
            this.host = host;
            this.port = port;
        }
    
    
        @Override
        public String toString() {
            return "ImNode{" +
                    "id='" + id + '\'' +
                    "host='" + host + '\'' +
                    ", port='" + port + '\'' +
                    ",balance=" + balance +
                    '}';
        }
    
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            ImNode node = (ImNode) o;
    //        return id == node.id &&
            return Objects.equals(host, node.host) &&
                    Objects.equals(port, node.port);
        }
    
        @Override
        public int hashCode() {
            return Objects.hash(id, host, port);
        }
    
        /**
         * 升序排列
         */
        public int compareTo(ImNode o) {
            int weight1 = this.balance;
            int weight2 = o.balance;
            if (weight1 > weight2) {
                return 1;
            } else if (weight1 < weight2) {
                return -1;
            }
            return 0;
        }
    
    
        public void incrementBalance() {
            balance++;
        }
    
        public void decrementBalance() {
            balance--;
        }
    }
    

    2.3 服务的发现

    利用zk有一个监听机制,就是针对某个节点进行监听,一点这个节点发生了变化就会收到zk的通知。我们就是利用zk的这个watch来进行服务的上线和下线的通知,也就是我们的服务发现功能。

    package com.crazymakercircle.imServer.distributed;
    
    import com.crazymakercircle.constants.ServerConstants;
    import com.crazymakercircle.entity.ImNode;
    import com.crazymakercircle.im.common.bean.msg.ProtoMsg;
    import com.crazymakercircle.imServer.protoBuilder.NotificationMsgBuilder;
    import com.crazymakercircle.util.JsonUtil;
    import com.crazymakercircle.util.ObjectUtil;
    import com.crazymakercircle.zk.ZKclient;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.recipes.cache.ChildData;
    import org.apache.curator.framework.recipes.cache.PathChildrenCache;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
    
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * create by 尼恩 @ 疯狂创客圈
     **/
    @Slf4j
    public class PeerManager {
        //Zk客户端
        private CuratorFramework client = null;
    
        private String pathRegistered = null;
        private ImNode node = null;
    
    
        private static PeerManager singleInstance = null;
        private static final String path = ServerConstants.MANAGE_PATH;
    
        private ConcurrentHashMap<Long, PeerSender> peerMap =
                new ConcurrentHashMap<>();
    
    
        public static PeerManager getInst() {
            if (null == singleInstance) {
                singleInstance = new PeerManager();
                singleInstance.client = ZKclient.instance.getClient();
            }
            return singleInstance;
        }
    
        private PeerManager() {
    
        }
    
    
        /**
         * 初始化节点管理
         */
        public void init() {
            try {
    
                //订阅节点的增加和删除事件
    
                PathChildrenCache childrenCache = new PathChildrenCache(client, path, true);
                PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
    
                    @Override
                    public void childEvent(CuratorFramework client,
                                           PathChildrenCacheEvent event) throws Exception {
                        log.info("开始监听其他的ImWorker子节点:-----");
                        ChildData data = event.getData();
                        switch (event.getType()) {
                            case CHILD_ADDED:
                                log.info("CHILD_ADDED : " + data.getPath() + "  数据:" + data.getData());
                                processNodeAdded(data);
                                break;
                            case CHILD_REMOVED:
                                log.info("CHILD_REMOVED : " + data.getPath() + "  数据:" + data.getData());
                                processNodeRemoved(data);
                                break;
                            case CHILD_UPDATED:
                                log.info("CHILD_UPDATED : " + data.getPath() + "  数据:" + new String(data.getData()));
                                break;
                            default:
                                log.debug("[PathChildrenCache]节点数据为空, path={}", data == null ? "null" : data.getPath());
                                break;
                        }
    
                    }
    
                };
    
                childrenCache.getListenable().addListener(childrenCacheListener);
                System.out.println("Register zk watcher successfully!");
                childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        private void processNodeRemoved(ChildData data) {
    
            byte[] payload = data.getData();
            ImNode n = ObjectUtil.JsonBytes2Object(payload, ImNode.class);
    
            long id = ImWorker.getInst().getIdByPath(data.getPath());
            n.setId(id);
            log.info("[TreeCache]节点删除, path={}, data={}",
                    data.getPath(), JsonUtil.pojoToJson(n));
            PeerSender peerSender = peerMap.get(n.getId());
    
            if (null != peerSender) {
                peerSender.stopConnecting();
                peerMap.remove(n.getId());
            }
        }
    
        private void processNodeAdded(ChildData data) {
            byte[] payload = data.getData();
            ImNode n = ObjectUtil.JsonBytes2Object(payload, ImNode.class);
    
            long id = ImWorker.getInst().getIdByPath(data.getPath());
            n.setId(id);
    
            log.info("[TreeCache]节点更新端口, path={}, data={}",
                    data.getPath(), JsonUtil.pojoToJson(n));
    
            if (n.equals(getLocalNode())) {
                log.info("[TreeCache]本地节点, path={}, data={}",
                        data.getPath(), JsonUtil.pojoToJson(n));
                return;
            }
            PeerSender peerSender = peerMap.get(n.getId());
            if (null != peerSender && peerSender.getNode().equals(n)) {
    
                log.info("[TreeCache]节点重复增加, path={}, data={}",
                        data.getPath(), JsonUtil.pojoToJson(n));
                return;
            }
            if (null != peerSender) {
                //关闭老的连接
                peerSender.stopConnecting();
            }
            peerSender = new PeerSender(n);
            peerSender.doConnect();
    
            peerMap.put(n.getId(), peerSender);
        }
    
    
        public PeerSender getPeerSender(long id) {
            PeerSender peerSender = peerMap.get(id);
            if (null != peerSender) {
                return peerSender;
            }
            return null;
        }
    
    
        public void sendNotification(String json) {
            peerMap.keySet().stream().forEach(
                    key -> {
                        if (!key.equals(getLocalNode().getId())) {
                            PeerSender peerSender = peerMap.get(key);
                            ProtoMsg.Message pkg = NotificationMsgBuilder.buildNotification(json);
                            peerSender.writeAndFlush(pkg);
                        }
                    }
            );
    
        }
    
    
        public ImNode getLocalNode() {
            return ImWorker.getInst().getLocalNodeInfo();
        }
    
        public void remove(ImNode remoteNode) {
            peerMap.remove(remoteNode.getId());
            log.info("[TreeCache]移除远程节点信息,  node={}", JsonUtil.pojoToJson(remoteNode));
        }
    }
    
    

    2.4 为什么使用临时节点?

    什么是临时节点?服务启动后创建临时节点, 服务断掉后临时节点就不存在了

    正常的思路可能是注册的时候,我们像zk注册一个正常的节点,然后在服务下线的时候删除这个节点,但是这样的话会有一个弊端。比如我们的服务挂机,无法去删除临时节点,那么这个节点就会被我们错误的提供给了客户端。

    另外我们还要考虑持久化的节点创建之后删除之类的问题,问题会更加的复杂化,所以我们使用了临时节点。

    3 负载均衡策略

    3.1 负载均衡策略的基本思路

    在我们解决了服务的注册和发现问题之后,那么我们究竟提供给客户端那台服务呢,这时候就需要我们做出选择,为了让客户端能够均匀的连接到我们的服务器上(比如有个100个客户端,2台服务器,每台就分配50个),我们需要使用一个负载均衡的策略。

    这里我们使用轮询的方式来为每个请求的客户端分配ip。具体的代码实现如下:

    3.2 负载均衡实现源码的示意

    package com.crazymakercircle.Balance;
    
    import com.crazymakercircle.constants.ServerConstants;
    import com.crazymakercircle.entity.ImNode;
    import com.crazymakercircle.util.JsonUtil;
    import com.crazymakercircle.zk.ZKclient;
    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.curator.framework.CuratorFramework;
    import org.springframework.stereotype.Service;
    
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    
    /**
     * create by 尼恩 @ 疯狂创客圈
     **/
    @Data
    @Slf4j
    @Service
    public class ImLoadBalance {
    
        //Zk客户端
        private CuratorFramework client = null;
        private String managerPath;
    
        public ImLoadBalance() {
            this.client = ZKclient.instance.getClient();
    //        managerPath=ServerConstants.MANAGE_PATH+"/";
            managerPath=ServerConstants.MANAGE_PATH;
        }
    
        /**
         * 获取负载最小的IM节点
         *
         * @return
         */
        public ImNode getBestWorker() {
            List<ImNode> workers = getWorkers();
    
            log.info("全部节点如下:");
            workers.stream().forEach(node -> {
                log.info("节点信息:{}", JsonUtil.pojoToJson(node));
            });
            ImNode best = balance(workers);
    
            return best;
        }
    
        /**
         * 按照负载排序
         *
         * @param items 所有的节点
         * @return 负载最小的IM节点
         */
        protected ImNode balance(List<ImNode> items) {
            if (items.size() > 0) {
                // 根据balance值由小到大排序
                Collections.sort(items);
    
                // 返回balance值最小的那个
                ImNode node = items.get(0);
    
                log.info("最佳的节点为:{}", JsonUtil.pojoToJson(node));
                return node;
            } else {
                return null;
            }
        }
    
    
        /**
         * 从zookeeper中拿到所有IM节点
         */
        protected List<ImNode> getWorkers() {
    
            List<ImNode> workers = new ArrayList<ImNode>();
    
            List<String> children = null;
            try {
                children = client.getChildren().forPath(managerPath);
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
    
            for (String child : children) {
                log.info("child:", child);
                byte[] payload = null;
                try {
                    payload = client.getData().forPath(managerPath+"/"+child);
    
                } catch (Exception e) {
                    e.printStackTrace();
                }
                if (null == payload) {
                    continue;
                }
                ImNode worker = JsonUtil.jsonBytes2Object(payload, ImNode.class);
                workers.add(worker);
            }
            return workers;
    
        }
        /**
         * 从zookeeper中删除所有IM节点
         */
        public void removeWorkers() {
    
    
            try {
              client.delete().deletingChildrenIfNeeded().forPath(managerPath);
            } catch (Exception e) {
                e.printStackTrace();
            }
    
    
        }
    
    }
    

    4 环境的启动

    4.1 启动Zookeeper

    Zookeeper的安装和原理,以及开发的基础知识,请参见书籍《Netty Zookeeper Redis 高并发实战》

    在这里插入图片描述
    启动zookeeper的两个节点,本来有三个,启动二个即可
    在这里插入图片描述

    客户端连接zookeeper集群。命令如下:

    ./zkCli.cmd -server localhost:2181

    在这里插入图片描述

    4.2 启动Redis

    Redis的安装和原理,以及开发的基础知识,请参见请参见书籍《Netty Zookeeper Redis 高并发实战》

    redis 的客户端界面。

    在这里插入图片描述

    5 Netty集群启动

    5.1 启动WEBGate

    使用一个WEBGate,作为负载均衡的服务器,具体的原理,请参见书籍《Netty Zookeeper Redis 高并发实战》

    在这里插入图片描述

    除了负载均衡,从WEBGate还可以从 zookeeper中删除所有IM节点

    连接为: http://localhost:8080/swagger-ui.html

    swagger 的界面如下:

    在这里插入图片描述

    5.2 启动第一个Netty节点

    服务端的端口为7000

    在这里插入图片描述

    5.3 启动第二个Netty节点

    服务端的端口为7001,自动递增的

    在这里插入图片描述

    5.4 启动第一个客户端

    启动后输入登录的信息

    请输入登录信息,格式为:用户名@密码

    z1@1

    启动客户端后,并且登录后,会自动连接一个netty节点, 这里为7001,第二个Netty服务节点。

    在这里插入图片描述

    5.5 启动第二个客户端

    启动后输入登录的信息

    请输入登录信息,格式为:用户名@密码

    z2@1

    启动客户端后,并且登录后,按照负载均衡的机制,会自动连接一个netty节点, 这里为7000,第一个Netty服务节点。

    在这里插入图片描述

    6 不同服务器直接进行IM通信

    ​ 下面演示,不同的客户端,通过各自的服务器节点,进行通信。

    6.1 发送聊天消息

    在第二个客户端(用户为z2),发送消息给第一个客户端(用户为z1),消息的格式为 :“ 内容@用户名”

    请输入聊天信息,格式为:内容@用户名

    hello@z1

    请输入聊天信息,格式为:内容@用户名

    helloworld@z1

    在这里插入图片描述

    6.2 远程客户端接收消息

    通过Netty服务节点的转发,第一个客户端收到的消息如下:

    收到消息 from uid:z2 -> hello

    收到消息 from uid:z2 -> helloworld

    在这里插入图片描述

    7 总结

    7.1 开发的难度

    通过Netty+Zookeep+Redis的架构,整个Netty的集群,具备了服务节点的自动发现,节点之间的消息路由的能力。

    说明一下,整个程序,还是比较复杂的,如果看不懂,建议不要捉急,慢慢来。

    如果能从0到1的自己实现一版,开发的水平,也就不一般了。

    全面的理论基础,请参见 《Netty Zookeeper Redis 高并发实战》 一书

    7.2 Netty集群的最全理论基础

    《Netty Zookeeper Redis 高并发实战》 一书,对Netty 集群的基本原理,进行了详尽的介绍,大致的目录如下:

    12.1 【面试必备】如何支撑亿级流量的高并发IM架构的理论基础

    12.1.1 亿级流量的系统架构的开发实践 338

    12.1.2 高并发架构的技术选型 338

    12.1.3 详解IM消息的序列化协议选型 339

    12.1.4 详解长连接和短连接 339

    12.2 分布式IM的命名服务的实践案例 340

    12.2.1 IM节点的POJO类 341

    12.2.2 IM节点的ImWorker类 342

    12.3 Worker集群的负载均衡之实践案例 345

    12.3.1 ImLoadBalance负载均衡器 346

    12.3.2 与WebGate的整合 348

    12.4 即时通信消息的路由和转发的实践案例 349

    12.4.1 IM路由器WorkerRouter 349

    12.4.2 IM转发器WorkerReSender 352

    12.5 Feign短连接RESTful调用 354

    12.5.1 短连接API的接口准备 355

    12.5.2 声明远程接口的本地代理 355

    12.5.3 远程API的本地调用 356

    12.6 分布式的在线用户统计的实践案例 358

    12.6.1 Curator的分布式计数器 358

    12.6.2 用户上线和下线的统计 360


    疯狂创客圈 Java 死磕系列

    • Java (Netty) 聊天程序【 亿级流量】实战 开源项目实战
    展开全文
  • 分布式IM及Netty服务集群解决方案

    千次阅读 2021-09-23 17:07:43
    使用netty开发分布式Im,提供分布netty集群解决方案。服务端通过负载均衡策略与服务集群建立连接,消息发送通过服务间集群的通信进行消息转发。 二、集群架构 三、项目地址 https://github.com/beardlessCat/im,烦...

    一、概述

    使用netty开发分布式Im,提供分布netty集群解决方案。服务端通过负载均衡策略与服务集群建立连接,消息发送通过服务间集群的通信进行消息转发。

    二、集群架构

    架构图

    三、项目地址

    https://github.com/beardlessCat/im,烦请star

    1.客户端

    用户聊天客户端,客户端连接IM服务需要进行用户认证。用户认证成功之后,开始连接上线。

    2.服务路由

    服务路由负责将客户端的连接请求按照不同的负载均衡策略路由到不同的IM服务,建立长链接。负载均衡策略分为以下四种:

    • 一致性HASH负载均衡策略
    • 最少活跃数负载均衡策略
    • 随机调用负载均衡策略
    • 轮询调用负载均衡策略

    3.IM服务集群

    为了避免单节点故障,IM服务采用集群模式。集群内各个IM服务又互为对方的客户端,用于转发远程消息(消息接收客户端连接其他IM服务节点)。

    4.ZK集群

    ZK集群作为IM服务的注册中心,用户IM服务的注册与发现以及服务上线、下线的事件监听通知。通过node事件,控制IM服务之间连接的建立与断开。

    5.消息队列

    消息队列用户发送离线消息、聊天消息。

    6.MongoDB集群

    存储离线消息及聊天消息。

    7.Redis集群

    存储客户端的连接session信息(客户端与服务端连接的信息)

    四、netty集群方案

    首先需要明确一个问题,netty的channel是无法存储到redis里面的。netty的channel是一个连接,是和机器的硬件绑定的,无法序列化,计算存到redis里面,取出来也无法使用。

    1.ZK作为注册中心实现

    (1)channel无法存储的问题

    channel是无法存储到redis里面的,但是客户端和服务端的连接信息(例如:127.0.0.1:8080的服务端是127.0.0.1:9090)是可以存储到redis里面的,因此可以通过redis存储连接信息。key为客户端标识,value为服务端地址信息,获取客户端的连接时,直接通过客户端信息即可获取其服务信息。

    channel存储

    (2)服务端连接的问题

    客户端连接服务端时,客户端如何知道当前服务端有哪些,需要要连接哪个?这个问题可以通过ZK解决。使用ZK作为注册中心,服务端上线后在ZK中创建node,连接服务端时,从ZK获取在线节点信息,根据负载均衡策略选择服务端连接。

    ZK注册中心

    (3)消息转发的问题

    连接相同服务的客户端,可以直接通过获连接当前服取客户端信息进行消息的转发,那连接不同服务端消息如何转发?我们可以通过监听ZK中node的事件(node创建代表新的服务上线,node销毁代表服务下线),通过不同的事件方法,实现服务端之间的互相连接。

    消息转发

    2.redis订阅与广播实现(可替换为消息队列)

    redis支持消息订阅与发布机制机制(消息队列),可以使用该机制实现不同服务间的消息转发。在广播消息时,需要携带能唯一标识接收者身份的字段(例如clientId)。消息广播结束后,所有服务端会
    收到该消息,服务端仅仅需要判断该消息接收者的是否是连接的自己作为服务端。若发现该接收者正是连接的自己,则直接将消息转发到该客户端即可。

    消息转发

    五、核心功能

    1.netty服务节点的注册与发现

    2.netty服务节点的负载均衡策略

    2.netty服务节点的消息转发

    展开全文
  • 基于Netty实现websocket集群部署实现方案 每天多学一点点~ 话不多说,这就开始吧… 文章目录基于Netty实现websocket集群部署实现方案1.前言2. 整体思路3. 代码demo4. 测试5.结语 1.前言 最近公司在做saas平台,其中...
  • springcloud和netty的结合集群,采用nacos注册中心,gateway网关
  • 基于redis搭建netty tcp通讯集群方案简介一、集群原理二、项目依赖三、部分代码 简介 在实际应用中我们的tcp服务端经常会使用集群方式运行,这样增大了系统的性能和容灾,本文讲述简单的netty tcp服务端集群应用原理...
  • Netty之——做集群 channel共享方案

    千次阅读 2019-09-18 13:14:11
    netty做集群 channel如何共享? 方案一: netty 集群,通过rocketmq等MQ 推送到所有netty服务端, channel 共享无非是要那个通道都可以发送消息向客户端, ...netty集群,添加注册中心,实现...
  • 索引 概述 当前项目是由蘑菇街的TeamTalk协议,完全用JAVA生态重构而成。以用于线上业务使用,并适当进行了调整,基本上还是与原Teamtalk协议保持兼容。 大家在开发过程中遇到问题,请提交到。 ...
  • Netty zookeeper 集群 实战 (CrazyIM)

    千次阅读 2019-09-07 00:44:14
    1.Netty Zookeeper 集群的实战的意义 完成一个Netty Zookeeper 分布式集群的肉搏实战, 有以下几个方面的最重要作用: 增加一次高并发的程序的亲身体验(重要): ​ 从学习千、万QPS级的流量,拓展能够cover...
  • netty实现简单的rpc,支持服务集群前言简介环境准备Netty 处理器链设计消费者RPC代理工厂设计netty rpc消费者核心设计netty rpc生产者核心设计服务注册、发现以集群演示Demo尾言相关链接 前言 简介 最近了解了下...
  • Netty-SocketIO 集群解决方案

    千次阅读 2019-12-12 13:41:00
    Netty-SocketIO 集群解决方案 Netty-SocketIO作为一个Socket框架,使用非常方便,并且使用Netty开发性能也有保证。 但是,我在使用Netty-SocketIO框架时,却发现,国内的资料比较少,虽然有些Demo级别的技术分享,...
  • netty额外知识点
  • netty集群 channel共享 方案

    千次阅读 2019-01-22 18:28:00
    netty做集群 channel如何共享? 方案一: netty 集群,通过rocketmq等MQ 推送到所有netty服务端, channel 共享无非是要那个通道都可以发送消息向客户端, ...MQ广播+ 多Netty ,Netty收到MQ...netty集群,添加注...
  • Websocket是一种保持长连接的技术,并且是双向的,从HTML5开始加入,并非完全基于HTTP,适合于频繁和较大流量的双向通讯场景,是服务器推送消息功能的最佳实践。而实现websocket的最佳方式,就是netty——yyds!!!
  • 为了满足消息推送的需求和增强推送系统的性能,采用Netty网络编程框架并搭建消息推送服务器集群,使用TCP链接发送心跳包,以保持和维护连接状态进行消息推送。通过性能测试,结果表明服务器集群可以分散链接压力,...
  • 主要简介是Netty-SocketIO技术的使用和介绍,实时推送技术的介绍
  • Netty TCP方式的集群方案

    万次阅读 2018-08-31 11:14:10
    :zookeeper(集群),服务启动时在zookeeper的一个持久节点下创建临时顺序节点,服务尽量保持一般偏低负载(防止某一台tcp服务器挂掉之后,客户端连接其他的服务导致其他的服务挂掉,引起雪崩)。 负载均衡 :...
  • Netty:它使 NIO 编程更加容易,屏蔽了 Java 底层的 NIO 细节。 Protostuff:它基于 Protobuf 序列化框架,面向 POJO,无需编写 .proto 文件。 ZooKeeper:提供服务注册与发现功能,开发分布式系统的必备选择,同时...
  • netty能不能集群

    千次阅读 2016-07-29 10:11:57
    客户端通过进程名到zk拉去可用的netty服务,并订阅zk的服务端的事件,在客户端处理分布式逻辑。&hellip;&hellip;&hellip;&hellip;&hellip;&hellip;&hellip;&hellip;&hellip;&...
  • //集群通过Redis进行数据通道交互,配置如下 Config redissonConfig = new Config(); redissonConfig.useSingleServer().setPassword(“xxxxxx”).setAddress(“http://127.0.0.1:6379”); RedissonClient redisson...
  • 使用 Netty 本地模拟一个 DHT 网络节点加入到 DHT 网络中去(即向启动节点发送 find_node 请求) 收到 find_node 回复解析出更多的 DHT 节点信息,向这些节点发送 find_node 节点(目的就是让更多的人知道自己,...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 33,490
精华内容 13,396
关键字:

netty集群