2018-10-19 13:28:48 baidu_27485577 阅读数 3023
  • Spark开发工程师(含项目)

    本课程为大数据金融信贷项目实战课,着重讲解企业中常用的大数据技术理论与实战,如Hadoop、Hive、HBase、Sqoop、Flume、Kafka、Spark Streaming、Spark SQL、Spark Structured Streaming等。课程包含离线项目和实时项目,从项目业务需求、技术选型、架构设计、集群安装部署、集成开发以及项目可视化进行全方位实战讲解。

    1722 人正在学习 去看看 李飞

【那些年我们踩过的坑】Kafka与Spark Stream 集成之作业提交报错java.lang.ClassNotFoundException…

kafka和spark的配置和安装这里就不说明了,网上的资料有很多,英语好的可以看官方的文档。关于Kafka与Spark Stream网上也有资料,这里说明一下我用的版本为kafka_2.11-2.0.0,spark2.1,在提交程序是遇到了两个错误。
1. kafka生产者程序提交报错
提交命令如下,这里的程序jar包实现简单的日志发送功能,日志文件是搜狗开源的数据链接

java -cp /home/bd/lc/jar/KafkaSpark-2.0.jar SimpleKafkaProducer /home/bd/lc/data/SogouQ/*

提交报错如下:

Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/clients/producer/Producer
	at java.lang.Class.getDeclaredMethods0(Native Method)
	at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
	at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
	at java.lang.Class.getMethod0(Class.java:3018)
	at java.lang.Class.getMethod(Class.java:1784)
	at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
	at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.producer.Producer
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 7 more

这里的报错是因为所提交的生产者的程序找不到相关的依赖包,程序不能运行,所以在提交程序是要指定依赖包,改为如下命令:

java -cp /home/bd/software/kafka_2.11-2.0.0/libs/*:/home/bd/lc/jar/KafkaSpark-2.0.jar SimpleKafkaProducer /home/bd/lc/data/SogouQ/*`

“/home/bd/software/kafka_2.11-2.0.0/libs/*:”这是指定的依赖的路径,一般是KAFKA_HOME的路径下的libs文件夹,不可照搬。
2. spark-stream 任务提交报错
spark程序实现日志的接受,做实时处理,提交命令如下,这里使用的是spark on yarn 运行模式,还有其他一些其他参数可以自行配置,

spark-submit  --master yarn-client --class Receiver /home/bd/lc/jar/KafkaSpark-2.0.jar 

提交报错如下,这里由于篇幅原因只截取了部分重要的报错,

18/10/19 12:47:00 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
18/10/19 12:47:00 ERROR JobScheduler: Error running job streaming job 1539924420000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, c2-psc1-VB, executor 2): java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka010.KafkaRDDPartition.....
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, c2-psc1-VB, executor 2): java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka010.KafkaRDDPartition
............

报错提示也是因为提交的spark程序没有找到所依赖的jar包spark-streaming-kafka-0-10_2.11-2.1.0.jar和kafka-clients-0.10.0.1.jar,我们把这两个jar包拷贝到所有节点的SPARK_HOME/jars/目录下,然后再提交,可是还是报相同的错误,不知道为什么(有大佬知道的可以告诉下),不过还是找到了其他的解决办法。
在spark的安装配置时,我们在SPARK_HOME/conf/spark-defaults.conf文件中配置了如下的参数,spark on yarn模式,默认情况下会读取spark本地的jar包(再jars目录下)分配到yarn的containers中,但是我只之前的拷贝为甚么还是报错(迷茫脸…),配置好spark.yarn.jars参数后,可以将spark所需的jar包放在对应的hdfs路径上。

spark.yarn.jars=hdfs://c1-psc1-VB:9000/user/bd/spark/sparkjar/*`

至此,两个报错解决,只是我之前将两个jar包拷贝到SPARK_HOME/jars/下怎么不可以。。。。。。。。

2018-06-30 11:11:52 yc_1993 阅读数 12689
  • Spark开发工程师(含项目)

    本课程为大数据金融信贷项目实战课,着重讲解企业中常用的大数据技术理论与实战,如Hadoop、Hive、HBase、Sqoop、Flume、Kafka、Spark Streaming、Spark SQL、Spark Structured Streaming等。课程包含离线项目和实时项目,从项目业务需求、技术选型、架构设计、集群安装部署、集成开发以及项目可视化进行全方位实战讲解。

    1722 人正在学习 去看看 李飞

前言

flume+kafka+spark stream 是目前比较常用的一套大数据消息日志收集管理框架,至于最后是入到Hive或者者Hbase需看不同业务场景,下面以HBase为场景简述下整个配置与搭建流程以及这些框架如此搭配的优点。

1. flume 配置

1.1 flume 简介

从官网文档 https://flume.apache.org 可以知道Flume的定位是很清晰的,它提供了一个分布式的,高可用的桥梁链接,可以收集、聚合和移动大量的日志数据,从许多不同的数据源到集中式数据存储,大概的结构如下图,流程大致为 从源端(source)接收数据,经过管道(channel)的缓存等等,发送到目标(sink)端。:

其中source的定义flume提供了很多方式,常用的有以下几种:

  • Http source ,这种方式可以通过监听接口方式来收集log;
  • Exec source ,这种方式可以通过执行一些shell命令来收集log,例如通过 tail -f 文件 来监听文件追加的日志;
  • Spooling source,这种方式可以监听某个目录的log,当有新的log产生时即会被发送出去;
  • 还有很多其他的方式,例如可以以kafka作为source,这样flume就充当了kafka的消费者,当然还有很多如 Avro sourceThrift sourceTCP类的等等,具体参考官网文档更加相应场景配置即可。

channel同样flume提供了很多方式,memory channel这种方式已经不太建议了,原因也很明显,不够安全,当出现任何机器问题时数据就会丢失,file channelkafka channel是比较推荐的做法,特别是当需要比较高的并发时,kafka channel是一个不错的选择。

sink同样flume提供了很多方式,常用的有以下几种:

  • HDFS/Hive/Hbase/ElasticSearch sink,直接写入hdfs/Hive/Hbase/ElasticSearch,这种方式适合那些比较无需做ETL的场景。
  • kafka sink,直接充当kafka的生产者,可以看到kafka可以在整个flume生命周期里可以自由穿插。
  • Http sink,直接通过post方法将数据发送到目标api。
  • 其他的一些详细见官网文档即可。

1.2 flume 配置

下面以Spooling Directory Source -> file channel -> kafka sink为例:

一份样例配置参数:

# Name the components on this agent
agent.sources = dir-src
agent.sinks = kafka-sink
agent.channels = file-channel

# Describe/configure the source
agent.sources.dir-src.type = spooldir
agent.sources.dir-src.spoolDir = #监听目录
agent.sources.dir-src.fileHeader = true
agent.sources.dir-src.deserializer.maxLineLength=1000000

# Describe the sink
agent.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka-sink.kafka.topic = test
agent.sinks.kafka-sink.kafka.bootstrap.servers = #kafka boostrapServer

# Use a channel which buffers events in file
agent.channels.file-channel.type = file
agent.channels.file-channel.checkpointDir = # checkpoint目录
agent.channels.file-channel.dataDirs = # 缓存的数据目录

# Bind the source and sink to the channel
agent.sources.dir-src.channels = file-channel
agent.sinks.kafka-sink.channel = file-channel

配置详解:

  • 首先每个flume配置表可以存在多个agent,多个source,多个channel,多个sink,所以可以根据相应业务场景进行组合。
  • 对于每个agent,必须配置相应的source/channel/sink,通过agent.source = ???,agent.channel = ???,agent.sink = ???来指定。
  • 对于具体的source/channel/sink,通过 agent.{source/channel/sink}.???.属性 = ... 来具体配置 source/channel/sink 的属性。
  • 配置完source/channel/sink相应的属性后,需把相应的组件串联一起,如: agent.sources.dir-src.channels = file-channel其中dir-src这个source指定了其channel为我们定义好的file-channel.

一些Tips:

  • flume在收集log的时候经常会出现Line length exceeds max (2048), truncating line!,这个一般情况对于一些log的存储没影响,但是遇到需要解析log的情况就有问题了,有时一个json或者其他格式的log被截断了,解析也会出问题,所以在source的属性配置里可以通过参数deserializer.maxLineLength调高默认的2048。
  • flume在监听相应的目录时,如果有重名的文件,或者直接在监听目录下修改相应正在读取的文件时,都会报错,而且flume-ng目前没有这种容错机制,报错只能重启了,还有一个比较大的问题,flume-ng没有提供相应的kill脚本,只能通过shell直接ps -aux | grep flume找到相应的PID,然后手动kill。
  • flume在监听相应目录时,如果目录下的文件是通过HTTP或者scp传输过来的,小文件的话没问题,但是当文件大小超过网络传输速率,就会造成flume读取文件时报错直接显示文件大小正在变化,这点也是比较麻烦的,所以建议是现有个临时目录先存放文件,等文件传输完成后再通过shell的mv命令直接发送到监听目录。
  • 有时候我们的log文件是以压缩的方式传输过来,但是如果我们想解析后才发送出去的话,可以将当前的Spooling Directory Source的改为Exec Source,可以指定改source的command参数里写shell解析命令。

flume的启动:

flume-ng agnet --conf "配置文件文件目录" --conf-file "配置文件" --name "配置文件里agent的名字"

2. kafka 配置

2.1 kafka简介

kafka的官网 https://kafka.apache.org 同样对kafka的定位做了一个清晰阐述,分布式的消息流平台,与传统的MQ架构类似,kafka解耦了生产者,中间层与消费者三个组件,乍一听似乎与其他的MQ框架没有太大的区别,于是对比了很久,各个框架间并没有表现出显著性的区别以致某一方是不可替代的,但是其中仍有一些值得细细推敲的地方,具体可见下表(以rabbit MQ 为例):

属性 rabbit MQ Kafka
多语言支持 支持,语言无关 支持,语言无关
消息延迟 微妙级 毫秒级
负载均衡 miror queue 多broker,多replication
协议问题 遵从AMQP协议, broker由Exchange,Binding,queue组成,客户端Producer通过连接channel和server进行通信,Consumer从queue获取消息进行消费, rabbitMQ以broker为中心,有消息的确认机制。 遵从一般的MQ结构,producer,broker,consumer,consumer从broker上批量pull数据,通过提交offset来做相应消息拉取管控。
集群扩展 支持 原生支持
事务支持 原生支持 支持

除上述所列各点外,还有几点需单独拿出讨论的:

  • kafka利用zookeeper做均衡管理,最新的kafka版本在消费者消费完信息后会将offset保存在kafka本身服务上,而不是zookeeper上,这在很大程度保证了消息队列被消费不会出现缺失与重复,但是要保证0重复0丢失,对于consumer提交offset的设计仍有比较大的考验。
  • kafka在创建topic时一般都是分区存储,如此带来的问题是每个分区间的消息顺序是很难保证全局性,只能在单个分区下保证,因此kafka在日志这个领域会更加的吻合和焕发光芒。

2.2 kafka配置

同样,下面是一份单broker的kafka配置方案:

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

delete.topic.enable = true

有几点配置需要注意:

  • broker.id 必须是全局唯一的,多个broker尽量部署在不同的集群上,通过指定相同的zookeeper.connect 进行统一管理。
  • listeners 是监听相应的IP:Port,如果kafka已经部署在集群上,会通过java.net.InetAddress.getCanonicalHostName()自动获取到相应的地址。
  • num.partitions 是为每个topic保留的默认分区,如果创建topic时不指定即采用默认1。
  • 其他的一些配置参数看注释既可以,delete.topic.enable = true可以让topic的删除什么更加方便。

kafka的启动:

kafka-server-start server.properties

创建无备份,分区为1的topic:

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic test

删除topic:

kafka-topics  --delete --zookeeper localhost:2181 --topic test
zookeeper-client 
rmr /brokers/topics/test

3. spark stream 消费者

3.1 spark stream 简介

Spart Stream 是 Spark 框架下一个流处理的子项目,其基础数据DStream封装的是spark的RDD,通过轮询不断地从源端拉取数据,spark stream支持多种源端数据的拉取,同时基于spark的核心计算模块,使得其在实时性和大数据方面有着很强的优势,其流程结构大概如下图所示:

3.2 spark stream 写 Kafka 消费者

spark stream 写 kafka 消费者,官方提供了相应的示例,这里再稍微简述下:

首先sbt引入spark stream/Kafka相关依赖

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.1.0" % Provided
libraryDependencies += "org.apache.spark" % "spark-mllib_2.11" % "2.1.0" % Provided
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.1.0" % Provided
libraryDependencies += "org.apache.spark" % "spark-hive_2.11" % "2.1.0" % Provided
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"  % Provided
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.1.0" 

其次定义好kafka参数:

val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "192.168.1.23:9093,gz19:9092,gz21:9092,gz24:9092,gz18:9092,gz89:9092,bigdata.zuzuche.cn:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "kafka_consumer_tantu",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
    )

订阅相应的topic:

val stream = KafkaUtils.createDirectStream[String, String](
            ssc,
            PreferConsistent,
            Subscribe[String, String](topics, kafkaParams)
        )

接下来就可以对stream做进一步的处理,跟spark rdd的处理类似。

同样在写spark stream的时候有一些细节需要注意:

  • spark stream的轮询时间最小可以达到500ms, 但是如此带来的集群资源消耗也会更大,轮询的时间间隔应根据具体的场景设定。
  • spark stream本质上仍为spark的任务,只是添加了轮询机制使其一直挂在后台,当spark-submit提交spark stream的时候若设定的excutor大于kafka topic创建时设定的分区,多出来的部分会处于空闲,所以两者的配置要互相参考。

4. HBase 存储

4.1 HBase 简介

HBase是NoSql中的一个代表,是一个面向列的数据库,支持亿级别的行*百万级别的列,若要定位到某个字段的值,通常需要限定如下:表名 -> rowid -> column family:column name -> timestamp,其中rowid为全局唯一的行键,行键的设计会影响到列的同个列下的排序,column family为列簇,其含义接近于HIve中的分区,通过column family的限定,其下相应的column会被集中存放,不同column family的column会分开存放,这样当需要索引少量的列时,无需遍历全部字段,当然,column family也不是越多越好,而且官方文档似乎也不支持过多的列簇,关于HBase的表结构,参考如下图:

4.2 spark stream 写入 HBase (以HBase 1.2.0 为例)

引入HBase相关依赖:

libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.0"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.0"

将数据存储为HBase对应的格式:

// 随机产生某个uuid为行键
val put = new Put(Bytes.toBytes(UUID.randomUUID().toString))
// 将列簇,列明,列值添加进对应结构
put.addColumn(Bytes.toBytes("column_family"), Bytes.toBytes("column_name"), Bytes.toBytes("column_value"))

插入HBase:

// 表名
val tablename = "table_name"
// 创建初始配置
val hbaseconf = HBaseConfiguration.create()
// 创建链接
val conn = ConnectionFactory.createConnection(hbaseconf)
// 指定表
val table: HTable = new HTable(hbaseconf, Bytes.toBytes(tablename))
// 提交事务,插入数据
table.put(put)

5. Hive 外部表关联 HBase, impala 映射查询

Hive做HBase的外部关联,需提前定义好列字段,而通常HBase的列都是无限扩展的,所以通过Hive外部映射HBase,只能处理一些日常的查询需求。

5.1 Hive 外部银映射 HBase:

CREATE EXTERNAL TABLE hive_external_HBase(
key string,
time string,
`_track_id` string,
)    
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'    
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,search:time,search:_track_id")    
TBLPROPERTIES("hbase.table.name" = "HBase_table_name");

语法与创建Hive基本一致,需要注意的是hive字段不支持特殊字符如$_*&等开头,需加转义符。

最后,Hive有时候查询的速度并不能达到我们的想象,再做以不impala映射,用impala的查询引擎,会明显快很多:

INVALIDATE METADATA;

总结

flume+kafka+spark stream+hbase是目前比较常用的组合,相信对这种组合存疑的有不少,下面稍微总结下:

  • 为什么不用kafka直接接收源数据,而用flume作为Kafka的源?

从配置方面讲,flume提供了多种源接收方式,且只需做简单的配置即可,灵活的多种源配置也方便后续的收集扩展,kafka作为源会比flume稍微麻烦点,需在前面写一层生产者,实际上cloudera官方也建议,当存在多给消费者时,用kafka会更好,当存在多个多种生产者时,用flume会更加方便,同时,如果并发很高,可以采用kafka做flume的channel。

  • 为什么用spark stream作为kafka的消费者而不是其他?

就目前spark stream的性能来看,spark stream还不能完全称之为实时流处理,更合适的叫法应该是准实时批处理,但是由于其最低延迟可以达到秒级,基本满足了大部分系统需要,对于对实时性要求不高的可以胜任,同时Spark stream内部封装的仍是Spark RDD结构,对于熟悉spark家族的开发者会更友好,且相应的处理解决方案会更多更成熟。另外Storm也是目前spark stream外比较流行的流处理,其实时性比spark stream更高,但属于spark体系外,要求相关开发者具备的能力会更高,所以可以根据不同场景和技术体系,做相应选择。

  • 为什么是入到hbase而不是其他Nosql?

无他,HBase是目前Hadoop家族里BigTable最完善的,列式存储结构最成熟的方案。

2017-02-19 23:09:37 xuyaoqiaoyaoge 阅读数 4894
  • Spark开发工程师(含项目)

    本课程为大数据金融信贷项目实战课,着重讲解企业中常用的大数据技术理论与实战,如Hadoop、Hive、HBase、Sqoop、Flume、Kafka、Spark Streaming、Spark SQL、Spark Structured Streaming等。课程包含离线项目和实时项目,从项目业务需求、技术选型、架构设计、集群安装部署、集成开发以及项目可视化进行全方位实战讲解。

    1722 人正在学习 去看看 李飞

kafka

kafka中文教程

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 kafka的设计初衷是希望作为一个统一的信息收集平台,能够实时的收集反馈信息,并需要能够支撑较大的数据量,且具备良好的容错能力.
Apache kafka是消息中间件的一种。
一 、术语介绍
Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker。
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)。每个topic都具有这两种模式:(队列:消费者组(consumer group)允许同名的消费者组成员瓜分处理;发布订阅:允许你广播消息给多个消费者组(不同名))。
Partition
Partition是物理上的概念,每个Topic包含一个或多个Partition.
Producer
负责发布消息到Kafka broker,比如flume采集机就是Producer。
Consumer
消息消费者,向Kafka broker读取消息的客户端。比如Hadoop机器就是Consumer。
Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

二、使用场景
1、Messaging
对于一些常规的消息系统,kafka是个不错的选择;partitons/replication和容错,可以使kafka具有良好的扩展性和性能优势.不过到目前为止,我们应该很清楚认识到,kafka并没有提供JMS中的”事务性”“消息传输担保(消息确认机制)”“消息分组”等企业级特性;kafka只能使用作为”常规”的消息系统,在一定程度上,尚未确保消息的发送与接收绝对可靠(比如,消息重发,消息发送丢失等)
2、Websit activity tracking
kafka可以作为”网站活性跟踪”的最佳工具;可以将网页/用户操作等信息发送到kafka中.并实时监控,或者离线统计分析等
3、Log Aggregation
kafka的特性决定它非常适合作为”日志收集中心”;application可以将操作日志”批量”“异步”的发送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/压缩消息等,这对producer端而言,几乎感觉不到性能的开支.此时consumer端可以使hadoop等其他系统化的存储和分析系统.
4、它应用于2大类应用
构建实时的流数据管道,可靠地获取系统和应用程序之间的数据。
构建实时流的应用程序,对数据流进行转换或反应。

三、分布式
Log的分区被分布到集群中的多个服务器上。每个服务器处理它分到的分区。 根据配置每个分区还可以复制到其它服务器作为备份容错。 每个分区有一个leader,零或多个follower。Leader处理此分区的所有的读写请求,而follower被动的复制数据。如果leader宕机,其它的一个follower会被推举为新的leader。 一台服务器可能同时是一个分区的leader,另一个分区的follower。 这样可以平衡负载,避免所有的请求都只让一台或者某几台服务器处理。

四、消息处理顺序
Kafka保证消息的顺序不变。 在这一点上Kafka做的更好,尽管并没有完全解决上述问题。 Kafka采用了一种分而治之的策略:分区。 因为Topic分区中消息只能由消费者组中的唯一一个消费者处理,所以消息肯定是按照先后顺序进行处理的。但是它也仅仅是保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序。 所以,如果你想要顺序的处理Topic的所有消息,那就只提供一个分区。

五、安装
kafka安装和启动

六、Key和Value
Kafka是一个分布式消息系统,Producer生产消息并推送(Push)给Broker,然后Consumer再从Broker那里取走(Pull)消息。Producer生产的消息就是由Message来表示的,对用户来讲,它就是键-值对。

Message => Crc MagicByte Attributes Key Value

kafka会根据传进来的key计算其分区,但key可以不传,可以为null,空的话,producer会把这条消息随机的发送给一个partition。

这里写图片描述

MessageSet用来组合多条Message,它在每条Message的基础上加上了Offset和MessageSize,其结构是:

MessageSet => [Offset MessageSize Message]

它的含义是MessageSet是个数组,数组的每个元素由三部分组成,分别是Offset,MessageSize和Message,它们的含义分别是:
这里写图片描述

七、小例子
1.启动ZooKeeper
进入kafka目录,加上daemon表示在后台启动,不占用当前的命令行窗口。
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
如果要关闭,下面这个
bin/zookeeper-server-stop.sh
ZooKeeper 的端口号是2181,输入jps查看进程号是QuorumPeerMain
2.启动kafka
在server.properties中加入,第一个是保证你删topic可以删掉,第二个不然的话就报topic找不到的错误:
delete.topic.enable=true
listeners=PLAINTEXT://localhost:9092
然后:
bin/kafka-server-start.sh -daemon config/server.properties
如果要关闭,下面这个
bin/kafka-server-stop.sh
Kafka的端口号是9092,输入jps查看进程号是Kafka
3.创建一个主题(topic)
创建一个名为“test”的Topic,只有一个分区和一个备份:
bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test
创建好之后,可以通过运行以下命令,查看已创建了哪些topic:
bin/kafka-topics.sh –list –zookeeper localhost:2181
查看具体topic的信息:
bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic test
4.发送消息
启动kafka生产者:
bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test
5.接收消息
新开一个命令行窗口,启动kafka消费者:
bin/kafka-console-consumer.sh –zookeeper localhost:2181 –topic test –from-beginning
6.最后
在producer窗口中输入消息,可以在consumer窗口中显示:
这里写图片描述
这里写图片描述

spark streaming

spark中文学习指南

Spark Streaming是一种构建在Spark上的实时计算框架,它扩展了Spark处理大规模流式数据的能力。

Spark Streaming的优势在于:
能运行在100+的结点上,并达到秒级延迟。
使用基于内存的Spark作为执行引擎,具有高效和容错的特性。
能集成Spark的批处理和交互查询。
为实现复杂的算法提供和批处理类似的简单接口。

首先,Spark Streaming把实时输入数据流以时间片Δt (如1秒)为单位切分成块。Spark Streaming会把每块数据作为一个RDD,并使用RDD操作处理每一小块数据。每个块都会生成一个Spark Job处理,最终结果也返回多块。
在Spark Streaming中,则通过操作DStream(表示数据流的RDD序列)提供的接口,这些接口和RDD提供的接口类似。
正如Spark Streaming最初的目标一样,它通过丰富的API和基于内存的高速计算引擎让用户可以结合流式处理,批处理和交互查询等应用。因此Spark Streaming适合一些需要历史数据和实时数据结合分析的应用场合。当然,对于实时性要求不是特别高的应用也能完全胜任。另外通过RDD的数据重用机制可以得到更高效的容错处理。

当一个上下文(context)定义之后,你必须按照以下几步进行操作:
定义输入源;
准备好流计算指令;
利用streamingContext.start()方法接收和处理数据;
处理过程将一直持续,直到streamingContext.stop()方法被调用。

可以利用已经存在的SparkContext对象创建StreamingContext对象:

val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

窗口函数
对于spark streaming中的窗口函数,参见:
窗口函数解释

对非(K,V)形式的RDD 窗口化reduce:
1.reduceByWindow(reduceFunc, windowDuration, slideDuration)
2.reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

对(K,V)形式RDD 按Key窗口化reduce:
1.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration)
2.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, numPartitions, filterFunc)
从效率来说,应选择带有invReduceFunc的方法。

可以通过在多个RDD或者批数据间重用连接对象做更进一步的优化。开发者可以保有一个静态的连接对象池,重复使用池中的对象将多批次的RDD推送到外部系统,以进一步节省开支:

 dstream.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
          // ConnectionPool is a static, lazily initialized pool of connections
          val connection = ConnectionPool.getConnection()
          partitionOfRecords.foreach(record => connection.send(record))
          ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
      })
  })

spark执行时间是少了,但数据库压力比较大,会一直占资源。

小例子:

package SparkStreaming

import org.apache.spark._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._

object Spark_streaming_Test {
  def main(args: Array[String]): Unit = {
    //local[2]表示在本地建立2个working线程
    //当运行在本地,如果你的master URL被设置成了“local”,这样就只有一个核运行任务。这对程序来说是不足的,因为作为receiver的输入DStream将会占用这个核,这样就没有剩余的核来处理数据了。
    //所以至少得2个核,也就是local[2]
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    //时间间隔是1秒
    val ssc = new StreamingContext(conf, Seconds(1))
    //有滑动窗口时,必须有checkpoint
    ssc.checkpoint("F:\\checkpoint")
    //DStream是一个基类
    //ssc.socketTextStream() 将创建一个 SocketInputDStream;这个 InputDStream 的 SocketReceiver 将监听服务器 9999 端口
    //ssc.socketTextStream()将 new 出来一个 DStream 具体子类 SocketInputDStream 的实例。
    val lines = ssc.socketTextStream("192.168.1.66", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    //    val lines = ssc.textFileStream("F:\\scv")
    val words = lines.flatMap(_.split(" ")) // DStream transformation
    val pairs = words.map(word => (word, 1)) // DStream transformation
    //    val wordCounts = pairs.reduceByKey(_ + _) // DStream transformation
    //每隔3秒钟,计算过去5秒的词频,显然一次计算的内容与上次是有重复的。如果不想重复,把2个时间设为一样就行了。
    //    val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(5), Seconds(3))
    val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, _ - _, Seconds(5), Seconds(3))
    windowedWordCounts.filter(x => x._2 != 0).print()
    //    wordCounts.print() // DStream output,打印每秒计算的词频
    //需要注意的是,当以上这些代码被执行时,Spark Streaming仅仅准备好了它要执行的计算,实际上并没有真正开始执行。在这些转换操作准备好之后,要真正执行计算,需要调用如下的方法
    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
    //在StreamingContext上调用stop()方法,也会关闭SparkContext对象。如果只想仅关闭StreamingContext对象,设置stop()的可选参数为false
    //一个SparkContext对象可以重复利用去创建多个StreamingContext对象,前提条件是前面的StreamingContext在后面StreamingContext创建之前关闭(不关闭SparkContext)
    ssc.stop()
  }
}

1.启动
start-dfs.sh
start-yarn.sh
这里写图片描述
2.终端输入:
nc -lk 9999
然后在IEDA中运行spark程序。由于9999端口中还没有写东西,所以运行是下图:

这里写图片描述
只有时间,没有打印出东西。然后在终端输入下面的东西,也可以从其他地方复制进来。
hello world
hello hadoop
hadoop love
love cat
cat love rabbit
这时,IDEA的控制台就输出下面的东西。
这里写图片描述

3.下面运行带时间窗口的,注意如果加了时间窗口就必须有checkpoint
输入下面的,不要一次全输入,一次输个几行。
checkpoint
hello world
hello hadoop
hadoop love
love cat
cat love rabbit
ni hao a
hello world
hello hadoop
hadoop love
love cat
cat love rabbit
hello world
hello hadoop
hadoop love
love cat
cat love rabbit

先是++–的那种:
这里写图片描述
这里写图片描述
这里写图片描述
这里写图片描述
再然后是不++–的那种:
这里写图片描述
这里写图片描述
++–的那种是因为把过去的RDD也带进来计算了,所以出现了0这个情况,为了避免这种情况只能在打印前过滤掉0的再打印。而没有++–的那种情况是不需要这样做的。

Checkpointing
在容错、可靠的文件系统(HDFS、s3等)中设置一个目录用于保存checkpoint信息。就可以通过streamingContext.checkpoint(checkpointDirectory)方法来做。
默认的间隔时间是批间隔时间的倍数,最少10秒。它可以通过dstream.checkpoint来设置。需要注意的是,随着 streaming application 的持续运行,checkpoint 数据占用的存储空间会不断变大。因此,需要小心设置checkpoint 的时间间隔。设置得越小,checkpoint 次数会越多,占用空间会越大;如果设置越大,会导致恢复时丢失的数据和进度越多。一般推荐设置为 batch duration 的5~10倍。

package streaming

import java.io.File
import java.nio.charset.Charset

import com.google.common.io.Files

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by Administrator on 2017/3/12.
  */

