Problem Description
There is a folder that contains 1000s of small text files. I aim to parse and process all of them while more files are being populated into the folder. My intention is to multithread this operation as the single threaded prototype took six minutes to process 1000 files.
I'd like to have reader and writer thread(s) as follows. While the reader thread(s) are reading the files, I'd like writer thread(s) to process them. Once a reader starts reading a file, I'd like to mark it as being processed, for example by renaming it. Once it's been read, rename it to completed.
How do I approach such a multithreaded application?
Is it better to use a distributed hash table or a queue?
Which data structure do I use that would avoid locks?
Is there a better approach to this scheme?
Recommended Answer
Since there was curiosity in the comments about how .NET 4 handles this, here's that approach. Sorry, it's likely not an option for the OP. Disclaimer: this is not a highly scientific analysis, just a demonstration that there's a clear performance benefit; your mileage may vary widely depending on hardware.
Here's a quick test (if you see a big mistake in this simple test, it's just an example; please comment and we can fix it to be more useful/accurate). For this, I just dropped 12,000 files of roughly 60 KB each into a directory as a sample (fire up LINQPad; you can play with it yourself, for free! Just be sure to get LINQPad 4):
```csharp
var files = Directory.GetFiles("C:\\temp", "*.*", SearchOption.AllDirectories).ToList();

var sw = Stopwatch.StartNew();                                      // start timer
files.ForEach(f => File.ReadAllBytes(f).GetHashCode());             // do work - serial
sw.Stop();                                                          // stop timer
sw.ElapsedMilliseconds.Dump("Run MS - Serial");                     // display the duration

sw.Restart();
files.AsParallel().ForAll(f => File.ReadAllBytes(f).GetHashCode()); // do work - parallel
sw.Stop();
sw.ElapsedMilliseconds.Dump("Run MS - Parallel");
```
Slightly changing your loop to parallelize the query is all that's needed in most simple situations. By "simple" I mostly mean that the result of one action doesn't affect the next. The main thing to keep in mind is that some collections, for example our handy List<T>, are not thread safe, so using them in a parallel scenario isn't a good idea :) Luckily, .NET 4 added concurrent collections that are thread safe. Also bear in mind that if you're using a locking collection, it can become a bottleneck too, depending on the situation.
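For example, if each file produced a result you wanted to keep, one of those .NET 4 concurrent collections can be used straight from the parallel query. This is only a minimal sketch building on the test above; the C:\temp path and GetHashCode() stand-in for "work" are just placeholders:

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;

class ParallelCollectExample
{
    static void Main()
    {
        var files = Directory.GetFiles("C:\\temp", "*.*", SearchOption.AllDirectories);

        // ConcurrentBag<T> can be added to safely from many threads at once,
        // unlike List<T>, which would need explicit locking here.
        var results = new ConcurrentBag<int>();

        files.AsParallel().ForAll(f =>
        {
            var hash = File.ReadAllBytes(f).GetHashCode(); // stand-in for real per-file work
            results.Add(hash);
        });

        Console.WriteLine("Processed {0} files", results.Count);
    }
}
```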
The test above uses the .AsParallel<T>(IEnumerable<T>) and .ForAll<T>(ParallelQuery<T>) extensions available in .NET 4.0. The .AsParallel() call wraps the IEnumerable<T> in a ParallelEnumerableWrapper<T> (an internal class) which implements ParallelQuery<T>. This lets you use the parallel extension methods; in this case we're using .ForAll().
.ForAll() internally creates a ForAllOperator<T>(query, action) and runs it synchronously. This handles the threading and the merging of the threads once the work is running... There's quite a bit going on in there; I'd suggest starting here if you want to learn more, including the additional options.
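As one example of those additional options, PLINQ lets you cap how many concurrent tasks it uses via WithDegreeOfParallelism. A small sketch, reusing the files list from the test above (the value 4 is arbitrary):

```csharp
// Cap PLINQ at 4 concurrent tasks instead of letting it decide;
// worth experimenting with when the work is mostly I/O-bound.
files.AsParallel()
     .WithDegreeOfParallelism(4)
     .ForAll(f => File.ReadAllBytes(f).GetHashCode());
```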
The results (Computer 1 - Physical Hard Disk):
- Serial: 1288 - 1333 ms
- Parallel: 461 - 503 ms
Computer specs - for comparison:
The results (Computer 2 - Solid State Drive):
- Serial: 545 - 601 ms
- Parallel: 248 - 278 ms
Computer specifications - for comparison:
- Intel Core 2 Quad Q9100 @ 2.26 GHz (quad core)
- 8 GB RAM (DDR 1333)
- 120 GB OCZ Vertex SSD (Standard Version - 1.4 Firmware)
I don't have links for the CPU/RAM this time, these came installed. This is a Dell M6400 Laptop (here's a link to the M6500... Dell's own links to the 6400 are broken).
These numbers are from 10 runs, taking the min/max of the inner 8 results (removing the overall min and max of each set as possible outliers). We hit an I/O bottleneck here, especially on the physical drive, but think about what the serial method does: it reads, processes, reads, processes, rinse, repeat. With the parallel approach, you are (even with an I/O bottleneck) reading and processing simultaneously. In the worst bottleneck situation, you're processing one file while reading the next. That alone (on any current computer!) should result in some performance gain. As the results above show, we can get a bit more than one going at a time, giving us a healthy boost.
Another disclaimer: Quad core + .NET 4 parallel isn't going to give you four times the performance, it doesn't scale linearly... There are other considerations and bottlenecks in play.
I hope this was of interest in showing the approach and possible benefits. Feel free to criticize or improve... This answer exists solely for those curious, as indicated in the comments :)
Other Recommended Answer
Design
The Producer/Consumer pattern will probably be the most useful for this situation. You should create enough threads to maximize the throughput.
There are existing questions about the Producer/Consumer pattern that can give you an idea of how it works.
You should use a blocking queue: the producer adds files to the queue while the consumers process files from the queue. The blocking queue requires no explicit locking on your part, so it's about the most efficient way to solve your problem.
If you're using .NET 4.0 there are several concurrent collections that you can use out of the box (a short usage sketch follows the list):
- ConcurrentQueue: http://msdn.microsoft.com/en-us/library/dd267265%28v=VS.100%29.aspx
- BlockingCollection: http://msdn.microsoft.com/en-us/library/dd267312%28VS.100%29.aspx
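To show just the shape of the API (the file path and the capacity of 100 are placeholder values, not from the answer), here's a minimal BlockingCollection<T> sketch; by default it is backed by a ConcurrentQueue<T>:

```csharp
using System;
using System.Collections.Concurrent;

class BlockingCollectionSketch
{
    static void Main()
    {
        // Bounded to 100 items: Add blocks the producer when the queue is full;
        // GetConsumingEnumerable blocks consumers until items arrive or adding completes.
        var queue = new BlockingCollection<string>(boundedCapacity: 100);

        queue.Add("C:\\folder\\file1.txt"); // producer side
        queue.CompleteAdding();             // signal that no more files will be added

        foreach (var file in queue.GetConsumingEnumerable()) // consumer side
        {
            Console.WriteLine("Processing {0}", file);
        }
    }
}
```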
Threading
A single producer thread will probably be the most efficient way to load the files from disk and push them onto the queue; multiple consumers will then pop items off the queue and process them. I would suggest that you try 2-4 consumer threads per core and take some performance measurements to determine which is optimal (i.e. the number of threads that gives you the maximum throughput). I would not recommend using the ThreadPool for this specific example.
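A rough sketch of that layout, assuming a BlockingCollection<string> of file paths, a hypothetical ProcessFile method standing in for the real parsing work, and 2 consumers per core as the starting point suggested above:

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;

class ProducerConsumerSketch
{
    static readonly BlockingCollection<string> queue = new BlockingCollection<string>(boundedCapacity: 1000);

    static void Main()
    {
        // Single producer: enumerate the folder and push file paths onto the queue.
        var producer = new Thread(() =>
        {
            foreach (var file in Directory.EnumerateFiles("C:\\folder"))
                queue.Add(file);
            queue.CompleteAdding(); // tell consumers no more work is coming
        });

        // Start with 2 consumers per core and measure; tune toward 4 per core as suggested.
        int consumerCount = Environment.ProcessorCount * 2;
        var consumers = new Thread[consumerCount];
        for (int i = 0; i < consumerCount; i++)
        {
            consumers[i] = new Thread(() =>
            {
                // Blocks until an item is available, ends when adding is complete and the queue drains.
                foreach (var file in queue.GetConsumingEnumerable())
                    ProcessFile(file);
            });
            consumers[i].Start();
        }

        producer.Start();
        producer.Join();
        foreach (var c in consumers) c.Join();
    }

    static void ProcessFile(string path)
    {
        // placeholder for the real parsing/processing work
        File.ReadAllBytes(path);
    }
}
```

Raising consumerCount toward 4 per core and re-measuring throughput is how you'd find the sweet spot mentioned above.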
P.S. I don't understand what the concern is with a single point of failure and the use of distributed hash tables. I know DHTs sound like a really cool thing to use, but I would try the conventional methods first unless you have a specific problem in mind that you're trying to solve.
Other Recommended Answer
I recommend that you queue a thread for each file and keep track of the running threads in a dictionary, launching a new thread when one completes, up to a maximum limit. I prefer to create my own threads when they can be long-running, and use callbacks to signal when they're done or have encountered an exception. In the sample below I use a dictionary to keep track of the running worker instances, so I can call into an instance if I want to stop work early. Callbacks can also be used to update a UI with progress and throughput. You can also dynamically throttle the running-thread limit for added flexibility.
The example code is an abbreviated demonstrator, but it does run.
```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        Supervisor super = new Supervisor();
        super.LaunchWaitingThreads();

        while (!super.Done) { Thread.Sleep(200); }
        Console.WriteLine("\nDone");
        Console.ReadKey();
    }
}

public delegate void StartCallbackDelegate(int idArg, Worker workerArg);
public delegate void DoneCallbackDelegate(int idArg);

public class Supervisor
{
    Queue<Thread> waitingThreads = new Queue<Thread>();
    Dictionary<int, Worker> runningThreads = new Dictionary<int, Worker>();
    int maxThreads = 20;
    object locker = new object();

    public bool Done
    {
        get
        {
            lock (locker)
            {
                return ((waitingThreads.Count == 0) && (runningThreads.Count == 0));
            }
        }
    }

    public Supervisor()
    {
        // queue up a thread for each file
        Directory.GetFiles("C:\\folder").ToList().ForEach(n => waitingThreads.Enqueue(CreateThread(n)));
    }

    Thread CreateThread(string fileNameArg)
    {
        Thread thread = new Thread(new Worker(fileNameArg, WorkerStart, WorkerDone).ProcessFile);
        thread.IsBackground = true;
        return thread;
    }

    // called when a worker starts
    public void WorkerStart(int threadIdArg, Worker workerArg)
    {
        lock (locker)
        {
            // update with worker instance
            runningThreads[threadIdArg] = workerArg;
        }
    }

    // called when a worker finishes
    public void WorkerDone(int threadIdArg)
    {
        lock (locker)
        {
            runningThreads.Remove(threadIdArg);
        }
        Console.WriteLine(string.Format(" Thread {0} done", threadIdArg.ToString()));
        LaunchWaitingThreads();
    }

    // launches workers until max is reached
    public void LaunchWaitingThreads()
    {
        lock (locker)
        {
            while ((runningThreads.Count < maxThreads) && (waitingThreads.Count > 0))
            {
                Thread thread = waitingThreads.Dequeue();
                runningThreads.Add(thread.ManagedThreadId, null); // placeholder so count is accurate
                thread.Start();
            }
        }
    }
}

public class Worker
{
    string fileName;
    StartCallbackDelegate startCallback;
    DoneCallbackDelegate doneCallback;

    public Worker(string fileNameArg, StartCallbackDelegate startCallbackArg, DoneCallbackDelegate doneCallbackArg)
    {
        fileName = fileNameArg;
        startCallback = startCallbackArg;
        doneCallback = doneCallbackArg;
    }

    public void ProcessFile()
    {
        startCallback(Thread.CurrentThread.ManagedThreadId, this);
        Console.WriteLine(string.Format("Reading file {0} on thread {1}", fileName, Thread.CurrentThread.ManagedThreadId.ToString()));
        File.ReadAllBytes(fileName);
        doneCallback(Thread.CurrentThread.ManagedThreadId);
    }
}
```