8天玩转并行开辟,Net四线程编制程序

1、Parallel.Invoke 主要用于任务的并行
  这个函数的功能和Task有些相似,就是并发执行一系列任务,然后等待所有完成。和Task比起来,省略了Task.WaitAll这一步,自然也缺少了Task的相关管理功能。它有两种形式:
  Parallel.Invoke( params Action[] actions);
  Parallel.Invoke(Action[] actions,TaskManager manager,TaskCreationOptions options);

System.Threading.Tasks.Parallel类提供了Parallel.Invoke,Parallel.For,Parallel.ForEach这三个静态方法。

 

图片 1图片 2

1 Parallel.Invoke

      随着多核时代的到来,并行开发越来越展示出它的强大威力,像我们这样的码农再也不用过多的关注底层线程的实现和手工控制,

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            var actions = new Action[]
            {
                () => ActionTest("test 1"),
                () => ActionTest("test 2"),
                () => ActionTest("test 3"),
                () => ActionTest("test 4")
            };

            Console.WriteLine("Parallel.Invoke 1 Test");
            Parallel.Invoke(actions);

            Console.WriteLine("结束!");
        }

        static void ActionTest(object value)
        {
            Console.WriteLine(">>> thread:{0}, value:{1}",
            Thread.CurrentThread.ManagedThreadId, value);
        }
    }
}

尽可能并行执行所提供的每个操作,除非用户取消了操作。

要了解并行开发,需要先了解下两个概念:“硬件线程”和“软件线程”。

Program

方法:

 

2、For方法,主要用于处理针对数组元素的并行操作(数据的并行

1)public static void Invoke(params Action[] actions);

  1. 硬件线程

图片 3图片 4

2)public static void Invoke(ParallelOptions parallelOptions,

    相信大家手头的电脑都是双核以上的,像我这样古董的电脑都是双核的,这样的双核叫做物理内核。

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            int[] nums = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
            Parallel.For(0, nums.Length, (i) =>
            {
                Console.WriteLine("针对数组索引{0}对应的那个元素{1}的一些工作代码……ThreadId={2}", i, nums[i], Thread.CurrentThread.ManagedThreadId);
            });
            Console.ReadKey();
        }
    }
}

params Action[] actions);

图片 5

Program

参数:

 

3、Foreach方法,主要用于处理泛型集合元素的并行操作(数据的并行)

parallelOptions:一个对象,用于配置此操作的行为。

硬件线程又叫做逻辑内核,我们可以在”任务管理器“中查看”性能“标签页,如下图,我们知道有2个硬件线程。

图片 6图片 7

Actions:要执行的操作数组

 图片 8

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            List<int> nums = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
            Parallel.ForEach(nums, (item) =>
            {
                Console.WriteLine("针对集合元素{0}的一些工作代码……ThreadId={1}", item, Thread.CurrentThread.ManagedThreadId);
            });
            Console.ReadKey();
        }
    }
}

异常:

 

Program

对方法1:

一般情况下,一个物理内核对应一个逻辑内核,比如我这里的2对2。当然如果你的cpu采用的是超线程技术,那么可能就会有4个物理内核对应

  数据的并行的方式二(AsParallel()):

    System.ArgumentNullException: actions 参数为 null。

8个硬件线程,现在有很多服务器都有8个硬件线程,上午在公司的服务器上截了个图。

图片 9图片 10

    System.AggregateException:当 actions 数组中的任何操作引发异常时引发的异常。

图片 11

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            List<int> nums = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
            var evenNumbers = nums.AsParallel().Select(item => Calculate(item));
            //注意这里是个延迟加载,也就是不用集合的时候 这个Calculate里面的算法 是不会去运行 可以屏蔽下面的代码看效果;
            Console.WriteLine(evenNumbers.Count());
            //foreach (int item in evenNumbers)
            //    Console.WriteLine(item);
            Console.ReadKey();
        }

        static int Calculate(int number)
        {
            Console.WriteLine("针对集合元素{0}的一些工作代码……ThreadId={1}", number, Thread.CurrentThread.ManagedThreadId);
            return number * 2;
        }
    }
}

System.ArgumentException:actions数组包含 null 个元素。

我们要知道并行开发要做的事情就是将任务分摊给这些硬件线程去并行执行来达到负载和加速。

Program

对方法2除上述异常外还包括:

 

  .AsOrdered() 对结果进行排序:

System.OperationCanceledException:parallelOptions 设置了System.Threading.CancellationToken。

  1. 软件线程

图片 12图片 13