object RecoverableNetworkWordCount {

  def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String): StreamingContext = {
    println("Creating new context") //如果没有出现这句话,说明StreamingContext是从checkpoint里面加载的
    val outputFile = new File(outputPath) //输出文件的目录
    if (outputFile.exists()) outputFile.delete()
    val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1)) //时间间隔是1秒
    ssc.checkpoint(checkpointDirectory) //设置一个目录用于保存checkpoint信息

    val lines = ssc.socketTextStream(ip, port)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    val windowedWordCounts = wordCounts.reduceByKeyAndWindow(_ + _, _ - _, Seconds(30), Seconds(10))
    windowedWordCounts.checkpoint(Seconds(10))//一般推荐设置为 batch duration 的5~10倍,即StreamingContext的第二个参数的5~10倍
    windowedWordCounts.print()
    Files.append(windowedWordCounts + "\n", outputFile, Charset.defaultCharset())
    ssc
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.exit(1)
    }
    val ip = args(0)
    val port = args(1).toInt
    val checkpointDirectory = args(2)
    val outputPath = args(3)
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => createContext(ip, port, outputPath, checkpointDirectory))
    ssc.start()
    ssc.awaitTermination()
  }
}

优化
1.数据接收的并行水平
创建多个输入DStream并配置它们可以从源中接收不同分区的数据流,从而实现多数据流接收。因此允许数据并行接收,提高整体的吞吐量。

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()

