精华内容
下载资源
问答
  • kafka模拟生产者、消费者,集群模式,若是单机版,将ip端口组改为相应ip端口即可;
  • 基于kafka模拟生产者和消费者

    千次阅读 2018-11-14 19:43:03
    //通过shell命令发送消息(模拟生产者) /usr/local/kafka_2.11-0.9.0.1/bin/kafka-console-producer.sh --broker-list 192.168.88.130:9092 --topic test //通过shell消费消息(模拟消费者,另一客户端) /usr/...

    zookeeper的启动脚本:

    #!/bin/sh
    echo "start zookeeper server..."
    
    hosts="hadoop0300 hadoop0301 hadoop0302"
    
    for host in $hosts
    do
      ssh $host  "source /etc/profile; /root/app/zookeeper-3.4.7/bin/zkServer.sh start"
    done
    

    kafka的启动脚本:

    #!/bin/bash
    
    for host in hadoop0300 hadoop0301 hadoop0302
    do
    echo $host
    ssh root@$host "source /etc/profile;/usr/local/kafka_2.11-0.9.0.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.11-0.9.0.1/config/server.properties"
    done
    

    //时间同步
    ntpdate -u ntp.api.bz

    //启动kafka
    /usr/local/kafka_2.11-0.9.0.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.11-0.9.0.1/config/server.properties

    //创建一个topci为test
    /usr/local/kafka_2.11-0.9.0.1/bin./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    //查看当前集群里面所有的topic
    /usr/local/kafka_2.11-0.9.0.1/bin/kafka-topics.sh --list --zookeeper 192.168.88.130:2181

    //通过shell命令发送消息(模拟生产者)
    /usr/local/kafka_2.11-0.9.0.1/bin/kafka-console-producer.sh --broker-list 192.168.88.130:9092 --topic test

    //通过shell消费消息(模拟消费者,另一客户端)
    /usr/local/kafka_2.11-0.9.0.1/bin/kafka-console-consumer.sh --zookeeper 192.168.88.130:2181 --from-beginning --topic test

    //如果报的是下面的错
    kafka.common.FailedToSendMessageException Failed to send messages after 3 tries
    解决:将server.properties里面的host.name该为自己的ip地址

     

    ProducerDemo模拟生产者:
    
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    
    import java.util.Properties;
    
    public class ProducerDemo {
        public static void main(String[] args) {
            //创建producer配置信息,发到哪里去
            Properties pro = new Properties();
            //指定消息发送到kafka集群
            pro.put("metadata.broker.list","192.168.88.130:9092,192.168.88.131:9092,192.168.88.132:9092");
            //指定消息序列化方式
            pro.put("serializer.class","kafka.serializer.StringEncoder");
            //配置信息包装
            ProducerConfig config = new ProducerConfig(pro);
            //1.创建producer
            Producer<String,String> producer = new Producer<String, String>(config);
    
            for (int i = 0; i <= 100; i++) {
                producer.send(new KeyedMessage<String,String>("test","message"+i));
            }
        }
    }
    ConsumerDemo模拟消费者:
    
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.message.MessageAndMetadata;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    public class ConsumerDemo {
    
        //指定消费的主题(哪个类别的消息)
        private static final String topic = "test";
        //指定线程个数
        private static final Integer thread = 2;
    
        public static void main(String[] args) {
            //创建消费者的配置信息
            Properties pro = new Properties();
            //指定连接zookeeper的消息
            pro.put("zookeeper.connect","192.168.88.130:2181,192.168.88.131:2181,192.168.88.132:2181");
            //消费者是以组的形式消费,指定消费组信息
            pro.put("group.id","testGroup");
            //配置消费消息的开始位置,从偏移量为0的开始消费,smallest代表从topic的第一条消息开始消费
            //对应的largest:代表从我们的消费者启动之后该topic下新产生的消息开始消费
            pro.put("auto.offset.reset","smallest");
            //
            ConsumerConfig config = new ConsumerConfig(pro);
            //创建消费者
            kafka.javaapi.consumer.ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
            //消费者可以消费多个topic数据,创建一个map存放top信息
            Map<String,Integer> topicMaps = new HashMap<String,Integer>();
            topicMaps.put(topic,thread);
            //创建信息流
            Map<String,List<KafkaStream<byte[],byte[]>>> consumerMap=
            consumer.createMessageStreams(topicMaps);
            //获取topic信息
            List<KafkaStream<byte[],byte[]>> kafkaStreams = consumerMap.get(topic);
            //一直循环kafka拉取消息
            for(final KafkaStream<byte[],byte[]> kafkaStream: kafkaStreams){
                //创建一个线程,消费消息
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        //循环读取每一条消息
                        for(MessageAndMetadata<byte[],byte[]> msg:kafkaStream){
                            //读到一条消息
                            String message =new String(msg.message());
                            System.out.println(message);
                        }
                    }
                }).start();
            }
        }
    }
    

     

     

    展开全文
  • kafka中的重要角色   broker:一台kafka服务器就是一个broker,一个集群可有多个broker,一个broker可以容纳多个topic   topic:可以理解为一个消息队列的名字   partition:分区,为了实现扩展性,一个topic...

    基本概念

    kafka中的重要角色
      broker:一台kafka服务器就是一个broker,一个集群可有多个broker,一个broker可以容纳多个topic
      topic:可以理解为一个消息队列的名字
      partition:分区,为了实现扩展性,一个topic可以分布到多个broker上,一个topic可以被分成多个partition,partition中的每条消息 都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体的顺序。也就是说,一个topic在集群中可以有多个partition 。kafka有Key Hash算法和Round Robin算法两种分区策略。
      producer:消息的生产者,是向kafka发消息的客户端
      consumer:消息消费者,向broker取消息的客户端
      offset:偏移量,用来记录consumer消费消息的位置
      Consumer Group:消费组,消息系统有两类,一是广播,二是订阅发布。

    编码实现

      创建一个生产者

    package sancen.kafka
    
    import java.util.Properties
    
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
    
    /**
      * 类名  ProducerDemo
      * 作者   彭三青
      * 创建时间  2018-11-26 9:49
      * 版本  1.0
      * 描述: $ 实现一个生产者,把模拟数据发送到kafka集群
      */
    
    object ProducerDemo {
      def main(args: Array[String]): Unit = {
        // 定义一个接收数据的topic
        val topic = "test"
        // 创建配置信息
        val props = new Properties()
        // 指定序列化类
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        // 指定kafka列表
        props.put("metadata.broker.list", "SC01:9092, SC01:9092, SC03:9092")
        // 设置发送数据后的响应方式
        props.put("request.required.acks", "0")
        // 指定分区器
        // props.put("partitioner.class", "kafka.producer.DefaultPartitioner
        // 自定义分区器
        props.put("partitioner.class", "day01.kafka.CustomPartitioner")
        // 创建producer对象
        val config: ProducerConfig = new ProducerConfig(props)
        // 创建生产者对象
        val producer: Producer[String, String] = new Producer(config)
    
        // 模拟数据
        for(i <- 1 to 10000){
          val msg = s"$i : producer send data"
          producer.send(new KeyedMessage[String, String](topic, msg)) //key偏移量,也可以给空 v实际的数据
          Thread.sleep(500)
        }
      }
    }
    

      创建消费者

    package sancen.kafka
    
    import java.util.Properties
    import java.util.concurrent.{ExecutorService, Executors}
    
    import kafka.consumer._
    
    import scala.collection.mutable
    
    /**
      * 类名  ConsumerDemo
      * 作者   彭三青
      * 创建时间  2018-11-26 10:08
      * 版本  1.0
      * 描述: $ 创建一个Consumer消费kafka的数据
      */
    
    class ConsumerDemo(val consumer: String, val stream: KafkaStream[Array[Byte], Array[Byte]]) extends Runnable{
      override def run(): Unit = {
        val it: ConsumerIterator[Array[Byte], Array[Byte]] = stream.iterator()
        while (it.hasNext()){
          val data = it.next()
          val topic = data.topic
          val partition = data.partition
          val offset = data.offset
          val msg: String = new String(data.message())
          println(s"Consumer:$consumer, topic:$topic, partiton:$partition, offset:$offset, msg:$msg")
        }
      }
    }
    
    object ConsumerDemo {
      def main(args: Array[String]): Unit = {
        // 定义获取的topic
        val topic = "test"
        // 定义一个map,用来存储多个topic key:topic名称,value:指定线程数用来获取topic的数据
        val topics = new mutable.HashMap[String, Int]() // 要求就要传一个map,可以放一个或者多个topic
        topics.put(topic, 2)
        // 配置信息
        val props = new Properties()
        // 指定consumer组名
        props.put("group.id", "group02")
        // 指定zk列表
        props.put("zookeeper.connect", "SC01:2181,SC02:2181,SC03:2181")
        // 指定offset异常时需要获取的offset值
        props.put("auto.offset.reset", "smallest")
        // 创建配置信息
        val config = new ConsumerConfig(props)
        // 创建consumer对象
        val consumer: ConsumerConnector = Consumer.create(config)
        // 获取数据,返回的map类型中key:topic名称,value:topic对应的数据
        val streams: collection.Map[String, List[KafkaStream[Array[Byte], Array[Byte]]]] = consumer.createMessageStreams(topics)
        // 获取指定topic的数据
        val stream: Option[List[KafkaStream[Array[Byte], Array[Byte]]]] = streams.get(topic)
        // 创建固定大小的线程池
        val pool: ExecutorService = Executors.newFixedThreadPool(3)
         for(i <- 0 until stream.size){
           pool.execute(new ConsumerDemo(s"Consumer:$i", stream.get(i)))
         }
      }
    }
    

      创建自定义分区类

    package sancen.kafka
    
    import kafka.producer.Partitioner
    import kafka.utils.VerifiableProperties
    import org.apache.kafka.common.utils.Utils
    
    /**
      * 类名  CustomPartitioner
      * 作者   彭三青
      * 创建时间  2018-11-26 20:29
      * 版本  1.0
      * 描述: $
      */
    
    // 要实现自定义分区器必须要继承Partitioner
    class CustomPartitioner(props: VerifiableProperties) extends Partitioner{
      override def partition(key: Any, numPartitions: Int): Int = {
        Utils.abs(key.hashCode) % numPartitions
      }
    }
    

    程序测试

      后台启动kafka集群

    kafka-server-start.sh kafka_2.11-0.9.0.1/config/server.properties &
    

      在kafka集群上创建一个名为test的topic,指定分区为2,一般一个topic对应一个分区

    kafka-topics.sh --create --zookeeper SC01:2181 --replication-factor 2 --partitions 2 --topic test
    

      分别运行ProducerDemo和ConsumerDemo则可以在ConsumerDemo端窗口打印出信息
    在这里插入图片描述

    展开全文
  • EFK 收集messages日志环境准备1. 关闭防火墙 selinux2. 修改主机名称3. 修改vim /etc/hosts文件4.... 创建topic 【一台创建模拟生产,一台模拟消费】13. 安装服务 【192.168.112.153端】14. 查看列表15. 安装elastics

    原理介绍

    kafka

    一种高吞吐量的分布式发布订阅消息系统

    特性:

    • 数据是按照 一定 顺序持久化保存的,可以按需读取

    • 高吞吐量【可支持每秒数十万的消息】

    • 支持通过kafka服务器和消费机集群来分区消息

    • 支持Hadoop并行数据加载

    基本概念

     topic:		主题
     partition:	分区,一个topic会有若干个数据分区
     offset:		偏移量,标识分区每条记录的位置
     broker:		服务的节点
    

    ZooKeeper

    是一个开放源码的分布式应用程序协调服务,它包含一个简单的原语集,分布式应用程序可以基于它实现同步服务,配置维护和命名服务

    ZooKeeper的基本运转流程:

    1. 选举Leader
    2. 同步数据
    3. 选举Leader过程中算法有很多,但要达到的选举标准是一致的。
    4. Leader要具有最高的执行ID,类似root权限
    5. 集群中大多数的机器得到响应并接受选出的Leader

    环境准备

    centos7【最小化,运行和核心数都为2】
    192.168.112.153
    jdk, zookeeper, kafka, filbeat, elasticsearch
    192.168.112.154
    jdk, zookeeper, kafka, filebeat,logstash
    192.168.112.155
    jdk, zookeeper, kafka, filbebeat,kibana
    同步时间——命令
    yum -y install ntpdate
    ntpdate -u ntp.aliyun.com

    1. 关闭防火墙 selinux

    systemctl stop firewalld
    setenforce 0

    2. 修改主机名称

    hostnamectl set-hostname kafka01
    hostnamectl set-hostname kafka02
    hostnamectl set-hostname kafka03

    3. 修改vim /etc/hosts文件

    192.168.112.153 kafka01
    192.168.112.154 kafka02
    192.168.112.155 kafka03

    4. 安装jdk

    rpm -ivh jdk-8u131-linux-x64_.rpm

    5. 安装zookeeper

    tar xzf zookeeper-3.4.14.tar.gz

    6. 改名移动

    mv zookeeper-3.4.14 /usr/local/zookeeper
    进入目录,修改配置文件名
    cd /usr/local/zookeeper/conf/
    mv zoo_sample.cfg zoo.cfg

    7. 修改配置文件

    tickTime=2000 # zk服务器之间的心跳时间
    initLimit=10 # zk连接失败的时间
    syncLimit=5 # zk的同步通信时间
    dataDir=/tmp/zookeeper #zk的数据目录
    clientPort=2181 # zk的监听端口号

    server.1=192.168.112.153:2888:3888 
    server.2=192.168.112.154:2888:3888
    server.3=192.168.112.155:2888:3888
    

    服务器编号,2888:通信端口 3888: 选举端口

    7. 创建myid文件

    mkdir /tmp/zookeeper
    kafka01:
    echo “1” > /tmp/zookeeper/myid
    kafka02:
    echo “2” > /tmp/zookeeper/myid
    kafka03:
    echo “3” > /tmp/zookeeper/myid

    8. 开启zk服务

    /usr/local/zookeeper/bin/zkServer.sh start
    验证服务转态
    /usr/local/zookeeper/bin/zkServer.sh status
    Mode: follower
    Mode: leader
    端口验证
    ss -nlpt|grep java 【2888/3888】
    在这里插入图片描述
    补充说明——
    1、2181:对cline端提供服务

    2、3888:选举leader使用

    3、2888:集群内机器通讯使用

    9. 安装kafka

    tar xf kafka_2.11-2.2.0.tgz
    移动/重命名
    mv kafka_2.11-2.2.0 /usr/local/kafka

    10. 进入目录编辑配置文件

    cd /usr/local/kafka/ config/
    vim server.properties
    kafka01:
    broker.id=0
    advertised.listeners=PLAINTEXT://kafka01:9092
    zookeeper.connect=192.168.112.153:2181,192.168.112.154:2181,192.168.112.155:2181
    kafka02:
    broker.id=1
    advertised.listeners=PLAINTEXT://kafka02:9092
    zookeeper.connect=192.168.112.153:2181,192.168.112.154:2181,192.168.112.155:2181
    kafka03:
    broker.id=2
    advertised.listeners=PLAINTEXT://kafka03:9092
    zookeeper.connect=192.168.112.153:2181,192.168.112.154:2181,192.168.112.155:2181

    11. 开启kafka 服务

    /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
    验证
    ss -nlpt | grep 9092
    端口验证图

    12. 创建topic 【一台创建模拟生产,一台模拟消费】

    /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.112.153:2181 --replication-factor 2 --partitions 3 --topic wg007
    

    查看当前的topic

    /usr/local/kafka/bin/kafka-topics.sh  --list  --zookeeper 192.168.112.136:2181
    

    模拟生产者

     /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.1.7:9092 --topic wg007
    

    模拟消费者

    /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.112.153:9092 --topic  wg007 --from-beginning
    

    13. 安装服务 【192.168.112.153端】

    rpm -ivh filebeat-6.8.12-x86_64.rpm
    编辑配置文件
    cd /etc/filebeat/
    mv filebeat.yml filebeat.yml.bat
    vim filebeat.yml

    filebeat.inputs:
    - type: log
      enabled: true
      paths:
        - /var/log/messages
    
    output.kafka:
      enabled: true
      hosts: ["192.168.112.153:9092","192.168.112.154:9092","192.168.112.155:9092"]
      topic: msg
    

    启动服务
    systemctl start filebeat

    14. 查看列表

    /usr/local/kafka/bin/kafka-topics.sh--list--zookeeper192.168.112.153:2181
    

    __consumer_offsets
    msg
    wg007

    15. 安装elasticsearch

    rpm -ivh elasticsearch-6.6.2.rpm
    修改配置文件,启动服务
    systemctl start elasticsearch
    验证端口
    ss -nlpt |grep java 【9200/9300】
    Java端口

    16. 安装服务 【192.168.112.154端】

    rpm -ivh logstash-6.6.0.rpm
    编辑配置文件
    vim /etc/logstash/conf.d/msg.conf

    input{
            kafka{
                    bootstrap_servers => ["192.168.112.153:9092,192.168.112.154:9092,192.168.112.155:9092 "]
                    group_id => "logstash"
                    topics => "msg"
                    consumer_threads => 5
            }
    }
    output{
            elasticsearch{
                            hosts => "192.168.112.153:9200"
                            index => "msg_log-%{+YYYY.MM.dd}"
                    }
    

    17. 启动服务

    systemctl start logstash
    验证端口
    ss -nlpt | grep 9600

    端口验证图

    18. 安装服务 【192.168.112.155端】

    rpm -ivh kibana-6.6.2-x86_64.rpm
    编辑配置文件
    vim /etc/kibana/kibana.yml
    第二行端口 ——5601
    第七行本机ip ——192.168.112.155
    第二十七行es的ip ——192.168.112.153
    启动服务
    systemctl start kibana
    验证端口
    ss -nlpt | grep 5601
    在这里插入图片描述

    问题剖析

    没有index 怎么办?【有filebeat端操作/153】

    1: chmod 777
    2: echo “test” >> /var/log/messages

    最后验证消费

    /usr/local/kafaka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.112.153:9092 --topic msg --from-beginning

    查看索引的日志

    tailf /var/log/filebeat/filebeat
    在这里插入图片描述

    在这里插入图片描述

    展开全文
  • 模拟生产者写入消息到Kafka import java.util.Properties import java.util.concurrent.{Executors} import com.sf.gis.commons import com.sf.gis.commons.utils.{DateTimeUtil, RandomUtil} import org....

    模拟生产者写入消息到Kafka

     

    import java.util.Properties
    import java.util.concurrent.{Executors}
    
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.kafka.common.serialization.StringSerializer
    import org.apache.log4j.{Level, Logger}
    
    import scala.util.control.Breaks
    
    class TestKafkaProducerStream extends Serializable {
        val className: String = this.getClass.getSimpleName.replace("$", "")
        @transient
        val logger: Logger = Logger.getLogger(className)
        Logger.getLogger("org.apache").setLevel(Level.ERROR)
    
        private final val bootstrapServers = "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094"
        private final val topic = "sim_link_kafka"
    
        // 创建Kafka客户端
        val properties = new Properties()
        properties.put("bootstrap.servers", bootstrapServers)
        properties.put("key.serializer",classOf[StringSerializer].getName)
        properties.put("value.serializer",classOf[StringSerializer].getName)
        properties.put("acks","all")
        val kafkaProducer = new KafkaProducer[String,String](properties)
    
        // 累计日志数量
        val count =  new java.util.concurrent.atomic.AtomicInteger(0)
    
        val beginEventIndex = 0
        val totalEvents = beginEventIndex + 2500000
    
        case class MetroFlowUserDO(id:Int, phone:String, mac:String, email:String, activeTime:String)
    
        class ThreadKafkaProducerStream(threadIndex: Int) extends Runnable{
            override def run(): Unit = {
                for (event <- Range(beginEventIndex,totalEvents)){
                    count.incrementAndGet()
                    val id = count.get()
                    val activeTime = DateTimeUtil.getCurrentSystemTime("yyyy-MM-dd HH:mm:ss")
                    val phone = RandomUtil.getRandomPhone
                    val mac = RandomUtil.getRandomMac
                    val email = RandomUtil.getRandomEmail
    
                    val metroFlowUserDO = MetroFlowUserDO(id,phone,mac,email,activeTime)
                    import org.json4s.jackson.Serialization
                    implicit val formats = org.json4s.DefaultFormats
                    val metroFlowUserJson: String = Serialization.write(metroFlowUserDO)
                    // logger.info(metroFlowUserJson)
                    val data = new ProducerRecord[String,String](topic,metroFlowUserJson)
                    // 发送消息到kafka
                    val futureCallback = kafkaProducer.send(data)
                    val answer = futureCallback.get()
                    val answerTopic = answer.topic()
                    // logger.info("Message Send : " + answerTopic)
                }
            }
        }
    
        def analysisRichService(incDay: String) : Unit = {
            val nThreads = 10
            val executorService = Executors.newFixedThreadPool(nThreads)
            try {
                // 提交10个线程 每个线程处理250W数据 累计2500W数据
                for(i <- Range(0,nThreads)){
                    executorService.execute(new ThreadKafkaProducerStream(i))
                }
            }finally {
                executorService.shutdown
                logger.info("ExecutorService isShutdown : " + executorService.isShutdown)
                while(true){
                    // 所有的子线程都结束了
                    if (executorService.isTerminated) {
                        logger.info("累计产生日志数据量为: " + count.get())
                        kafkaProducer.close()
                        new Breaks().break()
                    }
                }
            }
        }
    
        def execute(incDay: String): Unit = {
            analysisRichService(incDay)
        }
    }
    
    object TestKafkaProducerStream extends App {
        @Override
        override def main(args: Array[String]): Unit = {
            val applyInstance = new TestKafkaProducerStream
            var incDay: String = DateTimeUtil.getCurrentSystemTime("yyyyMMdd")
            if(args.length >= 1){
                incDay = args(0)
                applyInstance.logger.info(">>>>>>Get Submit Shell Conf Args:" + incDay)
            }
            applyInstance.execute(incDay)
            applyInstance.logger.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kafka producer has been finished ")
        }
    }

    涉及到的Util类:

    object DateTimeUtil extends Serializable {
        /**
          * 获取本机当前系统时间并转换为指定日期格式
          * @param  format 日期格式
          * @return
          */
        def getCurrentSystemTime(format: String): String ={
            val currentTimeMillis = System.currentTimeMillis()
            val simpleDateFormat = new SimpleDateFormat(format)
            val time = simpleDateFormat.format(new Date(currentTimeMillis))
            time
        }
    }
    object RandomUtil extends Serializable {
    
        /**
          * 随机生成手机号
          */
        private val telFirst = "134,135,136,137,138,139,150,151,152,157,158,159,130,131,132,155,156,133,153".split(",")
        private def getNum(start: Int, end: Int) = (Math.random * (end - start + 1) + start).toInt
    
        /**
          * 随机手机号
          */
        def getRandomPhone: String = {
            val index = getNum(0, telFirst.length - 1)
            val first = telFirst(index)
            val second = String.valueOf(getNum(1, 888) + 10000).substring(1)
            val third = String.valueOf(getNum(1, 9100) + 10000).substring(1)
            first + second + third
        }
    
        /**
          * 随机mac地址
          */
        private val SEPARATOR_OF_MAC = ":"
        def getRandomMac: String = {
            val random = new Random
            val mac = Array("%02x".format(random.nextInt(0xff)),"%02x".format(random.nextInt(0xff)), "%02x".format(random.nextInt(0xff)), "%02x".format(random.nextInt(0xff)),"%02x".format(random.nextInt(0xff)), "%02x".format(random.nextInt(0xff)))
            mac.mkString(SEPARATOR_OF_MAC)
        }
    
        /**
          * 随机邮箱
          */
        private val email_suffix = "@qq.com,@163.com,@126.com".split(",")
        private val base = "abcdefghijklmnopqrstuvwxyz0123456789"
        /**
          * 返回Email
          *
          * @param lMin 最小长度
          * @param lMax 最大长度
          * @return
          */
        private def getRandomEmail(lMin: Int, lMax: Int) = {
            val length = getNum(lMin, lMax)
            val sb = new StringBuffer
            var i = 0
            while ( {
                i < length
            }) {
                val number = (Math.random * base.length).toInt
                sb.append(base.charAt(number))
    
                {
                    i += 1; i - 1
                }
            }
            sb.append(email_suffix((Math.random * email_suffix.length).toInt))
            sb.toString
        }
    
        def getRandomEmail: String = getRandomEmail(3, 11)
    
    }

     

    展开全文
  • 使用Java操作kafka 生产者——从本地文件读入,进行消费 消费者——直接打印或写入本地文件
  • flume对接kafka模拟生产者实时生产数据 引言 flume可以实时的监控日志,日志每增加一条,flume都会感知到,然后可以将这条新的数据传到kafka上,实际生产中,用户的每个行为,生成一条数据,存到日志或数据库中,...
  • kafka大数据 生产者消费者实例,java代码实现,完整版
  • * 描述: $ 实现一个生产者,把模拟数据发送到kafka集群 */ object ProducerDemo { def main(args: Array[String]): Unit = { // 定义一个接收数据的topic val topic = "test" // 创建配置信息 val props = ...
  • java代码使用IO流模拟生产者生产数据 import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import java.io.BufferedReader; import java.io....
  • kafka java API模拟生产者消费者1.Linux启动服务2.生产者代码Producer API消息发送流程Linux上查看生产者队列内容3.消费者代码Consumer APILinux上查看消费者队列内容4.maven依赖 1.Linux启动服务 zkServer.sh start...
  • 添加依赖 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <...生产者 import org.apache.kafka.clients.producer.K

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 5,916
精华内容 2,366
关键字:

kafka模拟生产者