2018-12-03 00:19:26 shenshouniu 阅读数 250
  • 数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2452 人正在学习 去看看 肖滨

本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。期待加入IOT时代最具战斗力的团队。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。

1 Producer端基本数据结构

  • ProducerRecord: 一个ProducerRecord表示一条待发送的消息记录,主要由5个字段构成:

      topic          所属topic
      partition      所属分区
      key            键值
      value          消息体
      timestamp      时间戳
    
  • RecordMetadata: Kafka服务器端返回给客户端的消息的元数据信息,前3项相对比较重要,Producer端可以使用这些消息做一些消息发送成功之后的处理。

      offset                   该条消息的位移
      timestamp                消息时间戳
      topic + partition        所属topic的分区
      checksum                 消息CRC32码
      serializedKeySize        序列化后的消息键字节数
      serializedValueSize      序列化后的消息体字节数
    

2 Producer端消息发送流程

  • 在send()的发送消息动作触发之前,通过props属性中指定的servers连接到broker集群,从Zookeeper收集集群Metedata信息,从而了解哪些broker掌管哪一个Topic的哪一个partition,以及brokers的健康状态。

  • 下面就是流水线操作,ProducerRecord对象携带者topic,partition,message等信息,在Serializer这个“车间”被序列化。

  • 序列化过后的ProducerRecord对象进入Partitioner“车间”,按照上文所述的Partitioning 策略决定这个消息将被分配到哪个Partition中。

  • 确定partition的ProducerRecord进入一个缓冲区,通过减少IO来提升性能,在这个“车间”,消息被按照TopicPartition信息进行归类整理,相同Topic且相同parition的ProducerRecord被放在同一个RecordBatch中,等待被发送。什么时候发送?都在Producer的props中被指定了,有默认值,显然我们可以自己指定。

      (1) batch.size:设置每个RecordBatch可以缓存的最大字节数 
      (2) buffer.memory:设置所有RecordBatch的总共最大字节数 
      (3) linger.ms设置每个RecordBatch的最长延迟发送时间 
      (4) max.block.ms 设置每个RecordBatch的最长阻塞时间 
    
  • 一旦,当单个RecordBatch的linger.ms延迟到达或者batch.size达到上限,这个 RecordBatch会被立即发送。另外,如果所有RecordBatch作为一个整体,达到了buffer.memroy或者max.block.ms上限,所有的RecordBatch都会被发送。

  • ProducerRecord消息按照分配好的Partition发送到具体的broker中,broker接收保存消息,更新Metadata信息,同步给Zookeeper。

  • Producer端其他优化点:

      (5) acks:Producer的数据确认阻塞设置,0表示不管任何响应,只管发,发完了立即执行下个任务,这种方式最快,但是很不保险。1表示只确保leader成功响应,接收到数据。2表示确保leader及其所有follwer成功接收保存消息,也可以用”all”。
      (6) retries:消息发送失败重试的次数。
      (7) retry.backoff.ms:失败补偿时间,每次失败重试的时间间隔,不可设置太短,避免第一条消息的响应还没返回,第二条消息又发出去了,造成逻辑错误。
      (8) max.in.flight.request.per.connection:同一时间,每个Producer能够发送的消息上限。
      (9) compression.type  producer所使用的压缩器,目前支持gzip, snappy和lz4。压缩是在用户主线程完成的,通常都需要花费大量的CPU时间,但对于减少网络IO来说确实利器。生产环境中可以结合压力测试进行适当配置
    

