为您推荐:
精华内容
最热下载
问答
  • 5星
    94KB weixin_42350212 2020-12-12 20:12:12
  • 5星
    49.33MB weixin_38950816 2021-04-28 18:55:32
  • 5星
    24.66MB zhangchen124 2021-09-14 21:51:07
  • 5星
    360KB weixin_46931877 2021-08-04 13:42:34
  • 5星
    680KB zb12138 2021-03-15 16:34:59
  • 5星
    16MB qq_30787727 2021-06-03 22:41:58
  • 5星
    6.55MB a876106354 2021-06-28 11:27:06
  • 28.26MB weixin_46931877 2021-09-06 10:37:14
  • 5星
    69.32MB hyjchina 2021-03-03 11:17:44
  • 5星
    6KB thy77 2021-04-15 16:42:43
  • 5星
    93.75MB u011626417 2018-11-18 18:28:31
  • 96.56MB nista 2018-05-24 09:06:27
  • 5星
    92.83MB zhousiwei 2018-03-23 15:22:23
  • 93.81MB hardwork617s 2018-08-28 22:52:54
  • 94MB njust_zgx 2018-01-09 09:49:14
  • 162KB weixin_38736529 2021-01-27 11:09:09
  • 5星
    96.54MB woxpp 2014-08-18 16:20:16
  • 18KB weixin_42119866 2021-04-28 16:30:49
  • 78KB weixin_38730977 2021-01-21 18:44:37
  • 4星
    49KB z0582 2012-08-19 09:39:09
  • 5星
    50.36MB tjoy2005 2013-10-10 21:58:45
  • c#并行队列任务No doubts that async/await pattern has significantly simplified working with asynchronous operations in C#. However, this simplification relates only to the situation when asynchronous ...

    c#并行队列任务

    image

    No doubts that async/await pattern has significantly simplified working with asynchronous operations in C#. However, this simplification relates only to the situation when asynchronous operations are executed consequently. If we need to execute several asynchronous operations simultaneously (e.g. we need to call several micro-services) then we do not have many built-in capabilities and most probably Task.WhenAll will be used:

    毫无疑问, async/await模式已大大简化了C#中异步操作的工作。 但是,这种简化仅涉及随后执行异步操作的情况。 如果我们需要同时执行多个异步操作(例如,我们需要调用多个微服务),则我们没有很多内置功能,很可能将使用Task.WhenAll

    Task<SomeType1> someAsyncOp1 = SomeAsyncOperation1();
    Task<SomeType2> someAsyncOp2 = SomeAsyncOperation2();
    Task<SomeType3> someAsyncOp3 = SomeAsyncOperation3();
    Task<SomeType4> someAsyncOp4 = SomeAsyncOperation4();
    await Task.WhenAll(someAsyncOp1, someAsyncOp2, someAsyncOp4);
    var result = new SomeContainer(
         someAsyncOp1.Result,someAsyncOp2.Result,someAsyncOp3.Result, someAsyncOp4.Result);

    This is a working solution, but it is quite verbose and not very reliable (you can forget to add a new task to “WhenAll”). I would prefer something like that instead:

    这是一个可行的解决方案,但是它很冗长且不够可靠(您可以忘记在“ WhenAll”中添加新任务)。 我更喜欢这样的东西:

    var result =  await 
        from r1 in SomeAsyncOperation1()
        from r2 in SomeAsyncOperation2()
        from r3 in SomeAsyncOperation3()
        from r4 in SomeAsyncOperation4()
        select new SomeContainer(r1, r2, r3, r4);

    Further I will tell you what is necessary for this construction to work...

    此外,我将告诉您该构造起作用的必要条件...

    First, we should remember that C# query syntax is just a syntax sugar over method call chains and C# preprocessor will convert the previous statement into a cain of SelectMany calls:

    首先,我们应该记住,C#查询语法只是方法调用链上的语法糖,C#预处理程序会将先前的语句转换为SelectMany调用的SelectMany

    SomeAsyncOperation1()/*Task<T1>*/
      .SelectMany(
          (r1/*T1*/) => SomeAsyncOperation2()/*Task<T2>*/,
          (r1/*T1*/, r2/*T2*/) => new {r1, r2}/*Anon type 1*/)
      .SelectMany(
          (t/*Anon type 1*/) => SomeAsyncOperation3()/*Task<T3>*/,
          (t/*Anon type 1*/, r3/*T3*/) => new {t, r3}/*Anon type 2*/)
      .SelectMany(
          (t/*Anon type 2*/) => SomeAsyncOperation4()/*Task<T4>*/, 
          (t/*Anon type 2*/, r4/*T4*/) => new SomeContainer(t.t.r1, t.t.r2, t.r3, r4));

    Скрытыйтекст (Скрытый текст)

    By default C# Compiler will complain about this code since SelectMany extension method is defined only for IEnumerable interface, but nothing prevents us to create our own overloads of SelectMany to make the code compilable.

    默认情况下,由于SelectMany扩展方法仅为IEnumerable接口定义,因此C#编译器将抱怨此代码,但是没有什么可以阻止我们创建自己的SelectMany重载以使代码可编译。

    Each SelectMany function in the chain has two arguments*:

    链中的每个SelectMany函数都有两个参数*:

    • the first argument is a link to a function which returns a next asynchronous operation

      第一个参数是指向函数的链接,该函数返回下一个异步操作

    (t/*Anon type 1*/) => SomeAsyncOperation3(),/*Task<T2>*/

    (t/*Anon type 1*/) => SomeAsyncOperation3(),/*Task<T2>*/

    • the second argument is a link to a function that combines results of previous asynchronous operations with a result of the operation returned by the function which is passed as the first argument.

      第二个参数是指向函数的链接,该函数将先前异步操作的结果与作为第一个参数传递的函数返回的操作的结果进行组合。

    (t/*Anon type 1*/, r3/*T3*/) => new {t, r3}/*Anon type 2*/)

    (t/*Anon type 1*/, r3/*T3*/) => new {t, r3}/*Anon type 2*/)

    We can call the first functions to get a list of tasks which will be used in Task.WhenAll and then call the second mapping functions to build a result.

    我们可以调用第一个函数以获取将在Task.WhenAll使用的任务列表,然后调用第二个映射函数以构建结果。

    To get the task list and the list of mapping functions our SelectMany overload needs to return some objects which will contain links to a task and a mapping function. In addition to that, SelectMany receives a links to a previous object as this argument (in case if SelectMany is an extension method) — we also need this link to build a linked list which will contain all required data.

    为了获得任务列表和映射函数列表,我们的SelectMany重载需要返回一些对象,这些对象将包含任务和映射函数的链接。 除此之外, SelectMany还会收到一个指向先前对象的链接作为this参数(如果SelectMany是扩展方法)—我们还需要此链接来构建一个包含所有必需数据的链接列表。

    static class TaskAllExtensions
    {
        public static ITaskAccumulator<TRes> SelectMany<TCur, TNext, TRes>(
            this ITaskAccumulator<TCur> source, 
            Func<TCur, Task<TNext>> getNextTaskFunc, 
            Func<TCur, TNext, TRes> mapperFunc) 
        => 
            new TaskAccumulator<TCur, TNext, TRes>(
                prev: source, 
                currentTask: getNextTaskFunc(default(TCur)), 
                mapper: mapperFunc);
    }
    
    class TaskAccumulator<TPrev, TCur, TRes> : ITaskAccumulator<TRes>
    {
        public readonly ITaskAccumulator<TPrev> Prev;
    
        public readonly Task<TCur> CurrentTask;
    
        public readonly Func<TPrev, TCur, TRes> Mapper;
        ...
    }

    初始蓄能器 (The initial accumulator)

    The initial accumulator differs from the subsequent ones since It cannot have a link to a previous accumulator, but it should have a link to the first task:

    初始累加器与后续累加器有所不同,因为它无法链接到先前的累加器,但是应该具有到第一个任务的链接:

    static class TaskAllExtensions
    {
        ...
        public static ITaskAccumulator<TRes> SelectMany<TCur, TNext, TRes>(
            this Task<TCur> source, 
            Func<TCur, Task<TNext>> getNextTaskFunc, 
            Func<TCur, TNext, TRes> mapperFunc) 
        => 
            new TaskAccumulatorInitial<TCur, TNext, TRes>(
                task1: source, 
                task2: getNextTaskFunc(default(TCur)), 
                mapper: mapperFunc);
        ...
    }
    
    class TaskAccumulatorInitial<TPrev, TCur, TRes> : ITaskAccumulator<TRes>
    {
        public readonly Task<TPrev> Task1;
    
        public readonly Task<TCur> Task2;
    
        public readonly Func<TPrev, TCur, TRes> Mapper;
        ...
    }

    Now we can get the result by adding these methods:

    现在我们可以通过添加以下方法获得结果:

    class TaskAccumulator<TPrev, TCur, TRes> : ITaskAccumulator<TRes>
    {
        public async Task<TRes> Result()
        {
            await Task.WhenAll(this.Tasks);
            return this.ResultSync();
        }
    
        internal IEnumerable<Task> Tasks 
            => new Task[] { this.CurrentTask }.Concat(this.Prev.Tasks);
    
        internal TRes ResultSync() 
            => this.Mapper(this.Prev.ResultSync(), this.CurrentTask.Result);
        ...
        public readonly ITaskAccumulator<TPrev> Prev;
        public readonly Task<TCur> CurrentTask;
        public readonly Func<TPrev, TCur, TRes> Mapper;        
    }
    • Tasks property returns all tasks from the entire linked list.

      Tasks属性返回整个链接列表中的所有任务。

    • ResultSync() recursively applies mapper functions to the task results (all the tasks are supposed to be already resolved).

      ResultSync()递归将映射器函数应用于任务结果(假定所有任务都已解决)。

    • Result() resolves all tasks (through await Task.WhenAll(Tasks)) and returns result of ResultSync()

      Result()解决所有任务(通过await Task.WhenAll(Tasks) ),并返回ResultSync()结果

    Also, we can add a simple extension method to make await work with ITaskAccumulator:

    另外,我们可以添加一个简单的扩展方法来使ITaskAccumulator await ITaskAccumulator

    static class TaskAllExtensions
    {
        ...
        public static TaskAwaiter<T> GetAwaiter<T>(this ITaskAccumulator<T> source)
            => source.Result().GetAwaiter();
    }

    Now the code is working:

    现在代码可以正常工作了:

    var result =  await 
        from r1 in SomeAsyncOperation1()
        from r2 in SomeAsyncOperation2()
        from r3 in SomeAsyncOperation3()
        from r4 in SomeAsyncOperation4()
        select new SomeContainer(r1, r2, r3, r4);

    However, there is an issue here — C# allows using an intermediate result as an argument for further operations. For example:

    但是,这里存在一个问题-C#允许使用中间结果作为进一步操作的参数。 例如:

    from r2 in SomeAsyncOperation2()
        from r3 in SomeAsyncOperation3(r2)

    Such code will lead to "Null Reference Exception" since r2 is not yet resolved at the moment when SomeAsyncOperation3 is called:

    这样的代码将导致“空引用异常”,因为在调用SomeAsyncOperation3时尚未解析r2:

    ... 
    task2:getNextTaskFunc(default(TCur)),
    ...

    since all the tasks are run in parallel.

    因为所有任务都是并行运行的。

    Unfortunately, I do not see a solution for that problem in the current state of C# language, but we can mitigate it by dividing tasks in two groups:

    不幸的是,在C#语言的当前状态下,我看不到该问题的解决方案,但是我们可以通过将任务分为两组来减轻它:

    1. Tasks that are executed in parallel

      并行执行的任务
    2. Tasks that are executed consequently (which can use all previous results).

      因此执行的任务(可以使用以前的所有结果)。

    To do that let's introduce the two simple wrappers over a task:

    为此,让我们为一个任务介绍两个简单的包装器:

    public struct ParallelTaskWrapper<T>
    {
        public readonly Task<T> Task;
    
        internal ParallelTaskWrapper(Task<T> task) => this.Task = task;
    }
    
    public struct SequentialTaskWrapper<T>
    {
        public readonly Task<T> Task;
    
        public SequentialTaskWrapper(Task<T> task) => this.Task = task;
    }

    帮手 (Helpers)

    public static ParallelTaskWrapper<T> AsParallel<T>(this Task<T> task)
    {
        return new ParallelTaskWrapper<T>(task);
    }
    
    public static SequentialTaskWrapper<T> AsSequential<T>(this Task<T> task)
    {
        return new SequentialTaskWrapper<T>(task);
    }

    The only purpose of the tasks is to specify what '''SelectMany''' overloads should be used:

    任务的唯一目的是指定应使用的'''SelectMany'''重载:

    public static ITaskAccumulator<TRes> SelectMany<TCur, TNext, TRes>(
        this ITaskAccumulator<TCur> source, 
        Func<TCur, ParallelTaskWrapper<TNext>> exec, 
        Func<TCur, TNext, TRes> mapper)
    ...

    and (it is a new overload):

    和(这是一个新的重载):

    public static ITaskAccumulator<TRes> SelectMany<TCur, TNext, TRes>(
        this ITaskAccumulator<TCur> source, 
        Func<TCur, SequentialTaskWrapper<TNext>> exec, 
        Func<TCur, TNext, TRes> mapper)
    {
        return new SingleTask<TRes>(BuildTask());
    
        async Task<TRes> BuildTask()
        {
            var arg1 = await source.Result();
            var arg2 = await exec(arg1).Task;
            return mapper(arg1, arg2);
        }
    }

    单任务 (SingleTask)

    internal class SingleTask<T> : ITaskAccumulator<T>
    {
        private readonly Task<T> _task;
        private readonly Task[] _tasks;
    
        public SingleTask(Task<T> task)
        {
            this._task = task;
            this._tasks = new Task[] { task };
        }
    
        public Task<T> Result() => this._task;
        public IEnumerable<Task> Tasks => this._tasks;
        public T ResultSync() => this._task.Result;
    }

    As you see all previous tasks are resolved trough var arg1/*Anon Type X*/ = await source.Result();, so they can be used to retrieve a next task and the code bellow will work properly:

    如您所见,所有以前的任务都是通过var arg1/*Anon Type X*/ = await source.Result(); ,因此它们可用于检索下一个任务,并且下面的代码将正常工作:

    var result =  await 
        from r1 in SomeAsyncOperation1().AsParallel()
        from r2 in SomeAsyncOperation2().AsParallel()
        from r3 in SomeAsyncOperation3().AsParallel()
        from r4 in SomeAsyncOperation4(r1, r2, r3).AsSequential()
        from r5 in SomeAsyncOperation5().AsParallel()
        select new SomeContainer(r1, r2, r3, r4, r5);

    更新(摆脱Task.WhenAll ) (Update (Getting rid of Task.WhenAll))

    We introduced the task accumulator to get a list of tasks be able to call Task.WhenAll over them. But do we really need it? Actually, we do not! The thing is that once we received a link to the task it is already started execution and all the task below are running in parallel (the code from the beginning):

    我们引入了任务累加器,以获取能够调用Task.WhenAll的任务列表。 但是我们真的需要吗? 实际上,我们不! 关键是,一旦我们收到该任务的链接,它就已经开始执行,并且以下所有任务并行运行(从头开始的代码):

    Task<SomeType1> someAsyncOp1 = SomeAsyncOperation1();
    Task<SomeType2> someAsyncOp2 = SomeAsyncOperation2();
    Task<SomeType3> someAsyncOp3 = SomeAsyncOperation3();
    Task<SomeType4> someAsyncOp4 = SomeAsyncOperation4();

    But instead of Task.WhenAll we can use several await-s:

    但是我们可以使用几个await -s代替Task.WhenAll:

    SomeType1 op1Result = await someAsyncOp1;
    SomeType2 op2Result = await someAsyncOp2;
    SomeType3 op3Result = await someAsyncOp3;
    SomeType4 op4Result = await someAsyncOp4;

    await immediately returns a result if a task is already resolved or waits till an asynchronous operation is completed, so the code will take the same amount of time as if Task.WhenAll was used.

    如果任务已经解决或等待异步操作完成,则await立即返回结果,因此代码将花费与使用Task.WhenAll相同的时间。

    That fact allows us to significantly simplify the code and get rid of the task accumulator:

    这个事实使我们可以大大简化代码并摆脱任务累加器:

    static class TaskAllExtensions
    {
        public static ParallelTaskWrapper<TRes> SelectMany<TCur, TNext, TRes>(
            this ParallelTaskWrapper<TCur> source, 
            Func<TCur, ParallelTaskWrapper<TNext>> exec, 
            Func<TCur, TNext, TRes> mapper)
        {
            async Task<TRes> GetResult()
            {
                var nextTask = exec(default(TCur));//<--Important!
                return mapper(await source.Task, await nextTask);
            }
            return new ParallelTaskWrapper<TRes>(GetResult());
        }
    
        public static ParallelTaskWrapper<TRes> SelectMany<TCur, TNext, TRes>(
            this ParallelTaskWrapper<TCur> source, 
            Func<TCur, SequentialTaskWrapper<TNext>> exec, 
            Func<TCur, TNext, TRes> mapper)
        {
            async Task<TRes> GetResult()
            {
                return mapper(await source, await exec(await source).Task);
            }
            return new ParallelTaskWrapper<TRes>(GetResult());
        }
    
        public static TaskAwaiter<T> GetAwaiter<T>(
            this ParallelTaskWrapper<T> source)
            => 
            source.Task.GetAwaiter();
    }

    That is it.

    这就对了。

    All the code can be found on GitHub

    所有代码都可以在GitHub上找到

    ...

    ...

    Developers who familiar with functional programing languages might notice that the approach described above resembles “Monad” design pattern. It is no surprise since C# query notation is a kind of equivalent of “do” notation in Haskell which, in turn, is a “syntax sugar” for working with monads. If you are not familiar what that design pattern yet then, I hope, this demonstration will encourage you to get familiar with monads and functional programming.

    熟悉功能性编程语言的开发人员可能会注意到,上述方法类似于“ Monad”设计模式。 毫不奇怪,因为C#查询符号在Haskell中与“ do”符号等效,而后者又是用于处理monad的“语法糖”。 我希望,如果您还不熟悉该设计模式,那将鼓励您熟悉monad和函数式编程。

    翻译自: https://habr.com/en/post/349352/

    c#并行队列任务

    展开全文
    cullen2012 2020-09-06 09:49:31
  • 并行编程基本概念并行编程的实现方式任务并行库(TPL)及其分类Parallel类及其帮助器类Parallel类提供的并行方法:Parallel帮助器类Parallel帮助器类用于线程全局变量的设计结构数据并行利用Parallel.ForEach方法...

    基本概念

    并行编程的实现方式

    从业务实现的角度看并行策略
    并行编程模型分为数据并行与任务并行。
    从硬件实现的角度看并行策略
    并行又分为单机多核并行和多机多核并行。

    任务并行库(TPL)及其分类

    1.任务并行库(TPL)基于任务的并行编程模型,主要借助System.Threading.Tasks.Parallel类实现。
    2.TPL的核心是Parallel类和PLINQ,编写并行程序的首选方案。
    3.TPL的分类:
    数据并行
    ~对源集合或者数组中的元素同时执行相同操作。
    ~借助Parallel类的For或Foreach方法来实现。
    任务并行
    ~借助Parallel类提供的静态方法Invoke实现任务并行。
    并行查询
    ~并行实现LINQ to Objects查询,即PLINQ。
    4.TPL与传统多线程编程模型相比的优势
    (1)TPL编程模型使用CLR线程池执行多个任务,并能自动处理工作分区、线程调度和取消、状态管理以及其他低级别的细节操作。
    (2)TPL还会动态地按比例调节并发程度,从而最有效地使用所有可用的处理器。
    (3)TPL比Thread更具智能性,当它通过试探法来预判任务集不会从并行运行中获得性能优势时,还会自动选择按顺序运行。
    5.并行编程建议的做法:
    并非所有的任务都适合并行。
    这里特别强调的是,不论是数据并行、任务并行还是并行查询,在实际项目中都不应该在并行循环的内部频繁地和界面交互,这是因为频繁地调用共享资源(如界面控件、控制台或文件系统)会大幅降低并行循环的性能。

    Parallel类及其帮助器类

    Parallel类提供的并行方法:

    Parallel.For方法用于并行执行for循环。
    Parallel.Foreach方法用于并行执行foreach循环。
    Parallel.Invoke方法用于任务并行。

    Parallel帮助器类

    ParallelOptions类:为并行方法提供操作选项。
    ~~CancellationToken:获取或设置取消标志
    ~~TaskScheduler:默认值为null

    Parallel帮助器类

    ParallelLoopState类:将Parallel循环的迭代与其他迭代交互。
    ~~Break方法:告知Parallel循环尽早停止执行当前迭代之外的迭代。
    ~~Stop方法:告知Parallel循环尽早停止执行。
    ParallelLoopResult:提供Parallel循环的完成状态。
    ~~IsCompleted:获取该循环是否已经完成。

    用于线程全局变量的设计结构

    基本概念:线程全局变量,线程局部变量
    全局变量同步和冲突的解决:用volatile修饰变量,或者使用原子操作(Interlocked类提供的静态方法)。
    并发集合类:常用的并发集合类有ConcurrentBag。
    从.NET框架4.0开始,在System.Collections.Concurrent命名空间下,增加了用于多线程协同的并发集合类,这些并发集合类自动解决了可能会导致的各种冲突问题,不需要开发人员再使用“锁”去处理。

    数据并行

    1.Parallel.For方法
    Parallel.For方法用于并行执行for循环。静态的For方法有12种重载形式(6种32位重载,6种64位重载)
    For(Int32, Int32, Action)
    For(Int32, Int32, ParallelOptions, Action)
    For(Int32, Int32, Action<Int32, ParallelLoopState>)
    For(Int32, Int32, ParallelOptions, Action<Int32, ParallelLoopState>)
    For(Int32, Int32, Func, Func<Int32, ParallelLoopState, TLocal, TLocal>, Action)
    For(Int32, Int32, ParallelOptions, Func, Func<Int32, ParallelLoopState, TLocal, TLocal>, Action)
    一般形式如下:
    Parallel.For(<开始索引>,<结束索引>,<每次迭代执行的委托>)
    2.带并行选项的Parallel.For循环
    For(Int32, Int32, ParallelOptions, Action)

    其完整的语法如下:
    public static ParallelLoopResult For(
    int fromInclusive, //开始索引(包含)
    int toExclusive, //结束索引(不包含)
    ParallelOptions parallelOptions, //并行选项
    Action body //每个迭代调用的委托
    )
    3.带并行循环状态的Parallel.For循环
    For(Int32, Int32, Action<Int32,ParallelLoopState>)

    其完整的语法如下:
    public static ParallelLoopResult For(
    int fromInclusive, //开始索引(包含)
    int toExclusive, //结束索引(不包含)
    Action<int, ParallelLoopState> body //每个迭代调用的委托
    )
    4.带线程局部变量的Parallel.For循环
    线程局部变量是指某个线程内的局部变量,其他线程无法访问。线程局部变量保存的数据称为线程本地数据。

    public static ParallelLoopResult For(
    int fromInclusive, //开始索引(包含)
    int toExclusive, //结束索引(不包含)
    Func localInit, //返回每个任务初始化的状态
    Func<int, ParallelLoopState, TLocal, TLocal> body, //每个迭代调用一次
    Action localFinally //对每个任务执行一个最终操作
    )

    利用Parallel.ForEach方法实现数据并行

    1.简单的Parallel.ForEach循环
    ForEach(IEnumerable, Action)
    2.按范围分区加快小型循环体速度

    任务并行

    任务并行是指同时运行一个或多个独立的任务,而且并行的任务都是异步执行的。
    6.4.1 Parallel.Invoke方法
    Parallel.Invoke方法用于任务并行。重载形式有:
    ~~public static void Invoke(Action[] actions )
    ~~public static void Invoke(ParallelOptions parallelOptions, Action[] actions )
    这两种方式都是尽可能并行执行提供的操作,采用第二种重载形式还可以取消操作。

    展开全文
    qq_44749053 2020-04-18 20:48:28
  • 50MB cym1504255 2017-10-13 19:11:15
  • 5星
    109KB wangzhiyu1980 2015-04-21 09:27:50
  • 3星
    8.23MB dstang2000 2014-09-24 16:37:19
  • 5星
    28.7MB tjoy2005 2013-10-11 13:47:06
  • 5星
    26.02MB tjoy2005 2013-10-11 13:44:49
  • c# 并行执行两个异步方法 并发 (Concurrency) Have you ever wanted to call multiple API endpoints at the same time and be notified when all have finished? Maybe you want to batch asynchronous operations ...

    c# 并行执行两个异步方法

    并发 (Concurrency)

    Have you ever wanted to call multiple API endpoints at the same time and be notified when all have finished? Maybe you want to batch asynchronous operations and know when the all of the operations in the batch have finished.

    您是否曾经想过同时调用多个API端点,并在所有端点完成后收到通知? 也许您想批处理异步操作,并知道批处理中的所有操作何时完成。

    There are many reasons you might want to do multiple asyncronous operations in parrallel. In this tutorial we’ll be looking at performing in parallel multiple URL requests as a single operation.

    您可能有很多原因想要并行执行多个异步操作。 在本教程中,我们将以单个操作的形式并行执行多个URL请求。

    In this post we’ll learn how to use DispatchGroup to be notified when multiple concurrent asynchronous operations have finished. I assume you are familiar with the basics of Swift and Grand Central Dispatch.

    在本文中,我们将学习如何使用DispatchGroup在多个并发异步操作完成时得到通知。 我认为您熟悉Swift和Grand Central Dispatch的基础知识。

    I have used Swift 5.2.4 and Xcode 11.5 for this post.

    我在这篇文章中使用了Swift 5.2.4和Xcode 11.5。

    设置上下文 (Setting the context)

    Let’s say you have a point of sale (POS) app. In this app the user is able to create a product to sell in the store. The user can also provide a CSV file with the products to create.

    假设您有一个销售点(POS)应用程序。 在此应用程序中,用户可以创建要在商店中销售的产品。 用户还可以提供带有要创建的产品的CSV文件。

    When the user creates a product through the app interface the app calls the POS backend API endpoint POST /product to create a single product.

    当用户通过应用程序界面创建产品时,应用程序将调用POS后端API端点POST /product来创建单个产品。

    Image for post

    However when the user decides to create multiple products we need to call the endpoint for each product listed in the CSV file.

    但是,当用户决定创建多个产品时,我们需要为CSV文件中列出的每个产品调用终结点。

    Image for post

    So how can we easily call the POST /product endpoint for each product on the CSV and get notified on completion of them all? The answer is using DispatchGroup!

    那么,我们如何轻松地为CSV上的每个产品调用POST /product端点,并在完成所有操作时得到通知? 答案是使用DispatchGroup

    Dispatch group allows you to group multiple asynchronous operations and be notified when all of them have completed.

    调度组使您可以对多个异步操作进行分组,并在所有异步操作完成时得到通知。

    如何使用调度组执行并行API请求 (How to perform parallel API requests using Dispatch Group)

    In this section we’ll implement the parallel API request feature in the already existing POS app. The POS app and server will be provided.

    在本节中,我们将在现有的POS应用程序中实现并行API请求功能。 将提供POS应用程序和服务器。

    Here is the outline for this section:

    这是本节的概述:

    1. Retrieve starter project

      检索启动项目
    2. Run server

      运行服务器
    3. Perform parallel API requests

      执行并行API请求
    4. Use DispatchGroup to get notified upon API requests completion

      使用DispatchGroup在API请求完成时获得通知

    1.检索启动项目 (1. Retrieve starter project)

    Let’s start by downloading the starter pack. Open terminal and execute the following commands:

    让我们从下载入门包开始。 打开终端并执行以下命令:

    cd $HOME
    curl https://github.com/anuragajwani/dispatch-group-demo/archive/starter.zip -o dispatch_group_demo.zip -L -s
    unzip -q dispatch_group_demo.zip

    2.运行服务器 (2. Run server)

    The starter project contains a server to run that will become the API which our app will communicate with and request to when creating products.

    入门项目包含一个要运行的服务器,它将成为我们的应用在创建产品时与之通信并向其请求的API。

    To run the server execute the following commands:

    要运行服务器,请执行以下命令:

    cd ~/dispatch-group-demo-starter/POSBackEnd
    swift build
    ./.build/debug/POSBackEnd

    When running the commands for the first time it can take a while to run the server. The first time the command will fetch the dependencies required to run the server.

    首次运行命令时,可能需要一段时间才能运行服务器。 该命令第一次获取运行服务器所需的依赖项。

    Image for post

    For this project I have attached a Swift server that will act as out Products API. As it will run in our respective machines we will be easily be able to communicate with it using a simulator. However please note this won’t easily work with iOS devices.

    对于这个项目,我已附加了一个Swift服务器,它将充当Products API。 由于它将在我们各自的机器上运行,因此我们将能够轻松地使用模拟器与之通信。 但是请注意,这不适用于iOS设备。

    Also note that closing this terminal window will terminate the running server. Make sure the server is running at all times during this post.

    另请注意,关闭此终端窗口将终止正在运行的服务器。 在此期间,请确保服务器始终在运行。

    3.执行并行API请求 (3. Perform parallel API requests)

    The starter project also contains an iOS app. The app is already capable of creating a single product.

    入门项目还包含一个iOS应用程序。 该应用程序已经能够创建一个产品。

    In this section we’ll fill out uploadProductsCSV and add the code to create multiple products in parallel.

    在本部分中,我们将填写uploadProductsCSV并添加代码以并行创建多个产品。

    First let’s open the app project. In a new terminal window execute the following command:

    首先,让我们打开应用程序项目。 在新的终端窗口中,执行以下命令:

    open -a Xcode ~/dispatch-group-demo-starter/POSDispatchGroupDemo/POSDispatchGroupDemo.xcodeproj

    Next we’ll read file containing products in a CSV format. The file is included in the app. We’ll read the products.csv file and convert the products from CSV into Product on the function executed when the user taps Create products from products.csv button; theuploadProductsCSV function in ViewController.swift. I have already included a convenience function named getProductsFromCSV to do so. Thus we only need to call the function and store the products into a new variable.

    接下来,我们将读取包含CSV格式产品的文件。 该文件包含在应用程序中。 当用户点击“ Create products from products.csv按钮时执行的功能上,我们将读取products.csv文件并将产品从CSV转换为ProductViewController.swiftuploadProductsCSV函数。 我已经包含了一个名为getProductsFromCSV的便捷函数。 因此,我们只需要调用函数并将产品存储到新变量中即可。

    Open ViewController.swift and add the following line of code to uploadProductsCSV:

    打开ViewController.swift并将以下代码行添加到uploadProductsCSV

    let products = self.getProductsFromCSV()

    Next we’ll take each product within the products and create POST /product request. I have already created a convenience function that does the request. We will only be required to call it. Add the following lines of code:

    接下来,我们将采用产品中的每个products并创建POST /product请求。 我已经创建了一个方便的功能来执行请求。 我们只需要调用它。 添加以下代码行:

    products.forEach({ product in
    self
    .createProduct(product, onCompletion: { _ in
    // TODO handle completed product creation
    })
    })

    Note for this post we are ignoring any request errors. The CSV contains valid products and the server is running locally so there shouldn’t be any error when communicating with the server.

    请注意,此帖子我们将忽略任何请求错误。 CSV包含有效产品,并且服务器在本地运行,因此与服务器通信时不应有任何错误。

    You should be able to run the app and create all the products on the products.csv. However we don’t have any mechanism yet to notify us when all the API requests have completed.

    您应该能够运行该应用程序并在products.csv上创建所有产品。 但是,在所有API请求完成后,我们还没有任何机制可以通知我们。

    4.使用DispatchGroup在API请求完成时获得通知 (4. Use DispatchGroup to get notified upon API requests completion)

    Next we’ll use DispatchGroup to get notified when all API requests have completed.

    接下来,我们将使用DispatchGroup在所有API请求完成后获得通知。

    First let’s create an instance of DispatchGroup at the beginning of the function. Add the following line of code:

    首先,让我们在函数开始处创建DispatchGroup的实例。 添加以下代码行:

    let dispatchGroup = DispatchGroup()

    Next before each createProduct call we need to add add dispatchGroup.enter(). On each completion of createProduct we must call dispatchGroup.leave(). The uploadProductsCSV function should look like:

    接下来,在每个createProduct调用之前,我们需要添加add dispatchGroup.enter() 。 在createProduct每次完成时,我们都必须调用dispatchGroup.leave()uploadProductsCSV函数应如下所示:

    @IBAction func uploadProductsCSV(_ sender: Any) {
    let dispatchGroup = DispatchGroup()
    let products = self.getProductsFromCSV()
    products.forEach({ product in
    dispatchGroup.enter()
    self.createProduct(product, onCompletion: { _ in
    dispatchGroup.leave()
    })
    })
    }

    dispatchGroup.enter() notifies the dispatch group that a new asynchronous call is made. dispatchGroup.leave() notifies the dispatch group that an asynchronous call has completed.

    dispatchGroup.enter()通知调度组已进行新的异步调用。 dispatchGroup.leave()通知调度组异步调用已完成。

    Next we’ll register a closure with the dispatch group that will be executed when all asynchronous operations have completed. At the end of uploadProductsCSV add the following line of code:

    接下来,我们将在调度组中注册一个闭包,当所有异步操作完成时将执行该闭包。 在uploadProductsCSV的末尾添加以下代码行:

    dispatchGroup.notify(queue: .main, execute: { self.showProductsCreatedAlert() })

    It is important that notify is called after we tell dispatch group about all the asynchronous calls are made. If we register the closure before we notify the dispatch group about the asynchronous calls to be made then the closure will be called immediately. Why? Dispatch group calls the closure when the counter of currently executing asynchronous calls reaches 0. If we register the closure before we tell dispatch group about the asynchronous calls made the counter will effectively be at 0 and the dispatch group will execute the notify closure.

    重要的是, 我们通知调度组所有异步调用完成之后 ,调用notify 。 如果在通知派发组有关异步调用之前注册了闭包,那么将立即调用闭包。 为什么? 当当前正在执行的异步调用的计数器达到0时,调度组将调用闭包。如果在向调度组通知异步调用之前注册闭包,则计数器实际上将为0,而调度组将执行通知闭包。

    For each product sent to the server the counter will increase by 1 (when we call dispatchGroup.enter()). The counter will decrease by 1 when we finish sending a product (when we call dispatchGroup.leave()). When the counter reaches 0 the dispatch group will execute the closure registered (the one we provide to dispatchGroup.notify(queue:_, execute:_)).

    对于发送到服务器的每个产品,计数器将增加1(当我们调用dispatchGroup.enter() )。 当我们完成发送产品时(当我们调用dispatchGroup.leave() ),计数器将减少1。 当计数器达到0时,调度组将执行已注册的闭包(我们提供给dispatchGroup.notify(queue:_, execute:_)的闭包)。

    And that’s it! Run the POSDispatchGroupDemo app on a simulator and tap on the Create products from products.csv button and see it in action!

    就是这样! 在模拟器上运行POSDispatchGroupDemo应用程序,然后点击“ Create products from products.csv按钮,然后查看运行情况!

    Image for post

    There are 5 products in the CSV file. Those are the iPhones on sale at the time of writing on the Apple website and their UK prices. You can see the products being created on the terminal window running the server.

    CSV文件中有5个产品。 这些是在Apple网站上撰写本文时正在销售的iPhone及其在英国的价格。 您可以在运行服务器的终端窗口上看到正在创建的产品。

    摘要 (Summary)

    In this post we have learnt to perform parallel asynchronous and get notified upon completion of all asynchronous operations.

    在本文中,我们学习了执行并行异步,并在所有异步操作完成后得到通知。

    You can find the full source code in the repo below:

    您可以在下面的仓库中找到完整的源代码:

    最后说明 (Final Notes)

    DispatchGroup is a great simple tool to use when you want to quickly batch multiple asynchronous operations. However currently DispatchGroup doesn’t offer any mechanism out of the box to notify us when one operation has completed and how many are left.

    当您要快速批处理多个异步操作时,DispatchGroup是一个非常简单的工具。 但是,当前DispatchGroup尚不提供任何开箱即用的机制来通知我们何时完成一个操作以及还有多少操作。

    There are also many other ways of grouping asynchronous operation and getting notified upon the completion of each individual operation as well the whole. One example of such solution is using reactive programming framework.

    还有许多其他方式可以将异步操作分组并在每个单独操作以及整个操作完成时得到通知。 这种解决方案的一个示例是使用React式编程框架

    Stay tuned for more posts on iOS development! Follow me on Twitter or Medium!

    请继续关注有关iOS开发的更多帖子! 在TwitterMedium上关注我!

    翻译自: https://medium.com/@anuragajwani/how-to-perform-parallel-asynchronous-operations-with-dispatchgroup-3c112deda62c

    c# 并行执行两个异步方法

    展开全文
    weixin_26638123 2020-07-24 18:23:14
  • 55.55MB qq_26954773 2016-10-16 18:20:38
  • c# 并行计算(大量循环处理的场景下) 并行计算部分 沿用微软的写法,System.Threading.Tasks.::.Parallel类,提供对并行循环和区域的支持。 我们会用到的方法有For,ForEach,Invoke。 一、简单使用 首先...

    c# 并行计算(大量循环处理的场景下)

          

    并行计算部分

            沿用微软的写法,System.Threading.Tasks.::.Parallel类,提供对并行循环和区域的支持。 我们会用到的方法有For,ForEach,Invoke。

    一、简单使用

            首先我们初始化一个List用于循环,这里我们循环10次。(后面的代码都会按这个标准进行循环)

    Code

    1.             Program.Data = new List<int>();
    2.             for (int i = 0; i < 10; i++)
    3.             {
    4.                 Data.Add(i);
    5.             }

            下面我们定义4个方法,分别为for,foreach,并行For,并行ForEach。并测试他们的运行时长。

    Code

    1.         /// <summary>
    2.         /// 是否显示执行过程
    3.         /// </summary>
    4.         public bool ShowProcessExecution = false;
    5.  
    6.         /// <summary>
    7.         /// 这是普通循环for
    8.         /// </summary>
    9.         private void Demo1()
    10.         {
    11.             List<int> data = Program.Data;
    12.             DateTime dt1 = DateTime.Now;
    13.             for (int i = 0; i < data.Count; i++)
    14.             {
    15.                 Thread.Sleep(500);
    16.                 if (ShowProcessExecution)
    17.                     Console.WriteLine(data[i]);
    18.             }
    19.             DateTime dt2 = DateTime.Now;
    20.             Console.WriteLine("普通循环For运行时长:{0}毫秒。", (dt2 - dt1).TotalMilliseconds);
    21.         }
    22.  
    23.         /// <summary>
    24.         /// 这是普通循环foreach
    25.         /// </summary>
    26.         private void Demo2()
    27.         {
    28.             List<int> data = Program.Data;
    29.             DateTime dt1 = DateTime.Now;
    30.             foreach (var i in data)
    31.             {
    32.                 Thread.Sleep(500);
    33.                 if (ShowProcessExecution)
    34.                     Console.WriteLine(i);
    35.             }
    36.             DateTime dt2 = DateTime.Now;
    37.             Console.WriteLine("普通循环For运行时长:{0}毫秒。", (dt2 - dt1).TotalMilliseconds);
    38.         }
    39.  
    40.         /// <summary>
    41.         /// 这是并行计算For
    42.         /// </summary>
    43.         private void Demo3()
    44.         {
    45.             List<int> data = Program.Data;
    46.             DateTime dt1 = DateTime.Now;
    47.             Parallel.For(0, data.Count, (i) =>
    48.             {
    49.                 Thread.Sleep(500);
    50.                 if (ShowProcessExecution)
    51.                     Console.WriteLine(data[i]);
    52.             });
    53.             DateTime dt2 = DateTime.Now;
    54.             Console.WriteLine("并行运算For运行时长:{0}毫秒。", (dt2 - dt1).TotalMilliseconds);
    55.         }
    56.  
    57.         /// <summary>
    58.         /// 这是并行计算ForEach
    59.         /// </summary>
    60.         private void Demo4()
    61.         {
    62.             List<int> data = Program.Data;
    63.             DateTime dt1 = DateTime.Now;
    64.             Parallel.ForEach(data, (i) =>
    65.             {
    66.                 Thread.Sleep(500);
    67.                 if (ShowProcessExecution)
    68.                     Console.WriteLine(i);
    69.             });
    70.             DateTime dt2 = DateTime.Now;
    71.             Console.WriteLine("并行运算ForEach运行时长:{0}毫秒。", (dt2 - dt1).TotalMilliseconds);
    72.         }

    下面是运行结果:

    image

    这里我们可以看出并行循环在执行效率上的优势了。

    结论1:在对一个数组内的每一个项做单独处理时,完全可以选择并行循环的方式来提升执行效率。

    原理1:并行计算的线程开启是缓步开启的,线程数量1,2,4,8缓步提升。(不详,PLinq最多64个线程,可能这也是64)

      

      

    二、 并行循环的中断和跳出

            当在进行循环时,偶尔会需要中断循环或跳出循环。下面是两种跳出循环的方法Stop和Break,LoopState是循环状态的参数。

    Code

    1.         /// <summary>
    2.         /// 中断Stop
    3.         /// </summary>
    4.         private void Demo5()
    5.         {
    6.             List<int> data = Program.Data;
    7.             Parallel.For(0, data.Count, (i, LoopState) =>
    8.             {
    9.                 if (data[i] > 5)
    10.                     LoopState.Stop();
    11.                 Thread.Sleep(500);
    12.                 Console.WriteLine(data[i]);
    13.             });
    14.             Console.WriteLine("Stop执行结束。");
    15.         }
    16.  
    17.         /// <summary>
    18.         /// 中断Break
    19.         /// </summary>
    20.         private void Demo6()
    21.         {
    22.             List<int> data = Program.Data;
    23.             Parallel.ForEach(data, (i, LoopState) =>
    24.             {
    25.                 if (i > 5)
    26.                     LoopState.Break();
    27.                 Thread.Sleep(500);
    28.                 Console.WriteLine(i);
    29.             });
    30.             Console.WriteLine("Break执行结束。");
    31.         }

            执行结果如下:

    image

    结论2:使用Stop会立即停止循环,使用Break会执行完毕所有符合条件的项。

      

      

    三、并行循环中为数组/集合添加项

            上面的应用场景其实并不是非常多见,毕竟只是为了遍历一个数组内的资源,我们更多的时候是为了遍历资源,找到我们所需要的。那么请继续看。

    下面是我们一般会想到的写法:

    Code

    1.         private void Demo7()
    2.         {
    3.             List<int> data = new List<int>();
    4.             Parallel.For(0, Program.Data.Count, (i) =>
    5.             {
    6.                 if (Program.Data[i] % 2 == 0)
    7.                     data.Add(Program.Data[i]);
    8.             });
    9.             Console.WriteLine("执行完成For.");
    10.         }
    11.  
    12.         private void Demo8()
    13.         {
    14.             List<int> data = new List<int>();
    15.             Parallel.ForEach(Program.Data, (i) =>
    16.             {
    17.                 if (Program.Data[i] % 2 == 0)
    18.                     data.Add(Program.Data[i]);
    19.             });
    20.             Console.WriteLine("执行完成ForEach.");
    21.         }

    看起来应该是没有问题的,但是我们多次运行后会发现,偶尔会出现错误如下:

    image

    这是因为List是非线程安全的类,我们需要使用System.Collections.Concurrent命名空间下的类型来用于并行循环体内。

    说明
    BlockingCollection<T>为实现 IProducerConsumerCollection<T> 的线程安全集合提供阻止和限制功能。
    ConcurrentBag<T>表示对象的线程安全的无序集合。
    ConcurrentDictionary<TKey, TValue>表示可由多个线程同时访问的键值对的线程安全集合。
    ConcurrentQueue<T>表示线程安全的先进先出 (FIFO) 集合。
    ConcurrentStack<T>表示线程安全的后进先出 (LIFO) 集合。
    OrderablePartitioner<TSource>表示将一个可排序数据源拆分成多个分区的特定方式。
    Partitioner提供针对数组、列表和可枚举项的常见分区策略。
    Partitioner<TSource>表示将一个数据源拆分成多个分区的特定方式。

    公共类

    那么我们上面的代码可以修改为,加了了ConcurrentQueue和ConcurrentStack的最基本的操作。

    Code

    1.         /// <summary>
    2.         /// 并行循环操作集合类,集合内只取5个对象
    3.         /// </summary>
    4.         private void Demo7()
    5.         {
    6.             ConcurrentQueue<int> data = new ConcurrentQueue<int>();
    7.             Parallel.For(0, Program.Data.Count, (i) =>
    8.             {
    9.                 if (Program.Data[i] % 2 == 0)
    10.                     data.Enqueue(Program.Data[i]);//将对象加入到队列末尾
    11.             });
    12.             int R;
    13.             while (data.TryDequeue(out R))//返回队列中开始处的对象
    14.             {
    15.                 Console.WriteLine(R);
    16.             }
    17.  
    18.             Console.WriteLine("执行完成For.");
    19.         }
    20.  
    21.         /// <summary>
    22.         /// 并行循环操作集合类
    23.         /// </summary>
    24.         private void Demo8()
    25.         {
    26.             ConcurrentStack<int> data = new ConcurrentStack<int>();
    27.             Parallel.ForEach(Program.Data, (i) =>
    28.             {
    29.                 if (Program.Data[i] % 2 == 0)
    30.                     data.Push(Program.Data[i]);//将对象压入栈中
    31.             });
    32.             int R;
    33.             while (data.TryPop(out R))//弹出栈顶对象
    34.             {
    35.                 Console.WriteLine(R);
    36.             }
    37.  
    38.             Console.WriteLine("执行完成ForEach.");
    39.         }

    ok,这里返回一个序列的问题也解决了。

    结论3:在并行循环内重复操作的对象,必须要是thread-safe(线程安全)的。集合类的线程安全对象全部在System.Collections.Concurrent命名空间下。

      

      

    四、返回集合运算结果/含有局部变量的并行循环

            使用循环的时候经常也会用到迭代,那么在并行循环中叫做 含有局部变量的循环 。下面的代码中详细的解释,这里就不啰嗦了。

    Code

    1.         /// <summary>
    2.         /// 具有线程局部变量的For循环
    3.         /// </summary>
    4.         private void Demo9()
    5.         {
    6.             List<int> data = Program.Data;
    7.             long total = 0;
    8.  
    9.             //这里定义返回值为long类型方便下面各个参数的解释
    10.             Parallel.For<long>(0,           // For循环的起点
    11.                 data.Count,                 // For循环的终点
    12.                 () => 0,                    // 初始化局部变量的方法(long),既为下面的subtotal的初值
    13.                 (i, LoopState, subtotal) => // 为每个迭代调用一次的委托,i是当前索引,LoopState是循环状态,subtotal为局部变量名
    14.                 {
    15.                     subtotal += data[i];    // 修改局部变量
    16.                     return subtotal;        // 传递参数给下一个迭代
    17.                 },
    18.                 (finalResult) => Interlocked.Add(ref total, finalResult) //对每个线程结果执行的最后操作,这里是将所有的结果相加
    19.                 );
    20.             Console.WriteLine(total);
    21.         }
    22.  
    23.         /// <summary>
    24.         /// 具有线程局部变量的ForEach循环
    25.         /// </summary>
    26.         private void Demo10()
    27.         {
    28.             List<int> data = Program.Data;
    29.             long total = 0;
    30.  
    31.             Parallel.ForEach<int, long>(data, // 要循环的集合对象
    32.                 () => 0,                      // 初始化局部变量的方法(long),既为下面的subtotal的初值
    33.                 (i, LoopState, subtotal) =>   // 为每个迭代调用一次的委托,i是当前元素,LoopState是循环状态,subtotal为局部变量名
    34.                 {
    35.                     subtotal += i;            // 修改局部变量
    36.                     return subtotal;          // 传递参数给下一个迭代
    37.                 },
    38.                 (finalResult) => Interlocked.Add(ref total, finalResult) //对每个线程结果执行的最后操作,这里是将所有的结果相加
    39.                 );
    40.  
    41.             Console.WriteLine(total);
    42.         }

    结论4:并行循环中的迭代,确实很伤人。代码太难理解了。

     

     

    五、PLinq(Linq的并行计算)

               上面介绍完了For和ForEach的并行计算盛宴,微软也没忘记在Linq中加入并行计算。下面介绍Linq中的并行计算。

    4.0中在System.Linq命名空间下加入了下面几个新的类:

    说明
    ParallelEnumerable提供一组用于查询实现 ParallelQuery{TSource} 的对象的方法。这是 Enumerable 的并行等效项。
    ParallelQuery表示并行序列。
    ParallelQuery<TSource>表示并行序列。

    原理2:PLinq最多会开启64个线程

    原理3:PLinq会自己判断是否可以进行并行计算,如果不行则会以顺序模式运行。

    原理4:PLinq会在昂贵的并行算法或成本较低的顺序算法之间进行选择,默认情况下它选择顺序算法。

      

    在ParallelEnumerable中提供的并行化的方法

    ParallelEnumerable 运算符说明
    AsParallel()PLINQ 的入口点。指定如果可能,应并行化查询的其余部分。
    AsSequential()指定查询的其余部分应像非并行 LINQ 查询一样按顺序运行。
    AsOrdered()指定 PLINQ 应保留查询的其余部分的源序列排序,直到例如通过使用 orderby 子句更改排序为止。
    AsUnordered()指定查询的其余部分的 PLINQ 不需要保留源序列的排序。
    WithCancellation()指定 PLINQ 应定期监视请求取消时提供的取消标记和取消执行的状态。
    WithDegreeOfParallelism()指定 PLINQ 应当用来并行化查询的处理器的最大数目。
    WithMergeOptions()提供有关 PLINQ 应当如何(如果可能)将并行结果合并回到使用线程上的一个序列的提示。
    WithExecutionMode()指定 PLINQ 应当如何并行化查询(即使默认行为是按顺序运行查询)。
    ForAll()多线程枚举方法,与循环访问查询结果不同,它允许在不首先合并回到使用者线程的情况下并行处理结果。
    Aggregate() 重载对于 PLINQ 唯一的重载,它启用对线程本地分区的中间聚合以及一个用于合并所有分区结果的最终聚合函数。

    下面是PLinq的简单代码

    Code

    1.         /// <summary>
    2.         /// PLinq简介
    3.         /// </summary>
    4.         private void Demo11()
    5.         {
    6.             var source = Enumerable.Range(1, 10000);
    7.  
    8.             //查询结果按source中的顺序排序
    9.             var evenNums = from num in source.AsParallel().AsOrdered()
    10.                        where num % 2 == 0
    11.                        select num;
    12.  
    13.             //ForAll的使用
    14.             ConcurrentBag<int> concurrentBag = new ConcurrentBag<int>();
    15.             var query = from num in source.AsParallel()
    16.                         where num % 10 == 0
    17.                         select num;
    18.             query.ForAll((e) => concurrentBag.Add(e * e));
    19.         }

    上面代码中使用了ForAll,ForAll和foreach的区别如下:

    image

    展开全文
    cxu123321 2019-05-20 17:00:18
  • 951KB weixin_38616435 2021-02-23 23:28:48
  • 92.04MB weixin_40643087 2018-10-26 11:21:46
  • 21KB weixin_38518376 2020-09-04 16:30:23
  • 3星
    27KB liulonglong789 2012-07-22 08:49:13

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 33,854
精华内容 13,541
关键字:

c#并行

c# 订阅