Reading Kafka from C#

2019-03-01 10:24:39 sinat_31465609

Part 1: Introduction to Kafka

Kafka (official site: http://kafka.apache.org) is a distributed publish-subscribe messaging system offering high performance and high throughput.

1.1 Terminology

  • Broker

A Kafka cluster consists of one or more servers; each such server is called a broker.

  • Topic

Topic: every message published to a Kafka cluster has a category, called its Topic. (Physically, messages of different Topics are stored separately; logically, although a Topic's messages may live on one or more brokers, users only need to name the Topic to produce or consume data, without caring where it is stored.)

  • Partition

Partition: a Partition is a physical concept; each Topic contains one or more Partitions (a common choice is the total number of CPU cores across the Kafka nodes).

  • Producer

Producer: publishes messages to Kafka brokers.

  • Consumer

Consumer: a client that reads messages from Kafka brokers.

  • Consumer Group

Consumer Group: each Consumer belongs to a specific Consumer Group (a group name can be assigned to each Consumer; if none is given, the Consumer belongs to the default group).
1.2 Basic features
Scalability
Capacity can be added without taking the cluster offline
Data streams are partitioned and stored across multiple machines
High performance
A single broker can serve thousands of clients
A single broker can read/write several hundred megabytes per second
A cluster of multiple brokers delivers very high aggregate throughput
Performance stays stable no matter how large the data grows
Under the hood, Kafka abandons the Java heap cache in favor of the operating system's page cache, turns random writes into sequential writes, and combines this with zero-copy transfers to greatly improve I/O performance.
1.3 Message format
A topic corresponds to one kind of message, so messages are classified by topic
A topic's messages are made up of one or more partitions
A partition lives on one or more servers. With a single server there is no redundant copy: it is a standalone node, not a cluster. With multiple servers, one server is the leader and the others are followers; the leader handles read and write requests while the followers only keep redundant copies. If the leader fails, a follower is automatically elected as the new leader so the service is not interrupted. Each server may act as leader for some partitions and follower for others, which balances load across the cluster
Messages are stored in order; the order is immutable; messages can only be appended, never inserted. Each message has an offset that serves as its ID and is unique within a partition. The offset is stored and managed by the consumer, so the read order is in fact entirely up to the consumer and is not necessarily linear. Messages have an expiry: once expired they are deleted
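
Because the offset is owned by the consumer, a client can jump to any position in a partition. A minimal sketch with the Confluent.Kafka 1.x API (the topic name and offset below are made-up values):

using System;
using Confluent.Kafka;

public static class OffsetSeekDemo
{
    public static void ReadFromOffset()
    {
        var conf = new ConsumerConfig { GroupId = "demo-group", BootstrapServers = "localhost:9092" };
        using (var consumer = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
            // The consumer, not the broker, decides where to read: jump straight
            // to offset 42 of partition 0 instead of reading linearly from the start.
            consumer.Assign(new TopicPartitionOffset("MyTopic", 0, 42));
            var result = consumer.Consume(TimeSpan.FromSeconds(5));
            Console.WriteLine(result?.Message.Value);
        }
    }
}
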
1.4 How it works
When a topic is created you can specify how many partitions it has (default 1, config num.partitions); the partitions are then distributed across the brokers. The assignment algorithm: with a brokers, partition b is placed on broker b % a. Each partition can also be given a number of replicas (Replication); the placement rule is that replica c of partition b is stored on broker (b + c) % a. On a broker, a partition is a folder whose name is the topic name plus an ordinal number. The files inside a partition are segments; a segment consists of an .index file and a .log file, both named after the last offset of the previous segment file. This naming makes it fast to delete old files.
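
As a quick illustration of that placement arithmetic (a standalone sketch; the broker, partition and replica counts are made-up values):

using System;

int brokers = 3;      // a: number of brokers (made-up)
int partitions = 4;   // partitions in the topic (made-up)
int replicas = 2;     // replicas per partition (made-up)

for (int b = 0; b < partitions; b++)
{
    // replica c of partition b lands on broker (b + c) % a; c = 0 is the partition itself
    Console.Write($"partition {b}: ");
    for (int c = 0; c < replicas; c++)
        Console.Write($"copy {c} -> broker {(b + c) % brokers}  ");
    Console.WriteLine();
}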

When a producer pushes data to Kafka, it is spread over all the partitions automatically. Whether a push counts as successful depends on the setting: 1) receiving the ack from the partition leader counts as success, or 2) it only counts once every replica has been written. How long data is retained is configurable (seven days by default). Producer data is first buffered and only flushed to disk once a size or time threshold is reached; consumers can only read data that has reached disk.
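
In Confluent.Kafka these acknowledgement levels map onto the producer's Acks setting; a sketch (1.x API):

using Confluent.Kafka;

var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    // Acks.Leader: the leader's ack alone counts as success (case 1 above);
    // Acks.All: every in-sync replica must confirm the write (case 2 above).
    Acks = Acks.All
};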

A consumer polls data from Kafka, pulling up to a configured amount into memory to process. The consumers within one group jointly consume the full set of messages, while different groups consume independently of one another; in other words, each group receives its own copy of the data.

Consumption is most efficient when the number of consumers equals the number of partitions. In this way Kafka can scale out by adding brokers and partitions; data is written to disk sequentially; and producers and consumers operate asynchronously.

Part 2: Environment setup (Windows)

2.1 Installing ZooKeeper
Kafka depends on ZooKeeper, so install ZooKeeper first

Download the latest ZooKeeper from the official site: http://www.apache.org/dyn/closer.cgi/zookeeper/
Unzip it to a path of your choice
Copy zoo_sample.cfg in the conf directory, rename the copy to zoo.cfg, change the dataDir value in zoo.cfg to E:/data/zookeeper, and add the line dataLogDir=E:/log/zookeeper
Edit the system environment variables and append ;E:\zookeeper\zookeeper-3.4.10\bin to Path
Open a cmd window, type zkServer and press Enter to start it
2.2 Installing Kafka
Download the latest Kafka from the official site: http://kafka.apache.org/downloads
Unzip it to a path of your choice, e.g. E:\kafka_2.12-0.10.2.0
In server.properties under E:\kafka_2.12-0.10.2.0\config, change the value of log.dirs to E:/log/kafka
Edit the system environment variables and append ;E:\kafka_2.12-0.10.2.0\bin\windows to Path
Start Kafka: in a cmd window, cd to the Kafka root directory E:\kafka_2.12-0.10.2.0 and run
.\bin\windows\kafka-server-start.bat .\config\server.properties
The line started (kafka.server.KafkaServer) indicates a successful start
In a cmd window, create a topic:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Open another cmd window and start a producer:
kafka-console-producer.bat --broker-list localhost:9092 --topic test
Open another cmd window and start a consumer:
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test
Type messages in the producer window; each line appears in the consumer window as soon as you press Enter, confirming that Kafka is installed and working

Part 3: Common .NET class libraries

Common libraries for implementing Kafka message queues on .NET are kafka-net and Confluent.Kafka; the official site recommends Confluent.Kafka. This article is based on that library, prerelease version 1.0.0-beta, in a console application.

Part 4: Usage - the producer

The producer publishes data to the specified topic. In a load-balanced production environment there are usually several brokers, and the BootstrapServers property is then a comma-separated list of broker addresses.

/// <summary>
/// Producer
/// </summary>
public static void Produce()
{
    var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
    Action<DeliveryReportResult<Null, string>> handler = r =>
    Console.WriteLine(!r.Error.IsError
        ? $"Delivered message to {r.TopicPartitionOffset}"
        : $"Delivery Error: {r.Error.Reason}");

    using (var producer = new Producer<Null, string>(config))
    {
        // error logging
        producer.OnError += (_, msg) => { Console.WriteLine($"Producer_Error: Code:{msg.Code};Reason:{msg.Reason};IsError:{msg.IsError}"); };

        for (int i = 0; i < 5; i++)
        {
            // send the message to the topic asynchronously
            producer.BeginProduce("MyTopic", new Message<Null, string> { Value = i.ToString() }, handler);
        }
        // wait up to 3 seconds for outstanding deliveries to complete
        producer.Flush(TimeSpan.FromSeconds(3));
    }
}

Part 5: Usage - the consumer

Consumers label themselves with a consumer group name, and each record published to a topic is delivered to exactly one consumer instance within each subscribing consumer group. Consumer instances can live in separate processes or on different machines.

If all the consumer instances share the same consumer group, records are effectively load-balanced over the instances.

If all the consumer instances have different consumer groups, each record is broadcast to all consumer processes.

(Figure omitted: a two-server Kafka cluster hosting four partitions (P0-P3), with two consumer groups; group A has two consumer instances and group B has four.)

By default EnableAutoCommit is on: as soon as a message is taken from the queue, the offset advances to the next position, and the message is gone regardless of whether its subsequent processing succeeds. To avoid losing data whose processing failed, the consumer can set this property to false and commit the offset manually with Commit() afterwards.

/// <summary>
/// Consumer
/// </summary>
public static void Consumer()
{
    var conf = new ConsumerConfig
    {
        GroupId = "test-consumer-group",
        BootstrapServers = "localhost:9092",
        AutoOffsetReset = AutoOffsetResetType.Earliest,
        EnableAutoCommit = false  // disable auto-commit; commit manually once the business logic succeeds, so no data is lost
    };
    using (var consumer = new Consumer<Ignore, string>(conf))
    {
        // subscribe to the topic
        consumer.Subscribe("MyTopic");
        // error logging
        consumer.OnError += (_, msg) => { Console.WriteLine($"Consumer_Error: Code:{msg.Code};Reason:{msg.Reason};IsError:{msg.IsError}"); };

        while (true)
        {
            try
            {
                var consume = consumer.Consume();
                string receiveMsg = consume.Value;
                Console.WriteLine($"Consumed message '{receiveMsg}' at: '{consume.TopicPartitionOffset}'.");
                // business logic goes here
                // ...
                bool succeeded = true; // set this from the outcome of the business logic
                if (succeeded)
                {
                    consumer.Commit(new List<TopicPartitionOffset>() { consume.TopicPartitionOffset }); // commit the offset manually
                }
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Consumer_Error occurred: {e.Error.Reason}");
            }
        }
    }
}

Execution result (screenshot omitted)

Handling common data problems

  1. The most common cause of duplicate consumption is rebalancing: processing the consumed data takes so long that it exceeds Kafka's session
    timeout (30 seconds by default in 0.10.x), which triggers a rebalance; there is then some chance the offset was never committed, so messages are consumed again after the rebalance.
    Deduplication: tag each message with a unique id.
  2. Guaranteeing no message loss: producer (acks = -1 or all, meaning a send must be fully acknowledged at least once); consumer
    (commit offsets manually, only after the business logic has completed successfully).
  3. Guaranteeing no duplicate processing: persist to a table (a primary key or unique index rejects duplicate rows), or
    handle it in the business logic (store the unique key in Redis or MongoDB; check whether it already exists and skip the message if it does; if not, insert it into Redis or MongoDB first and then run the business logic), as sketched below.
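
A minimal sketch of the Redis-based deduplication idea, assuming the StackExchange.Redis package and a local Redis instance (the key prefix and TTL are illustrative choices):

using System;
using StackExchange.Redis;

public static class MessageDeduplicator
{
    private static readonly ConnectionMultiplexer Redis = ConnectionMultiplexer.Connect("localhost:6379");

    /// <summary>
    /// Returns true if this message id has not been seen before.
    /// SET ... NX is atomic, so concurrent consumers cannot both claim the same id.
    /// </summary>
    public static bool TryClaim(string messageId)
    {
        IDatabase db = Redis.GetDatabase();
        // StringSet with When.NotExists is Redis SET NX; the TTL bounds memory use.
        return db.StringSet("kafka:dedup:" + messageId, "1", TimeSpan.FromDays(3), When.NotExists);
    }
}

A consumer would call TryClaim with the message's unique id before running its business logic, and only process (and commit the offset for) messages where it returns true.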

Kafka visual debugging

The GUI client Kafka Tool is handy here.
For usage details see: https://www.cnblogs.com/frankdeng/p/9452982.html

2018-06-06 10:17:00 weixin_33790053

Recently the data team offered us a Kafka subscription service to integrate with, so I did some research and wrote a C# console program to consume it.

This article mainly records how to handle offsets that do not start at 0 when consuming Kafka messages with the kafka-net component.

Setting up and debugging a test environment by following the getting-started tutorials worked without problems.

After deploying to production, however, the program kept spinning without consuming anything, even though the logs showed successful connections. Talking to the producing team revealed that ops had deployed Kafka with a 3-day message retention.

Once messages older than three days are purged automatically, the earlier offsets no longer exist in Kafka, and setting the offset to 0 produces exactly the symptom above. So how do we obtain the real starting offset?

1. First install the JDK, Kafka and ZooKeeper locally.

1) Use JDK 1.7 or later.

2) Download, install and run ZooKeeper (download: http://zookeeper.apache.org/releases.html)

3) Install and run Kafka (download: http://kafka.apache.org/downloads.html)

The installation steps are described in detail in 车江毅's blog post ".net windows Kafka 安装与使用入门(入门笔记)", so I will not repeat them here.

2. Create the topic, producer and consumer in Kafka; the blog above gives the exact commands, so I skip them here.

3. Following that blog's recommendation, I chose kafka-net. Source: https://github.com/Jroland/kafka-net

1) Producer code

private static void Produce(string broker, string topic,string message)
        {
            var options = new KafkaOptions(new Uri(broker));
            var router = new BrokerRouter(options);
            var client = new Producer(router);

            var currentDatetime = DateTime.Now;
            var key = currentDatetime.Second.ToString();
            var events = new[] { new Message(message) };
            client.SendMessageAsync(topic, events).Wait(1500);
            Console.WriteLine("Produced: Key: {0}. Message: {1}", key, events[0].Value.ToUtf8String());

            using (client) { }
        }

//Example call:
Produce("http://host:port", "TopicName", "Hello World");

2) Consumer code. Everything above was just to set up a test environment; the actual task is to consume the Kafka messages provided by the other team.

private static void Consume(string broker, string topic)
{
    Uri[] UriArr = Array.ConvertAll<string, Uri>(broker.Split(','), delegate (string s) { return new Uri(s); });
    var options = new KafkaOptions(UriArr);
    var router = new BrokerRouter(options);
    var consumer = new Consumer(new ConsumerOptions(topic, router));

    // In production, consumption does not always start from offset 0. The Java client can be
    // configured to start automatically from the smallest or largest offset, but I found no such
    // setting in the kafka-net SDK, so the code below queries the starting offset explicitly.
    Task<List<OffsetResponse>> offTask = consumer.GetTopicOffsetAsync(topic);
    var offsetResponseList = offTask.Result;
    var positionArr = new OffsetPosition[offsetResponseList.Count];
    for (int i = 0; i < offsetResponseList.Count; i++)
    {
        //LogHelper.WriteLog(string.Format("Kafka OffsetPosition info Partition:{0} maxOffset:{1} minOffset:{2}", offsetResponseList[i].PartitionId, offsetResponseList[i].Offsets[0], offsetResponseList[i].Offsets[1]));
        // Offsets[0] is the maximum offset; to consume from the beginning we use the minimum, Offsets[1]
        positionArr[i] = new OffsetPosition(offsetResponseList[i].PartitionId, offsetResponseList[i].Offsets[1]);
    }
    Console.WriteLine("Starting");
    // If the job is restarted after running for a while, the starting positions can be read back from a database first:
    /*
    try
    {
        using (IDbConnection dbConn = SingleDataBase.DBFactory.OpenRead<MessageQueue>())
        {
            string sql = string.Format("SELECT [Partition], MAX(Offset)Offset FROM [TableName] WITH(NOLOCK) WHERE topic='{0}' GROUP BY [Partition]", topic);
            var r = dbConn.SqlList<OffsetPositionEntity>(sql);
            if (r.Count > 0)
            {
                positionArr = Array.ConvertAll<OffsetPositionEntity, OffsetPosition>(r.ToArray(), delegate (OffsetPositionEntity p) { return new OffsetPosition(p.Partition, p.Offset + 1); });
            }
        }
    }
    catch
    {
        // Failed to read the last checkpoint; start from the beginning, which may re-read some messages
        //LogHelper.WriteLog("No previous checkpoint found; consuming from the beginning");
    }
    */
    consumer.SetOffsetPosition(positionArr);

    // Consume returns a blocking IEnumerable (i.e. a never-ending stream)
    foreach (var message in consumer.Consume())
    {
        try
        {
            string messageStr = message.Value.ToUtf8String();
            Console.WriteLine(messageStr);
            long r = 0;
            /* the database-insert code has been removed here */
            //LogHelper.WriteLog(string.Format("Response: Partition {0},Offset {1} : {2} ,Insert : {3}", message.Meta.PartitionId, message.Meta.Offset, messageStr, r));
        }
        catch (Exception ex)
        {
            //LogHelper.WriteLog("Error saving the consumed Kafka message to the database: " + ex.Message);
        }
    }
}

//Example call:
Consume("http://100.162.136.70:9092,http://100.162.136.71:9092,http://100.162.136.72:9092", "YourTopicName");

In the code above, replace the LogHelper.WriteLog calls with Console.WriteLine to print directly to the screen; the logging helper itself is not provided here.


I wrote this article chiefly to record the offset-position fragment above; everything else is just to make it reasonably complete. If you know kafka-net well and have a more convenient way to configure this, please leave a comment, thanks!


Reposted from: https://www.cnblogs.com/erdeni/articles/kafkaNet-1.html

2020-02-17 10:55:00 qgx1990

This article is a worked example of using Confluent.Kafka.dll from C# to implement a Kafka message queue.

My project needed to work with Kafka; most of the blog posts I found used outdated, no-longer-working code, so here is an up-to-date approach. Corrections are welcome.

1. Development environment

IDE: VS2017

.NET Framework: 4.5

Confluent.Kafka: 1.3.0

2. Test code, for reference only

private readonly static string mTopic = "test";
private readonly static string mBootstrapServers = "localhost:9092";

/// <summary>
/// Producer
/// </summary>
public static void Produce()
{
    var config = new ProducerConfig { BootstrapServers = mBootstrapServers };
    Action<DeliveryReport<Null, string>> handler = r =>
    Console.WriteLine(!r.Error.IsError
        ? $"Delivered message to {r.TopicPartitionOffset}"
        : $"Delivery Error: {r.Error.Reason}");
    var producerBuilder = new ProducerBuilder<Null, string>(config);
    // error logging
    producerBuilder.SetErrorHandler((p, msg) =>
    {
        Console.WriteLine($"Producer_Error: Code:{msg.Code};Reason:{msg.Reason};IsError:{msg.IsError}");
    });

    using (var producer = producerBuilder.Build())
    {
        for (int i = 0; i < 5; i++)
        {
            // send the message to the topic asynchronously
            producer.Produce(mTopic, new Message<Null, string> { Value = i.ToString() }, handler);
        }
        // wait up to 3 seconds for outstanding deliveries to complete
        producer.Flush(TimeSpan.FromSeconds(3));
    }
}

/// <summary>
/// Consumer
/// </summary>
public static void Consumer()
{
    var conf = new ConsumerConfig
    {
        GroupId = "test-consumer-group",
        BootstrapServers = mBootstrapServers,
        AutoOffsetReset = AutoOffsetReset.Earliest,
        EnableAutoCommit = false  // disable auto-commit; commit manually once the business logic succeeds, so no data is lost
    };
    var consumerBuilder = new ConsumerBuilder<Ignore, string>(conf);
    // error logging
    consumerBuilder.SetErrorHandler((p, msg) =>
    {
        Console.WriteLine($"Consumer_Error: Code:{msg.Code};Reason:{msg.Reason};IsError:{msg.IsError}");
    });
    using (var consumer = consumerBuilder.Build())
    {
        // subscribe to the topic
        consumer.Subscribe(mTopic);
        while (true)
        {
            try
            {
                var consume = consumer.Consume();
                string receiveMsg = consume.Value;
                Console.WriteLine($"Consumed message '{receiveMsg}' at: '{consume.TopicPartitionOffset}'.");
                // business logic goes here
                //...
                bool succeeded = true; // set this from the outcome of the business logic
                if (succeeded)
                {
                    consumer.Commit(new List<TopicPartitionOffset>() { consume.TopicPartitionOffset }); // commit the offset manually
                }
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Consumer_Error occurred: {e.Error.Reason}");
            }
        }
    }
}
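
A possible entry point for exercising the two methods above (a sketch; the Main wrapper is assumed and not part of the original code):

static void Main(string[] args)
{
    Produce();    // publish five test messages first
    Consumer();   // then consume them; this loops forever, stop with Ctrl+C
}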


2018-11-02 10:45:42 qq_34894585

Download the NuGet package (screenshot omitted)

1. Define a shared configuration base class

 public abstract class KafkaBase
    {
        /// <summary>
        /// Gets the Kafka broker address.
        /// </summary>
        /// <param name="brokerNameKey">name of the config-file key holding the broker address</param>
        /// <returns>the configured Kafka broker address</returns>
        public string GetKafkaBroker(string brokerNameKey = "Broker")
        {
            string kafkaBroker = System.Configuration.ConfigurationManager.AppSettings[brokerNameKey];
            if (string.IsNullOrWhiteSpace(kafkaBroker))
            {
                throw new ArgumentNullException("The Kafka broker address must not be empty!");
            }
            return kafkaBroker;
        }

        /// <summary>
        /// Gets the topic name.
        /// </summary>
        /// <param name="topicNameKey">name of the config-file key holding the topic</param>
        /// <returns>the configured topic name</returns>
        public string GetTopicName(string topicNameKey = "Topic")
        {
            string topicName = System.Configuration.ConfigurationManager.AppSettings[topicNameKey];

            if (string.IsNullOrWhiteSpace(topicName))
            {
                throw new ArgumentNullException("The message topic must not be empty!");
            }
            return topicName;
        }
        /// <summary>
        /// Gets the consumer group id.
        /// </summary>
        /// <returns></returns>
        public string GetGroupID(string groupIDKey = "GroupID")
        {
            string groupID = System.Configuration.ConfigurationManager.AppSettings[groupIDKey];
            if (string.IsNullOrWhiteSpace(groupID))
            {
                throw new ArgumentNullException("The consumer group must not be empty!");
            }
            return groupID;
        }
        /// <summary>
        /// Gets the partition list.
        /// </summary>
        /// <returns></returns>
        public List<int> GetPartition(string partitionsKey = "Partitions")
        {
            List<int> partitions = new List<int>();
            System.Configuration.ConfigurationManager.AppSettings[partitionsKey].Split(',').ToList().ForEach(x =>
            {
                partitions.Add(Convert.ToInt32(x));
            });
            return partitions;
        }
        /// <summary>
        /// Gets the number of messages to consume per batch.
        /// </summary>
        /// <returns></returns>
        public int GetConsumerCount(string consumerCountKey = "ConsumerCount")
        {
            int count = Convert.ToInt32(System.Configuration.ConfigurationManager.AppSettings[consumerCountKey]);
            return count;
        }

        /// <summary>
        /// Writes a log entry.
        /// </summary>
        /// <param name="type">"consumer" or "producer"</param>
        /// <param name="info"></param>
        /// <param name="args"></param>
        public static void WriteLog(string type, string info, params object[] args)
        {
            try
            {
                string filelog = string.Format(@"{0}\{1}\", AppDomain.CurrentDomain.BaseDirectory, "kafkaLog");
                if (!Directory.Exists(filelog))
                {
                    Directory.CreateDirectory(filelog);
                }
                filelog = string.Format(@"{0}\{1}\{2}{3}", AppDomain.CurrentDomain.BaseDirectory, "kafkaLog", type + "_" + DateTime.Now.ToString("yyyy-MM-dd"), ".log");
                using (StreamWriter sw = File.AppendText(filelog))
                {
                    // keep the label outside the date format string so its letters are not parsed as format specifiers
                    sw.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + " info:\r\n");
                    sw.WriteLine(string.Format(info, args));
                    sw.WriteLine("\r\n");
                    sw.Flush();
                    sw.Close();
                }
            }
            catch (Exception)
            {
                throw; // rethrow without disturbing the stack trace
            }
        }
    }
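
The class above pulls everything from appSettings; a sample App.config matching those keys might look like this (all values are illustrative, not from the original post):

<configuration>
  <appSettings>
    <add key="Broker" value="localhost:9092" />
    <add key="Topic" value="test" />
    <add key="GroupID" value="test-consumer-group" />
    <add key="Partitions" value="0,1" />
    <add key="ConsumerCount" value="10" />
  </appSettings>
</configuration>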

2. Producing

 /// <summary>
    /// Kafka message producer
    /// </summary>
    public sealed class KafkaProducer : KafkaBase
    {
        
        private static readonly object Locker = new object();
        private static Producer<string, string> _producer;
        private static string _topic;
        private static List<int> _partition;
        /// <summary>
        /// Singleton producer
        /// </summary>
        public KafkaProducer()
        {
            _topic = GetTopicName();      
            _partition = GetPartition();
            if (_producer == null)
            {
                lock (Locker)
                {
                    if (_producer == null)
                    {
                        
                        var config = new Dictionary<string, object>
                        {
                            { "bootstrap.servers", GetKafkaBroker() }         
                        };
                        _producer = new Producer<string, string>(config, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8));
                    }
                }
            }
        }
               
        /// <summary>
        /// Produces and sends a message.
        /// </summary>
        /// <param name="key">key</param>
        /// <param name="message">the message to send</param>
        public static void Produce(string key, string message)
        {
            bool result = false;
            new KafkaProducer();
            // pick one of the configured partitions at random
            int partition = _partition[new Random().Next(_partition.Count)];
            if (string.IsNullOrWhiteSpace(message))
            {
                throw new ArgumentNullException("The message content must not be empty!");
            }
            var deliveryReport = _producer.ProduceAsync(_topic, key, message, partition);
            deliveryReport.ContinueWith(task =>
            {
                if (task.Result.Error.Code == ErrorCode.NoError)
                {
                    result = true;
                }
                //Console.WriteLine("Producer:" + _producer.Name + "\r\nTopic:" + _topic + "\r\nPartition:" + task.Result.Partition + "\r\nOffset:" + task.Result.Offset + "\r\nMessage:" + task.Result.Value + "\r\nResult:" + result);
                WriteLog("producer", $"Topic:{ _topic } Partition:{ task.Result.Partition } Offset:{ task.Result.Offset } Message:{ task.Result.Value } result:{result}");
            });    
            _producer.Flush(TimeSpan.FromSeconds(10));  
            //return result;
        }
    }
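
Usage is then a single call (a sketch, assuming the App.config keys shown earlier are present):

// Reads Broker/Topic/Partitions from App.config and sends one message to a random configured partition.
KafkaProducer.Produce("key1", "hello kafka");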

3. Consuming

 /// <summary>
    /// Kafka message consumer
    /// </summary>
    public sealed class KafkaConsumer : KafkaBase
    {

        #region Constructor

        /// <summary>
        /// Constructor; initializes the IsCancelled property.
        /// </summary>
        public KafkaConsumer()
        {
            IsCancelled = false;
        }

        #endregion

        #region Properties

        /// <summary>
        /// Whether to stop consuming Kafka messages; defaults to false (keep consuming).
        /// </summary>
        public bool IsCancelled { get; set; }

        #endregion

        #region Synchronous version

        /// <summary>
        /// Starts consuming messages of the given topic as the given consumer group.
        /// </summary>
        /// <param name="broker">Kafka broker address</param>
        /// <param name="topic">topic the messages belong to</param>
        /// <param name="groupID">consumer group this consumer belongs to</param>
        /// <param name="action">callback for processing each consumed message</param>
        public void Consume(string broker, string topic, string groupID, Action<ConsumerResult> action = null)
        {
            if (string.IsNullOrWhiteSpace(broker))
            {
                throw new ArgumentNullException("The Kafka broker address must not be empty!");
            }

            if (string.IsNullOrWhiteSpace(topic))
            {
                throw new ArgumentNullException("The message topic must not be empty!");
            }

            if (string.IsNullOrWhiteSpace(groupID))
            {
                throw new ArgumentNullException("The consumer group ID must not be empty!");
            }

            var config = new Dictionary<string, object>
                {
                    { "bootstrap.servers", broker },
                    { "group.id", groupID },
                    { "enable.auto.commit", true },  // this is the default
                    { "auto.commit.interval.ms", 5000 },
                    { "statistics.interval.ms", 60000 },
                    { "session.timeout.ms", 6000 },
                    { "auto.offset.reset", "smallest" }
                };


            using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8)))
            {
                // Note: All event handlers are called on the main thread.
                //consumer.OnMessage += (_, message) => Console.WriteLine("Topic:" + message.Topic + " Partition:" + message.Partition + " Offset:" + message.Offset + " " + message.Value);
                //consumer.OnMessage += (_, message) => Console.WriteLine("Offset:【" + message.Offset + "】Message:【" + message.Value + "】");
                if (action != null)
                {
                    consumer.OnMessage += (_, message) => {
                        ConsumerResult messageResult = new ConsumerResult();
                        messageResult.Broker = broker;
                        messageResult.Topic = message.Topic;
                        messageResult.Partition = message.Partition;
                        messageResult.Offset = message.Offset.Value;
                        messageResult.Message = message.Value;

                        //invoke the caller-supplied callback
                        action(messageResult);
                    };
                }

                consumer.OnPartitionEOF += (_, end) => Console.WriteLine("Reached end of topic " + end.Topic + " partition " + end.Partition + ", next message will be at offset " + end.Offset);

                consumer.OnError += (_, error) => Console.WriteLine("Error:" + error);

                //Raised on a deserialization error, or when consumption fails with an error != NoError.
                consumer.OnConsumeError += (_, message) => Console.WriteLine("Error consuming from topic/partition/offset " + message.Topic + "/" + message.Partition + "/" + message.Offset + ": " + message.Error);

                consumer.OnOffsetsCommitted += (_, commit) => Console.WriteLine(commit.Error ? "Failed to commit offsets:" + commit.Error : "Successfully committed offsets:" + commit.Offsets);

                // Raised when the consumer is assigned a new set of partitions.
                consumer.OnPartitionsAssigned += (_, partitions) =>
                {
                    Console.WriteLine("Assigned Partitions:" + partitions + ", Member ID:" + consumer.MemberId);
                    // If you don't add a handler to the OnPartitionsAssigned event, the .Assign call below happens automatically. If you do add one, you must call .Assign explicitly for the consumer to start consuming messages.
                    consumer.Assign(partitions);
                };

                // Raised when the consumer's current assignment set has been revoked.
                consumer.OnPartitionsRevoked += (_, partitions) =>
                {
                    Console.WriteLine("Revoked Partitions:" + partitions);
                    // If you don't add a handler to the OnPartitionsRevoked event, the .Unassign call below happens automatically. If you do add one, you must call .Unassign explicitly in order for the consumer to stop consuming messages from its previously assigned partitions.
                    consumer.Unassign();
                };

                //consumer.OnStatistics += (_, json) => Console.WriteLine("Statistics: " + json);

                consumer.Subscribe(topic);

                //Console.WriteLine("Subscribed to:" + consumer.Subscription);

                while (!IsCancelled)
                {
                    consumer.Poll(TimeSpan.FromMilliseconds(100));
                }
            }
        }
        /// <summary>
        /// Consumes messages using the configured settings.
        /// </summary>
        /// <param name="func">callback for processing consumed messages</param>
        /// <param name="partition">partitions and starting offsets to consume</param>
        public void Consume(Func<List<ConsumerResult>, bool> func = null, Dictionary<int, int> partition = null)
        {
            KafkaManuallyCommittedOffsets(new ConsumerSetting()
            {
                Broker = GetKafkaBroker(),
                Topic = GetTopicName(),
                GroupID = GetGroupID(),
                Func = func,
                Partition = partition
            });
        }
        #endregion

        #region Asynchronous version

        /// <summary>
        /// Starts consuming messages of the given topic as the given consumer group, on a thread-pool thread.
        /// </summary>
        /// <param name="broker">Kafka broker address</param>
        /// <param name="topic">topic the messages belong to</param>
        /// <param name="groupID">consumer group this consumer belongs to</param>
        /// <param name="func">callback for processing consumed messages</param>
        public void ConsumeAsync(string broker, string topic, string groupID, Func<List<ConsumerResult>, bool> func = null)
        {
            if (string.IsNullOrWhiteSpace(broker))
            {
                throw new ArgumentNullException("The Kafka broker address must not be empty!");
            }

            if (string.IsNullOrWhiteSpace(topic))
            {
                throw new ArgumentNullException("The message topic must not be empty!");
            }

            if (string.IsNullOrWhiteSpace(groupID))
            {
                throw new ArgumentNullException("The consumer group ID must not be empty!");
            }

            ThreadPool.QueueUserWorkItem(KafkaAutoCommittedOffsets, new ConsumerSetting() { Broker = broker, Topic = topic, GroupID = groupID, Func = func });
        }

        #endregion

        #region Two offset-commit variants

        /// <summary>
        /// Consume with offsets auto-committed by the Kafka client.
        /// </summary>
        /// <param name="state">consumer settings</param>
        private void KafkaAutoCommittedOffsets(object state)
        {
            ConsumerSetting setting = state as ConsumerSetting;

            var config = new Dictionary<string, object>
                {
                    { "bootstrap.servers", setting.Broker },
                    { "group.id", setting.GroupID },
                    { "enable.auto.commit", true },  // this is the default
                    { "auto.commit.interval.ms", 5000 },
                    { "statistics.interval.ms", 60000 },
                    { "session.timeout.ms", 6000 },
                    { "auto.offset.reset", "smallest" }
                };

            using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8)))
            {
                if (setting.Func != null)
                {
                    consumer.OnMessage += (_, message) =>
                    {
                        ConsumerResult messageResult = new ConsumerResult();
                        messageResult.Broker = setting.Broker;
                        messageResult.Topic = message.Topic;
                        messageResult.Partition = message.Partition;
                        messageResult.Offset = message.Offset.Value;
                        messageResult.Message = message.Value;

                        //invoke the caller-supplied callback
                        setting.Func(new List<ConsumerResult>() { messageResult });
                    };
                }

                //consumer.OnStatistics += (_, json)=> Console.WriteLine("Statistics: {json}");

                // logging could go here
                //consumer.OnError += (_, error)=> Console.WriteLine("Error:"+error);

                // logging could go here
                //consumer.OnConsumeError += (_, msg) => Console.WriteLine("Error consuming from topic/partition/offset {msg.Topic}/{msg.Partition}/{msg.Offset}: {msg.Error}");

                consumer.Subscribe(setting.Topic);

                while (!IsCancelled)
                {
                    consumer.Poll(TimeSpan.FromMilliseconds(100));
                }
            }
        }

        /// <summary>
        /// Consume with offsets committed manually to the Kafka cluster.
        /// </summary>
        /// <param name="state">consumer settings</param>
        private void KafkaManuallyCommittedOffsets(object state)
        {
            ConsumerSetting setting = state as ConsumerSetting;

            var config = new Dictionary<string, object>
            {
                { "bootstrap.servers", setting.Broker },
                { "group.id", setting.GroupID },
                { "enable.auto.commit", false },
                { "auto.commit.interval.ms", 5000 },
                { "statistics.interval.ms", 60000 },
                { "session.timeout.ms", 6000 },
                { "auto.offset.reset", "smallest" }
            };

            using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8)))
            {
                //subscribe, or assign specific partitions below
                if (setting.Partition == null)
                {
                    consumer.Subscribe(setting.Topic);
                }
                else
                {
                    List<TopicPartitionOffset> topics = new List<TopicPartitionOffset>();
                    foreach (KeyValuePair<int, int> item in setting.Partition)
                    {
                        topics.Add(new TopicPartitionOffset(setting.Topic, item.Key, item.Value)); //topics.Add(new TopicPartitionOffset(setting.Topic, 0, Offset.Beginning));
                    }
                    consumer.Assign(topics);
                }
                // log errors
                consumer.OnError += (_, error) => WriteLog("consumer", "Error:" + error);
                consumer.OnConsumeError += (_, error) => WriteLog("consumer", "Consume error:" + error);

                List<ConsumerResult> consumerResults = new List<ConsumerResult>();
                Message<Ignore, string> message = null;
                int msgCount = GetConsumerCount();
                while (!IsCancelled)
                {
                    if (!consumer.Consume(out message, TimeSpan.FromMilliseconds(100)))
                    {
                        continue;
                    }
                    //Console.WriteLine($"Topic: {message.Topic} Partition: {message.Partition} Offset: {message.Offset} Value:{message.Value} count {msgCount}");
                    WriteLog("consumer", $"Topic: {message.Topic} Partition: {message.Partition} Offset: {message.Offset} Value:{message.Value} count {msgCount}");
                    consumerResults.Add( new ConsumerResult
                    {
                        Broker = setting.Broker,
                        Topic = message.Topic,
                        Partition = message.Partition,
                        Offset = message.Offset.Value,
                        Message = message.Value
                    });
                    msgCount--;
                    if (msgCount == 0)
                    {
                        bool? result = setting.Func?.Invoke(consumerResults);
                        if (result ?? false)
                        {
                            var committedOffsets = consumer.CommitAsync(message).Result;
                            Console.WriteLine($"committed offsets:{committedOffsets}");
                            WriteLog("consumer", $"committed offsets:{committedOffsets}");
                            msgCount = GetConsumerCount();
                        }
                        else
                        {
                            WriteLog("consumer", "调用消费返回False");
                        }
                    }
                }

            }
        }

        #endregion

    }

4. Model objects

 /// <summary>
    /// Details of a message that has been consumed
    /// </summary>
    public sealed class ConsumerResult
    {
        /// <summary>
        /// Kafka broker address
        /// </summary>
        public string Broker { get; set; }

        /// <summary>
        /// Topic the message belongs to
        /// </summary>
        public string Topic { get; set; }

        /// <summary>
        /// Consumer group key
        /// </summary>
        public string GroupID { get; set; }

        /// <summary>
        /// The message payload to be processed
        /// </summary>
        public string Message { get; set; }

        /// <summary>
        /// Current read position (offset)
        /// </summary>
        public long Offset { get; set; }

        /// <summary>
        /// Physical partition the message resides in
        /// </summary>
        public int Partition { get; set; }
    }
 /// <summary>
    /// Consumer settings object; carries the parameters for consuming messages (Consumer.Consume)
    /// </summary>
    public sealed class ConsumerSetting
    {
        /// <summary>
        /// Kafka broker address
        /// </summary>
        public string Broker { get; set; }

        /// <summary>
        /// Topic the messages belong to
        /// </summary>
        public string Topic { get; set; }

        /// <summary>
        /// Consumer group key
        /// </summary>
        public string GroupID { get; set; }

        /// <summary>
        /// Partitions (and starting offsets) to consume
        /// </summary>
        public Dictionary<int, int> Partition { get; set; } = null;

        /// <summary>
        /// Details of the consumed messages
        /// </summary>
        public List<ConsumerResult> ConsumerResults { get; set; } = null;
        /// <summary>
        /// Callback invoked after consuming; return true to commit the offsets
        /// </summary>
        public Func<List<ConsumerResult>, bool> Func { get; set; } = null;

    }
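
Putting the pieces together, a sketch of batch consumption with a manual commit (it assumes the classes above plus the appSettings shown earlier; the Partition dictionary maps partition id to starting offset):

var consumer = new KafkaConsumer();
consumer.Consume(results =>
{
    foreach (var r in results)
    {
        Console.WriteLine($"Partition {r.Partition} Offset {r.Offset}: {r.Message}");
    }
    return true; // returning true makes KafkaManuallyCommittedOffsets commit the batch
}, new Dictionary<int, int> { { 0, 0 } }); // start at offset 0 of partition 0
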
2016-05-01 21:43:51 sheismylife

Reading messages from a Kafka cluster

1 Start ZooKeeper first

The Kafka distribution conveniently ships with a ZooKeeper program. Go into the bin directory and run the following script

[dean@dell_xps_13 bin]$ ./zookeeper-server-start.sh ../config/zookeeper.properties 
[2016-04-29 16:03:41,484] INFO Reading configuration from: ../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2016-04-29 16:03:41,484] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2016-04-29 16:03:41,498] INFO Reading configuration from: ../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2016-04-29 16:03:41,499] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2016-04-29 16:03:41,503] INFO Server environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2016-04-29 16:03:41,503] INFO Server environment:host.name=localhost.localdomain (org.apache.zookeeper.server.ZooKeeperServer)
[2016-04-29 16:03:41,503] INFO Server environment:java.version=1.8.0_77 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-04-29 16:03:41,503] INFO Server environment:java.vendor=Oracle Corporation (org.apache.zookeeper.server.ZooKeeperServer)
[2016-04-29 16:03:41,503] INFO Server environment:java.home=/usr/lib/jvm/java-8-openjdk/jre (org.apache.zookeeper.server.ZooKeeperServer)
[2016-04-29 16:03:41,503] INFO Server environment:java.class.path=:/home/dean/work/software/kafka_2.10-0.8.1/bin/../core/build/dependant-libs-2.8.0/*.jar:/home/dean/work/software/kafka_2.10-0.8.1/bin/../perf/build/libs//kafka-perf_2.8.0*.jar:/home/dean/work/software/kafka_2.10-0.8.1/bin/../clients/build/libs//kafka-clients*.jar:/home/dean/work/software/kafka_2.10-0.8.1/bin/../examples/build/libs//kafka-examples*.jar:/home/dean/work/software/kafka_2.10-0.8.1/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/home/dean/work/software/kafka_2.10-0.8.1/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/home/dean/work/software/kafka_2.10-0.8.1/bin/../libs/jopt-simple-3.2.jar:/home/dean/work/software/kafka_2.10-0.8.1/bin/../libs/kafka_2.10-0.8.1.jar:/home/dean/work/software/kafka_2.10-0.8.1/bin/../libs/log4j-1.2.15.jar:/home/dean/work/software/kafka_2.10-0.8.1/bin/../libs/metrics-annotation-2.2.0.jar:/home/dean/work/software/kafka_2.10-0.8.1/bin/../libs/metrics-core-2.2.0.jar:/home/dean/work/software/kafka_2.10-0.8.1/bin/../libs/scala-library-2.10.1.jar:/home/dean/work/software/kafka_2.10-0.8.1/bin/../libs/slf4j-api-1.7.2.jar:/home/dean/work/software/kafka_2.10-0.8.1/bin/../libs/snappy-java-1.0.5.jar:/home/dean/work/software/kafka_2.10-0.8.1/bin/../libs/zkclient-0.3.jar:/home/dean/work/software/kafka_2.10-0.8.1/bin/../libs/zookeeper-3.3.4.jar:/home/dean/work/software/kafka_2.10-0.8.1/bin/../core/build/libs/kafka_2.8.0*.jar (org.apache.zookeeper.server.ZooKeeperServer)
[2016-04-29 16:03:41,503] INFO Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.server.ZooKeeperServer)
[2016-04-29 16:03:41,503] INFO Server environment:java.io.tmpdir=/tmp (org.apache.zookeeper.server.ZooKeeperServer)
[2016-04-29 16:03:41,503] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
[2016-04-29 16:03:41,503] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2016-04-29 16:03:41,503] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-04-29 16:03:41,503] INFO Server environment:os.version=4.5.1-1-ARCH (org.apache.zookeeper.server.ZooKeeperServer)
[2016-04-29 16:03:41,503] INFO Server environment:user.name=dean (org.apache.zookeeper.server.ZooKeeperServer)
[2016-04-29 16:03:41,503] INFO Server environment:user.home=/home/dean (org.apache.zookeeper.server.ZooKeeperServer)
[2016-04-29 16:03:41,503] INFO Server environment:user.dir=/home/dean/work/software/kafka_2.10-0.8.1/bin (org.apache.zookeeper.server.ZooKeeperServer)
[2016-04-29 16:03:41,507] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-04-29 16:03:41,507] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-04-29 16:03:41,507] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-04-29 16:03:41,520] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxn)
[2016-04-29 16:03:41,535] INFO Snapshotting: 0 (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2016-04-29 16:10:44,072] INFO Accepted socket connection from /0:0:0:0:0:0:0:1:34216 (org.apache.zookeeper.server.NIOServerCnxn)
[2016-04-29 16:10:44,111] INFO Client attempting to establish new session at /0:0:0:0:0:0:0:1:34216 (org.apache.zookeeper.server.NIOServerCnxn)
[2016-04-29 16:10:44,113] INFO Creating new log file: log.1 (org.apache.zookeeper.server.persistence.FileTxnLog)
[2016-04-29 16:10:44,118] INFO Established session 0x154610b75630000 with negotiated timeout 30000 for client /0:0:0:0:0:0:0:1:34216 (org.apache.zookeeper.server.NIOServerCnxn)
[2016-04-29 16:24:37,145] INFO Processed session termination for sessionid: 0x154610b75630000 (org.apache.zookeeper.server.PrepRequestProcessor)
[2016-04-29 16:24:37,151] INFO Closed socket connection for client /0:0:0:0:0:0:0:1:34216 which had sessionid 0x154610b75630000 (org.apache.zookeeper.server.NIOServerCnxn)

2 Start multiple brokers locally

Here we start two brokers, which requires two config files; go into the config directory and make two copies

cp server.properties server1.properties
cp server.properties server2.properties

Make three changes in each of the two config files. server1.properties:

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

############################# Socket Server Settings #############################

# The port the socket server listens on
port=9093

############################# Log Basics #############################

# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1

server2.properties:

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2

############################# Socket Server Settings #############################

# The port the socket server listens on
port=9094

############################# Log Basics #############################

# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2

Run these commands to start the two brokers

./kafka-server-start.sh ../config/server1.properties 
./kafka-server-start.sh ../config/server2.properties

3 Produce several messages, distributed over the topic's 2 partitions

3.1 Create a topic named test2

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic test2
Created topic "test2".

3.2 Inspect the topic

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test2
Topic:test2     PartitionCount:2        ReplicationFactor:2     Configs:
        Topic: test2    Partition: 0    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: test2    Partition: 1    Leader: 2       Replicas: 2,1   Isr: 2,1

3.3 Write messages

Write several messages with the producer script

./kafka-console-producer.sh --broker-list localhost:9093,localhost:9094 --topic test2
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
shanghai
beijing
hangzhou
jinan
yangzhou
shenzheng
hongkong
nanjing

4 Have a consumer read all messages of the topic from all brokers

Read them with the consumer script

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test2 --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
shanghai
beijing
hangzhou
jinan
yangzhou
shenzheng
hongkong
nanjing

All messages were read back successfully.

Created: 2016-05-01 Sun 21:40
