精华内容
下载资源
问答
  • 一、前言 ...其中使用到了ID号器,可能很多小伙伴还不懂什么是ID号器以及如何去实现,今天我们就一起探讨一下什么是ID号器?ID号器的原理是什么?如何实现一个ID号器等。 二、从数据...

    一、前言

    上一篇文章《如何将一个长URL转换为一个短URL?》中谈到如何将长地址URL转换为短地址URL,其中谈到了一个比较理想的解决方案就是使用发号器生成一个唯一的整数ID,然后转换为62进制,作为短地址URL。

    其中使用到了ID发号器,可能很多小伙伴还不懂什么是ID发号器以及如何去实现,今天我们就一起探讨一下什么是ID发号器?ID发号器的原理是什么?如何实现一个ID发号器等。

    二、从数据库主键ID说起

    1、单机数据库

    当我们的业务访问量不是很大的时候,我们可以使用一台数据库服务器满足我们的业务需求,我们一般设计数据库的时候主键ID用bigint类型,并且设置为自增、无符号,如下所示:

    这里写图片描述

    这种方式完全可以满足我们的业务需求,生成全局唯一递增ID是数据库可以提供给我们的功能,具有如下优点:

    (1)能够保证唯一性;
    (2)能够保证递增性;
    (3)步长固定;

    但是当我们的业务逐渐扩大,我们需要对数据库进行分库分表等操作的时候,这种方式是就变得没有办法了!

    试想一下,如果我们有一个业务,每一个省份维护自己的一台数据库,User表用于记录当前省份的用户信息,假如有一天我们需要把每一个省份的User表用户信息全部合并到一台中央数据库User表中进行统计的时候,结果是不是会崩掉,因为每一个省份User表中的ID都是从1主键递增的!

    2、数据库集群、分库分表

    当我们的数据库达到一定规模的时候,就需要对其进行分库分表,分库分表的时候我们就很难保证主键ID的唯一性,这一点很好理解。这是因为,我们的一张表被分割到不同机器上的数据库中,如果还依靠与数据库自带的自增功能的话就很那保证ID唯一性!如下图所示:

    这里写图片描述

    可以看出,User表中的100W数据被分到两个数据库中,在每一个数据库内部主键ID是自增的,但是却没法保证全局主键ID自增的,这显然是错误的!如何解决这种问题哪?

    (1)使用UUID

    最简单、最容易想到的就应该是使用UUID了,根据UUID的特性,可以产生一个唯一的字符串,这一点大家都知道。UUID是在本地生成的,所以相对性能较高、时延低、扩展性高,完全不受分库分表的影响!

    但是使用UUID是有点小问题的,主要体现在:

    • UUID无法保证趋势递增;
    • UUID过长,往往用32位字符串表示,占用数据库空间较大,做主键的时候索引中主键ID占据的空间较大;
    • UUID作为主键建立索引查询效率低,常见优化方案为转化为两个uint64整数存储;
    • 由于使用实现版本的不一样,在高并发情况下可能会出现UUID重复的情况;

    UUID虽然能够保证全局主键ID的唯一性,但是UUID并不具有有序性,会导致B+树索引在写的时候有过多的随机写操作(连续的ID会产生部分的顺序写);另外,由于在写的时候不能产生有顺序的append操作,而需要进行insert操作,将会读取整个B+树节点加到内存中,在插入这条记录后将整个节点写回磁盘,这种操作在记录占用空间比较大的情况下,性能下降明显。

    (2)ID分组

    虽然,UUID很方便,但由于他的一些弊端我们无法接受,所以在很多对一些性能要求较高的业务场景中,我们是很少使用UUID的,那我们还有没有什么其他方法哪?接下来让我们看一下ID分组的使用:

    这里写图片描述

    如上图所述,由1个数据库变成4个库,每个数据库设置不同的auto_increment初始值init,以及相同的增长步长step,以保证每个数据库生成的ID是不同的,改进后的架构保证了可用性,但缺点是:

    • 丧失了ID生成的“绝对递增性”,但这个问题不大,我们的目标是趋势递增,不是绝对递增;
    • 数据库的写压力依然很大,每次生成ID都要访问数据库;
    • 可扩展性差;

    我们可以想象的是,目前虽然我们的机器只有4台,然后由不同的init和不同的step,但是如果我们需要在其中再加一台机器的话,可想而知我们需要手动更新init和step,这是一件比较繁琐的事情!但有人可能会说了,我们可以直接把 step设置大一些,假如,我们预期数据最大规模的时候用100台数据库服务器就可以了,那我们就可以设置step为100。尽管如此,扩展性还不是很高!

    3、还有什么操作哪?

    上述我们讨论了一个一个的优缺点,当然,还有很多其他的主键ID生成方案。但总的来说,我们讨论问题的关键浮出水面:如何高效生成趋势有序的全局唯一ID,兼顾有序性、高性能、可扩展等因素!

    这就需要我们今天的主角登场了,他就是:ID发号器!ID发号器的主要思想大致相同,但不同平台的实现方式可能会有所不同,本文主要介绍一下:Twitter公司的SnowFlake、如何自己实现一个ID发号器、Vesta框架。

    三、SnowFlake简介

    Twitter公司的SnowFlake算法就是著名的《雪花算法》,SnowFlake是通过Scala语言实现的,目前GitHub上已经看不到源代码了,只有一个2010年的版本,地址为:https://github.com/twitter/snowflake/releases/tag/snowflake-2010,因此很难在我们实际的项目中真正的使用到 ,我们更多的是采用雪花算法的思想,去构建自己属于自己的ID发号器。

    1、SnowFlake原理

    SnowFlake产生的ID是一个64位的整型,结构如下(每一部分用“-”符号分隔):

    0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 - 000000000000
    

    (1)**1位:**标识部分,在java中由于long的最高位是符号位,正数是0,负数是1,一般生成的ID为正数,所以为0;

    (2)**41位:**时间戳部分,这个是毫秒级的时间,一般实现上不会存储当前的时间戳,而是时间戳的差值(当前时间-固定的开始时间),这样可以使产生的ID从更小值开始;41位的时间戳可以使用69年,(1L << 41) / (1000L * 60 * 60 * 24 * 365) = 69年;

    (3)**10位:**节点部分,Twitter实现中使用前5位作为数据中心标识,后5位作为机器标识,可以部署1024个节点;

    (4)**12位:**序列号部分,支持同一毫秒内同一个节点可以生成4096个ID;

    SnowFlake算法生成的ID大致上是按照时间递增的,用在分布式系统中时,需要注意数据中心标识和机器标识必须唯一,这样就能保证每个节点生成的ID都是唯一的!

    2、SnowFlake算法如何实现

    SnowFlake算法的实现在GitHub或者码云上有各种实现版本!SnowFlake算法为我们提供了一个可行的思路,但是我们不一定都需要像上面那样使用5位作为数据中心标识,5位作为机器标识,可以根据我们业务的需要,灵活分配节点部分,如:若不需要数据中心,完全可以使用全部10位作为机器标识;若数据中心不多,也可以只使用3位作为数据中心,7位作为机器标识。所以,我们可以看出SnowFlake算法只是一种指导思想,我们下边自己简单的实现一个一下!

    四、如何自己实现一个ID发号器

    注意这里只有生成ID的部分,没有Client也没有Server,如果想要详细的,请看第五节《Vesta框架简介》!

    /**
     * Twitter的SnowFlake算法,使用SnowFlake算法生成一个整数,然后转化为62进制变成一个短地址URL
     * @author beyond  https://github.com/beyondfengyu/SnowFlake
     * @author xuliugen
     * @date 2018/04/23
     */
    public class SnowFlake {
    
        /**
         * 起始的时间戳
         */
        private final static long START_TIMESTAMP = 1480166465631L;
    
        /**
         * 每一部分占用的位数
         */
        private final static long SEQUENCE_BIT = 12;   //序列号占用的位数
        private final static long MACHINE_BIT = 5;     //机器标识占用的位数
        private final static long DATA_CENTER_BIT = 5; //数据中心占用的位数
    
        /**
         * 每一部分的最大值
         */
        private final static long MAX_SEQUENCE = -1L ^ (-1L << SEQUENCE_BIT);
        private final static long MAX_MACHINE_NUM = -1L ^ (-1L << MACHINE_BIT);
        private final static long MAX_DATA_CENTER_NUM = -1L ^ (-1L << DATA_CENTER_BIT);
    
        /**
         * 每一部分向左的位移
         */
        private final static long MACHINE_LEFT = SEQUENCE_BIT;
        private final static long DATA_CENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;
        private final static long TIMESTAMP_LEFT = DATA_CENTER_LEFT + DATA_CENTER_BIT;
    
        private long dataCenterId;  //数据中心
        private long machineId;     //机器标识
        private long sequence = 0L; //序列号
        private long lastTimeStamp = -1L;  //上一次时间戳
    
        /**
         * 根据指定的数据中心ID和机器标志ID生成指定的序列号
         * @param dataCenterId 数据中心ID
         * @param machineId    机器标志ID
         */
        public SnowFlakeShortUrl(long dataCenterId, long machineId) {
            if (dataCenterId > MAX_DATA_CENTER_NUM || dataCenterId < 0) {
                throw new IllegalArgumentException("DtaCenterId can't be greater than MAX_DATA_CENTER_NUM or less than 0!");
            }
            if (machineId > MAX_MACHINE_NUM || machineId < 0) {
                throw new IllegalArgumentException("MachineId can't be greater than MAX_MACHINE_NUM or less than 0!");
            }
            this.dataCenterId = dataCenterId;
            this.machineId = machineId;
        }
    
        /**
         * 产生下一个ID
         * @return
         */
        public synchronized long nextId() {
            long currTimeStamp = getNewTimeStamp();
            if (currTimeStamp < lastTimeStamp) {
                throw new RuntimeException("Clock moved backwards.  Refusing to generate id");
            }
    
            if (currTimeStamp == lastTimeStamp) {
                //相同毫秒内,序列号自增
                sequence = (sequence + 1) & MAX_SEQUENCE;
                //同一毫秒的序列数已经达到最大
                if (sequence == 0L) {
                    currTimeStamp = getNextMill();
                }
            } else {
                //不同毫秒内,序列号置为0
                sequence = 0L;
            }
    
            lastTimeStamp = currTimeStamp;
    
            return (currTimeStamp - START_TIMESTAMP) << TIMESTAMP_LEFT //时间戳部分
                    | dataCenterId << DATA_CENTER_LEFT       //数据中心部分
                    | machineId << MACHINE_LEFT             //机器标识部分
                    | sequence;                             //序列号部分
        }
    
        private long getNextMill() {
            long mill = getNewTimeStamp();
            while (mill <= lastTimeStamp) {
                mill = getNewTimeStamp();
            }
            return mill;
        }
    
        private long getNewTimeStamp() {
            return System.currentTimeMillis();
        }
    
        public static void main(String[] args) {
            SnowFlakeShortUrl snowFlake = new SnowFlakeShortUrl(2, 3);
    
            for (int i = 0; i < (1 << 4); i++) {
                //10进制
                System.out.println(snowFlake.nextId());
            }
        }
    }
    //输出结果:
    185892988017455104
    185892988021649408
    185892988021649409
    185892988021649410
    185892988021649411
    185892988021649412
    185892988021649413
    185892988021649414
    185892988021649415
    185892988021649416
    185892988021649417
    185892988021649418
    185892988021649419
    185892988021649420
    185892988021649421
    185892988021649422
    

    还记得《如何将一个长URL转换为一个短URL?》文中提到的短地址吗?上文中已经生成了唯一不重复的ID,我们只需要增加一个进制转换的工具就可以了,进制转换的工具如下:

    /**
     * 进制转换工具,最大支持十进制和62进制的转换
     * 1、将十进制的数字转换为指定进制的字符串;
     * 2、将其它进制的数字(字符串形式)转换为十进制的数字
     * @author xuliugen
     * @date 2018/04/23
     */
    public class NumericConvertUtils {
    
        /**
         * 在进制表示中的字符集合,0-Z分别用于表示最大为62进制的符号表示
         */
        private static final char[] digits = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
                'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
                'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
                'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
                'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z'};
    
        /**
         * 将十进制的数字转换为指定进制的字符串
         * @param number 十进制的数字
         * @param seed   指定的进制
         * @return 指定进制的字符串
         */
        public static String toOtherNumberSystem(long number, int seed) {
            if (number < 0) {
                number = ((long) 2 * 0x7fffffff) + number + 2;
            }
            char[] buf = new char[32];
            int charPos = 32;
            while ((number / seed) > 0) {
                buf[--charPos] = digits[(int) (number % seed)];
                number /= seed;
            }
            buf[--charPos] = digits[(int) (number % seed)];
            return new String(buf, charPos, (32 - charPos));
        }
    
        /**
         * 将其它进制的数字(字符串形式)转换为十进制的数字
         * @param number 其它进制的数字(字符串形式)
         * @param seed   指定的进制,也就是参数str的原始进制
         * @return 十进制的数字
         */
        public static long toDecimalNumber(String number, int seed) {
            char[] charBuf = number.toCharArray();
            if (seed == 10) {
                return Long.parseLong(number);
            }
    
            long result = 0, base = 1;
    
            for (int i = charBuf.length - 1; i >= 0; i--) {
                int index = 0;
                for (int j = 0, length = digits.length; j < length; j++) {
                    //找到对应字符的下标,对应的下标才是具体的数值
                    if (digits[j] == charBuf[i]) {
                        index = j;
                    }
                }
                result += index * base;
                base *= seed;
            }
            return result;
        }
    }  
    

    写个测试用例如下:

    public static void main(String[] args) {
            SnowFlakeShortUrl snowFlake = new SnowFlakeShortUrl(2, 3);
    
            for (int i = 0; i < (1 << 4); i++) {
                //10进制
                Long id = snowFlake.nextId();
                //62进制
                String convertedNumStr = NumericConvertUtils.toOtherNumberSystem(id, 62);
    
                //10进制转化为62进制
                System.out.println("10进制:" + id + "  62进制短地址:" + convertedNumStr);
    
                //TODO 执行具体的存储操作,可以存放在Redis等中
    
                //62进制转化为10进制
                System.out.println("62进制短地址:" + convertedNumStr + "  10进制:" + NumericConvertUtils.toDecimalNumber(convertedNumStr, 62));
                System.out.println();
            }
        }
    //执行结果如下:
    10进制:185894506410029056  62进制短地址:dJoJ1Xyo3C
    62进制短地址:dJoJ1Xyo3C  10进制:185894506410029056
    
    10进制:185894506414223360  62进制短地址:dJoJ1XPZbG
    62进制短地址:dJoJ1XPZbG  10进制:185894506414223360
    
    10进制:185894506414223361  62进制短地址:dJoJ1XPZbH
    62进制短地址:dJoJ1XPZbH  10进制:185894506414223361
    
    10进制:185894506414223362  62进制短地址:dJoJ1XPZbI
    62进制短地址:dJoJ1XPZbI  10进制:185894506414223362
    
    10进制:185894506414223363  62进制短地址:dJoJ1XPZbJ
    62进制短地址:dJoJ1XPZbJ  10进制:185894506414223363
    
    10进制:185894506414223364  62进制短地址:dJoJ1XPZbK
    62进制短地址:dJoJ1XPZbK  10进制:185894506414223364
    ......
    

    所有代码地址在:https://gitee.com/xuliugen/codes/9upvmzyk6c2i78eb3lgnj63

    五、Vesta框架简介

    Vesta是一款通用的ID产生器,互联网俗称统一发号器,它具有全局唯一、粗略有序、可反解和可制造等特性,它支持三种发布模式:嵌入发布模式、中心服务器发布模式、REST发布模式,根据业务的性能需求,它可以产生最大峰值型和最小粒度型两种类型的ID,它的实现架构使其具有高性能,高可用和可伸缩等互联网产品需要的质量属性,是一款通用的高性能的发号器产品。

    码云:https://gitee.com/robertleepeak/vesta-id-generator
    GitHub:https://github.com/cloudatee/vesta-id-generator

    由于Vesta的设计与实现较为复杂,一小节不足以说明清楚,这里不再详细的介绍,有兴趣的参考上述仓库地址文档!


    参考文章:

    1、https://gitee.com/robertleepeak/vesta-id-generator
    2、http://www.wolfbe.com/detail/201611/381.html#
    3、http://www.wolfbe.com/detail/201701/386.html
    4、细聊分布式ID生成方法

    在这里插入图片描述

    【视频福利】2T免费学习视频,搜索或扫描上述二维码关注微信公众号:Java后端技术(ID: JavaITWork)回复:1024,即可免费获取!内含SSM、Spring全家桶、微服务、MySQL、MyCat、集群、分布式、中间件、Linux、网络、多线程,Jenkins、Nexus、Docker、ELK等等免费学习视频,持续更新!

    展开全文
  • C语言 扑克牌洗牌牌统计花顺个数程序

    千次阅读 多人点赞 2018-12-18 00:17:14
    目录一、2个算法关键点关键点1:洗牌算法关键点2:查找花顺算法二、运行结果三、完整代码 题目: 一张扑克牌可用结构类型描述,一副扑克牌的52张牌则是一个结构...(2)出现花顺的牌个数; (3)牌总次数; ...

    题目:
    一张扑克牌可用结构类型描述,一副扑克牌的52张牌则是一个结构数组。
    1、试编写洗牌函数和供4人玩牌的发牌函数。
    2、统计出现同花顺的概率

    提示:模拟发牌100000次(这个次数可用#define设置),程序最后统计如下数据
    (1)出现同花顺的总个数;
    (2)出现同花顺的发牌个数;
    (3)发牌总次数;
    (4)出现同花顺的发牌概率。

    所使用的结构:
    struct card{
    char shape; // 花色首字母,即’S’, ‘H’, ‘C’, 'D’四种
    int value; // 牌的大小,从2开始,JQKA分别为11, 12, 13, 14
    };

    5张同花连号算同花顺,连号规则为从2开始,到A结束。
    如:7、8、9、10、J、Q、K只能算作7~J一个同花顺。

    一、2个算法关键点

    关键点1:洗牌算法

    经典洗牌算法有很多,可以直接自行百度洗牌算法,理解一下不同的算法优劣。这里采用一种复杂度较小的算法Knuth-Durstenfeld Shuffle算法(算法2),它是Fisher-Yates Shuffle算法(算法1)的改进版。

    问题描述:将一个有N个元素有序数组a[N]乱序,即洗牌。

    算法1核心:从原始数组里随机抽取一个元素放在新的数组里,再从原数组剩下的元素里随机选一个元素依次放到新数组里,如此循环,直到原始数组里所有元素被抽取到新数组里,完成乱序(洗牌)。
    算法复杂度:时间复杂度O(N*N),空间复杂度O(N)
    优点:容易理解。
    缺点:时间复杂度和空间复杂度都较大,抽取元素后还需在原数组中删去抽取掉的元素,较麻烦。

    算法2核心:对算法1进行改进,不新建数组,而是在原数组内进行互换。从原始数组里随机选一个元素,与最后一个元素a[N-1]互换。然后在除最后一个元素a[N-1]外的其他元素里随机挑一个元素与倒数第二个元素a[N-2]互换。然后在除最后2个元素外的其他元素里随机挑一个元素与倒数第三个元素a[N-3]互换…直到抽取完毕,完成乱序(洗牌)。
    算法复杂度:时间复杂度O(N),空间复杂度O(1)
    优点:复杂度小,操作简单。
    缺点:需知道数组大小N(不过这点较容易克服)。

    洗牌程序:
    采用算法2,对长为5的结构化数组c[len]进行洗牌,其中num_time是为在快速循环中为srand函数赋予随机数种子用的(因为在快速循环中程序运行很快,time()返回的时间秒数基本不变,若只采用time(NULL)作为srand的随机数种子是远远不够的)。

    // 洗牌函数
    // 算法:对于size为Num的数组array[Num],len=Num。每次随机产生一个[0, len)的数字x,
    //      将array[x]与数组尾端array[len-1]调换,然后len减1,重复操作。
    // 复杂度:时间复杂度O(N),空间复杂度O(1)
    void Shuffle(struct card *c, int num_time)
    {
    	int x, len = 52;
    	struct card temp;
    	
    	// 初始化随机数种子
    	// srand((unsigned)time(NULL)); 
    	// 在循环中运算过快time可能不变,加上循环变量num_time保证随机数种子的不一样。
    	srand((unsigned)time(NULL) + num_time);
    	while (len > 1)
    	{
    		x = rand() % len;
    		temp = c[len - 1];
    		c[len - 1] = c[x];
    		c[x] = temp;
    		len--;
    	}
    }
    

    关键点2:查找同花顺算法

    一种容易想到的算法是:
    (1)发牌:洗牌后,按顺序发给四个人,每人13张牌,假设某人领到的牌存在结构数组a0[13]里。
    (2)排序:先对a0[13]根据元素value值大小进行排序,冒泡、快排等算法均可,这里因元素较少(13个)采用冒泡,若想更快可采用快排。先不分花色就进行排序,后续操作就不用排序了,可使得进一步的操作——分花色和查找顺子更简单。
    (3)分花色:因不知各花色各有多少张牌,所以取最大值13,,定义4个大小为13的int型数组(C语言必须在定义数组时规定大小)。然后遍历排序后的a0[13]元素,按花色分到4个数组array里,只存value值,并注意保存每个数组的元素个数。
    (4)找顺子:因为此人手中的13张牌按花色存到了4个int数组里,所以问题变成了:在一个长为len的int数组array[len]里查找5顺子的个数。这里可新建一个函数解决这个问题。
    算法核心:先确认数组元素是否大于5,然后从array[0]到array[len-5],确认每一个元素与其后4个元素构成公差为1的递增等差数列(5连顺子)。若出现一个顺子,将下标元素变到当前顺子后的第一个元素,在程序中为i+=4,因为i++还会再+1,所以是一共是+5。这样就可以保证满足题目中“7,8,9,10,J,Q,K中只有 7~J一个顺子”的要求。
    (5)相加:分别计算此人4种花色数组里的5连顺子个数,并相加,即为此人手里的同花顺个数。同理,计算此次发牌后4人的同花顺个数相加,即为此次发牌的同花顺个数。

    注意:
    出现同花顺个数和出现同花顺的发牌次数并不相同,因为一次发牌可能有多个同花顺出现。这里的代码运用了一个技巧,先统计每次发牌后同花顺个数是否增加,若增加(改变)了,即证明此次发牌有同花顺出现,出现同花顺的发牌次数自增1即可。

    if (Num_tong != Num_tong_org)
    			Num_tong_fa++;
    

    二、运行结果

    在这里插入图片描述

    三、完整代码

    #include <stdio.h>
    #include <stdlib.h>
    #include <time.h>
    
    // 模拟发牌总次数
    #define N 100000
    
    // 统计个数
    int Num_tong = 0;		// 出现同花顺个数
    int Num_tong_fa = 0;	// 出现同花顺的发牌次数
    double P_tong = 0.0;	// 出现同花顺的概率
    
    // 牌的花色数组
    char shapes[4] = { 'S', 'H', 'C', 'D' };
    
    // 定义牌的结构体
    struct card {
    	char shape;
    	int  value;
    };
    
    // 4人各自的牌
    struct card a0[13], a1[13], a2[13], a3[13];
    
    // 初始化一副牌函数
    void Init_cards(struct card *c)
    {
    	int i, j;
    	for (i = 0; i < 13; i++)
    	{
    		c[4 * i].value = c[4 * i + 1].value = \
    			c[4 * i + 2].value = c[4 * i + 3].value = i + 2;
    		for (j = 0; j <= 3; j++)
    		{
    			c[4 * i + j].shape = shapes[j];
    		}
    	}
    }
    
    // 打印一副牌函数
    void Print_cards(struct card *c, int len)
    {
    	int i;
    	for (i = 0; i < len; i++)
    	{
    		printf("%2d: %2d %c\n", i, c[i].value, c[i].shape);
    	}
    }
    
    // 洗牌函数
    // 算法:对于size为Num的数组array[Num],len=Num。每次随机产生一个[0, len)的数字x,
    //      将array[x]与数组尾端array[len-1]调换,然后len减1,重复操作。
    // 复杂度:时间复杂度O(N),空间复杂度O(1)
    void Shuffle(struct card *c, int num_time)
    {
    	int x, len = 52;
    	struct card temp;
    	
    	// 初始化随机数种子
    	// srand((unsigned)time(NULL)); 
    	// 在循环中运算过快time可能不变,加上循环变量num_time保证随机数种子的不一样。
    	srand((unsigned)time(NULL) + num_time);
    	while (len > 1)
    	{
    		x = rand() % len;
    		temp = c[len - 1];
    		c[len - 1] = c[x];
    		c[x] = temp;
    		len--;
    	}
    }
    
    // 排序函数:将发到每个人手上的牌按大小排序,便于归类花色
    void Sort_cards(struct card *c, int len)
    {
    	int i, j;
    	struct card temp;
    	for (i = len - 1; i > 0; i--)
    	{
    		for (j = 0; j < i; j++)
    		{
    			if (c[j].value > c[j + 1].value)
    			{
    				temp = c[j + 1];
    				c[j + 1] = c[j];
    				c[j] = temp;
    			}
    		}
    	}
    }
    
    // 发牌函数:洗牌后,按循环给4个人发牌
    void Deal(struct card *c)
    {
    	int i;
    
    	// 给4人发牌
    	for (i = 0; i < 13; i++)
    	{
    		a0[i] = c[4 * i];
    		a1[i] = c[4 * i + 1];
    		a2[i] = c[4 * i + 2];
    		a3[i] = c[4 * i + 3];
    	}
    
    	// 给4人的牌排序,以便后续分花色和查找同花顺
    	Sort_cards(a0, 13);
    	Sort_cards(a1, 13);
    	Sort_cards(a2, 13);
    	Sort_cards(a3, 13);
    }
    
    // 打印数组
    void Print_array(int *array, int len)
    {
    	int i;
    	for (i = 0; i < len; i++)
    	{
    		printf("%d ", array[i]);
    	}
    	printf("\n");
    }
    
    // 数组同花顺函数:判断数组中是否含有同花顺,返回同花顺个数
    int Have_shunzi(int *array, int len)
    {
    	int i, j;
    	int Num = 0, Num_org = 0;
    	if (len >= 5)
    	{
    		for (i = 0; i <= (len - 5); i++)
    		{
    			Num_org = Num;
    			for (j = i; j < i + 4; j++)
    			{
    				if (array[j + 1] == (array[j] + 1))
    				{
    					if (j == (i + 3))
    					{
    						Num++;
    						printf("%d %d %d %d %d\n", array[i], array[i + 1], array[i + 2], array[i + 3], array[i + 4]);
    					}
    					continue;
    				}
    				else break;
    			}
    
    			// 若出现同花顺,将遍历下标调到后面第5个
    			if (Num_org != Num)
    				i += 4;
    		}
    	}
    
    	return Num;
    }
    
    // 结构体同花顺函数:判断某人的牌里是否含有同花顺,返回同花顺个数
    int Find_cards(struct card *c, int len)
    {
    	int i, j, k, l, m;
    	int array[4][13];
    	j = k = l = m = 0;
    
    	// 把某人手中的牌按花色分开
    	for (i = 0; i < len; i++)
    	{
    		if (c[i].shape == 'S')
    		{
    			array[0][j] = c[i].value;
    			j++;
    		}
    		else if (c[i].shape == 'H')
    		{
    			array[1][k] = c[i].value;
    			k++;
    		}
    		else if (c[i].shape == 'C')
    		{
    			array[2][l] = c[i].value;
    			l++;
    		}
    		else
    		{
    			array[3][m] = c[i].value;
    			m++;
    		}
    	}
    
    	// 统计各花色中出现同花顺的个数
    	return Have_shunzi(array[0], j) + Have_shunzi(array[1], k) + Have_shunzi(array[2], l) + Have_shunzi(array[3], m);
    }
    
    
    
    // 主函数
    int main(void)
    {
    	int i;
    	int Num_tong_org = 0;
    	
    	// 定义一副牌
    	struct card new_cards[52];
    
    	// 初始化牌
    	Init_cards(new_cards);
    
    	// 循环发牌N次,统计出现同花顺的数据
    	for (i = 0; i < N; i++)
    	{
    		Num_tong_org = Num_tong;
    		Shuffle(new_cards, i);
    		Deal(new_cards);
    		Num_tong += Find_cards(a0, 13) + Find_cards(a1, 13) + Find_cards(a2, 13) + Find_cards(a3, 13);
    		if (Num_tong != Num_tong_org)
    			Num_tong_fa++;	
    	}
    
    	// 计算出现同花顺的发牌概率
    	P_tong = (double)Num_tong_fa / (double)N;
    
    	// 打印信息
    	printf("\n");
    	printf("每次发牌结果数据过多,以上为出现的同花顺情况。\n");
    	printf("\n");
    	printf("统计数据如下:\n");
    	printf("1、出现同花顺的总个数:   %d\n", Num_tong);
    	printf("2、出现同花顺的发牌次数: %d\n", Num_tong_fa);
    	printf("3、发牌总次数:           %d\n", N);
    	printf("4、出现同花顺的发牌概率: %f\n", P_tong);
    
    	return 0;
    }
    
    展开全文
  • RocketMQ

    万次阅读 多人点赞 2019-07-31 19:17:34
    这也是为什么,当MQ采用双Master集群方式时,如果向MQ发送100条消息,其中52条在BrokerA上,48条在BrokerB上。因为4条发给A,4条发给B…依次循环下去,最后4条是发给了A,所以A比B多存储了4条消息。 ###1.3.2、群组...

    此为博主(yjclsx)原创文章,如若转载请标明出处,谢谢!
    #一、RocketMQ简介
    ##1.1、介绍
    RocketMQ是一款分布式、队列模型的消息中间件,由Metaq3.X版本改名而来,RocketMQ并不遵循包括JMS规范在内的任何规范,但是参考了各种规范不同类产品的设计思想,自己有一套自定义的机制,简单来说就是使用订阅主题的方式去发送和接收任务,但是支持集群和广播两种消息模式。开源项目地址:https://github.com/apache/rocketmq
    具有以下特点:
    1、能够保证严格的消息顺序
    2、提供丰富的消息拉取模式
    3、高效的订阅者水平扩展能力
    4、实时的消息订阅机制
    5、亿级消息堆积能力
    选用理由:
    1、强调集群无单点,可扩展,任意一点高可用,水平可扩展。
    2、海量消息堆积能力,消息堆积后,写入低延迟。
    3、支持上万个队列。
    4、消息失败重试机制。
    5、消息可查询。
    6、开源社区活跃。
    7、成熟度(历经多次天猫双十一海量消息考验)
    ##1.2、专业术语
    1、Producer
    消息生产者,负责产生消息,一般由业务系统负责产生消息。
    2、Consumer
    消息消费者,负责消费消息,一般是后台系统负责异步消费。
    3、Push Consumer
    Consumer 的一种,应用通常向 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立刻回调 Listener 接口方法。
    4、Pull Consumer
    Consumer 的一种,应用通常主动调用 Consumer 的拉消息方法从 Broker 拉消息,主动权由应用控制。
    5、Producer Group
    一类 Producer 的集合名称,这类 Producer 通常发送一类消息,且发送逻辑一致。
    6、Consumer Group
    一类 Consumer 的集合名称,这类 Consumer 通常消费一类消息,且消费逻辑一致。
    7、Broker
    消息中转角色,负责存储消息,转发消息,一般也称为 Server。在 JMS 规范中称为 Provider。
    8、广播消费
    一条消息被多个 Consumer 消费,即使返些 Consumer 属于同一个 Consumer Group,消息也会被 Consumer Group 中的每个 Consumer 都消费一次,广播消费中的 Consumer Group 概念可以认为在消息划分方面无意义。
    在 CORBA Notification 规范中,消费方式都属于广播消费。
    在 JMS 规范中,相当于 JMS publish/subscribe model
    9、集群消费
    一个 Consumer Group 中的 Consumer 实例平均分摊消费消息。例如某个 Topic 有 9 条消息,其中一个Consumer Group 有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中的 3 条消息。
    在 CORBA Notification 规范中,无此消费方式。
    在 JMS 规范中,JMS point-to-point model 与之类似,但是 RocketMQ 的集群消费功能大等于 PTP 模型。
    因为 RocketMQ 单个 Consumer Group 内的消费者类似于 PTP,但是一个 Topic/Queue 可以被多个 Consumer Group 消费。
    10、顺序消息
    消费消息的顺序要同収送消息的顺序一致,在 RocketMQ 中,主要挃的是尿部顺序,即一类消息为满足顺序性,必须 Producer 单线程顺序収送,丏収送到同一个队列,返样 Consumer 就可以挄照 Producer 发送的顺序去消费消息。
    11、普通顺序消息
    顺序消息的一种,正常情冴下可以保证完全的顺序消息,但是一旦収生通信异常,Broker 重启,由亍队列总数収生发化,哈希叏模后定位的队列会发化,产生短暂的消息顺序丌一致。如果业务能容忍在集群异常情冴(如某个 Broker 宕机戒者重启)下,消息短暂的乱序,使用普通顺序方式比较合适。
    12、严格顺序消息
    顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式 Failover 特性,即 Broker 集群中只要有一台机器丌可用,则整个集群都丌可用,服务可用性大大降低。
    如果服务器部署为同步双写模式,此缺陷可通过备机自劢切换为主避免,丌过仍然会存在几分钟的服务丌可用。(依赖同步双写,主备自劢切换,自劢切换功能目前迓未实现)
    目前已知的应用只有数据库 binlog 同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推荐使用普通的顺序消息。
    13、Message Queue
    在 RocketMQ 中,所有消息队列都是持丽化,长度无限的数据结构,所谓长度无限是挃队列中的每个存储单元都是定长,访问其中的存储单元使用 Offset 来访问,offset 为 java long 类型,64 位,理论上在 100年内不会溢出,所以认为是长度无限,另外队列中只保存最近几天的数据,之前的数据会按照过期时间来删除。
    也可以认为 Message Queue 是一个长度无限的数组,offset 就是下标。
    ##1.3、关键概念
    ###1.3.1、主题与标签
    主题Topic:第一级消息类型,书的标题;
    标签Tags:第二级消息类型,书的目录,可以基于Tag做简单的消息过滤,通常这已经可以满足90%的需求了,如果有更复杂的过滤场景,就需要使用rocketmq-filtersrv组件了。
    例如,主题是订单交易,那么标签可以是订单交易-创建、订单交易-付款、订单交易-完成。
    通过查看源码就可以发现:一个主题在MQ上默认会有4个Queue队列来存储该主题上的消息,Queue的数量也可以在创建主题时指定。这也是为什么,当MQ采用双Master集群方式时,如果向MQ发送100条消息,其中52条在BrokerA上,48条在BrokerB上。因为4条发给A,4条发给B…依次循环下去,最后4条是发给了A,所以A比B多存储了4条消息。
    ###1.3.2、群组
    这里写图片描述
    生产组:用于消息的发送的群组,官方推荐:一个生产组理应发送的是同一主题的消息,消息子类型再使用Tags来区分;
    消费组:用于消息的订阅处理的群组,官方推荐:一个消费组理应消费的是同一主题的消息,再使用Tags在Broker做消息过滤。
    生产组和消费组极大地方便了扩缩机器、增减处理能力等,同时只有群组名相同才会被认为是一个集群组的,RocketMQ默认情况下采用集群消费模式,所以消息每次只会随机的发给每个消费群组中的一员,这也体现了RocketMQ集群无单点、水平可扩展、任意一点高可用、支持负载均衡等特点。
    ##1.4、RocketMQ核心模块
    rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息。
    rocketmq-client:提供发送、接受消息的客户端API。
    rocketmq-namesrv:NameServer,类似于Zookeeper,这里保存着消息的TopicName,队列等运行时的元信息。
    rocketmq-common:通用的一些类,方法,数据结构等。
    rocketmq-remoting:基于Netty4的client/server + fastjson序列化 + 自定义二进制协议。
    rocketmq-store:消息、索引存储等。
    rocketmq-filtersrv:消息过滤器Server,需要注意的是,要实现这种过滤,需要上传代码到MQ!【一般而言,我们利用Tag足以满足大部分的过滤需求,如果更灵活更复杂的过滤需求,可以考虑filtersrv组件】。
    rocketmq-tools:命令行工具。
    #二、RocketMQ示例
    ##2.1、RocketMQ部署–双master方式
    可参考我的博文:“RocketMQ部署–双master方式”。
    ##2.2、HelloWorld示例
    ###2.2.1、生产者

    import com.alibaba.rocketmq.client.exception.MQBrokerException;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.remoting.exception.RemotingException;
    
    public class Producer {
    	public static void main(String[] args) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    		//实例化生产者,实例化时需要指定生产组名
    		DefaultMQProducer producer = new DefaultMQProducer("quickstart_producer");
    		//设置namesrc地址,有多个的话用";"隔开
    		producer.setNamesrvAddr("192.168.246.130:9876;192.168.246.131:9876");
    		//启动生产者
    		producer.start();
    		for(int i=1;i<=100;i++){
    			//创建一条消息,指定了消息的主题topic、标签tag、消息的内容
    			Message msg = new Message("TopicQuickStart", "TagA", ("Hello RocketMQ "+i).getBytes());
    			//发送消息
    			SendResult sendResult = producer.send(msg);
    			System.out.println(sendResult);
    		}
    		//关闭生产者,main方法主线程结束,程序终止
    		producer.shutdown();
    	}
    }
    

    ###2.2.2、消费者

    import java.util.List;
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    /**
     * Consumer,订阅消息
     */
    public class Consumer {
        public static void main(String[] args) throws InterruptedException, MQClientException {
        	//实例化消费者,实例化时需要指定消费组名
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");
            //设置namesrc地址,有多个的话用";"隔开
            consumer.setNamesrvAddr("192.168.246.130:9876;192.168.246.131:9876");
            /**
             * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
             * 如果非第一次启动,那么按照上次消费的位置继续消费
             */
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            //设置每次消费的消息最大数量,默认是1,即一条条拉取
            consumer.setConsumeMessageBatchMaxSize(10);
            //设置订阅的消息主题topic和标签tags,这里订阅TopicQuickStart主题下的所有消息,所以会收到上面生产者发送的该主题下标签为TagA的消息
            consumer.subscribe("TopicQuickStart", "*");
            //注册消费监听
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                        ConsumeConcurrentlyContext context) {
                	//如果不设置每次消费的消息最大数量,这里的msgs里只会有一条
                	System.out.println("消息条数:"+msgs.size());
                	for(MessageExt msg : msgs){
                		System.out.println(Thread.currentThread().getName()+"收到消息:topic:"+msg.getTopic()+",tags:"+msg.getTags()+",msg:"+new String(msg.getBody()));
                	}
                	//回复RocketMQ,这条消息消费成功,如果返回的是ConsumeConcurrentlyStatus.RECONSUME_LATER,即表明消息消费失败,那RocketMQ会对这条消息进行重发操作
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
     //启动消费者,main方法主线程结束后,程序不会停止,进入阻塞状态,来一条消息就触发一次监听事件
            consumer.start();
            System.out.println("Consumer Started.");
        }
    }
    

    可以执行多次上面消费者的main方法,也就是启动多个这样的消费者,因为在一个群组里,消息每次只会发送给群组里的一个成员,所以假设有100条消息,启动了两个同一群组的消费者,那么每个消费者各消费50条消息。可见,RocketMQ自动完成了相同群组下的消费者的负载均衡操作,而且如果想增减消费者,只需启动或者关闭消费者即可,无需任何配置,水平可扩展性好!
    如果要切换成广播消费模式,每个消费端都需进行下面的设置:
    consumer.setMessageModel(MessageModel.BROADCASTING);//设置为广播消费模式
    这样即使是同一个消费组的消费者,也都会收到订阅的所有消息,不会进行均衡消费。
    ##2.3、两类Consumer
    在RocketMQ里,Consumer分为两类:MQPullConsumer和MQPushConsumer。其实两种都是拉模式(pull),即Consumer轮询从broker拉取消息。
    push方式就是上面例子里的消费者,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumerMessage()来消费,对用户而言,感觉消息是被推送过来的。
    pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。
    #三、消息重试
    ##3.1、生产端消息重试
    生产者端的消息失败,也就是Producer往MQ上发消息没有发送成功,比如网络抖动导致生产者发送消息到MQ失败,这种消息失败重试我们可以手动设置发送失败重试的次数。

    producer.setRetryTimesWhenSendFailed(3); //设置重试次数
    producer.send(msg, 1000); //发送消息,并设置消息发送超时时间
    

    上面的代码表示消息在1S内没有发送成功就会触发重试,重试最多3次。
    ##3.2、消费端消息重试
    消费端在收到消息并处理完成会返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS表示消费成功,如果返回了失败或者没返回就会触发重试,即MQ会把消息再发一遍。所以,发生消费端的消息重试有两种情况:1、返回了ConsumeConcurrentlyStatus.RECONSUME_LATER直接表明消费失败;2、长时间没有返回消息处理状态给MQ导致超时。
    消息重复消费
    值得注意的是,当一个消费组有多个消费者时,其中一个消费者处理消息后长时间没返回,那么MQ就会把这条消息进行重试,会发送给同一消费组的另外一个消费者进行消费。要是这时候之前的消费者又把消息处理结果返回了,那就出现了消息重复消费的问题。
    RocketMQ无法避免消息重复,如果业务对消息重复非常敏感,务必要在业务层面去重,这就要求我们一定要做好消费端幂等处理。比如每条消息都有一个唯一编号,每处理完一条消息就记录日志,当消息再来的时候判断一下本条消息是否处理过。需要注意的是,如果消费端处理消息后的结果保存在DB中,那记录日志的操作也一定要保存在这个DB中,这样才能保证事务,其中有一步失败了就会一起回滚。倘若把消息处理后的结果存在mysql里,日志却记录在redis中,然后每次消息再来的时候去redis中查看是否已经处理过,这样是错误的做法,本以为放redis里再去查询的时候速度快,可以提升性能,但是却导致事务的一致性无法保证(比如mysql操作成功了而redis操作失败了那怎么回滚呢),至少目前为止单靠spring的事务管理无法回滚两个数据源的操作,需要增加其他的组件,所以建议都在一个DB中操作。
    #四、集群
    推荐的几种 Broker 集群部署方式,这里的 Slave 不可写,但可读,类似于 Mysql 主备方式。当主节点挂了,就可以访问从节点来获取之前未消费的数据。但是因为Slave是只读的,所以不会接收生产者生产的新数据,新数据只会存储到其他的Broker主备节点上,直到宕机的主节点重新启动了才会接收新数据。至少截止到v3.2.4版本,RocketMQ还未能支持主备自动切换功能。
    ##4.1、单个 Master
    返种方式风险较大,一旦 Broker 重启或者宕机时,会导致整个服务不可用,不建议线上环境使用
    ##4.2、多 Master 模式
    一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master
    优点:配置简单,单个 Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由于 RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。
    缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
    ##4.3、多 Master 多 Slave 模式,异步复制
    每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用异步复制方式,主备有短暂消息延迟,毫秒级。
    优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为 Master 宕机后,消费者仍然可以从 Slave 消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。
    缺点:Master 宕机、磁盘损坏等情况,会丢失少量消息。
    ##4.4、多 Master 多 Slave 模式,同步双写
    每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用同步双写方式,主备都写成功,才会向应用返回成功。
    优点:数据与服务都无单点,Master 宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
    缺点:性能比异步复制模式略低,大约低 10%左右,发送单个消息的 RT 会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能。
    #五、顺序消费
    普通模式下,使用传统的send发送消息即可,比如2.2里的示例代码,但是这种模式下不能保证消息消费顺序的一致性。假如我们在网购的时候,需要下单,那么下单需要有三个顺序,第一、创建订单 ,第二:订单付款,第三:订单完成,也就是这个三个环节要有顺序,这个订单才有意义,这种场景下就需要顺序消费。
    世界上解决一个计算机问题最简单的方法:“恰好”不需要解决它!
    那通过RocketMQ怎么实现顺序消费的呢?
    答:需要顺序消费的消息在生成端必须发送到同一个主题的同一个队列中(一个主题默认4个队列),比如创建订单1、订单1付款,订单1完成这三条消息就需要在同一个队列中,创建订单2、订单2付款,订单2完成这三条消息也需要在同一队列中,但订单1和订单2的队列可以不是同一个队列。然后消费端消费时必须实现MessageListenerOrderly接口以保证一个队列只会被同一个消费端的一个线程所消费,因为队列先进先出的原则,就可以保证顺序消费了。
    比如有1个生产端和2个消费端,要保证顺序消费,示例代码如下:
    ##5.1、生产者

    public class Producer {  
        public static void main(String[] args) {  
            try {  
                DefaultMQProducer producer = new DefaultMQProducer("order_Producer");  
                producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
                producer.start();  
                for (int i = 1; i <= 5; i++) {  
      // 主题:TopicOrderTest,标签:order_1,KEY:"KEY" + i,消息内容:"order_1 " + i
                    Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());  
      // RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上
    		// RocketMQ默认提供了两种MessageQueueSelector实现:随机/Hash
    		// 当然你可以根据业务实现自己的MessageQueueSelector来决定消息按照何种策略发送到消息队列中
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {  
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  
                            Integer id = (Integer) arg;  //arg就是producer.send方法的最后一个参数,这里是0
                            int index = id % mqs.size();  //队列数量没有事先设置那就是4,0%4=0
                            return mqs.get(index);  //返回下标为0的队列,即这5条消息存放在0号队列中
                        }  
                    }, 0);  
                    System.out.println(sendResult);  
                }  
                for (int i = 1; i <= 5; i++) {  
                    Message msg = new Message("TopicOrderTest", "order_2", "KEY" + i, ("order_2 " + i).getBytes());  
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {  
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  
                            Integer id = (Integer) arg;  
                            int index = id % mqs.size();  
                            return mqs.get(index);  //返回下标为1的队列,即这5条消息存放在1号队列中
                        }  
                    }, 1);  
                    System.out.println(sendResult);  
                }  
                for (int i = 1; i <= 5; i++) {  
                    Message msg = new Message("TopicOrderTest", "order_3", "KEY" + i, ("order_3 " + i).getBytes());  
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {  
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  
                            Integer id = (Integer) arg;  
                            int index = id % mqs.size();  
                            return mqs.get(index);  //返回下标为2的队列,即这5条消息存放在2号队列中
                        }  
                    }, 2);  
                    System.out.println(sendResult);  
                }  
                producer.shutdown();  
            } catch (MQClientException e) {  
                e.printStackTrace();  
            } catch (RemotingException e) {  
                e.printStackTrace();  
            } catch (MQBrokerException e) {  
                e.printStackTrace();  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }
    

    ##5.2、消费者1

    public class Consumer1 {    
        public static void main(String[] args) throws MQClientException {  
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");  
            consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
            consumer.subscribe("TopicOrderTest", "*");  
            /** 
             * 实现了MessageListenerOrderly表示一个队列只会被一个线程取到,第二个线程无法访问这个队列 
      * 所以为了保证顺序消费,消费逻辑里不应该有多线程逻辑,比如通过线程池并发消费,这都是不允许的
             */  
            consumer.registerMessageListener(new MessageListenerOrderly() {  
                AtomicLong consumeTimes = new AtomicLong(0);  
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {  
                    // 设置自动提交  
                    context.setAutoCommit(true);  
                    for (MessageExt msg : msgs) {  
                        System.out.println(msg + ",内容:" + new String(msg.getBody()));  
                    }  
                    try {  
                        TimeUnit.SECONDS.sleep(5L);  
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                    return ConsumeOrderlyStatus.SUCCESS;  
                }  
            });  
            consumer.start();  
            System.out.println("Consumer1 Started.");  
        }  
    }
    

    ##5.3、消费者2

    public class Consumer2 {  
        public static void main(String[] args) throws MQClientException {  
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");  
            consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
            consumer.subscribe("TopicOrderTest", "*");  
            /** 
             * 实现了MessageListenerOrderly表示一个队列只会被一个线程取到,第二个线程无法访问这个队列 
      * 所以为了保证顺序消费,消费逻辑里不应该有多线程逻辑,比如通过线程池并发消费,这都是不允许的
             */  
            consumer.registerMessageListener(new MessageListenerOrderly() {  
                AtomicLong consumeTimes = new AtomicLong(0);  
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {  
                    // 设置自动提交  
                    context.setAutoCommit(true);  
                    for (MessageExt msg : msgs) {  
                        System.out.println(msg + ",内容:" + new String(msg.getBody()));  
                    }  
                    try {  
                        TimeUnit.SECONDS.sleep(5L);  
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                    return ConsumeOrderlyStatus.SUCCESS;  
                }  
            });  
            consumer.start();  
            System.out.println("Consumer2 Started.");  
        }  
    }
    

    先启动Consumer1和Consumer2,然后启动Producer,Producer会发送15条消息。
    Consumer1消费情况如图,都按照顺序执行了
    这里写图片描述
    Consumer2消费情况如图,也都按照顺序执行了
    这里写图片描述
    #六、事务消费
    考虑生活中的场景:我们去北京庆丰包子铺吃炒肝,先去营业员那里付款(Action1),拿到小票(Ticket),然后去取餐窗口排队拿炒肝(Action2)。思考2个问题:第一,为什么不在付款的同时,给顾客炒肝?如果这样的话,会增加处理时间,使得后面的顾客等待时间变长,相当于降低了接待顾客的能力(降低了系统的QPS)。第二,付了款,拿到的是Ticket,顾客为什么会接受?从心理上说,顾客相信Ticket会兑现炒肝。事实上也是如此,就算在最后炒肝没了,或者断电断水(系统出现异常),顾客依然可以通过Ticket进行退款操作,这样都不会有什么损失!(虽然这么说,但是实际上包子铺最大化了它的利益,如果炒肝真的没了,浪费了顾客的时间,不过顾客顶多发发牢骚,最后接受)
    生活已经告诉我们处理分布式事务,保证数据最终一致性的思路!这个Ticket(凭证)其实就是消息!
    通过RocketMQ可以实现分布式事务,比如银行A向银行B转账,银行A扣款1000,那银行B一定要加1000才行,通过RocketMQ的执行逻辑如下:
    这里写图片描述
    如上图所示,消息数据独立存储,业务和消息解耦,实质上消息的发送有2次,一条是转账消息,另一条是确认消息。发送转账消息后,消息在MQ的状态是prepared,这时消费者还无法收到这条消息,需等生产者这边的本地事务执行完并发送确认消息后,才能收到这条消息。
    到这里,我们先来看看基于RocketMQ的代码:
    ##6.1、消费者

    public class Consumer {  
        public static void main(String[] args) throws InterruptedException, MQClientException {  
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_Consumer");  
            consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
            consumer.setConsumeMessageBatchMaxSize(10);  
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
            consumer.subscribe("TopicTransactionTest", "*");  
            consumer.registerMessageListener(new MessageListenerConcurrently() {  
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  
                    try {  
                        for (MessageExt msg : msgs) {  
                            System.out.println(msg + ",内容:" + new String(msg.getBody()));  
                        }  
                    } catch (Exception e) {  
                        e.printStackTrace();  
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试  
                    }  
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功  
                }  
            }); 
            consumer.start(); 
            System.out.println("transaction_Consumer Started.");  
        }  
    }
    

    ##6.2、生产者
    ###6.2.1、生产者

    public class Producer {  
        public static void main(String[] args) throws MQClientException, InterruptedException {  
            TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();  
            TransactionMQProducer producer = new TransactionMQProducer("transaction_Producer");  
            producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
            // 事务回查最小并发数  
            producer.setCheckThreadPoolMinSize(2);  
            // 事务回查最大并发数  
            producer.setCheckThreadPoolMaxSize(2);  
            // 队列数  
            producer.setCheckRequestHoldMax(2000);  
            producer.setTransactionCheckListener(transactionCheckListener);  
            producer.start();   
            TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();  
            for (int i = 1; i <= 2; i++) {  
                try {  
                    Message msg = new Message("TopicTransactionTest", "transaction" + i, "KEY" + i,  
                            ("Hello RocketMQ " + i).getBytes());  
      //发送消息后,消息在MQ的状态是prepared,这时消费者还无法收到这条消息,需等生产者这边的本地事务执行完并发送确认消息后,才能收到这条消息
                    SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);  
                    System.out.println(sendResult);  
                    Thread.sleep(10);  
                } catch (MQClientException e) {  
                    e.printStackTrace();  
                }  
            }  
            for (int i = 0; i < 100000; i++) {  
                Thread.sleep(1000);  
            }  
            producer.shutdown();  
        }  
    }
    

    ###6.2.2、执行本地事务
    TransactionExecuterImpl类用于执行本地事务如下:

    public class TransactionExecuterImpl implements LocalTransactionExecuter {  
        public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {  
            System.out.println("执行本地事务msg = " + new String(msg.getBody()));  
            System.out.println("执行本地事务arg = " + arg);  
            String tags = msg.getTags();  
            if (tags.equals("transaction2")) {  
                System.out.println("======我的操作============,失败了  -进行ROLLBACK");  
                return LocalTransactionState.ROLLBACK_MESSAGE;  //返回失败并发送回滚消息
            }  
            return LocalTransactionState.COMMIT_MESSAGE;  //返回成功并发送确认消息
            // return LocalTransactionState.UNKNOW;  
        }  
    }
    

    ###6.2.3、针对未决事务,MQ服务器回查客户端
    如果因网络问题最后发送确认消息给MQ失败了或者发送了LocalTransactionState.UNKNOW,那事务就一直没能完成,一直处于prepared状态,针对未决事务,MQ服务器会回查客户端看看到底有没有完成(目前已经被阉割啦),这时会调用TransactionCheckListener接口,所以TransactionCheckListenerImpl类实现了这个接口用于回查,代码如下:

    public class TransactionCheckListenerImpl implements TransactionCheckListener {  
        //在这里,我们可以根据由MQ回传的key去数据库查询,这条数据到底是成功了还是失败了。  
        public LocalTransactionState checkLocalTransactionState(MessageExt msg) {  
            System.out.println("未决事务,服务器回查客户端msg =" + new String(msg.getBody().toString()));  
            // return LocalTransactionState.ROLLBACK_MESSAGE;  
            return LocalTransactionState.COMMIT_MESSAGE;  
            // return LocalTransactionState.UNKNOW;  
        }  
    }
    

    producer端发送数据到MQ,并且处理本地事物,这里模拟了一个成功一个失败。Consumer只会接收到本地事物成功的数据,第二个数据失败了,不会被消费。
    因为MQ回查客户端的功能被阿里去除了,导致即使返回了LocalTransactionState.UNKNOW,TransactionCheckListenerImpl里的代码也不会被触发,所以目前事务回查这部分需要自己设计实现。
    #七、参考文章
    RocketMQ重点原理讲解:https://www.jianshu.com/p/453c6e7ff81c

    此为博主(yjclsx)原创文章,如若转载请标明出处,谢谢!

    展开全文
  • 接口测试之发包工具介绍

    千次阅读 2017-10-20 16:13:35
     Poster为Firefox浏览器的一个插件,主要用来模拟并HTTP请求。随着Chrome浏览器的流行,它也出了chrome版本:Chrome Poster  在Fiefox浏览器中的安装非常简单。首先,打开Fiefox浏览器,菜单栏“工具”--> ...

    1、Poster

         PosterFirefox浏览器的一个插件,主要用来模拟发并HTTP请求。随着Chrome浏览器的流行,它也出了chrome版本:Chrome Poster

      在Fiefox浏览器中的安装非常简单。首先,打开Fiefox浏览器,菜单栏“工具”--> “添加组件”,搜索“poster”,在搜索例表中点击“安装”,然后重启浏览器即可。

      打开方法:菜单栏“工具”--> Poster”。如下图

     

     

    发送GET请求:http://127.0.0.1:8000/polls/questions

     

      发送POST请求:http://127.0.0.1:8000/polls/question_vote/

      在Parameters 标签中添加post请求的参数,“Name”为参数名,“Value”为参数值。然后点击“Add/Change”按钮添加。

      然后,切换到Content to Send”标签页,点击“Body from Parameters”按钮,添加post请求的参数。然后,点击“POST”按钮,发送post请求。



    2、HttpRequester

      火狐浏览器的一个插件,用法同Poster

    3、Fiddler

      composer面板下,则可以模拟向相应的服务器发送数据的过程(不错,这就是灌水机器人的基本原理,也可以是部分http flood的一种方式)。

    也可以粘贴一次请求的raw http headers,达到模拟请求的目的:


    4、soapUI

    1>soapUI简介

          soapUI是一个开源测试工具,通过soap/http来检查、调用、实现Web Service的功能/负载/符合性测试。该工具既可作为一个单独的测试软件使用,也可利用插件集成到Eclipse,maven2.X,Netbeans 和intellij中使用。soapUI pro是soapUI的商业非开源版本,实现的功能较开源的soapUI更多。

    2>soapUI初体验

    1、soapUI安装及破解(参考:http://blog.csdn.net/liuchangxin1982/article/details/47614625)

    1)安装方法

    先下载SOAPUI,我这里用的是5.1.2 PRO  版

    下载路径:http://dl.eviware.com/list_soapui2.html?_ga=1.16http://dl.eviware.com/list_soapui2.html?_ga=1.162568764.18578086.14016161482568764.18578086.1401616148

    双击安装文件

     

     

     

     

     

     

     

     

     安装之后需要破解才能用

    2)

    下载破解包,链接:http://pan.baidu.com/s/1nvuqAqX 密码:axgc

    这两个文件能破解5.1.2的SoapUI 的Pro版本,mac 和 windows均可。

    1、拷贝Protection-4.6.jar到soapui安装目录下的lib目录下替换原来的文件

    2、运行soapui安装目录下的程序bin\soapui-pro.bat

    出现如下画面先按取消

     

     

    出现这种界面说明破解成功

    2、soapUI界面介绍

    3、创建项目

    这里创建REST服务

                        

    3>soapUI测试用例

    1、测试数据单一

    1)新建测试套件:

             

    增加断言:

     

     

     

     

    运行结果:

    2、测试数据来源于excel:(参考:http://www.cnblogs.com/heiyexiaoguai/p/4852334.html)

    1)创建测试数据文件:testaccept.xlsx

    2)选择测试步骤,添加【Datasource】

    3)添加请求:

     

    4)添加DataSource Loop,又来控制循环取数

                

    5)增加断言:

    6)运行结果

     

    我们看到测试用例执行失败后,后面的将不会执行,而我们希望即使失败也要执行所有的测试用例,我们做如下设置:取消Abor test if an fasiled TestSteps前面的钩,再运行:

            

    7)查看测试结果信息:

    8)导出测试报告

     

     4>soapUI性能测试

    1、创建性能测试用例:

         

    2)点击运行,运行结果如下:

    3)参数设置说明:

    4)生成性能测试统计图


     
    展开全文
  • TCP三次握手详解-深入浅出(有图实例演示)

    万次阅读 多人点赞 2018-08-08 21:13:48
    半连接队列(syn queue)最大值 /proc/sys/net/ipv4/tcp_max_syn_backlog SYN flood攻击 攻击方的客户端只发送SYN分节给服务器,然后对服务器回来的SYN+ACK什么也不做,直接忽略掉, 不发送ACK给服务器;...
  • 版流程及对外版本规范

    万次阅读 2018-06-06 18:41:05
    每次的版内部版本应先于公开release的版本。公开release的版本理论上应有至少半个月的缓冲。对每次公开的版本经过仿佛测试后,方可发出给用户。 二、BUG追踪 Java层,C层,内存泄露。
 针对这三种类型...
  • 图像算法---头发检测算法研究

    万次阅读 2015-10-27 18:18:37
    本文介绍一种简单,效果不错的色检测算法,并分享完整DEMO下载,希望大家喜欢!
  • 什么叫同步,什么叫异步?

    千次阅读 2019-03-08 09:44:53
    异步就是发送信息后,发送者发送信息后,就不再管,发送者并不管接收者在不在线 ----------短信就是异步 同步就是发送者与接收者同步交流,一方退出,另一方也随之中断通信。。--------------如打电话。 定义...
  • Oracle EBS 中直订单Drop Ship流程的系统操作记录应用场景:A公司向客户B销售产品,但是自己不生产该产品,而是向供应商C来采购,并且通常是要供应商C直接把货到B客户处,属于贸易型企业经常用到的业务流程,...
  • win10自带邮箱件箱为空

    千次阅读 2020-10-18 00:37:13
    件箱 win10 自带邮箱,登录 QQ 邮箱,密码正确,收件箱能够显示邮件列表;件箱空空如也。 已发送邮件 原来已发送的邮件在【已发送邮件】里面 ???? 所以,件箱可以理解为是草稿箱。 参考 ...
  • websocket 同步数据 、异步数据

    万次阅读 2016-09-22 17:57:06
    websoekt 异步发送信息
  • JAVA上百实例源码以及开源项目

    千次下载 热门讨论 2016-01-03 17:37:40
     Java二进制IO类与文件复制操作实例,好像是一本书的例子,源代码有的是独立运行的,与目录下的其它代码文件互不联系,这些代码面向初级、中级Java程序员。 Java访问权限控制源代码 1个目标文件 摘要:Java源码,...
  • (同时有多个请求) 2、什么是超?(其实就是在高发场景下产生的数据读取错误) 3、高并发与多线程的关系? 4、现在接触过的并发与多线程。 一、大规模并发带来的挑战 在过去的工作中,我曾经面对过5w每秒的高并发...
  • 金蝶K3修改物料默认仓库,同步修正BOM默认料仓库
  • 微信朋友圈测试用例

    千次阅读 2020-07-10 11:41:40
    安全: 1、发送朋友圈时,文本输入脚本代码,是否出现异常 界面: 1、是否是显示朋友圈的人的昵称、头像、以及具体内容 2、是否按照朋友圈的时间距离现在远近来排序 3、图片显示是否正确 4、是否显示自己的个人...
  • 老听公司的老哥们说MFC基于消息机制什么的巴拉巴拉一大堆,实际上自己并没有真真用过,每次看讲解什么的也是一知半懂,像我这种半路出家的,不遇到实际问题根本就搞不懂.由于目前做到公司的项目,按照需求,需要用到消息...
  • 2019独角兽企业重金招聘Python工程师标准>>> ...
  • java发送邮件的两种通用方法 一、 本文讲解的是基于smtp协议,发送...不过不了解也没关系,只需要知道,smtp协议存在一个安全漏洞,就是smtp协议允许你两次设置件人和收件人信息。第一次发送命令行mail from:...
  • 需要用到的jar包 ... 做这个功能是因为我女朋友每个月都需要手动去几十个人的考勤、考核邮件,实在是太过重复的做一件很乏味的事情,所以才有了这个程序,不过,界面是使用的控制台,简单一点。 核心代码展示
  • centos7配置syslog日志外

    万次阅读 2019-01-09 15:09:11
    在rsyslog.conf文件下,进行配置  vi /etc/rsyslog.conf   如下图,在最下面位置,修改成syslog服务器的IP和端口即可。
  • 微信封号推送卡包消息方法揭秘

    千次阅读 2019-06-14 01:14:27
    众所周知,微信封号之后是不能消息的,这就大大的影响了正常的网络沟通,在诸多微信被封人群中有各色各样的职业,特别是微商封号或者永久封号联系不到客户都是极大损失;所谓树跟被断、鱼鳞被揭、微信被封作为由古...
  • 信息量为什么要表示成对数的形式

    万次阅读 多人点赞 2018-01-18 21:06:20
    近期在路上进行了不少的思考,任何方面,任何领域…我会把这些记录在手机的备忘录里,然后在周末总结出来,早就成了习惯。   近日对信息论,排队论以及贝叶斯定理...有人问一件事发生后所携带的信息量为什么要表示成
  • tiktok海外抖音视频0播放怎么办?

    千次阅读 2020-10-22 01:22:04
    tiktok海外抖音视频0播放怎么办? tiktok海外抖音视频0播放怎么办 首先没有播放量,我们需要自检一下,自己的账号是否存在被关了小黑屋,如果注册下来就被关了小黑屋,在怎么发布视频都是没有播放量。自检账号...
  • 轻松实现钉钉机器人定时消息

    千次阅读 2020-01-03 17:01:57
    一. 准备工作 1.钉钉群; 2.Linux主机 二. 开始 1.添加钉钉机器人 在电脑上登录钉钉客户端后,点击左上角自己的头像,出现下图,点击 [机器人管理],[选择要添加的机器人]-[自定义] ...yum ins...
  • 什么多个客户端可以连接服务器的个端口?

    万次阅读 多人点赞 2017-09-07 18:03:44
    平时我们使用ServerSocket...为什么可以这样呢?操作系统的进程在个端口的多个连接是如何进行分辨的?译文我们这里讲Socket连接: 1. 端口只是一个数字辨识,不是真正的物理端口; 2. 一个Socket连接的主键(即不同
  • 【杂记】000-Outlook发送邮件后在件箱出现两次 【杂记】000-Outlook发送邮件后在件箱出现两次 1、现象 2、解决方法 1、现象   使用Outlook 2016配置好邮箱账号之后,能够成功收发邮件,但所有的已...
  • 同级部门之间的文件叫什么?

    千次阅读 2017-06-01 16:22:00
    推荐于2016-09-24 21:31:16 最佳答案   同级部门之间的文件叫——平行文,特别的,函也可用于同级部门之间使用。按照行文关系、文件去向,可分为上行文、平行文、下行文: 上行文:指下级机关向所属上级机关...
  • 微信公众号红包开发教程

    千次阅读 2018-07-31 11:38:32
    红包 demo 下载 一、开通现金红包权限 在使用现金红包之前,请前往开通现金红包功能。操作路径:【登录微信支付商户平台——&gt;产品中心——&gt;现金红包——&gt;开通】。 二、下载API证书 商户...
  • 一提及程序员,人们第一时间想到的就是智商高薪资高,仿佛是高技术高薪资的代名词,还是一个不断进步的职业,可能正是由于发展太快的关系,程序员并不能在一个岗位待得太久,于是这个职业还成为了跳槽频繁的代名词...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 645,540
精华内容 258,216
关键字:

发什么同什么