聊天机器人—RNN实战项目

聊天机器人是一种基于自然语言处理相关技术和方法的人机交互方式,虚拟助手(例如苹果的 Siri、微软的 Cortana、Google Duplex)、虚拟人物(微软小冰、Gowild 的琥珀),以及智能客服等诸多场景都有应用。

聊天机器人根据其应用场景的需要有不同的实现方式,本项目基于编码器-解码器模型实现一种生成式对话。读者可以通过关注公众号“磐小妹”(微信 ID:BotMind-PXM)体验最终实现的效果,除“磐小妹”中的“查天气”“听音乐”等封闭域的聊天外,其所有的回复基于的都是下面即将实现的对话生成模型,如图 1 所示。
本项目实现的聊天效果(不包括接入公众号)
图 1:本项目实现的聊天效果(不包括接入公众号)

1. 数据预处理

为了本项目的实现,作者花费了大量的时间爬取和搜集了 106 万多的对话数据(一问一答的形式)。数据集中对话数据的质量有高有低,一个高质量的对话数据集对于聊天机器人的对话效果尽管重要,但是要获取高质量的对话数据集需要长时间的积累,会耗费大量的时间和精力。

首先定义两个辅助函数(本项目的代码修改自 TensorFlow 的官方示例“Neural Machine Translation with Attention”):
import tensorflow as tf
from sklearn.model_selection import train_test_split
import jieba
import os
import time

def preprocess_sentence(sentence):
    """为句子添加开始和结束标记"""
    sentence = '<start>' + sentence +'<end>'
    return sentence

def max_length(tensor):
    #计算问答序列的最大长度
    return max(len(t) for t in tensor)
第 7 行代码中的“preprocess_sentence”函数用来在句子的首尾添加标记,“max_length”用来计算数据集中问句和答句中最长的句子长度。

接下来我们定义一个分词器函数“tokenize”,该函数返回的“tensor”使用 id 表示的句子,“sentences_tokenizer”是训练得到的词典,与上一个项目中我们自己建立的词典作用是一样的。
def tokenize(sentences):
    #初始化分词器,并生成词典
    sentences_tokenizer=tf.keras.preprocessing.text.Tokenizer(filters=' ')
        sentences_tokenizer.fit_on_texts(sentences)
        #利用词典将文本数据转为id表示
        tensor = sentences_tokenize.texts_to_sequences(sentences)
        #将数据填充成统一长度,以所有数据中最大长度为准,长度不够的补零
        tensor=tf.keras.preprocessing.sequence.pad_sequences(tensor,padding='post')

        return tensor, sentences_tokenizer

最后定义一个“load_dataset”函数加载数据集:
def load_dataset(file_path):
    """加载数据集"""
    with open(file_path, "r") as file:
        lines = file.readlines()
        q = ''
        a = ''
        qa_pairs =[]
        for i in range(len(lines)):
            if i % 3 == 0:
                q = " ".join(jieba.cut(lines[i] .strip()))
            elif i % 3 == 1:
                a = n ".join(jieba.cut(lines[i] .strip()))
            else :   #组合
pair=[preprocess_sentence(q), preprocess_sentence (a)]
                 qa_pairs.append(pair)
    #zip 操作删除重复问答
    #zip 返回格式:[(q, a) , (q, a),...]
    q_sentences, a_sentences = zip(*qa_pairs)
    q_tensor, q_tokenizer = tokenize(q_sentences)
    a_tensor, a_tokenizer = tokenize(a_sentences)

    return q_tensor, a_tensor, q_tokenizer, a_tokenizer
函数“load_dataset”返回的“q_tensor”和“a_tensor”分别是用id 表示的数据集中的问题部分和回答部分,“q_tokenizer”和“a_ tokenizer”分别是根据问题和回答建立的词汇表。

2. 模型搭建

本项目使用的是如图 2 所示的编码器-解码器模型。
编码器-解码器模型
图 2:编码器-解码器模型

这里我们将使用 Bahdanau Attention,这是第一次被应用到自然语言处理领域的注意力机制。注意力机制的基本思想在前面已经介绍过,其主要目的是在解码过程中能够自动地为每一个输出寻找对应的相关度最高的输入。

为此,我们需要计算 Attention 的权重(ats)、上下文向量(ct),以及 Attention 向量(at):


公式 1 中的“score”在 Bahdanau Attention 中的计算公式如下:


接下来开始实现模型部分的代码,首先是编码器部分:
class Encoder(tf.keras.Model):
    """编码器"""
    def __init__(self, vocab_size, embedding_dim, enc_units, batch_sz):
        super(Encoder, self).__init__()
        self.batch_sz = batch_sz
        self.enc_units = enc_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(self.enc_units, return_sequences=True, return_state=True , recurrent_initializer = 'glorot_uniform')

    def call(self, x, hidden):
        x = self.embedding(x)
        output, state = self.gru(x, initial_state=hidden)
        return output, state

    def initialize_hidden_state(self):
        return tf.zeros((self.batch_sz, self.enc_units))
在第 7 行代码中,我们添加了一个“Embedding”层,用来训练词向量,并将由 id 表示的句子转化为向量表示。

