python应用MapReduce统计词频

最早看到这篇文章是在这里正好每月工作需要统计用户的词频,传统方法是切词后存入dict然后计数,排序,一方面速度慢,另一方面耗的内存比较高。

分词

计算词频首先需要分词,这里采用的是hanlp,调用方法见这里简单来说,先安装JPype,再把对应的jar包放到任意位置,修改配置

1
2
startJVM(getDefaultJVMPath(), "-Djava.class.path=jar包位置;jar包所在文件夹", "-Xms1g", "-Xmx1g") # 启动JVM,Linux需替换分号;为冒号:
HanLP = JClass('com.hankcs.hanlp.HanLP')

另外切词后返回的是arrayList,需要转换成python用的list,如下:

1
2
3
4
5
6
7
def CovertJlistToPlist(jList):
ret = []
if jList is None:
return ret
for i in range(jList.size()):
ret.append(str(jList.get(i)))
return ret

另外如果文本比较大,可以考虑拆分成多个文件
在linux一条命令就搞定了。分隔成每个文件包含10w行的子文件,另外用mv改名

1
split **.txt -l 100000 -d -a 2 **_&&ls|grep **_|xargs -n1 -i{} mv {} {}.txt

统计词频

将代码汇总后如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# -*- coding:utf-8 -*-
# Filename: main.py
from jpype import *
from smr import SimpleMapReduce
import string
import multiprocessing
import codecs
import os
import glob
import time
from collections import defaultdict
result = defaultdict(int)
startJVM(getDefaultJVMPath(), r"-Djava.class.path=C:\hanlp\hanlp-1.3.2.jar;C:\hanlp", "-Xms1g", "-Xmx1g") # 启动JVM,Linux需替换分号;为冒号:
HanLP = JClass('com.hankcs.hanlp.HanLP')
chinese_punctuation = r"?,。、‘’“”~·!()./"
punctuation = chinese_punctuation + string.punctuation
#将JAVA的List转换成python的List
def CovertJlistToPlist(jList):
ret = []
if jList is None:
return ret
for i in range(jList.size()):
ret.append(str(jList.get(i)))
return ret
def file_to_words(filename):
"""Read a file and return a sequence of (word, occurances) values.
"""
#STOP_WORDS = [u"的", u"你"]
TR = str.maketrans(punctuation, ' ' * len(punctuation))
print(multiprocessing.current_process().name, 'reading', filename)
output = []
with codecs.open(filename, encoding='utf-8') as f:
for line in f:
line_list = line.strip().split('\t')
if len(line_list) != 4:
continue
*1, *2, *3, *4 = line_list
output.append( (question, 1) )
line = question.translate(TR)
segment = HanLP.segment(line)
lists = CovertJlistToPlist(segment)
for words in lists:
output.append( (word, 1) )
return output
def count_words(item):
"""Convert the partitioned data for a word to a
tuple containing the word and the number of occurances.
"""
word, occurances = item
return (word, sum(occurances))
if __name__ == '__main__':
import operator
start = time.time()
input_files = glob.glob('E:\\**_data\\*.txt')
mapper = SimpleMapReduce(file_to_words, count_words)
word_counts = mapper(input_files)
word_counts.sort(key=operator.itemgetter(1))
word_counts.reverse()
shutdownJVM()
print('\nTOP 2000 WORDS BY FREQUENCY\n')
top20 = word_counts[:2000]
longest = max(len(word) for word, count in top20)
for word, count in top20:
print('%-*s: %5s' % (longest+1, word, count))
#
end = time.time()
print("耗时:{}".format(str(end-start)))

速度确实很快,之前需要1~2个小时处理的数据现在只需要10分钟就搞定了。另外上面的代码没有写去除停用词以及计算tf-idf值。

数据分析周报模板

数据分析周报模板

之前给运营做了一版每周周报的模板,虽然后面被弃用了,但是自己还是挺满意的,所以记录一下,当然下面的所有数据都隐藏啦,标签也隐藏,只是分享做PPT的过程~~

首先当然是封面啦

作为一家高大上的机器人公司,当然要用高大上的机器人图片啦,各种谷歌以后找到了这张。
就决定是你啦,皮卡丘~
enter image description here
加一个透明度40%的深蓝色蒙版,齐活~
右上角的小红块是个人习惯~~貌似是跟经济学人学的。

关键指标概览

首先确认每周需要汇报的关键指标,汇总总量,对有异常的数据进行说明,用不同的字体和颜色标记。如下
enter image description here
这样基本上把这周的几个重点指标都说清楚了。

关键指标详细说明

后面都差不多,对各个关键指标按天进行详细说明。
enter image description here
个人最喜欢上面这张,颜色主体是公司的颜色,配色上https://coolors.co/这里配的,强烈安利一下。剩下的我就简单粘上来了。
enter image description here
enter image description here
enter image description here
enter image description here
enter image description here
enter image description here

最后多说几句

当然这个PPT还是有不足的地方,表现形式稍显单一,只有柱状图、折线图、饼图,但需要表示的问题还是说清楚了,可视化这块推荐用R,好看的图一大堆,而且是免费的,不像tableau,商业范,高冷范。

将图灵机器人api接入nao机器人

用nao自带的choregraphe新建项目,结构如下图所示:
enter image description here
简单来说,一共分为4步

  1. 启动nao自带的录音程序,拍打nao头部启动录音,并输出录音文件所在的路径。
  2. 因图灵机器人暂时只支持纯文本输入,需要将录音转换成文字。
  3. 将转换出来的文字调用图灵机器人api,输出回答。
  4. 根据不同回答配合不同肢体动作。

设置nao录音程序

启动choregraphe,在左下指令盒库搜索Record Sound,如下图:
enter image description here
输入输出均有4种基本类型:动态激活数字字符串
具体类型区别详见官方文档。
单击进入Record Sound修改Rec. Sound File如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import time
class MyClass(GeneratedClass):
def __init__(self):
GeneratedClass.__init__(self, False)
try:
self.ad = ALProxy("ALAudioDevice")
except Exception as e:
self.ad = None
self.logger.error(e)
self.leds = ALProxy("ALLeds")
self.player = ALProxy('ALAudioPlayer')
self.filepath = ""
def onLoad(self):
self.bIsRecording = False
self.bIsRunning = False
def onUnload(self):
self.bIsRunning = False
#用ftp把音频上传到'/home/nao/recordings/ringstones/'目录
if( self.bIsRecording ):
self.player.post.playFileFromPosition('/home/nao/recordings/ringstones/drip.wav', 0.00, 1.00, 0.00)
self.ad.stopMicrophonesRecording()
self.bIsRecording = False
def onInput_onStart(self, p):
if(self.bIsRunning):
return
self.bIsRunning = True
sGroup = "FaceLeds"
RGB = [0, 255, 51]
sExtension = self.toExtension( self.getParameter("Microphones used") )
self.filepath = p + sExtension
if self.ad:
self.leds.fadeRGB(sGroup, 256*256*RGB[0] + 256*RGB[1] + RGB[2], 0.5)
time.sleep(0.2)
self.ad.startMicrophonesRecording( self.filepath )
self.bIsRecording = True
else:
self.logger.warning("No sound recorded")
def onInput_onStop(self):
if( self.bIsRunning ):
self.onUnload()
self.onStopped(self.filepath)
def toExtension(self, sMicrophones):
if( sMicrophones == "Front head microphone only (.ogg)" ):
return ".ogg"
else:
return ".wav"