System.ObjectDisposedException:在 parallelOptions 中与 System.Threading.CancellationToken 关联的System.Threading.CancellationTokenSource已被释放。

    相信这个大家最熟悉了,我们知道传统的代码都是串行的,就一个主线程,当我们为了实现加速而开了很多工作线程,这些工作线程

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp
{

    class Program
    {
        static void Main(string[] args)
        {
            List<int> nums = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
            var evenNumbers = nums.AsParallel().AsOrdered().Select(item => Calculate(item));
            //注意这里是个延迟加载,也就是不用集合的时候 这个Calculate里面的算法 是不会去运行 可以屏蔽下面的代码看效果;
            //Console.WriteLine(evenNumbers.Count());
            foreach (int item in evenNumbers)
                Console.WriteLine(item);
            Console.ReadKey();
        }

        static int Calculate(int number)
        {
            Console.WriteLine("针对集合元素{0}的一些工作代码……ThreadId={1}", number, Thread.CurrentThread.ManagedThreadId);
            return number * 2;
        }
    }
}

说明:

也就是软件线程。

Program

1)Invoke方法只有在actions全部执行完才会返回,即使在执行过程中出现异常也会完成。

 

  ForEach的独到之处就是可以将数据进行分区,每一个小区内实现串行计算,分区采用Partitioner.Create实现。

2)不能保证actions中的所有操作同时执行。比如actions大小为4,但硬件线程数为2,那么同时运行的操作数最多为2。

好,我们知道了基本概念就ok了,在.net 4.0中,微软给我们提供了一个新的命名空间:System.Threading.Tasks。这里面有很多好玩

图片 14图片 15

3)actions中的操作并行的运行且与顺序无关,若编写与运行顺序有关的并发代码,应选择其他方法。

的东西,作为第一篇就介绍下最基础,最简单的Parallel的使用。

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            for (int j = 1; j < 4; j  )
            {
                ConcurrentBag<int>  bag = new ConcurrentBag<int>();
                var watch = Stopwatch.StartNew();
                watch.Start();
                Parallel.ForEach(Partitioner.Create(0, 3000000), i =>
                {
                    for (int m = i.Item1; m < i.Item2; m  )
                    {
                        bag.Add(m);
                    }
                });
                Console.WriteLine("并行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds);
                GC.Collect();

            }
        }
    }
}

4)如果使用Invoke加载多个操作,多个操作运行时间迥异,总的运行时间以消耗时间最长操作为基准,这会导致很多逻辑内核长时间处于空闲状态。

 

Program

5)受限的并行可扩展性,这源于Invoke所调用的委托数目是固定的。

图片 16

  ParallelOptions类
  ParallelOptions options = new ParallelOptions();
  //指定使用的硬件线程数为4
  options.MaxDegreeOfParallelism = 4;
  有时候我们的线程可能会跑遍所有的内核,为了提高其他应用程序的稳定性,就要限制参与的内核,正好ParallelOptions提供了MaxDegreeOfParallelism属性。

2 Parallel.For

 

图片 17图片 18

可能会并行运行迭代,可以监视和操作循环的状态。Parallel.For有多个重载的方法,下面列举部分方法。

一: Parallel的使用

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    public class Student
    {
        public int ID { get; set; }
        public string Name { get; set; }
        public int Age { get; set; }
        public DateTime CreateTime { get; set; }
    }

    class Program
    {
        static void Main(string[] args)
        {
            var dic = LoadData();
            Stopwatch watch = new Stopwatch();
            watch.Start();
            var query2 = (from n in dic.Values.AsParallel()
                          where n.Age > 20 && n.Age < 25
                          select n).ToList();
            watch.Stop();
            Console.WriteLine("并行计算耗费时间:{0}", watch.ElapsedMilliseconds);

            Console.Read();
        }

        public static ConcurrentDictionary<int, Student> LoadData()
        {
            ConcurrentDictionary<int, Student> dic = new ConcurrentDictionary<int, Student>();
            ParallelOptions options = new ParallelOptions();
            //指定使用的硬件线程数为4
            options.MaxDegreeOfParallelism = 4;
            //预加载1500w条记录
            Parallel.For(0, 15000000, options, (i) =>
            {
                var single = new Student()
                {
                    ID = i,
                    Name = "hxc"   i,
                    Age = i % 151,
                    CreateTime = DateTime.Now.AddSeconds(i)
                };
                dic.TryAdd(i, single);
            });

            return dic;
        }
    }
}

方法:

在Parallel下面有三个常用的方法invoke,for和forEach。

