当前位置: 代码迷 >> C# >> 统计大文件里,频次最高的10个单词,(C# TPL DataFlow版)
  详细解决方案

统计大文件里,频次最高的10个单词,(C# TPL DataFlow版)

热度:97   发布时间:2016-05-05 04:07:05.0
统计大文件里,频率最高的10个单词,(C# TPL DataFlow版)

最近公司搞了一个写程序的比赛,要求从2G的文件里统计出出现频率最高的10个单词。

最开始的想法是使用字典树,后来发现字典树更适合用在找前缀上,在查找没有hash表效率高。

之后使用Hash表+DataFlow完成了功能,2G的文件处理在20秒以内(其实我有信心优化到10秒以内,但是太折腾了)。

这是我的设计图:

为什么要形成那么多结果?因为我不想写锁,写锁会降低很多效率,而且也失去了线程的意义,每个线程做自己的工作,

最后在把每个线程处理的结果汇总起来,这样也符合fork join 的设计。

而且我也试过,如果写锁的话,效率会降低10秒以上,我也尝试过微软提供的ConcurrentDictionary 原子哈希表,但是效果都不是

很理想,而且,在并行的年代,在写锁这个东西,感觉很恶心,好像在代码里加了一坨屎一样,我以前就很讨厌锁,也出现过代码死锁的情况。

最后我选择了使用微软的TPL 库来解决并行的问题。

使用DataFlow解决了我处理时多线程管理的问题,还有线程等待消息队列的问题,

使用BufferBlock 进行主控与工作线程之间消息传递,这是我的设计图:

 

读取文件之后使用BufferBlock.Post发送给工作线程,工作线程使用TryReceive接收消息,并且处理。

在MSDNhttps://msdn.microsoft.com/zh-cn/library/hh228601(v=vs.110).aspx 里有详细的介绍。

这是典型的单生产者,多使用者的列子。

代码方面首先是读取文件:

  public class FileBufferBlock    {               private string _fileName;        BufferBlock<WordStream> _buffer = null;        public FileBufferBlock(BufferBlock<WordStream> buffer,string fileName)        {            this._fileName = fileName;            this._buffer = buffer;        }        /// <summary>        /// 按32M读取文件,循环发送给WordBufferBlock        /// </summary>        public void ReadFile()        {            using (FileStream fs = new FileStream(_fileName, FileMode.Open, FileAccess.Read))            {                using (StreamReader sr = new StreamReader(fs))                {                    while (!sr.EndOfStream)                    {                        char[] charBuffer = new char[32 * 1024 * 1024];                        sr.ReadBlock(charBuffer, 0, charBuffer.Length);                        _buffer.Post(new WordStream(charBuffer));                    }                }            }            _buffer.Complete();        }

在这里使用BufferBlock.Post 发送消息给工作线程,如果不用它,你得去找个能阻塞的消息队列。

下面是我的接收方的代码,使用BufferBlock.TryReceive 接收消息,然后处理,在这里可以开多个个线程去处理。

而且线程是它帮你管理的:

// --------------------------------------------------------------------------------------------------------------------// <copyright file="WordProcessBufferBlock.cs" company="yada">//   Copyright (c) yada Corporation. All rights reserved.// </copyright>// change by qugang 2015.4.18// 描述:用于截取单词的工作线程// --------------------------------------------------------------------------------------------------------------------using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;using System.Threading.Tasks.Dataflow;namespace WordStatistics{    public class WordProcessBufferBlock    {        private int _taskCount = 1;        BufferBlock<WordStream> _buffer = null;        private List<Task<Dictionary<string, int>>> _list = new List<Task<Dictionary<string, int>>>();        /// <summary>        /// 单词处理类        /// </summary>        /// <param name="taskCount">工作线程数</param>        /// <param name="buffer">DataFlow的BufferBlock</param>        public WordProcessBufferBlock(int taskCount, BufferBlock<WordStream> buffer)        {            _taskCount = taskCount;            this._buffer = buffer;        }        public void StartWord()        {            for (int i = 0; i < _taskCount; i++)            {                _list.Add(Process());            }        }        /// <summary>        /// 等待所有工作完成        /// </summary>        /// <param name="f">完成后的工作函数</param>        public void WaitAll(Action<Dictionary<string,int>> f)        {            Task.WaitAll(_list.ToArray());            foreach (var row in _list)            {                f(row.Result);            }        }        /// <summary>        /// 使用BufferBlock.TryReceive循环从消息里取从FileBufferBlock发送的buffer        /// </summary>        /// <returns>工作结果</returns>        private async Task<Dictionary<string, int>> Process()        {            Dictionary<string, int> dic = new Dictionary<string, int>();            while (await _buffer.OutputAvailableAsync())            {                WordStream ws;                while (_buffer.TryReceive(out ws))                {                    foreach (string value in ws)                    {                        if (dic.ContainsKey(value))                        {                            dic[value]++;                        }                        else                        {                            dic.Add(value, 1);                        }                    }                }            }            return dic;        }    }}

WordStrem是我自己写的一个单词枚举流,继承了IEnumerable接口,将找单词的算法写到枚举器里面,实现流化。

// --------------------------------------------------------------------------------------------------------------------// <copyright file="WordStatistics.cs" company="yada">//   Copyright (c) yada Corporation. All rights reserved.// </copyright>// change by qugang 2015.4.18// 单词枚举器:算法从开始找字母,如果不是字母,则返回从pos 到end 的组成单词// --------------------------------------------------------------------------------------------------------------------using System;using System.Collections;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;namespace WordStatistics{    /// <summary>    /// 单词枚举器    /// </summary>    public class WordStream : IEnumerable    {        private char[] buffer;        public WordStream(char[] buffer)        {            this.buffer = buffer;        }        IEnumerator IEnumerable.GetEnumerator()        {            return (IEnumerator)GetEnumerator();        }        public WordStreamEnum GetEnumerator()        {            return new WordStreamEnum(this.buffer);        }    }    public class WordStreamEnum : IEnumerator    {        private char[] buffer;        int pos = 0;        int endCount = 0;        int index = -1;        public WordStreamEnum(char[] buffer)        {            this.buffer = buffer;        }        public bool MoveNext()        {            while (index < buffer.Length - 1)            {                index++;                char buff = buffer[index];                if ((buff >= 'a' && buff <= 'z') || (buff >= 'A' && buff <= 'Z'))                {                    if (endCount == 0)                    {                        pos = index;                        endCount++;                    }                    else                    {                        endCount++;                    }                }                else                {                    if (endCount != 0)                        return true;                }                if (buff == '\0')                {                    return false;                }            }            return false;        }        public object Current        {            get            {                int tempInt = endCount;                endCount = 0;                return new string(buffer, pos, tempInt);            }        }        public void Reset()        {            index = -1;        }    }}

到这里就完成了,然后再Main函数里添加调用

  static void Main(string[] args)        {            DateTime dt = DateTime.Now;            var buffer = new BufferBlock<WordStream>();            //创建工作BufferBlock            WordProcessBufferBlock wb = new WordProcessBufferBlock(8, buffer);            wb.StartWord();            //创建读取文件,发送的BufferBlock            FileBufferBlock fb = new FileBufferBlock(buffer, @"D:\content.txt");            fb.ReadFile();            Dictionary<string,int> dic = new Dictionary<string,int>();            //等待工作完成汇总结果            wb.WaitAll(p =>                {                    foreach (var row in p)                    {                        if (!dic.ContainsKey(row.Key))                            dic.Add(row.Key, row.Value);                        else                        {                            dic[row.Key] += row.Value;                        }                    }                }                );            var myList = dic.ToList();            myList.Sort((p, v) => v.Value.CompareTo(p.Value));            foreach (var row in myList.Take(10))            {                Console.WriteLine(row);            }                        Console.WriteLine(DateTime.Now - dt);        }

最后2G的文件,我的机器跑出来是19秒多。

如果代码没有包,请从NuGet上下载Dataflow包。

代码下载:http://files.cnblogs.com/files/qugangf/WordStatistics.rar

2楼smodi
赞, 想想 10年左右去瑞星面试也是一道这样的题目。
Re: qg
@smodi,献丑了!
1楼土豆哥哥
料贴,学习了