精华内容
下载资源
问答
  • 重启kafka集群

    2021-04-08 16:49:59
    重启kafka nohup bin/kafka-server-start.sh config/server.properties & 重启zookeeper sh bin/zkServer.sh restart

    重启kafka

    nohup bin/kafka-server-start.sh config/server.properties &
    

    重启zookeeper

    sh bin/zkServer.sh restart
    
    展开全文
  • 重启kafka进程

    千次阅读 2018-11-09 16:24:54
    1,写了一个重启kafka进程的脚本,内容如下: [root@newsclient.dataanalysis.hadoop19v11.syq bin]# more kafka-restart.sh #!/bin/sh source ~/.bash_profile ##java env export JAVA_HOME=/usr/local/java/jdk...

    1,写了一个重启kafka进程的脚本,内容如下:

    [root@newsclient.dataanalysis.hadoop19v11.syq bin]# more kafka-restart.sh
    #!/bin/sh
    source ~/.bash_profile
    ##java env
    export JAVA_HOME=/usr/local/java/jdk1.8.0_112
    export JRE_HOME=/usr/local/java/jdk1.8.0_112/jre
    
    stopApp="kill -9  $(jps | grep Kafka | awk '{print $1}')"
    startApp="/data1/kafka/kafka_2.11-0.10.1.0/bin/kafka-server-start.sh -daemon /data1/kafka/kafka_2.11-0.10.1.0/config/server.properties"
    
    function stop(){
        echo "stopping kafka"
        $stopApp
        echo "stoped success"
    }
    
    function start(){
        echo "starting kafka"
        # $startApp
        COMMAND=$(jps | grep Kafka | awk '{print $1}')
        for((i=1;i<=5;i++));
        do
            if [[ $COMMAND -le 0 ]]; then
                $startApp
                echo "start $i times"
                sleep 10s 
            else
                echo "process alived"
                break
            fi
        done
        echo "started sucess"
    }
    
    function restart(){
        echo "restarting kafka"
        echo "kafka process is $(jps | grep Kafka | awk '{print $1}')"
        stop 
        sleep 10s
        start
        echo "kafka process is $(jps | grep Kafka | awk '{print $1}')"
        echo "restarted success"
    }
    
    
    
    case "$1" in
    	start )
    		start
    		;;
    	stop )
    		stop
    		;;
    	restart )
    		restart
    		;;
    	* )
    		echo "no command"
    		;;
    esac
    exit 0

    2,定时写入计划

    0 */2 * * * . /etc/profile;/data1/kafka/kafka_2.11-0.10.1.0/bin/kafka-restart.sh restart  >> /data1/kafka/kafka_2.11-0.10.1.0/logs/cron.log

    3,执行过程,成功!

     

    展开全文
  • kafka-configs.sh –zookeeper localhost:2181 –entity-type topics –entity-name test –alter –add-config log.retention.hours=87600 kafka-configs.sh –zookeeper localhost:2181 –entity-type topics ...

    默认7天 168小时 现在改成5天
    kafka-configs.sh –zookeeper localhost:2181 –entity-type topics –entity-name test –alter –add-config log.retention.hours=120

    报错

    更改参数解决,需要自己转换毫秒
    kafka-configs.sh –zookeeper localhost:2181 –entity-type topics –entity-name test –alter –add-config retention.ms=43200000
    这里写图片描述

    展开全文
  • 问题需求:在实际工程中,比如淘宝等,重复消费...重启程序后,消费数据前利用redis判断数据是否被消费过,将消费过的数据过滤掉。 选择redis的原因:redis基于内存,对程序的开销影响不大。 代码1:在kafka获取...

    问题需求:在实际工程中,比如淘宝等,重复消费可能导致重复支付问题,导致用户的RMB损失。

    解决办法:利用redis,将消费过的数据存起来,并设置失效时间,以及消费的标志位,消费过的数据标志位为1,未消费的数据标志位为0。重启程序后,消费数据前利用redis判断数据是否被消费过,将消费过的数据过滤掉。

    选择redis的原因:redis基于内存,对程序的开销影响不大。

    代码1:在kafka获取数据并判断

    package sparkStreamingRedis

    import org.apache.kafka.clients.consumer.ConsumerRecord

    import org.apache.kafka.common.serialization.StringDeserializer

    import org.apache.spark.{SparkConf, SparkContext}

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    import org.apache.spark.streaming.dstream.InputDStream

    import org.apache.spark.streaming.kafka010._

    import Utils.RedisUtils

    import redis.clients.jedis.Jedis

    object sparkStreamingKafka {

    def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[2]").setAppName("kafkaSparkStreaming")  

    val context = new SparkContext(conf)

    context.setLogLevel("WARN")                                                                                 //设置日志级别

    val ssc = new StreamingContext(context , Seconds(5))          

    var locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent       //定义kafka的位置策略

    val brokers = "spark1:9092"          //kafka的brokers

    val topic = "sparkdemo"                 //kafka的主题

    val group = "sparkaGroup"             //kafka的分组

    val kafkaParam = Map(                  //定义kafka的配置信息

    "bootstrap.servers"-> brokers,

    "key.deserializer" ->classOf[StringDeserializer],

    "value.deserializer"->classOf[StringDeserializer],

    "group.id"->group,

    "auto.offset.reset"-> "latest",

    "enable.auto.commit" ->(false:java.lang.Boolean)

    );

    var consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe(Array(topic), kafkaParam)//消费策略

    var resultDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc, locationStrategy, consumerStrategy)                                                        //从kafka获取数据

    resultDStream.foreachRDD(iter=>{                              //使用foreachPartition遍历每个分片的方式

    if(iter.count() > 0){

    iter.foreachPartition{partitionOfRecords =>{

    partitionOfRecords.foreach(record=>{

    val text=record.value()

    val label=text+": "+record.offset()

    val redisConn:Jedis=RedisUtils.getContion()                    //获取一个redis连接

    if(redisConn.get(label)==null){                                           //key对应的value为空,说明redis中不存在该数据

    redisConn.set(label,"0","NX","EX",120)                             //数据写入redis,设置失效时间为120秒

    println(text)                                                                         //这里打印一下数据就代表消费数据了(简单一点)

    redisConn.setrange(label,0,"1")                                         //setrange方法只会覆盖原value的值,不会覆盖失效时间

    }else if(redisConn.get(label)=="0"){                                    //为"0"时只写入的数据,但是并未消费数据

    println(text) //

    redisConn.setrange(label,0,"1")

    }else{                                                                                   //当为"1"的时候,不需要做任何操作,继续迭代

    }

    RedisUtils.returnConn(redisConn) //将redis连接释放掉

    })

    }

    }

    val ranges: Array[OffsetRange] = iter.asInstanceOf[HasOffsetRanges].offsetRanges

    resultDStream.asInstanceOf[CanCommitOffsets].commitAsync(ranges)

    }

    })

    ssc.start()

    ssc.awaitTermination()

    }

    }

    代码2:redis工具类,定义redis的相关属性和方法

    package Utils

    import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

    object RedisUtils {

    val conf=new JedisPoolConfig()

    conf.setMaxTotal(10)                                       //设置最大连接数

    conf.setMaxIdle(5)                                           //设置最大空闲数

    conf.setTestOnBorrow(true)                            //当调用borrow object 方法时,是否进行有效性验证

    val redisPool=new JedisPool(conf,"192.168.116.189",6379,3000,"test")  //定义pool配置,配置信息,主机IP,端口,超时时间,密码

    def getContion(): Jedis = {                              //从连接池获取redis连接

    val redisConn:Jedis=redisPool.getResource()

    redisConn

    }

    def returnConn(redisConn:Jedis){                  //释放redis连接

    redisPool.returnResource(redisConn)

    }

    }

    注:有什么问题可以留言,看到会回复的

    展开全文
  • kafka集群重启方法

    万次阅读 2018-11-20 10:24:47
    3.重启kafka:./startup.sh 4.进入zookeeper:cd /home/tmkj/zookeeper/bin 5.重启zookeeper:./zkServer.sh restart 按以上步骤操作集群中的其他服务器,即可重启kafka集群。 附:kafka startup.sh脚本 ...
  • Kafka重启出错:Corrupt index found

    千次阅读 2019-07-21 23:37:02
    今天发现一台kafka broker宕掉,重启kafka broker集群发现日志中报如下错误,查阅各种资料,解决问题如下 一、出现的问题 Found a corrupted index file due to requirement failed: Corrupt index found, ...
  • 部署集群版的kafka(3节点),删除某些topic数据后或重启kafka,结果在kafka恢复过程中,某一节点每隔几分钟重启一次,导致该节点broker无法使用; 解决方法: 删除 文件目录logs /kfk/zoo/log/version-2/* 删除 ...
  • 1.美图 2.背景 kafka突然发现,有问题了,然后重启,却发现报错,但是zk是好的。
  • 某台kafka服务器负载过高,机器挂掉一段是时间后,kill掉占用内存的进程,然后重启kafka服务,但是一直不能完成启动和数据同步,日志如下fset 0 to broker BrokerEndPoint(11,192.168.207.79,9092)] ) (kafka.server...
  • 为了提升故障处理经验分享、信息互通的技术交流氛围,保障部监控中心在中通之家开通了“排障案例分享”专栏,邀请各位技术...(投稿咨询请联系规划部 徐蕊)接下来这篇《Kafka 节点重启失败导致数据丢失的分析排查与...
  • Kafka集群平滑重启

    万次阅读 2014-12-30 19:54:53
    最近修改公司线上kafka集群配置然后直接kill掉进程来重启集群发现所有生产者都无法写入数据导致丢了数据,栽了一个大坑,接下来的工作肯定是补坑找原因,就分享一下。 系统环境说明:kafka版本为0.8.1.1,kafka集群...
  • 如图,点击kafka ,配置----KafkaMirrorMaker ,现在Topic Whiltelist白名单 重启kafka启动,解决无法启动MirrorMaker 的问题
  • kafka使用K8s部署,并使用NFS做数据持久化配置。 由于kafka集群重新部署时会产生新的ID,如果id不匹配就会导致服务重启。 解决方法: 修改meta.properties 将cluster ID修改成新生成的ID
  • 我们遇到“不能分配内存的问题”导致Kafka Crash,观察kafka maps数超过了6w,实际RedHat 6.4(包括7.2和7.4)默认是65530,即使重启Kafka,也无法运行。于是我们在sysctl.conf文件中,将vm.max_map_count设置为...
  • 我们遇到“不能分配内存的问题”导致Kafka Crash,观察kafka maps数超过了6w,实际RedHat 6.4(包括7.2和7.4)默认是65530,即使重启Kafka,也无法运行。于是我们在sysctl.conf文件中,将vm.max_map_count设置为...
  • kafka 重启consumer 重复消费问题

    千次阅读 2020-09-03 22:47:04
    kafka 重启consumer 重复消费问题 原文链接:https://blog.csdn.net/z1941563559/java/article/details/88753938 问题描述:kafka的某些topic在消费完毕后一段时间,重启唯一消费者,offset会重置为最小offset重新...
  • kafka 重启的一些记录

    千次阅读 2019-05-28 11:14:59
    重启前集群的一些状态记录: [els@els1 wchane-1]$ kafka-topics.sh --zookeeper 10.156.10.126:2181 --describe --topic wchane Topic:wchane PartitionCount:3 ReplicationFactor:2 Configs: Topic: wchane ...
  • Kafka Tools连接不上远程kafka

    千次阅读 2020-07-03 11:52:43
    今天使用Kafka Tools连接阿里云服务器上的kafka时一直连不上 报如下错误: 解决方法如下: 修改kafka配置文件config/server.properties 把31行的注释去掉,...修改完配置文件后重启kafka 就可以连上了 ...
  • 1.概述 因为需要,需要在windows下开启kafka,然后kafka开启JMX监控 同样是修改kafka-server-start文件,但是修改的是kafka-server-start.bat IF ["%KAFKA_HEAP_OPTS%"] EQU [""] ( ...然后重启kafka就好了 ...
  • 前言对于小业务量的业务,往往多个业务共享 kafka 集群,随着业务规模的增长需要不停的增加 topic 或者是在原 topic 的基础上扩容 partition 数,另外一些后来大体量的业务在试水阶段也可能不会部署独立的集群,当...
  • 监控到业务异常,kafka异常 查看kafka日志,发现大量...使用自写的删除topic工具,删除一部分历史topic后,重启kafka,问题解决。之后继续删除topic,文件句柄降到2000。 1.运维规范:kafka历史topic定时清理,可配
  • 我的kafka共有4个节点, 每个topic都是3个副本,然后莫名挂了一个,然后我重启的这个broker,但是发现在重启之后 这个broker上的kafka log里面就更新了一个index文件 ,而且log文件大小为0,然后就没去追leader的...
  • kafka的topic默认是不允许被删除的,删除后在topic后会出现”marked for deletion”字样,实际并未删除,现在创建同样的topic会提示topic...之后重启kafka,会发现之前被标记删除的topic已经不存在了。 kafka操...
  • 背景在 2 月10 号下午大概 1 点半左右,收到用户方反馈,发现日志 kafka 集群 A 主题 的 34 分区选举不了 leader,导致某些消息发送到该分区时,会报如下 no leader 的错误信息:In the middle of a leadership ...
  • kafka常用操作

    千次阅读 2020-05-26 10:16:45
    让kafka支持外网访问 1.修改config/下的server.properties listeners=PLAINTEXT://192.168.230.130:9092 ...2.至此,配置完成,记得重启kafka服务,可以下载kafka tools来验证是否可以外网访问 注:配置完以后,操
  • kafka分区无leader

    2018-07-23 09:28:10
    产生原因:kafka分区无leader 解决方案:1、重启Icop-Rice 2、或重启kafka服务
  • 上次的 Kafka 重启失败事件,对为什么重启失败的原因似乎并没有解释清楚,那么我就在这里按照我对 Kafka 的认识,从源码和日志文件结构去尝试寻找原因。从源码中定位到问题的根源首先把导致 Kafka 进程退出的异常栈...
  • 1. Kafka全部数据清空 kafka全部数据清空的步骤为: 停止每台机器上的kafka; 删除kafka存储目录(server.properties文件log.dirs配置,默认为...重启kafka、如果删除topic还在则需要重启zookeeper; 这里以192.16...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 776
精华内容 310
关键字:

重启kafka