【实践】广告ctr模型之Deep cross network (dcn)
广告ctr模型可用的深度模型其本质架构都一样(可见https://blog.csdn.net/dengxing1234/article/details/79916532),这也限制了模型的发展路线。Deep cross network在广告ctr模型中的应用也很常见,它聚焦于解决特征工程的问题,paper中提到:【DNN可以自动地学习特征的交互作用,然而,它们隐式地生成所有的特征交互,这对于学习所有类型的交叉特征不一定有效。于是提出了一种能够保持深度神经网络良好收益的深度交叉网络(DCN),除此之外,它还引入了一个新的交叉网络,更有效地学习一定阶数以内的特征相互作用;更有甚者,DCN在每一层显式地应用特征交叉而不需要人工特征工程,相比于DNN模型增加的额外复杂度可以忽略不计】。自己按照paper和高级的tensorflow api,实现了一版dcn,3个源码文件都已共享。如有问题,希望各位同行指出交流。
原版paper:https://arxiv.org/abs/1708.05123
my_core.py
from tensorflow.python.framework import tensor_shape
from tensorflow.python.layers import base
from tensorflow.python.ops import init_ops
from tensorflow.python.ops import standard_ops
from tensorflow.python.framework import ops
class CrossLayer(base.Layer):
    """One cross layer of a Deep & Cross Network (DCN).

    Given the network input ``x_0`` and the output ``x_l`` of the previous
    cross layer, the layer returns ``x_0 * (x_l . w) + b + x_l``. The kernel
    ``w`` and the optional bias ``b`` are variables of shape ``[1, dim]``,
    where ``dim`` is the size of the last dimension of the inputs.

    Args:
      use_bias: Whether to create and add the bias term.
      kernel_initializer: Initializer for the kernel variable.
      bias_initializer: Initializer for the bias variable.
      kernel_regularizer: Regularizer applied to the kernel variable.
      bias_regularizer: Regularizer applied to the bias variable.
      activity_regularizer: Forwarded to the base layer.
      kernel_constraint: Constraint applied to the kernel variable.
      bias_constraint: Constraint applied to the bias variable.
      trainable: Forwarded to the base layer.
      name: Forwarded to the base layer.
      **kwargs: Extra keyword arguments forwarded to the base layer.
    """

    def __init__(self,
                 use_bias=True,
                 kernel_initializer=None,
                 bias_initializer=init_ops.zeros_initializer(),
                 kernel_regularizer=None,
                 bias_regularizer=None,
                 activity_regularizer=None,
                 kernel_constraint=None,
                 bias_constraint=None,
                 trainable=True,
                 name=None,
                 **kwargs):
        super(CrossLayer, self).__init__(
            trainable=trainable,
            name=name,
            activity_regularizer=activity_regularizer,
            **kwargs)
        self.use_bias = use_bias
        self.kernel_initializer = kernel_initializer
        self.bias_initializer = bias_initializer
        self.kernel_regularizer = kernel_regularizer
        self.bias_regularizer = bias_regularizer
        self.kernel_constraint = kernel_constraint
        self.bias_constraint = bias_constraint
        self.input_spec = base.InputSpec(min_ndim=2)

    def build(self, input_shape):
        """Create the kernel (and optional bias) for the given input shape.

        Raises:
          ValueError: If the last dimension of `input_shape` is undefined.
        """
        input_shape = tensor_shape.TensorShape(input_shape)
        if input_shape[-1].value is None:
            raise ValueError('The last dimension of the inputs to `DCN` '
                             'should be defined. Found `None`.')
        self.dim = input_shape[-1].value
        # Pin the last axis so later calls must use the same feature width.
        self.input_spec = base.InputSpec(min_ndim=2, axes={-1: self.dim})
        self.kernel = self.add_variable(
            'kernel',
            shape=[1, self.dim],
            initializer=self.kernel_initializer,
            regularizer=self.kernel_regularizer,
            constraint=self.kernel_constraint,
            dtype=self.dtype,
            trainable=True)
        if self.use_bias:
            self.bias = self.add_variable(
                'bias',
                shape=[1, self.dim],
                initializer=self.bias_initializer,
                regularizer=self.bias_regularizer,
                constraint=self.bias_constraint,
                dtype=self.dtype,
                trainable=True)
        else:
            self.bias = None
        self.built = True

    def call(self, inputs, **kwargs):
        """Apply the cross operation to `inputs`.

        Args:
          inputs: Output of the previous cross layer (``x_l``).
          **kwargs: Must contain ``x_0``, the original network input.

        Returns:
          ``x_0 * (x_l . w) + x_l``, plus the bias when `use_bias` is set.

        Raises:
          KeyError: If ``x_0`` is not supplied as a keyword argument.
        """
        x_0 = kwargs["x_0"]
        x_l = ops.convert_to_tensor(inputs, dtype=self.dtype)
        # x_l . w: with a [1, dim] kernel and transpose_b=True this is one
        # value per row (shape [batch, 1] for rank-2 inputs).
        scalar = standard_ops.matmul(x_l, self.kernel, transpose_b=True)
        crossed = standard_ops.multiply(x_0, scalar)
        # The bias only exists when use_bias=True; adding `None` would raise.
        if self.use_bias:
            crossed = standard_ops.add(crossed, self.bias)
        # Residual connection back to the layer input.
        return standard_ops.add(crossed, x_l)

    def compute_output_shape(self, input_shape):
        """Return the output shape, which equals the (validated) input shape.

        Raises:
          ValueError: If the innermost dimension of `input_shape` is undefined.
        """
        input_shape = tensor_shape.TensorShape(input_shape)
        input_shape = input_shape.with_rank_at_least(2)
        if input_shape[-1].value is None:
            raise ValueError(
                'The innermost dimension of input_shape must be defined, but saw: %s'
                % input_shape)
        return input_shape
def cross_layer(
        inputs,
        x_0,
        use_bias=True,
        kernel_initializer=init_ops.truncated_normal_initializer(),
        bias_initializer=init_ops.zeros_initializer(),
        kernel_regularizer=None,
        bias_regularizer=None,
        activity_regularizer=None,
        kernel_constraint=None,
        bias_constraint=None,
        trainable=True,
        name=None,
        reuse=None):
    """Functional interface for `CrossLayer`.

    Builds a `CrossLayer` from the given options, using the base dtype of
    `inputs`, and applies it to `inputs` with `x_0` passed as a keyword
    argument.

    Args:
      inputs: Output of the previous cross layer (or the network input for
        the first cross layer).
      x_0: The original network input.
      use_bias: Whether the layer adds a bias term.
      kernel_initializer: Initializer for the kernel variable.
      bias_initializer: Initializer for the bias variable.
      kernel_regularizer: Regularizer for the kernel variable.
      bias_regularizer: Regularizer for the bias variable.
      activity_regularizer: Forwarded to the layer.
      kernel_constraint: Constraint for the kernel variable.
      bias_constraint: Constraint for the bias variable.
      trainable: Forwarded to the layer.
      name: Name (or variable scope) of the layer.
      reuse: Forwarded to the layer as `_reuse`.

    Returns:
      The result of applying the layer to `inputs`.
    """
    layer_options = dict(
        use_bias=use_bias,
        kernel_initializer=kernel_initializer,
        bias_initializer=bias_initializer,
        kernel_regularizer=kernel_regularizer,
        bias_regularizer=bias_regularizer,
        activity_regularizer=activity_regularizer,
        kernel_constraint=kernel_constraint,
        bias_constraint=bias_constraint,
        trainable=trainable,
        name=name,
        dtype=inputs.dtype.base_dtype,
        _reuse=reuse,
    )
    return CrossLayer(**layer_options).apply(inputs, x_0=x_0)
dcn.py
# -*- coding:utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import six
from tensorflow.python.estimator import estimator
from tensorflow.python.estimator import model_fn
from tensorflow.python.estimator.canned import head as head_lib
from tensorflow.python.estimator.canned import optimizers
from tensorflow.python.feature_column import feature_column as feature_column_lib
from tensorflow.python.layers import core as core_layers
from tensorflow.python.ops import init_ops
from tensorflow.python.ops import nn
from tensorflow.python.ops import partitioned_variables
from tensorflow.python.ops import variable_scope
from tensorflow.python.summary import summary
from tensorflow.python.training import training_util
import tensorflow as tf
from model_extension import my_core
# Learning rate handed to `optimizers.get_optimizer_instance` in `_dcn_model_fn`.
_LEARNING_RATE = 0.005
def _add_hidden_layer_summary(value, tag):
    """Record summaries for a layer output.

    Writes a scalar summary with the fraction of zero entries in `value` and
    a histogram summary of `value`, both named under `tag`.
    """
    zero_fraction = nn.zero_fraction(value)
    summary.scalar('%s/fraction_of_zero_values' % tag, zero_fraction)
    summary.histogram('%s/activation' % tag, value)
def _dcn_logit_fn_builder(units,
hidden_units,
feature_columns,
activation_fn,
dropout,
num_cross_layers,
input_layer_partitioner):
"""Function builder for a dcn logit_fn"""
if not (isinstance(units, int) or isinstance(units, list)):
raise ValueError('units must be an int or list. Given type: {}'.format(
type(units)))
def dcn_logit_fn(features, mode):
with variable_scope.variable_scope(
'input_from_feature_columns',
values=tuple(six.itervalues(features)),
partitioner=input_layer_partitioner):
x0 = feature_column_lib.input_layer(
features=features, feature_columns=feature_columns)
deep_net = x0
cross_net = x0
for layer_id, num_hidden_units in enumerate(hidden_units):
with variable_scope.variable_scope(
'hiddenlayer_%d' % layer_id, values=(deep_net,)) as hidden_layer_scope:
deep_net = core_layers.dense(
deep_net,
units=num_hidden_units,
activation=activation_fn,
kernel_initializer=init_ops.glorot_uniform_initializer(),
name=hidden_layer_scope)
if dropout is not None and mode == model_fn.ModeKeys.TRAIN:
cross_net = core_layers.dropout(deep_net, rate=dropout, training=True)
_add_hidden_layer_summary(deep_net, hidden_layer_scope.name)
if num_cross_layers == 0:
last_layer = deep_net
else:
for layer_id in range(num_cross_layers):
with variable_scope.variable_scope(
'crosslayer_%d' % layer_id, values=(cross_net,)) as cross_layer_scope:
cross_net = my_core.cross_layer(
cross_net,
x0,
name=cross_layer_scope,
)
_add_hidden_layer_summary(cross_net, cross_layer_scope.name)
last_layer = tf.concat([cross_net,deep_net], 1)
if isinstance(units, int):
with variable_scope.variable_scope(
'logits', values=(last_layer,)) as logits_scope:
logits = core_layers.dense(
last_layer,
units=units,
activation=None,
kernel_initializer=init_ops.glorot_uniform_initializer(),
name=logits_scope)
_add_hidden_layer_summary(logits, logits_scope.name)
else:
logits = []
for head_index, logits_dimension in enumerate(units):
with variable_scope.variable_scope(
'logits_head_{}'.format(head_index), values=(last_layer,)) as logits_scope:
these_logits = core_layers.dense(
last_layer,
units=logits_dimension,
activation=None,
kernel_initializer=init_ops.glorot_uniform_initializer(),
name=logits_scope)
_add_hidden_layer_summary(these_logits, logits_scope.name)
logits.append(these_logits)
return logits
return dcn_logit_fn
def _dcn_model_fn(
        features,
        labels,
        mode,
        head,
        hidden_units,
        feature_columns,
        optimizer='Adagrad',
        activation_fn=nn.relu,
        dropout=None,
        num_cross_layers=None,
        input_layer_partitioner=None,
        config=None):
    """Model function for the Deep & Cross Network estimator.

    Args:
      features: Dict of feature tensors.
      labels: Labels, forwarded to the head.
      mode: Estimator mode key.
      head: Head object; supplies `logits_dimension` and builds the
        estimator spec.
      hidden_units: Sizes of the deep layers.
      feature_columns: Feature columns used by the input layer.
      optimizer: Optimizer name or instance, resolved with
        `optimizers.get_optimizer_instance` and `_LEARNING_RATE`.
      activation_fn: Activation for the deep layers.
      dropout: Dropout rate for the deep layers, or `None`.
      num_cross_layers: Number of cross layers.
      input_layer_partitioner: Partitioner for input-layer variables; a
        min/max partitioner is created when this is falsy.
      config: Run configuration; only `num_ps_replicas` is read here.

    Returns:
      Whatever `head.create_estimator_spec` returns.

    Raises:
      ValueError: If `features` is not a dict.
    """
    if not isinstance(features, dict):
        raise ValueError('features should be a dictionary of `Tensor`s. '
                         'Given type: {}'.format(type(features)))

    optimizer_instance = optimizers.get_optimizer_instance(
        optimizer, learning_rate=_LEARNING_RATE)

    if config:
        ps_replica_count = config.num_ps_replicas
    else:
        ps_replica_count = 0
    model_partitioner = partitioned_variables.min_max_variable_partitioner(
        max_partitions=ps_replica_count)

    feature_tensors = tuple(six.itervalues(features))
    with variable_scope.variable_scope(
            'dcn', values=feature_tensors, partitioner=model_partitioner):
        if not input_layer_partitioner:
            input_layer_partitioner = (
                partitioned_variables.min_max_variable_partitioner(
                    max_partitions=ps_replica_count,
                    min_slice_size=64 << 20))

        build_logits = _dcn_logit_fn_builder(
            units=head.logits_dimension,
            hidden_units=hidden_units,
            feature_columns=feature_columns,
            activation_fn=activation_fn,
            dropout=dropout,
            num_cross_layers=num_cross_layers,
            input_layer_partitioner=input_layer_partitioner)
        logits = build_logits(features=features, mode=mode)

        def _train_op_fn(loss):
            """Returns the op to optimize the loss."""
            step = training_util.get_global_step()
            return optimizer_instance.minimize(loss, global_step=step)

        return head.create_estimator_spec(
            features=features,
            mode=mode,
            labels=labels,
            train_op_fn=_train_op_fn,
            logits=logits)
class DCNClassifier(estimator.Estimator):
    """Classifier estimator backed by `_dcn_model_fn`.

    Args:
      hidden_units: Sizes of the deep layers.
      feature_columns: Feature columns consumed by the model.
      model_dir: Directory for model artifacts, forwarded to `Estimator`.
      n_classes: Number of classes; `2` selects the binary logistic head,
        any other value the multi-class softmax head.
      weight_column: Forwarded to the head.
      label_vocabulary: Forwarded to the head.
      optimizer: Optimizer name or instance.
      activation_fn: Activation for the deep layers.
      dropout: Dropout rate for the deep layers, or `None`.
      num_cross_layers: Number of cross layers.
      input_layer_partitioner: Partitioner for input-layer variables.
      config: Run configuration, forwarded to `Estimator`.
    """

    def __init__(self,
                 hidden_units,
                 feature_columns,
                 model_dir=None,
                 n_classes=2,
                 weight_column=None,
                 label_vocabulary=None,
                 optimizer='Adagrad',
                 activation_fn=nn.relu,
                 dropout=None,
                 num_cross_layers=0,
                 input_layer_partitioner=None,
                 config=None):
        if n_classes != 2:
            head = head_lib._multi_class_head_with_softmax_cross_entropy_loss(  # pylint: disable=protected-access
                n_classes,
                weight_column=weight_column,
                label_vocabulary=label_vocabulary)
        else:
            head = head_lib._binary_logistic_head_with_sigmoid_cross_entropy_loss(  # pylint: disable=protected-access
                weight_column=weight_column,
                label_vocabulary=label_vocabulary)

        # Options that stay the same for every model_fn invocation.
        fixed_options = dict(
            head=head,
            hidden_units=hidden_units,
            optimizer=optimizer,
            activation_fn=activation_fn,
            dropout=dropout,
            num_cross_layers=num_cross_layers,
            input_layer_partitioner=input_layer_partitioner,
        )

        def _model_fn(features, labels, mode, config):
            """Bind the constructor options into `_dcn_model_fn`."""
            return _dcn_model_fn(
                features=features,
                labels=labels,
                mode=mode,
                feature_columns=tuple(feature_columns or []),
                config=config,
                **fixed_options)

        super(DCNClassifier, self).__init__(
            model_fn=_model_fn, model_dir=model_dir, config=config)
deep_cross.py
# -*- coding:utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import shutil
import sys
from model_extension.dcn import DCNClassifier
import tensorflow as tf
_CSV_COLUMNS_I = [("I"+str(x)) for x in range(13)]
_CSV_COLUMNS_C = [("C"+str(x)) for x in range(26)]
_CSV_COLUMNS = _CSV_COLUMNS_I + _CSV_COLUMNS_C
_CSV_COLUMNS.insert(0,"label")
_CSV_COLUMN_DEFAULTS_I=[[0.0]for x in range(13)]
_CSV_COLUMN_DEFAULTS_C=[['']for x in range(26)]
_CSV_COLUMN_DEFAULTS = _CSV_COLUMN_DEFAULTS_I + _CSV_COLUMN_DEFAULTS_C
_CSV_COLUMN_DEFAULTS.insert(0,[0])
parser = argparse.ArgumentParser()
parser.add_argument(
'--model_dir', type=str, default='../tmp/criteo_model/deep_cross',
help='Base directory for the model.')
parser.add_argument(
'--model_type', type=str, default='deep_cross',
help="Valid model types: {'deep_cross'}.")
parser.add_argument(
'--train_epochs', type=int, default=40, help='Number of training epochs.')
parser.add_argument(
'--epochs_per_eval', type=int, default=2,
help='The number of training epochs to run between evaluations.')
parser.add_argument(
'--batch_size', type=int, default=40, help='Number of examples per batch.')
parser.add_argument(
'--train_data', type=str, default='../data/criteo_data/criteo_train.txt',
help='Path to the training data.')
parser.add_argument(
'--test_data', type=str, default='../data/criteo_data/criteo_test.txt',
help='Path to the test data.')
_NUM_EXAMPLES = {
'train': 20000000,
'validation': 10000000,
}
def build_model_columns():
    """Create the feature columns fed to the model.

    Returns:
      A list with one 8-dimensional embedding column per hashed categorical
      feature `C0`..`C25` (1000 hash buckets each), followed by one numeric
      column per feature `I0`..`I12`.
    """
    embedding_columns = []
    for idx in range(26):
        hashed = tf.feature_column.categorical_column_with_hash_bucket(
            'C' + str(idx), hash_bucket_size=1000)
        embedding_columns.append(
            tf.feature_column.embedding_column(hashed, dimension=8))

    numeric_columns = []
    for idx in range(13):
        numeric_columns.append(
            tf.feature_column.numeric_column('I' + str(idx)))

    return embedding_columns + numeric_columns
def build_estimator(model_dir, model_type):
    """Build an estimator appropriate for the given model type.

    Args:
      model_dir: Directory where the estimator stores its artifacts.
      model_type: Model variant to build; only 'deep_cross' is supported.

    Returns:
      A `DCNClassifier` with two cross layers, trained with
      `ProximalAdagradOptimizer`.

    Raises:
      ValueError: If `model_type` is not 'deep_cross'. Previously this case
        silently returned `None`, which only failed later inside training.
    """
    # Validate first so an unsupported type fails before any graph objects
    # are created.
    if model_type != 'deep_cross':
        raise ValueError(
            "Unsupported model_type %r; expected 'deep_cross'." % (model_type,))

    deep_columns = build_model_columns()
    hidden_units = [2]

    # Create a tf.estimator.RunConfig to ensure the model is run on CPU, which
    # trains faster than GPU for this model.
    run_config = tf.estimator.RunConfig().replace(
        session_config=tf.ConfigProto(device_count={'GPU': 0}))

    return DCNClassifier(
        model_dir=model_dir,
        hidden_units=hidden_units,
        feature_columns=deep_columns,
        optimizer=tf.train.ProximalAdagradOptimizer(
            learning_rate=0.001,
            l2_regularization_strength=0.001
        ),
        num_cross_layers=2,
        config=run_config)
def input_fn(data_file, num_epochs, shuffle, batch_size):
    """Generate an input function for the Estimator.

    Args:
      data_file: Path of the CSV file to read; must exist.
      num_epochs: How many times the dataset is repeated.
      shuffle: Whether to shuffle the lines before parsing.
      batch_size: Number of examples per batch.

    Returns:
      A `(features, labels)` pair taken from a one-shot iterator over the
      batched dataset. `features` maps the non-label column names to tensors.
    """
    assert tf.gfile.Exists(data_file), (
        '%s not found. Please make sure you have either run data_download.py or '
        'set both arguments --train_data and --test_data.' % data_file)

    def parse_csv(value):
        """Decode one CSV line into a feature dict and its label."""
        print('Parsing', data_file)
        parsed = tf.decode_csv(value, record_defaults=_CSV_COLUMN_DEFAULTS)
        example = dict(zip(_CSV_COLUMNS, parsed))
        target = example.pop('label')
        return example, target

    # Extract lines from input files using the Dataset API.
    lines = tf.data.TextLineDataset(data_file)
    if shuffle:
        lines = lines.shuffle(buffer_size=_NUM_EXAMPLES['train'])

    # We call repeat after shuffling, rather than before, to prevent separate
    # epochs from blending together.
    batches = (lines
               .map(parse_csv, num_parallel_calls=5)
               .repeat(num_epochs)
               .batch(batch_size))

    batch_features, batch_labels = batches.make_one_shot_iterator().get_next()
    return batch_features, batch_labels
def main(unused_argv):
    """Train and evaluate the model described by the module-level FLAGS.

    Deletes any existing model directory, builds the estimator, and runs
    `tf.estimator.train_and_evaluate` for at most 8000 training steps, with
    2000 evaluation steps per evaluation.
    """
    # Clean up the model directory if present
    shutil.rmtree(FLAGS.model_dir, ignore_errors=True)
    dcn_estimator = build_estimator(FLAGS.model_dir, FLAGS.model_type)

    # NOTE(review): epochs and batch size are hard-coded below; the
    # --train_epochs, --epochs_per_eval and --batch_size flags are not read.
    def train_input_fn():
        return input_fn(FLAGS.train_data, 2, True, 500)

    def eval_input_fn():
        return input_fn(FLAGS.test_data, 1, False, 500)

    train_spec = tf.estimator.TrainSpec(
        input_fn=train_input_fn, max_steps=8000)
    eval_spec = tf.estimator.EvalSpec(
        input_fn=eval_input_fn, steps=2000)
    tf.estimator.train_and_evaluate(dcn_estimator, train_spec, eval_spec)
if __name__ == "__main__":
    # Script entry point: enable INFO logging, parse the known flags into the
    # module-level FLAGS read by `main`, and pass the remaining arguments on
    # to tf.app.run.
    tf.logging.set_verbosity(tf.logging.INFO)
    FLAGS, unparsed = parser.parse_known_args()
    tf.app.run(
        main=main,
        argv=[sys.argv[0]] + unparsed)