Producer-Consumer: A Duplicate File Finder

Processes that are CPU bound (i.e. computationally intensive) tend to benefit greatly from multi-core processors. The actual speedup depends on the portion of the code that must remain sequential. For IO bound processes, however, the multi-core benefit diminishes, since hard drive performance becomes the limiting factor. Unlike solid state drives (SSDs), traditional hard drives handle large sequential reads/writes well but handle random accesses poorly. In fact, parallelizing sequential read/write access can actually degrade performance: when sequential data is accessed by different threads, the access pattern seen by the drive becomes more random. The performance of such IO bound tasks is therefore largely determined by the IO subsystem.

For tasks that mix intensive IO operations with heavy CPU usage, however, it is desirable to parallelize both the IO bound and the CPU bound portions of the code so that maximum parallelism can be achieved.

An example of such an IO and CPU intensive task is finding duplicate files in a file system. To find duplicate files, the files or their hashes must be compared. In this post, I will discuss a couple of different approaches to achieving this using threading.

1. Sequential find
First let us take a look at how to find duplicate files sequentially (all the sample code can be downloaded from here):

Suppose we have a function that returns a list of files within a directory (see GetFileList in FileUtils). In FindDuplicates, we loop through the list of files and calculate the hash of each file. The file name and hash together form the key into a hash table, whose value is the list of files sharing that key. Whenever an existing key is hit, the file is appended to that list, indicating a duplicate has been found. Here is what the code looks like:

 

        public void FindDuplicates(List<string> files)
        {
            foreach (string fn in files)
            {
                string h = ComputeMD5(fn);

                if (h != string.Empty)
                {
                    FileDataDefinition def = new FileDataDefinition();
                    def.FileName = Path.GetFileName(fn);
                    def.FilePath = Path.GetDirectoryName(fn);
                    def.LastModifiedDate = File.GetLastWriteTime(fn);

                    FileInfo fileInfo = new FileInfo(fn);
                    def.FileSize = fileInfo.Length;

                    // Build the key once instead of re-concatenating it on every lookup.
                    string key = string.Concat(def.FileName, ":", h);

                    if (Hash.Contains(key))
                    {
                        // The list is a reference type, so appending to it is
                        // enough; no need to write it back into the table.
                        List<FileDataDefinition> l = Hash[key] as List<FileDataDefinition>;
                        l.Add(def);
                    }
                    else
                    {
                        List<FileDataDefinition> l = new List<FileDataDefinition>();
                        l.Add(def);
                        Hash.Add(key, l);
                    }
                }
            }
        }
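The ComputeMD5 helper is called above but never shown in the post. A minimal sketch might look like the following; the name and the empty-string-on-failure convention are inferred from the call sites, and the hex formatting is an assumption:

```csharp
using System;
using System.IO;
using System.Security.Cryptography;

public static class HashUtils
{
    // Returns the MD5 hash of a file as a hex string, or string.Empty if the
    // file cannot be read (the callers above treat an empty string as "skip").
    public static string ComputeMD5(string fileName)
    {
        try
        {
            using (var md5 = MD5.Create())
            using (var stream = File.OpenRead(fileName))
            {
                byte[] hash = md5.ComputeHash(stream);
                return BitConverter.ToString(hash).Replace("-", string.Empty);
            }
        }
        catch (IOException)
        {
            return string.Empty;
        }
        catch (UnauthorizedAccessException)
        {
            return string.Empty;
        }
    }
}
```

Streaming the file through `ComputeHash(stream)` avoids loading the whole file into memory at once.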

2. ThreadPool
We can improve performance by moving the hash calculation onto separate threads.

The first approach to parallelizing the task is to use a thread pool to queue up the hash-calculation work. Since calculating a file hash involves reading in the whole file (IO bound) and then performing some computation on its contents, there is some inherent parallelism, so using multiple threads could potentially improve performance.

The code below shows this approach:

 

        public void FindDuplicates(List<string> files)
        {
            for (int n = 0; n < files.Count; n += _numOfThreads)
            {
                _autoEvents = new AutoResetEvent[_numOfThreads];

                for (int i = 0; i < _numOfThreads; i++)
                {
                    if (n + i < files.Count)
                    {
                        _autoEvents[i] = new AutoResetEvent(false);
                        ThreadPool.QueueUserWorkItem(CalculateFileHash, new FileThreadInfo(i, files[n + i]));
                    }
                    else
                    {
                        // No work for this slot in the final batch, so create
                        // the event already signaled.
                        _autoEvents[i] = new AutoResetEvent(true);
                    }
                }

                // Join the whole batch before queuing the next one. Note that
                // WaitAll accepts at most 64 handles, so _numOfThreads <= 64.
                WaitHandle.WaitAll(_autoEvents);
            }
        }

        private void CalculateFileHash(object stateInfo)
        {
            FileThreadInfo threadInfo = stateInfo as FileThreadInfo;
            string h = ComputeMD5(threadInfo.FileName);

            if (h != string.Empty)
            {
                FileDataDefinition def = new FileDataDefinition();
                def.FileName = Path.GetFileName(threadInfo.FileName);
                def.FilePath = Path.GetDirectoryName(threadInfo.FileName);
                def.LastModifiedDate = File.GetLastWriteTime(threadInfo.FileName);

                FileInfo fileInfo = new FileInfo(threadInfo.FileName);
                def.FileSize = fileInfo.Length;

                string key = string.Concat(def.FileName, ":", h);

                // Hash is shared across pool threads, so its updates must be
                // synchronized.
                lock (Hash.SyncRoot)
                {
                    if (Hash.Contains(key))
                    {
                        List<FileDataDefinition> l = Hash[key] as List<FileDataDefinition>;
                        l.Add(def);
                    }
                    else
                    {
                        List<FileDataDefinition> l = new List<FileDataDefinition>();
                        l.Add(def);
                        Hash.Add(key, l);
                    }
                }
            }

            _autoEvents[threadInfo.ID].Set();
        }
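The FileThreadInfo state object passed to CalculateFileHash is not shown in the post. A minimal version, inferred from the two members the code reads (ID and FileName), could be:

```csharp
// Hypothetical definition of the state object queued to the thread pool.
public class FileThreadInfo
{
    // Index into the _autoEvents array, used to signal completion.
    public int ID { get; private set; }

    // Full path of the file whose hash this work item computes.
    public string FileName { get; private set; }

    public FileThreadInfo(int id, string fileName)
    {
        ID = id;
        FileName = fileName;
    }
}
```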


3. Producer and Consumer

Even though the previous approach improved the duplicate-finding performance, it is still not optimal. For example, even though multiple threads are used to calculate file hashes, each batch is joined before the next set of work is queued. Also, the IO operations are distributed across multiple threads, and on some older hard drives handling IO in multiple threads can actually slow things down. An ideal solution would use a single thread for IO, since IO is inherently sequential.

As it turns out, this is a classic producer-consumer problem. In this third approach, we will examine how to use the producer-consumer model to find duplicates. The producer thread searches for files within a given directory. It opens each file as it encounters it and reads the contents into a ProducerConsumerQueue. At the same time, the consumer computes the hash from the bytes stored in the queue and determines whether a file is duplicated. The code is slightly different from what you saw above, as files are being added to the queue and consumed from it simultaneously.

 

        public ProducerConsumerQueue _q = null;

        public void FindDuplicates(string rootDir)
        {
            // Disposing the queue signals completion and waits for the
            // consumer thread to drain the remaining items, so the results
            // are complete by the time OutputDuplicates runs.
            using (_q = new ProducerConsumerQueue())
            {
                GetFiles(rootDir);
            }
            _q.OutputDuplicates();
        }
 
        public void GetFiles(string rootDir)
        {
            string[] files = null;

            try
            {
                files = Directory.GetFiles(rootDir);

                foreach (string fn in files)
                {
                    FileDataDefinition def = new FileDataDefinition();
                    def.FileName = Path.GetFileName(fn);
                    def.FilePath = Path.GetDirectoryName(fn);
                    def.LastModifiedDate = File.GetLastWriteTime(fn);

                    FileInfo fileInfo = new FileInfo(fn);
                    def.FileSize = fileInfo.Length;

                    // Read the whole file on this single producer thread so
                    // that all disk IO stays sequential.
                    byte[] bytes;
                    using (FileStream f = new FileStream(fn, FileMode.Open, FileAccess.Read))
                    {
                        bytes = new byte[f.Length];
                        f.Read(bytes, 0, (int)f.Length);
                    }

                    _q.EnqueueTask(new FileBytes(def, bytes));
                }
            }
            catch (Exception)
            {
                // Skip files we cannot access.
            }

            string[] dirs = null;

            try
            {
                dirs = Directory.GetDirectories(rootDir);

                foreach (string s in dirs)
                {
                    GetFiles(s);
                }
            }
            catch (Exception)
            {
                // Skip directories we cannot access.
            }
        }
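The ProducerConsumerQueue class itself is not shown in the post. A minimal sketch of the classic pattern behind it follows, using a Queue guarded by Monitor.Wait/Pulse: one consumer thread drains items that producers enqueue, and Dispose() signals completion and joins the consumer (matching the `using` shape above). The real class also hashes the bytes and tracks duplicates; here the "work" is a caller-supplied delegate, which is an assumption for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// A generic sketch of the producer-consumer queue: not the post's actual
// ProducerConsumerQueue, just the synchronization pattern it relies on.
public class BlockingWorkQueue<T> : IDisposable
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly object _lock = new object();
    private readonly Thread _worker;
    private readonly Action<T> _consume;
    private bool _completed;

    public BlockingWorkQueue(Action<T> consume)
    {
        _consume = consume;
        _worker = new Thread(Run);
        _worker.Start();
    }

    // Called by the producer (e.g. the IO thread in GetFiles).
    public void EnqueueTask(T item)
    {
        lock (_lock)
        {
            _items.Enqueue(item);
            Monitor.Pulse(_lock); // wake the consumer if it is waiting
        }
    }

    private void Run()
    {
        while (true)
        {
            T item;
            lock (_lock)
            {
                // Sleep until an item arrives or the queue is completed.
                while (_items.Count == 0 && !_completed)
                    Monitor.Wait(_lock);
                if (_items.Count == 0)
                    return; // completed and fully drained
                item = _items.Dequeue();
            }
            _consume(item); // do the CPU-bound work outside the lock
        }
    }

    // Signal "no more items" and block until the consumer drains the queue.
    public void Dispose()
    {
        lock (_lock)
        {
            _completed = true;
            Monitor.Pulse(_lock);
        }
        _worker.Join();
    }
}
```

Usage mirrors FindDuplicates above: wrap the queue in a `using` block, enqueue from the producer inside it, and read the accumulated results after the block ends.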

If you run each version of the code, you will see that this approach yields the biggest performance gain. It outperforms the ThreadPool approach because the IO thread always tries to keep the queue filled, so the consumer is rarely starved. Further, concentrating the IO operations in a single thread helps the system optimize IO requests and thus increases IO subsystem throughput.
