精华内容
下载资源
问答
  • 参数详解 acks buffer.memory compression.type retires batch.size linger.ms client.id max.in.flight.requests.per.connection timeout.ms、request.timeout.ms和metadata.fetch.timeout.ms max.block....

     

    前言

    参数详解

    acks

    buffer.memory

    compression.type

    retires

    batch.size

    linger.ms

    client.id

    max.in.flight.requests.per.connection 

    timeout.ms、request.timeout.ms和metadata.fetch.timeout.ms

    max.block.ms

    max.request.size

    receive.buffer.bytes和send.buffer.bytes


    前言

    本文节选自《Kafka权威指南》一书,内容比较权威。

    生产者有很多可配置的参数,在 kafka文档 里都有说明,它们大部分都有合理的默认值,所以没有必要去修改它 。不过有几个参数在内存使用、性能和可靠性方面对生产者影响比较大,接下来我们会一一说明。

    参数详解

    acks

    默认:1

    acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。这个参数对消息丢失的可能性有重要影响。 主参数有如下选项。

    • 如果 acks=0 ,生产者在成功写入消息之前不会等待任何来自服务器的相应。也就是说,如果当中出现了问题,导致服务器没有收到消息。那么生产者就无从得知,消息也就丢失了。不过因为生产者不需要等待服务器响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。
    • 如果 acks=1 ,只要集群的 leader 节点收到消息,生产这就会收到一个来自服务器的成功响应。如果消息无法达到 leader 节点(比如 leader 节点崩溃,新的 leader 还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新 leader ,消息还是会丢失。这个时候的吞吐量取决于使用的是同步发送还是异步发送。如果让发送客户端等待服务器的响应(通过调用 Future 对象的 get() 方法),显然会增加延迟(在网络上传输一个来回的延迟)。如果客户端使用回调,延迟问题就可以得到缓解,不过吞吐量还是会受发送中消息数量的限制(比如,生产者在收到服务器响应之前可以发送多少个消息)。
    • 如果 acks=-1 ,只有当所有参与复制的节点都收到消息时,生产这会收到一个来自服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行。不过,它的延迟比 acks=1 时更高,因为我们要等待不只一个服务器节点接收消息。

    buffer.memory

    默认:32MB

    该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。这个时候,send() 方法调用要么被阻塞,要么抛出异常,取决于如何设置 block.on.buffe.full 参数(在0.9.0.0 版本里被替换成了 max.block.ms ,表示在抛出异常之前可以阻塞一段时间)。

    compression.type

    默认:none

    默认情况下,消息发送时不会被压缩。该参数可以设置成 snappygziplz4,它指定了消息发送给 broker 之前使用哪一种压缩算法。snappy压缩算法由 Google 发明,它占用较少的CPU,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网络带宽,可以使用这种压缩算法。gzip压缩算法一般会占用比较多的CPU,但会提供更高的压缩,所以如果网络带宽有限,可以使用这种算法。使用压缩可以降低网络传输开销和存储开销,而这往往是向kafka发送消息的瓶颈所在。

    retires

    默认:0

    生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到 leader )。在这种情况下,retires 参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待100ms,可以通过 retry.backoff.ms 参数来修改这个时间间隔。建议在设置重试次数和等待间隔之前,先测一下恢复一个崩溃节点需要多少时间(比如所有分区选举出 leader 需要多长时间),让总的重试时间比kafka集群从崩溃中恢复的时间长,否则生产者会过早的放弃重试。不过有些错误不是临时性错误,没办法通过重试来解决(比如消息太大错误)。一般情况下,因为生产者会自动重试,所以没必要在代码逻辑里处理那些可重试的错误,你只需要处理那些不可能重试错误或重试次数超过上限的情况。

    batch.size

    默认:16KB

    当有多个消息要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。当批次被填满,批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也可能被发送。所以就算把 batch.size 设置的很大,也不会造成延迟,只会占用更多的内存而已,如果设置的太小,生产者会因为频繁发送消息,会增加一些额外的开销。

    linger.ms

    默认:0

    该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer 会在批次填满或 linger.ms 达到上限时把批次发送出去。默认情况下,只要有可用的线程,就算批次里只有一个消息,生产者也会把消息发送出去。把 linger.ms 设置成大于0的数,让生产者在发送批次前等一会,使更多的消息加入这个批次,虽然这样做会增加延迟,但也会提高吞吐量。

    client.id

    默认:""

    该参数可以是任意的字符串,服务器会用它识别消息的来源,还可以用在日志和配额指标里。

    max.in.flight.requests.per.connection 

     默认:5

    该参数指定了生产者在收到服务器响应之前可以发送多少消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为1可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。

    timeout.ms、request.timeout.ms和metadata.fetch.timeout.ms

    默认:

    timeout.ms:

    request.timeout.ms:30s

    metadata.fetch.timeout.ms:

    request.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间,metadata.fetch.timeout.ms 指定了生产这在获取元数据(比如目标分区的 leader 是谁)时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或者执行回调)。timeout.ms 指定了 broker 等待同步副本返回消息确认的时间,与 acks 的配置相匹配——如果在指定时间内没有收到同步副本的确认,那么 broker 会返回一个错误。

    max.block.ms

    默认:60s

    该参数指定了在调用 send() 方法或使用 partitionsFor() 方法获取元数据时生产者的阻塞时间。当生产者的缓冲区已满,或没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到 max.block.ms 时,生产者抛出超时异常。

    max.request.size

    默认:1MB

    该参数用于控制生产者发送的请求大小,它可以指定能发送的单个消息的最大值,也可以指单个请求里所有消息的总大小。例如,假设这个值为 1MB,那么可以发送的单个最大消息为 1MB ,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000 个消息,每个消息大小为 1KB。另外, broker 对可接收的消息最大值也有自己的限制(message.max.bytes),所以两边的配置最好可以匹配,避免生产者发送的消息被 broker 拒绝。

    receive.buffer.bytes和send.buffer.bytes

    默认

    receive.buffer.bytes:32KB

    send.buffer.bytes:128KB

    这两个参数分别指定了 TCP socket 接收和发送数据包的缓冲区大小,如果它们被设置成 -1,就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

    展开全文
  • kafka生产者参数配置使用于各种编程语言重要参数已标明kafka生产者参数配置使用于各种编程语言重要参数已标明
  • kafka生产者实例配置参数

    千次阅读 2020-03-14 16:42:44
    bootstrap.servers: 指定生产者客户端连接kafka集群所需的broker地址列表,格式为host1:port1,host2:port2,可以设置一个或多个。这里并非需要所有的broker地址,因为生产者会从给定的broker里寻找其它的broker。 ...

    KafkaProducer中有三个参数是必填的:

    bootstrap.servers: 指定生产者客户端连接kafka集群所需的broker地址列表,格式为host1:port1,host2:port2,可以设置一个或多个。这里并非需要所有的broker地址,因为生产者会从给定的broker里寻找其它的broker。

    key.serializer和value.serializer:broker接收消息必须以字节数组byte[]形式存在,KafkaProducer<K,V>和ProducerRecord<K,V>中的泛型就是key和value的类型。key.serializer和value.serializer分别用来指定key和value序列化操作的序列化器,无默认值。类的全限定名。

    非必填参数

    client.id:这个参数用来设定kafkaProducer对应的客户端id,默认值为“”,如果不设置,会自动生成一个非空字符串,内容形式如:“producer-1”,“producer-2”…

    retries:配置生产者重试次数,对于可重试异常,那么只要在规定的次数内自行恢复了,就不会抛出异常,默认是0。

    retry.backoff.ms用来设定两次重试之间的时间间隔,默认值100。

    partitioner.class:显示配置使用哪个分区器。

    interceptor.classes:指定自定义拦截器,多个传List集合。

    buffer.memory:生产者客户端RecordAccumulator缓存大小,默认值为33554432B,即32M。

    batch.size:ProducerBatch可以复用内存区域的大小

    max.block.ms:最大阻塞时间,RecordAccumulator缓存不足时或者没有可用的元数据时,KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,此参数的默认值为60000,即60s。

    metadata.max.age.ms:当客户端超过这个时间间隔时就会更新元数据信息默认值300000,即5分钟。元数据指集群中有哪些主题,主题有哪些分区,每个分区leader副本在哪个节点上,follower副本在哪个节点上,哪些副本在AR,ISR等集合中,集群中有哪些节点等等。

    acks:用来指定必须要多少个副本收到这条消息,之后生产者才会认为这条消息成功写入。acks是生产者客户端中一个非常重要的参数,它涉及消息的可靠性和吞吐量之间的权衡。acks参数有三种类型的值:字符串类型,不是整型。

    ​ 1:只要分区的leader副本写入成功,生产者就会收到来自服务端的成功响应。如果再被其它follower副本拉取前leader副本崩溃,那么此时消息还是会丢失。

    ​ 0:生产者发送消息之后不需要等待任何服务端的响应。如果在消息发送到写入kafka的过程中出现了某些异常,导致kafka并没有收到这条消息,那么生产者也无从得知,消息会丢失。

    ​ -1或者all:生产者发送消息之后,需要等待ISR中所有副本成功写入消息之后才能收到来自服务端的成功响应。

    max.request.size:用来限制生产者客户端能发送的消息的最大值,默认值为1048576B,即1MB。这个参数涉及到其它参数的联动,比如broker端的message.max.bytes参数。对kafka没有足够把控的时候不要更改此参数。

    compression.type:指定消息的压缩方式,默认值为"none",可以配置为"gzip",“snappy”和“lz4”。

    connections.max.idle.ms:用来指定多久之后关闭闲置的连接,默认值540000(ms),即9min

    linger.ms:生产者发送ProducerBatch之前等待更多消息(ProducerRecoder)加入ProducerBatch的时间,默认值为0。生产者客户端会在ProducerBatch被填满或者等待时间超过linger.ms时发送出去。增大这个参数的值会增加消息的延迟,但同时会提高吞吐量。

    receive.buffer.bytes:用来设置socket接收缓冲区的大小,默认值为32768(B),即32KB,如果设置为-1,则使用操作系统的默认值。

    send.buffer.bytes:用来设置socket发送缓冲区的大小,默认值为131072(B),即128KB,如果设置为-1,则使用操作系统默认值。

    request.timeout.ms:配置Producer等待请求响应的最长时间,默认值为30000(ms),请求超时之后可以进行重试。注意这个参数需要比broker端参数replica.lag.time.max.ms的值要大,这样可以减少因客户端重试而引起的消息重复的概率。

    enable.idempotence:是否开启幂等性功能,默认值false

    max.in.flight.requests.per.connection:限制每个连接,也就是客户端与Node之间的连接最多缓存请求数,默认值5

    transactional.id:设置事物id,必须唯一,默认值null

    实时内容请关注微信公众号,公众号与博客同时更新:程序员星星
    在这里插入图片描述

    展开全文
  • Kafka生产者属性配置

    千次阅读 2018-11-07 13:14:29
    不同的场景对生产者API的使用和配置会有直接影响。 必选属性有3个: bootstrap.servers:该属性指定broker的地址清单,地址的格式为host:port。清单里不需要包含所有的broker地址,生产者会从给定的broker里查询...

    多样的使用场景意味着多样的需求:是否每个消息都很重要?是否允许丢失一小部分消息?偶尔出现重复消息是否可以接受?是否有严格的延迟和吞吐量要求?

    不同的场景对生产者API的使用和配置会有直接影响。

    必选属性有3个:

    bootstrap.servers:该属性指定broker的地址清单,地址的格式为host:port。清单里不需要包含所有的broker地址,生产者会从给定的broker里查询其他broker的信息。不过最少提供2个broker的信息,一旦其中一个宕机,生产者仍能连接到集群上。

    key.serializer:生产者接口允许使用参数化类型,可以把Java对象作为键和值传broker,但是broker希望收到的消息的键和值都是字节数组,所以,必须提供将对象序列化成字节数组的序列化器。key.serializer必须设置为实现org.apache.kafka.common.serialization.Serializer的接口类,默认为

    org.apache.kafka.common.serialization.StringSerializer,也可以实现自定义的序列化器。

    value.serializer:同上。

    可选参数:

    acks:指定了必须要有多少个分区副本收到消息,生产者才会认为写入消息是成功的,这个参数对消息丢失的可能性有重大影响。

    acks=0:生产者在写入消息之前不会等待任何来自服务器的响应,容易丢消息,但是吞吐量高。

    acks=1:只要集群的首领节点收到消息,生产者会收到来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新首领没有选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。默认使用这个配置。

    acks=all:只有当所有参与复制的节点都收到消息,生产者才会收到一个来自服务器的成功响应。延迟高。

    buffer.memory:设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。

    max.block.ms:指定了在调用send()方法或者使用partitionsFor()方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到max.block.ms时,生产者会抛出超时异常。

    batch.size:当多个消息被发送同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。当批次内存被填满后,批次里的所有消息会被发送出去。

    retries:指定生产者可以重发消息的次数。

    receive.buffer.bytes和send.buffer.bytes:指定TCP socket接受和发送数据包的缓存区大小。如果它们被设置为-1,则使用操作系统的默认值。如果生产者或消费者处在不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

    linger.ms:指定了生产者在发送批次前等待更多消息加入批次的时间。

     

    展开全文
  • Kafka生产者参数优化

    2020-09-02 15:38:07
    在实际的kafka开发中,我们会发现,无论是生产者还是消费者,都需要构建一个Properties对象,里面设置了很多参数。 在这段代码中有很多常用的参数配置,在线上使用时,我们要根据实际的数据量和数据大小来决定这些...

    在实际的kafka开发中,我们会发现,无论是生产者还是消费者,都需要构建一个Properties对象,里面设置了很多参数。
    在这段代码中有很多常用的参数配置,在线上使用时,我们要根据实际的数据量和数据大小来决定这些配置的具体值。下面来挑出其中比较重要的几个参数来详细解析一下。

    Properties props = new Properties();
    //集群地址,多个服务器用","分隔
    props.put("bootstrap.servers", "192.168.72.141:9092,192.168.72.142:9092,192.168.72.143:9092");
    //重新发送消息次数,到达次数返回错误
    props.put("retries", 0);
    //Producer会尝试去把发往同一个Partition的多个Requests进行合并,batch.size指明了一次Batch合并后Requests总大小的上限。如果这个值设置的太小,可能会导致所有的Request都不进行Batch。
    props.put("batch.size", 163840);
    //Producer默认会把两次发送时间间隔内收集到的所有Requests进行一次聚合然后再发送,以此提高吞吐量,而linger.ms则更进一步,这个参数为每次发送增加一些delay,以此来聚合更多的Message。
    props.put("linger.ms", 1);
    //在Producer端用来存放尚未发送出去的Message的缓冲区大小
    props.put("buffer.memory", 33554432);
    //key、value的序列化,此处以字符串为例,使用kafka已有的序列化类
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    //props.put("partitioner.class", "com.kafka.demo.Partitioner");//分区操作,此处未写
    props.put("acks", "1");
    props.put("request.timeout.ms", "60000");
    props.put("compression.type","lz4");
    

    1、producer 参数acks设置

    在消息被认为是“已提交”之前,producer需要leader确认的produce请求的应答数。该参数用于控制消息的持久性,目前提供了3个取值:

    • acks = 0: 表示produce请求立即返回,不需要等待leader的任何确认。这种方案有最高的吞吐率,但是不保证消息是否真的发送成功。
    • acks = -1: 表示分区leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为produce请求成功。这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的。
    • acks = 1: 表示leader副本必须应答此produce请求并写入消息到本地日志,之后produce请求被认为成功。如果此时leader副本应答请求之后挂掉了,消息会丢失。这个方案,提供了不错的持久性保证和吞吐。

    配置推荐:

    如果要较高的持久性要求以及无数据丢失的需求,设置acks = -1。其他情况下设置acks = 1。

    2、producer参数 buffer.memory 设置(吞吐量)

    该参数用于指定Producer端用于缓存消息的缓冲区大小,单位为字节,默认值为:33554432合计为32M。kafka采用的是异步发送的消息架构,prducer启动时会首先创建一块内存缓冲区用于保存待发送的消息,然后由一个专属线程负责从缓冲区读取消息进行真正的发送。

    消息持续发送过程中,当缓冲区被填满后,producer立即进入阻塞状态直到空闲内存被释放出来,这段时间不能超过max.blocks.ms设置的值,一旦超过,producer则会抛出TimeoutException 异常,因为Producer是线程安全的,若一直报TimeoutException,需要考虑调高buffer.memory 了。
    用户在使用多个线程共享kafka producer时,很容易把 buffer.memory 打满。

    3、 producer参数 compression.type 设置

    producer压缩器,目前支持none(不压缩),gzip,snappy和lz4。

    2016年8月,FaceBook开源了Ztandard。官网测试: Ztandard压缩率为2.8,snappy为2.091,LZ4 为2.101

    4、 producer参数 retries设置

    producer重试的次数设置。重试时producer会重新发送之前由于瞬时原因出现失败的消息。瞬时失败的原因可能包括:元数据信息失效、副本数量不足、超时、位移越界或未知分区等。倘若设置了retries > 0,那么这些情况下producer会尝试重试。

    5、 producer参数batch.size设置

    producer都是按照batch进行发送的,因此batch大小的选择对于producer性能至关重要。producer会把发往同一分区的多条消息封装进一个batch中,当batch满了后,producer才会把消息发送出去。但是也不一定等到满了,这和另外一个参数linger.ms有关。默认值为16K,合计为16384.

    6、 producer参数linger.ms设置

    producer是按照batch进行发送的,但是还要看linger.ms的值,默认是0,表示不做停留。这种情况下,可能有的batch中没有包含足够多的produce请求就被发送出去了,造成了大量的小batch,给网络IO带来的极大的压力。

    配置推荐:

    为了减少了网络IO,提升了整体的TPS。假设设置linger.ms=5,表示producer请求可能会延时5ms才会被发送。

    展开全文
  • 文章目录必填参数bootstrap.serverskey.serializer和value.serializer非必填参数client.idacksmax.request.sizeretries和retry.backoff.msconnections.max.idele.mslinger.msreceive.buffer.bytessend.buffer.bytes...
  • kafka生产者发送消息的流程 1、消息首先会被封装成ProducerRecord对象,ProducerRecord的构造方法有多种。 可以指定要发送的分区 spring提供的kafkaTemplate也可以指定分区发送,比如这里指定了发送到分区为1 public...
  • 必选属性有3个: bootstrap.servers:该属性指定broker的地址清单,地址的格式为host:port。清单里不需要包含所有的broker地址,生产者会从...key.serializer:生产者接口允许使用参数化类型,可以把Java对象作...
  • kakfa 生产者参数 比较重要的几个参数下面列出来出来,下面将详细的说明这些参数的含义: bootstrap.servers: broker的地址 key.serializer:关键字的序列化方式 value.serializer:消息值的序列化方式 acks...
  • 基本配置metadata.broker.list:broker服务器集群列表,格式为 host1:port1, host2:port2 ...producer.type:消息发送类型同步还是异步,默认为同步compression.codec:消息的压缩格式,默认为none不压缩,也可以...
  • kafka broker的server配置路径为 $ZOOKEEPER_HOME/config/server.properties中,可根据需要进行配置 kafka broker 核心配置 #broker id,从0开始 broker.id = 0 #broker端口 port = 9092 #zookeeper连接信息 ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 23,506
精华内容 9,402
关键字:

kafka生产者参数配置