Notes on "Wide and Deep Learning for Recommender Systems"

Last June Google posted "Wide & Deep Learning for Recommender Systems" on arXiv. The application scenario is app-install prediction on Google Play, where the online improvement was significant compared with wide-only and deep-only models. At the same time, Google open-sourced the model framework and integrated it into TF Learn (tf.contrib.learn), TensorFlow's high-level wrapper, and the Google developer conference early this year had a section dedicated to the wide and deep model. The intent is clear enough: promote TensorFlow on the one hand, and show off the strength of the model on the other.

After reading through the paper, the basic idea of the model turns out to be quite simple: wide model + deep model. Taken separately, the wide model is a logistic regression (LR); beyond the raw features it also uses cross features built on the sparse representation of the categorical features, e.g. one-hot encode the categorical features and then take cross products. My understanding is that this, on the one hand, strengthens the feature interactions (co-occurrence) among categorical features in a way reminiscent of the FM model, and on the other hand exploits LR's ability to learn from high-dimensional sparse features; the authors call this capability of the wide model "memorization". The deep model is a DNN; on top of the raw features it adds embeddings of the categorical features. The embedding is a separate layer in the model, and the embedded vectors are themselves learned iteratively during training. Mapping high-dimensional sparse categorical features into low-dimensional embeddings is what helps the model with "generalization". I have not found a really satisfying Chinese rendering for "memorization" and "generalization"; if I had to translate them I would say something like inference and deduction: one trains a shallow model through feature interaction relationships, the other trains a deep model through the information the features carry in the embedding space.
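For reference, the same two ideas can be expressed with the canned estimator that ships in tf.contrib.learn; this is only a minimal sketch, not the code used later in this post, and the column names gender, city and age are placeholders:

```python
import tensorflow as tf
from tensorflow.contrib import layers, learn

# Wide side: sparse one-hot columns plus a cross-product column ("memorization").
gender = layers.sparse_column_with_hash_bucket("gender", hash_bucket_size=10)
city = layers.sparse_column_with_hash_bucket("city", hash_bucket_size=1000)
wide_columns = [gender, city,
                layers.crossed_column([gender, city], hash_bucket_size=10000)]

# Deep side: continuous features plus low-dimensional embeddings of the
# categorical columns ("generalization").
age = layers.real_valued_column("age")
deep_columns = [age,
                layers.embedding_column(gender, dimension=8),
                layers.embedding_column(city, dimension=8)]

# Jointly trained wide & deep model.
model = learn.DNNLinearCombinedClassifier(
    model_dir="/tmp/wnd_demo",
    linear_feature_columns=wide_columns,
    dnn_feature_columns=deep_columns,
    dnn_hidden_units=[100, 50])
```

The crossed_column feeds the linear (wide) side, the embedding_columns feed the DNN (deep) side, and the estimator trains both sides jointly.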

The model structure is joint rather than a traditional ensemble. In an ensemble, the two models would be trained separately against the label and their outputs weighted together afterwards; in the joint setup, the outputs of the two models are added and then trained jointly against the label. The benefit is that the parameters of both models are optimized simultaneously during training, and the two models complement each other. The formula below explains the structure well: the outputs of the two models are summed before the sigmoid, and the sigmoid then produces the classification. Here $x$ denotes the raw features, $\phi(x)$ the cross-product features of the wide model (the deep model's input additionally includes the embedding features), and $w_{deep}$ loosely denotes the collection of weights and biases across the DNN layers.
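In the paper's notation, the joint prediction for a binary label is

$$P(Y=1 \mid \mathbf{x}) = \sigma\left(\mathbf{w}_{wide}^{T}\,[\mathbf{x}, \phi(\mathbf{x})] + \mathbf{w}_{deep}^{T}\, a^{(l_f)} + b\right)$$

where $a^{(l_f)}$ is the final hidden-layer activation of the deep network, $b$ is the bias term, and $\sigma(\cdot)$ is the sigmoid.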

In the end the model shows little improvement in offline evaluation, but the online lift is reasonably significant. A few days ago I heard in the elevator that colleagues on the ads team are also working on this model; their offline result improved by 10%+, but online serving is currently a technical headache for them (and I do not know which metric exactly improved by 10%+). In our own team's offline application to a customer-acquisition prediction problem, on the same data it only matched xgboost. Also, the original code from the official tutorial does not handle large data volumes well, so I rewrote it so that data is read through a queue, partly based on feedback found on Stack Overflow. The core is the wide_and_deep function. The cross product is implemented by concatenating the strings of the categorical features during preprocessing and then one-hot encoding the result; compared with the crossed_column implementation in contrib this is a bit clumsy, but it is what I can manage for now and I will explore further later. Finally, since our team currently only deals with offline models I have not paid much attention to online model updates, but at the framework level it should be doable with TF Serving.
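To make the string-concatenation cross product concrete, here is a minimal sketch of what the preprocessing does (the column names gender and city are made up for illustration):

```python
import pandas as pd

# Toy frame with two made-up categorical columns.
df = pd.DataFrame({'gender': ['m', 'f', 'm'],
                   'city':   ['bj', 'sh', 'bj']})

# Cross product by string concatenation: each (gender, city) pair becomes
# a single new categorical value, e.g. 'm:bj'.
df['gender:city'] = df['gender'].astype(str) + ':' + df['city'].astype(str)

# One-hot encode the crossed column: one indicator column per observed pair.
print(pd.get_dummies(df['gender:city'], prefix='gender:city'))
```

The full rewritten script is below.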

import pandas as pd
import tensorflow as tf
import tensorflow.contrib.learn as tf_learn
import tensorflow.contrib.layers as tf_layers
from sklearn.preprocessing import LabelEncoder
import os
import numpy as np
from sklearn.metrics import roc_auc_score
from sklearn.preprocessing import StandardScaler
from feat import CONTINUOUS_COLUMNS
from feat import CATEGORICAL_COLUMNS
from feat import COLUMNS
def add_columns(x):
    # Name of a crossed column, e.g. ['gender', 'city'] -> 'gender:city'.
    return ':'.join(x)

# Define the column names for the data sets.
LABEL_COLUMN = 'target'

# Second-order crosses: every pair of raw categorical columns.
CROSSED_COLUMNS = []
for i in range(len(CATEGORICAL_COLUMNS) - 1):
    for j in range(i + 1, len(CATEGORICAL_COLUMNS)):
        CROSSED_COLUMNS.append([CATEGORICAL_COLUMNS[i], CATEGORICAL_COLUMNS[j]])

# The DNN only embeds the raw categorical columns; the LR additionally
# gets one-hot encodings of the crossed columns.
CATEGORICAL_COLUMNS_DNN = CATEGORICAL_COLUMNS[:]
CATEGORICAL_COLUMNS += list(map(add_columns, CROSSED_COLUMNS))
CATEGORICAL_ID_COLUMNS = [col + '_ids' for col in CATEGORICAL_COLUMNS]

HIDDEN_UNITS = [512, 512, 512]
CATEGORICAL_EMBED_SIZE = 10
LABEL_ENCODERS = {}
def pandas_input_fn(X, y=None, batch_size=1024, num_epochs=None):
    """Queue-based input_fn that feeds a pandas DataFrame to the Estimator."""
    def input_fn():
        if y is not None:
            X['target'] = y
        queue = tf_learn.dataframe.queues.feeding_functions.enqueue_data(
            X, 1000, shuffle=num_epochs is None, num_epochs=num_epochs)
        if num_epochs is None:
            features = queue.dequeue_many(batch_size)
        else:
            features = queue.dequeue_up_to(batch_size)
        # enqueue_data prepends the DataFrame index as the first tensor.
        features = dict(zip(['index'] + list(X.columns), features))
        if y is not None:
            target = features.pop('target')
            return features, target
        return features
    return input_fn
def encode_categorical_cross(df):
    """Build crossed columns by string concatenation and label-encode everything."""
    global LABEL_ENCODERS
    for col in CATEGORICAL_COLUMNS:
        if ':' in col:
            # Crossed column: concatenate the values of the two raw columns.
            left, right = col.split(':')
            df[col] = (df[left].fillna(-1).astype(str) + ':' +
                       df[right].fillna(-1).astype(str))
        else:
            df[col] = df[col].fillna(-1).astype(str)
        # Map the (possibly crossed) string values to integer ids.
        encoder = LabelEncoder().fit(df[col])
        df[col + '_ids'] = encoder.transform(df[col])
        LABEL_ENCODERS[col] = encoder
    for col in CATEGORICAL_COLUMNS:
        df.pop(col)
    return df, LABEL_ENCODERS

def process_input_df(df):
    df, label_encoders = encode_categorical_cross(df)
    y = df.pop(LABEL_COLUMN)
    X = df[CATEGORICAL_ID_COLUMNS + CONTINUOUS_COLUMNS].fillna(0)
    return X, y
def wide_and_deep(features, target, hidden_units=HIDDEN_UNITS):
    global LABEL_ENCODERS
    target = tf.one_hot(target, 2, 1.0, 0.0)

    # Deep part (DNN): continuous features plus learned embeddings of the
    # raw categorical features.
    final_features_nn = [tf.expand_dims(tf.cast(features[col], tf.float32), 1)
                         for col in CONTINUOUS_COLUMNS]
    # Embed categorical variables into a distributed representation.
    for col in CATEGORICAL_COLUMNS_DNN:
        feature_tmp = tf_learn.ops.categorical_variable(
            features[col + '_ids'],
            len(LABEL_ENCODERS[col].classes_),
            embedding_size=CATEGORICAL_EMBED_SIZE,
            name=col)
        final_features_nn.append(feature_tmp)
    # Concatenate all features into one vector and run the DNN stack.
    features_nn = tf.concat(1, final_features_nn)
    logits_nn = tf_layers.stack(features_nn,
                                tf_layers.fully_connected,
                                stack_args=hidden_units,
                                activation_fn=tf.nn.relu)

    # Wide part (LR): continuous features plus one-hot encodings of the raw
    # and crossed categorical features, fed to a single linear layer.
    final_features_lr = [tf.expand_dims(tf.cast(features[col], tf.float32), 1)
                         for col in CONTINUOUS_COLUMNS]
    for col in CATEGORICAL_COLUMNS:
        final_features_lr.append(tf.one_hot(features[col + '_ids'],
                                            len(LABEL_ENCODERS[col].classes_),
                                            on_value=1.0,
                                            off_value=0.0))
    logits_lr = tf_layers.stack(tf.concat(1, final_features_lr),
                                tf_layers.fully_connected,
                                stack_args=[1],
                                activation_fn=None)

    # Joint training: sum the two branches' outputs before the final
    # classification layer.
    logits = logits_lr + logits_nn
    prediction, loss = tf_learn.models.logistic_regression(logits, target)
    train_op = tf_layers.optimize_loss(loss,
                                       tf.contrib.framework.get_global_step(),
                                       optimizer='Adam',
                                       learning_rate=0.001)
    return prediction[:, 1], loss, train_op
def train(X, y, steps=100):
    print("model dir: ", model_dir)
    classifier = tf_learn.Estimator(model_fn=wide_and_deep, model_dir=model_dir)
    classifier.fit(input_fn=pandas_input_fn(X, y), steps=steps)
    return classifier

def predict(classifier, X):
    return list(classifier.predict(input_fn=pandas_input_fn(X, num_epochs=1),
                                   as_iterable=True))
if __name__ == '__main__':
    model_dir = "./wnd"
    os.system("rm -rf ./wnd")
    trainFile = 'train.csv'
    testFile = 'test.csv'
    data_train = pd.read_csv(trainFile, names=COLUMNS)  # LOAD DATA
    data_test = pd.read_csv(testFile, names=COLUMNS)  # LOAD DATA
    train_size = data_train.shape[0]
    # NOTE: the label encoders are refit on each DataFrame, so the ids are
    # only consistent if train and test contain the same category values.
    X_train, y_train = process_input_df(data_train)
    X_test, y_test = process_input_df(data_test)
    # Scale the continuous features with a single scaler fitted on train + test.
    data_continuous = pd.concat([X_train[CONTINUOUS_COLUMNS], X_test[CONTINUOUS_COLUMNS]])
    scaler = StandardScaler()
    data_continuous_scale = scaler.fit_transform(data_continuous)
    X_train[CONTINUOUS_COLUMNS] = data_continuous_scale[:train_size]
    X_test[CONTINUOUS_COLUMNS] = data_continuous_scale[train_size:]
    classifier = train(X_train, y_train, steps=5000)
    pred = predict(classifier, X_test)
    print("auc", roc_auc_score(y_test, np.array(pred)))