-
2017-02-14 14:10:13更多相关内容
-
电信设备-监控系统间通讯状态的方法、装置和系统.zip
2021-09-18 14:47:14电信设备-监控系统间通讯状态的方法、装置和系统.zip -
系统间通讯实现数据信息实时同步解决方案
2018-03-24 17:53:23项目开发阶段遇到一个需求,描述大致就是同一个用户在A系统数据库保存的数据信息与在B系统... 这个可以根据各企业各自的系统间通讯方式来灵活处理。这里我介绍我运用的处理方式,作为经验总结记录和分享。 深谙sp...项目开发阶段遇到一个需求,描述大致就是同一个用户在A系统数据库保存的数据信息与在B系统数据库保存的数据信息要保持同步。当A系统用户修改了个人信息,A系统后台在将用户修改后的信息入库的同时也会向B系统发送消息,让B系统后台进行自动数据信息同步。
这个可以根据各企业各自的系统间通讯方式来灵活处理。这里我介绍我运用的处理方式,作为经验总结记录和分享。
深谙spring的实现原理:使用dom4j技术对xml文件进行解析读取扫描注解通过反射机制来进行对象的创建,于是解决上述需求的方案由此得到启发。对于我们实际系统来说,这就是一个小框架,扩展性非常好,后来者只需要专注业务逻辑的实现即可完成数据同步的需求。
下面先贴目录结构
这里运用先缓存业务逻辑处理方法的策略,即在服务器启动的时候就将写好的业务逻辑处理方法缓存到内存中,通过监听器监听到其他系统有发送同步消息时自动调用相应的处理方法来进行数据同步。
要缓存服务,需要用到注解和反射
下面贴上两个自定义注解:分别是类注解和方法注解
package com.zy.isc.common; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * <p>Title: IscClassMapping</p> * <p>Description: 消息业务处理类注解 * 用于标识类为消息处理类,和IscMethodMapping方法注解配合使用 * spring容器加载完成后会将具休的业务方法缓存起来,用于处理消息。 * </p> * <p>Company: * @author kjkfb_zy 2017-7-31 * <p>Just do it!!!</p> * @version v1.0 */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) public @interface IscClassMapping { }
package com.zy.isc.common; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * <p>Title: IscMethodMapping</p> * <p>Description: 消息业务处理方法注解</p> * <p>Company: * @author kjkfb_zy 2017-7-31 * <p>Just do it!!!</p> * @version v1.0 */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface IscMethodMapping { public String name(); public String desc() default ""; }
然后定义一个通用的业务处理类,通过这个类来保存注解类对象,然后运用反射机制来调用具体的业务逻辑处理方法
package com.zy.isc.common; import java.io.Serializable; import java.lang.reflect.Method; /** * <p>Title: ServiceBean</p> * <p>Description: 保存到map中的业务bean</p> * <p>Company: * @author kjkfb_zy 2017-7-31 * <p>Just do it!!!</p> * @version v1.0 */ public class ServiceBean implements Serializable{ private static final long serialVersionUID = 7453372917648514518L; private Method method; private Object object; private String desc; public Method getMethod() { return method; } public void setMethod(Method method) { this.method = method; } public Object getObject() { return object; } public void setObject(Object object) { this.object = object; } public String getDesc() { return desc; } public void setDesc(String desc) { this.desc = desc; } }
spring容器初始化时还需要做的另一件事——将带有注解的类和方法缓存在map中,key值就是方法上面的注解value值,key对应的value就是带注解的对应的业务处理类对象实例
package com.zy.isc.core_receive; import java.lang.reflect.Method; import java.util.HashMap; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; import com.zy.isc.common.IscClassMapping; import com.zy.isc.common.IscMethodMapping; import com.zy.isc.common.ServiceBean; /** * <p>Title: InitServiceMethodMapping</p> * <p>Description: spring容器启动时调用这里的初始化方法,将带有自定义注解的类和方法缓存在map中</p> * <p>Company: * @author kjkfb_zy 2017-7-31 * <p>Just do it!!!</p> * @version v1.0 */ public class InitServiceMethodMapping { private static Logger logger = LoggerFactory.getLogger(InitServiceMethodMapping.class); private static Map<String,ServiceBean> map = null; private InitServiceMethodMapping(){} public static Map<String,ServiceBean> getMethodMap(){ return map; } public static synchronized void init() throws Exception{ if(map == null){ logger.info("initialize biz interface object and save into map start"); map = new HashMap<String, ServiceBean>(); ApplicationContext context = SpringContextReceiveUtil.getApplicationContext(); for(String s : context.getBeanDefinitionNames()){ Class<?> c = context.getBean(s).getClass(); if(c.getAnnotation(IscClassMapping.class)!=null){ Method[]method = c.getDeclaredMethods(); ServiceBean serviceBean = null; for(Method m : method){ IscMethodMapping mksIscMethodMapping = m.getAnnotation(IscMethodMapping.class); if(mksIscMethodMapping!=null){ if(!map.containsKey(mksIscMethodMapping.name())){ serviceBean = new ServiceBean(); serviceBean.setObject(context.getBean(s)); serviceBean.setMethod(m); serviceBean.setDesc(mksIscMethodMapping.desc()); map.put(mksIscMethodMapping.name(),serviceBean); logger.info("@biz interface name:["+mksIscMethodMapping.name()+"],already saved in cached map@"); }else{ throw new Exception("initialize biz interface failed, name:["+mksIscMethodMapping.name()+"]repeated,please modify then try,classpath:"+c.getName()); } } } } } logger.info("initialize biz interface object and save into map start,total biz interface count:"+map.size()); } } }
然后在spring容器启动的时候调用上述类中的初始化方法和启动消息监听器
package com.zy.isc.core_receive; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import com.pingan.isc.ISCMessageBroker; import com.zy.isc.handler.ServerHandler; public class SpringContextReceiveUtil implements ApplicationContextAware{ private Logger logger = LoggerFactory.getLogger(SpringContextReceiveUtil.class); private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext)throws BeansException { logger.info("initialize spring context start:"+applicationContext); SpringContextReceiveUtil.applicationContext = applicationContext; logger.info("initialize spring context end, bean count:"+applicationContext.getBeanDefinitionCount()); try { //这里缓存业务接口时容器还没有完全启动完成,所以使用纯线程来启动消息中心监听程序,以免影响启动超时的情况 InitServiceMethodMapping.init(); //启动消息监听 initMessageHandler(); } catch (Exception e) { logger.error("initialize spring context failed",e); } } public static ApplicationContext getApplicationContext(){ return applicationContext; } public void initMessageHandler() { try { logger.info("initialize MSG listener start"); //启动消息监听 int corePoolSize = 10; int maximumPoolSize = 20; int keepAliveTime = 300; int queueSize = 100; ServerHandler handler = new ServerHandler(); ISCMessageBroker.MessageExecutor(corePoolSize, maximumPoolSize, keepAliveTime, queueSize, handler); logger.info("initialize MSG listener end"); } catch (Exception e) { logger.error("initialize MSG listener exception",e); } } }
消息监听器启动时需要指定消息处理器,这个处理器实现了MessageHandler接口,一旦有消息从其他系统发过来,监听器监听到消息到来就会调用messageReceived(Object arg0)这个方法,参数即为接收到的消息
package com.zy.isc.handler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.pingan.isc.core.MessageHandler; import com.zy.isc.core_receive.ServiceReceiveExecutor; public class ServerHandler implements MessageHandler { private static final Logger logger = LoggerFactory.getLogger(ServerHandler.class); @Override public void messageReceived(Object arg0) throws Exception{ try { logger.info("=======invoke biz method start======="); long start = System.currentTimeMillis(); ServiceReceiveExecutor.execute(arg0); long end = System.currentTimeMillis(); logger.info("=======invoke biz method end======="); logger.info("time cost:"+(end-start)/1000); }catch (Exception e) { logger.error("Message Received Exception"+arg0,e); } } }
然后在这个类的messageReceived(Object arg0)方法中再调用接收消息执行器将接收到的消息进行处理,解析消息内容得到里面用来标记具体业务逻辑处理方法的值,然后将该值与缓存在map中的key值比对,找到对应的方法用反射来调用。
package com.zy.isc.core_receive; import java.util.Enumeration; import java.util.HashMap; import java.util.Map; import java.util.ResourceBundle; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.dc.eai.data.CompositeData; import com.dc.eai.data.Field; import com.dcfs.esb.client.converter.PackUtil; import com.zy.isc.common.ServiceBean; /** * <p>Title: ServiceReceiveExcutor</p> * <p>Description:接收ISC消息执行具体的业务方法 </p> * <p>Company: * @author kjkfb_zy 2017-7-31 * <p>Just do it!!!</p> * @version v1.0 */ public class ServiceReceiveExecutor { private static final Logger logger = LoggerFactory.getLogger(ServiceReceiveExecutor.class); //读取配置文件 private static ResourceBundle resource; static{ resource = ResourceBundle.getBundle("IscConfigPlus"); } public static String getValue(String key){ return resource.getString(key); } /** * <p>Description:根据消息子主题和业务码执行具体的业务方法 </p> * @param message * @return */ public static Object execute(Object message) throws Exception{ logger.info("===== unpack message start ====="); Map<String, Object> map = unpackMSG(message); String uniqueId = null; CompositeData compositeData = null; CompositeData body = null; if (map != null && map.size() > 0) { uniqueId = (String) map.get("uniqueId"); compositeData = (CompositeData) map.get("compositeData"); body = compositeData.getStruct("BODY"); }else { logger.info("message is null"); return null; } logger.info("===== unpack message end ====="); try { if(StringUtils.isBlank(uniqueId)){ logger.error("uniqueId is null,no method matches , message infomation:\r\n"+message); throw new Exception("uniqueId is null,no method matches , message infomation:\r\n"+message); } boolean isContainsKey = InitServiceMethodMapping.getMethodMap().containsKey(uniqueId); if (isContainsKey) { ServiceBean serviceBean = InitServiceMethodMapping.getMethodMap().get(uniqueId); logger.info("request biz interface's ID:["+uniqueId+"],biz interface description["+serviceBean.getDesc()+"]"); return serviceBean.getMethod().invoke(serviceBean.getObject(),body); }else { logger.info("no method maches the request,message information\r\n" + compositeData ); } } catch (Exception e) { logger.error("biz method exception,args:\r\n",e); throw e; } return null; } /** * <p>Description: 标准报文体解包,将报文中的消息子主题和交易码拼接后作为业务逻辑方法的唯一标识</p> * @param message * @return */ public static Map<String,Object> unpackMSG(Object message){ Map<String, Object> retMap = new HashMap<String, Object>(); if (message != null) { //解析出报文体,存到map中 CompositeData compositeData = PackUtil.unpackXmlStr((String)message); //打印此日志方便查看报文,生产环境去掉 logger.debug("message content:\r\n" + compositeData); retMap.put("compositeData", compositeData); CompositeData body = compositeData.getStruct("BODY"); retMap.put("body", body); Map<String, Object>dataMap = new HashMap<String, Object>(); Enumeration<String> keys = resource.getKeys(); while (keys.hasMoreElements()) { String key = (String) keys.nextElement(); String value = getValue(key); CompositeData struct = compositeData.getStruct(value); if (struct != null && struct.size()>0) { logger.debug("key:value ——> " + key+":"+value); dataMap.put(key, struct); } } logger.debug("dataMap:\r\n"+dataMap); if (dataMap != null && dataMap.size()>0) { CompositeData iscSysHeadCompositeData = (CompositeData) dataMap.get("iscSysHead"); CompositeData iscPubHeadCompositeData = (CompositeData) dataMap.get("iscPubHead"); Field subTopicField = iscSysHeadCompositeData.getField("SUB_TOPIC"); Field serviceCodeField = iscPubHeadCompositeData.getField("SERVICE_CODE"); if (subTopicField != null) { String subTopic = subTopicField.getValue().toString(); logger.debug("message subtopic: " + subTopic); retMap.put("subTopic", subTopic); }else { retMap.put("subTopic", ""); } if (serviceCodeField != null) { String serviceCode = serviceCodeField.getValue().toString(); logger.debug("message serviceCode: " + serviceCode); retMap.put("serviceCode", serviceCode); }else { retMap.put("serviceCode", ""); } String subTopic = retMap.get("subTopic").toString(); String serviceCode = retMap.get("serviceCode").toString(); String uniqueId = subTopic + serviceCode; retMap.put("uniqueId", uniqueId); }else { logger.info("dataMap is null,uniqueId is null"); } } return retMap; } }
这个ServiceReceiveExecutor类会将消息中解析出来的报文body通过反射参数传到具体的业务逻辑处理类中,最后就是具体的业务逻辑处理类了,这个类或者方法可以按相同的方式进行任意扩展
package com.zy.isc.service; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import com.dc.eai.data.CompositeData; import com.dc.eai.data.Field; import com.zy.isc.common.IscClassMapping; import com.zy.isc.common.IscMethodMapping; /** * <p>Title: UserInfoService</p> * <p>Description: 接收BECIF广播用户信息Service</p> * <p>Company: * @author kjkfb_zy 2017-7-31 * <p>Just do it!!!</p> * @version v1.0 */ @Service @IscClassMapping public class UserInfoService { private static final Logger logger = LoggerFactory.getLogger(UserInfoService.class); //服务类 //@Resource(name = "userService") //private UserService userService; //name 唯一标识 = 子主题(20005)+ 交易码(000012) @IscMethodMapping(name="20005000012",desc="xxx业务需求描述") public void userInfoCombine(CompositeData compositeData) throws Exception{ Field clientNoField = compositeData.getField("CLIENT_NO"); Field clientNo1Field = compositeData.getField("CLIENT_NO1"); String clientNo = null; String clientNo1 = null; if (clientNoField != null) { clientNo = clientNoField.getValue().toString(); logger.info("combine becif:" + clientNo); } if (clientNo1Field != null) { clientNo1 = clientNo1Field.getValue().toString(); logger.info("combined becif: " + clientNo1); } if (StringUtils.isNotBlank(clientNo1) && StringUtils.isNotBlank(clientNo)) { Map<String, Object> paramMap = new HashMap<String, Object>(); paramMap.put("clientNo", clientNo); paramMap.put("aClentNo", clientNo1); //boolean flag = scfpUserService.checkBecifExist(clientNo1); boolean flag = false; logger.info("becif exist:" + flag); if (flag) { try { //scfpUserService.combineUserBecif(paramMap); } catch (Exception e) { logger.error("userInfoCombine() method exception:" + e.getMessage()); } } } } }
最后还有一个容器销毁时释放缓存的监听器
这样整个小框架就完毕了,使用时只需要在spring的配置文件中将SpringContextReceiveUtil这个类的bean配置好,在web.xml中配置StartupListener这个监听器就可以使用了。接收消息按UserInfoService类方法上面注解唯一标识来区分。后续还有其他消息要接收,直接按照这种注解方式在UserInfoService类中扩展或者另外新增类似UserInfoService类都可以。package com.zy.isc.listener; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.pingan.isc.ISCMessageBroker; /** * <p>Title: StartupListener</p> * <p>Description:</p> * <p>Company: * @author kjkfb_zy 2017-7-31 * <p>Just do it!!!</p> * @version v1.0 */ public class StartupListener implements ServletContextListener { private static final Logger LOGGER = LoggerFactory.getLogger(StartupListener.class); @Override public void contextInitialized(ServletContextEvent arg0) { LOGGER.info("===== MSG listener preparation ====="); } @Override public void contextDestroyed(ServletContextEvent arg0) { ISCMessageBroker.destroyed(); LOGGER.info("===== ISCMessageBroker destroyed,resource release ====="); } }
-
Https双向认证,实现系统间通讯双向认证
2019-03-26 12:29:04现在公司系统较多,多系统间需要进行通讯,已经实现了https单项认证,现在为了保证通讯的安全,需要实现https双向认证,让系统间通讯更安全。 2.问题 当时拿到这个问题,第一时间去网上百度https双向认证的案例,...1.背景
现在公司系统较多,多系统间需要进行通讯,已经实现了https单项认证,现在为了保证通讯的安全,需要实现https双向认证,让系统间通讯更安全。
2.问题
当时拿到这个问题,第一时间去网上百度https双向认证的案例,试了好多种,有单项认证的实现,还有双向认证基于浏览器端的访问,对于两个系统之间的https双向认证很少,自己也浪费了好多时间没有成功,于是,下定决心,先把https双向认证的原理搞清楚,懂了原理后,解决起来问题就方便了,下面附上https单项认证和双向认证的原理图
通过图片可知,https单项认证是客户端对服务端的认证,双向认证是在此基础上加了服务端对客户端的认证!
3.前提准备
1. 多个springboot项目,以两个springboot项目为例,这里暂且叫Client项目,Server项目。
2. Client项目所在的服务器,Client.p12(客户端证书库) ,Client.cer(客户端公钥);
Server项目所在的服务器,Server.p12(服务端证书库),Server.cer(服务端公钥);
4.实现步骤
1. 创建springboot项目,Client ,Server 此处,请自行百度创建如何创建springboot项目。
2. 使用jdk自带的keytool工具,生成Client和Server端相应的证书,步骤如下:
a. Client端证书生成步骤:
1.生成客户端 Client.p12文件
keytool -genkey -v -alias Client -keyalg RSA -storetype PKCS12 -keystore C:\D\jdk1.8.0_161\Client.p12
设置密码: lq123456
注意事项:生成证书,您的名字与姓氏一项,应该填写服务器的ip(此处应该是域名,但是没有域名,故此处填写服务器ip)
2 . 导出客户端公钥Client.cer 文件
keytool -keystore C:\D\jdk1.8.0_161\Client.p12 -export -alias Client -file C:\D\jdk1.8.0_161\Client.cerb. Server端证书生成步骤
1. 生成服务端Server.p12文件
keytool -genkey -v -alias Server -keyalg RSA -storetype PKCS12 -keystore C:\D\jdk1.8.0_161\Server.p12
设置密码: lq123456
注意事项:生成证书,您的名字与姓氏一项,应该填写服务器的ip(此处应该是域名,但是没有域名,故此处填写服务器ip)
2. 导出服务端公钥Server.cer 文件keytool -keystore C:\D\jdk1.8.0_161\Server.p12 -export -alias Server -file C:\D\jdk1.8.0_161\Server.cer
c. 将Client端和Server端的公钥文件(.cer文件)导入双方系统的jre运行环境的cacerts证书库
1. 将客户端公钥导入的服务端jdk信任库
keytool -import -file Client.cer -keystore C:\D\jdk1.8.0_161\jre\lib\security\cacerts –v2. 将服务端公钥导入到客户端的jdk信任库
keytool -import -file Server.cer -keystore C:\D\jdk1.8.0_161\jre\lib\security\cacerts –v3. 将客户端公钥导入到服务端Server.p12证书库
keytool -import -v -file C:\D\jdk1.8.0_161\Client.cer -keystore C:\D\jdk1.8.0_161\server.p12
注意事项:此处导入的密码为changeit,默认密码
至此,证书生成完成,证书库导入完成!
d. 代码实现
1.Server端
第一步:在application.properties中添加如下配置:包括本地证书库和受信任证书配置
server.port=8090 server.address=10.119.165.171 server.ssl.key-store=classpath:server.p12 server.ssl.key-store-password=lq123456 server.ssl.key-alias=server server.ssl.keyStoreType=JKS server.ssl.trust-store=classpath:server.p12 server.ssl.trust-store-password=lq123456 server.ssl.client-auth=need server.ssl.trust-store-type=JKS server.ssl.trust-store-provider=SUN
第二步:服务端接口开放
package com.example.server1.controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author lucasliang * @date 01/03/2019 3:21 下午 * @Description */ @RestController @RequestMapping("/server") public class ServerController { @RequestMapping("/hello") public String getUrlInfo() { return "************request https success************"; } }
2. Client端
第一步:在application.properties中添加如下配置:
server.port=8091 server.address=10.119.165.171 server.ssl.key-store=classpath:client.p12 server.ssl.key-store-password=lq123456 server.ssl.key-alias=client server.ssl.keyStoreType=JKS
第二步:单元测试:
package com.example.client; import java.io.FileInputStream; import java.security.KeyStore; import java.security.SecureRandom; import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import org.apache.http.HttpEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; /** * @author lucasliang * @date 04/03/2019 2:13 下午 * @Description */ @SpringBootTest(classes = {Client1ApplicationTests.class}) @RunWith(SpringRunner.class) public class P12CertTest { private final static String TEST_URL = "https://10.119.165.171:8090/server/hello"; @Test public void getHKVesselTrip() throws Exception { KeyStore clientStore = KeyStore.getInstance("PKCS12"); clientStore .load(new FileInputStream("C:\\D\\jdk1.8.0_161\\shuangxiang\\client_original.p12"), "lq123456".toCharArray()); KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); kmf.init(clientStore, "lq123456".toCharArray()); KeyManager[] kms = kmf.getKeyManagers(); TrustManagerFactory tmf = TrustManagerFactory .getInstance(TrustManagerFactory.getDefaultAlgorithm()); KeyStore trustStore = KeyStore.getInstance("JKS"); trustStore.load(new FileInputStream("C:\\D\\jdk1.8.0_161\\jre\\lib\\security\\cacerts"), "changeit".toCharArray()); tmf.init(trustStore); TrustManager[] tms = tmf.getTrustManagers(); SSLContext sslContext = SSLContext.getInstance("TLS"); sslContext.init(kms, tms, new SecureRandom()); SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext, SSLConnectionSocketFactory.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER); CloseableHttpClient httpclient = HttpClients.custom().setSSLSocketFactory(sslsf).build(); try { HttpGet httpget = new HttpGet(TEST_URL); System.out.println("executing request" + httpget.getRequestLine()); CloseableHttpResponse response = httpclient.execute(httpget); try { HttpEntity entity = response.getEntity(); if (entity != null) { System.out.println(EntityUtils.toString(entity)); } } finally { response.close(); } } finally { httpclient.close(); } } }
此时,Client端发送请求到Server端,请求成功,https双向认证完成!
5. 心得
凡事搞懂原理,事半功倍!
-
Linux系统中的进程间通讯机制.pdf
2021-09-07 00:54:18Linux系统中的进程间通讯机制.pdf -
ERP系统间的数据通讯方法、装置以及系统.zip
2021-09-05 15:13:13ERP系统间的数据通讯方法、装置以及系统.zip -
个人防火墙系统内部模块间通讯的设计与实现.pdf
2021-11-15 10:01:01个人防火墙系统内部模块间通讯的设计与实现.pdf -
电信设备-火灾报警系统与感温探测系统间的通讯协议模块设备.zip
2021-09-14 11:18:06电信设备-火灾报警系统与感温探测系统间的通讯协议模块设备.zip -
spring集成rabbitmq并实现两个系统间的通信
2018-12-12 17:38:23spring集成rabbitmq并实现两个系统间的通信。具体的使用可以看我的文章讲解 -
基于虚拟仪器的ARINC429通讯系统设计
2021-01-13 01:58:22ARINC 429 是一种用于飞机各系统间或系统与设备间互连的航空总线,是各系统间或系统与设备间数字信息传输的主要路径,是飞机的神经网络。本文针对在虚拟仪器平台上实现航空电子总线ARINC429通讯系统的目的,采用硬件... -
电信设备-TD-SCDMA通讯系统中频率间切换系统.zip
2021-09-18 00:10:25电信设备-TD-SCDMA通讯系统中频率间切换系统.zip -
电信设备-基于内核内存共享的容器间通讯系统及方法.zip
2021-09-18 11:38:30电信设备-基于内核内存共享的容器间通讯系统及方法.zip -
电信设备-WiFi终端间通讯方式监控RFID的系统、方法.zip
2021-09-18 00:20:46电信设备-WiFi终端间通讯方式监控RFID的系统、方法.zip -
电信设备-隧道火灾报警系统与光纤光栅感温探测系统间的通讯协议模块设备.zip
2021-09-14 12:52:46电信设备-隧道火灾报警系统与光纤光栅感温探测系统间的通讯协议模块设备.zip -
电信设备-一种班级间的通讯系统.zip
2021-09-14 13:23:07电信设备-一种班级间的通讯系统.zip -
XP系统下用内存映射实现进程间的通讯
2013-06-03 16:31:06该文档讲述在XP系统下,用内存映射实现进程间的通讯! -
架构设计:系统间通信(19)——MQ:消息协议(上)
2016-03-07 15:39:34从本文开始,我们介绍另一类型的系统间通讯及输:MQ消息队列。首先我们将讨论几种常用消息队列协议的基本原理和工作方式,包括MQTT、XMPP、Stomp、AMQP、OpenWire等。然后在这个基础上介绍两款MQ产品:ActiveMQ和...1、概述
从本文开始,我们介绍另一类型的系统间通讯及输:MQ消息队列。首先我们将讨论几种常用消息队列协议的基本原理和工作方式,包括MQTT、XMPP、Stomp、AMQP、OpenWire等。然后在这个基础上介绍两款MQ产品:ActiveMQ和RabbitMQ,它们是现在业务系统中应用广泛的消息队列软件。包括他们的安装、运行、支持协议、集群化和调用方式。
当然,在这个过程中我们还会提到其他的消息队列协议(或者实现),例如微软JBossMQ、MSMQ、商业化产品WebSphere MQ、Oracle高级队列(AQ)等。我们还会讨论这些眼花缭乱的协议、软件、程序库之间的关系。
随后我们会花一些篇幅,讨论现在新兴的消息队列Kafka和ZeroMQ。它们的应用越来越广泛,尤其在大数据的采集方面。最后我们将使用消息队列搭建一个高性能的日志采集系统,作为实战。
2、基本概念
2-1、消息
首先有三个基本概念在开篇前我们需要进行讨论:消息、消息协议、消息队列。消息既是信息的载体 这个描述相信各位读者都能够明白。为了让消息发送者和消息接收者都能够明白消息所承载的信息(消息发送者需要知道如何构造消息;消息接收者需要知道如何解析消息),它们就需要按照一种统一的格式描述消息,这种统一的格式称之为消息协议。所以,有效的消息一定具有某一种格式;而没有格式的消息是没有意义的。
而消息从发送者到接收者的方式也有两种。一种我们可以称为即时消息通讯,也就是说消息从一端发出后(消息发送者)立即就可以达到另一端(消息接收者),这种方式的具体实现就是我们已经介绍过的RPC(当然单纯的http通讯也满足这个定义);另一种方式称为延迟消息通讯,即消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送给另一端。 这个容器的一种具体实现就是消息队列。
2-2、知识结构
消息队列和已经介绍过的RPC相同的是:无论是RPC也好,消息队列也好他们都建立在网络IO模型基础上(我们已经介绍过多种网络IO模型)。先进的网络IO模型将赋予MQ协议优异的性能表现(当然,性能也不仅仅取决于网络IO模型)。
从上图可以看到,某一种消息通讯软件(或者叫做程序库)的实现都建立在“协议”基础上:RMI程序库建立在RMI协议上(RMI协议是JAVA规范协议的一部分) ,属于一种“即时消息通讯”;RabbitMQ和Qpid消息通讯软件的设计依据是AMQP协议,属于一种“延迟消息通讯”。
虽然消息协议存在“私有协议”和“开放协议”之分(是否向行业开放消息规范文档、是否允许某个组织更改协议),虽然某一个软件(程序库)不一定只支持一种协议(例如ActiveMQ实现了多种消息协议),虽然某一种协议也不一定只有一种软件(程序库)实现(例如能够支持webservice协议的程序库就有Codehaus XFire、Apache CXF、Jboss RESTEasy等),但是这并不影响“某一种消息通讯软件(或者叫做程序库)的实现都建立在“协议”基础上”的概念,反而是这个基本概念加强了。
3、消息协议
那么要理解消息队列,我们就应该从这些支持“延迟消息通讯”的消息协议开始讨论。这个小节我们首先为各位读者介绍几种使用的消息协议,他们是XMPP、Stomp和AMQP。为了承接后文我们讲解的MQ软件,这三个协议中我们又着重讲解AMQP协议。
3-1、XMPP协议
3-1-1、定义
XMPP is the Extensible Messaging and Presence Protocol, a set of open technologies for instant messaging, presence, multi-party chat, voice and video calls, collaboration, lightweight middleware, content syndication, and generalized routing of XML data.
以上内容引用自XMPP官网,这个定义已经可以清楚表明XMPP协议的用途和特性。XMPP的前身是Jabber,一个开源形式组织制定的网络即时通信协议。XMPP目前被IETF国际标准组织完成了标准化工作。
XMPP基于XML,用于IM系统的开发。国内比较流行的XMPP服务器叫做Openfire,它使用MINA作为下层的网络IO框架(不是MINA2是MINA1);国外用的比较多的XMPP服务器叫做Tigase,它的官网号称单节点可以支撑50万用户在线,集群可以支持100万用户在线:(http://projects.tigase.org/)
Cluster with over 1mn online users . 500k online users on a single machine
当然如果读者所在公司需要开发IM系统,除了使用现成的XMPP服务器以外,还需要实现了XMPP协议的客户端或者开发包(以便进行扩展开发)。您可以在XMPP官网查看到XMPP官方推荐的开发包,各种语言的支持基本上都有:http://xmpp.org/software/libraries.html
笔者曾参与过某几款IM系统的开发(包括自己创业的项目),总的来说XMPP协议本身是不错的选择,但是学习起来会耗费相当的时间,并且某些XMPP客户端、服务器端或者程序库并没有这些开发团队宣传的那么稳定好用。所以如果您的公司需要进行IM系统的开发,那么创立私有的消息协议也会是一个不错的选择。
3-1-2、协议通讯过程示例
为了让各位读者对XMPP协议有一个感性认识,这里我们给出一个XMPP协议处理“IM用户登录”操作的过程(XMPP的登录方式分为有用户密码和无用户密码两种方式,这里我们介绍无密码登录方式)。
XMPP协议本身细节比较丰富,这里我们只讨论登录操作,如果读者有兴趣可以下载全套的XMPP官方规范文档进行研究(http://xmpp.org/):
通过上图可以看到,XMPP协议中的xml片段。这里出现了几个XMPP协议中的关键信息,例如:
stream标记:通讯流标记,是指XMPP的客户端或者服务器端向对方发起的通讯请求(或者响应)。通讯流并不携带正真的内容信息,指示表明客户端和服务器端发生了一次交互。stream的属性包括:to、from、id、xml:lang、version等。
iq标记:iq标记是Info/Query的简称(你可以理解成查询信息请求),一般是一组的形式出现,由客户端发起查询请求,由服务器端返回查询结果。由于查询请求的类型不一样,iq标记中可以嵌入的子标记就有很多。例如,可以嵌入bind标记,表明某个用户和jid的绑定关系;可以嵌入多个item标记,表明查询得到的这个用户的好友信息(如下)。
<iq to='somenode@example.com/someresource' type='result' id='roster'> <query xmlns='jabber:iq:roster'> <item jid='friend1@example.com' name='someone1'/> <item jid='friend2@example.com' name='someone2'/> </query> </iq>
- jid标记:jid(JabberID)是XMPP协议中标示,它用来标示XMPP网络中的各个XMPP实体(实体可以是某一个用户、某一个服务器、某一个聊天室),规范格式如下:
jid = [ node "@" ] domain [ "/" resource ]
- 还有未出现的message、presence标记:message是实体内容标记,记录了聊天的真实内容;presence标记表示了XMPP用户的服务状态(离线,在线、忙碌等)。示例如下:
<message to="somenode@example.com/someresource" type="chat"> <body>helloword。。。</body> </message>
3-2、Stomp协议
3-2-1、定义
Stomp协议,英文全名Streaming Text Orientated Message Protocol,中文名称为 ‘流文本定向消息协议’。是一种以纯文本为载体的协议(以文本为载体的意思是它的消息格式规范中没有类似XMPP协议那样的xml格式要求,你可以将它看作‘半结构化数据’)。目前Stomp协议有两个版本:V1.1和V1.2。
一个标准的Stomp协议包括以下部分:命令/信息关键字、头信息、文本内容。如下图所示:
以下为一段简单的协议信息示例:
CONNECT accept-version:1.2 someparam1:value1 someparam2:value2 this is conntecon ^@
上面的示例中,我们使用了Stomp协议的CONNECT命令,它的意思为连接到Stomp代理端,并且携带了要求代理端的版本信息和两个自定义的K-V信息(请注意’^@’符号,STOMP协议中用它来表示NULL)。
Stomp协议中有两个重要的角色:STOMP客户端与任意STOMP消息代理(Broker)。如下图所示:
看了上面的示意图后有的读者可能会问:为什么称为Stomp消息代理,而不称为Stomp消息服务?因为Stomp Broker只是负责接受和存储客户端发来的消息、只是按照客户端要求的路径转发消息,只是管理客户端连接和订阅:它并不负责根据消息内容做任何业务处理。所以将它称为消息代理端更贴切。
由于Stomp协议的结构如此简单,以至于任何理解Stomp协议命令格式的技术人员都可以开发Stomp的代理端或者Stomp的客户端,并将自己满足Stomp协议的系统轻松接入另一个同样满足Stomp协议的第三方系统(例如activeMQ)。
3-2-2、基本命令/返回信息
和介绍XMPP协议的方式类似,为了让读者对Stomp协议有进一步的认识,本小节我们介绍Stomp协议的基本命令和代理端返回的信息种类,并且列举一些实例进行使用讲解。
在Stomp协议中,主要有以下命令/返回信息(有的文章中也称一个完整的信息为帧)。这些命令/返回信息构成了Stomp协议的主体,并能够支持您的Stomp客户端和Stomp代理端完成连接、发送、订阅、事务、响应的整个操作过程。这些命令/返回是:
CONNECT/STOMP命令: 客户端通过使用CONNECT命令,连接到Stomp代理端。如果使用STOMP命令,那么Stomp代理端的版本必须是1.2。
CONNECTED信息:当Stomp代理端收到客户端发送来的Connect命令并且处理成功后,将向这个客户端返回CONNECTED状态信息;如果这个过程中出现任何问题,还可能返回ERROR信息
SEND 发送命令:客户端使用SEND命令,向某个指定位置(代理端上的一个虚拟路径)发送内容。这样在这个路径上订阅了消息事件的其它客户端,将能够收到这个消息。
SUBSCRIBE 订阅命令:客户端使用SUBSCRIBE订阅命令,向Stomp服务代理订阅某一个虚拟路径上的监听。这样当其它客户端使用SEND命令发送内容到这个路径上时,这个客户端就可以收到这个消息。在使用SUBSCRIBE时,有一个重要的ACK属性。这个ACK属性说明了Stomp服务代理端发送给这个客户端的消息是否需要收到一个ACK命令,才认为这个消息处理成功了。如下所示:
SUBSCRIBE id:XXXXXXXXX destination:/test ack:client ^@
以上SUBSCRIBE命令信息说明,客户端订阅的虚拟位置是test。且命令信息中ack属性为client,说明当客户端收到消息时,必须向代理端发送ack命令,代理端才认为这个消息处理成功了(ack的值只有三种:auto(默认)、client和client-individual)。
UNSUBSCRIBE 退订命令:客户端使用这个命令,取消对某个路径上消息事件的监听。如果客户端给出的路径之前就没有被这个客户端订阅,那么这个命令执行无效。
MESSAGE 信息:当客户端在某个订阅的位置收到消息时,这个消息将通过MESSAGE关键字进行描述。类似以下信息就是从代理端拿到的消息描述:
MESSAGE redelivered:true message-id:ID:localhost-34450-1457321490460-4:24:-1:1:1 destination:/test timestamp:1457331607873 expires:0 priority:4 2345431457331607861
BEGIN 开始事务命令: Stomp协议支持事务模式,在这种模式下,使用Send命令从某个客户端发出的消息,在没有使用COMMIT正式提交前,这些消息是不会真正发送给Stomp代理端的。BEGIN命令就是用于开启事务。注意,一个事务中可以有一条消息,也可以有多条消息。
COMMIT 提交命令: 当完成事务中的信息定义后,使用该命令提交事务。只有使用COMMIT命令后,在某一个事务中的一条或者多条消息才会进入Stomp代理端的队列(订阅了事件的其它客户端才能收到这些消息)。
ABORT 取消/终止事务命令:很明显,这个命令用于取消/终止当前还没有执行COMMIT命令的事务。
ACK 确认命令:当客户端使用SUBSCRIBE命令进行订阅时,如果在SUBSCRIBE命令中制定ack属性为client,那么这个客户端在收到某条消息(id为XXXX)后,必须向Stomp代理端发送ACK命令,这样代理端才会认为消息处理成功了;如果Stomp客户端在断开连接之前都没有发送ACK命令,那么Stomp代理端将在这个客户端断开连接后,将这条消息发送给其它客户端。
ACK id:MESSAGE ID ^@
请注意head部分的id属性,传递的id属性是之前收到的MESSAGE信息的id标示。
NACK 不确认命令:同样是以上的SUBSCRIBE命令的状态下,如果这时Stomp客户端向Stomp代理端发送NACK信息,证明这条消息在这个客户端处理失败。Stomp代理端将会把这条消息发送给另一个客户端(无论当前的客户端是否断开连接)。
DISCONNECT 断开命令:这个命令将断开Stomp客户端与Stomp代理端的连接。
(接下文)
-
AIDL进程间通讯demo
2017-09-23 17:42:00AIDL进程间通讯demo,A [android] I [Interface] D [Definition] L [Language],Android接口定义语言。 作用:方便系统为我们生成代码从而实现跨进程通讯,仅此而已。(玉刚老师如是说也),也就是说这个AIDL就只是一... -
不同系统应用间的通信方式
2019-06-19 12:04:04此时的系统完全是一个孤立的系统,并没有和其他系统进行交互。 那我们考虑如下一个场景,后端业务系统A不光是web系统需要,Android,IOS或者一些其他的业务系统也要用到这些业务怎么办? 当然不可能再开发新的框架使... -
行业分类-设备装置-应用在过程监测控制系统与仿真支撑系统间的实时数据通讯系统及方法.zip
2021-09-05 06:09:14行业分类-设备装置-应用在过程监测控制系统与仿真支撑系统间的实时数据通讯系统及方法.zip -
linux进程间通讯,Android系统各种进程间通信机制
2017-03-10 15:23:09一、linux进程间通讯实现方式有很多种。包含管道、信号、共享内存、套接字 而 管道、信号、共享内存 这三种方式只适用于同一台设备间进程间通讯。套接字可以跨设备进行进程间通讯 1.管道机制 管道机制是... -
行业分类-设备装置-使用多服务器平台实现服务器间通讯的系统和方法.zip
2021-09-12 01:30:07行业分类-设备装置-使用多服务器平台实现服务器间通讯的系统和方法 -
嵌入式操作系统---核间通讯之RingBuf
2018-04-07 10:44:41(1)多个核进行核间通讯时,首先由一个核创建一个核间对象,另外一个核通过名称或索引定位到该对象的句柄,从而对核间通讯对象进行操作。(2)核间中断来通知对方,采用“硬件信号量”对资源进行临界保护,再利用... -
浅谈欧陆直流调速器与PROFIBUS网络的通讯
2021-01-20 02:08:15也就是将传感器、各种操作终端和控制器间的通讯及控制器之间的通讯进行特化的网络。原来这些机器间的主体配线是ON/OFF、接点信号和模拟信号,通过通讯的数字化,使时间分割、多重化、多点化成为可能,从而实现高性能... -
操作系统实验--进程间通信
2020-09-27 21:00:28学生独立设计并实现消息传递系统进程间交换数据的任务。 三、实验原理和设计思路 在Linux系统下用signal()函数进行进程间通信,pipe()函数创建管道进行数据传送。 四、源代码 1.软中断通信 #include<unistd.h>... -
进程间通讯的7种方式
2019-04-26 14:23:24管道pipe:管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用。进程的亲缘关系通常是指父子进程关系。 命名管道FIFO:有名管道也是半双工的通信方式,但是它允许无亲缘关系进程间... -
进程间通讯的四种方式
2018-09-30 14:47:21文章目录共享内存信号管道消息队列 通信方法 无法介于内核态与用户态的原因 ... 共享内存是最快的进程间通讯的方式 原因:相对于其他几种方式,共享内存直接在进程的虚拟地址空间进行操作,不再... -
《行情发布系统》通讯协议设计说明书
2014-08-29 12:57:29《行情发布系统》通讯协议设计说明书本文档用于描述系统间通讯层的数据传输规范。 1. 传输内容包含实时的消息和数据; 2. 建议的错误处理和恢复过程; 3. 建议的业务数据包处理方法; 该文档面向各应用系统的设计... -
S7-1500系统间的通信实例
2014-07-21 14:35:55本文介绍 S7-1500 基于工业以太网的S7通信的组态步骤,用于实现两个独立 S7-1500 项目之间的S7 通信。 -
教你用纯Java实现一个即时通讯系统(附源码)
2021-05-14 00:40:59项目背景和各位读者大致介绍下具体场景,线上的小程序中开放一些语音麦克风的房间,让用户进入房间之后可以互相通过语音聊天的方式进行互动。这里分享一下相关的技术设计方案。这款系统的核心点设计在于...