多输入流或者多receiver的可选的方法是明确地重新分配输入数据流(利用inputStream.repartition()),在进一步操作之前,通过集群的机器数分配接收的批数据。
2.任务序列化
运行kyro序列化任何可以减小任务的大小,从而减小任务发送到slave的时间。

    val conf = new SparkConf().setAppName("analyse_domain_day").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

3.设置合适的批间隔时间(即批数据的容量)
批处理时间应该小于批间隔时间。如果时间间隔是1秒,但处理需要2秒,则处理赶不上接收,待处理的数据会越来越多,最后就嘣了。
找出正确的批容量的一个好的办法是用一个保守的批间隔时间(5-10,秒)和低数据速率来测试你的应用程序。为了验证你的系统是否能满足数据处理速率,你可以通过检查端到端的延迟值来判断(可以在Spark驱动程序的log4j日志中查看”Total delay”或者利用StreamingListener接口)。如果延迟维持稳定,那么系统是稳定的。如果延迟持续增长,那么系统无法跟上数据处理速率,是不稳定的。你能够尝试着增加数据处理速率或者减少批容量来作进一步的测试。

DEMO

spark流操作kafka有两种方式:
一种是利用接收器(receiver)和kafaka的高层API实现。
一种是不利用接收器,直接用kafka底层的API来实现(spark1.3以后引入)。

