Seq2Seq示例模型

本章节使用 RecIS 训练 seq2seq 模型,帮助初学者熟悉 RecIS 的基础使用方法。相关代码在 examples/seq2seq 目录下。

数据构建

训练使用的 orc 数据已经准备好,在 examples/seq2seq/data 目录下。可以使用 python 的 pyarrow 库预览这些数据:

import pyarrow.orc as orc
file = "./data/part-0.orc"
table = orc.ORCFile(file).read()
print(table)

打印 table,可以发现这是一些序列特征数据,包含用户行为序列、商品ID、类别ID等信息。

数据定义

定义IO参数

@dataclass
class IOArgs:
    data_paths: str
    batch_size: int
    # 读取数据的并发数
    thread_num: int
    # 数据预取数量
    prefetch: int
    drop_remainder: bool

构建 dataset

def get_dataset(io_args):
    # 并行获取数据
    worker_idx = int(os.environ.get("RANK", 0))
    worker_num = int(os.environ.get("WORLD_SIZE", 1))
    dataset = OrcDataset(
        io_args.batch_size,
        worker_idx=worker_idx,
        worker_num=worker_num,
        read_threads_num=io_args.thread_num,
        prefetch=io_args.prefetch,
        is_compressed=False,
        drop_remainder=io_args.drop_remainder,
        transform_fn=[lambda x: x[0]],
        dtype=torch.float32,
        device="cuda",
        save_interval=1000,
    )
    data_paths = io_args.data_paths.split(",")
    for path in data_paths:
        dataset.add_path(path)

    # 读取变长特征
    for item in FEATURE_CONFIG:
        fn = item["name"]
        dataset.varlen_feature(
            fn, item.get("hash_type", None), item.get("hash_bucket_size", 0)
        )
    return dataset

特征处理配置

# 特征处理配置
def get_feature_conf():
    feature_confs = []
    for item in FEATURE_CONFIG:
        fn = item["name"]
        feature_confs.append(
            Feature(fn)
            .add_op(SelectField(fn))
            .add_op(
                SequenceTruncate(
                    seq_len=SEQ_LEN,
                    truncate=True,
                    truncate_side="right",
                    check_length=False,
                    n_dims=3,
                    dtype=torch.int64,
                )
            )
        )
    return feature_confs

其中 FEATURE_CONFIG 定义了特征的基本信息:

FEATURE_CONFIG = [
    {
        "name": "item_id",
        "emb_dim": 128,
        "hash_bucket_size": 2048000,
        "shard_name": "item_id",
    },
    {
        "name": "cate_id",
        "emb_dim": 128,
        "hash_bucket_size": 2048,
        "shard_name": "cate_id",
    },
    {
        "name": "behavior_id",
        "emb_dim": 128,
        "hash_type": "murmur",
        "hash_bucket_size": 0,
        "shard_name": "behavior_id",
    },
    {
        "name": "timestamp",
        "emb_dim": 128,
        "hash_type": "murmur",
        "hash_bucket_size": 2048000,
        "shard_name": "timestamp",
    },
]

Embedding 配置

def get_embedding_conf():
    emb_conf = {}
    for item in FEATURE_CONFIG:
        fn = item["name"]
        emb_dim = item.get("emb_dim", 0)
        shard_name = item.get("shard_name", fn)
        emb_conf[fn] = EmbeddingOption(
            embedding_dim=emb_dim,
            shared_name=shard_name,
            combiner="mean",
            initializer=TruncNormalInitializer(std=0.001),
            device=torch.device("cuda"),
        )
    return emb_conf

模型定义

定义稀疏部分模型

class SparseModel(nn.Module):
    def __init__(self):
        super().__init__()
        # 特征处理
        self.feature_engine = FeatureEngine(feature_list=get_feature_conf())
        # 计算特征的 embedding
        self.embedding_engine = EmbeddingEngine(get_embedding_conf())

    def forward(self, samples: dict):
        samples = self.feature_engine(samples)
        samples = self.embedding_engine(samples)
        return samples

定义Transformer编码器

class CasualMultiHeadAttention(nn.Module):
    def __init__(self, config: ModelConfig) -> None:
        super().__init__()
        self.attn = nn.Linear(config.hidden_size, 3 * config.hidden_size)
        self.proj = nn.Linear(config.hidden_size, config.hidden_size)
        self.dropout = nn.Dropout(config.dropout)
        self.nhead = config.nhead
        self.d_head = config.hidden_size // self.nhead
        self.hidden_size = config.hidden_size

    def forward(self, x: torch.Tensor, attn_mask: torch.Tensor = None) -> torch.Tensor:
        batch, seq, _ = x.size()
        q, k, v = self.attn(x).split(self.hidden_size, dim=2)
        q = q.view(batch, seq, self.nhead, self.d_head).transpose(1, 2)
        k = k.view(batch, seq, self.nhead, self.d_head).transpose(1, 2)
        v = v.view(batch, seq, self.nhead, self.d_head).transpose(1, 2)

        attn_output = F.scaled_dot_product_attention(
            q, k, v, attn_mask=attn_mask, is_causal=True
        )

        attn_output = attn_output.transpose(1, 2).contiguous().view(*x.size())
        attn_output = self.dropout(attn_output)
        output = self.proj(attn_output)
        output = self.dropout(output)
        return output

class FeedForward(nn.Module):
    def __init__(self, config: ModelConfig):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(config.hidden_size, config.dim_feedforward),
            nn.GELU(),
            nn.Linear(config.dim_feedforward, config.hidden_size),
            nn.Dropout(config.dropout),
        )

    def forward(self, x: torch.Tensor):
        return self.net(x)

