2019-01-02 13:32:31 ZHBR_F1 阅读数 2464
  • 大数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2471 人正在学习 去看看 肖滨

1. 基本概念

Flink是一款分布式的计算引擎,它可以用来做批处理,即处理静态的数据集、历史的数据集;也可以用来做流处理,即实时地处理一些实时数据流,实时地产生数据的结果;也可以用来做一些基于事件的应用。
kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。
kafka名词解释:
producer:生产者。
consumer:消费者。
topic: 消息以topic为类别记录,Kafka将消息种子(Feed)分门别类,每一类的消息称之为一个主题(Topic)。
broker:以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker;消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

2. 实现思路

实现流处理流程,对应核心要点描述如下:
1 读取配置文件生成配置JobConfig对象,设置Properties对象属性;

JobConfig conf = JobConfig.getInstance();
 if (conf == null)
      throw new ConfigurationException("配置项不能为空!");

Properties props = new Properties();
props.put("bootstrap.servers", conf.getKafkaCluster());
props.put("group.id", conf.getGroupId());

2 获取上下文环境StreamExecutionEnvironment对象与设置并行数;

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  if (conf.getParallelism() != null)
        env.setParallelism(conf.getParallelism());

3 通过topic、属性对象与序列化对象,声明Flink消费kafka的FlinkKafkaConsumer011数据对象与FlinkKafkaConsumer011规则对象;

FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>
                             (conf.getCalcDataTopic(), new SimpleStringSchema(), props);
FlinkKafkaConsumer011<String> ruleConsumer = new FlinkKafkaConsumer011<>
                             (NOTICE_TOPIC, new SimpleStringSchema(), props);

4 将FlinkKafkaConsumer011数据对象加入到上下文环境StreamExecutionEnvironment对象中,并生成DataStream对象;

DataStream<String> dataStream = env.addSource(consumer);

5 声明表达式状态描述符的ExpressionStateDescriptor对象,并指定名称为CalcRuleBroadCastState;

 ExpressionStateDescriptor expressionStateDescriptor = new ExpressionStateDescriptor("CalcRuleBroadCastState");

6 将FlinkKafkaConsumer011规则对象加入到上下文环境StreamExecutionEnvironment对象中生成BroadcastStream,并将ExpressionStateDescriptor对象对象广播到每个task下;

  BroadcastStream<Notice<CalcMSRule>> ruleStream = env.addSource(ruleConsumer)
        .map(new MapFunction<String, Notice<CalcMSRule>>() {
            @Override
            public Notice<CalcMSRule> map(String value) throws Exception {
                Notice<CalcMSRule> notice =   JSON.parseObject(value, 
                                  new TypeReference<DefaultNotice<CalcMSRule>>() {
                });
                return notice;
            }
        }).broadcast(expressionStateDescriptor);

7 将数据流DataStream对象的json字符串转化为对象,并去除空对象;

  DataStream<Measurement> msStream = dataStream
                .map((String jsonStr) -> TSParser.parseMeasurement(jsonStr))
                .filter((Measurement m) -> !m.equals(Measurement.empty()));

8 声明mysql与influxdb数据的连接配置对象;

  MysqlConf mysqlConf = new MysqlConf(conf.getMysqlUrl(), 
               conf.getMysqlUsername(), conf.getMysqlPassword());
  InfluxDBConf influxDBConf = new InfluxDBConf(conf.getInfluxdbAddress(),       
       conf.getInfluxdbDatabase());

9 将数据流DataStream与规则流BroadcastStream连接可以得到新的连接connectedStream,基于connectedStream设置CalcMsFunction实现,来处理新的Stream中的数据记录,可以在每个Task中基于获取到统一的规则配置信息,进而处理用户事件,并设置名称计算点算子。

 DataStream<MeasuringSignal> resultStream = msStream.connect(ruleStream)
                .process(new CalcMsFunction(mysqlConf, influxDBConf)).name("计算点算子");

10 通过指定kafka地址与分组id参数生成kafka生产者FlinkKafkaProducer011对象,将上一步处理结果发送到Flink sink对象中

 FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(conf.getKafkaCluster(),              conf.getCalcResultTopic(), new SimpleStringSchema());
 resultStream.map(new MapFunction<MeasuringSignal, String>() {
                @Override
                public String map(MeasuringSignal value) throws Exception {
                    return TSParser.parse(value);
                }
    }).addSink(producer).name("Kafka Sink");

在Flink Job中开启Checkpoint功能,默认每隔500毫秒对Flink Job中的状态进行Checkpointing,以保证流处理过程发生故障后,也能够恢复。

