  • Monitoring Kafka with JMX

    10,000+ reads · 2015-04-10 15:43:31
    Kafka can be configured to expose its runtime state over JMX; the results can be viewed either with the JDK's built-in Jconsole or programmatically through the Java API.
    For a description of the monitoring metrics, see: http://kafka.apache.org/documentation.html#monitoring

    Enabling the JMX port

    Edit bin/kafka-server-start.sh and add the JMX_PORT variable, so that it looks like this (for connections from other machines you may also need to set -Djava.rmi.server.hostname to the broker's IP, as later entries on this page note):
    if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
        export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
        export JMX_PORT="9999"
    fi


    Test whether Jconsole can connect.
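    If no GUI is at hand, connectivity can also be smoke-tested programmatically. A minimal sketch (assuming the broker's JMX port is 9999 on localhost; adjust the address to your broker):

    import java.io.IOException;
    import javax.management.MBeanServerConnection;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class JmxSmokeTest {
        public static void main(String[] args) throws IOException {
            // Points at the port exported via JMX_PORT above.
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url, null)) {
                MBeanServerConnection conn = connector.getMBeanServerConnection();
                System.out.println("connected, MBean count: " + conn.getMBeanCount());
            }
        }
    }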





    Accessing via the Java API


    The target values can be fetched with the following code:
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import javax.management.*;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;
    import java.io.IOException;

    public class KafkaDataProvider{
        protected final Logger LOGGER = LoggerFactory.getLogger(getClass());
        private static final String MESSAGE_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec";
        private static final String BYTES_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec";
        private static final String BYTES_OUT_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec";
        private static final String PRODUCE_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce";
        private static final String CONSUMER_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer";
        private static final String FLOWER_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchFollower";
        private static final String ACTIVE_CONTROLLER_COUNT = "kafka.controller:type=KafkaController,name=ActiveControllerCount";
        private static final String PART_COUNT = "kafka.server:type=ReplicaManager,name=PartitionCount";
        public String extractMonitorData() {
            //TODO obtain the broker IP and parameters by calling an API
            KafkaRoleInfo monitorDataPoint = new KafkaRoleInfo();
            String jmxURL = "service:jmx:rmi:///jndi/rmi://192.168.40.242:9999/jmxrmi";
            try {
                MBeanServerConnection jmxConnection = getMBeanServerConnection(jmxURL); // helper defined below in this class
                ObjectName messageCountObj = new ObjectName(MESSAGE_IN_PER_SEC);
                ObjectName bytesInPerSecObj = new ObjectName(BYTES_IN_PER_SEC);
                ObjectName bytesOutPerSecObj = new ObjectName(BYTES_OUT_PER_SEC);
                ObjectName produceRequestsPerSecObj = new ObjectName(PRODUCE_REQUEST_PER_SEC);
                ObjectName consumerRequestsPerSecObj = new ObjectName(CONSUMER_REQUEST_PER_SEC);
                ObjectName flowerRequestsPerSecObj = new ObjectName(FLOWER_REQUEST_PER_SEC);
                ObjectName activeControllerCountObj = new ObjectName(ACTIVE_CONTROLLER_COUNT);
                ObjectName partCountObj = new ObjectName(PART_COUNT);
                Long messagesInPerSec = (Long) jmxConnection.getAttribute(messageCountObj, "Count");
                Long bytesInPerSec = (Long) jmxConnection.getAttribute(bytesInPerSecObj, "Count");
                Long bytesOutPerSec = (Long) jmxConnection.getAttribute(bytesOutPerSecObj, "Count");
                Long produceRequestCountPerSec = (Long) jmxConnection.getAttribute(produceRequestsPerSecObj, "Count");
                Long consumerRequestCountPerSec = (Long) jmxConnection.getAttribute(consumerRequestsPerSecObj, "Count");
                Long flowerRequestCountPerSec = (Long) jmxConnection.getAttribute(flowerRequestsPerSecObj, "Count");
                Integer activeControllerCount = (Integer) jmxConnection.getAttribute(activeControllerCountObj, "Value");
                Integer partCount = (Integer) jmxConnection.getAttribute(partCountObj, "Value");
                monitorDataPoint.setMessagesInPerSec(messagesInPerSec);
                monitorDataPoint.setBytesInPerSec(bytesInPerSec);
                monitorDataPoint.setBytesOutPerSec(bytesOutPerSec);
                monitorDataPoint.setProduceRequestCountPerSec(produceRequestCountPerSec);
                monitorDataPoint.setConsumerRequestCountPerSec(consumerRequestCountPerSec);
                monitorDataPoint.setFlowerRequestCountPerSec(flowerRequestCountPerSec);
                monitorDataPoint.setActiveControllerCount(activeControllerCount);
                monitorDataPoint.setPartCount(partCount);
            } catch (IOException | MalformedObjectNameException | AttributeNotFoundException
                    | MBeanException | ReflectionException | InstanceNotFoundException e) {
                e.printStackTrace();
            }
            return monitorDataPoint.toString();
        }
        public static void main(String[] args) {
            System.out.println(new KafkaDataProvider().extractMonitorData());
        }
        /**
         * Obtains a connection to the MBean server.
         *
         * @param jmxUrl the JMX service URL
         * @return the MBeanServerConnection
         * @throws IOException if the connection cannot be established
         */
        public MBeanServerConnection getMBeanServerConnection(String jmxUrl) throws IOException {
            JMXServiceURL url = new JMXServiceURL(jmxUrl);
            JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
            MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
            return mbsc;
        }
    }
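    The KafkaRoleInfo holder used above is not shown in the original post; below is a minimal sketch consistent with the setters called in extractMonitorData() (the fields are inferred, not the author's actual code):

    public class KafkaRoleInfo {
        // Counters read from the broker's JMX beans.
        private Long messagesInPerSec;
        private Long bytesInPerSec;
        private Long bytesOutPerSec;
        private Long produceRequestCountPerSec;
        private Long consumerRequestCountPerSec;
        private Long flowerRequestCountPerSec;
        private Integer activeControllerCount;
        private Integer partCount;

        public void setMessagesInPerSec(Long v) { this.messagesInPerSec = v; }
        public void setBytesInPerSec(Long v) { this.bytesInPerSec = v; }
        public void setBytesOutPerSec(Long v) { this.bytesOutPerSec = v; }
        public void setProduceRequestCountPerSec(Long v) { this.produceRequestCountPerSec = v; }
        public void setConsumerRequestCountPerSec(Long v) { this.consumerRequestCountPerSec = v; }
        public void setFlowerRequestCountPerSec(Long v) { this.flowerRequestCountPerSec = v; }
        public void setActiveControllerCount(Integer v) { this.activeControllerCount = v; }
        public void setPartCount(Integer v) { this.partCount = v; }

        @Override
        public String toString() {
            return "KafkaRoleInfo{messagesInPerSec=" + messagesInPerSec
                    + ", bytesInPerSec=" + bytesInPerSec
                    + ", bytesOutPerSec=" + bytesOutPerSec
                    + ", produceRequestCountPerSec=" + produceRequestCountPerSec
                    + ", consumerRequestCountPerSec=" + consumerRequestCountPerSec
                    + ", flowerRequestCountPerSec=" + flowerRequestCountPerSec
                    + ", activeControllerCount=" + activeControllerCount
                    + ", partCount=" + partCount + '}';
        }
    }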

    Other tools

    Besides writing your own custom monitoring program, there are ready-made tools:


    kafka-web-console

    https://github.com/claudemamo/kafka-web-console
    Deploying sbt:
    http://www.scala-sbt.org/0.13/tutorial/Manual-Installation.html
    http://www.scala-sbt.org/release/tutorial/zh-cn/Installing-sbt-on-Linux.html

    KafkaOffsetMonitor

    https://github.com/quantifind/KafkaOffsetMonitor/releases/tag/v0.2.0
    java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk localhost:12181 --port 8080 --refresh 5.minutes --retain 1.day

    Mx4jLoader


  • Kafka interview questions

    10,000+ reads · 2019-06-07 07:40:03

    Kafka is a popular piece of messaging middleware, and it is very likely to come up in big-data interviews. Below are Kafka questions frequently asked in interviews, for your study:

    • What is Kafka used for? What are its use cases?

    • What do ISR and AR stand for in Kafka? What does ISR expansion and shrinkage refer to?

    • What do HW, LEO, LSO, and LW stand for in Kafka?

    • How does Kafka guarantee message ordering?

    • Are you familiar with Kafka's partitioner, serializer, and interceptors? In what order are they applied?

    • What does the overall structure of the Kafka producer client look like?

    • How many threads does the Kafka producer client use, and what are they?

    • What were the design flaws of the old Scala consumer client?

    • Is the statement "if a consumer group contains more consumers than the topic has partitions, some consumers will not receive any data" correct? If so, is there any hack around it?

    • When a consumer commits its position, does it commit the offset of the latest message consumed, or that offset + 1?

    • What situations can cause messages to be consumed more than once?

    • What situations can cause messages to be missed by consumers?

    • KafkaConsumer is not thread-safe; how can you consume with multiple threads?

    • Briefly describe the relationship between consumers and consumer groups.

    • After you create (or delete) a topic with kafka-topics.sh, what does Kafka do behind the scenes?

    • Can the number of partitions of a topic be increased? If so, how? If not, why not?

    • Can the number of partitions of a topic be decreased? If so, how? If not, why not?

    • How do you choose an appropriate number of partitions when creating a topic?

    • What internal topics does Kafka currently have? What are their characteristics and their respective roles?

    • What is a preferred replica? What special role does it play?

    • In which places does Kafka have a notion of partition assignment? Briefly describe the process and principles of each.

    • Briefly describe Kafka's log directory structure.

    • What index files does Kafka have?

    • If I specify an offset, how does Kafka find the corresponding message?

    • If I specify a timestamp, how does Kafka find the corresponding message?

    • Talk about your understanding of Kafka's log retention.

    • Talk about your understanding of Kafka's log compaction.

    • Talk about your understanding of Kafka's underlying storage (page cache, kernel layer, block layer, device layer).

    • Talk about how Kafka's delayed operations work.

    • Talk about the role of the Kafka controller.

    • What is the principle behind consumer rebalancing? (Hint: consumer coordinator and group coordinator)

    • How is idempotence implemented in Kafka?

    • How are transactions implemented in Kafka?

    • Where does Kafka need elections? What election strategies are used in each case?

    • What is a failed (out-of-sync) replica? What countermeasures are there?

    • With multiple replicas, how do the HW and LEO of each replica evolve?

    • Why does Kafka not support read-write separation?

    • What improvements has Kafka made for reliability? (HW, LeaderEpoch)

    • How do you implement a dead-letter queue and a retry queue with Kafka?

    • How would you implement a delay queue with Kafka?

    • How do you do message auditing in Kafka?

    • How do you do message tracing in Kafka?

    • Which Kafka configuration parameters do you find interesting? Share your thoughts.

    • Which names in Kafka do you find interesting? Share your thoughts.

    • Which Kafka metrics deserve close attention?

    • How do you compute lag? (Note the difference between read_uncommitted and read_committed)

    • Which design decisions give Kafka such high performance?

    • What are Kafka's strengths and weaknesses?

    • What other comparable products have you used, and what are their pros and cons relative to Kafka?

    • Why choose Kafka?

    • What difficulties have you encountered while using Kafka, and how did you solve them?

    • How can you make Kafka as reliable as possible?

    • Talk about your understanding of the Kafka ecosystem.

  • How to monitor Kafka with JMX

    10,000+ reads · 2016-12-08 21:16:44

    Please support the author's new books, 《深入理解Kafka:核心设计与实践原理》 and 《RabbitMQ实战指南》, and follow the author's WeChat public account: 朱小厮的博客.
    Original article: https://honeypps.com/mq/how-to-monitor-kafka-with-jmx/

    When Kafka is used as message-queue middleware, monitoring its performance in real time inevitably means using JMX to pull internal data from the Kafka brokers. Whether you build your own monitoring system for a Kafka cluster or use an open-source product such as Yahoo's kafka manager, JMX is what reads the relevant data. The Kafka documentation at http://kafka.apache.org/082/documentation.html#monitoring puts it this way:

    Kafka uses Yammer Metrics for metrics reporting in both the server and the client. This can be configured to report stats using pluggable stats reporters to hook up to your monitoring system.
    The easiest way to see the available metrics is to fire up jconsole and point it at a running kafka client or server; this will allow browsing all metrics with JMX.

    So the Kafka project itself encourages JMX and exposes JMX hooks for users to monitor Kafka.

    This post walks through how to monitor Kafka with JMX by reading a few monitored attributes.
    For background on using JMX itself, see:

    Before using JMX, make sure Kafka was started with JMX monitoring enabled, i.e. with JMX_PORT=9999 added to the startup command:

    JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties &
    

    I set up a Kafka cluster of my own with just two nodes. The cluster holds one topic (name=default_channel_kafka_zzh_demo) split into 5 partitions (0 1 2 3 4).

    The Kafka versions discussed here are 0.8.1.x and 0.8.2.x. They differ when monitored over JMX, and the difference shows up in the ObjectName. Anyone familiar with Kafka knows the concepts of topic and partition: a topic is divided into a number of partitions according to some strategy. Taking that as the example,
    in 0.8.1.x the ObjectName (a String value) for this item is:
    "kafka.log":type="Log",name="default_channel_kafka_zzh_demo-*-LogEndOffset"

    while in 0.8.2.x the corresponding ObjectName is:
    kafka.log:type=Log,name=LogEndOffset,topic=default_channel_kafka_zzh_demo,partition=0

    So the two must be handled differently in code.

    Three monitored items are used here to demonstrate how to monitor over JMX:

    1. The offset discussed above (the LogEndOffset, i.e. logSize, of every partition of one topic in the cluster)
    2. sendCount (the total number of messages sent to a topic across the cluster, i.e. the sum of this topic's in-count over every broker)
    3. sendTps (the topic's TPS across the cluster, likewise the sum over every broker)

    First, the code targeting a single Kafka broker:

    package kafka.jmx;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import javax.management.*;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;
    import java.io.IOException;
    import java.net.MalformedURLException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    
    /**
     * Created by hidden on 2016/12/8.
     */
    public class JmxConnection {
        private static Logger log = LoggerFactory.getLogger(JmxConnection.class);
    
        private MBeanServerConnection conn;
        private String jmxURL;
        private String ipAndPort = "localhost:9999";
        private int port = 9999;
        private boolean newKafkaVersion = false;
    
        public JmxConnection(Boolean newKafkaVersion, String ipAndPort){
            this.newKafkaVersion = newKafkaVersion;
            this.ipAndPort = ipAndPort;
        }
    
        public boolean init(){
            jmxURL = "service:jmx:rmi:///jndi/rmi://" +ipAndPort+ "/jmxrmi";
            log.info("init jmx, jmxUrl: {}, and begin to connect it",jmxURL);
            try {
                JMXServiceURL serviceURL = new JMXServiceURL(jmxURL);
                JMXConnector connector = JMXConnectorFactory.connect(serviceURL,null);
                conn = connector.getMBeanServerConnection();
                if(conn == null){
                   log.error("get connection return null!");
                    return  false;
                }
            } catch (MalformedURLException e) {
                e.printStackTrace();
                return false;
            } catch (IOException e) {
                e.printStackTrace();
                return false;
            }
            return true;
        }
    
        public String getTopicName(String topicName){
            String s;
            if (newKafkaVersion) {
                s = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=" + topicName;
            } else {
                s = "\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"" + topicName + "-MessagesInPerSec\"";
            }
            return s;
        }
    
        /**
         * @param topicName topic name, e.g. default_channel_kafka_zzh_demo
         * @return the message-in count for this broker (to get a topic's total,
         *         sum this value over every broker in the cluster)
         */
        public long getMsgInCountPerSec(String topicName){
            String objectName = getTopicName(topicName);
            Object val = getAttribute(objectName,"Count");
            String debugInfo = "jmxUrl:"+jmxURL+",objectName="+objectName;
            if(val !=null){
                log.info("{}, Count:{}",debugInfo,(long)val);
                return (long)val;
            }
            return 0;
        }
    
        /**
         * @param topicName topic name, e.g. default_channel_kafka_zzh_demo
         * @return the message-in rate (TPS) for this broker; as with the count, a
         *         topic's total is the sum of this value over every broker in the cluster
         */
        public double getMsgInTpsPerSec(String topicName){
            String objectName = getTopicName(topicName);
            Object val = getAttribute(objectName,"OneMinuteRate");
            if(val !=null){
                double dVal = ((Double)val).doubleValue();
                return dVal;
            }
            return 0;
        }
    
        private Object getAttribute(String objName, String objAttr)
        {
            ObjectName objectName =null;
            try {
                objectName = new ObjectName(objName);
            } catch (MalformedObjectNameException e) {
                e.printStackTrace();
                return null;
            }
            return getAttribute(objectName,objAttr);
        }
    
        private Object getAttribute(ObjectName objName, String objAttr){
            if(conn== null)
            {
                log.error("jmx connection is null");
                return null;
            }
    
            try {
                return conn.getAttribute(objName,objAttr);
            } catch (MBeanException | AttributeNotFoundException | InstanceNotFoundException
                    | ReflectionException | IOException e) {
                e.printStackTrace();
                return null;
            }
        }
    
        /**
         * @param topicName topic name
         * @return a map from partition id to that partition's logSize (i.e. LogEndOffset)
         */
        public Map<Integer,Long> getTopicEndOffset(String topicName){
            Set<ObjectName> objs = getEndOffsetObjects(topicName);
            if(objs == null){
                return null;
            }
            Map<Integer, Long> map = new HashMap<>();
            for(ObjectName objName:objs){
                int partId = getParId(objName);
                Object val = getAttribute(objName,"Value");
                if(val !=null){
                    map.put(partId,(Long)val);
                }
            }
            return map;
        }
    
        private int getParId(ObjectName objName){
            if(newKafkaVersion){
                String s = objName.getKeyProperty("partition");
                return Integer.parseInt(s);
            }else {
                String s = objName.getKeyProperty("name");
    
                int to = s.lastIndexOf("-LogEndOffset");
                String s1 = s.substring(0, to);
                int from = s1.lastIndexOf("-") + 1;
    
                String ss = s.substring(from, to);
                return Integer.parseInt(ss);
            }
        }
    
        private Set<ObjectName> getEndOffsetObjects(String topicName){
            String objectName;
            if (newKafkaVersion) {
                objectName = "kafka.log:type=Log,name=LogEndOffset,topic="+topicName+",partition=*";
            }else{
                objectName = "\"kafka.log\":type=\"Log\",name=\"" + topicName + "-*-LogEndOffset\"";
            }
            ObjectName objName = null;
            Set<ObjectName> objectNames = null;
            try {
                objName = new ObjectName(objectName);
                objectNames = conn.queryNames(objName,null);
            } catch (MalformedObjectNameException e) {
                e.printStackTrace();
                return  null;
            } catch (IOException e) {
                e.printStackTrace();
                return null;
            }
    
            return objectNames;
        }
    }
    

    Note how the code distinguishes the two Kafka versions. The methods corresponding to the three monitored items above are:

    public Map<Integer,Long> getTopicEndOffset(String topicName)
    public long getMsgInCountPerSec(String topicName)
    public double getMsgInTpsPerSec(String topicName)
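    For a single broker these can be exercised directly. A usage sketch (the address reuses the one from the sample run below and is an assumption, as is passing true to select the newer 0.8.2.x naming; assumes the kafka.jmx package from the listing above):

    public class SingleBrokerExample {
        public static void main(String[] args) {
            // true selects the 0.8.2.x-style ObjectNames handled by JmxConnection.
            JmxConnection conn = new JmxConnection(true, "xx.101.130.1:9999");
            if (conn.init()) {
                String topic = "default_channel_kafka_zzh_demo";
                System.out.println(conn.getMsgInCountPerSec(topic)); // message-in count
                System.out.println(conn.getMsgInTpsPerSec(topic));   // one-minute rate
                System.out.println(conn.getTopicEndOffset(topic));   // partition -> LogEndOffset
            }
        }
    }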
    

    Covering the whole cluster takes another class, which essentially sums the corresponding value from every broker in the cluster:

    package kafka.jmx;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
     * Created by hidden on 2016/12/8.
     */
    public class JmxMgr {
        private static Logger log = LoggerFactory.getLogger(JmxMgr.class);
        private static List<JmxConnection> conns = new ArrayList<>();
    
        public static boolean init(List<String> ipPortList, boolean newKafkaVersion){
            for(String ipPort:ipPortList){
                log.info("init jmxConnection [{}]",ipPort);
                JmxConnection conn = new JmxConnection(newKafkaVersion, ipPort);
                boolean bRet = conn.init();
                if(!bRet){
                    log.error("init jmxConnection error");
                    return false;
                }
                conns.add(conn);
            }
            return true;
        }
    
        public static long getMsgInCountPerSec(String topicName){
            long val = 0;
            for(JmxConnection conn:conns){
                long temp = conn.getMsgInCountPerSec(topicName);
                val += temp;
            }
            return val;
        }
    
        public static double getMsgInTpsPerSec(String topicName){
            double val = 0;
            for(JmxConnection conn:conns){
                double temp = conn.getMsgInTpsPerSec(topicName);
                val += temp;
            }
            return val;
        }
    
        public static Map<Integer, Long> getEndOffset(String topicName){
            Map<Integer,Long> map = new HashMap<>();
            for(JmxConnection conn:conns){
                Map<Integer,Long> tmp = conn.getTopicEndOffset(topicName);
                if(tmp == null){
                    log.warn("get topic endoffset return null, topic {}", topicName);
                    continue;
                }
            for(Integer parId:tmp.keySet()){ // across brokers, keep the larger value for each partition
                    if(!map.containsKey(parId) || (map.containsKey(parId) && (tmp.get(parId)>map.get(parId))) ){
                        map.put(parId, tmp.get(parId));
                    }
                }
            }
            return map;
        }
    
        public static void main(String[] args) {
            List<String> ipPortList = new ArrayList<>();
            ipPortList.add("xx.101.130.1:9999");
            ipPortList.add("xx.101.130.2:9999");
            JmxMgr.init(ipPortList,true);
    
            String topicName = "default_channel_kafka_zzh_demo";
            System.out.println(getMsgInCountPerSec(topicName));
            System.out.println(getMsgInTpsPerSec(topicName));
            System.out.println(getEndOffset(topicName));
        }
    }
    

    Sample output:

    2016-12-08 19:25:32 -[INFO] - [init jmxConnection [xx.101.130.1:9999]] - [kafka.jmx.JmxMgr:20]
    2016-12-08 19:25:32 -[INFO] - [init jmx, jmxUrl: service:jmx:rmi:///jndi/rmi://xx.101.130.1:9999/jmxrmi, and begin to connect it] - [kafka.jmx.JmxConnection:35]
    2016-12-08 19:25:33 -[INFO] - [init jmxConnection [xx.101.130.2:9999]] - [kafka.jmx.JmxMgr:20]
    2016-12-08 19:25:33 -[INFO] - [init jmx, jmxUrl: service:jmx:rmi:///jndi/rmi://xx.101.130.2:9999/jmxrmi, and begin to connect it] - [kafka.jmx.JmxConnection:35]
    2016-12-08 20:45:15 -[INFO] - [jmxUrl:service:jmx:rmi:///jndi/rmi://xx.101.130.1:9999/jmxrmi,objectName=kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=default_channel_kafka_zzh_demo, Count:6000] - [kafka.jmx.JmxConnection:73]
    2016-12-08 20:45:15 -[INFO] - [jmxUrl:service:jmx:rmi:///jndi/rmi://xx.101.130.2:9999/jmxrmi,objectName=kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=default_channel_kafka_zzh_demo, Count:4384] - [kafka.jmx.JmxConnection:73]
    10384
    3.915592283987704E-65
    {0=2072, 1=2084, 2=2073, 3=2083, 4=2072}
    

    Looking at the output, notice that 6000 + 4384 = 10384 = 2072 + 2084 + 2073 + 2083 + 2072: the per-broker MessagesInPerSec counts add up to exactly the sum of the per-partition LogEndOffset values, because every message written to the topic is appended to exactly one partition on exactly one broker.
    You can also connect jconsole to service:jmx:rmi:///jndi/rmi://xx.101.130.1:9999/jmxrmi or service:jmx:rmi:///jndi/rmi://xx.101.130.2:9999/jmxrmi to view the same values.

    A given value can also be read from the command line with the help of a small jar, cmdline-jmxclient-0.xx.3.jar; please download it yourself, it is easy to find online.
    Put the jar in some directory; here it sits in /root/util on a Linux box. Taking offset as the example:
    0.8.1.x: read the Value attribute for topic=default_channel_kafka_zzh_demo, partition=0:

    java -jar cmdline-jmxclient-0.10.3.jar - xx.101.130.1:9999 '"kafka.log":type="Log",name="default_channel_kafka_zzh_demo-0-LogEndOffset"' Value
    

    0.8.2.x: read the Value attribute for topic=default_channel_kafka_zzh_demo, partition=0:

    java -jar cmdline-jmxclient-0.10.3.jar - xx.101.130.1:9999 kafka.log:type=Log,name=LogEndOffset,topic=default_channel_kafka_zzh_demo,partition=0 Value
    

    See the pattern? If not, here is a small tip: open the attribute in Jconsole and hover the mouse over it; Jconsole pops up a tooltip showing how to spell the attribute's ObjectName.


  • Monitoring Kafka with Java JMX


    1. Overview

    We want to monitor Kafka with Java JMX. For the JMX concepts involved, see 【Java】java jmx 入门案例

    For a more advanced take, see: 【Spring】SpringBoot 如何使用JMX

    2. Enabling JMX in Kafka

    First, open Kafka's JMX port.

    On Linux:

    if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
        export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
        export JMX_PORT="9999"
    fi
    

    On Windows, see: 【Kafka】Window下kafka开启JMX监控

    3. Getting-started code

    package com.kafka.metrics.jmx;
    
    import org.junit.Test;
    
    import javax.management.*;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;
    
    import static org.junit.Assert.*;
    
    public class JmxMointorTest {
    
        @Test
        public void jmxTest() throws Exception {
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
            JMXConnector jmxc = JMXConnectorFactory.connect(url);
            MBeanServerConnection connection = jmxc.getMBeanServerConnection();
    
            System.out.println("=========Domains=========");
            String[] domains = connection.getDomains();
            for (String d : domains) {
                System.out.println(d);
            }
    
            System.out.println("=========MBeans=========");
            System.out.println(connection.getMBeanCount());
    
    
            System.out.println("=========Invoke=========");
            ObjectName mBeanName = new ObjectName("kafka.log:type=Log,name=Size,topic=topic-lcc,partition=0");
            // read an attribute's value
            Object value = connection.getAttribute(mBeanName, "Value");
            System.out.println(value);
            // invoke an MBean operation
            Object invoke = connection.invoke(mBeanName, "objectName", null, null);
            System.out.println(invoke);
    
    
            System.out.println("=========MBean Info=========");
            mBeanName = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec");
            MBeanInfo info = connection.getMBeanInfo(mBeanName);
            System.out.println("ClassName:" + info.getClassName());
            for (MBeanAttributeInfo attr : info.getAttributes()) {
                System.out.println("属性:" + attr.getName() + ",类型:" + attr.getType() + ",值:" + connection.getAttribute(mBeanName, attr.getName()));
    
            }
            for (MBeanOperationInfo op : info.getOperations()) {
                System.out.println("操作:" + op.getName());
            }
    
            jmxc.close();
        }
    }
    

    A brief explanation:
    Every ObjectName follows a structured format (domain:key=value,...). The run output looks like this:

    =========Domains=========
    java.util.logging
    kafka.coordinator
    kafka.utils
    kafka.controller
    java.nio
    kafka.network
    JMImplementation
    kafka.log
    java.lang
    com.sun.management
    kafka.server
    kafka.cluster
    kafka
    =========MBeans=========
    512
    =========Invoke=========
    1571
    kafka.log:type=Log,name=Size,topic=topic-lcc,partition=0
    =========MBean Info=========
    ClassName:com.yammer.metrics.reporting.JmxReporter$Meter
    attribute:EventType, type:java.lang.String, value:bytes
    attribute:RateUnit, type:java.util.concurrent.TimeUnit, value:SECONDS
    attribute:MeanRate, type:double, value:0.0
    attribute:OneMinuteRate, type:double, value:0.0
    attribute:FiveMinuteRate, type:double, value:0.0
    attribute:FifteenMinuteRate, type:double, value:0.0
    attribute:Count, type:long, value:0
    operation:objectName
    
    

    Explanation:

    1. It prints all the current domain names.

    2. It also prints the total number of MBeans.

    3. The monitoring output is what we actually need: the parts of the ObjectName correspond to what JConsole displays (see the sketch after this list).

    4. Each concrete object exposes two kinds of information, attributes and operations (methods), which is easy to understand.

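    The structure JConsole displays can also be taken apart programmatically with the standard javax.management.ObjectName API; a small sketch using the same name queried above:

    import javax.management.ObjectName;

    public class ObjectNameParts {
        public static void main(String[] args) throws Exception {
            // domain ":" comma-separated key=value properties
            ObjectName name = new ObjectName("kafka.log:type=Log,name=Size,topic=topic-lcc,partition=0");
            System.out.println(name.getDomain());                 // kafka.log
            System.out.println(name.getKeyProperty("type"));      // Log
            System.out.println(name.getKeyProperty("name"));      // Size
            System.out.println(name.getKeyProperty("topic"));     // topic-lcc
            System.out.println(name.getKeyProperty("partition")); // 0
        }
    }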

    4. Advanced

    4.1 Starting the broker, consumer, and producer with JMX RMI flags

    -ea -Dcom.sun.management.jmxremote.authenticate=false
    -Dcom.sun.management.jmxremote.ssl=false
    -Dcom.sun.management.jmxremote.port=9996
    

    4.2 Connecting via JMX RMI

    service:jmx:rmi:///jndi/rmi://127.0.0.1:9998/jmxrmi
    

    4.3 Monitored data

    4.3.1 Broker

    bean name: kafka:type=kafka.SocketServerStats (these counters are reset on every restart)

    def getProduceRequestsPerSecond: Double
    def getFetchRequestsPerSecond: Double
    def getAvgProduceRequestMs: Double
    def getMaxProduceRequestMs: Double
    def getAvgFetchRequestMs: Double
    def getMaxFetchRequestMs: Double
    def getBytesReadPerSecond: Double
    def getBytesWrittenPerSecond: Double
    def getNumFetchRequests: Long
    def getNumProduceRequests: Long
    def getTotalBytesRead: Long
    def getTotalBytesWritten: Long
    def getTotalFetchRequestMs: Long
    def getTotalProduceRequestMs: Long
    

    bean name: kafka:type=kafka.BrokerAllTopicStat (reset on every restart)
    bean name: kafka:type=kafka.BrokerTopicStat.topic (reset on every restart)

    def getMessagesIn: Long             number of messages written
    def getBytesIn: Long                number of bytes written
    def getBytesOut: Long               number of bytes read out
    def getFailedProduceRequest: Long   number of failed produce requests
    def getFailedFetchRequest: Long     number of failed fetch requests
    

    Less important attributes:

    bean name: kafka:type=kafka.LogFlushStats

    def getFlushesPerSecond: Double
    def getAvgFlushMs: Double
    def getTotalFlushMs: Long
    def getMaxFlushMs: Double
    def getNumFlushes: Long
    

    bean name: kafka:type=logs.topic-pattern

    def getName: String                 name of the monitored item, in the format topic-partitionId, e.g. guoguo_t_1-0, guoguo_t_1-1
    def getSize: Long                   size of the persisted log file
    def getNumberOfSegments: Int        number of persisted segment files
    def getCurrentOffset: Long          byte offset within the log file currently being written
    def getNumAppendedMessages: Long    number of appended messages, reset on every restart
    

    Other items worth monitoring:

    Java heap (heap and non-heap memory usage, etc.)
    GC info (number of collections, total GC time, etc.)
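    These come from the standard java.lang JMX domain, so they can be read over the same connection; for the heap, a sketch using the platform MemoryMXBean proxy (conn is assumed to be an already established MBeanServerConnection):

    import java.io.IOException;
    import java.lang.management.ManagementFactory;
    import java.lang.management.MemoryMXBean;
    import java.lang.management.MemoryUsage;
    import javax.management.MBeanServerConnection;

    public class HeapReader {
        // Prints heap usage of the remote JVM behind the given JMX connection.
        static void printHeap(MBeanServerConnection conn) throws IOException {
            MemoryMXBean memory = ManagementFactory.newPlatformMXBeanProxy(
                    conn, ManagementFactory.MEMORY_MXBEAN_NAME, MemoryMXBean.class);
            MemoryUsage heap = memory.getHeapMemoryUsage();
            System.out.printf("heap used=%d committed=%d max=%d%n",
                    heap.getUsed(), heap.getCommitted(), heap.getMax());
        }
    }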
    

    4.3.2 Consumer

    Consumer state
    bean name: kafka:type=kafka.ConsumerStats

    def getPartOwnerStats: String
    e.g. guoguo_t_1: [
      {
           0-1,  //  broker + partition info
           fetchoffset: 58246,   the fetched offset, i.e. the offset already consumed
          consumeroffset: 58246
       }{ 0-0,  fetchoffset: 2138747,consumeroffset: 2138747}]
    def getConsumerGroup: String    the consumer group, e.g. guoguo_group_1
    def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long  how many bytes of messages have not yet been read
    def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long  how many bytes of data have been consumed
    def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long
    

    bean name: kafka:type=kafka.ConsumerAllTopicStat (aggregate over all topics; reset on every restart)

    kafka:type=kafka.ConsumerTopicStat.topic (reset on every restart)

    def getMessagesPerTopic: Long
    def getBytesPerTopic: Long
    

    bean name: kafka:type=kafka.SimpleConsumerStats

    def getFetchRequestsPerSecond: Double   fetch requests issued per second
    def getAvgFetchRequestMs: Double        average fetch-request latency in ms
    def getMaxFetchRequestMs: Double        maximum fetch-request latency in ms
    def getNumFetchRequests: Long           number of fetch requests
    def getConsumerThroughput: Double       consumer throughput in bytes per second
    

    4.3.3 Producer

    bean name: kafka:type=kafka.KafkaProducerStats

    def getProduceRequestsPerSecond: Double
    def getAvgProduceRequestMs: Double
    def getMaxProduceRequestMs: Double
    def getNumProduceRequests: Long
    

    bean name: kafka.producer.Producer:type=AsyncProducerStats

    def getAsyncProducerEvents: Int (number of messages sent; should not diverge much from the sum of all consumers' getMessagesPerTopic values)
    def getAsyncProducerDroppedEvents: Int
    

    4.4 Demo program

    package com.kafka.metrics.jmx
    
    import javax.management._
    import kafka.log.LogStatsMBean
    import kafka.network.SocketServerStatsMBean
    import kafka.server.BrokerTopicStatMBean
    import javax.management.openmbean.CompositeData
    import java.lang.management.{MemoryUsage, GarbageCollectorMXBean}
    import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL}
    
    import collection.JavaConversions._
    
    
    object JmxMointor1 {
    
      def main(args: Array[String]) {
        val jmxUrl: JMXServiceURL = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:9999/jmxrmi")
        val connector: JMXConnector = JMXConnectorFactory.connect(jmxUrl)
        val mBeanServerconnection: MBeanServerConnection = connector.getMBeanServerConnection
    
        val domains: Array[String] = mBeanServerconnection.getDomains
        println("获取所有的 domains:")
        for (domain <- domains) {
          println("%25s:  %s".format("domain", domain))
        }
    
        println("---------遍历每个domain下的object----------------------")
        val beanSet: java.util.Set[ObjectInstance] = mBeanServerconnection.queryMBeans(null, null)
        val beans: Array[ObjectInstance] = beanSet.toArray(new Array[ObjectInstance](0)).sortWith((o1, o2) => o1.getClassName.compare(o2.getClassName) < 0)
        for (instance <- beans) {
          val objectName: ObjectName = instance.getObjectName
          println("%s %s".format(instance.getClassName, objectName))
        }
    
        println("---------获取网络信息----------------------")
    
    
        /***
          * Testing shows that Kafka 0.10, 1.1.0 and 2.3.0 do not expose kafka:type=kafka.SocketServerStats; it was presumably a custom (legacy) bean.
          */
    //        {
    //      val instance: ObjectName = ObjectName.getInstance("kafka:type=kafka.SocketServerStats")
    //      val bean: SocketServerStatsMBean = MBeanServerInvocationHandler.newProxyInstance(mBeanServerconnection,
    //        instance,
    //        classOf[SocketServerStatsMBean],
    //        true)
    //      println(instance.getCanonicalKeyPropertyListString)
    //      println("%25s:  %s".format("AvgFetchRequestMs", bean.getAvgFetchRequestMs))
    //      println("%25s:  %s".format("AvgProduceRequestMs", bean.getAvgProduceRequestMs))
    //      println("%25s:  %s".format("BytesReadPerSecond", bean.getBytesReadPerSecond))
    //      println("%25s:  %s".format("BytesWrittenPerSecond", bean.getBytesWrittenPerSecond))
    //      println("%25s:  %s".format("FetchRequestsPerSecond", bean.getFetchRequestsPerSecond))
    //      println("%25s:  %s".format("MaxFetchRequestMs", bean.getMaxFetchRequestMs))
    //      println("%25s:  %s".format("MaxProduceRequestMs", bean.getMaxProduceRequestMs))
    //      println("%25s:  %s".format("NumFetchRequests", bean.getNumFetchRequests))
    //      println("%25s:  %s".format("NumProduceRequests", bean.getNumProduceRequests))
    //      println("%25s:  %s".format("ProduceRequestsPerSecond", bean.getProduceRequestsPerSecond))
    //    }
        println("----------获取内存信息---------------------");
        {
          val objNames: java.util.Set[ObjectName] = mBeanServerconnection.queryNames(
            ObjectName.getInstance("java.lang:type=Memory*"), null)
          val array: Array[ObjectName] = objNames.toArray(new Array[ObjectName](0))
          for (name <- array) {
            val info: _root_.javax.management.MBeanInfo = mBeanServerconnection.getMBeanInfo(name)
            val attrInfos: Array[_root_.javax.management.MBeanAttributeInfo] = info.getAttributes
            println(name.toString)
            for (info <- attrInfos) {
              println(info.getName + " " + info.getType)
              info.getType match {
                case "javax.management.openmbean.CompositeData" =>
                  val attribute: AnyRef = mBeanServerconnection.getAttribute(name, info.getName)
                  val bean: MemoryUsage = MemoryUsage.from(attribute.asInstanceOf[CompositeData])
                  println("%25s:  %s".format("Committed", bean.getCommitted()))
                  println("%25s:  %s".format("Init", bean.getInit()))
                  println("%25s:  %s".format("Max", bean.getMax()))
                  println("%25s:  %s".format("Used", bean.getUsed()))
                case _ =>
              }
            }
          }
        }
        println("------------------获取垃圾回收器信息-------------")
    
        {
          val objNames: java.util.Set[ObjectName] = mBeanServerconnection.queryNames(
            ObjectName.getInstance("java.lang:type=GarbageCollector,name=*"), null)
          val array: Array[ObjectName] = objNames.toArray(new Array[ObjectName](0))
          for (next <- array) {
            val bean: GarbageCollectorMXBean = MBeanServerInvocationHandler.newProxyInstance(mBeanServerconnection, next, classOf[GarbageCollectorMXBean], true)
            println("%25s:  %s".format("Name", bean.getName()))
            println("%25s:  %s".format("MemoryPoolNames", bean.getMemoryPoolNames()))
            println("%25s:  %s".format("ObjectName", bean.getObjectName()))
            println("%25s:  %s".format("Class", bean.getClass()))
            println("%25s:  %s".format("CollectionCount", bean.getCollectionCount()))
            println("%25s:  %s".format("CollectionTime", bean.getCollectionTime()))
          }
        }
    
    
        println("------------------获取重要信息-------------")
    
        val TypeValuePattern = "(.*):(.*)=(.*)".r
        val kafka1: ObjectName = new ObjectName("kafka", "type", "*")
        val kafka: java.util.Set[ObjectInstance] = mBeanServerconnection.queryMBeans(kafka1, null)
        val kafkas: Array[ObjectInstance] = kafka.toArray(new Array[ObjectInstance](0)).sortWith((o1, o2) => o1.getClassName.compare(o2.getClassName) < 0)
        for (instance <- kafkas) {
          val objectName: ObjectName = instance.getObjectName
          println(instance.getClassName + " " + objectName)
    
          objectName.getCanonicalName match {
            case TypeValuePattern(domain, t, v) =>
              instance.getClassName match {
                case "kafka.log.LogStats" =>
                  val oName: ObjectName = new ObjectName(domain, t, v)
                  val bean: LogStatsMBean = MBeanServerInvocationHandler.newProxyInstance(mBeanServerconnection, oName, classOf[LogStatsMBean], true)
                  println("%25s:  %s".format("CurrentOffset", bean.getCurrentOffset))
                  println("%25s:  %s".format("Name", bean.getName()))
                  println("%25s:  %s".format("NumAppendedMessages", bean.getNumAppendedMessages))
                  println("%25s:  %s".format("NumberOfSegments", bean.getNumberOfSegments))
                  println("%25s:  %s".format("Size", bean.getSize()))
                case "kafka.message.LogFlushStats" =>
                  val oName: ObjectName = new ObjectName(domain, t, v)
                  val bean: LogStatsMBean = MBeanServerInvocationHandler.newProxyInstance(mBeanServerconnection, oName, classOf[LogStatsMBean], true)
                  println("%25s:  %s".format("CurrentOffset", bean.getCurrentOffset))
                  println("%25s:  %s".format("Name", bean.getName()))
                  println("%25s:  %s".format("NumAppendedMessages", bean.getNumAppendedMessages))
                  println("%25s:  %s".format("NumberOfSegments", bean.getNumberOfSegments))
                  println("%25s:  %s".format("Size", bean.getSize()))
                case "kafka.SocketServerStats" =>
                  val oName: ObjectName = new ObjectName(domain, t, v)
                  val bean: SocketServerStatsMBean = MBeanServerInvocationHandler.newProxyInstance(mBeanServerconnection, oName, classOf[SocketServerStatsMBean], true)
                  println("%25s:  %s".format("BytesReadPerSecond", bean.getBytesReadPerSecond))
                  println("%25s:  %s".format("AvgFetchRequestMs", bean.getAvgFetchRequestMs))
                  println("%25s:  %s".format("AvgProduceRequestMs", bean.getAvgProduceRequestMs))
                  println("%25s:  %s".format("BytesWrittenPerSecond", bean.getBytesWrittenPerSecond))
                  println("%25s:  %s".format("FetchRequestsPerSecond", bean.getFetchRequestsPerSecond))
                  println("%25s:  %s".format("MaxFetchRequestMs", bean.getMaxFetchRequestMs))
                  println("%25s:  %s".format("MaxProduceRequestMs", bean.getMaxProduceRequestMs))
                  println("%25s:  %s".format("NumFetchRequests", bean.getNumFetchRequests))
                  println("%25s:  %s".format("NumProduceRequests", bean.getNumProduceRequests))
                  println("%25s:  %s".format("ProduceRequestsPerSecond", bean.getProduceRequestsPerSecond))
                  println("%25s:  %s".format("TotalBytesRead", bean.getTotalBytesRead))
                case "kafka.server.BrokerTopicStat" =>
                  val oName: ObjectName = new ObjectName(domain, t, v)
                  val bean: BrokerTopicStatMBean = MBeanServerInvocationHandler.newProxyInstance(mBeanServerconnection, oName, classOf[BrokerTopicStatMBean], true)
                  println("%25s:  %s".format("BytesIn", bean.getBytesIn))
                  println("%25s:  %s".format("BytesOut", bean.getBytesOut))
                  println("%25s:  %s".format("FailedFetchRequest", bean.getFailedFetchRequest))
                  println("%25s:  %s".format("FailedProduceRequest", bean.getFailedProduceRequest))
                  println("%25s:  %s".format("MessagesIn", bean.getMessagesIn))
                case _ =>
              }
            case _ =>
          }
        }
      }
    }
    
  • Kafka JMX

    2018-11-09 16:57:04
    The JMX MBean list from the official Kafka documentation: Monitor Common monitoring metrics for producer/consumer/connect/streams Common Per-broker metrics for producer/consumer/connect/streams Producer monitoring Producer ...
  • 1. Environment preparation. System: ubuntu16. Reference blog: install the Zabbix Java gateway. Configure JMX for Kafka ... KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10149 -Dcom.sun.management.jmx
  • Hands-on big data: Kafka video course

    1,000+ learners · 2017-10-29 22:03:52
    This hands-on Kafka video course for big data starts from practice, explaining Kafka's principles and its application scenarios in big data. It is one of the few practical Kafka courses on the market, covering an introduction to Kafka, performance testing, data, and messaging.
  • Monitor Kafka the way jconsole does and report the data: public static void main(String[] args) { // Kafka client KafkaConsumer consumer = KafkaClient.... // JMX client Map<String, JMXConnector> jmxCon...
  • Kafka

    10,000+ reads · 2017-11-23 16:06:55
    Use case: after building a fully distributed Hadoop cluster following the earlier post, you find that the cluster ships with basic services such as HDFS, MapReduce and Yarn, while other components, such as Hive, HBase, sqoop..., must be installed separately. Kafka introduction: K
  • Enabling JMX in Kafka

    2020-06-09 22:25:14
    Custom monitoring via JMX. The data visible through JMX monitoring includes: ... Before using JMX, make sure Kafka has JMX monitoring enabled by adding JMX_PORT=9999 at startup; JMX_PORT specifies the JMX port: JMX_PORT=9999 bin/kafka-ser...
  • Collecting Kafka monitoring metrics with JMX exporter

    1,000+ reads · 2018-11-30 10:13:58
    JMX PORT: edit bin/kafka-server-start.sh and add the line export JMX_PORT="9999" so that port 9999 is exposed for collection. Configuring kafka-manager to collect metrics requires the same step. Download jmx_exporter: link ...
  • Starting JMX for Kafka on CDH

    2020-04-29 09:49:08
    The service starts normally and telnet 127.0.0.1 9393 works, but telnet <ip> 9393 does not get through. Looking at the CDH broker_java_opts configuration item, its original content is -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=...
  • Opening Kafka's JMX port

    1,000+ reads · 2017-03-07 17:11:00
  • Enabling JMX monitoring for Kafka

    2020-09-25 11:29:25
    Find the startup script kafka-run-class.sh in the bin directory of the Kafka installation ... -Djava.rmi.server.hostname=<local ip> // newly added to KAFKA_JMX_OPTS to specify the local IP. Connect to Kafka with jconsole and inspect the Kafka metrics through jconsole ...
  • Configuring JMX for Kafka: 1. Edit the kafka-run-class.sh file in bin under the Kafka installation directory: yitian@heron01:~/kafka/kafka_2.11-2.0.0/bin$ vim kafka-run-class.sh and add the content below. When using heron01, make sure that in /etc/...
  • 3. Enabling JMX in Kafka

    10,000+ reads · 2018-06-25 23:00:08
    1. Add JMX_PORT=9988 when starting Kafka, i.e. JMX_PORT=9988 bin/kafka-server-start.sh -daemon config/server.properties. 2. Or edit the kafka-run-class.sh script and add JMX_PORT=9988 as its first line. In fact, behind these two configuration approaches...
  • We needed to run Kafka on Windows and enable JMX monitoring there. Again the kafka-server-start file is edited, but this time it is kafka-server-start.bat: IF ["%KAFKA_HEAP_OPTS%"] EQU [""] ( set KAFKA_HEAP_OPTS=-Xmx1G -Xms1G set ...
  • 【kafka】java使用jmx 监控Kafka; 【Kafka】Window下kafka开启JMX监控. 2. Remote connections: connecting to Kafka's JMX remotely. I run Docker, set up like this: host machine 192.168.100.2; eaglenode can monitor Kafka over JMX normally; kafkanode exposes...
  • 监控KafkaJMX

    2020-09-27 11:00:02
    Kafka-2.1.1 + a Kafka cluster; Kafka source code on GitHub. 1. Monitoring the cluster with JMX. JMX stands for Java Management Extensions; it can manage and monitor running Java programs and is commonly used to manage threads, memory, log levels, service restarts, the system environment, and so on. ...
  • A 3-node Kafka cluster service was created in Cloudera Manager. To connect to Kafka's JMX from outside and monitor the Kafka data, the Kafka configuration must be modified: -Dcom.sun.management.jmxremote.host=192.168.1.125 -Djava.rmi.server.hostname=192.168.1.125 Below...
