c# mysql 中间件
2017-07-09 23:05:45 Denghejing 阅读数 1621
  • 任务调度
    • Quartz.NET:Quartz.NET是一个开源的作业调度框架,非常适合在平时的工作中,定时轮询数据库同步,定时邮件通知,定时处理数据等。 Quartz.NET允 许开发人员根据时间间隔(或天)来调度作业。它实现了作业和触发器的多对多关系,还能把多个作业与不同的触发器关联。整合了 Quartz.NET的应用程序可以重用来自不同事件的作业,还可以为一个事件组合多个作业。
  • 日志记录
    • NLog
    • Log4Net
  • IOC
    • Ninject:轻量级ioc
    • Castle:
    • Autofac:
  • ORM:
    • Dapper:轻量级orm,Stackoverflow在使用(Dapper-Extensions),性能优越
    • Entity Framework:微软自家的ORM框架
    • NHibernate:Hibernate的.net实现
    • Mybatis.net:mybatis的.net实现
    • ServiceStack.OrmLite:
  • WPF
    • MvvmLight:轻量级mvvm框架
    • Prism:官方mvvm框架
  • 通讯
    • Akka.net:基于actor模型的高性能并发处理框架
    • Helios:(不再更新)
    • Thrift:一种可伸缩的跨语言(c#、C++、Java...)通讯服务框架
    • Supersocket:国产优秀通讯框架,
    • Fastsocket:另一个国产socket通讯框架
    • DotNetty:开源,netty的.net实现。Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持
  • 总线
    • NServiceBus:需要序列,不免费。NServiceBus 是一个用于构建企业级 .NET系统的开源通讯框架。它在消息发布/订阅支持、工作流集成和高度可扩展性等方面表现优异,因此是很多分布式系统基础平台的理想选择。,它能够帮助 开发人员在搭建企业.NET系统时避免很多典型的常见问题。同时,该框架也提供了一些可伸缩的关键特征,比如对发布/订阅的支持、集成的长时间工作流及深 入的扩展能力等。据作者说,其本意是为构建分布式应用软件创建一个理想的基础设施。
    • MassTransit:开源。Mass Transit是一个.NET平台上的用于构建松耦合应用程序的服务总线框架,这个服务总线支持YAGNI原则(YAGNI原则,就是通过重构提取公因式 当出现一次时,不分层,以后业务复杂了,马上抽象出一个层次来,分层是依赖倒置原则和模版方法模式的应用。)。通过一套严密的关注点,Mass Transit和应用程序之间的接触最小化和清晰的接口.
    • ESB.NET:开源的企业级服务总线,采用的协议是MS-PL。ESB.NET主要包含了MSMQ消息队列机制,SOAP消息收发,ROUTER服务路由,WCF,WSE消息扩展(消息加解密,压缩),还有WF工作流
    • Shuttle:开源。它为开发面向消息的事件驱动架构(EDA[1])系统提供了一种新方法
  • MQ
    • RabbitMQ:基于Erlang开发,是AMQP(高级消息队列协议)的标准实现,性能和可靠性非常不错,支持序列化和集群。
    • Redis.List:基于Redis的List实现(LPUSH,RPOP)
2017-08-01 15:10:00 weixin_33816821 阅读数 6

一、简介

EQueue是一个参照RocketMQ实现的开源消息队列中间件,兼容Mono,具体可以参看作者的文章《分享一个c#写的开源分布式消息队列equeue》。项目开源地址:https://github.com/tangxuehua/equeue,项目中包含了队列的全部源代码以及如何使用的示例。

二、安装EQueue

Producer、Consumer、Broker支持分布式部署,安装EQueue需要.NET 4, Visual Studio 2010/2012/2013. 目前EQueue是个类库,需要自己实现Broker的宿主,可以参照QuickStart,创建一个QuickStart.BrokerServer项目,通过Visual Studio的Nuget 查找equeue

using System;
using System.Text;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.JsonNet;
using ECommon.Log4Net;
using EQueue.Broker;
using EQueue.Configurations;
using EQueue.Protocols;

namespace QuickStart.BrokerServer
{
    class Program
    {
        static void Main(string[] args)
        {
            InitializeEQueue();
            var setting = new BrokerSetting();
            setting.NotifyWhenMessageArrived = false;
            setting.DeleteMessageInterval = 1000;
            new BrokerController(setting).Initialize().Start();
            Console.ReadLine();
        }

        static void InitializeEQueue()
        {
            Configuration
                .Create()
                .UseAutofac()
                .RegisterCommonComponents()
                .UseLog4Net()
                .UseJsonNet()
                .RegisterEQueueComponents();
        }
    }
}

InitializeEQueue方法初始化EQueue的环境,使用了Autofac作为IOC容器,使用log4Net记录日志, 我们看一下RegisterEQueueComponents方法:

public static class ConfigurationExtensions

{
    public static Configuration RegisterEQueueComponents(this Configuration configuration)
    {
        configuration.SetDefault<IAllocateMessageQueueStrategy, AverageAllocateMessageQueueStrategy>();
        configuration.SetDefault<IQueueSelector, QueueHashSelector>();
        configuration.SetDefault<ILocalOffsetStore, DefaultLocalOffsetStore>();
        configuration.SetDefault<IMessageStore, InMemoryMessageStore>();
        configuration.SetDefault<IMessageService, MessageService>();
        configuration.SetDefault<IOffsetManager, InMemoryOffsetManager>();
        return configuration;
    }
})

代码中涉及到6个组件:

IAllocateMessageQueueStrategy
IQueueSelector
ILocalOffsetStore
IMessageStore
IMessageService
IOffsetManager
DeleteMessageInterval 这个属性是用来设置equeue的定时删除间隔,单位为毫秒,默认值是一个小时。另外还有ProducerSocketSetting 和 ConsumerSocketSetting 分别用于设置Producer连接Broker和Consumer连接Broker的IP和端口,默认端口是5000和5001。