3 消息缓冲区(accumulator)再剖析

  • producer创建时会创建一个默认32MB(由buffer.memory参数指定)的accumulator缓冲区,专门保存待发送的消息。

  • 该数据结构中还包含了一个特别重要的集合信息:消息批次信息(batches)。该集合本质上是一个HashMap,里面分别保存了每个topic分区下的batch队列,即前面说的批次是按照topic分区进行分组的。这样发往不同分区的消息保存在对应分区下的batch队列中。

  • 假设消息M1, M2被发送到test的0分区但属于不同的batch,M3分送到test的1分区,那么batches中包含的信息就是:{“test-0” -> [batch1, batch2], “test-1” -> [batch3]}

  • 每个batch中最重要的3个组件包括:

      compressor: 负责执行追加写入操作
      batch缓冲区:由batch.size参数控制,消息被真正追加写入到的地方
      thunks:保存消息回调逻辑的集合
    
  • 本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。期待加入IOT时代最具战斗力的团队。QQ邮箱地址:1120746959@qq.com,如有任何学术交流,可随时联系。

  • Sender线程自KafkaProducer创建后就一直都在运行着 。它的工作流程基本上是这样的:

      (1)不断轮询缓冲区寻找已做好发送准备的分区 
      (2)将轮询获得的各个batch按照目标分区所在的leader broker进行分组
      (3)将分组后的batch通过底层创建的Socket连接发送给各个broker
      (4)等待服务器端发送response回来
    

  • Sender线程会发送PRODUCE请求给对应的broker,broker处理完毕之后发送对应的PRODUCE response。一旦Sender线程接收到response将依次(按照消息发送顺序)调用batch中的回调方法

4 总结

  • Sender线程自KafkaProducer创建后就一直都在运行着,单个RecordBatch的linger.ms延迟到达或者batch.size达到上限,作为后台线程就会检测到立即发送。
  • accumulator缓冲器按照Topic partion进行分组,来进行集中向某一个Broker发送。
  • 本文通过学习胡夕的相关技术博客和书籍,进行学习笔记总结,辛苦成文,实属不易,各自珍惜,谢谢。
  • 秦凯新 于深圳 201812030018
2018-01-17 09:12:40 smile_smlie 阅读数 198
  • 数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2452 人正在学习 去看看 肖滨

引言

Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message。借用官方的一张图,可以直观地看到topic和partition的关系。
Anatomy of a Topic

partition是以文件的形式存储在文件系统中,比如,创建了一个名为page_visits的topic,其有5个partition,那么在Kafka的数据目录中(由配置文件中的log.dirs指定的)中就有这样5个目录: page_visits-0, page_visits-1,page_visits-2,page_visits-3,page_visits-4,其命名规则为<topic_name>-<partition_id>,里面存储的分别就是这5个partition的数据。

接下来,本文将分析partition目录中的文件的存储格式和相关的代码所在的位置。

3.1、Partition的数据文件

Partition中的每条Message由offset来表示它在这个partition中的偏移量,这个offset不是该Message在partition数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了partition中的一条Message。因此,可以认为offset是partition中Message的id。partition中的每条Message包含了以下三个属性:

  • offset
  • MessageSize
  • data

其中offset为long型,MessageSize为int32,表示data有多大,data为message的具体内容。它的格式和Kafka通讯协议中介绍的MessageSet格式是一致。

Partition的数据文件则包含了若干条上述格式的Message,按offset由小到大排列在一起。它的实现类为FileMessageSet,类图如下:
FileMessageSet类图
它的主要方法如下:

  • append: 把给定的ByteBufferMessageSet中的Message写入到这个数据文件中。
  • searchFor: 从指定的startingPosition开始搜索找到第一个Message其offset是大于或者等于指定的offset,并返回其在文件中的位置Position。它的实现方式是从startingPosition开始读取12个字节,分别是当前MessageSet的offset和size。如果当前offset小于指定的offset,那么将position向后移动LogOverHead+MessageSize(其中LogOverHead为offset+messagesize,为12个字节)。
  • read:准确名字应该是slice,它截取其中一部分返回一个新的FileMessageSet。它不保证截取的位置数据的完整性。
  • sizeInBytes: 表示这个FileMessageSet占有了多少字节的空间。
  • truncateTo: 把这个文件截断,这个方法不保证截断位置的Message的完整性。
  • readInto: 从指定的相对位置开始把文件的内容读取到对应的ByteBuffer中。

