【搜广推打怪升级】1. ItemCF实践

前言:本人开通此博客,期望能够督促自己提升代码能力。

个人情况:本2泛商科专业,9硕在读,较熟悉sql数据处理,仅有一点python皮毛。

今天正式开始搜广推打怪升级之路。

其实在开始今天的博客之路之前,已经基本过了一遍①王树森老师的推荐算法公开课(b站直搜即可),②项亮老师的《推荐系统实践》,③王喆老师的《深度学习推荐系统》,石塔西老师的《互联网大厂推荐算法实战》也在拜读中。虽然对如此丰富又优秀的资源进行了学习,但落实到代码上,还是很吃力,所以希望能够在学习完别人的代码后,能够自己按照自己的习惯独自写出代码。

  • 以天池新闻推荐的数据为例。https://tianchi.aliyun.com/competition/entrance/531842

  • 今天主要总结ItemCF的代码及其实践,主要参考源为Datawhale的FunRec课程。https://datawhalechina.github.io/fun-rec/#/

简单回顾一下协同过滤

协同过滤的定义:就是协同大家的反馈、评价和意见一起对海量的信息进行过滤,从中筛选出目标用户可能感兴趣的信息的推荐过程。
协同过滤的算法流程:(以电商网站商品推荐为例,决定是否向用户 X 推荐商品 A, or 决定用户 X 的推荐物品列表)

  • 有 n 种商品 A, B, C, D, ……;有 m 个用户 X, Y, Z ……
  • 可利用的数据:有用户 X 对其它商品的历史评价数据,以及其它用户对各商品的历史评价数据(当然每个用户对每种商品的评价数据不一定全都有)
  • 将以上数据转换为共现矩阵:用户作为矩阵行坐标,商品作为列坐标,每个元素代表用户对商品的评分/喜好
  • 问题转换:预测共现矩阵中 X 行 A 列这个缺失值
    • 如果使用用户协同过滤(UserCF)算法:基于用户相似度,进行推荐
    • 如果使用物品协同过滤(ItemCF)算法:基于物品相似度,进行推荐

两个向量\(i\)\(j\)之间的相似度\(sim(i,j)\)计算公式:
以余弦相似度为例:\(sim(i,j) = \frac{i \cdot j}{\Vert i \Vert \cdot \Vert j \Vert }\)

基于物品的协同过滤ItemCF

  1. 基于历史数据构建 m 行 n 列的共现矩阵(m 个用户,n 种商品)
    • 共现矩阵第 j 列的列向量就是第 j 个物品的物品向量
  2. 计算物品相似度:计算共现矩阵中两两列向量之间的相似度,构建 n×n 的物品相似度矩阵
  3. 获取和目标用户 X 历史上感兴趣的物品列表(正反馈物品集合)
  4. 针对正反馈物品集合,通过物品相似度矩阵,计算各物品和正反馈物品的相似性,找出最相似的 Top k 个物品,组成相似物品集合
    • 一个物品的相似度是该物品和正反馈物品集合中所有物品相似度的累加
    • 用户 u 对物品 p 的预测评分:\(R_{u,p}=\sum_{h\in H}(w_{p,h}\cdot R_{u,h})\),H 是目标用户的正反馈物品集合,\(w_{p,h}\) 是物品 p 与 h 的物品相似度,\(R_{u,h}\) 是用户 u 对物品 h 的已有评分
  5. 根据各物品的相似度打分进行排序,生成最终的物品推荐列表

ItemCF代码实践

最终成绩:0.1484

0. 导包

# import packages
import time, math, os
from tqdm import tqdm
import gc
import pickle
import random
from datetime import datetime
from operator import itemgetter
import numpy as np
import pandas as pd
import warnings
from collections import defaultdict
warnings.filterwarnings('ignore')

1. 读取数据

# 设置数据路径
data_path = '../tcdata/'
middle_path = '../user_data/'
save_path = '../prediction_result/'

在一般的推荐系统比赛中读取数据部分主要分为三种模式, 不同的模式对应的不同的数据集:

  1. Debug模式: 这个的目的是帮助我们基于数据先搭建一个简易的baseline并跑通, 保证写的baseline代码没有什么问题。 由于推荐比赛的数据往往非常巨大, 如果一上来直接采用全部的数据进行分析,搭建baseline框架, 往往会带来时间和设备上的损耗, 所以这时候我们往往需要从海量数据的训练集中随机抽取一部分样本来进行调试(train_click_log_sample), 先跑通一个baseline。
  2. 线下验证模式: 这个的目的是帮助我们在线下基于已有的训练集数据, 来选择好合适的模型和一些超参数。 所以我们这一块只需要加载整个训练集(train_click_log), 然后把整个训练集再分成训练集和验证集。 训练集是模型的训练数据, 验证集部分帮助我们调整模型的参数和其他的一些超参数。
  3. 线上模式: 我们用debug模式搭建起一个推荐系统比赛的baseline, 用线下验证模式选择好了模型和一些超参数, 这一部分就是真正的对于给定的测试集进行预测, 提交到线上, 所以这一块使用的训练数据集是全量的数据集(train_click_log+test_click_log)