相比基于Receiver方式有几个优点:
1、不需要创建多个kafka输入流,然后Union他们,而使用DirectStream,spark Streaming将会创建和kafka分区一样的RDD的分区数,而且会从kafka并行读取数据,Spark的分区数和Kafka的分区数是一一对应的关系。
2、第一种实现数据的零丢失是将数据预先保存在WAL中,会复制一遍数据,会导致数据被拷贝两次:一次是被Kafka复制;另一次是写入到WAL中。
Direct的方式是会直接操作kafka底层的元数据信息,这样如果计算失败了,可以把数据重新读一下,重新处理。即数据一定会被处理。拉数据,是RDD在执行的时候直接去拉数据。
3、Receiver方式读取kafka,使用的是高层API将偏移量写入ZK中,虽然这种方法可以通过数据保存在WAL中保证数据的不对,但是可能会因为sparkStreaming和ZK中保存的偏移量不一致而导致数据被消费了多次。
第二种方式不采用ZK保存偏移量,消除了两者的不一致,保证每个记录只被Spark Streaming操作一次,即使是在处理失败的情况下。如果想更新ZK中的偏移量数据,需要自己写代码来实现。
由于直接操作的是kafka,kafka就相当于你底层的文件系统。这个时候能保证严格的事务一致性,即一定会被处理,而且只会被处理一次。

