发布时间:2024-12-30 12:01
用于对内存中的数据做并行运算,也就是说其只支持 LINQ to Object 的并行运算
就是在集合后加个AsParallel()。
例如:
var numbers = Enumerable.Range(0, 100); var result = numbers.AsParallel().AsOrdered().Where(i => i % 2 == 0); foreach (var i in result) Console.WriteLine(i);
下面我们模拟给ConcurrentDictionary灌入1500w条记录,看看串行和并行效率上的差异,注意我的老爷机是2个硬件线程。
static void Main(string[] args) { var dic = LoadData(); Stopwatch watch = new Stopwatch(); watch.Start(); //串行执行 var query1 = (from n in dic.Values where n.Age > 20 && n.Age < 25 select n).ToList(); watch.Stop(); Console.WriteLine(\"串行计算耗费时间:{0}\", watch.ElapsedMilliseconds); watch.Restart(); var query2 = (from n in dic.Values.AsParallel() where n.Age > 20 && n.Age < 25 select n).ToList(); watch.Stop(); Console.WriteLine(\"并行计算耗费时间:{0}\", watch.ElapsedMilliseconds); Console.Read(); } public static ConcurrentDictionaryLoadData() { ConcurrentDictionary dic = new ConcurrentDictionary (); //预加载1500w条记录 Parallel.For(0, 15000000, (i) => { var single = new Student() { ID = i, Name = \"hxc\" + i, Age = i % 151, CreateTime = DateTime.Now.AddSeconds(i) }; dic.TryAdd(i, single); }); return dic; } public class Student { public int ID { get; set; } public string Name { get; set; } public int Age { get; set; } public DateTime CreateTime { get; set; } }
orderby,sum(),average()等等这些聚合函数都是实现了并行化。
这个我在前面文章也说过,为了不让并行计算占用全部的硬件线程,或许可能要留一个线程做其他事情。
var query2 = (from n in dic.Values.AsParallel().WithDegreeOfParallelism(Environment.ProcessorCount - 1) where n.Age > 20 && n.Age < 25 orderby n.CreateTime descending select n).ToList();
首先这个类是Enumerable的并行版本,提供了很多用于查询实现的一组方法,下图为ParallelEnumerable类的方法,记住他们都是并行的。
ConcurrentBagbag = new ConcurrentBag (); var list = ParallelEnumerable.Range (0, 10000); list.ForAll((i) => { bag.Add(i); }); Console.WriteLine(\"bag集合中元素个数有:{0}\", bag.Count); Console.WriteLine(\"list集合中元素个数总和为:{0}\", list.Sum()); Console.WriteLine(\"list集合中元素最大值为:{0}\", list.Max()); Console.WriteLine(\"list集合中元素第一个元素为:{0}\", list.FirstOrDefault());
mapReduce是一个非常流行的编程模型,用于大规模数据集的并行计算,非常的牛X啊,记得mongodb中就用到了这个玩意。
下面我举个例子,用Mapreduce来实现一个对age的分组统计。
static void Main(string[] args) { Listlist = new List () { new Student(){ ID=1, Name=\"jack\", Age=20}, new Student(){ ID=1, Name=\"mary\", Age=25}, new Student(){ ID=1, Name=\"joe\", Age=29}, new Student(){ ID=1, Name=\"Aaron\", Age=25}, }; //这里我们会对age建立一组键值对 var map = list.AsParallel().ToLookup(i => i.Age, count => 1); //化简统计 var reduce = from IGrouping singleMap in map.AsParallel() select new { Age = singleMap.Key, Count = singleMap.Count() }; ///最后遍历 reduce.ForAll(i => { Console.WriteLine(\"当前Age={0}的人数有:{1}人\", i.Age, i.Count); }); } public class Student { public int ID { get; set; } public string Name { get; set; } public int Age { get; set; } public DateTime CreateTime { get; set; } }
考虑一个简单的例子,现有一个容量为1000000的单词集,需要我们以降序列出其中出现次数超过100000的单词(和其次数)。Map过程,使用PLINQ将集合按单词分组,这里使用了Lookup容器接口,它与Dictionary类似,但是提供的是键-值集映射;Reduce过程,使用PLINQ归约查询即可。
某一次运行结果如下:
Word: you, Count: 142416
Word: van, Count: 115816
Word: next, Count: 110228
到此这篇关于C#并行编程之PLINQ(并行LINQ)的文章就介绍到这了。希望对大家的学习有所帮助,也希望大家多多支持脚本之家。