精华内容
下载资源
问答
  • 把work_struct加入到工作队列workqueue的API在系列文章1有介绍,这些API虽然用法不一,但是最终都会调用__queue_work()函数来进行实际加入操作,比如API:queue_work_on:bool queue_work_on(int cpu, struct ...

    文章系列

    1.linux工作队列 - workqueue总览
    2.linux工作队列 - workqueue_struct创建
    3.linux工作队列 - 把work_struct加入工作队列
    4.linux工作队列 - work_struct被调用过程

    把work_struct加入工作队列

    把work_struct加入到工作队列workqueue的API在系列文章1有介绍,这些API虽然用法不一,但是最终都会调用__queue_work()函数来进行实际加入操作,比如API:queue_work_on:

    bool queue_work_on(int cpu, struct workqueue_struct *wq,
               struct work_struct *work)
    {
        bool ret = false;
        unsigned long flags;
    
        local_irq_save(flags);
    
        if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
            __queue_work(cpu, wq, work);------调用到__queue_work
            ret = true;
        }
    
        local_irq_restore(flags);
        return ret;
    }

    下面分析__queue_work的代码,如下:

    static void __queue_work(int cpu, struct workqueue_struct *wq,
                 struct work_struct *work)
    {
        struct pool_workqueue *pwq;
        struct worker_pool *last_pool;
        struct list_head *worklist;
        unsigned int work_flags;
        unsigned int req_cpu = cpu;
    
        /*
         * While a work item is PENDING && off queue, a task trying to
         * steal the PENDING will busy-loop waiting for it to either get
         * queued or lose PENDING.  Grabbing PENDING and queueing should
         * happen with IRQ disabled.
         */
        WARN_ON_ONCE(!irqs_disabled());
    
        debug_work_activate(work);
    
        /* if draining, only works from the same workqueue are allowed */
        if (unlikely(wq->flags & __WQ_DRAINING) &&
            WARN_ON_ONCE(!is_chained_work(wq)))
            return;
    retry:
        if (req_cpu == WORK_CPU_UNBOUND)
            cpu = wq_select_unbound_cpu(raw_smp_processor_id());
    
        //根据传入的参数struct workqueue_struct *wq来选择需要的类型
        /* pwq which will be used unless @work is executing elsewhere */
        if (!(wq->flags & WQ_UNBOUND))
            pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);----------------得到bound类型的pool_workqueue 
        else
            pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));-----得到unbound类型的pool_workqueue 
    
        /*
         * If @work was previously on a different pool, it might still be
         * running there, in which case the work needs to be queued on that
         * pool to guarantee non-reentrancy.
         */
        //work有可能已经存在另一个不同的work pool中,它可能仍旧在运行,为了避免重入,work仍然要加入到原来的work pool
        last_pool = get_work_pool(work);
        if (last_pool && last_pool != pwq->pool) {
            struct worker *worker;
    
            spin_lock(&last_pool->lock);
    
            worker = find_worker_executing_work(last_pool, work);
    
            if (worker && worker->current_pwq->wq == wq) {
                pwq = worker->current_pwq;----------------------得到last的pool_workqueue
            } else {
                /* meh... not running there, queue here */
                spin_unlock(&last_pool->lock);
                spin_lock(&pwq->pool->lock);
            }
        } else {
            spin_lock(&pwq->pool->lock);
        }
    
        /*
         * pwq is determined and locked.  For unbound pools, we could have
         * raced with pwq release and it could already be dead.  If its
         * refcnt is zero, repeat pwq selection.  Note that pwqs never die
         * without another pwq replacing it in the numa_pwq_tbl or while
         * work items are executing on it, so the retrying is guaranteed to
         * make forward-progress.
         */
        if (unlikely(!pwq->refcnt)) {
            if (wq->flags & WQ_UNBOUND) {
                spin_unlock(&pwq->pool->lock);
                cpu_relax();
                goto retry;
            }
            /* oops */
            WARN_ONCE(true, "workqueue: per-cpu pwq for %s on cpu%d has 0 refcnt",
                  wq->name, cpu);
        }
    
        /* pwq determined, queue */
        trace_workqueue_queue_work(req_cpu, pwq, work);
    
        if (WARN_ON(!list_empty(&work->entry))) {
            spin_unlock(&pwq->pool->lock);
            return;
        }
    
        pwq->nr_in_flight[pwq->work_color]++;
        work_flags = work_color_to_flags(pwq->work_color);
    
        if (likely(pwq->nr_active < pwq->max_active)) {
            trace_workqueue_activate_work(work);
            pwq->nr_active++;
            worklist = &pwq->pool->worklist;----------------得到pool_workqueue的worker_pool的worklist
            if (list_empty(worklist))
                pwq->pool->watchdog_ts = jiffies;
        } else {
            work_flags |= WORK_STRUCT_DELAYED;
            worklist = &pwq->delayed_works;
        }
    
        insert_work(pwq, work, worklist, work_flags);-----把work加入到worklist,并唤醒
    
        spin_unlock(&pwq->pool->lock);
    }
    

    看函数insert_work代码:

    static void insert_work(struct pool_workqueue *pwq, struct work_struct *work,
                struct list_head *head, unsigned int extra_flags)
    {
        struct worker_pool *pool = pwq->pool;
    
        /* we own @work, set data and link */
        set_work_pwq(work, pwq, extra_flags);
        list_add_tail(&work->entry, head);---------------加入work pool的链表
        get_pwq(pwq);
    
        /*
         * Ensure either wq_worker_sleeping() sees the above
         * list_add_tail() or we see zero nr_running to avoid workers lying
         * around lazily while there are works to be processed.
         */
        smp_mb();
    
        if (__need_more_worker(pool))
            wake_up_worker(pool);----------------------唤醒work pool
    }
    展开全文
  • 关于消息队列的一些理解,明白我们为什么要在自己的系统加入消息队列? 对于一个业务比较小的系统,以前的做法是直接通过一个项目集成了所有业务,比如是一个商城系统,登陆,注册,购物车,支付功能等等,一...

    消息队列技术,它是分布式应用间交换信息的一种技术。

    关于消息队列的一些理解,不明白我们为什么要在自己的系统加入消息队列?

    对于一个业务比较小的系统,以前的做法是直接通过一个项目集成了所有业务,比如是一个商城系统,登陆,注册,购物车,支付功能等等,一个系统把所有的事情做完了,但是随着业务的不断扩张,单纯的单一系统,已经不能再适应这个商城系统,我们很多时候回对整个系统进行解耦,进行分布式的开发,把一些功能细化出去,形成各种各样的子系统,例如登陆的模块分离出去,变成了独立的授权验证服务器子系统,支付功能形成一个独立的子系统,那么问题来了,由于两个不同的服务器之间,两者是如何进行消息的同步的呢。,那么我们首先可能会想到诸如resetful api或者socket来实现,其实答案是可以的,不过假如是socket实现的,那么对于我们开发者来说,需要我们解决的问题有很多:

    1、我们需要具备网络的底层知识,以及使用相应的客户端语言来实现收发信息,而且还需要定制一套相应的通信协议。

    2、我们如何做到消息传递的准确性以及数据已经完整的接收.

    3、解决网络中断消息传递的处理问题。

    4、在多个系统之间,哪个子系统最先会受到信息.

    5、对于异构系统间的数据交换,如何解决.

    6、如何处理应用程序阻塞.

    7、负载均衡的问题

    8、子系统通信之间的地址管理

    其实,我们要解决的问题远远不止上面这些,但是我们使用了消息队列的中间件之后,可以轻松的把问题解决掉。总的来说,使用消息队列,可以解决我们开发者在解决分布式架构之间的传递问题。

    对于我们开发来者来说,我们一般会采用一些开源的MQ协议来接入我们的系统,下面我简单介绍一些消息队列中间件。

    目前比较热门的三种消息队列

    ActiveMQ

    ActiveMQ 由Apache维护的比较优秀的开源协议。它是可以支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,是支持Apache2.0协议,对于spring项目,我们可以很轻易的嵌入进去。

    支持的客户端语言:

    Java, C, C++, C#, Ruby, Perl, Python, PHP。

    支持的应用协议:

    OpenWire、Stomp、XMPP、AMQP、MQTT等等。

    对于XMPP,我在前一些文章介绍过,它是可以用来我们的即时通讯的协议,以XML格式传递。而MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议。该协议支持所有平台,目前可以用于物联网的通讯中,前景非常可观.

    官方网站:

    http://activemq.apache.org/

    RabbitMQ

    是用Erlang编写的一款基于AMQP协议的消息队列中间件,在开源项目关注度比较高。

    支持的客户端语言:

    Python、Ruby、.NET、Java、Node.js、C、PHP、Go、Erlang

    支持的协议XMPP、STOMP等

    官方网站:

    http://www.rabbitmq.com/devtools.html

    RocketMQ

    它是由阿里维护的一个开源消息队列,它在阿里各个业务线得到了广泛的应用,业务场景如下:

    Mysql binlog 、同步订单类应用、流计算IM 等实时消息领域、Cache 同步、削峰填谷,目前阿里云的消息中间件也是由它演变而来的

    支持的客户端语言:

    Java 、C++

    支持的协议:

    JMS,MQTT

    官方网站:

    https://github.com/alibaba/rocketmq?spm=5176.doc29532.2.1.I7qrLz

    当然消息的中间件,不止上面这些,比如有关注比较高的Jafka/Kafka、open mq,还有一些商业的消息中间件。这里就不一一介绍了。

    大概介绍了MQ的一些知识,我们如何在我们的Spring mvc项目中加入消息队列?

    上面的三种消息队列中间件,大家可以根据自己的业务需要,使用一个适合自己的中间件,我这里主要讲解一些如何在spring MVC加入RabbitMQ。

    RabbitMQ的使用过程:

    大概如下

    (1)client连接到消息队列服务器。

    (2)声明一个exchange。

    (3)声明一个queue。

    (4)设置好exchange和queue之间的关系。

    (5)客户端投递消息的监听。

    RabbitMQ有三种类型的Exchange:direct, topic 和fanout。

    direct:相当于点对点的,下图所示

    消息队列与如何spring mvc 项目中加入消息队列topic:它是将路由键和某模式进行匹配,只有符合规则才会接收到
    消息队列与如何spring mvc 项目中加入消息队列fanout就是广播模式。


    消息队列与如何spring mvc 项目中加入消息队列


    首先,我们知道消息中间件,它是相当于一台服务器程序那样,负责我们客户端的消息的传递,所以我们是要安装消息中间件程序的,而不是简单配置到我们的spring就可以完成事情的,我们spring 配置的只是客户端,这个可能很多人在刚刚接触的时候会有点模糊。

    那么我们是如何安装RabbitMQ

    我们刚才介绍过RabbitMQ是有Erlang语音编写的,那么运行RabbitMQ是需要配置一下erlang的环境的。具体各个操作系统的安装方法不一样,我这里不做详细的解释,大家可以自行研究。

    安装RabbitMQ

    安装环境:Mac +rabbitmq

    对于mac 系统,安装Rabbitmq非常简单,可以通过brew工具来安装,当然这个是自行安装了。直接输入如下指令就可以了。

    brew install rabbitmq

    过了一段时间后,安装完毕。

    启动RabbitMq服务器

    sudo /usr/local/sbin/rabbitmq-server

    这个时候已经成功启动服务器了。我们可以输入http://localhost:15672/#/

    看看我们的服务器上面的一些信息,账号和密码为:guest和guest

    那么对于客户端的连接,我们可以使用127.0.0.1,端口5672,账号和密码guest

    消息队列与如何spring mvc 项目中加入消息队列

    那么我们的spring mvc 项目是如何配置rabbitmq的。

    maven管理

    我这里是通过maven来关系项目,可以在pom.xml加入spring-rabbit和amqp-client,代码如下:

    <!-- http://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit -->

    <dependency>

    <groupId>org.springframework.amqp</groupId>

    <artifactId>spring-rabbit</artifactId>

    <version>1.6.0.RELEASE</version>

    </dependency>

    <!-- rabbit -->

    <dependency>

    <groupId>com.rabbitmq</groupId>

    <artifactId>amqp-client</artifactId>

    <version>3.5.4</version>

    </dependency>

    下载好后,我们需要了解连接rabbit的一些步骤,

    1、配置连接配置

    2、声明template,可以通过注解中使用rabbit来收发信息

    3、消息对象json转换类。

    4、Queue队列的声明。

    5、交换机的定义

    配置信息

    在这里我编写一个spring管理rabbitmq的xml,我这里只是以topic为例子,其他的几种比如是

    direct和

    这里我命名为:spring-amqp.xml,具体说明在xml文件里

    <beansxmlns="http://www.springframework.org/schema/beans"

    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

    xmlns:rabbit="http://www.springframework.org/schema/rabbit"xmlns:context="http://www.springframework.org/schema/context"

    xsi:schemaLocation="http://www.springframework.org/schema/rabbit

    http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd

    http://www.springframework.org/schema/beans

    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd

    http://www.springframework.org/schema/context

    http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- rabbit服务器连接的一些配置 -->

    <rabbit:connection-factoryid="connectionFactory"

    host="127.0.0.1"

    port="5672"

    username="guest"

    password="guest" />

    <!-- rabbit服务器连接的一些配置 -->

    <rabbit:adminconnection-factory="connectionFactory"id="amqpAdmin"/>

    <!-- rabbit模板的配置 -->

    <rabbit:templateid="amqpTemplate"connection-factory="connectionFactory"channel-transacted="true"

    message-converter="jsonMessageConverter"/>

    <!--json消息转换-->

    <beanid="jsonMessageConverter"class="org.springframework.amqp.support.converter.JsonMessageConverter">

    <propertyname="classMapper">

    <beanclass="org.springframework.amqp.support.converter.DefaultClassMapper">

    </bean>

    </property>

    </bean>

    <!--消息队列,id和name可以是名字一致的-->

    <rabbit:queueid="com.yeehot.topic.queue"name="com.yeehot.topic.queue">

    <rabbit:queue-arguments>

    <entrykey="x-ha-policy"value="all"/>

    </rabbit:queue-arguments>

    </rabbit:queue>

    <!-- 声明Exchange的类型为topic并设定Exchange的名称 -->

    <rabbit:topic-exchangename="yeehot.topic">

    <rabbit:bindings>

    <!-- 这里的queue是<rabbit:queue 里的ID -->

    <rabbit:bindingpattern="com.yeehot.topic"queue="com.yeehot.topic.queue"/>

    </rabbit:bindings>

    </rabbit:topic-exchange>

    <!--消息的监听,如果有多个子系统的话,可以在子系统中定义,我这里只是同时让他把消息发送和接收放在同一个系统而已。-->

    <rabbit:listener-containerconnection-factory="connectionFactory"message-converter="jsonMessageConverter"

    channel-transacted="true"concurrency="10"

    auto-startup="true">

    <!-- queues是我们上面的消息的队列,这句话的意思就是监听上面名字的队列 -->

    <rabbit:listenerqueues="com.yeehot.topic.queue"ref="testReceiveMessageListener"method="handleMessage"/>

    </rabbit:listener-container>

    </beans>



    编写信息发送器的代码:

    @Controller

    public class MessageQueeController {

    privatestaticfinal String EXCHANGE = "yeehot.topic";

    @Resource(name="amqpTemplate")

    private AmqpTemplate amqpTemplate;

    @RequestMapping(value="/sendmsg", method=RequestMethod.GET, produces = "text/html;charset=UTF-8")

    @ResponseBody

    public String showInfo( String msg){

    System.out.println("message="+msg);

    amqpTemplate.convertAndSend(EXCHANGE, "com.yeehot.topic",msg);

    return msg;

    }

    }

    编写接收信息的代码

    @Component

    public class TestReceiveMessageListener {

    public void handleMessage(String result) {

    if(result==null)

    {

    System.out.println("oh ,no");

    return;

    }

    System.out.println("receive msg="+result);

    //处理逻辑

    }

    }



    运行:

    我们在浏览器输入我们的项目地址

    http://192.168.3.114:8080/Yeehot-Program-King/sendmsg?msg=ming,可以发现控制台会打印出如下信息,说明我们的消息监听已经成功了。

    消息队列与如何spring mvc 项目中加入消息队列

    展开全文
  • 考虑到音乐歌曲都是3、4Mb的小文件,断点下载的功能便需要了。因此只需要实现一个特别轻量、简单的下载管理类,进行管理即可。最初的思路便是任务队列,单线程顺序执行,一个文件接着一个文件进行下载。之前看过...

    最近的项目是一个与音乐相关的App,其中有一个功能:收藏喜欢的歌曲,在wifi的环境下自动下载。

    考虑到音乐歌曲都是3、4Mb的小文件,断点下载的功能便不需要了。因此只需要实现一个特别轻量、简单的下载管理类,进行管理即可。

    最初的思路便是任务队列,单线程顺序执行,一个文件接着一个文件进行下载。

    之前看过AsyncTask的部分源码,其设计与我的想法类似,于是便借鉴着AsyncTask的源码,实现了一个特别简单、轻量的下载管理类。

    源码如下:

    public class MyDownloadManager {
    
        private static final String TAG = "MyDownloadManager";
        private File downloadDir; // 文件保存路径
        private static MyDownloadManager instance; // 单例
    
        // 单线程任务队列
        public static Executor executor;
        private static final ThreadFactory sThreadFactory = new ThreadFactory() {
            private final AtomicInteger mCount = new AtomicInteger(1);
    
            public Thread newThread(Runnable r) {
                return new Thread(r, "MyDownloadManager #" + mCount.getAndIncrement());
            }
        };
        private static final BlockingQueue<Runnable> sPoolWorkQueue = new LinkedBlockingQueue<>(128);
        public static final Executor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(1, 1, 1,
                TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);
    
    
        private MyDownloadManager() {
            // 初始化下载路径
            downloadDir = new File(AndroidCacheUtils.getCacheDirFile(MiaApplication.getInstance()), "download");
            if (!downloadDir.exists()) {
                downloadDir.mkdirs();
            }
            executor = new SerialExecutor();
        }
    
        /**
         * 顺序执行下载任务
         */
        private static class SerialExecutor implements Executor {
            final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
            Runnable mActive;
    
            public synchronized void execute(final Runnable r) {
                mTasks.offer(new Runnable() {
                    public void run() {
                        try {
                            r.run();
                        } finally {
                            scheduleNext();
                        }
                    }
                });
                if (mActive == null) {
                    scheduleNext();
                }
            }
    
            protected synchronized void scheduleNext() {
                if ((mActive = mTasks.poll()) != null) {
                    THREAD_POOL_EXECUTOR.execute(mActive);
                }
            }
        }
    
        /**
         * 获取单例对象
         *
         * @return
         */
        public static MyDownloadManager getInstance() {
            if (instance == null) {
                instance = new MyDownloadManager();
            }
            return instance;
        }
    
        /**
         * 添加下载任务
         *
         * @param path
         */
        public void addDownloadTask(final String path) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    download(path);
                }
            });
        }
    
        /**
         * 下载文件
         *
         * @param path
         */
        private void download(String path) {
            String fileName = AndroidMD5.MD5(path);
            File savePath = new File(downloadDir, fileName); // 下载文件路径
            File finallyPath = new File(downloadDir, fileName + ".mp3"); // 下载完成后加入.mp3后缀
            if (finallyPath.exists()) { // 文件存在则已下载
                Log.i(TAG, "file is existed");
                return;
            }
            if (AndroidNetWorkUtils.isWifiDataEnable(MiaApplication.getInstance())) { // 如果是Wifi则开始下载
                if (savePath.exists() && savePath.delete()) { // 如果之前存在文件,证明没有下载完成,删掉重新创建
                    savePath = new File(downloadDir, fileName);
                }
                Log.i(TAG, "download start");
                try {
                    byte[] bs = new byte[1024];
                    int len;
                    URL url = new URL(path);
                    InputStream is = url.openStream();
                    OutputStream os = new FileOutputStream(savePath);
                    while ((len = is.read(bs)) != -1) {
                        os.write(bs, 0, len);
                    }
                    os.close();
                    is.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                if (savePath.renameTo(finallyPath)) { // 下载完成后重命名为.mp3文件
                    Log.i(TAG, "download end");
                    EventBus.getDefault().post(new DownloadDoneEvent(path));
                }
            } else { // 不是wifi则不下载
                Log.i(TAG, "not wifi net, stop download");
            }
    
        }
    
        /**
         * 添加删除任务
         *
         * @param path
         */
        public void addDeleteTask(final String path) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    delete(path);
                }
            });
        }
    
        /**
         * 删除本地文件
         *
         * @param path
         */
        private void delete(String path) {
            String fileName = AndroidMD5.MD5(path);
            File savePath = new File(downloadDir, fileName + ".mp3");
            Log.i(TAG, savePath.getPath());
            if (savePath.exists()) {
                if (savePath.delete()) {
                    Log.i(TAG, "file is deleted");
                }
            }
        }
    
        /**
         * 返回下载路径
         *
         * @return
         */
        public File getDownloadDir() {
            return downloadDir;
        }
    }

    我们看到public static final Executor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(1, 1, 1,TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);,这句代码便是创建一个线程池。其方法源码及参数说明:

    /**
         * Creates a new {@code ThreadPoolExecutor} with the given initial
         * parameters and default rejected execution handler.
         *
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         * @param unit the time unit for the {@code keepAliveTime} argument
         * @param workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.
         * @param threadFactory the factory to use when the executor
         *        creates a new thread
         * @throws IllegalArgumentException if one of the following holds:<br>
         *         {@code corePoolSize < 0}<br>
         *         {@code keepAliveTime < 0}<br>
         *         {@code maximumPoolSize <= 0}<br>
         *         {@code maximumPoolSize < corePoolSize}
         * @throws NullPointerException if {@code workQueue}
         *         or {@code threadFactory} is null
         */
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 threadFactory, defaultHandler);
        }

    这里我前三个参数传的都是1,既是最多只有1个线程。sPoolWorkQueue参数则是一个容量为128的任务队列,既最多能存放128个任务。

    下面我们看到SerialExecutor的代码,它有一个Runnable队列mTasks ,不断的接受Runnable对象,并通过poll操作,每次取出顶部的Runnable进行执行。结合创建的单一线程池,便实现了我需要的简易、轻量的下载器。

    展开全文
  • 队列实现队列中元素排序

    千次阅读 2015-03-05 22:57:49
    题目要求对一个队列中的元素进行排序,只允许使用一个临时队列能进行除去入队、出队、判空以外的任何操作。 实现方法为每次遍历队列,从中找出最小的元素,放入临时队列,遍历的过程是出队的过程,注意如果...

    题目要求对一个队列中的元素进行排序,只允许使用一个临时队列,不能进行除去入队、出队、判空以外的任何操作。


    实现方法为每次遍历队列,从中找出最小的元素,放入临时队列,遍历的过程是出队的过程,注意如果一个元素比当前的最小值大,则要放回队列当中,如果比当前的最小值小,则保存起来,暂时不放回队列中,发现更小的,把原来的最小值放入,更新最小值,在遍历完一次以后,将最小值存入临时队列。然后开始第二次遍历,注意每次遍历原队列中都会减少一个元素,因此共遍历队列N次,每次对队列N、N-1、N-2 ... 1这么多次出队操作来找最小值,在最后一次完成后临时队列中存放的就是排序好的结果,出队N次即可按非降序输出。


    注意最坏的情况:如果最初的队列为非降序,则会造成大量的无用功,因此在原队列生成时就动态判断是否是非降序列,如果是,则直接将原队列打印。

    #include<iostream>
    #include<stdio.h>
    using namespace std;
    
    #define INF   99999999
    
    #define MAXSIZE 100001
    
    typedef struct {
    	int items[MAXSIZE];
    	int front;
    	int rear;
    }Queue;
    
    void initQueue(Queue *q){
    
    	q->front = q->rear = 0;
    
    }
    
    bool isEmpty(Queue *q){
    
    	return q->front == q->rear;
    
    }
    
    void AddQ(Queue *q, int item){
    
    	if ((q->rear + 1)%MAXSIZE == q->front)
    	{
    		// 为了区分空队列与满队列,必须保证rear和front不能再次碰面,因此要留出一个距离,提前判断是否添加后front=rear。
    		printf("队列满");
    		return;
    	}
    	q->rear = (q->rear + 1) % MAXSIZE;
    	*(q->items + q->rear) = item;
    }
    
    int DeleteQ(Queue *q){
    
    	if (q->rear == q->front)
    	{
    		printf("队列空");
    		return NULL;
    	}
    	
    	q->front = (q->front + 1) % MAXSIZE;
    	return *(q->items + q->front);
    
    }
    
    int main(){
    	
    	Queue q;
    	Queue tempQ;
    	
    	initQueue(&q);
    	initQueue(&tempQ);
    	
    	int N;
    
    	scanf("%d", &N);
    
    	int buffer;
    
    	bool isRising = true;
    	int last = 0, now = -1;
    
    	for (int i = 0; i < N; i++){
    		scanf("%d", &buffer);
    		AddQ(&q, buffer);
    		now = buffer;
    		if (i > 0 && now < last){
    			isRising = false;
    		}
    		last = now;
    	}
    
    	if (isRising)
    	{
    		printf("%d", DeleteQ(&q));
    		while (!isEmpty(&q))
    		{
    			printf(" %d", DeleteQ(&q));
    		}
    		printf("\n");
    		return 0;
    	}
    
    	int min;
    
    	for (int i = 0; i < N; i++){
    
    		min = INF;
    
    		for (int j = i; j < N; j++){
    
    			buffer = DeleteQ(&q);
    
    			if (buffer < min){
    				if (min != INF) AddQ(&q, min);
    				min = buffer;
    			}
    			else{
    				AddQ(&q, buffer);
    			}
    
    		}
    
    		AddQ(&tempQ, min);
    
    	}
    
    	printf("%d", DeleteQ(&tempQ));
    	while (!isEmpty(&tempQ))
    	{
    		printf(" %d", DeleteQ(&tempQ));
    	}
    	printf("\n");
    	
    
    	return 0;
    }


    展开全文
  • Python 队列

    万次阅读 多人点赞 2018-06-28 15:42:29
    队列是线性的集合,对于队列来说,插入限制在一端(队尾),删除限制在另一端(队头)。队列支持先进先出(FIFO)的协议。我们在后面会提到一种特殊的队列——优先队列,在优先队列中,具有较高优先级的项会在那些...
  • 队列相关资料: http://chenjumin.iteye.com/blog/2182322 http://blog.csdn.net/luohuacanyue/article/details/16359777 Queue  ------------  1.ArrayDeque, (数组双端队列)  2.PriorityQueue, ...
  • 消息队列

    千次阅读 多人点赞 2019-09-19 21:42:59
    消息队列 “消息队列”是在消息的传输过程中保存消息的容器,当我们需要使用消息的时候可以取出消息供自己使用。 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高...
  • 阻塞队列

    千次阅读 2017-12-08 09:42:22
    Java中的阻塞队列 1.1 什么是阻塞队列(BlockingQueue) 支持阻塞操作的队列。具体来讲,支持阻塞添加和阻塞移除。 阻塞添加:当队列满的时候,队列会阻塞插入插入的元素的线程,直到队列不满; 阻塞移除:...
  • FIFO队列和优先队列

    千次阅读 2017-08-17 17:49:30
    FIFO队列和优先队列一.区别FIFO队列相当于一般的队列 优先队列为操作时有优先级的队列二.定义方式1.头文件: #include<queue> 2.FIFO队列: queue<类型名> 变量名; 3.优先队列: priority_queue<类型名> 变量名...
  • 队列简介:队列(Queue)是一种数据结构,可以在队列的一端插入元素而在队列的另一端删除元素。 ( 1 )允许删除的一端称为 队头( Front ) 。 ( 2 )允许插入的一端称为 队尾( Rear ) 。 ( 3 )当队列中没有...
  • 双端单调队列

    千次阅读 多人点赞 2018-09-16 10:16:51
    这次介绍一种新的数据结构:双端队列:双端队列是指允许两端都可以进行入队和出队操作的队列,其元素的逻辑结构仍是线性结构。将队列的两端分别称为前端和后端,两端都可以入队和出队。 堆栈、队列和优先队列都可以...
  • 核心线程满了后,再有任务需要执行,线程池为什么继续创建新的线程呢,而是中间引入阻塞队列,能能在核心线程满了后,继续创建线程,直到线程数达到最大线程数了,再把任务引入阻塞队列,这样的步骤也是可以的...
  • 近日,蚂蚁金服消息队列 SOFAMQ 宣布加入 Linux 基金会旗下 OpenMessaging 开源社区,将与阿里巴巴、Yahoo、滴滴、微众银行等社区成员一起,持续推进消息开放标准...
  • 单调队列

    千次阅读 2013-11-03 13:31:25
    以单调减队列为例:队列内的元素(e1,e2,e3...en)存在(e1)的关系,所以队首元素e1一定是最小的元素。与优先队列不同的是,当有一个新的元素e入队时,先要将队尾的所有大于e的元素弹出,以保证单调性,再让...
  • 队列 队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端...
  • 环形队列

    万次阅读 2016-07-16 23:50:19
    我们都是新世纪的好青年,所以我们也不会也允许插队。而从生活中,我们可以抽象出队列的概念。队列就是一个能够实现“先进先出”(FIFO(first in first out))的存储结构。队列分为普通队列和环形队列。先讲一讲普通...
  • java 中 阻塞队列 非阻塞队列 和普通队列的区别

    万次阅读 热门讨论 2018-08-22 23:14:26
    阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。...同样,试图往满的阻塞队列中添加新元素的线程同样也会被阻塞,...
  • python 多线程与队列

    千次阅读 2019-01-17 11:27:41
    各位好,之前写了多线程,但是在实际的生产中,往往情况比较复杂,要处理一批任务(比如要处理列表中所有元素),这时候可能创建很多的线程,线程过多反而不好,还会造成资源开销太大,这时候想到了队列。...
  • RabbitMQ之惰性队列与镜像队列

    千次阅读 多人点赞 2020-04-04 00:58:18
    文章目录1、惰性队列1.1、使用场景1.2、定义1.3、队列模式1.4、工作流程1.5、总结2、镜像队列2.1、消息流转过程2.2、负载均衡2.3、消息的可靠性2.4、GM协议2.5、镜像队列宕机2.6、镜像队列启动与停止顺序在这里插入...
  • FreeRTOS队列

    千次阅读 2018-01-08 13:22:52
    开始学习FreeRTOS,学习参考书籍和视频来自正点原子FreeRTOS源码详解与应用开发,北京航空航天大学出版社 1 队列简介  队列是为了任务与任务、任务与中断之间的通信而准备的,可以在任务与任务、任务与中断之间...
  • 优先队列(PriorityQueue)的实现: 一,PriorityQueue的特性 PriorityQueue是一种比较特殊的队列数据结构,传统的队列复合(FIFO)先进先出原则,而PriorityQueue是以数据的优先级进行存储; PriorityQueue类在Java1.5中...
  • 之前了解到一个朋友通过监听key实现来实现延时队列的功能。 后面了解到包括Java单机版的DelayQueue以及RabbitMQ延时队列/延迟重试等相对更靠谱一些。 常见的有: 定期轮询(数据库等) DelayQueue Timer ...
  • 消息队列之 RabbitMQ

    万次阅读 多人点赞 2019-04-30 17:11:00
    关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时候把这块的知识整理记录一下了。 市面上的消息队列产品有很多,比如老牌的 ActiveMQ、...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 366,979
精华内容 146,791
关键字:

下载已加入队列不开始下载