我们来思考一下,如果一个partition只有一个数据文件会怎么样?

  1. 新数据是添加在文件末尾(调用FileMessageSet的append方法),不论文件数据文件有多大,这个操作永远都是O(1)的。
  2. 查找某个offset的Message(调用FileMessageSet的searchFor方法)是顺序查找的。因此,如果数据文件很大的话,查找的效率就低。

那Kafka是如何解决查找效率的的问题呢?有两大法宝:1) 分段 2) 索引。

3.2、数据文件的分段

Kafka解决查询效率的手段之一是将数据文件分段,比如有100条Message,它们的offset是从0到99。假设将数据文件分成5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候,用二分查找就可以定位到该Message在哪个段中。

3.3、为数据文件建索引

数据文件分段使得可以在一个较小的数据文件中查找对应offset的Message了,但是这依然需要顺序扫描才能找到对应offset的Message。为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。
索引文件中包含若干个索引条目,每个条目表示数据文件中一条Message的索引。索引包含两个部分(均为4个字节的数字),分别为相对offset和position。

  • 相对offset:因为数据文件分段以后,每个数据文件的起始offset不为0,相对offset表示这条Message相对于其所属数据文件中最小的offset的大小。举例,分段后的一个数据文件的offset是从20开始,那么offset为25的Message在index文件中的相对offset就是25-20 = 5。存储相对offset可以减小索引文件占用的空间。
  • position,表示该条Message在数据文件中的绝对位置。只要打开文件并移动文件指针到这个position就可以读取对应的Message了。

index文件中并没有为数据文件中的每条Message建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。但缺点是没有建立索引的Message也不能一次定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。

在Kafka中,索引文件的实现类为OffsetIndex,它的类图如下:
OffsetIndex类图

主要的方法有:

  • append方法,添加一对offset和position到index文件中,这里的offset将会被转成相对的offset。
  • lookup, 用二分查找的方式去查找小于或等于给定offset的最大的那个offset

小结

我们以几张图来总结一下Message是如何在Kafka中存储的,以及如何查找指定offset的Message的。

Message是按照topic来组织,每个topic可以分成多个的partition,比如:有5个partition的名为为page_visits的topic的目录结构为:
topic_partition

partition是分段的,每个段叫LogSegment,包括了一个数据文件和一个索引文件,下图是某个partition目录下的文件:
partition
可以看到,这个partition有4个LogSegment。

展示是如何查找Message的。
search
比如:要查找绝对offset为7的Message:

  1. 首先是用二分查找确定它是在哪个LogSegment中,自然是在第一个Segment中。
  2. 打开这个Segment的index文件,也是用二分查找找到offset小于或者等于指定offset的索引条目中最大的那个offset。自然offset为6的那个索引是我们要找的,通过索引文件我们知道offset为6的Message在数据文件中的位置为9807。
  3. 打开数据文件,从位置为9807的那个地方开始顺序扫描直到找到offset为7的那条Message。

这套机制是建立在offset是有序的。索引文件被映射到内存中,所以查找的速度还是很快的。

一句话,Kafka的Message存储采用了分区(partition),分段(LogSegment)和稀疏索引这几个手段来达到了高效性。


2015-12-29 18:27:46 iteye_10680 阅读数 87
  • 数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2452 人正在学习 去看看 肖滨

一、Kafka源代码的工程结构

如下图所示:

 

 

二、各模板简要说明

admin:管理员模块,操作和管理topic,paritions相关,包含create,delete topic,扩展patitions

Api:该模块主要负责交互数据的组装,客户端与服务端交互数据编解码

client:该模块比较简单就一个类,Producer读取kafka broker元数据信息topic和partitions,以及leader

