        Recently, for a lab project, I downloaded and installed Kafka in a Linux virtual machine, got a single-node Kafka instance running, and implemented data transfer from Java into Kafka. Next I will connect the kafka-flume-hdfs data pipeline and expand the single node into a Kafka cluster. For now, I am recording some Kafka operating procedures here for future reference.
        Attached is a blog post with a very detailed introduction to Kafka basics:
        Kafka为什么那么快? (Why is Kafka so fast?)

    1. Installation and Configuration

        Kafka is developed in Scala, which runs on the JVM, so a Java runtime environment is required. Before downloading, confirm that a Java environment is already installed and configured; if not, install and configure Java 1.8 first. An installation guide can be found in this blog post.
        I downloaded the release under Windows and then used WinSCP to transfer the file to my Linux server.
        Extract it:

    tar -xzf kafka_2.11-0.9.0.0.tgz
    

        After extraction, you can rename the directory with the command below to make later operations easier:

    mv kafka_2.11-0.9.0.0 kafka
    

    1.1 ZooKeeper configuration

        At this point the installation is actually complete, but the relevant files still need to be configured before Kafka will run properly. Kafka's operation depends on ZooKeeper, but Kafka ships with a built-in ZooKeeper service, so we can do all the configuration inside the package we just installed.
        Go into the config folder inside the kafka directory and open zookeeper.properties to edit the settings:

    cd config
    vim zookeeper.properties
    

        The main thing to configure here is the data storage path, so the data is easy to inspect later instead of being impossible to find when you need it. Also note that you must create the directory yourself; the configuration file will not generate the corresponding folder for you.
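
        For example, to create the directory matching the dataDir value used below:

    mkdir -p /usr/local/kafka/data/zk_data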

    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    # 
    #    http://www.apache.org/licenses/LICENSE-2.0
    # 
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    # the directory where the snapshot is stored.
    dataDir=/usr/local/kafka/data/zk_data
    # the port at which the clients will connect
    clientPort=2181
    # disable the per-ip limit on the number of connections since this is a non-production config
    maxClientCnxns=0
    # Disable the adminserver by default to avoid port conflicts.
    # Set the port to something non-conflicting if choosing to enable this
    admin.enableServer=false
    # admin.serverPort=8080
    
    

        Press i to insert and delete text, press Esc to leave editing mode, and type :wq to save and quit.

    1.2 Kafka configuration

        In the same folder, open server.properties and configure it in the same fashion:

    vim server.properties
    

        To let remote clients connect to our Kafka server, the IP and port must be filled in. For clarity, write the machine's actual IP everywhere rather than localhost, which becomes confusing once other services are set up on the same host. Configure advertised.listeners as well as advertised.host.name; you can follow the template below and substitute your own server IP. It is also best to create and set the log storage location log.dirs yourself, in case you need the files later. Finally, zookeeper.connect must be changed to your own ZooKeeper server's IP; because I am using the ZooKeeper service bundled with Kafka, that address is also the local machine.
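
        Like the ZooKeeper data directory, the log directory must be created in advance, matching the log.dirs value used below:

    mkdir -p /usr/local/kafka/data/ka_data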

    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    # see kafka.server.KafkaConfig for additional details and defaults
    
    ############################# Server Basics #############################
    
    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=0
    
    ############################# Socket Server Settings #############################
    
    # The address the socket server listens on. It will get the value returned from 
    # java.net.InetAddress.getCanonicalHostName() if not configured.
    #   FORMAT:
    #     listeners = listener_name://host_name:port
    #   EXAMPLE:
    #     listeners = PLAINTEXT://your.host.name:9092
    
    advertised.listeners=PLAINTEXT://192.168.232.134:9092
    
    # Hostname and port the broker will advertise to producers and consumers. If not set, 
    # it uses the value for "listeners" if configured.  Otherwise, it will use the value
    # returned from java.net.InetAddress.getCanonicalHostName().
    #advertised.listeners=PLAINTEXT://your.host.name:9092
    host.name=192.168.232.134
    port=9092
    advertised.host.name=192.168.232.134
    # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
    #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    
    # The number of threads that the server uses for receiving requests from the network and sending responses to the network
    num.network.threads=3
    
    # The number of threads that the server uses for processing requests, which may include disk I/O
    num.io.threads=8
    
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    
    
    ############################# Log Basics #############################
    
    # A comma separated list of directories under which to store log files
    log.dirs=/usr/local/kafka/data/ka_data
    
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1
    
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
    
    ############################# Internal Topic Settings  #############################
    # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
    # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    
    ############################# Log Flush Policy #############################
    
    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk.
    # There are a few important trade-offs here:
    #    1. Durability: Unflushed data may be lost if you are not using replication.
    #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.
    
    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000
    
    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000
    
    ############################# Log Retention Policy #############################
    
    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.
    
    # The minimum age of a log file to be eligible for deletion due to age
    log.retention.hours=168
    
    # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
    # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
    #log.retention.bytes=1073741824
    
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000
    
    ############################# Zookeeper #############################
    
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=192.168.232.134:2181/kafka
    
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=18000
    
    
    ############################# Group Coordinator Settings #############################
    
    # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
    # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
    # The default value for this is 3 seconds.
    # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
    # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
    group.initial.rebalance.delay.ms=0
    
    

        After the steps above, the basic configuration is complete. Note that this is the configuration for a single Kafka server, not a cluster. For what the individual settings mean, read the configuration files themselves or the blog post linked at the beginning.

    2. Startup commands

        Before starting Kafka, the ZooKeeper server needs to be started. From the kafka directory, run:

    ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
    

        Start Kafka:

    ./bin/kafka-server-start.sh ./config/server.properties
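
        Both start scripts also accept a -daemon flag, which runs the service in the background instead of occupying the terminal:

    ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
    ./bin/kafka-server-start.sh -daemon ./config/server.properties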
    

        Check whether a service has started:

    ps -ef | grep kafka # (or zookeeper)
    

        Create a topic:

    bin/kafka-topics.sh --create --zookeeper your.ip:2181 --replication-factor 1 --partitions 1 --topic your.topic.name 
    

        Remember to substitute your own ZooKeeper server IP and the name of the topic you want to create. Note that if you configured a chroot in zookeeper.connect (such as the /kafka suffix in the server.properties above), it must be included here as well, i.e. --zookeeper your.ip:2181/kafka.
        replication-factor: the number of replicas, providing failover. A value of 1 means the data is recorded on only one broker; in practice the value is usually greater than 1, so that each piece of data is automatically replicated to several other brokers and is not lost when one broker goes down.
        partitions: a topic can be split into multiple partitions. One consumer can consume several partitions, but a single partition can only be consumed by one consumer (within a group), so increasing partitions increases consumer throughput. Kafka guarantees message order only within a single partition; data across different partitions is unordered.
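
        After creating a topic, its partition and replica layout can be inspected with the same tool:

    bin/kafka-topics.sh --describe --zookeeper your.ip:2181 --topic your.topic.name
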
        List the topics that have already been created:

    bin/kafka-topics.sh --list --zookeeper your.ip:2181
    

        Create producers and consumers:

    Create a Kafka console producer:
    bin/kafka-console-producer.sh --broker-list your.ip:9092 --topic your.topic.name
    
    Create a Kafka console consumer:
    bin/kafka-console-consumer.sh --bootstrap-server your.ip:9092 --topic your.topic.name --from-beginning
    
    Set a Kafka consumer group name:
    bin/kafka-console-consumer.sh --bootstrap-server your.ip:9092 --topic your.topic.name --from-beginning --consumer-property group.id=test-group
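
        Once a group exists, its members and offsets can be inspected with the consumer-groups tool (on this 0.9-era release the --new-consumer flag may also be required):

    bin/kafka-consumer-groups.sh --bootstrap-server your.ip:9092 --describe --group test-group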
    

        Correspondingly, to shut a service down, run the matching stop script in the bin directory (kafka-server-stop.sh or zookeeper-server-stop.sh).

    3. Producing data to Kafka remotely from Java

        Next we make the local Windows machine produce data and send it into a specific Kafka topic. Before doing so, confirm the following:
    1. Make sure the firewall on the Kafka server has been turned off;

    Check the firewall status:
    firewall-cmd --state
    If it reports running, stop it with:
    systemctl stop firewalld.service
    Enable the firewall at boot:
    systemctl enable firewalld.service
    Disable the firewall at boot:
    systemctl disable firewalld.service
    Check the status again; not running means it was stopped successfully.
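
        Instead of disabling the firewall entirely, an alternative is to open only the Kafka port (standard firewalld commands):

    firewall-cmd --permanent --add-port=9092/tcp
    firewall-cmd --reload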
    

    2. The Kafka configuration file must advertise a listener on the port, so that the broker can be found by the Java process.

    advertised.listeners=PLAINTEXT://192.168.232.134:9092
    advertised.host.name=192.168.232.134
    

        Next, once both the zookeeper and kafka services are up, we can use Java code to access the Kafka instance inside the virtual machine remotely.
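
        The code below needs the kafka-clients library on the classpath. With Maven, the dependency is roughly the following (the version is an assumption chosen to match the 0.9.0.0 release used in this post):

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.0</version>
    </dependency>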

    package com.hpcdata.kafka;
    
    import java.util.Properties;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    public class ProducerDemo2 {
    
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "192.168.232.134:9092");
            properties.put("acks", "all");
            properties.put("retries", 0);
            properties.put("batch.size", 16384);
            properties.put("linger.ms", 1);
            properties.put("buffer.memory", 33554432);
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            Producer<String, String> producer = null;
            try {
                producer = new KafkaProducer<String, String>(properties);
                for (int i = 0; i < 100; i++) {
                    String msg = "This is Message " + i;
                    producer.send(new ProducerRecord<String, String>("hpcdata", msg));
                    //producer.flush();
                    System.out.println("Sent:" + msg);
                }
            } catch (Exception e) {
                e.printStackTrace();
    
            } finally {
                // guard against the constructor having thrown before producer was assigned
                if (producer != null) {
                    producer.close();
                }
            }
        }
    }
    

        Open a consumer on the Kafka server, and you can see it consuming the data that was sent over.
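        For example, using the console consumer from section 2 with the topic name from the code above:

    bin/kafka-console-consumer.sh --bootstrap-server 192.168.232.134:9092 --topic hpcdata --from-beginning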

    4. Autostart at boot

        I followed the blog post below for this configuration and verified that it works; for the concrete steps see:
    kafka开机自启操作指南 (Kafka autostart guide)

  • Remotely monitoring ActiveMQ message queues from Java

    2019-08-19 23:24:10

    Our product architecture uses the ActiveMQ message queue, and earlier stress tests showed that ActiveMQ can fail to keep up with consumption. Here I record how to monitor ActiveMQ message queues from Java on a Linux system.
    Java monitors ActiveMQ's queues by connecting over JMX. Below is the implementation of the queue-monitoring utility class:

    import java.io.IOException;
    import java.util.PriorityQueue;
    
    import javax.management.MBeanServerConnection;
    import javax.management.MBeanServerInvocationHandler;
    import javax.management.MalformedObjectNameException;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;
    
    import org.apache.activemq.broker.jmx.BrokerViewMBean;
    import org.apache.activemq.broker.jmx.QueueViewMBean;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    
    import cn.itcast.ssm.po.MQQue;
    
    public class ActiveMqKit {
    
        public static final String reportQueueName = "zc-queue-actual"; // queue for generated reconciliation reports
        private static Log log = LogFactory.getLog(ActiveMqKit.class);
        private static final String connectorPort = "11099";
        private static final String connectorPath = "/jmxrmi";
        private static final String jmxDomain = "org.apache.activemq";
    
        /**
         * Connects to the broker's JMX endpoint and returns the two queues
         * with the largest backlog of pending messages.
         */
        public static PriorityQueue<MQQue> getAllQueue(String ip) throws Exception {
    
            BrokerViewMBean mBean = null;
            MBeanServerConnection connection = null;
            PriorityQueue<MQQue> ques = new PriorityQueue<MQQue>();
            try {
                JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + ip + ":" + connectorPort + connectorPath);
                // JMXConnectorFactory.connect returns an already-connected connector
                JMXConnector connector = JMXConnectorFactory.connect(url);
                connection = connector.getMBeanServerConnection();
                ObjectName name = new ObjectName(jmxDomain + ":brokerName=localhost,type=Broker");
                mBean = MBeanServerInvocationHandler.newProxyInstance(connection, name, BrokerViewMBean.class, true);
            } catch (IOException e) {
                log.error("ActiveMQUtil.getAllQueueSize", e);
            } catch (MalformedObjectNameException e) {
                log.error("ActiveMQUtil.getAllQueueSize", e);
            }
            if (mBean != null) {
                for (ObjectName queueName : mBean.getQueues()) {
                    QueueViewMBean queueMBean = MBeanServerInvocationHandler.newProxyInstance(connection, queueName, QueueViewMBean.class, true);
                    MQQue que = new MQQue();
                    que.setName(queueMBean.getName());                   // queue name
                    que.setQueueSize(queueMBean.getQueueSize());         // messages still pending in the queue
                    que.setConsumerCount(queueMBean.getConsumerCount()); // number of consumers
                    que.setDequeueCount(queueMBean.getDequeueCount());   // number of dequeued messages
                    // keep a min-heap of size 2 keyed on queueSize: it ends up holding
                    // the two queues with the biggest backlog
                    if (ques.size() < 2) {
                        ques.add(que);
                    } else if (que.getQueueSize() > ques.peek().getQueueSize()) {
                        ques.poll();
                        ques.add(que);
                    }
                }
            }
            return ques;
        }
    }
    

    A few points worth noting:
    1. MQ's default broker port is 61616, and many blogs state that the JMX connection port is 1093. That did not work for me; it should be 11099 here (perhaps 1093 applies to monitoring MQ under Windows). At first I copied the 1093 port from those blogs and kept getting connection-refused errors with no idea why, until switching to 11099 made it all clear. Note that 11099 is simply the connectorPort configured for ActiveMQ further below, so the two must match.
    2. MQ may hold many queues. In performance testing, what matters most is usually finding the queues that cannot keep up (those with an especially large number of pending messages). After reading each queue's information, I wrap it in an MQQue object and maintain a min-heap of size 2, compared by the number of pending messages, which yields the TOP-2 most backlogged queues (the ones that cannot keep up).
    The MQQue class is implemented as follows:

    public class MQQue implements Comparable<MQQue> {
    
        private String name;         // queue name
        private long queueSize;      // messages still pending in the queue
        private long consumerCount;  // number of consumers
        private long dequeueCount;   // number of dequeued messages
    
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public long getQueueSize() {
            return queueSize;
        }
        public void setQueueSize(long queueSize) {
            this.queueSize = queueSize;
        }
        public long getConsumerCount() {
            return consumerCount;
        }
        public void setConsumerCount(long consumerCount) {
            this.consumerCount = consumerCount;
        }
        public long getDequeueCount() {
            return dequeueCount;
        }
        public void setDequeueCount(long dequeueCount) {
            this.dequeueCount = dequeueCount;
        }
    
        @Override
        public int compareTo(MQQue other) {
            // order by pending-message count, so a PriorityQueue<MQQue> acts as a min-heap on queueSize
            long res = this.getQueueSize() - other.getQueueSize();
            if (res > 0) return 1;
            else if (res < 0) return -1;
            else return 0;
        }
    }
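
    A minimal sketch of a caller (the IP is the example broker address used later in this post; it assumes the two classes above are on the classpath):
    
    public class MqMonitorDemo {
        public static void main(String[] args) throws Exception {
            // print the two most backlogged queues on the broker
            for (MQQue q : ActiveMqKit.getAllQueue("1.32.160.240")) {
                System.out.println(q.getName() + " pending=" + q.getQueueSize()
                        + " consumers=" + q.getConsumerCount()
                        + " dequeued=" + q.getDequeueCount());
            }
        }
    }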
    

    With that, the code is written, but if you think you are done, that would be too young, too simple: running the program like this will still fail. On Linux, ActiveMQ itself must be reconfigured before JMX monitoring is enabled.

    1. First edit the hosts file in the /etc directory:
    127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
    ::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
    Change 127.0.0.1 to the IP of the server where ActiveMQ runs:
    1.32.160.240 localhost localhost.localdomain localhost4 localhost4.localdomain4
    ::1 localhost localhost.localdomain localhost6 localhost6.localdomain6

    2. In the conf directory under the activemq folder, find activemq.xml and add a useJmx="true" attribute to the broker node, roughly as sketched below.
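
    A sketch of the attribute in place (the other attributes stay as they already are in your activemq.xml; brokerName should match the name used in the JMX ObjectName in the code above):
    
    	<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" useJmx="true">
    	    ...
    	</broker>
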
    3. Continue editing activemq.xml and modify the managementContext node:

    	<managementContext>
    	    <managementContext createConnector="true" connectorPort="11099"/>
    	</managementContext>
    

    The connectorPort="11099" here must be included; otherwise you will get the error java.lang.RuntimeException: java.rmi.ConnectException: Connection refused to host: 127.0.0.1.

    4. Add the following lines at the end of the activemq script in MQ's bin directory:

     ACTIVEMQ_SUNJMX_START="-Dcom.sun.management.jmxremote.port=11099 "
     ACTIVEMQ_SUNJMX_START="$ACTIVEMQ_SUNJMX_START -Dcom.sun.management.jmxremote.password.file=${ACTIVEMQ_CONF}/jmx.password"
     ACTIVEMQ_SUNJMX_START="$ACTIVEMQ_SUNJMX_START -Dcom.sun.management.jmxremote.access.file=${ACTIVEMQ_CONF}/jmx.access"
     ACTIVEMQ_SUNJMX_START="$ACTIVEMQ_SUNJMX_START -Dcom.sun.management.jmxremote.ssl=false"
    

    5. Change the permissions of the jmx.password and jmx.access files in the conf directory:
    chmod 400 …/conf/jmx.*
    These two files must be readable only by the current user (that is, mode 400); otherwise activemq will fail to start, and nothing anywhere will report an error.

    6. Restart MQ:
    systemctl stop mq;
    systemctl start mq;

    With the environment configured, run the code written earlier again, and you can retrieve each queue's consumer count, pending-message count, dequeued count, and name, achieving monitoring of ActiveMQ.

  • Java remote invocation

    2020-12-02 16:23:56
    Java remoting
       Java Remote Method Invocation (RMI) provides remote communication for the Java programming language: a program running on a client machine can invoke objects on a remote server, which lets Java programmers distribute operations across a network environment.
       A simple distributed Java RMI program can be created with the following steps.
       
    1. Define the remote interface:
       In Java, a remote object is an instance of a class that implements a remote interface, and the remote interface declares every method that is to be invoked remotely. When we need to create a remote object, we hide the underlying implementation details by passing an interface; the client sends messages through a handle to that interface.
     A remote interface has the following characteristics:
     1) The remote interface must be public. Otherwise, unless the client is in the same package as the remote interface, a call will fail when it tries to load a remote object that implements the interface.
     2) The remote interface must extend the interface java.rmi.Remote.
     3) Apart from exceptions specific to the application itself, every method in the remote interface must declare java.rmi.RemoteException (or a superclass of RemoteException) in its own throws clause.
     4) A remote object passed as a parameter or return value (whether directly or embedded inside a local object) must be declared as the remote interface type, not as the implementation class.
    
    The definition of the remote interface follows:
    package test;  
    import java.rmi.Remote;  
    import java.rmi.RemoteException;  
    import java.math.BigInteger;  
    
    public interface Fib extends Remote {  
        public int getFib(int n) throws RemoteException;  
    //    public BigInteger getFib(BigInteger n) throws RemoteException;  
    }  
    2. Implement the remote interface:
       The implementation class of the remote object must extend java.rmi.server.UnicastRemoteObject and implement the remote interface that was defined. It contains the code implementing each remote method specified in the remote interface. The class may also contain additional methods, but clients can only use the methods declared in the remote interface, because the client holds a handle to the interface, not to the class itself. A constructor must be defined for the remote object, even if it is only a default constructor that calls the base-class constructor; because the base-class constructor may throw java.rmi.RemoteException, the constructor must declare that exception even if it serves no other purpose.
       The declaration of the implementation class follows:
    package test;  
    import java.math.BigInteger;  
    import java.rmi.*;  
    import java.rmi.server.UnicastRemoteObject;  
    
    public class FibImp extends UnicastRemoteObject implements Fib {  
        public FibImp() throws RemoteException {  
            super();  
        }  
    
        public int getFib(int n) throws RemoteException {  
            return n+2;  
        }  
    
    }  
    3. Write the server class:
       The class containing the main method can be the implementation class itself or an entirely separate class. Below, FibonacciServer creates an instance of the remote object and starts the registry service on the given port via the createRegistry method of java.rmi.registry.LocateRegistry; the registry can also be started by running the rmiregistry command, and its default port is 1099. The remote object's name must be bound to a reference to the remote object: Naming.rebind("//localhost:8804/SAMPLE-SERVER", f);
       The declaration of the server class follows:
    
    package test;  
    import java.net.MalformedURLException;  
    import java.rmi.Naming;  
    import java.rmi.RemoteException;  
    import java.rmi.registry.LocateRegistry;   
    public class FibonacciServer {  
        /** 
         * @param args 
         */  
        public static void main(String[] args) {  
            try {  
                LocateRegistry.createRegistry(8804);    
                FibImp f = new FibImp();  
    
                // 注册到 registry 中  
                Naming.rebind("//localhost:8804/SAMPLE-SERVER", f);  
                System.out.println("fib server ready");  
    
            } catch (RemoteException re) {  
                System.out.println("Exception in FibonacciImpl.main: " + re);  
            } catch (MalformedURLException e) {  
                System.out.println("MalformedURLException " + e);  
            }  
        }  
    }  
     4. Write the client class that uses the remote service:
       The client class has two main jobs: one, construct a stub instance from the registry service via Naming.lookup; two, invoke the remote methods on the server's remote object.
       The declaration of the client class follows:
    package testClient;  
    
    import test.Fib;  
    import java.math.BigInteger;  
    import java.net.MalformedURLException;  
    import java.rmi.Naming;  
    import java.rmi.NotBoundException;  
    import java.rmi.RemoteException;  
    public class FibClient {  
        /** 
         * @param args 
         */  
        public static void main(String[] args) {  
            String url = "//localhost:8804/SAMPLE-SERVER";  
            try {  
    
                Fib calc = (Fib) Naming.lookup(url);  
                for (int i = 0; i < 10; ++i) {  
                    int f = calc.getFib(i);  
                    System.out.println(f);  
                }  
            } catch (MalformedURLException e) {  
                e.printStackTrace();  
            } catch (RemoteException e) {  
                e.printStackTrace();  
            } catch (NotBoundException e) {  
                e.printStackTrace();  
            }  
        }  
    }
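
     One way to compile and run the example (assuming the package layout above; since Java 5, RMI generates stubs dynamically, so running rmic is not required):
    
    javac test/Fib.java test/FibImp.java test/FibonacciServer.java testClient/FibClient.java
    java test.FibonacciServer &
    java testClient.FibClient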

     

    This article shows the reader how to operate MSMQ (the message queue that ships with Windows) from Java, covering both sending and receiving messages. To work with MSMQ we need to download the MsmqJava package, which contains one jar and one dll file. The steps below walk through writing an MSMQ program.

    Step 1: enable MSMQ
    
    Windows does not enable the MSMQ feature by default, so we have to turn it on manually, as follows:
    
    Note: click "Turn Windows features on or off", then find MSMQ in the dialog that pops up (no detailed walkthrough; I trust you can manage it).
    
    Step 2: create a test queue
    
    Right-click "My Computer" and choose Manage. Under "Services and Applications", click "Message Queuing". Then right-click "Private Queues" and choose "New" / "Private Queue".
    
    Step 3: set permissions on the new private queue
    
    Under "Private Queues", find the queue just created, right-click it and choose "Properties". On the "Security" tab, grant "Everyone" Full Control.
    
    Step 4: download MsmqJava
    
    Open a browser, enter "http://msmqjava.codeplex.com/", download the MsmqJava package, and unzip it.
    
    Working with the MSMQ message queue from Java via MsmqJava
    
    The project is developed in Eclipse with JDK 1.6 and pulls in a logging framework (Slf4j, log4j) together with MsmqJava.dll and MsmqJava.jar.
    
    Base code (BaseMessage.java)

    package com.huangx;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * Base class.
     *
     * @author Administrator
     * @date 2017-06-23 22:01:27
     */
    public abstract class BaseMessage {
    
        private static final Logger LOG = LoggerFactory.getLogger(BaseMessage.class);
    
        /**
         * Queue name. 192.168.1.115 is this machine's IP address (tested: using
         * 127.0.0.1 does not work and raises an error); private$\\javaTest is the
         * queue name.
         */
        protected static final String QUEUE_NAME = "direct=tcp:192.168.1.115\\private$\\javaTest";
    
        static {
            try {
                LOG.debug("Loading the DLL...");
                System.loadLibrary("MsmqJava");
                LOG.debug("DLL loaded successfully");
            } catch (UnsatisfiedLinkError e) {
                // System.loadLibrary signals failure with an Error, not an Exception
                LOG.error("Failed to load the DLL: " + e.getMessage(), e);
            }
        }
    
        /**
         * Business logic, implemented by concrete subclasses.
         */
        public abstract void execute();
    }

    log4j.properties
    
    log4j.rootLogger=debug,Console
    log4j.logger.org.apache.zookeeper=warn,Console
    log4j.appender.Console=org.apache.log4j.ConsoleAppender
    log4j.appender.Console.layout=org.apache.log4j.PatternLayout
    log4j.appender.Console.layout.ConversionPattern=%d %-5p [%c] - %m%n

    Java sending code (SendMessage.java)
    
    package com.huangx;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import ionic.Msmq.Message;
    import ionic.Msmq.Queue;
    
    /**
     * Sends a message to the MSMQ queue.
     *
     * @author Administrator
     * @date 2017-06-23 21:38:11
     */
    public class SendMessage extends BaseMessage {
    
        private static final Logger LOG = LoggerFactory.getLogger(SendMessage.class);
    
        @Override
        public void execute() {
            try {
                LOG.debug("Sending message...");
                // open the queue
                Queue queue = new Queue(QUEUE_NAME);
                // build the message
                String label = "testMessage";
                String body = "Hello, World!";
                byte[] correlationId = { 0, 2, 4, 6, 8, 9 };
                Message msg = new Message(body, label, correlationId);
                // send the message
                LOG.debug("Sending message [label={}, body={}]", msg.getLabel(), msg.getBodyAsString());
                queue.send(msg);
                LOG.debug("Message sent ^_^");
            } catch (Exception e) {
                LOG.error("Failed to send message: " + e.getMessage(), e);
            }
        }
    
        public static void main(String[] args) {
            new SendMessage().execute();
        }
    }

    Writing the Java receiving code (ReceiveMessage.java)
    
    package com.huangx;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import ionic.Msmq.Message;
    import ionic.Msmq.Queue;
    
    /**
     * Receives a message.
     *
     * @author Administrator
     * @date 2017-06-23 22:19:49
     */
    public class ReceiveMessage extends BaseMessage {
    
        private static final Logger LOG = LoggerFactory.getLogger(ReceiveMessage.class);
    
        @Override
        public void execute() {
            try {
                // open the queue
                Queue queue = new Queue(QUEUE_NAME);
                // receive a message (blocks until one arrives)
                Message message = queue.receive();
                LOG.debug("Received message [label={}, body={}]",
                        message.getLabel(), message.getBodyAsString());
            } catch (Exception e) {
                LOG.debug(e.getMessage(), e);
            }
        }
    
        public static void main(String[] args) {
            new ReceiveMessage().execute();
        }
    }

  • Resource download: see the "JAVA远程发送MSMQ" resource shared on my blog. 3. Steps: 1. Reference the MsmqJava.jar file in the project and put the remaining three files into the jre/bin directory. 2. The concrete send method is as follows: public void send() { String l...
  • 1 Background 1.1 SMB: Microsoft network setups mainly use SMB to provide file sharing and print services. SMB (Server Message Block) is a client/server file-sharing protocol; IBM developed it in the late 1980s to standardize shared network resources (such as...
  • Implements secure remote message delivery; IBM MQ Java jars: aspectjrt.jar com.ibm.mq.fta.jar com.ibm.mq.jar com.ibm.mq.jms.Nojndi.jar com.ibm.mq.soap.jar com.ibm.mqetclient.jar com.ibm.mqjms.jar
  • 1. RMI currently uses JRMP (Java Remote Messaging Protocol), a protocol designed specifically for Java's remote objects, for communication. Java RMI therefore keeps Java's "Write Once, Run Anywhere" advantage, and is a ... of distributed application systems...
  • Java remote monitoring

    2008-04-28 10:00:27
    This is remote-monitoring source code written in Java; it supports multiple clients connecting at the same time. Put the com package into your development environment with the source code... Current main features: watching the remote screen (control is not implemented yet), viewing and downloading files on the remote computer, and sending messages to the target. Please read the usage notes under the com package before use.
  • This example uses Java to remotely control other computers: shutdown, restart, log off, disabling the mouse, and remotely opening the calculator, a browser, and a specified website. Outline: What is remote control? What is the approach to implementing it? What are UDP and TCP? Wrapping a message-sending utility class...
  • I did something similar to this a few years ago. But in my case the server and the PCs were on the same LAN... So once your application server receives the message, you just need to do something like the code shown in the link above: DocFlavor flavor = DocFlavor.INPUT_STRE...
  • } catch (java.io.IOException e) { e.printStackTrace(); System.out.println("An error occurred whilst to the message buffer " + e); } } /** * Fetch data from the message queue */ public void GetMsg...
  • } // message received in the foreground -(void)application:(UIApplication *)application didReceiveRemoteNotification:(NSDictionary *)userInfo{ // handle the push message NSLog(@"userinfo:%@",userInfo); NSLog(@"push message received:%@",[...
  • Java RMI remote method invocation

    2017-02-16 11:56:07
    RMI (Remote Method Invocation) is a Java technique for invoking remote objects, communicating over the Java Remote Messaging Protocol (JRMP). Intuitively, you write the object on the server side and bind it to some...
  • 3. Use the certificate and PushChatKey.pem to create the apns_developer_identity.p12 file (used by the Java backend configuration); you will need to enter the passphrase abc123: openssl pkcs12 -export -in PushChatCert.pem -inkey PushChatKey.pem -certfile Certificate...