Program

1)public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int> body);

1:  Parallel.Invoke

常见问题的处理

2)public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body);

    这是最简单,最简洁的将串行的代码并行化。

  <1> 如何中途退出并行循环?
  是的,在串行代码中我们break一下就搞定了,但是并行就不是这么简单了,不过没关系,在并行循环的委托参数中提供了一个ParallelLoopState,该实例提供了Break和Stop方法来帮我们实现。
  Break: 当然这个是通知并行计算尽快的退出循环,比如并行计算正在迭代100,那么break后程序还会迭代所有小于100的。

3)public static ParallelLoopResult For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int, ParallelLoopState> body);

 1 class Program
 2 {
 3     static void Main(string[] args)
 4     {
 5         var watch = Stopwatch.StartNew();
 6 
 7         watch.Start();
 8 
 9         Run1();
10 
11         Run2();
12 
13         Console.WriteLine("我是串行开发,总共耗时:{0}n", watch.ElapsedMilliseconds);
14 
15         watch.Restart();
16 
17         Parallel.Invoke(Run1, Run2);
18 
19         watch.Stop();
20 
21         Console.WriteLine("我是并行开发,总共耗时:{0}", watch.ElapsedMilliseconds);
22 
23         Console.Read();
24     }
25 
26     static void Run1()
27     {
28         Console.WriteLine("我是任务一,我跑了3s");
29         Thread.Sleep(3000);
30     }
31 
32     static void Run2()
33     {
34         Console.WriteLine("我是任务二,我跑了5s");
35         Thread.Sleep(5000);
36     }
37 }

  Stop:这个就不一样了,比如正在迭代100突然遇到stop,那它啥也不管了,直接退出。

4)public static ParallelLoopResult For<TLocal>(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Func<TLocal> localInit, Func<int, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally);

图片 19

图片 20图片 21

参数:

在这个例子中可以获取二点信息:

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            ConcurrentBag<int> bag = new ConcurrentBag<int>();

            Parallel.For(0, 20000000, (i, state) =>
            {
                if (bag.Count == 1000)
                {
                    //state.Break();
                    state.Stop();
                    return;
                }
                bag.Add(i);
            });

            Console.WriteLine("当前集合有{0}个元素。", bag.Count);

        }
    }
}

fromInclusive:开始索引(含)。

第一:一个任务是可以分解成多个任务,采用分而治之的思想。

Program

toExclusive:结束索引(不含)。

第二:尽可能的避免子任务之间的依赖性,因为子任务是并行执行,所以就没有谁一定在前,谁一定在后的规定了。

  取消(cancel)

body:将被每个迭代调用一次的委托。

 

图片 22图片 23

parallelOptions:一个对象,用于配置此操作的行为。

2:Parallel.for

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        public static void Main()
        {

            var cts = new CancellationTokenSource();
            var ct = cts.Token;
            Task.Factory.StartNew(() => fun(ct));
            Console.ReadKey();
            //Thread.Sleep(3000);
            cts.Cancel();
            Console.WriteLine("任务取消了!");

        }

        static void fun(CancellationToken token)
        {
            Parallel.For(0, 100000,
                        new ParallelOptions { CancellationToken = token },
                        (i) =>
                        {
                            Console.WriteLine("针对数组索引{0}的一些工作代码……ThreadId={1}", i, Thread.CurrentThread.ManagedThreadId);
                        });
        }
    }
}

localInit:一个委托,用于返回每个任务的本地数据的初始状态。

 我们知道串行代码中也有一个for,但是那个for并没有用到多核,而Paraller.for它会在底层根据硬件线程的运行状况来充分的使用所有的可

Program

localFinally:一个委托,用于对每个任务的本地状态执行一个最终操作。

利用的硬件线程,注意这里的Parallel.for的步行是1。

  <2> 并行计算中抛出异常怎么处理?
  首先任务是并行计算的,处理过程中可能会产生n多的异常,那么如何来获取到这些异常呢?普通的Exception并不能获取到异常,然而为并行诞生的AggregateExcepation就可以获取到一组异常。

返回结果:

这里我们来演示一下,向一个线程安全的集合插入数据,当然这个集合采用原子性来实现线程同步,比那些重量级的锁机制更加的节省消耗。

图片 24图片 25