public class BrokerSetting
    {
        public SocketSetting ProducerSocketSetting { get; set; }
        public SocketSetting ConsumerSocketSetting { get; set; }
        public bool NotifyWhenMessageArrived { get; set; }
        public int DeleteMessageInterval { get; set; }

        public BrokerSetting()
        {
            ProducerSocketSetting = new SocketSetting { Address = SocketUtils.GetLocalIPV4().ToString(), Port = 5000, Backlog = 5000 };
            ConsumerSocketSetting = new SocketSetting { Address = SocketUtils.GetLocalIPV4().ToString(), Port = 5001, Backlog = 5000 };
            NotifyWhenMessageArrived = true;
            DeleteMessageInterval = 1000 * 60 * 60;
        }
    }

运行项目,如果显示下面类似内容,说明Broker启动成功:


232039500849209.png

三、在Visual Studio中开发测试

1.创建一个VS项目 QuickStart.ProducerClient,通过Nuget引用EQueue,编写下面Producer代码

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.IoC;
using ECommon.JsonNet;
using ECommon.Log4Net;
using ECommon.Scheduling;
using EQueue.Clients.Producers;
using EQueue.Configurations;
using EQueue.Protocols;

namespace QuickStart.ProducerClient
{
    class Program
    {
        static void Main(string[] args)
        {
            InitializeEQueue();

            var scheduleService = ObjectContainer.Resolve<IScheduleService>();
            var producer = new Producer().Start();
            var total = 1000;
            var parallelCount = 10;
            var finished = 0;
            var messageIndex = 0;
            var watch = Stopwatch.StartNew();

            var action = new Action(() =>
            {
                for (var index = 1; index <= total; index++)
                {
                    var message = "message" + Interlocked.Increment(ref messageIndex);
                    producer.SendAsync(new Message("SampleTopic", Encoding.UTF8.GetBytes(message)), index.ToString()).ContinueWith(sendTask =>
                    {
                        var finishedCount = Interlocked.Increment(ref finished);
                        if (finishedCount % 1000 == 0)
                        {
                            Console.WriteLine(string.Format("Sent {0} messages, time spent:{1}", finishedCount, watch.ElapsedMilliseconds));
                        }
                    });
                }
            });

            var actions = new List<Action>();
            for (var index = 0; index < parallelCount; index++)
            {
                actions.Add(action);
            }

            Parallel.Invoke(actions.ToArray());

            Console.ReadLine();
        }

