flume kafka
-
flume kafka
2016-06-02 16:45:00 Reposted from: https://www.cnblogs.com/liyulong1982/p/5553605.html
-
Integrating Spark Streaming with Flume and Kafka
2019-12-08 15:32:15
Contents: Create the Kafka topic; Create the Flume agent; Create the LoggerGenerator; Spark Streaming program; Startup steps for the whole pipeline
This post records a simple example of integrating Spark Streaming with Flume and Kafka. First, a class produces log messages and sends them to Flume. Flume receives the data and publishes it to Kafka through a Kafka sink, so Flume acts as the Kafka producer. Finally, a Spark Streaming job acts as the Kafka consumer and processes the data.
Create the Kafka topic
First create a Kafka topic to publish the messages to. After starting ZooKeeper and Kafka, create the topic with the following command:
bin/kafka-topics.sh --create --zookeeper hadoop132:2181 --replication-factor 1 --partitions 1 --topic streamingtopic
Because my Kafka version is 0.11.0, the topic has to be created through ZooKeeper; on newer versions ZooKeeper is not needed. See the official Kafka documentation for details.
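For reference, on newer Kafka releases (2.2 and later) the same topic can be created directly against a broker; this is only a sketch that reuses the broker address from this post:
bin/kafka-topics.sh --create --bootstrap-server hadoop132:9092 --replication-factor 1 --partitions 1 --topic streamingtopic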
After creating it, you can list all topics in Kafka with the following command and check that streamingtopic is among them.
bin/kafka-topics.sh --list --zookeeper hadoop132:2181
You can then test with the bundled console producer and consumer to see whether data flows through successfully:
bin/kafka-console-producer.sh --broker-list hadoop132:9092 --topic streamingtopic
bin/kafka-console-consumer.sh --bootstrap-server hadoop132:9092 --topic streamingtopic --from-beginning
Note that all of the commands above are run from inside the Kafka installation directory.
Create the Flume agent
Using Flume mostly means writing a configuration file and putting it into effect. We create a Flume agent configuration that receives the generated log data and forwards it to the Kafka topic.
Go to the Flume installation directory and create a configuration file named streaming.conf:
cd $FLUME_HOME
vim streaming.conf
Then put the following configuration into the file:
agent1.sources = avro-source
agent1.channels = logger-channel
agent1.sinks = kafka-sink

# define source
agent1.sources.avro-source.type = avro
agent1.sources.avro-source.bind = 0.0.0.0
agent1.sources.avro-source.port = 41414

# define channels
agent1.channels.logger-channel.type = memory

# define sink
agent1.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.kafka.topic = streamingtopic
agent1.sinks.kafka-sink.kafka.bootstrap.servers = hadoop132:9092,hadoop133:9092,hadoop134:9092
agent1.sinks.kafka-sink.kafka.flumeBatchSize = 20
agent1.sinks.kafka-sink.kafka.producer.acks = 1

# merge: bind source and sink to the channel
agent1.sources.avro-source.channels = logger-channel
agent1.sinks.kafka-sink.channel = logger-channel
In fact, you do not have to configure the Kafka sink right away. The configuration above is the final one; at first you can use the following configuration to test whether the generated logs reach Flume at all.
agent1.sources = avro-source
agent1.channels = logger-channel
agent1.sinks = log-sink

#define source
agent1.sources.avro-source.type = avro
agent1.sources.avro-source.bind = 0.0.0.0
agent1.sources.avro-source.port = 41414

#define channels
agent1.channels.logger-channel.type = memory

#define sink
agent1.sinks.log-sink.type = logger

# merge
agent1.sources.avro-source.channels = logger-channel
agent1.sinks.log-sink.channel = logger-channel
Once the agent configuration is ready, start the agent:
bin/flume-ng agent \
  --conf conf \
  --conf-file streaming.conf \
  --name agent1 \
  -Dflume.root.logger=INFO,console
Create the LoggerGenerator
LoggerGenerator is a simple log generator: it uses log4j and an infinite loop to keep producing log output.
First add the Flume log4j appender dependency to the Maven project:
<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.6.0</version>
</dependency>
Then add the log4j configuration file log4j.properties under the resources directory:
log4j.rootLogger=INFO,stdout,flume

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = hadoop132
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
The flume appender settings above are what send the log output to Flume; replace the hostname with the host your Flume agent runs on. The LoggerGenerator code is shown below.
import org.apache.log4j.Logger;

/**
 * Simulates log generation.
 */
public class LoggerGenerator {

    private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());

    public static void main(String[] args) throws Exception {
        int index = 0;
        while (true) {
            Thread.sleep(1000);
            logger.info("value is: " + index++);
        }
    }
}
You can use the logger-sink test configuration described earlier to verify that the logs actually arrive in Flume.
Spark Streaming program
The Spark Streaming part is mainly about wiring up Kafka: the Spark Streaming program acts as a Kafka consumer, subscribing to the streamingtopic topic and processing the messages. The code is as follows:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop132:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("streamingtopic")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.map(_.value()).flatMap(_.split(" ")).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
Since my Kafka version is 0.10 or later, I use the direct-stream approach. For how to connect other Kafka versions to Spark Streaming, see the official documentation.
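As a reference, the direct-stream API used above is provided by the spark-streaming-kafka-0-10 integration artifact; the Scala and Spark versions below are assumptions, so match them to your own build:
<dependency>
    <!-- assumed Scala 2.11 / Spark 2.4.0; adjust to your environment -->
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.4.0</version>
</dependency>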
The code above is run directly in IDEA for testing. To package it you only need to change a couple of parameters, as described below; see the GitHub repository for details.
The program above only does a simple count and print; it is just a demo.
Startup steps for the whole pipeline
- Start the Flume agent agent1:
bin/flume-ng agent \
  --conf conf \
  --conf-file streaming.conf \
  --name agent1 \
  -Dflume.root.logger=INFO,console
- Start LoggerGenerator to produce logs
- Start the KafkaWordCount program to process the generated logs
Throughout this pipeline, the logs pass through Flume into Kafka and are then consumed by KafkaWordCount.
Replace the hostnames with your own hosts. You can also package the job and test it on a cluster. Add the following build section to pom.xml:
<build>
    <finalName>SparkStream</finalName>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>KafkaWordCount</mainClass>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Then change the following two places in the KafkaWordCount code so that the broker list and the topic are taken from the command-line arguments:
"bootstrap.servers" -> args(0)
val topics = Array(args(1))
Then open the Maven panel on the right side of IDEA and run package under Lifecycle to build the jar. Upload the jar to the cluster and follow the steps above to run it.
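A minimal sketch of submitting the assembled jar, assuming the finalName from the pom above, a YARN master, and the broker and topic used in this post (adjust master, paths, and hostnames to your cluster):
bin/spark-submit \
  --class KafkaWordCount \
  --master yarn \
  SparkStream-jar-with-dependencies.jar \
  hadoop132:9092 streamingtopic
# jar name assumes the assembly plugin's default "-jar-with-dependencies" suffix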
The full project code and configuration files are in my GitHub repository, along with instructions for launching the jar.
-
flume kafka spark streaming
2017-10-11 14:26:44
Install Flume. Version 1.6 may not support the Taildir source (a guess), so download 1.7/1.8. Download link:
http://www.apache.org/dyn/closer.lua/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
Or go find it on the official site yourself.
1. Set up Flume first
Extract the downloaded package: tar -zxvf **
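Purely as an illustration (the exact filename and target directory are assumptions based on the download link above and the FLUME_HOME shown below), that could look like:
tar -zxvf apache-flume-1.8.0-bin.tar.gz -C /usr/local/soft   # assumed filename and install path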
1) Configure conf
cp flume-env.sh.template flume-env.sh
Configure JAVA_HOME (you can also skip this if your environment variables are already fully set up).
I did not configure it here; the relevant part of /etc/profile is:
export JAVA_HOME=/usr/local/soft/jdk/jdk1.8.0_45
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export FLUME_HOME=/usr/local/soft/apache-flume-1.8.0-bin
export FLUME_CONF_DIR=$FLUME_HOME/conf
export FLUME_PATH=$FLUME_HOME/bin
export PATH=$SCALA_HOME/bin:$SPARK_HOME/bin:$HADOOP_HOME/bin:${JAVA_HOME}/bin:$FLUME_HOME/bin:$PATH
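If you would rather set it in flume-env.sh, a minimal sketch (reusing the JDK path from the profile above) is a single line:
export JAVA_HOME=/usr/local/soft/jdk/jdk1.8.0_45   # path taken from /etc/profile above; adjust to your JDK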
2) Key configuration
kafkanode.conf is configured as follows:
#Agent
flumeAgent.channels = c1
flumeAgent.sources = s1
flumeAgent.sinks = k1

#flumeAgent Taildir Source  # Note (1)
flumeAgent.sources.s1.type = TAILDIR
flumeAgent.sources.s1.positionFile = /opt/apps/log4j/taildir_position.json
flumeAgent.sources.s1.fileHeader = true
#flumeAgent.sources.s1.deletePolicy = immediate
#flumeAgent.sources.s1.batchSize = 1000
flumeAgent.sources.s1.channels = c1
flumeAgent.sources.s1.filegroups = f1 f2
flumeAgent.sources.s1.filegroups.f1 = /usr/logs/.*log.*
flumeAgent.sources.s1.filegroups.f2 = /logs/.*log.*
#flumeAgent.sources.s1.deserializer.maxLineLength = 1048576

#flumeAgent FileChannel  # Note (2)
flumeAgent.channels.c1.type = file
flumeAgent.channels.c1.checkpointDir = /var/flume/spool/checkpoint
flumeAgent.channels.c1.dataDirs = /var/flume/spool/data
flumeAgent.channels.c1.capacity = 200000000
flumeAgent.channels.c1.keep-alive = 30
flumeAgent.channels.c1.write-timeout = 30
flumeAgent.channels.c1.checkpoint-timeout = 600

# flumeAgent Sinks  # Note (3)
flumeAgent.sinks.k1.channel = c1
flumeAgent.sinks.k1.type = avro
# connect to CollectorMainAgent
flumeAgent.sinks.k1.hostname = data17.Hadoop
flumeAgent.sinks.k1.port = 44444
kafka.conf is configured as follows:
#flumeConsolidationAgent
flumeConsolidationAgent.channels = c1
flumeConsolidationAgent.sources = s1
flumeConsolidationAgent.sinks = k1

#flumeConsolidationAgent Avro Source  # Note (4)
flumeConsolidationAgent.sources.s1.type = avro
flumeConsolidationAgent.sources.s1.channels = c1
flumeConsolidationAgent.sources.s1.bind = data17.Hadoop
flumeConsolidationAgent.sources.s1.port = 44444

flumeConsolidationAgent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
flumeConsolidationAgent.sinks.k1.topic = myflume
flumeConsolidationAgent.sinks.k1.brokerList = data14.Hadoop:9092,data15.Hadoop:9092,data16.Hadoop:9092
flumeConsolidationAgent.sinks.k1.requiredAcks = 1
flumeConsolidationAgent.sinks.k1.batchSize = 20
flumeConsolidationAgent.sinks.k1.channel = c1

flumeConsolidationAgent.channels.c1.type = file
flumeConsolidationAgent.channels.c1.checkpointDir = /var/flume/spool/checkpoint
flumeConsolidationAgent.channels.c1.dataDirs = /var/flume/spool/data
flumeConsolidationAgent.channels.c1.capacity = 200000000
flumeConsolidationAgent.channels.c1.keep-alive = 30
flumeConsolidationAgent.channels.c1.write-timeout = 30
flumeConsolidationAgent.channels.c1.checkpoint-timeout = 600
kafka.conf runs on the main (collector) node. Start command:
bin/flume-ng agent --conf conf --conf-file conf/kafka.conf --name flumeConsolidationAgent -Dflume.root.logger=DEBUG,console
kafkanode.conf runs on the worker nodes (there can be several of them). Start command:
bin/flume-ng agent --conf conf --conf-file conf/kafkanode.conf --name flumeAgent -Dflume.root.logger=DEBUG,console
[All worker nodes use the same configuration]
2. Kafka configuration
Nothing much to say here, since there is no special configuration.
Use your existing Kafka; just create a new topic with the following command:
/usr/local/soft/kafka_2.11-0.9.0.1/bin/kafka-topics.sh --create --topic myflume --replication-factor 2 --partitions 5 --zookeeper data4.Hadoop:2181
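To check that events actually land in the topic, you can attach a console consumer; this is only a sketch that reuses the same Kafka 0.9 install path and ZooKeeper address as above:
/usr/local/soft/kafka_2.11-0.9.0.1/bin/kafka-console-consumer.sh --zookeeper data4.Hadoop:2181 --topic myflume --from-beginning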
3. Spark Streaming
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import kafka.serializer.StringDecoder
import scala.collection.immutable.HashMap
import org.apache.log4j.Level
import org.apache.log4j.Logger

object RealTimeMonitorStart extends Serializable {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("stocker").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))

    // Kafka configurations: consume topic "myflume" with 3 receiver threads
    val topicMap = "myflume".split(",").map((_, 3)).toMap
    print(topicMap)

    val kafkaStrings = KafkaUtils.createStream(
      ssc,
      "data4.Hadoop:2181,data5.Hadoop:2181,data6.Hadoop:2181",
      "myflumegroup",
      topicMap)

    // count word occurrences over a 60-second window, sliding every 5 seconds
    val urlClickLogPairsDStream = kafkaStrings.flatMap(_._2.split(" ")).map((_, 1))
    val urlClickCountDaysDStream = urlClickLogPairsDStream.reduceByKeyAndWindow(
      (v1: Int, v2: Int) => v1 + v2,
      Seconds(60),
      Seconds(5))

    urlClickCountDaysDStream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
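For reference, the receiver-based KafkaUtils.createStream used above comes from Spark's Kafka 0.8 integration; with Spark 2.x and Scala 2.11 the Maven coordinates would be roughly as follows (the versions are assumptions, so match them to your cluster):
<dependency>
    <!-- assumed Scala 2.11 / Spark 2.2.0; adjust to your environment -->
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.2.0</version>
</dependency>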
That is all for now; leave a comment if you have questions.
-
flume kafka mysql_Questions about flume+kafka+hdfs
2021-01-26 23:32:28
I wanted to build a flume+hdfs+kafka+storm+mysql system for real-time log analysis and storage, but the Flume log-collection part never works, and the Flume logs show no errors, so I do not know how to solve it. I am hoping someone can help. The cluster layout and configuration files are below:
There are 5 machines in total, node1~node5. node3~node5 are the log-collection agents and node1~node2 are the Flume collectors; the data is ultimately stored twice, one copy in Kafka and one copy in HDFS.
The agent configuration file is as follows:
#def
agent.sources = src_spooldir
agent.channels = file memory
agent.sinks = collector_avro1 collector_avro2
# sources
agent.sources.src_spooldir.type = spooldir
agent.sources.src_spooldir.channels = file memory
agent.sources.src_spooldir.spoolDir = /data/flume/spoolDir
agent.sources.src_spooldir.selector.type = multiplexing
agent.sources.src_spooldir.fileHeader = true
# channels
agent.channels.file.type = file
agent.channels.file.checkpointDir = /data/flume/checkpoint
agent.channels.file.dataDirs = /data/flume/data
agent.channels.memory.type = memory
agent.channels.memory.capacity = 10000
agent.channels.memory.transactionCapacity = 10000
agent.channels.memory.byteCapacityBufferPercentage = 20
agent.channels.memory.byteCapacity = 800000
# sinks
agent.sinks.collector_avro1.type = avro
agent.sinks.collector_avro1.channel = file
agent.sinks.collector_avro1.hostname = node1
agent.sinks.collector_avro1.port = 45456
agent.sinks.collector_avro2.type = avro
agent.sinks.collector_avro2.channel = memory
agent.sinks.collector_avro2.hostname = node2
agent.sinks.collector_avro2.port = 4545
The collector configuration file is as follows:
#def
agent.sources = src_avro
agent.channels = file memory
agent.sinks = hdfs kafka
# sources
agent.sources.src_avro.type = avro
agent.sources.src_avro.channels = file memory
agent.sources.src_avro.bind = node1
agent.sources.src_avro.port = 45456
agent.sources.src_avro.selector.type = replicating
# channels
agent.channels.file.type = file
agent.channels.file.checkpointDir = /data/flume/checkpoint
agent.channels.file.dataDirs = /data/flume/data
agent.channels.memory.type = memory
agent.channels.memory.capacity = 10000
agent.channels.memory.transactionCapacity = 10000
agent.channels.memory.byteCapacityBufferPercentage = 20
agent.channels.memory.byteCapacity = 800000
# sinks
agent.sinks.hdfs.type = hdfs
agent.sinks.hdfs.channel = file
agent.sinks.hdfs.hdfs.path = hdfs://node1/flume/events/%y-%m-%d/%H%M/%S
agent.sinks.hdfs.hdfs.filePrefix = log_%Y%m%d_%H
agent.sinks.hdfs.hdfs.fileSuffix = .txt
agent.sinks.hdfs.hdfs.useLocalTimeStamp = true
agent.sinks.hdfs.hdfs.writeFormat = Text
agent.sinks.hdfs.hdfs.rollCount = 0
agent.sinks.hdfs.hdfs.rollSize = 1024
agent.sinks.hdfs.hdfs.rollInterval = 0
agent.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka.channel = memory
agent.sinks.kafka.kafka.topic = test
agent.sinks.kafka.kafka.bootstrap.servers = node3:9092,node4:9092,node5:9092
agent.sinks.kafka.kafka.flumeBatchSize = 20
agent.sinks.kafka.kafka.producer.acks = 1
agent.sinks.kafka.kafka.producer.linger.ms = 1
agent.sinks.kafka.kafka.producer.compression.type = snappy
In the end, neither HDFS nor Kafka receives any data.
-
Testing Flume kafkaSource->kafkaSink on Windows 10
2020-01-09 18:05:35 1. Install Kafka. 1.1 Download the official package: http://kafka.apache.org/downloads (any version under Binary downloads will do; pick a suitable one). 1.2 Extract it to a path that contains no Chinese...
-
Flume kafkaChannel
2020-07-06 19:07:57 A Flume agent consists of a source, a channel, and a sink, but sometimes a source or a sink is not actually needed. For example, when using Flume to ship monitored log data to Kafka, you can do without a sink by defining the channel as a kafkachannel; the kafkachannel acts as...
-
flume kafka sink: a simple walkthrough of connecting Kafka to Flume
2020-10-26 21:33:39 1. The Flume configuration file kafka-sink.properties # Name the components on this agent # source: give it an alias # a properties file is a Java-style configuration file: the left side of = is the key and the right side is the value; every key starts with a1 (the Flume agent's name...
-
flume kafka sink throws the exception Bootstrap Servers must be specified
2019-08-07 21:09:00 When using the Flume Kafka sink, starting Flume throws the exception: Sink kafkasink has been removed due to an error during configuration org.apache.flume.conf.ConfigurationExceptio...
-
flume kafka errors
2019-10-30 15:25:57 1. A WARN-level error appears in the Flume log: Error while fetching metadata with correlation id {} default-flume-topic=LEADER_NOT_AVAILABLE. Solution: create the topic in Kafka first.
-
How to upgrade the Flume Kafka source plugin from 0.9 to 0.10.1
2018-01-09 17:11:00 We were using version 0.9 of Flume's Kafka plugin; once the channel queue filled up it caused heartbeat problems, which in turn made Flume stop receiving Kafka messages, so we decided to upgrade to 0.10. For Flume, the first step is to upgrade the Kafka 0.9 jars in Flume's lib directory to 0.10 (provided that...
-
flume kafka storm mysql_Building a flume+kafka+storm real-time stream processing platform
2021-02-06 19:04:10 Main software versions: Flume 1.7.0, Storm 1.1.0, Kafka 2.10-0.9.0.1, Zookeeper 3.4.10, redis 3.2.8. 3. Main software downloads. 4. Install the Java environment (omitted). Passwordless SSH login setup, mainly for the cluster: # create the account: useradd hadoop; groupadd hadoop; passwd hadoop #...
-
flume kafka storm
2019-10-09 20:02:41 bin/kafka-server-start.sh config/server.properties starts Kafka; bin/kafka-topics.sh --zookeeper node2:2181,node3:2181,node4:2181 --create --replication-factor 2 --partitions 3 --topic test creates the topic; bin/...
-
Using the Flume KafkaChannel
2021-03-03 11:46:23 Flume provides the Kafka Sink for writing data to Kafka, and it also provides the KafkaChannel, which uses Kafka as the channel's storage. Using a KafkaChannel both avoids the memory-overflow risk of a Memory Channel and offers decent performance. In addition, when we need to write data to Kafka,...
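As a reference, a minimal KafkaChannel definition might look like the sketch below; the agent name, channel name, broker address, and topic are assumptions rather than values from the excerpt:
a1.channels = c1
# hypothetical agent/channel names, broker address, and topic
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop132:9092
a1.channels.c1.kafka.topic = flume-channel
a1.channels.c1.parseAsFlumeEvent = false
-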
flume kafka sink
2019-03-09 16:03:59 # name the agent's components a1.sources = r1 ... # specify the Flume source (the path to watch) a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /opt/dir # size limit for each line read a1.sources.r1.deserializer.maxLine...
-
Fixing 100% CPU usage after starting the Flume KafkaSink
2016-06-21 13:09:23 Flume version: 1.6.0-cdh5.5.0. Problem description: a kafkasink is configured to send real-time data to Kafka. After Flume finishes starting, CPU usage spikes to 100% when no logs are being processed; when log data is being processed,...
-
A quick, simple Flume and Kafka installation
2020-09-09 14:59:14 Flume: download flume 1.8. 1: Extract the downloaded Flume package into /opt: cd /opt; tar -zxvf apache-flume-1.8.0-bin.tar.gz -C /opt; mv apache-flume-1.8.0-bin flume. 2: Edit the flume-env.sh configuration file under conf, mainly the JAVA_HOME variable...
-
Flume Kafka Channel fails to deliver data to Kafka
2019-09-13 23:53:05 Contents: the first problem, then immediately a second problem, and then a third... After starting, there was no matching topic in Kafka; checking the Flume startup log showed that version 1.5 has no TailDir source, so I upgraded Flume to 1.7, TailD...
-
flume kafka2hdfs demo
2019-08-29 11:47:41 A simple demo that consumes Kafka data into HDFS. The configuration file is named test0.conf; start it from the Flume conf directory with …/bin/flume-ng agent -n a1 -c ./conf -f ./test0.conf -Dflume.root.logger=INFO,console a1.sources = r1 a1.sinks = k1 a1....
-
Duplicate data with the Flume KafkaChannel
2020-04-23 16:51:19 Recently, in production, I used Flume's hdfssink to read data from a kafkachannel and write it into HDFS files, and ran into duplicated data collection, which kicked off an investigation of Flume's data transfer flow. Symptom: the problem was first noticed when HDFS contained many files of size...
-
Notes on installing Flume, Kafka, Sqoop, and Flink
2020-09-20 23:02:04 1. Flume listening to itself: bin/flume-ng agent --conf conf --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console 2. Start command for Flume listening on the cluster: ./bin/flume-ng agent --conf ./conf/ --conf-...
-
Notes on a fully distributed setup of zookeeper, hadoop, flume, kafka, and hbase
2021-01-16 21:16:17 datanode namenode kafka flume zookeeper nodemanager hadoop102 datanode resourcemanager kafka zookeeper nodemanager hadoop103 datanode secondarynamenode kafka zookeeper nodemana
-
flume kafka-sink high cpu
2015-08-04 14:54:15 When Flume sinks to Kafka,... 1. Analysis of high CPU in Flume's Kafka sink: 1) get the Flume process id: [root@datanode conf]$ top top - 10:17:58 up 14 days, 18:09, 2 users, load average: 1.37, 1.11, 0.65 Tasks: 436 total,