算法大赛-天池大赛-阿里云的赛制

赛题理解

数据

该数据来自某新闻APP平台的用户交互数据,包括30万用户,近300万次点击,共36万多篇不同的新闻文章,同时每篇新闻文章有对应的embedding向量表示。为了保证比赛的公平性,从中抽取20万用户的点击日志数据作为训练集,5万用户的点击日志数据作为测试集A,5万用户的点击日志数据作为测试集B

数据表

train_click_log.csv:训练集用户点击日志

testA_click_log.csv:测试集用户点击日志

articles.csv:新闻文章信息数据表

articles_emb.csv:新闻文章embedding向量表示

sample_submit.csv:提交样例文件

字段表

Field Description
user_id 用户id
click_article_id 点击文章id
click_timestamp 点击时间戳
click_environment 点击环境
click_deviceGroup 点击设备组
click_os 点击操作系统
click_country 点击城市
click_region 点击地区
click_referrer_type 点击来源类型
article_id 文章id,与click_article_id相对应
category_id 文章类型id
created_at_ts 文章创建时间戳
words_count 文章字数
emb_1,emb_2,…,emb_249 文章embedding向量表示

评价方式

根据sample_submit.csv可知最后需要针对每个用户,给出五篇文章的推荐结果,按照点击概率从前往后排序

真实的每个用户最后一次点击的文章只会有一篇的真实答案,所以就看推荐的这五篇里面是否有命中真实答案的

提交结果:user1, article1, article2, article3, article4, article5

评价指标为MRR(Mean Reciprocal Rank),公式如下:
$$
score(\text{user}) = \sum_{k=1}^5 \frac{s(\text{user}, k)}{k}
$$
如果article1是用户点击文章,也就是article1命中,则$s(\text{user1},1)=1, s(\text{user1},2-4)$都是0

如果article2是用户点击的文章,则$s(\text{user},2)=1/2,s(\text{user},1,3,4,5)$都是0

如果都没命中,则$score(\text{user1})=0$

所以目标就是希望命中的结果尽量靠前,分数比较高

问题分析

给到的数据相当于是用户日志,不是那种特征+标签的数据

需要把问题变为一个监督学习的问题(特征+标签)

由于需要预测用户最后一次点击的新闻文章,简单来看是多分类问题,但是文章数有36万,过于庞大,需要将问题转化一下

转化为预测出某个用户最后一次对于某一篇文章会进行点击的概率,变为软分类,分类的标签就是用户是否会点击某篇文章

但接下来的问题是

  • 如何转成监督学习问题?
  • 训练集和测试集怎么制作?
  • 能利用哪些特征?
  • 可以尝试哪些模型?

数据分析

数据分析的价值主要在于熟悉了解整个数据集的基本情况包括每个文件里有哪些数据,具体的文件中的每个字段表示什么实际含义,以及数据集中特征之间的相关性,在推荐场景下主要就是分析用户本身的基本属性,文章基本属性,以及用户和文章交互的一些分布,这些都有利于后面的召回策略的选择,以及特征工程

导包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
%matplotlib inline
import gc
import os
import re
import sys
import warnings
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

plt.rc('font', size=13)
warnings.filterwarnings("ignore")

data_path = Path("tcdata")

读取数据

1
2
3
4
5
6
7
8
# 训练集
trn_click = pd.read_csv(data_path / 'train_click_log.csv')
item_df = pd.read_csv(data_path / 'articles.csv')
item_df = item_df.rename(columns={'article_id': 'click_article_id'}) #重命名,方便后续match
item_emb_df = pd.read_csv(data_path / 'articles_emb.csv')

# 测试集
tst_click = pd.read_csv(data_path / 'testA_click_log.csv')

数据预处理

计算用户点击rank

1
2
3
4
5
# 对每个用户的点击时间戳进行排序
# 降序,最新的时间在前
trn_click['rank'] = trn_click.groupby(['user_id'])['click_timestamp'].rank(
ascending=False).astype(int)
tst_click['rank'] = tst_click.groupby(['user_id'])['click_timestamp'].rank(ascending=False).astype(int)

为了统计点击次数,需要选择一列不会为NaN的,时间戳比较合适

1
2
3
# 计算用户点击文章的次数,并添加新的一列
trn_click['click_cnts'] = trn_click.groupby(['user_id'])['click_timestamp'].transform('count')
tst_click['click_cnts'] = tst_click.groupby(['user_id'])['click_timestamp'].transform('count')

数据浏览

训练集用户点击日志

item_df中匹配click_article_id的信息合并

