基础使用
本章节介绍 RecIS 的基础使用方法,帮助初学者快速上手。
环境准备
在开始之前,确保已经正确安装了 recis 以及数据拓展 column-io:
import torch
import recis
import column_io
基础概念
- FeatureEngine
特征处理引擎,支持复杂的特征变换
- HashTable
稀疏参数存储的核心组件
- DynamicEmbedding
动态扩张的 embedding 表,支持稀疏参数的自动管理
- EmbeddingOption
Embedding表配置选项
- EmbeddingEngine
Embedding表管理引擎,管理多张DynamicEmbedding,并提供稀疏合并等优化策略
- SparseOptimizer
专为稀疏参数设计的优化器
- Trainer
训练管理器
数据处理
数据转换工具
将CSV文件转换成ORC格式数据
from recis.nn import DynamicEmbedding, EmbeddingOption
读取数据示例
import os
from recis.io.orc_dataset import OrcDataset
worker_idx = int(os.environ.get("RANK", 0))
worker_num = int(os.environ.get("WORLD_SIZE", 1))
dataset = OrcDataset(
1024, # batch size
worker_idx=worker_idx,
worker_num=worker_num,
read_threads_num=2, # 读取数据线程数
prefetch=1, # 预取数据个数
is_compressed=False,
drop_remainder=True, # 删除不满batch的数据
transform_fn=[lambda x: x[0]],
dtype=torch.float32,
device="cuda", # dataset数据结果直接输出到cuda上
save_interval=None,
)
data_paths = ["./data_dir/"]
for path in data_paths:
dataset.add_path(path)
# 读取定长特征
dataset.fixedlen_feature("label", [0.0])
# 读取变长特征
dataset.varlen_feature("user_id")
dataset.varlen_feature("item_id")
# 构建读取数据
iter = iter(dataset)
data = next(iter)
特征工程
基础特征处理
from recis.features import FeatureEngine
from recis.features.feature import Feature
from recis.features.op import SelectField, Hash, Bucketize
# 定义特征处理流水线
features = [
# 用户 ID 哈希
Feature(
name="user_id",
ops=[
SelectField("user_id"),
Hash(bucket_size=100000)
]
),
# 商品 ID 哈希
Feature(
name="item_id",
ops=[
SelectField("item_id"),
Hash(bucket_size=50000)
]
),
# 年龄分桶
Feature(
name="age_bucket",
ops=[
SelectField("age"),
Bucketize(boundaries=[18, 25, 35, 45, 55, 65])
]
)
]
# 创建特征引擎
feature_engine = FeatureEngine(features)
# 处理数据
input_data = {
'user_id': torch.LongTensor([[1], [2], [3]]),
'item_id': torch.LongTensor([[101], [102], [103]]),
'age': torch.FloatTensor([[25], [35], [45]])
}
processed_data = feature_engine(input_data)
print("原始数据:", input_data)
print("处理后数据:", processed_data)
稀疏Embedding表
构建Embedding表
创建第一个 Embedding
from recis.nn import DynamicEmbedding, EmbeddingOption
# 配置 embedding 选项
emb_opt = EmbeddingOption(
embedding_dim=64,
shared_name="my_embedding",
combiner="sum"
)
# 创建动态 embedding
embedding = DynamicEmbedding(emb_opt)
# 使用 embedding
ids = torch.LongTensor([[1], [2], [3], [100], [1000]])
emb_output = embedding(ids)
print(f"输入 ID: {ids}")
print(f"Embedding 输出形状: {emb_output.shape}")
print(f"Embedding 输出: {emb_output}")
使用EmbeddingEngine管理优化Embedding表
from recis.nn import EmbeddingEngine, EmbeddingOption
# 配置 embedding 选项
user_emb_opt = EmbeddingOption(
embedding_dim=64,
shared_name="user_emb",
combiner="sum"
)
id_emb_opt = EmbeddingOption(
embedding_dim=64,
shared_name="id_emb",
combiner="sum"
)
# 创建动态 embedding
embedding = EmbeddingEngine(
{"user_emb": user_emb_opt, "item_emb": id_emb_opt}
)
# 使用 embedding
user_ids = torch.LongTensor([[1], [2], [3], [100], [1000]])
item_ids = torch.LongTensor([[11], [22], [33], [111], [1111]])
emb_output = embedding({"user_emb": user_ids, "item_emb": item_ids})
print(f"Embedding 输出: {emb_output}")
构建稀疏参数优化器
from recis.optim import SparseAdamW
from recis.nn.modules.hashtable import filter_out_sparse_param
# 创建一个简单模型
class SimpleModel(torch.nn.Module):
def __init__(self):
super().__init__()
emb_opt = EmbeddingOption(embedding_dim=32)
self.embedding = DynamicEmbedding(emb_opt)
self.linear = torch.nn.Linear(32, 1)
def forward(self, ids):
emb = self.embedding(ids)
return self.linear(emb)
model = SimpleModel()
# 分离稀疏和稠密参数
sparse_params = filter_out_sparse_param(model)
print("稀疏参数:", list(sparse_params.keys()))
# 创建优化器
sparse_optimizer = SparseAdamW(sparse_params, lr=0.001)
dense_optimizer = torch.optim.AdamW(model.parameters(), lr=0.001)
使用Trainer进行训练
简单训练
# 构建模型
# model = ...
# 构建数据
# dataset = ...
# 定义优化器
# sparse_params = filter_out_sparse_param(model)
# sparse_optim = SparseAdamWTF(sparse_params, lr=0.001)
# opt = AdamW(params=model.parameters(), lr=0.001)
train_config = TrainingArguments(
gradient_accumulation_steps=1,
output_dir="./ckpt/",
log_steps=10,
train_steps=100, # 只训练100step
)
trainer = Trainer(
model=model,
args=train_config,
train_dataset=dataset,
dense_optimizers=(opt, None),
sparse_optimizer=sparse_optim,
)
trainer.train()
高阶训练拓展
1. 边训练边测试
# 构建模型
# model = ...
# 构建数据
# dataset = ...
# 定义优化器
# sparse_params = filter_out_sparse_param(deepctrmodel_model.state_dict())
# sparse_optim = SparseAdamWTF(sparse_params, lr=0.001)
# opt = AdamW(params=model.parameters(), lr=0.001)
train_config = TrainingArguments(
gradient_accumulation_steps=1,
output_dir="./ckpt/",
log_steps=10,
train_steps=100,
eval_steps=10,
)
trainer = Trainer(
model=model,
args=train_config,
train_dataset=dataset,
dense_optimizers=(opt, None),
sparse_optimizer=sparse_optim,
)
# 训练100step,测试10step,这样的循环模式重复10次
trainer.train_and_evaluate(10)
2. 自定义训练流程
from framework.trainer import Trainer
class MyTrainer(Trainer):
def _train_step(self, data, epoch, metrics):
self.dense_optimizer.zero_grad()
if self.sparse_optimizer is not None:
self.sparse_optimizer.zero_grad()
loss = self.model(data)
metrics.update(epoch=epoch)
metrics.update(loss=loss)
metrics.update(get_global_metrics())
loss.backward()
self.dense_optimizer.step()
if self.sparse_optimizer is not None:
self.sparse_optimizer.step()
if self.dense_lr_scheduler is not None:
self.dense_lr_scheduler.step()
3. 自定义Saver保存信息
from framework.trainer import Trainer
from framework.checkpoint_manager import Saver
class MySaver(Saver):
def save_dense_params(self, ckpt_path: str, dense_state_dict: OrderedDict):
pt_file = os.path.join(ckpt_path, "model.pt")
with fs.open(pt_file, "wb") as f:
torch.save(dense_state_dict, f=f)
class MyTrainer(Trainer):
def build_saver(self, model, args):
saver = MySaver(
model,
self.sparse_optimizer,
output_dir=args.output_dir,
max_keep=args.max_to_keep,
concurrency=args.save_concurrency_per_rank,
)
return saver
评估指标使用
基础指标计算
from recis.metrics import AUROC
# 创建 AUC 指标
auc_metric = AUROC(num_thresholds=200)
# 模拟预测和标签
predictions = torch.rand(1000) # 随机预测值 [0, 1]
labels = torch.randint(0, 2, (1000,)) # 随机标签 {0, 1}
# 更新指标
auc_metric.update(predictions, labels)
# 计算 AUC
auc_score = auc_metric.compute()
print(f"AUC Score: {auc_score:.4f}")
# 重置指标
auc_metric.reset()
在训练中使用指标
from recis.framework.metrics import add_metric
class ModelWithMetrics(torch.nn.Module):
def __init__(self):
super().__init__()
self.embedding = DynamicEmbedding(EmbeddingOption(embedding_dim=32))
self.linear = torch.nn.Linear(32, 1)
self.auc_metric = AUROC(num_thresholds=200)
self.loss_fn = torch.nn.BCEWithLogitsLoss()
def forward(self, batch):
ids = batch['ids']
labels = batch['labels']
# 模型预测
emb = self.embedding(ids)
logits = self.linear(emb).squeeze()
# 计算损失
loss = self.loss_fn(logits, labels)
# 更新指标
probs = torch.sigmoid(logits)
self.auc_metric.update(probs, labels.long())
auc = self.auc_metric.compute()
# 添加指标到训练框架
add_metric("auc", auc)
add_metric("loss", loss)
return loss
# 使用带指标的模型
model_with_metrics = ModelWithMetrics()
下一步
完成基础使用后,您可以:
学习 DeepFM示例模型 了解DeepFM完整模型实现
学习 Seq2Seq示例模型 了解SEq2Seq完整模型实现
学习 CTR示例模型 了解更多特征转换场景
参考 API Documentation 深入了解 API 详情
如果遇到问题,请查看 常见问题 或寻求社区帮助。