下面就分别对这三种不同的数据读取模式先建立不同的代导入函数, 方便后面针对不同的模式下导入数据。

# debug模式: 从训练集中划出一部分数据来调试代码
def get_all_click_sample(data_path, sample_nums=10000):
    """
        训练集中采样一部分数据调试
        data_path: 原数据的存储路径
        sample_nums: 采样数目(这里由于机器的内存限制,可以采样用户做)
    """
    all_click = pd.read_csv(data_path + 'train_click_log.csv')
    all_user_ids = all_click.user_id.unique()    # 返回的是一个 numpy.ndarray 类型的数组

    sample_user_ids = np.random.choice(all_user_ids, size=sample_nums, replace=False) 
    all_click = all_click[all_click['user_id'].isin(sample_user_ids)]
    
    all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp']))
    return all_click

# 读取点击数据,这里分成线上和线下,如果是为了获取线上提交结果应该将测试集中的点击数据合并到总的数据中
# 如果是为了线下验证模型的有效性或者特征的有效性,可以只使用训练集
def get_all_click_df(data_path, offline=True):
    if offline:
        all_click = pd.read_csv(data_path + 'train_click_log.csv')
    else:
        trn_click = pd.read_csv(data_path + 'train_click_log.csv')
        tst_click = pd.read_csv(data_path + 'testA_click_log.csv')

        all_click = pd.concat([trn_click, tst_click])
    
    all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp']))
    return all_click

# 采样数据
# all_click_df = get_all_click_sample(data_path)
# 全量训练集
all_click_df = get_all_click_df(data_path, offline=False)
# 读取文章的基本属性
def get_item_info_df(data_path):
    item_info_df = pd.read_csv(data_path + 'articles.csv')
    
    # 为了方便与训练集中的click_article_id拼接,需要把article_id修改成click_article_id
    item_info_df = item_info_df.rename(columns={'article_id': 'click_article_id'})
    
    return item_info_df
item_info_df = get_item_info_df(data_path)

2. 获取 【用户-文章-点击时间】 字典

# 根据点击时间获取用户的点击文章序列   {user1: {item1: time1, item2: time2..}...}
def get_user_item_time(click_df):
    
    click_df = click_df.sort_values('click_timestamp')
    # 定义一个辅助函数,用于将文章 ID 和点击时间组合成元组列表
    def make_item_time_pair(df):
        return dict(zip(df['click_article_id'], df['click_timestamp']))
    
    user_item_time_df = click_df.groupby('user_id')[['click_article_id', 'click_timestamp']].apply(lambda x: make_item_time_pair(x)).reset_index().rename(columns={0: 'item_time_dict'})
    user_item_time_dict = dict(zip(user_item_time_df['user_id'], user_item_time_df['item_time_dict']))
    
    return user_item_time_dict
user_item_time_dict = get_user_item_time(all_click_df)

3. 获取点击最多的topk个新闻

def get_item_topk_click(click_df, k):
    topk_click = click_df['click_article_id'].value_counts().index[:k]
    return topk_click

4. 获取文章属性特征

# 获取文章id对应的基本属性,保存成字典的形式,方便后面召回阶段,冷启动阶段直接使用
def get_item_info_dict(item_info_df):
    max_min_scaler = lambda x : (x-np.min(x))/(np.max(x)-np.min(x))
    item_info_df['created_at_ts'] = item_info_df[['created_at_ts']].apply(max_min_scaler)
    
    item_type_dict = dict(zip(item_info_df['click_article_id'], item_info_df['category_id']))
    item_words_dict = dict(zip(item_info_df['click_article_id'], item_info_df['words_count']))
    item_created_time_dict = dict(zip(item_info_df['click_article_id'], item_info_df['created_at_ts']))
    
    return item_type_dict, item_words_dict, item_created_time_dict
item_type_dict, item_words_dict, item_created_time_dict = get_item_info_dict(item_info_df)

5. ItemCF i2i_sim计算