ParallelLoopResult :包含有关已完成的循环部分的信息。

 1  class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             for (int j = 1; j < 4; j  )
 6             {
 7                 Console.WriteLine("n第{0}次比较", j);
 8 
 9                 ConcurrentBag<int> bag = new ConcurrentBag<int>();
10 
11                 var watch = Stopwatch.StartNew();
12 
13                 watch.Start();
14 
15                 for (int i = 0; i < 20000000; i  )
16                 {
17                     bag.Add(i);
18                 }
19 
20                 Console.WriteLine("串行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds);
21 
22                 GC.Collect();
23 
24                 bag = new ConcurrentBag<int>();
25 
26                 watch = Stopwatch.StartNew();
27 
28                 watch.Start();
29 
30                 Parallel.For(0, 20000000, i =>
31                 {
32                     bag.Add(i);
33                 });
34 
35                 Console.WriteLine("并行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds);
36 
37                 GC.Collect();
38 
39             }
40         }
41     }
using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                Parallel.Invoke(Run1, Run2);
            }
            catch (AggregateException ex)
            {
                foreach (var single in ex.InnerExceptions)
                {
                    Console.WriteLine(single.Message);
                }
            }
            Console.WriteLine("结束了!");
            //Console.Read();
        }

        static void Run1()
        {
            Thread.Sleep(3000);
            throw new Exception("我是任务1抛出的异常");
        }

        static void Run2()
        {
            Thread.Sleep(5000);
            throw new Exception("我是任务2抛出的异常");
        }
    }
}

异常:

图片 26

Program

System.ArgumentNullException:body 参数为 null,或 localInit 参数为 null,或 localFinally 参数为 null,或 parallelOptions 参数为 null。 System.AggregateException:包含在所有线程上引发的全部单个异常的异常。

 

  注意Parallel里面 不建议抛出异常 因为在极端的情况下比如进去的第一批线程先都抛异常了 此时AggregateExcepation就只能捕获到这一批的错误,然后程序就结束了

对于方法3)和4)除包含以上异常外还包括:

可以看的出,加速的效果还是比较明显的。

图片 27图片 28

System.OperationCanceledException:在 parallelOptions 设置了参数 System.Threading.CancellationToken。

 

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    public class TestClass
    {
        public static List<int> NumberList = null;
        private static readonly object locker = new object();
        public void Test(int Number)
        {
            throw new Exception("1111");
            //lock (locker)
            //{
            //    if (NumberList == null)
            //    {
            //        Console.WriteLine("执行添加");
            //        NumberList = new List<int>();
            //        NumberList.Add(1);
            //        //Thread.Sleep(1000);
            //    }
            //}
            //if (Number == 5 || Number == 7) throw new Exception(string.Format("NUmber{0}Boom!", Number));
            //Console.WriteLine(Number);
        }
    }

    class Program
    {
        private static readonly object locker = new object();
        static void Main(string[] args)
        {
            List<string> errList = new List<string>();
            try
            {
                Parallel.For(0, 10, (i) =>
                {
                    try
                    {
                        TestClass a = new TestClass();
                        a.Test(i);
                    }
                    catch (Exception ex)
                    {
                        lock (locker)
                        {
                            errList.Add(ex.Message);
                            throw ex;
                        }
                    }
                });
            }
            catch (AggregateException ex)
            {
                foreach (var single in ex.InnerExceptions)
                {
                    Console.WriteLine(single.Message);
                }
            }
            int Index = 1;
            foreach (string err in errList)
            {
                Console.WriteLine("{0}、的错误:{1}", Index  , err);
            }
        }
    }
}

System.ObjectDisposedException:在 parallelOptions 中与 System.Threading.CancellationToken 关联的 System.Threading.CancellationTokenSource已被释放。

3:Parallel.forEach
    forEach的独到之处就是可以将数据进行分区,每一个小区内实现串行计算,分区采用Partitioner.Create实现。

Program

说明:

 class Program
    {
        static void Main(string[] args)
        {
            for (int j = 1; j < 4; j  )
            {
                Console.WriteLine("n第{0}次比较", j);

                ConcurrentBag<int> bag = new ConcurrentBag<int>();

                var watch = Stopwatch.StartNew();

                watch.Start();

                for (int i = 0; i < 3000000; i  )
                {
                    bag.Add(i);
                }

                Console.WriteLine("串行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds);

                GC.Collect();

                bag = new ConcurrentBag<int>();

                watch = Stopwatch.StartNew();

                watch.Start();

                Parallel.ForEach(Partitioner.Create(0, 3000000), i =>
                {
                    for (int m = i.Item1; m < i.Item2; m  )
                    {
                        bag.Add(m);
                    }
                });

                Console.WriteLine("并行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds);

                GC.Collect();

            }
        }
    }

  可以向下面这样来处理一下
  不在AggregateExcepation中来处理 而是在Parallel里面的try catch来记录错误,或处理错误

1)不支持浮点和步进。

图片 29

图片 30图片 31

2)无法保证迭代的执行顺序。

