CTR示例模型
本章节将通过一个简单的CTR模型介绍 RecIS 的基础使用方法,帮助初学者快速上手,并了解更多特征处理用法。 具体训练代码可参考 examples/deepctr 目录
数据构建
导入需要模块
import os
import random
import string
import numpy as np
import pyarrow as pa
import pyarrow.orc as orc
生成数据
# 数据产出目录,数据信息定义
file_dir = "./fake_data/"
os.makedirs(file_dir, exist_ok=True)
bs = 2047
file_num = 10
dense1 = np.random.rand(bs, 8)
dense2 = np.random.rand(bs, 1)
label = np.floor(np.random.rand(bs, 1) + 0.5, dtype=np.float32)
sparse1 = np.arange(bs, dtype=np.int64).reshape(bs, 1)
sparse2 = np.arange(bs, dtype=np.int64).reshape(bs, 1)
sparse3 = np.arange(bs, dtype=np.int64).reshape(bs, 1)
# 生成长序列特征
long_int_seq = []
for i in range(bs):
seq_len = np.random.randint(1, 2000, dtype=np.int64)
sequence = np.random.randint(0, 1000000, size=seq_len, dtype=np.int64).tolist()
long_int_seq.append(sequence)
def generate_random_string(length):
characters = string.ascii_letters + string.digits
return ''.join(random.choices(characters, k=length))
# 生成长字符串序列特征
strs = []
for i in range(1000):
strs.append(generate_random_string(10))
long_str_seq = []
for i in range(bs):
seq_len = np.random.randint(1, 2000, dtype=np.int64)
sequence = random.choices(strs, k=seq_len)
long_str_seq.append(sequence)
data = {
"label": label.tolist(),
"dense1": dense1.tolist(),
"dense2": dense2.tolist(),
"sparse1": sparse1.tolist(),
"sparse2": sparse2.tolist(),
"sparse3": sparse3.tolist(),
"sparse4": long_int_seq,
"sparse5": long_str_seq,
}
table = pa.Table.from_pydict(data)
# 生成数据
for i in range(file_num):
orc.write_table(table, os.path.join(file_dir, "data_{}.orc".format(i)))
数据定义
定义IO参数
from dataclasses import dataclass
@dataclass
class IOArgs:
data_paths: str
batch_size: int
# 读取数据的并发数
thread_num: int
# 数据预取数量
prefetch: int
drop_remainder: bool
构建dataset
import os
import torch
import recis
from recis.io.orc_dataset import OrcDataset
def get_dataset(io_args):
# 获取当前分布式模式下的rank id和rank num,用于数据并行
worker_idx = int(os.environ.get("RANK", 0))
worker_num = int(os.environ.get("WORLD_SIZE", 1))
dataset = OrcDataset(
io_args.batch_size,
worker_idx=worker_idx,
worker_num=worker_num,
read_threads_num=io_args.thread_num,
prefetch=io_args.prefetch,
is_compressed=False,
drop_remainder=io_args.drop_remainder,
# 数据预处理
transform_fn=[lambda x: x[0]],
dtype=torch.float32,
# batch打包结果直接place到cuda上
device="cuda",
save_interval=None,
)
data_paths = io_args.data_paths.split(",")
for path in data_paths:
dataset.add_path(path)
# 设定需要读取的特征列
# 读取定长特征,以及默认值
dataset.fixedlen_feature("label", [0.0])
dataset.fixedlen_feature("dense1", [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
dataset.fixedlen_feature("dense2", [0.0])
# 读取变长特征
dataset.varlen_feature("sparse1")
dataset.varlen_feature("sparse2")
dataset.varlen_feature("sparse3")
dataset.varlen_feature("sparse4")
# sparse5 是一个需要进行哈希处理的字符串序列。
# 可以在数据集中通过设置 hash_type="farm" 来执行哈希操作,
# 或者通过设置 hash_type=None 和 trans_int8=True 来将字符串
# 读取为 int8 的字节流,之后再通过 HashOp 执行哈希。
dataset.varlen_feature("sparse5", hash_type=None, trans_int8=True)
return dataset
特征处理配置
from recis.features.feature import Feature
from recis.features.op import (
Bucketize,
SelectField,
SelectFields,
FeatureCross,
SequenceTruncate,
Mod,
)
def get_feature_conf():
feature_confs = []
# dense1特征直接读取,dim为8
feature_confs.append(Feature("dense1").add_op(SelectField("dense1", dim=8)))
# dense2特征,dim为1,需要做分桶转换
feature_confs.append(
Feature("dense2")
.add_op(SelectField("dense2", dim=1))
.add_op(Bucketize([0, 0.5, 1]))
)
# sparse1 / sparse2特征,直接读取
feature_confs.append(Feature("sparse1").add_op(SelectField("sparse1")))
feature_confs.append(Feature("sparse2").add_op(SelectField("sparse2")))
# sparse3特征,做10000的取模计算处理
feature_confs.append(
Feature("sparse3").add_op(SelectField("sparse3")).add_op(Mod(10000))
)
# sparse4特征,做取模计算和截断处理
feature_confs.append(
Feature("sparse4")
.add_op(SelectField("sparse4"))
.add_op(Mod(10000))
.add_op(SequenceTruncate(seq_len=1000,
truncate=True,
truncate_side="right",
check_length=True,
n_dims=2))
)
# sparse5特征,做哈希、取模和截断处理
feature_confs.append(
Feature("sparse5")
.add_op(SelectField("sparse5"))
.add_op(Hash(hash_type="farm"))
.add_op(Mod(10000))
.add_op(SequenceTruncate(seq_len=1000,
truncate=True,
truncate_side="right",
check_length=True,
n_dims=2))
)
# sparse1_x_sparse2特征,做特征交叉
feature_confs.append(
Feature("sparse1_x_sparse2")
.add_op(SelectFields([SelectField("sparse1"), SelectField("sparse2")]))
.add_op(FeatureCross())
.add_op(Mod(1000))
)
return feature_confs
Embedding配置
from recis.nn.initializers import Initializer, TruncNormalInitializer
from recis.nn.modules.embedding import EmbeddingOption
def get_embedding_conf():
emb_conf = {}
# dense2特征查找dim=8,name=sparse1的emb表
emb_conf["dense2"] = EmbeddingOption(
embedding_dim=8,
shared_name="sparse1",
combiner="sum",
initializer=TruncNormalInitializer(mean=0, std=0.01),
device=torch.device("cuda"),
)
# sparse1特征查找dim=8,name=sparse1的emb表(和dense2共用同一张emb表)
emb_conf["sparse1"] = EmbeddingOption(
embedding_dim=8,
shared_name="sparse1",
combiner="sum",
initializer=TruncNormalInitializer(mean=0, std=0.01),
device=torch.device("cuda"),
)
# sparse2特征查找dim=16,name=sparse2的emb表
emb_conf["sparse2"] = EmbeddingOption(
embedding_dim=16,
shared_name="sparse2",
combiner="sum",
initializer=TruncNormalInitializer(mean=0, std=0.01),
device=torch.device("cuda"),
)
# sparse3特征查找dim=8,name=sparse3的emb表
emb_conf["sparse3"] = EmbeddingOption(
embedding_dim=8,
shared_name="sparse3",
combiner="sum",
initializer=TruncNormalInitializer(mean=0, std=0.01),
device=torch.device("cuda"),
)
emb_conf["sparse4"] = EmbeddingOption(
embedding_dim=16,
shared_name="sparse4",
combiner="tile",
initializer=TruncNormalInitializer(mean=0, std=0.01),
device=torch.device("cuda"),
combiner_kwargs={"tile_len": 1000}
)
emb_conf["sparse5"] = EmbeddingOption(
embedding_dim=16,
shared_name="sparse5",
combiner="tile",
initializer=TruncNormalInitializer(mean=0, std=0.01),
device=torch.device("cuda"),
combiner_kwargs={"tile_len": 1000}
)
emb_conf["sparse1_x_sparse2"] = EmbeddingOption(
embedding_dim=16,
shared_name="sparse1_x_sparse2",
combiner="mean",
initializer=TruncNormalInitializer(mean=0, std=0.01),
device=torch.device("cuda"),
)
return emb_conf
模型定义
定义稀疏部分模型
import torch
import torch.nn as nn
from recis.features.feature_engine import FeatureEngine
from recis.nn import EmbeddingEngine
class SparseModel(nn.Module):
def __init__(self):
super().__init__()
# 构建特征处理引擎
self.feature_engine = FeatureEngine(feature_list=get_feature_conf())
# 构建Embedding处理引擎
self.embedding_engine = EmbeddingEngine(get_embedding_conf())
def forward(self, samples: dict):
samples = self.feature_engine(samples)
samples = self.embedding_engine(samples)
labels = samples.pop("label")
return samples, labels
定义稠密部分模型
class DenseModel(nn.Module):
def __init__(self):
super().__init__()
layers = []
layers.extend([nn.Linear(8 + 8 + 8 + 16 + 8 + 16000 + 16000 + 16, 128), nn.ReLU()])
layers.extend([nn.Linear(128, 64), nn.ReLU()])
layers.extend([nn.Linear(64, 32), nn.ReLU()])
layers.extend([nn.Linear(32, 1)])
self.dnn = nn.Sequential(*layers)
def forward(self, x):
x = self.dnn(x)
logits = torch.sigmoid(x)
return logits
定义完整模型
from recis.framework.metrics import add_metric
from recis.metrics.auroc import AUROC
class DeepCTR(nn.Module):
def __init__(self):
super().__init__()
self.sparse_arch = SparseModel()
self.dense_arch = DenseModel()
self.auc_metric = AUROC(num_thresholds=200, dist_sync_on_step=True)
self.loss = nn.BCELoss()
def forward(self, samples: dict):
samples, labels = self.sparse_arch(samples)
dense_input = torch.cat(
[
samples["dense1"],
samples["dense2"],
samples["sparse1"],
samples["sparse2"],
samples["sparse3"],
samples["sparse4"],
samples["sparse5"],
samples["sparse1_x_sparse2"],
],
-1,
)
logits = self.dense_arch(dense_input)
# 计算损失
loss = self.loss(logits.squeeze(), labels.squeeze())
# 更新AUC指标
self.auc_metric.update(logits.squeeze(), labels.squeeze())
auc_score = self.auc_metric.compute()
# 添加指标到训练框架
add_metric("auc", auc_score)
add_metric("loss", loss)
return loss
训练入口
定义训练流程
import os
import torch
from torch.optim import AdamW
from recis.framework.trainer import Trainer, TrainingArguments
from recis.nn.modules.hashtable import HashTable, filter_out_sparse_param
from recis.optim import SparseAdamWTF
from recis.utils.logger import Logger
logger = Logger(__name__)
def train():
deepctr_model = DeepCTR()
# get dataset
train_dataset = get_dataset(
io_args=IOArgs(
data_paths="./fake_data/",
batch_size=1024,
thread_num=1,
prefetch=1,
drop_remainder=True,
),
)
logger.info(str(deepctr_model))
sparse_params = filter_out_sparse_param(deepctr_model)
sparse_optim = SparseAdamWTF(sparse_params, lr=0.001)
opt = AdamW(params=deepctr_model.parameters(), lr=0.001)
train_config = TrainingArguments(
gradient_accumulation_steps=1,
output_dir="./ckpt/",
model_bank=None,
log_steps=10,
train_steps=100,
train_epoch=1,
eval_steps=None,
save_steps=1000,
max_to_keep=3,
save_concurrency_per_rank=2,
)
deepctr_model = deepctr_model.cuda()
trainer = Trainer(
model=deepctr_model,
args=train_config,
train_dataset=train_dataset,
dense_optimizers=(opt, None),
sparse_optimizer=sparse_optim,
data_to_cuda=False,
)
trainer.train()
设定并发参数(可选)
import os
from multiprocessing import cpu_count
def set_num_threads():
cpu_num = cpu_count() // 16
os.environ["OMP_NUM_THREADS"] = str(cpu_num)
os.environ["OPENBLAS_NUM_THREADS"] = str(cpu_num)
os.environ["MKL_NUM_THREADS"] = str(cpu_num)
os.environ["VECLIB_MAXIMUM_THREADS"] = str(cpu_num)
os.environ["NUMEXPR_NUM_THREADS"] = str(cpu_num)
torch.set_num_interop_threads(cpu_num)
torch.set_num_threads(cpu_num)
# set device for local run
torch.cuda.set_device(int(os.getenv("RANK", "-1")))
设定随机种子(可选)
import numpy as np
import random
def set_seed(seed):
torch.manual_seed(seed)
if torch.cuda.is_available():
torch.cuda.manual_seed_all(seed) # For multi-GPU setups
np.random.seed(seed)
random.seed(seed)
主脚本
import torch.distributed as dist
if __name__ == "__main__":
set_num_threads()
set_seed(42)
# 创建通信组
dist.init_process_group()
train()
开始训练
export PYTHONPATH=$PWD
MASTER_PORT=12455
# 分布式规模
WORLD_SIZE=2
# 入口脚本
ENTRY=deepctr.py
torchrun --nproc_per_node=$WORLD_SIZE --master_port=$MASTER_PORT $ENTRY