借鉴KDD2020的去偏商品推荐,在计算item2item相似性矩阵时,使用关联规则,使得计算的文章的相似性还考虑到了:

  • 用户点击的时间权重
  • 用户点击的顺序权重
  • 文章创建的时间权重
def itemcf_sim(df, item_created_time_dict):
    """
        文章与文章之间的相似性矩阵计算
        :param df: 数据表
        :item_created_time_dict:  文章创建时间的字典
        return : 文章与文章的相似性矩阵
        
        思路: 基于物品的协同过滤(详细请参考上一期推荐系统基础的组队学习) + 关联规则
    """
    
    user_item_time_dict = get_user_item_time(df)
    
    # 计算物品相似度
    # 最终得到{item1: {item2: w12, item3: w13, ... }, ...}
    i2i_sim = {}
    item_count = defaultdict(int)
    for user, item_time_dict in tqdm(user_item_time_dict.items()):
        # 在基于新闻的协同过滤优化的时候可以考虑时间因素
        for loc_1, (i, i_click_time) in enumerate(item_time_dict.items()):
            # 统计文章 i 的点击次数
            item_count[i] += 1
            i2i_sim.setdefault(i, {})
            for loc_2, (j, j_click_time) in enumerate(item_time_dict.items()):
                if(i == j):
                    continue
                    
                # 考虑文章的正向顺序点击和反向顺序点击
                # 如果 j 在 i 之后点击,loc_alpha 为 1.0,否则为 0.7  
                loc_alpha = 1.0 if loc_2 > loc_1 else 0.7
                # 位置信息权重,其中的参数可以调节,点击顺序相差越大,权重越小
                loc_weight = loc_alpha * (0.9 ** (np.abs(loc_2 - loc_1) - 1))
                
                # 点击时间权重,其中的参数可以调节,点击时间间隔越大,权重越小
                click_time_weight = np.exp(0.7 ** np.abs(i_click_time - j_click_time))
                
                # 两篇文章创建时间的权重,其中的参数可以调节,创建时间间隔越大,权重越小
                created_time_weight = np.exp(0.8 ** np.abs(item_created_time_dict[i] - item_created_time_dict[j]))

                # 初始化文章 i 对文章 j 的相似度
                i2i_sim[i].setdefault(j, 0)

                # 考虑多种因素的权重计算最终的文章之间的相似度
                # 使用 += ,主要是因为在计算 i2i_sim 时,需要对多个用户行为所贡献的相似度得分进行累加。
                # 除以 math.log(len(item_time_list) + 1) 是为了对用户点击文章数量进行惩罚
                i2i_sim[i][j] += loc_weight * click_time_weight * created_time_weight / math.log(len(item_time_dict) + 1)
                
    i2i_sim_normalization = i2i_sim.copy()
    for i, related_items in i2i_sim.items():
        for j, wij in related_items.items():
            # 除以两篇文章点击次数的几何平均数进行归一化
            i2i_sim_normalization[i][j] = wij / math.sqrt(item_count[i] * item_count[j])
    
    # 将得到的相似性矩阵保存到本地临时中间文件
    pickle.dump(i2i_sim_normalization, open(middle_path + 'itemcf_i2i_sim.pkl', 'wb'))
    
    return i2i_sim_normalization
i2i_sim = itemcf_sim(all_click_df, item_created_time_dict)

6. ItemCF Recall

# 基于商品的召回
def item_based_recommend(user_id, user_item_time_dict, i2i_sim, sim_item_topk, recall_item_num, item_topk_click, item_created_time_dict):
    """
        基于文章协同过滤的召回
        :param user_id: 用户id
        :param user_item_time_dict: 字典, 根据点击时间获取用户的点击文章序列   {user1: {item1: time1, item2: time2, ...}, ...}
        :param i2i_sim: 字典,文章相似性矩阵  {item1: {item2: w12, item3: w13, ... }, ...}
        :param sim_item_topk: 整数,选择与当前文章最相似的前k篇文章
        :param recall_item_num: 整数,最后的召回文章数量
        :param item_topk_click: 列表,点击次数最多的文章列表,用户召回补全
        :param emb_i2i_sim: 字典基于内容embedding算的文章相似矩阵
        
        return: 召回的文章列表 {item1:score1, item2: score2...}
        
    """
    # 获取用户历史交互的文章
    user_hist_items = user_item_time_dict.get(user)
    
    item_rank = {}
    for loc, (i, click_time) in enumerate(user_hist_items.items()):
        for j, wij in sorted(i2i_sim[i].items(), key=lambda x: x[1], reverse=True)[:sim_item_topk]:
            if j in user_hist_items:
                continue
            
            # 文章创建时间差权重
            created_time_weight = np.exp(0.8 ** np.abs(item_created_time_dict[i] - item_created_time_dict[j]))
            # 相似文章和历史点击文章序列中历史文章所在的位置权重
            loc_weight = (0.9 ** (len(user_hist_items) - loc))
                
            item_rank.setdefault(j, 0)
            item_rank[j] += created_time_weight * loc_weight * wij
    
    # 不足10个,用热门商品补全
    if len(item_rank) < recall_item_num:
        for i, item in enumerate(item_topk_click):
            if item in item_rank.items(): # 填充的item应该不在原来的列表中
                continue
            item_rank[item] = - i - 100 # 随便给个负数就行
            if len(item_rank) == recall_item_num:
                break
    
    item_rank = sorted(item_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num]
        
    return item_rank