这里还是要说一下:Partitioner.Create(0, 3000000)。

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    public class TestClass
    {
        public static List<int> NumberList = null;
        private static readonly object locker = new object();
        public void Test(int Number)
        {
            throw new Exception("1111");
            //lock (locker)
            //{
            //    if (NumberList == null)
            //    {
            //        Console.WriteLine("执行添加");
            //        NumberList = new List<int>();
            //        NumberList.Add(1);
            //        //Thread.Sleep(1000);
            //    }
            //}
            //if (Number == 5 || Number == 7) throw new Exception(string.Format("NUmber{0}Boom!", Number));
            //Console.WriteLine(Number);
        }
    }

    class Program
    {
        private static readonly object locker = new object();
        static void Main(string[] args)
        {
            List<string> errList = new List<string>();
            Parallel.For(0, 10, (i) =>
            {
                try
                {
                    TestClass a = new TestClass();
                    a.Test(i);
                }
                catch (Exception ex)
                {
                    lock (locker)
                    {
                        errList.Add(ex.Message);
                    }
                    //Console.WriteLine(ex.Message);
                    //注:这里不再将错误抛出.....
                    //throw ex;
                }
            });

            int Index = 1;
            foreach (string err in errList)
            {
                Console.WriteLine("{0}、的错误:{1}", Index  , err);
            }
        }
    }
}

3)如果fromInclusive大于或等于toExclusive,方法立即返回而不会执行任何迭代。

第一:我们要分区的范围是0-3000000。

Program

4)对于body参数中含有的ParallelLoopState实例,其作用为提早中断并行循环。

第二:我们肯定想知道系统给我们分了几个区? 很遗憾,这是系统内部协调的,无权告诉我们,当然系统也不反对我们自己指定分区个数,

 

5)只有在迭代全部完成以后才会返回结果,否则循环将一直阻塞。

        这里可以使用Partitioner.Create的第六个重载,比如这样:Partitioner.Create(0, 3000000, Environment.ProcessorCount),

 

        因为 Environment.ProcessorCount能够获取到当前的硬件线程数,所以这里也就开了2个区。

3 Parallel.ForEach

 

方法

下面分享下并行计算中我们可能有的疑惑?

1)public static ParallelLoopResult ForEach(IEnumerable<TSource> source, Action<TSource> body);

<1> 如何中途退出并行循环?

2)public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Action<TSource, ParallelLoopState> body);

      是的,在串行代码中我们break一下就搞定了,但是并行就不是这么简单了,不过没关系,在并行循环的委托参数中提供了一个

3)public static ParallelLoopResult ForEach<TSource>(Partitioner<TSource> source, Action<TSource> body);

ParallelLoopState,该实例提供了Break和Stop方法来帮我们实现。

参数:

Break: 当然这个是通知并行计算尽快的退出循环,比如并行计算正在迭代100,那么break后程序还会迭代所有小于100的。

source:数据源

Stop:这个就不一样了,比如正在迭代100突然遇到stop,那它啥也不管了,直接退出。

body:将被每个迭代调用一次的委托。

 

parallelOptions:一个对象,用于配置此操作的行为。

下面举个例子,当迭代到1000的时候退出循环

返回结果:

 1   class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             var watch = Stopwatch.StartNew();
 6 
 7             watch.Start();
 8 
 9             ConcurrentBag<int> bag = new ConcurrentBag<int>();
10 
11             Parallel.For(0, 20000000, (i, state) =>
12                   {
13                       if (bag.Count == 1000)
14                       {
15                           state.Break();
16                           return;
17                       }
18                       bag.Add(i);
19                   });
20 
21             Console.WriteLine("当前集合有{0}个元素。", bag.Count);
22 
23         }
24     }

ParallelLoopResult :包含有关已完成的循环部分的信息。

图片 32

异常:

 

System.ArgumentNullException:source 参数为 null。-或- 方body 参数为 null。

<2> 并行计算中抛出异常怎么处理?

System.AggregateException:包含了所有线程上引发的全部单个异常。

 首先任务是并行计算的,处理过程中可能会产生n多的异常,那么如何来获取到这些异常呢?普通的Exception并不能获取到异常,然而为并行诞生的AggregateExcepation就可以获取到一组异常。

