Training Framework Module
RecIS’s training framework module provides comprehensive model training, evaluation, and management capabilities, simplifying the development workflow for deep learning models.
Core Components
TrainingArguments
- class recis.framework.trainer.TrainingArguments(gradient_accumulation_steps: int = 1, output_dir: str = 'output_dir', model_bank: list | None = None, log_steps: int = 100, train_steps: int | None = None, train_epoch: int | None = 1, eval_steps: int | None = None, save_steps: int | None = 1000, max_to_keep: int = 5, save_concurrency_per_rank: int = 4, save_every_n_windows: int = 1, save_every_n_epochs: int | None = None, save_end: bool | None = True, load_update_steps: int | None = None, load_update_windows: int | None = 1, load_update_epochs: int | None = None, params_not_save: List[str] | None = None, save_filter_fn: Callable | None = None, saver_option: SaverOptions | None = None, ckpt_save_arg: CheckpointSaveArguments | None = None, ckpt_load_arg: CheckpointLoadArguments | None = None, mixed_precision: str | None = None, window_iter: int | None = None)[source]
Configuration class for training parameters.
This dataclass contains all the configuration parameters needed for training, including optimization settings, logging intervals, and checkpoint management.
- gradient_accumulation_steps
Number of steps to accumulate gradients before performing an optimizer step. Defaults to 1.
- Type:
int
- train_steps
Maximum number of training steps. If None, will train for full epochs. Defaults to None.
- Type:
Optional[int]
- eval_steps
Number of evaluation steps. If None, evaluates on full dataset. Defaults to None.
- Type:
Optional[int]
- load_update_steps
Interval in training steps at which the dynamic model bank is reloaded. Defaults to None.
- Type:
Optional[int]
- load_update_windows
Interval in windows at which the dynamic model bank is reloaded. Defaults to 1.
- Type:
Optional[int]
- load_update_epochs
Interval in epochs at which the dynamic model bank is reloaded. Defaults to None.
- Type:
Optional[int]
- save_filter_fn
Function to filter checkpoint blocks. Defaults to None.
- Type:
Optional[Callable]
- saver_option
Options for checkpoint saver. Defaults to None.
- Type:
Optional[SaverOptions]
- ckpt_save_arg
Arguments for checkpoint save. Defaults to None.
- Type:
Optional[CheckpointSaveArguments]
- ckpt_load_arg
Arguments for checkpoint load. Defaults to None.
- Type:
Optional[CheckpointLoadArguments]
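A minimal configuration sketch (values below are illustrative; omitted fields keep the defaults shown in the signature above):
from recis.framework import TrainingArguments

training_args = TrainingArguments(
    output_dir="./checkpoints",       # where checkpoints are written
    train_steps=5000,                 # stop after 5000 optimizer steps
    log_steps=50,                     # log metrics every 50 steps
    save_steps=1000,                  # write a checkpoint every 1000 steps
    max_to_keep=3,                    # keep only the 3 most recent checkpoints
    gradient_accumulation_steps=2,    # accumulate gradients over 2 steps
    mixed_precision="bf16",           # assumption: an Accelerate-style precision string
)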
Trainer
- class recis.framework.trainer.Trainer(model: Module | None = None, args: TrainingArguments = None, train_dataset: Dataset | None = None, eval_dataset: Dataset | None = None, hooks: List[Hook] | None = None, dense_optimizers: Tuple[Optimizer, LambdaLR] = (None, None), sparse_optimizer: SparseOptimizer | None = None, data_to_cuda: bool = False, saver: Saver | None = None, **kwargs)[source]
Main training orchestrator with distributed training and checkpoint management.
The Trainer class provides a comprehensive training framework that handles:
- Distributed training coordination using Accelerate
- Automatic checkpoint saving and loading
- Training and evaluation loops with metrics tracking
- Hook system for extensible training workflows
- Support for both dense and sparse optimizers
- args
Training configuration parameters.
- Type:
TrainingArguments
- train_dataset
Training dataset.
- Type:
Optional[Dataset]
- eval_dataset
Evaluation dataset.
- Type:
Optional[Dataset]
- model
The model to train.
- Type:
nn.Module
- dense_optimizer
Dense parameter optimizer.
- Type:
Optional[Optimizer]
- dense_lr_scheduler
Learning rate scheduler for the dense optimizer.
- Type:
Optional[LambdaLR]
- sparse_optimizer
Sparse parameter optimizer.
- Type:
Optional[sparse_optim.SparseOptimizer]
- accelerator
Accelerate instance for distributed training.
- Type:
Accelerator
Example:
from recis.framework import Trainer, TrainingArguments
from recis.optim import SparseAdamW
from torch.optim import AdamW

# Set training arguments
training_args = TrainingArguments(
    output_dir="./checkpoints",
    train_steps=10000,
    eval_steps=1000,
    save_steps=2000,
    log_steps=100,
    gradient_accumulation_steps=4,
)

# Split sparse params out of the model
from recis.nn.modules.hashtable import filter_out_sparse_param

sparse_params = filter_out_sparse_param(model)

# Create optimizers
sparse_optimizer = SparseAdamW(sparse_params, lr=0.001)
dense_optimizer = AdamW(model.parameters(), lr=0.001)

# Create trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    dense_optimizers=(dense_optimizer, None),
    sparse_optimizer=sparse_optimizer,
    data_to_cuda=True,
)

# Train the model
trainer.train()
- __init__(model: Module | None = None, args: TrainingArguments = None, train_dataset: Dataset | None = None, eval_dataset: Dataset | None = None, hooks: List[Hook] | None = None, dense_optimizers: Tuple[Optimizer, LambdaLR] = (None, None), sparse_optimizer: SparseOptimizer | None = None, data_to_cuda: bool = False, saver: Saver | None = None, **kwargs) None[source]
Initialize the Trainer with model, datasets, and training configuration.
- Parameters:
model (Optional[nn.Module]) – The model to train.
args (TrainingArguments) – Training configuration. If None, uses default.
train_dataset (Optional[Dataset]) – Training dataset.
eval_dataset (Optional[Dataset]) – Evaluation dataset.
hooks (Optional[List[Hook]]) – List of training hooks for extensibility.
dense_optimizers (Tuple) – Tuple of (optimizer, lr_scheduler) for dense parameters.
sparse_optimizer (Optional[sparse_optim.SparseOptimizer]) – Optimizer for sparse parameters.
data_to_cuda (bool) – Whether to automatically move data to CUDA. Defaults to False.
saver (Optional[Saver]) – Checkpoint saver instance used for checkpoint management.
**kwargs – Additional arguments passed to Accelerator.
- add_hook(hook: Hook)[source]
Add a single hook to the trainer.
- Parameters:
hook (Hook) – The hook to add.
- add_hooks(hooks: List[Hook])[source]
Add multiple hooks to the trainer.
- Parameters:
hooks (List[Hook]) – List of hooks to add.
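For example, hooks can be attached before training starts (here logging_hook and profiler_hook stand in for already-constructed Hook instances; they are not defined in this section):

# logging_hook and profiler_hook are assumed to be instances of RecIS Hook subclasses
trainer.add_hook(logging_hook)
trainer.add_hooks([logging_hook, profiler_hook])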
- evaluate(eval_steps=None)[source]
Execute the evaluation loop.
- Parameters:
eval_steps (Optional[int]) – Override for number of evaluation steps. If None, evaluates on full dataset.
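For example, to evaluate on a bounded number of steps instead of the full dataset:

# Run evaluation for at most 200 steps; call trainer.evaluate() with no argument to use the full dataset
trainer.evaluate(eval_steps=200)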
Saver
- class recis.framework.checkpoint_manager.Saver(options: SaverOptions)[source]
Checkpoint saver for managing model and training state persistence.
The Saver class handles the saving and loading of model checkpoints including:
- Dense and sparse model parameters
- Optimizer states
- IO states for datasets
- Checkpoint versioning and cleanup
- Support for distributed filesystems
Example
>>> saver = Saver(
...     model=model,
...     sparse_optim=sparse_optimizer,
...     output_dir="./checkpoints",
...     max_keep=5,
... )
>>> saver.save("checkpoint_001")
- __init__(options: SaverOptions)[source]
Initialize the checkpoint saver.
- Parameters:
options (SaverOptions) – Saver configuration. It is expected to carry:
model (torch.nn.Module) – The model to save checkpoints for.
sparse_optim (Optional) – Sparse optimizer instance for sparse parameters.
output_dir (str) – Directory to save checkpoints. Defaults to "./".
max_keep (int) – Maximum number of checkpoints to keep. Defaults to 1.
concurrency (int) – Number of concurrent save operations. Defaults to 4.
- register_for_checkpointing(name, obj: object)[source]
Register an object for checkpointing.
- Parameters:
name (str) – Unique name under which the object is registered.
obj (object) – Object whose state should be saved and restored with checkpoints.
- Raises:
ValueError – If the name is already registered.
- register_io_state(name, obj: object)[source]
Register an object for IO state persistence.
- Parameters:
name (str) – Unique name under which the object is registered.
obj (object) – Object whose IO state (e.g. dataset read position) should be persisted.
- Raises:
ValueError – If the name is already registered.
- save(ckpt_id: str, label_key: str | None = None, label_value: str | None = None)[source]
Save a complete checkpoint with the given ID.
This method saves all registered components including model parameters, optimizer states, and IO states. It also handles checkpoint versioning and cleanup of old checkpoints.
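A short sketch of registering extra state before saving, following the constructor style used in the example above (global_step_counter is a hypothetical stateful object; the dataset registration assumes the dataset exposes a persistable IO state, as described above):

# Register extra objects so their state is written with every checkpoint
saver.register_for_checkpointing("global_step", global_step_counter)  # hypothetical stateful object
saver.register_io_state("train_io", train_dataset)  # assumption: dataset carries a persistable IO state

# Save model parameters, optimizer states, registered objects, and IO states
saver.save("checkpoint_002")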
ModelBankParser
TODO(lanling.ljw)
Exporter
- class recis.framework.exporter.Exporter(model, sparse_model_name, dense_model_name, dataset, ckpt_dir, export_dir, dense_optimizer=None, export_folder_name='fx_user_model', export_model_name='user_model', export_outputs=None, fg=None, fg_conf_or_path=None, mc_conf_or_path=None, filter_sparse_opt=False)[source]
Model exporter for RecIS framework with support for sparse and dense models.
The Exporter class handles the export process for trained RecIS models, managing both sparse embedding tables and dense neural network components. It supports distributed export across multiple workers and handles various storage backends including local filesystem and cloud storage.
- Key Features:
Separate export of sparse and dense model components
Distributed export with automatic file partitioning
Support for multiple storage backends (local, cloud)
Configuration export for feature generation and model compilation
Automatic model preparation and state loading
- model
Complete model containing both sparse and dense components.
- sparse_model
Sparse embedding component of the model.
- dense_model
Dense neural network component of the model.
- dataset
Dataset used for model tracing during export.
- dense_optimizer
Optional optimizer for dense model components.
- export_outputs
Specification of model output nodes.
- fx_tool
Tool for exporting TorchFX models.
- Type:
ExportTorchFxTool
- __init__(model, sparse_model_name, dense_model_name, dataset, ckpt_dir, export_dir, dense_optimizer=None, export_folder_name='fx_user_model', export_model_name='user_model', export_outputs=None, fg=None, fg_conf_or_path=None, mc_conf_or_path=None, filter_sparse_opt=False)[source]
Initialize the model exporter with configuration parameters.
- Parameters:
model – Complete RecIS model containing sparse and dense components.
sparse_model_name (str) – Name of the sparse model submodule.
dense_model_name (str) – Name of the dense model submodule.
dataset – Dataset for model tracing during export process.
ckpt_dir (str) – Directory path containing model checkpoints.
export_dir (str) – Target directory for exported model files.
dense_optimizer – Optional optimizer for dense model components.
export_folder_name (str, optional) – Name of the export folder. Defaults to “fx_user_model”.
export_model_name (str, optional) – Name for the exported model. Defaults to “user_model”.
export_outputs – Specification of model output nodes for export.
fg – Feature generator instance for configuration extraction.
fg_conf_or_path – Feature generation configuration dict or file path.
mc_conf_or_path – Model compilation configuration dict or file path.
filter_sparse_opt (bool, optional) – Whether to filter sparse optimization parameters. Defaults to False.
- Raises:
AssertionError – If neither fg nor fg_conf_or_path is provided.
AssertionError – If neither fg nor mc_conf_or_path is provided.
AssertionError – If MOS is required but not available for model paths.
- export()[source]
Execute the complete model export process.
This method orchestrates the entire export workflow, including model preparation, sparse component export, dense component export, and metadata export. The process is designed to work in distributed environments with automatic work partitioning.
- The export process consists of:
Model preparation and checkpoint loading
Sparse model component export
Dense model component export with TorchFX
Configuration metadata export
Note
This method should be called on all workers in a distributed setup. File operations are automatically partitioned based on worker rank.
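A hedged end-to-end sketch of the export call, built only from the constructor parameters documented above (submodule names, paths, output names, and config files are placeholders):

from recis.framework.exporter import Exporter

exporter = Exporter(
    model=model,
    sparse_model_name="sparse",      # placeholder: name of the sparse submodule
    dense_model_name="dense",        # placeholder: name of the dense submodule
    dataset=eval_dataset,            # used for tracing during export
    ckpt_dir="./checkpoints",        # placeholder checkpoint directory
    export_dir="./export",           # placeholder target directory
    export_outputs=["logits"],       # assumption: list of output node names
    fg_conf_or_path="./fg.json",     # placeholder feature-generation config path
    mc_conf_or_path="./mc.json",     # placeholder model-compilation config path
)

# Call on every worker in a distributed job; file operations are partitioned by rank
exporter.export()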
Advanced Usage
Custom Training Pipeline
from recis.framework.trainer import Trainer


class MyTrainer(Trainer):
    def _train_step(self, data, epoch, metrics):
        # Reset gradients for both dense and sparse optimizers
        self.dense_optimizer.zero_grad()
        if self.sparse_optimizer is not None:
            self.sparse_optimizer.zero_grad()

        # Forward pass; the model is expected to return the training loss
        loss = self.model(data)

        # Record metrics for logging
        metrics.update(epoch=epoch)
        metrics.update(loss=loss)
        # get_global_metrics() is assumed to be provided by RecIS metric utilities
        metrics.update(get_global_metrics())

        # Backward pass and parameter updates
        loss.backward()
        self.dense_optimizer.step()
        if self.sparse_optimizer is not None:
            self.sparse_optimizer.step()
        if self.dense_lr_scheduler is not None:
            self.dense_lr_scheduler.step()
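The custom trainer is then constructed and run just like the base class (a minimal sketch, reusing the objects from the earlier Trainer example):

trainer = MyTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    dense_optimizers=(dense_optimizer, None),
    sparse_optimizer=sparse_optimizer,
)
trainer.train()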
Gradient Accumulation Training
# Configure gradient accumulation
training_args = TrainingArguments(
    output_dir="./output",
    train_steps=10000,
    gradient_accumulation_steps=8,  # Accumulate 8 steps before each update
    log_steps=100,
)

# The Trainer automatically handles gradient accumulation
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
)
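With gradient_accumulation_steps=8, the gradients of 8 consecutive micro-batches are accumulated before each optimizer update, so the effective batch size is 8 times the per-step batch size while peak memory stays at the single micro-batch level.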