精华内容
参与话题
问答
  • Kafka基础-生产者发送消息

    万次阅读 2018-06-20 15:12:13
    下文介绍如何使用Java来发送消息到Kafka。1. 发送消息的主要步骤 首先创建ProducerRecord对象,此对象除了包括需要发送的数据value之外还必须指定topic,另外也可以指定key和分区。当发送ProducerRecord的时候,...

    无论你是使用Kafka作为队列,消息总线还是数据存储平台,你都会用到生产者,用于发送数据到Kafka。下文介绍如何使用Java来发送消息到Kafka。

    1. 发送消息的主要步骤

    • 首先创建ProducerRecord对象,此对象除了包括需要发送的数据value之外还必须指定topic,另外也可以指定key和分区。当发送ProducerRecord的时候,生产者做的第一件事就是把key和value序列化为ByteArrays,以便它们可以通过网络发送。
    • 接下来,数据会被发送到分区器。如果在ProducerRecord中指定了一个分区,那么分区器会直接返回指定的分区;否则,分区器通常会基于ProducerRecord的key值计算出一个分区。一旦分区被确定,生产者就知道数据会被发送到哪个topic和分区。然后数据会被添加到同一批发送到相同topic和分区的数据里面,一个单独的线程会负责把那些批数据发送到对应的brokers。
    • 当broker接收到数据的时候,如果数据已被成功写入到Kafka,会返回一个包含topic、分区和偏移量offset的RecordMetadata对象;如果broker写入数据失败,会返回一个异常信息给生产者。当生产者接收到异常信息时会尝试重新发送数据,如果尝试失败则抛出异常。

    2. 创建生产者

    发送数据到Kafka的第一步是创建一个生产者,必须指定以下三个属性:

    • bootstrap.servers:生产者用于与Kafka集群建立初始连接的主机和端口的列表。该列表不需要包括所有的brokers信息,因为生产者在建立连接后能够获取所有brokers的信息。但建议至少包含两个,防止一个broker宕机,生产者仍然能够通过另外一个broker连接到群集。
    • key.serializer:用于序列化keys的类名。Kafka brokers期待key和value的类型为byte数组,但是也允许使用参数化的Java对象作为key和value。这使得代码非常易读,但也意味着生产者必须知道如何把这些对象转换为byte数组。key.serializer应设为实现了org.apache.kafka.common.serialization.Serializer接口的类名,生产者将会使用这个类来把key对象序列化为byte数组。Kafka内置实现了ByteArraySerializer、StringSerializer和IntegerSerializer。注意,即使生产者发送的数据没有指定key,也必须设置key.serializer这个属性。
    • value.serializer:用于序列化value的类名。类似于key.serializer,生产者将会使用指定的类来把value对象序列化为byte数组。

    下面是创建生产者的代码示例:

    Properties kafkaProps = new Properties();
    kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
    kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producer = new KafkaProducer<String, String>(kafkaProps);

    3. 发送消息

    发送消息主要有以下三种方法:

    3.1 Fire-and-forget

    发送消息后不需要关心是否发送成功。因为Kafka是高可用的,而且生产者会自动重新发送,所以大多数情况都会成功,但是有时也会失败。

    下面是代码示例:

    ProducerRecord<String, String> record = new ProducerRecord<String, String>("CustomerCountry",
    		"Precision Products", "France");
    try {
    	producer.send(record);
    } catch (Exception e) {
    	e.printStackTrace();
    }

    ProducerRecord有多个构造器,这里使用了三个参数的,topic、key、value。

    在发送消息之前有可能会发生异常,例如是序列化消息失败的SerializationException、缓冲区满的BufferExhaustedException、发送超时的TimeoutException或者发送的线程被中断的InterruptException。

    3.2 Synchronous send

    同步发送,调用send()方法后返回一个Future对象,再调用get()方法会等待直到结果返回,根据返回的结果可以判断是否发送成功。

    简单的使用下面的代码替换上面try里面的一行代码:

    producer.send(record).get();

    在调用send()方法后再调用get()方法等待结果返回。如果发送失败会抛出异常,如果发送成功会返回一个RecordMetadata对象,然后可以调用offset()方法获取该消息在当前分区的偏移量。

    KafkaProducer有两种类型的异常,第一种是可以重试的Retriable,该类异常可以通过重新发送消息解决。例如是连接异常后重新连接、“no leader”异常后重新选取新的leader。KafkaProducer可以配置为遇到该类异常后自动重新发送消息直到超过重试次数。第二类是不可重试的,例如是“message size too large”(消息太大),该类异常会马上返回错误。

    3.3 Asynchronous send

    异步发送,在调用send()方法的时候指定一个callback函数,当broker接收到返回的时候,该callback函数会被触发执行。

    class DemoProducerCallback implements Callback {
    	@Override
    	public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    		if (e != null) {
    			e.printStackTrace();
    		}
    	}
    }
    
    producer.send(record, new DemoProducerCallback());

    要使用callback函数,先要实现org.apache.kafka.clients.producer.Callback接口,该接口只有一个onCompletion方法。如果发送异常,onCompletion的参数Exception e会为非空。

    4. 生产者配置属性

    生产者有很多配置属性,除了上述的三个之外,下面是一些比较重要的属性:

    4.1 acks

    此配置设置在生产者可以认为发送请求完成之前,有多少分区副本必须接收到数据。此选项对消息可能丢失的可能性有重大影响,此配置有三个允许的值,默认为1:

    • acks=0,生产者不会等待broker的任何确认,消息会被立即添加到缓冲区并被认为已经发送。在这种情况下,不能保证服务器已经收到消息,并且重试配置不会生效(因为客户端通常不会知道任何异常),每条消息返回的偏移量始终设置为-1。由于生产者不等待broker的任何确认,因此它可以以网络支持的最快速度发送消息,所以这个配置适用于实现非常高的吞吐量。
    • acks=1,在leader服务器的副本收到消息的同一时间,生产者会接收到broker的确认。如果消息不能写入leader的副本(例如,如果leader宕机并且还没有选出新的leader),生产者将接收到异常响应,然后可以重新发送消息,避免丢失数据。如果leader宕机并且消息没有被写入到新的leader(通过不确定的leader选举),该消息仍然会丢失。在这种情况下,吞吐量取决于消息是同步还是异步发送。如果我们的客户端等待服务器的回复(通过上述的调用发送消息时返回的Future对象的get()方法),它明显会显著地增加延迟(至少通过网络往返)。如果客户端使用callback,则延迟不会那么明显,但吞吐量将受到正在发送消息数量的限制(例如,在接收到响应之前生产者将会发送多少消息)。
    • acks=all(或-1),一旦所有的同步副本接收到消息,生产者才会接收到broker的确认。这是最安全的模式,因为可以确保多于一个的broker接收到该消息,即使在宕机的情况下,该消息也能被保存。然而,延迟性会比acks=1的时候更高,因为需要等待所有broker接收到消息。

    4.2 buffer.memory

    此配置设置生产者可用于缓冲等待发送给brokers消息的总内存字节数,默认为33554432=32MB。如果消息发送到缓存区的速度比发送到broker的速度快,那么生产者会被阻塞(根据max.block.ms配置的时间,默认为60000ms=1分钟,在0.9.0.0版本之前使用block.on.buffer.full配置),之后会抛出异常。

    4.3 compression.type

    生产者对生成的所有数据使用的压缩类型,默认值是none(即不压缩),有效值为none,gzip,snappy或lz4。Snappy压缩技术是Google开发的,它可以在提供较好的压缩比的同时,减少对CPU的使用率并保证好的性能,所以建议在同时考虑性能和带宽的情况下使用。Gzip压缩技术通常会使用更多的CPU和时间,但会产生更好的压缩比,所以建议在网络带宽更受限制的情况下使用。通过启用压缩功能,可以减少网络利用率和存储空间,这往往是向Kafka发送消息的瓶颈。

    4.4 retries

    默认值为0,当设置为大于零的值,客户端会重新发送任何发送失败的消息。注意,此重试与客户端收到错误时重新发送消息是没有区别的。在配置max.in.flight.requests.per.connection不等于1的情况下,允许重试可能会改变消息的顺序,因为如果两个批次的消息被发送到同一个分区,第一批消息发送失败但第二批成功,而第一批消息会被重新发送,则第二批消息会先被写入。

    4.5 batch.size

    当多个消息被发送到同一个分区时,生产者会把它们一起处理。此配置设置用于每批处理使用的内存字节数,默认为16384=16KB。当使用的内存满的时候,生产者会发送当前批次的所有消息。但是,这并不意味着生产者会一直等待使用的内存变满,根据下面linger.ms配置的时间也会触发消息发送。设置较小的值会增加发送的频率,从而可能会减少吞吐量;设置较大的值会使用较多的内存,设置为0会关闭批处理的功能。

    4.6 linger.ms

    此配置设置在发送当前批次消息之前等待新消息的时间量,默认值为0。KafkaProducer会在当前批次使用的内存已满或等待时间到达linger.ms配置时间的时候发送消息。当linger.ms>0时,延时性会增加,但会提高吞吐量,因为会减少消息发送频率。

    4.7 client.id

    用于标识发送消息的客户端,通常用于日志和性能指标以及配额。

    4.8 max.in.flight.requests.per.connection

    此配置设置客户端在单个连接上能够发送的未确认请求的最大数量,默认为5,超过此数量会造成阻塞。设置大的值可以提高吞吐量但会增加内存使用,但是需要注意的是,当设置值大于1而且发送失败时,如果启用了重试配置,有可能会改变消息的顺序。设置为1时,即使重新发送消息,也可以保证发送的顺序和写入的顺序一致。

    4.9 request.timeout.ms

    此配置设置客户端等待请求响应的最长时间,默认为30000ms=30秒,如果在这个时间内没有收到响应,客户端将重发请求,如果超过重试次数将抛异常。此配置应该比replica.lag.time.max.ms(broker配置,默认10秒)大,以减少由于生产者不必要的重试造成消息重复的可能性。

    4.10 max.block.ms

    当发送缓冲区已满或者元数据不可用时,生产者调用send()和partitionsFor()方法会被阻塞,默认阻塞时间为60000ms=1分钟。由于使用用户自定义的序列化器和分区器造成的阻塞将不会计入此时间。

    4.11 max.request.size

    此配置设置生产者在单个请求中能够发送的最大字节数,默认为1048576字节=1MB。例如,你可以发送单个大小为1MB的消息或者1000个大小为1KB的消息。注意,broker也有接收消息的大小限制,使用的配置是message.max.bytes=1000012字节(好奇怪的数字,约等于1MB)。

    4.12 receive.buffer.bytes和send.buffer.bytes

    • receive.buffer.bytes:读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小,默认值为32768字节=32KB。如果设置为-1,则将使用操作系统的默认值。
    • send.buffer.bytes:发送数据时使用的TCP发送缓冲区(SO_SNDBUF)的大小,默认值为131072字节=128KB。如果设置为-1,则将使用操作系统的默认值。

    5. 序列化器

    根据之前的代码示例,生产者配置必须指定序列化器。除了默认提供的序列化器之外还可以实现自定义的序列化器。

    5.1 自定义序列化器

    当发送给Kafka的消息不是简单的字符串或整数时,可以使用像JSON、Avro、Thrift或Protobuf这样的通用序列化库,也可以实现自定义的序列化库,强烈建议使用通用序列化库。以下是自定义序列化器的示例代码:

    创建一个简单的Customer类:

    public class Customer {
    	
    	private int customerID;
    	private String customerName;
    
    	public Customer(int ID, String name) {
    		this.customerID = ID;
    		this.customerName = name;
    	}
    
    	public int getID() {
    		return customerID;
    	}
    
    	public String getName() {
    		return customerName;
    	}
    }

    创建一个简单的序列化器:

    import java.nio.ByteBuffer;
    import java.util.Map;
    
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Serializer;
    
    public class CustomerSerializer implements Serializer<Customer> {
    
    	@Override
    	public void configure(Map<String, ?> configs, boolean isKey) {
    		// nothing to configure
    	}
    
    	@Override
    	/**
    	 * We are serializing Customer as:
    	 * 4 byte int representing customerId
    	 * 4 byte int representing length of customerName
    	 *     in UTF-8 bytes (0 if name is Null)
    	 * N bytes representing customerName in UTF-8
    	 */
    	public byte[] serialize(String topic, Customer data) {
    		try {
    			byte[] serializedName;
    			int stringSize;
    			if (data == null)
    				return null;
    			else {
    				if (data.getName() != null) {
    					serializedName = data.getName().getBytes("UTF-8");
    					stringSize = serializedName.length;
    				} else {
    					serializedName = new byte[0];
    					stringSize = 0;
    				}
    			}
    			ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
    			buffer.putInt(data.getID());
    			buffer.putInt(stringSize);
    			buffer.put(serializedName);
    			return buffer.array();
    		} catch (Exception e) {
    			throw new SerializationException("Error when serializing Customer to byte[] " + e);
    		}
    	}
    
    	@Override
    	public void close() {
    		// nothing to close
    	}
    
    }

    实现CustomerSerializer后可以定义ProducerRecord<String, Customer>并且直接发送Customer对象。上述例子虽然非常简单,但一般不建议使用自定义的序列化器,因为如果一旦需要修改,例如更改customerID的类型为Long,或者添加新的startDate属性,那么在维护新旧消息代码的兼容性时会遇到不同程度的问题。

    5.2 Apache Avro

    Apache Avro是一个数据序列化系统,它支持丰富的数据结构,提供了紧凑的,快速的,二进制的数据格式。当使用Avro读取消息时,需要先读取整个的schema。为了实现这一点,可以使用了一个名为Schema Registry的架构,Confluent Schema Registry是实现该架构的开源软件之一。

    我们只需要设置schema.registry.url属性即可:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
    props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
    props.put("schema.registry.url", schemaUrl);
    
    String topic = "customerContacts";
    KafkaProducer<String, Customer> producer = new KafkaProducer<String, Customer>(props);
    // We keep producing new events
    while (true) {
    	Customer customer = CustomerGenerator.getNext();
    	System.out.println("Generated customer " + customer.toString());
    	ProducerRecord<String, Customer> record = new ProducerRecord<>(topic, customer.getId(), customer);
    	producer.send(record);
    }

    6. 分区

    在上述提过,在创建消息时既可以指定key也可以不指定。Key除了可以保存额外的信息之外,还用于决定消息将会写入哪个分区,也就是说具有相同key的消息都会保存在同一分区。

    当key为空且使用默认的分区器时,消息会被随机发送到指定topic的其中一个可用分区,会使用round-robin算法均衡分区间的消息。

    当key不为空且使用默认的分区器时,Kafka会计算该key的hash值(使用其自己的hash算法,因此当升级Java版本时hash值不会改变),并使用得到的hash值把消息映射到特定的分区。因为把一个key始终映射到同一分区是非常重要的,所以需要使用一个topic的所有分区来计算映射关系,而不仅仅是可用的分区。这意味着,如果当写入消息到一个不可用的分区时,会出现异常,但是这种情况很少见。

    只要一个topic的分区数量不变,key与分区的映射关系就能保证一致。但是如果你添加一个新的分区到一个topic时,虽然存在的数据仍然会保存在原来的分区里,但具有相同key的新消息不能保证还会写入到原来的分区。所以在创建topic时最好预先定义好需要的分区数量,避免后期添加新的分区造成映射关系的不一致。

    6.1 自定义分区器

    在使用默认的分区器时有可能会造成数据倾斜,数据被集中写入到某个分区,因此Kafka支持自定义的分区器,实现自己的分区策略。以下是示例代码,用于把指定key的数据写到最后一个分区里,其余key对应的值按照hash算法写入到其它分区:

    import java.util.List;
    import java.util.Map;
    
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.record.InvalidRecordException;
    import org.apache.kafka.common.utils.Utils;
    
    public class CustomerPartitioner implements Partitioner {
    	
    	private Map<String, ?> configs;
    
    	@Override
    	public void configure(Map<String, ?> configs) {
    		this.configs = configs;
    	}
    
    	@Override
    	public int partition(String topic, Object key, byte[] keyBytes,
    			Object value, byte[] valueBytes, Cluster cluster) {
    		List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    		int numPartitions = partitions.size();
    		if ((keyBytes == null) || (!(key instanceof String))) {
    			throw new InvalidRecordException("We expect all messages to have customer name as key");
    		}
    		// Customer key will always go to last partition
    		// Other records will get hashed to the rest of the partitions
    		if (((String) key).equals(configs.get("key"))) {
    			return numPartitions;
    		}
    		return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1));
    	}
    
    	@Override
    	public void close() {
    		// do something here
    	}
    
    }

    END O(∩_∩)O

    展开全文
  • kafka发送消息的三种方式

    千次阅读 2019-06-10 21:10:31
    package ... import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import o...
    package com.zl.kafkademo;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.quartz.*;
    import org.quartz.impl.StdSchedulerFactory;
    
    import java.util.Properties;
    
    /**
     * @Auther: le
     * @Date: 2019/4/23 22:05
     * @Description:
     */
    public class MyProducer implements Job {
        private static KafkaProducer<String,String> producer;
    
        static {
            Properties properties = new Properties();
            properties.put("bootstrap.servers","127.0.0.1:9092");
            properties.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<String, String>(properties);
        }
    
        /**
         * 第一种直接发送,不管结果
         */
        private static void sendMessageForgetResult(){
            ProducerRecord<String,String> record = new ProducerRecord<String,String>(
                    "kafka-study","name","Forget_result"
            );
            producer.send(record);
            producer.close();
        }
    
        /**
         * 第二种同步发送,等待执行结果
         * @return
         * @throws Exception
         */
        private static RecordMetadata sendMessageSync() throws Exception{
            ProducerRecord<String,String> record = new ProducerRecord<String,String>(
                    "kafka-study","name","sync"
            );
            RecordMetadata result = producer.send(record).get();
            System.out.println(result.topic());
            System.out.println(result.partition());
            System.out.println(result.offset());
            return result;
        }
    
        /**
         * 第三种执行回调函数
         */
        private static void sendMessageCallback(){
            ProducerRecord<String,String> record = new ProducerRecord<String,String>(
                    "kafka-study","name","callback"
            );
            producer.send(record,new MyProducerCallback());
        }
    
        //定时任务
        @Override
        public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
            try {
                sendMessageSync();
            }catch (Exception e){
                System.out.println("error:"+e);
            }
    
        }
    
        private static class MyProducerCallback implements Callback{
    
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e !=null){
                    e.printStackTrace();
                    return;
                }
                System.out.println(recordMetadata.topic());
                System.out.println(recordMetadata.partition());
                System.out.println(recordMetadata.offset());
                System.out.println("Coming in MyProducerCallback");
            }
        }
    
    
        public static void main(String[] args){
            //sendMessageForgetResult();
            //sendMessageCallback();
            JobDetail job = JobBuilder.newJob(MyProducer.class).build();
    
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever()).build();
    
            try {
                Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
                scheduler.scheduleJob(job,trigger);
                scheduler.start();
            }catch (SchedulerException e){
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    
    
    }
    

    需要引入文件:

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.10.0.1</version>
            </dependency>
    
            <dependency>
                <groupId>org.quartz-scheduler</groupId>
                <artifactId>quartz</artifactId>
                <version>2.3.0</version>
            </dependency>

    测试方法:

    MAC下操作指令:

    1、创建主题:

    ./kafka-topics.sh --create --topic kafka-study --zookeeper 127.0.0.1:2181 --config max.message.bytes=12800000 --config flush.messages=1 --partitions 5 --replication-factor 1

    2、运行上述程序,执行定时任务

    3、查看消费情况

    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-study --from-beginning

    windows操作指令:

     1、进入  D:\zookeeper-3.4.14\bin   打开新的cmd,输入“zkServer“,运行Zookeeper
     2、进入 D:\kafka_2.11-0.11.0.0 运行cmd
      .\bin\windows\kafka-server-start.bat .\config\server.properties
     3、 创建主题,
        进入D:\kafka_2.11-0.11.0.0运行cmd,输入:
        .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
        查看已创建主题:
        .\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
        查看指定主题的详细信息:
        .\bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic test
        查看主题消费详情:
        .\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic kafka-study --from-beginning
        

    展开全文
  • Kafka生产者发送消息的三种方式

    千次阅读 2019-06-06 05:53:47
    Kafka发送消息主要有三种方式:1.发送并忘记 2.同步发送 3.异步发送+回调函数 方式一:发送并忘记(不关心消息是否正常到达,对返回结果不做任何判断处理) 发送并忘记的方式本质上也是一种异步的方式,只是它不会...

    Kafka发送消息主要有三种方式:1.发送并忘记 2.同步发送 3.异步发送+回调函数

    方式一:发送并忘记(不关心消息是否正常到达,对返回结果不做任何判断处理)
    发送并忘记的方式本质上也是一种异步的方式,只是它不会获取消息发送的返回结果,这种方式的吞吐量是最高的,但是无法保证消息的可靠性

    方式二:同步发送(通过get方法等待Kafka的响应,判断消息是否发送成功)
    以同步的方式发送消息时,一条一条的发送,对每条消息返回的结果判断, 可以明确地知道每条消息的发送情况,但是由于同步的方式会阻塞,只有当消息通过get返回future对象时,才会继续下一条消息的发送

    方式三:异步发送+回调函数(消息以异步的方式发送,通过回调函数返回消息发送成功/失败)
    在调用send方法发送消息的同时,指定一个回调函数,服务器在返回响应时会调用该回调函数,通过回调函数能够对异常情况进行处理,当调用了回调函数时,只有回调函数执行完毕生产者才会结束,否则一直会阻塞

    三种方式的应用场景:

    场景1:如果业务要求消息必须是按顺序发送的,那么可以使用同步的方式,并且只能在一个partation上,结合参数设置retries的值让发送失败时重试,设置max_in_flight_requests_per_connection=1,可以控制生产者在收到服务器晌应之前只能发送1个消息,从而控制消息顺序发送;

    场景2:如果业务只关心消息的吞吐量,容许少量消息发送失败,也不关注消息的发送顺序,那么可以使用发送并忘记的方式,并配合参数acks=0,这样生产者不需要等待服务器的响应,以网络能支持的最大速度发送消息;

    场景3:如果业务需要知道消息发送是否成功,并且对消息的顺序不关心,那么可以用异步+回调的方式来发送消息,配合参数retries=0,并将发送失败的消息记录到日志文件中

    展开全文
  • WebSocket 实现链接 发送消息

    千次阅读 2019-03-01 19:13:00
    Websocket 原理浅析地址:... 直接上代码: myWebSocket.py 文件中 """ 下载 gevent-websocket 0.10.1 ... 基于Flask + geventWebSocket 建立连接,发送消息 """ from geven...

    Websocket 原理浅析地址: https://www.cnblogs.com/yuanyongqiang/articles/10457793.html

    直接上代码:

     myWebSocket.py 文件中

    """
     下载 gevent-websocket 0.10.1
     基于Flask + geventWebSocket 建立连接,发送消息
    """
    
    from geventwebsocket.server import WSGIServer  # 我要WSGI为我提供服务
    from geventwebsocket.handler import WebSocketHandler  # WSGI遇到WS协议的时候,处理方式
    from geventwebsocket.websocket import WebSocket  # 语法提示,使用方法: "# type:WebSocket"
    from flask import Flask, request
    
    app = Flask(__name__)
    
    
    @app.route("/ws")
    def my_websocket_func():
        print(dir(request.environ))
        # ['__class__', '__contains__', '__delattr__', '__delitem__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getitem__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__len__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__setitem__', '__sizeof__', '__str__', '__subclasshook__', 'clear', 'copy', 'fromkeys', 'get', 'items', 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values']
    
        # 客户端服务器之间2次链接生成了长连接(大致流程这么理解,细节不是很对,有待深究)
        # 第1次
        # 当 WSGIServer 遇到 ws 协议请求的时候, 就会交给 WebSocketHandler去处理(验证),
        # WebSocketHandler 处理完之后,把验证信息发给客户端,断开连接.
        # 第2次
        # 客户端收到验证信息立,马再次发送友好链接,这时候,websocket就会保持住你的链接.那么链接 就会存放在这个位置上(wsgi.websocket).
        user_socket = request.environ.get("wsgi.websocket")    # type:WebSocket  # 写了这个,下边再使用user_socket的时候就有提示了.
       # 获取到websocket链接.
    print(user_socket) return "success 了" if __name__ == '__main__': # WSGIServer(("地址", 端口), 应用, WSGI遇到WS协议的时候处理类型使用 WebSocketHandler) 实例化一个WSGIServer的对象 http_server = WSGIServer(("0.0.0.0", 9527), application=app, handler_class=WebSocketHandler) # WSGIServer对象.serve_forever() 把应用永远的跑起来 http_server.serve_forever() # 这样运行起来的话,所有的提示都不在IDE环境中显示了,都会包裹在 WebSocketHandler 中

     

    这时候在浏览器中 访问 http://127.0.0.1:9527/ws 就可以了

     

    那么问题来了:

    浏览器默认是http请求,那我该怎么发送ws请求呢?

    这样,我们写一个静态网页,通过 javascript 来实现发送 ws请求:

    my_websocker.html 文件中:

    <!DOCTYPE html>
    <html lang="zh-CN">
    <head>
        <meta http-equiv="content-Type" charset="UTF-8">
        <meta http-equiv="x-ua-compatible" content="IE=edge">
        <meta name="viewport" content="width=device-width, initial-scale=1">
        <title>Title</title>
    </head>
    <body>
    
    <script type="text/javascript">
        var ws = new WebSocket("ws://127.0.0.1:9527/ws")
        // 定义一个变量 ws,
        // new一个对象 WebSocket,用来创建连接.
        // 来发送 ws 协议请求 "ws://127.0.0.1:9527/ws"
    
    </script>
    
    </body>
    </html>

     这时候通过网页访问:

     

    就出来了,这就代表链接创建成功了

     

     下面我们就开始实现服务器和客户端的消息通信:

     myWebSocket.py 文件中

    """
     下载 gevent-websocket 0.10.1
     基于Flask + geventWebSocket 建立连接,发送消息
    """
    
    from geventwebsocket.server import WSGIServer  # 我要WSGI为我提供服务
    from geventwebsocket.handler import WebSocketHandler  # WSGI遇到WS协议的时候,处理方式
    from geventwebsocket.websocket import WebSocket  # 语法提示,使用方法: "# type:WebSocket"
    from flask import Flask, request
    
    app = Flask(__name__)
    
    
    @app.route("/ws")
    def my_websocket_func():
        print(dir(request.environ))
        # ['__class__', '__contains__', '__delattr__', '__delitem__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getitem__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__len__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__setitem__', '__sizeof__', '__str__', '__subclasshook__', 'clear', 'copy', 'fromkeys', 'get', 'items', 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values']
    
        # 客户端服务器之间2次链接生成了长连接(大致流程这么理解,细节不是很对,有待深究)
        # 第1次
        # 当 WSGIServer 遇到 ws 协议请求的时候, 就会交给 WebSocketHandler去处理(验证),
        # WebSocketHandler 处理完之后,把验证信息发给客户端,断开连接.
        # 第2次
        # 客户端收到验证信息立,马再次发送友好链接,这时候,websocket就会保持住你的链接.那么链接 就会存放在这个位置上(wsgi.websocket).
        user_socket = request.environ.get("wsgi.websocket")  # type:WebSocket   # 写了这个,下边再使用 user_socket 的时候就有提示了
       获取到 websocket链接
    print(user_socket) # <geventwebsocket.websocket.WebSocket object at 0x03B89688> while 1: msg = user_socket.receive() # 等待接收客户端发送过来的消息 print(msg) user_socket.send(msg) # 把接收到的消息再发送回去 if __name__ == '__main__': # WSGIServer(("地址", 端口), 应用, WSGI遇到WS协议的时候处理类型使用 WebSocketHandler) 实例化一个WSGIServer的对象 http_server = WSGIServer(("0.0.0.0", 9527), application=app, handler_class=WebSocketHandler) # WSGIServer对象.serve_forever() 把应用永远的跑起来 http_server.serve_forever() # 这样运行起来的话,所有的提示都不在IDE环境中显示了,都会包裹在 WebSocketHandler 中

     

    my_websocker.html 文件中:

    <!DOCTYPE html>
    <html lang="zh-CN">
    <head>
        <meta http-equiv="content-Type" charset="UTF-8">
        <meta http-equiv="x-ua-compatible" content="IE=edge">
        <meta name="viewport" content="width=device-width, initial-scale=1">
        <title>Title</title>
    </head>
    <body>
    
    <script type="text/javascript">
        var ws = new WebSocket("ws://127.0.0.1:9527/ws");
        // 定义一个变量 ws,
        // new一个对象 WebSocket,用来创建链接.
        // 来发送 ws 协议请求 "ws://127.0.0.1:9527/ws"
    

       // 因为不知道建立起链接之后,服务端什么时候会给我发消息.所以这里需要一个当有消息来了触发的一个函数: ws.onmessage = function (websocket_data) { console.log(websocket_data.data); } // 当 ws 收到消息时执行 onmessage // 触发 收到消息 用 onmessage </script> </body> </html>

    my_websocker.html 访问:

    通过 ws.send("你好") 给服务器发信息,可以看到服务器回复的信息.

     

    readyState: 0  0代表没有创建 

    readyState: 1  1代表当前状态正确,可以使用了 

    readyState: 3  3代表曾经创建成功了,又关上了

     

    这个版本仅仅是为了了解WebSocket 运行,未作容错,所以在页面刷新的时候后台会报错...

    转载于:https://www.cnblogs.com/yuanyongqiang/p/10458308.html

    展开全文
  • 04 消息发送样例

    2020-05-17 23:47:20
    文章目录4.1、基本样例4.1.1、消息发送1)发送同步消息2)发送异步消息3)单向发送消息4.1.2 消费消息1)负载均衡模式(默认)4.2、顺序消息4.2.1 顺序消息生产4.2.2 顺序消费消息4.3、消息延时4.3.1、 启动消息消费...
  • MQ发送普通消息(三种方式)

    万次阅读 2017-05-07 21:03:14
    MQ 发送普通消息有三种实现方式:可靠同步发送、可靠异步发送、单向(Oneway)发送。本文介绍了每种实现的原理、使用场景以及三种实现的异同,同时提供了代码示例以供参考。 可靠同步发送 原理:同步发送是...
  • 1.添加钉钉自定义机器人 参考文章如下: 官方网址;https://open-doc.dingtalk.com/docs/doc.htm?spm=a219a.7629140.0.0.p2lr6t&amp;treeId=257&amp;articleId=105733&amp;docType=1 2.如何使用...
  • 钉钉自定义机器人定时发送群消息

    千次阅读 2020-05-11 20:54:47
    班里要每日打卡三次,钉钉普通没有定时提示的功能,想着自己写个脚本放服务器上 直接开始 轻松实现钉钉机器人定时发消息 因为其他的上面的文章都有了 所以只记一下关于Crontab实现定时推送的内容 crontab -e ...
  • 文/chengf 图片来源于网络01使用场景及环境要求使用场景:定时上下班打卡,定时提醒等。系统环境:windows/Mac(linux没试过,但是应该和mac原理差不多)程序环境:P...
  • 异步消息及使用ActiveMQ发送消息

    千次阅读 2018-08-21 21:01:08
    一,异步消息  像RMI和Hession/Burlap... 有两个主要概念,消息代理(message broker)和目的地(destination),当一个应用消息发送时,会将消息发送消息代理,消息代理可以确保消息被投递到目的地,同时解...
  • 微信公众号开发之接收与发送消息

    万次阅读 2018-12-28 22:04:14
    说明:该篇博客是博主一字一码编写的,实属不易,请... 公众号接收与发送消息 验证URL有效性成功后即接入生效,成为开发者。如果公众号类型为服务号(订阅号只能使用普通消息接口),可以在公众平台网站中申请认证...
  • 微信自动发送消息

    万次阅读 2018-01-14 16:38:52
    今天加入微信辅助大军,奈何要一直去广告,又懒又烦!!! 于是乎,想到能不能自动去打广告~ 可以的~ 哈哈 方案: 最近在看api文档,就最先想到能不能java模拟发送信息,但是又没头绪(放弃) 然后百度...
  • 发送信息(274,61440,0) 控件.发送信息(274,61450,0) 控件最大化: 控件.发送信息(274,61488,0) 参数2在61488-61503都有效 移动控件: 控件.发送信息(274,61449,0) 2.调整控件尺寸 调整左边: 控件.发送信息...
  • webQQ协议——发送消息

    千次阅读 2014-03-08 23:39:13
    用法: 传入一个已经登录的QQ, 调用send...)可以通过getInfo模块, 确定要发送消息的好友的uin), word为要发送的话。。) webQQLogin在http://blog.csdn.net/qq506657335/article/details/20801793 getInfo在http://blog.c
  • 微信开发之发送消息接口

    万次阅读 2016-12-23 16:53:57
    发送消息,是指用户公众号向用户发送相应形式的消息。根据微信开发文档,由以下四种形式:被动回复,群发接口,客服消息接口以及模板消息接口。本文将基于Java语言以及个人微信测试号,说明被动回复、客服消息接口...
  • 上一章我们讲了怎么在linux设置ip地址,相信大家都已经掌握了,今天我们来讲讲怎么linux系统当中给用户发消息。首先我们给用户发消息前提是该用户必须是在线的,之前教程有说怎么查看当前在线用户使用w、who命令都...
  • packJsonmsg 可按照个人模板需求来进行更改 自己重新封装下方法即可   import java.text.SimpleDateFormat; import org.apache.tools.ant.types.resources.comparators.Date; import ...
  • 发送信息 (274, 61458, 0)其中参数1的值自61457---61471都可用,结果是一样的,都是移动控件。还有一个大家都熟,就是控件.发送信息 (161, 2, 0)2.调整控件尺寸 控件.发送信息 (274, 61441, 0) (274,61442,0) (274,...
  • Windows 窗口发送消息参数详解

    万次阅读 2020-06-24 15:32:25
    窗口发送消息参数详解 // 窗口.发送消息 函数功能: 将指定的消息发送到一个窗口,同win32 api 里面的SendMessage等同的效果 中文函数原型: 发送消息(hwnd,msg,wparam,iparam) 英文函数原型: sendmessage(hwnd,...
  • rocketmq发送消息的三种方式

    千次阅读 2019-09-01 12:20:28
    从功能上来说,rocketmq支持三种发送消息的方式,分别是同步发送(sync),异步发送(async)和直接发送(oneway)。下面来简单说明一下这三种发送消息的方式,以便了解它们之间的差异。 以下的案例代码将会使用...

空空如也

1 2 3 4 5 ... 20
收藏数 2,117,201
精华内容 846,880
关键字:

发送消息