精华内容
下载资源
问答
  • 并行处理

    2012-08-31 14:53:34
    并行处理 发表日期:2007-03-28作者:[转贴] 出处: -  要编写一个好的游戏,必需使用到并行机制。并行机制要涉及到一个重要的语句,那就是While语句。你可能不禁会问...
    并行处理
    发表日期:2007-03-28作者:[转贴] 出处:

    -

      要编写一个好的游戏,必需使用到并行机制。并行机制要涉及到一个重要的语句,那就是While语句。你可能不禁会问:那不就是循环吗?对,并行机制就是要利用循环,即游戏循环(Game Loop)。实际上,所有程序并行的本质就是循环,连Windows也不例外。Windows号称多任务操作系统,实际上,在一个时间内CPU只能执行一条指令。所谓多任务,不过是并行的假象而已。在一个循环中分别执行各程序的一条语句,由于执行完该循环速度极快,于是看起来好象是多个任务同时工作一样。还有...,哎,不用费口舌了。相信你对并行有了一个初步应像,这就可以了,让我们继续。
      不是有一种for循环吗,为什么不用它呢?对,for循环也可以。只要可以构成死循环的语句都可以。死循环呀,听起来有点可怕。其实解决死循环简单的很,用break语句,再不行用goto语句,一切死循环都轻松搞定。现在就让我们来练一下兵:

    main()
    {
    int key;
    while(1)
    {
    if(kbhit())
    {
    key=getch();
    if(key==0) { getch();continue; }
    if(key==27) break;
    if(key==13) printf("\n");/*支持回车*/
    if(key==8&&wherex()>0)
    {
    gotoxy(wherex()-1,wherey());/*支持退格*/
    putch(' ');
    }
    printf("%c",key);/*输出字符*/
    }
    }
    }

    上面就是一个典型的文本编辑器界面,现在让我们来修改一下该程序,在屏幕右上角做一个计时器和一个计算击键次数的计数器,该计时器与计数器与文本编辑互不干扰,达到并行的效果。

    main()
    {
    int key,x,y,n=0;
    clrscr();
    while(1)
    {
    if(kbhit())
    {
    key=getch();
    if(key==0) {getch();continue;}
    if(key==27) break;
    if(key==13) printf("\n");/*支持回车*/
    if(key==8&&wherex()>0)
    {
    gotoxy(wherex()-1,wherey());/*支持退格*/
    putch(' ');
    gotoxy(wherex()-1,wherey());
    continue;
    }
    printf("%c",key);/*输出字符*/
    n++;/*计数*/
    }
    x=wherex();y=wherey();
    gotoxy(55,1);
    printf(" %d sec,keydown times %d ",clock()/18,n);/*记时,显示*/
    gotoxy(x,y);
    }
    }


    现在我们实现了简单的并行机制。然而,该循环有个缺点:太耗CPU,如果不按下任何键,CPU也会不停地刷新计数与记时器。于是我们引入了周期的概念,使一个周期只执行一次语句,这样即节省CPU,使用要求执行速度不同的并行语句也容易控制。程序入下:

    /*并行结构*/
    #define delay_time 4; /*定义的周期长度*/
    main()
    {
    long now_time=0,old_time,time_count=0;
    char done=0; /*使用done标志使每一个周期只执行一次指令*/
    clrscr();
    old_time=clock();
    while(!kbhit())
    {
    now_time=clock();
    if(now_time-old_time<delay_time)
    {
    if(!done)
    {
    puts("计算!"); /*该语句可替换为自己需要并行的语句块*/
    done=1;
    }
    }
    else
    {
    old_time=now_time;done=0;
    }
    }
    }

    如果上面的puts语句换成两个沿对角线移动小球的语句,不就可以实现两个小球同时运动了吗?在程序中动态地调整delay_time的值,还可以确定小球移动的快慢。这只是一个提示,只要遵循该结构,还可以实现好多有趣的效果,读者可以自己去研究。

    展开全文
  • Pipeline并行处理模型

    千次阅读 2019-09-03 23:21:07
    文章目录前言Pipeline并行处理模式概要车厢模拟式的Pipeline并行处理模式 前言 在我们平时的程序处理过程中,在效率上而言,串行处理的效率不如并行处理的效率,从线程层面而言,即多线程效率不如单线程。但是...

    前言


    在我们平时的程序处理过程中,在效率上而言,串行处理的效率不如并行处理的效率,从线程层面而言,即多线程效率不如单线程。但是尽管说并行处理效率确实会比较高,但是它在处理拥有数据结果依赖关系的逻辑时,需要额外的同步管控。例如我的输出怎么临时被存放,然后被下游程序收到处理等等。倘若我们设计的并行处理程序能很好地解决,逻辑依赖关系,那么无疑并行处理的方式将会大大提速我们实际系统中的执行效率。本文笔者来聊聊其中一种被称为Pipeline(流水线)模式的并行处理作业模式,相信此并行处理模式在实际工作中还是有所应用场景的。

    Pipeline并行处理模式概要


    首先一个问题,什么叫做Pipeline并行处理模式呢?Pipeline并行处理模式,首先它具有Pipeline属性,它有一条完整的依赖关系处理流程。比如一道工序总共分3个流程,A,B和C, 并且有严格的先后顺序执行要求。B流程的执行必须依赖A流程的执行结果,同样C流程需要依赖B的。其实这么来看,串行处理模式是天然适用于Pipeline处理模式的。

    不过本文我们要讨论的是并行处理Pipeline模式作业。如果按照惯常使用的多线程依赖同步的处理方法,主要有以下两种:

    • 将程序中间结果写出到第三方存储介质内,然后并行处理线程与此第三方存储进行数据交互,同步。
    • 程序中间结果保存到本地变量内,通过线程安全的做法将中间结果进行赋值输出,灵活一点的,还可以进行同步的控制。

    接下来笔者将要讨论的方法是第二种方法。

    车厢模拟式的Pipeline并行处理模式


    这里我们将用模拟车厢的方式来进行Pipeline work的执行,每个车厢代表一个执行单元,车厢具有Pipeline work的独有特征:

    • 车厢具有连接关系,每个车厢有它的前车厢和后车厢。
    • 头车厢无前车厢,它的处理无须依赖前车厢执行结果。
    • 尾车厢无后车厢,它的处理完毕即意味着总执行过程的结束。

    另外在每个车厢内,它还具有以下变量:

    • 多执行线程
    • 中间结果置换列表

    因此,基于车厢模型的Pipeline处理模式如下图所示:
    在这里插入图片描述
    上图中Middle Carriage(中部车厢)可能会有很多节,每个线程都有对应的中间结果输出列表。

    Pipeline并行处理模式代码实现


    此部分笔者来分享一段Pipeline并行处理模式的多线实现代码,引用自Hadoop社区JIRA的一个patch。

    在这个patch代码中,还是沿用了上节车厢的概念,另外它的整个过程可拆分为以下几个步骤:

    1. Pipeline work的定义以及Pipeline task的构建
    2. 根据Pipeline task,来构建车厢
    3. 车厢内部逻辑处理
      3.1) 头车厢(起始线程)处理过程
      3.2) 中部车(中部线程)厢处理过程
      3.3) 尾部车厢(末尾线程)处理过程

    针对上述子步骤,我们逐一来阐述。首先是Pipeline work的定义,此work的是每个车厢的线程的具体执行逻辑。

        PipelineWork header = new PipelineWork<Object, TestItem>() {
          int idx = 0;
    
          @Override
          public TestItem doWork(Object obj) throws IOException {
            if (idx < items.size()) {
              TestItem item = items.get(idx);
              item.setVal(item.getVal() * 2);
              if (exp && (idx == items.size() - 1)) {
                throw ioe;
              }
              idx++;
              
              LOG.info(Thread.currentThread().getName() +
                  ": Head worker produce item: " + item.getVal());
              return item;
            } else {
              LOG.info("Head worker finsihed produce item.");
              return null;
            }
          }
        };
        PipelineWork middle = new PipelineWork<TestItem, TestItem>() {
          @Override
          public TestItem doWork(TestItem item) {
            item.setVal(item.getVal() * 2);
            LOG.info(Thread.currentThread().getName() +
                ": Middle worker set value: " + item.getVal());
    
            return item;
          }
        };
        PipelineWork trailer = new PipelineWork<TestItem, Object>() {
          @Override
          public Object doWork(TestItem item) {
            item.setVal(item.getVal() * 2);
            LOG.info(Thread.currentThread().getName() +
                ": Trailer woker set value: " + item.getVal());
    
            return EMPTY;
          }
        };
    

    然后根据上面的work,构建Pipeline task,

        PipelineTask task = new PipelineTask(conf, "pipeline.testcarriage");
        task.appendWork(header, "header");
        task.appendWork(middle, "middle" + i);
        task.appendWork(trailer, "trailer");
        task.kickOff();
        task.join();
    

    在Pipeline的task的构建过程中,会进行车厢的组织,

      /**
       * Pipeline work的构建
       */
      public void kickOff() {
        for (int i = 0; i < pipeLine.size(); i++) {
          // 从pipeline中获取当前车厢
          Carriage ca = pipeLine.get(i);
          if (i == 0) {
        	// 第一个车厢为头车厢,没有前车厢,标记Header字段
            ca.setPrev(null);
            ca.setHeader();
          }
          if (i == pipeLine.size() - 1) {
        	// 如果是最后一节末尾车厢,则标记Trailer字段
            ca.setTrailer();
          }
          if (i > 0) {
        	// 中部车厢进行前车厢的引用赋值
            ca.setPrev(pipeLine.get(i - 1));
          }
          // 车厢衔接处理设置完毕,然后进行启动执行
          ca.kickOff();
        }
      }
    

    每节车厢在组织完毕后,就可以开始run起来了。

    对于每个车厢,它有如下的定义:

      private class Carriage<K, T> {
        private PipelineWork<K, T> work;
        private ArrayList<T>[] transferList;
        private ReentrantLock[] lock;
        private Throwable[] exception;
        // 每个线程的是否执行完毕的标识
        private boolean[] done;
        // 执行Carriage任务的线程数
        private int numThreads;
        private int blockingThreshold, transferThreshold, logThreshold;
        // 此车厢依赖的前车厢
        private Carriage prev;
        private volatile boolean shouldStop = false;
        final private List<T> EMPTY_LIST = new ArrayList<T>(0);
        // 是否是头车厢
        private boolean header = false;
        // 是否是末尾车厢
        private boolean trailer = false;
        // 执行当前车厢的线程组
        private Thread[] threads = null;
        private String name = null;
        ...
        
        public Carriage(PipelineWork<K, T> inWork, String caName) {
          name = caName;
          blockingThreshold = conf.getInt(getBlockingThresholdKey(name), 10000);
          transferThreshold = conf.getInt(getTransferThresholdKey(name), 100);
          logThreshold = conf.getInt(getLoggingThresholdKey(name), 10000000);
          numThreads = conf.getInt(getNumThreadsKey(name), 3);
    
          if (blockingThreshold < 0 || transferThreshold < 0
              || blockingThreshold < transferThreshold || logThreshold < 0
              || numThreads < 0) {
            throw new IllegalArgumentException("Illegal argument for PipelineTask "
                + name);
          }
          work = inWork;
          transferList = new ArrayList[numThreads];
          for (int i = 0; i < numThreads; i++) {
            transferList[i] = null;
          }
          lock = new ReentrantLock[numThreads];
          for (int i = 0; i < numThreads; i++) {
            lock[i] = new ReentrantLock();
          }
          exception = new Throwable[numThreads];
          for (int i = 0; i < numThreads; i++) {
            exception[i] = null;
          }
          done = new boolean[numThreads];
          for (int i = 0; i < numThreads; i++) {
            done[i] = false;
          }
        }
    
        public void setPrev(Carriage ca) {
          prev = ca;
          Preconditions.checkState(header == false);
        }
    
        public void stopCarriager() {
          shouldStop = true;
        }
    
        public void setHeader() {
          header = true;
          Preconditions.checkState(prev == null);
        }
    
        public void setTrailer() {
          trailer = true;
        }
    
        /**
         * 获取车厢的本地处理结果.
         */
        public List<T> getList() {
          int doneThread = 0;
          for (int i = 0; i < transferList.length; i++) {
            lock[i].lock();
            try {
              if (exception[i] != null) {
                return null;
              }
              // 如果发现i下标线程的中间结果列表不为空,则进行返回
              if (transferList[i] != null) {
                ArrayList<T> res = transferList[i];
                // 然后置空此下标值为空
                transferList[i] = null;
                return res;
              } else {
            	// 如果此下标置换列表为空,则判断是否为当前对应的线程已经处理完毕
                if (done[i]) {
                  // 如果是,则进行技术加一
                  doneThread++;
                }
              }
            } finally {
              lock[i].unlock();
            }
          }
    
          // 如果线程结束数量达到置换结果列表原始长度,则返回空,意为当前车厢数据已经完全处理完毕
          if (doneThread == transferList.length) {
            return null;
          } else {
        	// 否则,返回空list,意为暂时没有处理数据
            return EMPTY_LIST;
          }
        }
    
    

    然后就是各个车厢内的并行处理的核心执行逻辑,

       /**
         * 执行车厢内容的线程类
         */
        private class CarriageThread extends Thread {
          // 线程标识
          private int index;
    
          public CarriageThread(int i) {
            index = i;
            this.setName("PipelineTask-" + name + "-" + index);
          }
    
        @Override
        public void run() {
          try {
              Preconditions.checkState((prev != null) ^ header);
              ArrayList<T> localList = null;
              if (!trailer) {
                localList =
                    new ArrayList<T>((blockingThreshold > 0) ? blockingThreshold
                        : 0);
              }
              LOG.info("Carriage " + name + " thread " + index + " is scheduled");
              int itemHandled = 0;
              while (!shouldStop) {
                if (header) {
                  /** 1.头车厢的处理过程 **/
                  // 如果是头车厢,则没有前车厢依赖输入,直接执行
                  T t = work.doWork(null);
                  itemHandled++;
                  if (itemHandled == logThreshold) {
                    LOG.info("Handled " + itemHandled + " in " + name + " thread "
                        + index);
                    itemHandled = 0;
                  }
                  if (!trailer && t != null) {
                	// 将处理结果加入到当前本地结果列表中
                    localList.add(t);
                  } else if (t == null) {
                	// 如果处理结果为空,意为原始输入数据已经完全处理完毕,头车厢任务宣告全部完成
                    break;
                  }
                } else {
                   /** 2.中部车厢/尾车厢的处理过程 **/
                  // 非头车厢,意为当前为中间车厢或者尾车厢,先取出前车厢的处理结果
                  List<K> inputList = prev.getList();
                  if (inputList == null) {
                	// 如果前车厢的处理结果为空,意为前车厢数据已处理完毕,则当前车厢跳出当前循环
                	LOG.info("Break out.");
                    break;
                  } else if (inputList.isEmpty()) {
                	// 如果前车厢的处理结果为Empty,意为前车厢还有未处理的数据,不过还未将结果赋值到置换列表内
                	// 让出当前CPU给其它线程执行
                    Thread.yield();
                    continue;
                  }
    
                  for (K k : inputList) {
                	// 遍历前车厢的数据处理结果,得到新的处理结果
                    T t = work.doWork(k);
                    itemHandled++;
                    if (itemHandled == logThreshold) {
                      LOG.info("Handled " + itemHandled + " in " + name
                          + " thread " + index);
                      itemHandled = 0;
                    }
                    if (!trailer && t != null) {
                      // 将处理结果加入到当前车厢的本地结果列表内,将作为此车厢下节的依赖输入
                      // 后续同样是中间置换结果的赋值过程
                      localList.add(t);
                    }
                  }
                }
                if (trailer) {
                  // 如果是尾车厢,跳过后续本地输出结果的交换
                  continue;
                }
                
                /** 3.执行线程中间结果交换 **/
                // 如果当前本地结果超出阈值设定大小
                if (localList.size() >= transferThreshold) {
                  if (lock[index].tryLock()) {
                	// 而且之前对应位置交换列表为空,则进行赋值
                    if (transferList[index] == null) {
                      transferList[index] = localList;
                      lock[index].unlock();
                      // 交换完毕,重新置空本地结果列表
                      localList =
                          new ArrayList<T>(
                              (blockingThreshold > 0) ? blockingThreshold : 0);
                    } else {
                      lock[index].unlock();
                    }
                  }
                }
    
                // 本地结果列表达到阻塞阈值设定大小,阻塞意为当前本地输出的结果必须要赋值到置换列表内
                if (blockingThreshold > 0 && localList.size() >= blockingThreshold) {
                  lock[index].lock();
                  // 如果当前置换列表数据不为空,意为还没被下游车厢处理,则进行循环等待
                  while (transferList[index] != null) {
                    lock[index].unlock();
                    // 让出CPU给其它线程执行,然后再循环执行判断
                    Thread.yield();
                    lock[index].lock();
                  }
                  // 置换列表被下游车厢取走,则进行新的本地结果赋值
                  transferList[index] = localList;
                  lock[index].unlock();
                  // 交换完毕,重新置空本地结果列表
                  localList = new ArrayList<T>(blockingThreshold);
                }
              }
    
              /** 4.车厢末端中间结果赋值输出 **/
              // 循环执行结果后,将最后一批处理得到的本地输出结果进行输出赋值
              lock[index].lock();
              while (!trailer) {
                if (localList.size() > 0) {
                  // 本地输出结果有值,并且置换列表为空,则进行辅助
                  if (transferList[index] == null) {
                    transferList[index] = localList;
                    break;
                  } else {
                	// 不为空,让出当前CPU给其它线程处理,然后再等待下次的循环处理
                    lock[index].unlock();
                    Thread.yield();
                    lock[index].lock();
                    continue;
                  }
                } else {
                  break;
                }
              }
              
              // 本地输入结果处理完毕,标记当前线程处理完毕
              done[index] = true;
              if (prev != null) {
                exception[index] = prev.getException();
              }
              lock[index].unlock();
            } catch (Throwable t) {
              // 更新对应下标线程信息的变量
              LOG.warn("Exception in pipeline task ", t);
              lock[index].lock();
              exception[index] = t;
              lock[index].unlock();
              shouldStop = true;
            }
            LOG.info("Carriage " + name + " thread " + index + " is done");
          }
        }
    

    大家可以反复阅读上述的执行逻辑,设计还是比较巧妙的。

    以下是其中的测试结果输出:每节车厢内的执行逻辑可以并行地run起来,无须强同步的控制。

    2019-09-03 23:25:48,395 INFO util.TestPipelineTask (TestPipelineTask.java:doWork(69)) - PipelineTask-middle0-0: Middle worker set value: 2884
    2019-09-03 23:25:48,395 INFO util.TestPipelineTask (TestPipelineTask.java:doWork(69)) - PipelineTask-middle0-0: Middle worker set value: 2888
    2019-09-03 23:25:48,392 INFO util.TestPipelineTask (TestPipelineTask.java:doWork(69)) - PipelineTask-middle0-2: Middle worker set value: 1704
    2019-09-03 23:25:48,392 INFO util.TestPipelineTask (TestPipelineTask.java:doWork(56)) - PipelineTask-header-0: Head worker produce item: 1976
    2019-09-03 23:25:48,392 INFO util.TestPipelineTask (TestPipelineTask.java:doWork(79)) - PipelineTask-trailer-2: Trailer woker set value: 176
    2019-09-03 23:25:48,395 INFO util.TestPipelineTask (TestPipelineTask.java:doWork(79)) - PipelineTask-trailer-2: Trailer woker set value: 192
    2019-09-03 23:25:48,395 INFO util.TestPipelineTask (TestPipelineTask.java:doWork(79)) - PipelineTask-trailer-2: Trailer woker set value: 208
    2019-09-03 23:25:48,395 INFO util.TestPipelineTask (TestPipelineTask.java:doWork(79)) - PipelineTask-trailer-2: Trailer woker set value: 232
    2019-09-03 23:25:48,395 INFO util.TestPipelineTask (TestPipelineTask.java:doWork(56)) - PipelineTask-header-0: Head worker produce item: 1992
    2019-09-03 23:25:48,395 INFO util.TestPipelineTask (TestPipelineTask.java:doWork(79)) - PipelineTask-trailer-0: Trailer woker set value: 16
    2019-09-03 23:25:48,395 INFO util.TestPipelineTask (TestPipelineTask.java:doWork(69)) - PipelineTask-middle0-2: Middle worker set value: 1720
    2019-09-03 23:25:48,395 INFO util.TestPipelineTask (TestPipelineTask.java:doWork(69)) - PipelineTask-middle0-0: Middle worker set value: 2892
    2019-09-03 23:25:48,395 INFO util.TestPipelineTask (TestPipelineTask.java:doWork(69)) - PipelineTask-middle0-0: Middle worker set value: 2896

    引用


    [1].https://issues.apache.org/jira/browse/HDFS-13700 . The process of loading image can be done in a pipeline model

    展开全文
  • python 并行处理The idea of ​​creating a practical guide for Python parallel processing with examples is actually not that old for me. We know that this is not really one of the main contents for ...

    python 并行处理

    The idea of ​​creating a practical guide for Python parallel processing with examples is actually not that old for me. We know that this is not really one of the main contents for Python. However, when the time comes to work with big data, we cannot be very patient about the time. And at this point, it’s no secret that we need new equipment to take on big tasks.

    创建带有示例的Python并行处理实用指南的想法实际上对我来说并不那么古老。 我们知道,这实际上并不是Python的主要内容之一。 但是,当需要处理大数据时,我们不能对时间非常耐心。 在这一点上,我们需要新设备来承担重大任务已不是什么秘密。

    This article will help you understand:

    本文将帮助您了解:

    • Why is parallel processing and what is parallel processing?

      为什么要进行并行处理,什么是并行处理?

    • Which function is used and how many processors can be used?

      使用哪个功能以及可以使用多少个处理器?

    • What should I know before starting parallelization?

      开始并行化之前我应该​​知道些什么?

    • How is any function parallelization?

      函数如何并行化?

    • How to parallelize Pandas DataFrame?

      如何并行化Pandas DataFrame?

    1.为什么和什么? (1. Why and What?)

    为什么需要并行处理? (Why do i need parallel processing?)

    • A single process covers a separately executable piece of code

      单个过程涵盖了单独的可执行代码段
    • Some sections of code can be run simultaneously and allow, in principle, parallelization

      某些代码段可以同时运行,并且原则上允许并行化
    • Using the features of modern processors and operating systems, we can shorten the total execution time of a program, for example by using each core of a processor.

      利用现代处理器和操作系统的功能,我们可以缩短程序的总执行时间,例如通过使用处理器的每个内核。
    • You may need this to reduce the complexity of your program / code and outsource the workpieces to specialist agents who act as sub-processes.

      您可能需要这样做,以减少程序/代码的复杂性,并将工件外包给充当子流程的专业代理商。

    什么是并行处理? (What is the Parallel Processing?)

    • Parallel processing is a mode of operation in which instructions are executed simultaneously on multiple processors on the same computer to reduce overall processing time.

      并行处理是一种操作模式,其中指令在同一台计算机上的多个处理器上同时执行,以减少总体处理时间。
    • It allows you to take advantage of multiple processors in one machine. In this way, your processes can be run in completely separate memory locations.

      它使您可以在一台计算机上利用多个处理器。 这样,您的进程可以在完全独立的内存位置中运行。

    2.使用什么功能? (2. What function is used?)

    Using the standard multiprocessing module, we can efficiently parallelize simple tasks by creating child processes.Whether you are a Windows user or a Unix user, this does not change. To basically understand parallel processing, this example can be given:

    使用标准的multiprocessing模块,我们可以通过创建子进程来有效地并行化简单任务。无论您是Windows用户还是Unix用户,这都不会改变。 为了基本了解并行处理,可以给出以下示例:

    from multiprocessing import Pooldef f(x):return x*xif __name__ == '__main__':with Pool(5) as p:
    print(p.map(f, [1, 2, 3]))
    output: [1, 4, 9]

    If you think whyif __name__ == '__main__' part is necessary, “Yes, the” entry point “of the program must be protected” (https://docs.python.org/3.8/library/multiprocessing.html#multiprocessing-programming)

    如果您认为为什么if __name__ == '__main__'部分,请“是,必须保护程序的“入口点””( https://docs.python.org/3.8/library/multiprocessing.html#multiprocessing -编程 )

    我最多可以运行几个并行进程? (How many parallel processes can I run at most?)

    The maximum number of processes you can run at one time is limited by the number of processors on your computer. However, if you don’t know how many processors you have, you can query as in the example below:

    一次可以运行的最大进程数受计算机上处​​理器数量的限制。 但是,如果您不知道有多少个处理器,则可以按照以下示例进行查询:

    Image for post

    3.开始并行化之前我应该​​知道些什么? (3. What should I know before starting parallelization?)

    'multiprocessing’ works in Ipython console but not in Jupyter Notebook and why?

    'multiprocessing'在Ipython控制台中起作用,但在Jupyter Notebook中不起作用,为什么?

    It does not work inside jupyter because it need to run directly. In short, the subprocesses do not know they are subprocesses and are attempting to run the main script recursively. It is well known that multiprocessing is a bit dangerous in interactive translators in Windows.

    它在jupyter内部不起作用,因为它需要直接运行。 简而言之,子进程不知道它们是子进程,而是试图递归运行主脚本。 众所周知,在Windows中的交互式翻译器中,多处理有些危险。

    The library was originally developed under Windows and Python2, and there were no problems with multiprocessing before. However multiprocessing works perfectly if you run it from the command line.

    该库最初是在Windows和Python2下开发的,以前的多处理没有问题。 但是,如果您从命令行运行多处理,则可以完美地工作。

    4.什么是函数并行化? (4. How is any function parallelization?)

    处理 (Process)

    By subclassing multiprocessing.process, you can create a process that runs independently. By extending the __init__ method you can initialize resource and by implementing Process.run() method you can write the code for the subprocess.

    通过子类化multiprocessing.process,可以创建独立运行的进程。 通过扩展__init__方法,您可以初始化资源,并通过实现Process.run()方法,可以编写子流程的代码。

    import import class def super(Process, self).__init__()self.id= def time.sleep(1)print("This is the process with id: {}".format(self.id))

    We need to initialize our Process object and invoke Process.start() method. For this Process.start() will create a new process and will invoke the Process.run() method.

    我们需要初始化我们的Process对象并调用Process.start()方法。 为此,Process.start()将创建一个新进程,并将调用Process.run()方法。

    if __name__ == '__main__':
    p = Process(0)
    p.start()

    After p.start() will be executed immediately before the task completion of process p. To wait for the task completion, you can use Process.join()

    在p.start()之后,将立即执行进程p的任务。 要等待任务完成,可以使用Process.join()

    if __name__ == '__main__':
    p = Process(0)
    p.start()
    p.join()=
    p.join()

    And output should appear like this:

    输出应如下所示:

    output: 
    This is the process with id: 0
    This is the process with id: 1

    泳池班 (Pool Class)

    You can initialize a Pool with ’n’ number of processors and pass the function you want to parallelize to one of Pools parallization methods.

    您可以使用'n'个处理器初始化池,然后将要并行化的函数传递给池并行化方法之一。

    The multiprocessing.Pool() class spawns a set of processes called workers and can submit tasks using the methods apply()/apply_asnc() and map()/map_async().

    multiprocessing.Pool()类产生了一组称为worker的进程,可以使用apply() / apply_asnc()map() / map_async()方法提交任务。

    Let’s show Pool using map() first:

    让我们 首先 使用 map() 显示Pool

    import import def return * if ==     pool =     pool = =4)    inputs =     outputs =     print("Input: {}".format(inputs))    print("Output: {}".format(outputs))

    So the output for map() should look like this:

    因此map()的输出应如下所示:

    Input: [0,2,4,6,8]
    Output: [0,4,16,36,64]

    Using map_async(), the AsyncResult object is returned immediately without stopping the main program and the task is done in the background.

    使用map_async() ,无需停止主程序即可立即返回AsyncResult对象,并且该任务在后台完成。

    import import def return * if ==     pool =     inputs =     outputs_async =     outputs =     print("Output: {}".format(outputs))

    The output for map_async() should look like this:

    map_async()的输出应如下所示:

    Output: [0, 1, 4, 9, 16]

    Let’s take a look at an example Pool.apply_async. Pool.apply_async conditions a task consisting of a single function to one of the workers. It takes the function and its arguments and returns an AsyncResult object. For example:

    让我们看一个示例Pool.apply_async. Pool.apply_async一个包含单个功能的任务Pool.apply_async给其中一个工作程序。 它接受函数及其参数,并返回一个AsyncResult对象。 例如:

    import import def return * if ==     pool =     result_async = = for in    range(5)]    results = for in     print("Output: {}".format(results))

    And output for Pool.apply_async :

    并输出Pool.apply_async

    Output: [0, 1, 4, 9, 16]

    5.如何并行化Pandas DataFrame? (5. How to parallelize Pandas DataFrame?)

    When it comes to parallelizing a DataFrame, you can apply it to the dataframe as an input parameter for the function to be parallelized, a row, a column, or the whole dataframe.

    当涉及到并行化一个DataFrame时,您可以将其作为要并行化的函数,行,列或整个数据帧的输入参数应用到该数据帧。

    You just need to know: For the parallelizing on an entire dataframe, you will use the pathos package that uses dill for serialization internally.

    您只需要知道:为了在整个数据帧上进行并行化,您将使用在内部使用dill进行序列化的pathos软件包。

    Forany example, lets create a dataframe and see how to do row-wise and column-wise paralleization.

    例如,让我们创建一个数据框,看看如何进行行和列并行化。

    First, let’s create a sample data frame and see how to do row parallelization.

    首先,让我们创建一个示例数据框,然后看看如何进行行并行化。

    import numpy as np
    import pandas as pd
    import multiprocessing as mp
    df = pd.DataFrame(np.random.randint(3, 10, size=[5, 2]))print(df.head())
    #> 0 1
    #> 0 8 5
    #> 1 5 3
    #> 2 3 4
    #> 3 4 4
    #> 4 7 9

    Now let’s apply the hypotenuse Function to each raw, but since we want to run 4 transactions at a time, we will need df.itertuples(name=False)name=Falsehypotenuse

    现在,将hypotenuse函数应用于每个原始函数,但是由于我们想一次运行4个事务,因此需要df.itertuples(name=False)name=Falsehypotenuse

    # Row wise Operation
    def hypotenuse(row):
    return round(row[1]**2 + row[2]**2, 2)**0.5
    with mp.Pool(4) as pool:
    result = pool.imap(hypotenuse, df.itertuples(name=False), chunksize=10)
    output = [round(x, 2) for x in result]
    print(output)
    #> [9.43, 5.83, 5.0, 5.66, 11.4]

    Now let’s do a column-by-column paralleling. For this I use df.iteritems()sum_of_squares to pass an entire column as an array to the function.

    现在,让我们逐列进行并行化。 为此,我使用df.iteritems()sum_of_squares将整个列作为数组传递给函数。

    # Column wise Operation
    def sum_of_squares(column):
    return sum([i**2 for i in column[1]])
    with mp.Pool(2) as pool:
    result = pool.imap(sum_of_squares, df.iteritems(), chunksize=10)
    output = [x for x in result]
    print(output)
    #> [163, 147]

    Here is an interesting notebook with which you can compare processing times for multitasking. Please read with pleasure

    这是一个有趣的笔记本,您可以用它来比较多任务处理时间。 请愉快地阅读

    结论 (Conclusion)

    In this article, we have seen the various ways to implement parallel processing using the general procedure and multitasking module. Taking advantage of these benefits is indeed the same for any size machine. Our project will gain speed, depending on the number of processors, when the procedures are followed exactly. I hope such a basic start will be useful for you.

    在本文中,我们已经看到了使用常规过程和多任务模块实现并行处理的各种方法。 对于任何大小的机器,利用这些好处的确是相同的。 完全遵循这些步骤,我们的项目将根据处理器的数量提高速度。 我希望这样的基本开始对您有用。

    If your job requires speed, who wouldn’t want that!

    如果您的工作需要速度,谁会不想要速度!

    翻译自: https://medium.com/@kurt.celsius/5-step-guide-to-parallel-processing-in-python-ac0ecdfcea09

    python 并行处理

    展开全文
  • 并行处理技术ppt文档,文档较为详细的介绍了并行计算发展,处理流程,处理思路以及核心部分等。
  • HEVC并行处理方法

    2018-08-08 11:33:17
    HEVC提供了适用于进行数据并行处理的结构单元,如片和Tile,在不同的片和Tile中,数据信息是相互独立的,这样有利于将其分配给不同的运算单元来处理;对于小于片和Tile的划分单元,HEVC支持波前并行处理(Wavefront ...

    HEVC提供了适用于进行数据并行处理的结构单元,如片和Tile,在不同的片和Tile中,数据信息是相互独立的,这样有利于将其分配给不同的运算单元来处理;对于小于片和Tile的划分单元,HEVC支持波前并行处理(Wavefront Parallel Processing, WPP),这是对于相互具有依赖关系的图像单元进行数据并行处理的方法。在HEVC中,并行处理技术主要包括:基于Tile的并行和波前并行两种。在进行基于Tile的并行时,由于Tile的相互独立性,不需要考虑它们之间的相互依赖关系,而在进行波前并行处理时,数据间的相互依赖关系是必不可少的。

    展开全文
  • pytorch并行处理

    千次阅读 2019-03-06 16:39:50
    pytorch并行处理是指将模型和相应的数据放在多个GPU机器上运行。 假如有4台GPU机器,那么会在每一个GPU机器上拷贝一个模型,然后把数据平均分成四份(若batch_size除不尽,会自动适配) 2.代码 self.device = ...
  • 分布式计算与并行处理 分布式计算与并行处理分布式计算与并行处理分布式计算与并行处理分布式计算与并行处理分布式计算与并行处理分布式计算与并行处理
  • 在生信分析中,经常会遇到不同的重复和处理,这样的分析过程有时是非常费时且占用资源并不是很多的,可以同时在后台运行以节约时间,这是并行处理的意义。除了需要并行处理,循环迭代来遍历整个文件夹的需要分析的...
  • MPP(大规模并行处理)简介

    万次阅读 多人点赞 2018-06-07 15:35:56
    MPP (Massively Parallel Processing),即大规模并行处理,在数据库非共享集群中,每个节点都有独立的磁盘存储系统和内存系统,业务数据根据数据库模型和应用特点划分到各个节点上,每台数据节点通过专用网络或者...
  • 当人们想到数据库引擎中的并行处理时,他们会立即想到通过让多个处理器处理单个查询来缩短查询响应时间。 人们往往会忽略的是,并行处理也可以加快索引处理的速度,从而提高数据库服务器的整体性能。 使用IBM DB2 ...
  • Matlab 并行处理需要特别处理

    千次阅读 2018-11-07 10:52:14
    最近用到matlab做测试和数据处理,想到是否可以多线程处理问题,于是测试了一matlab的并行处理效果。  测试环境,联想E480 , 8250U,4核8线程@1.6G~3.4G,内存8G@DDR24000,256G SSD,winddows 10, matlab 2014a ....
  • Pytorch并行处理机制

    千次阅读 2019-05-15 10:55:29
    本文属转载,原文地址:... Pytorch 的多 GPU 处理接口是 torch.nn.DataParallel(module, device_ids),其中 module 参数是所要执行的模型,而 device_ids 则是指定并行的 GPU id 列表。 而其并行处理机...
  • GPU 图像并行处理

    千次阅读 2018-03-23 16:58:09
    GPGPU、GPU、CUDA ...与CPU相比,GPU具有以下优势:强大的并行处理能力和高效率的数据传输能力。其中,并行性主要体现了指令级、数据级和任务级三个层次。高效率的数据传输主要体现在两个方面: G...
  • 理解Flink并行处理

    2020-04-14 07:53:38
    理解Flink并行处理 ··························· 分布式计算中,Flink 将算子(operator)的 subtask *链接(chain)*成 task【类似于Spark中的Stage阶段】。每个 task 由一个线程执行。...
  • R语言 并行处理

    万次阅读 2015-06-17 18:12:58
    parallel packageR自带的包,可以实现并行处理。library(parallel) detectCores(logical = F) #获得实际核数 cl (getOption("cl.cores", 4)) # 设置并行核数为4 clusterExport(cl=cl, varlist=c("text.var", "ntv", ...
  • 通常我们用freesurfer处理一个数据所用时间需要6个多小时,但实际上电脑的CPU并没有被充分的利用,freesurfer处理一个数据一般只需要一个逻辑内核,所以如果您有10个逻辑内核,可以同时并行处理10个数据,让电脑的...
  • 本文主要研究了多DSP(Digital Signal Processor)并行处理系统的设计,涉及并行 处理系统的体系结构、数字信号处理芯片(DSP)在并行处理中的应用、信号处理算法 并行化的研究等等。本文对组成并行处理系统的三个要素:...
  • Java7之前并行处理数据集合是一件很麻烦的事情:第一步:明确把包含数据的数据结构分成若干子部分;第二步,为每一个子部分分配一个线程;第三步,你需要恰当的时候对他们进行同步来避免不出现竞争的条件;第四步,...
  • Oracle 并行处理

    千次阅读 2013-08-03 17:48:56
    Oracle 并行处理  2013-03-30 12:25:25| 分类: Oracle | 标签:并行  |字号大中小 订阅 在一个串行的执行环境中,由单个进行程或线程负责处理SQL的操作,而且每个动作必须在随后的动作开始...
  • DOS-BAT并行处理

    千次阅读 2016-10-14 10:07:42
    DOS-BAT并行处理
  • 如果站的更高一点来看,我们每台机器都可以是一个处理节点,多台机器并行处理;并行的处理方式可以说无处不在,本文主要来谈谈Java在并行处理方面的努力。 无处不在的并行 Java的垃圾回收器,我们可以看到每一代版本...
  • 并行算法:如何利用并行处理提高算法的执行效率 算法的目的就是为了提高代码执行的效率,当算法无法继续优化的情况下,该如何进一步提高执行效率?如何借助并行计算的处理思想对算法进行改造? 并行排序 给大小为8GB...
  • CUDA和OpenCV图像并行处理方法研究,希望对研究并行计算的朋友们有帮助!
  • 多进程并行处理例子

    千次阅读 2019-07-02 15:26:24
    因为服务器cpu比较多,所以可以进行多进程的并行处理任务,定义了48个进程同时跑,单一进程处理一张图片需要3--5分钟,比较耗时。主要任务是从openimage数据集中分割出自己想要的分割数据集。 code: import os ...
  • 分布式处理 和 并行处理系统 定义

    千次阅读 2014-12-05 15:55:21
    并行处理系统  是利用多个功能部件或多个处理机同时工作来提高系统性能或可靠性的计算机系统,这种系统至少包含指令级或指令级以上的并行。  分布式处理系统  广义上说分布式处理也可以认为是一种并行...
  • 并行处理技术

    2009-12-28 14:39:00
    模型并行性是指在同一时刻或同一时间间隔内完成两种或两种以上性质相同或不相同的工作,只要在时间上互相重叠,...(3)任务处理的并行:是指将程序分解成可以并行处理的多个处理任务,而使两个或多个任务并行处理。...
  • 1.单线程 2.多线程 3.线程池 4.foke/join 5.流处理 这样的一个演变过程提高了cpu的利用率和执行速度 ...如果站的更高一点来看,我们每台机器都可以是一个处理节点,多台机器并行处理;并行的处理...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 30,978
精华内容 12,391
关键字:

并行处理