Keras 建立模型的方法有 3 种,分别是使用 Sequential API(序列接口)、Functional API(函数接口)和 模型子类化(Model Subclassing)。
方法特点
- Sequential API:当你试图使用单个输入、输出和层分支构建简单模型时,Sequential API 是最好的方法。对于想快速学习的新手来说,这也是一个很好的选择。
- Functional API:函数 API 是构建 Keras 模型的最常用的方法。它可以完成 Sequential API 所能做的一切。此外,它允许多个输入、多个输出、分支和层共享。它是一种简洁易用的方法,而且仍然允许很好的定制灵活性。
- Model Subclassing:模型子类化是为需要完全控制模型、层和训练过程的高级开发人员设计的。你需要创建一个定义模型的自定义类,而且你可能不需要它来执行日常任务。但是,如果你是一个有实验需求的研究人员,那么模型子类化可能是最好的选择,因为它会给你所有你需要的灵活性。
三种构建方法
序列 API
序列 API 主要是用 tf.keras.Sequential 类将所有层进行串联,其中所有层作为一个单独的层来传递。以 nlp 序列标注任务的 embedding + bilstm + crf 模型为例:
import tensorflow as tf
def build_model(vocab_size: int, max_len=100, emb_dim=128, class_num=14):
    """Build an embedding + BiLSTM + CRF sequence-tagging model with the Sequential API.

    Args:
        vocab_size: size of the word vocabulary (embedding table holds vocab_size + 1 rows).
        max_len: fixed input sequence length.
        emb_dim: embedding dimension, also used as the LSTM hidden size.
        class_num: number of output tag classes.

    Returns:
        A compiled ``tf.keras.Sequential`` model whose loss and accuracy come
        from the CRF layer itself.
    """
    layers = tf.keras.layers
    # Keep a handle on the CRF layer: compile() needs its get_loss/get_accuracy.
    crf_layer = CRF(class_num, name='crf_layer')
    tagger = tf.keras.Sequential(
        [
            layers.Input(shape=(max_len,), dtype=tf.int32, name='word_id_input_layer'),
            layers.Embedding(vocab_size + 1, emb_dim, name='embedding_layer'),
            layers.Bidirectional(layers.LSTM(emb_dim, return_sequences=True), name='BiLSTM_layer'),
            layers.Dense(class_num, name='dense_layer'),
            crf_layer,
        ],
        name='seq_tagging',
    )
    tagger.summary()
    tagger.compile(
        tf.keras.optimizers.Adam(),
        loss=crf_layer.get_loss,
        metrics=[crf_layer.get_accuracy],
    )
    return tagger