2019-10-10 15:14:49 weidajiangjiang 阅读数 28
  • 大数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2471 人正在学习 去看看 肖滨

原文链接:https://blog.csdn.net/gyshun/article/details/79710534

1:kafka和flume都是日志系统,kafka是分布式消息中间件,自带存储,提供push和pull存取数据功能。flume分为agent(数据采集器),collector(数据简单处理和写入),storage(存储器)三部分,每一部分都是可以定制的。比如agent采用RPC(Thrift-RPC)、text(文件)等,storage指定用hdfs做。

2:kafka做日志缓存应该是更为合适的,但是 flume的数据采集部分做的很好,可以定制很多数据源,减少开发量。所以比较流行flume+kafka模式,如果为了利用flume写hdfs的能力,也可以采用kafka+flume的方式。

采集层 主要可以使用Flume, Kafka两种技术。

Flume:Flume 是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展API.

Kafka:Kafka是一个可持久化的分布式的消息队列。

Kafka 是一个非常通用的系统。你可以有许多生产者和很多的消费者共享多个主题Topics。相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。所以,Cloudera 建议如果数据被多个系统消费的话,使用kafka;如果数据被设计给Hadoop使用,使用Flume。
正如你们所知Flume内置很多的source和sink组件。然而,Kafka明显有一个更小的生产消费者生态系统,并且Kafka的社区支持不好。希望将来这种情况会得到改善,但是目前:使用Kafka意味着你准备好了编写你自己的生产者和消费者代码。如果已经存在的Flume Sources和Sinks满足你的需求,并且你更喜欢不需要任何开发的系统,请使用Flume。

Flume可以使用拦截器实时处理数据。这些对数据屏蔽或者过量是很有用的。Kafka需要外部的流处理系统才能做到。

Kafka和Flume都是可靠的系统,通过适当的配置能保证零数据丢失。然而,Flume不支持副本事件。于是,如果Flume代理的一个节点崩溃了,即使使用了可靠的文件管道方式,你也将丢失这些事件直到你恢复这些磁盘。如果你需要一个高可靠行的管道,那么使用Kafka是个更好的选择

Flume和Kafka可以很好地结合起来使用。如果你的设计需要从Kafka到Hadoop的流数据,使用Flume代理并配置Kafka的Source读取数据也是可行的:你没有必要实现自己的消费者。你可以直接利用Flume与HDFS及HBase的结合的所有好处。你可以使用Cloudera Manager对消费者的监控,并且你甚至可以添加拦截器进行一些流处理。

Flume和Kafka可以结合起来使用。通常会使用Flume + Kafka的方式。其实如果为了利用Flume已有的写HDFS功能,也可以使用Kafka + Flume的方式。

2019-01-10 16:46:35 qq_33689414 阅读数 929
  • 大数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2471 人正在学习 去看看 肖滨

Structured Streaming与Kafka的整合,实现不同json结构解耦

问题:Structured Streaming从kafka的不同topic读取数据,每个topic的value存取的数据格式是不同的。那么怎么使用一套模版代码,分别对多个topic进行读取数据。做到解耦呢?

思考:Structured Streaming读取kafka的操作是一致的,只是对kafka的value值的解析操作和一些参数配置,处理数据的sql是不一样的。可以把解析操作抽象出来处理。通过定义Bean对象,将json解析成对应的Bean,最后通过传入配置文件的方式,将对应的配置信息及sql传入,然后对数据来进行处理,得到需要的数据。


一、具体代码如下:

  • CommonStructuedKafka类
package com.test

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object CommonStructuedKafka {
  def main(args: Array[String]): Unit = {

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.kafka").setLevel(Level.WARN)

    // 读取配置文件信息
    val masterUrl = Props.get("master", "local")
    val appName = Props.get("appName", "Test7")
    val className = Props.get("className", "")
    val kafkaBootstrapServers = Props.get("kafka.bootstrap.servers", "localhost:9092")
    val subscribe = Props.get("subscribe", "test")
    val tmpTable = Props.get("tmpTable", "tmp")
    val sparksql = Props.get("sparksql", "select * from tmp")


    val spark = SparkSession.builder()
      .master(masterUrl)
      .appName(appName)
      .getOrCreate()


    // 读取kafka数据
    val lines = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBootstrapServers)
      .option("subscribe", subscribe)
      .load()

    //隐式转换
    import spark.implicits._

    val values = lines.selectExpr("cast(value as string)").as[String]

    val res = values.map { value =>
      // 将json数据解析成list集合
      val list = Tools.parseJson(value, className)
      // 将List转成元组
      Tools.list2Tuple7(list)
    }

    res.createOrReplaceTempView(tmpTable)

    val result = spark.sql(sparksql)

    val query = result.writeStream
      .format("console")
      .outputMode("append")
      .start()

    query.awaitTermination()
  }
}
  • Tools:解析json的工具类