7. 为每个用户根据ItemCF推荐新闻

# 定义
user_recall_items_dict = defaultdict(dict)

# 获取文章相似度
i2i_sim = pickle.load(open(middle_path + 'itemcf_i2i_sim.pkl', 'rb'))

# 相似文章的数量
sim_item_topk = 20

# 召回文章数量
recall_item_num = 20

# 用户热度补全
item_topk_click = get_item_topk_click(all_click_df, k=50)

# 为所有用户生成召回物品列表
# {user1: [(item1, score1), (item2, score2), ...], user2: [(item3, score3), (item4, score4), ...], ...}
for user in tqdm(all_click_df['user_id'].unique()):
    user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, i2i_sim, sim_item_topk, recall_item_num, item_topk_click, item_created_time_dict)

8. 召回字典转换成df

# 将字典的形式转换成df
user_item_score_list = []

for user, items in tqdm(user_recall_items_dict.items()):
    for item, score in items:
        user_item_score_list.append([user, item, score])

recall_df = pd.DataFrame(user_item_score_list, columns=['user_id', 'click_article_id', 'pred_score'])

9. 生成提交文件

# 生成提交文件
def submit(recall_df, topk=5, model_name=None):
    recall_df = recall_df.sort_values(by=['user_id', 'pred_score'])
    
    # 添加排序
    # method='first' 表示若分数相同则按数据出现顺序排名。
    recall_df['rank'] = recall_df.groupby(['user_id'])['pred_score'].rank(ascending=False, method='first')
    recall_df['rank'] = recall_df['rank'].astype(int)
    
    # 判断是不是每个用户都有5篇文章及以上
    # 按 user_id 分组,找出每组内 rank 的最大值。
    tmp = recall_df.groupby('user_id').apply(lambda x: x['rank'].max())
    # 确保每个用户至少有 topk 篇文章被召回,若不满足则抛出 AssertionError
    assert (tmp >= topk).all()
    
    del recall_df['pred_score']
    
    # 筛选出 rank 小于等于 topk 的记录
    # unstack 函数用于将数据从长格式转换为宽格式。-1 表示将最内层的索引(这里是 rank)转换为列。
    submit = recall_df[recall_df['rank'] <= topk].set_index(['user_id', 'rank']).unstack(-1).reset_index()
    
    # 遍历移除顶层索引后的列名列表,对于那些原本是整数类型的列名,将其转换为整数;对于非整数类型的列名,保持不变。
    # droplevel(0) 用于移除列名的最顶层索引,只保留底层索引。
    submit.columns = [int(col) if isinstance(col, int) else col for col in submit.columns.droplevel(0)]
    
    # 按照提交格式定义列名
    submit = submit.rename(columns={'': 'user_id', 1: 'article_1', 2: 'article_2', 
                                                  3: 'article_3', 4: 'article_4', 5: 'article_5'})
    
    save_name = save_path + model_name + '_' + datetime.today().strftime('%m-%d') + '.csv'
    submit.to_csv(save_name, index=False, header=True)

10. 运行

# 获取测试集
test_click = pd.read_csv(data_path + 'testA_click_log.csv')
test_users = test_click['user_id'].unique()

# 从所有的召回数据中将测试集中的用户选出来
test_recall = recall_df[recall_df['user_id'].isin(test_users)]

# 生成提交文件
submit(test_recall, topk=5, model_name='itemcf_baseline')

来源链接:https://www.cnblogs.com/WarmZz/p/18758717

© 版权声明
THE END
支持一下吧
点赞6 分享
评论 抢沙发
头像
请文明发言!
提交
头像

昵称

取消
昵称表情代码快捷回复

    暂无评论内容