首先去maven的官网上下载jar包
spark-streaming_2.10-1.6.2.jar
spark-streaming-kafka_2.10-1.6.2.jar
我的Scala是2.10的,spark是1.6.0的,下载的spark.streaming和kafka版本要与之对应,spark-streaming_2.10-1.6.2.jar中2.10是Scala版本号,1.6.2是spark版本号。当然下载1.6.1也行。
需要添加 kafka-clients-0.8.2.1.jar以及kafka_2.10-0.8.2.1.jar
这里的2.10是Scala版本号,0.8.2.1是kafka的版本号。就下这个版本,别的版本不对应,会出错。

在kafka的配置文件里面:
delete.topic.enable=true
host.name=192.168.1.66
zookeeper.connect=192.168.1.66:2181
我这里写主机名的话,各种报错,所以干脆就写IP地址了。
启动kafka以及ZK的步骤和kafka 1-2是一样的。
进入/kafka_2.10-0.8.2.1 新建一个主题:
bin/kafka-topics.sh –create –zookeeper 192.168.1.66:2181 –replication-factor 1 –partitions 1 –topic test
启动一个生产者:
bin/kafka-console-producer.sh –broker-list 192.168.1.66:9092 –topic test
在自己的电脑上运行spark程序后,在命令行输入:
这里写图片描述
在控制台会显示:
这里写图片描述