对于方法2)还包括:

class Program
{
    static void Main(string[] args)
    {
        try
        {
            Parallel.Invoke(Run1, Run2);
        }
        catch (AggregateException ex)
        {
            foreach (var single in ex.InnerExceptions)
            {
                Console.WriteLine(single.Message);
            }
        }

        Console.Read();
    }

    static void Run1()
    {
        Thread.Sleep(3000);
        throw new Exception("我是任务1抛出的异常");
    }

    static void Run2()
    {
        Thread.Sleep(5000);

        throw new Exception("我是任务2抛出的异常");
    }
}

System.OperationCanceledException:在 parallelOptions 设置了参数 System.Threading.CancellationToken。

图片 33

System.ObjectDisposedException:在 parallelOptions 中与 System.Threading.CancellationToken 关联的 System.Threading.CancellationTokenSource已被释放。

 

对于3)包括的异常为:

<3> 并行计算中我可以留一个硬件线程出来吗?

System.ArgumentNullException:source 参数为 null。-或- 方body 参数为 null。

  默认的情况下,底层机制会尽可能多的使用硬件线程,然而我们使用手动指定的好处是我们可以在2,4,8个硬件线程的情况下来进行测量加速比。

System.InvalidOperationException:source 分区程序中的 System.Collections.Concurrent.Partitioner<TSource>.SupportsDynamicPartitions 属性返回 false。或 在 source 分区程序中的任何方法返回 null 时引发异常。或在source 分区程序中的 System.Collections.Concurrent.Partitioner<TSource>.GetPartitions(System.Int32)方法不返回正确数目的分区。

 class Program
    {
        static void Main(string[] args)
        {
            var bag = new ConcurrentBag<int>();

            ParallelOptions options = new ParallelOptions();

            //指定使用的硬件线程数为1
            options.MaxDegreeOfParallelism = 1;

            Parallel.For(0, 300000, options, i =>
            {
                bag.Add(i);
            });

            Console.WriteLine("并行计算:集合有:{0}", bag.Count);

        }
    }

说明:

 

1)对于body参数中含有的ParallelLoopState实例,其作用为提早中断并行循环。

————————————————————————————————————————————————————————————

2)Parallel.ForEach方法不保证执行顺序,它不像foreach循环那样总是顺序执行。

————————————————————————————————————————————————————————————

3)对于方法3)中的source,它的类型是Partitioner<TSource>。可以使用Partitioner.Create方法创建分区,该方法的几个重整方法为:

友情提示:如果不喜欢看文章,可以移步本系列的  C#IL解读完整视频 【一把伞的钱哦图片 34

l public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive);

————————————————————————————————————————————————————————————

l public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive, int rangeSize);

————————————————————————————————————————————————————————————

fromInclusive为范围下限(含),toExclusive为范围下限(不含),rangeSize为每个子范围的大小。

使用Partitioner创建的子范围大小默认大约是计算机内核的三倍,而当使用rangeSize指定范围大小时,那么子范围大小为指定值。

4)只有在迭代全部完成以后才会返回结果,否则循环将一直阻塞。

 

4 ParallelOptions

定义:

存储选项,用于配置 System.Threading.Tasks.Parallel 类的方法。

ParallelOptions属性

1)public CancellationToken CancellationToken { get; set; }

获取或设置传播有关应取消操作的通知。

2)public int MaxDegreeOfParallelism { get; set; }

获取或设置此 ParallelOptions 实例所允许的最大并行度。

3)public TaskScheduler TaskScheduler { get; set; }

获取或设置与此 System.Threading.Tasks.ParallelOptions 实例关联的 System.Threading.Tasks.TaskScheduler

说明:

1)通过设置CancellationToken来取消并行循环,当前正在运行的迭代会执行完,然后抛出System.OperationCanceledException类型的异常。

2)TPL的方法总是会试图利用所有可用内核以达到最好的效果,但是很可能.NET Framework内部使用的启发式算法所得到的注入和使用的线程数比实际需要的多(通常都会高于硬件线程数,这样会更好地支持CPU和I/O混合型的工作负载)。

通常将最大并行度设置为小于等于逻辑内核数。如果设置为等于逻辑内核数,那么要确保不会影响其他程序的执行。设置为小于逻辑内核数是为了有空闲内核来处理其他紧急的任务。

用途:

1)从循环外部取消并行循环

2)指定并行度

3)指定自定义任务调度程序

5 ParallelLoopState

定义:

可使并行循环迭代与其他迭代交互。 此类的实例由 Parallel 类提供给每个循环;不能在用户代码中创建实例。

