精华内容
下载资源
问答
  • kafka 暂停消费

    千次阅读 2018-10-16 15:48:00
    kafkaListener 需要指定id,例如这里是:full-part-id。 @KafkaListener(topics = "part-full-topic", id = "full-part-id", containerGroup = "full-part-group") public void ...

    1、代码实现

    kafkaListener

    需要指定id,例如这里是:full-part-id。

    @KafkaListener(topics = "part-full-topic", id = "full-part-id", containerGroup = "full-part-group")
    public void listenFullPart(ConsumerRecord<String, String> record) {
        Optional<String> recordOptional = Optional.fromNullable(record.value());
        if (recordOptional.isPresent()) {
            List<PartStockInfoVo> partStockInfoVos = JSONObject.parseArray(recordOptional.get(), PartStockInfoVo.class);
            esPartInfoClient.updateFullIndex(partStockInfoVos);
        }
    }

    消费开关

    @RestController
    public class KafkaManageController {
    
        @Autowired
        private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
    
        @RequestMapping("/stop")
        public void stop() {
            MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("full-part-id");
            listenerContainer.stop();
        }
    
        @RequestMapping("/start")
        public void start() {
            MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("full-part-id");
            listenerContainer.start();
        }
    }

     

     

    参考:

    1、How can i stop consumers from consuming?

     

    展开全文
  • Kafka索引服务可以在Overlord上配置,通过管理Kafka索引任务的创建和生命周期来促进Kafka的消费。这些索引任务使用Kafka自己的分区和偏移机制读取事件,因此能够提供完全一次摄取的保证。他们还能够从Kafka读取非...

    Kafka Indexing Service

    Kafka索引服务可以在Overlord上配置,通过管理Kafka索引任务的创建和生命周期来促进Kafka的消费。这些索引任务使用Kafka自己的分区和偏移机制读取事件,因此能够提供完全一次摄取的保证。他们还能够从Kafka读取非近期事件,并且不受使用Tranquility对其他摄取机制施加的窗口期限的影响。主管监督索引任务的状态,以协调切换,管理故障并确保维护可伸缩性和复制要求。

    Kafka索引服务使用了Kafka 0.10.x中引入的Java consumer。由于此版本中存在协议更改,因此Kafka 0.10.x消费者可能与较旧的brokers不兼容。在使用此功能之前,请确保您的Kafka brokers是0.10.x或更高版本。如果您使用的是旧版Kafka brokers,请参阅Kafka升级指南

    Submitting a Supervisor Spec

    Kafka索引服务要求druid-kafka-indexing-service在Overlord和MiddleManagers上加载扩展。

    通过HTTP POST提交Supervisor Specd来启动一个dataSource的规范:

    http://<OVERLORD_IP>:<OVERLORD_PORT>/druid/indexer/v1/supervisor

    例如:

    curl -X POST -H 'Content-Type: application/json' -d @supervisor-spec.json http://localhost:8090/druid/indexer/v1/supervisor
    

    示例Supervisor Spec如下所示:

    {
      "type": "kafka",
      "dataSchema": {
        "dataSource": "metrics-kafka",
        "parser": {
          "type": "string",
          "parseSpec": {
            "format": "json",
            "timestampSpec": {
              "column": "timestamp",
              "format": "auto"
            },
            "dimensionsSpec": {
              "dimensions": [],
              "dimensionExclusions": [
                "timestamp",
                "value"
              ]
            }
          }
        },
        "metricsSpec": [
          {
            "name": "count",
            "type": "count"
          },
          {
            "name": "value_sum",
            "fieldName": "value",
            "type": "doubleSum"
          },
          {
            "name": "value_min",
            "fieldName": "value",
            "type": "doubleMin"
          },
          {
            "name": "value_max",
            "fieldName": "value",
            "type": "doubleMax"
          }
        ],
        "granularitySpec": {
          "type": "uniform",
          "segmentGranularity": "HOUR",
          "queryGranularity": "NONE"
        }
      },
      "tuningConfig": {
        "type": "kafka",
        "maxRowsPerSegment": 5000000
      },
      "ioConfig": {
        "topic": "metrics",
        "consumerProperties": {
          "bootstrap.servers": "localhost:9092"
        },
        "taskCount": 1,
        "replicas": 1,
        "taskDuration": "PT1H"
      }
    }
    

    Supervisor Configuration

    字段 描述 是否必须
    type Supervisor 类型,这里应该为Kafka。
    dataSchema 摄取过程中Kafka索引任务将使用的Schema,请参阅Ingestion Spec DataSchema
    ioConfig 用于配置管理程序和索引任务的KafkaSupervisorIOConfig,请参见下文。
    tuningConfig 用于配置管理程序和索引任务的KafkaSupervisorTuningConfig,请参见下文。

    KafkaSupervisorTuningConfig

    tuningConfig是可选的,如果未指定tuningConfig,则将使用默认参数。

    字段 类型 描述 是否必须
    type String 索引任务类型,应该始终是kafka
    maxRowsInMemory Integer 持久化之前要聚合的行数。此数字是后聚合行,因此它不等于输入事件的数量,而是等于这些事件导致的聚合行数。这用于管理所需的JVM堆大小。索引的最大堆内存使用量为:maxRowsInMemory *(2 + maxPendingPersists)。通常用户不需要设置它,但是根据数据的性质,如果行以字节为单位比较短,用户可能不希望在内存中存储一百万行,应该设置该值。 否(默认==000000)
    maxBytesInMemory Long 持久化之前在堆内存中聚合的字节数。这是基于对内存使用情况的粗略估计,而不是实际使用情况。通常这是在内部计算的,用户不需要设置它。索引的最大堆内存使用量是:maxBytesInMemory *(2 + maxPendingPersists) 否(默认==大JVM内存的六分之一)
    maxRowsPerSegment Integer 要聚合成段的行数;这个数字是聚合后的行。切换将在maxRowsPerSegmentmaxTotalRows被触发或每次intermediateHandoffPeriod时发生,以先发生者为准。 否(默认==000000)
    maxTotalRows Long 在所有段中聚合的行数; 这个数字是后聚合行。切换将在maxRowsPerSegmentmaxTotalRows被触发或每次intermediateHandoffPeriod时发生,以先发生者为准。 否(默认==nlimited)
    intermediatePersistPeriod ISO8601 Period 决定中间体持续发生率的时期。 否(默认== PT10M)
    maxPendingPersists Integer 可挂起但未启动的最大持久性数量。如果一个新的中间持久化会超过这个限制,那么在当前运行的持久化完成之前,摄入将被阻塞。索引规模的最大堆内存使用量maxRowsInMemory * (2 + maxpendingpersist) 否(默认== 0,表示一个persist可以与摄取同时运行,并且没有一个可以排队)
    indexSpec Object 调整数据的索引方式,有关详细信息,请参阅下面的“IndexSpec”。
    reportParseExceptions Boolean 弃用。如果为true,则在抛出过程中遇到的异常将被抛出并将停止摄取; 如果为false,将跳过不可解析的行和字段。设置reportParseExceptions为true将覆盖现有的配置maxParseExceptionsmaxSavedParseExceptions,设置maxParseExceptions为0,限制maxSavedParseExceptions为不超过1。 否(默认== false)
    handoffConditionTimeout Long 等待段切换的毫秒数。它必须> = 0,其中0表示永远等待。 否(默认== 0)
    resetOffsetAutomatically Boolean 是否重置消费者偏移量,如果它尝试获取的下一个偏移量小于该特定分区的最早可用偏移量。根据useEarliestOffset的属性(见下文),消费者偏移将重置为最早或最近的偏移KafkaSupervisorIOConfig。这种情况通常发生在Kafka的消息不再可供消费时,因此不会被摄入Druid。如果设置为false,则将暂停对该特定分区的摄取,并且需要手动干预来纠正这种情况,请参阅Reset Supervisor下面的API。 否(默认== false)
    workerThreads Integer Supervisor将用于异步操作的线程数。 否(默认== min(10,taskCount))
    chatThreads Integer 将用于与索引任务通信的线程数。 否(默认== min(10,taskCount * replicas))
    chatRetries Integer 在考虑任务无响应之前,将重试HTTP请求索引任务的次数。 否(默认== 8)
    httpTimeout ISO8601 Period 从索引任务等待HTTP响应的时间 否(默认== PT10S)
    shutdownTimeout ISO8601 Period 在退出之前,等待Supervisor尝试优雅地关闭任务需要多长时间。 否(默认== PT80S)
    offsetFetchPeriod ISO8601 Period 主管查询Kafka和索引任务以获取当前偏移量和计算延迟的频率。 否(默认== PT30S,分钟== PT5S)
    segmentWriteOutMediumFactory Object 分段写入介质,用于创建分段。请参阅下面的详细信息。 否(默认情况下未指定,使用的值druid.peon.defaultSegmentWriteOutMediumFactory.type
    intermediateHandoffPeriod ISO8601 Period 任务应该多长时间分配一次。切换将在maxRowsPerSegmentmaxTotalRows被触发或每次intermediateHandoffPeriod时发生,以先发生者为准。 否(默认== P2147483647D
    logParseExceptions Boolean 如果为true,则在发生解析异常时记录错误消息,其中包含有关发生错误的行的信息。 否(默认== false)
    maxParseExceptions Integer 在任务停止摄取和失败之前可能发生的最大解析异常数。如果reportParseExceptions已设置,则重写。 否(默认无限制)
    maxSavedParseExceptions Integer 当发生解析异常时,Druid可以跟踪最近的解析异常。“maxSavedParseExceptions”限制将保存多少个异常实例。这些保存的异常将在任务完成报告中任务完成后可用。如果reportParseExceptions已设置,则重写。 否(默认== 0)

    IndexSpec

    字段 类型 描述 是否必须
    bitmap Object 位图索引的压缩格式。应该是一个JSON对象; 请参阅下文的“Bitmap types”。 否(默认为 Concise)
    dimensionCompression String 维列的压缩格式。选择LZ4LZFuncompressed 否(默认== LZ4
    metricCompression String 公制列的压缩格式。选择LZ4LZFuncompressed,或none 否(默认== LZ4
    longEncoding String 类型为long的度量和维度列的编码格式。选择autolongsauto根据列基数使用偏移量或查找表对值进行编码,并以可变大小存储它们。longs将值存储为每个8字节。 否(默认== longs
    Bitmap types

    Concise bitmaps:

    字段 类型 描述 是否必须
    type String 必须为 concise

    Roaring bitmaps:

    字段 类型 描述 是否必须
    type String 必须为 roaring
    compressRunOnSerialization Boolean 使用行程编码,估计其空间效率更高。 否(默认==true)

    SegmentWriteOutMediumFactory

    字段 类型 描述 是否必须
    type String 有关说明和可用选项,请参阅其他Peon配置:SegmentWriteOutMediumFactory

    KafkaSupervisorIOConfig

    字段 类型 描述 是否必须
    topic String Kafka topic可供阅读。这必须是特定topic,因为不支持topic模式。
    consumerProperties Map 要传递给Kafka consumer的属性的映射。这必须包含一个属性bootstrap.servers,其中包含以下形式的Kafka代理列表:<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...。对于SSL连接,keystoretruststorekey密码可以作为一个被提供密码提供或字符串密码。
    pollTimeout Long 等待kafka consumer轮询记录的时间长度,以毫秒为单位 否(默认== 100)
    replicas Integer 副本的数量,其中1表示一组任务(无复制)。副本任务将始终分配给不同的工作人员,以提供针对流程故障的弹性。 否(默认== 1)
    taskCount Integer 副本集的最大读取任务数。这意味着最大读取任务数为taskCount * replicas,而总任务数(读取+发布)将高于此值。有关详细信息,请参阅下面的“容量规划”。如果任务计数为> {numKafkaPartitions},则读取任务的数量将小于taskCount 否(默认== 1)
    taskDuration ISO8601 Period 任务停止读取并开始发布其段之前的时间长度 否(默认== PT1H)
    startDelay ISO8601 Period 在Supervisor开始管理任务之前等待的时间。 否(默认== PT5S)
    period ISO8601 Period Supervisor执行其管理逻辑的频率。请注意,管理程序也将运行以响应某些事件(例如任务成功,失败和达到tasksDuration),因此该值指定迭代之间的最长时间。 否(默认== PT30S)
    useEarliestOffset Boolean 如果Supervisor第一次管理dataSource,它将从Kafka获得一组起始偏移量。此标志确定它是否检索Kafka中的最早或最新偏移量。在正常情况下,后续任务将从前一个段结束的位置开始,因此该标志仅在首次运行时使用。 否(默认== false)
    completionTimeout ISO8601 Period 在将发布任务声明为失败并终止之前等待的时间长度。如果设置得太低,您的任务可能永远不会发布。任务的发布时间大致在taskDuration过去后开始。 否(默认== PT30M)
    lateMessageRejectionPeriod ISO8601 Period 配置任务以拒绝时间戳早于创建任务之前的时间段的消息;例如,如果将此设置为PT1H并且主管在2016-01-01T12:00Z创建任务,则将删除时间戳早于2016-01-01T11:00Z的消息。如果您的数据流具有延迟消息并且您有多个需要在相同段上运行的管道(例如,实时和夜间批量提取管道),这可能有助于防止并发问题。 否(默认==none)
    earlyMessageRejectionPeriod ISO8601 Period 配置任务以在任务到达taskDuration后拒绝时间戳晚于此时间段的消息; 例如,如果设置为PT1H,则taskDuration设置为PT1H,并且主管在2016-01-01T12:00Z创建任务,将删除时间戳晚于2016-01-01T14:00Z的消息。**注意:**任务有时会超过其任务持续时间,例如,在主管故障转移的情况下。将earlyMessageRejectionPeriod设置得太低可能会导致在任务超过其最初配置的任务持续时间时意外删除消息。 否(默认==none)
    展开全文
  • CentOS7 配置kafka服务

    2019-03-27 16:40:24
    1.进入/lib/systemd/system ...2.编辑文本kafka.service vi kafka.service 输入 i 插入 i 内容如下: [Unit] Description=kafka After=network.target remote-fs.target nss-lookup.target zook...

    1.进入/lib/systemd/system

    cd /lib/systemd/system

     2.编辑文本kafka.service

    vi kafka.service

    输入 i  插入 

    i

     内容如下:

    [Unit]
    Description=kafka
    After=network.target remote-fs.target nss-lookup.target zookeeper.service
    
    [Service]
    Type=forking
    Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/jdk1.8.0_201/bin"
    ExecStart=/usr/local/kafka_2.12-2.1.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.12-2.1.1/config/server.properties
    ExecReload=/bin/kill -s HUP $MAINPID
    ExecStop=/usr/local/kafka_2.12-2.1.1/bin/kafka-server-stop.sh
    PrivateTmp=true
    
    [Install]
    WantedBy=multi-user.target
    

    输入 :

    输入 wq  保存退出

    备注:

    [service]

    Type 服务启动类型  forking:后台启动

    Environment 为依赖的环境,“/usr/local/jdk1.8.0_201/bin”是自己jdk的安装路径

    ExecStart 定义启动的命令,“/usr/local/kafka_2.12-2.1.1/” 为自己的kafka安装路径,“-daemon”:后台启动

    ExecReload  定义重启命令,“/bin/kill -s HUP $MAINPID” 通用重启命令

    ExecStop 定义停止命令

    PrivateTmp 是否分配独立空间

    3.刷新配置

    systemctl daemon-reload

    4.启动服务

    #启动
    systemctl start kafka
    #查看状态
    systemctl status kafka -l
    #停止
    systemctl stop kafka
    

    5.zookeeper配置成服务

    zookeeper.service

    [Unit]
    Description=Zookeeper
    After=network.target remote-fs.target nss-lookup.target
    
    [Service]
    Type=forking
    Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/jdk1.8.0_201/bin"
    ExecStart=/usr/local/zookeeper-3.4.13/bin/zkServer.sh start /usr/local/zookeeper-3.4.13/conf/zoo.cfg
    ExecReload=/bin/kill -s HUP $MAINPID
    ExecStop=/usr/local/zookeeper-3.4.13/bin/zkServer.sh stop
    PrivateTmp=true
    
    [Install]
    WantedBy=multi-user.target
    

     

    展开全文
  • kafka-server-stop.sh不起作用,kafka关不了 修改kafka-server-stop.sh文件 cd ~/modules/kafka_2.11/bin vi kafka-server-stop.sh 此时可以把PIDS=$(ps ax | grep -i ‘kafka.Kafka’ | grep java | grep -v grep | ...

    kafka-server-stop.sh不起作用,kafka关不了

    修改kafka-server-stop.sh文件
    cd ~/modules/kafka_2.11/bin

    vi kafka-server-stop.sh

    此时可以把PIDS=$(ps ax | grep -i ‘kafka.Kafka’ | grep java | grep -v grep | awk ‘{print $1}’)这行代码修改为

    PIDS=$(jps -lm | grep -i ‘kafka.Kafka’ | awk ‘{print $1}’)

    展开全文
  • kafka

    2021-07-07 16:33:54
    kafka安装 启动zookeeper bin/zookeeper-server-start.sh -daemon config/zookeeper.properties 关闭zk bin/zookeeper-server-stop.sh -daemon config/zookeeper.properties ...停止kafka bin/kafka-server-stop.
  • 启动kafka:sh /app/pet_kafka_xxxx_cluster/bin/kafka-server-start.sh -daemon /app/pet_kafka_xxxx_cluster/config/server....停止kafka:/app/pet_kafka_xxxx_cluster/bin/kafka-server-stop.shpet_kafka...
  • Kafka 服务器集群部署

    2018-07-24 06:49:19
    上篇文章 Kafka 工作机制 讲述了 Kafka 的各组件(包括配置中心、Broker、消息生产者和消费者)的作用,分区与复制的机制等。有了这些概念,本文以三个 Broker 为例,讲述了 Kafka 集群的搭建步骤和方法,并以官方自带...
  • kafka配置文件如下: broker.id=1 port=9092 host.name=ssy-kafka1 num.network.threads=4 num.io.threads=8 socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 socket.request.max.bytes=...
  • apache kafka消息服务

    千次阅读 2016-06-14 17:05:25
    直到broker堆积数据为18GB为止(停止Producer运行)。 启动Consumer,不间断从broker获取数据, 直到全部数据读取完成为止, 最后查看Producer== Consumer数据 ,没有出现卡死或broker不响应现象 数据...
  • 情况1:使用官方文档上的命令启动,ctrl+c退出后,服务进程关闭。 bin/kafka-server-start.sh config/server.properties 情况2:增加&符后台允许,ctrl+c后进程存在,但是退出服务器控制台后,服务进程关闭。 ...
  • Kafka 服务器安装 目录: 安装 zookipper 安装 Kafka 测试 创建 topic 启动生产者 启动消费者 关闭 服务 1 Zookipper 1.1 Zookipper 简介 ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它包含一个...
  • Kafka

    2019-01-09 12:45:34
    1.kafka是什么?使用场景? kafka是一个高吞吐的分布式消息队列系统。特点是生产者消费者模式,先进先出(FIFO)保证顺序,自己不丢数据,默认每隔7天清理数据。消息列队常见场景:系统之间解耦合、峰值压力缓冲、...
  • KAFKA

    2019-11-25 20:31:05
    一、Kafka介绍 1、Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的...web/nginx日志、访问日志,消息服务等等,用scala语言...
  • kafka单节点部署 1、Kafka官网下载安装包 http://kafka.apache.org/downloads.html 官网提供了源码包(需自行编译)和编译...kafka自带了zookeeper组件,单节点kafka服务我们可以使用自带的zookeeper组件。在config文
  • Kafka Indexing Service 是 Druid 推出的利用 Druid 的索引服务实时消费 Kafka 数据的插件。该插件会在 Overlord 中启动一个 supervisor,supervisor 启动之后会负责创建task、调度task到Middlemanager中运行,并...
  • kafka停止consumer,producer等进程

    千次阅读 2018-04-22 17:28:18
    kafka停止consumer,producer等进程 1.官网给出的方案: You can now stop the console consumer, the console producer, the Wordcount application, the Kafka broker and the ZooKeeper server in order via ...
  • kafka服务自动关闭

    2019-02-27 14:29:00
    kafka启动的时候添加守护进程 bin/kafka-server-start.sh -daemon ./config/server.properties & 问题原因: 待补充。。。 转载于:https://www.cnblogs.com/chuijingjing/p/10443490.html...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 24,262
精华内容 9,704
关键字:

kafka停止服务