IO Module

RecIS’s IO module provides efficient, flexible data loading and preprocessing, supporting multiple data formats and optimized data pipelines for deep learning model training. The IO module stands in for the traditional PyTorch DataLoader, so you get its performance benefits without wrapping datasets in a separate DataLoader.

Core Features

Data Format Support
  • ORC Files: Support for Optimized Row Columnar format, suitable for large-scale offline data processing

High-Performance Data Processing
  • Multi-threaded parallel reading and data preprocessing

  • Configurable prefetching and buffering mechanisms

  • Batches assembled directly on the target device (CPU, GPU, or pinned memory)

Flexible Feature Configuration
  • Support for sparse features (variable-length) and dense features (fixed-length)

  • Hash feature processing with FarmHash and MurmurHash algorithms

  • RaggedTensor format for variable-length features

Distributed Training Optimization
  • Multi-worker data sharding

  • State saving and recovery mechanisms
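
As an illustrative sketch only (assuming torch.distributed has already been initialized), each rank can be mapped to its own data shard through the documented worker_idx and worker_num parameters:

import torch.distributed as dist

from recis.io import OrcDataset

# Each rank reads its own shard of the data
dataset = OrcDataset(
    batch_size=1024,
    worker_idx=dist.get_rank(),        # shard index for this worker
    worker_num=dist.get_world_size(),  # total number of shards
    read_threads_num=8,                # parallel reading threads per worker
    prefetch=2,                        # batches queued ahead of training
    device="cuda",                     # assemble batches directly on GPU
)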

Dataset Classes

DatasetBase

class recis.io.dataset_base.DatasetBase(batch_size, worker_idx=0, worker_num=1, read_threads_num=4, pack_threads_num=None, prefetch=1, is_compressed=False, drop_remainder=False, worker_slice_batch_num=None, ragged_format=True, transform_fn=None, save_interval=100, dtype=torch.float32, device='cpu', prefetch_transform=None)[source]

Base class for all RecIS dataset implementations.

This class provides the foundational functionality for data loading and preprocessing in RecIS. It inherits from PyTorch’s IterableDataset and implements common features such as multi-threading, batching, prefetching, and data transformation pipelines.

The DatasetBase class supports:
  • Distributed data loading across multiple workers

  • Parallel data reading with configurable thread counts

  • Automatic batching with optional remainder dropping

  • Data prefetching for improved performance

  • Flexible data transformation pipelines

  • State management for resumable training

  • Both dense and ragged tensor formats

Attributes:
  • batch_size (int) – Number of samples per batch.

  • worker_idx (int) – Index of current worker in distributed setup. Defaults to 0.

  • worker_num (int) – Total number of workers in distributed setup. Defaults to 1.

  • read_threads_num (int) – Number of parallel reading threads. Defaults to 4.

  • pack_threads_num (int, optional) – Number of packing threads. Defaults to None.

  • prefetch (int) – Number of batches to prefetch. Defaults to 1.

  • is_compressed (bool) – Whether data is compressed. Defaults to False.

  • drop_remainder (bool) – Whether to drop the last incomplete batch. Defaults to False.

  • worker_slice_batch_num (int, optional) – Number of batches per worker slice. Defaults to None.

  • ragged_format (bool) – Whether to use RaggedTensor format for variable-length data. Defaults to True.

  • transform_fn (callable or List[callable], optional) – Data transformation function(s). Defaults to None.

  • save_interval (int) – Interval for saving IO state. Defaults to 100.

  • dtype (torch.dtype) – Data type for floating-point tensors. Defaults to torch.float32.

  • device (str) – Target device for data placement (“cpu”, “cuda”, or “pin”). Defaults to “cpu”.

Example:

# Create a custom dataset by inheriting from DatasetBase
class MyDataset(DatasetBase):
    def make_dataset_fn(self):
        # Implement dataset creation logic
        pass

    def _shard_path(self, sub_id, sub_num):
        # Implement path sharding logic
        pass


# Use the dataset
dataset = MyDataset(
    batch_size=1024, read_threads_num=4, prefetch=2, device="cuda"
)

Note

This is an abstract base class. Subclasses must implement the make_dataset_fn and _shard_path methods to provide specific data source functionality.

__init__(batch_size, worker_idx=0, worker_num=1, read_threads_num=4, pack_threads_num=None, prefetch=1, is_compressed=False, drop_remainder=False, worker_slice_batch_num=None, ragged_format=True, transform_fn=None, save_interval=100, dtype=torch.float32, device='cpu', prefetch_transform=None) → None[source]

dump_io_state()[source]

Dumps the current IO state for checkpointing.

Returns the current state of the IO system, which can be used to resume data loading from a specific point during training recovery.

Returns:

Current IO state dictionary, or None if save_interval is 0.

Return type:

Dict or None

fixedlen_feature(name, default_value)[source]

Defines a fixed-length feature column with default values.

Fixed-length features are columns that have a consistent shape across all samples. Default values are used when the feature is missing or incomplete in the data.

Parameters:
  • name (str) – Name of the feature column.

  • default_value (List) – Default value(s) to use when the feature is missing. Should be a list even for scalar values.

Example:

dataset.fixedlen_feature("age", default_value=[25.0])
dataset.fixedlen_feature("gender", default_value=[0])
dataset.fixedlen_feature("embedding", default_value=[0.0] * 128)
load_io_state(io_states)[source]

Loads IO state for resuming data loading.

Restores the IO system to a previously saved state, allowing training to resume from a specific data loading checkpoint.

Parameters:

io_states (Dict) – Previously saved IO state dictionary.
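
Used together, dump_io_state and load_io_state let you checkpoint and resume the data pipeline. A minimal sketch follows; the checkpoint dict here is a placeholder for whatever checkpointing mechanism you already use:

# While training: capture the IO state alongside model/optimizer state
io_state = dataset.dump_io_state()
if io_state is not None:
    checkpoint["io_state"] = io_state

# When resuming: restore the IO state before iterating the dataset again
dataset.load_io_state(checkpoint["io_state"])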

reset()[source]

Resets the dataset to its initial state.

Resets the IO state, allowing the dataset to be reused from the beginning.
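
For example, a single dataset instance can be reused across epochs by resetting it between passes (a sketch; num_epochs and train_step are placeholders for your own training loop):

for epoch in range(num_epochs):
    for batch in dataset:
        train_step(batch)
    dataset.reset()  # rewind the IO state so the next epoch starts from the beginning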

varlen_feature(name, hash_type=None, hash_bucket=0, trans_int8=False)[source]

Configure a variable-length (sparse) feature with optional hashing.

Variable-length features are columns that contain sequences or lists of values with varying lengths across samples. These features can optionally be processed with hash functions for dimensionality reduction and categorical encoding.

Parameters:
  • name (str) – Name of the feature column in the data source.

  • hash_type (str, optional) – Hash algorithm to use for the feature. Supported values are “farm” (FarmHash) and “murmur” (MurmurHash). If None, no hashing is applied. Defaults to None.

  • hash_bucket (int, optional) – Size of the hash bucket (vocabulary size). Only used when hash_type is specified. Defaults to 0.

  • trans_int8 (bool, optional) – Whether to convert string data directly to int8 tensors without hashing. Only effective when hash_type is None. Defaults to False.

Example:

# Sparse feature with FarmHash for large vocabularies
dataset.varlen_feature(
    "user_clicked_items", hash_type="farm", hash_bucket=1000000
)

# Sparse feature with MurmurHash for smaller vocabularies
dataset.varlen_feature(
    "item_categories", hash_type="murmur", hash_bucket=50000
)

# Raw sparse feature without hashing (for pre-processed IDs)
dataset.varlen_feature("user_behavior_sequence")

# String feature converted to int8 (for text processing)
dataset.varlen_feature("review_tokens", trans_int8=True)
Raises:

AssertionError – If hash_type is not “farm” or “murmur” when specified.

Note

Hash functions are useful for handling large categorical vocabularies by mapping them to a fixed-size space. FarmHash generally provides better distribution properties, while MurmurHash is faster for smaller vocabularies.

OrcDataset

class recis.io.OrcDataset(batch_size, worker_idx=0, worker_num=1, read_threads_num=4, pack_threads_num=None, prefetch=1, is_compressed=False, drop_remainder=False, worker_slice_batch_num=None, shuffle=False, ragged_format=True, transform_fn=None, save_interval=100, dtype=torch.float32, device='cpu', prefetch_transform=None)[source]

ORC Dataset for reading Optimized Row Columnar format files.

This class provides functionality to read ORC files efficiently with support for both sparse (variable-length) and dense (fixed-length) features. It extends DatasetBase to provide ORC-specific optimizations including hash feature processing, data sharding, and batch processing.

The OrcDataset supports distributed training by allowing multiple workers to process different shards of the data concurrently. It also provides flexible feature configuration with hash bucketing for categorical features.

Attributes:
  • hash_types (List[str]) – List of hash algorithms used for features.

  • hash_buckets (List[int]) – List of hash bucket sizes for features.

  • hash_features (List[str]) – List of feature names that use hashing.

Example

Creating and configuring an ORC dataset:

# Initialize dataset
dataset = OrcDataset(
    batch_size=512, worker_idx=0, worker_num=4, shuffle=True, ragged_format=True
)

# Add data sources
dataset.add_paths(["/data/train/part1", "/data/train/part2"])

# Configure sparse features with hashing
dataset.varlen_feature("item_id", hash_type="farm", hash_bucket=1000000)
dataset.varlen_feature("category_id", hash_type="murmur", hash_bucket=10000)

# Configure dense features
dataset.fixedlen_feature("price", default_value=0.0)
dataset.fixedlen_feature("rating", default_value=3.0)
__init__(batch_size, worker_idx=0, worker_num=1, read_threads_num=4, pack_threads_num=None, prefetch=1, is_compressed=False, drop_remainder=False, worker_slice_batch_num=None, shuffle=False, ragged_format=True, transform_fn=None, save_interval=100, dtype=torch.float32, device='cpu', prefetch_transform=None) None[source]

Initialize OrcDataset with configuration parameters.

Parameters:
  • batch_size (int) – Number of samples per batch.

  • worker_idx (int, optional) – Index of current worker. Defaults to 0.

  • worker_num (int, optional) – Total number of workers. Defaults to 1.

  • read_threads_num (int, optional) – Number of reading threads. Defaults to 4.

  • pack_threads_num (int, optional) – Number of packing threads. Defaults to None.

  • prefetch (int, optional) – Number of batches to prefetch. Defaults to 1.

  • is_compressed (bool, optional) – Whether data is compressed. Defaults to False.

  • drop_remainder (bool, optional) – Whether to drop incomplete batches. Defaults to False.

  • worker_slice_batch_num (int, optional) – Number of batches per worker slice. Defaults to None.

  • shuffle (bool, optional) – Whether to shuffle the data. Defaults to False.

  • ragged_format (bool, optional) – Whether to use ragged tensor format. Defaults to True.

  • transform_fn (callable, optional) – Data transformation function. Defaults to None.

  • save_interval (int, optional) – Interval for saving checkpoints. Defaults to 100.

  • dtype (torch.dtype, optional) – Data type for tensors. Defaults to torch.float32.

  • device (str, optional) – Device for tensor operations. Defaults to “cpu”.

  • prefetch_transform (int, optional) – Number of batches to prefetch for transform. Defaults to None.

Raises:

AssertionError – If is_compressed is True (not supported yet).

Note

Compressed data is not currently supported for ORC datasets.

add_path(file_path)[source]

Add a single file path to the dataset.

Parameters:

file_path (str) – Path to the ORC file or directory to be added.

Example:

dataset.add_path("/data/train/part_001.orc")

add_paths(file_paths)[source]

Add multiple file paths to the dataset.

Parameters:

file_paths (List[str]) – List of paths to ORC files or directories.

Example:

dataset.add_paths(
    [
        "/data/train/part_001.orc",
        "/data/train/part_002.orc",
        "/data/train/part_003.orc",
    ]
)

OdpsDataset

LakeStreamDataset

WindowIO

Common Questions

Q: How to handle variable-length sequences?

A: Use varlen_feature to define variable-length features; RecIS will automatically process them into RaggedTensor format:

dataset.varlen_feature("sequence_ids")
# Data will be processed as RaggedTensor, containing values and offsets
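
How the resulting RaggedTensor is consumed depends on the RecIS RaggedTensor API; the accessor names below are assumptions shown for illustration only:

for batch in dataset:
    seq = batch["sequence_ids"]  # variable-length feature as a RaggedTensor
    # Assumed accessors (illustrative): flat value tensor plus per-row offsets
    flat_values = seq.values()
    row_offsets = seq.offsets()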

Q: How to customize data preprocessing?

A: Pass a custom processing function through the transform_fn parameter:

def custom_transform(batch):
    # Custom processing logic
    batch['processed_feature'] = process_feature(batch['raw_feature'])
    return batch

dataset = OrcDataset(batch_size=1024, transform_fn=custom_transform)

Q: How to optimize data reading performance?

A: You can tune the following aspects (a configuration sketch follows the list):

  1. Increase read_threads_num and prefetch so reading runs in parallel and batches are queued ahead of training

  2. Choose a batch_size appropriate for your model and hardware

  3. Set device="cuda" so batches are assembled directly on the GPU
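
For instance, a tuned OrcDataset configuration might look like the following sketch (the specific values are illustrative, not recommendations):

dataset = OrcDataset(
    batch_size=2048,        # larger batches amortize per-batch overhead
    read_threads_num=8,     # more parallel readers
    prefetch=4,             # keep several batches queued ahead of training
    device="cuda",          # place batches directly on GPU
)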