基础使用

本章节介绍 RecIS 的基础使用方法,帮助初学者快速上手。

环境准备

在开始之前,确保已经正确安装了 recis 以及数据拓展 column-io:

import torch
import recis
import column_io

基础概念

FeatureEngine

特征处理引擎,支持复杂的特征变换

HashTable

稀疏参数存储的核心组件

DynamicEmbedding

动态扩张的 embedding 表,支持稀疏参数的自动管理

EmbeddingOption

Embedding表配置选项

EmbeddingEngine

Embedding表管理引擎,管理多张DynamicEmbedding,并提供稀疏合并等优化策略

SparseOptimizer

专为稀疏参数设计的优化器

Trainer

训练管理器

数据处理

数据转换工具

将CSV文件转换成ORC格式数据

from recis.nn import DynamicEmbedding, EmbeddingOption

读取数据示例

import os
from recis.io.orc_dataset import OrcDataset

worker_idx = int(os.environ.get("RANK", 0))
worker_num = int(os.environ.get("WORLD_SIZE", 1))
dataset = OrcDataset(
    1024, # batch size
    worker_idx=worker_idx,
    worker_num=worker_num,
    read_threads_num=2, # 读取数据线程数
    prefetch=1, # 预取数据个数
    is_compressed=False,
    drop_remainder=True, # 删除不满batch的数据
    transform_fn=[lambda x: x[0]],
    dtype=torch.float32,
    device="cuda", # dataset数据结果直接输出到cuda上
    save_interval=None,
)
data_paths = ["./data_dir/"]
for path in data_paths:
    dataset.add_path(path)
# 读取定长特征
dataset.fixedlen_feature("label", [0.0])
# 读取变长特征
dataset.varlen_feature("user_id")
dataset.varlen_feature("item_id")

# 构建读取数据
iter = iter(dataset)
data = next(iter)

特征工程

基础特征处理

from recis.features import FeatureEngine
from recis.features.feature import Feature
from recis.features.op import SelectField, Hash, Bucketize

# 定义特征处理流水线
features = [
    # 用户 ID 哈希
    Feature(
        name="user_id",
        ops=[
            SelectField("user_id"),
            Hash(bucket_size=100000)
        ]
    ),

    # 商品 ID 哈希
    Feature(
        name="item_id",
        ops=[
            SelectField("item_id"),
            Hash(bucket_size=50000)
        ]
    ),

    # 年龄分桶
    Feature(
        name="age_bucket",
        ops=[
            SelectField("age"),
            Bucketize(boundaries=[18, 25, 35, 45, 55, 65])
        ]
    )
]

# 创建特征引擎
feature_engine = FeatureEngine(features)

# 处理数据
input_data = {
    'user_id': torch.LongTensor([[1], [2], [3]]),
    'item_id': torch.LongTensor([[101], [102], [103]]),
    'age': torch.FloatTensor([[25], [35], [45]])
}

processed_data = feature_engine(input_data)

print("原始数据:", input_data)
print("处理后数据:", processed_data)

稀疏Embedding表

构建Embedding表

创建第一个 Embedding

from recis.nn import DynamicEmbedding, EmbeddingOption

# 配置 embedding 选项
emb_opt = EmbeddingOption(
    embedding_dim=64,
    shared_name="my_embedding",
    combiner="sum"
)

# 创建动态 embedding
embedding = DynamicEmbedding(emb_opt)

# 使用 embedding
ids = torch.LongTensor([[1], [2], [3], [100], [1000]])
emb_output = embedding(ids)

print(f"输入 ID: {ids}")
print(f"Embedding 输出形状: {emb_output.shape}")
print(f"Embedding 输出: {emb_output}")

使用EmbeddingEngine管理优化Embedding表

from recis.nn import EmbeddingEngine, EmbeddingOption

# 配置 embedding 选项
user_emb_opt = EmbeddingOption(
    embedding_dim=64,
    shared_name="user_emb",
    combiner="sum"
)
id_emb_opt = EmbeddingOption(
    embedding_dim=64,
    shared_name="id_emb",
    combiner="sum"
)

# 创建动态 embedding
embedding = EmbeddingEngine(
    {"user_emb": user_emb_opt, "item_emb": id_emb_opt}
)

# 使用 embedding
user_ids = torch.LongTensor([[1], [2], [3], [100], [1000]])
item_ids = torch.LongTensor([[11], [22], [33], [111], [1111]])
emb_output = embedding({"user_emb": user_ids, "item_emb": item_ids})

print(f"Embedding 输出: {emb_output}")

构建稀疏参数优化器

from recis.optim import SparseAdamW
from recis.nn.modules.hashtable import filter_out_sparse_param

# 创建一个简单模型
class SimpleModel(torch.nn.Module):
    def __init__(self):
        super().__init__()
        emb_opt = EmbeddingOption(embedding_dim=32)
        self.embedding = DynamicEmbedding(emb_opt)
        self.linear = torch.nn.Linear(32, 1)

    def forward(self, ids):
        emb = self.embedding(ids)
        return self.linear(emb)

model = SimpleModel()

