Preface
This project builds a chatbot from a TF-IDF (Term Frequency-Inverse Document Frequency) retrieval model and a CNN (convolutional neural network) reranking model, aiming at a bot that can hold everyday conversations and offer emotional companionship.
First, we build a retrieval model with TF-IDF. TF-IDF measures how important a word is to a document by weighting its term frequency against its inverse document frequency. Given a user's question, this model finds the most relevant answer among a set of predefined question-answer pairs.
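For reference, the textbook weighting behind this is tfidf(t, d) = tf(t, d) × log(N / df(t)), where tf(t, d) counts how often term t occurs in document d, N is the total number of documents, and df(t) is the number of documents containing t; the gensim TfidfModel used later in this project applies a closely related variant of this formula.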
Second, we build a reranking model with a CNN. A CNN is a deep learning model that can learn the semantic relationship between questions and answers from large amounts of training data. By extracting and matching features of the question and the candidate answer, it further improves the accuracy and quality of the replies.
By combining the TF-IDF retrieval model with the CNN reranking model, the chatbot first retrieves a set of candidate answers for the user's question and then uses the reranking model to pick the most suitable reply.
The goal of the project is a chatbot for everyday conversation and emotional companionship. Users can ask about entertainment, study, daily life, and more; the bot answers based on its training data and what the models have learned, and tries to understand the user's feelings and needs.
Chatbots have broad application potential in daily life, offering convenient information lookup, entertainment, and emotional support. Through continued improvement and training, our aim is a bot that is intelligent, friendly, and able to genuinely keep the user company.
Overall Design
This part includes the overall system architecture diagram, the system flowchart, and the Siamese neural network structure diagram.
Overall System Architecture
The overall system architecture is shown in the figure.
System Flowchart
The system flow is shown in the figure.
Siamese Neural Network Structure
The Siamese neural network structure is shown in the figure.
Runtime Environment
This part covers the Python environment, the TensorFlow environment, and the Python package dependencies.
Python Environment
Python 3.6 or later is required; on Windows, installing Anaconda is the recommended way to set up everything Python needs.
Anaconda is an open-source Python distribution that bundles conda, Python, and more than 180 scientific packages with their dependencies. The download is large; if you only need a few packages, or want to save bandwidth and disk space, the Miniconda distribution (conda and Python only) is an alternative. Download address: https://www.anaconda.com/. You can also run the code in a Linux virtual machine. To add the environment variable, right-click the computer icon, then choose Properties, Advanced system settings, Environment Variables, New system variable, and add the Anaconda installation path.
Test it from the CMD command line by entering:
conda list
The command lists the path where Anaconda is stored and the installed packages. Anaconda ships with Anaconda Prompt, which can be used to install other packages in the same way.
TensorFlow Environment
The steps for configuring the TensorFlow environment in Anaconda (on Windows) are as follows:
Open Anaconda Prompt, check the Python version, and add the Tsinghua mirror channel with:
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
conda config --set show_channel_urls yes
Create a Python 3.x environment named tensorflow. The Python version must be compatible with the TensorFlow version installed later; Python 3.10 is used here. Enter:
conda create -n tensorflow python=3.10
Type y whenever confirmation is requested.
Activate the TensorFlow environment in Anaconda Prompt with:
activate tensorflow
Install the CPU version of TensorFlow with:
pip install --upgrade --ignore-installed tensorflow
Installation is complete.
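A quick sanity check that the installation succeeded is to start Python inside the environment and print the version (the exact number depends on what pip resolved):

import tensorflow as tf
print(tf.__version__)  # prints the installed TensorFlow version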
Python Package Dependencies
With the tensorflow environment activated in Anaconda Prompt (activate tensorflow), install the remaining packages the project depends on.
NumPy is a scientific computing package for storing and processing large matrices, much more efficient than Python's own nested lists. Install it with:
pip install numpy
Matplotlib is Python's best-known plotting library. It provides a command API similar to MATLAB's and is well suited to interactive plotting. Install it with:
pip install matplotlib
jieba is an excellent third-party Chinese word segmentation library; Chinese text must be segmented into individual words before it can be processed. Install it with:
pip install jieba
Pandas is a tool built on top of NumPy, created to handle data analysis tasks. It incorporates a large number of libraries and several standard data models, and provides the tools needed to work with large datasets efficiently. Install it with:
pip install pandas
tqdm is a fast, extensible progress bar for Python; wrapping any iterator as tqdm(iterator) adds a progress indicator to long loops. Install it with:
pip install tqdm
nltk bundles a large collection of corpora and makes common natural-language-processing tasks convenient, including tokenization, part-of-speech tagging, named entity recognition, and parsing. Install it with:
pip install nltk
gensim is an open-source third-party Python toolkit for learning latent topic-vector representations of text from raw, unstructured documents in an unsupervised way. Install it with:
pip install gensim
PyQt is a toolkit for building GUI applications, a successful blend of the Python language and the Qt library. Install it with:
pip install pyqt5
Module Implementation
The project consists of four modules: data preprocessing, model creation and compilation, model training and saving, and model application. Each module's function and the related code are given below.
1. Data Preprocessing
This part covers the base data, data augmentation, and data preprocessing.
1) Base data
The data comes from an open-source corpus collection on GitHub, available at: https://github.com/codemayq.
That repository collects and systematically organizes the open-source Chinese chit-chat corpora currently available: chatterbot, Douban multi-turn, PTT gossip, Qingyun, TV-drama dialogue, Tieba forum replies, Xiaohuangji, and Weibo, eight commonly used public chit-chat corpora plus SMS data, all normalized into a uniform format for easy use.
Place the unpacked raw_chat_corpus folder in the current directory and run python main.py. Each corpus source then produces its own .tsv file in the newly created clean_chat_corpus folder.
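A minimal sketch of loading one of the cleaned files into question/answer lists; the file name and the two-column (question, answer) layout are assumptions, based on how ./data/qa_.csv is read later in the recall model:

import pandas as pd

data = pd.read_csv('./clean_chat_corpus/xiaohuangji.tsv', sep='\t', header=None)
qList = data[0].astype(str).tolist()   # first column: question / post
aList = data[1].astype(str).tolist()   # second column: answer / reply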
2) Data augmentation
Data augmentation both enlarges the training set, which improves generalization, and injects noisy data, which makes the model more robust. The project uses synonym replacement, random insertion, random swap, and random deletion.
def synonym_replacement(words, n):  # synonym replacement
    new_words = words.copy()
    random_word_list = list(set([word for word in words if word not in stop_words]))
    random.shuffle(random_word_list)
    num_replaced = 0
    for random_word in random_word_list:
        synonyms = get_synonyms(random_word)  # pick synonyms from the thesaurus
        if len(synonyms) >= 1:
            synonym = random.choice(list(synonyms))
            new_words = [synonym if word == random_word else word for word in new_words]
            num_replaced += 1
        if num_replaced >= n:  # replace at most n words
            break
    sentence = ' '.join(new_words)
    new_words = sentence.split(' ')
    return new_words
def random_insertion(words, n):  # random insertion
    new_words = words.copy()
    for _ in range(n):
        add_word(new_words)
    return new_words

def random_swap(words, n):  # random swap
    new_words = words.copy()
    for _ in range(n):
        new_words = swap_word(new_words)
    return new_words

def swap_word(new_words):  # swap two random words in the sentence (called n times by random_swap)
    random_idx_1 = random.randint(0, len(new_words)-1)
    random_idx_2 = random_idx_1
    counter = 0
    while random_idx_2 == random_idx_1:
        random_idx_2 = random.randint(0, len(new_words)-1)
        counter += 1
        if counter > 3:
            return new_words
    new_words[random_idx_1], new_words[random_idx_2] = new_words[random_idx_2], new_words[random_idx_1]
    return new_words
def random_deletion(words, p):  # random deletion
    # if there is only one word, do not delete it
    if len(words) == 1:
        return words
    # delete each word with probability p
    new_words = []
    for word in words:
        r = random.uniform(0, 1)
        if r > p:
            new_words.append(word)
    # if everything was deleted, return one random word
    if len(new_words) == 0:
        rand_int = random.randint(0, len(words)-1)
        return [words[rand_int]]
    return new_words
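A minimal sketch of how the four operations above might be combined to generate augmented variants of one segmented sentence; the wrapper name augment and the per-operation counts are assumptions, not part of the original project code, and it relies on the helpers already referenced above (stop_words, get_synonyms, add_word):

import random

def augment(words, n_ops=2, p_delete=0.1):
    # words is an already-segmented token list; one augmented variant per operation
    variants = []
    variants.append(synonym_replacement(words, n_ops))  # replace up to n_ops words with synonyms
    variants.append(random_insertion(words, n_ops))     # insert n_ops synonyms at random positions
    variants.append(random_swap(words, n_ops))          # swap a random word pair n_ops times
    variants.append(random_deletion(words, p_delete))   # drop each word with probability p_delete
    return variants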
3) Data preprocessing
The raw character text in the documents is converted into the sparse vectors a Gensim model understands. Segmentation uses jieba, the most common Chinese word segmentation tool in Python, which supports several segmentation modes.
# non-full segmentation mode
def cut(self, sentence, stopword=True, cut_all=False):
    seg_list = jieba.cut(sentence, cut_all)  # segment the raw corpus
    results = []
    for seg in seg_list:
        if stopword and seg in self.stopwords:  # drop stopwords
            continue
        results.append(seg)
    return results

# a finer-grained jieba mode, intended for search
def cut_for_search(self, sentence, stopword=True):
    seg_list = jieba.cut_for_search(sentence)  # segment the raw corpus
    results = []
    for seg in seg_list:
        if stopword and seg in self.stopwords:  # drop stopwords
            continue
        results.append(seg)
    return results
For specific contexts and special terms, a user-defined dictionary needs to be loaded to improve segmentation accuracy.
# load a user-defined dictionary
def load_userdict(self, file_name):
    jieba.load_userdict(file_name)
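jieba's user dictionary is a plain text file with one entry per line: the word, followed by an optional frequency and an optional part-of-speech tag. The entries below are illustrative only, not taken from the project's userdict.txt:

聊天機器人 5 n
小黃雞 3 nz
情感陪伴 n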
Stopwords are characters or words that are filtered out automatically before or after processing natural-language text in information retrieval. They carry little meaning on their own yet occur very frequently, so filtering them out saves storage space and speeds up search.
# read in the stopword list
def read_in_stopword(self):
    file_obj = codecs.open(self.stopword_filepath, 'r', 'utf-8')
    while True:
        line = file_obj.readline()
        line = line.strip('\r\n')  # strip the line break
        if not line:
            break
        self.stopwords.add(line)
    file_obj.close()
The doc2bow() method counts the frequency of every distinct word, maps each word to an integer id, and returns the result as a sparse vector.
# segment the sentence
def cut(self, seg):
    return seg.cut_for_search(self.origin_sentence)

# get the list of tokens after segmentation
def get_cuted_sentence(self):
    return self.cuted_sentence

# get the original sentence
def get_origin_sentence(self):
    return self.origin_sentence

# set the score of this sentence
def set_score(self, score):
    self.score = score

# bag-of-words representation
def sentence2vec(self, sentence):
    sentence = Sentence(sentence, self.seg)
    vec_bow = self.dictionary.doc2bow(sentence.get_cuted_sentence())
    return self.model[vec_bow]  # return the sparse vector form

self.corpus_simple = [self.dictionary.doc2bow(text) for text in self.texts]  # build the corpus
2. Model Creation and Compilation
After the data is loaded into the model, the model architecture needs to be defined and the loss function optimized.
1) Defining the model architecture
The TF-IDF model is structured as follows: compute TF-IDF vectors, use an inverted index to find question descriptions similar to the current input, and then compute cosine similarity over the candidate questions.
# initialize the model: convert the whole corpus to the TF-IDF representation and build a cosine-similarity index
# the simple model that the more complex models are built on
def simple_model(self, min_frequency=0):
    self.texts = self.get_cuted_sentences()
    # drop low-frequency words
    frequency = defaultdict(int)  # term-frequency counter
    for text in self.texts:
        for token in text:
            frequency[token] += 1
    self.texts = [[token for token in text if frequency[token] > min_frequency] for text in self.texts]
    self.dictionary = corpora.Dictionary(self.texts)  # build the dictionary
    self.corpus_simple = [self.dictionary.doc2bow(text) for text in self.texts]  # build the corpus

# TF-IDF model
def TfidfModel(self):
    self.simple_model()
    # transformation model
    self.model = models.TfidfModel(self.corpus_simple)
    self.corpus = self.model[self.corpus_simple]
    # build the similarity index
    self.index = similarities.MatrixSimilarity(self.corpus)

# preprocess a newly entered sentence (the one to compare against)
def sentence2vec(self, sentence):
    sentence = Sentence(sentence, self.seg)
    vec_bow = self.dictionary.doc2bow(sentence.get_cuted_sentence())
    return self.model[vec_bow]

def bow2vec(self):
    vec = []
    length = max(self.dictionary) + 1
    for content in self.corpus:
        sentence_vectors = np.zeros(length)
        for co in content:
            # put the TF-IDF weight of each word that appears in the sentence into the matrix
            sentence_vectors[co[0]] = co[1]
        vec.append(sentence_vectors)
    return vec

# find the most similar sentence
def similarity(self, sentence):
    sentence_vec = self.sentence2vec(sentence)
    sims = self.index[sentence_vec]
    sim = max(enumerate(sims), key=lambda item: item[1])
    index = sim[0]
    score = sim[1]
    sentence = self.sentences[index]
    sentence.set_score(score)
    return sentence  # returns a Sentence object
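The recall module shown later calls similarity_k(question, top_k), which is not listed above. A hedged sketch, assuming it mirrors similarity() but keeps the k best candidates and returns two parallel lists (indices, scores) sorted by descending similarity, which matches how the central control module unpacks the result:

# sketch only: top-k variant of similarity(), not the project's original listing
def similarity_k(self, sentence, k):
    sentence_vec = self.sentence2vec(sentence)
    sims = self.index[sentence_vec]
    # sort every candidate by similarity and keep the k best
    top = sorted(enumerate(sims), key=lambda item: item[1], reverse=True)[:k]
    indices = [i for i, _ in top]
    scores = [float(s) for _, s in top]
    return indices, scores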
In the Siamese network, each CNN branch has a convolution layer followed by a pooling layer that reduces the dimensionality of the data. Every convolution layer uses several filters to extract different kinds of features. After max pooling and the fully connected layer, dropout is applied as regularization to curb overfitting.
def fc_layer(self, bottom, n_weight, name):  # fully connected layer
    assert len(bottom.get_shape()) == 2
    n_prev_weight = bottom.get_shape()[1]
    initer = tf.contrib.layers.xavier_initializer()
    W = tf.get_variable(name + 'W', dtype=tf.float32, shape=[n_prev_weight, n_weight], initializer=initer,
                        regularizer=tf.contrib.layers.l2_regularizer(scale=0.0000001))  # y = Wx + b linear model
    b = tf.get_variable(name + 'b', dtype=tf.float32, initializer=tf.constant(0.01, shape=[n_weight], dtype=tf.float32),
                        regularizer=tf.contrib.layers.l2_regularizer(scale=0.0000001))
    fc = tf.nn.bias_add(tf.matmul(bottom, W), b)
    return fc

def _cnn_layer(self, input):  # convolution and pooling layers
    all = []
    max_len = input.get_shape()[1]
    for i, filter_size in enumerate(self.window_sizes):
        with tf.variable_scope('filter{}'.format(filter_size)):
            cnn_out = tf.layers.conv1d(input, self.n_filters, filter_size, padding='valid',
                                       activation=tf.nn.relu, name='q_conv_' + str(i))  # convolution
            pool_out = tf.reduce_max(cnn_out, axis=1, keepdims=True)  # max pooling
            tanh_out = tf.nn.tanh(pool_out)  # tanh activation
            all.append(tanh_out)
    cnn_outs = tf.concat(all, axis=-1)
    dim = cnn_outs.get_shape()[-1]
    cnn_outs = tf.reshape(cnn_outs, [-1, dim])
    return cnn_outs
The purpose of the hidden layer is to project the input features into another space, exposing more abstract features that are easier to separate linearly.
def _HL_layer(self, bottom, n_weight, name):  # hidden layer
    assert len(bottom.get_shape()) == 3
    n_prev_weight = bottom.get_shape()[-1]
    max_len = bottom.get_shape()[1]
    initer = tf.contrib.layers.xavier_initializer()  # initialization
    W = tf.get_variable(name + 'W', dtype=tf.float32, shape=[n_prev_weight, n_weight], initializer=initer,
                        regularizer=tf.contrib.layers.l2_regularizer(scale=0.0000001))
    b = tf.get_variable(name + 'b', dtype=tf.float32, initializer=tf.constant(0.1, shape=[n_weight], dtype=tf.float32),
                        regularizer=tf.contrib.layers.l2_regularizer(scale=0.0000001))  # y = Wx + b linear model
    bottom_2 = tf.reshape(bottom, [-1, n_prev_weight])
    hl = tf.nn.bias_add(tf.matmul(bottom_2, W), b)  # y = Wx + b for a single neuron
    hl_tanh = tf.nn.tanh(hl)  # activation function
    HL = tf.reshape(hl_tanh, [-1, max_len, n_weight])
    return HL

def _build(self, embeddings):  # build the network
    if embeddings is not None:
        self.Embedding = tf.Variable(tf.to_float(embeddings), trainable=False, name='Embedding')
    else:  # build the embedding from scratch
        self.Embedding = tf.get_variable('Embedding', shape=[self.vocab_size, self.embedding_size],
                                         initializer=tf.uniform_unit_scaling_initializer())
    self.q_embed = tf.nn.dropout(tf.nn.embedding_lookup(self.Embedding, self._ques), keep_prob=self.dropout_keep_prob)
    self.a_embed = tf.nn.dropout(tf.nn.embedding_lookup(self.Embedding, self._ans), keep_prob=self.dropout_keep_prob)
    with tf.variable_scope('siamese') as scope:
        # hidden and convolution layers
        #hl_q = self._HL_layer(self.q_embed, self.hidden_size, 'HL_layer')
        conv1_q = self._cnn_layer(self.q_embed)
        scope.reuse_variables()  # weight sharing between the two branches
        #hl_a = self._HL_layer(self.a_embed, self.hidden_size, 'HL_layer')
        conv1_a = self._cnn_layer(self.a_embed)
    with tf.variable_scope('fc') as scope:
        con = tf.concat([conv1_q, conv1_a], axis=-1)
        logits = self.fc_layer(con, 1, 'fc_layer')
        res = tf.nn.sigmoid(logits)
    return logits, res

def _add_loss_op(self, logits):
    # loss node
    loss = tf.nn.sigmoid_cross_entropy_with_logits(logits=logits,
                                                   labels=tf.cast(tf.reshape(self._y, [-1, 1]), dtype=tf.float32))
    reg_losses = tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES)
    l2_loss = sum(reg_losses)
    pointwise_loss = tf.reduce_mean(loss) + l2_loss
    tf.summary.scalar('loss', pointwise_loss)
    return pointwise_loss

def _add_acc_op(self):
    # accuracy node
    predictions = tf.to_int32(tf.round(self.res))
    correct_prediction = tf.equal(predictions, self._y)
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
    tf.summary.scalar('accuracy', accuracy)
    return accuracy

def _add_train_op(self, loss):
    # training node
    with tf.name_scope('train_op'):
        # track the global training step
        self.global_step = tf.Variable(0, name='global_step', trainable=False)
        optimizer = tf.train.AdamOptimizer(self.learning_rate)
        # compute the gradients, getting (gradient, variable) pairs
        gradsAndVars = optimizer.compute_gradients(loss)
        # apply the gradients to the variables to build the training op
        train_op = optimizer.apply_gradients(gradsAndVars, global_step=self.global_step)
        # write summaries for TensorBoard
        for g, v in gradsAndVars:
            if g is not None:
                tf.summary.histogram("{}/grad/hist".format(v.name), g)
                tf.summary.scalar("{}/grad/sparsity".format(v.name), tf.nn.zero_fraction(g))
        self.summary_op = tf.summary.merge_all()
    return train_op
2) Optimizing the loss function
Once the architecture is fixed, the model is compiled. This is a binary classification problem, so cross-entropy is used as the loss function. Because all labels carry similar weight, accuracy is used as the performance metric. Adam, a widely used gradient-descent method, optimizes the model parameters.
# define the loss function and the optimizer
loss = tf.nn.sigmoid_cross_entropy_with_logits(logits=logits,
                                               labels=tf.cast(tf.reshape(self._y, [-1, 1]), dtype=tf.float32))
optimizer = tf.train.AdamOptimizer(self.learning_rate)
predictions = tf.to_int32(tf.round(self.res))
correct_prediction = tf.equal(predictions, self._y)
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
3. Model Training and Saving
With the architecture defined and compiled, the model is trained on the training set so that it learns to classify semantically similar questions correctly; the fitted model is then saved.
1) Model training
def devStep(corpus):
    iterator = Iterator(corpus)
    dev_Loss = []    # loss
    dev_Acc = []     # accuracy
    dev_Prec = []
    dev_Recall = []
    dev_F_beta = []
    for batch_x in iterator.next(config.batch_size, shuffle=False):
        batch_q, batch_a, batch_qmask, batch_amask, label = zip(*batch_x)
        batch_q = np.asarray(batch_q)  # batch
        batch_a = np.asarray(batch_a)
        loss, summary, step, predictions = sess.run(
            [model.total_loss, model.summary_op, model.global_step, model.res],
            feed_dict={model._ques: batch_q,  # feed the evaluation batch
                       model._ans: batch_a,
                       model._y: label,
                       model.dropout_keep_prob: 1.0})
        predictions = [1 if i >= 0.5 else 0 for i in predictions]
        acc, recall, prec, f_beta = get_binary_metrics(pred_y=predictions, true_y=label)  # metrics
        dev_Loss.append(loss)
        dev_Acc.append(acc)
        dev_Prec.append(prec)
        dev_Recall.append(recall)
        dev_F_beta.append(f_beta)
        evalSummaryWriter.add_summary(summary, step)
    return mean(dev_Loss), mean(dev_Acc), mean(dev_Recall), mean(dev_Prec), mean(dev_F_beta)  # return the averages

best_acc = 0.0
for epoch in range(config.num_epochs):  # epochs
    train_time1 = time.time()
    print("----- Epoch {}/{} -----".format(epoch + 1, config.num_epochs))
    train_Loss = []    # loss
    train_Acc = []     # accuracy
    train_Prec = []
    train_Recall = []
    train_F_beta = []
    for batch_x in iterator.next(config.batch_size, shuffle=True):
        batch_q, batch_a, batch_qmask, batch_amask, label = zip(*batch_x)
        batch_q = np.asarray(batch_q)  # batch
        batch_a = np.asarray(batch_a)
        train_loss, train_acc, train_prec, train_recall, train_f_beta = trainStep(batch_q, batch_a, label)  # one training step
        train_Loss.append(train_loss)
        train_Acc.append(train_acc)
        train_Prec.append(train_prec)
        train_Recall.append(train_recall)
        train_F_beta.append(train_f_beta)
    print("---epoch %d -- train loss %.3f -- train acc %.3f -- train recall %.3f -- train precision %.3f"
          "-- train f_beta %.3f" % (
          epoch+1, np.mean(train_Loss), np.mean(train_Acc), np.mean(train_Recall), np.mean(train_Prec), np.mean(train_F_beta)))  # print training metrics
    test_loss, test_acc, test_recall, test_prec, test_f_beta = devStep(test_corpus)
    print("---epoch %d -- test loss %.3f -- test acc %.3f -- test recall %.3f -- test precision %.3f"
          "-- test f_beta %.3f" % (
          epoch + 1, test_loss, test_acc, test_recall, test_prec, test_f_beta))
2) Model saving
To make the model easy to reload during training, it is saved in ckpt format using TensorFlow's tf.train.Saver.
# the objects that do the saving
saver = tf.train.Saver(tf.global_variables(), max_to_keep=10)
best_saver = tf.train.Saver(tf.global_variables(), max_to_keep=5)
ckpt = tf.train.get_checkpoint_state(save_path)
checkpoint_path = os.path.join(save_path, 'acc{:.3f}_{}.ckpt'.format(test_acc, epoch + 1))
bestcheck_path = os.path.join(best_path, 'acc{:.3f}_{}.ckpt'.format(test_acc, epoch + 1))
saver.save(sess, checkpoint_path, global_step=epoch)  # save the model
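For completeness, a minimal sketch of reloading such a checkpoint for inference; it assumes the SiameseQACNN graph has already been rebuilt in the current default graph and that save_path is the directory used above:

with tf.Session() as sess:
    saver = tf.train.Saver()
    ckpt = tf.train.get_checkpoint_state(save_path)
    if ckpt and ckpt.model_checkpoint_path:
        saver.restore(sess, ckpt.model_checkpoint_path)  # load the weights from the newest checkpoint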
4. Model Application
One part is the central control module that calls the recall and rerank models; the other uses the trained recall and rerank models to classify semantic similarity and produce the output.
1) GUI module
The GUI module is the project's front end. It provides two text boxes, one showing the user's input and one showing the dialogue, plus a Send button that calls the interface in control.py and returns the selected reply.
def setupUi(self, Dialog):  # build the interface
    Dialog.setObjectName("智能聊天機器人")
    Dialog.resize(582, 434)
    #palette = QPalette()
    #palette.setBrush(QPalette.Background, QBrush(QPixmap("./background.jpg")))
    #Dialog.setPalette(palette)
    palette = QPalette()
    pix = QPixmap("./background.jpg")
    pix = pix.scaled(Dialog.width(), Dialog.height())
    palette.setBrush(QPalette.Background, QBrush(pix))
    Dialog.setPalette(palette)
    self.label = QtWidgets.QLabel(Dialog)
    self.label.setGeometry(QtCore.QRect(40, 30, 361, 51))
    self.label.setStyleSheet("color: rgb(205, 85, 85);\n"
                             "font: 16pt \"黑體\";\n"
                             "text-decoration: underline;")
    self.label.setObjectName("dialog")
    self.plainTextEdit = QtWidgets.QPlainTextEdit(Dialog)
    self.plainTextEdit.setGeometry(QtCore.QRect(40, 80, 501, 181))
    self.plainTextEdit.setObjectName("plainTextEdit")
    self.plainTextEdit.setFocusPolicy(QtCore.Qt.NoFocus)
    self.plainTextEdit_2 = QtWidgets.QPlainTextEdit(Dialog)
    self.plainTextEdit_2.setGeometry(QtCore.QRect(40, 310, 401, 41))
    self.plainTextEdit_2.setObjectName("plainTextEdit_2")
    self.plainTextEdit.setStyleSheet("font: 14pt \"黑體\";\n")
    self.pushButton = QtWidgets.QPushButton(Dialog)
    self.pushButton.setGeometry(QtCore.QRect(480, 320, 75, 23))
    self.pushButton.setStyleSheet("font: 14pt \"黑體\";\n"
                                  "background-color: rgb(255, 192, 203);")
    self.pushButton.setObjectName("pushButton")
    self.label_2 = QtWidgets.QLabel(Dialog)
    self.label_2.setGeometry(QtCore.QRect(50, 280, 54, 12))
    self.label_2.setText("")
    self.label_2.setObjectName("label_2")
    self.label_3 = QtWidgets.QLabel(Dialog)
    self.label_3.setGeometry(QtCore.QRect(50, 280, 71, 16))
    self.label_3.setStyleSheet("font: 75 12pt \"Aharoni\";")
    self.label_3.setObjectName("label_3")
    self.retranslateUi(Dialog)
    QtCore.QMetaObject.connectSlotsByName(Dialog)
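setupUi only builds the widgets. A hedged sketch of how the Send button might be wired up; the slot name on_send and the import path are assumptions, while answer(question) is the function exposed by the central control module shown in the next subsection:

from control import answer   # import path is an assumption

def on_send(self):
    question = self.plainTextEdit_2.toPlainText().strip()    # read the user's input box
    if not question:
        return
    reply = answer(question)                                  # ask the control module for a reply
    self.plainTextEdit.appendPlainText('用戶: ' + question)    # append to the dialogue history
    self.plainTextEdit.appendPlainText('機器人: ' + reply)
    self.plainTextEdit_2.clear()                              # clear the input box

# inside setupUi, before retranslateUi(Dialog):
#     self.pushButton.clicked.connect(lambda: on_send(self))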
2) Central control module
The central control module uses two thresholds, max_sim and min_sim, to cut response time. If recall_score < min_sim, either the Q-A base is too small or the question is too noisy, and the case is flagged for review; if min_sim < recall_score < max_sim, both recall and rerank are run; if recall_score > max_sim, only recall is run and the answer is returned directly.
import time  # imports
from Rerank.data_helper import *
from Recall import recall_model
from Rerank import rerank_model

class SmartQA:  # central-control class
    def __init__(self):  # initialization
        self.top_k = 5
        self.min_sim = 0.10
        self.max_sim = 0.90
        self.embeding_size = 200
        self.vocab_file = './data/word_vocab.txt'
        self.embed_file = './word2vec/5000-small.txt'
        self.embedding = load_embedding(self.embed_file, self.embeding_size, self.vocab_file)

    # split into a recall part and a rerank part
    def search_main(self, question):
        # coarse ranking (recall)
        candi_questions, questionList, answerList = recall_model.main(question, self.top_k)
        answer_dict = {}
        corpus = []
        indxs = []
        matchmodel_simscore = []
        sim_questions = []
        for indx, candi in zip(*candi_questions):
            # if a highly similar question is already found at the recall stage,
            # return its answer immediately and stop the loop
            if candi > self.max_sim:
                indxs.append(indx)
                break
            else:
                # if the recalled data is noisy, collect it for later review
                matchmodel_simscore.append(candi)
                corpus.append((question, questionList[indx]))
                indxs.append(indx)
                sim_questions.append(questionList[indx])
        if candi_questions[1][0] < self.min_sim:
            final_answer = '我還沒找到相似的答案,請說得再清楚一點'
            return final_answer, sim_questions
        if len(indxs) == 1:
            # a highly similar answer was found
            sim = [questionList[indx] for indx, candi in zip(*candi_questions)]
            return answerList[indxs[0]], sim
        else:
            if len(indxs) != 0:
                deepmodel_simscore = rerank_model.main(corpus, self.embedding)  # run the rerank model
                final = list(zip(indxs, matchmodel_simscore, deepmodel_simscore))  # combine the scores
                for id, score1, score2 in final:
                    final_score = (score1 + score2) / 2
                    answer_dict[id] = final_score
            if answer_dict:  # a match was found
                answer_dict = sorted(answer_dict.items(), key=lambda asd: asd[1], reverse=True)
                final_answer = answerList[answer_dict[0][0]]
            else:
                final_answer = '請說得再清楚一點.'
            return final_answer, sim_questions

def answer(question):  # answer a single question
    handler = SmartQA()
    final_answer, sim_questions = handler.search_main(question)
    return final_answer

if __name__ == "__main__":  # entry point
    handler = SmartQA()
    while (1):
        question = input('用戶說: \n')
        if question == 'end':
            print('byebye~')
            break
        s1 = time.time()
        final_answer, sim_questions = handler.search_main(question)
        s2 = time.time()
        print('機器人:', final_answer)
3) Related code
This part covers the recall model and the rerank model.
1) Recall model
The recall model code is as follows:
import pandas as pd  # imports
import matplotlib as mpl
import numpy as np
from nltk.probability import FreqDist
from .jiebaSegment import *
from .sentenceSimilarity import SentenceSimilarity

mpl.rcParams['font.sans-serif'] = ['Microsoft YaHei']  # enable Chinese in plots

# set up the external user dictionary
seg = Seg()
seg.load_userdict('./data/userdict.txt')

def read_corpus():
    qList = []
    # keyword list for each question
    qList_kw = []
    aList = []
    data = pd.read_csv('./data/qa_.csv', header=None)
    data_ls = np.array(data).tolist()
    for t in data_ls:
        qList.append(t[0])
        qList_kw.append(seg.cut(t[0]))
        aList.append(t[1])
    return qList_kw, qList, aList

def invert_idxTable(qList_kw):  # build a simple inverted index
    invertTable = {}
    for idx, tmpLst in enumerate(qList_kw):
        for kw in tmpLst:
            if kw in invertTable.keys():
                invertTable[kw].append(idx)
            else:
                invertTable[kw] = [idx]
    return invertTable

def filter_questionByInvertTab(inputQuestionKW, questionList, answerList, invertTable):  # filter candidate questions
    idxLst = []
    questions = []
    answers = []
    for kw in inputQuestionKW:
        if kw in invertTable.keys():
            idxLst.extend(invertTable[kw])
    idxSet = set(idxLst)
    for idx in idxSet:
        questions.append(questionList[idx])
        answers.append(answerList[idx])
    return questions, answers

def main(question, top_k):  # top_k controls how many candidate answers are returned
    qList_kw, questionList, answerList = read_corpus()
    questionList_s = questionList
    answerList_s = answerList
    # initialize the model
    ss = SentenceSimilarity(seg)
    ss.set_sentences(questionList_s)
    ss.TfidfModel()  # TF-IDF model
    question_k = ss.similarity_k(question, top_k)
    return question_k, questionList_s, answerList_s

if __name__ == '__main__':
    # set up the external user dictionary
    seg = Seg()
    seg.load_userdict('./userdict/userdict.txt')
    # read the data
    List_kw, questionList, answerList = read_corpus()
    # initialize the model
    ss = SentenceSimilarity(seg)
    ss.set_sentences(questionList)
    ss.TfidfModel()
    while True:
        question = input("請輸入問題(q退出): ")
        if question == 'q':
            break
        question_k = ss.similarity_k(question, 5)
        print("女票: {}".format(answerList[question_k[0][0]]))
        for idx, score in zip(*question_k):
            print("same questions: {}, score: {}".format(questionList[idx], score))
2) Rerank model
The rerank model code is as follows:
import time  # imports
import logging
import warnings
warnings.filterwarnings("ignore")
import numpy as np
import tensorflow as tf
import os
import tqdm
import sys
from copy import deepcopy
stdout = sys.stdout
from Rerank.data_helper import *
from Rerank.data_preprocess import *
from Rerank.model import SiameseQACNN
from Rerank.model_utils import *
from Rerank.metrics import *
from sklearn.metrics import accuracy_score

class NNConfig(object):  # hyper-parameter configuration
    def __init__(self, embeddings):  # initialization
        self.ans_length = 15
        self.num_epochs = 10
        self.ques_length = 15
        self.batch_size = 32
        self.window_sizes = [1, 1, 2]
        self.hidden_size = 128
        self.output_size = 128
        self.keep_prob = 0.5
        self.n_filters = 128
        self.embeddings = np.array(embeddings).astype(np.float32)
        self.vocab_size = 3258
        self.embedding_size = 300
        self.learning_rate = 0.0001
        self.optimizer = 'adam'
        self.clip_value = 5
        self.l2_lambda = 0.00001
        self.eval_batch = 100

def train(train_corpus, test_corpus, config):  # training procedure
    iterator = Iterator(train_corpus)
    if not os.path.exists(save_path):
        os.makedirs(save_path)
    if not os.path.exists(best_path):
        os.makedirs(best_path)
    # define the computation graph
    with tf.Graph().as_default():
        session_conf = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)
        with tf.Session(config=session_conf) as sess:
            # training
            print('Start training and evaluating ...')
            outDir = os.path.abspath(os.path.join(os.path.curdir, "summarys"))
            print("Writing to {}\n".format(outDir))
            trainSummaryDir = os.path.join(outDir, "train")
            trainSummaryWriter = tf.summary.FileWriter(trainSummaryDir, sess.graph)
            evalSummaryDir = os.path.join(outDir, "eval")
            evalSummaryWriter = tf.summary.FileWriter(evalSummaryDir, sess.graph)
            model = SiameseQACNN(config)
            # initialize all variables
            saver = tf.train.Saver(tf.global_variables(), max_to_keep=10)
            best_saver = tf.train.Saver(tf.global_variables(), max_to_keep=5)
            ckpt = tf.train.get_checkpoint_state(save_path)
            print('Configuring TensorBoard and Saver ...')
            if ckpt and tf.train.checkpoint_exists(ckpt.model_checkpoint_path):
                print('Reloading model parameters ...')
                saver.restore(sess, ckpt.model_checkpoint_path)
            else:
                print('Created new model parameters ...')
                sess.run(tf.global_variables_initializer())
            # count the trainable parameters
            total_parameters = count_parameters()
            print('Total trainable parameters : {}'.format(total_parameters))

            def trainStep(batch_q, batch_a, batchY):
                _, loss, summary, step, predictions = sess.run(
                    [model.train_op, model.total_loss, model.summary_op, model.global_step, model.res],
                    feed_dict={model._ques: batch_q,
                               model._ans: batch_a,
                               model._y: batchY,
                               model.dropout_keep_prob: config.keep_prob})
                predictions = [1 if i >= 0.5 else 0 for i in predictions]
                acc, recall, prec, f_beta = get_binary_metrics(pred_y=predictions, true_y=batchY)
                trainSummaryWriter.add_summary(summary, step)
                return loss, acc, prec, recall, f_beta

            def devStep(corpus):
                iterator = Iterator(corpus)  # evaluation iterator
                dev_Loss = []
                dev_Acc = []
                dev_Prec = []
                dev_Recall = []
                dev_F_beta = []
                for batch_x in iterator.next(config.batch_size, shuffle=False):
                    batch_q, batch_a, batch_qmask, batch_amask, label = zip(*batch_x)
                    batch_q = np.asarray(batch_q)  # batch
                    batch_a = np.asarray(batch_a)
                    loss, summary, step, predictions = sess.run(  # run the evaluation step
                        [model.total_loss, model.summary_op, model.global_step, model.res],
                        feed_dict={model._ques: batch_q,
                                   model._ans: batch_a,
                                   model._y: label,
                                   model.dropout_keep_prob: 1.0})
                    predictions = [1 if i >= 0.5 else 0 for i in predictions]
                    acc, recall, prec, f_beta = get_binary_metrics(pred_y=predictions, true_y=label)  # metrics
                    dev_Loss.append(loss)
                    dev_Acc.append(acc)
                    dev_Prec.append(prec)
                    dev_Recall.append(recall)
                    dev_F_beta.append(f_beta)
                    evalSummaryWriter.add_summary(summary, step)
                return mean(dev_Loss), mean(dev_Acc), mean(dev_Recall), mean(dev_Prec), mean(dev_F_beta)

            best_acc = 0.0
            for epoch in range(config.num_epochs):  # epochs
                train_time1 = time.time()
                print("----- Epoch {}/{} -----".format(epoch + 1, config.num_epochs))
                train_Loss = []
                train_Acc = []
                train_Prec = []
                train_Recall = []
                train_F_beta = []
                for batch_x in iterator.next(config.batch_size, shuffle=True):
                    batch_q, batch_a, batch_qmask, batch_amask, label = zip(*batch_x)
                    batch_q = np.asarray(batch_q)  # batch data
                    batch_a = np.asarray(batch_a)
                    train_loss, train_acc, train_prec, train_recall, train_f_beta = trainStep(batch_q, batch_a, label)  # one training step
                    train_Loss.append(train_loss)
                    train_Acc.append(train_acc)
                    train_Prec.append(train_prec)
                    train_Recall.append(train_recall)
                    train_F_beta.append(train_f_beta)
                print("---epoch %d -- train loss %.3f -- train acc %.3f -- train recall %.3f -- train precision %.3f"
                      "-- train f_beta %.3f" % (
                      epoch+1, np.mean(train_Loss), np.mean(train_Acc), np.mean(train_Recall), np.mean(train_Prec), np.mean(train_F_beta)))  # print training metrics
                test_loss, test_acc, test_recall, test_prec, test_f_beta = devStep(test_corpus)
                print("---epoch %d -- test loss %.3f -- test acc %.3f -- test recall %.3f -- test precision %.3f"
                      "-- test f_beta %.3f" % (
                      epoch + 1, test_loss, test_acc, test_recall, test_prec, test_f_beta))  # print test metrics
                checkpoint_path = os.path.join(save_path, 'acc{:.3f}_{}.ckpt'.format(test_acc, epoch + 1))  # checkpoint path
                bestcheck_path = os.path.join(best_path, 'acc{:.3f}_{}.ckpt'.format(test_acc, epoch + 1))  # best-checkpoint path
                saver.save(sess, checkpoint_path, global_step=epoch)
                if test_acc > best_acc:
                    best_acc = test_acc
                    best_saver.save(sess, bestcheck_path, global_step=epoch)

def main():  # entry point for training
    embedding = load_embedding(embeding, embeding_size, vocab_file)
    preprocess_data1 = preprocess(train_file)  # preprocessing
    preprocess_data2 = preprocess(test_file)
    train_data = read_train(preprocess_data1, stopword_file, vocab_file)  # training data
    test_data = read_train(preprocess_data2, stopword_file, vocab_file)  # test data
    train_corpus = load_train_data(train_data, max_q_length, max_a_length)
    test_corpus = load_train_data(test_data, max_q_length, max_a_length)
    config = NNConfig(embedding)  # configuration
    config.ques_length = max_q_length
    config.ans_length = max_a_length
    #config.embeddings = embedding
    train(deepcopy(train_corpus), test_corpus, config)

if __name__ == '__main__':  # script entry
    save_path = "./model/checkpoint"
    best_path = "./model/bestval"
    train_file = '../data/train.csv'
    test_file = '../data/test.csv'
    stopword_file = '../stopwordList/stopword.txt'
    embeding = '../word2vec/5000-small.txt'
    vocab_file = '../data/word_vocab.txt'
    max_q_length = 15
    max_a_length = 15
    embeding_size = 200
    main()
System Testing
This part covers the training accuracy, the test results, and the model application.
1. Training Accuracy
Test accuracy is around 90%; the loss falls as training progresses and then levels off, as shown in the figure below.
2. Test Results
Text is fed into the model for testing, as shown in the figure below.
3. Model Application
This part covers downloading and running the program, usage instructions, and the test results.
1) Downloading and running the program
Download and unpack the program archive, then run gui.py in the Python environment.
2) Usage instructions
After unpacking the archive, the directory layout is as follows:
Here, qacnn.py is the model training file and can be run on its own; control.py selects between the Recall and Rerank models and can also be run on its own; gui.py is the project's graphical interface and calls the interface in control.py.
3) Test results
The graphical-interface test results are shown in the figure.