主要增加了录音开始以及nao机器人眼睛灯的提示。最后输出录音文件所在的位置。

对录音文件的处理

直接看代码吧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import urllib2
import json
import base64
import httplib
class MyClass(GeneratedClass):
def __init__(self):
GeneratedClass.__init__(self)
def onLoad(self):
#put initialization code here
pass
def onUnload(self):
#put clean-up code here
pass
#下面的p就是录音文件所在的位置
def onInput_onStart(self, p):
#self.onStopped() #activate the output of the box
accessToken = self.getAccessToken()
speech_file = str(p)
self.logger.info(p)
try:
cmd = self.baidu_asr(speech_file, accessToken)
self.logger.info(cmd)
except Exception, e:
cmd = ''
self.logger.info(cmd)
res = self.request_tuling(cmd)
self.logger.info(res)
try:
response_dic = json.loads(res, encoding='utf-8')
answer = response_dic['text'].encode('utf-8', 'ignore')
except Exception,e:
self.logger.error(e)
answer = '对不起,没能听清你的话呢'
try:
response_dic = json.loads(res, encoding='utf-8')
special_value1 = response_dic['special_value1']
except Exception,e:
self.logger.error(e)
special_value1 = '0'
try:
response_dic = json.loads(res, encoding='utf-8')
special_value2 = response_dic['special_value2'].encode('utf-8', 'ignore')
except Exception,e:
self.logger.error(e)
special_value2 = ''
self.output([answer, special_value1, special_value2])
def onInput_onStop(self):
self.onUnload() #it is recommended to reuse the clean-up as the box is stopped
self.onStopped() #activate the output of the box
def getAccessToken(self):
ApiKey = 'your_baidu_speech_apikey'
SecretKey = 'your_secret_key'
auth_url = "https://openapi.baidu.com/oauth/2.0/token?grant_type=client_credentials&client_id=" + ApiKey + "&client_secret=" + SecretKey
res = urllib2.urlopen(auth_url)
json_data = res.read()
return json.loads(json_data)['access_token']
#请求百度语音识别接口,输入音频文件路径,输出语音识别出来的文本
def baidu_asr(self, speech_file, access_token):
asr_server = 'http://vop.baidu.com/server_api'
with open(speech_file, 'rb') as f:
speech_data = f.read()
speech_base64=base64.b64encode(speech_data).decode('utf-8')
speech_length=len(speech_data)
data_dict = {'format':'wav', 'rate':16000, 'channel':1, 'cuid':'005056c00008', 'token':access_token, 'lan':'zh', 'speech':speech_base64, 'len':speech_length}
json_data = json.dumps(data_dict)
json_length = len(json_data)
request = urllib2.Request(url=asr_server)
request.add_header("Content-Type", "application/json")
request.add_header("Content-Length", json_length)
fs = urllib2.urlopen(url=request, data=json_data)
result_str = fs.read()
json_resp = json.loads(result_str, encoding='utf-8')
self.logger.info('result_str ' + result_str)
return json_resp['result'][0].encode('utf-8', 'ignore')
#请求图灵接口
def request_tuling(self, cmd):
if not cmd:
return ''
requrl = '/openapi/api?key=your_tuling_apikey&info=%s' % cmd
try:
conn = httplib.HTTPConnection("www.tuling123.com", timeout=5)
conn.request(method="GET", url=requrl)
response = conn.getresponse()
res = response.read()
conn.close()
except Exception,e:
self.logger.error(e)
res = ''
return res

主要分为两步:

  1. 请求百度语音识别接口获取识别后的文字
  2. 将识别后的文字请求图灵接口,获取机器人回答

    以上代码包含对运动控制逻辑的处理,涉及隐私,故略去

根据不同回答不同处理

