精华内容
下载资源
问答
  • kafka调优
    2022-05-15 07:25:42


    1.硬件选择

    服务器个数 内存、磁盘的大小
    服务器台数= 2 * (生产者峰值生产速率 * 副本 / 100) + 1
    括号内向上进位 所以为1
    假如峰值是20m/s 副本为2 那么应该是3台

    2.生产者调优

    buffer.memory 默认是32兆 可增大
    batch.size 默认是16K 可增大
    linger.ms 默认是0S 表示直接发送 可设置为1-5ms
    acks 默认是-1 假如只是日志数据 可设置为1
    retries 重试次数 默认是int最大值
    max.in.flight.requests.per.connection 允许最大没有ack值返回的个数 默认是5
    compression.type 默认是none 支持none、gzip、snappy、lz4 和 zstd

    2.broker调优

    修改分区数 (只能增大 不能减小)
    增加副本因子
    leader负载均衡 默认是开启 建议关闭或者调大负载均衡触发的阈值
    自动创建主题 (为了安全性着想,建议关闭)

    3.消费者调优

    指定offset消费
    增大吞吐量:

    • 增大poll 默认最大500条
    • Fetch.max.bytes每批次最大抓取大小,默认50m

    总结

    提示:这里对文章进行总结:
    例如:以上就是今天要讲的内容,本文仅仅简单介绍了pandas的使用,而pandas提供了大量能使我们快速便捷地处理数据的函数和方法。

    更多相关内容
  • kafka调优
  • Kafka 调优

    千次阅读 2020-10-30 17:59:04
    由于kafka存在分区和副本机制 leader 副本对外提供服务,并且所有副本都采用自动话管理,所以当分区数量越多,那么自动化管理就会越多。 分区数越多也会让Kafka的正常启动和关闭的耗时变得越长。 在没有压测情况下...

    image.png

    生产者

    缓冲区大小与Sender线程发送包大小

    从生产者架构图我们可以看出,有两个地方是比较重要且影响性能的:

    1. RecordAccumulator内存缓冲区;
    2. Sender线程的两个阈值;
    • batch.size:只有数据积累到 batch.size之后,sender 才会发送数据,默认16kb。
    • linger.ms:如果数据迟迟未达到 batch.sizesender 等待 linger.time之后就会发送数据。
    • buffer.memory:RecordAccumulator 缓冲区大小 默认32M。

    这两个区域参数需要根据发送消息的QPS、每条消息的占用空间的大小和Sender线程发送消息的速度进行容量预估,然后通过压测工具进行验证。

    缓冲区域默认是32M,如果一条消息占用空间大小是512B,那么这个区域最大存放消息的数据就是32*1024*1024/512=65535条数据。
    Sender线程一次发送数据量的大小,默认是16kB,16*1024/512=32,也就是说一次可以发送32条数据。那么65535条数据就需要发送2048次。

    消息发送确认机制acks

    生产者中还有一个比较重要的参数会影响性能acks(确认机制):

    • 1:当消息写到 leader 成功后向生产者返回 ack,如果在 follower 同步成功之前 leader 故障,那么将会丢失数据,默认是这个级别;
    • 0:不向生产者返回ack,这种情况下生产者延迟最低,broker 一接收到还没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据;
    • -1(all):当ISR中的所有副本都成功写入消息之后才像生产者返回ack。这种方式数据可靠,但是延迟最高;如果ISR副本同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成数据重复

    重试机制(retries和retry.backoff.ms)

    • retries:配置生产者重试的次数,默认值为0,即在发生异常的时候不进行任何重试动作。
    • retry.backoff.ms:设定两次重试之间的时间间隔,避免无效的频繁重试,默认值为100。

    消费者配置

    fetch.min.bytes:

    每次fetch请求时,server应该返回的最小字节数。如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。

    auto.commit.enable

    如果为真,consumer所fetch的消息的offset将会自动的同步到zookeeper。这项提交的offset将在进程挂掉时,由新的consumer使用。

    选择合适的分区数

    在Kafka中,性能与分区数有着必然的关系,在设定分区数时一般也需要考虑性能的因素。但分区数越多吞吐量就越高吗?

    分区是Kafka 中最小的并行操作单元,对生产者而言,每一个分区的数据写入是完全可以并行化的;对消费者而言,Kafka 只允许单个分区中的消息被一个消费者线程消费,一个消费组的消费并行度完全依赖于所消费的分区数。但是分区数不是越多越好,这需要进行压测,最后得出一个比较好的一个分区数。

    分区太多会带来的问题:

    1. 分区数会占用文件描述符,而一个进程所能支配的文件描述符是有限的。
    2. 分区数的多少还会影响系统的可用性。由于kafka存在分区和副本机制 leader 副本对外提供服务,并且所有副本都采用自动话管理,所以当分区数量越多,那么自动化管理就会越多。
    3. 分区数越多也会让Kafka的正常启动和关闭的耗时变得越长。

    在没有压测情况下建议将分区数设定为集群中broker的倍数。假定集群中有3个broker节点,可以设定分区数为3、6、9等,至于倍数的选定可以参考预估的吞吐量。但是还是建议进行压测,进而找到最优的分区数。

    常见优化

    提升吞吐量

    image.png

    保证低时延

    image.png

    保证高持久

    image.png

    参考

    https://www.cnblogs.com/gxyandwmm/p/11420736.html

    展开全文
  • Kafka调优

    2020-10-20 14:57:25
    配置参数调优 broker配置 num.recovery.threads.per.data.dir 对于如下 3 种情况,Kafka 会使用可配置的线程池来处理日志片段: 服务器正常启动,用于打开每个分区的日志片段; 服务器崩溃后重启,用于检查和...

    配置参数调优

    broker配置

    num.recovery.threads.per.data.dir

    对于如下 3 种情况,Kafka 会使用可配置的线程池来处理日志片段:

    • 服务器正常启动,用于打开每个分区的日志片段;
    • 服务器崩溃后重启,用于检查和截短每个分区的日志片段;
    • 服务器正常关闭,用于关闭日志片段。

    默认情况下,每个日志目录只使用一个线程。因为这些线程只是在服务器启动和关闭时会用到,所以完全可以设置大量的线程来达到并行操作的目的。特别是对于包含大量分区的服务器来说,一旦发生崩溃,在进行恢复时使用并行操作可能会省下数小时的时间。设置此参数时需要注意,所配置的数字对应的是 log.dirs 指定的单个日志目录。也就是说,如果 num.recovery.threads.per.data.dir 被设为 8,并且 log.dir 指定了 3 个路径,那么总共需要 24 个线程。

    主题的默认配置

    num.partitions

    num.partitions 参数指定了新创建的主题将包含多少个分区。如果启用了主题自动创建功能(该功能默认是启用的),主题分区的个数就是该参数指定的值。该参数的默认值是 1。Kafka 集群通过分区对主题进行横向扩展,所以当有新的 broker 加入集群时,可以通过分区个数来实现集群的负载均衡。拥有大量消息的主题如果要进行负载分散,就需要大量的分区。

    如何选定分区数量

    你需要很多分区,但不能太多。因为分区越多,占用的内存越多,完成首领选举需要的时间也越长。如果你估算出主题的吞吐量和消费者吞吐量,可以用主题吞吐量除以消费者吞吐量算出分区的个数。也就是说,如果每秒钟要从主题上写入和读取 1GB 的数据,并且每个消费者每秒钟可以处理 50MB 的数据,那么至少需要 20 个分区。这样就可以让 20 个消费者同时读取这些分区,从而达到每秒钟 1GB 的吞吐量。

    log.retention.ms

    Kafka 通常根据时间来决定数据可以被保留多久。默认使用 log.retention.hours 参数来配置时间,默认值为 168 小时,也就是一周。除此以外,还有其他两个参数 log.retention.minutes 和 log.retention.ms 。这 3 个参数的作用是一样的,都是决定消息多久以后会被删除,不过还是推荐使用 log.retention.ms 。如果指定了不止一个参数,Kafka 会优先使用具有最小值的那个参数。

    log.retention.bytes

    另一种方式是通过保留的消息字节数来判断消息是否过期。它的值通过参数 log.retention.bytes 来指定,作用在每一个分区上。也就是说,如果有一个包含 8 个分区的主题,并且 log.retention.bytes 被设为 1GB,那么这个主题最多可以保留 8GB 的数据。所以,当主题的分区个数增加时,整个主题可以保留的数据也随之增加。

    根据字节大小和时间保留数据

    如果同时指定了 log.retention.bytes 和 log.retention.ms (或者另一个时间参数),只要任意一个条件得到满足,消息就会被删除。例如,假设 log.retention.ms 设置为 86 400 000(也就是 1 天),log.retention.bytes 设置为 1 000 000 000(也就是 1GB),如果消息字节总数在不到一天的时间就超过了 1GB,那么多出来的部分就会被删除。相反,如果消息字节总数小于 1GB,那么一天之后这些消息也会被删除,尽管分区的数据总量小于 1GB。

    log.segment.bytes

    以上的设置都作用在日志片段上,而不是作用在单个消息上。当消息到达 broker 时,它们被追加到分区的当前日志片段上。当日志片段大小达到 log.segment.bytes 指定的上限(默认是 1GB)时,当前日志片段就会被关闭,一个新的日志片段被打开。如果一个日志片段被关闭,就开始等待过期。这个参数的值越小,就会越频繁地关闭和分配新文件,从而降低磁盘写入的整体效率。

    如果主题的消息量不大,那么如何调整这个参数的大小就变得尤为重要。例如,如果一个主题每天只接收 100MB 的消息,而 log.segment.bytes 使用默认设置,那么需要 10 天时间才能填满一个日志片段。因为在日志片段被关闭之前消息是不会过期的,所以如果 log.retention.ms 被设为 604 800 000(也就是 1 周),那么日志片段最多需要 17 天才会过期。

    这是因为关闭日志片段需要 10 天的时间,而根据配置的过期时间,还需要再保留 7 天时间(要等到日志片段里的最后一个消息过期才能被删除)。
    简单来说,对于使用时间戳获取偏移量的操作来说,日志片段越小,结果越准确。(这句没懂)

    log.segment.ms

    另一个可以控制日志片段关闭时间的参数是 log.segment.ms ,它指定了多长时间之后日志片段会被关闭。就像 log.retention.bytes 和 log.retention.ms 这两个参数一样,log.segment.bytes 和 log.retention.ms 这两个参数之间也不存在互斥问题。日志片段会在大小或时间达到上限时被关闭,就看哪个条件先得到满足。默认情况下,log.segment.ms 没有设定值,所以只根据大小来关闭日志片段。
    在使用基于时间的日志片段时,要着重考虑并行关闭多个日志片段对磁盘性能的影响。如果多个分区的日志片段永远不能达到大小的上限,就会发生这种情况,因为 broker 在启动之后就开始计算日志片段的过期时间,对于那些数据量小的分区来说,日志片段的关闭操作总是同时发生。

    message.max.bytes

    broker 通过设置 message.max.bytes 参数来限制单个消息的大小,默认值是 1 000 000,也就是 1MB。如果生产者尝试发送的消息超过这个大小,不仅消息不会被接收,还会收到 broker 返回的错误信息。跟其他与字节相关的配置参数一样,该参数指的是压缩后的消息大小,也就是说,只要压缩后的消息小于 message.max.bytes 指定的值,消息的实际大小可以远大于这个值。

    这个值对性能有显著的影响。值越大,那么负责处理网络连接和请求的线程就需要花越多的时间来处理这些请求。它还会增加磁盘写入块的大小,从而影响 IO 吞吐量。

    在服务端和客户端之间协调消息大小的配置

    消费者客户端设置的 fetch.message.max.bytes 必须与服务器端设置的消息大小进行协调。如果这个值比 message.max.bytes 小,那么消费者就无法读取比较大的消息,导致出现消费者被阻塞的情况。在为集群里的 broker 配置 replica.fetch.max.bytes 参数时,也遵循同样的原则。

    kafka集群配置

    需要多少个broker

    一个 Kafka 集群需要多少个 broker 取决于以下几个因素。首先,需要多少磁盘空间来保留数据,以及单个 broker 有多少空间可用。如果整个集群需要保留 10TB 的数据,每个 broker 可以存储 2TB,那么至少需要 5 个 broker。如果启用了数据复制,那么至少还需要一倍的空间,不过这要取决于配置的复制系数是多少(将在第 6 章介绍)。也就是说,如果启用了数据复制,那么这个集群至少需要 10 个 broker。

    第二个要考虑的因素是集群处理请求的能力。这通常与网络接口处理客户端流量的能力有关,特别是当有多个消费者存在或者在数据保留期间流量发生波动(比如高峰时段的流量爆发)时。如果单个 broker 的网络接口在高峰时段可以达到 80% 的使用量,并且有两个消费者,那么消费者就无法保持峰值,除非有两个 broker。如果集群启用了复制功能,则要把这个额外的消费者考虑在内。因磁盘吞吐量低和系统内存不足造成的性能问题,也可以通过扩展多个 broker 来解决。(这里没看懂)

    操作系统调优

    虚拟内存

    一般来说,Linux 的虚拟内存会根据系统的工作负荷进行自动调整。我们可以对交换分区的处理方式和内存脏页进行调整,从而让 Kafka 更好地处理工作负载。对于大多数依赖吞吐量的应用程序来说,要尽量避免内存交换。内存页和磁盘之间的交换对 Kafka 各方面的性能都有重大影响。Kafka 大量地使用系统页面缓存,如果虚拟内存被交换到磁盘,说明已经没有多余内存可以分配给页面缓存了。

    一种避免内存交换的方法是不设置任何交换分区。内存交换不是必需的,不过它确实能够在系统发生灾难性错误时提供一些帮助。进行内存交换可以防止操作系统由于内存不足而突然终止进程。基于上述原因,建议把 vm.swappiness 参数的值设置得小一点,比如 1。该参数指明了虚拟机的子系统将如何使用交换分区,而不是只把内存页从页面缓存里移除。要优先考虑减小页面缓存,而不是进行内存交换。

    脏页会被冲刷到磁盘上,调整内核对脏页的处理方式可以让我们从中获益。Kafka 依赖 I/O 性能为生产者提供快速的响应。这就是为什么日志片段一般要保存在快速磁盘上,不管是单个快速磁盘(如 SSD)还是具有 NVRAM 缓存的磁盘子系统(如 RAID)。这样一来,在后台刷新进程将脏页写入磁盘之前,可以减少脏页的数量,这个可以通过将 vm.dirty_background_ratio 设为小于 10 的值来实现。该值指的是系统内存的百分比,大部分情况下设为 5 就可以了。它不应该被设为 0,因为那样会促使内核频繁地刷新页面,从而降低内核为底层设备的磁盘写入提供缓冲的能力。

    通过设置 vm.dirty_ratio 参数可以增加被内核进程刷新到磁盘之前的脏页数量,可以将它设为大于 20 的值(这也是系统内存的百分比)。这个值可设置的范围很广,60~80 是个比较合理的区间。不过调整这个参数会带来一些风险,包括未刷新磁盘操作的数量和同步刷新引起的长时间 I/O 等待。如果该参数设置了较高的值,建议启用 Kafka 的复制功能,避免因系统崩溃造成数据丢失。

    为了给这些参数设置合适的值,最好是在 Kafka 集群运行期间检查脏页的数量,不管是在生存环境还是模拟环境。可以在 /proc/vmstat 文件里查看当前脏页数量。

    这些都是可控制的选项,根据工作负载和数据,你可以决定如何设置它们:

    $ sysctl -a | grep dirty
    vm.dirty_background_bytes = 0
    vm.dirty_background_ratio = 10
    vm.dirty_bytes = 0
    vm.dirty_ratio = 20
    vm.dirty_writeback_centisecs = 500
    vm.dirty_expire_centisecs = 3000
    vm.dirtytime_expire_seconds = 43200
    
    • vm.dirty_background_ratio 是内存可以填充脏数据的百分比。这些脏数据稍后会写入磁盘,后台进程会稍后清理脏数据。比如,我有32G内存,那么有3.2G的脏数据可以待着内存里,超过3.2G的话就会有后台进程来清理。
    • vm.dirty_ratio是可以用脏数据填充的绝对最大系统内存量,当系统到达此点时,必须将所有脏数据提交到磁盘,同时所有新的I/O块都会被阻塞,直到脏数据被写入磁盘。这通常是长I/O卡顿的原因,但这也是保证内存中不会存在过量脏数据的保护机制。
    • vm.dirty_background_bytesvm.dirty_bytes是另一种指定这些参数的方法。如果设置_bytes版本,则_ratio版本将变为0,反之亦然。
    • vm.dirty_expire_centisecs 指定脏数据能存活的时间。在这里它的值是30秒。当 pdflush/flush/kdmflush 在运行的时候,他们会检查是否有数据超过这个时限,如果有则会把它异步地写到磁盘中。毕竟数据在内存里待太久也会有丢失风险。
    • vm.dirty_writeback_centisecs 指定多长时间 pdflush/flush/kdmflush 这些进程会唤醒一次,然后检查是否有缓存需要清理。

    可以通过下面方式看内存中有多少脏数据:一共有106页的脏数据:

    $ cat /proc/vmstat | egrep "dirty|writeback"
    nr_dirty 106
    nr_writeback 0
    nr_writeback_temp 0
    nr_dirty_threshold 3934012
    nr_dirty_background_threshold 1964604

    方法2:增加缓存

    在某些情况下,显著提高缓存对性能有积极的影响。在这些情况下,Linux客户机上包含的数据不是关键的,可能会丢失,而且应用程序通常会重复或以可重复的方式写入相同的文件。理论上,通过允许内存中存在更多脏页,你将在缓存中一遍又一遍地重写相同的块,只需要每隔一段时间向实际磁盘写一次。为此,我们提出了以下参数:

    vm.dirty_background_ratio = 50
    vm.dirty_ratio = 80
    

    有时候还会提高vm.dirty_expire_centisecs 这个参数的值,来允许脏数据更长时间地停留。除了增加数据丢失的风险之外,如果缓存已满并需要同步,还会有长时间I/O卡顿的风险,因为在大型虚拟机缓存中有大量数据。

    方法3:增减都用

    有时候系统需要应对突如其来的高峰数据,它可能会拖慢磁盘。比如说:每小时或者午夜进行批处理作业、在Raspberry Pi上写SD卡等等。这种情况下,我们可以允许大量的写I/O存储在缓存中,这样后台刷新操作就可以慢慢异步处理它:

    vm.dirty_background_ratio = 5
    vm.dirty_ratio = 80
    

    这个时候,系统后台进程在脏数据达到5%时就开始异步清理,但在80%之前系统不会强制同步写磁盘。在此基础上,你只需要调整RAMvm.dirty_ratio大小以便能缓存所有的写数据。当然,磁盘上的数据一致性也存在一定风险。

    无论你选择哪种方式,都应该始终收集数据来支持你的更改,并帮助你确定是在改进还是变得更糟。我们可以从应用程序,/proc/vmstat/proc/meminfoiostatvmstat 以及/proc/sys/vm里面获得大量有用信息。

    为什么不把 vm.swappiness 设为零

    先前,人们建议尽量把 vm.swapiness 设为 0,它意味着“除非发生内存溢出,否则不要进行内存交换”。直到 Linux 内核 3.5-rc1 版本发布,这个值的意义才发生了变化。这个变化被移植到其他的发行版上,包括 Red Hat 企业版内核 2.6.32-303。在发生变化之后,0 意味着“在任何情况下都不要发生交换”。所以现在建议把这个值设为 1。

    磁盘

    除了选择合适的磁盘硬件设备和使用 RAID 外,文件系统是影响性能的另一个重要因素。有很多种文件系统可供选择,不过对于本地文件系统来说,EXT4(第四代可扩展文件系统)和 XFS 最为常见。近来,XFS 成为很多 Linux 发行版默认的文件系统,因为它只需要做少量调优就可以承担大部分的工作负荷,比 EXT4 具有更好的表现。EXT4 也可以做得很好,但需要做更多的调优,存在较大的风险。其中就包括设置更长的提交间隔(默认是 5),以便降低刷新的频率。EXT4 还引入了块分配延迟,一旦系统崩溃,更容易造成数据丢失和文件系统毁坏。XFS 也使用了分配延迟算法,不过比 EXT4 的要安全些。XFS 为 Kafka 提供了更好的性能,除了由文件系统提供的自动调优之外,无需额外的调优。批量磁盘写入具有更高的效率,可以提升整体的 I/O 吞吐量。

    不管使用哪一种文件系统来存储日志片段,最好要对挂载点的 noatime 参数进行合理的设置。文件元数据包含 3 个时间戳:创建时间(ctime)、最后修改时间(mtime)以及最后访问时间(atime)。默认情况下,每次文件被读取后都会更新 atime,这会导致大量的磁盘写操作,而且 atime 属性的用处不大,除非某些应用程序想要知道某个文件在最近一次修改后有没有被访问过(这种情况可以使用 realtime )。Kafka 用不到该属性,所以完全可以把它禁用掉。为挂载点设置 noatime 参数可以防止更新 atime,但不会影响 ctime 和 mtime。

    网络

    默认情况下,系统内核没有针对快速的大流量网络传输进行优化,所以对于应用程序来说,一般需要对 Linux 系统的网络栈进行调优,以实现对大流量的支持。实际上,调整 Kafka 的网络配置与调整其他大部分 Web 服务器和网络应用程序的网络配置是一样的。首先可以对分配给 socket 读写缓冲区的内存大小作出调整,这样可以显著提升网络的传输性能。socket 读写缓冲区对应的参数分别是 net.core.wmem_default 和 net.core.rmem_default ,合理的值是 131 072(也就是 128KB)。读写缓冲区最大值对应的参数分别是 net.core.wmem_max 和 net.core.rmem_max ,合理的值是 2 097 152(也就是 2MB)。要注意,最大值并不意味着每个 socket 一定要有这么大的缓冲空间,只是说在必要的情况下才会达到这个值。

    除了设置 socket 外,还需要设置 TCP socket 的读写缓冲区,它们的参数分别是 net.ipv4.tcp_wmem 和 net.ipv4.tcp_rmem 。这些参数的值由 3 个整数组成,它们使用空格分隔,分别表示最小值、默认值和最大值。最大值不能大于 net.core.wmem_max 和 net.core.rmem_max 指定的大小。例如,“4096 65536 2048000”表示最小值是 4KB、默认值是 64KB、最大值是 2MB。根据 Kafka 服务器接收流量的实际情况,可能需要设置更高的最大值,为网络连接提供更大的缓冲空间。

    还有其他一些有用的网络参数。例如,把 net.ipv4.tcp_window_scaling 设为 1,启用 TCP 时间窗扩展,可以提升客户端传输数据的效率,传输的数据可以在服务器端进行缓冲。把 net.ipv4.tcp_max_syn_backlog 设为比默认值 1024 更大的值,可以接受更多的并发连接。把 net.core.netdev_max_backlog 设为比默认值 1000 更大的值,有助于应对网络流量的爆发,特别是在使用千兆网络的情况下,允许更多的数据包排队等待内核处理。

    垃圾回收器选项

    为应用程序调整 Java 垃圾回收参数就像是一门艺术,我们需要知道应用程序是如何使用内存的,还需要大量的观察和试错。幸运的是,Java 7 为我们带来了 G1 垃圾回收器,让这种状况有所改观。在应用程序的整个生命周期,G1 会自动根据工作负载情况进行自我调节,而且它的停顿时间是恒定的。它可以轻松地处理大块的堆内存,把堆内存分为若干小块的区域,每次停顿时并不会对整个堆空间进行回收。

    正常情况下,G1 只需要很少的配置就能完成这些工作。以下是 G1 的两个调整参数。

    MaxGCPauseMillis :

    该参数指定每次垃圾回收默认的停顿时间。该值不是固定的,G1 可以根据需要使用更长的时间。它的默认值是 200ms。也就是说,G1 会决定垃圾回收的频率以及每一轮需要回收多少个区域,这样算下来,每一轮垃圾回收大概需要 200ms 的时间。

    InitiatingHeapOccupancyPercent :

    该参数指定了在 G1 启动新一轮垃圾回收之前可以使用的堆内存百分比,默认值是 45。也就是说,在堆内存的使用率达到 45% 之前,G1 不会启动垃圾回收。这个百分比包括新生代和老年代的内存。

    Kafka 对堆内存的使用率非常高,容易产生垃圾对象,所以可以把这些值设得小一些。如果一台服务器有 64GB 内存,并且使用 5GB 堆内存来运行 Kafka,那么可以参考以下的配置:MaxGCPauseMillis 可以设为 20ms;InitiatingHeapOccupancyPercent 可以设为 35,这样可以让垃圾回收比默认的要早一些启动。

    Kafka 的启动脚本并没有启用 G1 回收器,而是使用了 Parallel New 和 CMS( Concurrent Mark-Sweep,并发标记和清除)垃圾回收器。不过它可以通过环境变量来修改。本章前面的内容使用 start 命令来修改它:

    # export JAVA_HOME=/usr/java/jdk1.8.0_51
    # export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC
    -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35
    -XX:+DisableExplicitGC -Djava.awt.headless=true"
    # /usr/local/kafka/bin/kafka-server-start.sh -daemon
    /usr/local/kafka/config/server.properties
    #
    

    数据中心布局

    在开发阶段,人们并不会太关心 Kafka 服务器在数据中心所处的物理位置,因为即使集群在短时间内出现局部或完全不可用,也不会造成太大影响。但是,在生产环境,服务不可用意味着金钱的损失,具体表现为无法为用户提供服务或者不知道用户正在做什么。这个时候,使用 Kafka 集群的复制功能就变得尤为重要(请参考第 6 章),而服务器在数据中心所处的物理位置也变得重要起来。如果在部署 Kafka 之前没有考虑好这个问题,那么在后续的维护过程中,移动服务器需要耗费更高的成本。

    在为 broker 增加新的分区时,broker 并无法获知机架的信息。也就是说,两个 broker 有可能是在同一个机架上,或者在同一个可用区域里(如果运行在像 AWS 这样的的云服务上),所以,在为分区添加副本的时候,这些副本很可能被分配给同一个机架上的 broker,它们使用相同的电源和网络连接。如果该机架出了问题,这些分区就会离线,客户端就无法访问到它们。更糟糕的是,如果发生不完整的主节点选举,那么在恢复时就有可能丢失数据(第 6 章将介绍更多细节)。

    所以,最好把集群的 broker 安装在不同的机架上,至少不要让它们共享可能出现单点故障的基础设施,比如电源和网络。也就是说,部署服务器需要至少两个电源连接(两个不同的回路)和两个网络交换器(保证可以进行无缝的故障切换)。除了这些以外,最好还要把 broker 安放在不同的机架上。因为随着时间的推移,机架也需要进行维护,而这会导致机器离线(比如移动机器或者重新连接电源)。

    共享Zookeeper

    Kafka 使用 Zookeeper 来保存 broker、主题和分区的元数据信息。对于一个包含多个节点的 Zookeeper 群组来说,Kafka 集群的这些流量并不算多,那些写操作只是用于构造消费者群组或集群本身。实际上,在很多部署环境里,会让多个 Kafka 集群共享一个 Zookeeper 群组(每个集群使用一个 chroot 路径)。
    不过,消费者和 Zookeeper 之间还是有一个值得注意的地方,消费者可以选择将偏移量提交到 Zookeeper 或 Kafka,还可以选择提交偏移量的时间间隔。如果消费者将偏移量提交到 Zookeeper,那么在每个提交时间点上,消费者将会为每一个消费的分区往 Zookeeper 写入一次偏移量。合理的提交间隔是 1 分钟,因为这刚好是消费者群组的某个消费者发生失效时能够读取到重复消息的时间。值得注意的是,这些提交对于 Zookeeper 来说流量不算小,特别是当集群里有多个消费者的时候。如果 Zookeeper 群组无法处理太大的流量,就有必要使用长一点的提交时间间隔。不过不管怎样,还是建议使用最新版本的 Kafka,让消费者把偏移量提交到 Kafka 服务器上,消除对 Zookeeper 的依赖。

    虽然多个 Kafka 集群可以共享一个 Zookeeper 群组,但如果有可能的话,不建议把 Zookeeper 共享给其他应用程序。Kafka 对 Zookeeper 的延迟和超时比较敏感,与 Zookeeper 群组之间的一个通信异常就可能导致 Kafka 服务器出现无法预测的行为。这样很容易让多个 broker 同时离线,如果它们与 Zookeeper 之间断开连接,也会导致分区离线。这也会给集群控制器带来压力,在服务器离线一段时间之后,当控制器尝试关闭一个服务器时,会表现出一些细小的错误。其他的应用程序因重度使用或进行不恰当的操作给 Zookeeper 群组带来压力,所以最好让它们使用自己的 Zookeeper 群组。

    生产者配置

    生产者有很多可配置的参数,在 Kafka 文档里都有说明,它们大部分都有合理的默认值,所以没有必要去修改它们。不过有几个参数在内存使用、性能和可靠性方面对生产者影响比较大,接下来我们会一一说明。

    acks

    acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。这个参数对消息丢失的可能性有重要影响。该参数有如下选项。

    • 如果 acks=0 ,生产者在成功写入消息之前不会等待任何来自服务器的响应。也就是说,如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了。不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。
    • 如果 acks=1 ,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。这个时候的吞吐量取决于使用的是同步发送还是异步发送。如果让发送客户端等待服务器的响应(通过调用 Future 对象的 get() 方法),显然会增加延迟(在网络上传输一个来回的延迟)。如果客户端使用回调,延迟问题就可以得到缓解,不过吞吐量还是会受发送中消息数量的限制(比如,生产者在收到服务器响应之前可以发送多少个消息)。
    • 如果 acks=all ,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行(第 5 章将讨论更多的细节)。不过,它的延迟比 acks=1 时更高,因为我们要等待不只一个服务器节点接收消息。

    buffer.memory

    该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。这个时候, send() 方法调用要么被阻塞,要么抛出异常,取决于如何设置 block.on.buffer.full 参数(在 0.9.0.0 版本里被替换成了 max.block.ms ,表示在抛出异常之前可以阻塞一段时间)。

    compression.type

    默认情况下,消息发送时不会被压缩。该参数可以设置为 snappy 、gzip 或 lz4 ,它指定了消息被发送给 broker 之前使用哪一种压缩算法进行压缩。snappy 压缩算法由 Google 发明,它占用较少的 CPU,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网络带宽,可以使用这种算法。gzip 压缩算法一般会占用较多的 CPU,但会提供更高的压缩比,所以如果网络带宽比较有限,可以使用这种算法。使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。

    retries

    生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下,retries 参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待 100ms,不过可以通过 retry.backoff.ms 参数来改变这个时间间隔。建议在设置重试次数和重试时间间隔之前,先测试一下恢复一个崩溃节点需要多少时间(比如所有分区选举出首领需要多长时间),让总的重试时间比 Kafka 集群从崩溃中恢复的时间长,否则生产者会过早地放弃重试。不过有些错误不是临时性错误,没办法通过重试来解决(比如“消息太大”错误)。一般情况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。你只需要处理那些不可重试的错误或重试次数超出上限的情况。

    batch.size

    当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。当批次被填满,批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也有可能被发送。所以就算把批次大小设置得很大,也不会造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频繁地发送消息,会增加一些额外的开销。

    linger.ms

    该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer 会在批次填满或 linger.ms 达到上限时把批次发送出去。默认情况下,只要有可用的线程,生产者就会把消息发送出去,就算批次里只有一个消息。把 linger.ms 设置成比 0 大的数,让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然这样会增加延迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了)。

    max.in.flight.requests.per.connection

    该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。

    timeout.msrequest.timeout.ms 和 metadata.fetch.timeout.ms

    request.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间,metadata.fetch.timeout.ms 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调)。timeout.ms 指定了 broker 等待同步副本返回消息确认的时间,与 asks 的配置相匹配——如果在指定时间内没有收到同步副本的确认,那么 broker 就会返回一个错误。

    max.block.ms

    该参数指定了在调用 send() 方法或使用 partitionsFor() 方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常。

    max.request.size

    该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为 1MB,那么可以发送的单个最大消息为 1MB,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000 个消息,每个消息大小为 1KB。另外,broker 对可接收的消息最大值也有自己的限制(message.max.bytes ),所以两边的配置最好可以匹配,避免生产者发送的消息被 broker 拒绝。

    receive.buffer.bytes 和 send.buffer.bytes

    这两个参数分别指定了 TCP socket 接收和发送数据包的缓冲区大小。如果它们被设为 -1,就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

    顺序保证

    Kafka 可以保证同一个分区里的消息是有序的。也就是说,如果生产者按照一定的顺序发送消息,broker 就会按照这个顺序把它们写入分区,消费者也会按照同样的顺序读取它们。在某些情况下,顺序是非常重要的。例如,往一个账户存入 100 元再取出来,这个与先取钱再存钱是截然不同的!不过,有些场景对顺序不是很敏感。
    如果把 retries 设为非零整数,同时把 max.in.flight.requests.per.connection 设为比 1 大的数,那么,如果第一个批次消息写入失败,而第二个批次写入成功,broker 会重试写入第一个批次。如果此时第一个批次也写入成功,那么两个批次的顺序就反过来了。
    一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也是很关键的,所以不建议把 retries 设为 0。可以把 max.in.flight.requests.per.connection 设为 1,这样在生产者尝试发送第一批消息时,就不会有其他的消息发送给 broker。不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做。

    消费者配置

    fetch.min.bytes

    该属性指定了消费者从服务器获取记录的最小字节数。broker 在收到消费者的数据请求时,如果可用的数据量小于 fetch.min.bytes 指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者。这样可以降低消费者和 broker 的工作负载,因为它们在主题不是很活跃的时候(或者一天里的低谷时段)就不需要来来回回地处理消息。如果没有很多可用数据,但消费者的 CPU 使用率却很高,那么就需要把该属性的值设得比默认值大。如果消费者的数量比较多,把该属性的值设置得大一点可以降低 broker 的工作负载。

    fetch.max.wait.ms

    我们通过 fetch.min.bytes 告诉 Kafka,等到有足够的数据时才把它返回给消费者。而 feth.max.wait.ms 则用于指定 broker 的等待时间,默认是 500ms。如果没有足够的数据流入 Kafka,消费者获取最小数据量的要求就得不到满足,最终导致 500ms 的延迟。如果要降低潜在的延迟(为了满足 SLA),可以把该参数值设置得小一些。如果 fetch.max.wait.ms 被设为 100ms,并且 fetch.min.bytes 被设为 1MB,那么 Kafka 在收到消费者的请求后,要么返回 1MB 数据,要么在 100ms 后返回所有可用的数据,就看哪个条件先得到满足。

    max.partition.fetch.bytes

    该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是 1MB,也就是说,KafkaConsumer.poll() 方法从每个分区里返回的记录最多不超过 max.partition.fetch.bytes 指定的字节。如果一个主题有 20 个分区和 5 个消费者,那么每个消费者需要至少 4MB 的可用内存来接收记录。在为消费者分配内存时,可以给它们多分配一些,因为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。max.partition.fetch.bytes 的值必须比 broker 能够接收的最大消息的字节数(通过 max.message.size 属性配置)大,否则消费者可能无法读取这些消息,导致消费者一直挂起重试。在设置该属性时,另一个需要考虑的因素是消费者处理数据的时间。消费者需要频繁调用 poll() 方法来避免会话过期和发生分区再均衡,如果单次调用 poll() 返回的数据太多,消费者需要更多的时间来处理,可能无法及时进行下一个轮询来避免会话过期。如果出现这种情况,可以把 max.partition.fetch.bytes 值改小,或者延长会话过期时间。

    session.timeout.ms

    该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s。如果消费者没有在 session.timeout.ms 指定的时间内发送心跳给群组协调器,就被认为已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其他消费者。该属性与 heartbeat.interval.ms 紧密相关。heartbeat.interval.ms 指定了 poll() 方法向协调器发送心跳的频率,session.timeout.ms 则指定了消费者可以多久不发送心跳。所以,一般需要同时修改这两个属性,heartbeat.interval.ms 必须比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 应该是 1s。把 session.timeout.ms 值设得比默认值小,可以更快地检测和恢复崩溃的节点,不过长时间的轮询或垃圾收集可能导致非预期的再均衡。把该属性的值设置得大一些,可以减少意外的再均衡,不过检测节点崩溃需要更长的时间。

    auto.offset.reset

    该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。它的默认值是 latest ,意思是说,在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)。另一个值是 earliest ,意思是说,在偏移量无效的情况下,消费者将从起始位置读取分区的记录。

    enable.auto.commit

    该属性指定了消费者是否自动提交偏移量,默认值是 true 。为了尽量避免出现重复数据和数据丢失,可以把它设为 false ,由自己控制何时提交偏移量。如果把它设为 true ,还可以通过配置 auto.commit.interval.ms 属性来控制提交的频率。

    partition.assignment.strategy

    我们知道,分区会被分配给群组里的消费者。PartitionAssignor 根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者。Kafka 有两个默认的分配策略。可以通过设置partition.assignment.strategy 来选择分区策略。默认使用的是 org.apache.kafka.clients.consumer.RangeAssignor ,这个类实现了 Range 策略,不过也可以把它改成 org.apache.kafka.clients.consumer.RoundRobinAssignor 。我们还可以使用自定义策略,在这种情况下,partition.assignment.strategy 属性的值就是自定义类的名字。
    max.poll.records

    Range
    该策略会把主题的若干个连续的分区分配给消费者。假设消费者 C1 和消费者 C2 同时订阅了主题 T1 和主题 T2,并且每个主题有 3 个分区。那么消费者 C1 有可能分配到这两个主题的分区 0 和分区 1,而消费者 C2 分配到这两个主题的分区 2。因为每个主题拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消费者更多的分区。只要使用了 Range 策略,而且分区数量无法被消费者数量整除,就会出现这种情况。

    RoundRobin
    该策略把主题的所有分区逐个分配给消费者。如果使用 RoundRobin 策略来给消费者 C1 和消费者 C2 分配分区,那么消费者 C1 将分到主题 T1 的分区 0 和分区 2 以及主题 T2 的分区 1,消费者 C2 将分配到主题 T1 的分区 1 以及主题 T2 的分区 0 和分区 2。一般来说,如果所有消费者都订阅相同的主题(这种情况很常见),RoundRobin 策略会给所有消费者分配相同数量的分区(或最多就差一个分区)。

    max.poll.records

    该属性用于控制单次调用 call() 方法能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量。

    receive.buffer.bytes 和 send.buffer.bytes

    socket 在读写数据时用到的 TCP 缓冲区也可以设置大小。如果它们被设为 -1,就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心内,可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

    以上内容摘抄至:《Kafka权威指南》

    展开全文
  • 所使用集群架构组件及版本信息如下: 名称 版本号 filebeat 7.10.0 kafka 2.4.1 zookeeper 3.5.7 logstash 7.5.2 elasticsearch 7.5.2 kibana 7.5.2

    一.背景:

    最近发现生产ELK集群中的Logstash服务器内存资源和CPU负载经常性跑高的情况,同时考虑到Logstash节点为单点部署没有容灾特性,需增加一个Logstash节点,并同时对Kafka做相关调优,调整每个topic的分区数partition和副本数replica,提升集群的吞吐能力和容灾能力。

    二.系统环境:

    所使用集群架构组件及版本信息如下:

    操作系统OS版本:CentOS Linux release 7.9.2009 (Core) 

    名称  版本号
    filebeat7.10.0
    kafka2.4.1
    zookeeper3.5.7
    logstash7.5.2
    elasticsearch7.5.2
    kibana7.5.2

    三.实施步骤:

    以下涉及到的操作输入命令根据实际情况路径和文件名修改

    1.Kafka调优:

    (1)首先备份原有配置文件:

    cp -a server.properties server.properties.bak

    (2)然后修改server.properties配置文件里的

    打开自动创建标题topic的选项(上图划红线处),否则后续出现新的项目的时候,生产者传递过来的数据不会自动创建对应的标题,自动指定该标题的分区partition和副本replica,配合后续参数项使用

    (3)调整用于负责网络收发数据和处理数据读写请求的线程数,这两个参数需要根据实际的网络带宽情况和磁盘的io性能设置

    (4) 调整kafka中生成除内置标题topic之外标题的分区数partition,副本数replica,副本复制成功最小确认数min.insync.replica,副本复制成功反馈机制ack,标题offset的分区数,副本数,等参数项

     (5)在调整标题topic的分区数num.partition后,需要让系统把数据分发和同步到新的分区中,形成副本集,所以这里要调整触发重新分发文件的时间为1个小时,以便尽快让数据均衡到每个新的节点,所以把log.retention.hours调整由24调整为1

    (6)以上修改配置文件的步骤在kafka集群所有节点上都执行一遍,确保集群内节点的配置一致

    (7)Kafka的配置文件修改完毕后,先不要重启kafka,否则会出现当前运行集群的数据和配置文件不匹配的错误,所以需要首先手动调整原有的标题topic的分区数partition和副本数replica,首先list一下现有分区

    /usr/local/elk/src/kafka-2.4.1/bin/kafka-topics.sh --list --zookeeper localhost:2181

     

     其中除了__consumer_offsets为kafka内置标题topic不可进行在线调整其分区数外(副本数可以在线调整),其余均为用户创建的标题,可以通过kafka提供的脚本工具在线调整,见下文

    (8)接着首先调整内置标题__consumer_offsets,由于其分区partition默认为50且不可调整,所以只能通过脚本工具bin/kafka-reassign-partitions.sh调整其副本数replica,首先根据步骤4中配置文件的定义,确定其副本数为3,然后可以编写json文件定义其副本配置如下:

     共50个分区,所以需要有50个配置块,编辑完文件后保存为partition-topic-test3.json然后使用如下命令调整副本数

    /usr/local/elk/src/kafka-2.4.1/bin/kafka-reassign-partitions.sh --reassignment-json-file /root/partitions-topic-test3.json --zookeeper localhost:2181 --execute

    之后使用下面命令确认副本调整情况,确认每一个分区状态都已调整成功才算调整完毕

    /usr/local/elk/src/kafka-2.4.1/bin/kafka-reassign-partitions.sh --reassignment-json-file /root/partitions-topic-test3.json --zookeeper localhost:2181 --verify

    (9)处理完内置标题后,接着来处理其他标题,根据步骤(4)中配置文件的定义,普通标题topic的默认分区数为8,然后使用下图的脚本工具在线调整其分区数为8,如下

    /usr/local/elk/src/kafka-2.4.1/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic mms-admin-test --partitions 8

    每一个标题topic都执行以上操作,确保所有标题分区调整完毕

    (10)然后和步骤8类似,同样编写partitions-topic-test.json文件定义所有标题topic的分区数和副本数的配置,确保每一个topic都有8个配置块,每个配置块有0,1,2三个副本数,代表8个分区数的副本分配情况。其中分区partition编号从0开始,副本数编号统一为0,1,2,代表每个分区的副本分散在3个Kafka节点上,形成冗余备份

    (11)之后同样使用下图中的脚本工具加载上一步中编写的json文件,调整副本数replica,之后进行确认

    /usr/local/elk/src/kafka-2.4.1/bin/kafka-reassign-partitions.sh --reassignment-json-file /root/partitions-topic-test.json --zookeeper localhost:2181 --execute

     

    确认副本数调整情况:

    /usr/local/elk/src/kafka-2.4.1/bin/kafka-reassign-partitions.sh --reassignment-json-file /root/partitions-topic-test.json --zookeeper localhost:2181 --verify

    (12) 现有标题topic的分区数partition和副本数replica全部调整完毕后(注意在集群的其中一台节点上执行一遍调整即可),即可重启kafka集群

     2.Logstash调优:

    (1)调整logstash的标题topic分区消费策略为轮询,默认消费策略为按范围range,添加如下图箭头所示的配置段

     

    注意有多个input配置段则都需要添加上该配置段。接着调整logstash针对单个标题topic的消费线程数,这里只要保持 “logstash的个数” x “线程总数”=“单个标题topic的总分区数partition” 即可,当前有2个logstash同时工作(消费),则单个logstash的线程数调整为4。这样的配置可以使logstash消费性能达到最大。这也是开头Kafka的配置文件中分区数调整为8的原因(见Kafka调优步骤(4))。性能相关的配置项见下图箭头所示:

     修改完所有Logstash节点后,等logstash自动加载配置文件即可,若没有配置自动重载,则手动重启所有的logstash服务即可

    四.调优结果:

    经过调优后,消费者负载已达到均衡,采集的事件数较为接近,如下图:

    最后把所有kafka节点的配置文件server.properties里的数据均衡分发时间间隔log.retention.hours还原回24小时,避免频繁触发数据同步,增加系统资源花销,影响kafka集群的正常使用,具体操作步骤详见Kafka调优中的步骤(5),最后重启整个kafka集群

    展开全文
  • 之前讲述了spark如何从kafka中消费数据,这次来将一下spark如何将数据写入到kafka中。 一、spark写消息到kafka中 直接在spark或者sparkstreaming每一批次处理结束后,在rdd.foreachPartition方法体内创建new ...
  • Spark streaming+Kafka调优

    2018-12-24 10:25:15
    Spark streaming+Kafka的使用中,当数据量较小,很多时候默认配置和使用便能够满足情况,但是当数据量大的时候,就需要进行一定的调整和优化,而这种调整和优化本身也是不同的场景需要不同的配置。 合理的批处理...
  • [Kafka调优]--调优Apache Kafka集群

    千次阅读 2018-04-04 22:09:56
    本文转自:http://www.cnblogs.com/huxi2b/p/6936348.html今天带来一篇译文“调优Apache Kafka集群”,里面有一些观点并无太多新颖之处,但总结得还算详细。该文从四个不同的目标出发给出了各自不同的参数配置,值得...
  • kafka配置调优实践

    2018-11-12 11:18:16
    kafka配置调优实践
  • kafka 调优

    2022-09-25 14:38:57
    kafka 调优 压力测试
  • Kafka调优参数大全

    千次阅读 2022-03-02 14:40:12
    1)生产者 参数名称 描述 replica.lag.time.max.ms ... ISR中,如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。... leader.imbalanc
  • Kafka调优(30%)

    2022-08-19 15:18:08
    kafka调优(30%)
  • Kafka参数调优

    千次阅读 2019-07-04 10:46:58
    自己公司用到了kafka作为MQ,在使用的时候需要设置很多参数,这里便来总结一下Kafka参数的一些配置 1、一段Kafka生产端的示例代码 Properties props = new Properties(); props.put("bootstrap.servers", ...
  • kafka-38kafka调优

    2021-11-01 10:25:14
    kafka-38kafka调优
  • Kafka性能调优

    千次阅读 2018-09-10 17:17:00
    kafka作为时下最流行的开源消息系统,被广泛地应用在数据缓冲、异步通信、汇集日志、系统解耦等方面。相比较于RocketMQ等其他常见消息系统,Kafka在保障了大部分功能特性的同时,还提供了超一流的读写性能。 本文将...
  • 背景引入:很多同学看不懂kafka参数 今天给大家聊一个很有意思的话题,大家知道很多公司都会基于Kafka作为MQ来开发一些复杂的大型系统。而在使用Kafka的客户端编写代码与服务器交互的时候,是需要对客户端设置很...
  • spark离线批处理写入kafka调优

    千次阅读 2020-09-10 12:03:29
    采用老版本spark(1.6)在离线批处理环境下,将DataFrame快速写入kafka,通常可以查到的大部分都是针对Spark-Streaming进行Kafka写入的说明,但是 在离线批处理环境下,也希望将批量计算的DataFrame中的数据直接写入到...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 26,674
精华内容 10,669
关键字:

kafka调优