精华内容
下载资源
问答
  • 多线程换IP源码(调用鱼刺线程池) cc971113
  • 资源介绍:多线程换IP源码(调用鱼刺线程池)资源作者:cc971113资源界面:资源下载:
  • AgentGo 这是一个兼容 GTP 协议的围棋 AI 工具。 'master' 分支只使用蒙特卡洛方法,速度快且稳定。 “uct_WSY_final”分支保存了我们作为最终课程项目提交... 它比“主”分支的 AI 慢得,但要聪明得。 玩得开心!
  • CC++多线程文件传输v2.2完整源码(断点续传 网络消息收发 高效率 界面与后台线程交互)
  • c# 多线程2 源码

    2011-07-12 13:25:27
    public String ruleForSale(int num, int money)//同步的方法(同时对线程可见的方法)同时只能有1个线程运行 { String s = null; if (memontoes ) return "对不起,已经售完"; if (money == 5) { ...
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    
    namespace ConsoleApplication1
    {
        class Test
        {
            static SalesLady saleslady = new SalesLady(14, 0, 0);
            static EventWaitHandle wh = new AutoResetEvent(false);
            //static void Main()
            //{
            //    int[] moneies = { 10, 10, 5, 10, 5, 10, 5, 5, 10, 5, 10, 5, 5, 10, 5 };
            //    Thread[] aThreadArray = new Thread[20];
            //    Console.WriteLine("现在开始售票:");
            //    for (int i = 0; i < moneies.Length; i++)
            //    {
            //        CustomerClass cc = new CustomerClass(i + 1, moneies[i]);
            //        aThreadArray[i] = new Thread(cc.run);
            //        aThreadArray[i].Start();
            //        // Thread.Sleep(0);//当前线程让出时间片 如果有上个被唤醒的线程 让其先执行
            //    }
            //    for (int i = 0; i < moneies.Length; i++)
            //    {
            //        try
            //        {
            //            aThreadArray[i].Join();
            //        }
            //        catch (Exception e)
            //        {
            //            Console.WriteLine(e.ToString());
            //        }
            //    }
            //    Console.WriteLine("票已售完" + saleslady.memontoes);
            //    Console.Read();
            //}
    
    
            class SalesLady
            {
                public int memontoes, five, ten;
                public SalesLady(int m, int f, int t)
                {
                    memontoes = m;
                    five = f;
                    ten = t;
                }
                public String ruleForSale(int num, int money)//同步的方法(同时对多个线程可见的方法)同时只能有1个线程运行
                {
                     
                    String s = null;
                    if (memontoes <= 0)
                        return "对不起,已经售完";
                    if (money == 5)
                    {
                        memontoes--;
                        five++;
                        s = "给你票,你的钱正好。";
                        //notifyAll();   
                        wh.Set(); //唤醒
                    }
                    else if (money == 10)
                    {
                        while (five < 1)
                        {
                            try
                            {
                                Console.WriteLine("" + num + "号顾客用10元购票,请等待");
                                //wait();
                                wh.WaitOne(); //
                                Thread.Sleep(1);
                            }
                            catch (Exception e)
                            {
                                Console.WriteLine(e.ToString());
                            }
                            // 如果你的线程能够运行到这里,那么一定有一个five,此时就看哪个线程先被执行了
                            // 因为是同步方法,其中获得运行权利的线程,必须运行结束才会让其它的线程运行
                            // 所以当再次判断时,那个finve又没有了。
                            // 不会出现没有five却找零的问题。
                        }
                        // 如果你的线程能够运行到这里,那么其一定有一个five,
                        // 所以这个线程不会出现没有five而找零的问题
                        if (memontoes <= 0)
                        {
                            return "对不起,已经售完";
                        }
                        memontoes--;
                        five -= 1;
                        ten++;
                        s = "给你票,找你5元。";
                    }
                     
                    return s;
                }
            }
            // 顾客
            class CustomerClass
            {
                int num, money;
                public void run()
                {
                    Console.WriteLine("我是" + num + "号顾客,用" + money + "元购票,售票员说:"
                        + saleslady.ruleForSale(num, money));
    
                }
                public CustomerClass(int n, int m)
                {
                    num = n;
                    money = m;
                }
            }
        }
    }
    


    展开全文
  • 蚂蚁森林自动收集能量这个项目并不大,喜欢玩它的人也不,但是核心的一些实现方式很有推广意义。 本文主要解决三个问题 1. 如何准确的点击能量球 2. 如何判断好友是否有能量 3. 如何通过代码打开无障碍服务 一、...
  • 具有多线程(CPU)和OpenCL(GPU)的基于ZNCC的Depthmapping 基于GPU的计算的示例用法: $ ./zncc --use-gpu la 26.5.2018 10.32.04 +0300 :: maxdisp = 64; winsize = 09; thres = 08; nhood = 08, t_sg = 0....
  • 使用libcurl多线程下载的大文件的基本思想: 首选打开文件,将文件等分为指定的片段,使用http range下载,一个线程下载一个片段,当线程下载片段时,它们将数据写到打开文件的指定位置,类似BT文件下载的方式...
    使用libcurl多线程下载大文件的基本思想:
    
    首选打开文件,将文件等分为指定的片段,使用http range下载,一个线程下载一个片段,当线程下载片段时,它们将数据写到打开文件的指定位置,类似BT文件下载的方式(这样片段下载完成后不用再合并),当所有的子线程下载完成后,这个大文件也就随之下载完成了。
    下面是相关源码:
    //g++ -g curl_multithread_demo1.cpp -o curl_multithread_demo1 -lcurl -lpthread
    //./curl_multithread_demo1
    //说明: 该程序使用指定的线程数N来下载一个大文件,将该大文件等分为N+1个分片,每个线程一个分片, 使用range请求.下载完成,子线程退出, 线程数减一
    //主线程等到所有的子线程都退出后, 意味着文件下载完成,就关闭文件等待退出.每个线程下载的数据存放在文件的指定位置, 1个master线程, N+1个work线程
    //为啥使用互斥锁?因为它包含线程的计数, 文件的写入.这里对文件的读写比较赞, 虽然是分开下载,但是对文件没有单独存放,省去了最后的合并过程.
    //
    
    #include <iostream>
    #include <string>
    #include <unistd.h>
    #include <pthread.h>
    #include <curl/curl.h>
    
    using namespace std;
    
    struct tNode
    {
    	FILE *fp;
    	long startPos;
    	long endPos;
    	void *curl;
    	pthread_t tid;
    };
    
    int threadCnt = 0;
    static pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;
    
    static size_t writeFunc (void *ptr, size_t size, size_t nmemb, void *userdata)
    {
    	tNode *node = (tNode *) userdata;
    	size_t written = 0;
    	pthread_mutex_lock (&g_mutex);
    	if (node->startPos + size * nmemb <= node->endPos)
    	{
    		fseek (node->fp, node->startPos, SEEK_SET);
    		written = fwrite (ptr, size, nmemb, node->fp);
    		node->startPos += size * nmemb;
    	}
    	else
    	{
    		fseek (node->fp, node->startPos, SEEK_SET);
    		written = fwrite (ptr, 1, node->endPos - node->startPos + 1, node->fp);
    		node->startPos = node->endPos;
    	}
    	pthread_mutex_unlock (&g_mutex);
    	return written;
    }
    
    int progressFunc (void *ptr, double totalToDownload, double nowDownloaded, double totalToUpLoad, double nowUpLoaded)
    {
    	int percent = 0;
    	if (totalToDownload > 0)
    	{
    		percent = (int) (nowDownloaded / totalToDownload * 100);
    	}
    
        if(percent % 20 == 0)
    	    printf ("下载进度%0d%%\n", percent);
    	return 0;
    }
    
    /************************************************************************/
    /* 获取要下载的远程文件的大小 											*/
    /************************************************************************/
    long getDownloadFileLenth (const char *url)
    {
    	double downloadFileLenth = 0;
    	CURL *handle = curl_easy_init ();
    	curl_easy_setopt (handle, CURLOPT_URL, url);
    	curl_easy_setopt (handle, CURLOPT_HEADER, 1);	//只需要header头
    	curl_easy_setopt (handle, CURLOPT_NOBODY, 1);	//不需要body
    	if (curl_easy_perform (handle) == CURLE_OK)
    	{
    		curl_easy_getinfo (handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &downloadFileLenth);
    	}
    	else
    	{
    		downloadFileLenth = -1;
    	}
    	return downloadFileLenth;
    }
    
    void *workThread (void *pData)
    {
    	tNode *pNode = (tNode *) pData;
    
    	int res = curl_easy_perform (pNode->curl);
    
    	if (res != 0)
    	{
    
    	}
    
    	curl_easy_cleanup (pNode->curl);
    
    	pthread_mutex_lock (&g_mutex);
    	threadCnt--;
    	printf ("thred %ld exit\n", pNode->tid);
    	pthread_mutex_unlock (&g_mutex);
    	delete pNode;
    	pthread_exit (0);
    
    	return NULL;
    }
    
    bool downLoad (int threadNum, string Url, string Path, string fileName)
    {
    	long fileLength = getDownloadFileLenth (Url.c_str ());
    
    	if (fileLength <= 0)
    	{
    		printf ("get the file length error...");
    		return false;
    	}
    
    	// Create a file to save package.
    	const string outFileName = Path + fileName;
    	FILE *fp = fopen (outFileName.c_str (), "wb");
    	if (!fp)
    	{
    		return false;
    	}
    
    	long partSize = fileLength / threadNum;
    
    	for (int i = 0; i <= threadNum; i++)
    	{
    		tNode *pNode = new tNode ();
    
    		if (i < threadNum)
    		{
    			pNode->startPos = i * partSize;
    			pNode->endPos = (i + 1) * partSize - 1;
    		}
    		else
    		{
    			if (fileLength % threadNum != 0)
    			{
    				pNode->startPos = i * partSize;
    				pNode->endPos = fileLength - 1;
    			}
    			else
    				break;
    		}
    
    		CURL *curl = curl_easy_init ();
    
    		pNode->curl = curl;
    		pNode->fp = fp;
    
    		char range[64] = { 0 };
    		snprintf (range, sizeof (range), "%ld-%ld", pNode->startPos, pNode->endPos);
    
    		// Download pacakge
    		curl_easy_setopt (curl, CURLOPT_URL, Url.c_str ());
    		curl_easy_setopt (curl, CURLOPT_WRITEFUNCTION, writeFunc);
    		curl_easy_setopt (curl, CURLOPT_WRITEDATA, (void *) pNode);
    		curl_easy_setopt (curl, CURLOPT_NOPROGRESS, 0L);
    		curl_easy_setopt (curl, CURLOPT_PROGRESSFUNCTION, progressFunc);
    		curl_easy_setopt (curl, CURLOPT_NOSIGNAL, 1L);
    		curl_easy_setopt (curl, CURLOPT_LOW_SPEED_LIMIT, 1L);
    		curl_easy_setopt (curl, CURLOPT_LOW_SPEED_TIME, 5L);
    		curl_easy_setopt (curl, CURLOPT_RANGE, range);
    
    		pthread_mutex_lock (&g_mutex);
    		threadCnt++;
    		pthread_mutex_unlock (&g_mutex);
    		int rc = pthread_create (&pNode->tid, NULL, workThread, pNode);
    	}
    
    	while (threadCnt > 0)
    	{
    		usleep (1000000L);
    	}
    
    	fclose (fp);
    
    	printf ("download succed......\n");
    	return true;
    }
    
    int main (int argc, char *argv[])
    {
    //	downLoad (10,
    //		"http://101.26.37.79/ws.cdn.baidupcs.com/file/2c72878c8a5731a27f6d0a6018173520?xcode=ccef659b8500cc28f5ca86d0cbb8d4c6e6c008630559e8680b2977702d3e6764&fid=335809860-250528-463118344000947&time=1410621706&sign=FDTAXER-DCb740ccc5511e5e8fedcff06b081203-zXbBKRSs5knf%2BKll6uykeWpQoTY%3D&to=cb&fm=Nin,B,U,nc&sta_dx=105&sta_cs=98&sta_ft=mp4&sta_ct=3&newver=1&newfm=1&flow_ver=3&expires=8h&rt=pr&r=487142437&mlogid=3863405498&vuk=335809860&vbdid=140272377&fn=opclass.com-%E4%BA%92%E8%81%94%E7%BD%91%E6%97%B6%E4%BB%A3%E7%AC%AC1%E9%9B%86%EF%BC%9A%E6%97%B6%E4%BB%A3.mp4&wshc_tag=0&wsts_tag=5414610a&wsid_tag=72f52ad0&wsiphost=ipdbm",
    //		"./", "Network_Age_1.mp4");
    	downLoad (10,
    		"http://139.209.90.30/ws.cdn.baidupcs.com/file/03f85133cb241c57cc17f5baf66b9820?xcode=e38bc1881ff679abc91893c2710fc81ac7f100988f506460837047dfb5e85c39&fid=335809860-250528-571147930138020&time=1410622082&sign=FDTAXER-DCb740ccc5511e5e8fedcff06b081203-cJtuWcK6QQghdq9RC%2F%2F4eJQ39gU%3D&to=cb&fm=Nin,B,U,nc&sta_dx=105&sta_cs=89&sta_ft=mp4&sta_ct=3&newver=1&newfm=1&flow_ver=3&expires=8h&rt=pr&r=588575374&mlogid=1816906087&vuk=335809860&vbdid=140272377&fn=opclass.com-%E4%BA%92%E8%81%94%E7%BD%91%E6%97%B6%E4%BB%A3%E7%AC%AC2%E9%9B%86%EF%BC%9A%E6%B5%AA%E6%BD%AE.mp4&wshc_tag=0&wsts_tag=54146282&wsid_tag=72f52ad0&wsiphost=ipdbm",
    		"./", "Network_Age_2.mp4");
    //	downLoad (10,
    //		"http://121.18.230.69/ws.cdn.baidupcs.com/file/e2b36423e8f1cc4019d77598e32870f5?xcode=d60f60ab1a1746111db642272c0f9726fcfce98aadfaf01d0b2977702d3e6764&fid=335809860-250528-544759395965909&time=1410617631&sign=FDTAXER-DCb740ccc5511e5e8fedcff06b081203-eirgEMQqOKx5ssfzMt%2Ft0JEbvM0%3D&to=cb&fm=Nin,B,U,nc&sta_dx=173&sta_cs=74&sta_ft=mp4&sta_ct=3&newver=1&newfm=1&flow_ver=3&expires=8h&rt=pr&r=943427300&mlogid=3977494833&vuk=335809860&vbdid=140272377&fn=opclass.com-%E4%BA%92%E8%81%94%E7%BD%91%E6%97%B6%E4%BB%A3%E7%AC%AC10%E9%9B%86%EF%BC%9A%E7%9C%BA%E6%9C%9B.mp4&wshc_tag=0&wsts_tag=54145120&wsid_tag=72f52ad0&wsiphost=ipdbm",
    //		"./", "Network_Age_10.mp4");
    	//downLoad(10, "http://ardownload.adobe.com/pub/adobe/reader/win/11.x/11.0.01/en_US/AdbeRdr11001_en_US.exe", "./", "AdbeRdr11001_en_US.exe");
    
    	getchar ();
    	return 0;
    }

    使用方法:

    在main函数中, 指定要下载的线程数, 下载大文件的url, 存放在本地的文件目录(末尾需要加/), 待存放的文件名, 这几个都写死在代码, 然后编译并运行

    g++ -g curl_multithread_demo1.cpp -o curl_multithread_demo1 -lcurl -lpthread
    ./curl_multithread_demo1

    下载后的文件目录


    文件完整性验证:

    为了验证代码的准确性,我选用百度云盘上的视频文件《互联网时代》纪录片来进行下载,下载完成后,我使用vlc播放器来播放,发现播放是正常的,这说明程序没有问题。

    我们也可以下载Ubuntu 14.04.1官网的iso,下载完成后,使用md5check来检查下载文件的md5值与官网提供的是否相同。

    要注意的问题
    基本上,每个线程都应该有自己的easy handle用于数据通信(如果需要的话)。千万不要在多线程之间共享同一个easy handle。
    待解决的问题:
    怎样通过预生成的线程池来下载呢?

    参考文献
    [1].http://blog.csdn.net/zmy12007/article/details/37675331
    展开全文
  •  Fork/Join就是将一个大任务分解(fork)成许多个独立的小任务,然后多线程并行去处理这些小任务,每个小任务处理完得到结果再进行合并(join)得到最终的结果。  流程:任务继承RecursiveTask,重写compute方法,...

    一.概念

      Fork/Join就是将一个大任务分解(fork)成许多个独立的小任务,然后多线程并行去处理这些小任务,每个小任务处理完得到结果再进行合并(join)得到最终的结果。

      流程:任务继承RecursiveTask,重写compute方法,使用ForkJoinPool的submit提交任务,任务在某个线程中运行,工作任务中的compute方法的代码开始对任务进行分析,如果符合条件就进行任务拆分,拆分成多个子任务,每个子任务进行数据的计算或操作,得到结果返回给上一层任务开启线程进行合并,最终通过get获取整体处理结果。【只能将任务1个切分为两个,不能切分为3个或其他数量

    • ForkJoinTask:代表fork/join里面的任务类型,一般用它的两个子类RecursiveTask(任务有返回值)和RecursiveAction(任务没有返回值),任务的处理逻辑包括任务的切分都是在重写compute方法里面进行处理。只有ForkJoinTask任务可以被拆分运行和合并运行。可查看上篇Future源码分析的类图结构】【ForkJoinTask使用了模板模式进行设计,将ForkJoinTask的执行相关代码进行隐藏,通过提供抽象类(即子类RecursiveTask、RecursiveAction)暴露用户的实际业务处理。】
      • RecursiveTask:在进行exec之后会使用一个result的变量进行接受返回的结果;
        public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
            V result;
            protected abstract V compute();
        
            public final V getRawResult() {
                return result;
            }
        
            protected final void setRawResult(V value) {
                result = value;
            }
            protected final boolean exec() {
                result = compute();
                return true;
            }
        
        }
      • RecursiveAction:在进行exec之后没有返回结果;
        public abstract class RecursiveAction extends ForkJoinTask<Void> {
           
            protected abstract void compute();
        
            public final Void getRawResult() { return null; }
        
            protected final void setRawResult(Void mustBeNull) { }
        
            protected final boolean exec() {
                compute();
                return true;
            }
        
        } 
    • ForkJoinPool:fork/join框架的管理者,最原始的任务都要交给它来处理。它负责控制整个fork/join有多少个工作线程,工作线程的创建、机会都是由它来控制。它还负责workQueue队列的创建和分配,每当创建一个工作线程,它负责分配对应的workQueue,然后它把接到的活都交给工作线程去处理。是整个fork/join的容器。
      • ForkJoinPool.WorkQueue:双端队列,负责存储接收的任务;
    • ForkJoinWorkerThread:fork/join里面真正干活的”工人“,它继承了Thread,所以本质是一个线程。它有一个ForkJoinPool.WorkQueue的队列存放着它要干的活,接活之前它要向ForkJoinPool注册(registerWorker),拿到相应的workQueue,然后就从workQueue里面拿任务出来处理。它是依附于ForkJoinPool而存活,如果ForkJoinPool销毁了,它也会跟着结束。【每一个ForkJoinWorkerThread线程都具有一个独立的任务等待队列workQueue。】
      • 当使用ForkJoinPool进行submit任务提交时,创建1个workQueue将任务放进去,然后进行fork任务切分,如果切分后的任务放的进去之前的workQueue就放进去,不行就随机选取workQueue放进去,如果还放不了就创建一个新的workQueue放进去;
        public class ForkJoinWorkerThread extends Thread {
            final ForkJoinPool pool;
            final ForkJoinPool.WorkQueue workQueue;
            protected ForkJoinWorkerThread(ForkJoinPool pool) {
                super("aForkJoinWorkerThread");
                this.pool = pool;
                this.workQueue = pool.registerWorker(this);//向ForkJoinPool执行池注册当前工作线程,ForkJoinPool为其分配一个工作队列
            }
        }

    二.用法

      以前1+2+3+...+100这样的处理可以用for循环处理,现在使用fork/join来处理:从下面结果可以看到,大任务被不断的拆分成小任务,然后添加到工作线程的队列中,每个小任务都会被工作线程从队列中取出进行运行,然后每个小任务的结果的合并也由工作线程执行,然后不断的汇总成最终结果。【task通过ForkJoinPool来执行,分割的子任务添加到当前工作线程的队列中,进入队列的头部,当一个工作线程中没有任务时,会从其他工作线程的队列尾部获取一个任务。(工作窃取:当前工作线程对应的队列中没有任务了,从其他工作线程对应的队列中取出任务进行操作,然后将操作结果返还给对应队列的线程。)】

    public class MyFrokJoinTask extends RecursiveTask<Integer> {
        private int begin;
        private int end;
    
        public MyFrokJoinTask(int begin, int end) {
            this.begin = begin;
            this.end = end;
        }
    
        public static void main(String[] args) throws Exception {
            ForkJoinPool pool = new ForkJoinPool();
            ForkJoinTask<Integer> result = pool.submit(new MyFrokJoinTask(1, 100));//提交任务
            System.out.println("计算的值:"+result.get());//得到最终的结果
    
        }
    
        @Override
        protected Integer compute() {
            int sum = 0;
            if (end - begin <= 2) {
                for (int i = begin; i <= end; i++) {
                    sum += i;
                    System.out.println("i:"+i);
                }
            } else {
                MyFrokJoinTask d1 = new MyFrokJoinTask(begin, (begin + end) / 2);
                MyFrokJoinTask d2 = new MyFrokJoinTask((begin + end) / 2+1, end);
                d1.fork();//任务拆分
                d2.fork();//任务拆分
                Integer a = d1.join();//每个任务的结果
                Integer b = d2.join();//每个任务的结果
                sum = a + b;//汇总任务结果
                System.out.println("sum:" + sum + ",a:" + a + ",b:" + b);
            }
            System.out.println("name:"+Thread.currentThread().getName());
            return sum;
        }
    }
    //=========结果============
    i:1
    i:2
    name:ForkJoinPool-1-worker-1
    i:3
    i:4
    name:ForkJoinPool-1-worker-1
    sum:10,a:3,b:7
    name:ForkJoinPool-1-worker-1
    i:5
    i:6
    i:7
    name:ForkJoinPool-1-worker-1
    sum:28,a:10,b:18
    name:ForkJoinPool-1-worker-1
    ...............
    ...............
    sum:91,a:28,b:63
    sum:99,a:45,b:54
    name:ForkJoinPool-1-worker-3
    name:ForkJoinPool-1-worker-1
    i:23
    i:24
    i:25
    name:ForkJoinPool-1-worker-2
    sum:135,a:63,b:72
    name:ForkJoinPool-1-worker-2
    sum:234,a:99,b:135
    name:ForkJoinPool-1-worker-3
    sum:325,a:91,b:234
    name:ForkJoinPool-1-worker-1
    sum:1275,a:325,b:950
    name:ForkJoinPool-1-worker-1
    sum:5050,a:1275,b:3775
    name:ForkJoinPool-1-worker-1
    计算的值:5050

    三.分析

      ForkJoinPool

    ForkJoinPool forkJoinPool = new ForkJoinPool();
    //Runtime.getRuntime().availableProcessors()当前操作系统可以使用的CPU内核数量
    public ForkJoinPool() {
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false);
    }
    //this调用到下面这段代码
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism), //并行度
                checkFactory(factory), //工作线程创建工厂
                handler, //异常处理handler
                asyncMode ? FIFO_QUEUE : LIFO_QUEUE, //任务队列出队模式 异步:先进先出,同步:后进先出
                "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }
    //上面的this最终调用到下面这段代码
    private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }
    • parallelism:可并行数量,fork/join框架将依据这个并行数量的设定,决定框架内并行执行的线程数量。并行的每一个任务都会有一个线程进行处理;
    • factory:当fork/join创建一个新的线程时,同样会用到线程创建工厂。它实现了ForkJoinWorkerThreadFactory接口,使用默认的的接口实现类DefaultForkJoinWorkerThreadFactory来实现newThread方法创建一个新的工作线程;
      public static interface ForkJoinWorkerThreadFactory {
              /**
               * Returns a new worker thread operating in the given pool.
               */
              public ForkJoinWorkerThread newThread(ForkJoinPool pool);
          }
      
          static final class DefaultForkJoinWorkerThreadFactory
              implements ForkJoinWorkerThreadFactory {
              public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
                  return new ForkJoinWorkerThread(pool);
              }
          }
    • handler:异常捕获处理器。当执行的任务出现异常,并从任务中被抛出时,就会被handler捕获;
    • asyncMode:fork/join为每一个独立的工作线程准备了对应的待执行任务队列,这个任务队列是使用数组进行组合的双向队列。即可以使用先进先出的工作模式,也可以使用后进先出的工作模式;

       Fork()和Join()

      fork/join框架中提供的fork()和join()是最重要的两个方法,它们和parallelism(”可并行任务数量“)配合工作,可以导致拆分的子任务T1.1、T1.2甚至TX在fork/join中不同的运行效果(上面1+2....+100的每次运行的子任务都是不同的)。即TX子任务或等待其他已存在的线程运行关联的子任务(sum操作),或在运行TX的线程中”递归“执行其他任务(将1-50进行拆分后的子任务递归运行),或启动一个新的线程执行子任务(运行1-50另一边拆分的任务,即50-100的子任务)。

      fork()用于将新创建的子任务放入当前线程的workQueue队列中,fork/join框架将根据当前正在并发执行ForkJoinTask任务的ForkJoinWorkerThread线程状态,决定是让这个任务在队列中等待,还是创建一个新的ForkJoinWorkedThread线程运行它,又或者是唤起其他正在等待任务的ForkJoinWorkerThread线程运行它。

      join()用于让当前线程阻塞,直到对应的子任务完成运行并返回执行结果。或者,如果这个子任务存在于当前线程的任务等待队列workQueue中,则取出这个子任务进行”递归“执行,其目的是尽快得到当前子任务的运行结果,然后继续执行。

      提交任务:

    1.  sumbit的第一次提交:ForkJoinPool.submit(ForkJoinTask<T> task) -> externalPush(task) -> externalSubmit(task)

      1. submit:

        public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
                if (task == null)
                    throw new NullPointerException();
                externalPush(task);
                return task;
            }
        
            public <T> ForkJoinTask<T> submit(Callable<T> task) {
                ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task);
                externalPush(job);
                return job;
            }
        
            public <T> ForkJoinTask<T> submit(Runnable task, T result) {
                ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result);
                externalPush(job);
                return job;
            }
        
            public ForkJoinTask<?> submit(Runnable task) {
                if (task == null)
                    throw new NullPointerException();
                ForkJoinTask<?> job;
                if (task instanceof ForkJoinTask<?>) // avoid re-wrap
                    job = (ForkJoinTask<?>) task;
                else
                    job = new ForkJoinTask.AdaptedRunnableAction(task);
                externalPush(job);
                return job;
            }
      2. externalPush:将任务添加到随机选取的队列中或新创建的队列中;
        final void externalPush(ForkJoinTask<?> task) {
                WorkQueue[] ws; WorkQueue q; int m;
                int r = ThreadLocalRandom.getProbe();//当前线程的一个随机数
                int rs = runState;//当前容器的状态
                //如果随机选取的队列还有空位置可以存放、队列加锁锁定成功,任务就放入队列中
                if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
                    (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
                    U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                    ForkJoinTask<?>[] a; int am, n, s;
                    if ((a = q.array) != null &&
                        (am = a.length - 1) > (n = (s = q.top) - q.base)) {
                        int j = ((am & s) << ASHIFT) + ABASE;
                        U.putOrderedObject(a, j, task);//任务加入队列中
                        U.putOrderedInt(q, QTOP, s + 1);//挪动下次任务存放的槽的位置
                        U.putIntVolatile(q, QLOCK, 0);//队列解锁
                        if (n <= 1)//当前数组元素少时,进行唤醒当前线程;或者当没有活动线程或线程数较少时,添加新的线程
                            signalWork(ws, q);
                        return;
                    }
                    U.compareAndSwapInt(q, QLOCK, 1, 0);//队列解锁
                }
                externalSubmit(task);//升级版的externalPush
            }
        
        
            volatile int runState;               // lockable status锁定状态
            // runState: SHUTDOWN为负数,其他的为2的次幂
            private static final int  RSLOCK     = 1;
            private static final int  RSIGNAL    = 1 << 1;//唤醒
            private static final int  STARTED    = 1 << 2;//启动
            private static final int  STOP       = 1 << 29;//停止
            private static final int  TERMINATED = 1 << 30;//结束
            private static final int  SHUTDOWN   = 1 << 31;//关闭
      3. externalSubmit:队列添加任务失败,进行升级版操作,即创建队列数组和创建队列后,将任务放入新创建的队列中;
        private void externalSubmit(ForkJoinTask<?> task) {
            int r;                                    // initialize caller's probe
            if ((r = ThreadLocalRandom.getProbe()) == 0) {
                ThreadLocalRandom.localInit();
                r = ThreadLocalRandom.getProbe();
            }
            for (;;) {//自旋
                WorkQueue[] ws; WorkQueue q; int rs, m, k;
                boolean move = false;
                /**
                *ForkJoinPool执行器停止工作了,抛出异常
                *ForkJoinPool extends AbstractExecutorService
                *abstract class AbstractExecutorService implements ExecutorService
                *interface ExecutorService extends Executor
                *interface Executor执行提交的对象Runnable任务
                */
                if ((rs = runState) < 0) {
                    tryTerminate(false, false);    // help terminate
                    throw new RejectedExecutionException();
                }
                //第一次遍历,队列数组未创建,进行创建
                else if ((rs & STARTED) == 0 ||     // initialize初始化
                         ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
                    int ns = 0;
                    rs = lockRunState();
                    try {
                        if ((rs & STARTED) == 0) {
                            U.compareAndSwapObject(this, STEALCOUNTER, null,
                                                   new AtomicLong());
                            // create workQueues array with size a power of two
                            int p = config & SMASK; // ensure at least 2 slots,config是CPU核数
                            int n = (p > 1) ? p - 1 : 1;
                            n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                            n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                            workQueues = new WorkQueue[n];//创建
                            ns = STARTED;
                        }
                    } finally {
                        unlockRunState(rs, (rs & ~RSLOCK) | ns);
                    }
                }
                //第三次遍历,把任务放入队列中
                else if ((q = ws[k = r & m & SQMASK]) != null) {
                    if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                        ForkJoinTask<?>[] a = q.array;
                        int s = q.top;
                        boolean submitted = false; // initial submission or resizing
                        try {                      // locked version of push
                            if ((a != null && a.length > s + 1 - q.base) ||
                                (a = q.growArray()) != null) {
                                int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                                U.putOrderedObject(a, j, task);
                                U.putOrderedInt(q, QTOP, s + 1);
                                submitted = true;
                            }
                        } finally {
                            U.compareAndSwapInt(q, QLOCK, 1, 0);
                        }
                        if (submitted) {
                            signalWork(ws, q);
                            return;
                        }
                    }
                    move = true;                   // move on failure
                }
                //第二次遍历,队列数组为空,创建队列
                else if (((rs = runState) & RSLOCK) == 0) { // create new queue
                    q = new WorkQueue(this, null);
                    q.hint = r;
                    q.config = k | SHARED_QUEUE;
                    q.scanState = INACTIVE;
                    rs = lockRunState();           // publish index
                    if (rs > 0 &&  (ws = workQueues) != null &&
                        k < ws.length && ws[k] == null)
                        ws[k] = q;                 // else terminated
                    unlockRunState(rs, rs & ~RSLOCK);
                }
                else
                    move = true;                   // move if busy
                if (move)
                    r = ThreadLocalRandom.advanceProbe(r);
            }
        }
    2. fork任务切分的提交:ForkJoinTask.fork() -> ForkJoinWorkerThread.workQueue.push(task)/ForkJoinPool.common.externalPush(task) -> ForkJoinPool.push(task)/externalPush(task)

      1. fork:
        public final ForkJoinTask<V> fork() {
                Thread t;
                if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)//当前线程是workerThread,任务直接放入workerThread当前的workQueue
                    ((ForkJoinWorkerThread)t).workQueue.push(this);
                else
                    ForkJoinPool.common.externalPush(this);//将任务添加到随机选取的队列中或新创建的队列中
                return this;
            }
      2.  push:

        public class ForkJoinPool extends AbstractExecutorService {
                static final class WorkQueue {
                    final void push(ForkJoinTask<?> task) {
                        ForkJoinTask<?>[] a; ForkJoinPool p;
                        int b = base, s = top, n;
                        if ((a = array) != null) {    // ignore if queue removed,队列被移除忽略
                            int m = a.length - 1;     // fenced write for task visibility
                            U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);//任务加入队列中
                            U.putOrderedInt(this, QTOP, s + 1);//挪动下次任务存放的槽的位置
                            if ((n = s - b) <= 1) {//当前数组元素少时,进行唤醒当前线程;或者当没有活动线程或线程数较少时,添加新的线程
                                if ((p = pool) != null)
                                    p.signalWork(p.workQueues, this);
                            }
                            else if (n >= m)//数组所有元素都满了进行2倍扩容
                                growArray();
                        }
                    }
                    final ForkJoinTask<?>[] growArray() {
                        ForkJoinTask<?>[] oldA = array;
                        int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;//2倍扩容或初始化
                        if (size > MAXIMUM_QUEUE_CAPACITY)
                            throw new RejectedExecutionException("Queue capacity exceeded");
                        int oldMask, t, b;
                        ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
                        if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
                            (t = top) - (b = base) > 0) {
                            int mask = size - 1;
                            do { // emulate poll from old array, push to new array遍历从旧数组中取出放到新数组中
                                ForkJoinTask<?> x;
                                int oldj = ((b & oldMask) << ASHIFT) + ABASE;
                                int j    = ((b &    mask) << ASHIFT) + ABASE;
                                x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);//从旧数组中取出
                                if (x != null &&
                                    U.compareAndSwapObject(oldA, oldj, x, null))//将旧数组取出的位置的对象置为null
                                    U.putObjectVolatile(a, j, x);//放入新数组
                            } while (++b != t);
                        }
                        return a;
                    }
                }
            }

      任务的消费

      任务的消费的执行链路是ForkJoinTask.doExec() -> RecursiveTask.exec()/RecursiveAction.exec() -> 覆盖重写的compute()

    1.  doExec:任务的执行入口

      final int doExec() {
              int s; boolean completed;
              if ((s = status) >= 0) {
                  try {
                      completed = exec();//消费任务
                  } catch (Throwable rex) {
                      return setExceptionalCompletion(rex);
                  }
                  if (completed)
                      s = setCompletion(NORMAL);//任务执行完设置状态为NORMAL,并唤醒其他等待任务
              }
              return s;
          }
          protected abstract boolean exec();
          private int setCompletion(int completion) {
              for (int s;;) {
                  if ((s = status) < 0)
                      return s;
                  if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {//任务状态修改为NORMAL
                      if ((s >>> 16) != 0)//状态不是SMASK
                          synchronized (this) { notifyAll(); }//唤醒其他等待任务
                      return completion;
                  }
              }
          }
          /** The run status of this task 任务的运行状态*/
          volatile int status; // accessed directly by pool and workers由ForkJoinPool池或ForkJoinWorkerThread控制
          static final int DONE_MASK   = 0xf0000000;  // mask out non-completion bits
          static final int NORMAL      = 0xf0000000;  // must be negative
          static final int CANCELLED   = 0xc0000000;  // must be < NORMAL
          static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED
          static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16
          static final int SMASK       = 0x0000ffff;  // short bits for tags

      任务真正执行处理逻辑

      任务提交到ForkJoinPool,最终真正的是由继承Thread的ForkJoinWorkerThread的run方法来执行消费任务的,ForkJoinWorkerThread处理哪个任务是由join来出队的;

      1. ForkJoinTask.join()

            public final V join() {
                int s;
                if ((s = doJoin() & DONE_MASK) != NORMAL)
                    reportException(s);
                return getRawResult();//得到返回结果
            }
            private int doJoin() {
                int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
                /**
                 * (s = status) < 0 判断任务是否已经完成,完成直接返回s
                 * 任务未完成:
                 *          1)线程是ForkJoinWorkerThread,tryUnpush任务出队然后消费任务doExec
                 *              1.1)出队或消费失败,执行awaitJoin进行自旋,如果任务状态是完成就退出,否则继续尝试出队,直到任务完成或超时为止;
                 *          2)如果线程不是ForkJoinWorkerThread,执行externalAwaitDone进行出队消费
                 */
                return (s = status) < 0 ? s :
                    ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
                    (w = (wt = (ForkJoinWorkerThread)t).workQueue).
                    tryUnpush(this) && (s = doExec()) < 0 ? s :
                    wt.pool.awaitJoin(w, this, 0L) :
                    externalAwaitDone();
            }
            private void reportException(int s) {
                if (s == CANCELLED)//取消
                    throw new CancellationException();
                if (s == EXCEPTIONAL)//异常
                    rethrow(getThrowableException());
            }
        1. awaitJoin:
              public class ForkJoinPool{
                  final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
                      int s = 0;
                      if (task != null && w != null) {
                          ForkJoinTask<?> prevJoin = w.currentJoin;
                          U.putOrderedObject(w, QCURRENTJOIN, task);
                          CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
                              (CountedCompleter<?>)task : null;
                          for (;;) {
                              if ((s = task.status) < 0)//任务完成退出
                                  break;
                              if (cc != null)//当前任务即将完成,检查是否还有其他的等待任务,如果有
                                  //运行当前队列的其他任务,若当前的队列中没有任务了,则窃取其他队列的任务并运行
                                  helpComplete(w, cc, 0);
                              //当前队列没有任务了,或遍历当前队列有没有任务,如果有且在top端取出来运行,或在队列中间使用EmptyTask替代原位置取出来运行,如果没有,执行helpStealer
                              else if (w.base == w.top || w.tryRemoveAndExec(task))
                                  helpStealer(w, task);//窃取其他队列的任务
                              if ((s = task.status) < 0)
                                  break;
                              long ms, ns;
                              if (deadline == 0L)
                                  ms = 0L;
                              else if ((ns = deadline - System.nanoTime()) <= 0L)//超时退出
                                  break;
                              else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                                  ms = 1L;
                              if (tryCompensate(w)) {//当前队列阻塞了
                                  task.internalWait(ms);//进行等待
                                  U.getAndAddLong(this, CTL, AC_UNIT);
                              }
                          }
                          U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
                      }
                      return s;
                  }
              }
        2. externalAwaitDone:
              private int externalAwaitDone() {
                  /**
                  *   当前任务是CountedCompleter
                  *   1)是则执行ForkJoinPool.common.externalHelpComplete()
                  *   2)否则执行ForkJoinPool.common.tryExternalUnpush(this)进行任务出队
                  *       2.1)出队成功,进行doExec()消费,否则进行阻塞等待
                  */
                  int s = ((this instanceof CountedCompleter) ? // try helping
                           ForkJoinPool.common.externalHelpComplete(
                               (CountedCompleter<?>)this, 0) :
                           ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
                  if (s >= 0 && (s = status) >= 0) {//任务未完成
                      boolean interrupted = false;
                      do {
                          if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {//任务状态标记为SIGNAL
                              synchronized (this) {
                                  if (status >= 0) {
                                      try {
                                          wait(0L);//阻塞等待
                                      } catch (InterruptedException ie) {//有中断异常
                                          interrupted = true;//设置中断标识为true
                                      }
                                  }
                                  else
                                      notifyAll();//任务完成唤醒其他任务
                              }
                          }
                      } while ((s = status) >= 0);
                      if (interrupted)
                          Thread.currentThread().interrupt();//当前线程进行中断
                  }
                  return s;
              }
              final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
                  WorkQueue[] ws; int n;
                  int r = ThreadLocalRandom.getProbe();
                  //没有任务直接结束,有任务则执行helpComplete
                  //helpComplete:运行随机选取的队列的任务,若选取的队列中没有任务了,则窃取其他队列的任务并运行
                  return ((ws = workQueues) == null || (n = ws.length) == 0) ? 0 :
                      helpComplete(ws[(n - 1) & r & SQMASK], task, maxTasks);
              } 
    1. run和工作窃取

      任务是由workThread来窃取的,workThread是一个线程。线程的所有逻辑都是由run()方法执行:

    public class ForkJoinWorkerThread extends Thread {
        public void run() {
            if (workQueue.array == null) { // only run once
                Throwable exception = null;
                try {
                    onStart();//初始化状态
                    pool.runWorker(workQueue);//处理任务队列
                } catch (Throwable ex) {
                    exception = ex;//记录异常
                } finally {
                    try {
                        onTermination(exception);
                    } catch (Throwable ex) {
                        if (exception == null)
                            exception = ex;
                    } finally {
                        pool.deregisterWorker(this, exception);//注销工作线程
                    }
                }
            }
        }
    }
        public class ForkJoinPool{
            final void runWorker(WorkQueue w) {
                w.growArray();                   // allocate queue,队列初始化
                int seed = w.hint;               // initially holds randomization hint
                int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
                for (ForkJoinTask<?> t;;) {//自旋
                    if ((t = scan(w, r)) != null)//从队列中窃取任务成功,scan()进行任务窃取
                        w.runTask(t);//执行任务,内部方法调用了doExec()进行任务的消费
                    else if (!awaitWork(w, r))//队列没有任务了则结束
                        break;
                    r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
                }
            }
        }
      1. scan:
        private ForkJoinTask<?> scan(WorkQueue w, int r) {
                WorkQueue[] ws; int m;
                if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
                    int ss = w.scanState;                     // initially non-negative
                    for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
                        WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
                        int b, n; long c;
                        if ((q = ws[k]) != null) {   //随机选中了非空队列 q
                            if ((n = (b = q.base) - q.top) < 0 &&
                                (a = q.array) != null) {      // non-empty
                                long i = (((a.length - 1) & b) << ASHIFT) + ABASE;  //从尾部出队,b是尾部下标
                                if ((t = ((ForkJoinTask<?>)
                                          U.getObjectVolatile(a, i))) != null &&
                                    q.base == b) {
                                    if (ss >= 0) {
                                        if (U.compareAndSwapObject(a, i, t, null)) { //利用cas出队
                                            q.base = b + 1;
                                            if (n < -1)       // signal others
                                                signalWork(ws, q);
                                            return t;  //出队成功,成功窃取一个任务!
                                        }
                                    }
                                    else if (oldSum == 0 &&   // try to activate 队列没有激活,尝试激活
                                             w.scanState < 0)
                                        tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
                                }
                                if (ss < 0)                   // refresh
                                    ss = w.scanState;
                                r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
                                origin = k = r & m;           // move and rescan
                                oldSum = checkSum = 0;
                                continue;
                            }
                            checkSum += b;
                        }
               //k = k + 1表示取下一个队列 如果(k + 1) & m == origin表示已经遍历完所有队列了
                        if ((k = (k + 1) & m) == origin) {    // continue until stable 
                            if ((ss >= 0 || (ss == (ss = w.scanState))) &&
                                oldSum == (oldSum = checkSum)) {
                                if (ss < 0 || w.qlock < 0)    // already inactive
                                    break;
                                int ns = ss | INACTIVE;       // try to inactivate
                                long nc = ((SP_MASK & ns) |
                                           (UC_MASK & ((c = ctl) - AC_UNIT)));
                                w.stackPred = (int)c;         // hold prev stack top
                                U.putInt(w, QSCANSTATE, ns);
                                if (U.compareAndSwapLong(this, CTL, c, nc))
                                    ss = ns;
                                else
                                    w.scanState = ss;         // back out
                            }
                            checkSum = 0;
                        }
                    }
                }
                return null;
            }
      2. ForkJoinPool.runTask:
                volatile int scanState;    // versioned, <0: inactive; odd:scanning,版本标记,小于0暂停,奇数进行扫描其他任务
                static final int SCANNING     = 1;             // false when running tasks,有任务执行是false
                /**
                 * Executes the given task and any remaining local tasks.
                 * 执行给定的任务和任何剩余的本地任务
                 */
                final void runTask(ForkJoinTask<?> task) {
                    if (task != null) {
                        scanState &= ~SCANNING; // mark as busy,暂停扫描,当前有任务执行
                        (currentSteal = task).doExec();//执行窃取的任务
                        U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC,窃取的任务执行完置为null
                        execLocalTasks();//执行本地的任务,即自己workQueue的任务,调用doExec执行到workQueue空为止
                        ForkJoinWorkerThread thread = owner;
                        if (++nsteals < 0)      // collect on overflow,窃取计数溢出
                            transferStealCount(pool);//重置窃取计数
                        scanState |= SCANNING;//继续扫描队列
                        if (thread != null)
                            thread.afterTopLevelExec();
                    }
                }
                static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread {
                        @Override // to erase ThreadLocals,清除threadLocals
                        void afterTopLevelExec() {
                            eraseThreadLocals();
                        }
                        /**
                         * Erases ThreadLocals by nulling out Thread maps.
                         */
                        final void eraseThreadLocals() {
                            U.putObject(this, THREADLOCALS, null);//threadLocals置为null
                            U.putObject(this, INHERITABLETHREADLOCALS, null);//inheritablethreadlocals置为null
                        }
                } 

    四.总结

      对于fork/join来说,在使用时还是存在下面的一些问题的:

    • 在使用JVM的时候我们要考虑OOM的问题,如果我们的任务处理时间非常耗时,并且处理的数据非常大的时候,会造成OOM;
    • ForkJoin是通过多线程的方式进行处理任务,那么我们不得不考虑是否应该使用ForkJoin。因为当数据量不是特别大的时候,我们没有必要使用ForkJoin。因为多线程会涉及到上下文的切换,所以数据量不大的时候使用串行比使用多线程快;
      • 项目中进行本地测试发现,业务层Service进行excel表数据(数据量几百)的复杂处理,进行单线程for循环统计消耗时间,然后与使用fork/join进行处理统计消耗时间,发现fork/join的消耗时间是单线程for的2倍;

    欢迎任何形式的转载,但请务必注明出处。

    如果觉得文章还行,希望推荐+关注下,谢谢!

    限于本人水平,如果文章和代码有表述不当之处,还请不吝赐教。

    展开全文
  • 多线程在web中的使用

    千次阅读 2018-09-28 15:01:05
    整理网络上的 最典型的如: ... 且这类任务即使失败也不是特别重要的。 2、后台线程:比如定期...最典型的应用比如tomcat,tomcat内部采用的就是多线程。 上百个客户端访问同一个web应用,tomcat接入后都是把后续的处理...

    整理网络上的

    最典型的如:
    1、用户注册完成送大礼包/积分之类,且积分等也是另一个系统并比较耗时;
    且这类任务即使失败也不是特别重要的。

    2、后台线程:比如定期执行一些特殊任务,如定期更新配置文件,任务调度(如quartz),一些监控用于定期信息采集等。

    回答二:
    最典型的应用比如tomcat,tomcat内部采用的就是多线程。
    上百个客户端访问同一个web应用,tomcat接入后都是把后续的处理扔给一个新的线程来处理,这个新的线程最后调用到我们的servlet程序,比如doGet或者doPost方法。
    如果不采用多线程机制,上百个人同时访问一个web应用的时候,tomcat就得排队串行处理了,那样客户端根本是无法忍受那种访问速度的。

    还有就是需要异步处理的时候,需要使用多线程。
    比如task a和task b要并行处理,单个线程只能串行处理,先做完task a然后再做task b。

    如果想要多个task同时执行的话,就必须为每个task分配一个线程,然后通过java虚拟机的线程调度,来同时执行多个任务。

    比如你的CPU是多核心的话,就可以让一个CPU执行一个线程。
    如果只有一个CPU的话,底层是按照分时复用的原则,各个线程按照时间片来获得CPU资源。

    回答三:

    特别耗时的操作

    如备份数据库,可以开个线程执行备份,然后执行返回,前台不断向后台询问线程执行状态问

    JAVA项目中哪些场景需要用到多线程,深感迷茫,请使用过的高手指点。

    场景一:
    一个业务逻辑有很多次的循环,每次循环之间没有影响

    比如验证1万条url路径是否存在,正常情况要循环1万次,逐个去验证每一条URL,这样效率会很低,假设验证一条需要1分钟,总共就需要1万分钟,有点恐怖。

    这时可以用多线程,将1万条URL分成50等份,开50个线程,每个线程只需验证200条,这样所有的线程执行完是远小于1万分钟的。

    场景二:

    需要知道一个任务的执行进度,比如我们常看到的进度条,实现方式可以是在任务中加入一个整型属性变量(这样不同方法可以共享),任务执行一定程度就给变量值加1,另外开一个线程按时间间隔不断去访问这个变量,并反馈给用户。

    总之使用多线程就是为了充分利用cpu的资源,提高程序执行效率,当你发现一个业务逻辑执行效率特别低,耗时特别长,就可以考虑使用多线程。

    不过CPU执行哪个线程的时间和顺序是不确定的,即使设置了线程的优先级

    因此使用多线程的风险也是比较大的,会出现很多预料不到的问题,一定要多熟悉概念,多构造不同的场景去测试才能够掌握!

    最典型的如:1、用户注册完成送大礼包/积分之类,且积分等也是另一个系统并比较耗时;且这类任务即使失败也不是特别重要的。2、后台线程:比如定期执行一些特殊任务,如定期更新配置文件,任务调度(如quartz),一些监控用于定期信息采集等。回答二:最典型的应用比如tomcat,tomcat内部采用的就是多线程,上百个客户端访问同一个web应用,tomcat接入后都是把后续的处理扔给一个新的线程来处理,这个新的线程最后调用到我们的servlet程序,比如doGet或者doPost方法。如果不采用多线程机制,上百个人同时访问一个web应用的时候,tomcat就得排队串行处理了,那样客户端根本是无法忍受那种访问速度的。还有就是需要异步处理的时候,需要使用多线程。比如task a和task b要并行处理,单个线程只能串行处理,先做完task a然后再做task b。如果想要多个task同时执行的话,就必须为每个task分配一个线程,然后通过java虚拟机的线程调度,来同时执行多个任务。比如你的CPU是多核心的话,就可以让一个CPU执行一个线程。如果只有一个CPU的话,底层是按照分时复用的原则,各个线程按照时间片来获得CPU资源。回答三:特别耗时的操作,如备份数据库,可以开个线程执行备份,然后执行返回,前台不断向后台询问线程执行状态问:JAVA项目中哪些场景需要用到多线程,深感迷茫,请使用过的高手指点。答:场景一:一个业务逻辑有很多次的循环,每次循环之间没有影响,比如验证1万条url路径是否存在,正常情况要循环1万次,逐个去验证每一条URL,这样效率会很低,假设验证一条需要1分钟,总共就需要1万分钟,有点恐怖。这时可以用多线程,将1万条URL分成50等份,开50个线程,没个线程只需验证200条,这样所有的线程执行完是远小于1万分钟的。场景二:需要知道一个任务的执行进度,比如我们常看到的进度条,实现方式可以是在任务中加入一个整型属性变量(这样不同方法可以共享),任务执行一定程度就给变量值加1,另外开一个线程按时间间隔不断去访问这个变量,并反馈给用户。总之使用多线程就是为了充分利用cpu的资源,提高程序执行效率,当你发现一个业务逻辑执行效率特别低,耗时特别长,就可以考虑使用多线程。不过CPU执行哪个线程的时间和顺序是不确定的,即使设置了线程的优先级,因此使用多线程的风险也是比较大的,会出现很多预料不到的问题,一定要多熟悉概念,多构造不同的场景去测试才能够掌握!


    有一些比较耗时,但又很重复的工作可以多线程。

    比如入库,入库时很耗时的。但如果入库的数据已准备好,并且很多。多线程就好一些。

    比如同步数据,你不知道什么时候对方会给你传递数据过来,这个时候你就间隔一段时间让他自己去执行,其实还涉及一些逻辑上的处理比较耗的。这样不影响你其他地方的处理……

    (1)线程的工作场景主要有两条:
    一个是并发操作,避免阻塞和更有效利用资源。典型的例子有:在长时间工作的程序中使用工作线程避免界面失去响应。在网络下载程序中,使用多个线程提高对网络的使用效率,更快下载文件。
    一个是并行,线程是处理器调度的最小单位。如果你的计算机配置了多个处理器或者内核,那么可以同时利用多个处理器同时计算,加快问题解决的速度。
    (2)多线程的工作原理:
    对于单处理器系统,操作系统会轮流调度每个线程执行一小段时间,然后切换另一个线程,在切换的时候,保存当前线程使用的寄存器上下文和堆栈,并且在下次调度的时候恢复。这样线程中的程序感觉不到自己被中断过。对于多处理器系统,操作系统会将不同的线程调度给多个处理器,让它们并行执行。

    这个要用,由很多可用的,看你的程序结构,发邮件是一个,假如你的文档管理系统要做全文索引,那么在用户上传一个文件后,后台要做索引,就可以启动一个线程去创建索引,还有如果你的文档管理系统权限比较复杂,涉及文件夹父子关系权限的继承和覆盖等复杂的ACL操作,都可以使用多线程。

    1、单线程和多线程区别:
    单线程处理的优点:同步应用程序的开发比较容易,但由于需要在上一个任务完成后才能开始新的任务,所以其效率通常比多线程应用程序低,如果完成同步任务所用的时间比预计时间长,应用程序可能会不响应。
    多线程处理可以同时运行多个过程,简单说下多线程开发的益处:
    1.多线程开发可以将耗时操作放入子线程,将UI刷新加入主线程,防止页面卡顿。
    2.在并发操作时使用多线程,如C/S架构的服务器端并发线程响应用户的请求。
    3.在多核CPU系统中,使用线程可以提高程序响应速度,提高CPU和内存的利用率。
    4.改善程序结构。将一个复杂的的进程分为多个线程,减少类之间的耦合。
    5.将程序分块管理,方便程序的开发和维护。
    6.可以随时停止任务。 可以分别设置各个任务的优先级以优化性能。
    2、开启多线程
    一个请求就是一个线程,这个线程不需要我们来控制,WEB容器自己实现,这是第一个应用。
    我们也可以在web中new线程来做我们的事。比如,当我有一个很耗时的操作,像统计排名之类的功能。当用户在web上点击排名时,由于这个计算量太大,可能要计算3~5分钟或更久。这时我们会在servlert中new一个线程来做这个事情,这样用户就可以点击排名后去做其他的事,等排名出来现通知他。如果不new线程当用户点排名时,浏览器就会一直卡在这里,一个圈圈转啊转的,就是不出来。所以我们可以new线程来做耗是任务。还有很多比如,定时任务、WEB版的爬虫程序、监听等
    3、场景一:一个业务逻辑有很多次的循环,每次循环之间没有影响,比如验证1万条url路径是否存在,正常情况要循环1万次,逐个去验证每一条URL,这样效率会很低,假设验证一条需要1分钟,总共就需要1万分钟,有点恐怖。这时可以用多线程,将1万条URL分成50等份,开50个线程,没个线程只需验证200条,这样所有的线程执行完是远小于1万分钟的。
    场景二:需要知道一个任务的执行进度,比如我们常看到的进度条,实现方式可以是在任务中加入一个整型属性变量(这样不同方法可以共享),任务执行一定程度就给变量值加1,另外开一个线程按时间间隔不断去访问这个变量,并反馈给用户。
    场景三:电商项目中controller层需要接收前台传来的需要下架商品id,因为是全选操作所以也就是数据库中商品表的所有id。然后调用service层业务逻辑进行删除。本人数据库中商品表大概有3000条数据,在不考虑多线程方式下,速度已经明显受到影响,实际项目中又何止成千上万条数据!如果单线程逐一进行删除其性能可想而知。所以考虑创建多个线程,多线程并发形式执行商品下线业务逻辑。
    总之使用多线程就是为了充分利用cpu的资源,提高程序执行效率,当你发现一个业务逻辑执行效率特别低,耗时特别长,就可以考虑使用多线程。不过CPU执行哪个线程的时间和顺序是不确定的,即使设置了线程的优先级,因此使用多线程的风险也是比较大的,会出现很多预料不到的问题,一定要多熟悉概念,多构造不同的场景去测试才能够掌握!

    为什么要使用多线程?
    1.防止阻塞主线程,提高吞吐量
    2,提高资源的利用率
    应用场景:
    1,最典型的应用比如tomcat,tomcat内部采用的就是多线程,上百个客户端访问同一个web应用,tomcat接入后都是把后续的处理扔给一个新的线程来处理,这个新的线程最后调用到我们的servlet程序,比如doGet或者doPost方法。

    2,做登录业务时使用到sms短信网关业务,创建子线程,让子线程去调用sms服务,主线程则返回,这样做可以提高用户的体验度
    3,图片上传业务
    4

    项目中如何使用多线程
    多线程在项目中主要用来解决并发任务执行。java中线程的主要实现方式有三种:继承Thread类 实现Runnable接口 实现Callable接口。另外还可以通过Executor类来创建多线程线程池。
    线程生命周期:首先通过继承thread或者实现runnable接口来创建一个线程,当调用线程的start方法,线程进入就绪状态,如果这时处理器有资源运行此线程,线程就进入运行状态,如果线程内部调用了sleep就会放弃cpu资源,返回到阻塞状态,线程等待某个通知,sleep时间到了之后,如果其他线程发送通知,那么当前线程就从阻塞状态变为就绪状态恢复执行。另如果调用了yield()方法也会从运行状态变为就绪状态。一般来说线程执行完毕或者调用stop方法线程结束生命周期。
    四种模板线程池:1可缓存线程池 newCachedThreadPool 2定长线程池 newFixedThreadPool 3定长支持定时及周期性任线程池, newScheduledThreadPool 4单线程化的线程池(有序) newSingleThreadExecutor 。我们在项目中主要使用了第二种就是定长线程池newFixedThreadPool,一般多线程进行并行任务处理需要配合队列使用。队列中存放任务信息,当线程池中的线程进行任务处理,主动去队列领取任务,队列将任务弹出并交由线程执行,所有线程谁先执行完,就领取行的任务,直到队列中没有任务。
    线程安全:一般通过加锁解决安全问题,保证数据一致性。一般我们可以使用synchronized标记方法或者代码块,来保证原子性操作。但是synchronized性能不如volatile。 在java底层中一些设计线程安全的源码都是用了volatile关键字。多线程如果要保证数据安全必须要保证原子性、可见性以及有序性。一般情况下当多个线程同时执行时,如果多个线程同时访问同一变量,如果变量所在方法没有使用synchronized,将导致每个线程只关注自己线程内cache的变量值,当多个线程将变量同步到主线程的主存时,会发生数据不一致的情况。如果使用volatile可以让变量拥有可见性,多个线程进行执行时,每个线程都会看到主线程中的主存的变量值发生的改变,进行修正,保证与自己线程数据同步,在线程修改变量时,volatile关键字会强制将修改的值立即写入主存,其他线程中的对应缓存变量就会被强制标记为无效,而从主存中进行同步。

    项目业务场景:
    批量页面静态化 在系统中,商品详情页我们使用freemarker来进行页面静态化,每天夜里12点开始要对所有商品页面进行一遍静态化,由于商品数量比较多 如果使用单线程将耗时过长,我们使用一个定长线程池进行批量执行,将任务放在队列中,多个线程同时领取并执行。

    订单处理(用户下单后可能支付状态不明确,我们后台可以通过多线程去主动核实第三方支付状态,来更新我们系统的订单状态)
    登录后用户信息处理(用户登录后应该通知各相关系统将用户常用数据进行缓存 以快速响应登录用户),

    大CC的博客

    内容目录:
    多进程模型多线程模型选用参考

    多线程和多进程模型的选用
    这里的线程指通过linux的pthread_create而产生的原生线程,线程资源很宝贵,能被操作系统的任务调度器看见的(不是python gevent、go gorouine里的概念);
    我们讨论以下两种模型;
    多进程单线程模型(以下简称为多进程);单进程多线程模型(以下简称为多线程);多进程模型
    优点
    编程相对容易;通常不需要考虑锁和同步资源的问题。
    更强的容错性:比起多线程的一个好处是一个进程崩溃了不会影响其他进程。
    有内核保证的隔离:数据和错误隔离。
    对于使用如C/C++这些语言编写的本地代码,错误隔离是非常有用的:采用多进程架构的程序一般可以做到一定程度的自恢复;(master守护进程监控所有worker进程,发现进程挂掉后将其重启)
    多进程的案例
    nginx主流的工作模式是多进程模式(也支持多线程模型)
    几乎所有的web server服务器服务都有多进程的,至少有一个守护进程配合一个worker进程,例如apached,httpd等等以d结尾的进程包括init.d本身就是0级总进程,所有你认知的进程都是它的子进程;
    chrome浏览器也是多进程方式。
    redis也可以归类到“多进程单线程”模型(平时工作是单个进程,涉及到耗时操作如持久化或aof重写时会用到多个进程)
    多线程模型
    优点
    多线程优点:创建速度快,方便高效的数据共享
    共享数据:多线程间可以共享同一虚拟地址空间;多进程间的数据共享就需要用到共享内存、信号量等IPC技术;
    较轻的上下文切换开销 - 不用切换地址空间,不用更改寄存器,不用刷新TLB。
    提供非均质的服务
    如果全都是计算任务,但每个任务的耗时不都为1s,而是1ms-1s之间波动;这样,多线程相比多进程的优势就体现出来,它能有效降低“简单任务被复杂任务压住”的概率;
    适用的场景
    1 线程间有数据共享,并且数据是需要修改的(不同任务间需要大量共享数据或频繁通信时);
    2 提供非均质的服务(有优先级任务处理)事件响应有优先级;
    3 单任务并行计算,在非CPU Bound的场景下提高响应速度,降低时延;
    4 与人有IO交互的应用,良好的用户体验(键盘鼠标的输入,立刻响应)
    多线程案例
    桌面软件,响应用户输入的是一个线程,后台程序处理是另外的线程;
    memcached
    选用
    单进程多线程和多进程单线程,2种模式如何取舍?
    进程线程间创建的开销不足作为选择的依据,因为一般我们都是使用线程池或者进程池,在系统启动时就创建了固定的线程或进程,不会频繁的创建和销毁;
    首先,根据工作集(需要共享的内存)的大小来定;如果工作集较大,就用多线程,避免cpu cache频繁的换入换出;比如memcached缓存系统;
    其次,选择的依据根据以上多线程适用的场景来对比自身的业务场景,是否有这样场景需求:数据共享、提供非均质的服务,单任务拆散并行化等;
    如果没有必要,或者多进程就可以很好的胜任,就多用多进程,享受单线程编程带来便利;
    RCU的发明者,Paul McKenny 在《Is Parallel Programming Hard, And, If So, What Can You Do About It?》说过:
    能用多进程方便的解决问题的时候不要使用多线程。
    参考
    ref:《Linux多线程服务端编程:使用muduo网络库》
    ref:http://www.zhihu.com/question/19903801
    ref:https://computing.llnl.gov/tutorials/pthreads/#WhyPthreads


    本文来自 RoyKuang07 的CSDN 博客 ,全文地址请点击:https://blog.csdn.net/pirlck/article/details/52296716?utm_source=copy

    本文来自 Xiayubing_ 的CSDN 博客 ,全文地址请点击:https://blog.csdn.net/Xiayubing_/article/details/81381704?utm_source=copy

    本文来自 袖扣 的CSDN 博客 ,全文地址请点击:https://blog.csdn.net/Sundefind/article/details/79062239?utm_source=copy

    本文来自 fmwind 的CSDN 博客 ,全文地址请点击:https://blog.csdn.net/fmwind/article/details/77068706?utm_source=copy
    参考链接

    https://blog.csdn.net/kangkang_hacker/article/details/80863359?utm_source=copy

    展开全文
  • Spring JDBC-事务管理中的多线程问题

    千次阅读 2017-09-27 04:22:57
    概述 示例 结论 示例源码概述众所周知,Spring 的事务管理器是通过线程相关...我们知道 Web 容器本身就是多线程的,Web 容器为一个 Http 请求创建一个独立的线程,所以由此请求所牵涉到的 Spring 容器中的 Bean 也是运
  • 系统、全面、深入掌握Java多线程的核心知识主讲老师:CC老师     20年Java开发和使用经验,多年的首席架构师和CTO,畅销原创书籍《研磨设计模式》的作者。         ...
  • 简单的多线程工具,用于从commoncrawl.org中提取与域相关的数据 用法 ccp.py [-h] -d domain -o path [-t THREADS] [-f index1] [-f index2] necessary arguments: -d, --domain The domain you want to search for...
  • Chromium多线程模型设计和实现分析

    万次阅读 多人点赞 2015-07-27 00:59:08
    Chromium除了远近闻名的多进程架构之外,它的多线程模型也相当引人注目的。Chromium的多进程架构是为了解决网页的稳定性问题,而多线程模型则是为了解决网页的卡顿问题。为了达到这个目的,Chromium的多线程模型是...
  • 多线程(图解)

    2019-09-11 21:09:09
    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 ...
  • MySQL源码调试(二)多线程调试 gdb 线程操作命令集合 no-stop模式 一条查询的哪些阶段会阻塞flush tables操作 结束语 MySQL源码调试(二)多线程调试 MySQL是一个多线程的服务端程序,所以必须了解一下,...
  • Android 多线程下载源码实现详细注释

    千次阅读 2012-08-15 16:20:51
    * 还有一个是按照一次最大的文件块来分的这也是实现多线程下载的一个方式 * @author WH1107022 * */ public class downloadTask extends Thread { private int blockSize, downloadSizeMore; // 每一个...
  • XGBoost源码分析之单机多线程的实现

    千次阅读 2017-02-06 18:57:06
    XGBoost源码思路清晰,读起来没有什么困难。通过阅读XGBoost源码,我梳理了一些类之间的关系,可以为以后自己写代码提供参考之处。由于代码量不算小,没办法做到面面俱到,我总结了几处自己认为比较重要的方面。如有...
  • 当我们测试线程操作a++的时候,会出现以下结果 public class CasDemo2 { public static void main(String[] args) { Castest castest=new Castest(); for(int i=0;i<10;i++){ ...
  • 多线程任务计划程序 网页 1.00 网络 1455 BSD 轻量级的Web服务器 定义文件 1.00 压缩 525 公共区域 给压缩机放气 fl 1.00 减压 320 公共区域 放气减压器 图书馆总数:5 C代码总行数:4487 常问问题 为什么...
  • WebRTC源码分析-线程基础之线程管理

    千次阅读 2019-11-08 19:44:15
    WebRTC中的线程管理是通过ThreadManager对象来实施的,该类起着牧羊者的作用,rtc::Thread类对象就是羊群。其通过什么样的技术来实现对rtc::Thread管理的?在不同的系统平台下如何实现?下文将进行阐述。 该类的...
  • 多线程分批次查询数据欢迎使用Markdown编辑器需求:在系统开发和对接过程中,常常出现大数据量获取的情况,比如你是A系统,去获取B系统的数据,B系统只是给你接口。并且返回只能返回10w条数据,但是实际上你查出来的...
  • Android 多线程编程的总结

    千次阅读 2017-04-16 15:57:03
    这几天在研究Android的多线程方面的知识,阅读了许多大牛的文章,发现Android的多线程方式挺多的,关于各种方式的优缺点也都各有看法,所以这部分的知识还是很容易令人觉得混乱的,所以自己梳理了相关知识,用自己的...
  • Java多线程

    2021-05-15 19:38:29
    文章目录相关概念线程与进程并行、并发、串行同步与异步守护线程线程的状态创建线程Thread类Runnable接口Callable接口线程池常用方式SingleThreadExecutorFixedThreadPoolCachedThreadPool阻塞队列为什么使用阻塞...
  • C++多线程强制终止

    千次阅读 2021-06-04 10:24:24
    摘要:实际上,没有任何语言或操作系统可以为你提供异步突然终止线程的便利,且不会警告你不要使用它们。
  • 1 如何停止一个线程? 答: 1.1 这道题想考察什么? 答:(1)考察要点 ●是否对线程的用法有了解;是否对线程的stop方法有了解(初级) ●是否对线程stop过程中存在的问题有认识;是否熟悉interrupt中断的用法(中级) ●...
  • 线程对象实现:实现Thread类、ThreadData结构体 、CurrentThread命名空间功能:实现线程对象并实现当前线程信息存储知识点: __thread __thread是GCC内置的线程局部存储设施,存取效率可以和全局变量相比。__...
  • 在我的博客中,曾经发布了一篇...只不过是支持多线程的。 纳尼??感情是之前那个不支持多线程?Sorry,我说错了;两个都是支持多线程调用的。不过新讲的这个是能在内部采用多线程进行分段模糊。 原来的:[Android]-
  • SLog是用于C / C ++的简单且线程安全的日志记录库,可以轻松控制详细程度,对输出进行标记和着色,记录到文件,即时更改配置参数等等。 安装 可以使用Makefile安装(CMake列表也包含在项目中)。 git clone ...
  • 那么可以使用多线程来并发进行,即我们可以先网购下单,在等待快递员送货过来的这段时间去菜市场买食材,节省时间,提高效率。 直接开启线程,使用类继承Thread重写方法实现网购,join阻塞直到厨具到达才开始做饭。 ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 21,385
精华内容 8,554
关键字:

多线程cc源码