package com.test

import com.google.gson.Gson

import scala.collection.mutable


object Tools {

  def main(args: Array[String]): Unit = {
    val tools = new Tools()
    val res = tools.parse("{'name':'caocao','age':'32','sex':'male'}", "com.test.People")
    println(res)
  }

  def parseJson(json: String, className: String): List[String] = {
    val tools = new Tools()
    tools.parse(json, className)
  }

  // 将List转成Tuple7元组类,这里仅仅是定义7个字段,可以定义更多字段。(ps:这种处理方式很不雅,一时也没想到好办法)
  def list2Tuple7(list: List[String]): (String, String, String, String, String, String, String) = {
    val t = list match {
      case List(a) => (a, "", "", "", "", "", "")
      case List(a, b) => (a, b, "", "", "", "", "")
      case List(a, b, c) => (a, b, c, "", "", "", "")
      case List(a, b, c, d) => (a, b, c, d, "", "", "")
      case List(a, b, c, d, e) => (a, b, c, d, e, "", "")
      case List(a, b, c, d, e, f) => (a, b, c, d, e, f, "")
      case List(a, b, c, d, e, f, g) => (a, b, c, d, e, f, g)
      case _ => ("", "", "", "", "", "", "")
    }
    t
  }
}


class Tools {
  // 通过传进来的Bean的全类名,进行反射,解析json,返回一个List()
  def parse(json: String, className: String): List[String] = {
    val list = mutable.ListBuffer[String]()
    val gson = new Gson()
    val clazz = Class.forName(className)
    val obj = gson.fromJson(json, clazz)
    val aClass = obj.getClass
    val fields = aClass.getDeclaredFields
    fields.foreach { f =>
      val fName = f.getName
      val m = aClass.getDeclaredMethod(fName)
      val value = m.invoke(obj).toString
      list.append(value)
    }
    list.toList
  }
}

  • Props:读取配置文件的工具类
package com.test

import java.io.{FileInputStream, InputStream}
import java.nio.file.{Files, Paths}
import java.util.Properties

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object Props {
  private val prop = new Properties()

  prop.load(getPropertyFileInputStream)

  /**
    * 在spark-submit中加入--driver-java-options -DPropPath=/home/spark/prop.properties的参数后,
    * 使用System.getProperty("PropPath")就能获取路径:/home/spark/prop.properties如果spark-submit中指定了
    * prop.properties文件的路径,那么使用prop.properties中的属性,否则使用该类中定义的属性
    */
  private def getPropertyFileInputStream: InputStream = {
    var is: InputStream = null
    val filePath = System.getProperty("PropPath")
    if (filePath != null && filePath.length > 0) {
      if (Files.exists(Paths.get(filePath))) {
        is = new FileInputStream(filePath)
      } else {
        println(s"在本地未找到config文件$filePath,尝试在HDFS上获取文件")
        val fs = FileSystem.get(new Configuration())
        if (fs.exists(new Path(filePath))) {
          val fis = fs.open(new Path(filePath))
          is = fis.getWrappedStream
        } else {
          println(s"在HDFS上找不到config文件$filePath,加载失败...")
        }
      }
    } else {
      println(s"未设置配置文件PropPath")
    }
    is
  }


  def get(propertyName: String, defaultValue: String): String = {
    prop.getProperty(propertyName, defaultValue)
  }


  def get(): Properties = {
    println("prop:" + this.prop)
    this.prop
  }


  def reload(): Properties = {
    prop.load(getPropertyFileInputStream)
    prop
  }
}
  • People类和Student类
case class People(name: String, age: String, sex: String) extends Serializable


case class Student(name: String, age: String, sex: String, idNum: String) extends Serializable

二、配置文件

  • people.properties
master=local
appName=Test7
className=com.test.People
kafka.bootstrap.servers=localhost:9092
subscribe=test
tmpTable=tmp
sparksql=select _1 as name, _2 as age, _3 as sex from tmp
  • student.properties
master=local
appName=Test7
className=com.test.Student
kafka.bootstrap.servers=localhost:9092
subscribe=test
tmpTable=tmp
sparksql=select _1 as name, _2 as age, _3 as sex, _4 as idNum from tmp

三、执行

提交Structured Streaming程序,需要加上参数,例如:-DPropPath=/Users/zhangzhiqiang/Documents/test_project/comtest/src/main/resources/people.properties

本地调试,可以Idea的VM Option选项添加。

image

