CTR Example Model

This chapter introduces the basic usage of RecIS through a simple CTR model, helping beginners get started quickly and learn common feature-processing techniques. For the complete training code, please refer to the examples/deepctr directory.

Data Construction

Import Required Modules

import os
import random
import string

import numpy as np
import pyarrow as pa
import pyarrow.orc as orc

Generate Data

# Data output directory and dataset size definitions
file_dir = "./fake_data/"
os.makedirs(file_dir, exist_ok=True)
bs = 2047  # rows per file
file_num = 10  # number of ORC files to write

# Dense features, 0/1 labels, and simple integer id features
dense1 = np.random.rand(bs, 8)
dense2 = np.random.rand(bs, 1)
label = np.floor(np.random.rand(bs, 1) + 0.5, dtype=np.float32)
sparse1 = np.arange(bs, dtype=np.int64).reshape(bs, 1)
sparse2 = np.arange(bs, dtype=np.int64).reshape(bs, 1)
sparse3 = np.arange(bs, dtype=np.int64).reshape(bs, 1)

# Generate long sequence features
long_int_seq = []
for i in range(bs):
    seq_len = np.random.randint(1, 2000, dtype=np.int64)
    sequence = np.random.randint(0, 1000000, size=seq_len, dtype=np.int64).tolist()
    long_int_seq.append(sequence)

def generate_random_string(length):
    characters = string.ascii_letters + string.digits
    return ''.join(random.choices(characters, k=length))

# Generate long string sequence features from a pool of 1000 random strings
strs = []
for i in range(1000):
    strs.append(generate_random_string(10))
long_str_seq = []
for i in range(bs):
    seq_len = np.random.randint(1, 2000, dtype=np.int64)
    sequence = random.choices(strs, k=seq_len)
    long_str_seq.append(sequence)

data = {
    "label": label.tolist(),
    "dense1": dense1.tolist(),
    "dense2": dense2.tolist(),
    "sparse1": sparse1.tolist(),
    "sparse2": sparse2.tolist(),
    "sparse3": sparse3.tolist(),
    "sparse4": long_int_seq,
    "sparse5": long_str_seq,
}

table = pa.Table.from_pydict(data)
# Write the same table to multiple ORC files
for i in range(file_num):
    orc.write_table(table, os.path.join(file_dir, "data_{}.orc".format(i)))
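
To sanity-check the generated files, one of them can be read back with pyarrow (a quick verification sketch, separate from the training flow):

import pyarrow.orc as orc

t = orc.read_table("./fake_data/data_0.orc")
print(t.num_rows)  # 2047 rows per file
print(t.schema)    # columns and types defined above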

Data Definition

Define IO Parameters

from dataclasses import dataclass
@dataclass
class IOArgs:
    # Comma-separated list of data directories
    data_paths: str
    batch_size: int
    # Concurrency for data reading
    thread_num: int
    # Number of batches to prefetch
    prefetch: int
    # Whether to drop the last incomplete batch
    drop_remainder: bool
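
For reference, the training entry in the Training section instantiates these arguments like this:

io_args = IOArgs(
    data_paths="./fake_data/",
    batch_size=1024,
    thread_num=1,
    prefetch=1,
    drop_remainder=True,
)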

Build Dataset

import os
import torch

import recis
from recis.io.orc_dataset import OrcDataset

def get_dataset(io_args):
    # Get current rank id and rank num in distributed mode for data parallelism
    worker_idx = int(os.environ.get("RANK", 0))
    worker_num = int(os.environ.get("WORLD_SIZE", 1))
    dataset = OrcDataset(
        io_args.batch_size,
        worker_idx=worker_idx,
        worker_num=worker_num,
        read_threads_num=io_args.thread_num,
        prefetch=io_args.prefetch,
        is_compressed=False,
        drop_remainder=io_args.drop_remainder,
        # Data preprocessing
        transform_fn=[lambda x: x[0]],
        dtype=torch.float32,
        # Assembled batches are placed directly on CUDA
        device="cuda",
        save_interval=None,
    )
    data_paths = io_args.data_paths.split(",")
    for path in data_paths:
        dataset.add_path(path)
    # Set feature columns to read
    # Read fixed-length features and default values
    dataset.fixedlen_feature("label", [0.0])
    dataset.fixedlen_feature("dense1", [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
    dataset.fixedlen_feature("dense2", [0.0])
    # Read variable-length features
    dataset.varlen_feature("sparse1")
    dataset.varlen_feature("sparse2")
    dataset.varlen_feature("sparse3")
    dataset.varlen_feature("sparse4")
    # sparse5 is a string sequence that needs hashing. You can hash inside
    # the dataset by setting hash_type="farm", or set hash_type=None with
    # trans_int8=True to read the strings as int8 byte streams and hash them
    # later with the Hash op (the approach used in the feature configuration below).
    dataset.varlen_feature("sparse5", hash_type=None, trans_int8=True)
    return dataset
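
As the comment above notes, the hashing for sparse5 could instead happen inside the dataset. In that variant the later Hash op in the feature configuration becomes unnecessary:

# Variant: hash sparse5 during reading instead of in the feature pipeline
dataset.varlen_feature("sparse5", hash_type="farm")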

Feature Processing Configuration

from recis.features.feature import Feature
from recis.features.op import (
    Bucketize,
    FeatureCross,
    Hash,
    Mod,
    SelectField,
    SelectFields,
    SequenceTruncate,
)
def get_feature_conf():
    feature_confs = []
    # dense1 feature read directly, dim is 8
    feature_confs.append(Feature("dense1").add_op(SelectField("dense1", dim=8)))
    # dense2 feature, dim is 1, needs bucketing transformation
    feature_confs.append(
        Feature("dense2")
        .add_op(SelectField("dense2", dim=1))
        .add_op(Bucketize([0, 0.5, 1]))
    )
    # sparse1 / sparse2 features, read directly
    feature_confs.append(Feature("sparse1").add_op(SelectField("sparse1")))
    feature_confs.append(Feature("sparse2").add_op(SelectField("sparse2")))
    # sparse3 feature, perform modulo 10000 calculation
    feature_confs.append(
        Feature("sparse3").add_op(SelectField("sparse3")).add_op(Mod(10000))
    )
    # sparse4 feature, perform modulo calculation and truncation
    feature_confs.append(
        Feature("sparse4")
        .add_op(SelectField("sparse4"))
        .add_op(Mod(10000))
        .add_op(
            SequenceTruncate(
                seq_len=1000,
                truncate=True,
                truncate_side="right",
                check_length=True,
                n_dims=2,
            )
        )
    )
    # sparse5 feature, perform hash, modulo and truncation
    feature_confs.append(
        Feature("sparse5")
        .add_op(SelectField("sparse5"))
        .add_op(Hash(hash_type="farm"))
        .add_op(Mod(10000))
        .add_op(
            SequenceTruncate(
                seq_len=1000,
                truncate=True,
                truncate_side="right",
                check_length=True,
                n_dims=2,
            )
        )
    )
    # sparse1_x_sparse2 feature, perform feature crossing
    feature_confs.append(
        Feature("sparse1_x_sparse2")
        .add_op(SelectFields([SelectField("sparse1"), SelectField("sparse2")]))
        .add_op(FeatureCross())
        .add_op(Mod(1000))
    )
    return feature_confs
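
Bucketize maps continuous values to integer bucket ids based on a boundary list. As a rough analogy in plain PyTorch (the RecIS op's exact boundary handling may differ), torch.bucketize shows the idea:

import torch

values = torch.tensor([0.1, 0.5, 0.9])
boundaries = torch.tensor([0.0, 0.5, 1.0])
# Each value maps to the index of the bucket it falls into
print(torch.bucketize(values, boundaries))  # tensor([1, 1, 2])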

Embedding Configuration

from recis.nn.initializers import Initializer, TruncNormalInitializer
from recis.nn.modules.embedding import EmbeddingOption
def get_embedding_conf():
    emb_conf = {}
    # dense2 feature looks up the emb table with dim=8, name=sparse1 (shared with sparse1 below)
    emb_conf["dense2"] = EmbeddingOption(
        embedding_dim=8,
        shared_name="sparse1",
        combiner="sum",
        initializer=TruncNormalInitializer(mean=0, std=0.01),
        device=torch.device("cuda"),
    )
    # sparse1 feature looks up emb table with dim=8, name=sparse1 (shares the same emb table with dense2)
    emb_conf["sparse1"] = EmbeddingOption(
        embedding_dim=8,
        shared_name="sparse1",
        combiner="sum",
        initializer=TruncNormalInitializer(mean=0, std=0.01),
        device=torch.device("cuda"),
    )
    # sparse2 feature looks up emb table with dim=16, name=sparse2
    emb_conf["sparse2"] = EmbeddingOption(
        embedding_dim=16,
        shared_name="sparse2",
        combiner="sum",
        initializer=TruncNormalInitializer(mean=0, std=0.01),
        device=torch.device("cuda"),
    )
    # sparse3 feature looks up emb table with dim=8, name=sparse3
    emb_conf["sparse3"] = EmbeddingOption(
        embedding_dim=8,
        shared_name="sparse3",
        combiner="sum",
        initializer=TruncNormalInitializer(mean=0, std=0.01),
        device=torch.device("cuda"),
    )
    emb_conf["sparse4"] = EmbeddingOption(
        embedding_dim=16,
        shared_name="sparse4",
        combiner="tile",
        initializer=TruncNormalInitializer(mean=0, std=0.01),
        device=torch.device("cuda"),
        combiner_kwargs={"tile_len": 1000}
    )
    emb_conf["sparse5"] = EmbeddingOption(
        embedding_dim=16,
        shared_name="sparse5",
        combiner="tile",
        initializer=TruncNormalInitializer(mean=0, std=0.01),
        device=torch.device("cuda"),
        combiner_kwargs={"tile_len": 1000}
    )
    emb_conf["sparse1_x_sparse2"] = EmbeddingOption(
        embedding_dim=16,
        shared_name="sparse1_x_sparse2",
        combiner="mean",
        initializer=TruncNormalInitializer(mean=0, std=0.01),
        device=torch.device("cuda"),
    )
    return emb_conf
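
One thing to note before wiring up the DNN: the sum and mean combiners pool each feature into a single embedding of size embedding_dim, while the tile combiner keeps up to tile_len embeddings per sample, so its flattened output width is tile_len * embedding_dim (inferred here from the configuration above together with the DNN input width below):

# Per-feature output widths after embedding lookup
sum_or_mean_width = 16    # e.g. sparse2: [batch, embedding_dim]
tile_width = 1000 * 16    # sparse4 / sparse5: [batch, tile_len * embedding_dim] = 16000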

Model Definition

Define Sparse Model

import torch
import torch.nn as nn

from recis.features.feature_engine import FeatureEngine
from recis.nn import EmbeddingEngine

class SparseModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.feature_engine = FeatureEngine(feature_list=get_feature_conf())
        self.embedding_engine = EmbeddingEngine(get_embedding_conf())

    def forward(self, samples: dict):
        samples = self.feature_engine(samples)
        samples = self.embedding_engine(samples)
        labels = samples.pop("label")
        return samples, labels

Define Dense Model

class DenseModel(nn.Module):
    def __init__(self):
        super().__init__()
        layers = []
        # Input width: dense1 (8) + dense2 (8) + sparse1 (8) + sparse2 (16)
        # + sparse3 (8) + sparse4 (16000) + sparse5 (16000) + cross (16) = 32064
        layers.extend([nn.Linear(8 + 8 + 8 + 16 + 8 + 16000 + 16000 + 16, 128), nn.ReLU()])
        layers.extend([nn.Linear(128, 64), nn.ReLU()])
        layers.extend([nn.Linear(64, 32), nn.ReLU()])
        layers.extend([nn.Linear(32, 1)])
        self.dnn = nn.Sequential(*layers)

    def forward(self, x):
        x = self.dnn(x)
        # Sigmoid turns the raw output into a probability (BCELoss expects probabilities)
        logits = torch.sigmoid(x)
        return logits

Define Whole Model

from recis.framework.metrics import add_metric
from recis.metrics.auroc import AUROC

class DeepCTR(nn.Module):
    def __init__(self):
        super().__init__()
        self.sparse_arch = SparseModel()
        self.dense_arch = DenseModel()
        self.auc_metric = AUROC(num_thresholds=200, dist_sync_on_step=True)
        self.loss = nn.BCELoss()

    def forward(self, samples: dict):
        samples, labels = self.sparse_arch(samples)
        dense_input = torch.cat(
            [
                samples["dense1"],
                samples["dense2"],
                samples["sparse1"],
                samples["sparse2"],
                samples["sparse3"],
                samples["sparse4"],
                samples["sparse5"],
                samples["sparse1_x_sparse2"],
            ],
            -1,
        )
        logits = self.dense_arch(dense_input)

        loss = self.loss(logits.squeeze(), labels.squeeze())

        self.auc_metric.update(logits.squeeze(), labels.squeeze())
        auc_score = self.auc_metric.compute()

        add_metric("auc", auc_score)
        add_metric("loss", loss)

        return loss

Training

Define Training Process

import os
import torch
from torch.optim import AdamW

from recis.framework.trainer import Trainer, TrainingArguments
from recis.nn.modules.hashtable import HashTable, filter_out_sparse_param
from recis.optim import SparseAdamWTF
from recis.utils.logger import Logger

logger = Logger(__name__)

def train():
    deepctr_model = DeepCTR()
    # Build the training dataset
    train_dataset = get_dataset(
        io_args=IOArgs(
            data_paths="./fake_data/",
            batch_size=1024,
            thread_num=1,
            prefetch=1,
            drop_remainder=True,
        ),
    )
    logger.info(str(deepctr_model))
    # Separate sparse (HashTable) parameters so they get the sparse optimizer
    sparse_params = filter_out_sparse_param(deepctr_model)

    sparse_optim = SparseAdamWTF(sparse_params, lr=0.001)
    opt = AdamW(params=deepctr_model.parameters(), lr=0.001)

    train_config = TrainingArguments(
        gradient_accumulation_steps=1,
        output_dir="./ckpt/",
        model_bank=None,
        log_steps=10,
        train_steps=100,
        train_epoch=1,
        eval_steps=None,
        save_steps=1000,
        max_to_keep=3,
        save_concurrency_per_rank=2,
    )

    deepctr_model = deepctr_model.cuda()
    trainer = Trainer(
        model=deepctr_model,
        args=train_config,
        train_dataset=train_dataset,
        dense_optimizers=(opt, None),
        sparse_optimizer=sparse_optim,
        # Batches already live on CUDA (the dataset was built with device="cuda")
        data_to_cuda=False,
    )
    trainer.train()

Set Parallelism Parameters (Optional)

import os
from multiprocessing import cpu_count

def set_num_threads():
    # Divide host CPUs among worker processes (assuming up to 16 per node);
    # keep at least one thread on small machines
    cpu_num = max(cpu_count() // 16, 1)
    os.environ["OMP_NUM_THREADS"] = str(cpu_num)
    os.environ["OPENBLAS_NUM_THREADS"] = str(cpu_num)
    os.environ["MKL_NUM_THREADS"] = str(cpu_num)
    os.environ["VECLIB_MAXIMUM_THREADS"] = str(cpu_num)
    os.environ["NUMEXPR_NUM_THREADS"] = str(cpu_num)
    torch.set_num_interop_threads(cpu_num)
    torch.set_num_threads(cpu_num)
    # Bind this process to its local GPU (torchrun sets LOCAL_RANK)
    torch.cuda.set_device(int(os.getenv("LOCAL_RANK", "0")))

Set Random Seed (Optional)

import numpy as np
import random

def set_seed(seed):
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)  # For multi-GPU setups
    np.random.seed(seed)
    random.seed(seed)

Main Entry

import torch.distributed as dist
if __name__ == "__main__":
    set_num_threads()
    set_seed(42)
    # torchrun supplies MASTER_ADDR / MASTER_PORT / RANK / WORLD_SIZE for init
    dist.init_process_group()
    train()

Start Training

export PYTHONPATH=$PWD
MASTER_PORT=12455
WORLD_SIZE=2
ENTRY=deepctr.py

torchrun --nproc_per_node=$WORLD_SIZE --master_port=$MASTER_PORT $ENTRY
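
For a quick single-GPU smoke test, the same entry can be launched with one process:

torchrun --nproc_per_node=1 --master_port=$MASTER_PORT $ENTRY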