接下来是 Bahdanau Attention 的实现:
class BahdanauAttention(tf.keras.Model):
    """Bahdanau attention"""
    def __init__(self, units):
        super(BahdanauAttention, self).__init__()
        self.W1 = tf.keras.layers.Dense(units)
        self.W2 = tf.keras.layers.Dense(units)
        self.V = tf.keras.layers.Dense(1)

    def call(self, query, values):
        # query为编码器最后一个时间步的隐状态(hidden )
        #values为编码器部分的输出,即每个时间步的隐状态,形状为(batch_size, max_length, hidden size)
        #query 的形状为(batch_size, hidden size)
        #为了后续计算,需要将query的形状转为(batch_size, 1, hidden size)
        hidden_with_time_axis = tf.expand_dims(query, 1)

        #计算 Score 和 attention_weights
        #score 的形状为(batch_size, max_length, 1)
        score = self.V(tf.nn.tanh(
            self.W1(values) + self.W2(hidden_with_time_axis)))

        #attention_weights 的形状为(batch_size, max_length, 1)
        attention_weights = tf.nn.softmax(score, axis=1)

        #计算 Context Vector,形状为(batch_size, max_length, hidden size)
        context vector = attention weights * values

        # 求和之后的形状为(batch_size, hidden_size)
        context_vector = tf.reduce_sum(context_vector, axis=l)

        return context_vector, attention_weights
在第 19 行代码中,“self.W1(values)”计算结果的维度是 (batch_size, max_length, units_length),“self.W2(hidden_with_time_axis)” 计算结果的维度是 (batch_size, 1, units_length),在 TensorFlow 中两者相加的方式是将后者以向量的形式逐行加到前者的第二维矩阵中。

在第 25 行代码中,将 Attention 的权重与编码器部分的输出(即每个时间步的隐状态相乘),再基于第二维矩阵求和得到上下文向量。

在解码器中将基于上下文向量计算得到最终的输出:
class Decoder(tf.keras.Model):
    """解码器"""
    def __init__(self, vocab_size, embedding_dim, dec_units, batch_sz):
        super(Decoder, self).__init__()
        self.batch_sz = batch_sz
        self.dec_units = dec_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(self.dec_units,
                                        return_sequences=True,
                                        return_state=True,
recurrent_initializer= 'glorot_uniform')
        self.fc = tf.keras.layers.Dense(vocab_size)
        #attention
        self.attention = BahdanauAttention(self.dec_units)

    def call(self, x, hidden, enc_output):
        #获得 Context Vector 和 Attention Weights
        context_vector, attention_weights=self.attention(hidden, enc_output)

        #编码之后 x 的形状为(batch_size, 1, embedding_dim)
        x = self.embedding(x)

        #将context_vector和输入 x 拼接,
        #拼接后的形状为(batch_size, 1, embedding_dim+hidden_size)
        #这里的hidden_size即context_vector向量的长度
        x=tf.concat([tf.expand_dims(context_vector,1),x],axis=-1)

        #拼接后输入GRU网络
        output, state = self.gru(x)

        #Reshape 操作前 output 形状为 (batch_size, 1, hidden_size)
        #Reshape 操作后 output 形状为 (batch_size, hidden_size)
        output = tf.reshape(output, (-1, output.shape[2]))

        #x的形状为(batch_ size, vocab)
        x = self.fc(output)

        return x, state, attention_weights
在解码器中,我们和编码器一样只使用了一个单层的 GRU 网络。

3. 模型训练

接下来实现模型的训练部分,首先定义损失函数和优化器:
def loss_function(real, pred):
    """交叉熵损失函数"""
    #返回非零值(去掉了序列不够长时填补的零)
    mask = tf.math.logical_not (tf.math.equal(real, 0))
    #交叉炳损失
    loss_object=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')
    loss_ = loss_object(real, pred)
    # 将 mask 转为 loss_. dtype 类型
    mask = tf.cast(mask, dtype=loss_.dtype)
    #计算损失
    loss_ *= mask

    #每次计算的是一个批次的数据,因此要求平均损失
    return tf.reduce_mean(loss_)

#使用Adam优化器
optimizer = tf.keras.optimizers.Adam()

第 1 行代码中的函数“loss_function”接收两个参数:“real”为训练数据的真实标签,“pred”为模型预测的标签。在第 4 行代码中,我们去掉了真实类标中“补零”的部分,由于我们在数据预处理时将所有句子填充为了统一长度,较短的句子后面都补了零,因此在计算损失时需要消除这一影响。由于我们每次丢入模型的都是一个批次的数据,因此在计算得到一个批次数据的损失之后要取平均值,作为一个训练样本的损失。

定义一个“train_step”函数控制模型的训练过程:
@tf.function
def train_step(q, a, enc_hidden):
    loss = 0

    with tf.GradientTape() as tape:
        enc_output, enc_hidden = encoder(q, enc_hidden)
        dec_hidden = enc_hidden
        #解码器第一个时间步的输入
        dec_input=tf.expand_dims([a_tokenizer.word_index['<start>']] * BATCH_SIZE, 1)

        #逐个时间步进行解码
            for t in range(1, a.shape[1]):
            predictions, dec_hidden, _=decoder (dec_input, dec_hidden, enc_output)
            #计算当前时间步的损失
            loss += loss_function(a[:, t], predictions)
            #使用Teacher Forcing方法,该方法要求模型的生成结果必须和参考句
            # 一一对应
            dec_input = tf.expand_dims(a[:, t], 1)

    #要输出的一个批次的损失(取解码器中所有时间步损失的平均值)
    batch_loss = (loss/int(a.shape[1]))

    #优化参数
    variables=encoder.trainable_variables+decoder.trainable_variables
    #计算梯度
    gradients = tape.gradient(loss, variables)
    #使用Adam优化器更新参数
    optimizer.apply_gradients(zip(gradients, variables))

    return batch_loss
这里需要注意一下,在第 12 行的 for 循环里,我们是逐个时间步进行解码的。至此,整个项目的主要部分代码都已经实现了,另外为了执行训练和测试,我们还需要定义“train”和“test”两个函数,由于这两个函数没有重点和难点需要讲解,这里就不再给出具体实现了。

测试效果如图 3 所示。
命令行下的聊天测试效果
图 3:命令行下的聊天测试效果