3.1 传入people.properties文件,执行程序,在kafka生产端命令行输入{"name":"caocao","age":"32","sex":"male"},结果显示:

image

3.2 传入student.properties文件,执行程序,在kafka生产端命令行输入{"name":"caocao","age":"32","sex":"male","idNum":"1001"},结果显示:

image


github代码:structuredstreamngdemo项目

2016-10-09 10:01:34 xfg0218 阅读数 3801
  • 大数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2471 人正在学习 去看看 肖滨
  1. 创建maven工程,pom.xml中增加如下:  
  2.  <dependency>  
  3.         <groupId>org.apache.kafka</groupId>  
  4.         <artifactId>kafka_2.10</artifactId>  
  5.         <version>0.8.2.0</version>  
  6.     </dependency>  
  7.   
  8.   
  9. 2 java代码:  向主题test内写入数据  
  10.   
  11. import java.util.Properties;  
  12. import java.util.concurrent.TimeUnit;  
  13.   
  14. import kafka.javaapi.producer.Producer;  
  15. import kafka.producer.KeyedMessage;  
  16. import kafka.producer.ProducerConfig;  
  17. import kafka.serializer.StringEncoder;  
  18.   
  19.   
  20.   
  21.   
  22. public class kafkaProducer extends Thread{  
  23.   
  24.     private String topic;  
  25.       
  26.     public kafkaProducer(String topic){  
  27.         super();  
  28.         this.topic = topic;  
  29.     }  
  30.       
  31.       
  32.     @Override  
  33.     public void run() {  
  34.         Producer producer = createProducer();  
  35.         int i=0;  
  36.         while(true){  
  37.             producer.send(new KeyedMessage<Integer, String>(topic, "message: " + i++));  
  38.             try {  
  39.                 TimeUnit.SECONDS.sleep(1);  
  40.             } catch (InterruptedException e) {  
  41.                 e.printStackTrace();  
  42.             }  
  43.         }  
  44.     }  
  45.   
  46.     private Producer createProducer() {  
  47.         Properties properties = new Properties();  
  48.         properties.put("zookeeper.connect""192.168.1.110:2181,192.168.1.111:2181,192.168.1.112:2181");//声明zk  
  49.         properties.put("serializer.class", StringEncoder.class.getName());  
  50.         properties.put("metadata.broker.list""192.168.1.110:9092,192.168.1.111:9093,192.168.1.112:9094");// 声明kafka broker  
  51.         return new Producer<Integer, String>(new ProducerConfig(properties));  
  52.      }  
  53.       
  54.       
  55.     public static void main(String[] args) {  
  56.         new kafkaProducer("test").start();// 使用kafka集群中创建好的主题 test   
  57.           
  58.     }  
  59.        
  60. }  
  61.   
  62.   
  63.   
  64.   
  65. 3  kafka集群中消费主题test的数据:  
  66. [root@h2master kafka]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginnin  
  67.   
  68. 4   启动java代码,然后在看集群消费的数据如下:  
  69.   
  70. message: 0  
  71. message: 1  
  72. message: 2  
  73. message: 3  
  74. message: 4  
  75. message: 5  
  76. message: 6  
  77. message: 7  
  78. message: 8  
  79. message: 9  
  80. message: 10  
  81. message: 11  
  82. message: 12  
  83. message: 13  
  84. message: 14  
  85. message: 15  
  86. message: 16  
  87. message: 17  
  88. message: 18  
  89. message: 19  
  90. message: 20  
  91. message: 21  

 

 3 kafka 使用Java写消费者,这样 先运行kafkaProducer ,在运行kafkaConsumer,即可得到生产者的数据:

 