package SparkStreaming

//TopicAndPartition是对 topic和partition的id的封装的一个样例类
import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import kafka.serializer.StringDecoder

object SparkStreaming_Kafka_Test {

  val kafkaParams = Map(
    //kafka broker的IP加端口号,这个是必须的
    "metadata.broker.list" -> "192.168.1.66:9092",
    // "group.id" -> "group1",
    /*此配置参数表示当此groupId下的消费者,
     在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空),
     consumer应该从哪个offset开始消费.largest表示接受接收最大的offset(即最新消息),
     smallest表示最小offset,即从topic的开始位置消费所有消息.*/
    "auto.offset.reset" -> "smallest"
  )

  val topicsSet = Set("test")

  //  val zkClient = new ZkClient("xxx:2181,xxx:2181,xxx:2181",Integer.MAX_VALUE,100000,ZKStringSerializer)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("SparkStreaming_Kafka_Test")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(2))
    ssc.checkpoint("F:\\checkpoint")
    /*
    KafkaUtils.createDirectStream[
       [key的数据类型], [value的数据类型], [key解码的类], [value解码的类] ](
       streamingContext, [Kafka配置的参数,是一个map], [topics的集合,是一个set])
       */
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    val lines = messages.map(_._2) //取value
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}

偏移量仅仅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的问题。所以说checkpoint就已经可以保证容错性了。
如果需要把偏移量写入ZK,首先在工程中新建一个包:org.apache.spark.streaming.kafka,然后建一个KafkaCluster类:

