精华内容
下载资源
问答
  • 阿里云消息队列服务(MQS) ——入门指南 带目录 MQS 是一种高效、可靠、安全、便捷、可弹性扩展的分布式消息队列服务。MQS 能够帮助应用开发者在他们应用的分布式组件上自由的传递数据,构建松耦合系统。
  • 在进入关于REUSEPORT的讨论之前,先看一张图,下图描述了单队列服务和多队列服务的区别: 单队列典型实例:银行业务叫号,机场火车站出排队坐租车等。 多队列典型实例:超市排队结账,医院挂号收费等。 到底哪一...

    在进入关于REUSEPORT的讨论之前,先看一张图,下图描述了单队列服务和多队列服务的区别:
    在这里插入图片描述

    • 单队列典型实例:银行业务叫号,机场火车站出排队坐租车等。
    • 多队列典型实例:超市排队结账,医院挂号收费等。

    到底哪一种好呢?很多人估计会想当然选择多队列,但我个人更倾向于单队列。

    在计算机领域,人们似乎总是倾向于并行化,这背后似乎有着对同步锁的笃恨,比方说,只要你把一个数据结构设计成单链表,那么肯定会有一大堆人跳出来说你这个在多线程环境会很大的锁开销。

    当多线程操作同一数据结构时,饱受诟病的就是它的同步开销。

    事实确实如此,但不能人云亦云。

    同步和锁固然可恨,但这并不是全部,由于近20年来并行多处理器编程的概念被炒作的非常火,以至于很多人本末倒置,忘记了多处理模型中的大头其实并非同步和锁,而是排队模型本身。跳出计算机的圈子试试看。

    当然,在操作系统领域,同步和锁争用的话题是非常重要的一啪,同步和锁的问题是单处理器进化到多处理器过程中必须要解决的一个问题。

    我们回忆一下早期传统的最简单的服务器编程模型:

    sd = socket();
    bind(sd);
    listen(sd);
    while (i++ < NUM) {
    	if (fork() == 0) {
    		while (true) {
    			cd = accept(sd);
    			do_work();
    			close(cd);
    		}
    	}
    }
    wait();
    

    fork出多个进程来处理同一个socket。这是一个典型的 “单队列多服务台排队模型”。 基于它的种种不足,select,poll,epoll被设计了出来。但事情直到REUSEPORT被引入时才发生了本质的变化。

    REUSEPORT允许让多个独立的socket绑定完全相同的IP+Port,这在事实上将socket处理本身变成了 “多队列多服务台排队模型”

    有了REUSEPORT之后,人们欢呼着纷纷引用。人们认为, *REUSEPORT终于把操作系统协议栈中socket队列的那一把锁解了!

    然而,REUSEPORT真的百利无一弊吗?并不是!看看上图中单队列和多队列的对比,REUSEPORT可能更大概率造成客户端延时分布的发散:

    • 对于单独客户端而言,它更容易遭遇到REUSEPORT单独socket的忙碌,被动排队。

    是不是这样呢?

    在继续之前,我不得不事先声明:

    看完本文后,很多人第一反应就是这low爆了,都2908年了,竟然不用epoll!其实我是故意不用epoll的,我的目标不是演示一个高性能服务器应该如何设计(其实我也不会),而是为了揭露REUSEPORT的一个问题,当然是要用最最最简单的方式了。

    下面的实验比对了单队列和多队列对客户端延迟的影响。先看简单的单队列服务器,这是一个简单的echo服务器:

    #!/usr/bin/python
    # single.py
    
    import os
    import socket
    import time
    import random
    
    sd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sd.bind(('127.0.0.1', 1234))
    sd.listen(100)
    for i in range(4):
        if os.fork () == 0:
            while True:
                cd, addr = sd.accept()
                print('receive data at', str(i))
                data = cd.recv(1024);
                # 制造概率性忙碌
                if random.randint(1,3) == 1:
                    time.sleep(2) 
                cd.sendall(data + ' ' + str(i))
                cd.close()
    os.wait()
    

    下面代码做同样的事情,只不过它是REUSEPORT多队列版本:

    #!/usr/bin/python
    # multi.py
    
    import os
    import socket
    import time
    import random
    
    for i in range(4):
        if os.fork () == 0:
            sd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            sd.bind(('127.0.0.1', 1234))
            sd.listen(100)
            while True:
                cd, addr = sd.accept()
                data = cd.recv(1024);
                print('receive data at', str(i))
                if random.randint(1,3) == 1:
                    time.sleep(2)
                cd.sendall(data + ' ' + str(i))
                cd.close()
    os.wait()
    

    下面是一个客户端,它同时产生100个线程连接服务器,在获得回应后打印消耗时长,并且在最后打印总消耗时长:

    #!/usr/bin/python
    
    import socket
    import time
    import threading
    
    HOST = '127.0.0.1'
    PORT = 1234
    
    def do_echo():
        sd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        l_start = time.time()
        sd.connect((HOST, PORT))
        sd.sendall(b'Hello, world')
        data = sd.recv(1024)
        l_end = time.time()
    
        print str(l_end - l_start) + ' Sec'
    
    threads = []
    for i in range(0, 100):
        t = threading.Thread(target = do_echo)
        threads.append(t)
    
    t_start = time.time()
    for t in threads:
        t.start()
    
    for t in threads:
        t.join()
    t_end = time.time()
    print 'cost ' + str(t_end - t_start) + ' Sec'
    

    OK,现在让我们测试一下,首先看单队列版本的表现:

    0.000965118408203 Sec
    0.00181603431702 Sec
    0.00233006477356 Sec
    0.00104022026062 Sec
    0.00102591514587 Sec
    0.00202798843384 Sec
    0.000540971755981 Sec
    0.00217485427856 Sec
    0.000724077224731 Sec
    0.00132298469543 Sec
    0.00124096870422 Sec
    0.00100612640381 Sec
    0.00180292129517 Sec
    0.00173401832581 Sec
    ...
    10.0033230782 Sec
    12.0018620491 Sec
    12.0039548874 Sec
    12.0068860054 Sec
    12.0079398155 Sec
    cost 12.0472490788 Sec
    

    再看REUSEPORT版本服务器的表现:

    0.000521898269653 Sec
    0.00278997421265 Sec
    0.00123596191406 Sec
    0.00226593017578 Sec
    0.00240898132324 Sec
    0.00142693519592 Sec
    0.00168108940125 Sec
    0.00109219551086 Sec
    2.00345492363 Sec
    2.00189590454 Sec
    2.00391507149 Sec
    ...
    17.9960510731 Sec
    18.001486063 Sec
    18.01060009 Sec
    18.0108709335 Sec
    18.0199809074 Sec
    18.0170469284 Sec
    18.0191478729 Sec
    18.0191700459 Sec
    18.0200259686 Sec
    18.0203499794 Sec
    18.0164711475 Sec
    cost 18.0505480766 Sec
    

    多测试几次,取平均,我们发现REUSEPORT版本作为服务器时,客户端的延时分布在一个更广的范围内,从上面的多队列的描述可以看出,这应该是客户端由于 “正好撞到” 被延迟处理的队列中,造成了积累排队延时导致。

    为了更加形象地说明问题,我们把问题放大。

    把两类服务器中的下面代码进行修改:

    if random.randint(1,3) == 1:
    

    修改为:

    if i == 1:
    

    这样我们固定第一个处理进程是延迟的,然后观察客户端的表现,先看单队列版本的:

    ...
    0.000363111495972 Sec
    0.000207901000977 Sec
    2.00301790237 Sec
    cost 2.0051279068 Sec
    

    合乎逻辑,毕竟只有一个进程忙碌时,其它所有的进程均可接管处理任何队列中的请求,总耗时几乎就是 那一个 碰巧被延迟的请求处理时间。下面我们看REUSEPORT版本的表现:

    ...
    0.000730991363525 Sec
    0.00215291976929 Sec
    0.000869989395142 Sec
    0.000274896621704 Sec
    0.000379800796509 Sec
    # 从这里往下,我们可以一眼看出所谓的积累延迟效应,以下所有的请求都很不幸被dispatch到了1号进程。
    2.00326395035 Sec
    4.00580787659 Sec
    6.00063920021 Sec
    7.99950909615 Sec
    9.99837803841 Sec
    12.0005970001 Sec
    14.0024950504 Sec
    15.996434927 Sec
    17.9980938435 Sec
    19.9969918728 Sec
    21.997202158 Sec
    23.9789199829 Sec
    25.9731481075 Sec
    27.9732391834 Sec
    29.9746050835 Sec
    31.9765660763 Sec
    33.978415966 Sec
    35.9799618721 Sec
    37.9760642052 Sec
    cost 38.0491991043 Sec
    

    事情到了这里,你可能已经误会了我,以为我要开历史的倒车,摒弃REUSEPORT了,但其实这并非我要表达的意思。

    任何成功的东西都是妥协的结果,为REUSEPORT的socket准备一个standby进程就好了:

    #!/usr/bin/python
    
    import os
    import socket
    import time
    
    for i in range(4):
        if os.fork () == 0:
            sd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            sd.bind(('127.0.0.1', 1234))
            sd.listen(10)
            if os.fork () == 0:
                while True:
                    cd, addr = sd.accept()
                    data = cd.recv(1024);
                    print('receive data at standby', str(i))
                    cd.sendall(data + ' ' + str(i))
                    cd.close()
    
            while True:
                cd, addr = sd.accept()
                data = cd.recv(1024);
                print('receive data at', str(i))
                if i == 1:
                    time.sleep(2)
                cd.sendall(data + ' ' + str(i))
                cd.close()
    os.wait()
    

    这样一来,每一个REUSEPORT的socket就有两个处理进程了,一个进程不幸忙碌阻塞队列的时候,另一个就可以替补,当然了,也不一定是两个,多个也可以,要看服务器的资源以及忙碌进程阻塞队列的概率了。

    分享一篇关于单队列和多队列的文章:
    https://www.irisys.net/queue-management-blog/single-vs.-multiple-queues-which-one-is-best-for-you-

    经理湿了皮鞋,连绵暴雨,经理或为鱼鳖。


    浙江温州皮鞋湿,下雨进水不会胖。

    展开全文
  • 主要介绍了基于golang的简单分布式延时队列服务的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
  • 之前已经成功启动队列服务的用户 查找queue服务进程ID,SSH命令: ps -ef | grep -v grep | grep queue/listen 执行SSH kill命令结束进程: kill 进程ID 再按官方教程复制启动队列服务的命令,正常启用就可以...

    报此错误一般是已经启动过队列,重启失败

    之前已经成功启动队列服务的用户
    查找queue服务进程ID,SSH命令:

    ps -ef | grep -v grep | grep queue/listen

    执行SSH kill命令结束进程:

    kill 进程ID

    再按官方教程复制启动队列服务的命令,正常启用就可以了
     

    展开全文
  • 消息队列服务

    千次阅读 2013-12-13 15:18:59
    整理的OSChina第37期高手问答——消息队列服务,嘉宾为 @shaneyuan 。 @shaneyuan 现就职于广州 UC 公司,是 UCMQ 的作者,以下简称SY。 UCMQ是一款轻量的HTTP协议级消息队列服务组件,项目的最初原型来自 ...

    整理的OSChina第37期高手问答——消息队列服务,嘉宾为 @shaneyuan 。

    @shaneyuan 现就职于广州 UC 公司,是 UCMQ 的作者,以下简称SY。

    UCMQ是一款轻量的HTTP协议级消息队列服务组件,项目的最初原型来自 @张宴 的HTTPSQS

    基本特性:

    • 支持标准的HTTP协议( GET/POST方法),支持长连接(keep-alive);
    • 请求响应非常快速,入队列、出队列速度超过10000次/秒;
    • 每个UCMQ实例支持多队列,队列通过操作接口自动创建;
    • 单个队列默认限制存储100w未读消息,可以不限制(非必要建议限制);
    • 可以在不停止服务的情况下便捷地修改单个队列的属性(大小限制/只读锁/延时队列/同步频率);
    • 可以实时查看队列属性(入队列数量、出队列数量、未读消息数量、消息积压数量)。
    • 每个队列有独立的数据文件易管理易搬迁。
    • 项目地址:https://github.com/ucweb/ucmq

    • 使用指南:http://ucweb.github.io/ucmq_guide/

    消息队列简介及应用场景相关

    消息队列(Message Queue):把消息按照产生的次序加入队列,而由另外的处理程序/模块将其从队列中取出,并加以处理;从而形成了一个基本的消息队列。使用消息队列可以很好地将任务以异步的方式进行处理,或者进行数据传送和存储等。例如,当你频繁地向数据库中插入数据、频繁地向搜索引擎提交数据,就可采取消息队列来异步插入。另外,还可以将较慢/较复杂的处理逻辑、有并发数量限制的处理逻辑,通过消息队列放在后台处理。

    常规的使用场景短信服务、电子邮件服务图片处理服务、好友动态推送服务等。

    协议相关:

    使用HTTP协议单纯考虑了跨语言/跨平台和易接入。我个人觉得UCMQ的性能不需要太过担忧,大家可以看相关测试数据:http://tech.uc.cn/?p=1344 前提是UCMQ只是一个单进程单线程的服务。如果实在对性能有极高要求可以部署多个实例。

    周边方案对比

    1.UCMQ比起其它消息组件的长处和短处是什么。

    SY:与其他消息队列相比,延续了:协议通用性好/性能高;扩展了:易用性/数据安全性高/内存消耗小(数据缓存随读写位置移动)/易搬迁(每个队列数据独立)/易维护(轻量级)/监控简单(可实时获取“所有/单独”队列状态信息)/添加了特色的队列服务(延时队列/队列写锁等)。

    主要有价值的问答整理如下:

    1、单个UCMQ无法满足性能要求怎么办?

    SY:如果单个实例已经到达性能瓶颈,建议部署多个实例客户端已实现负载均衡机制(轮询或随机)。

    2、一般消息队列里储存的数据格式是json吗?  消费消息的时候, 从队列中取出数据, 做相应的业务处理, 请问你们是使用crontab来做定时还是nohup呢?消费队列这块想多了解点。

    SY:消息队列中的数据可以是任意格式,对于业务来说json是个不错的选择。消费者模块可以处理完后即时再去读取队列中的新消息,如果队列取空后服务端会返回特殊的标识,消费者模块通过识别该标识休眠读取线程,建议使用定期休眠机制(如:100ms)。

    3、 任务分发策略,有订阅功能吗?

    SY:UCMQ不支持订阅功能,业务不分发。相对于gearman UCMQ没有同步操作(即:生产者将消息写入队列后,队列将触发消费者来读取消息),在UCMQ中读写队列都是由客户端(包括:生产者和消费者)主动发起的,所以不是由消息队列分发。

    4、发生异常时的处理流程是怎样?

    SY:如果消息生产者成功的将消息写入队列后,该消息一直有效直到被消费者成功读取或者消息体被损坏。取出后的消息将不可再访问。

    5、UCMQ是怎么减少IO消耗提高读写性能的?

    SY:读写位置的数据都是缓存在内存中的,并随读写位置移动而移动。

    6、如果不手工清理的情况下,数据量级变大后会不会对系统产生性能的影响?数据是如何进行清理的?

    SY:后台数据存储是分文件存储的,已读完的数据文件将被清理,所以不会消耗存储资源。从存储设计http://tech.uc.cn/?p=1344)了解到只缓存当前读和写的数据文件,性能不随数据量增大而下降的。

    7、我曾短暂地使用过gearman的队列服务。
    以下疑问请解答:
    1.  任务分发策略(如何选择worker)
    简单读了一下您github上的代码,是不是说您的队列入队时,就是按先后入队,然后mgr去等待所有已经注册的work的请求,然后按请求顺序理出 队?或 是其它的算法策略?

    2.  发生异常时的处理流程
    如果work或mgr服务在运行中,出现异常,那异常前正在执行的任务将如何容错和重置?

    3.  如何保证队列中的每一个任务,可以正常的执行完成
    一般来说,您的见解是一个任务分发出去之后就被消费掉了(一次执行),如果执行时异常导至任务未能正常执行完,任务将消失?还是采用什么策略保 障任务消费是可以正常完成的?是由消息队列服务来监控保障还是采用日志处理方式,由开发者后期自行重置触发呢?

    4. 您提到了,您的队列是记录在文件系统中的,我是否可以理解,是保存在了hash后的目录的文本文件中?如果队列任务是较大并发的项目或系统中, 是否会因为这块的IO瓶颈最后导致队列服务低效呢影响整体服务呢?如果存在这种可能性,您在设计时,是否有相应的解决方法呢?比如混合使用FS和 基于Ram的NoSQL或是自己实现一个基于内存的可持久化的存储形式。

    SY:1:从MQ自身来说,出队列是按照入队列的先后顺序的--保证时序性是MQ的一个基本要求。

    2:如果异常前数据已经到达MQ,或者尚未从MQ中取出,那么数据将持续保持有效,异常恢复后可以继续正常使用。如果异常时数据尚未到达MQ,或者已经从MQ中取出,则该条数据会有丢失的可能(具体情况看各自的客户端的异常处理机制是否完善)。

    3:是的。当任务从MQ中取出后,其执行的正确性、完整性、安全性由取出者保证。

    4:任何需要做持久化的产品,最终瓶颈都逃不过磁盘io的限制。我们能做的是,根据木桶原理,确保系统中不出线其它比磁盘io更短的短板。如果确实需要高性能的同时提供持久化和安全性的保障,那么可以考虑使用ssd硬盘--实测表明性能提升相当明显。至于纯内存的MQ,我们不排除后续版本中增加的可能性。但是混合使用纯内存和持久化的话,会使使用者无法确保当前到底用的是纯内存模式还是持久化模式--这样,在使用者看来,这样的MQ既无法时刻保证安全性,也无法时刻提供高性能,所以这样的MQ是不可信赖的(不可信赖的产品,在稍微重要的场合下,基本上就等于是不可用的)。


    8、 消息队列只用过beanstalkd, 不知道和beanstalkd想比,有什么差异?能否做多像beanstalkd那样启动多个daemon客户端挂在并行等待处理消息?有没有实例可以展示下比如发送邮件,推送动态等的应用?
    SY:你提到的beanstalkd据我的了解他是内存式的,数据不会持久化的。我那么回答也是表明其存储上的差异。对beanstalkd了解不深其他方面暂时做不了评价/对比。UCMQ是会将写入消息持久化的,实例重启或异常退出数据都不会丢失,即便是服务器宕机也只是丢失部分未持久化的数据。同时持久化间隔可配置。

    9、您好,在实际开发过程中,我并没有遇到需要用消息队列的需求,对于消息队列我也只是停留在概念上, 我想问:消息队列的典型应用场景?对于高并发的请求使用消息队列是否能保证及时性。消息队列设计那哪些基本技术? 
    SY:消息队列是异步的所以及时性不能保证,至于使用到的技术可以阅读我写的博客(http://tech.uc.cn/?p=1344)或阅读相关代码(https://github.com/ucweb/ucmq)。

    10、消息队列还没怎么接触,想问下,有什么应用场景会用到消息队列呢?消息队列可以解决那些问题咧??
    SY:消息队列有自己的一定的特性:异步/顺序读写/高性能/协议简单。所以一般会用于解决大量的服务器端异步请求,同时可以实现服务端的负载均衡和业务的容灾。

    11、消息队列产品有很多, 有的提供socket监听, 不支http协议访问, 有的支持http协议访问, 这2者有什么区别吗?  我是否可以理解消息现在很多消息队列都是一个nosql呢? 消息队列和nosql 最显著区别是什么?
    SY:消息队列与NOSQL的不同还是挺显著的:首先,应用场景不同:nosql是高效的弱关系型数据存储,消息队列是一次性消费的顺序消息组件。其次,数据保持的差异:nosql的数据是持久或定时持久的,消息队列的数据随取出而失效,即一次消费。使用什么协议都行,用http考虑的是简单/易接入。

    12、 消息队列和用数据库做存储,然后取数据库内容做处理有什么区别吗?举个例子,发送邮件,我可以先把邮件内容存到是数据库,然后以扫描数据库依次发送。 第二点,消息队列是多线程的吗?
    SY:异步消息机制保证正确发送,不保证及时发送,短信就是这样 。  1.用数据库慢。2.多进程或多线程读取数据库时需要添加标记,否则会造成数据重复,使用消息队列不会出现此问题。

    13、消息队列的长度设置为多大合适?一般是预分配还是动态分配的?
    SY:置为多大是一个经验值来的。一般来说“生产者”突增大量消息,而“消费者”短时间无法处理完,这样消息就会停留在消息队列中,而停留的消息数是有限制的,超出此限制的消息将无法写入。但如果不限制则队列新进来的请求需要等未处理的请求处理完。
    展开全文
  • 队列服务开发

    万次阅读 2013-04-17 01:21:58
    看了HTTPSQS,自己尝试开发了队列服务器。原理是借鉴HTTPSQS,只是通信协议层没有使用HTTP协议,而是自己的定的协议规则。 一是为了练手,练习检测内存泄露同时巩固指针的使用。 二是为了理解通信协议。 目前只...

    看了HTTPSQS,自己尝试开发了队列服务器。原理是借鉴HTTPSQS,只是通信协议层没有使用HTTP协议,而是自己的定的协议规则。

    一是为了练手,练习检测内存泄露同时巩固指针的使用。

    二是为了理解通信协议。

    目前只支持Windows下的编译,后面会支持Linux。现已支持Linux

    我每次都是先在windows下开发,之后再到调试支持linux下的编译。因为Linux的桌面版真心蛋疼,还不如直接windows下开发,然后通过虚拟机下的调试来支持Linux快。


    现有支持的功能:

    1.创建队列

    2.获取队列

    3.插入队列

    4.删除队列

    5.清空队列

    6.队列状态

    后续还会支持事务,事务与队列的关系可以参考Redis的设计实现:http://www.redisbook.com/en/latest/feature/transaction.html


    一、开发用到的库

    1、libevent:通信框架

    2、glib:跨平台的基础库

    3、Tokyo Cabinet:高效的数据存储


    二、服务器端通信协议及队列原理

    通信协议图:



    单个队列图:



    多个队列间的关系图直接借用HTTPSQS的:



    三、PHP客户端的请求

    <?php
    define('QUEUE_MAX_LENGTH', 10000);
    
    class QueueSocket{
    	private $socket;
    	private $errmsg;
    
    	public function __construct($host, $port){
    		$this->socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
    		socket_bind($this->socket, $host);
    		if(!socket_connect($this->socket, $host, $port)){
    			$this->set_errmsg("Unable to connect<pre>".socket_strerror(socket_last_error())."</pre>");
    			return false;
    		}
    
    		return true;
    	}
    
    	public function send($cmd, $content){
    		if(!is_array($content) || !$content) {
    			return false;
    		}
    
    		$params = false;
    		foreach($content as $key=>$value){
    			$params[] = strtoupper($key)."={$value}";
    		}
    
    		$params = join('&', $params)."\r\n";
    		$binary = pack("a20i1a*", strtoupper($cmd), strlen($params), $params);
    
    		socket_write($this->socket, $binary, strlen($binary));
    	}
    
    	public function read(){
    		$response = socket_read($this->socket, 256, PHP_BINARY_READ);
    
    		if($response) {
    			$response = json_decode($response, true);
    		}else{
    			return false;
    		}
    
    		if(isset($response['errno']) && $response['errno'] > 0) 
    			$this->set_errmsg($response['errmsg']);
    
    		return $response;
    	}
    
    	public function set_errmsg($msg){
    		$this->errmsg = $msg;
    	}
    
    	public function get_errmsg(){
    		return $this->errmsg;
    	}
    
    	public function __destory(){
    		socket_close($this->socket);
    	}
    }
    
    class QueueClient extends QueueSocket{
    	private $appkey;
    
    	public function __construct($host, $port, $appkey){
    		$this->appkey = $appkey;
    		parent::__construct($host, $port);
    	}
    
    	public function create_new_queue($queue_name, $max_length = QUEUE_MAX_LENGTH){
    		$this->send("CREATE", array('queue_name'=>$queue_name, 'max_length'=>$max_length, 'appkey'=>$this->appkey));
    		return $this->read();
    	}
    
    	public function delete_queue($queue_name){
    		$this->send("DELETE", array('queue_name'=>$queue_name, 'appkey'=>$this->appkey));
    		return $this->read();
    	}
    
    	public function clear_queue($queue_name){
    		$this->send("CLEAR", array('queue_name'=>$queue_name, 'appkey'=>$this->appkey));
    		return $this->read();
    	}
    
    	public function put_queue($queue_name, $data){
    		if(is_array($data) || is_object($data)) $data = serialize($data);
    
    		$this->send("PUT", array('queue_name'=>$queue_name, 'data'=>$data, 'appkey'=>$this->appkey));
    		return $this->read();
    	}
    
    	public function get_queue($queue_name){
    		$this->send("GET", array('queue_name'=>$queue_name, 'appkey'=>$this->appkey));
    		return $this->read();
    	}
    
    	public function state($queue_name){
    		$this->send("STATE", array('queue_name'=>$queue_name, 'appkey'=>$this->appkey));
    		return $this->read();
    	}
    }
    
    $queue_client = new QueueClient("localhost", 8080, 'testkey');
    
    //test create queue
    $result = $queue_client->create_new_queue("test_queue", 10000);
    print_r($result);
    
    //test put
    for($i=0; $i<100; $i++){
    	$result = $queue_client->put_queue("test_queue", "jinyong");
    	print_r($result);
    }
    
    // test get_queue
    for($i=0; $i<100; $i++){
    	$result = $queue_client->get_queue("test_queue");
    	print_r($result);
    }
    
    // test state_queue
    $result = $queue_client->state("test_queue");
    print_r($result);
    
    // test clear_queue
    $queue_client->clear_queue("test_queue");

    下面是客户端请求结果的部分截图:



    队列服务器软件下载(Windows):http://yunpan.cn/QWPWu4kk6N36D

    队列服务器源码、PHP客户端源码下载:http://yunpan.cn/QW563d8CJhzaM

    Linux下的源码安装:

    tar -zxf queue_server.tar.gz
    cd queue_server/libevent
    make
    在安装之前需要先安装Kyoto Cabinet.

    wget http://fallabs.com/tokyocabinet/tokyocabinet-1.4.48.tar.gz
    tar -zxf tokyocabinet-1.4.48.tar.gz
    cd tokyocabinet-1.4.48
    ./configure
    make && make install

    展开全文
  • 注意:修改redis配置之后都需要重启队列服务,只针对之前已经成功启动队列服务的用户 查找queue服务进程ID,SSH命令: ps -ef | grep -v grep | grep queue/listen 执行SSH kill命令结束进程: kill 进程ID 例如本...
  • iOS音频队列服务

    千次阅读 2013-06-27 19:33:14
    音频队列服务提供一个可能,那就是把音频数据块填充到音频队列服务缓冲区中,从而达到播放声音的目的,这种方式很类似 Windows 中的 waveOutWrite 方法。这样,我们就可以通过这个方法实现播放从网络上传输过来的...
  • 流行消息队列服务

    千次阅读 2012-07-26 21:38:21
    一、简单消息队列服务 HTTPSQS HTTPSQS(HTTP Simple Queue Service)是一款基于 HTTP GET/POST 协议的轻量级开源简单消息队列服务,使用 Tokyo Cabinet 的 B+Tree Key/Value 数据库来做数据的持久化存储。 队列...
  • 使用beanstalk搭建队列服务

    千次阅读 2016-11-17 10:18:02
    这时候就需要队列服务来缓冲服务以达到更高的吞吐率(扛过高峰) 延迟处理请求,比如:延迟添加用户奖励,延迟写入mysql等等。逻辑可以只关注主流程即返回。 二. 我们现在的解决方案-beanstalk队列服务 项目...
  • rabbitmq的队列服务功能

    千次阅读 2012-06-16 21:26:36
    RabbitMQ是一个基于Erlang实现的消息队列服务,遵循AMQP通信协议,是比较流行通用的异步消息队列服务。 (背景知识,Erlang是一个结构化,动态类型编程语言,内建并行计算支持。使用Erlang编写的程序通常有许多轻量...
  • 为什么学习Redis作为消息队列服务器

    千次阅读 2015-01-06 17:27:04
    使用Redis作为消息队列服务场景  “ 消息 ”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。消息被发送到队列中,“ 消息队列 ”是在消息的传输...
  • Java实现消息队列服务

    千次阅读 2019-08-30 16:34:15
    首先我们必须需要搞明白 MQ (消息队列) 中的三个基本角色 ProducerBrokerConsumer 整体架构如下所示 自定义协议 首先从上一篇中介绍了协议的相关信息,具体厂商的 MQ(消息队列) 需要遵循某种协议或者自定义协议 , ...
  • 音频队列服务(Audio Queue Services)

    千次阅读 2014-07-30 14:49:23
     本文档介绍了如何使用音频队列服务(Audio Queue Services),这是Core Audio Toolbox框架中的一个C语言编程接口。 什么是音频队列服务(Audio Queue Services)  在iOS和Mac OS X中,音频队列服务提供了...
  • 摘要:消息队列Kafka是一个分布式的、高吞吐量、高可扩展性消息队列服务,广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等,是大数据生态中不可或缺的产品之一...
  •  ● 可以在不停止服务的情况下便捷地修改单个队列的最大队列数量。  ● 可以实时查看队列状态(入队列位置、出队列位置、未读队列数量、最大队列数量)。  ● 可以查看指定队列ID(队列点)的内容,包括未出、已...
  • 关于 -- IPhone 的音频队列服务

    千次阅读 2012-06-14 00:40:10
    音频队列服务提供一个可能,那就是:把音频数据块填充到音频队列服务缓冲区中,从而达到播放声音的目的,这种方式很类似 Windows 中的 waveOutWrite 方法。这样,我们就可以通过这个方法实现播放从网络上传输过来的...
  •  本文档介绍了如何使用音频队列服务(Audio Queue Services),这是Core Audio Toolbox框架中的一个C语言编程接口。 什么是音频队列服务(Audio Queue Services)  在iOS和Mac OS X中,音频队列服务提供了...
  • laravel下使用beanstalkd队列服务

    千次阅读 2016-08-15 17:38:04
    1.使用Artisan CLI生成新的队列任务 php artisan make:job someJobs  该命令将会在app/Jobs目录下生成一个新的类,并且该类实现了Illuminate\Contracts\Queue\ShouldQueue接口,告诉Laravel该任务应该被推送...
  • 在使用Python队列服务 Python RQ 时候的报错: Functions from the __main__ module cannot be processed by workers. 原因:  work 不能和job放在同一模块中,否则程序会报错 解决: 把使用...
  • 阿里云MQ消息队列服务推送

    千次阅读 2018-06-04 17:17:35
    A、首先在阿里云上申请消息队列MQ服务;B、然后创建一个Topic(主题,一级主题);然后创建生产者与消费者;C、不过此时还没有结束 ,还需要创建一个AccessKey和AccessSecret(在访问控制中创建用户);再在访问控制...
  •  当你使用音频队列服务进行录制的时候,你可以将音频录制到任何地方——磁盘文件、网络连接或内存对象等等。本章将介绍中最常见的一种情况——将音频录制到磁盘文件中。 注意: 本章介绍了基于ANSI-...
  • 一、消息队列场景简介  “消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。消息被发送到队列中,“消息队列”是在消息的传输过程中保存消息的...
  • Linux下常用轻量级队列服务比较

    千次阅读 2014-03-24 18:06:41
    Linux IPC: IPC进程间通信(Inter-...使用此队列不需要额外安装服务,是系统内置功能。 由于shell中也可以操作此队列,故PHP与Shell需要通讯时使用此队列会比较方便 注:PHP使用前需要开启内置的 sysvmsg扩展模块
  • 轻量级消息队列服务UCMQ

    千次阅读 2013-07-03 20:18:36
    获取某队列的状态,查看队列的属性 http://127.0.0.1:8803/?name=testmq&opt=status&ver=2 数据data文件结构 参考: http://ucweb.github.io/ucmq_guide/
  • 它是著名微博客网站Twitter开发用来处理大量的队列消息,以及保持服务的响应。Starling已经在生产环境中使用,不仅是Twitter在使用,FiveRuns同样在使用。FiveRuns甚至还根据自己的应用做了改进 ,他

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 593,073
精华内容 237,229
关键字:

队列服务