cluster:该模块包含几个实体类,Broker,Cluster,Partition,Replica,解释他们之间关系:	
	  Cluster由多个broker组成,一个Broker包含多个partition,一个topic的所有
	  partitions分布在不同broker的中,一个Replica包含多个Partition。

common:通用模块,只包含异常类和错误验证

consumer:consumer处理模块,负责所有客户端消费者数据和逻辑处理

contoroller:负责中央控制器选举,partition的leader选举,副本分配,副本重新分配,partition和replica扩容。

javaapi:提供java的producer和consumer接口api

log:Kafka文件存储模块,负责读写所有kafka的topic消息数据。

message:封装多个消息组成一个“消息集”或压缩消息集。

metrics:内部状态的监控模块

network:网络事件处理模块,负责处理和接收客户端连接

producer:producer实现模块,包括同步和异步发送消息。

serializer:序列化或反序列化当前消息

kafka:kafka门面入口类,副本管理,topic配置管理,leader选举实现(由contoroller模块调用)。

tools:一看这就是工具模块,包含内容比较多:
			a.导出对应consumer的offset值.
			b.导出LogSegments信息,当前topic的log写的位置信息.
			c.导出zk上所有consumer的offset值.
			d.修改注册在zk的consumer的offset值.
			f.producer和consumer的使用例子.

utils:Json工具类,Zkutils工具类,Utils创建线程工具类,KafkaScheduler公共调度器类,公共日志类等等。

 

 

 

 

 

2018-09-07 21:21:02 Jantelope 阅读数 7500
  • 数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2452 人正在学习 去看看 肖滨

简单了解一下Kafka:是一种高吞吐量的分布式发布订阅消息系统。依赖Zookeeper,因此搭建Kafka的时候需要事先搭建好Zookeeper。体系结构如下:

当SparkStreaming与Kafka做集成的时候Kafka成了Streaming的高级数据源,由于Spark Streaming和Kafka集成的时候,依赖的jar包比较多,而且还会产生冲突。强烈建议使用Maven的方式来搭建项目工程

pom文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>ZDemo5</groupId>
    <artifactId>ZDemo5</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.1.0</spark.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>

</project>

启动kafka:bin/kafka-server-start.sh config/server.properties &       ---后台方式启动

创建topic:bin/kafka-topics.sh --create --zookeeper bigdata111:2181 -replication-factor 1 --partitions 3 --topic mydemo2

发布消息:bin/kafka-console-producer.sh --broker-list bigdata111:9092 --topic mydemo2

SparkStreaming读取Kafka数据源由两种模式: 

模式一:Receiver 的方式,Receivers的实现使用到Kafka高层次的消费者API。对于所有的Receivers,接收到的数据将会保存在Spark executors中,然后由Spark Streaming启动的Job来处理这些数据。

这种方法吞吐量不高

代码实现:

import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.storage.StorageLevel

object KafkaRecciver {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("hdfs://bigdata111:9000/checkpoint")
    //创建kafka对象   生产者 和消费者 
    //模式1 采取的是 receiver 方式  reciver 每次只能读取一条记录
    val topic = Map("mydemo2" -> 1)
    //直接读取的方式  由于kafka 是分布式消息系统需要依赖Zookeeper
    val data = KafkaUtils.createStream(ssc, "192.168.128.111:2181", "mygroup", topic, StorageLevel.MEMORY_AND_DISK)
    //数据累计计算
    val updateFunc =(curVal:Seq[Int],preVal:Option[Int])=>{
    //进行数据统计当前值加上之前的值
    var total = curVal.sum
    //最初的值应该是0
    var previous = preVal.getOrElse(0)
    //Some 代表最终的返回值
    Some(total+previous)
  }
   val result = data.map(_._2).flatMap(_.split(" ")).map(word=>(word,1)).updateStateByKey(updateFunc).print()
   //启动ssc
   ssc.start()
   ssc.awaitTermination()
    
  }
}

