精华内容
下载资源
问答
  • 偏移量
    千次阅读
    2022-06-10 19:17:05

    1、Topic相关:创建Topic、删除Topic、查看Topic详细信息、查看Topic列表、修改topic分区数
    创建Topic

     
    
    1. 创建一个3分区1副本名为test的topic,必须指定分区数 --partitions 和副本数--replication-factor,其中副本数量不能超过kafka节点(broker)数量

    2. ./kafka-topics.sh --zookeeper localhost:2181 --topic test --partitions 3 --replication-factor 1 --create

    • 删除Topic
     
    
    1. # 删除名为test的topic

    2. # 删除topic时只有在kafka安装目录config目录下的server.properties中将delete.topic.enable 设置为true topic才会真实删除,否则只是标记为删除,实则不会删除

    3. ./kafka-topics.sh --zookeeper localhost:2181 --topic test --delete

    查看某个Topic 分区 副本信息

     
    
    1. # 查看名为test的topic的详细信息,分区 副本的数量

    2. ./kafka-topics.sh --zookeeper localhost:2181 --topic test --describe

    •  查看有那些Topic
     
    
    1. # 查看kafka中创建了那些topic

    2. ./kafka-topics.sh --zookeeper localhost:2181 --list

    修改topic的分区数

     
    
    1. # 将名为test的topic 修改为4个分区

    2. # 注意 分区数只能增加不能减少

    3. ./kafka-topics.sh --zookeeper localhost:2181 -alter --partitions 4 --topic test

    2、生产者相关:往某个topic中生产数据
    使用命令行往某个topic中写入数据

     
    
    1. # 使用命令行 给名为 test 的topic 中生产数据

    2. # 执行以下命令,然后在命令行中写入要发送kafka的数据回车即可发送数据到kafka

    3. ./kafka-console-producer.sh --broker-list localhost:9092 --topic test

    3、消费者相关:从某个topic中消费数据
    消费某个topic中的最新数据

     
    
    1. # 0.8版本及以下的的kafka 使用如下命令test topic中的数据

    2. ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test

    3. # 指定消费10条数据

    4. ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --max-messages 10

    5. # 0.9版本及以上的kafka建议使用如下命令进行消费,当然也可使用上一条命令消费

    6. ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

    消费某个topic中最老的数据

     
    
    1. # 0.8版本及以下的的kafka 使用如下命令test topic中的数据

    2. ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

    3. # 0.9版本及以上的kafka建议使用如下命令进行消费,当然也可使用上一条命令消费

    4. ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

    消费某个topic中的数据并指定groupid

     
    
    1. # 在命令行消费某个topic中的数据通过/config/consumer.properties 配置文件指定groupid

    2. # 0.8版本及以下的的kafka 使用如下命令test topic中的数据

    3. ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --consumer.config ../config/consumer.properties

    4. # 0.9版本及以上的kafka建议使用如下命令进行消费,当然也可使用上一条命令消费

    5. ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config ../config/consumer.properties

    将/config/consumer.properties配置文件中groupid对应的offset删除,该groupid重置为未使用状态

     
    
    1. # 使用这条命令会从最新消息开始消费,会将之前groupid记录的offset重置,并重新开始记录

    2. ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --consumer.config ../config/consumer.properties --delete-consumer-offsets

    3. # 使用consumer.properties 不可以和--from-beginning一同使用 除非与--delete-consumer-offsets一同使用

    4. # 使用这条命令会从头开始消费数据,会将之前groupid记录的offset重置,并重新开始记录

    5. ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --consumer.config ../config/consumer.properties --delete-consumer-offsets --from beginning

    4、消费组(group)相关:查看消费者group、查看消费者消费情况(消费至那个offset/积压数据量多少)
    查看有那些消费者group

     
    
    1. # 0.8版本及以下的的kafka 使用如下命令查看有那些消费者group

    2. ./kafka-consumer-groups.sh --zookeeper localhost:2181 --list

    3. # 0.9版本及以上的kafka建议使用如下命令查看有那些消费者group,当然也可使用上一条命令消费

    4. ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

    查看某个消费者消费情况(消息队列堆积情况)

     
    
    1. # 0.8版本及以下的的kafka 使用如下命令查看名为testgroup 的消费组的消费情况

    2. ./kafka-consumer-groups.sh --zookeeper localhost:2181 --group testgroup --describe

    3. # 0.9版本及以上的的kafka 使用如下命令查看名为testgroup 的消费组的消费情况

    4. ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testgroup --describe

    5、修改某个消费组的偏移量(offset)
    通过zk客户端对topic的分区修改offset 为任意偏移量

     
    
    1. # 独立安装的zk,进入zookeeper安装目录的bin目录下,使用如下命令进入zk客户端

    2. ./zkCli.sh -server localhost:2181

    3. # 非独立安装的的zk, 直接在kafka安装目录bin目录下,使用如下命令进入zk客户端

    4. ./zookeeper-shell.sh localhost:2181

    5. # 进入zk客户端后可查看某个分区的偏移量 例如名为test的topic的消费者组 test-consumer-group 0分区的offset的消费情况

    6. get /consumers/test-consumer-group/offsets/test/0

    7. # 设置名为test的topic的消费者组 test-consumer-group 0分区的offset 为1000

    8. set /consumers/test-consumer-group/offsets/test/0 1000

    通过kafka内置的kafka.tools.UpdateOffsetsInZK类实现修改某个topic 的消费组(config/consumer.properties中配置的groupid)的所有分区的偏移量为最新(latest)或者最旧(earliest)

     
    
    1. # 将名为test的topic的消费组(groupid必须从consumer.properties获取,即需要将需要修改的groupid写入consumer.properties配置文件)所有分区的offset设置为最早earliest

    2. ./kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest ../config/consumer.properties test

    3. # 将名为test的topic的消费组(groupid必须从consumer.properties获取,即需要将需要修改的groupid写入consumer.properties配置文件)所有分区的offset设置为最新latest

    4. ./kafka-run-class.sh kafka.tools.UpdateOffsetsInZK latest ../config/consumer.properties test

    0.11.0.0及以上版本修改偏移量可使用Kafka自带的kafka-consumer-groups.sh脚本

     
    
    1. # 以下可将--zookeeper localhost:2181 更换为--bootstrap-server localhost:9092 高版本的消费者建议连接bootstrap

    2. # 将test topic的消费组test-consumer-group的0分区的偏移量设置为最新

    3. ./kafka-consumer-groups.sh --zookeeper localhost:2181 --group test-consumer-group --topic test:0 --reset-offsets --to-earliest –execute

    4. # 将test topic的消费组test-consumer-group的0和1分区的偏移量设置为最旧

    5. ./kafka-consumer-groups.sh --zookeeper localhost:2181 --group test-consumer-group --topic test:0,1 --reset-offsets --to-latest –execute

     
    
    1. # 将test topic的消费组test-consumer-group的所有分区的偏移量设置为1000

    2. ./kafka-consumer-groups.sh --zookeeper localhost:2181 --group test-consumer-group --topic test --reset-offsets --to-offset 1000 –execute

    3. # --reset-offsets后可以跟的其他用法:--to-current:把位移调整到分区当前位移

    4. # --reset-offsets后可以跟的其他用法:--shift-by N: 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动

    5. # --reset-offsets后可以跟的其他用法:--to-datetime <datetime>:把位移调整到大于给定时间的最早位移处,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000

    更多相关内容
  • 通过四个属性可以获得元素的偏移量: 1、offsetHeight: 元素在垂直方向上占用的空间的大小,(像素)。包括元素的高度,(可见的)水平滚动条的高度,上边框高度和下边框高度。 2、offsetWidth:元素在水平方向上占用...
  • 而获取偏移量可以直接获取相对于document的偏移量,也可以获取相对与视口的偏移量(viewpoint)加上页面滚动量(scroll)获得。 1.获取相对与document的偏移量 function getOffsetSum(ele){ var top= 0,left=0; while...
  • 本文主要讨论 limit 分页大偏移量慢的原因及优化方案,为了模拟这种情况,下面首先介绍表结构和执行的 SQL。 场景模拟 建表语句 user 表的结构比较简单,id、sex 和 name,为了让 SQL 的执行时间变化更加明显,这里...
  • 本文实例为大家分享了android实现选项卡功能,通过计算偏移量,设置tetxview和imageView的对应值,一些color的值读者自己去补充 实现效果图: (1)简单写一个主界面的布局activity_main.xml <?xml version=1.0...
  • 笔记本电脑安装MSSQLSERVER,在文件 'D:\test.mdf' 中、偏移量为 0x00000000150000 的位置执行 读取 期间,。。。。。。。出错提示,亲自测试过能解决。
  • 下面小编就为大家带来一篇js获取元素的偏移量offset简单方法(必看)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
  • 一个手动管理spark streaming集成kafka时的偏移量到zookeeper中的小项目
  • Android 完美获取手机当前时区,解决时间偏移量和夏令时差问题.从本地时间里扣除这些变量,即可以取得UTC时间
  • 单一的 InSAR 观测技术可提取地表沿雷达视线方向(LOS)上的一维位移,而利用 SAR 影像配准过程中的同名像素偏移量可提取地表沿雷达方位向(近南北向)与距离向(近东西向)的二维形变场,与 LOS方向的一维形变形成优势互补...
  • 在使用C语言对STM32编程的过程中,经常使用到结构体,本文介绍了一种得到结构体中一个field的偏移量的方法。
  • 根据SAR干涉测量原理,以配准偏移量为观测值,采用非线性最小二乘迭代法对干涉对的基线进行估计,并以ENVISAT卫星在西藏地区获取的两景SAR数据为例,对此基线估计算法进行实验验证。结果表明,该算法能有效地估计...
  • 应用随机振动理论,通过对轮对输入轨道谱和解动力学方程得到准高速车辆最大横向振动偏移量,与线路动力学试验结果基本一致。用同样的方法再计算200 -350km/h运行时的车体横向振动偏移量,即可作为制定高速机车车辆限界...
  • Android 完美获取手机当前时区,解决时间偏移量和夏令时差问题.从本地时间里扣除这些变量,解决时间偏移量和夏令时差问题.从本地时间里扣除这些变量,
  • 针对现有基于偏移量计算的在线GPS轨迹数据压缩算法不能有效评估关键点的问题,提出基于偏移量计算的在线GPS轨迹数据压缩算法——关键点前继修正算法(KPFA)。该算法通过计算同步欧式距离(SED)累积偏移量来发现...
  • 提出了一种无需外部数据的新同震形变偏移量计算流程。以2003年伊朗巴姆里氏6.6级地震为例,利用两景震前无形变数据证明了新方法的精度较传统方法提高了0.224个像素。利用震前震后两景数据计算了方位向同震形变场,与...
  • 偏移量管理主要是指管理每个消息队列的消费进度:集群模式消费下会将消息队列的消费进度保存在Broker端,广播模式消费下消息队列的消费进度保存在消费者本地。 组件分析:RocketMQ定义了一个接口OffsetStore。它的...

    1 客户端逻辑

    1.1 概述

    偏移量管理主要是指管理每个消息队列的消费进度:集群模式消费下会将消息队列的消费进度保存在Broker端,广播模式消费下消息队列的消费进度保存在消费者本地。

    组件分析:RocketMQ定义了一个接口OffsetStore。它的实现类有两个:RemoteBrokerOffsetStoreLocalFileOffsetStore前者主要是集群消费模式下使用,即与broker进行打交道,将消息队列的消费偏移量通过网络传递给Broker;后者主要是广播消费模式下使用,即直接将消费偏移量存储在消费者所在的本地中。入下图所示:

    在这里插入图片描述

    offsetstore保存在消费者内部客户单ConsumerInner的实现类中的,其初始化创建的时机在内部客户端的start()方法中。

    switch (this.defaultMQPushConsumer.getMessageModel()) {
        // 广播模式偏移量持久化为本地
        case BROADCASTING:
            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
            break;
        // 集群模式下,偏移量持久化方式为远程
        case CLUSTERING:
            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
            break;
        default:
            break;
    }
    

    下面主要分析RemoteBrokerOffsetStore的逻辑。

    主要是两个逻辑,如下图所示

    • 将消息偏移量更新到本地内存中管理消息偏移量的组件
    • 将内存中保存的消息偏移量发送给Broker,更新Broker端保存的消息偏移量

    在这里插入图片描述

    1.2 更新消息队列的偏移量

    updateOffset
    并发消息消费服务中ConsumeMessageConcurrentlyService#processConsumeResult()处理消息消费结果的方法中在消息处理完成以后会调用更新消息队列的偏移量

    // 获取偏移量存储实现,并调用其更新偏移量方法更新偏移量
    
    this.defaultMQPushConsumerImpl.getOffsetStore()
    .updateOffset(consumeRequest.getMessageQueue(), offset, true);
    

    下面是RemoteBrokerOffsetStore的更新逻辑
    将已经确认消费了的偏移量存储偏移量管理器中。此处的更新仅仅是更新了保存每个消息队列的偏移量的map中的值,并没有将偏移量上传到broker。

    
    public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
        if (mq != null) {
            // ConcurrentMap<MessageQueue, AtomicLong>
            // 获取消息队列对应的偏移量
            AtomicLong offsetOld = this.offsetTable.get(mq);
            if (null == offsetOld) {
                // 更新table
                offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
            }
    
            if (null != offsetOld) {
                // 是否是只增模式
                if (increaseOnly) {
                    MixAll.compareAndIncreaseOnly(offsetOld, offset);
                } else {
                    offsetOld.set(offset);
                }
            }
        }
    }
    
    

    1.3 向Broker发送消息偏移量

    向服务端发送消息偏移量是通过MQClientInstance中启动的一个定时任务来完成的。

    1 在其startScheduledTask方法中开启下列定时任务

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
        @Override
        public void run() {
            try {
                // 对已消费的消息的偏移量进行持久化
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    

    2 调用MQClientInstancepersisAllConsumerOffset()方法

    private void persistAllConsumerOffset() {
        // 获取所有消费者组对应的内部客户端
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            // 调用内部客户端进行持久化
            impl.persistConsumerOffset();
        }
    }
    

    3 调用内部消费者客户端的持久化方法

    public void persistConsumerOffset() {
        try {
            this.makeSureStateOK();
            Set<MessageQueue> mqs = new HashSet<MessageQueue>();
            // 获取所有的分配的消息队列
            Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
            mqs.addAll(allocateMq);
            // 持久化偏移量
            this.offsetStore.persistAll(mqs);
        } catch (Exception e) {
            log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
        }
    }
    

    4 调用偏移量管理器的更新

    public void persistAll(Set<MessageQueue> mqs) {
        if (null == mqs || mqs.isEmpty())
            return;
    
        final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
    
        // 遍历保存消息队列偏移量的map
        for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
            MessageQueue mq = entry.getKey();
            AtomicLong offset = entry.getValue();
            if (offset != null) {
                if (mqs.contains(mq)) {
                    try {
                        // 更新到
                        this.updateConsumeOffsetToBroker(mq, offset.get());
                        log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
                                this.groupName,
                                this.mQClientFactory.getClientId(),
                                mq,
                                offset.get());
                    } catch (Exception e) {
                        log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
                    }
                } else {
                    unusedMQ.add(mq);
                }
            }
        }
    
        if (!unusedMQ.isEmpty()) {
            for (MessageQueue mq : unusedMQ) {
                this.offsetTable.remove(mq);
                log.info("remove unused mq, {}, {}", mq, this.groupName);
            }
        }
    }   
    

    接下来就是通过网络层发送网络请求给Broker进行更新消息对立偏移量。

    1.4 读取消息队列的偏移量

    两个时刻需要获取Broker保存的偏移量

    • 消费者刚启动的时候会去Broker获取消息队列对应的偏移量
    • 消费者重平衡后,分配得到新的消息队列,也要重新获取偏移量

    readOffset
    在DefaultMQPushConsumerImpl的pullMessage方法中
    在消费之前会读取一次

    commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
    

    2 服务端的处理逻辑

    服务端注册了的消费消息偏移量的请求处理器,首先是有关偏移量的三个请求码

    • GET_CONSUMER_LIST_BY_GROUP:根据组名获取消费者列表
    • UPDATE_CONSUMER_OFFSET:更新消费偏移量的请求
    • QUERY_CONSUMER_OFFSET:查询消费者的偏移量

    所以这三个的请求码将交给ConsumerManageProcessor来进行处理。

    ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    

    2.1 更新消费者传给broker的消费偏移量

    内存存储方式

    • 位置:ConsumerOffsetManageroffsetTable
    • 格式:ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>>,第一层key是主题+消费者组,集群模式下的消费模式;第二层的keyQueueID队列ID

    外部存储位置:

    下图表示整个更新消费偏移量和持久化的过程;整体流程: 先更新内存中的存储offetTable,然后通过一个持久化的线程将offsetTable中的数据落盘持久化。

    在这里插入图片描述

    2.2 源码分析

    2.2.1 处理偏移量更新请求和更新到内存中的流程

    1 请求处理的入口

    // RocketMQ里面的通用做法,发送请求时将给请求赋值一个请求码;
    // 服务端在接收到请求的时候将根据请求码选择不同的请求处理处理器;
    // 统一的接口processRequest()
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
        // ConsuemrManagerProcessor内部又分了不同的处理逻辑
        switch (request.getCode()) {
            // 处理
            case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
                return this.getConsumerListByGroup(ctx, request);
            // 处理更新偏移量
            case RequestCode.UPDATE_CONSUMER_OFFSET:
                return this.updateConsumerOffset(ctx, request);
            case RequestCode.QUERY_CONSUMER_OFFSET:
                return this.queryConsumerOffset(ctx, request);
            default:
                break;
        }
        return null;
    }
    

    2 处理更新消费偏移量的入口

    private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
        // 首先创建响应,RocketMQ中惯例做法,具体可参照
        final RemotingCommand response =
                RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
        // 解码请求头
        final UpdateConsumerOffsetRequestHeader requestHeader =
                (UpdateConsumerOffsetRequestHeader) request
                        .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
        // 调用消费偏移量偏移器进行更新消费偏移量
        this.brokerController.getConsumerOffsetManager()
                .commitOffset(
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.getConsumerGroup(), // 消费者组
                        requestHeader.getTopic(), // 主题
                        requestHeader.getQueueId(), // 队列ID
                        requestHeader.getCommitOffset()); // 偏移量
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
    

    3 消费偏移量管理器更新偏移量的入口

    public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
        final long offset) {
        // topic@group
        // 构建key: 主题/消费者组名
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        this.commitOffset(clientHost, key, queueId, offset);
    }
    

    4 将消费者端传上来的消费偏移量存储到内存之中的map

    private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
        // 使用 主题/消费者名 获取存储偏移量的map<queueId, offset>
        ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
        if (null == map) {
            map = new ConcurrentHashMap<Integer, Long>(32);
            map.put(queueId, offset);
            this.offsetTable.put(key, map);
        } else {
            Long storeOffset = map.put(queueId, offset);
            if (storeOffset != null && offset < storeOffset) {
                log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
            }
        }
    }
    

    2.2.2 消息偏移量持久化到磁盘

    1、启动定时任务,该定时任务在BrokerController中被启动的;

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                // 持久化偏移量
                BrokerController.this.consumerOffsetManager.persist();
            } catch (Throwable e) {
                log.error("schedule persist consumerOffset error.", e);
            }
        }
    }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    

    2、调用ConsuemerOffsetManager进行偏移量持久化

    public synchronized void persist() {
        // 先进行编码
        String jsonString = this.encode(true);
        if (jsonString != null) {
            // 获取存储文件的路径
            String fileName = this.configFilePath();
            try {
                // 将存储内容存到磁盘
                MixAll.string2File(jsonString, fileName);
            } catch (IOException e) {
                log.error("persist file " + fileName + " exception", e);
            }
        }
    }
    
    展开全文
  • 3.5.5 缓存分区的偏移量

    千次阅读 2021-04-15 22:51:05
    3.5.5 缓存分区的偏移量 消费者提交自己负责分区的偏移量,除了写入服务端(协调节点)内部主题某个分区的日志文件中,还要把这部分数据保存一份到当前服务端的内存中,这样分区的偏移量保存在了磁盘和内存两个地方。...

    3.5.5 缓存分区的偏移量

    消费者提交自己负责分区的偏移量,除了写入服务端(协调节点)内部主题某个分区的日志文件中,还要把这部分数据保存一份到当前服务端的内存中,这样分区的偏移量保存在了磁盘和内存两个地方。偏移量消息的键由消费组、主题、分区组成(GoupTopicPat1tion),消息的值是分区的偏移盘。查询分区的偏移量时给定GoupTop1cPat1hon,会返回分区对应的偏移盘,即分区当前的消费进度。

    由于消费者会周期性地提交偏移量,同一个分区在每次提交时都会产生新的偏移盐。比如分区p。在第一次提交时偏移量为10,在第二次提交时偏移量为20。每次提交偏移盘写入日志文件都采用追加消息的方式。对于写入锺存而言,因为使用Map结构,所以相同分区的偏移量会被覆盖更新。相关代码如下:
    在这里插入图片描述
    缓存的作用是为了方便查询,而且会被重复查询,如果没有重复查询,就没有必要放入缓存。比如,不能把普通的消息内容作为缓存,因为普通消息量很大,而且消费者读取过一次之后一般不会再次读取。

    如表3-3所示,服务端有两种作用域类型的缓存:“所有节点共享”“每个节点独享”。如果是共享数据,贝I]向任意一个服务端节点发送请求,都可以获取到一致的状态(比如主题的元数据),它的特点是和业务逻辑的任何组件都无关。如果是节点独享的数据,节点之间数据不一致,要保证读写请求连接的是同一个节点,才能读取到一致的数据。它的特点是和业务逻辑的某个组件有关,比如消费者提交的分区偏移量和消费组有关。
    在这里插入图片描述

    如图3-29所示,偏移盘请求和消费组有关,客户端只能连接指定的节点,所以是协调节点独享的缓存。而主题元数据(TopicMetadata)和消费组的协调者(GroupCoordinator)因为在每个服务端节点保存的数据都一样,可以请求任何一个节点,所以是所有节点共享的缓存。
    在这里插入图片描述

    我们来讨论一个问题:为什么分区偏移量消息的键由“消费组、主题、分区”组成,而分区方式
    却只由消费组决定?下面我们来循序渐进地回答这个问题。

    首先,要回答消息的键为什么有消费组,而没有消费者。虽然分区是由消费者提交的,但是偏移量消息的任tt不能存在消费者。假设键是GroupConsuJ11erTopicPart1hon,每个消费者提交的偏移量都有向己的标识。比如消费者1提交的偏移盘是G1-C1-T1P0:10,消费者2提交的偏移量是G1-C2-T1P1:20,保存到缓存的数据是[(G1C1T1P0,10),(G1αT1P1,20)]。再平衡后,TlPO分配笋消费者2,在缓存中就不会查询至l]G1CZT1P0的记录;如果TlPl分配给消费者l,也无法查到G1ζ1T1P1的记录。而以消费组存储时缓存的内容是[(G1T1问,10),(G1T1P1,20汀,这样不管是消费者l还是消费者2分配到TIPO,都可以从缓存中读取州TIPO的偏移盘。只要消费组所有消费者都提交了分区的消费进度,再平衡时无论怎么重新分配分区,任何一个消费者都可以查询到任意一个分区的最新消费进度。

    另外,必须要有消费组的原因是,不同的消费组可能会订阅同一个主题。如果只有“主题、分区”作为分区偏移£l消息的键,就无法区分不同的消费组。而实际上,不同消费组,即使主题分区相同,它们的分区偏移盐也可能不同,所以偏移量消息的键需要有“消费组”。

    其次,因为服务端要保存分区的偏移盐,所以消息值是偏移量,其他信息比如主题、分区都放在消息的健中。所以偏移量消息的键由“消费组、主题、分区”3部分组成。

    最后,再来看看为什么分区方式只Fl=I消费组决定的,而不是偏移量消息的键?因为同一个消费组的分区偏移量消息都在同一个协调节点上,为消息进行分区的方式只能是消费组。如果分区方式也是“消费组、主题、分区”,那么只有这3个数据都相同时,内部主题的分区才相同。比如G1T1P0和G1T1P1因为分区不同,内部主题的分区也不同,提交偏移量时就不在同一个协调节点了。而这和前面的“相同消费组的消费者提交偏移量是在同一个协调节点”就发生了矛盾。

    展开全文
  • 李宏毅教授2017年的介绍深度学习理论,介绍在网络模型中的偏移量和变量计算。
  • RocketMQ的物理偏移量和逻辑偏移量 消息存储中CommitLog、ComsumeQueue、IndexFile之间的关系 CommitLog 文件: 消息存储文件,所有主题的消息随着到达 Broker 的顺序写入 CommitLog 文件,每个文件默认为1G,文件...

    RocketMQ的物理偏移量和逻辑偏移量
    在这里插入图片描述

    消息存储中CommitLog、ComsumeQueue、IndexFile之间的关系

    CommitLog 文件:

    消息存储文件,所有主题的消息随着到达 Broker 的顺序写入 CommitLog 文件,每个文件默认为1G,文件的命名也及其巧妙,使用该存储在消息文件中的第一个全局偏移量来命名文件,这样的设计主要是方便根据消息的物理偏移量,快速定位到消息所在的物理文件。RocketMQCommitLog 文件使用顺序写,极大提高了文件的写性能。

    ConsumeQueue 文件:

    消息消费队列文件,是 CommitLog 文件的基于 Topic 的索引文件,主要用于消费者根据 Topic消费消息,其组织方式为 /topic/queue,同一个队列中存在多个文件,ConsumeQueue 设计极具技巧性,其每个条目使用固定长度(8字节 CommitLog 物理偏移量、4字节消息长度、8字节 Tag HashCode),这里不是存储 tag 的原始字符串,而是存储 HashCode,目的就是确保每个条目的长度固定,可以使用访问类似数组下标的方式来快速定位条目,极大的提高了 ConsumeQueue文件的读取性能,试想一下,消息消费者根据Topic、消息消费进度(ConsumeQueue 逻辑偏移量),即第几个 ConsumeQueue 条目,这样根据消费进度去访问消息的方法为使用逻辑偏移量logicOffset* 20即可找到该条目的起始偏移量( ConsumeQueue 文件中的偏移量),然后读取该偏移量后20个字节即得到了一个条目,无需遍历 ConsumeQueue 文件。

    IndexFile 文件:

    基于物理磁盘文件实现 Hash 索引。其文件由40字节的文件头、500W个 Hash 槽,每个Hash槽为4个字节,最后由2000万个 Index 条目,每个条目由20个字节构成,分别为4字节的索引key的 HashCode、8字节消息物理偏移量、4字节时间戳、4字节的前一个Index条目( Hash 冲突的链表结构)。

    展开全文
  • 一、结构体成员偏移量、 二、完整代码示例
  • kafka消费者偏移量提交

    千次阅读 2021-03-17 22:02:54
    自动提交当前偏移量如果客户端属性enable.auto.commit被设为true,那么每过5s,消费者会自动把从poll()方法接收到的最大偏移量提交上去,提交时间间隔由auto.commit.interval.ms控制,默认为5s假设我们使用默认的5s提交...
  • Flink kafka偏移量

    千次阅读 2021-11-22 13:32:31
    文章目录Flink kafka偏移量1 代码2 offset 五种模式2.1 kafkaConsumer.setStartFromGroupOffsets();2.2 kafkaConsumer.setStartFromEarliest();2.3 kafkaConsumer.setStartFromLatest()2.4 kafkaConsumer....
  • NULL 博文链接:https://jdkleo.iteye.com/blog/1934722
  • python文件偏移量

    2022-03-24 21:04:48
    python文件偏移量
  • 如果执行到一半断掉了,消费失败,就会造成数据丢失(生产者已经发出消息,消费者也修改了偏移量,但是消费失败,数据核对不上) (2)如果是定时提交偏移量记录,例如五秒更新一次,当四秒的时候消费服务重启了,这...
  • NULL 博文链接:https://langyu.iteye.com/blog/1483702
  • } 运行程序,输出: 北京时间2021-01-19T08:25:13.162,美国纽约的偏移量:-05:00 北京时间2021-05-05T18:01:01.000,美国纽约(夏令时期间)的偏移量:-04:00 由此可见,纽约这个城市因为有夏令时的存在,因此在...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 508,523
精华内容 203,409
关键字:

偏移量