Java代码  收藏代码
  1. import java.util.HashMap;  
  2. import java.util.List;  
  3. import java.util.Map;  
  4. import java.util.Properties;  
  5.   
  6. import kafka.consumer.Consumer;  
  7. import kafka.consumer.ConsumerConfig;  
  8. import kafka.consumer.ConsumerIterator;  
  9. import kafka.consumer.KafkaStream;  
  10. import kafka.javaapi.consumer.ConsumerConnector;  
  11.   
  12.   
  13.   
  14.   
  15. /** 
  16.  * 接收数据 
  17.  * 接收到: message: 10 
  18. 接收到: message: 11 
  19. 接收到: message: 12 
  20. 接收到: message: 13 
  21. 接收到: message: 14 
  22.  * @author zm 
  23.  * 
  24.  */  
  25. public class kafkaConsumer extends Thread{  
  26.   
  27.     private String topic;  
  28.       
  29.     public kafkaConsumer(String topic){  
  30.         super();  
  31.         this.topic = topic;  
  32.     }  
  33.       
  34.       
  35.     @Override  
  36.     public void run() {  
  37.         ConsumerConnector consumer = createConsumer();  
  38.         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
  39.         topicCountMap.put(topic, 1); // 一次从主题中获取一个数据  
  40.          Map<String, List<KafkaStream<byte[], byte[]>>>  messageStreams = consumer.createMessageStreams(topicCountMap);  
  41.          KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 获取每次接收到的这个数据  
  42.          ConsumerIterator<byte[], byte[]> iterator =  stream.iterator();  
  43.          while(iterator.hasNext()){  
  44.              String message = new String(iterator.next().message());  
  45.              System.out.println("接收到: " + message);  
  46.          }  
  47.     }  
  48.   
  49.     private ConsumerConnector createConsumer() {  
  50.         Properties properties = new Properties();  
  51.         properties.put("zookeeper.connect""192.168.1.110:2181,192.168.1.111:2181,192.168.1.112:2181");//声明zk  
  52.         properties.put("group.id""group1");// 必须要使用别的组名称, 如果生产者和消费者都在同一组,则不能访问同一组内的topic数据  
  53.         return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));  
  54.      }  
  55.       
  56.       
  57.     public static void main(String[] args) {  
  58.         new kafkaConsumer("test").start();// 使用kafka集群中创建好的主题 test   
  59.           
  60.     }  
  61.        
  62. }  
2016-09-19 15:54:16 guangshuishi 阅读数 169
  • 大数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2471 人正在学习 去看看 肖滨
摘要: (1)kafka和flume都是日志系统。kafka是分布式消息中间件,自带存储,提供push和pull存取数据功能。flume分为agent(数据采集器),collector(数据简单处理和写入),storage(存储器)三部分,每一部分都是可以定制的。比如agent采用RPC(Thrift-RPC)、text(文件)等,storage指定用hdfs做。

(2)kafka做日志缓存应该是更为合适的,但是 flume的数据采集部分做的很好,可以定制很多数据源,减少开发量。所以比较流行flume+kafka模式,如果为了利用flume写hdfs的能力,也可以采用kafka+flume的方式。


采集层 主要可以使用Flume, Kafka两种技术。

Flume:Flume 是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展API.

Kafka:Kafka是一个可持久化的分布式的消息队列。

• Kafka 是一个非常通用的系统。你可以有许多生产者和很多的消费者共享多个主题Topics。相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。所以,Cloudera 建议如果数据被多个系统消费的话,使用kafka;如果数据被设计给Hadoop使用,使用Flume。




• 正如你们所知Flume内置很多的source和sink组件。然而,Kafka明显有一个更小的生产消费者生态系统,并且Kafka的社区支持不好。希望将来这种情况会得到改善,但是目前:使用Kafka意味着你准备好了编写你自己的生产者和消费者代码。如果已经存在的Flume Sources和Sinks满足你的需求,并且你更喜欢不需要任何开发的系统,请使用Flume。




• Flume可以使用拦截器实时处理数据。这些对数据屏蔽或者过量是很有用的。Kafka需要外部的流处理系统才能做到。




• Kafka和Flume都是可靠的系统,通过适当的配置能保证零数据丢失。然而,Flume不支持副本事件。于是,如果Flume代理的一个节点奔溃了,即使使用了可靠的文件管道方式,你也将丢失这些事件直到你恢复这些磁盘。如果你需要一个高可靠行的管道,那么使用Kafka是个更好的选择。




• Flume和Kafka可以很好地结合起来使用。如果你的设计需要从Kafka到Hadoop的流数据,使用Flume代理并配置Kafka的Source读取数据也是可行的:你没有必要实现自己的消费者。你可以直接利用Flume与HDFS及HBase的结合的所有好处。你可以使用Cloudera Manager对消费者的监控,并且你甚至可以添加拦截器进行一些流处理。

因为每条消息都被append到该Partition中,属于顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。


Flume和Kafka可以结合起来使用。通常会使用Flume + Kafka的方式。其实如果为了利用Flume已有的写HDFS功能,也可以使用Kafka + Flume的方式。

Kafka、Flume都可以实现数据的传输,但它们的侧重点不同。

Kafka追求的是高吞吐量、高负载(topic下可以有多个partition)

Flume追求的是数据的多样性:数据来源的多样性、数据流向的多样性


如果数据来源很单一、想要高吞吐的话可以使用Kafka

如果数据来源很多、数据流向很多的话可以使用Flume

也可以将Kafka和Flume结合起来使用。

Kafka核心思想

阅读数 6006

Kafka中的ACK机制

阅读数 9

没有更多推荐了,返回首页