精华内容
下载资源
问答
  • emqxssl教程 添加链接描述

    emqx的ssl教程
    encoderlee 的教程
    Jessie_YYY 的教程

    1.购买/申请 CA证书

    给出一个教程链接:申请一个免费ca证书
    证书 服务器类型 选择 Ngnix

    //这个我没有验证,我已经有了一个证书了
    

    2.安装

    将下载的.pem .key文件保存到以下路径,需要提前新建文件夹,在xftp下很容易,就不贴图了。

    /etc/certs //此为linux下路径,window请视情况更改
    

    3.配置

    在 /etc/emqx/emqx.conf 配置文件中将证书路径更改以下:

    listener.ssl.external.keyfile = /etc/certs/你的证书名字.key
    
    listener.ssl.external.certfile = /etc/certs/你的证书名字.pem
    
    listener.ssl.external.cacertfile = /etc/certs/你的证书名字.pem
    //后两个文件为同一个
    

    4.生效更改

    emqx_ctl reload
    emqx restart
    

    5.验证

    在mqttx中填写以下:
    在这里插入图片描述
    结果如下:
    在这里插入图片描述

    在这里插入图片描述

    展开全文
  • https://www.emqx.cn/blog/emqx-server-ssl-tls-secure-connection-configuration-guide 2.SSL/TLS 启用及验证 在 EMQ X 中 mqtt:ssl 的默认监听端口为 8883。 EMQ X 配置 将前文中通过 OpenSSL 工具生成的 emqx....

    1.自制证书

    官网配置全过程,里面有自制证书方法
    https://www.emqx.cn/blog/emqx-server-ssl-tls-secure-connection-configuration-guide
    在这里插入图片描述

    2.SSL/TLS 启用及验证

    在 EMQ X 中 mqtt:ssl 的默认监听端口为 8883。

    EMQ X 配置
    将前文中通过 OpenSSL 工具生成的 emqx.pem、emqx.key 及 ca.pem 文件拷贝到 EMQ X 的 etc/certs/ 目录下,并参考如下配置修改 emqx.conf

    ## listener.ssl.$name is the IP address and port that the MQTT/SSL
    ## Value: IP:Port | Port
    listener.ssl.external = 8883
    
    ## Path to the file containing the user's private PEM-encoded key.
    ## Value: File
    listener.ssl.external.keyfile = etc/certs/emqx.key
    
    ## 注意:如果 emqx.pem 是证书链,请确保第一个证书是服务器的证书,而不是 CA 证书。
    ## Path to a file containing the user certificate.
    ## Value: File
    listener.ssl.external.certfile = etc/certs/emqx.pem
    
    ## 注意:ca.pem 用于保存服务器的中间 CA 证书和根 CA 证书。可以附加其他受信任的 CA,用来进行客户端证书验证。
    ## Path to the file containing PEM-encoded CA certificates. The CA certificates
    ## Value: File
    listener.ssl.external.cacertfile = etc/certs/ca.pem
    

    3.show code

    import datetime
    import functools
    import json
    import os
    import ssl
    import time
    import traceback
    import paho.mqtt.client as mqclient
    import threading
    
    
    class MQTTClientInfoX:
    
        def __init__(self, brokerIP: str, brokerPort: int, clientId: str, user: str, pwd: str, certPath: str):
            self.brokerIP = brokerIP
            self.brokerPort = brokerPort
            self.clientId = clientId
            self.user = user
            self.pwd = pwd
            self.certPath = certPath
            # FIXME: how to annotation list?
            self.clientCfgs = list()
            self.eIds = None
    
        def __repr__(self):
            # user/password@127.0.0.1:8080, sys_robot, [<XXX: a->1, b->2>]
            return "<MQTT {}/{}@{}:{}, {}\n{}\n{}>".format(self.user, self.pwd, self.brokerIP, self.brokerPort,
                                                           self.clientId, self.certPath, self.clientCfgs)
    
    
    class SimpleMqttClientCallElevTest(threading.Thread):
        def __init__(self, mqttClientInfo: MQTTClientInfoX, clientId):
            threading.Thread.__init__(self, name="mqttElevClient_{}".format(time.time()))
            self.mqsCI = mqttClientInfo
            self.connectedCB = None
            self.disconnectedCB = None
            self.mqttMsgRecvCB = None
            self.keepAliveInterval = None
            self.clientId = clientId
            self._isConnected = False
            self.sn = 1
    
        ''' should set before start '''
    
        def setCallback(self, connectedCB, disconnectedCB, mqttMsgRecvCB):
            self.connectedCB = connectedCB
            self.disconnectedCB = disconnectedCB
            self.mqttMsgRecvCB = mqttMsgRecvCB
    
        ''' should set before start '''
    
        def setKeepAlive(self, keepAliveInterval):
            self.keepAliveInterval = keepAliveInterval
    
        @property
        def isConnected(self):
            return self._isConnected
    
        def run(self):
            # FIXME: if clientId is different, multiple clients will request same topics, so use fixed number: 1
            # self.startClient("mqttElevClient_{}_{}_{}".format(MM.TOPIC_KEY_APP, MM.TOPIC_KEY_BUILDING_ID, 1))
            self.startClient(self.clientId)
    
        def startClient(self, clientId):
    
            print("starting mqttElevClient...")
            self.clientConn = mqclient.Client(client_id=clientId, clean_session=False)
            self.clientConn.username_pw_set(self.mqsCI.user, self.mqsCI.pwd)
            print("MQTT u:p -> {}:{}".format(self.mqsCI.user, '*' * len(self.mqsCI.pwd)))
    
            if self.mqsCI.certPath is not None:
                print('certPath validation completed', self.mqsCI.certPath)
                # self.clientConn.tls_set(ca_certs, certfile, keyfile, cert_reqs, tls_version, ciphers)
                if not os.path.exists(self.mqsCI.certPath):
                    self.mqsCI.certPath = None  # ssl.CERT_NONE
                # FIXME: local server need a ca_cert, but server side does not need
                self.clientConn.tls_set(ca_certs=self.mqsCI.certPath, tls_version=ssl.PROTOCOL_TLSv1_2)
                # disables peer verification
                self.clientConn.tls_insecure_set(True)
                print('certPath is cer', self.mqsCI.certPath)
            else:
                print("ssl/tls is disabled")
    
            self.clientConn.on_connect = functools.partial(self.on_connect)
            self.clientConn.on_message = functools.partial(self.mqttMsgRecvCB)
            self.clientConn.on_disconnect = functools.partial(self.on_disconnect)
            self.clientConn.on_log = functools.partial(self.on_log)
            keepAliveTime = 30 if self.keepAliveInterval is None or self.keepAliveInterval < 0 else self.keepAliveInterval
    
            # -----------------------------------------------------------------------------
            while True:
                try:
                    print(" connecting to MQTT broker: {}:{}".format(self.mqsCI.brokerIP, self.mqsCI.brokerPort))
                    self.clientConn.connect(self.mqsCI.brokerIP, self.mqsCI.brokerPort, keepAliveTime)
                    break
                except Exception as e:
                    # traceback.print_exc()
                    print('startClient :{}'.format(e))
                    time.sleep(5)
            self.clientConn.loop_forever()
    
        def on_disconnect(self, client, userdata, rc):
            print("connection to MQTT broker is broken, {}:{}".format(self.mqsCI.brokerIP, self.mqsCI.brokerPort))
    
            if self.disconnectedCB is not None:
                try:
                    self.disconnectedCB(client, userdata)
                except:
                    traceback.print_exc()
            self._isConnected = False
    
        ''' connection event listener '''
    
        def on_connect(self, client, userdata, flags, rc):
    
            if rc == 0:
                print("============================================")
                print("MQTT broker {}:{} - connected !!!".format(self.mqsCI.brokerIP, self.mqsCI.brokerPort))
                print("============================================")
    
                if self.connectedCB is not None:
                    try:
                        self.connectedCB(client, userdata)
                    except:
                        traceback.print_exc()
                self._isConnected = True
            else:
                print("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
                print(
                    "MQTT broker {}:{} - Failed to connect, rc->{} !!!".format(self.mqsCI.brokerIP, self.mqsCI.brokerPort,
                                                                               rc))
                print("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
                '''
                 rc: 
                   5 - login failed for not_authorized
                   4 - login failed for password_
                '''
                self.startClient(clientId)
                return
            # don't add blocking operation in event
    
        def on_log(self, client, userdata, level, buf):
            pass
            # print("log:{}".format(buf))
    
        def subscribeE(self, topic, qos):
            if self.isConnected:
                self.clientConn.subscribe(topic, qos)
            else:
                print("the connection is broken, subscribeE swallowed")
    
        def publishE(self, topic, payload, qos, retain):
            if self.isConnected:
                self.clientConn.publish(topic, payload, qos, retain)
                print('publish==========================')
            else:
                print("the connection is broken, publishE swallowed")
    
    
    def mqttMsgRecvCB(client, userdata, msg):
        try:
            mqttMsg = json.loads(msg.payload.decode('utf-8'))
            print('recv mqttMsg:{}'.format(mqttMsg))
        except Exception as e:
            print("mqttMsgRecvCB:{}".format(e))
    
    
    def connectedCB(client, userdata, topic):
        sub_topic1 = topic
        client.subscribe(sub_topic1)
        print('connectedCB subscribe ==> {}'.format(sub_topic1))
    
    
    if __name__ == '__main__':
        brokerIP = '127.0.0.1'
        brokerPort = 8883
        clientId = 'mq_test_ca'
        user = 'mq_test_ca'
        pwd = '123456'
        basedir = os.path.abspath(os.path.dirname(__file__)).replace('\\', '/')
        certPath = basedir + '/cacert.pem'
        topic = '/demo/test/ca'
    
        mqCI = MQTTClientInfoX(brokerIP, brokerPort, clientId, user, pwd, certPath)
        mqttClient = SimpleMqttClientCallElevTest(mqCI, clientId)
        mqttClient.setCallback(functools.partial(connectedCB, topic=topic), None, mqttMsgRecvCB)
        mqttClient.setKeepAlive(30)
        mqttClient.start()
    
        while 1:
            try:
                time.sleep(1)
                cmd = {"sn": 10086, "t": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), "f": 3, "p1": 120, }
                cmd_s = json.dumps(cmd)
                mqttClient.publishE(topic, payload=cmd_s, qos=0, retain=False)
            except Exception as e:
                print(e)
    

    在这里插入图片描述
    在这里插入图片描述

    展开全文
  • #参考资料EMQX官网博客: ##启用SSL单向验证 购买证书 如果有购买证书的话,就不...为方便 EMQ X 配置,请将购买的证书文件重命名为 emqx.crt,证书密钥重命名为 emqx.key ##启用SSL双向验证 #centos 安装openssl ...

    参考资料EMQX官网博客:

    启用SSL单向验证

    购买证书
    如果有购买证书的话,就不需要自签名证书。
    为方便 EMQ X 配置,请将购买的证书文件重命名为 emqx.crt,证书密钥重命名为 emqx.key

    启用SSL双向验证

    1. centos 7 安装openssl

    1.1 方式一

    $ yum install openssl
    

    1.2 方式二

    官网地址 选择版本进行下载
    当前最新版本为OpenSSL_1_1_1k.tar.gz

    1. 选择一个服务器文件夹地址,放入压缩包,并解压
     tar -zxvf OpenSSL_1_1_1k.tar.gz
    
    1. 新建安装目录
     mkdir /usr/local/openssl
    
    1. 进入解压后的文件目录
     cd OpenSSL_1_1_1k
    
    1. 生成编译文件
      ./config --prefix=/usr/local/openssl
    
    1. make安装
    make && make install
    
    1. 验证安装
     cd /usr/local
     ldd /usr/local/openssl/bin/openssl
     #能出现版本号则成功
     openssl version
    

    如果出现gcc、make等command not found,需要安装gcc

    yum -y install gcc
    

    2. openssl生成证书

    crt证书可参考:https://www.cnblogs.com/xiaohanlin/p/10213966.html

    2.1 方式一(已验证成功)

    来源 简单的证书制作

     openssl genrsa -out ca.key 2048
    
     openssl req -x509 -new -nodes -key ca.key -sha256 -days 3650 -subj "/CN=www.emqx.io" -out ca.pem
     
     openssl genrsa -out server.key 2048
    
     openssl req -new -key ./server.key -out server.csr -subj "/CN=127.0.0.1"
    
     openssl x509 -req -in ./server.csr -CA ca.pem -CAkey ca.key -CAcreateserial -out server.pem -days 3650 -sha256
    
     openssl genrsa -out client.key 2048
    
     openssl req -new -key ./client.key -out client.csr -subj "/CN=127.0.0.1"
    
     openssl x509 -req -in ./client.csr -CA ca.pem -CAkey ca.key -CAcreateserial -out client.pem -days 3650 -sha256
    
    

    2.2 方式二 使用配置文件

    先新建openssl.cnf文件。
    将证书需要的国家、机构等信息填入,方便生成证书。
    如果不使用此配置,可在每次生成证书的时候,根据提示填入国家、机构等信息,将下面bash命令中的 ==-config ./openssl.cnf ==省略即可。

    2.2.1 使用openssl自带配置文件(没有验证)

    cp /etc/pki/tls/openssl.cnf ./
    rm -rf /etc/pki/CA/*.old
    ## 生成证书索引库数据库文件
    touch /etc/pki/CA/index.txt
    ## 指定第一个颁发证书的序列号
    echo 01 > /etc/pki/CA/serial
    

    使用默认的配置,可在生成证书命令后增加 -subj “/C=CN/ST=hangzhou/O=EMQ/CN=RootCA” 修改配置。
    参考emqx自签二级证书

    2.2.2 自制配置文件(验证时返回18)

    [req]
    default_bits  = 2048
    distinguished_name = req_distinguished_name
    req_extensions = req_ext
    x509_extensions = v3_req
    prompt = no
    [req_distinguished_name]
    countryName = CN
    stateOrProvinceName = Jiangsu
    localityName = NanJing
    organizationName = EMQX
    commonName = EH
    [req_ext]
    subjectAltName = @alt_names
    [v3_req]
    subjectAltName = @alt_names
    [alt_names]
    DNS.1 = kb.example.com
    DNS.2 = helpdesk.example.org
    DNS.3 = systems.example.net
    IP.1 = 192.168.1.1
    IP.2 = 192.168.69.14
    

    SubjectAltName 可以包含email 地址,ip地址,正则匹配DNS主机名,等等。
    其中:
    commonName_default: 证书的主域名
    organizationName_default: 企业/单位名称
    organizationalUnitName_default:企业部门
    localityName_default: 城市
    stateOrProvinceName_default: 省份
    countryName_default: 国家代码,一般都是CN(大写)
    [alt_names]: 后面为备用名称列表,可以是域名、泛域名、IP地址

    2.2.3 生成自签名CA证书

    先生成2048长度的随机Key,再用此Key生成EMQX根证书ca.pem,有效期10年(3650)

    openssl genrsa -out ca.key 2048
    openssl req -x509 -new -config ./openssl.cnf -key ca.key -sha256 -days 3650 -out ca.pem
    

    2.2.4 生成服务端证书

    生成步骤与上面类似。创建emqx.key,生成证书请求emqx.csr,签发证书。

    openssl genrsa -out emqx.key 2048
    openssl req -new -key ./emqx.key -config openssl.cnf -out emqx.csr
    openssl x509 -req -in ./emqx.csr -CA ca.pem -CAkey ca.key -CAcreateserial -out emqx.pem -days 3650 -sha256 -extensions v3_req -extfile openssl.cnf
    

    2.2.5 生成客户端证书

    双向连接认证还需要创建客户端证书。创建client.key,生成证书请求emqx.csr,签发证书。

    openssl genrsa -out client.key 2048
    openssl req -new -key client.key -out client.csr -subj "/C=CN/ST=Jiangsu/L=SuZhou/O=EMQX/CN=client"
    openssl x509 -req -days 3650 -in client.csr -CA ca.pem -CAkey ca.key -CAcreateserial -out client.pem
    

    3. EMQX/EMQTTD 开启SSL双向验证

    将前文中通过 OpenSSL 工具生成的 emqx.pem、emqx.key 及 ca.pem 文件拷贝到 EMQ X 的 etc/certs/ 目录下,并参考如下配置修改 emqx.conf:

    listener.ssl.external.keyfile = etc/certs/emqx.key
    
    ## 注意:如果 emqx.pem 是证书链,请确保第一个证书是服务器的证书,而不是 CA 证书。
    listener.ssl.external.certfile = etc/certs/emqx.pem
    
    ## 注意:ca.pem 用于保存服务器的中间 CA 证书和根 CA 证书。可以附加其他受信任的 CA,用来进行客户端证书验证。
    listener.ssl.external.cacertfile = etc/certs/ca.pem
    
    ## 开启双向认证
    listener.ssl.external.verify = verify_peer
    
    ## 禁止单向认证
    listener.ssl.external.fail_if_no_peer_cert = true
    

    4.EMQX重启并使用客户端验证

    ./emqx/bin/emqx start
    

    使用 MQTT X工具或mqtt fx都可。 MQTT X

    展开全文
  • 在opt目录下创建ssl目录用来临时存储生成的证书文件 mkdir /opt/ssl cd /opt/ssl/ cp /etc/pki/tls/openssl.cnf ./ rm -rf /etc/pki/CA/*.old 生成证书索引库数据库文件 touch /etc/pki/CA/index....

    EMQX SSL双向认证配置与mqtt.fx客户端验证【仅供参考】

    一准备工作

    yum install openssl
    yum install vim
    在opt目录下创建ssl目录用来临时存储生成的证书文件
    mkdir /opt/ssl
    cd /opt/ssl/
    cp /etc/pki/tls/openssl.cnf ./
    rm -rf /etc/pki/CA/*.old
    生成证书索引库数据库文件
    touch /etc/pki/CA/index.txt
    指定第一个颁发证书的序列号
    echo 01 > /etc/pki/CA/serial
    

    证书生成

    方法一【这个是我这边测试,服务器与安卓客户端都可用的一种】

    CA证书的生成

    openssl req -x509 -new -days 3650 -keyout ca.key -out rootCA.crt -nodes
    参数:
    Country Name (2 letter code) [XX]:国家【中国---CN】
    State or Province Name (full name) []:省份
    Locality Name (eg, city) [Default City]:城市
    Organization Name (eg, company) [Default Company Ltd]:组织名称
    Organizational Unit Name (eg, section) []:组织单元名称
    Common Name (eg, your name or your server's hostname) []:服务器IP
    Email Address []: 邮箱地址
    
    

    为server端生成证书

    一、生成私钥
    openssl genrsa -out server.key 2048
    二、生成证书请求csr文件
    openssl req -new -key server.key -out server.csr
    参数填写与前面类似
    A challenge password []: 密码
    An optional company name []:公司名称
    三、生成证书
    openssl ca -in server.csr -out server.crt -cert rootCA.crt -keyfile ca.key -days 3650
    
    

    为Client端生成证书

    一、生成私钥:

    openssl genrsa -out client.key 2048
    二、生成证书请求:
    openssl req -new -key client.key -out client.csr
    参数与服务端证书生成类似,不过这里我用到ip是客户端ip
    三、生成证书
    openssl ca -in client.csr -out client.crt -cert rootCA.crt -keyfile ca.key
    
    

    修改emqx/etc/emqx.conf配置文件

    将生成的CA文件复制到/opt/emqx/etc/certs目录下

    listener.ssl.external.keyfile = /opt/emqx/etc/certs/server.key
    listener.ssl.external.certfile = /opt/emqx/etc/certs/server.crt
    listener.ssl.external.cacertfile = /opt/emqx/etc/certs/rootCA.crt
    
    listener.ssl.external.verify = verify_peer
    listener.ssl.external.fail_if_no_peer_cert = true
    

    方法二【并非所有场景都适合】

    生成CA key和证书(为了方便我这里客户端与服务器共用一个)

     生成CA key【采用2048字节】
     openssl genrsa -out ca.key 2048
     生成CA 证书【默认3650天】
     openssl req -x509 -new -nodes -key ca.key -sha256 -days 3650 -subj "/CN=www.emqx.io" -out ca.pem
     
    

    生成服务端key和证书

     openssl genrsa -out server.key 2048
     注意将IP修改为服务器IP
     openssl req -new -key ./server.key -out server.csr -subj "/CN=127.0.0.1"
     openssl x509 -req -in ./server.csr -CA ca.pem -CAkey ca.key -CAcreateserial -out server.pem -days 3650 -sha256
     
    

    生成客户端key和证书

    openssl genrsa -out client.key 2048
    注意将IP修改为客户端IP
    openssl req -new -key ./client.key -out client.csr -subj "/CN=127.0.0.1"
    openssl x509 -req -in ./client.csr -CA ca.pem -CAkey ca.key -CAcreateserial -out client.pem -days 3650 -sha256
    
    

    修改emqx/etc/emqx.conf配置文件

    将生成的CA文件复制到/opt/emqx/etc/certs目录下

     ## SSL Options
    listener.ssl.external.handshake_timeout = 15
    listener.ssl.external.keyfile = /etc/certs/server.key
    listener.ssl.external.certfile = /etc/certs/server.pem
    ## 开启双向认证
    listener.ssl.external.cacertfile = /etc/certs/ca.pem
    listener.ssl.external.verify = verify_peer
    listener.ssl.external.fail_if_no_peer_cert = true
    
    

    启用用户名密码验证【可选】

    emqx是默认开启匿名认证的,即客户端不需要任何认证信息即可连上emqx服务器,但生产环境这样肯定是不行的。emqx也支持很多种认证方式,这里我选着最简单的一种,用户名密码认证

    #先要把emq的匿名认证关了,在emqx.conf文件
    allow_anonymous = false
    **重启emqx服务**
    #加载用户名认证插件
    ./bin/emqx_ctl plugins load emqx_auth_username
    #添加用户
    ./bin/emqx_ctl users add <Username> <Password>
    

    使用MQTT.fx客户端验证是否配置成功

    mqtt.fx下载地址:http://www.jensd.de/apps/mqttfx/
    我这里选用的是windows1.7.1的版本
    在这里插入图片描述

    MQTT.fx客户端使用

    创建emqx连接
    在这里插入图片描述
    在这里插入图片描述
    若是启用了 用户名密码 认证,需要在这填写用户名密码信息
    在这里插入图片描述
    配置客户端ssl连接证书(方法一)
    在这里插入图片描述
    配置客户端ssl连接证书(方法二)
    在这里插入图片描述
    连接emqx
    在这里插入图片描述
    发送消息
    在这里插入图片描述

    编码实现

    导入MAVEN依赖

    <dependency>
    	<groupId>org.eclipse.paho</groupId>
    	<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    	<version>1.2.0</version>
    </dependency>
    
    

    我这里需要将部分证书文件转换成我所需要的格式备用-【若是有其他的实现,可以不用参考我的】

    cd 进入证书所在目录【方法二】
    将ca.pem 与 client.pem 转化为.crt格式
    openssl x509 -outform der -in your-ca.pem -out your-ca.crt
    openssl x509 -outform der -in your-client.pem -out your-client.crt
    
    将client.key转换为.pem文件【方法一、方法二通用,java代码连接需要】
    openssl pkcs8 -topk8 -inform PEM -in client.key -outform PEM -nocrypt -out client-key-pkcs8.pem
    
    将crt文件转化为p12 文件 再转换为bks文件 供安卓客户端使用
    crt转p12
    openssl pkcs12 -export -in client.crt -inkey client.key -out client.p12
    p12 转bks
    keytool -importkeystore -srckeystore client.p12 -srcstoretype pkcs12 -destkeystore client.bks -deststoretype bks -provider org.bouncycastle.jce.provider.BouncyCastleProvider -providerpath bcprov-ext-jdk15on-157.jar 
    转换过程输入的密码即为证书创建时输入的密码
    
    

    若是转换出来的文件不可用,请参考https://blog.csdn.net/qq_36992688/article/details/78861883

    Java代码

    MQTT静态常量类

    /**
     * MQTT静态参数常量类
     */
    public class MqttConstant {
    
        //MQTT 服务器基础配置
        /**
         * MQTT服务器IP、端口   
         */
        public final static String MQTT_IP_PORT = "ssl://127.0.0.1:8883";
        /**
         * MQTT服务器登录 用户名  elinker
         */
        public final static String MQTT_USERNAME = "username";
        /**
         * MQTT服务器登录 密码
         */
        public final static String MQTT_PASSWORD = "password";
        /**
         * MQTT客户端【应用程序】   ID【自定义】
         */
        public final static String MQTT_CLIENTID = "test";
        /**
         * MQTT客户端【应用程序】  订阅主题【自定义】
         */
        public final static String MQTT_TOPIC = "web/pubClient";
    
        //SSL双向认证 配置文件目录
        /**
         * CA证书
         */
        public final static String SSL_CA_CRT = "ca.crt证书文件路径";
        /**
         * 客户端证书
         */
        public final static String SSL_CLIENT_CRT = "client.crt证书文件路径";
        /**
         * 客户端证书key
         */
        public final static String SSL_CLIENT_KEY_PKCS8_PEM = "client-key-pkcs8.pem证书文件路径";
        /**
         * 客户端证书密码
         */
        public final static String SSL_CLIENT_PASSWORD = "client_password";
    
       //消息发送的类型
        /**
         *  尽力而为。消息发送者会想尽办法发送消息,但是遇到意外并不会重试
         */
        public final static int QOS_UNRELIABLE = 0;
        /**
         *  至少一次。消息接收者如果没有知会或者知会本身丢失,消息发送者会再次发送以保证消息接收者至少会收到一次,当然可能造成重复消息。
         */
        public final static int QOS_REPEAT = 1;
        /**
         *  恰好一次。保证这种语义肯待会减少并发或者增加延时,不过丢失或者重复消息是不可接受的时候,级别2是最合适的。
         */
        public final static int QOS_JUST = 2;
    
    
    }
    
    

    MQTT生产者客户端

    /**
     * 服务器【应用程序】Mqtt消息推送工具类
     */
    public class MqttPushUtil {
    
        public static Logger logger = Logger.getLogger(MqttPushUtil.class);
    
        public static MqttClient mqttClient;
    
        public static MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
    
        static {
            try{
                //创建客户端
                mqttClient = new MqttClient(MqttConstant.MQTT_IP_PORT, MqttConstant.MQTT_CLIENTID, new MemoryPersistence());
                //创建链接参数
                mqttConnectOptions = new MqttConnectOptions();
                //在客户端断开连接时是否缓存 订阅消息
                mqttConnectOptions.setCleanSession(true);
                //设置连接的用户名
                mqttConnectOptions.setUserName(MqttConstant.MQTT_USERNAME);
                //设置连接的密码
                mqttConnectOptions.setPassword(MqttConstant.MQTT_PASSWORD.toCharArray());
                //开启自动重连
                mqttConnectOptions.setAutomaticReconnect(true);
                //设置超时时间  单位为秒
                mqttConnectOptions.setConnectionTimeout(30);
                //设置会话心跳时间  单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
                mqttConnectOptions.setKeepAliveInterval(20);
                //ssl 双向认证相关配置
                SSLSocketFactory  factory = EmqxSSLFactory.getSSLSocktet(MqttConstant.SSL_CA_CRT, MqttConstant.SSL_CLIENT_CRT,
                        MqttConstant.SSL_CLIENT_KEY_PKCS8_PEM, MqttConstant.SSL_CLIENT_PASSWORD);
                mqttConnectOptions.setSocketFactory(factory);
    //            MqttPushServerUtil.setClient(mqttClient);
    
    
                mqttClient.connect(mqttConnectOptions);
            }catch (Exception e){
                e.printStackTrace();
                logger.error("连接MQTT服务器发生异常", e);
                try{
                    mqttClient.disconnect();;
                    mqttClient.close();
                }catch (MqttException el){
                    e.printStackTrace();
                    logger.error("连接异常时-----》断开与MQTT服务器连接操作发生异常", el);
                }
            }
        }
    
        public static void receiveSubscription(){
           try{
               mqttClient.setCallback(new MqttCallback() {
                   /**
                    *检测到断开连接
                    * @param throwable
                    */
                   @Override
                   public void connectionLost(Throwable throwable) {
                       //MQTT客户端断线重连
                       reconnection();
                   }
    
                   /**
                    * 接收订阅消息处理
                    * @param topic
                    * @param mqttMessage
                    * @throws Exception
                    */
                   @Override
                   public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                       System.out.println("topic:"+topic);
                       System.out.println("Qos:"+mqttMessage.getQos());
                       System.out.println("message content:"+new String(mqttMessage.getPayload()));
                   }
    
                   /**
                    * 消息发布结果
                    * @param iMqttDeliveryToken
                    */
                   @Override
                   public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                       logger.info("MQTT消息发布的结果:---------"+ iMqttDeliveryToken.isComplete());
                   }
               });
    
               mqttClient.subscribe(MqttConstant.MQTT_TOPIC, MqttConstant.QOS_JUST);
           }catch (Exception e){
               e.printStackTrace();
               logger.error("MQTT客户端---公租房系统:接收消息时发生异常!", e);
           }
        }
    
        /**
         * MQTT客户端断线重连
         */
        public static void reconnection(){
            logger.error("与MQTT服务器断开连接,尝试重新连接!");
            try {
                if(!mqttConnectOptions.isAutomaticReconnect()){
                    mqttClient.reconnect();
                }
            }catch (MqttException e){
                logger.error("MQTT重新连接发生异常!", e);
            }
        }
    
        /**
         * 发布非可靠的消息【消息服务质量:0】
         * @param topic   发布主题
         * @param pushMessage  消息内容
         */
        public static boolean publishUnreliable(String topic, String pushMessage) {
            return publish(MqttConstant.QOS_UNRELIABLE, topic, pushMessage);
        }
    
        /**
         * 以至少收到一次的模式发送消息【可能重复,消息服务质量:1】
         * @param topic  发布主题
         * @param pushMessage  消息内容
         */
        public static boolean publishLeastOnce(String topic, String pushMessage) {
            return publish(MqttConstant.QOS_REPEAT, topic, pushMessage);
        }
    
        /**
         * 发送可靠的保证 能且只能收到一次的消息【消息服务质量:2】
         * @param topic
         * @param pushMessage
         */
        public static boolean publishReliable(String topic, String pushMessage) {
            return publish(MqttConstant.QOS_JUST, topic, pushMessage);
        }
    
        /**
         * 发布主题和消息队列
         * @param qos
         * @param topic
         * @param pushMessage
         * @return
         */
        public static boolean publish(int qos, String topic, String pushMessage) {
            try {
                // 创建消息
                MqttMessage message = new MqttMessage(pushMessage.getBytes());
                // 设置消息的服务质量
                message.setQos(qos);
                // 发布消息
                mqttClient.publish(topic, message);
    
            } catch (MqttException e) {
                logger.error("发布消息时发生异常!",e);
                return false;
            }
            return true;
        }
    
    }
    
    

    EmqxSSL连接工厂类

    public class EmqxSSLFactory {
    
        public static javax.net.ssl.SSLSocketFactory getSSLSocktet(String caPath, String crtPath, String keyPath, String password) {
            try{
                CertificateFactory cAf = CertificateFactory.getInstance("X.509");
                FileInputStream caIn = new FileInputStream(caPath);
                X509Certificate ca = (X509Certificate) cAf.generateCertificate(caIn);
                KeyStore caKs = KeyStore.getInstance("JKS");
                caKs.load(null, null);
                caKs.setCertificateEntry("ca-certificate", ca);
                TrustManagerFactory tmf = TrustManagerFactory.getInstance("PKIX");
                tmf.init(caKs);
    
                CertificateFactory cf = CertificateFactory.getInstance("X.509");
                FileInputStream crtIn = new FileInputStream(crtPath);
                X509Certificate caCert = (X509Certificate) cf.generateCertificate(crtIn);
    
                crtIn.close();
                KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
                ks.load(null, null);
                ks.setCertificateEntry("certificate", caCert);
                ks.setKeyEntry("private-key", getPrivateKey(keyPath), password.toCharArray(),
                        new java.security.cert.Certificate[]{caCert});
                KeyManagerFactory kmf = KeyManagerFactory.getInstance("PKIX");
                kmf.init(ks, password.toCharArray());
    
                SSLContext context = SSLContext.getInstance("TLSv1");
    
                context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());
                return context.getSocketFactory();
            }catch (Exception e){
                e.printStackTrace();
            }
            return null;
        }
    
        public static PrivateKey getPrivateKey(String path) throws Exception {
    
            org.apache.commons.codec.binary.Base64 base64 = new Base64();
            byte[] buffer = base64.decode(getPem(path));
    
            PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(buffer);
            KeyFactory keyFactory = KeyFactory.getInstance("RSA");
            return (RSAPrivateKey) keyFactory.generatePrivate(keySpec);
    
        }
    
        private static String getPem(String path) throws Exception {
            FileInputStream fin = new FileInputStream(path);
            BufferedReader br = new BufferedReader(new InputStreamReader(fin));
            String readLine = null;
            StringBuilder sb = new StringBuilder();
            while ((readLine = br.readLine()) != null) {
                if (readLine.charAt(0) == '-') {
                    continue;
                } else {
                    sb.append(readLine);
                    sb.append('\r');
                }
            }
            fin.close();
            return sb.toString();
        }
    }
    
    

    测试类

    public class MqttPushClientUtil {
    
        public static void main(String[] args) throws MqttException {
            String HOST = MqttConstant.MQTT_IP_PORT;
            String[] TOPIC = {"dev/mac/+", "dev/village/+"};
            int[] qos = new int[]{2, 2};
            String clientid = "subClient";
            String userName = "username";
            String passWord = "password";
            try {
                // host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
                MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence());
                // MQTT的连接设置
                MqttConnectOptions options = new MqttConnectOptions();
                // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
                SSLSocketFactory  factory = EmqxSSLFactory.getSSLSocktet(MqttConstant.SSL_CA_CRT, MqttConstant.SSL_CLIENT_CRT,
                        MqttConstant.SSL_CLIENT_KEY_PKCS8_PEM, MqttConstant.SSL_CLIENT_PASSWORD);
                options.setSocketFactory(factory);
                options.setCleanSession(true);
                // 设置连接的用户名
                options.setUserName(userName);
                // 设置连接的密码
                options.setPassword(passWord.toCharArray());
                // 设置超时时间 单位为秒
                options.setConnectionTimeout(10);
                // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
                options.setKeepAliveInterval(20);
                // 设置回调函数
                client.setCallback(new MqttCallback() {
    
                    @Override
                    public void connectionLost(Throwable cause) {
                        System.out.println("connectionLost");
                    }
    
                    @Override
                    public void messageArrived(String topic, MqttMessage message) throws Exception {
                        System.out.println("topic:"+topic);
                        System.out.println("Qos:"+message.getQos());
                        System.out.println("message content:"+new String(message.getPayload()));
    
                    }
    
                    @Override
                    public void deliveryComplete(IMqttDeliveryToken token) {
                        System.out.println("deliveryComplete---------"+ token.isComplete());
                    }
    
                });
                client.connect(options);
                //订阅消息
                client.subscribe(TOPIC, qos);
            } catch (Exception e) {
                e.printStackTrace();
            }
    
    
            Map<String, Object> map = new HashMap<>();
            map.put("index", 1);
            map.put("text", "设备信息");
            MqqtPushServiceUtil.pushDeviceMsg("00:00:a4:01:01:01", "12", map);
            try{
                Thread.sleep(5000);
            }catch (Exception e){
                e.printStackTrace();
            }
    
            map.put("index", 2);
            map.put("text", "小区信息");
            MqqtPushServiceUtil.pushVillageMsg(123, "12", map);
        }
    
    }
    
    

    参考

    证书格式转换: https://vimsky.com/article/3608.html
    其他证书生成方式:
    emqtt安全连接ssl配置 自签证书 单向认证连接 加密 ssl/tls,mosquitto客户端ssl单向认证连接测试
    emqx使用自制CA证书登录配置(双向认证)
    EMQ X 服务器 SSL/TLS 安全连接配置指南
    MQTT研究之EMQ:【SSL双向验证】
    MQTT研究之EMQ:【JAVA代码构建X509证书】
    其他编码实现
    MQTT Java客户端Eclipse paho实现数据的发送和接收
    emqtt 试用(八)ssl认证 - 代码验证
    MQTT.fx客户端安装与使用
    MQTT.fx的安装和使用
    EMQX权限验证
    emqx服务器的权限验证(四)

    展开全文
  • EMQX:(版本:4.2.8/4.3.0) 客户端工具: MQTTX(版本:5.0) 自生成证书工具openssl(ubuntu/windos base): 一:生成自签名的CA key和证书(mqtt服务端和mqtt客户端公用一个CA) (备注:自签名证书在ubuntu/windows ...
  • emqx配置ssl

    千次阅读 2019-10-04 05:20:40
    1、生产自签证书 mkdir /etc/emqttd/certs/ && cd /etc/emqttd/certs/ openssl genrsa -out ca-key.pem 2048 openssl req -x509 -new -nodes -key ca-key.pem -days 10000 -out ca.pem -subj "/...
  • 记一次java代码生成的SSL证书EMQX服务器校验失败原因和解决,附上代码@[TOC](记一次java代码生成的SSL证书EMQX服务器校验失败原因和解决,附上代码) 前段时间做设备证书服务端生成,网上copy一堆代码,但是结果...
  • EMQ X 内置对 TLS/DTLS 的支持,包括支持单双向认证、X.509 证书等多种身份认证和 LB Proxy Protocol V1/2 等。你可以为 EMQ X 支持的所有协议启用 TLS/DTLS,也可以将 EMQ X 提供的 HTTP API 配置为使用 TLS。本文...
  • EMQ 启用 SSL/TLS 加密连接

    千次阅读 2019-09-09 10:34:31
    EMQ 启用 SSL/TLS 加密连接 使用加密连接的时候选择wss协议,并使用域名连接:绑定域名-证书之后...打开etc/emqx.conf配置文件,修改以下配置 # wss 监听地址 listener.wss.external = 8084 # 修改密钥文件地址...
  • 1.生成CA key和证书(为了方便我这里客户端与服务器共用一个) ... openssl req -x509 -new -nodes -key ca.key -sha256 -days 3650 -subj "/CN=www.emqx.io" -out ca.pem 2.生成服务端key和证书 openssl genr
  • EMQ X MQTT 服务器启用 SSL/TLS 安全连接

    千次阅读 2020-07-13 14:42:27
    作为基于现代密码学公钥算法的安全协议,TLS/SSL 能在计算机通讯网络上保证传输安全,EMQ X 内置对 TLS/SSL 的支持,包括支持单/双向认证、X.509 证书、负载均衡 SSL 等多种安全认证。你可以为 EMQ X 支持的所有协议...
  • 作为基于现代密码学公钥算法的安全协议,TLS/SSL 能在计算机通讯网络上保证传输安全,EMQ X 内置对 TLS/SSL 的支持,包括支持单/双向认证、X.509 证书、负载均衡 SSL 等多种安全认证。你可以为 EMQ X 支持的所有协议...

空空如也

空空如也

1 2 3
收藏数 41
精华内容 16
关键字:

emqxssl证书