精华内容
下载资源
问答
  • Windows搭建Kafka

    2020-03-09 00:14:10
    Kafka是流式计算中重要的数据源,我分享一下在本机Windows搭建Kafka的经验。 一、下载、安装Kafka 访问Kafka的主页: Apache Kafka​kafka.apache.org 进入其下载页面,截图如下: 选择相应的版本,这里...

    Kafka是流式计算中重要的数据源,我分享一下在本机Windows上搭建Kafka的经验。

    一、下载、安装Kafka

    访问Kafka的主页:

    Apache Kafka​kafka.apache.org图标

    进入其下载页面,截图如下:

    选择相应的版本,这里选择 kafka_2.11-2.4.0.tgz,进入下面的页面:

    选择清华的镜像站点进行下载。

    下载到本地后,将文件解压到 D:\kafka_2.11-2.4.0,该文件夹包括了所有相关的运行文件及配置文件,其子文件夹bin\windows 下放的是在Windows系统启动zookeeper和kafka的可执行文件,子文件夹config下放的是zookeeper和kafka的配置文件。

     

    二、启动kafka服务

    我们需要先后启动zookeeper和kafka服务。

    它们都需要进入 D:\kafka_2.11-2.4.0 目录,然后再启动相应的命令。

    cd D:\kafka_2.11-2.4.0

    启动zookeeper服务,运行命令:

    bin\windows\zookeeper-server-start.bat config\zookeeper.properties

    启动kafka服务,运行命令:

    bin\windows\kafka-server-start.bat config\server.properties

    三、创建Topic,显示数据

    Kafka中创建一个Topic,名称为iris

    bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic iris

    创建成功后,可以使用如下命令,显示所有Topic的列表:

    bin\windows\kafka-topics.bat --list --zookeeper localhost:2181 

    显示结果为

    iris

    然后,我们通过Alink的 Kafka SinkStreamOp可以将iris数据集写入该Topic。这里不详细展开,有兴趣的读者可以参阅如下的文章。

    Alink品数:Alink连接Kafka数据源(Python版本)​zhuanlan.zhihu.com图标Alink品数:Alink连接Kafka数据源(Java版本)​zhuanlan.zhihu.com图标

     

    使用如下命令,读取(消费)topic iris中的数据:

     bin\windows\kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic iris --from-beginning

    显示结果如下,略去了中间的大部分数据:

    {"sepal_width":3.4,"petal_width":0.2,"sepal_length":4.8,"category":"Iris-setosa","petal_length":1.6}
    {"sepal_width":4.1,"petal_width":0.1,"sepal_length":5.2,"category":"Iris-setosa","petal_length":1.5}
    {"sepal_width":2.8,"petal_width":1.5,"sepal_length":6.5,"category":"Iris-versicolor","petal_length":4.6}
    {"sepal_width":3.0,"petal_width":1.8,"sepal_length":6.1,"category":"Iris-virginica","petal_length":4.9}
    {"sepal_width":2.9,"petal_width":1.8,"sepal_length":7.3,"category":"Iris-virginica","petal_length":6.3}
    ...........
    {"sepal_width":2.2,"petal_width":1.0,"sepal_length":6.0,"category":"Iris-versicolor","petal_length":4.0}
    {"sepal_width":2.4,"petal_width":1.0,"sepal_length":5.5,"category":"Iris-versicolor","petal_length":3.7}
    {"sepal_width":3.1,"petal_width":0.2,"sepal_length":4.6,"category":"Iris-setosa","petal_length":1.5}
    {"sepal_width":3.4,"petal_width":0.2,"sepal_length":4.8,"category":"Iris-setosa","petal_length":1.9}
    {"sepal_width":2.9,"petal_width":1.4,"sepal_length":6.1,"category":"Iris-versicolor","pe
    展开全文
  • Windows搭建kafka环境

    2018-07-18 10:32:47
    Windows搭建kafka环境 文章转自:https://blog.csdn.net/u010054969/article/details/70241478 https://blog.csdn.net/qq_32485573/article/details/54562237?locationNum=5&fps=1 注意:请确保本地...

    Windows下搭建kafka环境

    文章转自:https://blog.csdn.net/u010054969/article/details/70241478

    https://blog.csdn.net/qq_32485573/article/details/54562237?locationNum=5&fps=1

    注意:请确保本地Java环境变量配置成功

    1.安装Zookeeper 
    Kafka的运行依赖于Zookeeper,所以在运行Kafka之前我们需要安装并运行Zookeeper 
    1.1 下载安装文件: http://mirror.bit.edu.cn/apache/zookeeper/ 
    1.2 解压文件(本文解压到 D:\zookeeper-3.4.8) 
    1.3 打开D:\zookeeper-3.4.8\conf,把zoo_sample.cfg重命名成zoo.cfg 
    1.4 从文本编辑器里打开zoo.cfg 
    1.5 修改dataDir和dataLogDir保存路径 
    dataDir=D:\data\logs\zookeeper 
    dataLogDir=D:\data\logs\zookeeper 
    1.6 添加如下系统变量:ZOOKEEPER_HOME: D:\zookeeper-3.4.8 
    Path: 在现有的值后面添加 ;%ZOOKEEPER_HOME%\bin; 
    1.7 运行Zookeeper: 打开cmd然后执行zkserver 命令。如果打印以下信息则表示zookeeper已经安装成功并运行在2181端口。 
    这里写图片描述

    2.安装并运行Kafka 
    2.1 下载安装文件: http://kafka.apache.org/downloads.html 
    2.2 解压文件(本文解压到 D:\kafka_2.11-0.10.2.0) 
    2.3 打开D:\kafka_2.11-0.10.2.0\config\ server.properties 
    2.4 把 log.dirs的值改成 log.dirs=D:\data\logs\kafka 
    2.5 D:\kafka_2.11-0.10.2.0\bin文件夹下的.sh命令脚本是在shell下运行的,此文件夹下还有个 windows文件夹,里面是windows下运行的.bat命令脚本 
    2.6 在D:\kafka_2.11-0.10.2.0文件夹中”Shift+鼠标右键”点击空白处打开命令提示窗口 
    2.7 输入并执行一下命令以打开kafka:

    .\bin\windows\kafka-server-start.bat .\config\server.properties

     

    显示的信息如下,则表示正常运行 
    这里写图片描述

    3.创建topics 
    3.1在D:\kafka_2.11-0.10.2.0\bin\windows文件夹中”Shift+鼠标右键”点击空白处打开命令提示窗口

    kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

     

    4.打开一个Producer 
    4.1在D:\kafka_2.11-0.10.2.0\bin\windows文件夹中”Shift+鼠标右键”点击空白处打开命令提示窗口

    kafka-console-producer.bat --broker-list localhost:9092 --topic test

     

    5.打开一个Consumer 
    5.1在D:\kafka_2.11-0.10.2.0\bin\windows文件夹中”Shift+鼠标右键”点击空白处打开命令提示窗口

    kafka-console-consumer.bat --zookeeper localhost:2181 --topic test

     

    注意:以上打开的窗口不要关闭 
    然后就可以在Producer控制台窗口输入消息了。在消息输入过后,很快Consumer窗口就会显示出Producer发送的消息: 
    这里写图片描述

     

     

    转另外一篇:

    近期在搞kafka,在Windows环境搭建的过程中遇到一些问题,把具体的流程几下来防止后面忘了。 
    准备工作: 
    1.安装jdk环境 
    http://www.oracle.com/technetwork/java/javase/downloads/index.html 
    2.下载kafka的程序安装包: 
    http://kafka.apache.org/downloads 
    解压文件提取出里面的文件 
    在cmd命令行打开至根目录下,内容如下: 
    这里写图片描述 
    (1).bin目录下存放的是程序运行时使用的脚本文件,window平台是一个独立的文件夹里面存放着 .bat 文件,bin的目录下存放的是 Linux 平台使用的 .sh 的shell脚本,在window平台上用不到,嫌麻烦可以删了。 
    (2).config目录下存放的是一些程序运行的配置文件,在后期自定义使用kafka的时候需要修改里面的文件内容。 
    (3).libs目录是打包好的jar包,这个版本自带了zookeeper的jar包,所以在安装的过程中不需要再在本地安装zookeeper了。

    1. 启动zookeeper: 
      在正常启动zoopkeeper之前需要修改zookeeper.properties的文件内容,将其data的输出目录指定一下,可自行创建一个文件夹如下: 
      这里写图片描述 
      然后启动:
    bin\windows\zookeeper-server-start.bat config\zookeeper.properties

     

    正常启动后他的状态是这样的: 
    这里写图片描述
    3. 启动kfaka服务: 
    在启动前,任然需要修改server.properties中log.dir的配置目录, 
    这里写图片描述 
    修改后,启动服务: 
    这里写图片描述 
    4. 创建一个主题:

    bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kjTest

     

    这里写图片描述

    可以使用如下命令查看创建的主题列表:

    bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

     

    这里写图片描述 
    5. 启动生产者:

    bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic kjTest

     

    这里写图片描述

    此时可以从控制台输入信息,待消费者启动后可接收到生产者发布的消息。 
    这里写图片描述
    6. 启动消费者:

    bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic kjTest --from-beginning

     

    此时便能看到发布出去的消息了 
    这里写图片描述
    7. 在后期还可以启动多个集群服务,同时进行操作,修改server.properties中的内容:便可启动,与上相似。

    config/server-1.properties:
            broker.id=<new id>
          listeners=PLAINTEXT://:<new port>
        log.dir=<you log dir>   

     

    操作步骤:
    

     

    1).分别启动需要的kafka服务

    这里写图片描述

    这里写图片描述 
    2). 创建主题,设置分区数目 
    这里写图片描述
    可以使用下面的命令查看是否创建成功,和具体的分区 
    这里写图片描述
    3). 分别启动生产者消费者 
    这里写图片描述

    这里写图片描述
    4). 可以查看下当前工作状态 
    这里写图片描述
    查看得当前服务使用的是broker编号为2的服务,但我们将其down掉后 
    这里写图片描述 
    再次执行消息发送任务: 
    producer

    consumer 
    消息仍能发送成功,命令行中会提示一个服务已关闭。 
    就查看当前状态: 
    这里写图片描述 
    此时的leader换成了broker 0

    展开全文
  • 下载本文采用 kafka_2.13-2.6.0 版本,2.13 是scala版本号,2.6.0 是kafka版本号zookeeperkafka依赖zookeeperkafka内置了zookeeper,在 kafka_2.13-2.6.0\libs 目录下有zookeeper的jar包。但一般不用内置的kafka_...

    下载

    本文采用 kafka_2.13-2.6.0 版本,2.13 是scala版本号,2.6.0 是kafka版本号

    zookeeper

    kafka依赖zookeeper

    kafka内置了zookeeper,在 kafka_2.13-2.6.0\libs 目录下有zookeeper的jar包。

    但一般不用内置的

    kafka_2.13-2.6.0 版本 需要zookeeper 3.5.8 版本

    查看方式:

    安装、启动

    安装kafka

    解压缩即可

    配置kafka

    kafka配置文件较多,在 kafka_2.13-2.6.0目录\config 目录下

    server.properties:kafka服务配置文件

    consumer.properties:消费者配置文件,用于命令操作,测试

    producer.properties:生产者配置文件,用于命令操作,测试

    zookeeper.properties:内置zookeeper配置文件,一般不用内置的

    修改server.properties

    kafka服务配置文件

    修改broker id# kafka集群中broker的id,必须是整数,且broker之间唯一

    broker.id=0

    配置数据文件夹

    在 kafka_2.13-2.6.0目录目录下创建 logs 文件夹

    修改配置如下:

    # kafka数据文件

    log.dirs=D:\\devtools\\kafka_2.13-2.6.0\\logs

    这里名字是 log.dir 其实不是日志,是数据文件

    配置zookeeper

    将zookeeper服务器ip、端口号配置在下面

    zookeeper.connect=localhost:2181

    配置kafka端口号

    默认是9092

    #listeners=PLAINTEXT://:9092

    其他配置#处理网络请求的线程数量

    num.network.threads=3

    #用来处理磁盘 IO 的现成数量

    num.io.threads=8

    #发送套接字的缓冲区大小

    socket.send.buffer.bytes=102400

    #接收套接字的缓冲区大小

    socket.receive.buffer.bytes=102400

    #请求套接字的缓冲区大小

    socket.request.max.bytes=104857600

    #topic 在当前 broker 上的分区个数

    num.partitions=1

    #用来恢复和清理 data 下数据的线程数量

    num.recovery.threads.per.data.dir=1

    #segment 文件保留的最长时间,超时将被删除

    log.retention.hours=168

    启动程序

    windows启动程序在 kafka_2.13-2.6.0\bin\windows 目录下

    kafka-server-start.bat:启动kafka服务

    kafka-server-stop.bat:关闭kafka服务

    kafka-topics.bat:操作topic

    kafka-console-consumer.bat:控制台操作消费者

    kafka-console-producer.bat:控制台操作生产者

    启动kafka

    需要先启动 zookeeper

    在 kafka_2.13-2.6.0 目录下,按下图操作:

    打开 Powershell 或 cmd ,执行下面命令:

    .\bin\windows\kafka-server-start.bat .\config\server.properties

    看到下面界面表示启动成功:

    关闭kafka.\bin\windows\kafka-server-stop.bat .\config\server.properties

    查看zookeeper

    启动zookeeper客户端,查看节点:

    ls /

    结果如下:

    展开全文
  • C++搭建kafka客户端用了我半个月的时间,avro序列化用了半个月的时间。相信好多人也是苦于没有系统性的参考耽误了好多工夫,我写这篇文档希望能给别人一些辅助。 开发准备工作: 本地装了一套kafka的环境: 序...
    • 需求:

    在原在水力发电厂在线监测VC++6.0软件中实现kafka C++客户端,将原始波形信号及计算指标点通过消息avro序列化发送给大数据平台侧kafka集群。

    C++搭建kafka客户端用了我半个月的时间,avro序列化用了半个月的时间。相信好多人也是苦于没有系统性的参考耽误了好多工夫,我写这篇文档希望能给别人一些辅助。


    • 开发准备工作:

           本地装了一套kafka的环境:

    序号 名称 备注 下载链接
    1 JDK Java开发环境 https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

    2

    Zookeeper

    分布式应用程序协调服务

    http://mirror.bit.edu.cn/apache/zookeeper/

    3

    Kafka

    Kafka开发环境

    http://kafka.apache.org/downloads.html


    • 操作流程:
    • 下载或拷贝JDK安装包:根据计算机系统类型下载对应位数的JDK版本,本人使用的是jdk1.8.0_73版本。安装程序逐步执行,没有特殊要求;
    • 系统环境变量:右键计算机->属性,在左边点击“高级系统设置”,点击 高级->环境变量,这是点击下面的系统环境变量->  新建(系统环境变量)。变量名设置为:JAVA_HOME。变量值设置为:JDK的安装路径(注:bin的上一级路径);

    • 设置PATH:还是在系统环境变量设置界面,找到系统变量名为Path的变量,双击修改。在变量值尾部添加括号内的内容[;C:\Program Files (x86)\Java\jdk1.8.0_73\bin](注:若添加前变量值尾部有‘;’则输入内容无需填写‘;’。即保证JDK环境与之前环境使用;分割开);
    • 验证:进入windows终端[窗口+R],输入命令:java -version。若出现类似下列JDK版本信息则证明安装JDK成功;

    • zookeeper部署
    • kafka运行依赖于zookeeper,因此在安装kafka前需将zookeeper安装完成。运行时同样先运行zookeeper再运行kafka;
    • 解压zookeeper压缩包:将zookeeper解包后放在[D:\]路径下;
    • 重命名zoo配置文件:将zookeeper\ conf\zoo_sample.cfg重命名为zoo.cfg;
    • 修改zoo.cfg配置文件:将原文件中dataDir及dataLogDir所在行修改为:

             dataDir=D:\data\logs\zookeeper 
             dataLogDir=D:\data\logs\zookeeper (若原配置文件中无此行则将此行写在dataDir下面)

    • 系统环境变量:右键计算机->属性,在左边点击“高级系统设置”,点击 高级->环境变量,这是点击下面的系统环境变量->  新建(系统环境变量)。变量名设置为:ZOOKEEPER_HOME。变量值设置为:D:\zookeeper-3.4.14;

    • 设置PATH:还是在系统环境变量设置界面,找到系统变量名为Path的变量,双击修改。在变量值尾部添加括号内的内容[;%ZOOKEEPER_HOME%\bin;](注:若添加前变量值尾部有‘;’则输入内容无需填写‘;’。即保证zookeeper环境与之前环境使用;分割开);
    • 运行zookeeper:进入windows终端[窗口+R],输入命令:zkserver运行zookeeper服务程序。若出现类似下列描述信息则证明zookeeper大致安装成功;

    • kafka部署
    • 解压kafka压缩包:将kafka解包后放在[D:\]路径下;

    修改kafka服务端配置文件:打开D:\ kafka_2.12-2.2.0\config\ server.properties。log.dirs的值改成 log.dirs=D:\data\logs\kafka


    • 验证测试:
    • 1、kafka客户端测试:
    • 1.1、运行zookeeper服务器:[窗口+R]打开终端,输入命令:zkserver 回车;
    • 1.2、执行kafka:在D:\kafka_2.12-2.2.0文件夹上[shift+右键]打开命令行执行kafka,输入命令:.\bin\windows\kafka-server-start.bat .\config\server.properties;
    • 1.3、创建名为test的topic:在D:\ kafka_2.12-2.2.0\bin\windows文件夹上[shift+右键]打开命令行,输入命令:kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitons 1 --topic test;
    • 1.4、打开生产者Producer,向名为test的topic推送消息:在D:\kafka_2.12-2.2.0\bin\windows文件夹上[shift+右键]打开命令行,输入:kafka-console-producer.bat --broker-list localhost:9092 --topic test;
    • 1.5、打开消费者Consumer,从名为test的topic读取消息:kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning;
    • 1.6、发送数据:在Producer窗口输入框输入字符及数字回车,理论上会在Consumer界面弹出所发送的内容,大致如下;

     

    • 2、测试程序验证

    使用testkafka.cpp,加载librdkafka动态链接库。代码大致如下:

    #include "pch.h"
    #include <stdio.h>
    #include <signal.h>
    #include <string.h>
    #include <inttypes.h>
    
    #include "..\..\rdkafka\rdkafka.h"
    
    #pragma comment(lib, "librdkafka.lib")
    
    static int run = 1;
    
    static void stop(int sig) 
    {
    	run = 0;
    	fclose(stdin); /* abort fgets() */
    }
    
    static void dr_msg_cb(rd_kafka_t *rk,
    					   const rd_kafka_message_t *rkmessage, void *opaque) {
    	if (rkmessage->err)
    		fprintf(stderr, "%% Message delivery failed: %s\n",
    				rd_kafka_err2str(rkmessage->err));
    	else
    		fprintf(stderr,
    				"%% Message delivered (%zd bytes, "
    				"partition %d)\n",
    				 rkmessage->len, rkmessage->partition);
    }
    
    
    
    int main(int argc, char **argv) {
    	rd_kafka_t *rk;         /* Producer instance handle */
    	rd_kafka_topic_t *rkt;  /* Topic object */
    	rd_kafka_conf_t *conf;  /* Temporary configuration object */
    	char errstr[512];       /* librdkafka API error reporting buffer */
    	char buf[512];          /* Message value temporary buffer */
    	const char *brokers;    /* Argument: broker list */
    	const char *topic;      /* Argument: topic to produce to */
    
    	if (argc != 3) {
    		fprintf(stderr, "%% Usage: %s <broker> <topic>\n", argv[0]);
    		return 1;
    	}
    
    	brokers = argv[1];
    	topic   = argv[2];
    
    	conf = rd_kafka_conf_new();
    
    	if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
    		errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    		fprintf(stderr, "%s\n", errstr);
    		return 1;
    	}
    
    	rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
    
    	rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    	if (!rk) {
    		fprintf(stderr,
    				"%% Failed to create new producer: %s\n", errstr);
    		return 1;
    	}
    
    	rkt = rd_kafka_topic_new(rk, topic, NULL);
    
    	if (!rkt) {
    		fprintf(stderr, "%% Failed to create topic object: %s\n",
    				rd_kafka_err2str(rd_kafka_last_error()));
    		rd_kafka_destroy(rk);
    		return 1;
    	}
    
    	signal(SIGINT, stop);
    
    	fprintf(stderr,
    			"%% Type some text and hit enter to produce message\n"
    			"%% Or just hit enter to only serve delivery reports\n"
    			"%% Press Ctrl-C or Ctrl-D to exit\n");
    
    	while (run && fgets(buf, sizeof(buf), stdin)) {
    		size_t len = strlen(buf);
    
    		if (buf[len - 1] == '\n') /* Remove newline */
    			buf[--len] = '\0';
    
    		if (len == 0) {
    			/* Empty line: only serve delivery reports */
    			rd_kafka_poll(rk, 0/*non-blocking */);
    			continue;
    		}
    
    	
    retry:
    		if (rd_kafka_produce(
    			rkt,
    			RD_KAFKA_PARTITION_UA,
    			RD_KAFKA_MSG_F_COPY,
    			buf, len,
    			NULL, 0,
    			NULL) == -1) {
    			fprintf(stderr,
    					"%% Failed to produce to topic %s: %s\n",
    					rd_kafka_topic_name(rkt),
    					rd_kafka_err2str(rd_kafka_last_error()));
    
    			
    			if (rd_kafka_last_error() ==
    				RD_KAFKA_RESP_ERR__QUEUE_FULL) {
    				rd_kafka_poll(rk, 1000/*block for max 1000ms*/);
    				goto retry;
    			}
    		}
    		else {
    			fprintf(stderr, "%% Enqueued message (%zd bytes) "
    					"for topic %s\n",
    					len, rd_kafka_topic_name(rkt));
    		}
    
    		rd_kafka_poll(rk, 0/*non-blocking*/);
    	}
    
    	fprintf(stderr, "%% Flushing final messages..\n");
    	rd_kafka_flush(rk, 10 * 1000 /* wait for max 10 seconds */);
    
    	rd_kafka_topic_destroy(rkt);
    
    	rd_kafka_destroy(rk);
    
    	return 0;
    }
    
    • 操作测试验证1,步骤1/2/3/5;
    • 运行testkafka程序,输入命令 localhost test运行结果如下,证明消息发送成功。


    测试的时候可能出现的问题:

    1. broker may not available:这是因为kafka-server-start.bat没执行成功。
    2. count not reserve enouth space 1048576 object heap:

    找到JDK路径下的bin下运行jvisualvm.exe。 查看JDK堆栈最大空间。例本机256M

    kafka路径下\bin\windows\kafka-server-start.bat 中修改 64位系统堆栈空间修改为:KAFKA_HEAP_OPTS=-Xmx256M -Xms128M;

    重新执行kafka-server-start.bat;


    至此kafka环境搭建好了。下一步搭建avro C++环境。avro的编译环境比较费劲,编译了一周多最后还是我们老大任总搞定的。


    装avro C++可以参考官方手册:https://avro.apache.org/docs/current/api/cpp/html/index.html

    先装CMAKE,我用的编译器是CMAKE版本是3.14.4;

    装BOOST库,我用的是BOOST版本是1.70.0,参数中选用VS2017对应版本的编译器,静态链接,用boost库编译出来几个lib,分别为:boost_filesystem.lib、boost_iostreams.lib、boost_program_options.lib、boost_system.lib。正常编译完boost以后库的名称不是这个,自己根据自己要用debug版还是release版还是,然后库用的静态链接还是动态链接。

    用cmake打开avro C++的原代码生成一个VS2017的工程文件,然后用VS2017编译生成avro-cpp.dll。(中途有些麻烦,有些地方是我们老大帮我弄的,主要是boost库的配置和zlib的库)
     


    我这个项目的需求是,大数据平台有一套avro 的消息格式,这个格式叫做schema,是一个形式为JSON字符串的一个avro格式。

    需要将数据序列化成schema通过kafka客户端发送给大数据平台。在avro-cpp.sln的工程里有一个工程叫做avrogencpp的工程,把这个工程编译生成一个exe,然后子命令行输入命令:avrogencpp -i XXX.JSON -o XXX.h -n c

    会生成一个头文件,需要把数据存进这个头文件中包含的类中然后把数据序列化发送。

    但是C++版本的avro与java的有点区别,java序列化之后数据前面会有一个头部信息,头部信息包含三组信息:

    "O"“b”“j”“0x01”四个字节;

    JSON的schema;

    同步位;

    avro C++序列化时虽然没有这个头部信息,但是c++版本序列化时存成序列化文件时会包含头部信息,可以用现成的api。这样就能实现C++版本客户端发送avro序列化的全部功能了。


    代码奉上:.h

    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    
    #ifndef GMDATA_H_487324861__H_
    #define GMDATA_H_487324861__H_
    
    
    #include <sstream>
    #include "boost/any.hpp"
    #include "avro/Specific.hh"
    #include "avro/Encoder.hh"
    #include "avro/Decoder.hh"
    
    namespace c {
    struct GMData_json_Union__0__ {
    private:
        size_t idx_;
        boost::any value_;
    public:
        size_t idx() const { return idx_; }
        float get_float() const;
        void set_float(const float& v);
        std::string get_string() const;
        void set_string(const std::string& v);
        GMData_json_Union__0__();
    };
    
    struct RtValue {
        typedef GMData_json_Union__0__ value_t;
        int64_t time;
        value_t value;
        RtValue() :
            time(int64_t()),
            value(value_t())
            { }
    };
    
    struct RtData {
        std::string code;
        std::vector<RtValue > data;
        RtData() :
            code(std::string()),
            data(std::vector<RtValue >())
            { }
    };
    
    inline
    float GMData_json_Union__0__::get_float() const {
        if (idx_ != 0) {
            throw avro::Exception("Invalid type for union");
        }
        return boost::any_cast<float >(value_);
    }
    
    inline
    void GMData_json_Union__0__::set_float(const float& v) {
        idx_ = 0;
        value_ = v;
    }
    
    inline
    std::string GMData_json_Union__0__::get_string() const {
        if (idx_ != 1) {
            throw avro::Exception("Invalid type for union");
        }
        return boost::any_cast<std::string >(value_);
    }
    
    inline
    void GMData_json_Union__0__::set_string(const std::string& v) {
        idx_ = 1;
        value_ = v;
    }
    
    inline GMData_json_Union__0__::GMData_json_Union__0__() : idx_(0), value_(float()) { }
    }
    namespace avro {
    template<> struct codec_traits<c::GMData_json_Union__0__> {
        static void encode(Encoder& e, c::GMData_json_Union__0__ v) {
            e.encodeUnionIndex(v.idx());
            switch (v.idx()) {
            case 0:
                avro::encode(e, v.get_float());
                break;
            case 1:
                avro::encode(e, v.get_string());
                break;
            }
        }
        static void decode(Decoder& d, c::GMData_json_Union__0__& v) {
            size_t n = d.decodeUnionIndex();
            if (n >= 2) { throw avro::Exception("Union index too big"); }
            switch (n) {
            case 0:
                {
                    float vv;
                    avro::decode(d, vv);
                    v.set_float(vv);
                }
                break;
            case 1:
                {
                    std::string vv;
                    avro::decode(d, vv);
                    v.set_string(vv);
                }
                break;
            }
        }
    };
    
    template<> struct codec_traits<c::RtValue> {
        static void encode(Encoder& e, const c::RtValue& v) {
            avro::encode(e, v.time);
            avro::encode(e, v.value);
        }
        static void decode(Decoder& d, c::RtValue& v) {
            if (avro::ResolvingDecoder *rd =
                dynamic_cast<avro::ResolvingDecoder *>(&d)) {
                const std::vector<size_t> fo = rd->fieldOrder();
                for (std::vector<size_t>::const_iterator it = fo.begin();
                    it != fo.end(); ++it) {
                    switch (*it) {
                    case 0:
                        avro::decode(d, v.time);
                        break;
                    case 1:
                        avro::decode(d, v.value);
                        break;
                    default:
                        break;
                    }
                }
            } else {
                avro::decode(d, v.time);
                avro::decode(d, v.value);
            }
        }
    };
    
    template<> struct codec_traits<c::RtData> {
        static void encode(Encoder& e, const c::RtData& v) {
            avro::encode(e, v.code);
            avro::encode(e, v.data);
        }
        static void decode(Decoder& d, c::RtData& v) {
            if (avro::ResolvingDecoder *rd =
                dynamic_cast<avro::ResolvingDecoder *>(&d)) {
                const std::vector<size_t> fo = rd->fieldOrder();
                for (std::vector<size_t>::const_iterator it = fo.begin();
                    it != fo.end(); ++it) {
                    switch (*it) {
                    case 0:
                        avro::decode(d, v.code);
                        break;
                    case 1:
                        avro::decode(d, v.data);
                        break;
                    default:
                        break;
                    }
                }
            } else {
                avro::decode(d, v.code);
                avro::decode(d, v.data);
            }
        }
    };
    
    }
    #endif
    

    .cpp

    int  GKafkaPushFeild(const  char *BrokersName, const  char *TopicName, unsigned char *Data, int Size, int FeildNum, char *Return)
    {
    	if ((BrokersName == NULL) || (TopicName == NULL) || (Data == NULL))
    	{
    		return -1;
    	}
    	int l_iRetCode = 0;
    	int l_iIndex = 0;
    	std::vector<uint8_t> l_sSchema;
    
    	c::RtValue l_value;
    	std::vector<c::RtData > l_RtData;
    
    	__int64	l_lTime = 0;
    	float	l_fTemp = 0.0F;
    	int		l_iFeildNum = FeildNum;
    	unsigned char *l_cpTempData = Data;
    	unsigned char *l_cStartData = Data;
    	char l_cTempCode[CODENUM + 1] = { 0 };
    
    	memcpy(&l_lTime, l_cpTempData, 8);
    	l_cpTempData += 8;
    
    	char filename[1024] = "tempData.avro";
    	remove(filename);
    	avro::ValidSchema writerSchema(makeValidSchema(sch));
    	typedef std::pair<avro::ValidSchema, c::RtData > Pair;
    	avro::DataFileWriter<c::RtData> df(filename, writerSchema,100);
    	
    	for (int i = 0; i < l_iFeildNum; i++)
    	{
    		c::RtData l_cRtData;
    		c::RtValue l_cRtValue;
    		memcpy(l_cTempCode, l_cpTempData, CODENUM);
    		l_cTempCode[CODENUM - 1] = '\0';
    
    		l_cpTempData += CODENUM;
    		memcpy(&l_fTemp, l_cpTempData, DATANUM);
    		l_cpTempData += DATANUM;
    
    		l_cRtData.code = l_cTempCode;
    		//memcpy(l_RtData[l_iIndex].code.c_str,l_cTempCode,CODENUM);
    		//l_RtData[l_iIndex].code = l_cTempCode;
    		l_cRtValue.time = l_lTime;
    		l_cRtValue.value.set_float(l_fTemp);
    		l_cRtData.data.push_back(l_cRtValue);
    		l_RtData.push_back(l_cRtData);
    		//Pair l_pair(writerSchema, l_cRtData);
    		df.write(l_cRtData);
    	}
    	df.close();	
    	
    	FILE *l_fp = NULL;
    	char l_char = 'a';
    	int l_iReadNum = 0;
    	std::string strContent;
    	errno_t l_err = fopen_s(&l_fp, filename, "rb");
    	if (l_err < 0)
    	{
    		return -1;
    	}
    	else
    	{
    		fseek(l_fp, 0, SEEK_END);
    		l_iReadNum = ftell(l_fp);
    		if (0 == l_iReadNum)
    		{
    
    			l_iReadNum = 0;
    			strContent = "";
    			fclose(l_fp);
    			return 1;
    
    		}
    		strContent.resize(l_iReadNum);
    		fseek(l_fp, 0, SEEK_SET);
    		fread((char*)&strContent[0], l_iReadNum, 1, l_fp);
    	}
    	fclose(l_fp);
    	remove(filename);
    	
    	return 0;
    }

    从工程里面粘出来的,需要自己手动改一下生成一个动态链接库给VC6.0调用。希望对你有帮助!

    展开全文
  • 搭建 Kafka-0.10.2 源码阅读环境及 Windows 本地运行一、版本信息二、构建Kafka源码环境三、配置Kafka源码环境构建 bin 包 一、版本信息 Kafka:0.10.2、Scala:2.10.6、Java:1.8.0_221、IntelliJ IDEA:2019.2、...
  • 首先,本地Java环境变量配置成功 1.安装Zookeeper Kafka的运行依赖于Zookeeper,所以在运行Kafka之前我们需要安装并运行Zookeeper 1.1 下载安装文件: http://mirror.bit.edu.cn/apache/zookeeper/ 1.2 解压文件...
  • 由于kafka依赖java,Zookeeper,故在之前需要安装java,Zookeeper 安装java:略 安装Zookeeper: 下载地址:https://zookeeper.apache.org/releases.html,并解压到相应文件夹下面 安装Kafka: 下载地址:...
  • windows 系统下搭建kafka超详细教程 + 搭建所需要的软件包,包含: jdk-13_windows-x64_bin apache-zookeeper-3.6.1-bin kafka_2.12-2.5.0
  • 搭建kafka+zookeeper+spark streaming 本地开发环境 暂无kafka,zookpeer集群开发环境,先搭建本地的在线实时计算测试环境1,安装配置zookeeper本地开发环境:下载zookeeper,下载地址 : ...\1.BeiJingSpark\zooke
  • 下面就来演示如何在windows环境下来编译kafka源码,并通过IntelliJ IDEA开发工具搭建kafka的源码环境,以方便在本地通过debug调试来研究kafka的内部实现机制。 具体步骤: (1)安装jdk,版本为1.8.0_131,配置...
  • 最近项目中使用了kafka作为消息中间件进行服务间数据通信;...所以就本地搭个单机版的 kafka自己玩玩...   kafka 2.11的安装包就留个百度云地址了..... https://pan.baidu.com/s/1L_9bbieTSsjWAe_3JLzN3w 自...
  • 本文将本地kafka+springboot服务搭建起来 首先下载kafka和zookeeper kafka_2.12-1.1.0 下载地址:http://mirror.bit.edu.cn/apache/kafka/1.1.0/kafka_2.12-1.1.0.tgz zookeeper-3.4.12 下载地址:...
  • 为了本地测试,又不得不在windows环境下,搭建kafka伪分布式集群,记录一下过程 注意:以下所有目录,都不要出现空格!!! 伪分布式集群节点个数配置:zookeeper 和 kafka 分别3个节点 1.前期准备 JDK 1.8 ...
  • 搭建单节点以及如何CURD主题topic请看我上一篇文章:https://blog.csdn.net/qq_42390636/article/details/114802694 搭建集群 ...首先,为每个代理创建一个配置文件 (在Windows上使用copy 命令来代替
  • 如何快速的投入到Flink的学习当中,很多人在...本文将利用Flink的官方练习环境,在本地Windows系统中快速的搭建Flink环境,并详细的记录整个搭建过程。Flink的环境搭建需要一定的时间,有多种方法可以在各种环境中...
  • 注意:请确保本地Java环境变量配置成功 1.安装Zookeeper Kafka的运行依赖于Zookeeper,所以在运行Kafka之前我们需要安装并运行Zookeeper 1.1 下载安装文件: http://mirror.bit.edu.cn/apache/zookeeper/ 1.2 ...
  • Windows 10 JDK1.8.0_171 zookeeper-3.4.8/ kafka_2.11-0.10.0.1.tgz 点击链接进行下载 1. JDK安装和环境搭建  自行百度。 2. zookeeper 安装和运行 a. 点击上方链接直接下载,或者有其他链接,下载此...

空空如也

空空如也

1 2 3
收藏数 56
精华内容 22
关键字:

windows本地搭建kafka