背景
垃圾郵件的問(wèn)題一直困擾著人們,傳統(tǒng)的垃圾郵件分類(lèi)的方法主要有"關(guān)鍵詞法"和"校驗(yàn)碼法"等,然而這兩種方法效果并不理想。其中,如果使用的是“關(guān)鍵詞”法,垃圾郵件中如果這個(gè)關(guān)鍵詞被拆開(kāi)則可能識(shí)別不了,比如,“中獎(jiǎng)”如果被拆成“中 ~~~ 獎(jiǎng)”可能會(huì)識(shí)別不了。后來(lái),直到提出了使用“貝葉斯”的方法才使得垃圾郵件的分類(lèi)達(dá)到一個(gè)較好的效果,而且隨著郵件數(shù)目越來(lái)越多,貝葉斯分類(lèi)的效果會(huì)更加好。
我們想采用的分類(lèi)方法是通過(guò)多個(gè)詞來(lái)判斷是否為垃圾郵件,但這個(gè)概率難以估計(jì),通過(guò)貝葉斯公式,可以轉(zhuǎn)化為求垃圾郵件中這些詞出現(xiàn)的概率。
貝葉斯公式的介紹
貝葉斯定理由英國(guó)數(shù)學(xué)家貝葉斯 ( Thomas Bayes 1702-1761 ) 發(fā)展,用來(lái)描述兩個(gè)條件概率之間的關(guān)系,比如 P(A|B) 和 P(B|A)。
(1)條件概率公式
設(shè)A,B是兩個(gè)事件,且P(B)>0,則在事件B發(fā)生的條件下,事件A發(fā)生的條件概率為:
P(A|B) = P(AB)/P(B)
(2)由條件概率公式得出乘法公式: P(AB)=P(A|B)P(B)=P(B|A)P(A)
P(A|B) = P(AB)/P(B)可變形為:?
可以這樣來(lái)看貝葉斯公式:?
- P(A) 稱(chēng)為”先驗(yàn)概率”,即在B事件發(fā)生之前,我們對(duì)A事件概率的一個(gè)判斷。如:正常收到一封郵件,該郵件為垃圾郵件的概率就是“先驗(yàn)概率”
- P(A|B)稱(chēng)為”后驗(yàn)概率”, 即在B事件發(fā)生之后,我們對(duì)A事件概率的重新評(píng)估。如:郵件中含有“中獎(jiǎng)”這個(gè)詞,該郵件為垃圾郵件的概率就是“后驗(yàn)概率”
(3)全概率公式:
全概率公式的意義在于,當(dāng)直接計(jì)算P(A)較為困難,而P(Bi), P(A|Bi) (i=1,2,...)的計(jì)算較為簡(jiǎn)單時(shí),可以利用全概率公式計(jì)算P(A)。
?下面給出《概率論與數(shù)理統(tǒng)計(jì)》浙大四版對(duì)于全概率公式和貝葉斯公式的詳細(xì)講述:
本次分類(lèi)任務(wù)的主要思路:
分類(lèi)標(biāo)準(zhǔn):當(dāng) P(垃圾郵件|文字內(nèi)容)> P(正常郵件|文字內(nèi)容)時(shí),我們認(rèn)為該郵件為垃圾郵件,但是單憑單個(gè)詞而做出判斷誤差肯定相當(dāng)大,因此我們可以將所有的詞一起進(jìn)行聯(lián)合判斷。
這里假設(shè):所有詞語(yǔ)彼此之間是不相關(guān)的(嚴(yán)格說(shuō)這個(gè)假設(shè)不成立;實(shí)際上各詞語(yǔ)之間不可能完全沒(méi)有相關(guān)性,但可以忽略)。
假如我們進(jìn)行判斷的詞有“中獎(jiǎng)”、“免費(fèi)”、“無(wú)套路”,則需要判斷P(垃圾郵件|中獎(jiǎng),免費(fèi),無(wú)套路)與P(正常|中獎(jiǎng),免費(fèi),無(wú)套路),使用貝葉斯公式,可以變?yōu)?/p>
?同理,P(正常|中獎(jiǎng),免費(fèi),無(wú)套路)可以變?yōu)椋?/p>
因此,對(duì)P(垃圾郵件|中獎(jiǎng),免費(fèi),無(wú)套路)與P(正常|中獎(jiǎng),免費(fèi),無(wú)套路)的比較,只需要對(duì)分子進(jìn)行對(duì)比。
但是如果對(duì)多個(gè)詞的P(內(nèi)容|正常/垃圾)進(jìn)行相乘時(shí),可能會(huì)因?yàn)槟硞€(gè)詞的概率很小從而導(dǎo)致最后的結(jié)果為0(超出計(jì)算機(jī)的精度),因此可以對(duì)P(內(nèi)容|正常/垃圾)取自然對(duì)數(shù),即ln?P(內(nèi)容|正常/垃圾)。
因此可以變?yōu)?/p>
?但是還有一個(gè)要注意的問(wèn)題,如果某個(gè)詞只出現(xiàn)在垃圾郵件中,而沒(méi)有出現(xiàn)在正常的郵件中,這就會(huì)導(dǎo)致P(內(nèi)容|正常)為0,從而導(dǎo)致整個(gè)分子都變?yōu)?,這會(huì)讓上面的判別失效,為此,可以采取拉普拉斯平滑(laplace smoothing)。
拉普拉斯平滑(laplace smoothing)
主要的思想是對(duì)詞的個(gè)數(shù)+1,對(duì)訓(xùn)練數(shù)據(jù)進(jìn)行平滑處理。當(dāng)訓(xùn)練樣本很大時(shí),每個(gè)詞的個(gè)數(shù)+1造成的概率變化并不大,在誤差允許的范圍之內(nèi)。
數(shù)據(jù)集的介紹:
本次實(shí)驗(yàn)中,所采用的數(shù)據(jù)集為Enron?Email Dataset。該數(shù)據(jù)集已經(jīng)對(duì)正常郵件和垃圾郵件進(jìn)行了分類(lèi)。由于該數(shù)據(jù)集是真實(shí)的電子郵件數(shù)據(jù)集,因此它包含真實(shí)的垃圾郵件,殺毒軟件可能會(huì)對(duì)其中部分的郵件進(jìn)行刪除,因此進(jìn)行本次實(shí)驗(yàn)時(shí)請(qǐng)暫時(shí)關(guān)閉殺毒軟件。
代碼實(shí)現(xiàn):
1.需要導(dǎo)入的包:
import os
import re
import string
import math
2.數(shù)據(jù)的讀入:
DATA_DIR = 'enron'
target_names = ['ham', 'spam']
def get_data(DATA_DIR):
subfolders = ['enron%d' % i for i in range(1,7)]
data = []
target = []
for subfolder in subfolders:
# spam
spam_files = os.listdir(os.path.join(DATA_DIR, subfolder, 'spam'))
for spam_file in spam_files:
with open(os.path.join(DATA_DIR, subfolder, 'spam', spam_file), encoding="latin-1") as f:
data.append(f.read())
target.append(1)
# ham
ham_files = os.listdir(os.path.join(DATA_DIR, subfolder, 'ham'))
for ham_file in ham_files:
with open(os.path.join(DATA_DIR, subfolder, 'ham', ham_file), encoding="latin-1") as f:
data.append(f.read())
target.append(0)
return data, target
X, y = get_data(DATA_DIR)#讀取數(shù)據(jù)
在這個(gè)代碼段中,我們讀入了所有郵件內(nèi)容和標(biāo)簽,其中郵件內(nèi)容存儲(chǔ)在data中,標(biāo)簽存儲(chǔ)在target當(dāng)中,“1”表示為垃圾郵件,“0”表示為正常郵件。
3.數(shù)據(jù)的預(yù)處理
下面我們定義一個(gè)類(lèi)對(duì)數(shù)據(jù)進(jìn)行預(yù)處理
class SpamDetector_1(object):
#清除標(biāo)點(diǎn)符號(hào)
def clean(self, s):
translator = str.maketrans("", "", string.punctuation)
return s.translate(translator)
#將字符串標(biāo)記為單詞
def tokenize(self, text):
text = self.clean(text).lower()
return re.split("\W+", text)
#計(jì)算某個(gè)單詞出現(xiàn)的次數(shù)
def get_word_counts(self, words):
word_counts = {}
for word in words:
word_counts[word] = word_counts.get(word, 0.0) + 1.0
return word_counts
4.數(shù)據(jù)處理階段
在我們開(kāi)始實(shí)際算法之前,我們需要做三件事:計(jì)算(對(duì)數(shù))類(lèi)先驗(yàn),即計(jì)算P(垃圾郵件)和P(正常郵件);詞匯表(即正常郵件和垃圾郵件中出現(xiàn)的所有單詞,方便進(jìn)行拉普拉斯平滑);垃圾郵件和非垃圾郵件的詞頻,即給定詞在垃圾郵件和非垃圾郵件中出現(xiàn)的次數(shù)。
class SpamDetector_2(SpamDetector_1):
# X:data,Y:target標(biāo)簽(垃圾郵件或正常郵件)
def fit(self, X, Y):
self.num_messages = {}
self.log_class_priors = {}
self.word_counts = {}
# 建立一個(gè)集合存儲(chǔ)所有出現(xiàn)的單詞
self.vocab = set()
# 統(tǒng)計(jì)spam和ham郵件的個(gè)數(shù)
self.num_messages['spam'] = sum(1 for label in Y if label == 1)
self.num_messages['ham'] = sum(1 for label in Y if label == 0)
# 計(jì)算先驗(yàn)概率,即所有的郵件中,垃圾郵件和正常郵件所占的比例
self.log_class_priors['spam'] = math.log(
self.num_messages['spam'] / (self.num_messages['spam'] + self.num_messages['ham']))
self.log_class_priors['ham'] = math.log(
self.num_messages['ham'] / (self.num_messages['spam'] + self.num_messages['ham']))
self.word_counts['spam'] = {}
self.word_counts['ham'] = {}
for x, y in zip(X, Y):
c = 'spam' if y == 1 else 'ham'
# 構(gòu)建一個(gè)字典存儲(chǔ)單封郵件中的單詞以及其個(gè)數(shù)
counts = self.get_word_counts(self.tokenize(x))
for word, count in counts.items():
if word not in self.vocab:
self.vocab.add(word)#確保self.vocab中含有所有郵件中的單詞
# 下面語(yǔ)句是為了計(jì)算垃圾郵件和非垃圾郵件的詞頻,即給定詞在垃圾郵件和非垃圾郵件中出現(xiàn)的次數(shù)。
# c是0或1,垃圾郵件的標(biāo)簽
if word not in self.word_counts[c]:
self.word_counts[c][word] = 0.0
self.word_counts[c][word] += count
?可以利用下面的語(yǔ)句進(jìn)行debug,判斷是否運(yùn)行正確,若正確,log_class_priors of spam應(yīng)該為-0.6776,log_class_priors of ham應(yīng)該為-0.7089。
我們選取了第100封之后的郵件作為訓(xùn)練集,前面一百封郵件作為測(cè)試集。
MNB = SpamDetector_2()
# 選取了第100封之后的郵件作為訓(xùn)練集,前面一百封郵件作為測(cè)試集
MNB.fit(X[100:], y[100:])
# print("log_class_priors of spam", MNB.log_class_priors['spam']) #-0.6776
# print("log_class_priors of ham", MNB.log_class_priors['ham']) #-0.7089
5.測(cè)試階段
下面定義一個(gè)類(lèi) SpamDetector對(duì)測(cè)試集進(jìn)行測(cè)試。主要的思路是對(duì)
?進(jìn)行比較,從而判斷是垃圾郵件還是正常郵件
class SpamDetector(SpamDetector_2):
def predict(self, X):
result = []
flag_1 = 0
# 遍歷所有的測(cè)試集
for x in X:
counts = self.get_word_counts(self.tokenize(x)) # 生成可以記錄單詞以及該單詞出現(xiàn)的次數(shù)的字典
spam_score = 0
ham_score = 0
flag_2 = 0
for word, _ in counts.items():
if word not in self.vocab: continue
#下面計(jì)算P(內(nèi)容|垃圾郵件)和P(內(nèi)容|正常郵件),所有的單詞都要進(jìn)行拉普拉斯平滑
else:
# 該單詞存在于正常郵件的訓(xùn)練集和垃圾郵件的訓(xùn)練集當(dāng)中
if word in self.word_counts['spam'].keys() and word in self.word_counts['ham'].keys():
log_w_given_spam = math.log(
(self.word_counts['spam'][word] + 1) / (sum(self.word_counts['spam'].values()) + len(self.vocab)))
log_w_given_ham = math.log(
(self.word_counts['ham'][word] + 1) / (sum(self.word_counts['ham'].values()) + len(
self.vocab)))
# 該單詞存在于垃圾郵件的訓(xùn)練集當(dāng)中,但不存在于正常郵件的訓(xùn)練集當(dāng)中
if word in self.word_counts['spam'].keys() and word not in self.word_counts['ham'].keys():
log_w_given_spam = math.log(
(self.word_counts['spam'][word] + 1) / (sum(self.word_counts['spam'].values()) + len(self.vocab)))
log_w_given_ham = math.log( 1 / (sum(self.word_counts['ham'].values()) + len(
self.vocab)))
# 該單詞存在于正常郵件的訓(xùn)練集當(dāng)中,但不存在于垃圾郵件的訓(xùn)練集當(dāng)中
if word not in self.word_counts['spam'].keys() and word in self.word_counts['ham'].keys():
log_w_given_spam = math.log( 1 / (sum(self.word_counts['spam'].values()) + len(self.vocab)))
log_w_given_ham = math.log(
(self.word_counts['ham'][word] + 1) / (sum(self.word_counts['ham'].values()) + len(
self.vocab)))
# 把計(jì)算到的P(內(nèi)容|垃圾郵件)和P(內(nèi)容|正常郵件)加起來(lái)
spam_score += log_w_given_spam
ham_score += log_w_given_ham
flag_2 += 1
# 最后,還要把先驗(yàn)加上去,即P(垃圾郵件)和P(正常郵件)
spam_score += self.log_class_priors['spam']
ham_score += self.log_class_priors['ham']
# 最后進(jìn)行預(yù)測(cè),如果spam_score > ham_score則標(biāo)志為1,即垃圾郵件
if spam_score > ham_score:
result.append(1)
else:
result.append(0)
flag_1 += 1
return result
MNB = SpamDetector()
MNB.fit(X[100:], y[100:])
pred = MNB.predict(X[:100])
true = y[:100]
accuracy = 0
for i in range(100):
if pred[i] == true[i]:
accuracy += 1
print(accuracy) # 0.98
測(cè)試集的分類(lèi)的正確率達(dá)到98%,效果還是挺好的。
所有的代碼:
import os
import re
import string
import math
DATA_DIR = 'enron'
target_names = ['ham', 'spam']
def get_data(DATA_DIR):
subfolders = ['enron%d' % i for i in range(1,7)]
data = []
target = []
for subfolder in subfolders:
# spam
spam_files = os.listdir(os.path.join(DATA_DIR, subfolder, 'spam'))
for spam_file in spam_files:
with open(os.path.join(DATA_DIR, subfolder, 'spam', spam_file), encoding="latin-1") as f:
data.append(f.read())
target.append(1)
# ham
ham_files = os.listdir(os.path.join(DATA_DIR, subfolder, 'ham'))
for ham_file in ham_files:
with open(os.path.join(DATA_DIR, subfolder, 'ham', ham_file), encoding="latin-1") as f:
data.append(f.read())
target.append(0)
return data, target
X, y = get_data(DATA_DIR)
class SpamDetector_1(object):
"""Implementation of Naive Bayes for binary classification"""
#清除空格
def clean(self, s):
translator = str.maketrans("", "", string.punctuation)
return s.translate(translator)
#分開(kāi)每個(gè)單詞
def tokenize(self, text):
text = self.clean(text).lower()
return re.split("\W+", text)
#計(jì)算某個(gè)單詞出現(xiàn)的次數(shù)
def get_word_counts(self, words):
word_counts = {}
for word in words:
word_counts[word] = word_counts.get(word, 0.0) + 1.0
return word_counts
class SpamDetector_2(SpamDetector_1):
# X:data,Y:target標(biāo)簽(垃圾郵件或正常郵件)
def fit(self, X, Y):
self.num_messages = {}
self.log_class_priors = {}
self.word_counts = {}
# 建立一個(gè)集合存儲(chǔ)所有出現(xiàn)的單詞
self.vocab = set()
# 統(tǒng)計(jì)spam和ham郵件的個(gè)數(shù)
self.num_messages['spam'] = sum(1 for label in Y if label == 1)
self.num_messages['ham'] = sum(1 for label in Y if label == 0)
# 計(jì)算先驗(yàn)概率,即所有的郵件中,垃圾郵件和正常郵件所占的比例
self.log_class_priors['spam'] = math.log(
self.num_messages['spam'] / (self.num_messages['spam'] + self.num_messages['ham']))
self.log_class_priors['ham'] = math.log(
self.num_messages['ham'] / (self.num_messages['spam'] + self.num_messages['ham']))
self.word_counts['spam'] = {}
self.word_counts['ham'] = {}
for x, y in zip(X, Y):
c = 'spam' if y == 1 else 'ham'
# 構(gòu)建一個(gè)字典存儲(chǔ)單封郵件中的單詞以及其個(gè)數(shù)
counts = self.get_word_counts(self.tokenize(x))
for word, count in counts.items():
if word not in self.vocab:
self.vocab.add(word)#確保self.vocab中含有所有郵件中的單詞
# 下面語(yǔ)句是為了計(jì)算垃圾郵件和非垃圾郵件的詞頻,即給定詞在垃圾郵件和非垃圾郵件中出現(xiàn)的次數(shù)。
# c是0或1,垃圾郵件的標(biāo)簽
if word not in self.word_counts[c]:
self.word_counts[c][word] = 0.0
self.word_counts[c][word] += count
MNB = SpamDetector_2()
MNB.fit(X[100:], y[100:])
class SpamDetector(SpamDetector_2):
def predict(self, X):
result = []
flag_1 = 0
# 遍歷所有的測(cè)試集
for x in X:
counts = self.get_word_counts(self.tokenize(x)) # 生成可以記錄單詞以及該單詞出現(xiàn)的次數(shù)的字典
spam_score = 0
ham_score = 0
flag_2 = 0
for word, _ in counts.items():
if word not in self.vocab: continue
#下面計(jì)算P(內(nèi)容|垃圾郵件)和P(內(nèi)容|正常郵件),所有的單詞都要進(jìn)行拉普拉斯平滑
else:
# 該單詞存在于正常郵件的訓(xùn)練集和垃圾郵件的訓(xùn)練集當(dāng)中
if word in self.word_counts['spam'].keys() and word in self.word_counts['ham'].keys():
log_w_given_spam = math.log(
(self.word_counts['spam'][word] + 1) / (sum(self.word_counts['spam'].values()) + len(self.vocab)))
log_w_given_ham = math.log(
(self.word_counts['ham'][word] + 1) / (sum(self.word_counts['ham'].values()) + len(
self.vocab)))
# 該單詞存在于垃圾郵件的訓(xùn)練集當(dāng)中,但不存在于正常郵件的訓(xùn)練集當(dāng)中
if word in self.word_counts['spam'].keys() and word not in self.word_counts['ham'].keys():
log_w_given_spam = math.log(
(self.word_counts['spam'][word] + 1) / (sum(self.word_counts['spam'].values()) + len(self.vocab)))
log_w_given_ham = math.log( 1 / (sum(self.word_counts['ham'].values()) + len(
self.vocab)))
# 該單詞存在于正常郵件的訓(xùn)練集當(dāng)中,但不存在于垃圾郵件的訓(xùn)練集當(dāng)中
if word not in self.word_counts['spam'].keys() and word in self.word_counts['ham'].keys():
log_w_given_spam = math.log( 1 / (sum(self.word_counts['spam'].values()) + len(self.vocab)))
log_w_given_ham = math.log(
(self.word_counts['ham'][word] + 1) / (sum(self.word_counts['ham'].values()) + len(
self.vocab)))
# 把計(jì)算到的P(內(nèi)容|垃圾郵件)和P(內(nèi)容|正常郵件)加起來(lái)
spam_score += log_w_given_spam
ham_score += log_w_given_ham
flag_2 += 1
# 最后,還要把先驗(yàn)加上去,即P(垃圾郵件)和P(正常郵件)
spam_score += self.log_class_priors['spam']
ham_score += self.log_class_priors['ham']
# 最后進(jìn)行預(yù)測(cè),如果spam_score > ham_score則標(biāo)志為1,即垃圾郵件
if spam_score > ham_score:
result.append(1)
else:
result.append(0)
flag_1 += 1
return result
MNB = SpamDetector()
MNB.fit(X[100:], y[100:])
pred = MNB.predict(X[:100])
true = y[:100]
accuracy = 0
for i in range(100):
if pred[i] == true[i]:
accuracy += 1
print(accuracy) # 0.98
其他說(shuō)明:
Enron?Email Dataset數(shù)據(jù)集可以點(diǎn)擊下面鏈接下載
鏈接:https://pan.baidu.com/s/1qYrIXxP4gaja19uHjrm1xA?
提取碼:1234文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-784399.html
下載后解壓到.py文件的目錄下即可使用文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-784399.html
到了這里,關(guān)于基于樸素貝葉斯的垃圾郵件分類(lèi)Python實(shí)現(xiàn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!