Hook System

Basic Hooks

RecIS provides a rich Hook system to extend the training process:

Hook

class recis.hooks.hook.Hook[source]

Base class for all hooks in the RecIS training system.

Hooks provide a way to extend the training process by defining callback methods that are called at specific points during training, evaluation, and execution. All custom hooks should inherit from this base class and override the relevant callback methods.

The hook system supports the following callback points:
  • Training lifecycle: before_train, after_train

  • Evaluation lifecycle: before_evaluate, after_evaluate

  • Epoch lifecycle: before_epoch, after_epoch

  • Step lifecycle: before_step, after_step

  • Cleanup: end

after_data(is_train=True, *args, **kwargs)[source]

Called after each data batch.

after_epoch(is_train=True, *args, **kwargs)[source]

Called after each training epoch completes.

This method is invoked at the end of each training epoch, after all steps in that epoch have been executed.

after_step(is_train=True, *args, **kwargs)[source]

Called after each training step completes.

This method is invoked after each individual training step has been executed. Use this for per-step processing logic, such as logging metrics or updating statistics.

after_window(is_train=True, *args, **kwargs)[source]

Called after each IO window completes.

before_epoch(is_train=True, *args, **kwargs)[source]

Called before each training epoch starts.

This method is invoked at the beginning of each training epoch, before any steps in that epoch are executed.

before_step(is_train=True, *args, **kwargs)[source]

Called before each training step.

This method is invoked before each individual training step is executed. Use this for per-step setup logic.

before_window(is_train=True, *args, **kwargs)[source]

Called before each IO window starts.

end(is_train=True, *args, **kwargs)[source]

Called at the very end of the training process.

This method is invoked for final cleanup operations, such as closing files, finalizing logs, or releasing resources. It is called after all other hook methods have completed.

out_off_data(*args, **kwargs)[source]

Called when the data iterator is exhausted.

start(is_train=True, *args, **kwargs)[source]

Called at the very start of the training process.

window_mode(*args, **kwargs)[source]

Called when window IO mode is used.

Changes arguments for the window IO run mode.
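
The window-related callbacks (before_window, after_data, after_window, out_off_data) complement the step and epoch callbacks above. Below is a minimal sketch of a custom hook that overrides them, assuming only the base-class signatures documented here; the counter and print output are illustrative and not part of RecIS:

from recis.hooks import Hook

class WindowStatsHook(Hook):
    """Illustrative hook that counts data batches per IO window."""

    def __init__(self):
        self.batches_in_window = 0

    def before_window(self, is_train=True, *args, **kwargs):
        # Reset the counter when a new window begins
        self.batches_in_window = 0

    def after_data(self, is_train=True, *args, **kwargs):
        # Count each data batch delivered inside the current window
        self.batches_in_window += 1

    def after_window(self, is_train=True, *args, **kwargs):
        print(f"Window finished with {self.batches_in_window} batches")

    def out_off_data(self, *args, **kwargs):
        print("Data iterator exhausted")

# Register like any other hook
trainer.add_hook(WindowStatsHook())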

LoggerHook

recis.framework.metrics.add_metric(name, metric)[source]

Add or update a metric in the global metrics registry.

Parameters:
  • name (str) – The name of the metric to add or update.

  • metric – The metric value to store. Can be any type (float, int, tensor, etc.).

Example

>>> add_metric("accuracy", 0.95)
>>> add_metric("loss", 0.05)
>>> add_metric("learning_rate", 0.001)

class recis.hooks.logger_hook.LoggerHook(log_step=10)[source]

Hook for logging training metrics and progress.

The LoggerHook logs training metrics at regular intervals and provides performance statistics including queries per second (QPS). This hook is automatically added by the Trainer, so manual addition is typically not required.

Parameters:

log_step (int) – Logging interval in steps. Defaults to 10.

Example

>>> from recis.hooks import LoggerHook
>>> # Create logger hook with custom interval
>>> from recis.framework.metrics import add_metric
>>> add_metric("loss", 0.123)
>>> add_metric("accuracy", 0.95)
>>> logger_hook = LoggerHook(log_step=50)
>>> trainer.add_hook(logger_hook)
>>> # The hook will automatically log metrics every 50 steps
>>> # Output format: <gstep=100> <lstep=50> <qps=12.34> <loss=0.123> <accuracy=0.95>

Note

The Trainer automatically adds a LoggerHook, so manual addition is usually not necessary unless you need custom logging intervals or multiple loggers.

ProfilerHook

class recis.hooks.profiler_hook.ProfilerHook(wait=1, warmup=48, active=1, repeat=4, output_dir='./')[source]

Hook for performance profiling during training.

The ProfilerHook uses PyTorch’s profiler to collect detailed performance metrics during training. It captures CPU and GPU activities, memory usage, operation shapes, and FLOP counts. The profiling results are saved as Chrome trace files for visualization in Chrome’s tracing tool.

Parameters:
  • wait (int) – Number of steps to wait before starting profiling. Defaults to 1.

  • warmup (int) – Number of warmup steps before active profiling. Defaults to 48.

  • active (int) – Number of active profiling steps. Defaults to 1.

  • repeat (int) – Number of profiling cycles to repeat. Defaults to 4.

  • output_dir (str) – Directory to save profiling results. Defaults to “./”.

prof

PyTorch profiler instance.

Type:

torch.profiler.profile

logger

Logger instance for outputting messages.

Type:

Logger

output_dir

Output directory for profiling results.

Type:

str

Example

>>> from recis.hooks import ProfilerHook
>>> # Create profiler hook with custom settings
>>> profiler_hook = ProfilerHook(
...     wait=1, warmup=28, active=2, repeat=1, output_dir="./timeline/"
... )
>>> trainer.add_hook(profiler_hook)
>>> # The hook will automatically profile training and save results
>>> # Results will be saved as Chrome trace files (.json)

Note

The profiling results can be visualized by opening the generated .json files in Chrome’s tracing tool (chrome://tracing/).

__init__(wait=1, warmup=48, active=1, repeat=4, output_dir='./')[source]

MLTrackerHook

recis.hooks.ml_tracker_hook.add_to_ml_tracker(name: str, data)[source]

Adds data to the ML tracker trace map.

This function adds metrics or other data to the global trace map that will be logged to the ML tracking system. Tensor data is automatically converted to numpy arrays for compatibility.

Parameters:
  • name (str) – Name/key for the data being tracked.

  • data – Data to be tracked. Can be torch.Tensor or any other type. Tensors are automatically converted to CPU numpy arrays.

Example

>>> import torch
>>> # Add scalar metrics
>>> add_to_ml_tracker("loss", 0.123)
>>> add_to_ml_tracker("learning_rate", 0.001)
>>> # Add tensor data (automatically converted)
>>> predictions = torch.tensor([0.1, 0.9, 0.3])
>>> add_to_ml_tracker("predictions", predictions)
>>> # The tensor will be stored as numpy array in trace map

Note

Tensor data is detached from the computation graph and moved to CPU before conversion to numpy to ensure compatibility with the ML tracker.

class recis.hooks.ml_tracker_hook.MLTrackerHook(project: str, name: str, config: Dict, id=None)[source]

Hook for experiment tracking with ML tracking systems.

The MLTrackerHook integrates with ML tracking platforms to automatically log training metrics, hyperparameters, and other experiment data. It initializes an ML tracker session and logs accumulated data after each training step.

Parameters:
  • project (str) – Name of the project for experiment tracking.

  • name (str) – Name of the experiment run.

  • config (Dict) – Configuration dictionary containing hyperparameters and other experiment settings.

  • id (optional) – Unique identifier for the experiment run. If None, a new ID will be generated automatically.

tracker

ML tracker instance for logging experiment data.

Example

>>> from recis.hooks import MLTrackerHook, add_to_ml_tracker
>>> # Create ML tracker hook
>>> config = {
...     "learning_rate": 0.001,
...     "batch_size": 32,
...     "model_type": "transformer",
... }
>>> ml_hook = MLTrackerHook(
...     project="recommendation_model", name="experiment_v1", config=config
... )
>>> trainer.add_hook(ml_hook)
>>> # During training, add metrics to be tracked
>>> add_to_ml_tracker("train_loss", loss.item())
>>> add_to_ml_tracker("train_accuracy", accuracy)
>>> # The hook will automatically log these metrics after each step

Note

This hook is only available in internal environments where the ml_tracker library is accessible. Use add_to_ml_tracker() to add data that should be logged to the tracking system.

__init__(project: str, name: str, config: Dict, id=None) → None[source]

TraceToOdpsHook

recis.hooks.trace_to_odps_hook.add_to_trace(name: str, tensor: Tensor | ndarray | list = None)[source]

Adds data to the trace map for ODPS logging.

This function adds training data to the global trace map that will be uploaded to ODPS tables. Supports tensors, numpy arrays, and lists.

Parameters:
  • name (str) – Name/key for the data being traced.

  • tensor (Union[torch.Tensor, np.ndarray, list]) – Data to be traced. Must be one of the supported types.

Raises:

ValueError – If the tensor type is not supported.

Example

>>> import torch
>>> import numpy as np
>>> # Add tensor data
>>> embeddings = torch.randn(100, 64)
>>> add_to_trace("user_embeddings", embeddings)
>>> # Add numpy array
>>> features = np.random.rand(100, 32)
>>> add_to_trace("item_features", features)
>>> # Add list data
>>> user_ids = [1, 2, 3, 4, 5]
>>> add_to_trace("user_ids", user_ids)

Note

Tensor data is automatically converted to numpy arrays for compatibility with ODPS. A warning is logged if data with the same name already exists.

class recis.hooks.trace_to_odps_hook.TraceToOdpsHook(config: Dict, fields: List[str], types: List[str], worker_num: int = 1, size_threshold: int = 52428800)[source]

Hook for tracing training data to ODPS tables.

The TraceToOdpsHook provides high-performance data collection and upload capabilities for training traces. It uses multiprocessing to avoid blocking the main training process and supports configurable batching and buffering.

Parameters:
  • config (Dict) – ODPS configuration dictionary containing connection details. Required keys: access_id, access_key, project, end_point, table_name. Optional keys: partition.

  • fields (List[str]) – List of field names for the ODPS table schema.

  • types (List[str]) – List of field types corresponding to the fields.

  • worker_num (int) – Number of worker processes for parallel uploads. Defaults to 1.

  • size_threshold (int) – Buffer size threshold in bytes for triggering flushes. Defaults to 50 MiB.

queue

Multiprocessing queue for data transfer.

Type:

Queue

writer_num

Number of writer processes.

Type:

int

writers

List of writer process instances.

Type:

List[TraceWriter]

Example

>>> from recis.hooks import TraceToOdpsHook, add_to_trace
>>> # Configure ODPS connection
>>> config = {
...     "access_id": "your_access_id",
...     "access_key": "your_access_key",
...     "project": "your_project",
...     "end_point": "your_endpoint",
...     "table_name": "training_traces",
...     "partition": "dt=20231201",
... }
>>> # Define table schema
>>> fields = ["user_id", "item_id", "embedding", "score"]
>>> types = ["bigint", "bigint", "string", "double"]
>>> # Create hook
>>> odps_hook = TraceToOdpsHook(
...     config=config, fields=fields, types=types, worker_num=2
... )
>>> trainer.add_hook(odps_hook)
>>> # During training, add data to be traced
>>> add_to_trace("user_embeddings", user_embeddings)
>>> add_to_trace("item_scores", item_scores)
>>> # The hook will automatically upload data after each step

Note

This hook is only available in internal environments where ODPS access is configured. Use add_to_trace() to add data that should be uploaded to ODPS tables.

__init__(config: Dict, fields: List[str], types: List[str], worker_num: int = 1, size_threshold: int = 52428800) → None[source]

MetricReportHook

class recis.hooks.metric_report_hook.MetricReportHook(model, report_args: ReportArguments | None = None)[source]

__init__(model, report_args: ReportArguments | None = None)[source]
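
A minimal usage sketch, assuming MetricReportHook is importable from recis.hooks like the other hooks and that the default report_args=None is acceptable:

>>> from recis.hooks import MetricReportHook
>>> # Report metrics for the given model using the default report arguments
>>> metric_hook = MetricReportHook(model)
>>> trainer.add_hook(metric_hook)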

HashTableFilterHook

class recis.hooks.filter_hook.HashTableFilterHook(filter_interval: int = 100)[source]

Hook for automatic hash table feature filtering during training.

This hook manages the lifecycle of features in hash tables by coordinating filtering operations across multiple hash table instances. It automatically updates step counters and triggers filtering operations at configurable intervals to remove stale or inactive features.

The hook integrates with the hash table filter system to:
  • Track global training steps for each hash table filter

  • Execute filtering operations at specified intervals

  • Provide comprehensive logging of filter activities

  • Support dynamic adjustment of filtering frequency

Parameters:

filter_interval (int, optional) – Number of training steps between filter operations. If None, filtering is disabled. Defaults to 100.

Examples:

Please refer to the Feature Admission and Feature Filtering documentation.

# Create and configure filter hook
filter_hook = HashTableFilterHook(filter_interval=200)

# Training loop integration
global_step = 0
for epoch in range(num_epochs):
    for step, batch in enumerate(dataloader):
        # ... training logic ...

        # Hook automatically manages filtering
        filter_hook.after_step(global_step=global_step)

        global_step += 1

__init__(filter_interval: int = 100)[source]

Initialize the hash table filter hook.

Parameters:

filter_interval (int, optional) – Number of training steps between filter operations. Must be positive. If None, filtering is disabled. Defaults to 100.

Example:

# Standard filtering every 100 steps
hook = HashTableFilterHook(filter_interval=100)
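
When training with the Trainer, the hook can also simply be registered with add_hook like the other hooks on this page, rather than calling after_step() manually (a sketch following the add_hook pattern used above):

# Register with the Trainer so its callbacks run automatically during training
filter_hook = HashTableFilterHook(filter_interval=100)
trainer.add_hook(filter_hook)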

Custom Hooks

from recis.hooks import Hook

class CustomHook(Hook):
    def __init__(self, custom_param):
        self.custom_param = custom_param

    def before_train(self, trainer):
        print(f"Training started with {self.custom_param}")

    def after_step(self, trainer):
        if trainer.state.global_step % 1000 == 0:
            # Execute custom logic every 1000 steps
            self.custom_logic(trainer)

    def custom_logic(self, trainer):
        # Custom logic implementation
        pass

# Use custom hook
custom_hook = CustomHook("my_parameter")
trainer.add_hook(custom_hook)