kafka存取大数据_kafka 存取 - CSDN
  • Kafka大数据生态系统中的价值

    千次阅读 2016-03-17 16:15:25
    在最近几年,Apache Kafka的使用量急剧增长。目前Kafka的用户包括Uber,Twitter,Netflix,LinkedIn,Yahoo,Cisco,...本博客解释了为什么Kafka越来越受欢迎,以及它在大数据生态系统的作用。one-size-fits-all模型

    在最近几年,Apache Kafka的使用量急剧增长。目前Kafka的用户包括Uber,Twitter,Netflix,LinkedIn,Yahoo,Cisco,Goldman Sachs等等。Kafka是一个可扩展的发布/订阅系统。用户可以发布大量信息到系统,同样可以通过订阅消费这些数据。本博客解释了为什么Kafka越来越受欢迎,以及它在大数据生态系统的作用。

    one-size-fits-all模型的局限性

    长时间以来,数据库称为存储和处理最有意思数据的主要地点。数据库供应商不断增加新功能,如搜索,流和分析,因此很多有意思的工作都可以在数据库中完成。然后这种模式不再完美有两个原因。第一,数据库变得昂贵,因为人们试图收集其他数据集,如用户行为跟踪记录,操作指标,应用日志等等。对于挖掘新的内容这些数据集和事务数据同样重要,但是大了2~3数量级。因为传统数据库通常依赖于昂贵的高端存储系统(如。SAN),在数据库中存储所有这些数据集变得非常昂贵。第二,随着更多的特征累积,数据库变的越来越复杂,而且增加新的功能同时维持旧的更困难。一个多年发布周期在数据库厂商之间是常见的。

    专业分布系统出现

    为了克服这些局限性,在最近10年间人们开始构建专业系统。这些系统设计为止做一件事情,但是做的非常好。因为简单,而且构建为运行在常见磁盘的分布式系统是可行的。结果是,这些专业的系统比基于SAN的数据库更经济。通常,搭建这样的系统作为开源项目,这样进一步压低了成本。因为这些专业系统专注于一件事,与庞大的数据库相比可以更快发展、提高。Hadoop首创这种模式。它专门从事于离线处理,通过一个分布式文件系统(HDFS)和一个计算引擎(MapReduce)用于存储以及出批数据。通过使用HDFS,公司现在可以承受收集其他有价值的数据集。通过使用Mapreduce,人们可以以非常经济的方式对这些新数据集生成报告和执行分析。这种模式开始被使用于其他领域。
    - Key/Value存储:Cassandra,MongoDB,HBase等等
    - 搜索:Elastic search, Sole等等
    - 流处理:Storm,Spark streaming ,Samza等等
    - 图形:GraphLab,FlockDB等等
    - 时间序列:Open TSDB等等。
    - …
    这些专业系统确保公司提取新的见解,以及构建以前不可能的新的应用。

    满足专业系统

    虽然这些专业系统已经改变了IT行业,它带来了新的挑战:如果向这些专业系统传输数据。首先,这里有各种各样的数据类型,从事务记录,到用户追踪数据,操作指标,服务日志等等。通常,相同数据集需要传输到多个专业系统中。例如,对于离线日志分析应用日志是有用的,他和搜索单个日志条目一样重要。这使得构建一个独立的管道收集每种数据,以及直接传输到每个相关专业系统是不可行的。第二,Hadoop通常保持所有种类数据的副本,从Hadoop传输到所有其他系统是不可行的,因为大多数他们要求数据极近实时的,Hadoop是无法满足的。这是Kafka诞生的原因。Kafka有如下优秀功能。
    - 设计为一个分布系统并且能够在平常磁盘存储大量数据
    - 设计为多订阅的系统。同样的发布数据集可以被消费多次
    - 持久化数据到磁盘,可以同时分发信息以实时或者批consumer没有性能退化
    - 内置冗余,因此可以用来提供所以关键任务数据的可靠性

    大多数这些公司无一例外采用了几种专业系统。他们使用Kafka作为中心位置,实时摄取所有类型数据。Kafka发布相同的数据到不同专业系统。我们涉及到的作为流数据平台的结构如下图所示。增加额外专业系统进来非常容易,因为新系统可以简单的额外订阅Kafka获取数据。

    接下来?

    行业的趋势是多个专业系统共存于大数据生态系统。流数据平台由类似Kafka分布式发布/订阅系统提供支持,并且将扮演越来越重要的角色,因为更多的公司转向与实时处理。目前,大部分如模式化数据curation,不断变化模式延后知道数据加载到Hadoop。在流数据平台这是不好的,因为相同的数据curation处理在其他专业系统重复。更好的方法是提前推理数据被摄取到Kafka持续时间。这是正在做的事情,你可以到网站了解更多详情。

    展开全文
  • 大数据flume的概念原理和flume与kafka的简单对比一、flume的基本概念原理:二、flume与kafka的简单对比: 一、flume的基本概念原理: 1、Apache Flume 是一个从可以收集例如日志,事件等数据资源,并将这些数量庞大...

    大数据flume的概念原理和flume与kafka的简单对比

    一、flume的基本概念原理:

    1、Apache Flume 是一个从可以收集例如日志,事件等数据资源,并将这些数量庞大的数据从各项数据资源中集中起来存储的工具/服务,或者数集中机制。flume具有高可用,分布式,配置工具,其设计的原理也是基于将数据流,如日志数据从各种网站服务器上汇集起来存储到HDFS,HBase等集中存储器中。

    Flume组成架构:
    在这里插入图片描述
    2、Flume的组成部分:

    ①、Flume事件:
    事件作为Flume内部数据传输的最基本单元。它是由一个转载数据的字节数组(该数据组是从数据源接入点传入,并传输给传输器,也就是HDFS/HBase)和一个可选头部构成。

    ②、Flume Agent:
    Flume内部有一个或者多个Agent,然而对于每一个Agent来说,它就是一共独立的守护进程(JVM),它从客户端哪儿接收收集,或者从其他的 Agent哪儿接收,然后迅速的将获取的数据传给下一个目的节点sink,或者agent.。

    Agent主要由:source,channel,sink三个组件组成。

    Source:
    从数据发生器接收数据,并将接收的数据以Flume的event格式传递给一个或者多个通道channal,Flume提供多种数据接收的方式,比如Avro,Thrift,twitter1%等。

    Channel:
    channal是一种短暂的存储容器,它将从source处接收到的event格式的数据缓存起来,直到它们被sinks消费掉,它在source和sink间起着一共桥梁的作用,channal是一个完整的事务,这一点保证了数据在收发的时候的一致性。并且它可以和任意数量的source和sink链接.。支持的类型有:JDBC channel,File System channel,Memort channel等。

    sink:
    sink将数据存储到集中存储器比如Hbase和HDFS,它从channals消费数据(events)并将其传递给目标地.。目标地可能是另一个sink,也可能HDFS,HBase。

    3、Flume的内部原理:
    在这里插入图片描述

    二、flume与kafka的简单对比:

    1、flume与kafka对比:

    (1)kafka和flume都是日志系统。kafka是分布式消息中间件,自带存储,提供push和pull存取数据功能。flume分为agent(数据采集器),collector(数据简单处理和写入),storage(存储器)三部分,每一部分都是可以定制的。比如agent采用RPC(Thrift-RPC)、text(文件)等,storage指定用hdfs做。

    (2)kafka做日志缓存应该是更为合适的,但是 flume的数据采集部分做的很好,可以定制很多数据源,减少开发量。所以比较流行flume+kafka模式,如果为了利用flume写hdfs的能力,也可以采用kafka+flume的方式。

    温馨提示:该文章为本人自己创作,转载请标明出处。感谢各位!!!谢谢大家!!!
    本人联系方式:
    QQ:961094233
    邮箱:961094233@qq.com

    展开全文
  • flume主要用于日志采集,其中的agent里面包含3个核心的组件:source(采集/输入)—->channel(缓存/管道)—–>sink(输出),类似生产者、仓库、消费者的架构。 source:source组件是专门用来收集...

    flume主要用于日志采集,其中的agent里面包含3个核心的组件:source(采集/输入)—->channel(缓存/管道)—–>sink(输出),类似生产者、仓库、消费者的架构。 
    source:source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义。 
    channel:source组件把数据收集来以后,临时存放在channel中,即channel组件在agent中是专门用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等等。 
    sink:sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、自定义。

    Flume与Kafka对比可参看文章末尾部分

    Flume

    在具体介绍本文内容之前,先给大家看一下Hadoop业务的整体开发流程: 

    è¿éåå¾çæè¿°

    从Hadoop的业务开发流程图中可以看出,在大数据的业务处理过程中,对于数据的采集是十分重要的一步,也是不可避免的一步,从而引出我们本文的主角—Flume。本文将围绕Flume的架构、Flume的应用(日志采集)进行详细的介绍。 
    (一)Flume架构介绍 
    1、Flume的概念 
     è¿éåå¾çæè¿°
    flume是分布式的日志收集系统,它将各个服务器中的数据收集起来并送到指定的地方去,比如说送到图中的HDFS,简单来说flume就是收集日志的。 
    2、Event的概念 
    在这里有必要先介绍一下flume中event的相关概念:flume的核心是把数据从数据源(source)收集过来,在将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,flume在删除自己缓存的数据。 
    在整个数据的传输的过程中,流动的是event,即事务保证是在event级别进行的。那么什么是event呢?—–event将传输的数据进行封装,是flume传输数据的基本单位,如果是文本文件,通常是一行记录,event也是事务的基本单位。event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。 
    为了方便大家理解,给出一张event的数据流向图: 
     è¿éåå¾çæè¿°
    一个完整的event包括:event headers、event body、event信息(即文本文件中的单行记录),如下所以: 

    è¿éåå¾çæè¿°

    其中event信息就是flume收集到的日记记录。 
    3、flume架构介绍 
    flume之所以这么神奇,是源于它自身的一个设计,这个设计就是agent,agent本身是一个java进程,运行在日志收集节点—所谓日志收集节点就是服务器节点。 
    agent里面包含3个核心的组件:source—->channel—–>sink,类似生产者、仓库、消费者的架构。 
    source:source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义。 
    channel:source组件把数据收集来以后,临时存放在channel中,即channel组件在agent中是专门用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等等。 
    sink:sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、自定义。 

    4、flume的运行机制 
    flume的核心就是一个agent,这个agent对外有两个进行交互的地方,一个是接受数据的输入——source,一个是数据的输出sink,sink负责将数据发送到外部指定的目的地。source接收到数据之后,将数据发送给channel,chanel作为一个数据缓冲区会临时存放这些数据,随后sink会将channel中的数据发送到指定的地方—-例如HDFS等,注意:只有在sink将channel中的数据成功发送出去之后,channel才会将临时数据进行删除,这种机制保证了数据传输的可靠性与安全性。 
    5、flume的广义用法 
    flume之所以这么神奇—-其原因也在于flume可以支持多级flume的agent,即flume可以前后相继,例如sink可以将数据写到下一个agent的source中,这样的话就可以连成串了,可以整体处理了。flume还支持扇入(fan-in)、扇出(fan-out)。所谓扇入就是source可以接受多个输入,所谓扇出就是sink可以将数据输出多个目的地destination中。 
     è¿éåå¾çæè¿°
    (二)flume应用—日志采集 
    对于flume的原理其实很容易理解,我们更应该掌握flume的具体使用方法,flume提供了大量内置的Source、Channel和Sink类型。而且不同类型的Source、Channel和Sink可以自由组合—–组合方式基于用户设置的配置文件,非常灵活。比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。下面我将用具体的案例详述flume的具体用法。 
    其实flume的用法很简单—-书写一个配置文件,在配置文件当中描述source、channel与sink的具体实现,而后运行一个agent实例,在运行agent实例的过程中会读取配置文件的内容,这样flume就会采集到数据。 
    配置文件的编写原则: 
    1>从整体上描述代理agent中sources、sinks、channels所涉及到的组件

     

       # Name the components on this agent
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1



    2>详细描述agent中每一个source、sink与channel的具体实现:即在描述source的时候,需要 
    指定source到底是什么类型的,即这个source是接受文件的、还是接受http的、还是接受thrift 
    的;对于sink也是同理,需要指定结果是输出到HDFS中,还是Hbase中啊等等;对于channel 
    需要指定是内存啊,还是数据库啊,还是文件啊等等。   

    # Describe/configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 44444
    
        # Describe the sink
        a1.sinks.k1.type = logger
    
        # Use a channel which buffers events in memory
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100


    3>通过channel将source与sink连接起来

       

     # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1



    启动agent的shell操作:

       

    flume-ng  agent -n a1  -c  ../conf   -f  ../conf/example.file  
        -Dflume.root.logger=DEBUG,console  



    参数说明: -n 指定agent名称(与配置文件中代理的名字相同) 
    -c 指定flume中配置文件的目录 
    -f 指定配置文件 
    -Dflume.root.logger=DEBUG,console 设置日志等级

    具体案例: 
    案例1: NetCat Source:监听一个指定的网络端口,即只要应用程序向这个端口里面写数据,这个source组件就可以获取到信息。 其中 Sink:logger Channel:memory 
    flume官网中NetCat Source描述:

    Property Name Default     Description
    channels       –     
    type           –     The component type name, needs to be netcat
    bind           –  日志需要发送到的主机名或者Ip地址,该主机运行着netcat类型的source在监听          
    port           –  日志需要发送到的端口号,该端口号要有netcat类型的source在监听

         

    a) 编写配置文件:

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 192.168.80.80
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1


    b) 启动flume agent a1 服务端

    flume-ng  agent -n a1  -c ../conf  -f ../conf/netcat.conf   -Dflume.root.logger=DEBUG,console



    c) 使用telnet发送数据

    telnet  192.168.80.80  44444  big data world!(windows中运行的)



    d) 在控制台上查看flume收集到的日志数据: 

    è¿éåå¾çæè¿°
    案例2:NetCat Source:监听一个指定的网络端口,即只要应用程序向这个端口里面写数据,这个source组件就可以获取到信息。 其中 Sink:hdfs Channel:file (相比于案例1的两个变化) 
    flume官网中HDFS Sink的描述: 

    è¿éåå¾çæè¿°

    a) 编写配置文件:

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 192.168.80.80
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput
    a1.sinks.k1.hdfs.writeFormat = Text
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.hdfs.rollInterval = 10
    a1.sinks.k1.hdfs.rollSize = 0
    a1.sinks.k1.hdfs.rollCount = 0
    a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    
    # Use a channel which buffers events in file
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /usr/flume/checkpoint
    a1.channels.c1.dataDirs = /usr/flume/data
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1


    b) 启动flume agent a1 服务端

     

    flume-ng  agent -n a1  -c ../conf  -f ../conf/netcat.conf   -Dflume.root.logger=DEBUG,console


    c) 使用telnet发送数据

    telnet  192.168.80.80  44444  big data world!(windows中运行的)



    d) 在HDFS中查看flume收集到的日志数据: 
     è¿éåå¾çæè¿°
    案例3:Spooling Directory Source:监听一个指定的目录,即只要应用程序向这个指定的目录中添加新的文件,source组件就可以获取到该信息,并解析该文件的内容,然后写入到channle。写入完成后,标记该文件已完成或者删除该文件。其中 Sink:

    logger Channel:memory 
    flume官网中Spooling Directory Source描述:
    
    Property Name       Default      Description
    channels              –  
    type                  –          The component type name, needs to be spooldir.
    spoolDir              –          Spooling Directory Source监听的目录
    fileSuffix         .COMPLETED    文件内容写入到channel之后,标记该文件
    deletePolicy       never         文件内容写入到channel之后的删除策略: never or immediate
    fileHeader         false         Whether to add a header storing the absolute path filename.
    ignorePattern      ^$           Regular expression specifying which files to ignore (skip)
    interceptors          –          指定传输中event的head(头信息),常用timestamp


    Spooling Directory Source的两个注意事项:

    ①If a file is written to after being placed into the spooling directory, Flume will print an error to its log file and stop processing.
    即:拷贝到spool目录下的文件不可以再打开编辑
    ②If a file name is reused at a later time, Flume will print an error to its log file and stop processing.
    即:不能将具有相同文件名字的文件拷贝到这个目录下
    


    a) 编写配置文件:

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /usr/local/datainput
    a1.sources.r1.fileHeader = true
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = timestamp
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1


    b) 启动flume agent a1 服务端

     

    flume-ng  agent -n a1  -c ../conf  -f ../conf/spool.conf   -Dflume.root.logger=DEBUG,console


    c) 使用cp命令向Spooling Directory 中发送数据

     

    cp datafile  /usr/local/datainput   (注:datafile中的内容为:big data world!)
    


    d) 在控制台上查看flume收集到的日志数据: 

    è¿éåå¾çæè¿°

    从控制台显示的结果可以看出event的头信息中包含了时间戳信息。 
    同时我们查看一下Spooling Directory中的datafile信息—-文件内容写入到channel之后,该文件被标记了:

     

    [root@hadoop80 datainput]# ls
    datafile.COMPLETED


    案例4:Spooling Directory Source:监听一个指定的目录,即只要应用程序向这个指定的目录中添加新的文件,source组件就可以获取到该信息,并解析该文件的内容,然后写入到channle。写入完成后,标记该文件已完成或者删除该文件。 其中 Sink:hdfs Channel:file (相比于案例3的两个变化)

    a) 编写配置文件:

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /usr/local/datainput
    a1.sources.r1.fileHeader = true
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = timestamp
    
    # Describe the sink
    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput
    a1.sinks.k1.hdfs.writeFormat = Text
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.hdfs.rollInterval = 10
    a1.sinks.k1.hdfs.rollSize = 0
    a1.sinks.k1.hdfs.rollCount = 0
    a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    
    # Use a channel which buffers events in file
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /usr/flume/checkpoint
    a1.channels.c1.dataDirs = /usr/flume/data
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1


    b) 启动flume agent a1 服务端

    flume-ng  agent -n a1  -c ../conf  -f ../conf/spool.conf   -Dflume.root.logger=DEBUG,console



    c) 使用cp命令向Spooling Directory 中发送数据

     

    cp datafile  /usr/local/datainput   (注:datafile中的内容为:big data world!)



    d) 在控制台上可以参看sink的运行进度日志: 

    è¿éåå¾çæè¿°

    d) 在HDFS中查看flume收集到的日志数据: 

     è¿éåå¾çæè¿°

    è¿éåå¾çæè¿°
    从案例1与案例2、案例3与案例4的对比中我们可以发现:flume的配置文件在编写的过程中是非常灵活的。

    案例5:Exec Source:监听一个指定的命令,获取一条命令的结果作为它的数据源 
    常用的是tail -F file指令,即只要应用程序向日志(文件)里面写数据,source组件就可以获取到日志(文件)中最新的内容 。 其中 Sink:hdfs Channel:file 
    这个案列为了方便显示Exec Source的运行效果,结合Hive中的external table进行来说明。

    a) 编写配置文件:

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /usr/local/log.file
    
    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput
    a1.sinks.k1.hdfs.writeFormat = Text
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.hdfs.rollInterval = 10
    a1.sinks.k1.hdfs.rollSize = 0
    a1.sinks.k1.hdfs.rollCount = 0
    a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    
    # Use a channel which buffers events in file
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /usr/flume/checkpoint
    a1.channels.c1.dataDirs = /usr/flume/data
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1


    b)在hive中建立外部表—–hdfs://hadoop80:9000/dataoutput的目录,方便查看日志捕获内容

     

    hive> create external table t1(infor  string)
        > row format delimited
        > fields terminated by '\t'
        > location '/dataoutput/';
    OK
    Time taken: 0.284 seconds


    c) 启动flume agent a1 服务端

    flume-ng  agent -n a1  -c ../conf  -f ../conf/exec.conf   -Dflume.root.logger=DEBUG,console



    d) 使用echo命令向/usr/local/datainput 中发送数据

     

    echo  big data > log.file



    d) 在HDFS和Hive分别中查看flume收集到的日志数据: 

    è¿éåå¾çæè¿°

    hive> select * from t1;
    OK
    big data
    Time taken: 0.086 seconds


    e)使用echo命令向/usr/local/datainput 中在追加一条数据

    echo big data world! >> log.file



    d) 在HDFS和Hive再次分别中查看flume收集到的日志数据: 

    è¿éåå¾çæè¿°

    è¿éåå¾çæè¿°

    hive> select * from t1;
    OK
    big data
    big data world!
    Time taken: 0.511 seconds


    总结Exec source:Exec source和Spooling Directory Source是两种常用的日志采集的方式,其中Exec source可以实现对日志的实时采集,Spooling Directory Source在对日志的实时采集上稍有欠缺,尽管Exec source可以实现对日志的实时采集,但是当Flume不运行或者指令执行出错时,Exec source将无法收集到日志数据,日志会出现丢失,从而无法保证收集日志的完整性。

    案例6:Avro Source:监听一个指定的Avro 端口,通过Avro 端口可以获取到Avro client发送过来的文件 。即只要应用程序通过Avro 端口发送文件,source组件就可以获取到该文件中的内容。 其中 Sink:hdfs Channel:file 
    (注:Avro和Thrift都是一些序列化的网络端口–通过这些网络端口可以接受或者发送信息,Avro可以发送一个给定的文件给Flume,Avro 源使用AVRO RPC机制) 
    Avro Source运行原理如下图: 
     è¿éåå¾çæè¿°
    flume官网中Avro Source的描述:

    Property     Name   Default Description
    channels      –  
    type          –     The component type name, needs to be avro
    bind          –     日志需要发送到的主机名或者ip,该主机运行着ARVO类型的source
    port          –     日志需要发送到的端口号,该端口要有ARVO类型的source在监听


    1)编写配置文件

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = avro
    a1.sources.r1.bind = 192.168.80.80
    a1.sources.r1.port = 4141
    
    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput
    a1.sinks.k1.hdfs.writeFormat = Text
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.hdfs.rollInterval = 10
    a1.sinks.k1.hdfs.rollSize = 0
    a1.sinks.k1.hdfs.rollCount = 0
    a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    
    # Use a channel which buffers events in file
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /usr/flume/checkpoint
    a1.channels.c1.dataDirs = /usr/flume/data
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1


    b) 启动flume agent a1 服务端

    flume-ng  agent -n a1  -c ../conf  -f ../conf/avro.conf   -Dflume.root.logger=DEBUG,console



    c)使用avro-client发送文件

    flume-ng avro-client -c  ../conf  -H 192.168.80.80  -p 4141 -F /usr/local/log.file



    注:log.file文件中的内容为:

    [root@hadoop80 local]# more log.file
    big data
    big data world!


    d) 在HDFS中查看flume收集到的日志数据: 

    è¿éåå¾çæè¿°

    è¿éåå¾çæè¿°

    è¿éåå¾çæè¿°
    通过上面的几个案例,我们可以发现:flume配置文件的书写是相当灵活的—-不同类型的Source、Channel和Sink可以自由组合!

    最后对上面用的几个flume source进行适当总结: 
    ① NetCat Source:监听一个指定的网络端口,即只要应用程序向这个端口里面写数据,这个source组件 
    就可以获取到信息。 
    ②Spooling Directory Source:监听一个指定的目录,即只要应用程序向这个指定的目录中添加新的文 
    件,source组件就可以获取到该信息,并解析该文件的内容,然后写入到channle。写入完成后,标记 
    该文件已完成或者删除该文件。 
    ③Exec Source:监听一个指定的命令,获取一条命令的结果作为它的数据源 
    常用的是tail -F file指令,即只要应用程序向日志(文件)里面写数据,source组件就可以获取到日志(文件)中最新的内容 。 
    ④Avro Source:监听一个指定的Avro 端口,通过Avro 端口可以获取到Avro client发送过来的文件 。即只要应用程序通过Avro 端口发送文件,source组件就可以获取到该文件中的内容。

    Flume与Kafka对比

    kafka和flume都是日志系统,kafka是分布式消息中间件,自带存储,提供push和pull存取数据功能。flume分为agent(数据采集器),collector(数据简单处理和写入),storage(存储器)三部分,每一部分都是可以定制的。比如agent采用RPC(Thrift-RPC)、text(文件)等,storage指定用hdfs做。
    kafka做日志缓存应该是更为合适的,但是 flume的数据采集部分做的很好,可以定制很多数据源,减少开发量。所以比较流行flume+kafka模式,如果为了利用flume写hdfs的能力,也可以采用kafka+flume的方式。


    采集层 主要可以使用Flume, Kafka两种技术。

    Flume:Flume 是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展API.

    Kafka:Kafka是一个可持久化的分布式的消息队列。

    Kafka 是一个非常通用的系统。你可以有许多生产者和很多的消费者共享多个主题Topics。相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。所以,Cloudera 建议如果数据被多个系统消费的话,使用kafka;如果数据被设计给Hadoop使用,使用Flume。

     

    正如你们所知Flume内置很多的source和sink组件。然而,Kafka明显有一个更小的生产消费者生态系统,并且Kafka的社区支持不好。希望将来这种情况会得到改善,但是目前:使用Kafka意味着你准备好了编写你自己的生产者和消费者代码。如果已经存在的Flume Sources和Sinks满足你的需求,并且你更喜欢不需要任何开发的系统,请使用Flume。

     

    Flume可以使用拦截器实时处理数据。这些对数据屏蔽或者过量是很有用的。Kafka需要外部的流处理系统才能做到。

     

    Kafka和Flume都是可靠的系统,通过适当的配置能保证零数据丢失。然而,Flume不支持副本事件。于是,如果Flume代理的一个节点崩溃了,即使使用了可靠的文件管道方式,你也将丢失这些事件直到你恢复这些磁盘。如果你需要一个高可靠行的管道,那么使用Kafka是个更好的选择。

     

    Flume和Kafka可以很好地结合起来使用。如果你的设计需要从Kafka到Hadoop的流数据,使用Flume代理并配置Kafka的Source读取数据也是可行的:你没有必要实现自己的消费者。你可以直接利用Flume与HDFS及HBase的结合的所有好处。你可以使用Cloudera Manager对消费者的监控,并且你甚至可以添加拦截器进行一些流处理。

    Flume和Kafka可以结合起来使用。通常会使用Flume + Kafka的方式。其实如果为了利用Flume已有的写HDFS功能,也可以使用Kafka + Flume的方式。
     

    原文:https://blog.csdn.net/a2011480169/article/details/51544664 

    展开全文
  • kafka的JavaAPI操作

    千次阅读 2020-03-23 16:12:00
    1、创建maven工程并添加jar包 创建maven工程并添加以下依赖jar包的坐标到pom.xml ...-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apach...

    1、创建maven工程并添加jar包

    创建maven工程并添加以下依赖jar包的坐标到pom.xml

    <dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.0</version>
    </dependency>    
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>1.0.0</version>
        </dependency>
    
    </dependencies>
    
    <build>
        <plugins>
            <!-- java编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
    

    2、生产者代码

    1、使用生产者,生产数据

    /**
    * 订单的生产者代码,
    */
    public class OrderProducer {
    public static void main(String[] args) throws InterruptedException {
    /* 1、连接集群,通过配置文件的方式
    * 2、发送数据-topic:order,value
    */
    Properties props = new Properties(); 
    //kafka服务器地址
    props.put("bootstrap.servers", "node09:9092"); 
    //消息确认机制
    props.put("acks", "all");
    //重试机制
    props.put("retries", 0);
    //批量发送的大小
    props.put("batch.size", 16384);
    //消息延迟
    props.put("linger.ms", 1);
    //批量的缓冲区大小
    props.put("buffer.memory", 33554432); 
    props.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
    
     KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>
    (props);
    for (int i = 0; i < 1000; i++) {
    // 发送数据 ,需要一个producerRecord对象,最少参数 String topic, V value kafkaProducer.send(new ProducerRecord<String, String>("order", "订单信
    息!"+i));
    Thread.sleep(100);
    }
    }
    }
    

    2、kafka当中的数据分区

    kafka生产者发送的消息,都是保存在broker当中,我们可以自定义分区规则,决定消息发送到哪个partition里面去进行保存
    查看ProducerRecord这个类的源码,就可以看到kafka的各种不同分区策略
    kafka当中支持以下四种数据的分区方式:

    //第一种分区策略,如果既没有指定分区号,也没有指定数据key,那么就会使用轮询的方式将数据均匀的发送到不同的分区里面去
      //ProducerRecord<String, String> producerRecord1 = new ProducerRecord<>("mypartition", "mymessage" + i);
      //kafkaProducer.send(producerRecord1);
      //第二种分区策略 如果没有指定分区号,指定了数据key,通过key.hashCode  % numPartitions来计算数据究竟会保存在哪一个分区里面
      //注意:如果数据key,没有变化   key.hashCode % numPartitions  =  固定值  所有的数据都会写入到某一个分区里面去
      //ProducerRecord<String, String> producerRecord2 = new ProducerRecord<>("mypartition", "mykey", "mymessage" + i);
      //kafkaProducer.send(producerRecord2);
      //第三种分区策略:如果指定了分区号,那么就会将数据直接写入到对应的分区里面去
    //  ProducerRecord<String, String> producerRecord3 = new ProducerRecord<>("mypartition", 0, "mykey", "mymessage" + i);
     // kafkaProducer.send(producerRecord3);
      //第四种分区策略:自定义分区策略。如果不自定义分区规则,那么会将数据使用轮询的方式均匀的发送到各个分区里面去
      kafkaProducer.send(new ProducerRecord<String, String>("mypartition","mymessage"+i));
    

    自定义分区策略

    public class KafkaCustomPartitioner implements Partitioner {
    	@Override
    	public void configure(Map<String, ?> configs) {
    	}
    
    	@Override
    	public int partition(String topic, Object arg1, byte[] keyBytes, Object arg3, byte[] arg4, Cluster cluster) {
    		List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    	    int partitionNum = partitions.size();
    		Random random = new Random();
    		int partition = random.nextInt(partitionNum);
    	    return partition;
    	}
    
    	@Override
    	public void close() {
    		
    	}
    
    }
    

    主代码中添加配置

    @Test
    	public void kafkaProducer() throws Exception {
    		//1、准备配置文件
    	    Properties props = new Properties();
    	    props.put("bootstrap.servers", "node09:9092,node10:9092,node11:9092");
    	    props.put("acks", "all");
    	    props.put("retries", 0);
    	    props.put("batch.size", 16384);
    	    props.put("linger.ms", 1);
    	    props.put("buffer.memory", 33554432);
    	    props.put("partitioner.class", "cn.itcast.kafka.partitioner.KafkaCustomPartitioner");
    	    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    	    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    	    //2、创建KafkaProducer
    	    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
    	    for (int i=0;i<100;i++){
    	        //3、发送数据
    	        kafkaProducer.send(new ProducerRecord<String, String>("testpart","0","value"+i));
    	    }
    
    		kafkaProducer.close();
    	}
    

    3、消费者代码

    消费必要条件
    消费者要从kafka Cluster进行消费数据,必要条件有以下四个

    #1、地址
    bootstrap.servers=node09:9092
    #2、序列化 
    key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer
    #3、主题(topic) 需要制定具体的某个topic(order)即可。
    #4、消费者组 group.id=test
    

    1、自动提交offset

    消费完成之后,自动提交offset

    /**
    * 消费订单数据--- javaben.tojson
    */
    public class OrderConsumer {
    public static void main(String[] args) {
    // 1\连接集群
    Properties props = new Properties(); 
    //指定kafka服务器
    props.put("bootstrap.servers", "hadoop-01:9092"); 
    //消费组
    props.put("group.id", "test");
    
    //以下两行代码 ---消费者自动提交offset值 
    props.put("enable.auto.commit", "true"); 
    //自动提交的周期
    props.put("auto.commit.interval.ms",  "1000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>
    (props);
    //		 2、发送数据 发送数据需要,订阅下要消费的topic。	order kafkaConsumer.subscribe(Arrays.asList("order")); 
    while (true) {
    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);// jdk queue offer插入、poll获取元素。 blockingqueue put插入原生, take获取元素
    for (ConsumerRecord<String, String> record : consumerRecords) { System.out.println("消费的数据为:" + record.value());
    }
    }
    }
    }
    

    2、手动提交offset

    如果Consumer在获取数据后,需要加入处理,数据完毕后才确认offset,需要程序来控制offset的确认? 关闭自动提交确认选项

    props.put("enable.auto.commit",  "false");
    手动提交ofset值
      kafkaConsumer.commitSync();
    完整代码如下所示:
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "localhost:9092"); 
    props.put("group.id", "test");
    //关闭自动提交确认选项
    props.put("enable.auto.commit", "false"); 
    props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer"); 
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test"));
    final int minBatchSize = 200;
    List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); 
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
    buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) { 
    insertIntoDb(buffer);
    // 手动提交offset值
    consumer.commitSync(); 
    buffer.clear();
    }
    }
    
    

    3、消费完每个分区之后手动提交offset

    上面的示例使用commitSync将所有已接收的记录标记为已提交。 在某些情况下,您可能希望通过明确指定偏移量 来更好地控制已提交的记录。 在下面的示例中,我们在完成处理每个分区中的记录后提交偏移量。

    try {
    while(running) {
    ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); 
    for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
    for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println(record.offset() + ": " + record.value());
    }
    long lastOffset = partitionRecords.get(partitionRecords.size() -1).offset();
    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
    }
    }
    } finally { consumer.close();}
    
    

    注意事项:
    提交的偏移量应始终是应用程序将读取的下一条消息的偏移量。 因此,在调用commitSync(偏移量)时,应该 在最后处理的消息的偏移量中添加一个

    4、指定分区数据进行消费

    1、如果进程正在维护与该分区关联的某种本地状态(如本地磁盘上的键值存储),那么它应该只获取它在磁盘上 维护的分区的记录。
    2、如果进程本身具有高可用性,并且如果失败则将重新启动(可能使用YARN,Mesos或AWS工具等集群管理框 架,或作为流处理框架的一部分)。 在这种情况下,Kafka不需要检测故障并重新分配分区,因为消耗过程将在另 一台机器上重新启动。

    Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); 
    props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000"); 
    props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer"); 
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    //consumer.subscribe(Arrays.asList("foo",  "bar"));
    
    //手动指定消费指定分区的数据---start 
    String topic = "foo";
    TopicPartition partition0 = new TopicPartition(topic, 0); 
    TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0,  partition1));
    //手动指定消费指定分区的数据---end
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100); 
    for (ConsumerRecord<String, String> record : records)
    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    
    

    注意事项:
    1、要使用此模式,您只需使用要使用的分区的完整列表调用assign(Collection),而不是使用subscribe订阅 主题。
    2、主题与分区订阅只能二选一

    5、重复消费与数据丢失

    说明:

    • 1、已经消费的数据对于kafka来说,会将消费组里面的offset值进行修改,那什么时候进行修改了?是在数据消费 完成之后,比如在控制台打印完后自动提交;
    • 2、提交过程:是通过kafka将offset进行移动到下个message所处的offset的位置。
    • 3、拿到数据后,存储到hbase中或者mysql中,如果hbase或者mysql在这个时候连接不上,就会抛出异常,如果在处理数据的时候已经进行了提交,那么kafka伤的offset值已经进行了修改了,但是hbase或者mysql中没有数据,这个时候就会出现数据丢失。
    • 4、什么时候提交offset值?在Consumer将数据处理完成之后,再来进行offset的修改提交。默认情况下offset是 自动提交,需要修改为手动提交offset值。
    • 5、如果在处理代码中正常处理了,但是在提交offset请求的时候,没有连接到kafka或者出现了故障,那么该次修 改offset的请求是失败的,那么下次在进行读取同一个分区中的数据时,会从已经处理掉的offset值再进行处理一 次,那么在hbase中或者mysql中就会产生两条一样的数据,也就是数据重复

    6、consumer消费者消费数据流程

    在这里插入图片描述

    流程描述
    Consumer连接指定的Topic partition所在leader broker,采用pull方式从kafkalogs中获取消息。对于不同的消费模式,会将offset保存在不同的地方
    官网关于high level API 以及low level API的简介
    http://kafka.apache.org/0100/documentation.html#impl_consumer

    高阶API(High Level API)

    kafka消费者高阶API简单;隐藏Consumer与Broker细节;相关信息保存在zookeeper中。

    /* create a connection to the cluster */
    ConsumerConnector connector = Consumer.create(consumerConfig);
    
    interface ConsumerConnector {
    
    /**
    This method is used to get a list of KafkaStreams, which are iterators over
    MessageAndMetadata objects from which you can obtain messages and their
    associated metadata (currently only topic).
    Input: a map of <topic, #streams>
    Output: a map of <topic, list of message streams>
    */
    public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);
    /**
    You can also obtain a list of KafkaStreams, that iterate over messages
    from topics that match a TopicFilter. (A TopicFilter encapsulates a
    whitelist or a blacklist which is a standard Java regex.)
    */
    public List<KafkaStream> createMessageStreamsByFilter( TopicFilter topicFilter, int numStreams);
    /* Commit the offsets of all messages consumed so far. */ public commitOffsets()
    /* Shut down the connector */ public shutdown()
    }
    

    说明:大部分的操作都已经封装好了,比如:当前消费到哪个位置下了,但是不够灵活(工作过程推荐使用)

    低级API(Low Level API)

    kafka消费者低级API非常灵活;需要自己负责维护连接Controller Broker。保存offset,Consumer Partition对应 关系。

    class SimpleConsumer {
    
    /* Send fetch request to a broker and get back a set of messages. */ 
    public ByteBufferMessageSet fetch(FetchRequest request);
    
    /* Send a list of fetch requests to a broker and get back a response set. */ public MultiFetchResponse multifetch(List<FetchRequest> fetches);
    
    /**
    
    Get a list of valid offsets (up to maxSize) before the given time.
    The result is a list of offsets, in descending order.
    @param time: time in millisecs,
    if set to OffsetRequest$.MODULE$.LATEST_TIME(), get from the latest
    offset available. if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest
    
    available. public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
    
    
    * offset
    */
    

    说明:没有进行包装,所有的操作有用户决定,如自己的保存某一个分区下的记录,你当前消费到哪个位置。

    kafka Streams API开发

    需求:使用StreamAPI获取test这个topic当中的数据,然后将数据全部转为大写,写入到test2这个topic当中去

    第一步:创建一个topic

    node09服务器使用以下命令来常见一个topic 名称为test2

    cd /export/servers/kafka_2.11-1.0.0/
    bin/kafka-topics.sh --create  --partitions 3 --replication-factor 2 --topic test2 --zookeeper node09:2181,node10:2181,node11:2181
    

    第二步:开发StreamAPI

    public class StreamAPI {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node09:9092");
            props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            KStreamBuilder builder = new KStreamBuilder();
            builder.stream("test").mapValues(line -> line.toString().toUpperCase()).to("test2");
            KafkaStreams streams = new KafkaStreams(builder, props);
            streams.start();
        }
    }
    
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import java.util.Properties;
    
    public class Stream {
    
        public static void main(String[] args) {
            Properties props = new Properties();
            //设置程序的唯一标识
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
            //设置kafka集群
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node09:9092");
            //设置序列化与反序列化
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            //实例一个计算逻辑
            StreamsBuilder streamsBuilder = new StreamsBuilder();
            //设置计算逻辑
            streamsBuilder.stream("18BD12").mapValues(line->line.toString().toUpperCase()).to("18BD12-1");
            //构建Topology对象(拓扑,流程)
            final Topology topology = streamsBuilder.build();
            //实例 kafka流
           KafkaStreams streams = new KafkaStreams(topology, props);
           //启动流计算
            streams.start();
    
        }
    }
    
    

    第三步:生产数据

    node09执行以下命令,向test这个topic当中生产数据

    cd /export/servers/kafka_2.11-1.0.0
    bin/kafka-console-producer.sh --broker-list node09:9092,node10:9092,node11:9092 --topic test
    

    第四步:消费数据

    node10执行一下命令消费test2这个topic当中的数据

    cd /export/servers/kafka_2.11-1.0.0
    bin/kafka-console-consumer.sh --from-beginning  --topic test2 --zookeeper node09:2181,node10:2181,node11:2181
    
    展开全文
  • 一、kafka概述  Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一个分布式的,可划分的,冗余备份的持久性的日志服务。它主要用于处理活跃的流式数据。  在...
  • 0、大数据5V Volume 体积大,高速率增长的数据量,PB为单位 Velocity 数据增长速度快 Variety 不同数据类型,文本、音频、视频 Veracity 数据不确定性,大量数据带来不完整性和不一致性 Vlue 数据价值 ---------...
  • 大数据技术--kafka和flume的对比

    千次阅读 2017-01-14 20:46:16
    来源:http://blog.csdn.net/crazyhacking/article/details/45746191
  • 1. 背景介绍 许多公司的平台每天会产生大量的日志(一般为流式数据,如,搜索引擎的pv,查询等),处理这些日志需要特定的日志系统,一般而言,这些系统需要具有以下特征: (1) 构建应用系统和分析系统的桥梁,...
  • [Kafka]为什么使用kafka?

    万次阅读 2017-11-08 16:41:14
    在介绍为什么使用kafka之前,我们有必要来了解一下什么是kafka? 1. 什么是kafkaKafka是由LinkedIn开发的一个分布式的消息系统,使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源...
  • 一、Kafka的基本概述   1、Kafka是什么? Kafka官网上对Kafka的定义叫:Adistributed publish-subscribe messaging system。publish-subscribe是发布和订阅的意思,所以准确的说Kafka是一个消息订阅和发布...
  • Kafka 设计与原理详解

    万次阅读 多人点赞 2015-08-31 15:53:56
    一、Kafka简介1.1 背景历史当今社会各种应用系统诸如商业、社交、搜索、浏览等像信息工厂一样不断的生产出各种信息,在大数据时代,我们面临如下几个挑战: 如何收集这些巨大的信息 如何分析它 如何及时做到如上两点...
  • 一、kafka简介Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司于2010年12月份开源,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。1...
  • 大数据系统中,常常会碰到一个问题,整个大数据是由各个子系统组成,数据需要在各个子系统中高性能,低延迟的不停流转。有没有一个系统可以同时搞定在线应用(消息)和离线应用(数据文件,日志)?这就需要kafka...
  • 日志采集系统flume和kafka有什么区别及联系,它们分别在什么时候使用,什么时候又可以结合?观点一:简言之:这两个差别很大,使用场景区别也很大。先说flume:日志采集。线上数据一般主要是落地文件或者通过socket...
  • 大数据平台每天会产生大量的日志,处理这些日志需要特定的日志系统。 一般而言,这些系统需要具有以下特征: 构建应用系统和分析系统的桥梁,并将它们之间的关联解耦; 支持近实时的在线分析系统和类似于...
  • Kafka学习总结(1)——Kafka入门简介

    千次阅读 2017-08-08 14:53:16
    Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一个分布式的,可划分的,冗余备份的持久性的日志服务。它主要用于处理活跃的流式数据。 在大数据系统中,常常会...
  • Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。 Apache Kafka与传统消息系统...
  • Kafka原理详解

    千次阅读 2017-02-16 11:13:04
    原文链接:Kafka 设计与原理详解一、Kafka简介本文综合了我之前写的kafka相关文章,可作为一个全面了解学习kafka的培训学习资料。1.1 背景历史当今社会各种应用系统诸如商业、社交、搜索、浏览等像信息工厂一样不断...
  • 大数据开发面试经验

    2020-07-19 15:44:24
    大数据开发面试题目 Hadoop 介绍 MapReduce 的运行过程 ,shuffle 过程 如果在现场,我可以手绘 MapReduce 从 InputFormat 到 OutputFormat 的流程,一边画图一边说。如果讲到环形缓冲区那里,是不是有很多调优的...
1 2 3 4 5 ... 20
收藏数 1,455
精华内容 582
关键字:

kafka存取大数据