示例中,tensorflow 使用的是 2.4.1 版本,其中 CRF 是对 tensorflow_addons 模块中提供的 CRF 的改写(主要是 call 方法微调为输出一个结果,配合 Sequential API 示例;此外,增加了 get_loss 和 get_accuracy 函数)。
CRF 代码如下:
import tensorflow as tf
from tensorflow_addons.text.crf import crf_decode, crf_log_likelihood
from tensorflow_addons.utils import types
from typeguard import typechecked
class CRF(tf.keras.layers.Layer):
    """Linear-chain Conditional Random Field layer for sequence tagging.

    Adapted from the ``tensorflow_addons`` CRF layer: ``call`` is trimmed to
    return only the decoded tag sequence (so the layer fits the Sequential
    API), and ``get_loss`` / ``get_accuracy`` helpers are added so the layer
    can be wired directly into ``model.compile``.

    NOTE(review): the layer caches ``potentials``, ``sequence_length`` and
    ``mask`` on ``self`` during ``call`` and reads them back inside
    ``get_loss`` / ``get_accuracy`` — those functions are therefore only
    meaningful for the batch most recently passed through ``call``.
    """

    @typechecked
    def __init__(
        self,
        units: int,
        chain_initializer: types.Initializer = "orthogonal",
        use_boundary: bool = True,
        boundary_initializer: types.Initializer = "zeros",
        use_kernel: bool = True,
        **kwargs,
    ):
        super().__init__(**kwargs)
        # Enable mask support. The base Layer.__init__ unconditionally sets
        # this flag to False, so the assignment must happen *after* the
        # super().__init__ call above.
        self.supports_masking = True
        self.units = units  # number of output tags
        self.use_boundary = use_boundary
        self.use_kernel = use_kernel
        self.chain_initializer = tf.keras.initializers.get(chain_initializer)
        self.boundary_initializer = tf.keras.initializers.get(boundary_initializer)
        # Values remembered from the last forward pass, consumed later by
        # get_loss / get_accuracy.
        self.potentials = None
        self.sequence_length = None
        self.mask = None
        # Transition weights between tags (tag-to-tag transition scores).
        self.chain_kernel = self.add_weight(
            shape=(self.units, self.units),
            name="chain_kernel",
            initializer=self.chain_initializer,
        )
        # Weights for the <START> -> tag and tag -> <END> transitions.
        if self.use_boundary:
            self.left_boundary = self.add_weight(
                shape=(self.units,),
                name="left_boundary",
                initializer=self.boundary_initializer,
            )
            self.right_boundary = self.add_weight(
                shape=(self.units,),
                name="right_boundary",
                initializer=self.boundary_initializer,
            )
        if self.use_kernel:
            # Projects inputs down to one unary potential per tag.
            self._dense_layer = tf.keras.layers.Dense(
                units=self.units, dtype=self.dtype
            )
        else:
            # Identity projection: the inputs are used as unary potentials
            # directly (they must already have `units` features).
            self._dense_layer = lambda x: tf.cast(x, dtype=self.dtype)

    def call(self, inputs, mask=None):
        """Compute unary potentials and return the Viterbi-decoded tag ids.

        Args:
            inputs: float tensor, presumably (batch_size, max_seq_len,
                features) — TODO confirm with callers.
            mask: optional 2-D tensor (batch_size, max_seq_len); only right
                padding is supported.

        Returns:
            int32 tensor of shape (batch_size, max_seq_len) with decoded tags.
        """
        # mask: Tensor(shape=(batch_size, sequence_length), dtype=bool) or None
        if mask is not None:
            if tf.keras.backend.ndim(mask) != 2:
                raise ValueError("Input mask to CRF must have dim 2 if not None")
        # NOTE(review): duplicate `mask is not None` guard kept from original.
        if mask is not None:
            # Left padding of the mask is not supported by the underlying CRF
            # functions; detect it (only possible eagerly) and report it.
            left_boundary_mask = self._compute_mask_left_boundary(mask)
            first_mask = left_boundary_mask[:, 0]
            if first_mask is not None and tf.executing_eagerly():
                no_left_padding = tf.math.reduce_all(first_mask)
                left_padding = not no_left_padding
                if left_padding:
                    raise NotImplementedError(
                        "Currently, CRF layer do not support left padding"
                    )
        # Remember the mask for get_accuracy.
        self.mask = mask
        self.potentials = self._dense_layer(inputs)
        # Fold the <START>/<END> boundary energies into the unary potentials.
        if self.use_boundary:
            self.potentials = self.add_boundary_energy(
                self.potentials, mask, self.left_boundary, self.right_boundary
            )
        self.sequence_length = self._get_sequence_length(inputs, mask)
        decoded_sequence, _ = self.get_viterbi_decoding(self.potentials, self.sequence_length)
        # The original addons layer returned
        # [decoded_sequence, self.potentials, self.sequence_length, self.chain_kernel]
        return decoded_sequence

    def _get_sequence_length(self, input_, mask):
        """Compute per-example sequence lengths from the input and mask.

        The underlying CRF function (provided by tensorflow_addons.text.crf)
        does not support bi-directional masking (left padding / right
        padding); it supports right padding by being told the sequence
        length, which this method derives.
        """
        if mask is not None:
            sequence_length = self.mask_to_sequence_length(mask)
        else:
            # No mask given: build an all-ones mask from the input's
            # (batch, time) dims, then derive lengths from it.
            input_energy_shape = tf.shape(input_)
            raw_input_shape = tf.slice(input_energy_shape, [0], [2])
            alt_mask = tf.ones(raw_input_shape)
            sequence_length = self.mask_to_sequence_length(alt_mask)
        return sequence_length

    def mask_to_sequence_length(self, mask):
        """Compute sequence length (count of unmasked steps) from mask."""
        sequence_length = tf.reduce_sum(tf.cast(mask, tf.int64), 1)
        return sequence_length

    @staticmethod
    def _compute_mask_right_boundary(mask):
        """input mask: 0011100, output right_boundary: 0000100."""
        # shift mask to left by 1: 0011100 => 0111000
        offset = 1
        left_shifted_mask = tf.concat(
            [mask[:, offset:], tf.zeros_like(mask[:, :offset])], axis=1
        )
        # NOTE: below code is different from keras_contrib
        # Original code in keras_contrib:
        # end_mask = K.cast(
        #   K.greater(self.shift_left(mask), mask),
        #   K.floatx()
        # )
        # has a bug, confirmed
        # by the original keras_contrib maintainer
        # Luiz Felix (github: lzfelix),
        # 0011100 > 0111000 => 0000100
        right_boundary = tf.math.greater(
            tf.cast(mask, tf.int32), tf.cast(left_shifted_mask, tf.int32)
        )
        return right_boundary

    @staticmethod
    def _compute_mask_left_boundary(mask):
        """input mask: 0011100, output left_boundary: 0010000."""
        # shift mask to right by 1: 0011100 => 0001110
        offset = 1
        right_shifted_mask = tf.concat(
            [tf.zeros_like(mask[:, :offset]), mask[:, :-offset]], axis=1
        )
        # 0011100 > 0001110 => 0010000
        left_boundary = tf.math.greater(
            tf.cast(mask, tf.int32), tf.cast(right_shifted_mask, tf.int32)
        )
        return left_boundary

    def add_boundary_energy(self, potentials, mask, start, end):
        """Add <START>/<END> boundary energies to the first/last real step."""
        def expand_scalar_to_3d(x):
            # expand tensor from shape (x, ) to (1, 1, x)
            return tf.reshape(x, (1, 1, -1))

        start = tf.cast(expand_scalar_to_3d(start), potentials.dtype)
        end = tf.cast(expand_scalar_to_3d(end), potentials.dtype)
        if mask is None:
            # No mask: boundaries are simply the first and last timestep.
            potentials = tf.concat(
                [potentials[:, :1, :] + start, potentials[:, 1:, :]], axis=1
            )
            potentials = tf.concat(
                [potentials[:, :-1, :], potentials[:, -1:, :] + end], axis=1
            )
        else:
            # Masked: add boundary energy only at each sequence's actual
            # first/last unmasked position.
            mask = tf.keras.backend.expand_dims(tf.cast(mask, start.dtype), axis=-1)
            start_mask = tf.cast(self._compute_mask_left_boundary(mask), start.dtype)
            end_mask = tf.cast(self._compute_mask_right_boundary(mask), end.dtype)
            potentials = potentials + start_mask * start
            potentials = potentials + end_mask * end
        return potentials

    def get_viterbi_decoding(self, potentials, sequence_length):
        """Run Viterbi decoding over the potentials; returns (tags, score)."""
        # decode_tags: A [batch_size, max_seq_len] matrix, with dtype `tf.int32`
        decode_tags, best_score = crf_decode(
            potentials, self.chain_kernel, sequence_length
        )
        return decode_tags, best_score

    def get_config(self):
        """Serialize constructor arguments (used for loading model from disk)."""
        config = {
            "units": self.units,
            "chain_initializer": tf.keras.initializers.serialize(
                self.chain_initializer
            ),
            "use_boundary": self.use_boundary,
            "boundary_initializer": tf.keras.initializers.serialize(
                self.boundary_initializer
            ),
            "use_kernel": self.use_kernel,
        }
        base_config = super().get_config()
        return {**base_config, **config}

    def compute_output_shape(self, input_shape):
        # Output drops the feature axis: (batch_size, max_seq_len).
        output_shape = input_shape[:2]
        return output_shape

    def compute_mask(self, input_, mask=None):
        """keep mask shape [batch_size, max_seq_len]"""
        return mask

    @property
    def _compute_dtype(self):
        # Fixed output dtype: the underlying CRF decode returns int32 tags.
        return tf.int32

    def get_loss(self, y_true, y_pred):
        """Negative CRF log-likelihood loss.

        Uses the potentials/sequence lengths cached by the last ``call``;
        ``y_pred`` (the decoded tags) is intentionally unused.
        """
        self.potentials = tf.keras.backend.cast(self.potentials, tf.float32)
        y_true = tf.keras.backend.cast(y_true, tf.int32)
        self.sequence_length = tf.keras.backend.cast(self.sequence_length,
                                                     tf.int32)
        # self.chain_kernel = tf.keras.backend.cast(self.chain_kernel,
        #                                           tf.float32)
        log_likelihood, _ = crf_log_likelihood(
            self.potentials, y_true, self.sequence_length, self.chain_kernel)
        return -log_likelihood

    def get_accuracy(self, y_true, y_pred):
        """Token-level accuracy, ignoring padded positions via the cached mask."""
        judge = tf.keras.backend.cast(
            tf.keras.backend.equal(y_pred, y_true), tf.keras.backend.floatx())
        if self.mask is None:
            return tf.keras.backend.mean(judge)
        else:
            # Average only over unmasked (real) tokens.
            mask = tf.keras.backend.cast(self.mask, tf.keras.backend.floatx())
            return (tf.keras.backend.sum(judge * mask) /
                    tf.keras.backend.sum(mask))
