基于循环神经网络(RNN)的古诗生成器

转载请注明出处:https://blog.csdn.net/aaronjny/article/details/79677457
<https://blog.csdn.net/aaronjny/article/details/79677457>

之前在手机百度上看到有个“为你写诗”功能,能够随机生成古诗,当时感觉很酷炫= =

在学习了深度学习后,了解了一下原理,打算自己做个实现练练手,于是,就有了这个项目。文中如有瑕疵纰漏之处,还请路过的诸位大佬不吝赐教,万分感谢!

使用循环神经网络实现的古诗生成器,能够完成古体诗的自动生成。我简单地训练了一下,格式是对上了,至于意境么。。。emmm,呵呵

举一下模型测试结果例子:

1.生成古体诗

示例1:

树阴飞尽水三依,谩自为能厚景奇。
莫怪仙舟欲西望,楚人今此惜春风。

示例2:

岩外前苗点有泉,紫崖烟霭碧芊芊。
似僧月明秋更好,一踪颜事欲犹伤?

2.生成藏头诗(以“天空”为例)

示例:

天序曾柏乌倾鱼,空老桐歌尘翁红。

下面记录项目实现过程(由于都是文本处理方面,跟前一个项目存在很多类似的内容,对于这部分内容,我就只简单提一下,不展开了,新的东西再具体说):

1.数据预处理