方法:

1)Break()方法:通知并行循环在执行完当前迭代之后尽快停止执行,可确保低索引步骤完成。且可确保正在执行的迭代继续运行直到完成。

2)Stop()方法:通知并行循环尽快停止执行。对于尚未运行的迭代不能会尝试执行低索引迭代。不保证所有已运行的迭代都执行完。

用途:提早退出并行循环。

说明:

1)不能同时在同一个并行循环中同时使用Break和Stop。

2)Stop比Break更常用。break语句用在并行循环中的效果和用在串行循环中不同。Break用在并行循环中,委托的主体方法在每次迭代的时候被调用,退出委托的主体方法对并行循环的执行没有影响。Stop停止循环比Break快。

6 ParallelLoopResult结构

定义:

并行循环运行结果的信息。

属性:

1)public bool IsCompleted { get; }

如果该循环已运行完成(该循环的所有迭代均已执行,并且该循环没有收到提前结束的请求),则为 true;否则为 false。

2)public long? LowestBreakIteration { get; }

返回一个表示从中调用 Break 语句的最低迭代的整数

用途:判断当并行循环结束时,是否因调用了break方法或stop方法而提前退出并行循环,或所有迭代均已执行。

判断依据:

条件

 

IsCompleted

运行完成

!IsCompleted &&

LowestBreakIteration==null

使用了Stop语句而提前终止

!IsCompleted &&

LowestBreakIteration!=null

使用了Break语句而提前终止

 

7 捕获并行循环中的异常

原则:

1)异常优先于从循环外部取消和使用Break()方法或Stop()方法提前退出并行循环。

2)并行循环体抛出一个未处理的异常,并行循环就不能再开始新的迭代。

3)默认情况下当某次迭代抛出一个未处理异常,那么正在执行的迭代如果没抛出异常,正在执行的迭代会执行完。当所有迭代都执行完(有可能其他的迭代在执行的过程中也抛出异常),并行循环将在调用它的线程中抛出异常。

并行循环运行的过程中,可能有多个迭代抛出异常,所以一般使用AggregateException来捕获异常。AggregateException继承自Exception。为了防止仅使用AggregateException未能捕获某些异常,使用AggregateException的同时还要使用Exception。

8 使用模式

 

8.1 Parallel.Invoke

 1 public static void DemonstrateInvoke()
 2 {
 3     //使用Lambda
 4     Parallel.Invoke(
 5     () =>
 6     {
 7         //具体操作1
 8     }, 
 9     () => 
10     {
11         //具体操作2
12     });
13 
14     //不使用lambda
15     Parallel.Invoke(Operation1, Operation2);
16 }
17 
18 private static void Operation1()
19 {
20     //具体操作1
21 }
22 
23 private static void Operation2()
24 {
25     //具体操作2
26 }        

 

8.2 Parallel.For

1 串行循环:
2 int toExclusive = ...;
3 for(int i =0;i<=toExclusive;i  ){};
4 
5 对应的并行循环:
6 Parallel.For(0, toExclusive 1, (i) => 
7 {
8     //具体操作
9 });

8.3 Parallel.ForEach

 1 一般用法
 2 IEnumerable<string> coll = ...;
 3 Parallel.ForEach(coll,(str)=>
 4 {
 5     //具体操作
 6 });
 7 
 8 基于分区的模式
 9 优化分区数,使其最接近系统逻辑内核数:
10 子分区范围 = 对“待处理集合大小/系统逻辑内核数”取整 1。
11 int logicalCores =...;
12 IEnumerable<string> collP = ...;
13 int fromInclusive = ...;
14 int toExclusiv = ...;
15 int rangeSize = (int)((toExclusiv-fromInclusive )/logicalCores)  1;
16 Parallel.ForEach(Partitioner.Create(fromInclusive, toExclusiv, rangeSize), range =>
17 {
18     for (int i = range.Item1; i < range.Item2; i  )
19     {
20         //使用集合:collection[i]
21     }
22 });    

8.4 从循环外部取消并行循环

注意:不使用IsCancellationRequested或ThrowIfCancellationRequested()的情况下无法捕获类型为AggregateException的异常。

1)对于Parallel.For

