Connecting to Kafka from C#

2019-03-01 10:24:39 sinat_31465609

1. Introduction to Kafka

Kafka (official site: http://kafka.apache.org) is a distributed publish-subscribe messaging system built for high performance and high throughput.

1.1 Terminology

  • Broker

A Kafka cluster consists of one or more servers; each such server is called a broker.

  • Topic

Every message published to a Kafka cluster belongs to a category called a topic. (Physically, messages of different topics are stored separately. Logically, a topic's messages may live on one or more brokers, but a user only needs to name the topic to produce or consume data, without caring where the data is stored.)

  • Partition

A partition is a physical concept; every topic contains one or more partitions. (A common rule of thumb is one partition per CPU core across the Kafka nodes.)

  • Producer

A producer publishes messages to Kafka brokers.

  • Consumer

A consumer is a client that reads messages from Kafka brokers.

  • Consumer Group

Every consumer belongs to a specific consumer group (a group name can be set per consumer; if none is given, the consumer belongs to the default group).
1.2 Key Characteristics
Scalability
Capacity can be added without taking the cluster offline
Data streams are partitioned and stored across multiple machines
High performance
A single broker can serve thousands of clients
A single broker can read and write hundreds of megabytes per second
A cluster of multiple brokers delivers very high aggregate throughput
Performance stays stable as data volume grows
At the storage layer, Kafka forgoes the Java heap cache in favor of the operating system's page cache, converts random writes into sequential writes, and combines this with zero-copy transfer to greatly improve I/O performance.
1.3 Message Format
A topic corresponds to one message format, so topics are how messages are classified.
A topic consists of one or more partitions.
A partition is stored on one or more servers. With a single server there is no redundancy: that is a standalone setup, not a cluster. With multiple servers, one acts as the leader and the others as followers; the leader handles read and write requests, while the followers only keep redundant copies. If the leader fails, a follower is automatically elected as the new leader so the service is not interrupted. Each server may be the leader for some partitions and a follower for others, which balances load across the cluster.
Messages are stored in order; the order is immutable; messages can only be appended, never inserted. Each message has an offset that serves as its ID and is unique within a partition. Offsets are kept and managed by the consumer, so the read order is ultimately decided by the consumer and need not be linear. Messages have a retention period and are deleted once it expires.
1.4 How It Works
When a topic is created, the number of partitions can be specified (default 1, configured via num.partitions); the partitions are then distributed across the brokers. The assignment rule: with a brokers, partition b is placed on broker b % a. A number of replicas can also be specified per partition; the replica placement rule is that the c-th replica of partition b is stored on broker (b + c) % a. On each broker a partition is a directory, named topic name + ordinal number. Within a partition the files are segments; each segment consists of an .index and a .log file, named after the last offset of the previous segment file. This naming scheme makes deleting old files fast.
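A minimal sketch of the assignment arithmetic just described (my own illustration of the b % a and (b + c) % a rules from the text, not Kafka's actual assignor, which also randomizes the starting broker):

static void ShowAssignment(int brokerCount, int partitionCount, int replicaCount)
{
    for (int b = 0; b < partitionCount; b++)
    {
        // partition b goes to broker b % a
        int leader = b % brokerCount;
        Console.WriteLine($"partition {b}: leader on broker {leader}");
        for (int c = 1; c < replicaCount; c++)
        {
            // the c-th replica of partition b goes to broker (b + c) % a
            Console.WriteLine($"  replica {c} on broker {(b + c) % brokerCount}");
        }
    }
}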

A producer pushes data into Kafka, and the data is spread across the partitions. Whether a push counts as successful depends on the acknowledgement mode: (1) an ack from the partition leader counts as success, or (2) success only once all replicas have been written. Retention is configurable (the upstream default is 7 days, via log.retention.hours). Produced data is first buffered and flushed to disk when a size or time threshold is reached; consumers can only read data that has reached disk.
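For reference, these acknowledgement modes map onto the Acks producer setting in the Confluent.Kafka client introduced later in this article; a minimal sketch:

// Acks.Leader: success once the partition leader has persisted the message
// Acks.All:    success only after all in-sync replicas have it (strongest guarantee)
var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    Acks = Acks.All
};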

A consumer polls data out of Kafka, pulling up to a configured amount into memory for processing. Consumers within one group collectively consume the full set of messages, with each message going to exactly one member; different groups do not share offsets, so each group independently consumes its own complete copy of the data.

Consumption is most efficient when the number of consumers equals the number of partitions. In this way Kafka can scale horizontally by adding brokers and partitions; data is written to disk sequentially; producers and consumers operate asynchronously.

2. Environment Setup (Windows)

2.1 Installing ZooKeeper
Kafka depends on ZooKeeper, so install ZooKeeper first.

Download the latest ZooKeeper release from http://www.apache.org/dyn/closer.cgi/zookeeper/
Extract it to a directory of your choice
Copy zoo_sample.cfg in the conf directory, rename the copy to zoo.cfg, set dataDir in zoo.cfg to E:/data/zookeeper, and add the line dataLogDir=E:/log/zookeeper (see the sample config after these steps)
Append ;E:\zookeeper\zookeeper-3.4.10\bin to the Path system environment variable
Open a cmd window and run zkServer to start the server
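The resulting zoo.cfg then looks roughly like this; the other keys come from zoo_sample.cfg, and only the two path entries are the edits described above:

# zoo.cfg (based on zoo_sample.cfg)
tickTime=2000
initLimit=10
syncLimit=5
dataDir=E:/data/zookeeper
dataLogDir=E:/log/zookeeper
clientPort=2181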
2.2 Installing Kafka
Download the latest Kafka release from http://kafka.apache.org/downloads
Extract it to a directory of your choice, e.g. E:\kafka_2.12-0.10.2.0
In server.properties under E:\kafka_2.12-0.10.2.0\config, set log.dirs to E:/log/kafka
Append ;E:\kafka_2.12-0.10.2.0\bin\windows to the Path system environment variable
Start Kafka: in a cmd window, cd into the Kafka root directory E:\kafka_2.12-0.10.2.0 and run
.\bin\windows\kafka-server-start.bat .\config\server.properties
The line started (kafka.server.KafkaServer) indicates a successful start
Open a cmd window and create a topic:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Open another cmd window and start a console producer:
kafka-console-producer.bat --broker-list localhost:9092 --topic test
Open yet another cmd window and start a console consumer:
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test
Type messages in the producer window; each line appears in the consumer window as soon as you press Enter, which confirms that Kafka is installed and working

3. Common .NET Client Libraries

Common .NET libraries for Kafka messaging include kafka-net and Confluent.Kafka. Confluent.Kafka is the officially recommended client and is the one this article uses (pre-release version 1.0.0-beta) in a console application.

4. Application: Producer

The producer publishes data to a given topic. Production deployments usually run multiple brokers for load balancing, in which case BootstrapServers is a comma-separated list of broker addresses.

/// <summary>
/// Producer
/// </summary>
public static void Produce()
{
    var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
    Action<DeliveryReportResult<Null, string>> handler = r =>
        Console.WriteLine(!r.Error.IsError
            ? $"Delivered message to {r.TopicPartitionOffset}"
            : $"Delivery Error: {r.Error.Reason}");

    using (var producer = new Producer<Null, string>(config))
    {
        // Log producer-level errors
        producer.OnError += (_, msg) => { Console.WriteLine($"Producer_Error: Code:{msg.Code};Reason:{msg.Reason};IsError:{msg.IsError}"); };

        for (int i = 0; i < 5; i++)
        {
            // Send a message to the topic asynchronously
            producer.BeginProduce("MyTopic", new Message<Null, string> { Value = i.ToString() }, handler);
        }
        // Wait up to 3 seconds for outstanding deliveries before disposing
        producer.Flush(TimeSpan.FromSeconds(3));
    }
}

5. Application: Consumer

Consumers label themselves with a consumer group name; each record published to a topic is delivered to exactly one consumer instance within every subscribing group. Consumer instances may run in separate processes or on different machines.

If all consumer instances share one consumer group, records are effectively load-balanced across the instances.

If every consumer instance has its own group, each record is broadcast to all consumer processes.

(The original article showed a figure here: a two-server Kafka cluster hosting four partitions, P0 to P3, with two consumer groups; group A has two consumer instances and group B has four.)

EnableAutoCommit defaults to true (auto-commit): as soon as a message is pulled from the queue, the offset advances, whether or not the message is later processed successfully, so the message is gone either way. To avoid losing data when processing fails, the consumer can set this property to false and commit the offset manually with Commit() after processing.

/// <summary>
/// Consumer
/// </summary>
public static void Consumer()
{
    var conf = new ConsumerConfig
    {
        GroupId = "test-consumer-group",
        BootstrapServers = "localhost:9092",
        AutoOffsetReset = AutoOffsetResetType.Earliest,
        EnableAutoCommit = false  // disable auto-commit; commit manually after processing so failed messages are not lost
    };
    using (var consumer = new Consumer<Ignore, string>(conf))
    {
        // Subscribe to the topic
        consumer.Subscribe("MyTopic");
        // Log consumer-level errors
        consumer.OnError += (_, msg) => { Console.WriteLine($"Consumer_Error: Code:{msg.Code};Reason:{msg.Reason};IsError:{msg.IsError}"); };

        while (true)
        {
            try
            {
                var consume = consumer.Consume();
                string receiveMsg = consume.Value;
                Console.WriteLine($"Consumed message '{receiveMsg}' at: '{consume.TopicPartitionOffset}'.");
                // Business logic goes here; ProcessMessage is a hypothetical
                // placeholder for your own handler
                bool success = ProcessMessage(receiveMsg);
                if (success)
                {
                    // Manually commit the offset once processing succeeded
                    consumer.Commit(new List<TopicPartitionOffset>() { consume.TopicPartitionOffset });
                }
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Consumer_Error occurred: {e.Error.Reason}");
            }
        }
    }
}

Execution result (screenshot omitted)

Handling Common Data Problems

  1. Duplicate consumption, most often caused by rebalancing: when processing a batch of messages takes longer than Kafka's session timeout (30 seconds by default in 0.10.x), the group rebalances, and there is some chance the offset was not committed beforehand, so messages are consumed again after the rebalance. Deduplication: tag each message with a unique ID.
  2. Avoiding message loss: on the producer, use acks = -1 or all (the send succeeds only once fully acknowledged, giving at-least-once delivery); on the consumer, commit offsets manually, only after the business logic has completed successfully.
  3. Avoiding duplicate processing: persist to a table whose primary key or unique index rejects duplicates, or handle it in the business logic (store a unique key per message in Redis or MongoDB; check whether it exists first, skip the message if it does, otherwise insert the key and then process), as sketched below.
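A minimal sketch of that check-then-process idea using StackExchange.Redis; GetMessageId and ProcessBusinessLogic are hypothetical placeholders for your own ID extraction and handler, and the TTL is an arbitrary choice:

using StackExchange.Redis;

// StringSet with When.NotExists is an atomic SETNX: only one consumer can
// claim a given message ID, and the expiry bounds how long IDs are remembered.
static bool TryClaimMessage(IDatabase redis, string messageId)
{
    return redis.StringSet("kafka:dedup:" + messageId, "1",
                           expiry: TimeSpan.FromDays(3), when: When.NotExists);
}

// In the consume loop:
//   var db = ConnectionMultiplexer.Connect("localhost").GetDatabase();
//   if (TryClaimMessage(db, GetMessageId(msg)))   // GetMessageId: hypothetical
//       ProcessBusinessLogic(msg.Value);          // ProcessBusinessLogic: hypothetical
//   else: skip the duplicate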

Visual Debugging for Kafka

The visual client Kafka Tool is helpful here.
For usage details see: https://www.cnblogs.com/frankdeng/p/9452982.html

2018-06-06 10:17:00 weixin_33790053

Recently, while integrating with our data department, they provided a Kafka subscription service, so I did some research and wrote a C# console program to consume it.

This post mainly records how to handle the case where the offset does not start at 0 when consuming Kafka messages with the kafka-net component.

Setting up a test environment by following the getting-started tutorials worked fine.

After deploying to production, however, the consumer kept spinning without consuming anything, even though the logs showed a successful connection. The producing team later explained that operations had configured a 3-day message retention when deploying Kafka.

Once messages older than three days are purged, their offsets no longer exist in Kafka, so setting the offset to 0 produces exactly the behavior above. How, then, do we find the real starting offset?

Part 1: install the JDK, Kafka and ZooKeeper locally.

1. JDK 1.7 or later is recommended.

2. Download, install and run ZooKeeper (download: http://zookeeper.apache.org/releases.html)

3. Install and run Kafka (download: http://kafka.apache.org/downloads.html)

These installation steps are described in detail in 车江毅's post ".net windows Kafka 安装与使用入门(入门笔记)", so they are not repeated here.

Part 2: create the topic, producer and consumer in Kafka; the detailed commands are in the post above, so this is skipped here.

Part 3: following the recommendation in that post, I chose kafka-net. Source: https://github.com/Jroland/kafka-net

1. Producer code

private static void Produce(string broker, string topic, string message)
{
    var options = new KafkaOptions(new Uri(broker));
    var router = new BrokerRouter(options);
    var client = new Producer(router);

    var currentDatetime = DateTime.Now;
    var key = currentDatetime.Second.ToString();   // the key here is only used for logging
    var events = new[] { new Message(message) };
    client.SendMessageAsync(topic, events).Wait(1500);
    Console.WriteLine("Produced: Key: {0}. Message: {1}", key, events[0].Value.ToUtf8String());

    using (client) { }   // dispose the producer
}

//Example call:
Produce("http://host:port", "TopicName", "Hello World");

2. Consumer code. Everything above was just to set up a test environment; the actual task is to consume the Kafka messages provided by another team.

private static void Consume(string broker, string topic)
{
    Uri[] UriArr = Array.ConvertAll<string, Uri>(broker.Split(','), delegate (string s) { return new Uri(s); });
    var options = new KafkaOptions(UriArr);
    var router = new BrokerRouter(options);
    var consumer = new Consumer(new ConsumerOptions(topic, router));

    // In production, consumption does not always start from offset 0. The Java client
    // can be configured to start from the smallest or largest offset automatically,
    // but kafka-net has no such setting, so the code below queries the broker for the
    // valid offset range and configures the starting position explicitly.
    Task<List<OffsetResponse>> offTask = consumer.GetTopicOffsetAsync(topic);
    var offsetResponseList = offTask.Result;
    var positionArr = new OffsetPosition[offsetResponseList.Count];
    for (int i = 0; i < offsetResponseList.Count; i++)
    {
        //LogHelper.WriteLog(string.Format("Kafka OffsetPosition info Partition:{0} maxOffset:{1} minOffset:{2}", offsetResponseList[i].PartitionId, offsetResponseList[i].Offsets[0], offsetResponseList[i].Offsets[1]));
        // Offsets[0] is the maximum offset; to consume from the beginning we use the minimum, Offsets[1]
        positionArr[i] = new OffsetPosition(offsetResponseList[i].PartitionId, offsetResponseList[i].Offsets[1]);
    }
    Console.WriteLine("Starting");

    // If the job is restarted after running for a while, the starting positions
    // can instead be read back from the database:
    /*
    try
    {
        using (IDbConnection dbConn = SingleDataBase.DBFactory.OpenRead<MessageQueue>())
        {
            string sql = string.Format("SELECT [Partition], MAX(Offset)Offset FROM [TableName] WITH(NOLOCK) WHERE topic='{0}' GROUP BY [Partition]", topic);
            var r = dbConn.SqlList<OffsetPositionEntity>(sql);
            if (r.Count > 0)
            {
                positionArr = Array.ConvertAll<OffsetPositionEntity, OffsetPosition>(r.ToArray(), delegate (OffsetPositionEntity p) { return new OffsetPosition(p.Partition, p.Offset + 1); });
            }
        }
    }
    catch
    {
        // Failed to read the last position; start from the beginning, which may cause duplicates
        //LogHelper.WriteLog("No previous end point found; consuming from the beginning");
    }
    */
    consumer.SetOffsetPosition(positionArr);

    // Consume returns a blocking IEnumerable (i.e. a never-ending stream)
    foreach (var message in consumer.Consume())
    {
        try
        {
            string messageStr = message.Value.ToUtf8String();
            Console.WriteLine(messageStr);
            long r = 0;
            /* the database-insert part has been removed here */
            //LogHelper.WriteLog(string.Format("Response: Partition {0},Offset {1} : {2} ,Insert : {3}", message.Meta.PartitionId, message.Meta.Offset, messageStr, r));
        }
        catch (Exception ex)
        {
            //LogHelper.WriteLog("Error saving consumed Kafka message to the database: " + ex.Message);
        }
    }
}

//Example call:
Consume("http://100.162.136.70:9092,http://100.162.136.71:9092,http://100.162.136.72:9092", "YourTopicName");

In the code above, replacing the LogHelper.WriteLog calls with Console.WriteLine prints the output directly to the screen; the logging helper itself is not included here.

This post exists mainly to record the highlighted fragment; the rest is just there to make the article reasonably complete. If you know kafka-net well and have a more convenient way to configure this, please share in the comments, thanks!

Reposted from: https://www.cnblogs.com/erdeni/articles/kafkaNet-1.html

2019-02-27 22:17:19 qq_23009105

1. This article is a practical development example of using Confluent.Kafka.dll from C# to work with a Kafka message queue. During development, message production and consumption on port 9094 kept timing out; fixing this required whitelisting in the network configuration, or simply using port 9092 instead. Most timeout situations have nothing to do with the code and come down to how Kafka is installed and configured.

2. Add a reference to Confluent.Kafka via NuGet.

3. Consuming

public class KafkaConsumer
    {
        public static string brokerUrl = ConfigurationManager.AppSettings["Broker"];
        public static string topic = ConfigurationManager.AppSettings["ConsumeTopic"];
        public static string groupid = ConfigurationManager.AppSettings["GroupID"];
        public static string consumercount = ConfigurationManager.AppSettings["ConsumerCount"];
        public static void Consume()
        {
            var mode = "consume";
            var brokerList = brokerUrl;
            List<string> topics = new List<string>(topic.Split(','));

            try
            {
                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) =>
                {
                    e.Cancel = true; // prevent the process from terminating.
                    cts.Cancel();
                };

                switch (mode)
                {
                    case "consume":
                        Run_Consume(brokerList, topics, cts.Token);
                        break;
                    case "manual":
                        Run_ManualAssign(brokerList, topics, cts.Token);
                        break;
                    default:
                        PrintUsage();
                        break;
                }
            }
            catch (Exception ex)
            {
                LogHelper.WriteProgramLog(DateTime.Now.ToString() + ex.Message);
            }

        }
        /// <summary>
        ///     In this example
        ///         - offsets are manually committed.
        ///         - no extra thread is created for the Poll (Consume) loop.
        /// </summary>
        public static void Run_Consume(string brokerList, List<string> topics, CancellationToken cancellationToken)
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = brokerList,
                GroupId = groupid,
                EnableAutoCommit = false,
                StatisticsIntervalMs = 5000,
                SessionTimeoutMs = 6000,
                AutoOffsetReset = AutoOffsetResetType.Earliest
            };

            const int commitPeriod = 5;

            using (var consumer = new Consumer<Ignore, string>(config))
            {
                consumer.Subscribe(topics);

                while (!cancellationToken.IsCancellationRequested)
                {
                    try
                    {
                        var consumeResult = consumer.Consume(cancellationToken);
                        if (consumeResult.Offset % commitPeriod == 0)
                        {
                            // The Commit method sends a "commit offsets" request to the Kafka
                            // cluster and synchronously waits for the response. This is very
                            // slow compared to the rate at which the consumer is capable of
                            // consuming messages. A high performance application will typically
                            // commit offsets relatively infrequently and be designed to handle
                            // duplicate messages in the event of failure.
                            var committedOffsets = consumer.Commit(consumeResult);
                            Console.WriteLine(string.Format("Committed offset: {0}", committedOffsets));
                        }
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine(string.Format("Consume error: {0}", e.Error));
                    }
                }

                consumer.Close();
            }
        }
        /// <summary>
        ///     In this example
        ///         - consumer group functionality (i.e. .Subscribe + offset commits) is not used.
        ///         - the consumer is manually assigned to a partition and always starts consumption
        ///           from a specific offset (0).
        /// </summary>
        public static void Run_ManualAssign(string brokerList, List<string> topics, CancellationToken cancellationToken)
        {
            var config = new ConsumerConfig
            {
                // the group.id property must be specified when creating a consumer, even 
                // if you do not intend to use any consumer group functionality.
                GroupId = Guid.NewGuid().ToString(),
                BootstrapServers = brokerList,
                // partition offsets can be committed to a group even by consumers not
                // subscribed to the group. in this example, auto commit is disabled
                // to prevent this from occurring.
                EnableAutoCommit = false
            };

            using (var consumer = new Consumer<Ignore, string>(config))
            {
                consumer.Assign(topics.Select(topic => new TopicPartitionOffset(topic, 0, Offset.Beginning)).ToList());

                consumer.OnError += (_, e)
                    => Console.WriteLine(string.Format("Error: {0}", e.Reason));

                consumer.OnPartitionEOF += (_, topicPartitionOffset)
                    => Console.WriteLine(string.Format("End of partition: {0}", topicPartitionOffset));

                while (!cancellationToken.IsCancellationRequested)
                {
                    try
                    {
                        var consumeResult = consumer.Consume(cancellationToken);
                        Console.WriteLine(string.Format("Received message at {0}:${1}", consumeResult.TopicPartitionOffset, consumeResult.Message));
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine(string.Format("Consume error: {0}", e.Error));
                    }
                }

                consumer.Close();
            }
        }
        private static void PrintUsage()
        {
            Console.WriteLine("Usage: .. <poll|consume|manual> <broker,broker,..> <topic> [topic..]");
        }
    }

4. Producing

/// <summary>
    /// Kafka message producer
    /// </summary>
    public class KafkaProducer
    {
        public static string brokerUrl = ConfigurationManager.AppSettings["Broker"];
        public static string topic = ConfigurationManager.AppSettings["ProduceTopic"];
        public static string groupid = ConfigurationManager.AppSettings["GroupID"];
        private static readonly object Locker = new object();
        private static Producer<string, string> _producer;
        /// <summary>
        /// Singleton producer
        /// </summary>
        public KafkaProducer()
        {
            if (_producer == null)
            {
                lock (Locker)
                {
                    if (_producer == null)
                    {
                        var config = new ProducerConfig
                        {
                            BootstrapServers = brokerUrl
                        };
                        _producer = new Producer<string, string>(config);
                    }
                }
            }
        }

        /// <summary>
        /// Produce and send a message
        /// </summary>
        /// <param name="key">key</param>
        /// <param name="message">the message to send</param>
        public static void Produce(string key, string message)
        {
            bool result = false;
            new KafkaProducer();
            if (string.IsNullOrEmpty(message) || string.IsNullOrWhiteSpace(message) || message.Length <= 0)
            {
                throw new ArgumentNullException("Message content cannot be empty!");
            }
            var deliveryReport = _producer.ProduceAsync(topic, new Message<string, string> { Key = key, Value = message });
            deliveryReport.ContinueWith(task =>
            {
                Console.WriteLine("Producer:" + _producer.Name + "\r\nTopic:" + topic + "\r\nPartition:" + task.Result.Partition + "\r\nOffset:" + task.Result.Offset + "\r\nMessage:" + task.Result.Value + "\r\nResult:" + result);
            });
            _producer.Flush(TimeSpan.FromSeconds(10));
        }
    }
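A minimal usage sketch, assuming an App.config whose appSettings provide the keys this class reads (Broker, ProduceTopic, GroupID); the values here are placeholders:

// Assumed App.config entries (key names come from the fields above):
//   <add key="Broker" value="localhost:9092" />
//   <add key="ProduceTopic" value="MyTopic" />
KafkaProducer.Produce("order-1001", "{\"orderId\":1001,\"status\":\"paid\"}");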

 

2018-11-02 10:45:42 qq_34894585

Download the NuGet package (screenshot omitted)

1. Define a common configuration base class

 public abstract class KafkaBase
    {
        /// <summary>
        /// Gets the Kafka broker address
        /// </summary>
        /// <param name="brokerNameKey">Name of the key in the config file that holds the broker address</param>
        /// <returns>The Kafka broker address</returns>
        public string GetKafkaBroker(string brokerNameKey = "Broker")
        {
            string kafkaBroker = System.Configuration.ConfigurationManager.AppSettings[brokerNameKey];
            if (string.IsNullOrEmpty(kafkaBroker) || string.IsNullOrWhiteSpace(kafkaBroker) || kafkaBroker.Length <= 0)
            {
                throw new ArgumentNullException("The Kafka broker address cannot be empty!");
            }
            return kafkaBroker;
        }

        /// <summary>
        /// Gets the topic name
        /// </summary>
        /// <param name="topicNameKey">Name of the topic key in the config file</param>
        /// <returns>The topic value</returns>
        public string GetTopicName(string topicNameKey = "Topic")
        {
            string topicName = System.Configuration.ConfigurationManager.AppSettings[topicNameKey]; 

            if (string.IsNullOrEmpty(topicName) || string.IsNullOrWhiteSpace(topicName) || topicName.Length <= 0)
            {
                throw new ArgumentNullException("The message topic cannot be empty!");
            }
            return topicName;
        }
        /// <summary>
        /// Gets the consumer group ID
        /// </summary>
        /// <returns></returns>
        public string GetGroupID(string groupIDKey= "GroupID")
        {
            string groupID = System.Configuration.ConfigurationManager.AppSettings[groupIDKey]; 
            if (string.IsNullOrEmpty(groupID) || string.IsNullOrWhiteSpace(groupID) || groupID.Length <= 0)
            {
                throw new ArgumentNullException("The consumer group cannot be empty!");
            }
            return groupID;
        }
        /// <summary>
        /// Gets the partition list
        /// </summary>
        /// <returns></returns>
        public List<int> GetPartition(string partitionsKey= "Partitions")
        {

            List<int> partitions = new List<int>();
            System.Configuration.ConfigurationManager.AppSettings[partitionsKey].Split(',').ToList().ForEach(x => 
            {
                partitions.Add(Convert.ToInt32(x));
            });
            return partitions;
        }
        /// <summary>
        /// Number of messages to consume per batch
        /// </summary>
        /// <returns></returns>
        public int GetConsumerCount(string consumerCountKey= "ConsumerCount")
        {
            int count = Convert.ToInt32(System.Configuration.ConfigurationManager.AppSettings[consumerCountKey]);
            return count;
        }

        /// <summary>
        ///  Writes a log entry
        /// </summary>
        /// <param name="type">consumer or producer</param>
        /// <param name="info"></param>
        /// <param name="args"></param>
        public static void WriteLog(string type,string info, params object[] args)
        {
            try
            {
                string filelog = string.Format(@"{0}\{1}\", AppDomain.CurrentDomain.BaseDirectory, "kafkaLog");
                if (!Directory.Exists(filelog))
                {
                    Directory.CreateDirectory(filelog);
                }
                filelog = string.Format(@"{0}\{1}\{2}{3}", AppDomain.CurrentDomain.BaseDirectory, "kafkaLog", type + "_" + DateTime.Now.ToString("yyyy-MM-dd"), ".log");
                using (StreamWriter sw = File.AppendText(filelog))
                {
                    sw.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + " Info:\r\n");
                    sw.WriteLine(string.Format(info, args));
                    sw.WriteLine("\r\n");
                    sw.Flush();
                    sw.Close();
                }
            }
            catch
            {
                throw;  // rethrow, preserving the original stack trace
            }
        }
    }
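For reference, a hedged example of the App.config appSettings this base class expects; the key names come from the default parameters above, the values are placeholders:

<appSettings>
  <add key="Broker" value="localhost:9092" />
  <add key="Topic" value="MyTopic" />
  <add key="GroupID" value="my-consumer-group" />
  <add key="Partitions" value="0,1,2" />
  <add key="ConsumerCount" value="10" />
</appSettings>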

2. Producer

 /// <summary>
    /// Kafka message producer
    /// </summary>
    public sealed class KafkaProducer : KafkaBase
    {
        
        private static readonly object Locker = new object();
        private static Producer<string, string> _producer;
        private static string _topic;
        private static List<int> _partition;
        /// <summary>
        /// Singleton producer
        /// </summary>
        public KafkaProducer()
        {
            _topic = GetTopicName();      
            _partition = GetPartition();
            if (_producer == null)
            {
                lock (Locker)
                {
                    if (_producer == null)
                    {
                        
                        var config = new Dictionary<string, object>
                        {
                            { "bootstrap.servers", GetKafkaBroker() }         
                        };
                        _producer = new Producer<string, string>(config, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8));
                    }
                }
            }
        }
               
        /// <summary>
        /// Produce and send a message
        /// </summary>
        /// <param name="key">key</param>
        /// <param name="message">the message to send</param>
        public static void Produce(string key, string message)
        {        
            bool result = false;
            new KafkaProducer();
            int partition = _partition[new Random().Next(_partition.Count)];   // pick a random partition from the configured list
            if (string.IsNullOrEmpty(message) || string.IsNullOrWhiteSpace(message) || message.Length <= 0)
            {
                throw new ArgumentNullException("Message content cannot be empty!");
            }  
            var deliveryReport = _producer.ProduceAsync(_topic, key, message, partition);
            deliveryReport.ContinueWith(task =>
            {
                if (task.Result.Error.Code == ErrorCode.NoError)
                {
                    result = true;
                }
                //Console.WriteLine("Producer:" + _producer.Name + "\r\nTopic:" + _topic + "\r\nPartition:" + task.Result.Partition + "\r\nOffset:" + task.Result.Offset + "\r\nMessage:" + task.Result.Value + "\r\nResult:" + result);
                WriteLog("producer", $"Topic:{ _topic } Partition:{ task.Result.Partition } Offset:{ task.Result.Offset } Message:{ task.Result.Value } result:{result}");
            });    
            _producer.Flush(TimeSpan.FromSeconds(10));  
            //return result;
        }
    }

3. Consumer

 /// <summary>
    /// Kafka message consumer
    /// </summary>
    public sealed class KafkaConsumer : KafkaBase
    {

        #region Constructor

        /// <summary>
        /// Constructor; initializes the IsCancelled property
        /// </summary>
        public KafkaConsumer()
        {
            IsCancelled = false;
        }

        #endregion

        #region Properties

        /// <summary>
        /// Whether to stop consuming Kafka messages; defaults to false (keep consuming)
        /// </summary>
        public bool IsCancelled { get; set; }

        #endregion

        #region Synchronous version

        /// <summary>
        /// Start consuming messages of the given topic as the given consumer group
        /// </summary>
        /// <param name="broker">Kafka broker address</param>
        /// <param name="topic">topic the messages belong to</param>
        /// <param name="groupID">consumer group of this consumer</param>
        /// <param name="action">handler applied to each consumed message</param>
        public void Consume(string broker, string topic, string groupID, Action<ConsumerResult> action = null)
        {
            if (string.IsNullOrEmpty(broker) || string.IsNullOrWhiteSpace(broker) || broker.Length <= 0)
            {
                throw new ArgumentNullException("The Kafka broker address cannot be empty!");
            }

            if (string.IsNullOrEmpty(topic) || string.IsNullOrWhiteSpace(topic) || topic.Length <= 0)
            {
                throw new ArgumentNullException("The message topic cannot be empty!");
            }

            if (string.IsNullOrEmpty(groupID) || string.IsNullOrWhiteSpace(groupID) || groupID.Length <= 0)
            {
                throw new ArgumentNullException("The consumer group ID cannot be empty!");
            }

            var config = new Dictionary<string, object>
                {
                    { "bootstrap.servers", broker },
                    { "group.id", groupID },
                    { "enable.auto.commit", true },  // this is the default
                    { "auto.commit.interval.ms", 5000 },
                    { "statistics.interval.ms", 60000 },
                    { "session.timeout.ms", 6000 },
                    { "auto.offset.reset", "smallest" }
                };


            using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8)))
            {
                // Note: All event handlers are called on the main thread.
                //consumer.OnMessage += (_, message) => Console.WriteLine("Topic:" + message.Topic + " Partition:" + message.Partition + " Offset:" + message.Offset + " " + message.Value);
                //consumer.OnMessage += (_, message) => Console.WriteLine("Offset:【" + message.Offset + "】Message:【" + message.Value + "】");
                if (action != null)
                {
                    consumer.OnMessage += (_, message) => {
                        ConsumerResult messageResult = new ConsumerResult();
                        messageResult.Broker = broker;
                        messageResult.Topic = message.Topic;
                        messageResult.Partition = message.Partition;
                        messageResult.Offset = message.Offset.Value;
                        messageResult.Message = message.Value;

                        //invoke the caller-supplied handler
                        action(messageResult);
                    };
                }

                consumer.OnPartitionEOF += (_, end) => Console.WriteLine("Reached end of topic " + end.Topic + " partition " + end.Partition + ", next message will be at offset " + end.Offset);

                consumer.OnError += (_, error) => Console.WriteLine("Error:" + error);

                //Raised on a deserialization error, or when consuming a message fails with an error != NoError.
                consumer.OnConsumeError += (_, message) => Console.WriteLine("Error consuming from topic/partition/offset " + message.Topic + "/" + message.Partition + "/" + message.Offset + ": " + message.Error);

                consumer.OnOffsetsCommitted += (_, commit) => Console.WriteLine(commit.Error ? "Failed to commit offsets:" + commit.Error : "Successfully committed offsets:" + commit.Offsets);

                // Raised when the consumer is assigned a new set of partitions.
                consumer.OnPartitionsAssigned += (_, partitions) =>
                {
                    Console.WriteLine("Assigned Partitions:" + partitions + ", Member ID:" + consumer.MemberId);
                    //If you don't add a handler to the OnPartitionsAssigned event, the .Assign call below happens automatically. If you do add one, you must call .Assign explicitly for the consumer to start consuming messages.
                    consumer.Assign(partitions);
                };

                // Raised when the consumer's current assignment set has been revoked.
                consumer.OnPartitionsRevoked += (_, partitions) =>
                {
                    Console.WriteLine("Revoked Partitions:" + partitions);
                    // If you don't add a handler to the OnPartitionsRevoked event,the below .Unassign call happens automatically. If you do, you must call .Unassign explicitly in order for the consumer to stop consuming messages from it's previously assigned partitions.
                    consumer.Unassign();
                };

                //consumer.OnStatistics += (_, json) => Console.WriteLine("Statistics: " + json);

                consumer.Subscribe(topic);

                //Console.WriteLine("Subscribed to:" + consumer.Subscription);

                while (!IsCancelled)
                {
                    consumer.Poll(TimeSpan.FromMilliseconds(100));
                }
            }
        }
        /// <summary>
        /// Consume
        /// </summary>
        /// <param name="func">handler applied to consumed messages</param>
        /// <param name="partition">partitions and starting offsets to consume</param>
        public void Consume(Func<List<ConsumerResult>, bool> func = null, Dictionary<int, int> partition = null)
        {
            KafkaManuallyCommittedOffsets(new ConsumerSetting()
            {
                Broker = GetKafkaBroker(),
                Topic = GetTopicName(),
                GroupID = GetGroupID(),
                Func = func,
                Partition = partition
            });
        }
        #endregion

        #region Asynchronous version

        /// <summary>
        /// Start consuming messages of the given topic as the given consumer group
        /// </summary>
        /// <param name="broker">Kafka broker address</param>
        /// <param name="topic">topic the messages belong to</param>
        /// <param name="groupID">consumer group of this consumer</param>
        /// <param name="func">handler applied to consumed messages</param>
        public void ConsumeAsync(string broker, string topic, string groupID, Func<List<ConsumerResult>, bool> func = null)
        {
            if (string.IsNullOrEmpty(broker) || string.IsNullOrWhiteSpace(broker) || broker.Length <= 0)
            {
                throw new ArgumentNullException("The Kafka broker address cannot be empty!");
            }

            if (string.IsNullOrEmpty(topic) || string.IsNullOrWhiteSpace(topic) || topic.Length <= 0)
            {
                throw new ArgumentNullException("The message topic cannot be empty!");
            }

            if (string.IsNullOrEmpty(groupID) || string.IsNullOrWhiteSpace(groupID) || groupID.Length <= 0)
            {
                throw new ArgumentNullException("The consumer group ID cannot be empty!");
            }

            ThreadPool.QueueUserWorkItem(KafkaAutoCommittedOffsets, new ConsumerSetting() { Broker = broker, Topic = topic, GroupID = groupID, Func = func });
        }

        #endregion

        #region Two offset-commit variants

        /// <summary>
        /// Consume with offsets committed automatically by the client
        /// </summary>
        /// <param name="state">consumer settings</param>
        private void KafkaAutoCommittedOffsets(object state)
        {
            ConsumerSetting setting = state as ConsumerSetting;

            var config = new Dictionary<string, object>
                {
                    { "bootstrap.servers", setting.Broker },
                    { "group.id", setting.GroupID },
                    { "enable.auto.commit", true },  // this is the default
                    { "auto.commit.interval.ms", 5000 },
                    { "statistics.interval.ms", 60000 },
                    { "session.timeout.ms", 6000 },
                    { "auto.offset.reset", "smallest" }
                };

            using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8)))
            {
                if (setting.Func != null)
                {
                    consumer.OnMessage += (_, message) =>
                    {
                        ConsumerResult messageResult = new ConsumerResult();
                        messageResult.Broker = setting.Broker;
                        messageResult.Topic = message.Topic;
                        messageResult.Partition = message.Partition;
                        messageResult.Offset = message.Offset.Value;
                        messageResult.Message = message.Value;

                        //invoke the caller-supplied handler
                        setting.Func(new List<ConsumerResult>() { messageResult });
                    };
                }

                //consumer.OnStatistics += (_, json)=> Console.WriteLine("Statistics: {json}");

                //could log errors here
                //consumer.OnError += (_, error)=> Console.WriteLine("Error:"+error);

                //could log consume errors here
                //consumer.OnConsumeError += (_, msg) => Console.WriteLine("Error consuming from topic/partition/offset {msg.Topic}/{msg.Partition}/{msg.Offset}: {msg.Error}");

                consumer.Subscribe(setting.Topic);

                while (!IsCancelled)
                {
                    consumer.Poll(TimeSpan.FromMilliseconds(100));
                }
            }
        }

        /// <summary>
        /// Consume with offsets committed manually
        /// </summary>
        /// <param name="state">consumer settings</param>
        private void KafkaManuallyCommittedOffsets(object state)
        {
            ConsumerSetting setting = state as ConsumerSetting;

            var config = new Dictionary<string, object>
            {
                { "bootstrap.servers", setting.Broker },
                { "group.id", setting.GroupID },
                { "enable.auto.commit", false },
                { "auto.commit.interval.ms", 5000 },
                { "statistics.interval.ms", 60000 },
                { "session.timeout.ms", 6000 },
                { "auto.offset.reset", "smallest" }
            };

            using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8)))
            {
                //subscribe, or assign specific partitions
                if (setting.Partition == null)
                {
                    consumer.Subscribe(setting.Topic);
                }
                else
                {
                    List<TopicPartitionOffset> topics = new List<TopicPartitionOffset>();
                    foreach (KeyValuePair<int, int> item in setting.Partition)
                    {
                        topics.Add(new TopicPartitionOffset(setting.Topic, item.Key, item.Value)); //topics.Add(new TopicPartitionOffset(setting.Topic, 0, Offset.Beginning));
                    }
                    consumer.Assign(topics);
                }
                //log errors
                consumer.OnError += (_, error) => WriteLog("consumer", "Error:" + error);
                consumer.OnConsumeError += (_, error) => WriteLog("consumer", "Consume error:" + error);

                List<ConsumerResult> consumerResults = new List<ConsumerResult>();
                Message<Ignore, string> message = null;
                int msgCount = GetConsumerCount();
                while (!IsCancelled)
                {
                    if (!consumer.Consume(out message, TimeSpan.FromMilliseconds(100)))
                    {
                        continue;
                    }
                    //Console.WriteLine($"Topic: {message.Topic} Partition: {message.Partition} Offset: {message.Offset} Value:{message.Value} count {msgCount}");
                    WriteLog("consumer", $"Topic: {message.Topic} Partition: {message.Partition} Offset: {message.Offset} Value:{message.Value} count {msgCount}");
                    consumerResults.Add( new ConsumerResult
                    {
                        Broker = setting.Broker,
                        Topic = message.Topic,
                        Partition = message.Partition,
                        Offset = message.Offset.Value,
                        Message = message.Value
                    });
                    msgCount--;
                    if (msgCount == 0)
                    {
                        bool? result = setting.Func?.Invoke(consumerResults);
                        if (result ?? false)
                        {
                            var committedOffsets = consumer.CommitAsync(message).Result;
                            Console.WriteLine($"committed offsets:{committedOffsets}");
                            WriteLog("consumer", $"committed offsets:{committedOffsets}");
                            msgCount = GetConsumerCount();
                        }
                        else
                        {
                            WriteLog("consumer", "调用消费返回False");
                        }
                    }
                }

            }
        }

        #endregion

    }

4. Model classes

 /// <summary>
    /// Details of a consumed message
    /// </summary>
    public sealed class ConsumerResult
    {
        /// <summary>
        /// Kafka broker address
        /// </summary>
        public string Broker { get; set; }

        /// <summary>
        /// Topic the message belongs to
        /// </summary>
        public string Topic { get; set; }

        /// <summary>
        /// Consumer group ID
        /// </summary>
        public string GroupID { get; set; }

        /// <summary>
        /// The message payload to process
        /// </summary>
        public string Message { get; set; }

        /// <summary>
        /// Current read position (offset)
        /// </summary>
        public long Offset { get; set; }

        /// <summary>
        /// Physical partition the message lives in
        /// </summary>
        public int Partition { get; set; }
    }
 /// <summary>
    /// Consumer settings object; parameters for consuming Kafka messages (Consumer.Consume)
    /// </summary>
    public sealed class ConsumerSetting
    {
        /// <summary>
        /// Kafka broker address
        /// </summary>
        public string Broker { get; set; }

        /// <summary>
        /// Topic the messages belong to
        /// </summary>
        public string Topic { get; set; }

        /// <summary>
        /// Consumer group ID
        /// </summary>
        public string GroupID { get; set; }

        /// <summary>
        /// Partitions and starting offsets
        /// </summary>
        public Dictionary<int, int> Partition { get; set; } = null;

        /// <summary>
        /// Details of consumed messages
        /// </summary>
        public List<ConsumerResult> ConsumerResults { get; set; } = null;
        /// <summary>
        /// Callback invoked after consuming a batch; return true to commit offsets
        /// </summary>
        public Func<List<ConsumerResult>, bool> Func { get; set; } = null;

    }
2020-03-27 22:55:24 mnicsm

1. Notes on Connecting to Kafka from C#


A while ago the boss decreed that our message middleware would be Kafka. Since the decision was made, Kafka it is. So off to the search engines: how do you install Kafka, and how do you connect to it from code? Installation and usage hit a few bumps along the way, but in the end everything was resolved.

My department works in C#, so connecting to Kafka had to be done in C#. You might think that is trivial to find online, and there is indeed plenty of material, but not necessarily what you need: most of it targets Java, and while C# resources exist, they are much thinner than the Java ones.

https://gitee.com/autumn_2/MQExtend.Core.git is a wrapper over the vendor SDKs that supports ActiveMQ and Kafka operations (I am a beginner myself and this is rough work, but I hope it helps).

2. Confluent.Kafka

Before picking a Kafka library I read https://blog.csdn.net/xinlingjun2007/article/details/80295332, which is why I settled on the Confluent.Kafka library.

2.1 Sending Messages

Note that if the broker's auto.create.topics.enable is false (it is commonly disabled outside of development), a topic must already exist in Kafka before you send messages to it, so make sure your topics are created up front; one way to do that is shown below.
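If auto-creation is off, the topic can be created in advance with the kafka-topics script, as in the installation section earlier (adjust host, partitions and replication factor to your environment):

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TopicName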

public void PushMessage()
{
	var config = new ProducerConfig()
	{
		BootstrapServers = "localhost:9092",
		Acks = Acks.Leader
	};
	// Message<key, value>: the key is unused here; it matters when you want to
	// route messages to a specific partition, so we simply use Null
	using (var producer = new ProducerBuilder<Null, string>(config).Build())
	{
		producer.Produce("TopicName", new Message<Null, string>()
		{
			Value = "the message content to send"
		}, (result) =>
		{
			WriteLog(!result.Error.IsError ? $"Delivered message to {result.TopicPartitionOffset}" : $"Delivery Error: {result.Error.Reason}");
		});
		Console.WriteLine("Message sent");
	}
}

2.2 Consuming Messages

session.timeout.ms

If the consumer sends no heartbeat within this window, it is considered dead (the heartbeats themselves are sent every 3 seconds by default).

auto.offset.reset

What to do when the consumer reads a partition that has no committed offset, or whose committed offset is invalid. The default is latest.

earliest: if a partition has a committed offset, resume from it; if not, consume from the beginning

latest: if a partition has a committed offset, resume from it; if not, consume only data produced from now on

none: if every partition has a committed offset, resume from those offsets; if any partition lacks one, throw an exception

enable.auto.commit

Defaults to true: the consumer commits offsets automatically. To reduce duplicates and avoid data loss, set it to false and control when to commit yourself.

/// <summary>
/// Message subscription
/// </summary>
/// <param name="queueName">topic to subscribe to</param>
/// <param name="action">handler for each consumed message</param>
public void Subscribe(string queueName, Action<IMessageContent> action)
{
	var config = new ConsumerConfig()
	{
		BootstrapServers = "",
		GroupId = "gms_20200327_group",
		AutoOffsetReset =  AutoOffsetReset.Earliest,
		EnableAutoCommit = false
	};
	// If Kafka has authentication configured, add this block (for this example, my local broker uses SASL authentication)
	if (!string.IsNullOrEmpty(this.UserName) && !string.IsNullOrEmpty(this.Password))
	{
		config.SecurityProtocol = SecurityProtocol.SaslPlaintext;
		config.SaslMechanism = SaslMechanism.Plain;
		config.SaslUsername = this.UserName;
		config.SaslPassword = this.Password;
	}

	using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
	{
		//subscribe to the topic
		consumer.Subscribe(queueName);

		CancellationTokenSource cts = new CancellationTokenSource();
		Console.CancelKeyPress += (sender, e) =>
		{
			//prevent the process from terminating.
			e.Cancel = true;
			cts.Cancel();
		};

		//whether the current message was consumed successfully
		bool isOK = false;
		//the most recent consume result
		ConsumeResult<Ignore, string> consumeResult = null;
		try
		{
			while (true)
			{
				isOK = false;
				try
				{
					//consumer.Assign(new TopicPartitionOffset(queueName, 0, Offset.Beginning));
					consumeResult = consumer.Consume(cts.Token);
					if (consumeResult.IsPartitionEOF)
					{
						WriteLog($"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
						continue;
					}
					//log the received message
					WriteLog($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Value}");
					//hand the message to the business handler
					action?.Invoke(new KafkaMessageContent(consumeResult.Value, consumeResult.Key?.ToString()));
					//mark as consumed successfully
					isOK = true;
					//Commit sends a "commit offsets" request to the Kafka cluster and waits
					//synchronously for the response. That is very slow compared with how fast
					//the consumer can consume, so a high-performance application commits
					//relatively infrequently and is designed to handle duplicates on failure.
					consumer.Commit(consumeResult);
					//log the successful consumption
					WriteLog($"Consumed message '{consumeResult.Value}' at: '{consumeResult.TopicPartitionOffset}'.");
				}
				catch (ConsumeException e)
				{
					isOK = false;
					WriteError($"Error occured: {e.Error.Reason}");
				}
				catch (Exception ex)
				{
					isOK = false;
					WriteError($"Error occured: {ex.StackTrace}");
				}

				//post-processing when consumption failed
				if (!isOK && consumeResult != null)
				{
					//handle the failed message
					ErrorHandler(consumer, consumeResult);
				}
			}
		}
		catch (OperationCanceledException e)
		{
			WriteException(e);
			// Ensure the consumer leaves the group cleanly and final offsets are committed.
			consumer.Close();
		}
	}
}

Note: consumer.Commit(consumeResult) deserves a closer look. Suppose a partition contains 10 messages and message 3 fails during processing:

1, 2, 3 (consumption fails), 4, 5, 6, 7, 8, 9, 10

  1. When message 3 fails, the exception is caught and the code never reaches the consumer.Commit() call, so no offset is committed for it.
  2. Because the exception was caught, the consumer keeps polling and receives message 4. Message 4 succeeds, so consumer.Commit() runs and commits the latest offset. Note that this commits past message 3: in other words, if message 3 fails and nothing extra is done, it will never be consumed again (not without manual intervention).
  3. My approach is to forward failed messages to a DLQ topic: whenever a topic Topics is created, a companion DLQ.Topics is created as well, dedicated to holding the messages of Topics that failed consumption. The handling code follows:
/// <summary>
/// Handle a consumption failure
/// </summary>
/// <param name="consumer">the consumer</param>
/// <param name="consumeResult">the failed message</param>
private void ErrorHandler(IConsumer<Ignore, string> consumer, ConsumeResult<Ignore, string> consumeResult)
{
	if (consumeResult != null && consumer != null)
	{
		string queueName = consumeResult.Topic;
		WriteLog($"Consuming '{queueName}' message failed: '{consumeResult.Value}' at: '{consumeResult.TopicPartitionOffset}'.");
		// Consumption failed and the message should not be forwarded to a DLQ
		// (or this already is a DLQ topic), so seek back to the failed offset
		if (!this.SubscribeConfig.TransformToDLQ || queueName.StartsWith("DLQ.", StringComparison.OrdinalIgnoreCase))
		{
			// Seek the offset back, retrying up to 6 times; if that fails,
			// stop consuming so that no message is skipped.
			OffsetBack(consumer, consumeResult);
			return;
		}

		// Forward to the DLQ topic
		string transformTopics = "DLQ." + queueName;
		WriteLog($"Forwarding the message to the {transformTopics} queue");
		KafkaProducerConfig config = new KafkaProducerConfig(ServerConfig.BrokerUri,
			ServerConfig.UserName, ServerConfig.Password, transformTopics);
		try
		{
			// Forward the message to the dead-letter queue
			using (IProducerChannel producer = new KafkaProducer(config))
			{
				producer.Producer(consumeResult.Value?.ToString());
			}
			// Commit the offset
			consumer.Commit(consumeResult);
			WriteLog($"Message forwarded to the {transformTopics} queue");
		}
		catch (Exception ex)
		{
			WriteError($"Forwarding to the {transformTopics} queue failed. Error occurred: {ex.StackTrace}");
			// Seek the offset back, retrying up to 6 times; if that fails,
			// stop consuming so that no message is skipped.
			OffsetBack(consumer, consumeResult);
		}
	}
}

/// <summary>
/// Seek the offset back so the failed message will be consumed again
/// </summary>
/// <param name="consumer"></param>
/// <param name="consumeResult"></param>
/// <param name="tryTimes">number of attempts, 6 by default</param>
private void OffsetBack(IConsumer<Ignore, string> consumer, ConsumeResult<Ignore, string> consumeResult, int tryTimes = 6)
{
	int count = tryTimes;
	string queueName = consumeResult.Topic;
	while (count > 0)
	{
		WriteLog($"Consumption failed; seeking back to the failed offset");
		try
		{
			// Re-assign the consumer to the failed message's offset so it is re-consumed
			consumer.Assign(new TopicPartitionOffset(queueName, consumeResult.Partition, consumeResult.Offset));
			WriteLog($"Offset reset to {consumeResult.Offset}");
			return;
		}
		catch (Exception ex)
		{
			WriteLog($"Seeking back to the failed offset threw an error: {ex.StackTrace}");
			count--;   // one attempt used up
			// Maximum number of reset attempts reached: throw to stop consuming
			if (count == 0)
			{
				WriteError($"Resetting the offset failed {tryTimes} times; the consumer will stop consuming");
				// This exception propagates to the catch block in Subscribe(), which stops consumption
				throw new OperationCanceledException($"Resetting the offset failed {tryTimes} times; the consumer will stop consuming");
			}
			// Wait 3 seconds before retrying the reset
			Thread.Sleep(3000);
		}
	}
}
  • The failure handling above is only my own take; there may well be better approaches. Feel free to discuss in the comments below.

 