# 分离稀疏和稠密参数
sparse_params = filter_out_sparse_param(model)

print("稀疏参数:", list(sparse_params.keys()))

# 创建优化器
sparse_optimizer = SparseAdamW(sparse_params, lr=0.001)
dense_optimizer = torch.optim.AdamW(model.parameters(), lr=0.001)

使用Trainer进行训练

简单训练

# 构建模型
# model = ...
# 构建数据
# dataset = ...
# 定义优化器
# sparse_params = filter_out_sparse_param(model)

# sparse_optim = SparseAdamWTF(sparse_params, lr=0.001)
# opt = AdamW(params=model.parameters(), lr=0.001)

train_config = TrainingArguments(
    gradient_accumulation_steps=1,
    output_dir="./ckpt/",
    log_steps=10,
    train_steps=100, # 只训练100step
)

trainer = Trainer(
    model=model,
    args=train_config,
    train_dataset=dataset,
    dense_optimizers=(opt, None),
    sparse_optimizer=sparse_optim,
)
trainer.train()

高阶训练拓展

1. 边训练边测试

# 构建模型
# model = ...
# 构建数据
# dataset = ...
# 定义优化器
# sparse_params = filter_out_sparse_param(deepctrmodel_model.state_dict())

# sparse_optim = SparseAdamWTF(sparse_params, lr=0.001)
# opt = AdamW(params=model.parameters(), lr=0.001)

train_config = TrainingArguments(
    gradient_accumulation_steps=1,
    output_dir="./ckpt/",
    log_steps=10,
    train_steps=100,
    eval_steps=10,
)

trainer = Trainer(
    model=model,
    args=train_config,
    train_dataset=dataset,
    dense_optimizers=(opt, None),
    sparse_optimizer=sparse_optim,
)
# 训练100step,测试10step,这样的循环模式重复10次
trainer.train_and_evaluate(10)

2. 自定义训练流程

from framework.trainer import Trainer
class MyTrainer(Trainer):
     def _train_step(self, data, epoch, metrics):
        self.dense_optimizer.zero_grad()
        if self.sparse_optimizer is not None:
            self.sparse_optimizer.zero_grad()
        loss = self.model(data)
        metrics.update(epoch=epoch)
        metrics.update(loss=loss)
        metrics.update(get_global_metrics())
        loss.backward()
        self.dense_optimizer.step()
        if self.sparse_optimizer is not None:
            self.sparse_optimizer.step()
        if self.dense_lr_scheduler is not None:
            self.dense_lr_scheduler.step()

3. 自定义Saver保存信息

from framework.trainer import Trainer
from framework.checkpoint_manager import Saver

class MySaver(Saver):
    def save_dense_params(self, ckpt_path: str, dense_state_dict: OrderedDict):
        pt_file = os.path.join(ckpt_path, "model.pt")
        with fs.open(pt_file, "wb") as f:
            torch.save(dense_state_dict, f=f)

class MyTrainer(Trainer):
     def build_saver(self, model, args):
        saver = MySaver(
            model,
            self.sparse_optimizer,
            output_dir=args.output_dir,
            max_keep=args.max_to_keep,
            concurrency=args.save_concurrency_per_rank,
        )
        return saver

评估指标使用

基础指标计算

from recis.metrics import AUROC

# 创建 AUC 指标
auc_metric = AUROC(num_thresholds=200)

# 模拟预测和标签
predictions = torch.rand(1000)  # 随机预测值 [0, 1]
labels = torch.randint(0, 2, (1000,))  # 随机标签 {0, 1}

# 更新指标
auc_metric.update(predictions, labels)

# 计算 AUC
auc_score = auc_metric.compute()
print(f"AUC Score: {auc_score:.4f}")

# 重置指标
auc_metric.reset()

在训练中使用指标

from recis.framework.metrics import add_metric

class ModelWithMetrics(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.embedding = DynamicEmbedding(EmbeddingOption(embedding_dim=32))
        self.linear = torch.nn.Linear(32, 1)
        self.auc_metric = AUROC(num_thresholds=200)
        self.loss_fn = torch.nn.BCEWithLogitsLoss()

    def forward(self, batch):
        ids = batch['ids']
        labels = batch['labels']

        # 模型预测
        emb = self.embedding(ids)
        logits = self.linear(emb).squeeze()

        # 计算损失
        loss = self.loss_fn(logits, labels)

        # 更新指标
        probs = torch.sigmoid(logits)
        self.auc_metric.update(probs, labels.long())
        auc = self.auc_metric.compute()

        # 添加指标到训练框架
        add_metric("auc", auc)
        add_metric("loss", loss)

        return loss

# 使用带指标的模型
model_with_metrics = ModelWithMetrics()

下一步

完成基础使用后,您可以:

  1. 学习 DeepFM示例模型 了解DeepFM完整模型实现

  2. 学习 Seq2Seq示例模型 了解SEq2Seq完整模型实现

  3. 学习 CTR示例模型 了解更多特征转换场景

  4. 参考 API Documentation 深入了解 API 详情

如果遇到问题,请查看 常见问题 或寻求社区帮助。