使用IsCancellationRequested属性

 1 public static void CancelFromExternal()
 2 {
 3     CancellationTokenSource cts = new CancellationTokenSource();
 4     //其他操作...
 5 
 6     //异步执行Operation方法
 7     Task.Factory.StartNew(()=>{Operation(cts);});
 8     //异步执行condition的计算过程
 9     Task.Factory.StartNew(()=>{
10         bool condition = ...;
11         if (condition) cts.Cancel();
12     }
13 
14     //其他操作...
15 }
16 
17 private static void Operation(CancellationTokenSource cts)
18 {
19     CancellationToken ct = cts.Token;
20     ParallelOptions op = new ParallelOptions { CancellationToken = ct };
21     int toExclusive = ...;
22     Parallel.For(0, toExclusive, op, (i) =>
23     {
24 
25         //其他操作...
26 
27         //return只对当前子线程有效
28         if (ct.IsCancellationRequested)
29         { return; }
30 
31         //其他操作...
32     });
33 }                    

使用ThrowIfCancellationRequested()方法抛出异常

将上面的并行循环部分替换为下面的代码:

 

1 Parallel.For(0, toExclusive, op, (i) =>
2 {
3 
4     //其他操作...
5 
6     ct.ThrowIfCancellationRequested();
7 
8     //其他操作...
9 });

 

不使用IsCancellationRequested和ThrowIfCancellationRequested()方法

将Operation方法中的涉及到IsCancellationRequested和ThrowIfCancellationRequested()方法的代码去掉

2)对于Parallel.ForEach

使用IsCancellationRequested属性

 1 public static void CancelFromExternal()
 2 {
 3     //同1)中CancelFromExternal方法
 4 }
 5 
 6 private static void Operation(CancellationTokenSource cts)
 7 {
 8     CancellationToken ct = cts.Token;
 9     ParallelOptions op = new ParallelOptions { CancellationToken = ct };
10     IEnumerable<string> coll = new List<string> { "str1", "str2" };
11     Parallel.ForEach(coll, op,(str, loopState) =>
12     {
13         //其他操作...
14 
15       //return只对当前子线程有效
16       if (ct.IsCancellationRequested)
17       { return; }
18 
19       //其他操作...
20   });
21 }

使用ThrowIfCancellationRequested()方法抛出异常

将Operation方法中的:

if (ct.IsCancellationRequested)

   { return; }

替换为:

ct.ThrowIfCancellationRequested();

不使用IsCancellationRequested和ThrowIfCancellationRequested()方法

将Operation方法中的涉及到IsCancellationRequested和ThrowIfCancellationRequested()方法的代码去掉

8.5 指定并行度

1 int maxDegreeOfParallelism = Environment.ProcessorCount - 1;
2 ParallelOptions op = new ParallelOptions { MaxDegreeOfParallelism =     maxDegreeOfParallelism };
3 IEnumerable<string> coll = new List<string> { };
4 Parallel.ForEach(coll, op ,(str) =>
5 {
6     //具体操作
7 });

8.6 提早退出并行循环

1)对于Parallel.For

 1 int toExclusive = 10;
 2 Parallel.For(0, toExclusive, (i, loopState) =>
 3 {
 4     //其他操作...
 5     //计算condition
 6     bool condition = ...;
 7     if (condition)
 8     {
 9         loopState.Break();//或使用loopState.Stop
10         return;
11     }
12 
13     //其他操作
14 });    

2)对于Parallel.ForEach

 1 IEnumerable<string> coll = new List<string> {"str1","str2" };
 2 Parallel.ForEach(coll, (str, loopState) =>
 3 {
 4     //其他操作...
 5 
 6     //计算condition
 7     bool condition = ...;
 8     if (condition)
 9     {
10         loopState.Break();//或使用loopState.Stop
11         return;
12     }
13 
14     //其他操作
15 
16 });

9 异常处理模式

基本形式

在确保使用AggregateException 能够捕捉到所有的异常时,可以省去catch(Exception e)的部分。

 

 1 try
 2 {
 3     //Do something
 4 }
 5 catch(AggregateException e)
 6 {
 7     Foreach(Exception ex in e.InnerExceptions)
 8     {
 9         //Do something
10     }
11 }
12 catch(Exception e)
13 {
14     //Do something
15 }

 

为上述并行循环使用模式添加异常处理机制

一种方式是把并行循环放入try块中,另一种方式是在每次迭代的过程中捕获异常。

 

 -----------------------------------------------------------------------------------------

转载与引用请注明出处。

时间仓促,水平有限,如有不当之处,欢迎指正。

本文由星彩网app下载发布于计算机编程,转载请注明出处:8天玩转并行开辟,Net四线程编制程序

TAG标签: 星彩网app下载
Ctrl+D 将本页面保存为书签,全面了解最新资讯,方便快捷。