-
2021-02-27 16:25:07
一、准备工作
1.需要在mq服务器上新建一个队列管理器、一个发送队列、一个接收队列、一个服务器通道
2.启动队列管理器(SEND)
strmqm SEND
--启动监听(SL)
runmqlsr -m SL -p 1414 -t tcp &
--用户交互管理界面程序
runmqsc SEND
--创建本地队列
DEFINE QLOCAL (Q1)
--创建的服务器连接通道
DEFINE CHANNEL(C) CHLTYPE (SVRCONN) REPLACE
二、代码如下
package sim;
import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
/**
* @author Administrator
*
*/
public class Mq_SEND2 {
static MQQueueManager qMgr;
static int CCSID = 1381;
//本地队列
static String queueString = "Q1";
public static void connect() throws MQException {
MQEnvironment.hostname = "9.1.77.196";
MQEnvironment.channel = "C";
MQEnvironment.port = 1414;
MQEnvironment.CCSID = CCSID;
// //MQ中拥有权限的用户名
// MQEnvironment.userID = "mqm";
// //用户名对应的密码
// MQEnvironment.password = "mqm";
qMgr = new MQQueueManager("SEND");
}
public static void sendMsg(String msgStr) {
int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT | MQC.MQOO_INQUIRE;
MQQueue queue = null;
try {
// 建立Q1通道的连接
queue = qMgr.accessQueue(queueString, openOptions, null, null, null);
System.out.println("发送消息前的深度为:" queue.getCurrentDepth());
MQMessage msg = new MQMessage();// 要写入队列的消息
msg.format = MQC.MQFMT_STRING;
msg.characterSet = CCSID;
msg.encoding = CCSID;
// msg.writeObject(msgStr); //将消息写入消息对象中
msg.writeString(msgStr);
MQPutMessageOptions pmo = new MQPutMessageOptions();
msg.expiry = -1; // 设置消息用不过期
queue.put(msg, pmo);// 将消息放入队列
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
if (queue != null) {
try {
queue.close();
} catch (MQException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
public static void receiveMsg() {
int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT | MQC.MQOO_INQUIRE;
MQQueue queue = null;
try {
queue = qMgr.accessQueue(queueString, openOptions, null, null, null);
System.out.println("该队列当前的深度为:" queue.getCurrentDepth());
System.out.println("===========================");
int depth = queue.getCurrentDepth();
// 将队列的里的消息读出来
while (depth-- > 0) {
MQMessage msg = new MQMessage();// 要读的队列的消息
MQGetMessageOptions gmo = new MQGetMessageOptions();
queue.get(msg, gmo);
System.out.println("消息的大小为(字节):" msg.getDataLength());
System.out.println("消息的内容:\n" msg.readStringOfByteLength(msg.getDataLength()));
System.out.println("---------------------------");
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
if (queue != null) {
try {
queue.close();
} catch (MQException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws MQException {
connect();
sendMsg("发送消息测试");
receiveMsg();
}
更多相关内容 -
ibmmq java 实现
2021-04-14 16:09:34Maven ...com.ibm.mq</groupId> <artifactId>com.ibm.mq.allclient</artifactId> <version>9.1.1.0</version> </dependency> package com.tms.modules.shoMaven
<dependency> <groupId>com.ibm.mq</groupId> <artifactId>com.ibm.mq.allclient</artifactId> <version>9.1.1.0</version> </dependency>
package com.tms.modules.shop.plugin.gongfu.nantong; import com.ibm.mq.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.*; public class NantongGongfuPlugin { private static final Logger logger = LoggerFactory.getLogger(NantongGongfuPlugin.class); //队列管理器 static MQQueueManager qMgr; //#字符集 private String CCSIDs="1381"; //#本地接口 private String hostnames="192.168.0.148"; //#本地服务器连接通道 private String channels="Svrconn"; //#mq配置的监听端口 private String ports="1414"; //#属于 mq组 的本地用户账号 private String userIDs="xxxx"; //#属于 mq组 的本地用户密码 private String passwords="xxxx"; //本地队列名字 private String queuenames="xxxx"; private String queueARGs="49"; // #消息格式 private String formats="MQC.MQFMT_STRING"; //远程队列名字 private String queuenamesF="xxxx"; private String queueARGsF="16"; //队列管理器的名称 private String queueManagerName="XXXXXXXXXX"; public Map<String, Object> pushWarehouseOrderToGongfu() { //要发的消息内容 StringBuilder xmlBuilder = new StringBuilder(); //使用ibmmq进行消息收发 //本地接口 MQEnvironment.hostname = hostnames; //本地服务器连接通道 MQEnvironment.channel = channels; //监听器的段端口 MQEnvironment.port =Integer.parseInt(ports); //字符集 MQEnvironment.CCSID =Integer.parseInt(CCSIDs); //MQ中拥有权限的用户名 MQEnvironment.userID = userIDs; //用户名对应的密码 MQEnvironment.password = passwords; try { qMgr = new MQQueueManager(queueManagerName); } catch ( MQException e) { e.printStackTrace(); } //发消息 sendMsg(xmlBuilder); //收消息 receiveMsg(); return null; } //发送消息 private void sendMsg(Object msgStr) { MQQueue queue = null; try { // NT_FG 本地队列 49 给本地队列 发消息 这一句是从队列管理器中获取本地队列 ---------- NT_FG_YC远程 16 queue = qMgr.accessQueue(queuenamesF, Integer.parseInt(queueARGsF), null, null, null); //当队列深度为零时 会报错 2038 try{ System.out.println("发送消息前的深度为:" + queue.getCurrentDepth()); }catch (Exception e){ logger.info("该队列当前的深度为" ,0); } MQMessage msg = new MQMessage();// 要写入队列的消息 msg.format = formats; msg.characterSet = Integer.parseInt(CCSIDs); msg.encoding = Integer.parseInt(CCSIDs); // 将数据放入消息缓冲区 msg.writeString(msgStr.toString()); // msg.writeString(msgStr); //将消息写入本地队列 MQPutMessageOptions pmo = new MQPutMessageOptions(); msg.expiry = -1; // 设置消息用不过期 queue.put(msg, pmo);// 将消息放入队列 } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { if (queue != null) { try { queue.close(); } catch (MQException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } //获取消息 private void receiveMsg() { MQQueue queue = null; try { //获取本地消息队列 queue = qMgr.accessQueue(queuenames, Integer.parseInt(queueARGs), null, null, null); try{ System.out.println("==========================="); System.out.println("发送消息前的深度为:" + queue.getCurrentDepth()); }catch (Exception e){ logger.info("该队列当前的深度为" ,0); return; } int depth = queue.getCurrentDepth(); // 将队列的里的消息读出来 while (depth-- > 0) { MQMessage msg = new MQMessage();// 要读的队列的消息 MQGetMessageOptions gmo = new MQGetMessageOptions(); queue.get(msg, gmo); System.out.println("消息的大小为(字节):" + msg.getDataLength()); System.out.println("消息的内容:\n" + msg.readStringOfByteLength(msg.getDataLength())); logger.info("消息的大小为(字节)" ,msg.getDataLength()); logger.info("消息的内容" ,msg.readStringOfByteLength(msg.getDataLength())); System.out.println("---------------------------"); } } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { if (queue != null) { try { queue.close(); } catch (MQException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } }
-
JAVA连接IBM MQ代码
2019-01-25 11:34:27JAVA连接IBM MQ,具体的详细说明,请再csdn上面搜索JAVA连接IBM MQ关键词 -
MQ JAVA API
2011-11-15 19:01:02WebSphere MQ 的java API文档。 包含了com.ibm.mq.jar的API. -
实现远程消息安全传递 IBM MQ java jar包
2018-04-23 15:17:25实现远程消息安全传递 IBM MQ java jar包 aspectjrt.jar com.ibm.mq.fta.jar com.ibm.mq.jar com.ibm.mq.jms.Nojndi.jar com.ibm.mq.soap.jar com.ibm.mqetclient.jar com.ibm.mqjms.jar -
IBM MQ 连接方法java代码
2018-12-10 15:04:11IBM MQ经常被一些政府公共部门,银行等企业用来做数据传输和报文收发,在互联网应用的开发中较少见到,资源为MQjava 代码 -
Java下操作IBM Websphere MQ的项目案例
2018-04-04 14:25:00Java下操作IBM Websphere MQ的项目案例, eclipse工程压缩包, 导入直接可用. -
java访问IBM MQ SSL加密通道
2018-09-19 15:24:05描述了java程序代码去访问MQ的SSL加密的通道。如何配置JKS,如何配置MQ服务器的SSL秘钥库,如何配置证书制作证书和秘钥库。主要是如何编写java代码去访问SSL通道并取到数据。 -
(七)使用IBMMQ javaAPI对集群进行异步消息的接收
2020-05-27 15:58:00前言:使用第六篇中组建的IBMMQ集群、IBMMQ官方提供的javaAPI,编写代码对集群发送消息并检测负载均衡的效果,若还不了解集群怎样搭建,请移步 : (六)IBMMQ简单集群搭建. 1. 准备工作 jar文件 (1). ...前言:使用第六篇中组建的IBMMQ集群、IBMMQ官方提供的javaAPI,编写代码对集群发送消息并检测负载均衡的效果,若还不了解集群怎样搭建,请移步 : (六)IBMMQ简单集群搭建.
1. 准备工作
- jar文件
(1). com.ibm.mq.jar
(2).connector.jar
2.创建普通java工程,并添加jar到工程中
3. 接收消息的代码
该代码在接收消息达到1000条时会关闭,想要无限接收消息,可自行使用while循环中尝试
package com.qinke.mqcluster; import com.ibm.mq.*; import java.io.IOException; public class Receiver { private static String host = "192.168.137.14"; private static Integer port=1415; private static String channel="TESTCHANNEL"; private static String qmgr="CQM4"; private static String queueName="CQ1"; private static void init(){ //配置要使用的队列管理器的信息,这里使用队列管理器CQM1的信息 MQEnvironment.hostname=host; MQEnvironment.port=port; MQEnvironment.channel=channel; } public static void receive(){ try { //创建队列管理器对象,在实例化的时候会隐式连接队列管理器CQM1 MQQueueManager mqQueueManager = new MQQueueManager(qmgr); //定义打开方式 int openOption = MQC.MQOO_INPUT_SHARED; //以读取方式打开 //直接循环,每次发10条消息 //创建队列对量 MQQueue queue = mqQueueManager.accessQueue(queueName,openOption); //定义获取消息时的一些操作 MQGetMessageOptions gmo = new MQGetMessageOptions(); gmo.options = gmo.options + MQC.MQGMO_WAIT; //无消息时等待 gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING; //队列管理器停止时退出等待 gmo.waitInterval = 300000; //无消息时等待时长 for(int i=0;i<=1000;i++){ //创建简单消息对象 MQMessage mqMessage = new MQMessage(); queue.get(mqMessage,gmo); String msgContent = mqMessage.readUTF(); System.out.println(msgContent); } //关闭打开的资源,养成好习惯 queue.close(); mqQueueManager.disconnect(); } catch (MQException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { init(); receive(); } }
4. 测试代码效果
5. 附消息发送代码
package com.qinke.mqcluster; import com.ibm.mq.*; import java.io.IOException; public class Sender { private static void init(){ //配置要使用的队列管理器的信息,这里使用队列管理器CQM1的信息 MQEnvironment.hostname="192.168.137.11"; MQEnvironment.port=1415; MQEnvironment.channel="TESTCHANNEL"; } public static void send(){ try { //创建队列管理器对象,在实例化的时候会隐式连接队列管理器CQM1 MQQueueManager mqQueueManager = new MQQueueManager("CQM1"); //定义打开方式 int openOption = MQC.MQOO_OUTPUT; //以写入方式打开 openOption = openOption + MQC.MQOO_BIND_NOT_FIXED;//写入消息方式为不绑定方式(想要负载均衡必须为此方式) //直接循环,每次发10条消息 //创建队列对量 MQQueue queue = mqQueueManager.accessQueue("CQ1",openOption); for(int i=0;i<=10;i++){ //创建简单消息对象 MQMessage mqMessage = new MQMessage(); //将数据写入消息对象中(可自行尝试其他write方法) mqMessage.writeUTF("简单消息:"+i); //使用队列发送消息 queue.put(mqMessage); } //关闭打开的资源,养成好习惯 queue.close(); mqQueueManager.disconnect(); } catch (MQException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { init(); send(); } }
此文章代码亦可用于单节点IBMMQ发送与接收消息
项目地址:异步消息发送与接收. - jar文件
-
MQ的配置与使用方法(包括java程序书写)
2018-09-21 15:18:56MQ队列管理器,队列,通道的配置和使用,包含编写Java程序来实现消息的发送。 -
java MqDemo
2018-03-13 13:52:48这个是java的 Mqdemo, 在这里备份,防止以后能够用到, -
mq-http-java-sdk:阿里云消息队列(MQ) Http Java SDK
2021-05-30 06:23:15MQ JAVA HTTP SDK Alyun MQ 文档: : 阿里云MQ控制台: ://ons.console.aliyun.com 用 添加Maven依赖 <groupId>com.aliyun.mq</groupId> <artifactId>mq-http-sdk</artifactId> <version>1.0.3 或与依赖项 ... -
IBM MQ JAVA UTIL
2009-02-20 11:22:39MQ JAVA 开发工具 demo 涵盖了所有mq开发方法类 -
IBM-MQ-jar包-包含connector..zip
2020-02-12 16:52:49IBM-MQ java所需要jar包,需要的自取,一共有aspectjrt,com.ibm.mq.fta,com.ibm.mq,com.ibm.mq.jms.Nojndi,com.ibm.mq.soap,com.ibm.mqetclient,com.ibm.mqjms,connector -
MQ java编程
2013-04-09 22:12:17简单的java操作IBM MQ的代码说明,并有代码例子,希望对大家有帮助 -
Websphere MQ JavaJMS 客户端的 SSL 配置.doc
2010-10-10 15:58:02Websphere MQ JavaJMS 客户端的 SSL 配置.doc -
JAVA实现MQ发送接收消息详解
2013-03-22 16:46:31JAVA实现MQ发送接收消息详解 MQ配置文档 MQ配置 -
rabbit mq demo spring java
2017-10-10 17:40:26docker 安裝 rabbit mq 並測試 http://knight-black-bob.iteye.com/blog/2395713 -
MQ java 基础编程(一)
2016-10-09 14:38:43MQ java 基础编程(一) 编写人:邬文俊 编写时间 : 2006-2-16 联系邮件 : wenjunwu430@gmail.com 前言通过 2 个多星期对 MQ 学习,在 partner 丁 & partner 武 的帮助下完成了该文档。该文档提供一个简单的...MQ java 基础编程(一)
编写人:邬文俊
编写时间 : 2006-2-16
联系邮件 : wenjunwu430@gmail.com
前言通过 2 个多星期对 MQ 学习,在 partner 丁 & partner 武 的帮助下完成了该文档。该文档提供一个简单的例子,通过对该例子的讲解,你将知道:
1. 用 java 写客户端从 MQ Server 收发消息。
2. MQ 作为 Websphere Application Server 的 JMS 资源提供者。
3. JMS message 映射转化为 MQ message
文档中的知识全部从参考资料和 IBM 提供的文档中获得。 I recommend u to read the documents if u want to know more about the MQ.
参考资料- 《 Using java 》( some place name it 《 base java 》) —–the very important document offered by IBM, every java programmer should read it!
- 《让 WebSphere MQ 成为部署在 WebSphere Application Server 中的应用程序的 JMS 提供程序》
- Websphere MQ 入门教程 (a document written by IBM engineer)
- mqseries_class_for_java
- 《 IBM - JMS 应用和使用 WebSphere MQ MQI 接口的应用如何进行信息头的交换(二)数据映射》 ——- 《 using java 》 mapping message 部分的翻译。
- MQ–IBM MQSeries 使用指南
- WebSphere Application Server V5 and WebSphere MQ Family Integration. PDF
- WebSphere MQ Application Programming Guide. PDF
- IBM MQSeries 的触发机制
- 让 WebSphere MQ 成为部署在 WebSphere Application Server 中的应用程序的 JMS 提供程序
例子说明
例子包括 3 个部分:发送客户端、 MDB 、接收客户端。
客户端 A 把文件发送到服务器,服务器将该文件转发给客户端 B 。客户端A通过向 MQ 客户机直接发送消息。 MQ 队列会把客户端A发送的 MQ Message 转换为 JMS Message 。 MDB 接收到转换后的 JMS 消息后,保存文件在服务器,然后把文件转发到发送队列( JMS 队列)。发送队列由 MQ Server 提供,所以即为发送到了 MQ 队列。 MQ 队列把 JMS Message 转换为 MQ Message 。客户端 B 从 MQ 队列中接收转换后的消息,从消息中读取文件,保存在硬盘。MQMESSAGE JMS MESSAGE
Client A————————->mq queue ——————->MDB
Client B<———————— mq queue <——————-MDB- 配置 MQ Server
这里使用的是 MQ Server 5.2 。 MQ Server 和 WebSphere Application Server 安装在同一台机器上( 2 者可以使用绑定方式通信)。
要配置的项目:
1. 队列管理器 QMGR
2. 侦听端口 4001
3. 本地队列 EXAMPLE.QUEUE
4. 本地队列 EXAMPLE.SENDQUEUE
5. 通道 EXAMPLE.CHANNEL打开 WebSphere MQ 资源管理器。展开队列管理器节点,右键,新建队列管理器。取名字为 QMGR ,设置侦听端口 4001 。
在建好的队列管理器 QMGR 下面新建 2 个本地队列: EXAMPLE.QUEUE , EXAMPLE.SENDQUEUE 。
展开高级节点,新建服务器连接通道 EXAMPLE.CHANNEL 。
Note :不要搞错队列和通道的类型。
2. 验证 MQ 配置打开 WebSphere MQ 服务。可以查看服务是否启动、服务监听端口。
3. 配置 WAS JMS具体操作参考《让 WebSphere MQ 成为部署在 WebSphere Application Server 中的应用程序的 JMS 提供程序》该文章可以在 IBM WebSphere 开发者技术期刊 中找到。
要配置的项目:
1. WebSphere MQ 连接工厂
JNDI name : jms/JMSExampleConnectionFactory
2. WebSphere MQ 队列目标
JNDI name : jms/JMSExampleQueue ;基本队列名: EXAMPLE.QUEUE ;目标客户机: JMS 。目标客户机决定了 MQ 队列接收方的消息格式。因为是用 MDB 接收消息,所以设置为 JMS 。另一个队列是由 MQ 客户端接收消息,所以另一个队列的目标客户机是 MQ 。如果配置错误, MQ 队列转换消息的格式将不是你所想要的。具体参考《 IBM - JMS 应用和使用 WebSphere MQ MQI 接口的应用如何进行信息头的交换(二)数据映射》
3. WebSphere MQ 队列目标
JNDI name : jms/JMSExampleSendQueue ;
基本队列名: EXAMPLE.SENDQUEUE ;目标客户机: MQ 。
4. 配置 MDB在 WAS 上配置 侦听器端口
名称: JMSExampleQueuePort ;
连接工厂 JNDI 名 jms/JMSExampleConnectionFactory ;
目标 JNDI 名: jms/JMSExampleQueue 。
Message Driven Beans 用于侦听消息的侦听器端口。每个端口指定 MDB 将侦听的(依据该端口部署的) JMS 连接工厂和 JMS 目标。MDB 部署描述符中配置
连接工厂 JNDI 名 jms/JMSExampleConnectionFactory ;
目标 JNDI 名: jms/JMSExampleQueue ;
监听端口名称: JMSExampleQueuePort (监听端口名称也可以在管理控制台中修改)
5. 代码客户端 A (发送方)
MqPut.java
package cn.edu.itec.mqclient;import java.io.File;
import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueueManager;public class MQPut {
private String HOST_URL = “192.168.1.116”;private String MQ_CHANNEL = "EXAMPLE.CHANNEL"; private String MQ_MANAGER = "QMGR"; private String MQ_QUEUE = "EXAMPLE.QUEUE"; private int MQ_PORT = 4001; public static void main(String args[]) { new MQPut().SendFile("f:/JMSExampleEJB.jar"); } public void SendFile(String sFilePath) { try { /* 设置 MQEnvironment 属性以便客户机连接 */ MQEnvironment.hostname = HOST_URL; MQEnvironment.channel = MQ_CHANNEL; MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES); MQEnvironment.CCSID = 1381; MQEnvironment.port = MQ_PORT; /* 连接到队列管理器 */ MQQueueManager qMgr = new MQQueueManager(MQ_MANAGER); System.out.println("queue manager is connected!"); /* 设置打开选项以便打开用于输出的队列,如果队列管理器正在停止,我们也已设置了选项去应对不成功情况。 */ int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING; /* 打开队列 */ com.ibm.mq.MQQueue queue = qMgr.accessQueue(MQ_QUEUE, openOptions); /* 设置放置消息选项我们将使用默认设置 */ MQPutMessageOptions pmo = new MQPutMessageOptions(); /* 创建消息, MQMessage 类包含实际消息数据的数据缓冲区,和描述消息的所有 MQMD 参数 */ /* 创建消息缓冲区 */ MQMessage outMsg = new MQMessage(); /* set the properties of the message fot the selector */ outMsg.correlationId = "clinet_B_receive".getBytes(); outMsg.messageId = "1Aa".getBytes(); /* write msg */ MsgWriter.readFile(outMsg, new File(sFilePath)); /* put message with default options */ queue.put(outMsg, new MQPutMessageOptions()); System.out.println("send file is success!"); /* release resource */ queue.close(); qMgr.disconnect(); } catch (MQException ex) { //System.out.println("fft!"); System.out.println("An MQ Error Occurred: Completion Code is :\t" + ex.completionCode + "\n\n The Reason Code is :\t" + ex.reasonCode); ex.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } private void readFileToMessage(String FilePath) { }
}
JMS message 和 MQ message 有几个字段是相同的,这些字段的值将会在转换中保留。比较方便的是使用 CorrelationID 这个字段。通过设置这个字段,达到选择性的接收特定消息的功能。其它字段没有完全搞清楚,有的数据类型需要转换,例如 MessageID (对应于 JMSMessageID )。 MQ 消息选择和 JMS 不同,后者采用 selector ,前者通过设置接收消息的属性完成。例如设置 CorrelationID 为特定值。
客户端 BMQGet.java
package cn.edu.itec.mqclient;import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Hashtable;import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueueManager;/**
* @author Administrator
*
* TODO To change the template for this generated type comment go to Window -
* Preferences - Java - Code Style - Code Templates
*/
public class MQGet {
private static String HOST_URL = “192.168.1.116”;private static String MQ_CHANNEL = "EXAMPLE.CHANNEL"; private static String MQ_MANAGER = "QMGR"; private static String MQ_QUEUE = "EXAMPLE.SENDQUEUE"; private static int MQ_PORT = 4001; public static void main(String[] args) { MQGet.getMessage(); } public static void getMessage() { try { /* 设置 MQEnvironment 属性以便客户机连接 */ MQEnvironment.hostname = HOST_URL; MQEnvironment.channel = MQ_CHANNEL; MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES); MQEnvironment.CCSID = 1381; MQEnvironment.port = MQ_PORT; /* 连接到队列管理器 */ MQQueueManager qMgr = new MQQueueManager(MQ_MANAGER); System.out.println("queue manager is connected!"); /* * 设置打开选项以便打开用于输出的队列,如果队列管理器停止,我们也 已设置了选项去应对不成功情况 */ int openOptions = MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING; /* 打开队列 */ com.ibm.mq.MQQueue queue = qMgr.accessQueue(MQ_QUEUE, openOptions); System.out.println(" 队列连接成功 "); /* 设置放置消息选项 */ MQGetMessageOptions gmo = new MQGetMessageOptions(); /* 在同步点控制下获取消息 */ gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT; /* 如果在队列上没有消息则等待 */ gmo.options = gmo.options + MQC.MQGMO_WAIT; /* 如果队列管理器停顿则失败 */ gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING; /* 设置等待的时间限制 */ gmo.waitInterval = MQC.MQWI_UNLIMITED; /* create the message buffer store */ MQMessage inMessage = new MQMessage(); /* set the selector */ inMessage.correlationId = "clinet_B_receive".getBytes(); /* get the message */ queue.get(inMessage, gmo); System.out.println("get message success"); /* 读出消息对象 */ Hashtable messageObject = (Hashtable) inMessage.readObject(); System.out.println(messageObject); /* 读出消息内容 */ byte[] content = (byte[]) messageObject.get("content"); /* save file */ FileOutputStream output = new FileOutputStream( "f:/exampleReceive.jar"); output.write(content); output.close(); System.out.println(messageObject.get("FileName")); /* 提交事务 , 相当于确认消息已经接收,服务器会删除该消息 */ qMgr.commit(); } catch (MQException e) { e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
}
MDB
MQMDBBeanBean.java MDB 文件
package ejbs;import javax.jms.ObjectMessage;
import javax.jms.BytesMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.JMSException;
import ehub.ihub.exchangeManager.*;
import java.util.Hashtable;
import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;
import java.io.File;
import java.io.ObjectInputStream;/**
* Bean implementation class for Enterprise Bean: MQMDBBean
*/
public class MQMDBBeanBean implements javax.ejb.MessageDrivenBean,
javax.jms.MessageListener {
private javax.ejb.MessageDrivenContext fMessageDrivenCtx;/** * getMessageDrivenContext */ public javax.ejb.MessageDrivenContext getMessageDrivenContext() { return fMessageDrivenCtx; } /** * setMessageDrivenContext */ public void setMessageDrivenContext(javax.ejb.MessageDrivenContext ctx) { fMessageDrivenCtx = ctx; } /** * ejbCreate */ public void ejbCreate() { } /** * onMessage */ public void onMessage(javax.jms.Message msg) { try { System.out.println(msg.toString()); if (msg instanceof TextMessage) { System.out.println("TextMessage"); } else if (msg instanceof ObjectMessage) { System.out.println("ObjectMessage"); } else if (msg instanceof StreamMessage) { System.out.println("StreamMessage"); } else if (msg instanceof BytesMessage) { System.out.println("BytesMessage"); BytesMessage bytesMessage = (BytesMessage) msg; String sCorrelationID = new String(bytesMessage .getJMSCorrelationIDAsBytes()); String sMessageID = bytesMessage.getJMSMessageID(); long size = bytesMessage.getBodyLength(); System.out.println("size=" + size + "/n CorrelationID=" + sCorrelationID + "/n MessageID=" + sMessageID); /*read the message and save the file*/ ReadMessage.readMessage(bytesMessage); System.out.println("read message success"); /*send the message to the client */ SendMessage.sendFileToReceiveQueue(new File("c:/receivedExample.jar")); System.out.println("send file success"); } else { System.out.println("no message"); } } catch (Exception e) { System.out.println("onmessage 执行错误,回滚! "); e.printStackTrace(System.err); fMessageDrivenCtx.setRollbackOnly(); } } private void getProperties(byte[] p) { } /** * ejbRemove */ public void ejbRemove() { }
}
ReadMessage.java
/*
* Created on 2006-2-15
*
* TODO To change the template for this generated file go to
* Window - Preferences - Java - Code Style - Code Templates
*/
package ehub.ihub.exchangeManager;import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Hashtable;import javax.jms.BytesMessage;
import javax.jms.JMSException;/**
* @author Administrator
*
*
*/
public class ReadMessage {
/**
* read message including property and body
*
* @param Message
* @throws JMSException
* @throws IOException
* @throws ClassNotFoundException
*/
public static void readMessage(BytesMessage Message) {
try {
long bodySize = Message.getBodyLength();byte[] buf = new byte[Integer.parseInt(String.valueOf(bodySize))]; /* 消息内容读到字节数组中 */ Message.readBytes(buf); ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream( buf); /* 从字节流读出消息对象 */ ObjectInputStream objectInputStream = new ObjectInputStream( byteArrayInputStream); Hashtable messageObject = (Hashtable) objectInputStream .readObject(); /* 解析消息 */ byte[] contentBuf = (byte[]) messageObject.get("content"); /* 把文件保存 */ FileOutputStream fileWriter = new FileOutputStream( "c:/receivedExample.jar"); fileWriter.write(contentBuf); fileWriter.close(); } catch (JMSException e) { e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
}
SendMessage.java
/*
* Created on 2006-2-16
*
* TODO To change the template for this generated file go to
* Window - Preferences - Java - Code Style - Code Templates
*/
package ehub.ihub.exchangeManager;import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Hashtable;import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;/**
* @author Administrator
*
* TODO To change the template for this generated type comment go to Window -
* Preferences - Java - Code Style - Code Templates
*/
public class SendMessage {
private static String MQ_CHANNEL = “EXAMPLE.CHANNEL”;private static String MQ_MANAGER = "QMGR"; private static String MQ_QUEUE = "EXAMPLE.SENDQUEUE"; private static int MQ_PORT = 4001; private static String JMS_CONNECTIONFACTORY = "jms/JMSExampleConnectionFactory"; private static String QUEUE_NAME="jms/JMSExampleSendQueue"; public static void sendFileToReceiveQueue(File file) { try { Context initContext = new InitialContext(); ConnectionFactory qconFactory = (ConnectionFactory) initContext .lookup(JMS_CONNECTIONFACTORY); Connection qcon = qconFactory.createConnection(); Session session = qcon.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = (Queue) initContext.lookup(QUEUE_NAME); MessageProducer producer = session.createProducer(queue); ObjectMessage outMessage=session.createObjectMessage(); /* write the file information into the message */ Hashtable fileInfo = new Hashtable(); fileInfo.put("FileName", file.getName()); fileInfo.put("FileSize", Long.toString(file.length())); /* write the file content into the message */ FileInputStream fos = new FileInputStream(file); byte[] buf; int size = (int) file.length(); buf = new byte[size]; int num = fos.read(buf); fos.close(); /*add the file byte stream to the object*/ fileInfo.put("content", buf); outMessage.setObject(fileInfo); outMessage.getObject(); outMessage.setJMSCorrelationIDAsBytes((new String("clinet_B_receive")).getBytes());
// qcon.start();
producer.send(outMessage); producer.close(); session.close(); qcon.close(); } catch (NamingException e) { System.out.println(" 获得连接失败 ,jndi 查找失败 "); e.printStackTrace(); } catch (JMSException e) { System.out.println(" 发送文件异常 "); // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block System.out.println(" 发送文件过程中 io 操作失败 "); e.printStackTrace(); } }
}
-
Java语言快速实现简单MQ消息队列服务
2021-04-23 14:25:04使用JAVA语言自己动手来写一个MQ (类似ActiveMQ,RabbitMQ) 主要角色首先我们必须需要搞明白MQ (消息队列)中的三个基本角色ProducerBrokerConsumer整体架构如下所示 自定义协议首先从上一篇中介绍了协议的相关信息,... -
MQ java 编程指南
2008-09-24 09:49:11MQ java 编程指南, 如果你打算使用java 开发mq 应用, 就不要错过这本书. -
JAVA IBM MQ 接收、发送
2012-05-13 16:43:46JAVA IBM MQ 接收消息、发送消息例子 -
Websphere Message Broker实践,WebSphere MQ Java编程
2011-10-28 10:02:17Websphere Message Broker实践,WebSphere MQ Java编程,Message Broker 计时器节点编程模式,MessageBroker TCPIP通信协议,wmb关于ws服务的引用,WMB连接oracle数据库实践,全部组件 -
IBMMQ配置SSL连接
2018-08-30 14:30:08手把手教你怎么配置IBM WEBSPHERE MQ通过SSL连接,附详细步骤和实例代码 -
MQ工具类java
2017-04-16 11:42:52包含了IBM的MQ初始化,发送,接收的工具类,方便极了,可直接放入到项目中。 -
java实现MQ消息收发两种方式
2021-02-12 09:15:12定义:消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。简单理解:蓝牙配对jar包依赖:com.rabbitmqamqp-client3.0.4... -
JAVA如何获取mq 发送的信息
2021-04-22 05:34:19JAVA如何获取mq 发送的信息获取信息时报错:MQJE001: 完成代码为“2”,原因为“2045”。A WebSphere MQ error occurred : Completion code 2 Reason code 2045下面是我的测试环境队列管理器和队列配置通道配置代码... -
java 连接IBM MQ的方法和常见问题
2021-02-12 17:14:47第四步:配置class文件以上就是基本配置,关于主要地方加以了注释,关于经常遇到的2035错误主要是因为权限报错,windows版的mq就把连接认证关闭,然后连接用户确认是mqm用户组的用户即可,目前这两种可能性排除可以... -
用java调用zeromq
2019-04-13 01:37:16NULL 博文链接:https://kisseveryone.iteye.com/blog/1728187 -
高级JAVA开发 MQ部分
2019-05-15 02:18:43高级JAVA开发 MQ部分MQMQ的作用、为什么要用MQ常见的MQ的优缺点使用MQ带来的问题以及处理办法MQ带来的问题列举消息重复消费(幂等)问题消息丢失问题消息顺序性问题消息过期丢失、大量积压等问题如何保证MQ高可用性...