        static void InitializeEQueue()
        {
            Configuration
                .Create()
                .UseAutofac()
                .RegisterCommonComponents()
                .UseLog4Net()
                .UseJsonNet()
                .RegisterEQueueComponents();
        }
    }
}
Producer对象在使用之前必须要调用Start初始化,初始化一次即可, 注意:切记不可以在每次发送消息时,都调用Start方法。Producer 默认连接本机的5000端口,可以通过ProducerSetting 进行设置,可以参看下面的代码:

 public class ProducerSetting
    {
        public string BrokerAddress { get; set; }
        public int BrokerPort { get; set; }
        public int SendMessageTimeoutMilliseconds { get; set; }
        public int UpdateTopicQueueCountInterval { get; set; }

        public ProducerSetting()
        {
            BrokerAddress = SocketUtils.GetLocalIPV4().ToString();
            BrokerPort = 5000;
            SendMessageTimeoutMilliseconds = 1000 * 10;
            UpdateTopicQueueCountInterval = 1000 * 5;
        }
2、创建一个VS项目 QuickStart.ConsumerClient,通过Nuget引用EQueue,编写下面Consumer代码

using System;
using System.Linq;
using System.Text;
using System.Threading;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.IoC;
using ECommon.JsonNet;
using ECommon.Log4Net;
using ECommon.Scheduling;
using EQueue.Broker;
using EQueue.Clients.Consumers;
using EQueue.Configurations;
using EQueue.Protocols;

namespace QuickStart.ConsumerClient
{
    class Program
    {
        static void Main(string[] args)
        {
            InitializeEQueue();

            var messageHandler = new MessageHandler();
            var consumer1 = new Consumer("Consumer1", "group1").Subscribe("SampleTopic").Start(messageHandler);
            var consumer2 = new Consumer("Consumer2", "group1").Subscribe("SampleTopic").Start(messageHandler);
            var consumer3 = new Consumer("Consumer3", "group1").Subscribe("SampleTopic").Start(messageHandler);
            var consumer4 = new Consumer("Consumer4", "group1").Subscribe("SampleTopic").Start(messageHandler);

            Console.WriteLine("Start consumer load balance, please wait for a moment.");
            var scheduleService = ObjectContainer.Resolve<IScheduleService>();
            var waitHandle = new ManualResetEvent(false);
            var taskId = scheduleService.ScheduleTask(() =>
            {
                var c1AllocatedQueueIds = consumer1.GetCurrentQueues().Select(x => x.QueueId);
                var c2AllocatedQueueIds = consumer2.GetCurrentQueues().Select(x => x.QueueId);
                var c3AllocatedQueueIds = consumer3.GetCurrentQueues().Select(x => x.QueueId);
                var c4AllocatedQueueIds = consumer4.GetCurrentQueues().Select(x => x.QueueId);
                if (c1AllocatedQueueIds.Count() == 1 && c2AllocatedQueueIds.Count() == 1 && c3AllocatedQueueIds.Count() == 1 && c4AllocatedQueueIds.Count() == 1)
                {
                    Console.WriteLine(string.Format("Consumer load balance finished. Queue allocation result: c1:{0}, c2:{1}, c3:{2}, c4:{3}",
                        string.Join(",", c1AllocatedQueueIds),
                        string.Join(",", c2AllocatedQueueIds),
                        string.Join(",", c3AllocatedQueueIds),
                        string.Join(",", c4AllocatedQueueIds)));
                    waitHandle.Set();
                }
            }, 1000, 1000);

            waitHandle.WaitOne();
            scheduleService.ShutdownTask(taskId);

            Console.ReadLine();
        }

        static void InitializeEQueue()
        {
            Configuration
                .Create()
                .UseAutofac()
                .RegisterCommonComponents()
                .UseLog4Net()
                .UseJsonNet()
                .RegisterEQueueComponents();
        }
    }

    class MessageHandler : IMessageHandler
    {
        private int _handledCount;

        public void Handle(QueueMessage message, IMessageContext context)
        {
            var count = Interlocked.Increment(ref _handledCount);
            if (count % 1000 == 0)
            {
                Console.WriteLine("Total handled {0} messages.", count);
            }
            context.OnMessageHandled(message);
        }
    }
}

使用方式给用户感觉是消息从EQueue服务器推到了应用客户端。 但是实际Consumer内部是使用长轮询Pull方式从EQueue服务器拉消息,然后再回调用户Listener方法。Consumer默认连接本机的5001端口,可以通过ConsumerSetting 进行设置,可以参看下面的代码:

public class ConsumerSetting
    {
        public string BrokerAddress { get; set; }
        public int BrokerPort { get; set; }
        public int RebalanceInterval { get; set; }
        public int UpdateTopicQueueCountInterval { get; set; }
        public int HeartbeatBrokerInterval { get; set; }
        public int PersistConsumerOffsetInterval { get; set; }
        public PullRequestSetting PullRequestSetting { get; set; }
        public MessageModel MessageModel { get; set; }
        public MessageHandleMode MessageHandleMode { get; set; }

        public ConsumerSetting()
        {
            BrokerAddress = SocketUtils.GetLocalIPV4().ToString();
            BrokerPort = 5001;
            RebalanceInterval = 1000 * 5;
            HeartbeatBrokerInterval = 1000 * 5;
            UpdateTopicQueueCountInterval = 1000 * 5;
            PersistConsumerOffsetInterval = 1000 * 5;
            PullRequestSetting = new PullRequestSetting();
            MessageModel = MessageModel.Clustering;
            MessageHandleMode = MessageHandleMode.Parallel;
        }

EQueue兼容Linux/Mono,下面是CentOS 6.4/Mono 3.2.3 环境下的运行结果:


232054360069922.png<br>

本文来自云栖社区合作伙伴“doNET跨平台”,了解相关信息可以关注“opendotnet”微信公众号

2014-03-23 20:40:00 weixin_34384557 阅读数 19

一、简介

EQueue是一个参照RocketMQ实现的开源消息队列中间件,兼容Mono,具体可以参看作者的文章《分享一个c#写的开源分布式消息队列equeue》。项目开源地址:https://github.com/tangxuehua/equeue,项目中包含了队列的全部源代码以及如何使用的示例。

二、安装EQueue

Producer、Consumer、Broker支持分布式部署,安装EQueue需要.NET 4, Visual Studio 2010/2012/2013. 目前EQueue是个类库,需要自己实现Broker的宿主,可以参照QuickStart,创建一个QuickStart.BrokerServer项目,通过Visual Studio的Nuget 查找equeue

image

using System;
using System.Text;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.JsonNet;
using ECommon.Log4Net;
using EQueue.Broker;
using EQueue.Configurations;
using EQueue.Protocols;

namespace QuickStart.BrokerServer
{
    class Program
    {
        static void Main(string[] args)
        {
            InitializeEQueue();
            var setting = new BrokerSetting();
            setting.NotifyWhenMessageArrived = false;
            setting.DeleteMessageInterval = 1000;
            new BrokerController(setting).Initialize().Start();
            Console.ReadLine();
        }

        static void InitializeEQueue()
        {
            Configuration
                .Create()
                .UseAutofac()
                .RegisterCommonComponents()
                .UseLog4Net()
                .UseJsonNet()
                .RegisterEQueueComponents();
        }
    }
}

InitializeEQueue方法初始化EQueue的环境,使用了Autofac作为IOC容器,使用log4Net记录日志, 我们看一下RegisterEQueueComponents方法:

   public static class ConfigurationExtensions
    {
        public static Configuration RegisterEQueueComponents(this Configuration configuration)
        {
            configuration.SetDefault<IAllocateMessageQueueStrategy, AverageAllocateMessageQueueStrategy>();
            configuration.SetDefault<IQueueSelector, QueueHashSelector>();
            configuration.SetDefault<ILocalOffsetStore, DefaultLocalOffsetStore>();
            configuration.SetDefault<IMessageStore, InMemoryMessageStore>();
            configuration.SetDefault<IMessageService, MessageService>();
            configuration.SetDefault<IOffsetManager, InMemoryOffsetManager>();
            return configuration;
        }
    }

代码中涉及到6个组件:

  • IAllocateMessageQueueStrategy
  • IQueueSelector
  • ILocalOffsetStore
  • IMessageStore
  • IMessageService
  • IOffsetManager

DeleteMessageInterval 这个属性是用来设置equeue的定时删除间隔,单位为毫秒,默认值是一个小时。另外还有ProducerSocketSetting 和 ConsumerSocketSetting 分别用于设置Producer连接Broker和Consumer连接Broker的IP和端口,默认端口是5000和5001。

 public class BrokerSetting
    {
        public SocketSetting ProducerSocketSetting { get; set; }
        public SocketSetting ConsumerSocketSetting { get; set; }
        public bool NotifyWhenMessageArrived { get; set; }
        public int DeleteMessageInterval { get; set; }

        public BrokerSetting()
        {
            ProducerSocketSetting = new SocketSetting { Address = SocketUtils.GetLocalIPV4().ToString(), Port = 5000, Backlog = 5000 };
            ConsumerSocketSetting = new SocketSetting { Address = SocketUtils.GetLocalIPV4().ToString(), Port = 5001, Backlog = 5000 };
            NotifyWhenMessageArrived = true;
            DeleteMessageInterval = 1000 * 60 * 60;
        }
    }

运行项目,如果显示下面类似内容,说明Broker启动成功:

2014-03-23 20:10:30,255  INFO BrokerController - Broker started, producer:[169.254.80.80:5000], consumer:[169.254.80.80:5001]

三、在Visual Studio中开发测试

1.创建一个VS项目 QuickStart.ProducerClient,通过Nuget引用EQueue,编写下面Producer代码

 using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.IoC;
using ECommon.JsonNet;
using ECommon.Log4Net;
using ECommon.Scheduling;
using EQueue.Clients.Producers;
using EQueue.Configurations;
using EQueue.Protocols;

namespace QuickStart.ProducerClient
{
    class Program
    {
        static void Main(string[] args)
        {
            InitializeEQueue();

            var scheduleService = ObjectContainer.Resolve<IScheduleService>();
            var producer = new Producer().Start();
            var total = 1000;
            var parallelCount = 10;
            var finished = 0;
            var messageIndex = 0;
            var watch = Stopwatch.StartNew();

            var action = new Action(() =>
            {
                for (var index = 1; index <= total; index++)
                {
                    var message = "message" + Interlocked.Increment(ref messageIndex);
                    producer.SendAsync(new Message("SampleTopic", Encoding.UTF8.GetBytes(message)), index.ToString()).ContinueWith(sendTask =>
                    {
                        var finishedCount = Interlocked.Increment(ref finished);
                        if (finishedCount % 1000 == 0)
                        {
                            Console.WriteLine(string.Format("Sent {0} messages, time spent:{1}", finishedCount, watch.ElapsedMilliseconds));
                        }
                    });
                }
            });

            var actions = new List<Action>();
            for (var index = 0; index < parallelCount; index++)
            {
                actions.Add(action);
            }

            Parallel.Invoke(actions.ToArray());

            Console.ReadLine();
        }

        static void InitializeEQueue()
        {
            Configuration
                .Create()
                .UseAutofac()
                .RegisterCommonComponents()
                .UseLog4Net()
                .UseJsonNet()
                .RegisterEQueueComponents();
        }
    }
}

Producer对象在使用之前必须要调用Start初始化,初始化一次即可, 注意:切记不可以在每次发送消息时,都调用Start方法。Producer 默认连接本机的5000端口,可以通过ProducerSetting 进行设置,可以参看下面的代码:

 public class ProducerSetting
    {
        public string BrokerAddress { get; set; }
        public int BrokerPort { get; set; }
        public int SendMessageTimeoutMilliseconds { get; set; }
        public int UpdateTopicQueueCountInterval { get; set; }

        public ProducerSetting()
        {
            BrokerAddress = SocketUtils.GetLocalIPV4().ToString();
            BrokerPort = 5000;
            SendMessageTimeoutMilliseconds = 1000 * 10;
            UpdateTopicQueueCountInterval = 1000 * 5;
        }

2、创建一个VS项目 QuickStart.ConsumerClient,通过Nuget引用EQueue,编写下面Consumer代码

using System;
using System.Linq;
using System.Text;
using System.Threading;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.IoC;
using ECommon.JsonNet;
using ECommon.Log4Net;
using ECommon.Scheduling;
using EQueue.Broker;
using EQueue.Clients.Consumers;
using EQueue.Configurations;
using EQueue.Protocols;

namespace QuickStart.ConsumerClient
{
    class Program
    {
        static void Main(string[] args)
        {
            InitializeEQueue();

            var messageHandler = new MessageHandler();
            var consumer1 = new Consumer("Consumer1", "group1").Subscribe("SampleTopic").Start(messageHandler);
            var consumer2 = new Consumer("Consumer2", "group1").Subscribe("SampleTopic").Start(messageHandler);
            var consumer3 = new Consumer("Consumer3", "group1").Subscribe("SampleTopic").Start(messageHandler);
            var consumer4 = new Consumer("Consumer4", "group1").Subscribe("SampleTopic").Start(messageHandler);

            Console.WriteLine("Start consumer load balance, please wait for a moment.");
            var scheduleService = ObjectContainer.Resolve<IScheduleService>();
            var waitHandle = new ManualResetEvent(false);
            var taskId = scheduleService.ScheduleTask(() =>
            {
                var c1AllocatedQueueIds = consumer1.GetCurrentQueues().Select(x => x.QueueId);
                var c2AllocatedQueueIds = consumer2.GetCurrentQueues().Select(x => x.QueueId);
                var c3AllocatedQueueIds = consumer3.GetCurrentQueues().Select(x => x.QueueId);
                var c4AllocatedQueueIds = consumer4.GetCurrentQueues().Select(x => x.QueueId);
                if (c1AllocatedQueueIds.Count() == 1 && c2AllocatedQueueIds.Count() == 1 && c3AllocatedQueueIds.Count() == 1 && c4AllocatedQueueIds.Count() == 1)
                {
                    Console.WriteLine(string.Format("Consumer load balance finished. Queue allocation result: c1:{0}, c2:{1}, c3:{2}, c4:{3}",
                        string.Join(",", c1AllocatedQueueIds),
                        string.Join(",", c2AllocatedQueueIds),
                        string.Join(",", c3AllocatedQueueIds),
                        string.Join(",", c4AllocatedQueueIds)));
                    waitHandle.Set();
                }
            }, 1000, 1000);

            waitHandle.WaitOne();
            scheduleService.ShutdownTask(taskId);

            Console.ReadLine();
        }

        static void InitializeEQueue()
        {
            Configuration
                .Create()
                .UseAutofac()
                .RegisterCommonComponents()
                .UseLog4Net()
                .UseJsonNet()
                .RegisterEQueueComponents();
        }
    }

    class MessageHandler : IMessageHandler
    {
        private int _handledCount;

        public void Handle(QueueMessage message, IMessageContext context)
        {
            var count = Interlocked.Increment(ref _handledCount);
            if (count % 1000 == 0)
            {
                Console.WriteLine("Total handled {0} messages.", count);
            }
            context.OnMessageHandled(message);
        }
    }
}

使用方式给用户感觉是消息从EQueue服务器推到了应用客户端。 但是实际Consumer内部是使用长轮询Pull方式从EQueue服务器拉消息,然后再回调用户Listener方法。Consumer默认连接本机的5001端口,可以通过ConsumerSetting 进行设置,可以参看下面的代码:

    public class ConsumerSetting
    {
        public string BrokerAddress { get; set; }
        public int BrokerPort { get; set; }
        public int RebalanceInterval { get; set; }
        public int UpdateTopicQueueCountInterval { get; set; }
        public int HeartbeatBrokerInterval { get; set; }
        public int PersistConsumerOffsetInterval { get; set; }
        public PullRequestSetting PullRequestSetting { get; set; }
        public MessageModel MessageModel { get; set; }
        public MessageHandleMode MessageHandleMode { get; set; }

        public ConsumerSetting()
        {
            BrokerAddress = SocketUtils.GetLocalIPV4().ToString();
            BrokerPort = 5001;
            RebalanceInterval = 1000 * 5;
            HeartbeatBrokerInterval = 1000 * 5;
            UpdateTopicQueueCountInterval = 1000 * 5;
            PersistConsumerOffsetInterval = 1000 * 5;
            PullRequestSetting = new PullRequestSetting();
            MessageModel = MessageModel.Clustering;
            MessageHandleMode = MessageHandleMode.Parallel;
        }

EQueue兼容Linux/Mono,下面是CentOS 6.4/Mono 3.2.3 环境下的运行结果:

image

 

Kafka/Metaq设计思想学习笔记

EQueue - 详细谈一下消息持久化以及消息堆积的设计

2015-03-27 13:38:20 wawalike 阅读数 239

一、简介

EQueue是一个参照RocketMQ实现的开源消息队列中间件,兼容Mono,具体可以参看作者的文章《分享一个c#写的开源分布式消息队列equeue》。项目开源地址:https://github.com/tangxuehua/equeue,项目中包含了队列的全部源代码以及如何使用的示例。

二、安装EQueue

Producer、Consumer、Broker支持分布式部署,安装EQueue需要.NET 4, Visual Studio 2010/2012/2013. 目前EQueue是个类库,需要自己实现Broker的宿主,可以参照QuickStart,创建一个QuickStart.BrokerServer项目,通过Visual Studio的Nuget 查找equeue

image

using System;
using System.Text;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.JsonNet;
using ECommon.Log4Net;
using EQueue.Broker;
using EQueue.Configurations;
using EQueue.Protocols;

namespace QuickStart.BrokerServer
{
    class Program
    {
        static void Main(string[] args)
        {
            InitializeEQueue();
            var setting = new BrokerSetting();
            setting.NotifyWhenMessageArrived = false;
            setting.DeleteMessageInterval = 1000;
            new BrokerController(setting).Initialize().Start();
            Console.ReadLine();
        }

        static void InitializeEQueue()
        {
            Configuration
                .Create()
                .UseAutofac()
                .RegisterCommonComponents()
                .UseLog4Net()
                .UseJsonNet()
                .RegisterEQueueComponents();
        }
    }
}

InitializeEQueue方法初始化EQueue的环境,使用了Autofac作为IOC容器,使用log4Net记录日志, 我们看一下RegisterEQueueComponents方法:

   public static class ConfigurationExtensions
    {
        public static Configuration RegisterEQueueComponents(this Configuration configuration)
        {
            configuration.SetDefault<IAllocateMessageQueueStrategy, AverageAllocateMessageQueueStrategy>();
            configuration.SetDefault<IQueueSelector, QueueHashSelector>();
            configuration.SetDefault<ILocalOffsetStore, DefaultLocalOffsetStore>();
            configuration.SetDefault<IMessageStore, InMemoryMessageStore>();
            configuration.SetDefault<IMessageService, MessageService>();
            configuration.SetDefault<IOffsetManager, InMemoryOffsetManager>();
            return configuration;
        }
    }

代码中涉及到6个组件:

  • IAllocateMessageQueueStrategy
  • IQueueSelector
  • ILocalOffsetStore
  • IMessageStore
  • IMessageService
  • IOffsetManager

DeleteMessageInterval 这个属性是用来设置equeue的定时删除间隔,单位为毫秒,默认值是一个小时。另外还有ProducerSocketSetting 和 ConsumerSocketSetting 分别用于设置Producer连接Broker和Consumer连接Broker的IP和端口,默认端口是5000和5001。

 public class BrokerSetting
    {
        public SocketSetting ProducerSocketSetting { get; set; }
        public SocketSetting ConsumerSocketSetting { get; set; }
        public bool NotifyWhenMessageArrived { get; set; }
        public int DeleteMessageInterval { get; set; }

        public BrokerSetting()
        {
            ProducerSocketSetting = new SocketSetting { Address = SocketUtils.GetLocalIPV4().ToString(), Port = 5000, Backlog = 5000 };
            ConsumerSocketSetting = new SocketSetting { Address = SocketUtils.GetLocalIPV4().ToString(), Port = 5001, Backlog = 5000 };
            NotifyWhenMessageArrived = true;
            DeleteMessageInterval = 1000 * 60 * 60;
        }
    }

运行项目,如果显示下面类似内容,说明Broker启动成功:

2014-03-23 20:10:30,255  INFO BrokerController - Broker started, producer:[169.254.80.80:5000], consumer:[169.254.80.80:5001]

三、在Visual Studio中开发测试

1.创建一个VS项目 QuickStart.ProducerClient,通过Nuget引用EQueue,编写下面Producer代码

 using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.IoC;
using ECommon.JsonNet;
using ECommon.Log4Net;
using ECommon.Scheduling;
using EQueue.Clients.Producers;
using EQueue.Configurations;
using EQueue.Protocols;

namespace QuickStart.ProducerClient
{
    class Program
    {
        static void Main(string[] args)
        {
            InitializeEQueue();

            var scheduleService = ObjectContainer.Resolve<IScheduleService>();
            var producer = new Producer().Start();
            var total = 1000;
            var parallelCount = 10;
            var finished = 0;
            var messageIndex = 0;
            var watch = Stopwatch.StartNew();

            var action = new Action(() =>
            {
                for (var index = 1; index <= total; index++)
                {
                    var message = "message" + Interlocked.Increment(ref messageIndex);
                    producer.SendAsync(new Message("SampleTopic", Encoding.UTF8.GetBytes(message)), index.ToString()).ContinueWith(sendTask =>
                    {
                        var finishedCount = Interlocked.Increment(ref finished);
                        if (finishedCount % 1000 == 0)
                        {
                            Console.WriteLine(string.Format("Sent {0} messages, time spent:{1}", finishedCount, watch.ElapsedMilliseconds));
                        }
                    });
                }
            });

            var actions = new List<Action>();
            for (var index = 0; index < parallelCount; index++)
            {
                actions.Add(action);
            }

            Parallel.Invoke(actions.ToArray());

            Console.ReadLine();
        }

        static void InitializeEQueue()
        {
            Configuration
                .Create()
                .UseAutofac()
                .RegisterCommonComponents()
                .UseLog4Net()
                .UseJsonNet()
                .RegisterEQueueComponents();
        }
    }
}

Producer对象在使用之前必须要调用Start初始化,初始化一次即可, 注意:切记不可以在每次发送消息时,都调用Start方法。Producer 默认连接本机的5000端口,可以通过ProducerSetting 进行设置,可以参看下面的代码:

 public class ProducerSetting
    {
        public string BrokerAddress { get; set; }
        public int BrokerPort { get; set; }
        public int SendMessageTimeoutMilliseconds { get; set; }
        public int UpdateTopicQueueCountInterval { get; set; }

        public ProducerSetting()
        {
            BrokerAddress = SocketUtils.GetLocalIPV4().ToString();
            BrokerPort = 5000;
            SendMessageTimeoutMilliseconds = 1000 * 10;
            UpdateTopicQueueCountInterval = 1000 * 5;
        }

2、创建一个VS项目 QuickStart.ConsumerClient,通过Nuget引用EQueue,编写下面Consumer代码

using System;
using System.Linq;
using System.Text;
using System.Threading;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.IoC;
using ECommon.JsonNet;
using ECommon.Log4Net;
using ECommon.Scheduling;
using EQueue.Broker;
using EQueue.Clients.Consumers;
using EQueue.Configurations;
using EQueue.Protocols;

namespace QuickStart.ConsumerClient
{
    class Program
    {
        static void Main(string[] args)
        {
            InitializeEQueue();

            var messageHandler = new MessageHandler();
            var consumer1 = new Consumer("Consumer1", "group1").Subscribe("SampleTopic").Start(messageHandler);
            var consumer2 = new Consumer("Consumer2", "group1").Subscribe("SampleTopic").Start(messageHandler);
            var consumer3 = new Consumer("Consumer3", "group1").Subscribe("SampleTopic").Start(messageHandler);
            var consumer4 = new Consumer("Consumer4", "group1").Subscribe("SampleTopic").Start(messageHandler);

            Console.WriteLine("Start consumer load balance, please wait for a moment.");
            var scheduleService = ObjectContainer.Resolve<IScheduleService>();
            var waitHandle = new ManualResetEvent(false);
            var taskId = scheduleService.ScheduleTask(() =>
            {
                var c1AllocatedQueueIds = consumer1.GetCurrentQueues().Select(x => x.QueueId);
                var c2AllocatedQueueIds = consumer2.GetCurrentQueues().Select(x => x.QueueId);
                var c3AllocatedQueueIds = consumer3.GetCurrentQueues().Select(x => x.QueueId);
                var c4AllocatedQueueIds = consumer4.GetCurrentQueues().Select(x => x.QueueId);
                if (c1AllocatedQueueIds.Count() == 1 && c2AllocatedQueueIds.Count() == 1 && c3AllocatedQueueIds.Count() == 1 && c4AllocatedQueueIds.Count() == 1)
                {
                    Console.WriteLine(string.Format("Consumer load balance finished. Queue allocation result: c1:{0}, c2:{1}, c3:{2}, c4:{3}",
                        string.Join(",", c1AllocatedQueueIds),
                        string.Join(",", c2AllocatedQueueIds),
                        string.Join(",", c3AllocatedQueueIds),
                        string.Join(",", c4AllocatedQueueIds)));
                    waitHandle.Set();
                }
            }, 1000, 1000);

            waitHandle.WaitOne();
            scheduleService.ShutdownTask(taskId);

            Console.ReadLine();
        }

        static void InitializeEQueue()
        {
            Configuration
                .Create()
                .UseAutofac()
                .RegisterCommonComponents()
                .UseLog4Net()
                .UseJsonNet()
                .RegisterEQueueComponents();
        }
    }

    class MessageHandler : IMessageHandler
    {
        private int _handledCount;

        public void Handle(QueueMessage message, IMessageContext context)
        {
            var count = Interlocked.Increment(ref _handledCount);
            if (count % 1000 == 0)
            {
                Console.WriteLine("Total handled {0} messages.", count);
            }
            context.OnMessageHandled(message);
        }
    }
}

使用方式给用户感觉是消息从EQueue服务器推到了应用客户端。 但是实际Consumer内部是使用长轮询Pull方式从EQueue服务器拉消息,然后再回调用户Listener方法。Consumer默认连接本机的5001端口,可以通过ConsumerSetting 进行设置,可以参看下面的代码:

    public class ConsumerSetting
    {
        public string BrokerAddress { get; set; }
        public int BrokerPort { get; set; }
        public int RebalanceInterval { get; set; }
        public int UpdateTopicQueueCountInterval { get; set; }
        public int HeartbeatBrokerInterval { get; set; }
        public int PersistConsumerOffsetInterval { get; set; }
        public PullRequestSetting PullRequestSetting { get; set; }
        public MessageModel MessageModel { get; set; }
        public MessageHandleMode MessageHandleMode { get; set; }

        public ConsumerSetting()
        {
            BrokerAddress = SocketUtils.GetLocalIPV4().ToString();
            BrokerPort = 5001;
            RebalanceInterval = 1000 * 5;
            HeartbeatBrokerInterval = 1000 * 5;
            UpdateTopicQueueCountInterval = 1000 * 5;
            PersistConsumerOffsetInterval = 1000 * 5;
            PullRequestSetting = new PullRequestSetting();
            MessageModel = MessageModel.Clustering;
            MessageHandleMode = MessageHandleMode.Parallel;
        }

EQueue兼容Linux/Mono,下面是CentOS 6.4/Mono 3.2.3 环境下的运行结果:

image

2017-11-08 18:30:00 weixin_33720078 阅读数 7

一、简介

EQueue是一个参照RocketMQ实现的开源消息队列中间件,具体可以参看作者的文章《分享一个c#写的开源分布式消息队列equeue》。项目开源地址:https://github.com/tangxuehua/equeue,项目中包含了队列的全部源代码以及如何使用的示例。

二、安装EQueue

Producer、Consumer、Broker支持分布式部署,安装EQueue需要.NET 4, Visual Studio 2010/2012/2013. 目前EQueue是个类库,需要自己实现Broker的宿主,可以参照QuickStart,创建一个QuickStart.BrokerServer项目,通过Visual Studio的Nuget 查找equeue

image

using System;
using System.Text;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.JsonNet;
using ECommon.Log4Net;
using EQueue.Broker;
using EQueue.Configurations;
using EQueue.Protocols;

namespace QuickStart.BrokerServer
{
    class Program
    {
        static void Main(string[] args)
        {
            InitializeEQueue();
            var setting = new BrokerSetting();
            setting.NotifyWhenMessageArrived = false;
            setting.DeleteMessageInterval = 1000;
            new BrokerController(setting).Initialize().Start();
            Console.ReadLine();
        }

        static void InitializeEQueue()
        {
            Configuration
                .Create()
                .UseAutofac()
                .RegisterCommonComponents()
                .UseLog4Net()
                .UseJsonNet()
                .RegisterEQueueComponents();
        }
    }
}

InitializeEQueue方法初始化EQueue的环境,使用了Autofac作为IOC容器,使用log4Net记录日志, 我们看一下RegisterEQueueComponents方法:

   public static class ConfigurationExtensions
    {
        public static Configuration RegisterEQueueComponents(this Configuration configuration)
        {
            configuration.SetDefault<IAllocateMessageQueueStrategy, AverageAllocateMessageQueueStrategy>();
            configuration.SetDefault<IQueueSelector, QueueHashSelector>();
            configuration.SetDefault<ILocalOffsetStore, DefaultLocalOffsetStore>();
            configuration.SetDefault<IMessageStore, InMemoryMessageStore>();
            configuration.SetDefault<IMessageService, MessageService>();
            configuration.SetDefault<IOffsetManager, InMemoryOffsetManager>();
            return configuration;
        }
    }

代码中涉及到6个组件:

  • IAllocateMessageQueueStrategy
  • IQueueSelector
  • ILocalOffsetStore
  • IMessageStore
  • IMessageService
  • IOffsetManager

DeleteMessageInterval 这个属性是用来设置equeue的定时删除间隔,单位为毫秒,默认值是一个小时。另外还有ProducerSocketSetting 和 ConsumerSocketSetting 分别用于设置Producer连接Broker和Consumer连接Broker的IP和端口,默认端口是5000和5001。

 public class BrokerSetting
    {
        public SocketSetting ProducerSocketSetting { get; set; }
        public SocketSetting ConsumerSocketSetting { get; set; }
        public bool NotifyWhenMessageArrived { get; set; }
        public int DeleteMessageInterval { get; set; }

        public BrokerSetting()
        {
            ProducerSocketSetting = new SocketSetting { Address = SocketUtils.GetLocalIPV4().ToString(), Port = 5000, Backlog = 5000 };
            ConsumerSocketSetting = new SocketSetting { Address = SocketUtils.GetLocalIPV4().ToString(), Port = 5001, Backlog = 5000 };
            NotifyWhenMessageArrived = true;
            DeleteMessageInterval = 1000 * 60 * 60;
        }
    }

运行项目,如果显示下面类似内容,说明Broker启动成功:

2014-03-23 20:10:30,255  INFO BrokerController - Broker started, producer:[169.254.80.80:5000], consumer:[169.254.80.80:5001]

三、在Visual Studio中开发测试

1.创建一个VS项目 QuickStart.ProducerClient,通过Nuget引用EQueue,编写下面Producer代码

 using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.IoC;
using ECommon.JsonNet;
using ECommon.Log4Net;
using ECommon.Scheduling;
using EQueue.Clients.Producers;
using EQueue.Configurations;
using EQueue.Protocols;

namespace QuickStart.ProducerClient
{
    class Program
    {
        static void Main(string[] args)
        {
            InitializeEQueue();

            var scheduleService = ObjectContainer.Resolve<IScheduleService>();
            var producer = new Producer().Start();
            var total = 1000;
            var parallelCount = 10;
            var finished = 0;
            var messageIndex = 0;
            var watch = Stopwatch.StartNew();

            var action = new Action(() =>
            {
                for (var index = 1; index <= total; index++)
                {
                    var message = "message" + Interlocked.Increment(ref messageIndex);
                    producer.SendAsync(new Message("SampleTopic", Encoding.UTF8.GetBytes(message)), index.ToString()).ContinueWith(sendTask =>
                    {
                        var finishedCount = Interlocked.Increment(ref finished);
                        if (finishedCount % 1000 == 0)
                        {
                            Console.WriteLine(string.Format("Sent {0} messages, time spent:{1}", finishedCount, watch.ElapsedMilliseconds));
                        }
                    });
                }
            });

            var actions = new List<Action>();
            for (var index = 0; index < parallelCount; index++)
            {
                actions.Add(action);
            }

            Parallel.Invoke(actions.ToArray());

            Console.ReadLine();
        }

        static void InitializeEQueue()
        {
            Configuration
                .Create()
                .UseAutofac()
                .RegisterCommonComponents()
                .UseLog4Net()
                .UseJsonNet()
                .RegisterEQueueComponents();
        }
    }
}

Producer对象在使用之前必须要调用Start初始化,初始化一次即可, 注意:切记不可以在每次发送消息时,都调用Start方法。Producer 默认连接本机的5000端口,可以通过ProducerSetting 进行设置,可以参看下面的代码:

 public class ProducerSetting
    {
        public string BrokerAddress { get; set; }
        public int BrokerPort { get; set; }
        public int SendMessageTimeoutMilliseconds { get; set; }
        public int UpdateTopicQueueCountInterval { get; set; }

        public ProducerSetting()
        {
            BrokerAddress = SocketUtils.GetLocalIPV4().ToString();
            BrokerPort = 5000;
            SendMessageTimeoutMilliseconds = 1000 * 10;
            UpdateTopicQueueCountInterval = 1000 * 5;
        }

2、创建一个VS项目 QuickStart.ConsumerClient,通过Nuget引用EQueue,编写下面Consumer代码

using System;
using System.Linq;
using System.Text;
using System.Threading;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.IoC;
using ECommon.JsonNet;
using ECommon.Log4Net;
using ECommon.Scheduling;
using EQueue.Broker;
using EQueue.Clients.Consumers;
using EQueue.Configurations;
using EQueue.Protocols;

namespace QuickStart.ConsumerClient
{
    class Program
    {
        static void Main(string[] args)
        {
            InitializeEQueue();

            var messageHandler = new MessageHandler();
            var consumer1 = new Consumer("Consumer1", "group1").Subscribe("SampleTopic").Start(messageHandler);
            var consumer2 = new Consumer("Consumer2", "group1").Subscribe("SampleTopic").Start(messageHandler);
            var consumer3 = new Consumer("Consumer3", "group1").Subscribe("SampleTopic").Start(messageHandler);
            var consumer4 = new Consumer("Consumer4", "group1").Subscribe("SampleTopic").Start(messageHandler);

            Console.WriteLine("Start consumer load balance, please wait for a moment.");
            var scheduleService = ObjectContainer.Resolve<IScheduleService>();
            var waitHandle = new ManualResetEvent(false);
            var taskId = scheduleService.ScheduleTask(() =>
            {
                var c1AllocatedQueueIds = consumer1.GetCurrentQueues().Select(x => x.QueueId);
                var c2AllocatedQueueIds = consumer2.GetCurrentQueues().Select(x => x.QueueId);
                var c3AllocatedQueueIds = consumer3.GetCurrentQueues().Select(x => x.QueueId);
                var c4AllocatedQueueIds = consumer4.GetCurrentQueues().Select(x => x.QueueId);
                if (c1AllocatedQueueIds.Count() == 1 && c2AllocatedQueueIds.Count() == 1 && c3AllocatedQueueIds.Count() == 1 && c4AllocatedQueueIds.Count() == 1)
                {
                    Console.WriteLine(string.Format("Consumer load balance finished. Queue allocation result: c1:{0}, c2:{1}, c3:{2}, c4:{3}",
                        string.Join(",", c1AllocatedQueueIds),
                        string.Join(",", c2AllocatedQueueIds),
                        string.Join(",", c3AllocatedQueueIds),
                        string.Join(",", c4AllocatedQueueIds)));
                    waitHandle.Set();
                }
            }, 1000, 1000);

            waitHandle.WaitOne();
            scheduleService.ShutdownTask(taskId);

            Console.ReadLine();
        }

        static void InitializeEQueue()
        {
            Configuration
                .Create()
                .UseAutofac()
                .RegisterCommonComponents()
                .UseLog4Net()
                .UseJsonNet()
                .RegisterEQueueComponents();
        }
    }

    class MessageHandler : IMessageHandler
    {
        private int _handledCount;

        public void Handle(QueueMessage message, IMessageContext context)
        {
            var count = Interlocked.Increment(ref _handledCount);
            if (count % 1000 == 0)
            {
                Console.WriteLine("Total handled {0} messages.", count);
            }
            context.OnMessageHandled(message);
        }
    }
}

使用方式给用户感觉是消息从EQueue服务器推到了应用客户端。 但是实际Consumer内部是使用长轮询Pull方式从EQueue服务器拉消息,然后再回调用户Listener方法。Consumer默认连接本机的5001端口,可以通过ConsumerSetting 进行设置,可以参看下面的代码:

    public class ConsumerSetting
    {
        public string BrokerAddress { get; set; }
        public int BrokerPort { get; set; }
        public int RebalanceInterval { get; set; }
        public int UpdateTopicQueueCountInterval { get; set; }
        public int HeartbeatBrokerInterval { get; set; }
        public int PersistConsumerOffsetInterval { get; set; }
        public PullRequestSetting PullRequestSetting { get; set; }
        public MessageModel MessageModel { get; set; }
        public MessageHandleMode MessageHandleMode { get; set; }

        public ConsumerSetting()
        {
            BrokerAddress = SocketUtils.GetLocalIPV4().ToString();
            BrokerPort = 5001;
            RebalanceInterval = 1000 * 5;
            HeartbeatBrokerInterval = 1000 * 5;
            UpdateTopicQueueCountInterval = 1000 * 5;
            PersistConsumerOffsetInterval = 1000 * 5;
            PullRequestSetting = new PullRequestSetting();
            MessageModel = MessageModel.Clustering;
            MessageHandleMode = MessageHandleMode.Parallel;
        }






本文转自 张善友 51CTO博客,原文链接:http://blog.51cto.com/shanyou/1381850,如需转载请自行联系原作者

C# Mysql

阅读数 401

C# with MySQL

阅读数 42

c# mysql

阅读数 531

C#与MySQL

阅读数 441

没有更多推荐了,返回首页