模式二:与基于Receiver接收数据不一样,这种方式定期地从Kafka的topic+partition中查询最新的偏移量,再根据定义的偏移量范围在每个batch里面处理数据。当作业需要处理的数据来临时,spark通过调用Kafka的简单消费者API读取一定范围的数据。这种模式可以有效提高系统的吞吐量

代码实现:

import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder

object KafkaDirector {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    //构建conf ssc 对象
    val conf = new SparkConf().setAppName("Kafka_director").setMaster("local[2]")
    val ssc = new StreamingContext(conf,Seconds(3))
    //设置数据检查点进行累计统计单词
    ssc.checkpoint("hdfs://192.168.128.111:9000/checkpoint")
    //kafka 需要Zookeeper  需要消费者组
    val topics = Set("mydemo2")
    //                                     broker的原信息                                  ip地址以及端口号
    val kafkaPrams = Map[String,String]("metadata.broker.list" -> "192.168.128.111:9092")
    //                                          数据的输入了类型    数据的解码类型
    val data = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaPrams, topics)
    val updateFunc =(curVal:Seq[Int],preVal:Option[Int])=>{
      //进行数据统计当前值加上之前的值
      var total = curVal.sum
      //最初的值应该是0
      var previous = preVal.getOrElse(0)
      //Some 代表最终的但会值
      Some(total+previous)
    }
    //统计结果
    val result = data.map(_._2).flatMap(_.split(" ")).map(word=>(word,1)).updateStateByKey(updateFunc).print() 
    //启动程序
    ssc.start()
    ssc.awaitTermination()
    
  }
}

结果展示:

-------------------------------------------
Time: 1536325032000 ms
-------------------------------------------
(love,1)
(Beijing,1)
(I,1)

-------------------------------------------
Time: 1536325035000 ms
-------------------------------------------
(love,2)
(Beijing,1)
(I,2)
(Shanghai,1)

-------------------------------------------
Time: 1536325038000 ms
-------------------------------------------
(love,2)
(Beijing,1)
(I,2)
(Shanghai,1)

-------------------------------------------
Time: 1536325041000 ms
-------------------------------------------
(love,2)
(Beijing,1)
(I,2)
(Shanghai,1)

 

2018-06-12 00:09:56 rlnLo2pNEfx9c 阅读数 4202
  • 数据硬实战之kafka视频教程

    大数据硬实战之kafka视频培训教程:本课程将从实战出发,讲解kafka原理以及在大数据中的应用场景的,是目前市面上少有的kafka实战教程,包括Kafka介绍、性能测试、数据、消息。

    2452 人正在学习 去看看 肖滨

一,架构介绍

生产中由于历史原因web后端,mysql集群,kafka集群(或者其它消息队列)会存在一下三种结构。

1,数据先入mysql集群,再入kafka

数据入mysql集群是不可更改的,如何再高效的将数据写入kafka呢?

A),在表中存在自增ID的字段,然后根据ID,定期扫描表,然后将数据入kafka

B),有时间字段的,可以按照时间字段定期扫描入kafka集群。

C),直接解析binlog日志,然后解析后的数据写入kafka

640?wx_fmt=png

2web后端同时将数据写入kafkamysql集群

640?wx_fmt=png

3web后端将数据先入kafka,再入mysql集群

这个方式,有很多优点,比如可以用kafka解耦,然后将数据按照离线存储和计算,实时计算两个模块构建很好的大数据架构。抗高峰,便于扩展等等。

640?wx_fmt=png

二,实现步骤

1mysql安装准备

安装mysql估计看这篇文章的人都没什么问题,所以本文不具体讲解了。

A),假如你单机测试请配置好server_id

B),开启binlog,只需配置log-bin

[root@localhost ~]# cat /etc/my.cnf

[mysqld]

server_id=1

datadir=/var/lib/mysql

socket=/var/lib/mysql/mysql.sock

user=mysql

# Disabling symbolic-links is recommended to prevent assorted security risks

symbolic-links=0

