
    Real-Time Data Dashboard Project

    Real-time data dashboards

    Simply put, a real-time data dashboard collects business data as it is produced, runs some analysis on it, and presents the results as text or charts.

    Requirements analysis

    This article simulates a real-time dashboard for order-data analysis:
    1. A jar program deployed on a Linux server simulates real-time order data.
    2. A tool is needed to collect this order data. Here the jar program writes the data to a log file, and Flume collects the data from that file. Since the goal is real-time analysis, the Flume sink cannot be HDFS or Hive: HDFS needs Hive on top of it for analysis and is poor at frequent appends to files, while Hive is poor at frequent inserts and slow at analysis. So the data collected by Flume is sent to Kafka instead.
    3. Once Kafka receives the data, it is consumed immediately and pushed to an in-memory store such as Redis, where the real-time analysis happens. This article stops at consuming the data from Kafka and does not simulate the storage and analysis steps.

    Project code development

    Preparation

    1. Create a topic in Kafka to receive (and later consume) the data sent by Flume:

    bin/kafka-topics.sh  --create --replication-factor 2 \
    --topic itcast_order --zookeeper node01:2181,node02:2181,node03:2181 --partitions 5
    

    2. Create a Maven project in IDEA. The project needs a class that writes the order data to a log file, a JavaBean for the order data itself, and a Kafka consumer class.
    The pom file is as follows:

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.41</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
    
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
    
    
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- Main class that generates the simulated real-time data; change to match your package -->
                                    <mainClass>realboard.LoggerPrint</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
             <plugin>
                     <artifactId>maven-assembly-plugin</artifactId>
                     <configuration>
                          <descriptorRefs>
                               <descriptorRef>jar-with-dependencies</descriptorRef>
                          </descriptorRefs>
                          <archive>
                               <manifest>
                                    <!-- Main class that generates the simulated real-time data; change to match your package -->
                                    <mainClass>realboard.LoggerPrint</mainClass>
                               </manifest>
                          </archive>
                     </configuration>
                     <executions>
                          <execution>
                               <id>make-assembly</id>
                               <phase>package</phase>
                               <goals>
                                    <goal>single</goal>
                               </goals>
                          </execution>
                     </executions>
                </plugin>
        </plugins>
    </build>
    

    Code development

    The order data class

    The main purpose of this class is to generate one random order record through its random() method.

    import com.alibaba.fastjson.JSONObject;
    
    import java.io.Serializable;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Random;
    import java.util.UUID;
    
    public class PaymentInfo implements Serializable
    {
        private static final long serialVersionUID = -7958315778386204397L;
        private String orderId;            // order ID
        private Date createOrderTime;      // order creation time
        private String paymentId;          // payment ID
        private Date paymentTime;          // payment time
        private String productId;          // product ID
        private String productName;        // product name
        private long productPrice;         // product price
        private long promotionPrice;       // promotional price
        private String shopId;             // shop ID
        private String shopName;           // shop name
        private String shopMobile;         // shop phone number
        private long payPrice;             // amount paid for the order
        private int num;                   // order quantity
        /**
         * <Province>19</Province>
         * <City>1657</City>
         * <County>4076</County>
         */
        private String province; // province
        private String city;     // city
        private String county;   // county
        // e.g. 102,144,114
        private String catagorys;
        public String getProvince() {
            return province;
        }
        public void setProvince(String province) {
            this.province = province;
        }
    
        public String getCity() {
            return city;
        }
    
        public void setCity(String city) {
            this.city = city;
        }
    
        public String getCounty() {
            return county;
        }
    
        public void setCounty(String county) {
            this.county = county;
        }
    
        public String getCatagorys() {
            return catagorys;
        }
    
        public void setCatagorys(String catagorys) {
            this.catagorys = catagorys;
        }
    
        public PaymentInfo() {
        }
    
        public PaymentInfo(String orderId, Date createOrderTime, String paymentId, Date paymentTime, String productId, String productName, long productPrice, long promotionPrice, String shopId, String shopName, String shopMobile, long payPrice, int num) {
            this.orderId = orderId;
            this.createOrderTime = createOrderTime;
            this.paymentId = paymentId;
            this.paymentTime = paymentTime;
            this.productId = productId;
            this.productName = productName;
            this.productPrice = productPrice;
            this.promotionPrice = promotionPrice;
            this.shopId = shopId;
            this.shopName = shopName;
            this.shopMobile = shopMobile;
            this.payPrice = payPrice;
            this.num = num;
        }
    
        public String getOrderId() {
            return orderId;
        }
    
        public void setOrderId(String orderId) {
            this.orderId = orderId;
        }
    
        public Date getCreateOrderTime() {
            return createOrderTime;
        }
    
        public void setCreateOrderTime(Date createOrderTime) {
            this.createOrderTime = createOrderTime;
        }
    
        public String getPaymentId() {
            return paymentId;
        }
    
        public void setPaymentId(String paymentId) {
            this.paymentId = paymentId;
        }
    
        public Date getPaymentTime() {
            return paymentTime;
        }
    
        public void setPaymentTime(Date paymentTime) {
            this.paymentTime = paymentTime;
        }
    
        public String getProductId() {
            return productId;
        }
    
        public void setProductId(String productId) {
            this.productId = productId;
        }
    
        public String getProductName() {
            return productName;
        }
    
        public void setProductName(String productName) {
            this.productName = productName;
        }
    
        public long getProductPrice() {
            return productPrice;
        }
    
        public void setProductPrice(long productPrice) {
            this.productPrice = productPrice;
        }
    
        public long getPromotionPrice() {
            return promotionPrice;
        }
    
        public void setPromotionPrice(long promotionPrice) {
            this.promotionPrice = promotionPrice;
        }
    
        public String getShopId() {
            return shopId;
        }
    
        public void setShopId(String shopId) {
            this.shopId = shopId;
        }
    
        public String getShopName() {
            return shopName;
        }
    
        public void setShopName(String shopName) {
            this.shopName = shopName;
        }
    
        public String getShopMobile() {
            return shopMobile;
        }
    
        public void setShopMobile(String shopMobile) {
            this.shopMobile = shopMobile;
        }
    
        public long getPayPrice() {
            return payPrice;
        }
    
        public void setPayPrice(long payPrice) {
            this.payPrice = payPrice;
        }
    
        public int getNum() {
            return num;
        }
    
        public void setNum(int num) {
            this.num = num;
        }
    
        @Override
        public String toString() {
            return "PaymentInfo{" +
                    "orderId='" + orderId + '\'' +
                    ", createOrderTime=" + createOrderTime +
                    ", paymentId='" + paymentId + '\'' +
                    ", paymentTime=" + paymentTime +
                    ", productId='" + productId + '\'' +
                    ", productName='" + productName + '\'' +
                    ", productPrice=" + productPrice +
                    ", promotionPrice=" + promotionPrice +
                    ", shopId='" + shopId + '\'' +
                    ", shopName='" + shopName + '\'' +
                    ", shopMobile='" + shopMobile + '\'' +
                    ", payPrice=" + payPrice +
                    ", num=" + num +
                    '}';
        }
    
        public String random() {
            this.orderId = UUID.randomUUID().toString().replaceAll("-", "");
            this.paymentId = UUID.randomUUID().toString().replaceAll("-", "");
            this.productPrice = new Random().nextInt(1000);
            this.promotionPrice = new Random().nextInt(500);
            this.payPrice = new Random().nextInt(480);
            this.shopId = new Random().nextInt(200000)+"";
    
            this.catagorys = new Random().nextInt(10000)+","+new Random().nextInt(10000)+","+new Random().nextInt(10000);
            this.province = new Random().nextInt(23)+"";
            this.city = new Random().nextInt(265)+"";
            this.county = new Random().nextInt(1489)+"";
    
            String date = "2015-11-11 12:22:12";
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            try {
                this.createOrderTime = simpleDateFormat.parse(date);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        // serialize this object to a JSON string (toJSONString is a static helper on JSONObject)
        return JSONObject.toJSONString(this);
        //  return new Gson().toJson(this);
        }
    
    }
    

    Class that writes the order data to a log file

    First, configure a log4j properties file and place it under the Maven project's resources directory:

    ### Settings ###
    log4j.rootLogger = debug,stdout,D,E
    
    ### Log to the console ###
    log4j.appender.stdout = org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.Target = System.out
    log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
    
    ### Log DEBUG level and above to /export/servers/orderLogs/orderinfo.log ###
    log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
    # output path for the order data
    log4j.appender.D.File = /export/servers/orderLogs/orderinfo.log
    log4j.appender.D.Append = true
    log4j.appender.D.Threshold = DEBUG 
    log4j.appender.D.layout = org.apache.log4j.PatternLayout
    #log4j.appender.D.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n
    log4j.appender.D.layout.ConversionPattern = %m%n
    
    ### Log ERROR level and above to /export/servers/orderLogs/ordererror.log ###
    log4j.appender.E = org.apache.log4j.DailyRollingFileAppender
    # output path for program error messages
    log4j.appender.E.File = /export/servers/orderLogs/ordererror.log
    log4j.appender.E.Append = true
    log4j.appender.E.Threshold = ERROR 
    log4j.appender.E.layout = org.apache.log4j.PatternLayout
    #log4j.appender.E.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n
    log4j.appender.E.layout.ConversionPattern =  %m%n
    

    Then write a class that calls the data class's random() method and writes the generated data to the log file:

    import org.apache.log4j.Logger;
    
    public class LoggerPrint
    {
        private static Logger logger = Logger.getLogger(LoggerPrint.class);
    
        public static void main(String[] args) throws InterruptedException {
    
            PaymentInfo paymentInfo = new PaymentInfo();
            while (true){
                String random = paymentInfo.random();
                logger.info(random);
                Thread.sleep(500);
            }
        }
    
    }
    

    Package the project and deploy it to Linux

    Package the Maven project into a jar; in this article the jar is named flume_kafka-1.0-SNAPSHOT-jar-with-dependencies.jar. Upload it to the Linux machine and run the following command from the jar's directory; you will see the log file appear and its data keep growing.

    java -jar flume_kafka-1.0-SNAPSHOT-jar-with-dependencies.jar
    

    Configuring Flume

    The Flume agent in this article tails the log file and sends the data to Kafka; its conf file is configured as follows:

    # name the source, channel and sink
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    # specify which channel the source sends its data to
    a1.sources.r1.channels = c1
    # specify the collection strategy of the source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /export/servers/orderLogs/orderinfo.log
    
    # use a memory channel, i.e. all data is buffered in memory
    a1.channels.c1.type = memory
    # use a Kafka sink and specify which channel it reads from
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = itcast_order
    a1.sinks.k1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1
    

    With the conf in place, start Flume (for example with flume-ng agent --name a1 --conf-file <path to the conf above> -Dflume.root.logger=INFO,console, where a1 is the agent name used in the config) and it will forward the data to Kafka.

    The Kafka consumer class

    Continue in the same Maven project: create a new package and, inside it, a Kafka consumer class. Instead of consuming records in arrival order, this class consumes partition by partition and commits the offsets manually.
    The code is as follows:

    import com.alibaba.fastjson.JSONObject;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import realboard.PaymentInfo;
    
    import java.util.*;
    
    public class KafkaConsumer
    {
        public static void main(String[] args)
        {
        Properties props=new Properties();
        // Kafka broker addresses
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        // consumer group id (the name here is arbitrary)
        props.put("group.id", "order group");

        // let the consumer auto-commit offsets
        // props.put("enable.auto.commit", "true");
        // interval between automatic offset commits
        // props.put("auto.commit.interval.ms",  "1000");
        // commit offsets manually instead
        props.put("enable.auto.commit", "false");
        // key deserializer
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // value deserializer
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
    
    
            org.apache.kafka.clients.consumer.KafkaConsumer<String,String> consumer=new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
        // subscribe to the itcast_order topic
            consumer.subscribe(Arrays.asList("itcast_order"));
    
            while (true)
            {
                ConsumerRecords<String, String> records=consumer.poll(100);
            // consume partition by partition
            Set<TopicPartition> partitions = records.partitions(); // partitions that have data in this batch
                for (TopicPartition partition : partitions)
                {
                    List<ConsumerRecord<String,String>> partitionRecords=records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords)
                    {
                        String value=record.value();
                        PaymentInfo paymentInfo=JSONObject.parseObject(value,PaymentInfo.class);
                        System.out.println(paymentInfo.toString());                                
                    }
                // offset of the last record in this partition
                long lastOffset=partitionRecords.get(partitionRecords.size()-1).offset();
                // commit the offset for this partition (the next offset to read is lastOffset + 1)
                    consumer.commitSync(Collections.singletonMap(partition,
                            new OffsetAndMetadata(lastOffset+1)));
                }
            }
        }
    }
    

    Run this class directly and you can see the data being consumed from Kafka. The remaining steps, storing the data in the in-memory database and analysing it, are not simulated here.
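
    As a rough illustration of the step that is not simulated, the sketch below pushes each consumed order into Redis using the Jedis client already declared in the pom. It is not part of the original article: the Redis host, the key layout and the two aggregations are assumptions, chosen only to show the idea of keeping dashboard counters in an in-memory store.

    import com.alibaba.fastjson.JSONObject;
    import redis.clients.jedis.Jedis;
    
    public class RedisOrderSink
    {
        private final Jedis jedis;
    
        public RedisOrderSink(String host, int port) {
            // assumes a Redis instance is reachable at host:port
            this.jedis = new Jedis(host, port);
        }
    
        /** Push one consumed order (the JSON string read from Kafka) into Redis. */
        public void save(String orderJson) {
            PaymentInfo info = JSONObject.parseObject(orderJson, PaymentInfo.class);
            // keep the raw order, keyed by order id (key layout is an assumption)
            jedis.set("order:" + info.getOrderId(), orderJson);
            // running total of the amount paid per shop, for the dashboard
            jedis.hincrBy("shop:payTotal", info.getShopId(), info.getPayPrice());
            // running count of orders per province
            jedis.hincrBy("province:orderCount", info.getProvince(), 1);
        }
    
        public void close() {
            jedis.close();
        }
    }
    

    Inside the consumer loop above, creating one RedisOrderSink (for example new RedisOrderSink("node01", 6379), if Redis runs on node01) and calling save(value) for each record would be enough for a first version; the dashboard then only has to read the Redis hashes.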

  • [kafka] Sending simulated data with Kafka Tool


    Study notes



    Offline installation of CDH 6.3.2 on CentOS 7.2

    https://blog.csdn.net/qq_40180229/article/details/108755530

    Adding a complete CDH 6.3.2 cluster on CentOS 7.2

    https://blog.csdn.net/qq_40180229/article/details/108756561

    Building a Flink project with Maven on CDH 6.3.2 (CentOS 7.2)

    https://blog.csdn.net/qq_40180229/article/details/108799761

    Code repository

    https://github.com/SmallScorpion/flink-education-online-realtime.git

    Writing the code

    1. To simulate data collection, write the Kafka producer code. Six tables map to six topics, so six producer classes are written (the original shows this code only as screenshots; a rough sketch of one producer follows this list).

    2. Write case classes for Flink serialization and deserialization.

    3. Write the deserialization and serialization classes for the Flink source and sink.
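
    As a rough sketch, one of those six producers might look like the following minimal Java example. The broker list, topic name and JSON fields are assumptions, and the real project (built around Flink and Scala case classes) may differ:

    import java.util.Properties;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    public class MemberTopicProducer
    {
        public static void main(String[] args) throws InterruptedException {
            Properties props = new Properties();
            // broker addresses are assumptions; point them at the actual cluster
            props.put("bootstrap.servers", "cdh01:9092,cdh02:9092,cdh03:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 100; i++) {
                // hypothetical JSON record for one of the six tables; the field names are made up
                String value = "{\"uid\":" + i + ",\"register_time\":" + System.currentTimeMillis() + "}";
                producer.send(new ProducerRecord<>("topic_member", value));
                Thread.sleep(100);
            }
            producer.close();
        }
    }
    

    The other five producers would follow the same pattern, each writing JSON for its own table to its own topic.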


    log4j + flume + kafka: simulating streaming data for Spark Streaming

    1. Simulating log generation in Java

    import org.apache.log4j.Logger;
    
    /**
     * Simulates a logger producing log messages
     */
    public class LoggerGenerator {
    
        private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());
        
        public static void main(String[] args) throws Exception {
            int index = 0;
            while (true) {
                Thread.sleep(1000);
                logger.info("value : " + index++);
            }
        }
    }
    

    2. The log4j.properties configuration

    log4j.rootLogger=INFO,stdout,flume
    log4j.appender.stdout = org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.target = System.out
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
    

    3. Next, send the log4j output to Flume using the Log4j Appender

    1) Add the following to the log4j.properties file

    log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
    log4j.appender.flume.Hostname = 192.168.126.31
    log4j.appender.flume.Port = 41414
    log4j.appender.flume.UnsafeMode = true
    

    2) Required jar packages

    Appends Log4j events to a flume agent’s avro source. A client using this appender must have the flume-ng-sdk in the classpath (eg, flume-ng-sdk-1.6.0.jar).

    Required: flume-ng-sdk.jar

        <dependency>
            <groupId>org.apache.flume.flume-ng-clients</groupId>
            <artifactId>flume-ng-log4jappender</artifactId>
            <version>1.6.0</version>
        </dependency>
    

    On the Flume side, the agent that receives the data needs an avro source. The project dependencies are:

     <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.11</version>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>${spark_version}</version>
            </dependency>
    
            <!-- sparkstreaming kafka-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
                <version>${spark_version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flume.flume-ng-clients</groupId>
                <artifactId>flume-ng-log4jappender</artifactId>
                <version>1.6.0</version>
            </dependency>
    
        </dependencies>
    

    4. The log4j_flume.properties configuration

    log4j_agent.sources = avro_source
    log4j_agent.channels = memory_channel
    log4j_agent.sinks = kafka_sink
    
    log4j_agent.sources.avro_source.type = avro
    log4j_agent.sources.avro_source.bind = 0.0.0.0
    log4j_agent.sources.avro_source.port = 41414
    
    log4j_agent.channels.memory_channel.type = memory
    log4j_agent.channels.memory_channel.capacity = 10000
    log4j_agent.channels.memory_channel.transactionCapacity = 10000
    
    log4j_agent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
    log4j_agent.sinks.kafka_sink.topic = test20
    log4j_agent.sinks.kafka_sink.brokerList = 192.168.126.31:9092
    log4j_agent.sinks.kafka_sink.requiredAcks = 1
    log4j_agent.sinks.kafka_sink.batchSize = 20
    
    log4j_agent.sources.avro_source.channels = memory_channel
    log4j_agent.sinks.kafka_sink.channel = memory_channel
    

    5. Start and test

    Start Flume:

    flume-ng agent --name log4j_agent --conf ${FLUME_HOME}/conf --conf-file ${FLUME_HOME}/conf/log4j_flume.properties -Dflume.root.logger=INFO,console

    Run the program that generates the logs.

    Start ZooKeeper first:

    zkServer.sh start

    Start Kafka:

    kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties

    Start a Kafka console consumer to check whether the data arrives:

    kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic test20

    6. Develop a Spark Streaming program to receive the Kafka messages

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils
    
    object KafkaConsumerMsg {
    
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaConsumerMsg")
            val ssc = new StreamingContext(conf, Seconds(5))
            // one receiver thread for the test20 topic
            val topicParams = Map("test20" -> 1)
            val dstream = KafkaUtils.createStream(ssc, "192.168.121.31:2181,192.168.121.32:2181,192.168.121.33:2181", "testgroup_id", topicParams)
            // count the messages received in each 5-second batch
            dstream.map(_._2).count().print()
            ssc.start()
            ssc.awaitTermination()
        }
    }
    

    7. Source code

    https://github.com/zhmcode/SsfklProject

  • Simulates real-time network-traffic statistics through the Java API. Blog post: http://blog.xiaoxiaomo.com/2016/05/14/Kafka-集群及API操作/
  • Simulates production-environment logs in Java; Flume monitors a given directory, collects the logs and pushes them to Kafka. For details see the post "基于CDH5的flume-kafka对接".
  • Simulated Kafka producer and consumer in cluster mode; for a standalone setup, just change the list of ip:port pairs to the single ip:port.
  • import java.util.Properties import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.serialization.... * Simulates data and sends it to a Kafka topic in real time * ...
  • A simulated Kafka run-through (for a self-test environment)

    1. Simulation experiment: [root@yws85 ~]# cd /opt/software/kafka/ [root@yws85 kafka]# bin/kafka-topics.sh \ > --create \ > --zookeeper 192.168.0.85:2181,192.168.0.86:2181,192.168.0.87:2181/kafka \ ...
  • Kafka消费者-从Kafka读取数据

    (1) Consumers and consumer groups. (1) Two common messaging models: the queuing model and publish-subscribe... Kafka provides a single consumer abstraction that covers both: the consumer group. Each consumer labels itself with a consumer group name. A ...
  • import java.io.{File, PrintWriter} import java.text.SimpleDateFormat import java.util.{Date...import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.spark.sql.SparkSe
  • Real-time data project, local Kafka simulation. 1. Overall preparation: create two folders on the cluster machines 2. Data acquisition 3. Data transfer 4. Data transfer. 1. Overall preparation: create two folders on the cluster machines, one RealTimeDataReceiver4Shell folder and one RealTimeDW4Shell folder ...
  • As shown in the figure, when testing sending data to Kafka with NiFi, the Kafka installed through Ambari received no data while the Kafka installed through Docker did. NiFi backend log error: 2020-04-19 11:09:02,916 INFO [Timer-Driven Process ...
  • Flume to Kafka: simulating a producer generating data in real time. Introduction: Flume can monitor a log in real time; every new line is detected and can be forwarded to Kafka. In real production every user action generates a record that is stored in a log or database, ...
  • A simple Kafka message publish/subscribe feature with a heartbeat mechanism to monitor a system's online health, sending the health status to the Kafka server through the message queue... Here a Spring Boot scheduled task plus timestamp modulo simulates heartbeats that report different system states to Ka...
  • With EMQ Enterprise Edition there is a plugin that forwards data to Kafka, but the enterprise edition is paid. Here the data received by EMQ needs to be forwarded to Kafka in code. Device simulation: use MQTTX to simulate a data-collection device sending data to EMQ. EMQ preparation: install and deploy EMQ; once it is deployed...
  • {"orderMaster":{"area":"周口","orderID":"1207687863","brandId":"Goodme","customerID":"1697","orderStatus":"40","orderChannel":"饿了么","storeCode":"332123","ts":1609840003168},"oredrDetail":{"foodName...
  • Angel: debugging FTRL against a simulated Kafka data stream, for Mac or Linux (the Windows 10 Linux subsystem also works). Create a directory for installing Kafka, ZooKeeper and related software, e.g. a new folder named streaming: mkdir streaming ...
  • 1. Simulated Kafka data: [1] entity class for the simulated data: public class CarDataTest { private String lat; private String lon; private String location; private String status; private String terminaltype; --....
  • Java code that uses IO streams to simulate a producer generating data: import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import java.io.BufferedReader; import java.io....
  • producer: the creator of messages, i.e. the publisher ... broker: Kafka runs as a cluster of one or more servers, each server being a broker; ZooKeeper handles coordination. 1. Create a topic: ./kafka-topics.sh --create --zookeeper localhost:2...