class TransformerEncoderLayer(nn.Module):
    def __init__(self, config: ModelConfig):
        super().__init__()
        self.norm1 = nn.LayerNorm(config.hidden_size)
        self.attn = CasualMultiHeadAttention(config)
        self.norm2 = nn.LayerNorm(config.hidden_size)
        self.ffn = FeedForward(config)

    def forward(self, x: torch.Tensor, attn_mask: torch.Tensor = None):
        x = x + self.attn(self.norm1(x), attn_mask=attn_mask)
        x = x + self.ffn(self.norm2(x))
        return x

class Transformer(nn.Module):
    def __init__(self, config: ModelConfig):
        super().__init__()
        self.layers = nn.ModuleList(
            [TransformerEncoderLayer(config) for _ in range(config.num_layers)]
        )
        self.final_layer_norm = nn.LayerNorm(config.hidden_size)
        self.seq_len = config.seq_len

    def forward(self, x: torch.Tensor, attn_mask: torch.Tensor = None):
        for layer in self.layers:
            x = layer(x, attn_mask=attn_mask)
        x = self.final_layer_norm(x)
        return x

定义解码器

class Decoder(nn.Module):
    def __init__(self, config: ModelConfig):
        super().__init__()
        self.proj = nn.Linear(config.emb_size, config.hidden_size)
        self.trans = Transformer(config)
        self.loss_fn = nn.CrossEntropyLoss()

    def cal_loss(self, preds: torch.Tensor, items: torch.Tensor):
        preds = preds[:, 1:, :]
        preds = preds.reshape(-1, preds.shape[-1])
        items = items[:, :-1, :]
        items = items.reshape(-1, items.shape[-1])
        preds = F.normalize(preds, p=2, dim=-1, eps=1e-6)
        items = F.normalize(items, p=2, dim=-1, eps=1e-6)
        labels = torch.arange(preds.shape[0], device=preds.device, dtype=torch.long)
        cos_sim = torch.matmul(preds, items.t())
        loss = self.loss_fn(cos_sim, labels)
        with torch.no_grad():
            hits = (cos_sim.detach().argmax(dim=1) == labels).sum()
        add_metric("hit_rate", hits / preds.shape[0])
        add_metric("loss", loss)
        return loss

    def forward(self, x: torch.Tensor, attn_mask: torch.Tensor = None):
        items = self.proj(x)
        preds = self.trans(items, attn_mask)
        if self.training:
            loss = self.cal_loss(preds, items)
            return loss
        else:
            return preds

定义完整模型

class Seq2SeqModel(nn.Module):
    def __init__(self, config: ModelConfig):
        super().__init__()
        self.dense = Decoder(config)
        self.sparse = SparseModel()
        self.casual_mask = (
            torch.tril(torch.ones(config.seq_len, config.seq_len))
            .view(1, 1, config.seq_len, config.seq_len)
            .cuda()
        )

    def build_embedding(self, samples: dict[torch.Tensor]):
        embs = []
        for item in FEATURE_CONFIG:
            fn = item["name"]
            embs.append(samples[fn])
        return torch.cat(embs, dim=-1)

    def cal_mask(self, seq_len):
        return self.casual_mask[:, :, :seq_len, :seq_len]

    def forward(self, samples: dict[torch.Tensor]):
        samples = self.sparse(samples)
        emb = self.build_embedding(samples)
        with torch.amp.autocast(dtype=torch.bfloat16, device_type="cuda"):
            return self.dense(emb, self.cal_mask(emb.shape[1]))

其中 ModelConfig 定义了模型的基本配置:

@dataclass
class ModelConfig:
    seq_len: int = 1024
    hidden_size: int = 1024
    num_layers: int = 8
    nhead: int = 8
    dim_feedforward: int = 1024
    dropout: float = 0.1
    emb_size: int = 512

训练入口

定义训练流程

首先获取数据集:

train_dataset = get_dataset(args.io_args)

然后创建模型,在分别创建稀疏模型和稠密模型的优化器:

model = Seq2SeqModel(args.model_config)
model = model.cuda()

# optimizer
sparse_param = filter_out_sparse_param(model)
dense_opt, sparse_opt = get_optimizer(model, args.lr.dense_lr, args.lr.sparse_lr)

其中 get_optimizer 函数定义如下:

def get_optimizer(model: nn.Module, dense_lr, sparse_lr):
    sparse_param = filter_out_sparse_param(model)
    dense_opt = AdamW(model.parameters(), lr=dense_lr)
    sparse_opt = SparseAdamW(sparse_param, lr=sparse_lr)
    return (dense_opt, sparse_opt)

最后创建训练流程:

trainer = Trainer(
    model=model,
    args=args.train_config,
    train_dataset=train_dataset,
    dense_optimizers=(dense_opt, None),
    sparse_optimizer=sparse_opt,
)

环境设置

设置分布式相关环境和随机种子:

def set_num_threads():
    cpu_num = cpu_count() // 16
    os.environ["OMP_NUM_THREADS"] = str(cpu_num)
    os.environ["OPENBLAS_NUM_THREADS"] = str(cpu_num)
    os.environ["MKL_NUM_THREADS"] = str(cpu_num)
    os.environ["VECLIB_MAXIMUM_THREADS"] = str(cpu_num)
    os.environ["NUMEXPR_NUM_THREADS"] = str(cpu_num)
    torch.set_num_interop_threads(cpu_num)
    torch.set_num_threads(cpu_num)
    # set device for local run
    torch.cuda.set_device(int(os.getenv("RANK", "-1")))

def set_seed(seed):
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)  # For multi-GPU setups
    np.random.seed(seed)
    random.seed(seed)

开始训练

通过 run.sh 脚本启动训练:bash run.sh 即可。