精华内容
下载资源
问答
  • java实现代理服务器

    万次阅读 2017-03-14 01:23:36
    前束本篇博客没有给出一个完善的技术解决方案,使用java搭建代理服务器是处理本人想要实现主备切换的broker而产生的想法,由于能力,时间和精力的限制,目前只能将想法的大致内容实践一下,这里只是分享一些这两天在...

    前束

    本篇博客没有给出一个完善的技术解决方案,使用java搭建代理服务器是处理本人想要实现主备切换的broker而产生的想法,由于能力,时间和精力的限制,目前只能将想法的大致内容实践一下,这里只是分享一些这两天在这上面做出的一些探索,以免其他人重蹈覆辙。

    简述

    需求简述为通过JAVA实现一个代理服务器,客户端的Http请求全部指向代理服务器,代理服务器基于一定的策略将请求转发给后台服务器,(这里的策略就比较核心,能够实现很多特殊的需求,比如我想实现的主备切换的功能,通过添加主机检测模块,如果主机检测模块检测到主服务器down了,那么程序能够顺利的迁移到备用服务器上)。

    技术准备

    Http Request和Response的基础知识
    java socket/Serversocket 类的使用技巧

    问题分析

    问题可以简化为
    1代理服务主机将客户机与代理服务主机之间的请求截获,然后直接将信息转发给目标主机
    2代理服务主机将目标主机的回应直接转发给客户机。

    那么
    代理服务主机可以通过socket直接获取客户机发送到本机指定端口的数据报文,那么通过分析内容可以得到客户机想要发送给目标主机的信息

    代理服务主机可以通过httpRequest的方式再次重新请求,将原来请求中的内容转发给目标主机,之后将目标主机的回应组织成Http Response的形式转发给客户机。

    Demo

    代码中只演示的代理服务器中的通信过程,中间的源信息的分析 以及返回值得构造还没有做,另外还存在多线程控制等问题,建议参考者将精力花在解决以上问题上面。

    import java.net.* ;
    import java.io.* ;
    class ActionSocket extends Thread{
        private Socket socket = null ;
        public ActionSocket(Socket s){
            this.socket = s ;
        }
        public void run(){
            try{
                this.action() ;
            }catch(Exception e){
                e.printStackTrace();
            }
        }
        public void action() throws Exception {
            if (this.socket == null){
                return ;
            }
            InputStream cis = this.socket.getInputStream();
    
            URL url = new URL("http://www.baidu.com");
            HttpURLConnection action = (HttpURLConnection) url.openConnection();
            InputStream sis =action.getInputStream();
            OutputStream cos = socket.getOutputStream();
            int length;
            byte bytes[] = new byte[1024];
    
            while(true){
                try {
                    if ((length = sis.read(bytes)) > 0) {
                        cos.write(bytes, 0, length);//将http请求头写到目标主机
                        System.out.println(new String(bytes));
                        cos.flush();
                    } else if (length < 0)
                    break;
                } catch (Exception e) {
                }
            }
            socket.close();
            cis.close();
        }
    }
    public class SocketServer{
        public static void main(String args[])throws Exception{
            ServerSocket server  = new ServerSocket(8888);
            while(true){
                Socket socket = server.accept();
                ActionSocket ap = new ActionSocket(socket);
                ap.start();
            }
        }
    }
    

    参考

    http://www.fullstackdevel.com/computer-tec/network/731.html
    http://stackoverflow.com/questions/6876266/java-net-connectexception-connection-refused
    http://www.freebuf.com/articles/web/21832.html

    展开全文
  • node.js搭建代理服务器实现跨域

    千次阅读 2017-04-24 12:39:07
    node.js搭建代理服务器实现跨域前后端分离,本地前端开发调用接口会有跨域问题,一般有以下3种解决方法: 后端接口打包到本地运行(缺点:每次后端更新都要去测试服下一个更新包,还要在本地搭建java运行环境,麻烦...

    node.js搭建代理服务器实现跨域


    前后端分离,本地前端开发调用接口会有跨域问题,一般有以下3种解决方法:

    1. 后端接口打包到本地运行(缺点:每次后端更新都要去测试服下一个更新包,还要在本地搭建java运行环境,麻烦)
    2. CORS跨域:后端接口在返回的时候,在header中加入’Access-Control-Allow-origin’:* 之类的。
    3. 用nodejs搭建本地http服务器,并且判断访问接口URL时进行转发,完美解决本地开发时候的跨域问题。

    node.js提供模块:node-http-proxy来实现http代理

    源码如下:

    /**
     * Created by zhaohuan on 2017/4/19.
     */
    // (function creatserver(pathurl) {
    
        var http=require('http');
        var fs=require('fs');
        var mime=require('mime');
        var urls=require('url');
        var pathurl='/Users/zhaohuan/login';
        //以上路径为html,css,js的路径
        //mark:在终端输入pwd获取路径
        var httpProxy = require('http-proxy');
    
    
    var proxy = httpProxy.createProxyServer({
        target: 'http://139.224.235.133:8099/',   //接口地址
        //创建一个代理,从以上路径获取数据。
    });
    
    //本地客户端请求服务器端的数据(跨域):客户端向代理服务器发起请求,
    代理服务器接到请求,向目标服务器发起请求。
    
    http.createServer(function (req,res) {
    
            var url=urls.parse(req.url).pathname;
            //req.url:localhost:8116/index.html
            //url:/index.html
            var realPath=pathurl+url;
    
    
        if(url.indexOf("user") > 0||url.indexOf("login") > 0){
        //当请求为user,login需要代理服务器请求远端服务器数据
            proxy.web(req, res);
            return;
        }
    
            fs.readFile(realPath,function (err,data) {
                if(data){
                        res.writeHead(200,{
                            'Content-type':mime.lookup(realPath)
                           //获取请求文件后缀                   
                        });
                    res.end(data);
                }
            });
        }).listen(8888);
        console.log('服务器启动');

    mime.lookup(realPath)

    以上是使用mime.lookup(realPath)获取的文件后缀。

    对应的客户端js代码如下:

    //使用ui.router实现路由机制
     var loginapp = angular.module('loginapp', ['ui.router']);
    
        loginapp.controller('routselect', function ($scope) {
            $scope.message = '配置完成';
        });
    
        loginapp.config(function ($stateProvider, $urlRouterProvider) {
    
            $urlRouterProvider
                .when('/', '/login')//默认页面
                .otherwise("/login");//重定向
    
            $stateProvider
                .state('login', {
                    url: "/login",
                    templateUrl: 'index1.html',
                    controller: 'loginC'
                })
                .state('success', {
                    url: "/success",
                    templateUrl: 'index2.html',
                    controller: 'loginsuccessC'
                });
        });
    
        loginapp.controller('loginC', function ($scope, $state,$http) {
    //验证码(因为此页面实现的是登陆获取权限操作数据库)
            $scope.unicode = (function () {
                var codes = [];
                //数字
                for (var i = 48; i < 57; codes.push(i), i++);
                //大写字母
                for (var i = 60; i < 90; codes.push(i), i++);
                //小写字母
                for (var i = 97; i < 122; codes.push(i), i++);
                var arr = [];
                for (var i = 0; i < 4; i++) {
                    var index = Math.floor(Math.random() * (61) + 0);
                    var char = String.fromCharCode(codes[index]);
                    arr.push(char);
                }
                var code = arr.join("");
                $scope.yzm = code;
            })();
    
            $scope.ajax = function (user, password, yzm) {
                if (yzm == $scope.yzm) {
                    $http({
                        method: 'GET',
                        url: 'login',//对应的代理地址通过代理获取跨域接口http://139.224.235.133:8099/login
                        headers: {
                            "Authorization": "Basic " + btoa(user + ":" + password)
                        },
                    }).then(function successCallback(response) {
                        console.log(response);
                        $state.go('success');//页面跳转
                        //注意:使用路由是文件名不可用中文
                    })
                }
                else {
                    alert('请输入正确的验证码')
                }
            }
        });
    展开全文
  • MQTT代理服务器开发) 本例将基于JMQTT进行二次开发作为MQTT代理(broker服务器)。由于JMQTT需要配置启动环境变量,比较麻烦并且对初学者不利,环境变量是为了系统运行更快而设置的,jmtqq不是频繁读取系统变量,...

    MQTT代理(服务器开发)

    本例将基于JMQTT进行二次开发作为MQTT代理(broker服务器)。由于JMQTT需要配置启动环境变量,比较麻烦并且对初学者不利,环境变量是为了系统运行更快而设置的,jmtqq不是频繁读取系统变量,故将其屏蔽掉。

    本MQTT服务器绕过了环境变量设置,系统自动建立运行所需的配置文件,但需要创建一下两个类进行配置。

    JMQTT功能特性

    • 基于Java及Netty开发,插件化模式,高性能,高扩展性。
    • 支持mqtt协议qos0,qos1,qos2消息质量服务。
    • 支持mqtt协议cleansession,retain,will等消息服务。
    • 完整支持mqtt Topic匹配过滤。
    • 支持websocket协议。
    • 支持RocksDB进行数据本地存储,数据高可靠。

    JMQTT架构设计图:

    50dcc569810ead55b71e02bb6fd025fe.png

    模块简介及本地环境

    • broker:mqtt协议层,逻辑处理,BrokerStartup为启动类,BrokerController为初始化类,初始化所有的必备环境,其中acl,store的插件配置也必须在这里初始化。
    • common:公共层,存放工具类,bean类等。
    • remoting:通信层,连接管理,协议解析,心跳等。
    • distribution:配置模块,主要是配置文件,启停命令等存放。
    • example:客户端示例,目前只有java以及websocket。
    • group:集群管理模块:消息传输,集群管理,以及相关运维功能实现。
    • store:存储模块,提供了mqtt协议数据的几个接口,支持基于内存的和Rocksdb的本地存储。

    基于JMQTT二次开发使用步骤:

    • 在JMQTT官方网站中下载或者Clone JMQTT项目:https://gitee.com/cuteTree/jmqtt/
    • 在根目录执行:mvn -Ppackage-all -DskipTests clean install -U 将JMQTT项目打成可导入jar包。
    • 使用Maven分别导入broker,common,remoting,store包,效果如下(MAVEN将自然导入相关联jar包):

    3beb5fc45a317d33df1592643a3531ca.png
    • 在此之前需要导入3个jar包

    bbe9d14fcf23f9f9cf490b5c101549d7.png
    • 配置类书写(使用自己的配置文件,并在start方法中启动服务器):
      MQTTService.java
    package org.jmqtt.broker.brokersever;
    import java.awt.EventQueue;
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.PrintWriter;
    import java.io.StringWriter;
    import java.util.List;
    import javax.swing.JOptionPane;
    import df.am.service.ServiceRouter;
    public class MQTTService implements ServiceRouter {
        @Override
        public boolean isSmooth() {
            // 只要进来, 说明服务已经加载,但需要测试访问一下。
            return true;
        }
        @Override
        public String router(String arg0) {
            // TODO Auto-generated method stub
            return null;
        }
        /**
         * 停止服务
         * */
        public void stop() {
            Thread newThread = new Thread();
            Thread t = new Thread(new Runnable() {
                public void run() {
                    // 启动服务器
                    MQTTServer mqttServer = new MQTTServer();
                    mqttServer.stop();
                }
            });
            t.start();
        }
        @Override
        public boolean start() {
            try {
                /*
                 * 使用自己的配置文件。检查配置文件,如果配置文件出问题,则启动失败 配置文件与当前jar文件在同一个目录之下
                 */
    
                // 获取当前运行jar文件所在的目录
                String basePath = this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
                if(basePath.replace("", "/").toLowerCase().endsWith("jar")) {
                    basePath=basePath.substring(0,basePath.lastIndexOf("/")+1);
                }
                basePath=basePath.substring(1, basePath.length()-1);
                File config = new File(basePath + "/conf");
                if (!config.exists()) {
                    //如果配置文件丢失,就自己建一套
                    config.mkdir();
                }
                File jmqttproperties=new File(basePath + "/conf/jmqtt.properties");
                if (!jmqttproperties.exists()) {
                    //如果配置文件丢失,就自己建一套
                    //JOptionPane.showMessageDialog(null, basePath + "/conf/jmqtt.properties", "提示", JOptionPane.ERROR_MESSAGE);
                    jmqttproperties.createNewFile();
                    FileOutputStream fis = new FileOutputStream(jmqttproperties);
                    FileWriter fw;
                    fw=new FileWriter(jmqttproperties,true);
                    String lineEnd="";
                    lineEnd+="#MQTT tcp服务端口rn" +
                            "port=1883rn" +
                            "#消息大小限制rn" +
                            "maxMsgSize=524288rn" +
                            "rn" +
                            "#存储选型 1.rocksdb  2.redis   3.default memoryrn" +
                            "storeType=1rn" +
                            "rocksDbPath=rocksdb.dbrn";
    
                    fw.write(lineEnd);
                    fw.flush();
                    fw.close();
                }
    
                File logback_broker=new File(basePath + "/conf/logback_broker.xml");
                if(!logback_broker.exists()) {
                    logback_broker.createNewFile();
                    FileOutputStream fis = new FileOutputStream(logback_broker);
                    FileWriter fw;
                    fw=new FileWriter(logback_broker,true);
                    String lineEnd="";
                    lineEnd+="<?xml version="1.0" encoding="UTF-8"?>rn" +
                            "rn" +
                            "<configuration>rn" +
                            "    <!--<property name="user.home" value="" />-->rn" +
                            "rn" +
                            "    <!-- console -->rn" +
                            "    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">rn" +
                            "        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">rn" +
                            "            <Pattern>%date{yyyy-MM-dd HH:mm:ss.SSS} %-5level[%thread]%logger{56}.%method:%L -%msg%n</Pattern>rn" +
                            "        </encoder>rn" +
                            "    </appender>rn" +
                            "rn" +
                            "    <!-- other.log -->rn" +
                            "    <appender name="JmqttOtherAppender_inner"rn" +
                            "              class="ch.qos.logback.core.rolling.RollingFileAppender">rn" +
                            "        <file>${user.home}/logs/jmqttlogs/store.log</file>rn" +
                            "        <append>true</append>rn" +
                            "        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">rn" +
                            "            <fileNamePattern>${user.home}/logs/jmqttlogs/otherdays/cluster.%i.log.gz</fileNamePattern>rn" +
                            "            <minIndex>1</minIndex>rn" +
                            "            <maxIndex>20</maxIndex>rn" +
                            "        </rollingPolicy>rn" +
                            "        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">rn" +
                            "            <maxFileSize>128MB</maxFileSize>rn" +
                            "        </triggeringPolicy>rn" +
                            "        <encoder>rn" +
                            "            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>rn" +
                            "            <charset class="java.nio.charset.Charset">UTF-8</charset>rn" +
                            "        </encoder>rn" +
                            "    </appender>rn" +
                            "    <appender name="JmqttOtherAppender" class="ch.qos.logback.classic.AsyncAppender">rn" +
                            "        <appender-ref ref="JJmqttOtherAppender_inner"/>rn" +
                            "    </appender>rn" +
                            "rn" +
                            "    <!-- cluster.log -->rn" +
                            "    <appender name="JmqttClusterAppender_inner"rn" +
                            "              class="ch.qos.logback.core.rolling.RollingFileAppender">rn" +
                            "        <file>${user.home}/logs/jmqttlogs/store.log</file>rn" +
                            "        <append>true</append>rn" +
                            "        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">rn" +
                            "            <fileNamePattern>${user.home}/logs/jmqttlogs/otherdays/cluster.%i.log.gz</fileNamePattern>rn" +
                            "            <minIndex>1</minIndex>rn" +
                            "            <maxIndex>20</maxIndex>rn" +
                            "        </rollingPolicy>rn" +
                            "        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">rn" +
                            "            <maxFileSize>128MB</maxFileSize>rn" +
                            "        </triggeringPolicy>rn" +
                            "        <encoder>rn" +
                            "            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>rn" +
                            "            <charset class="java.nio.charset.Charset">UTF-8</charset>rn" +
                            "        </encoder>rn" +
                            "    </appender>rn" +
                            "    <appender name="JmqttClusterAppender" class="ch.qos.logback.classic.AsyncAppender">rn" +
                            "        <appender-ref ref="JJmqttClusterAppender_inner"/>rn" +
                            "    </appender>rn" +
                            "rn" +
                            "    <!-- store.log -->rn" +
                            "    <appender name="JmqttStoreAppender_inner"rn" +
                            "              class="ch.qos.logback.core.rolling.RollingFileAppender">rn" +
                            "        <file>${user.home}/logs/jmqttlogs/store.log</file>rn" +
                            "        <append>true</append>rn" +
                            "        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">rn" +
                            "            <fileNamePattern>${user.home}/logs/jmqttlogs/otherdays/store.%i.log.gz</fileNamePattern>rn" +
                            "            <minIndex>1</minIndex>rn" +
                            "            <maxIndex>20</maxIndex>rn" +
                            "        </rollingPolicy>rn" +
                            "        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">rn" +
                            "            <maxFileSize>128MB</maxFileSize>rn" +
                            "        </triggeringPolicy>rn" +
                            "        <encoder>rn" +
                            "            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>rn" +
                            "            <charset class="java.nio.charset.Charset">UTF-8</charset>rn" +
                            "        </encoder>rn" +
                            "    </appender>rn" +
                            "    <appender name="JmqttStoreAppender" class="ch.qos.logback.classic.AsyncAppender">rn" +
                            "        <appender-ref ref="JJmqttStoreAppender_inner"/>rn" +
                            "    </appender>rn" +
                            "rn" +
                            "    <!-- remoting.log -->rn" +
                            "    <appender name="JmqttRemotingAppender_inner"rn" +
                            "              class="ch.qos.logback.core.rolling.RollingFileAppender">rn" +
                            "        <file>${user.home}/logs/jmqttlogs/remoting.log</file>rn" +
                            "        <append>true</append>rn" +
                            "        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">rn" +
                            "            <fileNamePattern>${user.home}/logs/jmqttlogs/otherdays/remoting.%i.log.gz</fileNamePattern>rn" +
                            "            <minIndex>1</minIndex>rn" +
                            "            <maxIndex>20</maxIndex>rn" +
                            "        </rollingPolicy>rn" +
                            "        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">rn" +
                            "            <maxFileSize>128MB</maxFileSize>rn" +
                            "        </triggeringPolicy>rn" +
                            "        <encoder>rn" +
                            "            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>rn" +
                            "            <charset class="java.nio.charset.Charset">UTF-8</charset>rn" +
                            "        </encoder>rn" +
                            "    </appender>rn" +
                            "    <appender name="JmqttRemotingAppender" class="ch.qos.logback.classic.AsyncAppender">rn" +
                            "        <appender-ref ref="JmqttRemotingAppender_inner"/>rn" +
                            "    </appender>rn" +
                            "rn" +
                            "    <!-- messageTrace.log -->rn" +
                            "    <appender name="JmqttMessageTraceAppender_inner"rn" +
                            "              class="ch.qos.logback.core.rolling.RollingFileAppender">rn" +
                            "        <file>${user.home}/logs/jmqttlogs/messageTrace.log</file>rn" +
                            "        <append>true</append>rn" +
                            "        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">rn" +
                            "            <fileNamePattern>${user.home}/logs/jmqttlogs/otherdays/messageTrace.%i.log.gz</fileNamePattern>rn" +
                            "            <minIndex>1</minIndex>rn" +
                            "            <maxIndex>20</maxIndex>rn" +
                            "        </rollingPolicy>rn" +
                            "        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">rn" +
                            "            <maxFileSize>128MB</maxFileSize>rn" +
                            "        </triggeringPolicy>rn" +
                            "        <encoder>rn" +
                            "            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>rn" +
                            "            <charset class="java.nio.charset.Charset">UTF-8</charset>rn" +
                            "        </encoder>rn" +
                            "    </appender>rn" +
                            "    <appender name="JmqttMessageTraceAppender" class="ch.qos.logback.classic.AsyncAppender">rn" +
                            "        <appender-ref ref="JmqttMessageTraceAppender_inner"/>rn" +
                            "    </appender>rn" +
                            "rn" +
                            "    <!-- broker.log -->rn" +
                            "    <appender name="JmqttBrokerAppender_inner"rn" +
                            "              class="ch.qos.logback.core.rolling.RollingFileAppender">rn" +
                            "        <file>${user.home}/logs/jmqttlogs/broker.log</file>rn" +
                            "        <append>true</append>rn" +
                            "        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">rn" +
                            "            <fileNamePattern>${user.home}/logs/jmqttlogs/otherdays/broker.%i.log.gz</fileNamePattern>rn" +
                            "            <minIndex>1</minIndex>rn" +
                            "            <maxIndex>20</maxIndex>rn" +
                            "        </rollingPolicy>rn" +
                            "        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">rn" +
                            "            <maxFileSize>128MB</maxFileSize>rn" +
                            "        </triggeringPolicy>rn" +
                            "        <encoder>rn" +
                            "            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>rn" +
                            "            <charset class="java.nio.charset.Charset">UTF-8</charset>rn" +
                            "        </encoder>rn" +
                            "    </appender>rn" +
                            "    <appender name="JmqttBrokerAppender" class="ch.qos.logback.classic.AsyncAppender">rn" +
                            "        <appender-ref ref="JmqttBrokerAppender_inner"/>rn" +
                            "    </appender>rn" +
                            "rn" +
                            "    <!-- clientTrace.log -->rn" +
                            "    <appender name="JmqttClientTraceAppender_inner"rn" +
                            "              class="ch.qos.logback.core.rolling.RollingFileAppender">rn" +
                            "        <file>${user.home}/logs/jmqttlogs/clientTrace.log</file>rn" +
                            "        <append>true</append>rn" +
                            "        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">rn" +
                            "            <fileNamePattern>${user.home}/logs/jmqttlogs/otherdays/clientTrace.%i.log.gz</fileNamePattern>rn" +
                            "            <minIndex>1</minIndex>rn" +
                            "            <maxIndex>20</maxIndex>rn" +
                            "        </rollingPolicy>rn" +
                            "        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">rn" +
                            "            <maxFileSize>128MB</maxFileSize>rn" +
                            "        </triggeringPolicy>rn" +
                            "        <encoder>rn" +
                            "            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>rn" +
                            "            <charset class="java.nio.charset.Charset">UTF-8</charset>rn" +
                            "        </encoder>rn" +
                            "    </appender>rn" +
                            "    <appender name="JmqttClientTraceAppender" class="ch.qos.logback.classic.AsyncAppender">rn" +
                            "        <appender-ref ref="JmqttClientTraceAppender_inner"/>rn" +
                            "    </appender>rn" +
                            "rn" +
                            "    <logger name="brokerLog" additivity="false">rn" +
                            "        <level value="INFO"/>rn" +
                            "        <appender-ref ref="JmqttBrokerAppender"/>rn" +
                            "        <appender-ref ref="STDOUT"/>rn" +
                            "    </logger>rn" +
                            "rn" +
                            "    <logger name="clientTraceLog" additivity="false">rn" +
                            "        <level value="INFO"/>rn" +
                            "        <appender-ref ref="JmqttClientTraceAppender"/>rn" +
                            "        <appender-ref ref="STDOUT"/>rn" +
                            "    </logger>rn" +
                            "rn" +
                            "    <logger name="messageTraceLog" additivity="false">rn" +
                            "        <level value="INFO"/>rn" +
                            "        <appender-ref ref="JmqttMessageTraceAppender"/>rn" +
                            "        <appender-ref ref="STDOUT"/>rn" +
                            "    </logger>rn" +
                            "rn" +
                            "    <logger name="remotingLog" additivity="false">rn" +
                            "        <level value="INFO"/>rn" +
                            "        <appender-ref ref="JmqttRemotingAppender"/>rn" +
                            "        <appender-ref ref="STDOUT"/>rn" +
                            "    </logger>rn" +
                            "rn" +
                            "    <logger name="storeLog" additivity="false">rn" +
                            "        <level value="INFO"/>rn" +
                            "        <appender-ref ref="JmqttStoreAppender"/>rn" +
                            "        <appender-ref ref="STDOUT"/>rn" +
                            "    </logger>rn" +
                            "rn" +
                            "    <logger name="clusterLog" additivity="false">rn" +
                            "        <level value="INFO"/>rn" +
                            "        <appender-ref ref="JmqttClusterTraceAppender"/>rn" +
                            "        <appender-ref ref="STDOUT"/>rn" +
                            "    </logger>rn" +
                            "rn" +
                            "    <logger name="otherLog" additivity="false">rn" +
                            "        <level value="INFO"/>rn" +
                            "        <appender-ref ref="JmqttOtherTraceAppender"/>rn" +
                            "        <appender-ref ref="STDOUT"/>rn" +
                            "    </logger>rn" +
                            "rn" +
                            "    <root>rn" +
                            "        <level value="INFO"/>rn" +
                            "        <appender-ref ref="STDOUT"/>rn" +
                            "    </root>rn" +
                            "</configuration>rn" +
                            "";
                    fw.write(lineEnd);
                    fw.flush();
                    fw.close();
                }
                // 加载文件
                String argStr="";
                argStr+="-h "+basePath+"/ -c "+basePath+"/conf/jmqtt.properties";
                String[] args=argStr.split(" ");
                // 启动服务
                Thread newThread = new Thread();
                Thread t = new Thread(new Runnable() {
                    public void run() {
                        // 启动服务器
                        MQTTServer mqttServer = new MQTTServer();
                        mqttServer.start(args);
                    }
                });
                t.start();
                //打开服务页面
                return true;
            } catch (Exception ex) {
                StringWriter stringWriter = new StringWriter();
                ex.printStackTrace(new PrintWriter(stringWriter));
                JOptionPane.showMessageDialog(null, "mqtt服务启动错误:" +stringWriter.toString(), "提示", JOptionPane.ERROR_MESSAGE);
                return false;
            }
        }
        public static void main(String[] args) {
            MQTTService mqttService = new MQTTService();
            mqttService.start();
        }
        public String ReadConfig(File propertyfile, String configKey) throws IOException {
            FileInputStream fis = new FileInputStream(propertyfile);
            InputStreamReader isr = new InputStreamReader(fis);
            BufferedReader br = new BufferedReader(isr);
            String temp = "";
            for (int j = 1; (temp = br.readLine()) != null; j++) {
                if(temp.startsWith("#")) {
                    continue;
                }else {
                    String[] property= temp.trim().split("=");
                    if(property[0].trim().contentEquals(configKey)) {
                        return property[1];
                    }
                }
            }
            return "";
        }
    }

    MQTTServer.java(启动JMQTT中的BrokerStartup ):

    package org.jmqtt.broker.brokersever;
    
    import df.util.log.biz.LogUtil;
    import org.jmqtt.broker.BrokerController;
    import org.jmqtt.broker.BrokerStartup;
    
    import javax.swing.*;
    import java.awt.*;
    import java.io.*;
    
    public class MQTTServer {
        public static BrokerStartup broker;
        public static BrokerController brokerController;
    
        public void start(String[] args) {
            broker=new BrokerStartup();
            try {
                brokerController=broker.start(args);
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
        public void stop() {
            if(brokerController!=null){
                brokerController.shutdown();
                brokerController=null;
            }
        }
    
        public static void autoRun() {
            try {
                File directory = new File("");
                String baseDirectory = directory.getAbsolutePath();
                String autorunPath = "";
                if (baseDirectory.endsWith("")) {
                    autorunPath = """+baseDirectory + "MQTTService.jar"+ "  autorun "+baseDirectory+""  ";
                } else {
                    autorunPath =  """+baseDirectory + "MQTTService.jar"+ "  autorun "+baseDirectory+"" ";
                }
                String regKey = ""HKEY_CURRENT_USERSoftwareMicrosoftWindowsCurrentVersionRun"";// 注册表的key
                String myAppName = "MQTTService";
                //String kStr=commandPrefix + key + " /v  Zhakong  /t REG_SZ /d " + autorunPath + " /f";
                String kStr="reg "+"add "+regKey+" /v "+myAppName+" /t reg_sz /d "+autorunPath;
                Runtime.getRuntime().exec(kStr);
            } catch (Exception ex) {
                LogUtil.debugInfo("uitl", "注册表书写错误:"+ex.getMessage());
            }
        }
    
    }

    至此,MQTT代理服务器搭建完毕。

    搭建MQTT服务器方法2:直接导入Jar包(以上服务器搭建集成的jar包)

    启动处调用代码如下:

        MQTTService mqttService = new MQTTService();
        mqttService.start();       

    启动后,MQTTService会自动在jar文件的根目录下创建默认的MQTT配置,并使用这些配置启动服务。如果需要,请更改这些文件中的配置满足自己的私欲。

    MQTT模拟客户端测试

    开启服务器后,Producer推送一条信息,Consumer订阅本条信息,需导入本包

    b6ce998770d3ad11796b08592aac9492.png

    创建broker服务器:

    Mqtt_Server.java

    package org.jmqtt.broker;
    import org.jmqtt.broker.brokersever.MQTTService;
    public class Mqtt_Server {
        public static void main(String[] args) {
            MQTTService mqttService = new MQTTService();
            mqttService.start();
        }
    }

    Produce.java:

    package org.jmqtt.java;
    
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    import org.jmqtt.pojo.ReportData;
    
    public class Producer {
        private static final String broker = "tcp://127.0.0.1:1883";
        private static final String content = "Message from MqttProducer";
        private static ReportData reportData=new ReportData();
        private static final int qos = 1;
        private static final String topic = "MQTT/TOPIC";
        private static final String clientId = "MQTT_PUB_CLIENT";
    
        public static void main(String[] args) throws MqttException, InterruptedException {
            MqttClient pubClient = getMqttClient();
            for(int i = 0; i < 3; i++){
                MqttMessage mqttMessage = getMqttMessage();
                pubClient.publish(topic,mqttMessage);
                System.out.println("Send message success.");
            }
        }
    
        private static MqttMessage getMqttMessage(){
            MqttMessage mqttMessage = new MqttMessage(reportData.toString().getBytes());
            mqttMessage.setQos(qos);
            return mqttMessage;
        }
    
        private static MqttClient getMqttClient(){
            try {
                MqttClient pubClient = new MqttClient(broker,clientId,new MemoryPersistence());
                MqttConnectOptions connectOptions = new MqttConnectOptions();
                connectOptions.setWill("lwt","this is a will message".getBytes(),1,false);
                connectOptions.setCleanSession(true);
                System.out.println("Connecting to broker: " + broker);
                pubClient.connect(connectOptions);
                return pubClient;
            } catch (MqttException e) {
                e.printStackTrace();
            }
            return null;
        }
    }

    Consumer.jar:

    package org.jmqtt.java;
    
    import org.eclipse.paho.client.mqttv3.*;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    public class Consumer {
        private static final String broker = "tcp://127.0.0.1:1883";
        private static final String topic = "MQTT/+";
        private static final String clientId = "MQTT_SUB_CLIENT";
    
        public static void main(String[] args) throws MqttException, InterruptedException {
            MqttClient subClient = getMqttClient();
            subClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable throwable) {
                    System.out.println("Connect lost,do some thing to solve it");
                }
    
                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                    System.out.println("From topic: " + s);
                    System.out.println("Message content: " + new String(mqttMessage.getPayload()));
                }
    
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    
                }
            });
            subClient.subscribe(topic);
        }
    
    
        private static MqttClient getMqttClient(){
            try {
                MqttClient pubClient = new MqttClient(broker,clientId,new MemoryPersistence());
                MqttConnectOptions connectOptions = new MqttConnectOptions();
                connectOptions.setCleanSession(true);
                System.out.println("Connecting to broker: " + broker);
                pubClient.connect(connectOptions);
                return pubClient;
            } catch (MqttException e) {
                e.printStackTrace();
            }
            return null;
        }
    }
    展开全文
  • MQTT代理服务器开发) 本例将基于JMQTT进行二次开发作为MQTT代理(broker服务器)。由于JMQTT需要配置启动环境变量,比较麻烦并且对初学者不利,环境变量是为了系统运行更快而设置的,jmtqq不是频繁读取系统变量,...

    MQTT代理(服务器开发)

    本例将基于JMQTT进行二次开发作为MQTT代理(broker服务器)。由于JMQTT需要配置启动环境变量,比较麻烦并且对初学者不利,环境变量是为了系统运行更快而设置的,jmtqq不是频繁读取系统变量,故将其屏蔽掉。

    本MQTT服务器绕过了环境变量设置,系统自动建立运行所需的配置文件,但需要创建一下两个类进行配置。

    JMQTT功能特性

    • 基于Java及Netty开发,插件化模式,高性能,高扩展性。
    • 支持mqtt协议qos0,qos1,qos2消息质量服务。
    • 支持mqtt协议cleansession,retain,will等消息服务。
    • 完整支持mqtt Topic匹配过滤。
    • 支持websocket协议。
    • 支持RocksDB进行数据本地存储,数据高可靠。

    JMQTT架构设计图:

    ce05e63d1aa29b9761463df293bf875d.png

    模块简介及本地环境

    • broker:mqtt协议层,逻辑处理,BrokerStartup为启动类,BrokerController为初始化类,初始化所有的必备环境,其中acl,store的插件配置也必须在这里初始化。
    • common:公共层,存放工具类,bean类等。
    • remoting:通信层,连接管理,协议解析,心跳等。
    • distribution:配置模块,主要是配置文件,启停命令等存放。
    • example:客户端示例,目前只有java以及websocket。
    • group:集群管理模块:消息传输,集群管理,以及相关运维功能实现。
    • store:存储模块,提供了mqtt协议数据的几个接口,支持基于内存的和Rocksdb的本地存储。

    基于JMQTT二次开发使用步骤:

    • 在JMQTT官方网站中下载或者Clone JMQTT项目:https://gitee.com/cuteTree/jmqtt/
    • 在根目录执行:mvn -Ppackage-all -DskipTests clean install -U 将JMQTT项目打成可导入jar包。
    • 使用Maven分别导入broker,common,remoting,store包,效果如下(MAVEN将自然导入相关联jar包):

    0c7ba288e9f9d5001f4bd40b8fbe3e78.png
    • 在此之前需要导入3个jar包

    ecfc69b81dc64c8c985f404cf4ec9295.png
    • 配置类书写(使用自己的配置文件,并在start方法中启动服务器):
      MQTTService.java
    package org.jmqtt.broker.brokersever;
    import java.awt.EventQueue;
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.PrintWriter;
    import java.io.StringWriter;
    import java.util.List;
    import javax.swing.JOptionPane;
    import df.am.service.ServiceRouter;
    public class MQTTService implements ServiceRouter {
        @Override
        public boolean isSmooth() {
            // 只要进来, 说明服务已经加载,但需要测试访问一下。
            return true;
        }
        @Override
        public String router(String arg0) {
            // TODO Auto-generated method stub
            return null;
        }
        /**
         * 停止服务
         * */
        public void stop() {
            Thread newThread = new Thread();
            Thread t = new Thread(new Runnable() {
                public void run() {
                    // 启动服务器
                    MQTTServer mqttServer = new MQTTServer();
                    mqttServer.stop();
                }
            });
            t.start();
        }
        @Override
        public boolean start() {
            try {
                /*
                 * 使用自己的配置文件。检查配置文件,如果配置文件出问题,则启动失败 配置文件与当前jar文件在同一个目录之下
                 */
    
                // 获取当前运行jar文件所在的目录
                String basePath = this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
                if(basePath.replace("", "/").toLowerCase().endsWith("jar")) {
                    basePath=basePath.substring(0,basePath.lastIndexOf("/")+1);
                }
                basePath=basePath.substring(1, basePath.length()-1);
                File config = new File(basePath + "/conf");
                if (!config.exists()) {
                    //如果配置文件丢失,就自己建一套
                    config.mkdir();
                }
                File jmqttproperties=new File(basePath + "/conf/jmqtt.properties");
                if (!jmqttproperties.exists()) {
                    //如果配置文件丢失,就自己建一套
                    //JOptionPane.showMessageDialog(null, basePath + "/conf/jmqtt.properties", "提示", JOptionPane.ERROR_MESSAGE);
                    jmqttproperties.createNewFile();
                    FileOutputStream fis = new FileOutputStream(jmqttproperties);
                    FileWriter fw;
                    fw=new FileWriter(jmqttproperties,true);
                    String lineEnd="";
                    lineEnd+="#MQTT tcp服务端口rn" +
                            "port=1883rn" +
                            "#消息大小限制rn" +
                            "maxMsgSize=524288rn" +
                            "rn" +
                            "#存储选型 1.rocksdb  2.redis   3.default memoryrn" +
                            "storeType=1rn" +
                            "rocksDbPath=rocksdb.dbrn";
    
                    fw.write(lineEnd);
                    fw.flush();
                    fw.close();
                }
    
                File logback_broker=new File(basePath + "/conf/logback_broker.xml");
                if(!logback_broker.exists()) {
                    logback_broker.createNewFile();
                    FileOutputStream fis = new FileOutputStream(logback_broker);
                    FileWriter fw;
                    fw=new FileWriter(logback_broker,true);
                    String lineEnd="";
                    lineEnd+="<?xml version="1.0" encoding="UTF-8"?>rn" +
                            "rn" +
                            "<configuration>rn" +
                            "    <!--<property name="user.home" value="" />-->rn" +
                            "rn" +
                            "    <!-- console -->rn" +
                            "    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">rn" +
                            "        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">rn" +
                            "            <Pattern>%date{yyyy-MM-dd HH:mm:ss.SSS} %-5level[%thread]%logger{56}.%method:%L -%msg%n</Pattern>rn" +
                            "        </encoder>rn" +
                            "    </appender>rn" +
                            "rn" +
                            "    <!-- other.log -->rn" +
                            "    <appender name="JmqttOtherAppender_inner"rn" +
                            "              class="ch.qos.logback.core.rolling.RollingFileAppender">rn" +
                            "        <file>${user.home}/logs/jmqttlogs/store.log</file>rn" +
                            "        <append>true</append>rn" +
                            "        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">rn" +
                            "            <fileNamePattern>${user.home}/logs/jmqttlogs/otherdays/cluster.%i.log.gz</fileNamePattern>rn" +
                            "            <minIndex>1</minIndex>rn" +
                            "            <maxIndex>20</maxIndex>rn" +
                            "        </rollingPolicy>rn" +
                            "        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">rn" +
                            "            <maxFileSize>128MB</maxFileSize>rn" +
                            "        </triggeringPolicy>rn" +
                            "        <encoder>rn" +
                            "            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>rn" +
                            "            <charset class="java.nio.charset.Charset">UTF-8</charset>rn" +
                            "        </encoder>rn" +
                            "    </appender>rn" +
                            "    <appender name="JmqttOtherAppender" class="ch.qos.logback.classic.AsyncAppender">rn" +
                            "        <appender-ref ref="JJmqttOtherAppender_inner"/>rn" +
                            "    </appender>rn" +
                            "rn" +
                            "    <!-- cluster.log -->rn" +
                            "    <appender name="JmqttClusterAppender_inner"rn" +
                            "              class="ch.qos.logback.core.rolling.RollingFileAppender">rn" +
                            "        <file>${user.home}/logs/jmqttlogs/store.log</file>rn" +
                            "        <append>true</append>rn" +
                            "        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">rn" +
                            "            <fileNamePattern>${user.home}/logs/jmqttlogs/otherdays/cluster.%i.log.gz</fileNamePattern>rn" +
                            "            <minIndex>1</minIndex>rn" +
                            "            <maxIndex>20</maxIndex>rn" +
                            "        </rollingPolicy>rn" +
                            "        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">rn" +
                            "            <maxFileSize>128MB</maxFileSize>rn" +
                            "        </triggeringPolicy>rn" +
                            "        <encoder>rn" +
                            "            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>rn" +
                            "            <charset class="java.nio.charset.Charset">UTF-8</charset>rn" +
                            "        </encoder>rn" +
                            "    </appender>rn" +
                            "    <appender name="JmqttClusterAppender" class="ch.qos.logback.classic.AsyncAppender">rn" +
                            "        <appender-ref ref="JJmqttClusterAppender_inner"/>rn" +
                            "    </appender>rn" +
                            "rn" +
                            "    <!-- store.log -->rn" +
                            "    <appender name="JmqttStoreAppender_inner"rn" +
                            "              class="ch.qos.logback.core.rolling.RollingFileAppender">rn" +
                            "        <file>${user.home}/logs/jmqttlogs/store.log</file>rn" +
                            "        <append>true</append>rn" +
                            "        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">rn" +
                            "            <fileNamePattern>${user.home}/logs/jmqttlogs/otherdays/store.%i.log.gz</fileNamePattern>rn" +
                            "            <minIndex>1</minIndex>rn" +
                            "            <maxIndex>20</maxIndex>rn" +
                            "        </rollingPolicy>rn" +
                            "        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">rn" +
                            "            <maxFileSize>128MB</maxFileSize>rn" +
                            "        </triggeringPolicy>rn" +
                            "        <encoder>rn" +
                            "            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>rn" +
                            "            <charset class="java.nio.charset.Charset">UTF-8</charset>rn" +
                            "        </encoder>rn" +
                            "    </appender>rn" +
                            "    <appender name="JmqttStoreAppender" class="ch.qos.logback.classic.AsyncAppender">rn" +
                            "        <appender-ref ref="JJmqttStoreAppender_inner"/>rn" +
                            "    </appender>rn" +
                            "rn" +
                            "    <!-- remoting.log -->rn" +
                            "    <appender name="JmqttRemotingAppender_inner"rn" +
                            "              class="ch.qos.logback.core.rolling.RollingFileAppender">rn" +
                            "        <file>${user.home}/logs/jmqttlogs/remoting.log</file>rn" +
                            "        <append>true</append>rn" +
                            "        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">rn" +
                            "            <fileNamePattern>${user.home}/logs/jmqttlogs/otherdays/remoting.%i.log.gz</fileNamePattern>rn" +
                            "            <minIndex>1</minIndex>rn" +
                            "            <maxIndex>20</maxIndex>rn" +
                            "        </rollingPolicy>rn" +
                            "        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">rn" +
                            "            <maxFileSize>128MB</maxFileSize>rn" +
                            "        </triggeringPolicy>rn" +
                            "        <encoder>rn" +
                            "            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>rn" +
                            "            <charset class="java.nio.charset.Charset">UTF-8</charset>rn" +
                            "        </encoder>rn" +
                            "    </appender>rn" +
                            "    <appender name="JmqttRemotingAppender" class="ch.qos.logback.classic.AsyncAppender">rn" +
                            "        <appender-ref ref="JmqttRemotingAppender_inner"/>rn" +
                            "    </appender>rn" +
                            "rn" +
                            "    <!-- messageTrace.log -->rn" +
                            "    <appender name="JmqttMessageTraceAppender_inner"rn" +
                            "              class="ch.qos.logback.core.rolling.RollingFileAppender">rn" +
                            "        <file>${user.home}/logs/jmqttlogs/messageTrace.log</file>rn" +
                            "        <append>true</append>rn" +
                            "        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">rn" +
                            "            <fileNamePattern>${user.home}/logs/jmqttlogs/otherdays/messageTrace.%i.log.gz</fileNamePattern>rn" +
                            "            <minIndex>1</minIndex>rn" +
                            "            <maxIndex>20</maxIndex>rn" +
                            "        </rollingPolicy>rn" +
                            "        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">rn" +
                            "            <maxFileSize>128MB</maxFileSize>rn" +
                            "        </triggeringPolicy>rn" +
                            "        <encoder>rn" +
                            "            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>rn" +
                            "            <charset class="java.nio.charset.Charset">UTF-8</charset>rn" +
                            "        </encoder>rn" +
                            "    </appender>rn" +
                            "    <appender name="JmqttMessageTraceAppender" class="ch.qos.logback.classic.AsyncAppender">rn" +
                            "        <appender-ref ref="JmqttMessageTraceAppender_inner"/>rn" +
                            "    </appender>rn" +
                            "rn" +
                            "    <!-- broker.log -->rn" +
                            "    <appender name="JmqttBrokerAppender_inner"rn" +
                            "              class="ch.qos.logback.core.rolling.RollingFileAppender">rn" +
                            "        <file>${user.home}/logs/jmqttlogs/broker.log</file>rn" +
                            "        <append>true</append>rn" +
                            "        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">rn" +
                            "            <fileNamePattern>${user.home}/logs/jmqttlogs/otherdays/broker.%i.log.gz</fileNamePattern>rn" +
                            "            <minIndex>1</minIndex>rn" +
                            "            <maxIndex>20</maxIndex>rn" +
                            "        </rollingPolicy>rn" +
                            "        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">rn" +
                            "            <maxFileSize>128MB</maxFileSize>rn" +
                            "        </triggeringPolicy>rn" +
                            "        <encoder>rn" +
                            "            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>rn" +
                            "            <charset class="java.nio.charset.Charset">UTF-8</charset>rn" +
                            "        </encoder>rn" +
                            "    </appender>rn" +
                            "    <appender name="JmqttBrokerAppender" class="ch.qos.logback.classic.AsyncAppender">rn" +
                            "        <appender-ref ref="JmqttBrokerAppender_inner"/>rn" +
                            "    </appender>rn" +
                            "rn" +
                            "    <!-- clientTrace.log -->rn" +
                            "    <appender name="JmqttClientTraceAppender_inner"rn" +
                            "              class="ch.qos.logback.core.rolling.RollingFileAppender">rn" +
                            "        <file>${user.home}/logs/jmqttlogs/clientTrace.log</file>rn" +
                            "        <append>true</append>rn" +
                            "        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">rn" +
                            "            <fileNamePattern>${user.home}/logs/jmqttlogs/otherdays/clientTrace.%i.log.gz</fileNamePattern>rn" +
                            "            <minIndex>1</minIndex>rn" +
                            "            <maxIndex>20</maxIndex>rn" +
                            "        </rollingPolicy>rn" +
                            "        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">rn" +
                            "            <maxFileSize>128MB</maxFileSize>rn" +
                            "        </triggeringPolicy>rn" +
                            "        <encoder>rn" +
                            "            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>rn" +
                            "            <charset class="java.nio.charset.Charset">UTF-8</charset>rn" +
                            "        </encoder>rn" +
                            "    </appender>rn" +
                            "    <appender name="JmqttClientTraceAppender" class="ch.qos.logback.classic.AsyncAppender">rn" +
                            "        <appender-ref ref="JmqttClientTraceAppender_inner"/>rn" +
                            "    </appender>rn" +
                            "rn" +
                            "    <logger name="brokerLog" additivity="false">rn" +
                            "        <level value="INFO"/>rn" +
                            "        <appender-ref ref="JmqttBrokerAppender"/>rn" +
                            "        <appender-ref ref="STDOUT"/>rn" +
                            "    </logger>rn" +
                            "rn" +
                            "    <logger name="clientTraceLog" additivity="false">rn" +
                            "        <level value="INFO"/>rn" +
                            "        <appender-ref ref="JmqttClientTraceAppender"/>rn" +
                            "        <appender-ref ref="STDOUT"/>rn" +
                            "    </logger>rn" +
                            "rn" +
                            "    <logger name="messageTraceLog" additivity="false">rn" +
                            "        <level value="INFO"/>rn" +
                            "        <appender-ref ref="JmqttMessageTraceAppender"/>rn" +
                            "        <appender-ref ref="STDOUT"/>rn" +
                            "    </logger>rn" +
                            "rn" +
                            "    <logger name="remotingLog" additivity="false">rn" +
                            "        <level value="INFO"/>rn" +
                            "        <appender-ref ref="JmqttRemotingAppender"/>rn" +
                            "        <appender-ref ref="STDOUT"/>rn" +
                            "    </logger>rn" +
                            "rn" +
                            "    <logger name="storeLog" additivity="false">rn" +
                            "        <level value="INFO"/>rn" +
                            "        <appender-ref ref="JmqttStoreAppender"/>rn" +
                            "        <appender-ref ref="STDOUT"/>rn" +
                            "    </logger>rn" +
                            "rn" +
                            "    <logger name="clusterLog" additivity="false">rn" +
                            "        <level value="INFO"/>rn" +
                            "        <appender-ref ref="JmqttClusterTraceAppender"/>rn" +
                            "        <appender-ref ref="STDOUT"/>rn" +
                            "    </logger>rn" +
                            "rn" +
                            "    <logger name="otherLog" additivity="false">rn" +
                            "        <level value="INFO"/>rn" +
                            "        <appender-ref ref="JmqttOtherTraceAppender"/>rn" +
                            "        <appender-ref ref="STDOUT"/>rn" +
                            "    </logger>rn" +
                            "rn" +
                            "    <root>rn" +
                            "        <level value="INFO"/>rn" +
                            "        <appender-ref ref="STDOUT"/>rn" +
                            "    </root>rn" +
                            "</configuration>rn" +
                            "";
                    fw.write(lineEnd);
                    fw.flush();
                    fw.close();
                }
                // 加载文件
                String argStr="";
                argStr+="-h "+basePath+"/ -c "+basePath+"/conf/jmqtt.properties";
                String[] args=argStr.split(" ");
                // 启动服务
                Thread newThread = new Thread();
                Thread t = new Thread(new Runnable() {
                    public void run() {
                        // 启动服务器
                        MQTTServer mqttServer = new MQTTServer();
                        mqttServer.start(args);
                    }
                });
                t.start();
                //打开服务页面
                return true;
            } catch (Exception ex) {
                StringWriter stringWriter = new StringWriter();
                ex.printStackTrace(new PrintWriter(stringWriter));
                JOptionPane.showMessageDialog(null, "mqtt服务启动错误:" +stringWriter.toString(), "提示", JOptionPane.ERROR_MESSAGE);
                return false;
            }
        }
        public static void main(String[] args) {
            MQTTService mqttService = new MQTTService();
            mqttService.start();
        }
        public String ReadConfig(File propertyfile, String configKey) throws IOException {
            FileInputStream fis = new FileInputStream(propertyfile);
            InputStreamReader isr = new InputStreamReader(fis);
            BufferedReader br = new BufferedReader(isr);
            String temp = "";
            for (int j = 1; (temp = br.readLine()) != null; j++) {
                if(temp.startsWith("#")) {
                    continue;
                }else {
                    String[] property= temp.trim().split("=");
                    if(property[0].trim().contentEquals(configKey)) {
                        return property[1];
                    }
                }
            }
            return "";
        }
    }

    MQTTServer.java(启动JMQTT中的BrokerStartup ):

    package org.jmqtt.broker.brokersever;
    
    import df.util.log.biz.LogUtil;
    import org.jmqtt.broker.BrokerController;
    import org.jmqtt.broker.BrokerStartup;
    
    import javax.swing.*;
    import java.awt.*;
    import java.io.*;
    
    public class MQTTServer {
        public static BrokerStartup broker;
        public static BrokerController brokerController;
    
        public void start(String[] args) {
            broker=new BrokerStartup();
            try {
                brokerController=broker.start(args);
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
        public void stop() {
            if(brokerController!=null){
                brokerController.shutdown();
                brokerController=null;
            }
        }
    
        public static void autoRun() {
            try {
                File directory = new File("");
                String baseDirectory = directory.getAbsolutePath();
                String autorunPath = "";
                if (baseDirectory.endsWith("")) {
                    autorunPath = """+baseDirectory + "MQTTService.jar"+ "  autorun "+baseDirectory+""  ";
                } else {
                    autorunPath =  """+baseDirectory + "MQTTService.jar"+ "  autorun "+baseDirectory+"" ";
                }
                String regKey = ""HKEY_CURRENT_USERSoftwareMicrosoftWindowsCurrentVersionRun"";// 注册表的key
                String myAppName = "MQTTService";
                //String kStr=commandPrefix + key + " /v  Zhakong  /t REG_SZ /d " + autorunPath + " /f";
                String kStr="reg "+"add "+regKey+" /v "+myAppName+" /t reg_sz /d "+autorunPath;
                Runtime.getRuntime().exec(kStr);
            } catch (Exception ex) {
                LogUtil.debugInfo("uitl", "注册表书写错误:"+ex.getMessage());
            }
        }
    
    }

    至此,MQTT代理服务器搭建完毕。

    搭建MQTT服务器方法2:直接导入Jar包(以上服务器搭建集成的jar包)

    启动处调用代码如下:

        MQTTService mqttService = new MQTTService();
        mqttService.start();       

    启动后,MQTTService会自动在jar文件的根目录下创建默认的MQTT配置,并使用这些配置启动服务。如果需要,请更改这些文件中的配置满足自己的私欲。

    MQTT模拟客户端测试

    开启服务器后,Producer推送一条信息,Consumer订阅本条信息,需导入本包

    34fa9ce1ee99a17829d4d69b2308d6b8.png

    创建broker服务器:

    Mqtt_Server.java

    package org.jmqtt.broker;
    import org.jmqtt.broker.brokersever.MQTTService;
    public class Mqtt_Server {
        public static void main(String[] args) {
            MQTTService mqttService = new MQTTService();
            mqttService.start();
        }
    }

    Produce.java:

    package org.jmqtt.java;
    
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    import org.jmqtt.pojo.ReportData;
    
    public class Producer {
        private static final String broker = "tcp://127.0.0.1:1883";
        private static final String content = "Message from MqttProducer";
        private static ReportData reportData=new ReportData();
        private static final int qos = 1;
        private static final String topic = "MQTT/TOPIC";
        private static final String clientId = "MQTT_PUB_CLIENT";
    
        public static void main(String[] args) throws MqttException, InterruptedException {
            MqttClient pubClient = getMqttClient();
            for(int i = 0; i < 3; i++){
                MqttMessage mqttMessage = getMqttMessage();
                pubClient.publish(topic,mqttMessage);
                System.out.println("Send message success.");
            }
        }
    
        private static MqttMessage getMqttMessage(){
            MqttMessage mqttMessage = new MqttMessage(reportData.toString().getBytes());
            mqttMessage.setQos(qos);
            return mqttMessage;
        }
    
        private static MqttClient getMqttClient(){
            try {
                MqttClient pubClient = new MqttClient(broker,clientId,new MemoryPersistence());
                MqttConnectOptions connectOptions = new MqttConnectOptions();
                connectOptions.setWill("lwt","this is a will message".getBytes(),1,false);
                connectOptions.setCleanSession(true);
                System.out.println("Connecting to broker: " + broker);
                pubClient.connect(connectOptions);
                return pubClient;
            } catch (MqttException e) {
                e.printStackTrace();
            }
            return null;
        }
    }

    Consumer.jar:

    package org.jmqtt.java;
    
    import org.eclipse.paho.client.mqttv3.*;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    public class Consumer {
        private static final String broker = "tcp://127.0.0.1:1883";
        private static final String topic = "MQTT/+";
        private static final String clientId = "MQTT_SUB_CLIENT";
    
        public static void main(String[] args) throws MqttException, InterruptedException {
            MqttClient subClient = getMqttClient();
            subClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable throwable) {
                    System.out.println("Connect lost,do some thing to solve it");
                }
    
                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                    System.out.println("From topic: " + s);
                    System.out.println("Message content: " + new String(mqttMessage.getPayload()));
                }
    
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    
                }
            });
            subClient.subscribe(topic);
        }
    
    
        private static MqttClient getMqttClient(){
            try {
                MqttClient pubClient = new MqttClient(broker,clientId,new MemoryPersistence());
                MqttConnectOptions connectOptions = new MqttConnectOptions();
                connectOptions.setCleanSession(true);
                System.out.println("Connecting to broker: " + broker);
                pubClient.connect(connectOptions);
                return pubClient;
            } catch (MqttException e) {
                e.printStackTrace();
            }
            return null;
        }
    }
    展开全文
  • Linux系统java环境(以及Nginx反向代理) 主要安装jdk和Tomcat环境 1.tomcat环境安装 2.jak环境安装以及环境变量配置 两种方式: 第一种:使用yum方式安装jdk 第二种:直接官网下载压缩包进行安装(以下只讲第一种...
  • 目录 1.服务器环境 2.安装gcc 3.安装zlib ...5)配置代理静态资源 6)配置静态资源库 7)配置图片服务器 8)配置负载均衡 9)隐藏nginx版本号 1.服务器环境 系统版本:Red Hat Enterprise Lin...
  • 一般做网络爬虫代理的IP需求比较大。由于在爬取网站信息的过程中...3、自建代理服务器,稳定,但是需要服务器的大量资源。 Java自建代理池: 1、建立ParallelFlowable,并对提供免费代理IP的每个页面进行并行抓取。 .
  • 准备工作: wget下载tomcat 安装java环境 nginx服务器反向代理一级目录 一级目录
  • 所以推荐使用一个相关联的项目 -- BrowserMobProxy。具体项目首页见相关GitHub。...它其实是一个java实现的开源代理项目,它与普通的代理不同之处在于:它支持REST API接口(可以通过http请求来动...
  • 1. 架构说明: nginx + 4个tomcatnginx作为前端代理,并且肩负负载均衡的作用,多个tomcat可以解决单台服务器高并发的性能问题,至于后端放几个tomcat要看你的服务器有多大内存,我的服务器是4核的CPU,8G内存。...
  • MacOS 下搭建Mqtt服务器

    2021-02-04 10:13:12
    目前主流的MQTT协议的服务器搭建方式有Eclipse Mosquitto这样的代理工具软件,还有一些云端的代理服务器。这里介绍的是自建Apache Apollo代理服务器,模拟Mqtt的数据传输 一、 jdk环境安装 这里不多赘述,说几个...
  • nginx和ftp搭建图片服务器 一、需要的组件 图片服务器两个服务: Nginx(图片访问): 1、http服务:可以使用nginx做静态资源服务器。也可以使用apache。推荐使用nginx,效率更高。 2、反向代理 实现 负载均衡 ftp...
  • apollo搭建mqtt服务器 1、需要java jdk支持,所以要先去安装好jdk,具体教程自行查找 2、下载apollo安装包 下载地址:http://archive.apache.org/dist/activemq/activemq-apollo/1.7.1/ 3、解压 apollo 安装包...
  • 目前我对这个图片服务器的使用是利用java语言。 官方提供了一个jar: fastdfs_client_v1.24.jar 使用方法: 1、把FastDFS提供的jar包添加到工程中  由于我使用的maven工程管理,所以我首先需要将fastdfs_...
  • 搭建Nginx+Java环境

    2020-06-28 07:40:16
    与Apache相比,Nginx在配合Java应用服务器方面,耦合度很低,它只能通过自身的反向代理功能来实现与Java应用服务器的支持。但这恰恰是Nginx的一个优点,耦合度的降低,可以使Nginx与Java服务器的相互影响降到最低。 ...
  • 本文为姜友瑶原创作品非商业转载请注明作译者、出处,并保留本文...本教程教大家通过阿里云服务器搭建一个由nginx做代理转发的javaweb系统,所有的请求通过nginx 转发到对应的tomcat下。 一、准备 1、阿里云Ub...
  • 搭建Nginx+JAVA环境

    2015-09-19 13:25:00
    搭建Nginx+JAVA环境Apache对Java的支持很灵活,他们的结合度...与Apache相比,Nginx在配合Java应用服务器方面,耦合度很低,它只能通过自身的反向代理功能来实现与Java应用服务器的支持,这恰恰是Nginx的一个优点,...
  • 通过代理服务器访问SFTP报如下错误:Caused by: ...环境说明:1、代理服务器是自己用Apache搭建的,安装之后打开代理相关的模块LoadModule proxy_module modules/mod_proxy.soLo...
  • 发现在并发环境下mysql数据库的压力时钟很大,一台服务器上面不仅仅要跑java的jar包程序,还要承受高并发访问数据库的压力。这样的压力都在同一台服务器上显然存在缺陷。所以我们可以购买多台服务器。 一台专门用来...

空空如也

空空如也

1 2 3 4 5 ... 12
收藏数 227
精华内容 90
关键字:

java搭建代理服务器

java 订阅