package org.apache.spark.streaming.kafka

import kafka.api.OffsetCommitRequest
import kafka.common.{ErrorMapping, OffsetAndMetadata, TopicAndPartition}
import kafka.consumer.SimpleConsumer
import org.apache.spark.SparkException
import org.apache.spark.streaming.kafka.KafkaCluster.SimpleConsumerConfig

import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import scala.util.control.NonFatal

class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
  type Err = ArrayBuffer[Throwable]

  @transient private var _config: SimpleConsumerConfig = null

  def config: SimpleConsumerConfig = this.synchronized {
    if (_config == null) {
      _config = SimpleConsumerConfig(kafkaParams)
    }
    _config
  }

  def setConsumerOffsets(groupId: String, offsets: Map[TopicAndPartition, Long], consumerApiVersion: Short): Either[Err, Map[TopicAndPartition, Short]] = {
    val meta = offsets.map {
      kv => kv._1 -> OffsetAndMetadata(kv._2)
    }
    setConsumerOffsetMetadata(groupId, meta, consumerApiVersion)
  }

  def setConsumerOffsetMetadata(groupId: String, metadata: Map[TopicAndPartition, OffsetAndMetadata], consumerApiVersion: Short): Either[Err, Map[TopicAndPartition, Short]] = {
    var result = Map[TopicAndPartition, Short]()
    val req = OffsetCommitRequest(groupId, metadata, consumerApiVersion)
    val errs = new Err
    val topicAndPartitions = metadata.keySet
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp = consumer.commitOffsets(req)
      val respMap = resp.commitStatus
      val needed = topicAndPartitions.diff(result.keySet)
      needed.foreach { tp: TopicAndPartition =>
        respMap.get(tp).foreach { err: Short =>
          if (err == ErrorMapping.NoError) {
            result += tp -> err
          } else {
            errs.append(ErrorMapping.exceptionFor(err))
          }
        }
      }
      if (result.keys.size == topicAndPartitions.size) {
        return Right(result)
      }
    }
    val missing = topicAndPartitions.diff(result.keySet)
    errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
    Left(errs)
  }

  private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)(fn: SimpleConsumer => Any): Unit = {
    brokers.foreach { hp =>
      var consumer: SimpleConsumer = null
      try {
        consumer = connect(hp._1, hp._2)
        fn(consumer)
      } catch {
        case NonFatal(e) =>
          errs.append(e)
      } finally {
        if (consumer != null) {
          consumer.close()
        }
      }
    }
  }

  def connect(host: String, port: Int): SimpleConsumer =
    new SimpleConsumer(host, port, config.socketTimeoutMs,
      config.socketReceiveBufferBytes, config.clientId)
}

然后在主函数中:

// 手动更新ZK偏移量,使得基于ZK偏移量的kafka监控工具可以使用
    messages.foreachRDD(rdd => {
      // 先处理消息
      val lines = rdd.map(_._2) //取value
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
      wordCounts.foreach(println)
      // 再更新offsets
      //spark内部维护kafka偏移量信息是存储在HasOffsetRanges类的offsetRanges中
      //OffsetRange 包含信息有:topic名字,分区Id,开始偏移,结束偏移。
      val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //得到该 rdd 对应 kafka 的消息的 offset
      val kc = new KafkaCluster(kafkaParams)
      for (offsets <- offsetsList) {
        val topicAndPartition = TopicAndPartition("test", offsets.partition)
        val o = kc.setConsumerOffsets("group1", Map((topicAndPartition, offsets.untilOffset)),8)
        if (o.isLeft) {
          println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
        }
      }
    })

下面是用kafka的API自己写一个程序读取文件,作为kafka的生产者,需要将Scala和kafka的所有的jar包都导入,lib文件夹下面的都导入进去。
如果没有2台电脑,可以开2个开发环境,IDEA作为消费者,eclipse作为生产者。
生产者代码如下:

