I. Building Models

  • Use Sequential to build a model layer by layer.
  • Use the functional API to build models of arbitrary structure.
  • Subclass the Model base class to build fully customized models.
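
All of the examples below share the same imports and data-preparation constants. Here is a minimal setup sketch: the concrete values of MAX_WORDS, MAX_LEN, and CAT_NUM are assumptions (in the full notebook they come from the tokenization step), and ds_train / ds_test are the tf.data datasets built there.

import tensorflow as tf
from tensorflow.keras import models, layers, optimizers, losses, metrics

# Assumed values; in the full notebook these are produced by the data-preparation step.
MAX_WORDS = 10000   # vocabulary size (assumption)
MAX_LEN = 200       # padded sequence length (assumption)
CAT_NUM = 46        # class count for the multi-class examples in Part II (assumption)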

1. Creating a model layer by layer with Sequential (convenient, and the network structure is clearly laid out)

tf.keras.backend.clear_session()

model = models.Sequential()

model.add(layers.Embedding(MAX_WORDS, 7, input_length=MAX_LEN))
model.add(layers.Conv1D(filters=64, kernel_size=5, activation="relu"))
model.add(layers.MaxPool1D(2))
model.add(layers.Conv1D(filters=32, kernel_size=3, activation="relu"))
model.add(layers.MaxPool1D(2))
model.add(layers.Flatten())
model.add(layers.Dense(1, activation="sigmoid"))

model.compile(optimizer="Nadam",
              loss="binary_crossentropy",
              metrics=["accuracy", "AUC"])

model.summary()
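
Besides model.summary(), the architecture can also be rendered as a diagram with tf.keras.utils.plot_model (requires the pydot and graphviz packages; the output file name below is just an example):

from tensorflow.keras import utils

utils.plot_model(model, show_shapes=True, to_file="sequential_model.png")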

2. Creating models of arbitrary structure with the functional API

tf.keras.backend.clear_session()

inputs = layers.Input(shape=[MAX_LEN])
x = layers.Embedding(MAX_WORDS, 7)(inputs)

branch1 = layers.SeparableConv1D(64, 3, activation="relu")(x)
branch1 = layers.MaxPool1D(3)(branch1)
branch1 = layers.SeparableConv1D(32, 3, activation="relu")(branch1)
branch1 = layers.GlobalMaxPool1D()(branch1)

branch2 = layers.SeparableConv1D(64, 5, activation="relu")(x)
branch2 = layers.MaxPool1D(5)(branch2)
branch2 = layers.SeparableConv1D(32, 5, activation="relu")(branch2)
branch2 = layers.GlobalMaxPool1D()(branch2)

branch3 = layers.SeparableConv1D(64, 7, activation="relu")(x)
branch3 = layers.MaxPool1D(7)(branch3)
branch3 = layers.SeparableConv1D(32, 7, activation="relu")(branch3)
branch3 = layers.GlobalMaxPool1D()(branch3)

# Merge the three branches; kernel sizes 3/5/7 capture n-gram features at different scales
concat = layers.Concatenate()([branch1, branch2, branch3])
outputs = layers.Dense(1, activation="sigmoid")(concat)

model = models.Model(inputs=inputs, outputs=outputs)

model.compile(optimizer="Nadam",
              loss="binary_crossentropy",
              metrics=["accuracy", "AUC"])

model.summary()
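
Since this model is assembled from built-in layers via the functional API, it serializes out of the box, with no get_config work needed (contrast this with the custom Layer in the next section). A minimal sketch, with an illustrative file name:

model.save("imdb_textcnn.h5")
model_loaded = models.load_model("imdb_textcnn.h5")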

3. Creating a custom model by subclassing Model

# First, define a residual block as a custom Layer.

class ResBlock(layers.Layer):
    def __init__(self, kernel_size, **kwargs):
        super(ResBlock, self).__init__(**kwargs)
        self.kernel_size = kernel_size

    def build(self, input_shape):
        self.conv1 = layers.Conv1D(filters=64, kernel_size=self.kernel_size,
                                   activation="relu", padding="same")
        self.conv2 = layers.Conv1D(filters=32, kernel_size=self.kernel_size,
                                   activation="relu", padding="same")
        # conv3 restores the channel count to input_shape[-1] so the residual Add works
        self.conv3 = layers.Conv1D(filters=input_shape[-1],
                                   kernel_size=self.kernel_size,
                                   activation="relu", padding="same")
        self.maxpool = layers.MaxPool1D(2)
        super(ResBlock, self).build(input_shape)  # equivalent to setting self.built = True

    def call(self, inputs):
        x = self.conv1(inputs)
        x = self.conv2(x)
        x = self.conv3(x)
        x = layers.Add()([inputs, x])  # residual connection
        x = self.maxpool(x)
        return x

    # To make a custom Layer serializable when composed into a model through the
    # functional API, the get_config method must be overridden.
    def get_config(self):
        config = super(ResBlock, self).get_config()
        config.update({'kernel_size': self.kernel_size})
        return config

# Test ResBlock
resblock = ResBlock(kernel_size=3)
resblock.build(input_shape=(None, 200, 7))
resblock.compute_output_shape(input_shape=(None, 200, 7))
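
Because every Conv1D in the block uses padding="same" and conv3 restores the channel count to input_shape[-1] (which is what makes the residual Add shape-compatible), only the final MaxPool1D(2) changes the shape: the compute_output_shape call above returns TensorShape([None, 100, 7]), halving the sequence length.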
# A custom model; in practice, Sequential or the functional API could also be used here.

class ImdbModel(models.Model):
    def __init__(self):
        super(ImdbModel, self).__init__()

    def build(self, input_shape):
        self.embedding = layers.Embedding(MAX_WORDS, 7)
        self.block1 = ResBlock(7)
        self.block2 = ResBlock(5)
        self.dense = layers.Dense(1, activation="sigmoid")
        super(ImdbModel, self).build(input_shape)

    def call(self, x):
        x = self.embedding(x)
        x = self.block1(x)
        x = self.block2(x)
        x = layers.Flatten()(x)  # Flatten is stateless, so creating it inside call is fine
        x = self.dense(x)
        return x
# Test the model
tf.keras.backend.clear_session()

model = ImdbModel()
model.build(input_shape=(None, 200))
model.summary()

model.compile(optimizer="Nadam",
              loss="binary_crossentropy",
              metrics=["accuracy", "AUC"])
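
A subclassed model such as ImdbModel cannot be exported to the single-file Keras H5 format, but it can be saved in the TensorFlow SavedModel format. A minimal sketch, with an illustrative path:

model.save("imdb_model_savedmodel", save_format="tf")
model_loaded = models.load_model("imdb_model_savedmodel")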

II. Training Models

There are three main ways to train a model: the built-in fit method, the built-in train_on_batch method, and a custom training loop.

1. The built-in fit method

tf.keras.backend.clear_session()

def create_model():
    model = models.Sequential()
    model.add(layers.Embedding(MAX_WORDS, 7, input_length=MAX_LEN))
    model.add(layers.Conv1D(filters=64, kernel_size=5, activation="relu"))
    model.add(layers.MaxPool1D(2))
    model.add(layers.Conv1D(filters=32, kernel_size=3, activation="relu"))
    model.add(layers.MaxPool1D(2))
    model.add(layers.Flatten())
    model.add(layers.Dense(CAT_NUM, activation="softmax"))
    return model

def compile_model(model):
    model.compile(optimizer=optimizers.Nadam(),
                  loss=losses.SparseCategoricalCrossentropy(),
                  metrics=[metrics.SparseCategoricalAccuracy(),
                           metrics.SparseTopKCategoricalAccuracy(5)])
    return model

model = create_model()
model.summary()
model = compile_model(model)

# Train the model
history = model.fit(ds_train, validation_data=ds_test, epochs=10)
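
The History object returned by fit keeps the per-epoch metric values in the history.history dict, which is handy to tabulate or plot. A small sketch, assuming pandas is available:

import pandas as pd

dfhistory = pd.DataFrame(history.history)
dfhistory.index = range(1, len(dfhistory) + 1)
dfhistory.index.name = 'epoch'
print(dfhistory)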

2. The built-in train_on_batch method

def train_model(model, ds_train, ds_valid, epochs):

    for epoch in tf.range(1, epochs + 1):
        model.reset_metrics()

        # Halve the learning rate in later epochs
        if epoch == 5:
            model.optimizer.lr.assign(model.optimizer.lr / 2.0)
            tf.print("Lowering optimizer Learning Rate...\n\n")

        for x, y in ds_train:
            train_result = model.train_on_batch(x, y)

        # reset_metrics=False lets metric states accumulate over the whole validation set
        for x, y in ds_valid:
            valid_result = model.test_on_batch(x, y, reset_metrics=False)

        if epoch % 1 == 0:  # log every epoch; raise the modulus for sparser logging
            printbar()  # printbar() is assumed to be a timestamp-printing helper defined elsewhere
            tf.print("epoch = ", epoch)
            print("train:", dict(zip(model.metrics_names, train_result)))
            print("valid:", dict(zip(model.metrics_names, valid_result)))
            print("")

# Train the model
train_model(model, ds_train, ds_test, 10)
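
Note the asymmetry in the loop above: train_on_batch uses its default reset_metrics=True, so train_result reflects only the last training batch of the epoch, while test_on_batch is called with reset_metrics=False so that the validation metrics accumulate over the whole validation set until model.reset_metrics() clears them at the top of the next epoch.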

3. Custom training loop

A custom training loop skips compiling the model entirely: you use the optimizer to backpropagate the loss and update the parameters yourself, which offers the greatest flexibility.

optimizer = optimizers.Nadam()
loss_func = losses.SparseCategoricalCrossentropy()

train_loss = metrics.Mean(name='train_loss')
train_metric = metrics.SparseCategoricalAccuracy(name='train_accuracy')

valid_loss = metrics.Mean(name='valid_loss')
valid_metric = metrics.SparseCategoricalAccuracy(name='valid_accuracy')

@tf.function
def train_step(model, features, labels):
    with tf.GradientTape() as tape:
        predictions = model(features, training=True)
        loss = loss_func(labels, predictions)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    train_loss.update_state(loss)
    train_metric.update_state(labels, predictions)


@tf.function
def valid_step(model, features, labels):
    predictions = model(features)
    batch_loss = loss_func(labels, predictions)
    valid_loss.update_state(batch_loss)
    valid_metric.update_state(labels, predictions)


def train_model(model, ds_train, ds_valid, epochs):
    for epoch in tf.range(1, epochs + 1):

        for features, labels in ds_train:
            train_step(model, features, labels)

        for features, labels in ds_valid:
            valid_step(model, features, labels)

        logs = 'Epoch={},Loss:{},Accuracy:{},Valid Loss:{},Valid Accuracy:{}'

        if epoch % 1 == 0:  # log every epoch
            printbar()
            tf.print(tf.strings.format(logs,
                     (epoch, train_loss.result(), train_metric.result(),
                      valid_loss.result(), valid_metric.result())))
            tf.print("")

        train_loss.reset_states()
        valid_loss.reset_states()
        train_metric.reset_states()
        valid_metric.reset_states()

# Train the model
train_model(model, ds_train, ds_test, 10)
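
Two details are worth calling out: the @tf.function decorator traces train_step and valid_step into static graphs for speed, and tf.print is used instead of Python's print because it executes inside the graph on every step, whereas a plain print would fire only once, at trace time.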