  • flume kafka

    Reposted from: https://www.cnblogs.com/liyulong1982/p/5553605.html
  • Integrating Spark Streaming with Flume and Kafka: creating a Kafka topic, creating a Flume agent, building a LoggerGenerator log producer, the Spark Streaming program, and the startup steps for the whole pipeline


    This post records a simple example of integrating Spark Streaming with Flume and Kafka. First, a class generates log messages and sends them to Flume. Flume receives the data and publishes it to Kafka through a Kafka sink, so Flume acts as the Kafka producer. Finally, a Spark Streaming job consumes the data from Kafka and processes it.

    Creating the Kafka topic

    First create a Kafka topic to publish and subscribe to. After starting ZooKeeper and Kafka, create the topic with the following command:

    bin/kafka-topics.sh --create --zookeeper hadoop132:2181 --replication-factor 1 --partitions 1 --topic streamingtopic
    

    Because my Kafka version is 0.11.0, the command goes through ZooKeeper; newer versions talk to the brokers directly instead (see the Kafka documentation, and the sketch at the end of this subsection).
    After creating it, list all Kafka topics to confirm that streamingtopic exists:

    bin/kafka-topics.sh --list --zookeeper hadoop132:2181
    

    You can then test with the stock console producer and consumer to confirm that data flows through:

    bin/kafka-console-producer.sh --broker-list  hadoop132:9092 --topic streamingtopic
    bin/kafka-console-consumer.sh --bootstrap-server hadoop132:9092 --topic streamingtopic  --from-beginning
    

    Note that all of the commands above are run from inside the Kafka installation directory.
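
    For newer Kafka releases (roughly 2.2 onward), kafka-topics.sh talks to the brokers directly instead of ZooKeeper. A minimal sketch of the equivalent create command, assuming a broker is listening on hadoop132:9092:

    bin/kafka-topics.sh --create --bootstrap-server hadoop132:9092 --replication-factor 1 --partitions 1 --topic streamingtopic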

    Creating the Flume agent

    Using Flume mostly means writing a configuration file and running an agent with it. Here we create a Flume agent configuration that receives the generated logger data and forwards it to the Kafka topic.
    Go into the Flume installation directory and create a configuration file named streaming.conf:

    cd $FLUME_HOME
    vim streaming.conf
    

    Then put the following configuration into it:

    agent1.sources=avro-source
    agent1.channels=logger-channel
    agent1.sinks=kafka-sink
    
    
    # define source
    agent1.sources.avro-source.type=avro
    agent1.sources.avro-source.bind=0.0.0.0
    agent1.sources.avro-source.port=41414
    
    # define channels
    agent1.channels.logger-channel.type=memory
    
    # define sink
    agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
    agent1.sinks.kafka-sink.kafka.topic = streamingtopic
    agent1.sinks.kafka-sink.kafka.bootstrap.servers = hadoop132:9092,hadoop133:9092,hadoop134:9092
    agent1.sinks.kafka-sink.kafka.flumeBatchSize = 20
    agent1.sinks.kafka-sink.kafka.producer.acks = 1
    
    # merge
    agent1.sources.avro-source.channels=logger-channel
    agent1.sinks.kafka-sink.channel=logger-channel
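    
    # Optional addition, not in the original post: cap the memory channel explicitly so a
    # slow Kafka broker cannot exhaust the agent heap. The values below are illustrative only.
    agent1.channels.logger-channel.capacity = 10000
    agent1.channels.logger-channel.transactionCapacity = 1000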
    

    You do not have to configure the Kafka sink right away. The configuration above is the final one; to begin with, you can use the following configuration to test whether the generated logs reach Flume at all:

    agent1.sources=avro-source
    agent1.channels=logger-channel
    agent1.sinks=log-sink
    
    
    #define source
    agent1.sources.avro-source.type=avro
    agent1.sources.avro-source.bind=0.0.0.0
    agent1.sources.avro-source.port=41414
    
    #define channels
    agent1.channels.logger-channel.type=memory
    
    #define sink
    agent1.sinks.log-sink.type=logger
    # merge
    agent1.sources.avro-source.channels=logger-channel
    agent1.sinks.log-sink.channel=logger-channel
    

    Once the configuration is ready, start the agent:

    bin/flume-ng agent \
    --conf conf \
    --conf-file streaming.conf \
    --name agent1 \
    -Dflume.root.logger=INFO,console
    

    Creating the LoggerGenerator log producer

    LoggerGenerator is just a simple log producer: it uses log4j and an endless loop to keep emitting log lines.
    First add the Flume log4j appender dependency to the Maven project:

    <dependency>
        <groupId>org.apache.flume.flume-ng-clients</groupId>
        <artifactId>flume-ng-log4jappender</artifactId>
        <version>1.6.0</version>
    </dependency>
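
    The Spark Streaming program shown later uses the Kafka 0.10 direct-stream integration, so the project also needs the matching Spark artifacts on the classpath. A hedged sketch of the extra Maven dependencies, assuming Spark 2.4.x built for Scala 2.11 (adjust the versions to your own Spark and Scala build):

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.4.0</version>
    </dependency>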
    

    Then add the log4j configuration file log4j.properties under the resources directory:

    
    log4j.rootLogger=INFO,stdout,flume
    
    log4j.appender.stdout = org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.target = System.out
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
    
    log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
    log4j.appender.flume.Hostname = hadoop132
    log4j.appender.flume.Port = 41414
    log4j.appender.flume.UnsafeMode = true
    
    

    The Flume-related appender settings above are what route the log output into Flume; replace the hostname with the host where your Flume agent is running. The LoggerGenerator code is as follows:

    import org.apache.log4j.Logger;
    
    /**
     * Simulated log producer: emits one log line per second.
     */
    public class LoggerGenerator {
        private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());
    
        public static void main(String[] args) throws Exception{
            int index=0;
            while (true){
                Thread.sleep(1000);
                logger.info("value is: "+ index++);
            }
        }
    }
    

    You can use the Flume test configuration described earlier to check that the logs are reaching Flume.

    The Spark Streaming program

    The Spark Streaming side is mainly about wiring up Kafka: the Spark Streaming job acts as a Kafka consumer, subscribing to streamingtopic and processing the messages. The code is as follows:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    
    object KafkaWordCount {
      def main(args: Array[String]): Unit = {
    
        val sparkConf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaWordCount")
    
        val ssc = new StreamingContext(sparkConf,Seconds(5))
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "hadoop132:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "test",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
    
        val topics = Array("streamingtopic")
        val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](topics, kafkaParams)
        )
        stream.map(_.value()).flatMap(_.split(" ")).count().print()
        ssc.start()
        ssc.awaitTermination()
      }
    
    }
    

    Because my Kafka version is 0.10 or later, I use the direct-stream approach; the official documentation explains how to integrate Spark Streaming with other Kafka versions.
    The code above can be run and tested directly in IDEA; to package it for a cluster only a few parameters need to change (see the GitHub repository for details).
    The program only prints a simple record count per batch; it is just a demo, and a sketch of a per-word count variant is shown below.
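
    For reference, a minimal sketch of a true per-word count over each 5-second batch; it would replace the count().print() line above (my variant, not from the original post):

    // split each Kafka record value into words and count occurrences per batch
    stream.map(_.value())
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .print()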

    Startup steps for the whole pipeline

    1. Start the Flume agent (agent1):
    bin/flume-ng agent \
    --conf conf \
    --conf-file streaming.conf \
    --name agent1 \
    -Dflume.root.logger=INFO,console
    
    2. Start LoggerGenerator to produce logs.
    3. Start the KafkaWordCount program to process the generated logs.

    In this pipeline the logs pass through Flume into Kafka and are then consumed by KafkaWordCount.
    Replace the hostnames with your own.

    You can also package the job and test it on a cluster. Add the following build section to pom.xml:

        <build>
            <finalName>SparkStream</finalName>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.0.0</version>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>KafkaWordCount</mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
            </plugins>
        </build>
    

    Change the following two places in the KafkaWordCount code above so that the broker list and topic come from the command-line arguments:

    "bootstrap.servers" -> args(0)
    val topics = Array(args(1))
    

    Then open the Maven panel on the right-hand side of IDEA and run package under Lifecycle. Upload the resulting jar to the cluster and run it following the steps above; a hedged spark-submit sketch is shown below.
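
    A sketch of submitting the assembled jar on the cluster. The jar name follows from the finalName plus the assembly suffix; the --master value is an assumption (and assumes setMaster was removed from the code when packaging, as the post suggests), so adjust it to your environment:

    bin/spark-submit \
    --class KafkaWordCount \
    --master yarn \
    SparkStream-jar-with-dependencies.jar \
    hadoop132:9092 streamingtopic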

    The full code and configuration files for this project are in my GitHub repository, along with instructions for running the jar.

  • flume kafka spark streaming

    Installing Flume: version 1.6 may not support the Taildir source (my guess), so download 1.7 or 1.8, for example from
    http://www.apache.org/dyn/closer.lua/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
    or find a mirror on the official site.
    1. Set up Flume first.
    Extract the downloaded package: tar -zxvf apache-flume-1.8.0-bin.tar.gz
    1) Configure conf:
    cp flume-env.sh.template flume-env.sh
    Set JAVA_HOME there (this can be skipped if your environment variables are already complete).
    I did not set it here; my /etc/profile contents are:

    export JAVA_HOME=/usr/local/soft/jdk/jdk1.8.0_45
    export JRE_HOME=${JAVA_HOME}/jre 
    export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib 
    export  FLUME_HOME=/usr/local/soft/apache-flume-1.8.0-bin
    export  FLUME_CONF_DIR=$FLUME_HOME/conf
    export  FLUME_PATH=$FLUME_HOME/bin
    export PATH=$SCALA_HOME/bin:$SPARK_HOME/bin:$HADOOP_HOME/bin:${JAVA_HOME}/bin:$FLUME_HOME/bin:$PATH
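
    After sourcing the profile, a quick sanity check that the Flume binaries resolve on the PATH (this assumes the paths above; the command prints the installed Flume version):

    source /etc/profile
    flume-ng version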

    2) Key configuration
    kafkanode.conf is configured as follows:

    #Agent
    flumeAgent.channels = c1
    flumeAgent.sources  = s1
    flumeAgent.sinks    = k1 
    # flumeAgent TAILDIR source (tails files matching the filegroups below; requires Flume 1.7+)
    flumeAgent.sources.s1.type = TAILDIR
    flumeAgent.sources.s1.positionFile = /opt/apps/log4j/taildir_position.json
    flumeAgent.sources.s1.fileHeader = true
    #flumeAgent.sources.s1.deletePolicy =immediate
    #flumeAgent.sources.s1.batchSize =1000
    flumeAgent.sources.s1.channels =c1
    flumeAgent.sources.s1.filegroups = f1 f2
    flumeAgent.sources.s1.filegroups.f1=/usr/logs/.*log.*
    flumeAgent.sources.s1.filegroups.f2=/logs/.*log.*
    #flumeAgent.sources.s1.deserializer.maxLineLength =1048576
    # flumeAgent file channel (durable, disk-backed)
    flumeAgent.channels.c1.type = file
    flumeAgent.channels.c1.checkpointDir = /var/flume/spool/checkpoint
    flumeAgent.channels.c1.dataDirs = /var/flume/spool/data
    flumeAgent.channels.c1.capacity = 200000000
    flumeAgent.channels.c1.keep-alive = 30
    flumeAgent.channels.c1.write-timeout = 30
    flumeAgent.channels.c1.checkpoint-timeout=600
    # flumeAgent avro sink, forwarding to the consolidation agent
    flumeAgent.sinks.k1.channel = c1
    flumeAgent.sinks.k1.type = avro
    # connect to CollectorMainAgent
    flumeAgent.sinks.k1.hostname = data17.Hadoop
    flumeAgent.sinks.k1.port = 44444
    

    kafka.conf is configured as follows:

    #flumeConsolidationAgent
    flumeConsolidationAgent.channels = c1
    flumeConsolidationAgent.sources  = s1
    flumeConsolidationAgent.sinks    = k1 
    
    # flumeConsolidationAgent avro source (receives from the leaf agents) and Kafka sink
    flumeConsolidationAgent.sources.s1.type = avro
    flumeConsolidationAgent.sources.s1.channels = c1
    flumeConsolidationAgent.sources.s1.bind = data17.Hadoop
    flumeConsolidationAgent.sources.s1.port = 44444
    flumeConsolidationAgent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink  
    flumeConsolidationAgent.sinks.k1.topic = myflume
    flumeConsolidationAgent.sinks.k1.brokerList = data14.Hadoop:9092,data15.Hadoop:9092,data16.Hadoop:9092
    flumeConsolidationAgent.sinks.k1.requiredAcks = 1  
    flumeConsolidationAgent.sinks.k1.batchSize = 20  
    flumeConsolidationAgent.sinks.k1.channel = c1
    flumeConsolidationAgent.channels.c1.type = file
    flumeConsolidationAgent.channels.c1.checkpointDir = /var/flume/spool/checkpoint
    flumeConsolidationAgent.channels.c1.dataDirs = /var/flume/spool/data
    flumeConsolidationAgent.channels.c1.capacity = 200000000
    flumeConsolidationAgent.channels.c1.keep-alive = 30
    flumeConsolidationAgent.channels.c1.write-timeout = 30
    flumeConsolidationAgent.channels.c1.checkpoint-timeout=600
    
    

    kafka.conf runs on the main (consolidation) node; start it with:

    bin/flume-ng agent --conf conf --conf-file conf/kafka.conf --name flumeConsolidationAgent -Dflume.root.logger=DEBUG,console

    kafkanode.conf runs on the leaf nodes (there can be several); start each with:

    bin/flume-ng agent --conf conf --conf-file conf/kafkanode.conf --name flumeAgent -Dflume.root.logger=DEBUG,console

    (All leaf nodes use the same configuration.)
    2. Kafka configuration
    Nothing special here: use your existing Kafka cluster and simply create a new topic:

    /usr/local/soft/kafka_2.11-0.9.0.1/bin/kafka-topics.sh --create --topic myflume --replication-factor 2 --partitions 5 --zookeeper data4.Hadoop:2181
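
    To confirm the topic exists and that events actually arrive from the Flume Kafka sink, the stock 0.9 tools can be used; a sketch assuming the same installation path and ZooKeeper address as above:

    /usr/local/soft/kafka_2.11-0.9.0.1/bin/kafka-topics.sh --list --zookeeper data4.Hadoop:2181
    /usr/local/soft/kafka_2.11-0.9.0.1/bin/kafka-console-consumer.sh --zookeeper data4.Hadoop:2181 --topic myflume --from-beginning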

    3. Spark Streaming

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.StreamingContext
    import kafka.serializer.StringDecoder
    import scala.collection.immutable.HashMap
    import org.apache.log4j.Level
    import org.apache.log4j.Logger
    
    object RealTimeMonitorStart extends Serializable {
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);
    
        val conf = new SparkConf().setAppName("stocker").setMaster("local[2]")
        val sc = new SparkContext(conf)
    
        val ssc = new StreamingContext(sc, Seconds(1))
    
        // Kafka configuration: map of topic -> number of receiver threads
        val topicMap = "myflume".split(",").map((_, 3)).toMap
        print(topicMap)
        // receiver-based integration (old 0.8 API): connect via ZooKeeper with a consumer group
        val kafkaStrings = KafkaUtils.createStream(ssc, "data4.Hadoop:2181,data5.Hadoop:2181,data6.Hadoop:2181", "myflumegroup", topicMap)
        val urlClickLogPairsDStream = kafkaStrings.flatMap(_._2.split(" ")).map((_, 1))
    
        val urlClickCountDaysDStream = urlClickLogPairsDStream.reduceByKeyAndWindow(
          (v1: Int, v2: Int) => {
            v1 + v2
          },
          Seconds(60),
          Seconds(5));
    
        urlClickCountDaysDStream.print();
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
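
    This code uses the receiver-based KafkaUtils.createStream from the old Kafka 0.8 integration, so the build needs the matching artifact. A hedged sketch of the Maven dependency, assuming Spark 1.6.x with Scala 2.10 (adjust to your Spark and Scala versions):

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.6.3</version>
    </dependency>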

    That is all for now; leave a comment if you have questions.

  • Flume + HDFS + Kafka + Storm + MySQL: Flume log collection not working

    I wanted to build a flume+hdfs+kafka+storm+mysql system for real-time log analysis and storage, but the Flume log-collection stage never gets through, and the Flume logs show no errors. I do not know how to solve this and would appreciate any help. The cluster layout and configuration files are below:

    There are five machines, node1 through node5. node3~node5 run the log-collection agents, node1~node2 are the Flume collectors, and the data is stored in two places: one copy in Kafka and one in HDFS.

    The agent configuration file is as follows:

    #def
    agent.sources = src_spooldir
    agent.channels = file memory
    agent.sinks = collector_avro1 collector_avro2

    # sources
    agent.sources.src_spooldir.type = spooldir
    agent.sources.src_spooldir.channels = file memory
    agent.sources.src_spooldir.spoolDir = /data/flume/spoolDir
    agent.sources.src_spooldir.selector.type = multiplexing
    agent.sources.src_spooldir.fileHeader = true

    # channels
    agent.channels.file.type = file
    agent.channels.file.checkpointDir = /data/flume/checkpoint
    agent.channels.file.dataDirs = /data/flume/data
    agent.channels.memory.type = memory
    agent.channels.memory.capacity = 10000
    agent.channels.memory.transactionCapacity = 10000
    agent.channels.memory.byteCapacityBufferPercentage = 20
    agent.channels.memory.byteCapacity = 800000

    # sinks
    agent.sinks.collector_avro1.type = avro
    agent.sinks.collector_avro1.channel = file
    agent.sinks.collector_avro1.hostname = node1
    agent.sinks.collector_avro1.port = 45456
    agent.sinks.collector_avro2.type = avro
    agent.sinks.collector_avro2.channel = memory
    agent.sinks.collector_avro2.hostname = node2
    agent.sinks.collector_avro2.port = 4545

    The collector configuration file is as follows:

    #def
    agent.sources = src_avro
    agent.channels = file memory
    agent.sinks = hdfs kafka

    # sources
    agent.sources.src_avro.type = avro
    agent.sources.src_avro.channels = file memory
    agent.sources.src_avro.bind = node1
    agent.sources.src_avro.port = 45456
    agent.sources.src_avro.selector.type = replicating

    # channels
    agent.channels.file.type = file
    agent.channels.file.checkpointDir = /data/flume/checkpoint
    agent.channels.file.dataDirs = /data/flume/data
    agent.channels.memory.type = memory
    agent.channels.memory.capacity = 10000
    agent.channels.memory.transactionCapacity = 10000
    agent.channels.memory.byteCapacityBufferPercentage = 20
    agent.channels.memory.byteCapacity = 800000

    # sinks
    agent.sinks.hdfs.type = hdfs
    agent.sinks.hdfs.channel = file
    agent.sinks.hdfs.hdfs.path = hdfs://node1/flume/events/%y-%m-%d/%H%M/%S
    agent.sinks.hdfs.hdfs.filePrefix = log_%Y%m%d_%H
    agent.sinks.hdfs.hdfs.fileSuffix = .txt
    agent.sinks.hdfs.hdfs.useLocalTimeStamp = true
    agent.sinks.hdfs.hdfs.writeFormat = Text
    agent.sinks.hdfs.hdfs.rollCount = 0
    agent.sinks.hdfs.hdfs.rollSize = 1024
    agent.sinks.hdfs.hdfs.rollInterval = 0
    agent.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink
    agent.sinks.kafka.channel = memory
    agent.sinks.kafka.kafka.topic = test
    agent.sinks.kafka.kafka.bootstrap.servers = node3:9092,node4:9092,node5:9092
    agent.sinks.kafka.kafka.flumeBatchSize = 20
    agent.sinks.kafka.kafka.producer.acks = 1
    agent.sinks.kafka.kafka.producer.linger.ms = 1
    agent.sinks.kafka.kafka.producer.compression.type = snappy

    In the end, neither HDFS nor Kafka receives any data.
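
    Not part of the original question, but a sketch of how each hop could be checked in isolation. Paths and hostnames are taken from the configs above; the console-consumer flag assumes a Kafka version that supports --bootstrap-server:

    # on node3~node5: the spooldir source renames fully ingested files to *.COMPLETED
    ls /data/flume/spoolDir
    # on any node: check whether events reach the Kafka topic
    bin/kafka-console-consumer.sh --bootstrap-server node3:9092 --topic test --from-beginning
    # check whether the HDFS sink has written anything
    hdfs dfs -ls /flume/events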

  • Testing a Flume kafkaSource -> kafkaSink pipeline on Windows 10: 1. Install Kafka. 1.1 Download a binary release from http://kafka.apache.org/downloads (any suitable version under "Binary downloads"). 1.2 Extract it to a path that does not contain Chinese...
  • Flume KafkaChannel

    A Flume agent consists of a source, a channel, and a sink, but sometimes the source or the sink is not actually needed. For example, when shipping Flume-monitored logs into Kafka, you can do without a sink by defining the channel as a KafkaChannel, with the KafkaChannel acting as...
  • Flume configuration file kafka-sink.properties: # Name the components on this agent; source: give it an alias. A properties file is a Java configuration file: the left of "=" is the key, the right is the value, and keys start with a1 (the name of the Flume agent...
  • When using the Flume Kafka sink, starting Flume throws: Sink kafkasink has been removed due to an error during configuration org.apache.flume.conf.ConfigurationExceptio...
  • flume kafka errors

    1. A WARN-level error appears in the Flume log: Error while fetching metadata with correlation id {} default-flume-topic=LEADER_NOT_AVAILABLE. Fix: create the topic in Kafka first.
  • The Kafka plugin used with Flume was version 0.9; once the channel queue filled up, heartbeat problems caused Flume to stop accepting Kafka messages, so an upgrade to 0.10 was planned. On the Flume side, first upgrade the Kafka 0.9 jars under lib to 0.10 (prerequisite: ...
  • Main software versions: Flume 1.7.0, Storm 1.1.0, Kafka 2.10-0.9.0.1, Zookeeper 3.4.10, Redis 3.2.8. 3. Main software downloads. 4. Installing the Java environment (omitted). Passwordless SSH login, mainly for the cluster: #create account: useradd hadoop; groupadd hadoop; passwd hadoop #...
  • flume kafka storm

    bin/kafka-server-start.sh config/server.properties starts Kafka; bin/kafka-topics.sh --zookeeper node2:2181,node3:2181,node4:2181 --create --replication-factor 2 --partitions 3 --topic test creates the topic; bin/...
  • Using Flume's KafkaChannel

    Flume provides a Kafka sink for writing data to Kafka and also a KafkaChannel that uses Kafka as the channel's storage. A KafkaChannel avoids the Memory Channel's risk of running out of memory while still offering decent performance. In addition, when we need to write data to Kafka, ...
  • flume kafka sink

    # name the agent components: a1.sources = r1 ... # the Flume source (directory to watch): a1.sources.r1.type = spooldir, a1.sources.r1.spoolDir = /opt/dir # per-line read size limit: a1.sources.r1.deserializer.maxLine...
  • Fixing 100% CPU usage after starting a Flume KafkaSink. Flume version: 1.6.0-cdh5.5.0. Symptom: with a kafka sink configured to push real-time data to Kafka, CPU usage climbs to 100% after Flume starts while no logs are being processed; when log data is being processed, ...
  • Flume: download Flume 1.8. 1: extract the package into /opt: cd /opt; tar -zxvf apache-flume-1.8.0-bin.tar.gz -C /opt; mv apache-flume-1.8.0-bin flume. 2: edit flume-env.sh under conf, mainly the JAVA_HOME variable...
  • Problems encountered: the first problem, then a second one right after, then a third... After starting, there was no corresponding topic in Kafka; the Flume startup log showed that 1.5 has no TailDir source, so Flume was upgraded to 1.7, and TailD...
  • flume kafka2hdfs demo

    A simple job that consumes Kafka data into HDFS. Configuration file: test0.conf. Start it from the Flume conf directory with ../bin/flume-ng agent -n a1 -c ./conf -f ./test0.conf -Dflume.root.logger=INFO,console; a1.sources = r1; a1.sinks = k1; a1....
  • Recently, in production, using Flume's hdfs sink to read data from a KafkaChannel and write it into HDFS files produced duplicated records, which prompted a closer look at Flume's data-transfer flow. The first symptom was that HDFS contained many files whose sizes...
  • 1. Start Flume listening locally: bin/flume-ng agent --conf conf --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console. 2. Start Flume listening on the cluster: ./bin/flume-ng agent --conf ./conf/ --conf-...
  • Cluster role layout (flattened table preview): datanode namenode kafka flume zookeeper nodemanager; hadoop102: datanode resourcemanager kafka zookeeper nodemanager; hadoop103: datanode secondarynamenode kafka zookeeper nodemana...
  • flume kafka-sink high cpu

    When Flume sinks to Kafka, ... Analysis of the high CPU usage in the Flume Kafka sink: 1. Get the Flume process id: [root@datanode conf]$ top; top - 10:17:58 up 14 days, 18:09, 2 users, load average: 1.37, 1.11, 0.65; Tasks: 436 total,
