


    Ideally, the compiler's automatic parallelization would take care of everything. One advantage of OpenMP directives is that they separate the parallelism from the algorithm: the code can be read without worrying about how the parallelization is implemented. For-loops are natural material for parallel processing, and a for-loop that satisfies a few constraints can be parallelized with OpenMP almost effortlessly.


    To compute the Mandelbrot set with automatic parallelization, the code has to be inlined. When the book first applies automatic parallelization, profiling reveals that the work is not evenly distributed across the threads.

    #include <stdio.h>
    #include <stdlib.h>   /* for malloc/free; <malloc.h> is non-standard */
    #define SIZE 4000
    
    /* Escape-time test: returns the iteration count; 1000 means the point
       is treated as inside the Mandelbrot set. */
    int inSet(double ix, double iy)
    {
        int iterations = 0;
        double x = ix, y = iy;
        double x2 = x * x, y2 = y * y;
    
        while ((x2 + y2 < 4) && (iterations < 1000))
        {
            y = 2 * x * y + iy;
            x = x2 - y2 + ix;
            x2 = x * x;
            y2 = y * y;
            iterations++;
        }
    
        return iterations;
    }
    
    int main(void)
    {
        int *matrix[SIZE];
        for (int i = 0; i < SIZE; i++)
        {
            matrix[i] = (int *)malloc(SIZE * sizeof(int));
        }
    
    #pragma omp parallel for
        for (int x = 0; x < SIZE; x++)
        {
            for (int y = 0; y < SIZE; y++)
            {
                double xv = ((double)x - (SIZE / 2)) / (SIZE / 4);
                double yv = ((double)y - (SIZE / 2)) / (SIZE / 4);
                matrix[x][y] = inSet(xv, yv);
            }
        }
    
        /* inSet never returns -7, so nothing is printed; this loop only
           consumes the results so the compiler cannot optimize the
           computation away. */
        for (int x = 0; x < SIZE; x++)
        {
            for (int y = 0; y < SIZE; y++)
            {
                if (matrix[x][y] == -7)
                {
                    printf(" ");
                }
            }
        }
    
        for (int i = 0; i < SIZE; i++)
        {
            free(matrix[i]);
        }
    
        return 0;
    }


        Looking at the fractal image, it is easy to see where the load imbalance comes from: most points lie outside the set and need only a few iterations to classify, while the points inside the set require the full iteration budget.
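The disparity is easy to demonstrate numerically. Below is a small sketch (a direct Python port of the `inSet` routine above) showing that a point outside the set is classified almost instantly while a point inside burns the whole 1000-iteration budget:

```python
def in_set(ix, iy, max_iter=1000):
    # Same escape-time loop as the C inSet() above.
    x, y = ix, iy
    x2, y2 = x * x, y * y
    iterations = 0
    while x2 + y2 < 4 and iterations < max_iter:
        y = 2 * x * y + iy
        x = x2 - y2 + ix
        x2, y2 = x * x, y * y
        iterations += 1
    return iterations

# A point far outside the set escapes immediately...
print(in_set(2.0, 2.0))   # 0 iterations
# ...while a point inside the set uses the full budget.
print(in_set(0.0, 0.0))   # 1000 iterations
```

With a static row-per-thread split, a thread whose rows cross the middle of the image does vastly more work than one handling the edges.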

         Once again OpenMP's near-effortless parallelization shows its value: correcting the load imbalance only requires changing the schedule clause of the parallel loop to use dynamic scheduling. The code below also adds an OpenCV display step:


    #include "Fractal.h"
    #include <Windows.h>
    #include <omp.h>
    
    int Fractal::Iteration(Complex a, Complex c)
    {
    	double maxModulus = 4.0;
    	int maxIter = 256;
    	int iter = 0;
    	
    
    	while ( iter < maxIter && a.modulus() < maxModulus)
    	{
    		a = a * a ;
    		a += c;
    		iter++;
    	}
    	return iter;
    }
    
    cv::Mat Fractal::generateFractalImage(Border border, CvScalar colortab[256] )
    {
    	cv::Size size(500,500);
    
    	double xScale = (border.xMax - border.xMin) / size.width;
    	double yScale = (border.yMax - border.yMin) / size.height;
    
    	cv::Mat img(size, CV_8UC3);
    
    #pragma omp parallel for schedule(dynamic)
    	for (int y=0; y<size.height; y++)
    	{
    		for (int x=0; x<size.width; x++)
    		{
    			double cx = border.xMin + x * xScale;
    			double cy = border.yMin + y * yScale;
    
    			Complex a(0.0, 0.0);
    			Complex c(cx, cy);
    			int nIter = 0;  // default in case type matches neither branch
    
    			if (type == MANDELBROT)
    			{
    				nIter = Iteration(a, c);
    			}
    			else if (type == JUALIA)
    			{
    				nIter = Iteration(c, offset);
    			}
    
    			int colorIndex =  (nIter) % 255;
    
    			cv::Vec3b color;
    			color.val[0] = colortab[colorIndex].val[0];
    			color.val[1] = colortab[colorIndex].val[1];
    			color.val[2] = colortab[colorIndex].val[2];
    			img.at<cv::Vec3b>(y,x) = color;
    		}
    	}
    
    	return img;
    }
    

      The #pragma omp parallel for schedule(dynamic) clause

    The schedule clause:

      schedule(type[, size])

      The type parameter specifies the scheduling kind: static, dynamic, guided, or runtime. Since runtime defers the choice of kind to run time, there are really only three scheduling policies.

      The optional size parameter is an integer giving how many iterations are handed out per scheduling decision. It cannot be used when type is runtime.

    Dynamic scheduling (dynamic)

      Dynamic scheduling assigns iterations to threads based on run-time state: when a thread finishes the work it has been given, it fetches more. Because thread start and completion times are nondeterministic, which iterations end up on which thread cannot be known in advance, unlike with static scheduling.

      Without size, iterations are handed out to the threads one at a time; with size, they are handed out size iterations at a time.
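Purely as an illustration of the idea (this is not OpenMP itself), dynamic scheduling amounts to idle workers pulling the next chunk of `size` iterations from a shared queue until it is empty:

```python
import queue
import threading

def dynamic_schedule(n_iters, n_threads, chunk, body):
    # Fill a queue with [start, stop) chunks; idle threads grab the next one.
    work = queue.Queue()
    for start in range(0, n_iters, chunk):
        work.put((start, min(start + chunk, n_iters)))

    def worker():
        while True:
            try:
                start, stop = work.get_nowait()
            except queue.Empty:
                return          # no work left: this thread is done
            for i in range(start, stop):
                body(i)

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

lock = threading.Lock()
seen = []

def record(i):
    with lock:                  # the demo body just records its index
        seen.append(i)

dynamic_schedule(10, 3, 2, record)
print(sorted(seen))             # every index processed exactly once
```

Which thread processes which chunk varies from run to run, exactly the property described above.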

    Speedup results:

    1. Zoomed view, with dynamic scheduling (screenshot omitted)

    2. Zoom feature without the dynamic schedule: the speedup is roughly in the 3-5x range, which presumably corresponds to the number of CPU cores in my desktop machine (my guess)

    3. Computed image, without the dynamic schedule (screenshot omitted)

    4. Result with dynamic scheduling (screenshot omitted)


    Code: http://download.csdn.net/detail/wangyaninglm/9516035


    References:

    http://baike.baidu.com/view/1777568.htm?fromtitle=Mandelbrot%E9%9B%86%E5%90%88&fromid=1778748&type=syn

    http://www.cnblogs.com/easymind223/archive/2013/01/19/2867620.html

    Gove D. Multicore Application Programming [M]. Posts & Telecom Press (Chinese edition), 2013.

    http://openmp.org/mp-documents/OpenMP3.1-CCard.pdf

    http://blog.csdn.net/gengshenghong/article/details/7000979

  • Python Parallelization

    2020-03-31 12:33:10

    Compute-intensive vs. data-intensive workloads, explained

    Using the futures package

    When the data being processed is small, parallelism is hardly worth considering, but once the task is parallelizable or the files are large, parallelizing the program can bring a substantial performance gain.
    Many kinds of tasks benefit from parallelism: IO-bound work such as crawling and disk reads/writes, CPU-bound computation, and so on. Because of the GIL, Python by default runs on a single thread and cannot directly exploit multi-core hardware, so it is comparatively slow. Python has long provided a series of multithreading and multiprocessing libraries, such as multiprocessing and queue, but they are relatively complex to use and hard to control. After several attempts, I found that futures is just about the most convenient module for turning single-threaded or single-process code into concurrent code.
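A minimal sketch of why threads still help for IO-bound work: the GIL is released while a thread waits on IO, so a ThreadPoolExecutor overlaps the waits even though Python bytecode runs on one thread at a time (for CPU-bound work, ProcessPoolExecutor is the analogous tool). Here `time.sleep` stands in for a network request:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_io(x):
    # Stand-in for a network call: the GIL is released during the sleep,
    # so the four calls overlap instead of running back to back.
    time.sleep(0.2)
    return x * x

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as ex:
    results = list(ex.map(slow_io, range(4)))   # map preserves input order
elapsed = time.perf_counter() - start

print(results)           # [0, 1, 4, 9]
print(elapsed < 0.8)     # roughly one sleep's worth of time, not four
```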

    Using futures:
    #!/usr/bin/env python
    # encoding: utf-8
    from concurrent import futures
    import requests
    
    r = requests.Session()
    
    def func(url):
        data = r.get(url).text
        return data
    
    task_list = ["https://www.163.com", "https://www.zhibo8.cc", "https://www.baidu.com"]
    res = []
    with futures.ThreadPoolExecutor(max_workers=2) as executor:
        # Map each submitted future back to its URL.
        f_tasks = {executor.submit(func, url): url for url in task_list}
        for f in futures.as_completed(f_tasks):
            url = f_tasks[f]
            try:
                data = f.result()
                res.append(data)
                print(f"{url} finished")
            except Exception as exc:
                print(f"{url} went wrong: {exc}")

    A detailed introduction to futures
    Introduction to futures
    Official futures examples

  • Data Parallelism

    2019-04-28 20:29:23

    Contents

    1. Parallelism and concurrency

    2. Parallelizing stream operations

    3. Simulating dice rolls

    4. Performance

    5. Summary


    1. Parallelism and concurrency

    Concurrency is two tasks sharing a period of time; parallelism is two tasks happening at the same instant, for example on a multi-core CPU. If a program runs two tasks and a single CPU gives them different time slices, that is concurrency, not parallelism. The difference between the two is illustrated below (figure omitted):

    Data parallelism means splitting the data into blocks and assigning a separate processing unit to each block. It works well when the same operation must be performed on a large amount of data: the problem is decomposed into a form solvable over multiple blocks of data, the operation is run on each block, and the per-block results are combined to obtain the final answer.
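The split-compute-combine pattern in miniature (a sketch; `chunked` and `partial_sum` are illustrative helpers, not library functions):

```python
from concurrent.futures import ThreadPoolExecutor

def chunked(data, n_chunks):
    # Split step: cut data into n_chunks nearly equal contiguous blocks.
    k, m = divmod(len(data), n_chunks)
    blocks, start = [], 0
    for i in range(n_chunks):
        size = k + (1 if i < m else 0)
        blocks.append(data[start:start + size])
        start += size
    return blocks

def partial_sum(block):
    # Compute step: the same operation applied independently to every block.
    return sum(x * x for x in block)

data = list(range(1, 101))
with ThreadPoolExecutor(max_workers=4) as ex:
    partials = list(ex.map(partial_sum, chunked(data, 4)))

total = sum(partials)  # Combine step: merge the per-block results.
print(total)           # 338350, identical to the serial sum of squares
```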

    2. Parallelizing stream operations

    Parallelizing a stream takes just one method call. If you already have a Stream object, calling its parallel method gives it the ability to operate in parallel; if you want a parallel stream straight from a collection, calling parallelStream yields one immediately.

    Is stream-based code always faster when run in parallel rather than sequentially? No: the speedup a parallel stream delivers depends on the size of the input stream, how the code is written, and the number of cores.

    3. Simulating dice rolls

    If you roll two fair dice and add the face-up values, you get a number from 2 to 12; repeat this N times and compute the probability of each total. The simulation can be parallelized with the following code:

        public Map<Integer, Double> parallelDiceRolls() {
            double fraction = 1.0 / N;
            return IntStream.range(0, N)
                            .parallel()
                            .mapToObj(twoDiceThrows())
                            .collect(groupingBy(side -> side, summingDouble(n -> fraction)));
        }
    
        private IntFunction<Integer> twoDiceThrows() {
            return i -> {
                ThreadLocalRandom random = ThreadLocalRandom.current();
                int firstThrow = random.nextInt(1, 7);
                int secondThrow = random.nextInt(1, 7);
                return firstThrow + secondThrow;
            };
        }
    
        public static void main(String[] args) {
            ManualDiceRolls rolls = new ManualDiceRolls();
            Map<Integer, Double> result = rolls.parallelDiceRolls();
            result.entrySet().forEach(System.out::println);
        }

    In the code above, IntStream.range creates a stream of size N and parallel switches it to parallel operation. The twoDiceThrows function simulates two consecutive dice throws and returns the sum of the two rolls; mapToObj applies this function over the stream, producing a stream of all the results to be combined. groupingBy merges equal totals, and summingDouble maps each occurrence to 1/N and adds them up, so the resulting Map<Integer, Double> maps each total to its probability. This is a good case for parallelization, one where it actually delivers a speedup.
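For comparison, the same Monte-Carlo estimate can be sketched in a few lines of Python (the seed and sample count are arbitrary choices for this sketch):

```python
import random
from collections import Counter

def dice_distribution(n, seed=42):
    # Roll two dice n times; return {total: estimated probability}.
    rng = random.Random(seed)   # seeded so the sketch is repeatable
    counts = Counter(rng.randint(1, 6) + rng.randint(1, 6) for _ in range(n))
    return {total: c / n for total, c in counts.items()}

dist = dice_distribution(100_000)
print(max(dist, key=dist.get))   # 7, the most likely total
print(dist[7])                   # close to 1/6 ~= 0.1667
```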

    The code below implements the dice simulation in parallel by hand. As it shows, most of the code deals with scheduling and waiting for tasks in the thread pool to complete, none of which the programmer has to manage when using parallel streams.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.ThreadLocalRandom;
    
    /**
     * Fair dice rolls
     *
     * @author: xuzongxin
     * @date: 2019/4/21 23:07
     * @description:
     */
    public class ManualDiceRolls {
        private static final int N = 1000000000;
    
        private final double fraction;
        private final Map<Integer, Double> results;
        private final int numberOfThreads;
        private final ExecutorService executor;
        private final int workPerThread;
    
        public ManualDiceRolls() {
            this.fraction = 1.0 / N;
            this.results = new ConcurrentHashMap<>();
            this.numberOfThreads = Runtime.getRuntime().availableProcessors();
            this.executor = Executors.newFixedThreadPool(numberOfThreads);
            this.workPerThread = N / numberOfThreads;
        }
    
        public void simulateDiceRoles() {
            List<Future<?>> futures = submitJobs();
            awaitCompletion(futures);
            printResults();
        }
    
        private void printResults() {
            results.entrySet().forEach(System.out::println);
        }
    
        private List<Future<?>> submitJobs() {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < numberOfThreads; i++) {
                futures.add(executor.submit(makeJob()));
            }
            return futures;
        }
    
        private Runnable makeJob() {
            return () -> {
                ThreadLocalRandom random = ThreadLocalRandom.current();
                for (int i = 0; i < workPerThread; i++) {
                    int entry = twoDiceThrows(random);
                    accumulateResult(entry);
                }
            };
        }
    
        /**
         * compute works like put here: it atomically stores the updated value
         *
         * @param entry
         */
        private void accumulateResult(int entry) {
            results.compute(entry, (key, previous) -> previous == null ? fraction : previous + fraction);
        }
    
        private int twoDiceThrows(ThreadLocalRandom random) {
            int firstThrow = random.nextInt(1, 7);
            int secondThrow = random.nextInt(1, 7);
            return firstThrow + secondThrow;
        }
    
    
        private void awaitCompletion(List<Future<?>> futures) {
            futures.forEach(future -> {
                try {
                    future.get();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            executor.shutdown();
        }
    
    
        public static void main(String[] args) {
            ManualDiceRolls rolls = new ManualDiceRolls();
            rolls.simulateDiceRoles();
        }
    }

    The output is:

    2=0.02777869699056155
    3=0.05555046997594372
    4=0.08331533196132952
    5=0.1111279239466902
    6=0.13889180693207653
    7=0.1666550339174632
    8=0.13889050193207722
    9=0.11112350594669253
    10=0.08334383096131452
    11=0.05554903997594447
    12=0.0277738569905641

    Note: when the streams framework processes work in parallel, avoid holding locks; the framework performs any needed synchronization itself, so there is no reason to lock your own data structures. The parallel method converts a stream to a parallel stream and sequential produces a sequential one, but when the stream is evaluated it is in exactly one of the two modes: either parallel or sequential, never both.

    4. Performance

    Five main factors influence parallel stream performance, analyzed in turn:

    1. Data size: the size of the input affects the speedup parallelization can deliver; it only pays off when there is enough data and each element takes long enough to process.
    2. Source data structure: every pipeline operates on an initial data source, usually a collection. Sources differ in how easily they can be split, and that splitting cost limits how much speedup parallel processing of the pipeline can bring.
    3. Boxing: processing primitive types is faster than processing boxed types.
    4. Number of cores: in the extreme case of a single core, there is no point parallelizing at all.
    5. Per-element processing cost: as with data size, this is a battle between time spent in parallel execution and the overhead of splitting and merging. The longer each element of the stream takes to process, the clearer the gain from going parallel.

    Under the hood, parallel streams still use the fork/join framework: fork recursively splits the problem, the pieces execute in parallel, and join merges the results into the final value. Consider the following parallel summation code:

    private int addIntegers(List<Integer> values) {
         return values.parallelStream().mapToInt(i -> i).sum();
    }

    The figure below illustrates the operation performed by the code above (figure omitted).

    Suppose the parallel stream splits our work to run in parallel on a four-core machine:

    1. The data is split into four blocks.
    2. The computational work runs in parallel in each thread. This includes unboxing each Integer object to an int, then adding up a quarter of the numbers in each thread. Ideally we want to spend as much time here as possible, since this is where parallelism pays off.
    3. The results are then merged.

    Because of how the problem is decomposed, the characteristics of the initial data source become critical: they govern how well it splits. By splitting performance, the common data structures in the core library fall into three groups:

    • Good: ArrayList, arrays, or IntStream.range. These support random access and can be split cheaply at arbitrary points.
    • Moderate: HashSet and TreeSet. These cannot easily be split into exactly equal parts, but most of the time splitting is possible.
    • Poor: some data structures are hard to split and may take O(N) time to decompose. Among them are LinkedList, which is hard to split in half, and Streams.iterate and BufferedReader.lines, whose lengths are unknown in advance, making it hard to predict where to split.

    Turning to the individual per-element operations in a stream, they divide into two kinds: stateless and stateful. Stateless operations need maintain no state across the run; stateful operations carry the overhead and constraints of maintaining state. Avoiding stateful operations in favor of stateless ones yields better parallel performance. Stateless operations include map, filter, and flatMap; stateful ones include sorted, distinct, and limit.
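The distinction can be demonstrated directly in a short sketch: a stateless operation applied chunk-by-chunk and concatenated equals the global result, while a stateful one (sorting) additionally needs a merge across chunks, and that merge is precisely the stateful overhead:

```python
def split(data, k):
    # Cut data into k contiguous chunks.
    n = (len(data) + k - 1) // k
    return [data[i:i + n] for i in range(0, len(data), n)]

data = [5, 3, 8, 1, 9, 2, 7, 4]
chunks = split(data, 2)

# Stateless (map): per-chunk results concatenate to the global result.
mapped = [x * 2 for c in chunks for x in c]
print(mapped == [x * 2 for x in data])    # True

# Stateful (sorted): per-chunk results do NOT concatenate correctly;
# an extra merge across chunks would still be required.
chunk_sorted = [x for c in chunks for x in sorted(c)]
print(chunk_sorted == sorted(data))       # False
```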

    5. Summary

    • Data parallelism splits the work up and executes it simultaneously on multiple CPU cores.
    • With stream-based code, data parallelism is a parallel or parallelStream call away.
    • The five factors affecting performance are: data size, source data structure, whether values are boxed, the number of available CPU cores, and the processing time per element.

  • Parallel Programming in Java

    2018-04-03 11:09:57

    Overview

    When writing algorithms I often want to parallelize steps that have no dependencies between each other, since that reduces the algorithm's running time; I ran into exactly this problem recently while working on an algorithm. Here is a simple way to parallelize Java code.

    Code

    The test program is below; all you need to do is put the work you want parallelized inside compute.

    import utility.Parallel;
    
    public class ParallelTest {
    
        public static void main(String[] args) {
            // run in parallel
            testParallel();
    
        }
        public static void testParallel()
        {
    
            Parallel.loop(100, new Parallel.LoopInt()
            {
                // the work to be parallelized
                public void compute(int i)
                {
                    System.out.println("i:"+i);
                }
            });
        }
    }
    

    Execution result

    (console output screenshot omitted)

    The Parallel class

    Here is the class that does the underlying work; it was written by Dave Hale (see the author tag in the Javadoc below).

    package utility;
    
    import java.util.Collection;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveAction;
    import java.util.concurrent.RecursiveTask;
    
    /**
     * Utilities for parallel computing in loops over independent tasks. This class
     * provides convenient methods for parallel processing of tasks that involve
     * loops over indices, in which computations for different indices are
     * independent.
     * <p>
     * As a simple example, consider the following function that squares floats in
     * one array and stores the results in a second array.
     * 
     * <pre>
     * <code>
     * static void sqr(float[] a, float[] b) {
     *   int n = a.length;
     *   for (int i=0; i&lt;n; ++i)
     *     b[i] = a[i]*a[i];
     * }
     * </code>
     * </pre>
     * 
     * A serial version of a similar function for 2D arrays is:
     * 
     * <pre>
     * <code>
     * static void sqrSerial(float[][] a, float[][] b) 
     * {
     *   int n = a.length;
     *   for (int i=0; i&lt;n; ++i) {
     *     sqr(a[i],b[i]);
     * }
     * </code>
     * </pre>
     * 
     * Using this class, the parallel version for 2D arrays is:
     * 
     * <pre>
     * <code>
     * static void sqrParallel(final float[][] a, final float[][] b) {
     *   int n = a.length;
     *   Parallel.loop(n,new Parallel.LoopInt() {
     *     public void compute(int i) {
     *       sqr(a[i],b[i]);
     *     }
     *   });
     * }
     * </code>
     * </pre>
     * 
     * In the parallel version, the method {@code compute} defined by the interface
     * {@code LoopInt} will be called n times for different indices i in the range
     * [0,n-1]. The order of indices is both indeterminant and irrelevant because
     * the computation for each index i is independent. The arrays a and b are
     * declared final as required for use in the implementation of {@code LoopInt}.
     * <p>
     * Note: because the method {@code loop} and interface {@code LoopInt} are
     * static members of this class, we can omit the class name prefix
     * {@code Parallel} if we first import these names with
     * 
     * <pre>
     * <code>
     * import static edu.mines.jtk.util.Parallel.*;
     * </code>
     * </pre>
     * 
     * A similar method facilitates tasks that reduce a sequence of indexed values
     * to one or more values. For example, given the following method:
     * 
     * <pre>
     * <code>
     * static float sum(float[] a) {
     *   int n = a.length;
     *   float s = 0.0f;
     *   for (int i=0; i&lt;n; ++i)
     *     s += a[i];
     *   return s;
     * }
     * </code>
     * </pre>
     * 
     * serial and parallel versions for 2D arrays may be written as:
     * 
     * <pre>
     * <code>
     * static float sumSerial(float[][] a) {
     *   int n = a.length;
     *   float s = 0.0f;
     *   for (int i=0; i&lt;n; ++i)
     *     s += sum(a[i]);
     *   return s;
     * }
     * </code>
     * </pre>
     * 
     * and
     * 
     * <pre>
     * <code>
     * static float sumParallel(final float[][] a) {
     *   int n = a.length;
     *   return Parallel.reduce(n,new Parallel.ReduceInt&lt;Float&gt;() {
     *     public Float compute(int i) {
     *       return sum(a[i]);
     *     }
     *     public Float combine(Float s1, Float s2) {
     *       return s1+s2;
     *     }
     *   });
     * }
     * </code>
     * </pre>
     * 
     * In the parallel version, we implement the interface {@code ReduceInt} with
     * two methods, one to {@code compute} sums of array elements and another to
     * {@code combine} two such sums together. The same pattern works for other
     * reduce operations. For example, with similar functions we could compute
     * minimum and maximum values (in a single reduce) for any indexed sequence of
     * values.
     * <p>
     * More general loops are supported, and are equivalent to the following serial
     * code:
     * 
     * <pre>
     * <code>
     * for (int i=begin; i&lt;end; i+=step)
     *   // some computation that depends on i
     * </code>
     * </pre>
     * 
     * The methods loop and reduce require that begin is less than end and that step
     * is positive. The requirement that begin is less than end ensures that reduce
     * is always well-defined. The requirement that step is positive ensures that
     * the loop terminates.
     * <p>
     * Static methods loop and reduce submit tasks to a fork-join framework that
     * maintains a pool of threads shared by all users of these methods. These
     * methods recursively split tasks so that disjoint sets of indices are
     * processed in parallel by different threads.
     * <p>
     * In addition to the three loop parameters begin, end, and step, a fourth
     * parameter chunk may be specified. This chunk parameter is a threshold for
     * splitting tasks so that they can be performed in parallel. If a range of
     * indices to be processed is smaller than the chunk size, or if too many tasks
     * have already been queued for processing, then the indices are processed
     * serially. Otherwise, the range is split into two parts for processing by new
     * tasks. If specified, the chunk size is a lower bound; the number of indices
     * processed serially will never be lower, but may be higher, than a specified
     * chunk size. The default chunk size is one.
     * <p>
     * The default chunk size is often sufficient, because the test for an excess
     * number of queued tasks prevents tasks from being split needlessly. This test
     * is especially useful when parallel loops are nested, as when looping over
     * elements of multi-dimensional arrays.
     * <p>
     * For example, an implementation of the method {@code sqrParallel} for 3D
     * arrays could simply call the 2D version listed above. Tasks will naturally
     * tend to be split for outer loops, but not inner loops, thereby reducing
     * overhead, time spent splitting and queueing tasks.
     * <p>
     * Reference: A Java Fork/Join Framework, by Doug Lea, describes the framework
     * used to implement this class. This framework will be part of JDK 7.
     * 
     * @author Dave Hale, Colorado School of Mines
     * @version 2010.11.23
     */
    public class Parallel
    {
    
        /** A loop body that computes something for an int index. */
        public interface LoopInt
        {
    
            /**
             * Computes for the specified loop index.
             * 
             * @param i
             *            loop index.
             */
            public void compute(int i);
        }
    
        /** A loop body that computes and returns a value for an int index. */
        public interface ReduceInt<V>
        {
    
            /**
             * Returns a value computed for the specified loop index.
             * 
             * @param i
             *            loop index.
             * @return the computed value.
             */
            public V compute(int i);
    
            /**
             * Returns the combination of two specified values.
             * 
             * @param v1
             *            a value.
             * @param v2
             *            a value.
             * @return the combined value.
             */
            public V combine(V v1, V v2);
        }
    
        /**
         * A wrapper for objects that are not thread-safe. Such objects have methods
         * that cannot safely be executed concurrently in multiple threads. To use
         * an unsafe object within a parallel computation, first construct an
         * instance of this wrapper. Then, within the compute method, get the unsafe
         * object; if null, construct and set a new unsafe object in this wrapper,
         * before using the unsafe object to perform the computation. This pattern
         * ensures that each thread computes using a distinct unsafe object. For
         * example,
         * 
         * <pre>
         * <code>
         * final Parallel.Unsafe&lt;Worker&gt; nts = new Parallel.Unsafe&lt;Worker&gt;();
         * Parallel.loop(count,new Parallel.LoopInt() {
         *   public void compute(int i) {
         *     Worker w = nts.get(); // get worker for the current thread
         *     if (w==null) nts.set(w=new Worker()); // if null, make one
         *     w.work(); // the method work need not be thread-safe
         *   }
         * });
         * </code>
         * </pre>
         * 
         * This wrapper is most useful when (1) the cost of constructing an unsafe
         * object is high, relative to the cost of each call to compute, and (2) the
         * number of threads calling compute is significantly lower than the total
         * number of such calls. Otherwise, if either of these conditions is false,
         * then simply construct a new unsafe object within the compute method.
         * <p>
         * This wrapper works much like the Java standard class ThreadLocal, except
         * that an object within this wrapper can be garbage-collected before its
         * thread dies. This difference is important because fork-join worker
         * threads are pooled and will typically die only when a program ends.
         */
        public static class Unsafe<T>
        {
    
            /**
             * Constructs a wrapper for objects that are not thread-safe.
             */
            public Unsafe()
            {
                int initialCapacity = 16; // the default initial capacity
                float loadFactor = 0.5f; // huge numbers of threads are unlikely
                int concurrencyLevel = 2 * _pool.getParallelism();
                _map = new ConcurrentHashMap<Thread, T>(initialCapacity,
                    loadFactor, concurrencyLevel);
            }
    
            /**
             * Gets the object in this wrapper for the current thread.
             * 
             * @return the object; null, of not yet set for the current thread.
             */
            public T get()
            {
                return _map.get(Thread.currentThread());
            }
    
            /**
             * Sets the object in this wrapper for the current thread.
             * 
             * @param object
             *            the object.
             */
            public void set(T object)
            {
                _map.put(Thread.currentThread(), object);
            }
    
            /**
             * Returns a collection of all unsafe objects in this wrapper. This
             * method is useful only after parallel loops have ended.
             * 
             * @return the collection of unsafe objects.
             */
            public Collection<T> getAll()
            {
                return _map.values();
            }
    
            private final ConcurrentHashMap<Thread, T> _map;
        }
    
        /**
         * Performs a loop <code>for (int i=0; i&lt;end; ++i)</code>.
         * 
         * @param end
         *            the end index (not included) for the loop.
         * @param body
         *            the loop body.
         */
        public static void loop(int end, LoopInt body)
        {
            loop(0, end, 1, 1, body);
        }
    
        /**
         * Performs a loop <code>for (int i=begin; i&lt;end; ++i)</code>.
         * 
         * @param begin
         *            the begin index for the loop; must be less than end.
         * @param end
         *            the end index (not included) for the loop.
         * @param body
         *            the loop body.
         */
        public static void loop(int begin, int end, LoopInt body)
        {
            loop(begin, end, 1, 1, body);
        }
    
        /**
         * Performs a loop <code>for (int i=begin; i&lt;end; i+=step)</code>.
         * 
         * @param begin
         *            the begin index for the loop; must be less than end.
         * @param end
         *            the end index (not included) for the loop.
         * @param step
         *            the index increment; must be positive.
         * @param body
         *            the loop body.
         */
        public static void loop(int begin, int end, int step, LoopInt body)
        {
            loop(begin, end, step, 1, body);
        }
    
        /**
         * Performs a loop <code>for (int i=begin; i&lt;end; i+=step)</code>.
         * 
         * @param begin
         *            the begin index for the loop; must be less than end.
         * @param end
         *            the end index (not included) for the loop.
         * @param step
         *            the index increment; must be positive.
         * @param chunk
         *            the chunk size; must be positive.
         * @param body
         *            the loop body.
         */
        public static void loop(int begin, int end, int step, int chunk,
            LoopInt body)
        {
            checkArgs(begin, end, step, chunk);
            if (_serial || end <= begin + chunk * step) {
                for (int i = begin; i < end; i += step) {
                    body.compute(i);
                }
            }
            else {
                LoopIntAction task = new LoopIntAction(begin, end, step, chunk,
                    body);
                if (LoopIntAction.inForkJoinPool()) {
                    task.invoke();
                }
                else {
                    _pool.invoke(task);
                }
            }
        }
    
        /**
         * Performs a reduce <code>for (int i=0; i&lt;end; ++i)</code>.
         * 
         * @param end
         *            the end index (not included) for the loop.
         * @param body
         *            the loop body.
         * @return the computed value.
         */
        public static <V> V reduce(int end, ReduceInt<V> body)
        {
            return reduce(0, end, 1, 1, body);
        }
    
        /**
         * Performs a reduce <code>for (int i=begin; i&lt;end; ++i)</code>.
         * 
         * @param begin
         *            the begin index for the loop; must be less than end.
         * @param end
         *            the end index (not included) for the loop.
         * @param body
         *            the loop body.
         * @return the computed value.
         */
        public static <V> V reduce(int begin, int end, ReduceInt<V> body)
        {
            return reduce(begin, end, 1, 1, body);
        }
    
        /**
         * Performs a reduce <code>for (int i=begin; i&lt;end; i+=step)</code>.
         * 
         * @param begin
         *            the begin index for the loop; must be less than end.
         * @param end
         *            the end index (not included) for the loop.
         * @param step
         *            the index increment; must be positive.
         * @param body
         *            the loop body.
         * @return the computed value.
         */
        public static <V> V reduce(int begin, int end, int step, ReduceInt<V> body)
        {
            return reduce(begin, end, step, 1, body);
        }
    
        /**
         * Performs a reduce <code>for (int i=begin; i&lt;end; i+=step)</code>.
         * 
         * @param begin
         *            the begin index for the loop; must be less than end.
         * @param end
         *            the end index (not included) for the loop.
         * @param step
         *            the index increment; must be positive.
         * @param chunk
         *            the chunk size; must be positive.
         * @param body
         *            the loop body.
         * @return the computed value.
         */
        public static <V> V reduce(int begin, int end, int step, int chunk,
            ReduceInt<V> body)
        {
            checkArgs(begin, end, step, chunk);
            if (_serial || end <= begin + chunk * step) {
                V v = body.compute(begin);
                for (int i = begin + step; i < end; i += step) {
                    V vi = body.compute(i);
                    v = body.combine(v, vi);
                }
                return v;
            }
            else {
                ReduceIntTask<V> task = new ReduceIntTask<V>(begin, end, step,
                    chunk, body);
                if (ReduceIntTask.inForkJoinPool()) {
                    return task.invoke();
                }
                else {
                    return _pool.invoke(task);
                }
            }
        }
    
        /**
         * Enables or disables parallel processing by all methods of this class. By
         * default, parallel processing is enabled. If disabled, all tasks will be
         * executed on the current thread.
         * <p>
         * <em>Setting this flag to false disables parallel processing for all
         * users of this class.</em> This method should therefore be used for
         * testing and benchmarking only.
         * 
         * @param parallel
         *            true, for parallel processing; false, otherwise.
         */
        public static void setParallel(boolean parallel)
        {
            _serial = !parallel;
        }
    
        // /
        // private
    
        // Implementation notes:
        // Each fork-join task below has a range of indices to be processed.
        // If the range is less than or equal to the chunk size, or if the
        // queue for the current thread holds too many tasks already, then
        // simply process the range on the current thread. Otherwise, split
        // the range into two parts that are approximately equal, ensuring
        // that the left part is at least as large as the right part. If the
        // right part is not empty, fork a new task. Then compute the left
        // part in the current thread, and, if necessary, join the right part.
    
        // Threshold for number of surplus queued tasks. Used below to
        // determine whether or not to split a task into two subtasks.
        private static final int NSQT = 6;
    
        // The pool shared by all fork-join tasks created through this class.
        private static ForkJoinPool _pool = new ForkJoinPool();
    
        // Serial flag; true for no parallel processing.
        private static boolean _serial = false;
    
        /**
         * Checks loop arguments.
         */
        private static void checkArgs(int begin, int end, int step, int chunk)
        {
            argument(begin < end, "begin<end");
            argument(step > 0, "step>0");
            argument(chunk > 0, "chunk>0");
        }
    
        public static void argument(boolean condition, String message)
        {
            if (!condition)
                throw new IllegalArgumentException("required condition: " + message);
        }
    
        /**
         * Splits range [begin:end) into [begin:middle) and [middle:end). The
         * returned middle index equals begin plus an integer multiple of step.
         * For example, middle(0, 10, 3) returns 6, splitting the step-3
         * indices {0, 3, 6, 9} into {0, 3} and {6, 9}.
         */
        private static int middle(int begin, int end, int step)
        {
            return begin + step + ((end - begin - 1) / 2) / step * step;
        }
    
        /**
         * Fork-join task for parallel loop.
         */
        private static class LoopIntAction
            extends RecursiveAction
        {
            LoopIntAction(int begin, int end, int step, int chunk, LoopInt body)
            {
                assert begin < end : "begin < end";
                _begin = begin;
                _end = end;
                _step = step;
                _chunk = chunk;
                _body = body;
            }
    
            @Override
            protected void compute()
            {
                if (_end <= _begin + _chunk * _step
                    || getSurplusQueuedTaskCount() > NSQT) {
                    for (int i = _begin; i < _end; i += _step) {
                        _body.compute(i);
                    }
                }
                else {
                    int middle = middle(_begin, _end, _step);
                    LoopIntAction l = new LoopIntAction(_begin, middle, _step,
                        _chunk, _body);
                    LoopIntAction r = (middle < _end) ? new LoopIntAction(middle,
                        _end, _step, _chunk, _body) : null;
                    if (r != null)
                        r.fork();
                    l.compute();
                    if (r != null)
                        r.join();
                }
            }
    
            private final int _begin, _end, _step, _chunk;
            private final LoopInt _body;
        }
    
        /**
         * Fork-join task for parallel reduce.
         */
        private static class ReduceIntTask<V>
            extends RecursiveTask<V>
        {
            ReduceIntTask(int begin, int end, int step, int chunk, ReduceInt<V> body)
            {
                assert begin < end : "begin < end";
                _begin = begin;
                _end = end;
                _step = step;
                _chunk = chunk;
                _body = body;
            }
    
            @Override
            protected V compute()
            {
                if (_end <= _begin + _chunk * _step
                    || getSurplusQueuedTaskCount() > NSQT) {
                    V v = _body.compute(_begin);
                    for (int i = _begin + _step; i < _end; i += _step) {
                        V vi = _body.compute(i);
                        v = _body.combine(v, vi);
                    }
                    return v;
                }
                else {
                    int middle = middle(_begin, _end, _step);
                    ReduceIntTask<V> l = new ReduceIntTask<V>(_begin, middle,
                        _step, _chunk, _body);
                    ReduceIntTask<V> r = (middle < _end) ? new ReduceIntTask<V>(
                        middle, _end, _step, _chunk, _body) : null;
                    if (r != null)
                        r.fork();
                    V v = l.compute();
                    if (r != null)
                        v = _body.combine(v, r.join());
                    return v;
                }
            }
    
            private final int _begin, _end, _step, _chunk;
            private final ReduceInt<V> _body;
        }
    }
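The divide-and-conquer strategy described in the implementation notes above can be seen in isolation in the following standalone sketch. It sums the integers in a range with the same split-or-compute logic as `ReduceIntTask`: process the range directly when it is at or below the chunk size or when the current thread's queue already holds a surplus of tasks, otherwise split it, fork the right half, compute the left half in the current thread, and join. The class name `RangeSumDemo` and the constant values are illustrative, not part of the class above.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Standalone sketch of the ReduceIntTask split-or-compute pattern:
// sums i over [begin,end), step 1, combining partial sums with +.
public class RangeSumDemo {
    static class SumTask extends RecursiveTask<Long> {
        private static final int CHUNK = 1024; // illustrative chunk size
        private static final int NSQT = 6;     // surplus-queued-task threshold

        private final int begin, end;

        SumTask(int begin, int end) {
            this.begin = begin;
            this.end = end;
        }

        @Override
        protected Long compute() {
            if (end - begin <= CHUNK || getSurplusQueuedTaskCount() > NSQT) {
                // Small range, or queue already full: compute serially.
                long s = 0;
                for (int i = begin; i < end; ++i)
                    s += i;
                return s;
            }
            int middle = begin + (end - begin) / 2;
            SumTask left = new SumTask(begin, middle);
            SumTask right = new SumTask(middle, end);
            right.fork();            // run right half asynchronously
            long v = left.compute(); // compute left half in this thread
            return v + right.join(); // combine, as ReduceInt.combine would
        }
    }

    public static void main(String[] args) {
        long sum = new ForkJoinPool().invoke(new SumTask(0, 1_000_000));
        System.out.println(sum); // 499999500000
    }
}
```

Checking `getSurplusQueuedTaskCount()` before splitting is the key load-balancing idea: once a worker has queued more subtasks than its peers are likely to steal, further splitting only adds overhead, so it falls back to serial computation.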

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 16,049
精华内容 6,419
关键字:

并行化