示例中,tensorflow_addons 版本采用 0.13.0
函数 API
对于 Functional API,我们需要单独定义我们的输入,其实就是调用 call 函数的入参。然后,我们需要创建一个输出对象,同时创建所有层,这些层相互关联并与输出相关联。最后,我们创建一个接受输入和输出作为参数的模型对象。代码仍然非常干净,但是我们在 Functional API 中有了更大的灵活性。
如下示例用 Functional API 的形式构建经典的文本分类模型 TextCNN,代码如下:
import tensorflow as tf
def build_model(vocab_size: int, max_len=100, emb_dim=128, class_num=13):
    """Build a TextCNN text classifier with the Keras Functional API.

    Args:
        vocab_size: size of the word vocabulary (embedding table holds vocab_size + 1 rows).
        max_len: fixed input sequence length.
        emb_dim: embedding dimension; also the number of filters per conv branch.
        class_num: number of output classes.

    Returns:
        A compiled ``tf.keras.Model`` with softmax output of shape
        (batch_size, class_num).
    """
    inputs = tf.keras.layers.Input(shape=(max_len,), dtype=tf.int32, name='word_id_input')
    embedding = tf.keras.layers.Embedding(vocab_size + 1, emb_dim, input_length=max_len,
                                          name='embedding')(inputs)
    # One Conv1D + MaxPooling1D branch per n-gram width (classic TextCNN: 2/3/4).
    branches = []
    for kernel_size in (2, 3, 4):
        conv = tf.keras.layers.Conv1D(
            emb_dim, kernel_size, padding='same', strides=1,
            activation=tf.keras.activations.relu,
        )(embedding)
        # NOTE(review): pool_size=1 with padding='same' is a no-op pooling,
        # kept for backward compatibility; GlobalMaxPooling1D is the usual
        # TextCNN choice — confirm before changing.
        branches.append(tf.keras.layers.MaxPooling1D(pool_size=1, padding='same')(conv))
    cnn = tf.keras.layers.concatenate(branches, axis=-1)
    flat = tf.keras.layers.Flatten()(cnn)
    drop = tf.keras.layers.Dropout(0.1)(flat)
    intention_output = tf.keras.layers.Dense(
        class_num, activation=tf.keras.activations.softmax, name='softmax_layer')(drop)
    model = tf.keras.Model(inputs=inputs, outputs=intention_output, name="intention-model")
    model.summary()
    model.compile(
        tf.keras.optimizers.Adam(),
        loss=tf.keras.losses.CategoricalCrossentropy(),
        # Fix: metrics must be a list per the Keras compile() API
        # (matches the Sequential example's metrics=[...] usage).
        metrics=[tf.keras.metrics.CategoricalAccuracy()],
    )
    return model