log-bin=/var/lib/mysql/mysql-binlog

[mysqld_safe]

log-error=/var/log/mysqld.log

pid-file=/var/run/mysqld/mysqld.pid

 

创建测试库和表

create database school character set utf8 collate utf8_general_ci;

 

create table student(

name varchar(20) not null comment '姓名',

sid int(10) not null primary key comment '学员',

majora varchar(50) not null default '' comment '专业',

tel varchar(11) not null unique key comment '手机号',

birthday date not null comment '出生日期'

);

2binlog日志解析

两种方式:

一是扫面binlog文件(有需要的话请联系浪尖)

二是通过复制同步的方式

暂实现了第二种方式,样例代码如下:

MysqlBinlogParse mysqlBinlogParse = new MysqlBinlogParse(args[0],Integer.valueOf(args[1]),args[2],args[3]){
  @Override
  public void processDelete(String queryType, String database, String sql) {
    try {
      String jsonString = SqlParse.parseDeleteSql(sql);
      JSONObject jsonObject = JSONObject.fromObject(jsonString);
      jsonObject.accumulate("database", database);
      jsonObject.accumulate("queryType", queryType);
      System.out.println(sql);
      System.out.println(" ");
      System.out.println(" ");
      System.out.println(jsonObject.toString());
    } catch (Exception e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }

  @Override
  public void processInsert(String queryType, String database, String sql) {
    try {
      String jsonString = SqlParse.parseInsertSql(sql);
      JSONObject jsonObject = JSONObject.fromObject(jsonString);
      jsonObject.accumulate("database", database);
      jsonObject.accumulate("queryType", queryType);
      System.out.println(sql);
      System.out.println(" ");
      System.out.println(" ");
      System.out.println(jsonObject.toString());
    } catch (Exception e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }

  @Override
  public void processUpdate(String queryType, String database, String sql) {
    String jsonString;
    try {
      jsonString = SqlParse.parseUpdateSql(sql);
      JSONObject jsonObject = JSONObject.fromObject(jsonString);
      jsonObject.accumulate("database", database);
      jsonObject.accumulate("queryType", queryType);
      System.out.println(sql);
      System.out.println(" ");
      System.out.println(" ");
      System.out.println(jsonObject.toString());
    } catch (Exception e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }

};
mysqlBinlogParse.setServerId(3);
mysqlBinlogParse.start();

 

 

3sql语法解析

从原始的mysql binlog event中,我们能解析到的信息,主要的也就是mysqldatabasequery类型(INSERT,DELETE,UPDATE),具体执行的sql。我这里封装了三个重要的方法。只暴露了这三个接口,那么我们要明白的事情是,我们入kafka,然后流式处理的时候希望的到的是跟插入mysql后一样格式的数据。这个时候我们就要自己做sql的解析,将querysql解析成字段形式的数据,供流式处理。解析的格式如下:

A),INSERT

640?wx_fmt=png

B),DELETE

640?wx_fmt=png

C),UPDATE

640?wx_fmt=png

最终浪尖是将解析后的数据封装成了json,然后我们自己写kafka producer将消息发送到kafka,后端就可以处理了。

三,总结

最后,浪尖还是建议web后端数据最好先入消息队列,如kafka,然后分离线和实时将数据进行解耦分流,用于实时处理和离线处理。

 

消息队列的订阅者可以根据需要随时扩展,可以很好的扩展数据的使用者。

 

消息队列的横向扩展,增加吞吐量,做起来还是很简单的。这个用传统数据库,分库分表还是很麻烦的。

 

由于消息队列的存在,也可以帮助我们抗高峰,避免高峰时期后端处理压力过大导致整个业务处理宕机。

 

具体源码球友可以在知识星球获取。


欢迎大家进入知识星球,学习更多更深入的大数据知识,面试经验,获取更多更详细的资料。

640?wx_fmt=jpeg

没有更多推荐了,返回首页