数据集使用四万首的唐诗训练集,可以点击[这里](https://github.com/AaronJny/peotry_generate/blob/master/origin_data/poetry.txt)进行下载。

数据预处理的过程与前一个项目TensorFlow练手项目一:使用循环神经网络(RNN)实现影评情感分类
<https://blog.csdn.net/aaronjny/article/details/79561115>
大同小异,可以参考前一个项目,这里就不多说了,直接上代码。
# -*- coding: utf-8 -*-
# @Time    : 18-3-13 11:04 AM
# @Author  : AaronJny
# @Email   : [email protected]
"""Convert the raw Tang-poetry corpus into a vocabulary file and id sequences.

Python 2 script.  It reads ORIGIN_DATA (one ``title:content`` poem per line),
filters out unusable poems, wraps every remaining poem in the start/end
markers 's' / 'e', writes the vocabulary (one token per line, line number ==
token id) to VOCAB_DATA and the poems as space-separated ids to OUTPUT_DATA.
"""
import sys

# Python 2 only: make utf8 the implicit str<->unicode codec so the utf8 byte
# literals below can be mixed with the decoded (unicode) poem text.
reload(sys)
sys.setdefaultencoding('utf8')
import collections
import itertools
import os

ORIGIN_DATA = 'origin_data/poetry.txt'  # raw corpus path
OUTPUT_DATA = 'processed_data/poetry.txt'  # output path for the id sequences
VOCAB_DATA = 'vocab/poetry.vocab'  # output path for the vocabulary

UNKNOWN_TOKEN = '<unknow>'  # always stored at id 0
MIN_POEM_LEN = 20  # poems shorter than this are dropped
MAX_POEM_LEN = 100  # poems longer than this are dropped
# Poems containing any of these characters are dropped.
# NOTE(review): the published listing tested '(' twice; the second test was
# presumably the full-width bracket, flattened by width normalisation.  Both
# forms are listed here -- confirm against the corpus.
ILLEGAL_CHARS = ('(', '(', '<', '《', '_', '[')
# Number of most frequent tokens to keep (including '<unknow>').  None keeps
# them all; the corpus has fewer than seven thousand distinct characters.
MAX_VOCAB_SIZE = None


def word_to_id(word, id_dict):
    """Return the id of ``word``, or the id of '<unknow>' if it is not in ``id_dict``.

    :param word: a single token (character)
    :param id_dict: mapping token -> integer id; must contain '<unknow>' when
        ``word`` itself is missing
    :return: integer id
    """
    try:
        return id_dict[word]
    except KeyError:
        return id_dict[UNKNOWN_TOKEN]


def ensure_parent_dir(path):
    """Create the directory that will contain ``path`` if it does not exist yet."""
    parent = os.path.dirname(path)
    if parent and not os.path.isdir(parent):
        os.makedirs(parent)


poetry_list = []  # accepted poems, each wrapped as 's' + content + 'e'

# Read the raw corpus.
with open(ORIGIN_DATA, 'r') as f:
    f_lines = f.readlines()
print('唐诗总数 : {}'.format(len(f_lines)))

for line in f_lines:
    # Trim surrounding whitespace and decode to unicode so that len() and
    # iteration below work per character rather than per byte.
    strip_line = line.strip().decode('utf8')
    # A usable line is exactly "title:content"; lines with no ':' or with
    # more than one ':' are discarded.
    parts = strip_line.split(':')
    if len(parts) != 2:
        continue
    # Remove spaces inside the poem body (the title is not used).
    content = parts[1].strip().replace(' ', '')
    # Discard poems containing annotation / markup characters.
    if any(char in content for char in ILLEGAL_CHARS):
        continue
    # Discard poems that are too short or too long.
    if len(content) < MIN_POEM_LEN or len(content) > MAX_POEM_LEN:
        continue
    poetry_list.append('s' + content + 'e')

print('用于训练的唐诗数 : {}'.format(len(poetry_list)))

# Sort by length so that poems of similar length end up next to each other
# in the output file.
poetry_list = sorted(poetry_list, key=len)

# Count every character (including the 's' / 'e' markers) and order the
# vocabulary by descending frequency, with '<unknow>' in front.
counter = collections.Counter(itertools.chain.from_iterable(poetry_list))
words_list = [UNKNOWN_TOKEN] + [word for word, _ in counter.most_common()]
words_list = words_list[:MAX_VOCAB_SIZE]
# Report the number of tokens (the listing formatted the whole list here).
print('词汇表大小 : {}'.format(len(words_list)))

ensure_parent_dir(VOCAB_DATA)
with open(VOCAB_DATA, 'w') as f:
    for word in words_list:
        f.write(word + '\n')

# Token -> id mapping: the id is the token's position in the vocabulary.
word_id_dict = dict(zip(words_list, range(len(words_list))))

# Convert every poem into its list of ids (as strings, ready for joining).
id_list = [[str(word_to_id(word, word_id_dict)) for word in poetry]
           for poetry in poetry_list]

# Write one poem per line as space-separated ids.
ensure_parent_dir(OUTPUT_DATA)
with open(OUTPUT_DATA, 'w') as f:
    for id_l in id_list:
        f.write(' '.join(id_l) + '\n')
2.模型编写


这里要编写两个模型,一个用于训练,一个用于验证(生成古体诗)。两个模型大体上一致,因为用途不同,所以有些细节有出入。当进行验证时,验证模型读取训练模型的参数进行覆盖。

注释比较细,就不多说了,看代码。对于两个模型不同的一些关键细节,我也用注释进行了说明。
# -*- coding: utf-8 -*-
# @Time    : 18-3-13 2:06 PM
# @Author  : AaronJny
# @Email   : [email protected]
"""Training and evaluation (generation) models for the poetry RNN.

Both models build the same network under the same variable scopes, so a
checkpoint written through TrainModel can be restored into EvalModel.
"""
import tensorflow as tf
import functools
import setting

HIDDEN_SIZE = 128  # units per LSTM layer; also used as the embedding width
NUM_LAYERS = 2  # number of stacked LSTM layers


def doublewrap(function):
    """Allow a decorator to be applied both bare and with arguments.

    ``@deco`` calls ``function(wrapped)``; ``@deco(a, b=1)`` calls
    ``function(wrapped, a, b=1)``.
    """
    @functools.wraps(function)
    def decorator(*args, **kwargs):
        # A single callable positional argument means the bare form.
        if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
            return function(args[0])
        else:
            return lambda wrapee: function(wrapee, *args, **kwargs)

    return decorator


@doublewrap
def define_scope(function, scope=None, *args, **kwargs):
    """Turn a method into a lazily evaluated, cached property.

    The method body runs once, inside ``tf.variable_scope(scope or the
    method name)``; the result is stored on the instance and returned on
    every later access.  Extra arguments go to ``tf.variable_scope``.
    """
    attribute = '_cache_' + function.__name__
    name = scope or function.__name__

    @property
    @functools.wraps(function)
    def decorator(self):
        if not hasattr(self, attribute):
            with tf.variable_scope(name, *args, **kwargs):
                setattr(self, attribute, function(self))
        return getattr(self, attribute)

    return decorator


def _stacked_lstm_cell(rnn_keep):
    """Build NUM_LAYERS LSTM layers, each with dropout on its output."""
    layers = [
        tf.nn.rnn_cell.DropoutWrapper(
            tf.nn.rnn_cell.BasicLSTMCell(HIDDEN_SIZE),
            output_keep_prob=rnn_keep)
        for _ in range(NUM_LAYERS)]
    return tf.nn.rnn_cell.MultiRNNCell(layers)


def _forward(model, batch_size):
    """Build embedding -> dropout -> RNN -> softmax projection for ``model``.

    Shared by both models so they create identically named variables.
    Sets ``model.init_state`` (zero state for ``batch_size`` sequences).

    :return: (logits, last_state); logits has one row per input position
        and setting.VOCAB_SIZE columns
    """
    # Embedding matrix.
    embedding = tf.get_variable(
        'embedding', shape=[setting.VOCAB_SIZE, HIDDEN_SIZE])
    # Softmax layer parameters; optionally tie the weights to the embedding.
    if setting.SHARE_EMD_WITH_SOFTMAX:
        softmax_weights = tf.transpose(embedding)
    else:
        softmax_weights = tf.get_variable(
            'softmaweights', shape=[HIDDEN_SIZE, setting.VOCAB_SIZE])
    # The variable names (including their spelling) are kept as they are
    # because existing checkpoints store the parameters under these names.
    softmax_bias = tf.get_variable('softmax_bais', shape=[setting.VOCAB_SIZE])
    # Embedding lookup followed by dropout.
    emb = tf.nn.embedding_lookup(embedding, model.data)
    emb_dropout = tf.nn.dropout(emb, model.emb_keep)
    # Run the recurrent network.
    model.init_state = model.cell.zero_state(batch_size, dtype=tf.float32)
    outputs, last_state = tf.nn.dynamic_rnn(
        model.cell, emb_dropout, scope='d_rnn', dtype=tf.float32,
        initial_state=model.init_state)
    outputs = tf.reshape(outputs, [-1, HIDDEN_SIZE])
    # Project onto the vocabulary.
    logits = tf.matmul(outputs, softmax_weights) + softmax_bias
    return logits, last_state


class TrainModel(object):
    """Model used for training."""

    def __init__(self, data, labels, emb_keep, rnn_keep):
        """
        :param data: int tensor of input token ids
        :param labels: int tensor of target token ids
        :param emb_keep: dropout keep probability for the embedding layer
        :param rnn_keep: dropout keep probability for the LSTM layers
        """
        self.data = data
        self.labels = labels
        self.emb_keep = emb_keep
        self.rnn_keep = rnn_keep
        # Touch every property once so the whole graph -- including the
        # optimizer and its slot variables -- exists before the caller
        # creates a Saver or the variable initializer.
        self.global_step
        self.cell
        self.predict
        self.loss
        self.optimize

    @define_scope
    def cell(self):
        """The stacked LSTM cell."""
        return _stacked_lstm_cell(self.rnn_keep)

    @define_scope
    def predict(self):
        """Forward pass; returns the logits for every input position."""
        logits, _ = _forward(self, setting.BATCH_SIZE)
        return logits

    @define_scope
    def loss(self):
        """Mean sparse softmax cross-entropy between logits and labels."""
        outputs_target = tf.reshape(self.labels, [-1])
        loss = tf.nn.sparse_softmax_cross_entropy_with_logits(
            logits=self.predict, labels=outputs_target)
        cost = tf.reduce_mean(loss)
        return cost

    @define_scope
    def global_step(self):
        """Non-trainable step counter, incremented by the optimizer."""
        global_step = tf.Variable(0, trainable=False)
        return global_step

    @define_scope
    def optimize(self):
        """Training op: Adam with decayed learning rate and clipped gradients."""
        # Exponentially decayed learning rate.
        learn_rate = tf.train.exponential_decay(
            setting.LEARN_RATE, self.global_step,
            setting.LR_DECAY_STEP, setting.LR_DECAY)
        # Clip gradients by global norm to guard against exploding gradients.
        trainable_variables = tf.trainable_variables()
        grads, _ = tf.clip_by_global_norm(
            tf.gradients(self.loss, trainable_variables), setting.MAX_GRAD)
        # Apply the clipped gradients.
        optimizer = tf.train.AdamOptimizer(learn_rate)
        train_op = optimizer.apply_gradients(
            zip(grads, trainable_variables), self.global_step)
        return train_op


class EvalModel(object):
    """Model used for evaluation, i.e. for generating poems."""

    def __init__(self, data, emb_keep, rnn_keep):
        """
        :param data: int tensor of input token ids
        :param emb_keep: dropout keep probability for the embedding layer
        :param rnn_keep: dropout keep probability for the LSTM layers
        """
        self.data = data
        self.emb_keep = emb_keep
        self.rnn_keep = rnn_keep
        # Build the whole graph up front (see TrainModel.__init__).
        self.cell
        self.predict
        self.prob

    @define_scope
    def cell(self):
        """The stacked LSTM cell."""
        return _stacked_lstm_cell(self.rnn_keep)

    @define_scope
    def predict(self):
        """Forward pass; returns the logits for every input position.

        Unlike the training model this uses batch size 1 (a single poem is
        generated) and records ``self.last_state`` so the caller can feed it
        back as the initial state of the next step.
        """
        logits, last_state = _forward(self, 1)
        self.last_state = last_state
        return logits

    @define_scope
    def prob(self):
        """Softmax probabilities over the vocabulary."""
        probs = tf.nn.softmax(self.predict)
        return probs
3.组织数据集

编写一个类用于组织数据,方便训练使用。代码很简单,应该不存在什么问题。
# -*- coding: utf-8 -*-
# @Time    : 18-3-13 11:59 AM
# @Author  : AaronJny
# @Email   : [email protected]
"""Batches of padded id sequences for training."""
import numpy as np

BATCH_SIZE = 64
DATA_PATH = 'processed_data/poetry.txt'


class Dataset(object):
    """Pre-built, cyclically served mini-batches read from an id file."""

    def __init__(self, batch_size, data_path=DATA_PATH):
        """
        :param batch_size: number of poems per batch
        :param data_path: file with one poem per line as space-separated
            integer ids; defaults to DATA_PATH
        """
        self.batch_size = batch_size
        self.data_path = data_path
        self.data, self.target = self.read_data()
        self.start = 0  # index of the batch next_batch() returns next
        self.lenth = len(self.data)  # number of batches

    def read_data(self):
        """Read the id file and build all batches.

        Poems are grouped in file order; poems left over after the last full
        batch are dropped.  Inside a batch every poem is right-padded with 0
        to the length of the longest poem in that batch.  The target of a
        batch is its input shifted one position to the left; the last column
        is left unchanged.

        :return: (inputs, targets), two lists of int32 arrays of shape
            (batch_size, longest poem in the batch)
        """
        id_list = []
        with open(self.data_path, 'r') as f:
            for line in f:
                ids = [int(num) for num in line.split()]
                # A blank line would otherwise become an all-padding sample.
                if ids:
                    id_list.append(ids)
        # Number of full batches available.
        num_batchs = len(id_list) // self.batch_size
        x_data = []
        y_data = []
        for i in range(num_batchs):
            first = i * self.batch_size
            batch = id_list[first:first + self.batch_size]
            max_lenth = max(len(poem) for poem in batch)
            # Zero-filled matrix, then copy each poem into its row.
            tmp_x = np.zeros((self.batch_size, max_lenth), dtype=np.int32)
            for row, poem in enumerate(batch):
                tmp_x[row, :len(poem)] = poem
            # Shift left by one, reading from tmp_x so that source and
            # destination never share memory.
            tmp_y = tmp_x.copy()
            tmp_y[:, :-1] = tmp_x[:, 1:]
            x_data.append(tmp_x)
            y_data.append(tmp_y)
        return x_data, y_data

    def next_batch(self):
        """Return the next (input, target) pair, wrapping around at the end."""
        start = self.start
        self.start += 1
        if self.start >= self.lenth:
            self.start = 0
        return self.data[start], self.target[start]


if __name__ == '__main__':
    dataset = Dataset(BATCH_SIZE)
    dataset.read_data()
4.训练模型

万事俱备,开始训练。

没有按照epoch进行训练,这里只是循环训练指定个mini_batch。

训练过程中,会定期显示当前训练步数以及loss值。会定期保存当前模型及对应checkpoint。

训练代码:
# -*- coding: utf-8 -*-
# @Time    : 18-3-13 2:50 PM
# @Author  : AaronJny
# @Email   : [email protected]
"""Train the poetry model for a fixed number of mini-batches."""
import os

import tensorflow as tf
from rnn_models import TrainModel
import dataset
import setting

TRAIN_TIMES = 30000  # total number of mini-batch iterations (epochs are not tracked)
SHOW_STEP = 1  # print the loss every SHOW_STEP iterations
SAVE_STEP = 100  # write a checkpoint every SAVE_STEP iterations

x_data = tf.placeholder(tf.int32, [setting.BATCH_SIZE, None])  # input ids
y_data = tf.placeholder(tf.int32, [setting.BATCH_SIZE, None])  # target ids
emb_keep = tf.placeholder(tf.float32)  # embedding dropout keep probability
rnn_keep = tf.placeholder(tf.float32)  # LSTM dropout keep probability

data = dataset.Dataset(setting.BATCH_SIZE)  # training batches

model = TrainModel(x_data, y_data, emb_keep, rnn_keep)  # training graph

saver = tf.train.Saver()

# Saver.save() refuses to write when the parent directory is missing, so
# create it up front instead of failing at the first save.
ckpt_dir = os.path.dirname(setting.CKPT_PATH)
if ckpt_dir and not os.path.isdir(ckpt_dir):
    os.makedirs(ckpt_dir)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    for step in range(TRAIN_TIMES):
        # Fetch the next batch and run one optimisation step.
        x, y = data.next_batch()
        feed = {
            model.data: x,
            model.labels: y,
            model.emb_keep: setting.EMB_KEEP,
            model.rnn_keep: setting.RNN_KEEP,
        }
        loss, _ = sess.run([model.loss, model.optimize], feed)
        if step % SHOW_STEP == 0:
            print('step {}, loss is {}'.format(step, loss))
        # Periodic checkpoint.
        if step % SAVE_STEP == 0:
            saver.save(sess, setting.CKPT_PATH, global_step=model.global_step)
    # The last periodic save happens up to SAVE_STEP - 1 iterations before
    # the loop ends; save once more so the final weights are not lost.
    saver.save(sess, setting.CKPT_PATH, global_step=model.global_step)
5.验证模型

提供两种方法验证模型:

* 随机生成古体诗
* 生成藏头诗

随机生成的结果勉强可以接受,起码格式对了,看起来也像个样子。

生成藏头诗就五花八门了,效果不好,往往要多次才能生成一个差强人意的。emmm,其实也可以理解,毕竟我们指定的“藏头”在训练集中的分布是不能保证的。

这里简单说一下生成古体诗的过程:

1. 首先,读取训练模型保存的参数,覆盖验证模型的参数。
2. 将开始符号's'作为输入,喂给模型,模型将输出下一个字符为词表中各词的概率,以及rnn传递的state。注意,验证模型时,dropout的保留率应设置为1.0。
3. 根据2中输出的概率,使用轮盘赌法,随机出下一个字。
4. 将随机出来的字作为输入,前一次输出的state作为本次输入的state,喂给模型,模型将输出下一个字符为词表中各词的概率,以及rnn传递的state。
5. 重复3,4步骤,直到随机出结束符'e',生成结束。过程中生成的所有字符,构成本次生成的古体诗('s'和'e'不算)。

生成藏头诗的过程与生成古体诗是类似的,主要区别在于,在开始和每个标点符号被预测出来时,向模型喂给的是“藏头”中的一个字,就不多说了,详情可参考代码。
# -*- coding: utf-8 -*-
# @Time    : 18-3-13 2:50 PM
# @Author  : AaronJny
# @Email   : [email protected]
"""Generate poems (free-form or acrostic) from the latest checkpoint."""
import sys

# Python 2 only: make utf8 the implicit str<->unicode codec.
reload(sys)
sys.setdefaultencoding('utf8')
import tensorflow as tf
import numpy as np
from rnn_models import EvalModel
import utils
import os

# Hide all CUDA devices so generation runs on the CPU and can be used while
# a training run occupies the GPU.
os.environ['CUDA_VISIBLE_DEVICES'] = ''

CKPT_DIR = 'ckpt'  # directory searched for the latest checkpoint
UNKNOWN_TOKEN = '<unknow>'  # vocabulary entry used for out-of-vocabulary input
# Tokens that end a line of an acrostic poem.
# NOTE(review): the published listing compares against the ASCII comma only;
# the corpus presumably uses the full-width comma (width normalisation of the
# listing), so both forms are accepted -- confirm against the vocabulary.
LINE_ENDINGS = (u',', u',', u'。')

x_data = tf.placeholder(tf.int32, [1, None])
emb_keep = tf.placeholder(tf.float32)
rnn_keep = tf.placeholder(tf.float32)

# Evaluation model.
model = EvalModel(x_data, emb_keep, rnn_keep)

saver = tf.train.Saver()
# token -> id
word2id_dict = utils.read_word_to_id_dict()
# id -> token
id2word_dict = utils.read_id_to_word_dict()


def generate_word(prob, top_k=100):
    """Sample one token by roulette-wheel selection among the likeliest ones.

    The listing sorted the probabilities and used the position in the sorted
    array as a word id, which loses the id of each probability.  Here the
    ids of the ``top_k`` largest probabilities are kept and the drawn
    position is mapped back to its id.

    :param prob: probabilities over the vocabulary; any shape, flattened
        before use
    :param top_k: number of most probable tokens to sample from; None
        samples from the whole vocabulary
    :return: the sampled token
    """
    prob = np.asarray(prob, dtype=np.float64).ravel()
    if top_k is None or top_k >= prob.size:
        candidate_ids = np.arange(prob.size)
    else:
        # Ids of the top_k largest probabilities, most probable first.
        candidate_ids = np.argsort(prob)[::-1][:top_k]
    cumulative = np.cumsum(prob[candidate_ids])
    # Roulette wheel: draw a point in [0, total) and find its segment.
    pick = int(np.searchsorted(cumulative, np.random.rand() * cumulative[-1]))
    # Guard against floating point rounding pushing the draw past the end.
    pick = min(pick, len(candidate_ids) - 1)
    return id2word_dict[int(candidate_ids[pick])]


def _word_id(word):
    """Return the id of ``word``, falling back to the '<unknow>' id."""
    try:
        return word2id_dict[word]
    except KeyError:
        return word2id_dict[UNKNOWN_TOKEN]


def _restore_latest(sess):
    """Load the newest checkpoint in CKPT_DIR into ``sess``.

    :raises IOError: if CKPT_DIR holds no checkpoint
    """
    ckpt = tf.train.get_checkpoint_state(CKPT_DIR)
    if ckpt is None or not ckpt.model_checkpoint_path:
        raise IOError('no checkpoint found in {!r}'.format(CKPT_DIR))
    saver.restore(sess, ckpt.model_checkpoint_path)


def _predict_next(sess, word, rnn_state):
    """Feed one token and sample the next one.

    Dropout is disabled (keep probability 1.0) during generation.

    :return: (sampled token, new RNN state)
    """
    x = np.array([[_word_id(word)]], np.int32)
    prob, rnn_state = sess.run(
        [model.prob, model.last_state],
        {model.data: x, model.init_state: rnn_state,
         model.emb_keep: 1.0, model.rnn_keep: 1.0})
    return generate_word(prob), rnn_state


def generate_poem():
    """Generate and print one poem.

    Generation starts from the start marker 's' and stops when the end
    marker 'e' is sampled; neither marker is part of the result.

    :return: the generated poem
    """
    with tf.Session() as sess:
        _restore_latest(sess)
        rnn_state = sess.run(model.cell.zero_state(1, tf.float32))
        word, rnn_state = _predict_next(sess, 's', rnn_state)
        chars = []
        while word != 'e':
            chars.append(word)
            word, rnn_state = _predict_next(sess, word, rnn_state)
        poem = ''.join(chars)
        print(poem)
        return poem


def generate_acrostic(head):
    """Generate and print an acrostic poem.

    :param head: string whose characters become the first character of each
        line, in order
    :return: the generated poem
    """
    with tf.Session() as sess:
        _restore_latest(sess)
        rnn_state = sess.run(model.cell.zero_state(1, tf.float32))
        poem = ''
        cnt = 1
        # Generate the poem line by line.
        for head_char in head:
            word = head_char
            while word not in LINE_ENDINGS:
                poem += word
                word, rnn_state = _predict_next(sess, word, rnn_state)
                # Give up on a line once the poem grows beyond 25 characters
                # without the model producing a line ending.
                if len(poem) > 25:
                    print('bad.')
                    break
            # Odd lines end with a comma, even lines with a full stop.
            if cnt & 1:
                poem += ','
            else:
                poem += '。'
            cnt += 1
        print(poem)
        return poem


if __name__ == '__main__':
    # generate_acrostic(u'天空')
    generate_poem()
6.一些提取出来的方法和配置

很简单,不多说。

utils.py
# -*- coding: utf-8 -*-
# @Time    : 18-3-13 4:16 PM
# @Author  : AaronJny
# @Email   : [email protected]
"""Helpers for loading the vocabulary."""
import io

import setting


def read_word_list():
    """Read the vocabulary file.

    The file holds one token per line; a token's line number is its id.

    :return: list of tokens (unicode) in id order
    """
    # newline='' disables newline translation, so only '\n' separates tokens.
    with io.open(setting.VOCAB_PATH, 'r', encoding='utf8', newline='') as f:
        content = f.read()
    # Drop only the trailing line terminator(s).  Stripping all surrounding
    # whitespace could remove a token that is itself a whitespace character
    # and break the line-number <-> id correspondence.
    return content.rstrip(u'\n').split(u'\n')


def read_word_to_id_dict():
    """Build the token -> id mapping.

    :return: dict mapping each token to its position in the vocabulary
    """
    return {word: index for index, word in enumerate(read_word_list())}


def read_id_to_word_dict():
    """Build the id -> token mapping.

    :return: dict mapping each position in the vocabulary to its token
    """
    return dict(enumerate(read_word_list()))


if __name__ == '__main__':
    read_id_to_word_dict()
setting.py
# -*- coding: utf-8 -*-
# @Time    : 18-3-13 3:08 PM
# @Author  : AaronJny
# @Email   : [email protected]
"""Shared configuration for the poetry generator."""

# --- model ---
VOCAB_SIZE = 6272  # vocabulary size
SHARE_EMD_WITH_SOFTMAX = True  # share parameters between the embedding and softmax layers

# --- optimisation ---
MAX_GRAD = 5.0  # maximum gradient norm, guards against exploding gradients
LEARN_RATE = 0.0005  # initial learning rate
LR_DECAY = 0.92  # learning-rate decay factor
LR_DECAY_STEP = 600  # number of steps per decay
BATCH_SIZE = 64  # batch size
EMB_KEEP = 0.5  # dropout keep probability of the embedding layer
RNN_KEEP = 0.5  # dropout keep probability of the LSTM layers

# --- paths ---
CKPT_PATH = 'ckpt/model_ckpt'  # checkpoint path prefix
VOCAB_PATH = 'vocab/poetry.vocab'  # vocabulary file
7.完毕

编码到此结束,有兴趣的朋友可以自己跑一跑,玩一玩,我就不多做测试了。

项目GitHub地址:https://github.com/AaronJny/peotry_generate
<https://github.com/AaronJny/peotry_generate>

博主也正在学习,能力浅薄,文中如有瑕疵纰漏之处,还请路过的诸位大佬不吝赐教,万分感谢!