Training Framework Module

RecIS’s training framework module provides comprehensive model training, evaluation, and management capabilities, simplifying the development workflow for deep learning models.

Core Components

TrainingArguments

class recis.framework.trainer.TrainingArguments(gradient_accumulation_steps: int = 1, output_dir: str = 'output_dir', model_bank: list | None = None, log_steps: int = 100, train_steps: int | None = None, train_epoch: int | None = 1, eval_steps: int | None = None, save_steps: int | None = 1000, max_to_keep: int = 5, save_concurrency_per_rank: int = 4, save_every_n_windows: int = 1, save_every_n_epochs: int | None = None, save_end: bool | None = True, load_update_steps: int | None = None, load_update_windows: int | None = 1, load_update_epochs: int | None = None, params_not_save: List[str] | None = None, save_filter_fn: Callable | None = None, saver_option: SaverOptions | None = None, ckpt_save_arg: CheckpointSaveArguments | None = None, ckpt_load_arg: CheckpointLoadArguments | None = None, mixed_precision: str | None = None, window_iter: int | None = None)[source]

Configuration class for training parameters.

This dataclass contains all the configuration parameters needed for training, including optimization settings, logging intervals, and checkpoint management.

gradient_accumulation_steps

Number of steps to accumulate gradients before performing an optimizer step. Defaults to 1.

Type:

int

output_dir

Directory where checkpoints and logs will be saved. Defaults to “output_dir”.

Type:

str

model_bank

List of model bank paths for initialization. Defaults to None.

Type:

Optional[list]

log_steps

Number of training steps between logging. Defaults to 100.

Type:

int

train_steps

Maximum number of training steps. If None, will train for full epochs. Defaults to None.

Type:

Optional[int]

train_epoch

Number of training epochs. Defaults to 1.

Type:

Optional[int]

eval_steps

Number of evaluation steps. If None, evaluates on full dataset. Defaults to None.

Type:

Optional[int]

save_steps

Number of steps between checkpoint saves. Defaults to 1000.

Type:

Optional[int]

max_to_keep

Maximum number of checkpoints to keep. Defaults to 5.

Type:

int
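
To illustrate what a retention limit such as max_to_keep implies, here is a minimal, self-contained sketch of checkpoint rotation. It is not RecIS's implementation: the helper name rotate_checkpoints and the oldest-first deletion policy are assumptions made for illustration.

```python
from collections import deque


def rotate_checkpoints(ckpt_ids, max_to_keep=5):
    """Return (kept, removed) after saving ckpt_ids in order.

    Hypothetical helper: assumes the oldest checkpoint is dropped first.
    """
    kept = deque()
    removed = []
    for ckpt_id in ckpt_ids:
        kept.append(ckpt_id)
        # Once the limit is exceeded, drop the oldest entry.
        while len(kept) > max_to_keep:
            removed.append(kept.popleft())
    return list(kept), removed


# Seven saves with a limit of five: the two oldest are removed.
kept, removed = rotate_checkpoints([f"ckpt_{s}" for s in range(1000, 8000, 1000)])
```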

save_concurrency_per_rank

Number of concurrent save operations per rank. Defaults to 4.

Type:

int

save_every_n_windows

Number of IO windows between checkpoint saves. Defaults to 1.

Type:

Optional[int]

save_every_n_epochs

Number of epochs between checkpoint saves. Defaults to None.

Type:

Optional[int]

save_end

Whether to save checkpoints at the end of training. Defaults to True.

Type:

bool
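
The interaction between save_steps and save_end can be pictured with a small schedule calculation. This is a sketch under stated assumptions, not the library's logic: the helper save_points is hypothetical, and it assumes a periodic save fires whenever the step count is a multiple of save_steps.

```python
def save_points(total_steps, save_steps=1000, save_end=True):
    """List the steps at which a checkpoint would be written.

    Hypothetical helper: assumes a save fires when step % save_steps == 0,
    plus one final save if save_end is set and the last step was not saved.
    """
    points = []
    if save_steps:
        points = list(range(save_steps, total_steps + 1, save_steps))
    if save_end and (not points or points[-1] != total_steps):
        points.append(total_steps)
    return points


with_final = save_points(2500)                     # periodic saves plus a final one
without_final = save_points(2500, save_end=False)  # periodic saves only
```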

load_update_steps

Number of steps between loads of the dynamic model bank. Defaults to None.

Type:

Optional[int]

load_update_windows

Number of windows between loads of the dynamic model bank. Defaults to 1.

Type:

Optional[int]

load_update_epochs

Number of epochs between loads of the dynamic model bank. Defaults to None.

Type:

Optional[int]

params_not_save

Names of parameters not to save. Defaults to None.

Type:

Optional[List[str]]

save_filter_fn

Function to filter checkpoint blocks. Defaults to None.

Type:

Optional[Callable]

saver_option

Options for checkpoint saver. Defaults to None.

Type:

Optional[SaverOptions]

ckpt_save_arg

Arguments for checkpoint save. Defaults to None.

Type:

Optional[CheckpointSaveArguments]

ckpt_load_arg

Arguments for checkpoint load. Defaults to None.

Type:

Optional[CheckpointLoadArguments]

mixed_precision

Mixed precision training mode. Only “bf16” and “fp16” are supported. Defaults to None.

Type:

Optional[str]
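
Because only two precision modes are accepted, it can help to validate a user-supplied value before building the configuration. The check below is a minimal sketch; check_mixed_precision is a hypothetical helper, and how RecIS itself reacts to an unsupported value is not documented here.

```python
SUPPORTED_MODES = (None, "bf16", "fp16")


def check_mixed_precision(mode):
    """Return mode unchanged if it is one of the documented values."""
    if mode not in SUPPORTED_MODES:
        raise ValueError(
            f"mixed_precision must be None, 'bf16', or 'fp16', got {mode!r}"
        )
    return mode


mode = check_mixed_precision("bf16")
```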

Trainer

class recis.framework.trainer.Trainer(model: Module | None = None, args: TrainingArguments = None, train_dataset: Dataset | None = None, eval_dataset: Dataset | None = None, hooks: List[Hook] | None = None, dense_optimizers: Tuple[Optimizer, LambdaLR] = (None, None), sparse_optimizer: SparseOptimizer | None = None, data_to_cuda: bool = False, saver: Saver | None = None, **kwargs)[source]

Main training orchestrator with distributed training and checkpoint management.

The Trainer class provides a comprehensive training framework that handles:
  • Distributed training coordination using Accelerate

  • Automatic checkpoint saving and loading

  • Training and evaluation loops with metrics tracking

  • Hook system for extensible training workflows

  • Support for both dense and sparse optimizers

args

Training configuration parameters.

Type:

TrainingArguments

hooks

List of training hooks for extensibility.

Type:

List[Hook]

train_dataset

Training dataset.

Type:

Optional[Dataset]

eval_dataset

Evaluation dataset.

Type:

Optional[Dataset]

model

The model to train.

Type:

nn.Module

dense_optimizer

Dense parameter optimizer.

Type:

torch.optim.Optimizer

dense_lr_scheduler

Learning rate scheduler for dense optimizer.

sparse_optimizer

Sparse parameter optimizer.

Type:

Optional[sparse_optim.SparseOptimizer]

data_to_cuda

Whether to automatically move data to CUDA.

Type:

bool

accelerator

Accelerate instance for distributed training.

Type:

Accelerator

Example:

from recis.framework import Trainer, TrainingArguments
from recis.nn.modules.hashtable import filter_out_sparse_param
from recis.optim import SparseAdamW
from torch.optim import AdamW

# `model`, `train_dataset`, and `eval_dataset` are assumed to be defined elsewhere.

# Set training arguments
training_args = TrainingArguments(
    output_dir="./checkpoints",
    train_steps=10000,
    eval_steps=1000,
    save_steps=2000,
    log_steps=100,
    gradient_accumulation_steps=4,
)

# split sparse params
sparse_params = filter_out_sparse_param(model)

# create optimizers
sparse_optimizer = SparseAdamW(sparse_params, lr=0.001)
dense_optimizer = AdamW(model.parameters(), lr=0.001)

# create trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    dense_optimizers=(dense_optimizer, None),
    sparse_optimizer=sparse_optimizer,
    data_to_cuda=True,
)

# train the model
trainer.train()

__init__(model: Module | None = None, args: TrainingArguments = None, train_dataset: Dataset | None = None, eval_dataset: Dataset | None = None, hooks: List[Hook] | None = None, dense_optimizers: Tuple[Optimizer, LambdaLR] = (None, None), sparse_optimizer: SparseOptimizer | None = None, data_to_cuda: bool = False, saver: Saver | None = None, **kwargs) -> None[source]

Initialize the Trainer with model, datasets, and training configuration.

Parameters:
  • model (Optional[nn.Module]) – The model to train.

  • args (TrainingArguments) – Training configuration. If None, uses default.

  • train_dataset (Optional[Dataset]) – Training dataset.

  • eval_dataset (Optional[Dataset]) – Evaluation dataset.

  • hooks (Optional[List[Hook]]) – List of training hooks for extensibility.

  • dense_optimizers (Tuple) – Tuple of (optimizer, lr_scheduler) for dense parameters.

  • sparse_optimizer (Optional[sparse_optim.SparseOptimizer]) – Optimizer for sparse parameters.

  • data_to_cuda (bool) – Whether to automatically move data to CUDA. Defaults to False.

  • **kwargs – Additional arguments passed to Accelerator.

add_hook(hook: Hook)[source]

Add a single hook to the trainer.

Parameters:

hook (Hook) – The hook to add.

add_hooks(hooks: List[Hook])[source]

Add multiple hooks to the trainer.

Parameters:

hooks (List[Hook]) – List of hooks to add.
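
The hook mechanism can be pictured with a small stand-alone model. The sketch below does not use RecIS classes: SketchHook, SketchTrainer, and the callback name after_step are hypothetical, chosen only to show how registered hooks might be invoked from a training loop.

```python
class SketchHook:
    """Hypothetical hook base class; the callback name is an assumption."""

    def after_step(self, step, loss):
        pass


class LossRecorder(SketchHook):
    """Example hook that records the loss reported at every step."""

    def __init__(self):
        self.history = []

    def after_step(self, step, loss):
        self.history.append((step, loss))


class SketchTrainer:
    """Stand-in trainer that only demonstrates hook dispatch."""

    def __init__(self, hooks=None):
        self.hooks = list(hooks or [])

    def add_hook(self, hook):
        self.hooks.append(hook)

    def add_hooks(self, hooks):
        for hook in hooks:
            self.add_hook(hook)

    def train(self, losses):
        # Pretend each entry in `losses` is the result of one training step.
        for step, loss in enumerate(losses, start=1):
            for hook in self.hooks:
                hook.after_step(step, loss)


recorder = LossRecorder()
sketch_trainer = SketchTrainer()
sketch_trainer.add_hook(recorder)
sketch_trainer.train([0.9, 0.7, 0.5])
```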

evaluate(eval_steps=None)[source]

Execute the evaluation loop.

Parameters:

eval_steps (Optional[int]) – Override for number of evaluation steps. If None, evaluates on full dataset.

train(train_steps=None)[source]

Execute the training loop.

Parameters:

train_steps (Optional[int]) – Override for number of training steps. If None, uses args.train_steps.

train_and_evaluate(train_steps=None, eval_steps=None)[source]

Execute alternating training and evaluation loops.

Parameters:
  • train_steps (Optional[int]) – Override for number of training steps per epoch.

  • eval_steps (Optional[int]) – Override for number of evaluation steps.
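
One way to picture the alternation is as a simple schedule. The sketch below is a guess at the control flow, not RecIS's code: the function name alternating_schedule and the cadence of one evaluation after each training epoch are assumptions.

```python
def alternating_schedule(num_epochs, train_steps, eval_steps):
    """Build a list of (phase, epoch, steps) entries.

    Hypothetical helper: assumes each epoch trains first, then evaluates.
    """
    schedule = []
    for epoch in range(num_epochs):
        schedule.append(("train", epoch, train_steps))
        schedule.append(("eval", epoch, eval_steps))
    return schedule


schedule = alternating_schedule(num_epochs=2, train_steps=100, eval_steps=10)
```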

Saver

class recis.framework.checkpoint_manager.Saver(options: SaverOptions)[source]

Checkpoint saver for managing model and training state persistence.

The Saver class handles the saving and loading of model checkpoints, including:
  • Dense and sparse model parameters

  • Optimizer states

  • IO states for datasets

  • Checkpoint versioning and cleanup

  • Support for distributed filesystems

Example

>>> saver = Saver(
...     model=model,
...     sparse_optim=sparse_optimizer,
...     output_dir="./checkpoints",
...     max_keep=5,
... )
>>> saver.save("checkpoint_001")

__init__(options: SaverOptions)[source]

Initialize the checkpoint saver.

Parameters:
  • model (torch.nn.Module) – The model to save checkpoints for.

  • sparse_optim (Optional) – Sparse optimizer instance for sparse parameters.

  • output_dir (str) – Directory to save checkpoints. Defaults to “./”.

  • max_keep (int) – Maximum number of checkpoints to keep. Defaults to 1.

  • concurrency (int) – Number of concurrent save operations. Defaults to 4.

register_for_checkpointing(name, obj: object)[source]

Register an object for checkpointing.

Parameters:
  • name (str) – Name identifier for the checkpointed object.

  • obj (object) – Object to include in checkpoints.

Raises:

ValueError – If the name is already registered.
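
The duplicate-name rule can be sketched with a plain dictionary-backed registry. This is an illustration only: CheckpointRegistry is a hypothetical class, not part of RecIS, and it mirrors just the documented behavior of raising ValueError when a name is registered twice.

```python
class CheckpointRegistry:
    """Hypothetical registry that rejects duplicate names."""

    def __init__(self):
        self._objects = {}

    def register(self, name, obj):
        if name in self._objects:
            raise ValueError(f"{name!r} is already registered")
        self._objects[name] = obj

    def names(self):
        return list(self._objects)


registry = CheckpointRegistry()
registry.register("lr_scheduler", {"last_epoch": 3})

try:
    registry.register("lr_scheduler", {"last_epoch": 4})
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True
```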

register_io_state(name, obj: object)[source]

Register an object for IO state persistence.

Parameters:
  • name (str) – Name identifier for the IO state.

  • obj (object) – Object that supports IO state dump/load operations.

Raises:

ValueError – If the name is already registered.

save(ckpt_id: str, label_key: str | None = None, label_value: str | None = None)[source]

Save a complete checkpoint with the given ID.

This method saves all registered components including model parameters, optimizer states, and IO states. It also handles checkpoint versioning and cleanup of old checkpoints.

Parameters:
  • ckpt_id (str) – Unique identifier for this checkpoint.

  • label_key (Optional[str]) – Key for the label when saving to MOS. Defaults to None.

  • label_value (Optional[str]) – Value for the label when saving to MOS. Defaults to None.

ModelBankParser

TODO(lanling.ljw)

class recis.framework.model_bank.ModelBankParser(output_dir: str, model_bank_content: list[Dict[str, Any]], model_names: set[str], sparse_model_names: set[str], sparse_tables: set[str], dense_model_names: set[str], extra_fields)[source]

__init__(output_dir: str, model_bank_content: list[Dict[str, Any]], model_names: set[str], sparse_model_names: set[str], sparse_tables: set[str], dense_model_names: set[str], extra_fields)[source]

Exporter

class recis.framework.exporter.Exporter(model, sparse_model_name, dense_model_name, dataset, ckpt_dir, export_dir, dense_optimizer=None, export_folder_name='fx_user_model', export_model_name='user_model', export_outputs=None, fg=None, fg_conf_or_path=None, mc_conf_or_path=None, filter_sparse_opt=False)[source]

Model exporter for RecIS framework with support for sparse and dense models.

The Exporter class handles the export process for trained RecIS models, managing both sparse embedding tables and dense neural network components. It supports distributed export across multiple workers and handles various storage backends including local filesystem and cloud storage.

Key Features:
  • Separate export of sparse and dense model components

  • Distributed export with automatic file partitioning

  • Support for multiple storage backends (local, cloud)

  • Configuration export for feature generation and model compilation

  • Automatic model preparation and state loading

rank

Current worker rank in distributed setup.

Type:

int

shard_num

Total number of workers in distributed setup.

Type:

int

model

Complete model containing both sparse and dense components.

sparse_model

Sparse embedding component of the model.

dense_model

Dense neural network component of the model.

dataset

Dataset used for model tracing during export.

ckpt_dir

Directory containing model checkpoints.

Type:

str

export_dir

Target directory for exported model files.

Type:

str

dense_optimizer

Optional optimizer for dense model components.

export_model_name

Name for the exported model.

Type:

str

export_outputs

Specification of model output nodes.

filter_sparse_opt

Whether to filter sparse optimization parameters.

Type:

bool

fg_conf

Feature generation configuration.

Type:

dict

mc_conf

Model compilation configuration.

Type:

dict

fx_tool

Tool for exporting TorchFX models.

Type:

ExportTorchFxTool

__init__(model, sparse_model_name, dense_model_name, dataset, ckpt_dir, export_dir, dense_optimizer=None, export_folder_name='fx_user_model', export_model_name='user_model', export_outputs=None, fg=None, fg_conf_or_path=None, mc_conf_or_path=None, filter_sparse_opt=False)[source]

Initialize the model exporter with configuration parameters.

Parameters:
  • model – Complete RecIS model containing sparse and dense components.

  • sparse_model_name (str) – Name of the sparse model submodule.

  • dense_model_name (str) – Name of the dense model submodule.

  • dataset – Dataset for model tracing during export process.

  • ckpt_dir (str) – Directory path containing model checkpoints.

  • export_dir (str) – Target directory for exported model files.

  • dense_optimizer – Optional optimizer for dense model components.

  • export_folder_name (str, optional) – Name of the export folder. Defaults to “fx_user_model”.

  • export_model_name (str, optional) – Name for the exported model. Defaults to “user_model”.

  • export_outputs – Specification of model output nodes for export.

  • fg – Feature generator instance for configuration extraction.

  • fg_conf_or_path – Feature generation configuration dict or file path.

  • mc_conf_or_path – Model compilation configuration dict or file path.

  • filter_sparse_opt (bool, optional) – Whether to filter sparse optimization parameters. Defaults to False.

export()[source]

Execute the complete model export process.

This method orchestrates the entire export workflow, including model preparation, sparse component export, dense component export, and metadata export. The process is designed to work in distributed environments with automatic work partitioning.

The export process consists of:
  1. Model preparation and checkpoint loading

  2. Sparse model component export

  3. Dense model component export with TorchFX

  4. Configuration metadata export

Note

This method should be called on all workers in a distributed setup. File operations are automatically partitioned based on worker rank.
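
To make rank-based partitioning concrete, the sketch below assigns files to workers round-robin. The actual partitioning scheme used by the exporter is not documented here; files_for_rank and the round-robin policy are assumptions chosen for illustration.

```python
def files_for_rank(files, rank, shard_num):
    """Hypothetical helper: round-robin assignment over a sorted file list."""
    if not 0 <= rank < shard_num:
        raise ValueError("rank must be in [0, shard_num)")
    # Sorting first gives every worker the same view of the file order.
    return sorted(files)[rank::shard_num]


files = [f"part_{i}.bin" for i in range(5)]
assignment = {rank: files_for_rank(files, rank, shard_num=2) for rank in range(2)}
```

Under this scheme every file is handled by exactly one worker, which is why each worker needs to run the same call.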

Advanced Usage

Custom Training Pipeline

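from recis.framework.trainer import Trainer


class MyTrainer(Trainer):
    # `get_global_metrics` is assumed to be in scope; its import is not shown here.
    def _train_step(self, data, epoch, metrics):
        # Clear gradients left over from the previous step
        self.dense_optimizer.zero_grad()
        if self.sparse_optimizer is not None:
            self.sparse_optimizer.zero_grad()
        # The model is expected to return the loss directly
        loss = self.model(data)
        metrics.update(epoch=epoch)
        metrics.update(loss=loss)
        metrics.update(get_global_metrics())
        loss.backward()
        # Apply the dense update, then the sparse update if one is configured
        self.dense_optimizer.step()
        if self.sparse_optimizer is not None:
            self.sparse_optimizer.step()
        if self.dense_lr_scheduler is not None:
            self.dense_lr_scheduler.step()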

Gradient Accumulation Training

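from recis.framework import Trainer, TrainingArguments

# `model` and `train_dataset` are assumed to be defined elsewhere.

# Configure gradient accumulation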
training_args = TrainingArguments(
    output_dir="./output",
    train_steps=10000,
    gradient_accumulation_steps=8,  # Accumulate 8 steps before update
    log_steps=100
)

# Trainer will automatically handle gradient accumulation
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset
)
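
The effect of gradient_accumulation_steps can be shown with plain numbers. The sketch below is independent of RecIS and makes one assumption worth stating: gradients are averaged over the accumulation window (each contribution is divided by the window size), which is a common convention but is not confirmed for this library. The helper name accumulate is hypothetical.

```python
def accumulate(grads, gradient_accumulation_steps):
    """Return the averaged gradient applied at each optimizer update.

    Hypothetical helper: one update is emitted after every
    `gradient_accumulation_steps` micro-steps; leftovers are not applied.
    """
    updates = []
    buffer = 0.0
    for i, grad in enumerate(grads, start=1):
        # Scale each micro-step so the window sums to an average.
        buffer += grad / gradient_accumulation_steps
        if i % gradient_accumulation_steps == 0:
            updates.append(buffer)
            buffer = 0.0
    return updates


# Four micro-steps with a window of two yield two optimizer updates.
updates = accumulate([1.0, 3.0, 2.0, 6.0], gradient_accumulation_steps=2)
```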