此处主要包含对运动指令(前后左右,跳舞)以及普通对话处理
简单来说包含运动指令关键词的执行运动控制的指令盒,即上面的controlBox,纯文本对话的执行SayWithBehavior指令盒,控制逻辑主要由decision控制。
涉及运动控制的涉及隐私故略去。下面主要说纯文本输出部分。即SayWithBehavior指令盒
结构如下图:
enter image description here
随机动作指令盒包含不同时长的动作信息,具体可以看nao的官方文档中时间轴部分的介绍。
最后回复初始状态(站立

遗留问题

  1. 每次需要拍打头部才能启动,没研究懂怎么根据声音强度,检测到说话自动调起录音程序
  2. 录音结束时间目前写死了。
  3. 反应比较慢,包括nao录音,语音识别过程,中间有失败没有迅速反馈,实际中会造成用户反复拍打进行识别。

Prophet初探

先给看个酷炫的图
enter image description here

安装

官方的安装教程在这里但是,按照官方的安装教程是安装不成功的说起来都是痛,
以下是正确的安装步骤(python3.5 64位环境):

  1. 安装C++编译器]Visual C++ Build Tools
  2. 安装pystan 推荐清华源(使用pip install pystan -i https://pypi.tuna.tsinghua.edu.cn/simple) 手打的,不保证对。
  3. 安装fbprophet,齐活。

快速使用

对原理有兴趣的,可以戳这里暂时我只把它当成一个黑箱子使用。
首先是数据,这里选用的是随机生成的web统计数据,首先我们加载数据:

1
2
3
4
5
6
7
8
# -*- encoding:utf-8 -*-
import pandas as pd
import numpy as np
from fbprophet import Prophet
import matplotlib.pyplot as plt
data_file = "data_trend.csv"
df = pd.read_csv(data_file, encoding="utf-8")
df.head()

enter image description here
主要有上面这几个字段。
我们想看看浏览次数随时间的变化情况
单独取出时段独立访客数据。另外我们的数据是倒序的,还需要按时间顺序回来,先画个图看看。

1
2
3
4
df_pv = df[["时段", "独立访客"]]
sort_df = df_pv.sort_values(by="时段")
sort_df.set_index("时段").plot()
plt.show()

enter image description here
能大体看出独立访客数随时间的变化趋势,但是这个图的图例显示不正常,在开头再加上这两行

1
2
plt.rcParams['font.sans-serif']=['SimHei'] #用来正常显示中文标签
plt.rcParams['axes.unicode_minus']=False

简单来说就是定义图的字体。

预测

prophet的API有点类似scikit-learn,常规流程,先fit数据,然后predict新的趋势。
首先我们修改column名称创建第一个预测模型

1
2
3
4
sort_df.columns = ["ds", "y"]
sort_df['y'] = np.log(sort_df['y'])
m1 = Prophet()
m1.fit(sort_df)

使用make_future_dataframe告诉prophet需要预测多久以后的数据,在这个例子中,我们选择预测未来一年的独立访客数。

1
2
3
futures1 = m1.make_future_dataframe(periods=365)
forecast1 = m1.predict(futures1)
forecast1[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail()

forecast1 是dataframe格式的数据,如下:
enter image description here
Prophet在做预测的时候做了log处理,还原后数据如下:

1
2
3
4
5
6
7
8
9
print(np.exp(forecast1[['yhat', 'yhat_lower', 'yhat_upper']].tail()))
console:
yhat yhat_lower yhat_upper
630 10584.491634 212.320650 647084.467489
631 11077.641554 218.018125 671727.514255
632 11121.285237 219.600455 658814.011148
633 10834.162204 215.267186 660137.259109
634 10904.075380 201.501246 678710.749914

作图

1
m1.plot(forecast1)

enter image description here
略微有点崩溃,猜测原因是数据量太小,另外波动有异常,所以预测的最大最小值差距比较大。

1
m1.plot_components(forecast1)

enter image description here
能看出天还有星期的一个趋势。基本介绍就到这了,要继续研究的可以戳这里
Have Fun~~

使用gensim和SVM对THUCNews数据集进行分类

闲逛CSDN的时候看到的这个,正好之前也用过Gensim和SVM,拿来练练手,原版的代码写的太流水式了,所以考虑重构了一遍,Learning by doing嘛。

gensim是什么?

官方文档介绍的很清楚,主要用来做LDA和训练Word2Vec的,本文中是将文档训练成LSI向量,再运用SVM进行分类。

具体流程

整体结构其实很简单,分词 –> 计算词频 –> 计算tf-idf值 –> 计算LSI向量 –> 对LSI向量拆分训练集和测试集 –> 调用sklearn里的SVM对训练集进行训练 –> 用测试集进行测试。(上面每一步都需要把模型存储起来以后可直接调用)

首先,我们构造一个类,定义初始变量,主要是需要保存模型的路径,如下:

1
2
3
4
5
6
7
8
9
10
class classfication(object):
def __init__(self, path, sample):
self.path = path # Thucnews文档地址
self.sample = sample # 采样率
self.dictionary_path = "model/dictionary.dict"
self.tfIdfPath = "model/tfidf"
self.lsiModel = "model/fullLsi.model"
self.lsiPath = "model/lsi"
self.predictor = "model/predictor.model"
self.tag = os.listdir(self.path) # Thucnews文档标签

然后获取文档每一个类别里每一个文件的路径。

1
2
3
4
5
6
7
def _fullTagFile(self):
self.tagFile = {}
for tag in self.tag:
fullPath = os.path.join(self.path, tag)
fileName = glob.glob(os.path.join(fullPath, "*.txt"))
self.tagFile[tag] = fileName
return self.tagFile

然后是分词,分词这里用的是jieba分词,当然也可以用Hanlp进行分词,还需要对停用词(stop words)进行处理。下面的代码’\u3000’为全角空格,另外需要将所有文档单词分词后放入1个List中

1
2
3
4
5
6
7
8
9
10
11
def _segement(self, filepath):
words_list = []
# 暂时只去除标点
#stops_words = set(list(string.punctuation + "!,。'';·「」`~@#¥%&×()-+\\<>"))
stops_words = set([i.strip() for i in codecs.open("stop_words.txt", encoding="utf-8").readlines()])
with codecs.open(filepath, encoding="utf-8") as fp:
for line in fp:
line = line.replace("\u3000", "").replace("\n", "")
words_list.extend([i for i in jieba.cut(line, cut_all=False)
if i not in stops_words])
return words_list

下面的获取词典、TF-IDF、LSI向量方法类似,就只介绍一个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def _getDictionary(self):
dictionary = corpora.Dictionary()
for tag in self.fullTagFile:
tagPath = self.fullTagFile[tag]
for i, filepath in enumerate(tagPath):
if i % self.sample == 0: # 采样率
word_list = self._segement(filepath)
dictionary.add_documents([word_list])
N += 1
if N % 1000 == 0:
print('{t} *** {i} \t docs has been dealed'
.format(i=N, t=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
small_freq_ids = [tokenid for tokenid, docfreq in dictionary.dfs.items() if docfreq < 5]
dictionary.filter_tokens(small_freq_ids)
dictionary.compactify()
dictionary.save(self.dictionary_path)
return dictionary

最后在train方法里汇总,上面都需要将中间结果进行保存,再后续使用可以直接从文件读取模型,节省时间,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def train(self):
self.fullTagFile = self._fullTagFile()
# 加载词典
if os.path.exists(self.dictionary_path):
dictionary = corpora.Dictionary.load(self.dictionary_path)
else:
dictionary = self._getDictionary()
# 加载tf-idf
tagTfidf = {}
if not os.path.exists(self.tfIdfPath):
tagTfidf = self._getTfIdf(dictionary)
else:
filePath = glob.glob(os.path.join(self.tfIdfPath, "*.mm"))
for file in filePath:
tag = os.path.split(file)[-1].split(".")[0]
tagTfidf[tag] = corpora.MmCorpus(file)
# 加载lsi-model,得到每一个文档的lsi向量
corpus_lsi = {}
if not os.path.exists(self.lsiPath):
corpus_lsi = self._getLsi(dictionary, tagTfidf)
else:
filePath = glob.glob(os.path.join(self.tfIdfPath, "*.mm"))
for file in filePath:
tag = os.path.split(file)[-1].split(".")[0]
corpus_lsi[tag] = corpora.MmCorpus(file)
# 完整的lsi模型,获得新句子的lsi向量
if os.path.exists(self.lsiModel):
with open(self.lsiModel, 'rb') as fp:
lsi_model = pickle.load(fp)
else:
corpus = []
for value in tagTfidf.values():
corpus.extend(value)
lsi_model = models.LsiModel(corpus=corpus, id2word=dictionary, num_topics=50)
# 加载分类器
if not os.path.exists(self.predictor):
predictor = self._getPredictor(corpus_lsi)
else:
with open(self.predictor, 'rb') as fp:
predictor = pickle.load(fp)
return predictor, dictionary, lsi_model

全部代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# -*- encoding:utf-8 -*-
import jieba
from gensim import corpora
from gensim import models
from scipy.sparse import csr_matrix
from sklearn import svm
from sklearn.model_selection import train_test_split
import os
import logging
import pickle
import codecs
import glob
from collections import defaultdict
import string
import datetime
logging.basicConfig(level=logging.WARNING,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt='%a, %d %b %Y %H:%M:%S',
)
class classfication(object):
def __init__(self, path, sample):
self.path = path
self.sample = sample
self.dictionary_path = "model/dictionary.dict"
self.tfIdfPath = "model/tfidf"
self.lsiModel = "model/fullLsi.model"
self.lsiPath = "model/lsi"
self.predictor = "model/predictor.model"
self.tag = os.listdir(self.path)
def _fullTagFile(self):
self.tagFile = {}
for tag in self.tag:
fullPath = os.path.join(self.path, tag)
fileName = glob.glob(os.path.join(fullPath, "*.txt"))
self.tagFile[tag] = fileName
return self.tagFile
def _segement(self, filepath):
words_list = []
# 暂时只去除标点
#stops_words = set(list(string.punctuation + "!,。'';·「」`~@#¥%&×()-+\\<>"))
stops_words = set([i.strip() for i in codecs.open("stop_words.txt", encoding="utf-8").readlines()])
with codecs.open(filepath, encoding="utf-8") as fp:
for line in fp:
line = line.replace("\u3000", "").replace("\n", "")
words_list.extend([i for i in jieba.cut(line, cut_all=False)
if i not in stops_words])
return words_list
def _getDictionary(self):
globa N
dictionary = corpora.Dictionary()
for tag in self.fullTagFile:
tagPath = self.fullTagFile[tag]
for i, filepath in enumerate(tagPath):
if i % self.sample == 0:
word_list = self._segement(filepath)
dictionary.add_documents([word_list])
N += 1
if N % 1000 == 0:
print('{t} *** {i} \t docs has been dealed'
.format(i=N, t=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
small_freq_ids = [tokenid for tokenid, docfreq in dictionary.dfs.items() if docfreq < 5]
dictionary.filter_tokens(small_freq_ids)
dictionary.compactify()
dictionary.save(self.dictionary_path)
return dictionary
def _getTfIdf(self, dictionary):
global N
tagTfidf = defaultdict(list)
tfIdfModel = models.TfidfModel(dictionary=dictionary)
for tag in self.tagFile:
tagPath = self.tagFile[tag]
for i, filepath in enumerate(tagPath):
if i % self.sample == 0:
word_list = self._segement(filepath)
doc2bow = dictionary.doc2bow(word_list)
doc_tfidf = tfIdfModel[doc2bow]
tagTfidf[tag].append(doc_tfidf)
N += 1
if N % 1000 == 0:
print('{t} *** {i} \t docs has been dealed'
.format(i=N, t=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
if not os.path.isdir(self.tfIdfPath):
os.makedirs(self.tfIdfPath)
for tag in tagTfidf:
corpora.MmCorpus.serialize(os.path.join(self.tfIdfPath, "%s.mm" % tag),
tagTfidf.get(tag), id2word=dictionary)
return tagTfidf
def _getLsi(self, dictionary, tagTfidf):
corpus = []
for value in tagTfidf.values():
corpus.extend(value)
lsi_model = models.LsiModel(corpus=corpus, id2word=dictionary, num_topics=50)
with open(self.lsiModel, mode="wb") as fp:
pickle.dump(lsi_model, fp)
corpus_lsi = {}
if not os.path.isdir(self.lsiPath):
os.makedirs(self.lsiPath)
for tag in tagTfidf:
corpus = [lsi_model[doc] for doc in tagTfidf.get(tag)]
corpus_lsi[tag] = corpus
corpora.MmCorpus.serialize(os.path.join(self.lsiPath, '%s.mm' % tag),
corpus, id2word=dictionary)
return corpus_lsi
def _getPredictor(self, corpus_lsi):
corpus_lsi_total = []
tag_list_all = []
for index, tag in enumerate(self.tag):
temp = corpus_lsi[tag]
corpus_lsi_total.extend(temp)
tag_list_all.extend([index] * len(temp))
# 这里需要注意,将gensim里矩阵转化成numpy里所用的
lsi_matrix = self._csr_matrix(corpus_lsi_total)
x_train, x_test, y_train, y_test = train_test_split(lsi_matrix, tag_list_all, test_size=0.2, random_state=422)
clf = svm.LinearSVC()
clf_res = clf.fit(x_train, y_train)
# 测试集的正确率并返回训练后的模型
x_test_pred = clf_res.predict(x_test)
accuracy = sum([1 for i, j in zip(x_test_pred, y_test) if i == j]) / len(x_test)
print('=== 分类训练完毕,分类结果如下 ===')
print('测试集正确率: {e}'.format(e=accuracy))
with open(self.predictor, "wb") as fp:
pickle.dump(clf_res, fp)
return clf_res
def _csr_matrix(self, corpus_lsi, type="train"):
data = []
rows = []
columns = []
line_count = 0
if type == "train":
for line in corpus_lsi:
for elem in line:
rows.append(line_count)
columns.append(elem[0])
data.append(elem[1])
line_count += 1
lsi_array = csr_matrix((data, (rows, columns))).toarray()
elif type == "test":
for item in corpus_lsi:
data.append(item[1])
columns.append(item[0])
rows.append(0)
lsi_array = csr_matrix((data, (rows, columns))).toarray()
return lsi_array
def train(self):
self.fullTagFile = self._fullTagFile()
# 加载词典
if os.path.exists(self.dictionary_path):
dictionary = corpora.Dictionary.load(self.dictionary_path)
else:
dictionary = self._getDictionary()
# 加载tf-idf
tagTfidf = {}
if not os.path.exists(self.tfIdfPath):
tagTfidf = self._getTfIdf(dictionary)
else:
filePath = glob.glob(os.path.join(self.tfIdfPath, "*.mm"))
for file in filePath:
tag = os.path.split(file)[-1].split(".")[0]
tagTfidf[tag] = corpora.MmCorpus(file)
# 加载lsi-model
corpus_lsi = {}
if not os.path.exists(self.lsiPath):
corpus_lsi = self._getLsi(dictionary, tagTfidf)
else:
filePath = glob.glob(os.path.join(self.tfIdfPath, "*.mm"))
for file in filePath:
tag = os.path.split(file)[-1].split(".")[0]
corpus_lsi[tag] = corpora.MmCorpus(file)
# 完整的lsi模型
if os.path.exists(self.lsiModel):
with open(self.lsiModel, 'rb') as fp:
lsi_model = pickle.load(fp)
else:
corpus = []
for value in tagTfidf.values():
corpus.extend(value)
lsi_model = models.LsiModel(corpus=corpus, id2word=dictionary, num_topics=50)
# 加载分类器
if not os.path.exists(self.predictor):
predictor = self._getPredictor(corpus_lsi)
else:
with open(self.predictor, 'rb') as fp:
predictor = pickle.load(fp)
return predictor, dictionary, lsi_model
def predict(self, sentences):
predictor, dictionary, lsi_model = self.train()
demo_doc = list(jieba.cut(sentences, cut_all=False))
demo_bow = dictionary.doc2bow(demo_doc)
tfidf_model = models.TfidfModel(dictionary=dictionary)
demo_tfidf = tfidf_model[demo_bow]
demo_lsi = lsi_model[demo_tfidf]
demo_matrix = self._csr_matrix(demo_lsi, type="test")
x = predictor.predict(demo_matrix)
print(self.tag[x[0]])
if __name__ == '__main__':
train = classfication(r"E:\迅雷下载\THUCNews\THUCNews", sample=10)
#train = classfication("THUCNews", sample=1)
test = train.predict(''' 股价现在怎么样了 ''')

抽样10%的文档,最后结果在87%左右,只能说差强人意。
另外局限性也很大,对短句的判断能力很差,不过这个和训练样本有关,下次试试对短文本的聚类效果。

ubuntu下用Nginx和uwsgi部署flask项目

之前简单写过一个接口,下面整理一下,如何配置的过程,算是自己的一个总结。

服务器的环境配置

这里选用的是阿里云的服务器,自带了python2.7的环境,也可以自己安装其他版本,最后做一个软链接到/usr/bin目录下。
这里主要说一下nginx的安装。
简单写了个自动安装脚本,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#!/bin/bash
apt-get purge pcre*
apt-get remove pcre*
apt-get autoremove
wget http://heanet.dl.sourceforge.net/project/pcre/pcre/8.39/pcre-8.39.tar.gz
tar zxvf pcre-8.39.tar.gz
cd pcre-8.39
./configure
make && make install
cd ..
wget http://zlib.net/zlib-1.2.8.tar.gz
tar zxvf zlib-1.2.8.tar.gz
cd zlib-1.2.8
./configure
make && make install
cd ..
wget https://www.openssl.org/source/openssl-1.0.2h.tar.gz
tar zxvf openssl-1.0.2h.tar.gz
cd openssl-1.0.2h
./config
make && make install
cd ..
wget http://luajit.org/download/LuaJIT-2.0.4.tar.gz
tar zxvf LuaJIT-2.0.4.tar.gz
cd LuaJIT-2.0.4
make
make install PREFIX=/usr/local/luajit
export LUAJIT_LIB=/usr/local/luajit/lib
export LUAJIT_INC=/usr/local/luajit/include/luajit-2.0
export LD_LIBRARY_PATH=/usr/local/luajit/lib:$LD_LIBRARY_PATH
cd ..
wget https://github.com/simpl/ngx_devel_kit/archive/v0.3.0.tar.gz
wget https://github.com/openresty/lua-nginx-module/archive/v0.10.6.tar.gz
tar -zxvf v0.3.0.tar.gz
tar -zxvf v0.10.6.tar.gz
wget http://nginx.org/download/nginx-1.10.1.tar.gz
tar zxvf nginx-1.10.1.tar.gz
cd nginx-1.10.1
./configure --prefix=/usr/local/nginx \
--with-http_realip_module --with-http_sub_module --with-http_gzip_static_module \
--with-http_stub_status_module --with-http_addition_module --with-http_ssl_module \
--with-pcre=/usr/local/src/pcre-8.39 --with-zlib=/usr/local/src/zlib-1.2.8 \
--with-openssl=/usr/local/src/openssl-1.0.2h \
--add-module=/usr/local/src/ngx_devel_kit-0.3.0 --add-module=/usr/local/src/lua-nginx-module-0.10.6
make && make install

这里安装的是带lua模块的Nginx 1.10.1版本
另外写了一个启动脚本,丢在/etc/init.d目录下,记得 chmod +x, 以下是启动脚本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#!/bin/sh
# chkconfig: 345 86 14
# description: Startup and shutdown script for nginx
NGINX_DIR=/usr/local/nginx
export NGINX_DIR
case $1 in
'start' )
echo "Starting nginx..."
$NGINX_DIR/sbin/nginx
;;
'reload' )
echo "Reload nginx configuration..."
kill -HUP `cat $NGINX_DIR/logs/nginx.pid`
;;
'stop' )
echo "Stopping nginx..."
kill -15 `cat $NGINX_DIR/logs/nginx.pid`
;;
'list' )
ps aux | egrep '(PID|nginx)'
;;
'testconfig' )
$NGINX_DIR/sbin/nginx -t
;;
*)
echo "usage: `basename $0` {start|reload|stop|list|testconfig}"
esac

以后可以用service nginx start启动,用service nginx reload重新加载配置文件。
服务器基本环境算是完事了。

上传项目代码

在上传之前记得pip -freeze requirement.txt导出用的第三方python包。这里可以直接用rz上传或者用ftp上传,用ftp需要安装vsftpd服务,apt-get安装一下,然后修改一下配置文件就可以了。
在服务器端还需要安装virtualenv环境,使用

1
pip install virtualenv

然后新建项目目录,在目录下键入virtualenv venv,就会帮你自动安装好虚拟环境的一些包文件,用source venv/bin/activate 来启动虚拟环境,在虚拟环境中用

1
pip install -r requirement.txt

安装flask需要的包,requirement是前面导出的。

Nginx配置

首先安装UWSGI这里需要说明一下,其实不用nginx也可以,只不过nginx可以做负载均衡,然后修改端口啥的比较方便,用pip install uwsgi来安装。安装完后在项目目录下新建config.ini文件,打开,修改如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
[uwsgi]
# uwsgi 启动时所使用的地址与端口
socket = 127.0.0.1:8888
# 指向网站目录
chdir = xxxxx/xxx/xx
# python 启动程序文件
wsgi-file = manage.py
# python 程序内用以启动的 application 变量名
callable = app
# 处理器数
processes = 4
# 线程数
threads = 2
#状态检测地址
stats = 127.0.0.1:9191
~

保存以后,通过uwsgi config.ini来启动服务。
接着配置nginx,默认的配置目录在/usr/local/nginx/conf下,打开
nginx.conf,增加一个server如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
server {
listen 8600;
server_name localhost; #公网地址
access_log /mnt/nginx/logs/blog_demo.log;
location / {
include uwsgi_params;
uwsgi_pass 127.0.0.1:8888; # 指向uwsgi 所应用的内部地址,所有请求将转发给uwsgi 处理
uwsgi_param UWSGI_PYHOME xxxxx; # 指向虚拟环境目录
uwsgi_param UWSGI_CHDIR xxxxx; # 指向网站根目录
uwsgi_param UWSGI_SCRIPT manage:app; # 指定启动程序
}
}

然后service nginx reload就可以了,访问服务器IP + 端口试试。域名啥的如果有可以在server_name配置。
就酱紫~~~

利用requests模块模拟登录百度并请求百小度

因为兴趣想测试一下百小度,想弄一些常见的问题测试一下百小度回答的质量怎么样,所以想到用python的requests模块模拟登录请求,先说一下我在做的时候遇到的三个问题.

1.获取token值
2.在登录的表单里面发现密码是通过加密的
3.登录表单里哪些字段是必须的哪些非必须说实话我还是没弄明白

首先我们打开Fiddler,如果和我一样是用chrome而且用了代理插件的,先把插件暂时设置为系统代理
接着我们打开百度首页,登录自己的帐号,接着我们会看到Fiddler已经抓取到了很多信息了。
enter image description here
我们发现获取token的地址如下
'https://passport.baidu.com/v2/api/?getapi&tpl=pp&apiver=v3&class=login'
然后我们直接用requests请求一下这个地址。至于为什么要用requests,大概是因为urllib,urllib2太多函数了记不住,很麻烦的样子,而且在python3.0以后整合到了一起
requests的官方文档在这.

1
2
3
4
5
6
7
8
9
10
11
import requests
header_base = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.134 Safari/537.36',
'Content-Type': 'application/x-www-form-urlencoded',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.6,en;q=0.4'
}
def _get_token():
ret = requests.get('https://passport.baidu.com/v2/api/?getapi&tpl=pp&apiver=v3&class=login', headers=header_base).content
print ret

然后运行代码,发现返回

1
2
3
4
D:\Python27\python.exe E:/pythonPractice/RequestTest.py
{"errInfo":{ "no": "0" }, "data": { "rememberedUserName" : "", "codeString" : "", "token" : "the fisrt two args should be string type:0,1!", "cookie" : "0", "usernametype":"", "spLogin" : "rate", "disable":"", "loginrecord":{ 'email':[ ], 'phone':[ ] } }}
Process finished with exit code 0

我们把获取token的地址在浏览器打开,发现返回的是

1
{"errInfo":{ "no": "0" }, "data": { "rememberedUserName" : "", "codeString" : "", "token" : "28563c348ee57b720560d7d278337d80", "cookie" : "1", "usernametype":"", "spLogin" : "rate", "disable":"", "loginrecord":{ 'email':[ ], 'phone':[ ] } }}

为什么不一样呢
答案是cookies,你重新开一个隐身窗口打开那个地址会发现获得和requests请求一样的结果.
所以我们先访问一下百度的首页,然后获取cookies,再访问那个获取token的地址
代码如下

1
2
3
4
5
6
7
def _get_token():
s = requests.session()
s.get('http://www.baidu.com')
ret = s.get('https://passport.baidu.com/v2/api/?getapi&tpl=pp&apiver=v3&class=login').content
token = re.search('"token" : "(?P<tokenVal>.*?)"', ret)
token_final = token.group('tokenVal')
return ret

结果成功获取到了token值,接着我们来看看登录表单里面还需要哪些值
enter image description here
还有个rsakey,另外我们发现密码被加密过了,我们接着去找一下获取pubkey的地址在哪
enter image description here
这下任务明晰了,我们接着去获取一下pubkey值,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
def _get_publickey():
postData = {
'token': token,
'tpl':'mn',
'apiver':'v3',
'tt':'1440514499019',
'gid':'06019B9-F329-42A0-B64E-F1825D5F578E',
'callback':'bd__cbs__rl713j'
}
content = requests.get('https://passport.baidu.com/v2/getpublickey', params=postData).content
jdata = json.loads(content.replace('\'','"').replace('bd__cbs__rl713j(','').replace('})','}'))
return (jdata['pubkey'], jdata['key'])

运行代码如下:

1
2
3
4
5
6
7
8
9
10
11
D:\Python27\python.exe E:/pythonPractice/RequestTest.py
-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCyTKKeJYXAY3/5c2VuFBIGED6S
32zA8Tx6jPx+Z+ZwtCMkPAGAGzP36b5yQ1Ymq8ad7Ntrfw/IpqCj9qW/N6gpjlfD
S0juemauXasTYfFJ+1fNFxjc9n5XFET7tV6V5EDEcCw3EC03pVzQS+95uF+WXj/4
P2YOoL0qRfkltKmoNwIDAQAB
-----END PUBLIC KEY-----
xjhFm4PqjPLGPpMjxCU3xSU47YTv2LQd
Process finished with exit code 0

熟悉rsa的人应该看出来了我们获取的是pem格式的公钥
我们引入rsa模块进行解密.
代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def getPUBLICKey(password,token):
s = requests.session()
s.get('http://www.baidu.com')
raw_content = s.get(token_url, headers=header_base).content
token = re.search('"token" : "(?P<tokenVal>.*?)"', raw_content)
token_final = token.group('tokenVal')
postData = {
'token': token,
'tpl':'mn',
'apiver':'v3',
'tt':'1440514499019',
'gid':'06019B9-F329-42A0-B64E-F1825D5F578E',
'callback':'bd__cbs__rl713j'
}
content = s.get('https://passport.baidu.com/v2/getpublickey', params=postData).content
jdata = json.loads(content.replace('\'','"').replace('bd__cbs__rl713j(','').replace('})','}'))
pubkey, rsakey = (jdata['pubkey'], jdata['key'])
key = rsa.PublicKey.load_pkcs1_openssl_pem(pubkey)
print key
password_rsaed = base64.b64encode(rsa.encrypt(password, key))
return password_rsaed, rsakey

万事俱备只欠东风,现在可以登录百度了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def _login(token, username, password):
password_rsaed, rsakey = _get_publickey(password)
code_string = ''
login_data = {'staticpage': 'http://www.baidu.com/cache/user/html/v3Jump.html',
'charset': 'UTF-8',
'token': self.user['token'],
'tpl': 'pp',
'subpro': '',
'apiver': 'v3',
'tt': str(int(time.time())),
'codestring': code_string,
'isPhone': 'false',
'safeflg': '0',
'u': 'https://passport.baidu.com/',
'quick_user': '0',
'logLoginType': 'pc_loginBasic',
'loginmerge': 'true',
'logintype': 'basicLogin',
'username': username,
'password': password_rsaed,
'mem_pass': 'on',
'rsakey': str(rsakey),
'crypttype': 12,
'ppui_logintime': '50918',
'callback': 'parent.bd__pcbs__oa36qm'}
#Jpost_data = json.dumps(post_data)
self.session.post('https://passport.baidu.com/v2/api/?login', data=login_data, headers=header_base)

登录以后可以get一下百度的首页然后搜索一下有没有登陆后的用户名就可以确认是否真的登录了,另外因为cookies的关系强烈建议创建一个session会话,会自动保存cookies,不需要手动加载,另外登录以后最好手动保存一下cookies文件,防止频繁登录被ban或者要求验证码,验证码因为没碰到所以暂时没去研究.
接下来同样的步骤去抓百小度的包获取请求地址以及登录表单就可以了.

1
2
3
4
5
6
7
8
9
10
11
12
def _getXiaodu(cmd):
load_data = {
'sample_name': 'bear_brain',
'request_query': cmd,
'bear_type': 2,
'request_time': str(int(time.time()*1000)),
'callback': 'jQuery110206030894953291863_1440604236835',
'_': 1440604236840
}
r = s.get('https://sp0.baidu.com/yLsHczq6KgQFm2e88IuM_a/s', params=load_data, headers=header_base).content
return r

运行以后的结果如下

1
2
3
4
D:\Python27\python.exe E:/pythonPractice/RequestTest.py
/**/jQuery110206030894953291863_1440604236835({"status": 0, "result_list": [{"source_type": "talk_service", "service_id": "", "result_confidence": 100, "result_type": "txt", "result_content": "{\"answer\":\"(抠鼻)我妹不在\",\"question\":\"\"}\n"}, {"source_type": "bear_grown", "result_confidence": 100, "result_type": "txt", "result_content": "{\"answer\":\"主人,你的陪伴让我又长大了一点,亲密度+1!小度越来越稀罕你啦!\"}"}], "request_uid": "117192806", "request_time": "1440836860993", "tips_num": 0, "se_query": "", "emotion": 0, "level_info": "{ \"level\": 5, \"new_equipments\": \"1\" }"})
Process finished with exit code 0

我问的是”你妹啊”,百小度还挺有意思的
返回的是一个json的字符串,处理一下就可以,直接打印的话可以用pprint .
以上.

利用Scrapy下载WallHeaven的图片

稍微试了一下怎么使用scrapy,确实非常方便.

Scrapy的安装

安装指南可以戳这里因为安装太久了所以也忘了有啥需要的地方

开始用scrapy创建项目

在磁盘的任意位置(最好还是先新建一个文件夹)按住shift,鼠标右键打开CMD,输入以下命令

1
scrapy startproject wallheaven_download

该命令会创建包含下列内容的

1
2
3
4
5
6
7
8
9
wallheaven_download/
scrapy.cfg
tutorial/
__init__.py
items.py
pipelines.py
settings.py
spiders/
__init__.py

这些文件分别是:

scrapy.cfg: 项目的配置文件
tutorial/: 该项目的python模块。之后您将在此加入代码。
tutorial/items.py: 项目中的item文件.
tutorial/pipelines.py: 项目中的pipelines文件.
tutorial/settings.py: 项目的设置文件.
tutorial/spiders/: 放置spider代码的目录.

定义Item

Item是保存爬取到数据的一个容易,在抓wallheaven这个项目里,我们需要保存的信息都储存在里面,主要是图片的下载地址还有在本地保存的地址.
我们编辑wallheaven文件夹里的items.py

1
2
3
4
5
6
7
8
9
import scrapy
class WallheavenDownloadItem(scrapy.Item):
# define the fields for your item here like:
# name = scrapy.Field()
image_urls = scrapy.Field()
images = scrapy.Field()
image_paths = scrapy.Field()

编写第一个爬虫

编辑wallheaven文件夹里的wallheaven_spider.py
一个正常的spider需要包含以下三个属性:

  • name: 用于区别Spider。 该名字必须是唯一的,您不可以为不同 的Spider设定相同的名字。
  • start_urls: 包含了Spider在启动时进行爬取的url列表。 因此,第一个被获取到的页面将是其中之一。 后续的URL则从初始的URL获取到的数据中提取。
  • parse() 是spider的一个方法。 被调用时,每个初始URL完成下载后生成的 Response 对象将会作为唯一的参数传递给该函数。 该方法负责解析返回的数据(response data),提取数据(生成item)以及生成需要进一步处理的URL的 Request 对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class WallHeaven(Spider):
name = 'wallheaven_image' #spider的名字
allowed_domains = ['wallhaven.cc']
start_urls = ['http://alpha.wallhaven.cc/random?page=1'] # 需要爬取的原始页面
for i in range(2, 5288):
new_url = 'http://alpha.wallhaven.cc/random?page=' + str(i)
start_urls.append(new_url) # 生成包含所有需要下载的URL的列表
def parse(self, response):
req = []
hxs = Selector(response)
page_urls = hxs.xpath('//div[@id="thumbs"]//ul/li/figure/a/@href').extract() # 提取抓取页面的所有的image的链接,可以用chrome F12模式,找到图片,右键复制XPATH
for url in page_urls:
r = Request(url, callback=self.parse_image_page)
req.append(r)
return req
def parse_image_page(self, response):
hxs = Selector(response)
download_link = hxs.xpath('//img[@id="wallpaper"]/@src').extract() #打开某一张图片所在页面获取下载地址
# 特别注意一下download_link返回的是一个list
for link in download_link:
real_download_link = re.split(r'//', link)[1]
postfix = re.split(r'/', link)[-1]
Item = WallheavenDownloadItem() #将获取到的下载地址存到ITEM里面,item结构类似于字典
Item['image_urls'] = ["http://" + real_download_link]
return Item

修改配置文件settings.py

1
2
3
4
5
BOT_NAME = 'wallheaven_download'
SPIDER_MODULES = ['wallheaven_download.spiders']
NEWSPIDER_MODULE = 'wallheaven_download.spiders'
ITEM_PIPELINES = {'wallheaven_download.pipelines.MyImagesPipeline': 1} # 处理图片下载的管道,将对应的权值调大
IMAGES_STORE = './image'

下载图片

Scrapy提供了一个Item pipeline用来下载这个项目的图片

这条管道,被称作图片管道,在 ImagesPipeline 类中实现,提供了一个方便并具有额外特性的方法,来下载并本地存储图片:
将所有下载的图片转换成通用的格式(JPG)和模式(RGB)
避免重新下载最近已经下载过的图片
缩略图生成
检测图像的宽/高,确保它们满足最小限制

在我们这个项目里没有下载缩略图,壁纸当然要下载原始的

使用图片管道

当使用 ImagesPipeline ,典型的工作流程如下所示:

在一个爬虫里,你抓取一个项目,把其中图片的URL放入 image_urls 组内,就是我们在spider里面做的
项目从爬虫内返回,进入项目管道。
当项目进入 ImagesPipeline,image_urls 组内的URLs将被Scrapy的调度器和下载器(这意味着调度器和下载器的中间件可以复用)安排下载,当优先级更高,会在其他页面被抓取前处理。项目会在这个特定的管道阶段保持“locker”的状态,直到完成图片的下载(或者由于某些原因未完成下载)。
当图片下载完,另一个组(images)将被更新到结构中。这个组将包含一个字典列表,其中包括下载图片的信息,比如下载路径、源抓取地址(从 image_urls 组获得)和图片的校验码。 images 列表中的图片顺序将和源 image_urls 组保持一致。如果某个图片下载失败,将会记录下错误信息,图片也不会出现在 images 组中。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import scrapy
from scrapy.contrib.pipeline.images import ImagesPipeline
from scrapy.exceptions import DropItem
class MyImagesPipeline(ImagesPipeline):
def file_path(self, request, response=None, info=None):
image_guid = request.url.split('/')[-1]
return 'full/%s' % (image_guid) #生成下载图片的名字
def get_media_requests(self, item, info):
for image_url in item['image_urls']:
yield scrapy.Request(image_url)
def item_completed(self, results, item, info):
image_paths = [x['path'] for ok, x in results if ok]
if not image_paths:
raise DropItem("Item contains no images")
item['image_paths'] = image_paths
return item
class WallheavenDownloadPipeline(object):
def process_item(self, item, spider):
return item

最后在根目录输入

1
scrapy crawl wallheaven_image

就可以了
在目录下会新建一个full文件夹,图片会源源不断的下载进来
不过有一个问题,停不下来,接不上去,我还没想好怎么去解决,sigh

机器学习之用Python从零实现贝叶斯分类器

原帖在这里,好久没看到这么清晰的机器学习的教程,所以自己也来试试了,选取的数据是Iris的数据,结果准确率很高,平均在94%左右.如果没有特别声明,一下属性一般指的是植物的宽度之类的数据,种类指的是不同种类的植物.

关于朴素贝叶斯

朴素贝叶斯算法是一个直观的方法,使用每个属性归属于某个类的概率来做预测。你可以使用这种监督性学习方法,对一个预测性建模问题进行概率建模。
给定一个类,朴素贝叶斯假设每个属性归属于此类的概率独立于其余所有属性,从而简化了概率的计算。这种强假定产生了一个快速、有效的方法。
给定一个属性值,其属于某个类的概率叫做条件概率。对于一个给定的类值,将每个属性的条件概率相乘,便得到一个数据样本属于某个类的概率。
我们可以通过计算样本归属于每个类的概率,然后选择具有最高概率的类来做预测。
通常,我们使用分类数据来描述朴素贝叶斯,因为这样容易通过比率来描述、计算。一个符合我们目的、比较有用的算法需要支持数值属性,同时假设每一个数值属性服从正态分布(分布在一个钟形曲线上),这又是一个强假设,但是依然能够给出一个健壮的结果。

根据花萼和花瓣的长度和宽度预测它的属性

本文使用的测试问题是”安德森鸢尾花卉”的属性问题,IRIS数据集一共包括150个数据集,分为三类,每类50个数据,每个数据包含四个属性,分别是花萼和花瓣的长度和宽度,最后1条记录是属性值,0,1,2分别代表三种不同的植物
下面是iris数据集的一个样本

1
2
3
4
5
5.1,3.5,1.4,0.2,0
4.9,3.0,1.4,0.2,0
4.7,3.2,1.3,0.2,0
4.6,3.1,1.5,0.2,0
5.0,3.6,1.4,0.2,0

基本做法和链接的那篇文章一样

朴素贝叶斯算法教程

教程分为如下几步:
1.处理数据:从CSV文件中载入数据,然后划分为训练集和测试集。
2.提取数据特征:提取训练数据集的属性特征,以便我们计算概率并做出预测。
3.单一预测:使用数据集的特征生成单个预测。
4.多重预测:基于给定测试数据集和一个已提取特征的训练数据集生成预测。
5.评估精度:评估对于测试数据集的预测精度作为预测正确率。
6.合并代码:使用所有代码呈现一个完整的、独立的朴素贝叶斯算法的实现。

1.处理数据

1
2
3
4
5
6
7
8
9
10
11
def load_data(filename, ratio=0.67):
data_set = []
train_set = []
fp = open(filename, "r")
for line in fp:
data_set.append([float(x) for x in line.replace("\n", "").split(',')])
limit = int(len(data_set) * ratio)
while len(train_set) < limit:
index = random.randrange(len(data_set))
train_set.append(data_set.pop(index))
return data_set, train_set

当然也可以在read的时候取随机

1
2
3
4
5
6
7
8
9
10
11
12
def load_data(filename, ratio=0.67):
test_set = []
train_set = []
fp = open(filename, "r")
for line in fp:
flag = random.randrange(1,101)
if flag in range(1, int(ratio * 100) + 1):
train_set.append([float(x) for x in line.replace("\n", "").split(',')])
else:
test_set.append([float(x) for x in line.replace("\n", "").split(',')])
print len(train_set), len(test_set)
return test_set, train_set

再按类别生成key-value对应的数据

1
2
3
4
5
6
def separateByClass(dataset):
result = defaultdict(list)
for i in range(len(dataset)):
classType = int(dataset[i][-1])
result[classType].append(dataset[i][:-1])
return result

计算均值还有方差以及概率密度函数

1
2
3
4
5
6
7
8
9
10
def mean(numbers):
return sum(numbers) / float(len(numbers))
def stdev(numbers):
return math.sqrt(sum([pow(x - mean(numbers), 2) for x in numbers]) / float(len(numbers) - 1))
# 生成高斯函数并计算某一个值的概率
def calNumberProbability(mean, std, y):
exponent = math.exp(-(math.pow(y - mean, 2)/(2*math.pow(std, 2))))
return (1 / (math.sqrt(2*math.pi) * std)) * exponent

训练模型,生成每个属性对应的平均值还有标准差

1
2
3
4
5
6
def createMeanStdPair(result):
summarise = defaultdict(list)
for key in result:
for type in zip(*result[key]):
summarise[key].append([mean(type), stdev(type)])
return summarise

预测

准备工作都已经做完了,开始正事了,首先要说明,上面的步骤应该用训练数据去培训,下面用测试数据去生成结果.

1
2
3
4
5
6
7
8
9
10
11
def labelOfVector(summarise, input_vector):
summarise_prob = {}
for key in summarise:
#需要把初始概率写在循环里面,因为每个类别初始的概率都是1
init_prob = 1
for i in range(len(summarise[key])):
mean, std = summarise[key][i]
probability = calNumberProbability(mean, std, input_vector[i])
init_prob *= probability
summarise_prob[key] = init_prob
return sorted(summarise_prob.items(),key=lambda x:x[1])[-1][0]

计算精度

1
2
3
4
5
6
7
8
9
10
11
def main(summarise, test_set):
# 初始正确的数目
count = 0
sum_vector = 0
for key in test_set:
sum_vector += len(test_set[key])
for vector in test_set[key]:
label = labelOfVector(summarise, vector)
if label == key:
count += 1
print "测试数据一共{0}条,正确率为{1}%".format(sum_vector, (count / float(sum_vector)) * 100)

运行程序,得到结果

1
测试数据一共48条,正确率为95.8333333333%

不同种类的植物的区分度很大,且各个属性之间可看作是独立的.

下面我们来看看用现成的scikit-learn包怎么做.官方文档
测试的问题是“皮马印第安人糖尿病问题”。

这个问题包括768个对于皮马印第安患者的医疗观测细节,记录所描述的瞬时测量取自诸如患者的年纪,怀孕和血液检查的次数。所有患者都是21岁以上(含21岁)的女性,所有属性都是数值型,而且属性的单位各不相同。
每一个记录归属于一个类,这个类指明以测量时间为止,患者是否是在5年之内感染的糖尿病。如果是,则为1,否则为0。
机器学习文献中已经多次研究了这个标准数据集,好的预测精度为70%-76%。下面是pima-indians.data.csv文件中的一个样本,了解一下我们将要使用的数据。

1
2
3
4
5
6,148,72,35,0,33.6,0.627,50,1
1,85,66,29,0,26.6,0.351,31,0
8,183,64,0,0,23.3,0.672,32,1
1,89,66,23,94,28.1,0.167,21,0
0,137,40,35,168,43.1,2.288,33,1

基本方式和上面类似,我们也用高斯,不知道翻译成什么,直接用Gaussian Naive Bayes代替吧

1.处理数据

这里要注意数据都要处理成array格式的
步骤和上面的类似,先把初始数据处理成train_datatest_data.有两点不同需要注意一下.

  1. 将测量数据和类别数据分开
  2. 将数据用numpy处理成array格式的
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    def load_data(filename, ratio=0.67):
    test_set = []
    train_set = []
    train_features = []
    test_features = []
    fp = open(filename, "r")
    for line in fp:
    flag = random.randrange(1,101)
    if flag in range(1, int(ratio * 100) + 1):
    train_set.append([float(x) for x in line.replace("\n", "").split(',')[:-1]])
    train_features.append(int(line.replace("\n", "").split(',')[-1]))
    else:
    test_set.append([float(x) for x in line.replace("\n", "").split(',')[:-1]])
    test_features.append(int(line.replace("\n", "").split(',')[-1]))
    return np.array(test_set), np.array(train_set), np.array(train_features), np.array(test_features)

2.建立模型

1
2
3
4
5
from sklearn.naive_bayes import GaussianNB
gnb = GaussianNB()
y_pred = gnb.fit(train_set, train_features).predict(test_set)
print("split %d rows into train=%d and test=%d rows \nAccuracy: %.2f%%"\
% (len(test_set) + train_set.shape[0], train_set.shape[0], test_set.shape[0], (test_features == y_pred).sum() * 100 / float(test_set.shape[0])))

输出结果如下:

1
2
split 768 rows into train=524 and test=244 rows
Accuracy: 79.92%

不得不说有scikit-learn包以后简单了很多, T T.

Building machine learning system