Using MapReduce in Python to Count Word Frequencies

I first came across the original article here. As it happens, my monthly work requires counting word frequencies for user text. The traditional approach is to segment the text, store the words in a dict, then count and sort; this is slow and also memory-hungry.
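For reference, here is a minimal sketch of that single-process baseline; the segment function is a stand-in for whatever tokenizer is used and is not from the original post:

from collections import Counter

def count_words_single_process(lines, segment):
    # segment(line) is assumed to return a list of tokens for one line of text
    counter = Counter()
    for line in lines:
        counter.update(segment(line))
    # most_common() sorts by frequency, descending -- the slow, memory-hungry step at scale
    return counter.most_common()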

Word Segmentation

To count word frequencies we first need word segmentation. Here I use HanLP; see here for how to call it from Python. In short, install JPype first, then put the corresponding jar file anywhere and adjust the configuration:

startJVM(getDefaultJVMPath(), "-Djava.class.path=<path to jar>;<folder containing jar>", "-Xms1g", "-Xmx1g")  # start the JVM; on Linux replace the semicolon ; with a colon :
HanLP = JClass('com.hankcs.hanlp.HanLP')

Segmentation returns a Java ArrayList, which needs to be converted into a Python list, like this:

def CovertJlistToPlist(jList):
    ret = []
    if jList is None:
        return ret
    for i in range(jList.size()):
        ret.append(str(jList.get(i)))
    return ret
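For example, once the JVM has been started as above, segmentation and conversion can be chained like this; the sample sentence is just an illustration, and the printed form of each term depends on HanLP's settings (by default a term stringifies as word/part-of-speech):

segment = HanLP.segment(u"商品和服务")   # returns a Java ArrayList of Term objects
words = CovertJlistToPlist(segment)      # now a plain Python list of strings such as '商品/n'
print(words)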

If the text is large, you can also consider splitting it into multiple files. On Linux a single command does the job: split the file into chunks of 100,000 lines each, then rename the chunks with mv:

split **.txt -l 100000 -d -a 2 **_ && ls | grep **_ | xargs -n1 -i{} mv {} {}.txt

Counting Word Frequencies

Putting the code together gives the following:

# -*- coding:utf-8 -*-
# Filename: main.py
from jpype import *
from smr import SimpleMapReduce
import string
import multiprocessing
import codecs
import os
import glob
import time
from collections import defaultdict

result = defaultdict(int)

# Start the JVM; on Linux replace the semicolon ; in the classpath with a colon :
startJVM(getDefaultJVMPath(), r"-Djava.class.path=C:\hanlp\hanlp-1.3.2.jar;C:\hanlp", "-Xms1g", "-Xmx1g")
HanLP = JClass('com.hankcs.hanlp.HanLP')

chinese_punctuation = r"?,。、‘’“”~·!()./"
punctuation = chinese_punctuation + string.punctuation


# Convert a Java List into a Python list
def CovertJlistToPlist(jList):
    ret = []
    if jList is None:
        return ret
    for i in range(jList.size()):
        ret.append(str(jList.get(i)))
    return ret


def file_to_words(filename):
    """Read a file and return a sequence of (word, occurrences) values."""
    #STOP_WORDS = [u"的", u"你"]
    TR = str.maketrans(punctuation, ' ' * len(punctuation))
    print(multiprocessing.current_process().name, 'reading', filename)
    output = []
    with codecs.open(filename, encoding='utf-8') as f:
        for line in f:
            line_list = line.strip().split('\t')
            if len(line_list) != 4:
                continue
            # the original field names were elided; only the question column is used below
            _, question, _, _ = line_list
            output.append((question, 1))
            line = question.translate(TR)
            segment = HanLP.segment(line)
            lists = CovertJlistToPlist(segment)
            for word in lists:
                output.append((word, 1))
    return output


def count_words(item):
    """Convert the partitioned data for a word to a
    tuple containing the word and the number of occurrences.
    """
    word, occurrences = item
    return (word, sum(occurrences))


if __name__ == '__main__':
    import operator
    start = time.time()
    input_files = glob.glob('E:\\**_data\\*.txt')
    mapper = SimpleMapReduce(file_to_words, count_words)
    word_counts = mapper(input_files)
    word_counts.sort(key=operator.itemgetter(1))
    word_counts.reverse()
    shutdownJVM()
    print('\nTOP 2000 WORDS BY FREQUENCY\n')
    top_words = word_counts[:2000]
    longest = max(len(word) for word, count in top_words)
    for word, count in top_words:
        print('%-*s: %5s' % (longest + 1, word, count))
    end = time.time()
    print("Elapsed time: {}".format(str(end - start)))

It is indeed much faster: data that used to take one to two hours to process is now done in about ten minutes. Note that the code above does not remove stop words or compute TF-IDF values.
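If stop-word removal is needed, it can be added inside file_to_words right after the segmented list is converted. A sketch, reusing the commented-out STOP_WORDS example from the listing; the word/nature string format assumed here is HanLP's default term representation:

STOP_WORDS = {u"的", u"你"}   # extend with a real stop-word list as needed

def filter_terms(term_strings):
    """Drop stop words from HanLP term strings of the form 'word/nature'."""
    kept = []
    for term in term_strings:
        token = term.split('/')[0]   # strip the part-of-speech tag
        if token and token not in STOP_WORDS:
            kept.append(token)
    return kept

Inside file_to_words, replacing the final loop with for word in filter_terms(lists): output.append((word, 1)) would then keep stop words out of the counts; TF-IDF would be a separate pass over the per-file counts and is left out here, as in the original.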