精华内容
下载资源
问答
  • sqlserver2014分页存储过程
    declare @total int,@pgcount int
    exec splitpage 1,10,@total output,@pgcount output

    create proc [dbo].[splitpage]( @page int, --第@page页 @size int, --每页@size行 @total int output, --总行数@pgcount int output --总页数)asbegin select @total=count(1) from dl_capitalselect @pgcount=CEILING((@total+0.0)/@size) select *,@total as total,@pgcount as pgcountfrom dl_capitalorder by id offset (@page-1)*@size rows fetch next @size rows onlyend;
    
    
    
    
    
    
    展开全文
  • KAFKA OFFSET存储问题

    2018-12-17 11:17:21
    KAFKA OFFSET存储问题 注意:从kafka-0.9版本及以后,kafka的消费者组和offset信息就不存zookeeper了,而是存到broker服务器上,所以,如果你为某个消费者指定了一个消费者组名称(group.id),那么,一旦这个...

    KAFKA OFFSET的存储问题

    注意:从kafka-0.9版本及以后,kafka的消费者组和offset信息就不存zookeeper了,而是存到broker服务器上,所以,如果你为某个消费者指定了一个消费者组名称(group.id),那么,一旦这个消费者启动,这个消费者组名和它要消费的那个topic的offset信息就会被记录在broker服务器上

     

    1.概述
    Kafka版本[0.10.1.1],已默认将消费的 offset 迁入到了 Kafka 一个名为 __consumer_offsets 的Topic中。其实,早在 0.8.2.2 版本,已支持存入消费的 offset 到Topic中,只是那时候默认是将消费的 offset 存放在 Zookeeper 集群中。那现在,官方默认将消费的offset存储在 Kafka 的Topic中,同时,也保留了存储在 Zookeeper 的接口,通过 offsets.storage 属性来进行设置。

    2.内容
    其实,官方这样推荐,也是有其道理的。之前版本,Kafka其实存在一个比较大的隐患,就是利用 Zookeeper 来存储记录每个消费者/组的消费进度。虽然,在使用过程当中,JVM帮助我们完成了一些优化,但是消费者需要频繁的去与 Zookeeper 进行交互,而利用ZKClient的API操作Zookeeper频繁的Write其本身就是一个比较低效的Action,对于后期水平扩展也是一个比较头疼的问题。如果期间 Zookeeper 集群发生变化,那 Kafka 集群的吞吐量也跟着受影响。

    在此之后,官方其实很早就提出了迁移到 Kafka 的概念,只是,之前是一直默认存储在 Zookeeper集群中,需要手动的设置,如果,对 Kafka 的使用不是很熟悉的话,一般我们就接受了默认的存储(即:存在 ZK 中)。在新版 Kafka 以及之后的版本,Kafka 消费的offset都会默认存放在 Kafka 集群中的一个叫 __consumer_offsets 的topic中。

    当然,其实她实现的原理也让我们很熟悉,利用 Kafka 自身的 Topic,以消费的Group,Topic,以及Partition做为组合 Key。所有的消费offset都提交写入到上述的Topic中。因为这部分消息是非常重要,以至于是不能容忍丢数据的,所以消息的 acking 级别设置为了 -1,生产者等到所有的 ISR 都收到消息后才会得到 ack(数据安全性极好,当然,其速度会有所影响)。所以 Kafka 又在内存中维护了一个关于 Group,Topic 和 Partition 的三元组来维护最新的 offset 信息,消费者获取最新的offset的时候会直接从内存中获取。

     

     

    kafka 提供三种语义的传递:

                   1至少一次

                   2至多一次

                   3精确一次

      首先在 producer 端保证1和2的语义是非常简单的,至少一次只需要同步确认即可(确认方式分为只需要 leader 确认以及所有副本都确认,第二种更加具有容错性),至多一次最简单只需要异步不断的发送即可,效率也比较高。目前在 producer 端还不能保证精确一次,在未来有可能实现,实现方式如下:在同步确认的基础上为每一条消息加一个主键,如果发现主键曾经接受过,则丢弃

      在 consumer 端,大家都知道可以控制 offset,所以可以控制消费,其实 offset 只有在重启的时候才会用到。在机器正常运行时我们用的是 position,我们实时消费的位置也是 position 而不是 offset。我们可以得到每一条消息的 position。如果我们在处理消息之前就将当前消息的 position 保存到 zk 上即 offset,这就是只多一次消费,因为我们可能保存成功后,消息还没有消费机器就挂了,当机器再打开时此消息就丢失了;或者我们可以先消费消息然后保存 position 到 zk 上即 offset,此时我们就是至少一次,因为我们可能在消费完消息后offset 没有保存成功。而精确一次的做法就是让 position的保存和消息的消费成为原子性操作,比如将消息和 position 同时保存到 hdfs 上 ,此时保存的 position 就称为 offset,当机器重启后,从 hdfs重新读入offset,这就是精确一次。

    • consumer可以先读取消息,然后将offset写入日志文件中,然后再处理消息。这存在一种可能就是在存储offset后还没处理消息就crash了,新的consumer继续从这个offset处理,那么就会有些消息永远不会被处理,这就是上面说的“最多一次”。
    • consumer可以先读取消息,处理消息,最后记录offset,当然如果在记录offset之前就crash了,新的consumer会重复的消费一些消息,这就是上面说的“最少一次”。
    • “精确一次”可以通过将提交分为两个阶段来解决:保存了offset后提交一次,消息处理成功之后再提交一次。但是还有个更简单的做法:将消息的offset和消息被处理后的结果保存在一起。比如用Hadoop ETL处理消息时,将处理后的结果和offset同时保存在HDFS中,这样就能保证消息和offser同时被处理了。
    展开全文
  • Kafka offset存储方式与获取消费实现

    千次阅读 2017-03-05 18:07:58
    1.概述 Kafka版本[0.10.1.1],已默认将消费的 offset 迁入到了 Kafka 一个名为 __consumer_offsets 的Topic中。其实,早在 0.8.2.2 版本,已支持存入...那现在,官方默认将消费的offset存储在 Kafka 的Topic中,同
    1.概述
    
    Kafka版本[ 0.10.1.1 ],已默认将消费的 offset 迁入到了 Kafka 一个名为 __consumer_offsets 的Topic中。其实,早在 0.8.2.2 版本,已支持存入消费的 offset 到Topic中,只是那时候默认是将消费的 offset 存放在 Zookeeper 集群中。那现在,官方默认将消费的offset存储在 Kafka 的Topic中,同时,也保留了存储在 Zookeeper 的接口,通过 offsets.storage 属性来进行设置。

    2.内容
    其实,官方这样推荐,也是有其道理的。之前版本,Kafka其实存在一个比较大的隐患,就是利用 Zookeeper 来存储记录每个消费者/组的消费进度。虽然,在使用过程当中,JVM帮助我们完成了一些优化,但是消费者需要频繁的去与 Zookeeper 进行交互,而利用ZKClient的API操作Zookeeper频繁的Write其本身就是一个比较低效的Action,对于后期水平扩展也是一个比较头疼的问题。如果期间 Zookeeper 集群发生变化,那 Kafka 集群的吞吐量也跟着受影响。

    在此之后,官方其实很早就提出了迁移到 Kafka 的概念,只是,之前是一直默认存储在 Zookeeper集群中,需要手动的设置,如果,对 Kafka 的使用不是很熟悉的话,一般我们就接受了默认的存储(即:存在 ZK 中)。在新版 Kafka 以及之后的版本,Kafka 消费的offset都会默认存放在 Kafka 集群中的一个叫 __consumer_offsets 的topic中。

    当然,其实她实现的原理也让我们很熟悉,利用 Kafka 自身的 Topic,以消费的Group,Topic,以及Partition做为组合 Key。所有的消费offset都提交写入到上述的Topic中。因为这部分消息是非常重要,以至于是不能容忍丢数据的,所以消息的 acking 级别设置为了 -1,生产者等到所有的 ISR 都收到消息后才会得到 ack(数据安全性极好,当然,其速度会有所影响)。所以 Kafka 又在内存中维护了一个关于 Group,Topic 和 Partition 的三元组来维护最新的 offset 信息,消费者获取最新的offset的时候会直接从内存中获取。

    3.实现
    那我们如何实现获取这部分消费的 offset,我们可以在内存中定义一个Map集合,来维护消费中所捕捉到 offset,如下所示:
    [Java]  纯文本查看  复制代码
    ?
    1
    protected static Map<GroupTopicPartition, OffsetAndMetadata> offsetMap = new ConcurrentHashMap<>();


    然后,我们通过一个监听线程来更新内存中的Map,代码如下所示:
    [Java]  纯文本查看  复制代码
    ?
    01
    02
    03
    04
    05
    06
    07
    08
    09
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    private static synchronized void startOffsetListener(ConsumerConnector consumerConnector) {
             Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
             topicCountMap.put(consumerOffsetTopic, new Integer( 1 ));
             KafkaStream< byte [], byte []> offsetMsgStream = consumerConnector.createMessageStreams(topicCountMap).get(consumerOffsetTopic).get( 0 );
     
             ConsumerIterator< byte [], byte []> it = offsetMsgStream.iterator();
             while ( true ) {
                 MessageAndMetadata< byte [], byte []> offsetMsg = it.next();
                 if (ByteBuffer.wrap(offsetMsg.key()).getShort() < 2 ) {
                     try {
                         GroupTopicPartition commitKey = readMessageKey(ByteBuffer.wrap(offsetMsg.key()));
                         if (offsetMsg.message() == null ) {
                             continue ;
                         }
                         OffsetAndMetadata commitValue = readMessageValue(ByteBuffer.wrap(offsetMsg.message()));
                         offsetMap.put(commitKey, commitValue);
                     } catch (Exception e) {
                         e.printStackTrace();
                     }
                 }
             }
         }


    在拿到这部分更新后的offset数据,我们可以通过 RPC 将这部分数据共享出去,让客户端获取这部分数据并可视化。RPC 接口如下所示:
    [Java]  纯文本查看  复制代码
    ?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    namespace java org.smartloli.kafka.eagle.ipc
     
    service KafkaOffsetServer{
         string query( 1 :string group, 2 :string topic, 3 :i32 partition),
         string getOffset(),
         string sql( 1 :string sql),
         string getConsumer(),
         string getActiverConsumer()
    }


    这里,如果我们不想写接口来操作 offset,可以通过 SQL 来操作消费的 offset 数组,使用方式如下所示:

    引入依赖JAR
    [Java]  纯文本查看  复制代码
    ?
    1
    2
    3
    4
    5
    <dependency>
         <groupId>org.smartloli</groupId>
         <artifactId>jsql-client</artifactId>
         <version> 1.0 . 0 </version>
    </dependency>


    使用接口
    [Java]  纯文本查看  复制代码
    ?
    1
    JSqlUtils.query(tabSchema, tableName, dataSets, sql);


    tabSchema:表结构;tableName:表名;dataSets:数据集;sql:操作的SQL语句。

    4.预览
    消费者预览如下图所示:
     
    正在消费的关系图如下所示:
     
    消费详细 offset 如下所示:
     
    消费和生产的速率图,如下所示:
     

    5.总结
    这里,说明一下,当 offset 存入到 Kafka 的topic中后,消费线程ID信息并没有记录,不过,我们通过阅读Kafka消费线程ID的组成规则后,可以手动生成,其消费线程ID由:Group+ConsumerLocalAddress+Timespan+UUID(8bit)+PartitionId,由于消费者在其他节点,我们暂时无法确定ConsumerLocalAddress。最后,欢迎大家使用 Kafka 集群监控 ——[ Kafka Eagle  ],[ 操作手册  ]。
    展开全文
  • Kafka中的Offset存储问题小记

    千次阅读 2018-06-14 17:55:16
    Kafka的offset用于记录消费者消息消费的情况。1.概述Kafka版本[0.10.1.1],已默认将消费的 offset 迁入到了 Kafka 一个名为 __consumer_offsets 的Topic中。其实,早在 0.8.2.2 版本,已支持存入消费的 offset 到...

    Kafka的offset用于记录消费者消息消费的情况。

    1.概述

    Kafka版本[0.10.1.1],已默认将消费的 offset 迁入到了 Kafka 一个名为 __consumer_offsets 的Topic中。其实,早在 0.8.2.2 版本,已支持存入消费的 offset 到Topic中,只是那时候默认是将消费的 offset 存放在 Zookeeper 集群中。那现在,官方默认将消费的offset存储在 Kafka 的Topic中,同时,也保留了存储在 Zookeeper 的接口,通过 offsets.storage 属性来进行设置。
    2.内容
    其实,官方这样推荐,也是有其道理的。之前版本,Kafka其实存在一个比较大的隐患,就是利用 Zookeeper 来存储记录每个消费者/组的消费进度。虽然,在使用过程当中,JVM帮助我们完成了一些优化,但是消费者需要频繁的去与 Zookeeper 进行交互,而利用ZKClient的API操作Zookeeper频繁的Write其本身就是一个比较低效的Action,对于后期水平扩展也是一个比较头疼的问题。如果期间 Zookeeper 集群发生变化,那 Kafka 集群的吞吐量也跟着受影响。
    在此之后,官方其实很早就提出了迁移到 Kafka 的概念,只是,之前是一直默认存储在 Zookeeper集群中,需要手动的设置,如果,对 Kafka 的使用不是很熟悉的话,一般我们就接受了默认的存储(即:存在 ZK 中)。在新版 Kafka 以及之后的版本,Kafka 消费的offset都会默认存放在 Kafka 集群中的一个叫 __consumer_offsets 的topic中。
    当然,其实她实现的原理也让我们很熟悉,利用 Kafka 自身的 Topic,以消费的Group,Topic,以及Partition做为组合 Key。所有的消费offset都提交写入到上述的Topic中。因为这部分消息是非常重要,以至于是不能容忍丢数据的,所以消息的 acking 级别设置为了 -1,生产者等到所有的 ISR 都收到消息后才会得到 ack(数据安全性极好,当然,其速度会有所影响)。所以 Kafka 又在内存中维护了一个关于 Group,Topic 和 Partition 的三元组来维护最新的 offset 信息,消费者获取最新的offset的时候会直接从内存中获取。这也说明offset的提交是以消费者为单位的。通过消费组、主题、与分区就能确定是哪个消费组中的消费者提交的offset的了。

    kafka 提供三种语义的传递:

                   1至少一次

                   2至多一次

                   3精确一次

      首先在 producer 端保证1和2的语义是非常简单的,至少一次只需要同步确认即可(确认方式分为只需要 leader 确认以及所有副本都确认,第二种更加具有容错性),至多一次最简单只需要异步不断的发送即可,效率也比较高。目前在 producer 端还不能保证精确一次,在未来有可能实现,实现方式如下:在同步确认的基础上为每一条消息加一个主键,如果发现主键曾经接受过,则丢弃

      在 consumer 端,大家都知道可以控制 offset,所以可以控制消费,其实 offset 只有在重启的时候才会用到。在机器正常运行时我们用的是 position,我们实时消费的位置也是 position 而不是 offset。我们可以得到每一条消息的 position。如果我们在处理消息之前就将当前消息的 position 保存到 zk 上即 offset,这就是只多一次消费,因为我们可能保存成功后,消息还没有消费机器就挂了,当机器再打开时此消息就丢失了;或者我们可以先消费消息然后保存 position 到 zk 上即 offset,此时我们就是至少一次,因为我们可能在消费完消息后offset 没有保存成功。而精确一次的做法就是让 position的保存和消息的消费成为原子性操作,比如将消息和 position 同时保存到 hdfs 上 ,此时保存的 position 就称为 offset,当机器重启后,从 hdfs重新读入offset,这就是精确一次。

    • consumer可以先读取消息,然后将offset写入日志文件中,然后再处理消息。这存在一种可能就是在存储offset后还没处理消息就crash了,新的consumer继续从这个offset处理,那么就会有些消息永远不会被处理,这就是上面说的“最多一次”。
    • consumer可以先读取消息,处理消息,最后记录offset,当然如果在记录offset之前就crash了,新的consumer会重复的消费一些消息,这就是上面说的“最少一次”。
    • “精确一次”可以通过将提交分为两个阶段来解决:保存了offset后提交一次,消息处理成功之后再提交一次。但是还有个更简单的做法:将消息的offset和消息被处理后的结果保存在一起。比如用Hadoop ETL处理消息时,将处理后的结果和offset同时保存在HDFS中,这样就能保证消息和offser同时被处理了。

        项目中由于消息可被重复消费、因此采用最少一次这种方式。从kafka集群中获取消息之后,存入数据库后再手动提交offset。这样能保证不会遗漏需要消费的消息。如果要保证精确一次的话可能比较麻烦,就得把offset存入数据库,保证业务逻辑的处理与offset的原子性(同一个事务中)。

    展开全文
  • kafka 自定义存储offset 到mysql中

    千次阅读 2019-11-22 15:01:32
    kafka0.9版本之前,offset存储在zookeeper,0.9版本以及之后,默认offset存储在kafka的一个内置的topic中。除此之外,kafka还可以选择自定义存储offset。 offset的维护是相当繁琐的,因为需要考虑到消费者的...
  • kafka offset存储机制

    2017-07-31 15:10:13
    1.概述  目前,Kafka 官网最新版[0.10.1.1],已默认将消费的 offset 迁入到了 Kafka 一个名为 __consumer_offsets 的Topic中。其实,早在 0.8.2.2 版本,已支持存入消费...那现在,官方默认将消费的offset存储在 Ka
  • 1、Kafka Offset 管理–Checkpoint 启用Spark Streaming的checkpoint是存储偏移量最简单的方法。 流式checkpoint专门用于保存应用程序的状态, 比如保存在HDFS上, 在故障时能恢复。 Spark Streaming的checkpoint...
  • Kafka中的每个partition都由一系列有序的、不可...Offset从语义上来看拥有两种:Current Offset和Committed Offset。 Current Offset Current Offset保存在Consumer客户端中,它表示Consumer希望收到的下一条消息的序号
  • SparkStreaming基于redis来管理整合kafka过程中涉及到的offset提交问题 思路: 1°、可能没有读取到数据,从源头来开始读取数据 2°、读取到了数据,从指定offset位置开始读取数据 2、对读取的kafka数据,进行...
  • kafka offset存储问题

    2017-12-27 20:34:00
    注意:从kafka-0.9版本及以后,kafka的消费者组和offset信息就不存zookeeper了,而是存到broker服务器上,所以,如果你为某个消费者指定了一个消费者组名称(group.id),那么,一旦这个消费者启动,这个消费者组名...
  • 下面将从Kafka文件存储机制和物理结构角度,分析Kafka是如何实现高效文件存储,及实际应用效果。 2.Kafka文件存储机制 Kafka部分名词解释如下: Broker:消息中间件处理结点,一个Kafka节点就是一个broker,多...
  • SqlServer存储过程入门

    千次阅读 2019-04-06 13:40:45
    什么是存储过程存储过程Procedure是一组为了完成特定功能的SQL语句集合,经编译后存储在数据库中,用户通过指定存储过程的名称并给出参数来执行。 存储过程中可以包含逻辑控制语句和数据操纵语句,它可以接受...
  • oracle 存储过程心得2

    2017-07-09 12:39:15
    1、退出存储过程 return if old_save_time = new_save_time then--没有最新数据,退出 insert into hy_data_handle_mark(id,save_time,mark_start,mark_finish) values(seq_hy_data_handle_mark.nextval,new_save_...
  • instr和substr存储过程,分析内部大对象的内容 instr函数 instr函数用于从指定的位置开始,从大型对象中查找第N个与模式匹配的字符串。 用于查找内部大对象中的字符串的instr函数语法如下: dbms_lob.instr( lob_...
  • MyBatis 调用储存过程 MyBatis 同样支持调用数据库中的储存过程,以下是一个调用数据库中分页查询储存过程的示例: MySQL 中的储存过程如下:   # 储存过程,根据 user_name...
  • MyBatis 示例之存储过程(二)

    千次阅读 2017-08-13 18:09:01
    MyBatis 示例之存储过程(一) http://blog.csdn.net/isea533/article/details/76358365 本示例是《MyBatis 从入门到精通》书中第二个存储过程的例子,有关本示例的基础环境,可以从 http://mybatis.tk 或者 ...
  • 什么是存储过程,有哪些优缺点 ...create proc 储存过程名字 as sql语句 调用存储过程: exec 存储过程名字 当存储过程已经存在的时候 可以用alter 来进行修改 存储过程使用实列: 列如在book表里面添加存储过程
  • 1.MySQL创建存储offset的表格 mysql> use hxh mysql> create table hxh_offset( topic varchar(32), groupid varchar(50), partitions int, fromoffset bigint, untilof...
  • Kafka offset管理

    2020-07-17 11:06:44
    参考文章:Kafka offset管理 kafka系列之(3)——Coordinator与offset管理和Consumer Rebalance Kafka中的每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每...
  • Kafka是什么 Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也...一个商业化消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务...
  • Kafka Offset Storage

    2017-02-09 10:44:20
    那现在,官方默认将消费的offset存储在 Kafka 的Topic中,同时,也保留了存储在 Zookeeper 的接口,通过 offsets.storage 属性来进行设置。 2.内容  其实,官方这样推荐,也是有其道理的。之前版本,Kafka其实...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 101,594
精华内容 40,637
关键字:

offset存储过程