Keras Basics Tutorial

3 Ways to Build Neural Networks in Keras


There are three ways to build a model in Keras: the Sequential API, the Functional API, and Model Subclassing.

Characteristics of Each Method

  • Sequential API: the best choice when you are building a simple model with a single input, a single output, and a plain stack of layers (no branching). It is also a good option for beginners who want to learn quickly.
  • Functional API: the most commonly used way to build Keras models. It can do everything the Sequential API can, and in addition it allows multiple inputs, multiple outputs, branching, and layer sharing. It is concise and easy to use, while still offering a good degree of customization flexibility.
  • Model Subclassing: designed for advanced developers who need full control over the model, its layers, and the training process. You create a custom class that defines the model, and you will probably not need it for everyday tasks. However, if you are a researcher with experimental needs, Model Subclassing may be the best choice, since it gives you all the flexibility you need.


The Three Approaches

Sequential API

The Sequential API chains all layers together with the tf.keras.Sequential class, with each layer passed in one at a time. Take an embedding + BiLSTM + CRF model for an NLP sequence-labeling task as an example:

import tensorflow as tf


def build_model(vocab_size: int, max_len=100, emb_dim=128, class_num=14):
    model = tf.keras.Sequential(name='seq_tagging')
    model.add(tf.keras.layers.Input(shape=(max_len,), dtype=tf.int32, name='word_id_input_layer'))
    model.add(tf.keras.layers.Embedding(vocab_size + 1, emb_dim, name='embedding_layer'))
    model.add(tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(emb_dim, return_sequences=True), name='BiLSTM_layer'))
    model.add(tf.keras.layers.Dense(class_num, name='dense_layer'))
    crf = CRF(class_num, name='crf_layer')  # custom CRF layer, defined below
    model.add(crf)
    model.summary()
    model.compile(tf.keras.optimizers.Adam(), loss=crf.get_loss, metrics=[crf.get_accuracy])
    return model

In this example, TensorFlow 2.4.1 is used. CRF is a rewrite of the CRF layer provided by the tensorflow_addons module (mainly, its call method is adjusted to output a single result so that it fits the Sequential API example; in addition, get_loss and get_accuracy methods are added).
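As a side note, tf.keras.Sequential also accepts the full layer list in its constructor instead of repeated add() calls. A minimal sketch of the same stack (the custom CRF layer is omitted here so the snippet stands on its own; the hyperparameter values are the defaults from build_model above):

```python
import tensorflow as tf

# Same Sequential pattern, but passing the layers as a list to the
# constructor instead of calling add() repeatedly.
vocab_size, max_len, emb_dim, class_num = 5000, 100, 128, 14
model = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(max_len,), dtype=tf.int32),
    tf.keras.layers.Embedding(vocab_size + 1, emb_dim),
    tf.keras.layers.Bidirectional(
        tf.keras.layers.LSTM(emb_dim, return_sequences=True)),
    tf.keras.layers.Dense(class_num),
], name='seq_tagging_list_style')
```

Both styles produce the same model; the list form is handy when the layer stack is fixed, while add() is convenient when layers are appended conditionally.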

The CRF code is as follows:

import tensorflow as tf

from tensorflow_addons.text.crf import crf_decode, crf_log_likelihood
from tensorflow_addons.utils import types
from typeguard import typechecked


class CRF(tf.keras.layers.Layer):

    @typechecked
    def __init__(
            self,
            units: int,
            chain_initializer: types.Initializer = "orthogonal",
            use_boundary: bool = True,
            boundary_initializer: types.Initializer = "zeros",
            use_kernel: bool = True,
            **kwargs,
    ):
        super().__init__(**kwargs)

        # Set the mask-support flag used by the base class (Layer).
        # The base class's __init__ sets it to False unconditionally,
        # so this assignment must come after the base __init__ call.
        self.supports_masking = True

        self.units = units  # number of tags

        self.use_boundary = use_boundary
        self.use_kernel = use_kernel
        self.chain_initializer = tf.keras.initializers.get(chain_initializer)
        self.boundary_initializer = tf.keras.initializers.get(boundary_initializer)

        # value remembered for loss/metrics function
        self.potentials = None
        self.sequence_length = None
        self.mask = None

        # weights that act as transition probabilities between tags
        self.chain_kernel = self.add_weight(
            shape=(self.units, self.units),
            name="chain_kernel",
            initializer=self.chain_initializer,
        )

        # weights for the <START>-to-tag and tag-to-<END> probabilities
        if self.use_boundary:
            self.left_boundary = self.add_weight(
                shape=(self.units,),
                name="left_boundary",
                initializer=self.boundary_initializer,
            )
            self.right_boundary = self.add_weight(
                shape=(self.units,),
                name="right_boundary",
                initializer=self.boundary_initializer,
            )

        if self.use_kernel:
            self._dense_layer = tf.keras.layers.Dense(
                units=self.units, dtype=self.dtype
            )
        else:
            self._dense_layer = lambda x: tf.cast(x, dtype=self.dtype)

    def call(self, inputs, mask=None):
        # mask: Tensor(shape=(batch_size, sequence_length), dtype=bool) or None

        if mask is not None:
            if tf.keras.backend.ndim(mask) != 2:
                raise ValueError("Input mask to CRF must have dim 2 if not None")

        if mask is not None:
            # left padding of the mask is not supported by the underlying
            # CRF functions; detect it and report it to the user
            left_boundary_mask = self._compute_mask_left_boundary(mask)
            first_mask = left_boundary_mask[:, 0]
            if first_mask is not None and tf.executing_eagerly():
                no_left_padding = tf.math.reduce_all(first_mask)
                left_padding = not no_left_padding
                if left_padding:
                    raise NotImplementedError(
                        "Currently, the CRF layer does not support left padding"
                    )

        # remembered for the accuracy metric (see get_accuracy); added by yk
        self.mask = mask

        self.potentials = self._dense_layer(inputs)

        # appending boundary probability info
        if self.use_boundary:
            self.potentials = self.add_boundary_energy(
                self.potentials, mask, self.left_boundary, self.right_boundary
            )

        self.sequence_length = self._get_sequence_length(inputs, mask)

        decoded_sequence, _ = self.get_viterbi_decoding(self.potentials, self.sequence_length)

        # return [decoded_sequence, self.potentials, self.sequence_length, self.chain_kernel]
        return decoded_sequence

    def _get_sequence_length(self, input_, mask):
        """The underlying CRF functions (provided by
        tensorflow_addons.text.crf) do not support masking on both
        sides (left padding / right padding); they only support right
        padding, by being told the sequence length.

        This function computes the sequence length from the input and
        the mask.
        """
        if mask is not None:
            sequence_length = self.mask_to_sequence_length(mask)
        else:
            # build an all-ones mask from the input shape, then use it to
            # generate sequence_length
            input_energy_shape = tf.shape(input_)
            raw_input_shape = tf.slice(input_energy_shape, [0], [2])
            alt_mask = tf.ones(raw_input_shape)

            sequence_length = self.mask_to_sequence_length(alt_mask)

        return sequence_length

    def mask_to_sequence_length(self, mask):
        """compute sequence length from mask."""
        sequence_length = tf.reduce_sum(tf.cast(mask, tf.int64), 1)
        return sequence_length

    @staticmethod
    def _compute_mask_right_boundary(mask):
        """input mask: 0011100, output right_boundary: 0000100."""
        # shift mask to left by 1: 0011100 => 0111000
        offset = 1
        left_shifted_mask = tf.concat(
            [mask[:, offset:], tf.zeros_like(mask[:, :offset])], axis=1
        )

        # NOTE: the code below differs from keras_contrib.
        # The original keras_contrib code:
        # end_mask = K.cast(
        #   K.greater(self.shift_left(mask), mask),
        #   K.floatx()
        # )
        # has a bug, confirmed by the original keras_contrib
        # maintainer Luiz Felix (github: lzfelix).
        # 0011100 > 0111000 => 0000100
        right_boundary = tf.math.greater(
            tf.cast(mask, tf.int32), tf.cast(left_shifted_mask, tf.int32)
        )

        return right_boundary

    @staticmethod
    def _compute_mask_left_boundary(mask):
        """input mask: 0011100, output left_boundary: 0010000."""
        # shift mask to right by 1: 0011100 => 0001110
        offset = 1
        right_shifted_mask = tf.concat(
            [tf.zeros_like(mask[:, :offset]), mask[:, :-offset]], axis=1
        )

        # 0011100 > 0001110 => 0010000
        left_boundary = tf.math.greater(
            tf.cast(mask, tf.int32), tf.cast(right_shifted_mask, tf.int32)
        )

        return left_boundary

    def add_boundary_energy(self, potentials, mask, start, end):
        def expand_scalar_to_3d(x):
            # expand tensor from shape (x, ) to (1, 1, x)
            return tf.reshape(x, (1, 1, -1))

        start = tf.cast(expand_scalar_to_3d(start), potentials.dtype)
        end = tf.cast(expand_scalar_to_3d(end), potentials.dtype)
        if mask is None:
            potentials = tf.concat(
                [potentials[:, :1, :] + start, potentials[:, 1:, :]], axis=1
            )
            potentials = tf.concat(
                [potentials[:, :-1, :], potentials[:, -1:, :] + end], axis=1
            )
        else:
            mask = tf.keras.backend.expand_dims(tf.cast(mask, start.dtype), axis=-1)
            start_mask = tf.cast(self._compute_mask_left_boundary(mask), start.dtype)

            end_mask = tf.cast(self._compute_mask_right_boundary(mask), end.dtype)
            potentials = potentials + start_mask * start
            potentials = potentials + end_mask * end
        return potentials

    def get_viterbi_decoding(self, potentials, sequence_length):
        # decode_tags: A [batch_size, max_seq_len] matrix, with dtype `tf.int32`
        decode_tags, best_score = crf_decode(
            potentials, self.chain_kernel, sequence_length
        )

        return decode_tags, best_score

    def get_config(self):
        # used for loading model from disk
        config = {
            "units": self.units,
            "chain_initializer": tf.keras.initializers.serialize(
                self.chain_initializer
            ),
            "use_boundary": self.use_boundary,
            "boundary_initializer": tf.keras.initializers.serialize(
                self.boundary_initializer
            ),
            "use_kernel": self.use_kernel,
        }
        base_config = super().get_config()
        return {**base_config, **config}

    def compute_output_shape(self, input_shape):
        output_shape = input_shape[:2]
        return output_shape

    def compute_mask(self, input_, mask=None):
        """keep mask shape [batch_size, max_seq_len]"""
        return mask

    @property
    def _compute_dtype(self):
        # fixed output dtype of the underlying CRF functions
        return tf.int32

    def get_loss(self, y_true, y_pred):
        self.potentials = tf.keras.backend.cast(self.potentials, tf.float32)
        y_true = tf.keras.backend.cast(y_true, tf.int32)
        self.sequence_length = tf.keras.backend.cast(self.sequence_length,
                                                     tf.int32)
        # self.chain_kernel = tf.keras.backend.cast(self.chain_kernel,
        #                                           tf.float32)

        log_likelihood, _ = crf_log_likelihood(
            self.potentials, y_true, self.sequence_length, self.chain_kernel)

        return -log_likelihood

    def get_accuracy(self, y_true, y_pred):
        judge = tf.keras.backend.cast(
            tf.keras.backend.equal(y_pred, y_true), tf.keras.backend.floatx())
        if self.mask is None:
            return tf.keras.backend.mean(judge)
        else:
            mask = tf.keras.backend.cast(self.mask, tf.keras.backend.floatx())
            return (tf.keras.backend.sum(judge * mask) /
                    tf.keras.backend.sum(mask))

In this example, tensorflow_addons version 0.13.0 is used.
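To make the two boundary-mask helpers in the CRF layer easier to follow, here is a plain-Python sketch of what they compute for a single mask row (illustrative only; the real layer does the same thing with shifted tensors across the whole batch):

```python
def right_boundary(mask_row):
    # Mirror of CRF._compute_mask_right_boundary for one row:
    # shift the mask left by one, then mark positions where the
    # original is 1 but the shifted copy is 0 (the last 1 in each run).
    shifted = mask_row[1:] + [0]          # 0011100 -> 0111000
    return [int(m > s) for m, s in zip(mask_row, shifted)]

def left_boundary(mask_row):
    # Mirror of CRF._compute_mask_left_boundary for one row:
    # shift the mask right by one instead (the first 1 in each run).
    shifted = [0] + mask_row[:-1]         # 0011100 -> 0001110
    return [int(m > s) for m, s in zip(mask_row, shifted)]

mask = [0, 0, 1, 1, 1, 0, 0]
print(right_boundary(mask))  # [0, 0, 0, 0, 1, 0, 0]
print(left_boundary(mask))   # [0, 0, 1, 0, 0, 0, 0]
```

The left boundary is what call() uses to check for unsupported left padding: if the very first position of any row is not inside the mask, the sequence was left-padded.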

Functional API

With the Functional API, we define our inputs separately; these are effectively the arguments passed when each layer is called. We then create all the layers, wiring them to one another, and build an output object from the final layer. Lastly, we create a Model object that takes the inputs and outputs as arguments. The code is still very clean, but the Functional API gives us much more flexibility.

The following example builds the classic text-classification model TextCNN in the Functional API style:

import tensorflow as tf


def build_model(vocab_size: int, max_len=100, emb_dim=128, class_num=13):
    inputs = tf.keras.layers.Input(shape=(max_len,), dtype=tf.int32, name='word_id_input')
    embedding = tf.keras.layers.Embedding(vocab_size + 1, emb_dim, input_length=max_len, name='embedding')(inputs)
    cnn1 = tf.keras.layers.Conv1D(emb_dim, 2, padding='same', strides=1, activation=tf.keras.activations.relu)(
        embedding)
    cnn1 = tf.keras.layers.MaxPooling1D(pool_size=1, padding='same')(cnn1)
    cnn2 = tf.keras.layers.Conv1D(emb_dim, 3, padding='same', strides=1, activation=tf.keras.activations.relu)(
        embedding)
    cnn2 = tf.keras.layers.MaxPooling1D(pool_size=1, padding='same')(cnn2)
    cnn3 = tf.keras.layers.Conv1D(emb_dim, 4, padding='same', strides=1, activation=tf.keras.activations.relu)(
        embedding)
    cnn3 = tf.keras.layers.MaxPooling1D(pool_size=1, padding='same')(cnn3)
    cnn = tf.keras.layers.concatenate([cnn1, cnn2, cnn3], axis=-1)
    flat = tf.keras.layers.Flatten()(cnn)
    drop = tf.keras.layers.Dropout(0.1)(flat)
    intention_output = tf.keras.layers.Dense(class_num, activation=tf.keras.activations.softmax, name='softmax_layer')(
        drop)

    model = tf.keras.Model(inputs=inputs, outputs=intention_output, name="intention-model")

    model.summary()
    model.compile(tf.keras.optimizers.Adam(),
                  loss=tf.keras.losses.CategoricalCrossentropy(),
                  metrics=[tf.keras.metrics.CategoricalAccuracy()]
                  )
    return model
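The bullet list at the top also mentions the third approach, Model Subclassing, for which no example is given above. As a minimal sketch (the TextClassifier name and the simple embedding + global-max-pooling architecture are illustrative choices, not from the original models):

```python
import tensorflow as tf


class TextClassifier(tf.keras.Model):
    """Minimal subclassed model: embedding -> pooling -> softmax."""

    def __init__(self, vocab_size: int, emb_dim=128, class_num=13, **kwargs):
        super().__init__(**kwargs)
        # Layers are created in __init__ ...
        self.embedding = tf.keras.layers.Embedding(vocab_size + 1, emb_dim)
        self.pool = tf.keras.layers.GlobalMaxPooling1D()
        self.classifier = tf.keras.layers.Dense(
            class_num, activation=tf.keras.activations.softmax)

    def call(self, inputs, training=False):
        # ... and the forward pass is written by hand in call(), which
        # is where the extra flexibility of subclassing comes from.
        x = self.embedding(inputs)
        x = self.pool(x)
        return self.classifier(x)


model = TextClassifier(vocab_size=5000)
model.compile(tf.keras.optimizers.Adam(),
              loss=tf.keras.losses.CategoricalCrossentropy(),
              metrics=[tf.keras.metrics.CategoricalAccuracy()])
```

Because the forward pass is ordinary Python code, you can add conditionals, loops, or custom training logic that would be awkward to express in the Sequential or Functional styles.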