1
2
trn_click = trn_click.merge(item_df, how='left', on=['click_article_id'])
# left: 左连接,trn_click 的每一行一定保留

查看合并后的情况

1
trn_click.info()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1112623 entries, 0 to 1112622
Data columns (total 20 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 user_id 1112623 non-null int64
1 click_article_id 1112623 non-null int64
2 click_timestamp 1112623 non-null int64
3 click_environment 1112623 non-null int64
4 click_deviceGroup 1112623 non-null int64
5 click_os 1112623 non-null int64
6 click_country 1112623 non-null int64
7 click_region 1112623 non-null int64
8 click_referrer_type 1112623 non-null int64
9 rank 1112623 non-null int64
10 click_cnts 1112623 non-null int64
11 category_id_x 1112623 non-null int64
12 created_at_ts_x 1112623 non-null int64
13 words_count_x 1112623 non-null int64
14 category_id_y 1112623 non-null int64
15 created_at_ts_y 1112623 non-null int64
16 words_count_y 1112623 non-null int64
17 category_id 1112623 non-null int64
18 created_at_ts 1112623 non-null int64
19 words_count 1112623 non-null int64
dtypes: int64(20)
memory usage: 169.8 MB

统计数据集中用户数量

1
trn_click.user_id.nunique()
1
200000

查看最少点击量

1
trn_click.groupby('user_id')['click_article_id'].count().min()
1
2

画直方图大体看一下基本的属性分布

1
2
3
4
5
6
7
8
9
10
11
12
13
plt.figure(figsize=(15, 20))
i = 1
for col in ['click_article_id', 'click_timestamp', 'click_environment', 'click_deviceGroup', 'click_os', 'click_country',
'click_region', 'click_referrer_type', 'rank', 'click_cnts']:
plot_envs = plt.subplot(5, 2, i)
i += 1
v = trn_click[col].value_counts().reset_index()[:10]
fig = sns.barplot(x=v.iloc[:, 0], y=v.iloc[:, 1])
for item in fig.get_xticklabels():
item.set_rotation(90)
plt.title(col)
plt.tight_layout()
plt.show()
image-20260126204838120

从点击时间clik_timestamp来看,分布较为平均,可不做特殊处理

从点击环境click_environment来看分布不均匀

1
2
3
v = trn_click['click_environment'].value_counts().reset_index()
v['ratio'] = v.iloc[:, 1] / v.iloc[:, 1].sum()
print(v.loc[:])
1
2
3
4
   click_environment    count     ratio
0 4 1084627 0.974838
1 2 25894 0.023273
2 1 2102 0.001889

从点击设备组click_deviceGroup来看

1
2
3
v = trn_click['click_deviceGroup'].value_counts().reset_index()
v['ratio'] = v.iloc[:, 1] / v.iloc[:, 1].sum()
print(v.loc[:])
1
2
3
4
5
6
   click_deviceGroup   count     ratio
0 1 678187 0.609539
1 3 395558 0.355518
2 4 38731 0.034811
3 5 141 0.000127
4 2 6 0.000005

训练集的用户ID由0~199999,而测试集A的用户ID由200000~249999

1
tst_click.user_id.nunique()
1
50000
1
tst_click.groupby('user_id')['click_article_id'].count().min()
1
1

注意,这里测试集有只点击过一次文章的用户,这里会有一些冷启动问题

新闻文章信息数据表

1
item_df['words_count'].value_counts()
1
2
3
4
5
6
7
8
9
10
11
12
13
words_count
176 3485
182 3480
179 3463
178 3458
174 3456
...
1040 1
871 1
627 1
473 1
841 1
Name: count, Length: 866, dtype: int64

统计主题数

1
print(item_df['category_id'].nunique())
1
461

查看不同主体的文章数量

1
_ = item_df['category_id'].hist(figsize=(5, 4),grid=False)
image-20260126210915457
1
item_df.shape
1
(364047, 4)

一共有364047篇文章

最重要的语义向量给好

1
item_emb_df.shape
1
(364047, 251)

每篇文章独立向量,向量维度为251维

分析

用户重复点击

1
2
3
4
5
6
7
8
9
user_click_merge = pd.concat([trn_click, tst_click])
# 统计“每个用户 × 每篇文章”被点击了多少次
user_click_count = (
user_click_merge
.groupby(['user_id', 'click_article_id'])['click_timestamp']
.agg({'count'})
.reset_index()
)
user_click_count.loc[:,'count'].value_counts()
1
2
3
4
5
6
7
8
9
10
11
count
1 1605541
2 11621
3 422
4 77
5 26
6 12
10 4
7 3
13 1
Name: count, dtype: int64

agg 可以一次算多个指标

这里虽然只统计count一个量,理论上也可以写成count

在推荐 / CTR / 行为建模代码中,groupby + agg模板化写法,即使只算一个统计量,也会先用 agg

仅有极少数用户重复点击过某篇文章

用户点击环境变化分析

1
2
3
4
5
6
7
8
9
10
11
12
13
def plot_envs(df, cols, r, c, figsize=(8, 4)):
plt.figure(figsize=figsize)
i = 1
for col in cols:
plt.subplot(r, c, i)
i += 1
v = df[col].value_counts().reset_index()
fig = sns.barplot(x=v.iloc[:, 0], y=v.iloc[:, 1])
for item in fig.get_xticklabels():
item.set_rotation(90)
plt.title(col)
plt.tight_layout()
plt.show()
1
2
3
4
5
6
# 分析用户点击环境变化是否明显,这里随机采样10个用户分析这些用户的点击环境分布
sample_user_ids = np.random.choice(tst_click['user_id'].unique(), size=5, replace=False)
sample_users = user_click_merge[user_click_merge['user_id'].isin(sample_user_ids)]
cols = ['click_environment','click_deviceGroup', 'click_os', 'click_country', 'click_region','click_referrer_type']
for _, user_df in sample_users.groupby('user_id'):
plot_envs(user_df, cols, 2, 3, figsize=(8, 4))

根据结果图可以发现绝大多数数的用户的点击环境是比较固定的,可以基于这些环境的统计特征来代表该用户本身的属性

用户点击新闻数量的分布

1
2
3
4
5
6
7
8
9
user_click_item_count = sorted(
user_click_merge
.groupby('user_id')['click_article_id']
.count()
.value,
reverse=True
)
plt.figure(figsize=(5, 3))
_ = plt.plot(user_click_item_count)
image-20260126212654025

可以根据用户的点击文章次数看出用户的活跃度

1
2
plt.figure(figsize=(5, 3))
_ = plt.plot(user_click_item_count[:50])
image-20260126212733873

点击次数排前50的用户的点击次数都在100次以上

思路:可以定义点击次数大于等于100次的用户为活跃用户,这是一种简单的处理思路,判断用户活跃度,更加全面的是再结合上点击时间,会基于点击次数和点击时间两个方面来判断用户活跃度

1
2
plt.figure(figsize=(5, 3))
_ = plt.plot(user_click_item_count[150000:170000])
image-20260126213153392

点击数少于等于2的用户非常多,这些用户可以认为是非活跃用户

新闻点击次数分析

1
2
3
4
5
6
7
8
item_click_count = sorted(
user_click_merge
.groupby('click_article_id')['user_id']
.count(),
reverse=True
)
plt.figure(figsize=(5, 3))
_ = plt.plot(item_click_count)
image-20260126213358753
1
print(item_click_count[300],item_click_count[100], item_click_count[50])
1
1195 3164 5659
1
2
plt.figure(figsize=(5, 3))
_ = plt.plot(item_click_count[10000:])
image-20260126213900615

可以定义点击次数超过一定量的为热门新闻,很多新闻只被点击过一两次,可以被定义为冷门新闻

新闻共现频次:两篇新闻连续出现的次数

统计“文章 i 后面紧跟文章 j”出现了多少次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
tmp = user_click_merge.sort_values('click_timestamp')
tmp['next_item'] = (
tmp
.groupby(['user_id'])['click_article_id']
.transform(lambda x: x.shift(-1))
)
union_item = (
tmp
.groupby(['click_article_id', 'next_item'])['click_timestamp']
.agg({'count'})
.reset_index()
.sort_values('count', ascending=False)
)
union_item[['count']].describe()

统计 item → next_item 的共现次数

1
2
3
4
5
6
7
8
count,433596.000000
mean,3.184146
std,18.851689
min,1.000000
25%,1.000000
50%,1.000000
75%,2.000000
max,2202.000000
1
2
3
4
x = union_item['click_article_id']
y = union_item['count']
plt.figure(figsize=(5, 3))
_ = plt.scatter(x, y)
image-20260126214808386

说明用户看的新闻,相关性是比较强的

用户点击的新闻类型的偏好

此特征可以用于度量用户的兴趣是否广泛

1
2
3
4
5
6
7
plt.figure(figsize=(5, 3))
_ = plt.plot(sorted(
user_click_merge
.groupby('user_id')['category_id']
.nunique(),
reverse=True)
)
image-20260126214933195

有一小部分用户阅读类型是极其广泛的,大部分人都处在20个新闻类型以下

用户查看文章的长度的分布

1
2
3
4
5
6
7
plt.figure(figsize=(5, 3))
_ =plt.plot(sorted(
user_click_merge
.groupby('user_id')['words_count']
.mean(),
reverse=True)
)
image-20260126215128705

从上图中可以发现有一小部分人看的文章平均词数非常高,也有一小部分人看的平均文章次数非常低

1
2
plt.figure(figsize=(5, 3))
_ = plt.plot(sorted(user_click_merge.groupby('user_id')['words_count'].mean(), reverse=True)[1000:45000])
image-20260126215329591

大部分人看的文章小于250字

用户点击新闻的时间分析

1
2
3
4
5
6
7
8
9
10
11
12
13
#为了更好的可视化,这里把时间进行归一化操作
from sklearn.preprocessing import MinMaxScaler

mm = MinMaxScaler()
user_click_merge['click_timestamp'] = mm.fit_transform(
user_click_merge[['click_timestamp']]
) # 用户点击时间

user_click_merge['created_at_ts'] = mm.fit_transform(
user_click_merge[['created_at_ts']]
) # 文章创建时间

user_click_merge = user_click_merge.sort_values('click_timestamp')

建立时间差函数

1
2
3
4
5
def mean_diff_time_func(df, col):
df = pd.DataFrame(df, columns=[col])
df['time_shift1'] = df[col].shift(1).fillna(0)
df['diff_time'] = abs(df[col] - df['time_shift1'])
return df['diff_time'].mean()
1
2
3
4
5
6
mean_diff_click_time = (
user_click_merge
.sort_values('click_timestamp')
.groupby('user_id')['click_timestamp']
.apply(lambda x: mean_diff_time_func(x, 'click_timestamp'))
)
1
2
plt.figure(figsize=(5, 3))
_ = plt.plot(sorted(mean_diff_click_time.values, reverse=True))

从上图可以发现不同用户点击文章的时间差是有差异的

1
2
3
4
5
mean_diff_created_time = (
user_click_merge
.groupby('user_id')[['created_at_ts']]
.apply(mean_diff_time_func, col='created_at_ts')
)
1
2
plt.figure(figsize=(5, 3))
_ = plt.plot(sorted(mean_diff_created_time.values, reverse=True))

从图中可以发现用户先后点击文章,文章的创建时间也是有差异的

1
2
3
4
5
6
7
8
9
# 用户前后点击文章的相似性分布
item_idx_2_rawid_dict = dict(
zip(item_emb_df['article_id'], item_emb_df.index)
) # 匹配'article_id'和emb文件中的行号
del item_emb_df['article_id'] # 删除id数值,后面不需要了
item_emb_np = np.ascontiguousarray(
item_emb_df.values,
dtype=np.float32
) # 转成连续内存的 numpy 数组
1
2
3
4
5
# 随机选择5个用户,查看这些用户前后查看文章的相似性
sub_user_ids = np.random.choice(user_click_merge.user_id.unique(), size=15, replace=False)
sub_user_info = user_click_merge[user_click_merge['user_id'].isin(sub_user_ids)]

sub_user_info.head()
1
2
3
4
5
6
7
8
9
def get_item_sim_list(df):
sim_list = []
item_list = df['click_article_id'].values
for i in range(0, len(item_list)-1):
emb1 = item_emb_np[item_idx_2_rawid_dict[item_list[i]]]
emb2 = item_emb_np[item_idx_2_rawid_dict[item_list[i+1]]]
sim_list.append(np.dot(emb1,emb2)/(np.linalg.norm(emb1)*(np.linalg.norm(emb2))))
sim_list.append(0)
return sim_list
1
2
3
4
plt.figure(figsize=(5, 3))
for _, user_df in sub_user_info.groupby('user_id'):
item_sim_list = get_item_sim_list(user_df)
plt.plot(item_sim_list)
202601262238

从图中可以看出有些用户前后看的商品的相似度波动比较大,有些波动比较小,也是有一定的区分度的

总结

  1. 训练集和测试集的用户id没有重复,也就是测试集里面的用户没有模型是没有见过的
  2. 训练集中用户最少的点击文章数是2, 而测试集里面用户最少的点击文章数是1
  3. 用户对于文章存在重复点击的情况,但这个都存在于训练集里面
  4. 同一用户的点击环境存在不唯一的情况,后面做这部分特征的时候可以采用统计特征
  5. 用户点击文章的次数有很大的区分度,后面可以根据这个制作衡量用户活跃度的特征
  6. 文章被用户点击的次数也有很大的区分度,后面可以根据这个制作衡量文章热度的特征
  7. 用户看的新闻,相关性是比较强的,所以往往判断用户是否对某篇文章感兴趣的时候,在很大程度上会和历史点击过的文章有关
  8. 用户点击的文章字数有比较大的区别,这个可以反映用户对于文章字数的区别
  9. 用户点击过的文章主题也有很大的区别,这个可以反映用户的主题偏好
  10. 不同用户点击文章的时间差也会有所区别,这个可以反映用户对于文章时效性的偏好

Baseline

baseline以ItemCF(基于物品的协同过滤)算法作为召回策略,这是工业界广泛使用的经典方法,具有可解释性强、效果稳定的特点

导包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import gc
import logging
import math
import os
import pickle
import random
import time
import warnings
from collections import defaultdict
from datetime import datetime
from operator import itemgetter
from pathlib import Path

logger = logging.getLogger(__name__)
warnings.filterwarnings('ignore')

import numpy as np
import pandas as pd
from tqdm import tqdm

# 数据路径
data_path = Path("tcdata")
save_path = Path("user_data")
result_path = Path("prediction_result")
if not os.path.exists(save_path):
os.makedirs(save_path)
if not os.path.exists(result_path):
os.makedirs(result_path)

df节省内存函数

在不改变数据含义的前提下,把 DataFrame 的数值列转成“能用的最小 dtype”,从而节约内存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 节约内存的一个标配函数
def reduce_mem(df):
starttime = time.time()
# 只对 整数型 / 浮点型列 做压缩
numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']
# 计算原始内存占用
# /1024**2 转为MB
start_mem = df.memory_usage().sum() / 1024**2
for col in df.columns:
col_type = df[col].dtypes
if col_type in numerics:
c_min = df[col].min()
c_max = df[col].max()
# 跳过全空列
if pd.isnull(c_min) or pd.isnull(c_max):
continue
# 如果是int类型
if str(col_type)[:3] == 'int':
# 根据数值大小判断放到哪种类型
if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
df[col] = df[col].astype(np.int8)
elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
df[col] = df[col].astype(np.int16)
elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
df[col] = df[col].astype(np.int32)
elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
df[col] = df[col].astype(np.int64)
else:
if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
df[col] = df[col].astype(np.float16)
elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
df[col] = df[col].astype(np.float32)
else:
df[col] = df[col].astype(np.float64)
end_mem = df.memory_usage().sum() / 1024**2
print(
f'-- Mem. usage decreased to {end_mem:.2f} Mb '
f'({100*(start_mem-end_mem)/start_mem:.1f}% reduction), '
f'time spend:{(time.time()-starttime)/60:.2f} min'
)

return df

采样函数

按用户采样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def get_all_click_sample(data_path, sample_nums=10000):
"""
训练集中采样一部分数据调试
data_path: 原数据的存储路径
sample_nums: 采样数目(这里由于机器的内存限制,可以采样用户做)
"""
all_click = pd.read_csv(data_path / 'train_click_log.csv')
all_user_ids = all_click.user_id.unique()

sample_user_ids = np.random.choice(all_user_ids, size=sample_nums, replace=False)
all_click = all_click[all_click['user_id'].isin(sample_user_ids)]
# 去重三列相同的数据
all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp']))
return all_click

按行采样

1
2
3
4
5
6
7
8
9
10
11
def get_all_click_df(data_path, offline=True):
if offline:
all_click = pd.read_csv(data_path / 'train_click_log.csv')[:20000]
else:
trn_click = pd.read_csv(data_path / 'train_click_log.csv')[:10000]
tst_click = pd.read_csv(data_path / 'testA_click_log.csv')[:10000]

all_click = pd.concat([trn_click, tst_click])
# .reset_index(drop=True)是防御性措施
all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp'])).reset_index(drop=True)
return all_click
1
all_click_df = get_all_click_df(data_path, offline=False) # 训练集和测试集都采

用户-文章-点击时间

获取 用户 - 文章 - 点击时间字典

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 根据点击时间获取用户的点击文章序列   {user1: {item1: time1, item2: time2..}...}
def get_user_item_time(click_df):
# 先排序保证组内是时间顺序,就不用多次排了
click_df = click_df.sort_values('click_timestamp')
# 组合文章和时间
def make_item_time_pair(df):
return list(zip(df['click_article_id'], df['click_timestamp']))

user_item_time_df = (
click_df.groupby('user_id')[['click_article_id', 'click_timestamp']]
.apply(lambda x: make_item_time_pair(x))
.reset_index() # 把user_id从index变为普通列,并且变为DataFrame
.rename(columns={0: 'item_time_list'}) # 给组合列重命名
)
# 相当于把user_id重新作为index
user_item_time_dict = dict(
zip(
user_item_time_df['user_id'],
user_item_time_df['item_time_list']
)
)
return user_item_time_dict

Topk热门文章

获取点击最多的Topk个文章

1
2
3
4
# 获取近期点击最多的文章
def get_item_topk_click(click_df, k):
topk_click = click_df['click_article_id'].value_counts().index[:k]
return topk_click

ItemCF相似度

itemCF的物品相似度计算

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def itemcf_sim(df):
"""
文章与文章之间的相似性矩阵计算
:param df: 数据表
:item_created_time_dict: 文章创建时间的字典
return : 文章与文章的相似性矩阵
"""

user_item_time_dict = get_user_item_time(df)

# 计算物品相似度
i2i_sim = {} # 存放物品两两之间的共现权重
item_cnt = defaultdict(int) # 被多少个用户点过,用于归一化

for user, item_time_list in tqdm(
user_item_time_dict.items(),
disable=not logger.isEnabledFor(logging.DEBUG)
): # 只有在 DEBUG 模式下,才显示 tqdm 进度条;生产 / 正常运行时不显示

# 在基于商品的协同过滤优化的时候可以考虑时间因素
# 遍历
for i, i_click_time in item_time_list:
item_cnt[i] += 1 # 统计文章 i 的点击次数
i2i_sim.setdefault(i, {}) # 给文章i准备一个空字典
# 同样遍历,但是跳过自身
for j, j_click_time in item_time_list:
if(i == j):
continue
i2i_sim[i].setdefault(j, 0)
# 累加共现次数 i2i_sim[i][j] += 1
# 这里用的加权共现
i2i_sim[i][j] += 1 / math.log(len(item_time_list) + 1)

i2i_sim_ = i2i_sim.copy()
for i, related_items in i2i_sim.items():
for j, wij in related_items.items():
# \frac{C[i][j]}{\sqrt{|N(i)| \cdot |N(j)|}}
i2i_sim_[i][j] = wij / math.sqrt(item_cnt[i] * item_cnt[j])

# 将得到的相似性矩阵保存到本地
save_dir = save_path / 'tmp_data'
save_dir.mkdir(parents=True, exist_ok=True)

pickle.dump(
i2i_sim_,
open(save_dir / 'itemcf_i2i_sim.pkl', 'wb')
)

return i2i_sim_

i2i_sim = itemcf_sim(all_click_df)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
i = 101,j=102
---
i2i_sim.setdefault(i, {})
i2i_sim = {
101: {}
}
---
i2i_sim[i].setdefault(j, 0)
i2i_sim = {
101: {
102: Cij
}
}

itemCF推荐

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# 基于商品的召回i2i
def item_based_recommend(user_id, user_item_time_dict, i2i_sim, sim_item_topk, recall_item_num, item_topk_click):
"""
基于文章协同过滤的召回
:param user_id: 用户id
:param user_item_time_dict: 字典, 根据点击时间获取用户的点击文章序列 {user1: {item1: time1, item2: time2..}...}
:param i2i_sim: 字典,文章相似性矩阵
:param sim_item_topk: 整数, 选择与当前文章最相似的前k篇文章
:param recall_item_num: 整数, 最后的召回文章数量
:param item_topk_click: 列表,点击次数最多的文章列表,用户召回补全
return: 召回的文章列表 {item1:score1, item2: score2...}
"""

# 获取用户历史交互的文章
user_hist_items = user_item_time_dict[user_id]
user_hist_item_set = {item for item, _ in user_hist_items}

item_rank = {} # 打分字典
# loc为第几次点击,i为文章id
for loc, (i, click_time) in enumerate(user_hist_items):
for j, wij in sorted(
i2i_sim[i].items(),
key=lambda x: x[1],
reverse=True
)[:sim_item_topk]: # 取和文章i最相似的前k篇
if j in user_hist_item_set:
continue # 用户已经点过的就不推荐了

item_rank.setdefault(j, 0)
item_rank[j] += wij # 给候选文章累加分

# 不足10个,用热门文章补全
if len(item_rank) < recall_item_num:
for i, item in enumerate(item_topk_click):
if item in item_rank: # 填充的item应该不在原来的列表中
continue
item_rank[item] = - i - 100 # 给个负数让排名靠后,不影响真实
if len(item_rank) == recall_item_num:
break

item_rank = sorted(
item_rank.items(),
key=lambda x: x[1],
reverse=True
)[:recall_item_num]

return item_rank

给每个用户根据物品的协同过滤推荐文章

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 定义结果字典
user_recall_items_dict = defaultdict(dict)

# 获取 用户 - 文章 - 点击时间的字典
user_item_time_dict = get_user_item_time(all_click_df)

# 去取文章相似度
i2i_sim = pickle.load(open(save_path / 'tmp_data/itemcf_i2i_sim.pkl', 'rb'))

# 相似文章的数量
sim_item_topk = 10

# 召回文章数量
recall_item_num = 10

# 用户热度补全
item_topk_click = get_item_topk_click(all_click_df, k=50)

for user in tqdm(all_click_df['user_id'].unique(), disable=not logger.isEnabledFor(logging.DEBUG)):
user_recall_items_dict[user] = item_based_recommend(
user,
user_item_time_dict,
i2i_sim,
sim_item_topk,
recall_item_num,
item_topk_click
)
1
2
3
4
5
user_recall_items_dict = {
user1: [(item1, score1)...],
user2: [(item2, score2)...],
...
}

召回字典转换成df

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 将字典的形式转换成df
user_item_score_list = []

for user, items in tqdm(user_recall_items_dict.items(), disable=not logger.isEnabledFor(logging.DEBUG)):
for item, score in items:
user_item_score_list.append([user, item, score])
"""
user_item_score_list =
[
[user1, itemA, scoreA],
[user1, itemB, scoreB],
[user2, itemC, scoreC],
...
]
"""

recall_df = pd.DataFrame(user_item_score_list, columns=['user_id', 'click_article_id', 'pred_score'])
recall_df.head()

提交函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# 生成提交文件
def submit(recall_df, topk=5, model_name=None):
# 按用户和预测分做一个排序
recall_df = recall_df.sort_values(by=['user_id', 'pred_score'])
# 打上名次
recall_df['rank'] = (
recall_df
.groupby(['user_id'])['pred_score']
.rank(ascending=False, method='first')
)

# 判断是不是每个用户都有5篇文章及以上
tmp = recall_df.groupby('user_id').apply(lambda x: x['rank'].max())
assert tmp.min() >= topk

del recall_df['pred_score'] # 删除预测分数列
submit = (
recall_df[recall_df['rank'] <= topk]
.set_index(['user_id', 'rank']) # 设置多重索引
.unstack(-1) # 把 rank 展开成列
.reset_index() # 把 user_id 从 index 拉回普通列
)

submit.columns = [
int(col) if isinstance(col, int) else col
for col in submit.columns.droplevel(0)
]

# 按照提交格式定义列名
submit = submit.rename(columns={
'': 'user_id',
1: 'article_1',
2: 'article_2',
3: 'article_3',
4: 'article_4',
5: 'article_5'
})


save_name = save_path / (
model_name + '_' + datetime.today().strftime('%m-%d') + '.csv'
)

submit.to_csv(save_name, index=False, header=True)
1
2
3
4
5
6
7
8
9
10
# 获取测试集
tst_click = pd.read_csv(data_path / 'testA_click_log.csv')
tst_users = tst_click['user_id'].unique()

# 从所有的召回数据中将测试集中的用户选出来
# 因为只用到了行为,没有用到标签,所以混杂测试集进去没有问题
tst_recall = recall_df[recall_df['user_id'].isin(tst_users)]

# 生成提交文件
submit(tst_recall, topk=5, model_name='itemcf_baseline')

这种简单的ItemCF提交得分为 0.1026

多路召回

多路召回策略是指使用多种不同的简单策略、特征或轻量模型,分别召回一部分候选集,再将这些候选集合并,供后续排序模型使用

这种策略本质上是在计算速度与召回率之间的权衡:

  • 各类简单策略保证召回过程足够快
  • 从不同角度设计的召回规则共同提升整体召回率,避免影响最终排序效果

在多路召回中,各个召回策略彼此独立,通常可以并行执行(如多线程),从而提升整体效率

3_multi_channel_recall

需要注意的是,具体采用哪些召回策略高度依赖业务场景

不同任务对应不同的真实需求,因此召回规则也会有所差异,例如在新闻或视频推荐中,常见的召回方式包括:热门内容、导演召回、演员召回、近期上映、流行趋势、类型召回等

导包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import os, math, warnings, math, pickle, random
from pathlib import Path
from datetime import datetime
from collections import defaultdict
import logging
warnings.filterwarnings('ignore')
# 限制线程数:防止 faiss / numpy 抢 CPU,线上或多进程时很常见
os.environ['OMP_NUM_THREADS'] = '1'
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

import faiss # Facebook 开源的 高性能向量检索库

import pandas as pd
import numpy as np
from tqdm import tqdm


from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras import backend as K
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.sequence import pad_sequences
1
2
3
4
5
6
7
8
9
10
# 数据路径
data_path = Path('tcdata')
temp_path = Path("user_data/tmp_data")
result_path = Path("prediction_result")
if not os.path.exists(temp_path):
os.makedirs(temp_path)
if not os.path.exists(result_path):
os.makedirs(result_path)
# 做召回评估的一个标志, 如果不进行评估就是直接使用全量数据进行召回
metric_recall = False

读取数据

在推荐系统比赛中,数据读取通常分为三种模式,对应不同阶段与数据集使用方式

  1. Debug 模式:用于快速搭建并跑通 baseline,验证整体代码流程是否正确

    在该阶段一般从训练集中随机抽取一小部分样本(如 train_click_log_sample)进行调试,确保逻辑无误即可

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    def get_all_click_sample(data_path, sample_nums=10000):
    """
    训练集中采样一部分数据调试
    data_path: 原数据的存储路径
    sample_nums: 采样数目(这里由于机器的内存限制,可以采样用户做)
    """
    all_click = pd.read_csv(data_path / 'train_click_log.csv')
    all_user_ids = all_click.user_id.unique()

    sample_user_ids = np.random.choice(all_user_ids, size=sample_nums, replace=False)
    all_click = all_click[all_click['user_id'].isin(sample_user_ids)]
    # 去重三列相同的数据
    all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp']))
    return all_click
  2. 线下验证模式:用于模型选择与超参数调优

    此阶段加载完整训练集(train_click_log),并将其划分为训练集和验证集:训练集用于模型训练,验证集用于评估效果并调整模型与参数

  3. 线上模式:在 baseline 已跑通、模型与参数已确定后,进入最终预测阶段

    该模式使用全量数据进行训练,即 train_click_log + test_click_log,对测试集生成预测结果并提交线上评测

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    # 读取点击数据,这里分成线上和线下,如果是为了获取线上提交结果应该讲测试集中的点击数据合并到总的数据中
    # 如果是为了线下验证模型的有效性或者特征的有效性,可以只使用训练集
    def get_all_click_df(data_path, offline=True):
    if offline:
    all_click = pd.read_csv(data_path / 'train_click_log.csv')
    else:
    trn_click = pd.read_csv(data_path / 'train_click_log.csv')
    tst_click = pd.read_csv(data_path / 'testA_click_log.csv')

    # all_click = trn_click.append(tst_click)
    all_click = pd.concat([trn_click, tst_click]).reset_index(drop=True)

    all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp']))
    return all_click
1
2
3
4
5
6
7
8
# 读取文章的基本属性
def get_item_info_df(data_path):
item_info_df = pd.read_csv(data_path / 'articles.csv')

# 为了方便与训练集中的click_article_id拼接,需要把article_id修改成click_article_id
item_info_df = item_info_df.rename(columns={'article_id': 'click_article_id'})

return item_info_df
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 读取文章的Embedding数据
def get_item_emb_dict(data_path):
item_emb_df = pd.read_csv(data_path / 'articles_emb.csv')

item_emb_cols = [x for x in item_emb_df.columns if 'emb' in x]
# 将 embedding 转成 NumPy 数组,并保证内存连续。
item_emb_np = np.ascontiguousarray(item_emb_df[item_emb_cols])
# 对每个文章向量做 L2 归一化,向量点积 ≈ 余弦相似度
item_emb_np = item_emb_np / np.linalg.norm(item_emb_np, axis=1, keepdims=True)
# 构建 article_id → embedding 的映射字典
item_emb_dict = dict(zip(item_emb_df['article_id'], item_emb_np))
pickle.dump(item_emb_dict, open(save_path / 'item_content_emb.pkl', 'wb'))

return item_emb_dict
1
2
3
4
5
6
7
8
9
max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x))
# 采样数据
all_click_df = get_all_click_sample(data_path)

# 全量训练集
# all_click_df = get_all_click_df(data_path, offline=False)

# 对时间戳进行归一化,用于在关联规则的时候计算权重
all_click_df['click_timestamp'] = all_click_df[['click_timestamp']].apply(max_min_scaler)
1
item_info_df = get_item_info_df(data_path)
1
item_emb_dict = get_item_emb_dict(data_path)

工具函数