精华内容
下载资源
问答
  • Windows 10版本business_editions和consumer_editions的区别?

    万次阅读 多人点赞 2018-06-26 23:40:48
    Windows 10版本business_editions和consumer_editions的区别?【答1】二者都内置专业版,不同之处在于:consumer_editions 版本包含:Home(家庭版); Education(教育版) ; Professional(专业版);business_editions ...

    Windows 10版本business_editions和consumer_editions的区别?

    【答1】

    二者都内置专业版,不同之处在于:

    consumer_editions 版本包含:Home(家庭版); Education(教育版) ; Professional(专业版);

    business_editions 版本包含:Education(教育版); Enterprise (企业版); Professional(专业版);

    【答2】

    Windows 10 (business editions) VL版
    ISO镜像包含以下版本(根据需要选择安装):

    专业版、企业版、教育版、专业工作站版、专业教育版

    Windows 10 (consumer editions) 零售版
    ISO镜像包含以下版本(根据需要选择安装):
    家庭版、专业版、教育版、家庭单语言版、专业工作站版、专业教育版

    【答3】

    Windows 10 各版本区别:
    家庭版(Home):供家庭用户使用,无法加入Active Directory和Azure AD,不允许远程链接
    专业版(Professional):供小型企业使用 在家庭版基础上增加了域账号加入、bitlocker、企业商店等功能
    企业版(Enterprise):供中大型企业使用 在专业版基础上增加了DirectAccess,AppLocker等高级企业功能
    教育版(Education):供学校使用 (学校职员, 管理人员, 老师和学生) 其功能基本和企业版的一样
    LTSB版:无Edge浏览器、小娜,无磁贴,可选是否下载和安装补丁,其它版都不能自选补丁
    N版:带“N”的版本相当于阉割版,移除了Windows Media Player,几乎用不到N版。


    【答4】

    Win10各版本详细区别

    Windows 10 企业版(和教育版功能一样,功能最完整版本)
    1. 核心功能
    ①熟悉,更好用:自定义开始菜单,Windows Defender 与Windows防火墙,Hiberboot与InstantGo,系统启动更快速,TPM支持,节电模式,Windows更新
    ②Cortana小娜:更自然的语音和按键输入,主动、个性化建议,提醒,从网络、本地以及云中搜索,无需动手,直接喊出“你好,小娜”即可激活
    ③Windows Hello:指纹识别,面部和虹膜识别,企业级安全
    ④多任务操作:虚拟桌面,Snap协同(同一屏幕最多支持显示4个应用),跨不同显示器的Snap功能支持
    ⑤Microsoft Edge:阅读视图,内置墨水书写支持,整合Cortana小娜
    2. 企业特性
    ①基础功能:设备加密,加入域功能,组策略管理器,Bitlocker加密,企业模式Internet Explorer浏览器(EMIE),Assigned Access 8.1(访问分配),远程桌面,Direct Access(直接访问),Windows To Go创建工具,Applocker(应用程序锁定),BranchCache(分支缓存),可通过组策略控制的开始屏幕,
    ②管理部署功能:企业应用旁加载功能,移动设备管理,可加入到Azure活动目录,单点登录到云托管应用,Win10企业商店,粒度UX控制,可轻松从专业版升级到企业版
    ③安全:Microsoft Passport登录,企业数据保护,凭据保护,设备保护
    ④Windows即服务:Windows更新,Windows Update for Business,Current Branch for Business(用于企业的当前更新分支)
    Windows 10 教育版
    与企业板功能一致,授权方式不同,可轻松从家庭版升级到教育版,(经过测试专业版也可以通过输入教育版key升级到教育版)
    Windows 10 专业版
    与企业版对比【无】以下功能:
    ①基础功能:Direct Access(直接访问),Windows To Go创建工具,Applocker(应用程序锁定),BranchCache(分支缓存),可通过组策略控制的开始屏幕,
    ②管理部署功能:粒度UX控制
    ③安全:凭据保护,设备保护
    Windows 10 家庭版
    与专业版对比【无】以下功能:
    ①基础功能:加入域功能,组策略管理器,Bitlocker加密,企业模式Internet Explorer浏览器(EMIE),Assigned Access 8.1(访问分配),远程桌面,Direct Access(直接访问),Windows To Go创建工具,Applocker(应用程序锁定),BranchCache(分支缓存),可通过组策略控制的开始屏幕,
    ②管理部署功能:可加入到Azure活动目录,单点登录到云托管应用,Win10企业商店,粒度UX控制,可轻松从专业版升级到企业版
    ③安全:企业数据保护,凭据保护,设备保护
    ④Windows即服务:Windows Update for Business,Current Branch for Business(用于企业的当前更新分支)
    Windows 10 企业版长期服务分支2015 LTSB
    与企业版功能一致,可手动设置更新服务,无Edge浏览器,无商店。
    展开全文
  • JAVA8 Consumer接口

    万次阅读 多人点赞 2018-06-08 09:52:17
    Consumer的语义是消费的意思,了解一些消息队列的同学,肯定对这个单词,有一定的理解,我们先看接口的定义 @FunctionalInterface public interface Consumer<T> { void accept(T t); default ...

    Consumer的语义是消费的意思,了解一些消息队列的同学,肯定对这个单词,有一定的理解,我们先看接口的定义

    @FunctionalInterface
    public interface Consumer<T> {
    
        
        void accept(T t);
    
        default Consumer<T> andThen(Consumer<? super T> after) {
            Objects.requireNonNull(after);
            return (T t) -> { accept(t); after.accept(t); };
        }
    }

    这个接口,接收一个泛型的参数T,然后调用accept,对这个参数做一系列的操作,没有返回值;看到这里,是不是很懵,下面用一个简单的小例子

     Consumer<Integer> consumer = x -> {
                int a = x + 2;
                System.out.println(a);// 12
                System.out.println(a + "_");// 12_
            };
            consumer.accept(10);

    调用accept的时候,传入一个泛型T的参数Integer,然后定义参数对一系列操作,然后打印这些值;

    只有调用了

    consumer.accept(10);

    这个函数,控制台才会 有输出;还是很懵?

    其实,主要是理解Consumer,消费者,就可以了~主要是对入参做一些列的操作,在stream里,主要是用于forEach;内部迭代的时候,对传入的参数,做一系列的业务操作,没有返回值;

    这个接口,只有一个默认方法,看下注释内容,就清楚用法了,就不在写案例解释了

    /**传入一个Consumer类型的参数,
    	 *他的泛型类型,
    	 *跟本接口是一致的T,先做本接口的accept操作,
    	 *然后在做传入的Consumer类型的参数的accept操作
    	*/
        default Consumer<T> andThen(Consumer<? super T> after) {
            Objects.requireNonNull(after);
            return (T t) -> { accept(t); after.accept(t); };
        }

     

    1.lambda表达式

    《java8 Lambda表达式简介》

    《java8 lambda表达式,方法的引用以及构造器的引用》

    2.函数式接口

    《java8 函数式接口简介》

    《JAVA8 Function接口以及同类型的特化的接口》

    《JAVA8 Consumer接口》

    《JAVA8 Supplier接口》

    《JAVA8 UnaryOperator接口》

    《JAVA8 BiConsumer 接口》

    3.stream接口操作

    《java8 Stream接口简介》

    《 java8 Stream-创建流的几种方式》

    《JAVA8 stream接口 中间操作和终端操作》

    《JAVA8 Stream接口,map操作,filter操作,flatMap操作》

    《JAVA8 stream接口 distinct,sorted,peek,limit,skip》

    《java8 stream接口 终端操作 forEachOrdered和forEach》

    《java8 stream接口 终端操作 toArray操作》

    《java8 stream接口 终端操作 min,max,findFirst,findAny操作》

    《java8 stream接口终端操作 count,anyMatch,allMatch,noneMatch》

    《java8 srteam接口终端操作reduce操作》

    《java8 stream接口 终端操作 collect操作》

    4.其他部分

    《java8 Optional静态类简介,以及用法》

     

    展开全文
  • Kafka Consumer

    千次阅读 2019-02-14 22:49:19
    客户端从kafka集群中消费数据,同时对于kafka&amp;nbsp;broker的失败客户端可以自动进行处理,也可以自动的适应topic partition在集群间...在使用consumer之后如果没有关闭这些链接的话会导致资源泄露,consumer...

    客户端从kafka集群中消费数据,同时对于kafka broker的失败客户端可以自动进行处理,也可以自动的适应topic partition在集群间的迁移。允许使用consumer group来与broker进行交互以实现负载均衡。

    consumer维护着到broker的TCP链接以便获取数据。在使用consumer之后如果没有关闭这些链接的话会导致资源泄露,consumer不是线程安全的,有关详细信息请参看: Multi-threaded Processing

    一、Cross-Version Compatibility

    该文章主要针对0.10.0或者更新的版本介绍,老版本的broker可能不支持这些特性,例如0.10.0不支持offsetsForTimew,因为这个特性是在0.10.1中添加的。当你执行不可用的API时,会收到一个 UnsupportedVersionException 异常。

    二、Offsets and Consumer Position

    kafka为partition中的每一条记录对应着一个数字的偏移量,我们称之为:offset。在一个partition中每一个记录的offset是该记录的唯一标识,即每一个offset唯一标识当前partition中的一条记录,同时offset也可以标识consumer在partition中的position,例如,一个consumer的position为5,其深层层含义就是该consumer已经消费了partition中offset为0到4之间的所有记录,下一次接收的将会是offset等于5的记录。实际上有两个概念与consumer position相关:

    • 调用consumer的 position方法将返回该consumer下一条将要处理消息的offset。且这个position的值等于consumer在partition看到的最大的offset+1。每当consumer通过调用 poll(Duration).获取记录之后consumer 的position都会自动增加。
    • 成功提交的position, 即  committed position 是最后一个成功存储的offset。如果consumer进程失败并且重启,那么这个offset就是consumer将要恢复的position。consumer可以定期自动提交offset,或者可以通过调用API (e.g. commitSync and commitAsync)来选择手动控制提交,这种设计使得consumer可以自己控制何时消费记录。

    三、Consumer Groups and Topic Subscriptions

    kafka 使用 consumer groups的概念来允许一个线程池来划分消费任务和处理记录。这些线程可以运行在同一台机器上也可以分布在多台机器上以便提供扩展性和容错能力。所有共享同一个 group.id的consumer属于同一个consumer group。

    consumer group中的每一个consumer可以通过 subscribe  API动态的获取他想要订阅的Topic列表。Kafka将把topic中的每一条消息分配给所有订阅该Topic的consumer group中的一个线程进行处理。 同时可以平衡consumer group 中所有成员之间消费的partition,consumer Group中的每个consumer之间消费的partition个数基本一致,这样Topic的每个partition都被精确地分配给consumer group中的一个consumer。 如果一个Topic有四个partition,一个consumer group 有两个进程,那么每个进程将消费两个partition。

    consumer group中成员的关系是动态维护的:如果一个consumer进程失败,之前分配给该consumer进程的所有partition将会被重新分配给同一consumer group中的其他的consumer。如果一个新的consumer加入consumer group,该consumer group中其他的consumer将会把自身处理的一部分partition分配这个新的consumer。这就是consumer group中的rebalancing。当一个consumer group中添加订阅新的topic时,group rebalancing同样适用,group将定期刷新元数据以便检测新的partition并将他们分配给组中的成员。

    从概念上讲,您可以将consumer group看作是由多个consumer进程组成的单个逻辑订阅者。作为一个多订阅系统( multi-subscriber system),kafka天然支持为一个Topic分配任意数量的consumer group,并且这不是通过数据复制实现的,因此对于Kafka而言,添加额外的consumer代价非常低。

    本文是消息系统中常见功能的一个简单概括, 为了获得与传统消息交付系统中的队列类似的语义,所有consumer都将是单个consumer group的一部分,因此消息交付将会在group中进行平衡,就像使用队列一样。 但是,与传统的消息传递系统不同,您可以有多个这样的group。 要获得与传统消息传递系统中类似的 pub-sub的语义时,每个consumer进程可以拥有自己的consumer  group,即所有的consumer进程都属于不同的consumer group,因此每个consumer进程将订阅发布到主题的所有记录。

    此外,当组 rebalancing 分配自动发生时,consumers会通过 ConsumerRebalanceListener 被通知到(ConsumerRebalanceListener是一个回调接口,当分配给consumer的partition发生更改时,用户可以实现该回调接口来触发自定义操作),以便它们完成必要的应用程序级逻辑,如状态清理,手动提交offset 等等。详见 Offsets Outside Kafka

    开发人员也可以通过使用 assign(Collection) 为consumer手动分配partition,此时将禁用自动partition分配以及consumer group协调。

    四、Detecting Consumer Failures

    订阅一系列的topic之后,当consumer调用  poll(Duration) 时会自动加入相应的consumer  group。  poll API的设计是为了确保consumer的活力,只要持续调用 poll,consumer将留在组中,并持续从分配给它的topic partitions中接收消息。consumer会在后台持续向服务发送心跳,如果consumer进程崩溃或者在 session.timeout.ms期间没有发送心跳,这个consumer将会被认为已经死掉了,他的分区将会被重新分配。

    还有一种可能,消费者可能会遇到一种“ livelock”的情况,即它继续发送心跳,但没有取得任何进展(即处理消息的进程很缓慢)。 在这种情况下,为了防止consumer无限期地保留其分区,我们通过max.poll.interval.ms配置提供了一种活动检测机制。 基本上,如果consumer不在max.poll.interval.ms配置的最大间隔内频繁地调用poll,那么consumer将主动离开组,以便另一个consumer可以接管其分区, 发生这种情况时,您可能会看到offset提交失败( as indicated by a CommitFailedException thrown from a call to commitSync())。这是一种安全机制,它保证只有组中的活动成员才能提交offset。因此,要留在组中,必须继续调用poll。

    consumer 提供了两个配置项来控制轮询poll的行为:

    1. max.poll.interval.ms: 通过增加poll之间的间隔,您可以给consumer更多的时间来处理从  poll(Duration)返回的一批记录。 缺点是,增加这个值可能会延迟group rebalancing,因为consumer只会在调用 poll时参与rebalancing。 您可以使用此设置来限制完成rebalancing的时间,但是如果consumer实际上不能频繁地调用poll,那么你可能会面临进展缓慢的风险。

    2. max.poll.records: 使用此设置可对单次调用poll返回的记录数进行限制。这样可以更容易地预测每个poll调用间隔内consumer所需处理的最大值。通过调优此值,你能够减少轮询间隔,这将降低group rebalancing的影响。

    对于消息处理时间变化不可预测的用例, 这两种选择可能不够。 处理这些情况的推荐方法是将消息处理转移到另一个线程,这允许consumer在处理器仍在工作时继续调用poll。但是必须采取一些措施以确保提交的offset不会超出实际位置。 通常,你必须禁用自动提交,并仅在线程完成对记录的处理后手动提交处理过的offset(取决于您需要的交付语义)。 还请注意,你需要 pause 分区(即 暂停从请求的分区获取数据),以便在线程处理完上一次poll返回的记录之前,不会从poll中接收到任何新记录。

    五、Usage Examples

    consumer 的api提供了针对各种消费用例的灵活性。下面是一些示例来演示如何使用它们

    5.1、Automatic Offset Committing

    这个示例演示了Kafka的consumer api的简单用法,它依赖于自动提交offset。

         Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");
         props.put("group.id", "test");
         props.put("enable.auto.commit", "true");
         props.put("auto.commit.interval.ms", "1000");
         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList("foo", "bar"));
         while (true) {
             ConsumerRecords<String, String> records = consumer.poll(100);
             for (ConsumerRecord<String, String> record : records)
                 System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
         }
    

    通过使用配置bootstrap.servers指定要连接的一个或多个broker的列表以便连接到集群,此列表仅用于发现集群中的其他broker,不需要是集群中服务器的详尽列表(尽管您可能希望指定多个服务器,以防客户机连接时服务器宕机)。

    设置enable.auto.commit意味着将以配置项auto.commit.interval.ms指定的频率自动提交offset。

    在本例中,consumer订阅了主题foo和bar,group.id配置为test,意味着该consumer属于一个名为test的consumer group。

    deserializer设置指定如何将字节转换为对象。例如,通过指定StringDeserializer,我们可以认为记录的键和值将只是简单的字符串。

    5.2、Manual Offset Control

    与依赖consumer定期提交已使用的offset不同,用户还可以控制何时应将记录视为已使用,从而提交其offset。当消息的使用与某些处理逻辑结合在一起时,这非常有用,因此在完成处理之前,不应将消息视为已使用。

         Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");
         props.put("group.id", "test");
         props.put("enable.auto.commit", "false");
         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList("foo", "bar"));
         final int minBatchSize = 200;
         List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
         while (true) {
             ConsumerRecords<String, String> records = consumer.poll(100);
             for (ConsumerRecord<String, String> record : records) {
                 buffer.add(record);
             }
             if (buffer.size() >= minBatchSize) {
                 insertIntoDb(buffer);
                 consumer.commitSync();
                 buffer.clear();
             }
         }
    

    在本例中,我们将在内存中缓存一批记录并对它们进行批处理。当我们批处理了足够多的记录时,我们将把它们插入数据库中。如果我们像前面的示例那样允许offset自动提交,那么在将记录返回给调用poll的consumer之后,它们将被视为已使用。 然而,在批处理完成之后,但在将记录插入数据库之前,我们的进程可能会失败。

    为了避免这种情况,我们仅仅在相应的记录插入数据库之后手动提交offset。这使我们能够准确地控制记录何时被消费。同时,也存在另一种可能性:进程可能在插入数据库之后但在提交offset之前的间隔时间内失败(尽管这可能只有几毫秒,但这是一种可能性)。 在这种情况下,接管消费的进程将从上次提交的offset之后继续消费,并重复插入最后一批数据。 以这种方式就是Kafka中所提供的“ at-least-once”的交付保证, 因为每个记录至少只交付一次,但是在失败的情况下可能会重复。

    注意:使用自动offset提交也可以提供“at-least-once”的交付, 但是要求您在调用 poll(Duration) 或者 closing consumer之前必须在消费掉所有返回的数据,。 如果这两种方法都失败,提交的offset就有可能超过消耗的位置,从而导致丢失记录。使用手动offset控制的优点是,您可以直接控制记录何时被认为“消费”。

    上面的示例使用  commitSync 将所有接收到的记录标记为已提交。在某些情况下,您可能希望通过显式地指定offset来更好地控制已提交的记录。在下面的示例中,我们在处理完每个分区中的记录后提交offset。

        try {
             while(running) {
                 ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                 for (TopicPartition partition : records.partitions()) {
                     List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                     for (ConsumerRecord<String, String> record : partitionRecords) {
                         System.out.println(record.offset() + ": " + record.value());
                     }
                     long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                     consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                 }
             }
         } finally {
           consumer.close();
         }
    

    提交的offset应该始终是应用程序将读取的下一条消息的offset。因此,在调用  commitSync(offsets)时,应该是最后处理的消息的offset+1。

    5.3、Manual Partition Assignment

    在前面的示例中,我们订阅了感兴趣的主题,并让Kafka根据consumer group中活跃的consumer将Topic的partition动态的分配给各个consumer。但是,在某些情况下,你可能需要手动的为某个consumer分配特定的partition。例如:

    • 如果consumer进程维护这与该分区相关的某种本地状态(比如本地磁盘上的键值存储),那么它应该只获取它在磁盘上维护的分区中的记录。
    • 如果进程本身是高可用的,并且在失败时将重新启动( 可能使用诸如YARN、Mesos或AWS设施之类的集群管理框架,或者作为流处理框架的一部分)。 在这种情况下,Kafka不需要检测故障并重新分配分区,因为消费进程将在另一台机器上重新启动。

    要使用这种模式,不需要使用 subscribe 订阅topic,只需使用要使用的分区的完整列表调用 assign(Collection) 。

         String topic = "foo";
         TopicPartition partition0 = new TopicPartition(topic, 0);
         TopicPartition partition1 = new TopicPartition(topic, 1);
         consumer.assign(Arrays.asList(partition0, partition1));
    

    一旦分配完成,你可以循环调用 poll ,正如前面例子中所展示的那样。 使用者指定的组仍然用于提交偏移量,但是现在分区集只会随着另一个 assign 的调用而更改,不会自动进行rebalancing。 手动分区分配不使用组协调( group coordination),因此使用者失败不会导致分配的分区被重新 rebalanced, 即使与另一个consumer共享一个groupId,每个consumer的行为也是独立的。 为了避免offset提交冲突,你通常应该确保每个consumer实例都有唯一的groupId。

    注意,手动的partition分配( i.e. using assign)与动态分区分配( i.e. using subscribe)混合使用是不可能的。

    5.4、Storing Offsets Outside Kafka

    消费者应用程序不需要使用Kafka内置的offset存储,它可以在自己选择的存储中存储offset。 这方面的主要用例是允许应用程序原子性的存储结果和offset。 这并非不可能的,但如果是这样,它将使消费完全原子化,并且可以实现比kafka自动提交offset所默认的“ at-least once”语义更强的“ exactly once”语义。 下面是一些这类用法的例子:

    • 如果消费的结果存储在关系数据库中,那么在数据库中存储offset时,允许在单个事务中同时提交结果和对应的offset。 因此,要么事务成功,offset将根据实际情况进行更新,要么结果将不会存储,offset也不会更新
    • 如果结果存储在本地存储中,那么也可以在那里存储offset。例如,可以通过订阅特定分区并同时存储offset和索引数据来构建搜索索引。如果以原子的方式执行此操作,通常可能会出现这样的情况,即发生了崩溃导致未同步的数据丢失,但是以存储的数据所对应的offset不会丢失。在这种情况下,可以从已保存的数据位置恢复数据,确保数据不会丢失。

    每个记录都有自己的偏移量,所以要管理自己的偏移量,只需执行以下操作:

    当分区分配也是手工完成时,这种类型的使用是最简单的(这可能在上面描述的搜索索引用例中)。 如果分区分配是自动完成的,则需要特别注意处理分区分配发生更改的情况。 这可以通过 This can be done by providing a ConsumerRebalanceListener instance in the call to subscribe(Collection, ConsumerRebalanceListener) and subscribe(Pattern, ConsumerRebalanceListener)。 例如,当从consumer获取分区时,consumer将希望通过实现  ConsumerRebalanceListener.onPartitionsRevoked(Collection)来提交这些分区的offset。当分区被分配一个consumer时 consumer将希望查找这些新分区的offset,并将consumer正确初始化到该位置,此时可以调用 ConsumerRebalanceListener.onPartitionsAssigned(Collection).

    ConsumerRebalanceListener 的另一个常见用途是刷新应用程序为分区维护的缓存。

    5.4、Controlling The Consumer’s Position

    在大多数用例中,consumer只是从头到尾地使用记录,定期提交其position(自动或手动)。然而Kafka允许consumer手动控制它的position,在分区中任意向前或向后移动。这意味着consumer可以重新使用旧记录,或者跳到最近的记录,而不需要实际使用中间记录。

    在一些情况下,手动控制consumer的位置是有用的。

    一种情况是时间敏感的记录处理,对于远远落后于处理所有记录的consumer来说,这可能是有意义的,因为他们不会试图赶上处理所有记录的速度,而只是跳到最近的记录

    另一个用例用于维护上一节中描述的本地状态的系统。在这样的系统中,consumer将希望在启动时将其position初始化为本地存储的内容。同样,如果本地状态被破坏(比如磁盘丢失),则可以通过重新使用所有数据和重新创建状态(假设Kafka保留了足够的历史)在新机器上重新创建状态。

    通过调用 seek(TopicPartition, long)Kafka允许指定一个新的 position , 也可以使用特殊的方法来查找服务器维护的最早的offset和最近的offset( seekToBeginning(Collection) and seekToEnd(Collection) )。

    5.5、Consumption Flow Control

    如果将多个partition分配给consumer以从中获取数据,则consumer将尝试同时从所有partition中进行消费,从而有效地为这些partition提供相同的消费优先级。但是,在某些情况下,使用者可能希望首先集中从指定partition的某个子集全速获取数据,并且只在这些partition只有很少或没有数据要使用时才开始获取其他partition

    其中一种情况是流处理,处理器从两个topic 获取数据并在这两个流上执行连接。当其中一个topic远远滞后于另一个topic时,处理器希望暂停从前一个topic获取数据,以便让滞后的流赶上来。另一个例子是在用户启动时引导,在启动时需要跟踪大量历史数据,应用程序通常希望在考虑获取其他topic之前获得其中一些topic的最新数据。

    通过使用  pause(Collection) 和 resume(Collection)kafka支持动态控制消费流,以便在 poll(Duration)调用中暂停消费和恢复消费。

    六、Reading Transactional Messages

    Kafka 0.11.0引入了事务,其中应用程序可以原子地写入多个topic和partition。为了使其工作,从这些topic读取数据的consumer应该配置为只读取已提交的数据。这可以通过在consumer的配置中设置** isolation.level=read_committed**

    In read_committed mode, consumer将只读取那些已成功提交的事务消息。它将像以前一样继续读取非事务性消息。 read_committed模式下没有客户端缓冲。 相反,read_committed consumer 的分区的end offset 将是属于开放事务的分区中的第一个消息的offset。这个offset称为“最后稳定偏移量”(Last Stable offset, LSO)。

    一个read_committed consumer  只读取到LSO并过滤掉任何已中止的事务消息。 LSO还影响read_committed consumer 的  seekToEnd(Collection) 和 endOffsets(Collection)  的行为,每个方法的文档中都有详细的说明。 最后,对于read_committed consumer,fetch滞后指标也被调整为相对于LSO.

    带有事务消息的分区将包括提交或中止标记,这些标记指示事务的结果。 这些标记没有返回给应用程序,但是日志中有一个偏移量。 因此,应用程序从具有事务性消息的主题中读取消费的offset将看到空白。 这些丢失的消息将是事务标记,并在两个隔离级别中为consumer过滤掉它们。此外,使用read_committed consumer 的应用程序可能还会看到由于事务中止而造成的差距,因为这些消息不会被consumer返回,但是会有有效的offset。

    七、Multi-threaded Processing

    Kafka消费者不是线程安全的。所有网络I/O都发生在调用应用程序的线程中。确保多线程访问正确同步是用户的责任。非同步访问将导致ConcurrentModificationException异常。 此规则的唯一例外是 wakeup(),可以从外部线程安全地使用它来中断活动操作。在这种情况下,将从操作的线程阻塞中抛出  WakeupException。这可以用于从另一个线程关闭consumer。下面的代码片段展示了典型的模式:

    public class KafkaConsumerRunner implements Runnable {
         private final AtomicBoolean closed = new AtomicBoolean(false);
         private final KafkaConsumer consumer;
         public void run() {
             try {
                 consumer.subscribe(Arrays.asList("topic"));
                 while (!closed.get()) {
                     ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
                     // Handle new records
                 }
             } catch (WakeupException e) {
                 // Ignore exception if closing
                 if (!closed.get()) throw e;
             } finally {
                 consumer.close();
             }
         }
         // Shutdown hook which can be called from a separate thread
         public void shutdown() {
             closed.set(true);
             consumer.wakeup();
         }
    }
    

    然后在单独的线程中,consumer可以被关闭通过设置closed 标值

        closed.set(true);
        consumer.wakeup();
    

    请注意,虽然可以使用线程中断而不是 wakeup()来中止阻塞操作(在这种情况下,将引发 InterruptException),但是我们不建议使用它们,因为它们可能会导致consumer完全关闭,从而中止阻塞操作。中断主要支持那些不可能使用 wakeup()的情况,例如,当consumer线程由位置代码的Kafka客户机管理时。

    我们有意避免为处理实现特定的线程模型。这为实现记录的多线程处理留下了几个选项

    7.1、One Consumer Per Thread

    一个简单的选项是给每个线程一个它自己的consumer实例。下面是这种方法的优缺点

    优点:

    • 它是最容易实现的
    • 它通常是最快的,因为不需要线程间的协作
    • 它使得在每个分区的基础上按顺序处理非常容易实现(每个线程只按照它接收到的顺序处理消息)

    缺点:

    • 更多的consumer意味着到集群的TCP连接更多(每个线程一个)。一般来说,Kafka非常有效地处理连接,所以这通常是一个小成本。
    • 多个consumer意味着发送到服务器的请求更多,批处理的数据更少,这可能会导致I/O吞吐量下降。
    • 所有进程的线程总数将受到分区总数的限制

    7.2、 Decouple Consumption and Processing

    另一种选择是让一个或多个consumer线程执行所有数据消费,并将 ConsumerRecords 实例传递给由实际处理记录处理的处理器线程池所消费的阻塞队列。这个选择也有优点和缺点:

    优点:

    • 这个选项允许独立地扩展消费者和处理器的数量。这样就可以有一个单一的consumer来提供许多处理器线程,从而避免分区上的任何限制

    缺点:

    • 确保处理器之间的顺序需要特别注意,因为线程将独立执行较早的数据块,由于线程执行计时的幸运,实际上可能在稍后的数据块之后处理较早的数据块。对于没有排序需求的处理,这不是问题。
    • 手动提交位置变得更加困难,因为它要求所有线程协调以确保该分区的处理完成

    这种方法有许多可能的变体。例如,每个处理器线程可以有自己的队列,而consumer线程可以使用TopicPartition散列到这些队列中,以确保按顺序使用和简化提交。

    展开全文
  • 错误日志: Exception in thread &amp;quot;main&amp;quot; org.apache.kafka.... at org.apache.kafka.clients.consumer.KafkaConsumer.&amp;amp;lt;init&amp;amp;gt;(KafkaConsumer.java:79...

    错误日志:

    Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:793)
    	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644)
    	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624)
    	at com.unigroup.consumer.CanalConsumer.init(CanalConsumer.java:29)
    	at com.unigroup.starup.test.main(test.java:22)
    Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.serialization.Deserializer
    	at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
    	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:688)
    	... 4 more
    
    
    

    这里从报错来看很明显了
    producer: org.apache.kafka.common.serialization.StringSerializer
    consumer:org.apache.kafka.common.serialization.StringDeserializer
    一个是序列化一个是反序列化,两个并不一样。查看配置文件。

    确认API版本号与安装包版本号是否一致:
    如果安装的是kafka,直接查看libs目录下的jar版本就知道kafka版本了。
    如果安装的是confluent,查看kafka版本号方式可以关注私信我。


    更多文章关注公众号
    ![在这里插入图片描述](https://img-blog.csdnimg.cn/20210325093921176.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3hpYW95dV9CRA==,size_16,color_FFFFFF,t_70
    更多:kafka深入理解专栏
    ——————————————————————————————————
    作者:桃花惜春风
    转载请标明出处,原文地址:
    https://blog.csdn.net/xiaoyu_BD/article/details/82464606
    如果感觉本文对您有帮助,您的支持是我坚持写作最大的动力,谢谢!

    展开全文
  • 目前发现报错的原因有三个,主机localhost出错,端口出错,防火墙没关 ...WARN [Consumer clientId=consumer-1, groupId=console-consumer-94437] Connection to node -1 (localhost/127.0.0.1:9092) could...
  • 消费者 consumer 是线程安全的吗?多线程实例、单线程实例、单 consumer + 多 worker 线程的优缺点? 消息拉取时,什么情况下会造成消息重复消费?谈谈你对位移提交的理解? 理解消息交付语义: 最多一次(atmost...
  • kafka集群中去消费消息 kafka-console-consumer.sh --bootstrap-...却一直报警告:WARN [Consumer clientId=consumer-1, groupId=console-consumer-82820] 2 partitions have leader brokers without a matching l
  • Kafka Consumer Rebalance

    2017-09-10 22:10:02
    Kafka保证同一consumer group中只有一个consumer会消费某条消息,实际上,Kafka保证的是稳定状态下每一个consumer实例只会消费某一个或多个特定partition的数据,而某个partition的数据只会被某一个特定的consumer...
  • kafka的__consumer_offsets管理consumer

    千次阅读 2019-09-10 16:07:55
    __consumer_offsets管理consumer 说明: 公司的kafka(1.0.0)要清理一些过期的消费组。开始以为像旧版的kafka(好像是0.9.0.0及以后的是新版本)那样,topic和consumer都交给zookeeper保管,傻乎乎地通过shell命令查看...
  • Kafka常用命令之kafka-console-consumer.sh

    万次阅读 热门讨论 2019-04-06 15:29:10
      kafka-console-consumer.sh 脚本是一个简单的控制台 Consumer。此脚本的功能通过调用 ConsoleConsumer 实现。
  • RabbitMQ之Consumer

    千次阅读 2018-10-06 23:21:48
    文章目录RabbitMQ-java-client版本Consumer推拉模式代码消息确认与拒绝prefetch关闭 RabbitMQ-java-client版本 com.rabbitmq:amqp-client:4.3.0 RabbitMQ版本声明: 3.6.15 Consumer RabbitMQ的消费模式分为两种:...
  • JDK不仅提供的这些函数式接口,其中一些接口还为我们提供了实用的默认方法,这次我们来介绍一下Consumer、Predicate、Function复合。
  • Kafka Consumer重置Offset

    千次阅读 2019-06-25 17:59:46
    在Kafka Version为0.11.0.0之后,Consumer的Offset信息不再默认保存在Zookeeper上,而是选择用Topic的形式保存下来。 在命令行中可以使用kafka-consumer-groups的脚本实现Offset的相关操作...
  • Kafa consumer是否可以消费指定分区消息? Kafa consumer消费消息时,向broker发出"fetch"请求去消费特定分区的消息,consumer指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息,customer拥有了...
  • kafka consumer 配置详解

    2018-05-18 15:04:22
    1、Consumer Group 与 topic 订阅每个Consumer 进程都会划归到一个逻辑的Consumer Group中,逻辑的订阅者是Consumer Group。所以一条message可以被多个订阅message 所在的topic的每一个Consumer Group,也就好像是这...
  • Kafka中的消费者组(Consumer Group)

    万次阅读 多人点赞 2019-01-25 13:52:42
    1. 消费者组(Consumer Group)  消费者组是Kafka实现单播和广播两种消息模型的手段。同一个topic,每个消费者组都可以拿到相同的全部数据。 1.1 消费者多于分区数 创建一个用于测试的单分区topic test 设置...
  • WARN [Consumer clientId=consumer-1, groupId=console-consumer-950] Connection to node -1 could not be established. Broker may not be available. 这是因为你的配置文件中的PLAINTEXT跟你请求的内容不同...
  • kafka consumer partition分配

    千次阅读 2018-10-19 18:58:07
    成功Rebalance的结果是,被订阅的所有Topic的每一个Partition将会被Consumer Group内的一个(有且仅有一个)Consumer拥有。每一个Broker将被选举为某些Consumer Group的Coordinator。某个Cosnumer Group的...
  • 解决dubbo问题:forbid consumer

    万次阅读 热门讨论 2015-12-17 19:45:46
    原文地址:http://www.jameswxx.com/%e4%b8%ad%e9%97%b4%e4%bb%b6/%e8%a7%a3%e5%86%b3dubbo%e9%97%ae%e9%a2%98%ef%bc%9aforbid-consumer/ 线下环境经常出现类似这种异常: ...
  • 详解RocketMQ中的consumer

    万次阅读 2016-04-30 15:08:09
    上一篇博客着重讲解了一下RocketMQ中的Producer,那么接下来这篇博客来带大家来了解一下RocketMQ中的Consumer角色 上述就是MQ中有关Consumer的类图,下面来介绍一下每个类 1.MQAdmin:底层类,上篇博客...
  • Kafka Consumer 拉取消息

    2017-04-10 15:16:36
    基于0.10,最近在测试consumer端消费集群消息, 设置“一次最大拉取的条数”的参数,但是实际拉取的条数不唯一,如果将其设置成500或600,那么每次拉取的条数就是500或600定值,但是如果设置成1W,那么拉取条数在4k-...
  • kafka Consumer详解

    千次阅读 2017-10-29 22:13:06
    1.ZookeeperConsumer架构 ZookeeperConsumer类中consumer运行过程架构图:  图1 过程分析: ConsumerG
  • Kafka中的消费者有两套API,分别是high level的和low level的。...High Level Consumer API围绕着Consumer Group这个逻辑概念展开,它屏蔽了每个Topic的每个Partition的Offset管理(自动读取zookeeper中该Consu
  • 描述下kafka consumer 再平衡步骤? ①关闭数据拉取线程,情空队列和消息流,提交偏移量; ②释放分区所有权,删除zk中分区和消费者的所有者关系; ③将所有分区重新分配给每个消费者,每个消费者都会分到不同分区; ...
  • RabbitMQ 注册consumer

    千次阅读 2018-06-11 14:07:34
    订阅方式其实是向queue注册consumer,通过rpc向queue server发送注册consumer的消息,rabbitMQ Server在收到消息后,根据消息的内容类型判断这是一个订阅消息,这样当MQ 中queue有消息时,会自动把消息通过该socket...
  • kafka consumer 什么情况会触发再平衡reblance? ①一旦消费者加入或退出消费组,导致消费组成员列表发生变化,消费组中的所有消费者都要执行再平衡。 ②订阅主题分区发生变化,所有消费者也都要再平衡。 ...
  • KafkaConsumer 抛出KafkaConsumer is not safe for multi-threaded access异常

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 279,367
精华内容 111,746
关键字:

consumer