-
在Windows上搭建Kafka
2020-03-09 00:14:10Kafka是流式计算中重要的数据源,我分享一下在本机Windows上搭建Kafka的经验。 一、下载、安装Kafka 访问Kafka的主页: Apache Kafkakafka.apache.org 进入其下载页面,截图如下: 选择相应的版本,这里...Kafka是流式计算中重要的数据源,我分享一下在本机Windows上搭建Kafka的经验。
一、下载、安装Kafka
访问Kafka的主页:
进入其下载页面,截图如下:
选择相应的版本,这里选择 kafka_2.11-2.4.0.tgz,进入下面的页面:
选择清华的镜像站点进行下载。
下载到本地后,将文件解压到 D:\kafka_2.11-2.4.0,该文件夹包括了所有相关的运行文件及配置文件,其子文件夹bin\windows 下放的是在Windows系统启动zookeeper和kafka的可执行文件,子文件夹config下放的是zookeeper和kafka的配置文件。
二、启动kafka服务
我们需要先后启动zookeeper和kafka服务。
它们都需要进入 D:\kafka_2.11-2.4.0 目录,然后再启动相应的命令。
cd D:\kafka_2.11-2.4.0
启动zookeeper服务,运行命令:
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
启动kafka服务,运行命令:
bin\windows\kafka-server-start.bat config\server.properties
三、创建Topic,显示数据
Kafka中创建一个Topic,名称为iris
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic iris
创建成功后,可以使用如下命令,显示所有Topic的列表:
bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
显示结果为
iris
然后,我们通过Alink的 Kafka SinkStreamOp可以将iris数据集写入该Topic。这里不详细展开,有兴趣的读者可以参阅如下的文章。
Alink品数:Alink连接Kafka数据源(Python版本)zhuanlan.zhihu.com
Alink品数:Alink连接Kafka数据源(Java版本)zhuanlan.zhihu.com
使用如下命令,读取(消费)topic iris中的数据:
bin\windows\kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic iris --from-beginning
显示结果如下,略去了中间的大部分数据:
{"sepal_width":3.4,"petal_width":0.2,"sepal_length":4.8,"category":"Iris-setosa","petal_length":1.6} {"sepal_width":4.1,"petal_width":0.1,"sepal_length":5.2,"category":"Iris-setosa","petal_length":1.5} {"sepal_width":2.8,"petal_width":1.5,"sepal_length":6.5,"category":"Iris-versicolor","petal_length":4.6} {"sepal_width":3.0,"petal_width":1.8,"sepal_length":6.1,"category":"Iris-virginica","petal_length":4.9} {"sepal_width":2.9,"petal_width":1.8,"sepal_length":7.3,"category":"Iris-virginica","petal_length":6.3} ........... {"sepal_width":2.2,"petal_width":1.0,"sepal_length":6.0,"category":"Iris-versicolor","petal_length":4.0} {"sepal_width":2.4,"petal_width":1.0,"sepal_length":5.5,"category":"Iris-versicolor","petal_length":3.7} {"sepal_width":3.1,"petal_width":0.2,"sepal_length":4.6,"category":"Iris-setosa","petal_length":1.5} {"sepal_width":3.4,"petal_width":0.2,"sepal_length":4.8,"category":"Iris-setosa","petal_length":1.9} {"sepal_width":2.9,"petal_width":1.4,"sepal_length":6.1,"category":"Iris-versicolor","pe
-
Windows下搭建kafka环境
2018-07-18 10:32:47Windows下搭建kafka环境 文章转自:https://blog.csdn.net/u010054969/article/details/70241478 https://blog.csdn.net/qq_32485573/article/details/54562237?locationNum=5&fps=1 注意:请确保本地...Windows下搭建kafka环境
文章转自:https://blog.csdn.net/u010054969/article/details/70241478
https://blog.csdn.net/qq_32485573/article/details/54562237?locationNum=5&fps=1
注意:请确保本地Java环境变量配置成功
1.安装Zookeeper
Kafka的运行依赖于Zookeeper,所以在运行Kafka之前我们需要安装并运行Zookeeper
1.1 下载安装文件: http://mirror.bit.edu.cn/apache/zookeeper/
1.2 解压文件(本文解压到 D:\zookeeper-3.4.8)
1.3 打开D:\zookeeper-3.4.8\conf,把zoo_sample.cfg重命名成zoo.cfg
1.4 从文本编辑器里打开zoo.cfg
1.5 修改dataDir和dataLogDir保存路径
dataDir=D:\data\logs\zookeeper
dataLogDir=D:\data\logs\zookeeper
1.6 添加如下系统变量:ZOOKEEPER_HOME: D:\zookeeper-3.4.8
Path: 在现有的值后面添加;%ZOOKEEPER_HOME%\bin;
1.7 运行Zookeeper: 打开cmd然后执行zkserver
命令。如果打印以下信息则表示zookeeper已经安装成功并运行在2181端口。2.安装并运行Kafka
2.1 下载安装文件: http://kafka.apache.org/downloads.html
2.2 解压文件(本文解压到 D:\kafka_2.11-0.10.2.0)
2.3 打开D:\kafka_2.11-0.10.2.0\config\ server.properties
2.4 把 log.dirs的值改成 log.dirs=D:\data\logs\kafka
2.5 D:\kafka_2.11-0.10.2.0\bin文件夹下的.sh命令脚本是在shell下运行的,此文件夹下还有个 windows文件夹,里面是windows下运行的.bat命令脚本
2.6 在D:\kafka_2.11-0.10.2.0文件夹中”Shift+鼠标右键”点击空白处打开命令提示窗口
2.7 输入并执行一下命令以打开kafka:.\bin\windows\kafka-server-start.bat .\config\server.properties
显示的信息如下,则表示正常运行
3.创建topics
3.1在D:\kafka_2.11-0.10.2.0\bin\windows文件夹中”Shift+鼠标右键”点击空白处打开命令提示窗口kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
4.打开一个Producer
4.1在D:\kafka_2.11-0.10.2.0\bin\windows文件夹中”Shift+鼠标右键”点击空白处打开命令提示窗口kafka-console-producer.bat --broker-list localhost:9092 --topic test
5.打开一个Consumer
5.1在D:\kafka_2.11-0.10.2.0\bin\windows文件夹中”Shift+鼠标右键”点击空白处打开命令提示窗口kafka-console-consumer.bat --zookeeper localhost:2181 --topic test
注意:以上打开的窗口不要关闭
然后就可以在Producer控制台窗口输入消息了。在消息输入过后,很快Consumer窗口就会显示出Producer发送的消息:转另外一篇:
近期在搞kafka,在Windows环境搭建的过程中遇到一些问题,把具体的流程几下来防止后面忘了。
准备工作:
1.安装jdk环境
http://www.oracle.com/technetwork/java/javase/downloads/index.html
2.下载kafka的程序安装包:
http://kafka.apache.org/downloads
解压文件提取出里面的文件
在cmd命令行打开至根目录下,内容如下:
(1).bin目录下存放的是程序运行时使用的脚本文件,window平台是一个独立的文件夹里面存放着 .bat 文件,bin的目录下存放的是 Linux 平台使用的 .sh 的shell脚本,在window平台上用不到,嫌麻烦可以删了。
(2).config目录下存放的是一些程序运行的配置文件,在后期自定义使用kafka的时候需要修改里面的文件内容。
(3).libs目录是打包好的jar包,这个版本自带了zookeeper的jar包,所以在安装的过程中不需要再在本地安装zookeeper了。- 启动zookeeper:
在正常启动zoopkeeper之前需要修改zookeeper.properties的文件内容,将其data的输出目录指定一下,可自行创建一个文件夹如下:
然后启动:
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
正常启动后他的状态是这样的:
3. 启动kfaka服务:
在启动前,任然需要修改server.properties中log.dir的配置目录,
修改后,启动服务:
4. 创建一个主题:bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kjTest
可以使用如下命令查看创建的主题列表:
bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
5. 启动生产者:bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic kjTest
此时可以从控制台输入信息,待消费者启动后可接收到生产者发布的消息。
6. 启动消费者:bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic kjTest --from-beginning
此时便能看到发布出去的消息了
7. 在后期还可以启动多个集群服务,同时进行操作,修改server.properties中的内容:便可启动,与上相似。config/server-1.properties: broker.id=<new id> listeners=PLAINTEXT://:<new port> log.dir=<you log dir>
操作步骤:
1).分别启动需要的kafka服务
2). 创建主题,设置分区数目
可以使用下面的命令查看是否创建成功,和具体的分区
3). 分别启动生产者消费者
4). 可以查看下当前工作状态
查看得当前服务使用的是broker编号为2的服务,但我们将其down掉后
再次执行消息发送任务:
消息仍能发送成功,命令行中会提示一个服务已关闭。
就查看当前状态:
此时的leader换成了broker 0 - 启动zookeeper:
-
单机 搭建kafka集群 本地_kafka教程-安装部署kafka_2.13-2.6.0(windows单机)
2021-01-13 05:20:15下载本文采用 kafka_2.13-2.6.0 版本,2.13 是scala版本号,2.6.0 是kafka版本号zookeeperkafka依赖zookeeperkafka内置了zookeeper,在 kafka_2.13-2.6.0\libs 目录下有zookeeper的jar包。但一般不用内置的kafka_...下载
本文采用 kafka_2.13-2.6.0 版本,2.13 是scala版本号,2.6.0 是kafka版本号
zookeeper
kafka依赖zookeeper
kafka内置了zookeeper,在 kafka_2.13-2.6.0\libs 目录下有zookeeper的jar包。
但一般不用内置的
kafka_2.13-2.6.0 版本 需要zookeeper 3.5.8 版本
查看方式:
安装、启动
安装kafka
解压缩即可
配置kafka
kafka配置文件较多,在 kafka_2.13-2.6.0目录\config 目录下
server.properties:kafka服务配置文件
consumer.properties:消费者配置文件,用于命令操作,测试
producer.properties:生产者配置文件,用于命令操作,测试
zookeeper.properties:内置zookeeper配置文件,一般不用内置的
修改server.properties
kafka服务配置文件
修改broker id# kafka集群中broker的id,必须是整数,且broker之间唯一
broker.id=0
配置数据文件夹
在 kafka_2.13-2.6.0目录目录下创建 logs 文件夹
修改配置如下:
# kafka数据文件
log.dirs=D:\\devtools\\kafka_2.13-2.6.0\\logs
这里名字是 log.dir 其实不是日志,是数据文件
配置zookeeper
将zookeeper服务器ip、端口号配置在下面
zookeeper.connect=localhost:2181
配置kafka端口号
默认是9092
#listeners=PLAINTEXT://:9092
其他配置#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
启动程序
windows启动程序在 kafka_2.13-2.6.0\bin\windows 目录下
kafka-server-start.bat:启动kafka服务
kafka-server-stop.bat:关闭kafka服务
kafka-topics.bat:操作topic
kafka-console-consumer.bat:控制台操作消费者
kafka-console-producer.bat:控制台操作生产者
启动kafka
需要先启动 zookeeper
在 kafka_2.13-2.6.0 目录下,按下图操作:
打开 Powershell 或 cmd ,执行下面命令:
.\bin\windows\kafka-server-start.bat .\config\server.properties
看到下面界面表示启动成功:
关闭kafka.\bin\windows\kafka-server-stop.bat .\config\server.properties
查看zookeeper
启动zookeeper客户端,查看节点:
ls /
结果如下:
-
Windows系统搭建Kafka C++ 客户端实现消息avro序列化发送
2019-06-25 21:20:49C++搭建kafka客户端用了我半个月的时间,avro序列化用了半个月的时间。相信好多人也是苦于没有系统性的参考耽误了好多工夫,我写这篇文档希望能给别人一些辅助。 开发准备工作: 本地装了一套kafka的环境: 序...- 需求:
在原在水力发电厂在线监测VC++6.0软件中实现kafka C++客户端,将原始波形信号及计算指标点通过消息avro序列化发送给大数据平台侧kafka集群。
C++搭建kafka客户端用了我半个月的时间,avro序列化用了半个月的时间。相信好多人也是苦于没有系统性的参考耽误了好多工夫,我写这篇文档希望能给别人一些辅助。
- 开发准备工作:
本地装了一套kafka的环境:
序号 名称 备注 下载链接 1 JDK Java开发环境 https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html 2
Zookeeper
分布式应用程序协调服务
http://mirror.bit.edu.cn/apache/zookeeper/
3
Kafka
Kafka开发环境
http://kafka.apache.org/downloads.html
- 操作流程:
- 下载或拷贝JDK安装包:根据计算机系统类型下载对应位数的JDK版本,本人使用的是jdk1.8.0_73版本。安装程序逐步执行,没有特殊要求;
- 系统环境变量:右键计算机->属性,在左边点击“高级系统设置”,点击 高级->环境变量,这是点击下面的系统环境变量-> 新建(系统环境变量)。变量名设置为:JAVA_HOME。变量值设置为:JDK的安装路径(注:bin的上一级路径);
- 设置PATH:还是在系统环境变量设置界面,找到系统变量名为Path的变量,双击修改。在变量值尾部添加括号内的内容[;C:\Program Files (x86)\Java\jdk1.8.0_73\bin](注:若添加前变量值尾部有‘;’则输入内容无需填写‘;’。即保证JDK环境与之前环境使用;分割开);
- 验证:进入windows终端[窗口+R],输入命令:java -version。若出现类似下列JDK版本信息则证明安装JDK成功;
- zookeeper部署
- kafka运行依赖于zookeeper,因此在安装kafka前需将zookeeper安装完成。运行时同样先运行zookeeper再运行kafka;
- 解压zookeeper压缩包:将zookeeper解包后放在[D:\]路径下;
- 重命名zoo配置文件:将zookeeper\ conf\zoo_sample.cfg重命名为zoo.cfg;
- 修改zoo.cfg配置文件:将原文件中dataDir及dataLogDir所在行修改为:
dataDir=D:\data\logs\zookeeper
dataLogDir=D:\data\logs\zookeeper (若原配置文件中无此行则将此行写在dataDir下面)- 系统环境变量:右键计算机->属性,在左边点击“高级系统设置”,点击 高级->环境变量,这是点击下面的系统环境变量-> 新建(系统环境变量)。变量名设置为:ZOOKEEPER_HOME。变量值设置为:D:\zookeeper-3.4.14;
- 设置PATH:还是在系统环境变量设置界面,找到系统变量名为Path的变量,双击修改。在变量值尾部添加括号内的内容[;%ZOOKEEPER_HOME%\bin;](注:若添加前变量值尾部有‘;’则输入内容无需填写‘;’。即保证zookeeper环境与之前环境使用;分割开);
- 运行zookeeper:进入windows终端[窗口+R],输入命令:zkserver运行zookeeper服务程序。若出现类似下列描述信息则证明zookeeper大致安装成功;
- kafka部署
- 解压kafka压缩包:将kafka解包后放在[D:\]路径下;
修改kafka服务端配置文件:打开D:\ kafka_2.12-2.2.0\config\ server.properties。log.dirs的值改成 log.dirs=D:\data\logs\kafka
- 验证测试:
- 1、kafka客户端测试:
- 1.1、运行zookeeper服务器:[窗口+R]打开终端,输入命令:zkserver 回车;
- 1.2、执行kafka:在D:\kafka_2.12-2.2.0文件夹上[shift+右键]打开命令行执行kafka,输入命令:.\bin\windows\kafka-server-start.bat .\config\server.properties;
- 1.3、创建名为test的topic:在D:\ kafka_2.12-2.2.0\bin\windows文件夹上[shift+右键]打开命令行,输入命令:kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitons 1 --topic test;
- 1.4、打开生产者Producer,向名为test的topic推送消息:在D:\kafka_2.12-2.2.0\bin\windows文件夹上[shift+右键]打开命令行,输入:kafka-console-producer.bat --broker-list localhost:9092 --topic test;
- 1.5、打开消费者Consumer,从名为test的topic读取消息:kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning;
- 1.6、发送数据:在Producer窗口输入框输入字符及数字回车,理论上会在Consumer界面弹出所发送的内容,大致如下;
- 2、测试程序验证
使用testkafka.cpp,加载librdkafka动态链接库。代码大致如下:
#include "pch.h" #include <stdio.h> #include <signal.h> #include <string.h> #include <inttypes.h> #include "..\..\rdkafka\rdkafka.h" #pragma comment(lib, "librdkafka.lib") static int run = 1; static void stop(int sig) { run = 0; fclose(stdin); /* abort fgets() */ } static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) { if (rkmessage->err) fprintf(stderr, "%% Message delivery failed: %s\n", rd_kafka_err2str(rkmessage->err)); else fprintf(stderr, "%% Message delivered (%zd bytes, " "partition %d)\n", rkmessage->len, rkmessage->partition); } int main(int argc, char **argv) { rd_kafka_t *rk; /* Producer instance handle */ rd_kafka_topic_t *rkt; /* Topic object */ rd_kafka_conf_t *conf; /* Temporary configuration object */ char errstr[512]; /* librdkafka API error reporting buffer */ char buf[512]; /* Message value temporary buffer */ const char *brokers; /* Argument: broker list */ const char *topic; /* Argument: topic to produce to */ if (argc != 3) { fprintf(stderr, "%% Usage: %s <broker> <topic>\n", argv[0]); return 1; } brokers = argv[1]; topic = argv[2]; conf = rd_kafka_conf_new(); if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%s\n", errstr); return 1; } rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb); rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); if (!rk) { fprintf(stderr, "%% Failed to create new producer: %s\n", errstr); return 1; } rkt = rd_kafka_topic_new(rk, topic, NULL); if (!rkt) { fprintf(stderr, "%% Failed to create topic object: %s\n", rd_kafka_err2str(rd_kafka_last_error())); rd_kafka_destroy(rk); return 1; } signal(SIGINT, stop); fprintf(stderr, "%% Type some text and hit enter to produce message\n" "%% Or just hit enter to only serve delivery reports\n" "%% Press Ctrl-C or Ctrl-D to exit\n"); while (run && fgets(buf, sizeof(buf), stdin)) { size_t len = strlen(buf); if (buf[len - 1] == '\n') /* Remove newline */ buf[--len] = '\0'; if (len == 0) { /* Empty line: only serve delivery reports */ rd_kafka_poll(rk, 0/*non-blocking */); continue; } retry: if (rd_kafka_produce( rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, buf, len, NULL, 0, NULL) == -1) { fprintf(stderr, "%% Failed to produce to topic %s: %s\n", rd_kafka_topic_name(rkt), rd_kafka_err2str(rd_kafka_last_error())); if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) { rd_kafka_poll(rk, 1000/*block for max 1000ms*/); goto retry; } } else { fprintf(stderr, "%% Enqueued message (%zd bytes) " "for topic %s\n", len, rd_kafka_topic_name(rkt)); } rd_kafka_poll(rk, 0/*non-blocking*/); } fprintf(stderr, "%% Flushing final messages..\n"); rd_kafka_flush(rk, 10 * 1000 /* wait for max 10 seconds */); rd_kafka_topic_destroy(rkt); rd_kafka_destroy(rk); return 0; }
- 操作测试验证1,步骤1/2/3/5;
- 运行testkafka程序,输入命令 localhost test运行结果如下,证明消息发送成功。
测试的时候可能出现的问题:
- broker may not available:这是因为kafka-server-start.bat没执行成功。
- count not reserve enouth space 1048576 object heap:
找到JDK路径下的bin下运行jvisualvm.exe。 查看JDK堆栈最大空间。例本机256M
kafka路径下\bin\windows\kafka-server-start.bat 中修改 64位系统堆栈空间修改为:KAFKA_HEAP_OPTS=-Xmx256M -Xms128M;
重新执行kafka-server-start.bat;
至此kafka环境搭建好了。下一步搭建avro C++环境。avro的编译环境比较费劲,编译了一周多最后还是我们老大任总搞定的。
装avro C++可以参考官方手册:https://avro.apache.org/docs/current/api/cpp/html/index.html
先装CMAKE,我用的编译器是CMAKE版本是3.14.4;
装BOOST库,我用的是BOOST版本是1.70.0,参数中选用VS2017对应版本的编译器,静态链接,用boost库编译出来几个lib,分别为:boost_filesystem.lib、boost_iostreams.lib、boost_program_options.lib、boost_system.lib。正常编译完boost以后库的名称不是这个,自己根据自己要用debug版还是release版还是,然后库用的静态链接还是动态链接。
用cmake打开avro C++的原代码生成一个VS2017的工程文件,然后用VS2017编译生成avro-cpp.dll。(中途有些麻烦,有些地方是我们老大帮我弄的,主要是boost库的配置和zlib的库)
我这个项目的需求是,大数据平台有一套avro 的消息格式,这个格式叫做schema,是一个形式为JSON字符串的一个avro格式。
需要将数据序列化成schema通过kafka客户端发送给大数据平台。在avro-cpp.sln的工程里有一个工程叫做avrogencpp的工程,把这个工程编译生成一个exe,然后子命令行输入命令:avrogencpp -i XXX.JSON -o XXX.h -n c
会生成一个头文件,需要把数据存进这个头文件中包含的类中然后把数据序列化发送。
但是C++版本的avro与java的有点区别,java序列化之后数据前面会有一个头部信息,头部信息包含三组信息:
"O"“b”“j”“0x01”四个字节;
JSON的schema;
同步位;
avro C++序列化时虽然没有这个头部信息,但是c++版本序列化时存成序列化文件时会包含头部信息,可以用现成的api。这样就能实现C++版本客户端发送avro序列化的全部功能了。
代码奉上:.h
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef GMDATA_H_487324861__H_ #define GMDATA_H_487324861__H_ #include <sstream> #include "boost/any.hpp" #include "avro/Specific.hh" #include "avro/Encoder.hh" #include "avro/Decoder.hh" namespace c { struct GMData_json_Union__0__ { private: size_t idx_; boost::any value_; public: size_t idx() const { return idx_; } float get_float() const; void set_float(const float& v); std::string get_string() const; void set_string(const std::string& v); GMData_json_Union__0__(); }; struct RtValue { typedef GMData_json_Union__0__ value_t; int64_t time; value_t value; RtValue() : time(int64_t()), value(value_t()) { } }; struct RtData { std::string code; std::vector<RtValue > data; RtData() : code(std::string()), data(std::vector<RtValue >()) { } }; inline float GMData_json_Union__0__::get_float() const { if (idx_ != 0) { throw avro::Exception("Invalid type for union"); } return boost::any_cast<float >(value_); } inline void GMData_json_Union__0__::set_float(const float& v) { idx_ = 0; value_ = v; } inline std::string GMData_json_Union__0__::get_string() const { if (idx_ != 1) { throw avro::Exception("Invalid type for union"); } return boost::any_cast<std::string >(value_); } inline void GMData_json_Union__0__::set_string(const std::string& v) { idx_ = 1; value_ = v; } inline GMData_json_Union__0__::GMData_json_Union__0__() : idx_(0), value_(float()) { } } namespace avro { template<> struct codec_traits<c::GMData_json_Union__0__> { static void encode(Encoder& e, c::GMData_json_Union__0__ v) { e.encodeUnionIndex(v.idx()); switch (v.idx()) { case 0: avro::encode(e, v.get_float()); break; case 1: avro::encode(e, v.get_string()); break; } } static void decode(Decoder& d, c::GMData_json_Union__0__& v) { size_t n = d.decodeUnionIndex(); if (n >= 2) { throw avro::Exception("Union index too big"); } switch (n) { case 0: { float vv; avro::decode(d, vv); v.set_float(vv); } break; case 1: { std::string vv; avro::decode(d, vv); v.set_string(vv); } break; } } }; template<> struct codec_traits<c::RtValue> { static void encode(Encoder& e, const c::RtValue& v) { avro::encode(e, v.time); avro::encode(e, v.value); } static void decode(Decoder& d, c::RtValue& v) { if (avro::ResolvingDecoder *rd = dynamic_cast<avro::ResolvingDecoder *>(&d)) { const std::vector<size_t> fo = rd->fieldOrder(); for (std::vector<size_t>::const_iterator it = fo.begin(); it != fo.end(); ++it) { switch (*it) { case 0: avro::decode(d, v.time); break; case 1: avro::decode(d, v.value); break; default: break; } } } else { avro::decode(d, v.time); avro::decode(d, v.value); } } }; template<> struct codec_traits<c::RtData> { static void encode(Encoder& e, const c::RtData& v) { avro::encode(e, v.code); avro::encode(e, v.data); } static void decode(Decoder& d, c::RtData& v) { if (avro::ResolvingDecoder *rd = dynamic_cast<avro::ResolvingDecoder *>(&d)) { const std::vector<size_t> fo = rd->fieldOrder(); for (std::vector<size_t>::const_iterator it = fo.begin(); it != fo.end(); ++it) { switch (*it) { case 0: avro::decode(d, v.code); break; case 1: avro::decode(d, v.data); break; default: break; } } } else { avro::decode(d, v.code); avro::decode(d, v.data); } } }; } #endif
.cpp
int GKafkaPushFeild(const char *BrokersName, const char *TopicName, unsigned char *Data, int Size, int FeildNum, char *Return) { if ((BrokersName == NULL) || (TopicName == NULL) || (Data == NULL)) { return -1; } int l_iRetCode = 0; int l_iIndex = 0; std::vector<uint8_t> l_sSchema; c::RtValue l_value; std::vector<c::RtData > l_RtData; __int64 l_lTime = 0; float l_fTemp = 0.0F; int l_iFeildNum = FeildNum; unsigned char *l_cpTempData = Data; unsigned char *l_cStartData = Data; char l_cTempCode[CODENUM + 1] = { 0 }; memcpy(&l_lTime, l_cpTempData, 8); l_cpTempData += 8; char filename[1024] = "tempData.avro"; remove(filename); avro::ValidSchema writerSchema(makeValidSchema(sch)); typedef std::pair<avro::ValidSchema, c::RtData > Pair; avro::DataFileWriter<c::RtData> df(filename, writerSchema,100); for (int i = 0; i < l_iFeildNum; i++) { c::RtData l_cRtData; c::RtValue l_cRtValue; memcpy(l_cTempCode, l_cpTempData, CODENUM); l_cTempCode[CODENUM - 1] = '\0'; l_cpTempData += CODENUM; memcpy(&l_fTemp, l_cpTempData, DATANUM); l_cpTempData += DATANUM; l_cRtData.code = l_cTempCode; //memcpy(l_RtData[l_iIndex].code.c_str,l_cTempCode,CODENUM); //l_RtData[l_iIndex].code = l_cTempCode; l_cRtValue.time = l_lTime; l_cRtValue.value.set_float(l_fTemp); l_cRtData.data.push_back(l_cRtValue); l_RtData.push_back(l_cRtData); //Pair l_pair(writerSchema, l_cRtData); df.write(l_cRtData); } df.close(); FILE *l_fp = NULL; char l_char = 'a'; int l_iReadNum = 0; std::string strContent; errno_t l_err = fopen_s(&l_fp, filename, "rb"); if (l_err < 0) { return -1; } else { fseek(l_fp, 0, SEEK_END); l_iReadNum = ftell(l_fp); if (0 == l_iReadNum) { l_iReadNum = 0; strContent = ""; fclose(l_fp); return 1; } strContent.resize(l_iReadNum); fseek(l_fp, 0, SEEK_SET); fread((char*)&strContent[0], l_iReadNum, 1, l_fp); } fclose(l_fp); remove(filename); return 0; }
从工程里面粘出来的,需要自己手动改一下生成一个动态链接库给VC6.0调用。希望对你有帮助!
-
搭建 Kafka-0.10.2 源码阅读环境及 Windows 本地运行
2020-01-31 09:39:52搭建 Kafka-0.10.2 源码阅读环境及 Windows 本地运行一、版本信息二、构建Kafka源码环境三、配置Kafka源码环境构建 bin 包 一、版本信息 Kafka:0.10.2、Scala:2.10.6、Java:1.8.0_221、IntelliJ IDEA:2019.2、... -
Windows系统上搭建kafka环境
2021-01-20 13:12:04首先,本地Java环境变量配置成功 1.安装Zookeeper Kafka的运行依赖于Zookeeper,所以在运行Kafka之前我们需要安装并运行Zookeeper 1.1 下载安装文件: http://mirror.bit.edu.cn/apache/zookeeper/ 1.2 解压文件... -
搭建kafka本地运行环境 - Windows
2020-05-17 16:39:58由于kafka依赖java,Zookeeper,故在之前需要安装java,Zookeeper 安装java:略 安装Zookeeper: 下载地址:https://zookeeper.apache.org/releases.html,并解压到相应文件夹下面 安装Kafka: 下载地址:... -
windows下kafka本地搭建工具包(全).7z
2020-07-28 16:20:10windows 系统下搭建kafka超详细教程 + 搭建所需要的软件包,包含: jdk-13_windows-x64_bin apache-zookeeper-3.6.1-bin kafka_2.12-2.5.0 -
Spark Streaming 实战(1)搭建kafka+zookeeper+spark streaming 的windows本地开发环境
2017-05-16 09:12:11搭建kafka+zookeeper+spark streaming 本地开发环境 暂无kafka,zookpeer集群开发环境,先搭建本地的在线实时计算测试环境1,安装配置zookeeper本地开发环境:下载zookeeper,下载地址 : ...\1.BeiJingSpark\zooke -
windows下IntelliJ IDEA搭建kafka源码环境
2017-12-05 20:45:00下面就来演示如何在windows环境下来编译kafka源码,并通过IntelliJ IDEA开发工具搭建kafka的源码环境,以方便在本地通过debug调试来研究kafka的内部实现机制。 具体步骤: (1)安装jdk,版本为1.8.0_131,配置... -
windows 单机版 kafka 搭建
2018-12-09 23:08:20最近项目中使用了kafka作为消息中间件进行服务间数据通信;...所以就本地搭个单机版的 kafka自己玩玩... kafka 2.11的安装包就留个百度云地址了..... https://pan.baidu.com/s/1L_9bbieTSsjWAe_3JLzN3w 自... -
windows本机搭建使用kafka
2020-07-04 18:54:41本文将本地kafka+springboot服务搭建起来 首先下载kafka和zookeeper kafka_2.12-1.1.0 下载地址:http://mirror.bit.edu.cn/apache/kafka/1.1.0/kafka_2.12-1.1.0.tgz zookeeper-3.4.12 下载地址:... -
Kafka基本原理(一)windows环境下kafka伪分布式集群环境搭建
2019-04-17 11:26:10为了本地测试,又不得不在windows环境下,搭建kafka伪分布式集群,记录一下过程 注意:以下所有目录,都不要出现空格!!! 伪分布式集群节点个数配置:zookeeper 和 kafka 分别3个节点 1.前期准备 JDK 1.8 ... -
搭建kafka最新版本集群
2021-03-14 20:44:39搭建单节点以及如何CURD主题topic请看我上一篇文章:https://blog.csdn.net/qq_42390636/article/details/114802694 搭建集群 ...首先,为每个代理创建一个配置文件 (在Windows上使用copy 命令来代替 -
windows安装kafka_超详细,Windows系统搭建Flink官方练习环境
2020-11-30 10:29:38如何快速的投入到Flink的学习当中,很多人在...本文将利用Flink的官方练习环境,在本地Windows系统中快速的搭建Flink环境,并详细的记录整个搭建过程。Flink的环境搭建需要一定的时间,有多种方法可以在各种环境中... -
Windows平台kafka环境的搭建
2020-03-24 17:02:16注意:请确保本地Java环境变量配置成功 1.安装Zookeeper Kafka的运行依赖于Zookeeper,所以在运行Kafka之前我们需要安装并运行Zookeeper 1.1 下载安装文件: http://mirror.bit.edu.cn/apache/zookeeper/ 1.2 ... -
windows kafka 环境搭建踩坑记
2019-03-18 14:04:00Windows 10 JDK1.8.0_171 zookeeper-3.4.8/ kafka_2.11-0.10.0.1.tgz 点击链接进行下载 1. JDK安装和环境搭建 自行百度。 2. zookeeper 安装和运行 a. 点击上方链接直接下载,或者有其他链接,下载此...