package spark_streaming_kafka_test;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MakeRealtimeDate extends Thread {

    private Producer<Integer, String> producer;

    public MakeRealtimeDate() {
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("zk.connect", "192.168.1.66:2181");
        props.put("metadata.broker.list", "192.168.1.66:9092");
        ProducerConfig pc = new ProducerConfig(props);
        producer = new Producer<Integer, String>(pc);
    }

    public void run() {
        while (true) {
            File file = new File("C:\\Users\\Administrator\\Desktop\\wordcount.txt");
            BufferedReader reader = null;
            try {
                reader = new BufferedReader(new FileReader(file));
            } catch (FileNotFoundException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
            String lineTxt = null;
            try {
                while ((lineTxt = reader.readLine()) != null) {
                    System.out.println(lineTxt);
                    producer.send(new KeyedMessage<Integer, String>("test", lineTxt));
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }
    }

    public static void main(String[] args) {
        new MakeRealtimeDate().start();
    }

}

先启动之前写的sparkstreaming消费者统计单词个数的程序,然后再启动我们现在写的这个生产者程序,最后就会在IDEA的控制台中看到实时结果。

2017-11-22 13:15:34 h_1_y_m 阅读数 23
  • Spark开发工程师(含项目)

    本课程为大数据金融信贷项目实战课,着重讲解企业中常用的大数据技术理论与实战,如Hadoop、Hive、HBase、Sqoop、Flume、Kafka、Spark Streaming、Spark SQL、Spark Structured Streaming等。课程包含离线项目和实时项目,从项目业务需求、技术选型、架构设计、集群安装部署、集成开发以及项目可视化进行全方位实战讲解。

    1722 人正在学习 去看看 李飞
一,flume配置
# Name the components on this agent
a1.sources = tailsource-1
a1.sinks = remotesink
a1.channels = memoryChnanel-1

# Describe/configure the source
a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.command = tail -F /var/log/test/raw_data.txt

a1.sources.tailsource-1.channels = memoryChnanel-1

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.memoryChnanel-1.type = memory
a1.channels.memoryChnanel-1.keep-alive = 10
a1.channels.memoryChnanel-1.capacity = 100000
a1.channels.memoryChnanel-1.transactionCapacity = 100000

# Bind the source and sink to the channel
a1.sinks.remotesink.type = avro
a1.sinks.remotesink.hostname = 172.18.203.137
a1.sinks.remotesink.port = 9999
a1.sinks.remotesink.channel = memoryChnanel-1


#agent section
producer.sources = s
producer.channels = c
producer.sinks = r

#source section
producer.sources.s.type = avro
producer.sources.s.bind = 172.18.203.137
producer.sources.s.port = 9999

producer.sources.s.channels = c

# Each sink's type must be defined
producer.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink
producer.sinks.r.topic = mytopic
producer.sinks.r.brokerList = master1:9092,master2:9092,slave2:9092
producer.sinks.r.requiredAcks = 1
producer.sinks.r.batchSize = 20
producer.sinks.r.channel = c1

#Specify the channel the sink should use
producer.sinks.r.channel = c

# Each channel's type is defined.
producer.channels.c.type = org.apache.flume.channel.kafka.KafkaChannel
producer.channels.c.capacity = 10000
producer.channels.c.transactionCapacity = 1000
producer.channels.c.brokerList=master1:9092,master2:9092,slave2:9092
producer.channels.c.topic=channel1
producer.channels.c.zookeeperConnect=master2:2181,slave2:2181,slave4:2181

二, Spark代码

import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
* Author: david
* Date : 3/7/17
*/
object StreamingDataTest {

def main(args: Array[String]): Unit = {

Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);

val conf = new SparkConf().setAppName("StreamingDataTest").setMaster("local[4]")

val sc = new SparkContext(conf)

val ssc = new StreamingContext(sc, Seconds(1))

// Kafka的topic
val topics = Set("mytopic")

//kafka brokers列表
val brokers = "master1:9092,master2:9092,slave3:9092"

//kafka查询参数
val kafkaParams = Map[String, String](
"metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")

//创建direct stream
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

//kafkaStream这个tuple的第二部分为接收kafka topic里的文本流
val rawDStream = kafkaStream.flatMap(_._2.split("\\s+")).map((_, 1))

val resDStream = rawDStream.reduceByKeyAndWindow(
(v1: Int, v2: Int) => {
v1 + v2
},
Seconds(8),
Seconds(4));

resDStream.print();

ssc.start()
ssc.awaitTermination()
}

}


三,注意事项
查看/var/log/flume-ng下面的日志报错信息
avro端口号绑定大于公共端口1024
注意linux防火墙service iptables stop
注意运行scala依赖的scope为 provided编译可以,但本机运行找不到class
2018-03-28 14:56:42 xo19882011 阅读数 1333
  • Spark开发工程师(含项目)

    本课程为大数据金融信贷项目实战课,着重讲解企业中常用的大数据技术理论与实战,如Hadoop、Hive、HBase、Sqoop、Flume、Kafka、Spark Streaming、Spark SQL、Spark Structured Streaming等。课程包含离线项目和实时项目,从项目业务需求、技术选型、架构设计、集群安装部署、集成开发以及项目可视化进行全方位实战讲解。

    1722 人正在学习 去看看 李飞

被kafka的新版配置折磨的死去活来的,终于搞定了。。。放松一下写此篇博客以记录一下。

开发环境

  • spark 2.2.0
  • scala 2.11.8 (目前为止,高版本的scala貌似对kafka的支持还有坑。。。)
  • sbt(目前为止,顺便说一下,如果是mac 10.13 之后的系统,并且使用 IntelliJ IDEA的话,sbt的版本要选择 1.0.3左右的,选择1.1.0之后那又是会爽的死去活来的)
  • IntelliJ IDEA 社区版 2017.2.16
  • kafka的版本,由于设置了sasl认证,Kafka 的版本要1.0之后的,具体的对应关系可以查看 此官网链接

为了方便大家配置贴出我的sbt配置:

name := "kstreaming"

version := "0.1"

scalaVersion := "2.11.8"

val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
)

当然了链接的普通例子还是以官网的为准,戳这里
回归正题,接着说咱们的sasl配置,相对于官网的配置,只需要修改以下部位…

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "security.protocol" -> "SASL_PLAINTEXT", // 这两项是我们自己的sasl设置,相对于官网的代码也只改了这两行
  "sasl.mechanism" -> "PLAIN"              // 这两项是我们自己的sasl设置,相对于官网的代码也只改了这两行
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))

接下来就是在 IntelliJ IDEA里边跑以下结果了,当然要顺手设置以下IntelliJ IDEA了~

  • 界面右上角运行的按钮旁有个下拉框,下拉选择 Edit Configurations
  • Confirguration界面的 VM options选项的最右边有个按钮
  • 在里边添加如下代码即可
-Dspark.master=local
-Djava.security.auth.login.config=kafka_client_jaas.conf

第一行是单机运行的意思
第二行是告诉spark运行的时候把认证信息加到运行环境中
当然这里边设置了个文件,这个文件要放到工作目录中~
文件的内容呢,如下

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="xxx"
  password="xxx";
};

完成了这三步简单的配置之后,跑起来就清爽了!!!
废话不多说,敲代码去了 T_T

Spark Kafka

阅读数 237

没有更多推荐了,返回首页