IO Module
RecIS’s IO module provides efficient, flexible data loading and preprocessing, supporting multiple data formats and optimized data pipelines for deep learning model training. Because batching, prefetching, and device placement are handled inside the datasets themselves, you can achieve high performance without wrapping them in a traditional PyTorch DataLoader.
Core Features
- Data Format Support
ORC Files: Support for Optimized Row Columnar format, suitable for large-scale offline data processing
- High-Performance Data Processing
Multi-threaded parallel reading and data preprocessing
Configurable prefetching and buffering mechanisms
Direct batch placement on different devices (CPU, GPU, or pinned memory)
- Flexible Feature Configuration
Support for sparse features (variable-length) and dense features (fixed-length)
Hash feature processing with FarmHash and MurmurHash algorithms
RaggedTensor format for variable-length features
- Distributed Training Optimization
Multi-worker data sharding
State saving and recovery mechanisms
Dataset Classes
DatasetBase
- class recis.io.dataset_base.DatasetBase(batch_size, worker_idx=0, worker_num=1, read_threads_num=4, pack_threads_num=None, prefetch=1, is_compressed=False, drop_remainder=False, worker_slice_batch_num=None, ragged_format=True, transform_fn=None, save_interval=100, dtype=torch.float32, device='cpu', prefetch_transform=None)[source]
Base class for all RecIS dataset implementations.
This class provides the foundational functionality for data loading and preprocessing in RecIS. It inherits from PyTorch’s IterableDataset and implements common features such as multi-threading, batching, prefetching, and data transformation pipelines.
The DatasetBase class supports:
- Distributed data loading across multiple workers
- Parallel data reading with configurable thread counts
- Automatic batching with optional remainder dropping
- Data prefetching for improved performance
- Flexible data transformation pipelines
- State management for resumable training
- Both dense and ragged tensor formats
- ragged_format
Whether to use RaggedTensor format for variable-length data. Defaults to True.
- Type:
bool, optional
- transform_fn
Data transformation function(s). Defaults to None.
- Type:
callable or List[callable], optional
- dtype
Data type for floating-point tensors. Defaults to torch.float32.
- Type:
torch.dtype, optional
Example:
# Create a custom dataset by inheriting from DatasetBase
class MyDataset(DatasetBase):
    def make_dataset_fn(self):
        # Implement dataset creation logic
        pass

    def _shard_path(self, sub_id, sub_num):
        # Implement path sharding logic
        pass

# Use the dataset
dataset = MyDataset(
    batch_size=1024,
    read_threads_num=4,
    prefetch=2,
    device="cuda",
)
Note
This is an abstract base class. Subclasses must implement the make_dataset_fn and _shard_path methods to provide specific data source functionality.
- __init__(batch_size, worker_idx=0, worker_num=1, read_threads_num=4, pack_threads_num=None, prefetch=1, is_compressed=False, drop_remainder=False, worker_slice_batch_num=None, ragged_format=True, transform_fn=None, save_interval=100, dtype=torch.float32, device='cpu', prefetch_transform=None) → None[source]
- dump_io_state()[source]
Dumps the current IO state for checkpointing.
Returns the current state of the IO system, which can be used to resume data loading from a specific point during training recovery.
- Returns:
Current IO state dictionary, or None if save_interval is 0.
- Return type:
Dict or None
- fixedlen_feature(name, default_value)[source]
Defines a fixed-length feature column with default values.
Fixed-length features are columns that have a consistent shape across all samples. Default values are used when the feature is missing or incomplete in the data.
- Parameters:
name (str) – Name of the feature column.
default_value (List) – Default value(s) to use when the feature is missing. Should be a list even for scalar values.
Example:
dataset.fixedlen_feature("age", default_value=[25.0])
dataset.fixedlen_feature("gender", default_value=[0])
dataset.fixedlen_feature("embedding", default_value=[0.0] * 128)
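The default-value behavior described above can be sketched in plain Python. This is only an illustration of the semantics; RecIS performs the filling internally during batching, and fill_fixedlen is a hypothetical helper, not part of the API:

```python
def fill_fixedlen(raw, default_value):
    # raw is one sample's value for the column, or None when the column
    # is missing; the configured default is substituted for missing entries.
    return list(default_value) if raw is None else raw

# Three samples for an "age" column, with the middle one missing:
rows = [[31.0], None, [45.0]]
filled = [fill_fixedlen(r, default_value=[25.0]) for r in rows]
# filled == [[31.0], [25.0], [45.0]]
```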
- load_io_state(io_states)[source]
Loads IO state for resuming data loading.
Restores the IO system to a previously saved state, allowing training to resume from a specific data loading checkpoint.
- Parameters:
io_states (Dict) – Previously saved IO state dictionary.
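A typical checkpoint flow pairs dump_io_state with load_io_state. The sketch below simulates the round trip with a plain dict and pickle; the exact keys of the state dictionary ("file_index", "batch_offset") are stand-ins, since the real contents are internal to RecIS:

```python
import pickle

def save_io_checkpoint(io_state, path):
    # io_state comes from dataset.dump_io_state(); it may be None when
    # save_interval is 0, in which case there is nothing to persist.
    if io_state is None:
        return False
    with open(path, "wb") as f:
        pickle.dump(io_state, f)
    return True

def restore_io_checkpoint(path):
    # The loaded dict would be passed to dataset.load_io_state(...).
    with open(path, "rb") as f:
        return pickle.load(f)

# Simulated round trip with a stand-in state dict:
state = {"file_index": 3, "batch_offset": 1200}
save_io_checkpoint(state, "/tmp/io_state.pkl")
restored = restore_io_checkpoint("/tmp/io_state.pkl")
```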
- reset()[source]
Reset the dataset to initial state.
Resets the IO state, allowing the dataset to be iterated again from the beginning.
- varlen_feature(name, hash_type=None, hash_bucket=0, trans_int8=False)[source]
Configure a variable-length (sparse) feature with optional hashing.
Variable-length features are columns that contain sequences or lists of values with varying lengths across samples. These features can optionally be processed with hash functions for dimensionality reduction and categorical encoding.
- Parameters:
name (str) – Name of the feature column in the data source (e.g., ODPS tables or ORC files).
hash_type (str, optional) – Hash algorithm to use for the feature. Supported values are “farm” (FarmHash) and “murmur” (MurmurHash). If None, no hashing is applied. Defaults to None.
hash_bucket (int, optional) – Size of the hash bucket (vocabulary size). Only used when hash_type is specified. Defaults to 0.
trans_int8 (bool, optional) – Whether to convert string data directly to int8 tensors without hashing. Only effective when hash_type is None. Defaults to False.
Example:
# Sparse feature with FarmHash for large vocabularies
dataset.varlen_feature(
    "user_clicked_items", hash_type="farm", hash_bucket=1000000
)

# Sparse feature with MurmurHash for smaller vocabularies
dataset.varlen_feature(
    "item_categories", hash_type="murmur", hash_bucket=50000
)

# Raw sparse feature without hashing (for pre-processed IDs)
dataset.varlen_feature("user_behavior_sequence")

# String feature converted to int8 (for text processing)
dataset.varlen_feature("review_tokens", trans_int8=True)
- Raises:
AssertionError – If hash_type is not “farm” or “murmur” when specified.
Note
Hash functions are useful for handling large categorical vocabularies by mapping them to a fixed-size space. FarmHash generally provides better distribution properties, while MurmurHash is faster for smaller vocabularies.
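The effect of hash bucketing can be sketched with a stand-in hash function (md5 below; RecIS uses FarmHash or MurmurHash internally, which are not reproduced in this illustration):

```python
import hashlib

def bucketize(token, hash_bucket):
    # Map an arbitrary string into a fixed-size ID space [0, hash_bucket).
    digest = hashlib.md5(token.encode("utf-8")).digest()
    value = int.from_bytes(digest[:8], "little")
    return value % hash_bucket

# Arbitrary category strings land in a bounded vocabulary:
ids = [bucketize(t, 50000) for t in ["shoes", "books", "toys"]]

# The same token always maps to the same ID, so no vocabulary file
# needs to be maintained; distinct tokens may collide in one bucket.
```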
OrcDataset
- class recis.io.OrcDataset(batch_size, worker_idx=0, worker_num=1, read_threads_num=4, pack_threads_num=None, prefetch=1, is_compressed=False, drop_remainder=False, worker_slice_batch_num=None, shuffle=False, ragged_format=True, transform_fn=None, save_interval=100, dtype=torch.float32, device='cpu', prefetch_transform=None)[source]
ORC Dataset for reading Optimized Row Columnar format files.
This class provides functionality to read ORC files efficiently with support for both sparse (variable-length) and dense (fixed-length) features. It extends DatasetBase to provide ORC-specific optimizations including hash feature processing, data sharding, and batch processing.
The OrcDataset supports distributed training by allowing multiple workers to process different shards of the data concurrently. It also provides flexible feature configuration with hash bucketing for categorical features.
Example
Creating and configuring an ORC dataset:
# Initialize dataset
dataset = OrcDataset(
    batch_size=512,
    worker_idx=0,
    worker_num=4,
    shuffle=True,
    ragged_format=True,
)

# Add data sources
dataset.add_paths(["/data/train/part1", "/data/train/part2"])

# Configure sparse features with hashing
dataset.varlen_feature("item_id", hash_type="farm", hash_bucket=1000000)
dataset.varlen_feature("category_id", hash_type="murmur", hash_bucket=10000)

# Configure dense features (default_value must be a list, even for scalars)
dataset.fixedlen_feature("price", default_value=[0.0])
dataset.fixedlen_feature("rating", default_value=[3.0])
- __init__(batch_size, worker_idx=0, worker_num=1, read_threads_num=4, pack_threads_num=None, prefetch=1, is_compressed=False, drop_remainder=False, worker_slice_batch_num=None, shuffle=False, ragged_format=True, transform_fn=None, save_interval=100, dtype=torch.float32, device='cpu', prefetch_transform=None) → None[source]
Initialize OrcDataset with configuration parameters.
- Parameters:
batch_size (int) – Number of samples per batch.
worker_idx (int, optional) – Index of current worker. Defaults to 0.
worker_num (int, optional) – Total number of workers. Defaults to 1.
read_threads_num (int, optional) – Number of reading threads. Defaults to 4.
pack_threads_num (int, optional) – Number of packing threads. Defaults to None.
prefetch (int, optional) – Number of batches to prefetch. Defaults to 1.
is_compressed (bool, optional) – Whether data is compressed. Defaults to False.
drop_remainder (bool, optional) – Whether to drop incomplete batches. Defaults to False.
worker_slice_batch_num (int, optional) – Number of batches per worker slice. Defaults to None.
shuffle (bool, optional) – Whether to shuffle the data. Defaults to False.
ragged_format (bool, optional) – Whether to use ragged tensor format. Defaults to True.
transform_fn (callable, optional) – Data transformation function. Defaults to None.
save_interval (int, optional) – Interval for saving checkpoints. Defaults to 100.
dtype (torch.dtype, optional) – Data type for tensors. Defaults to torch.float32.
device (str, optional) – Device for tensor operations. Defaults to “cpu”.
prefetch_transform (int, optional) – Number of batches to prefetch for transform. Defaults to None.
- Raises:
AssertionError – If is_compressed is True (not supported yet).
Note
Compressed data is not currently supported for ORC datasets.
OdpsDataset
LakeStreamDataset
WindowIO
Common Questions
Q: How to handle variable-length sequences?
A: Use varlen_feature to define variable-length features; RecIS automatically processes them into RaggedTensor format:
dataset.varlen_feature("sequence_ids")
# Data will be processed as RaggedTensor, containing values and offsets
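The values/offsets layout behind ragged batches can be illustrated in plain Python (a sketch of the representation only; the actual RaggedTensor API may expose these fields differently):

```python
# Three variable-length rows flattened into one values array plus offsets:
sequences = [[101, 102], [103], [104, 105, 106]]

values = [v for seq in sequences for v in seq]
offsets = [0]
for seq in sequences:
    offsets.append(offsets[-1] + len(seq))

# values  == [101, 102, 103, 104, 105, 106]
# offsets == [0, 2, 3, 6]; row i is values[offsets[i]:offsets[i+1]]
row_1 = values[offsets[1]:offsets[2]]  # [103]
```

This flat layout is what lets batches of uneven sequences be stored and moved as two contiguous tensors instead of padded matrices.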
Q: How to customize data preprocessing?
A: Pass a custom processing function through the transform_fn parameter:
def custom_transform(batch):
    # Custom processing logic
    batch['processed_feature'] = process_feature(batch['raw_feature'])
    return batch
dataset = OrcDataset(batch_size=1024, transform_fn=custom_transform)
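Since transform_fn also accepts a list of callables (see the DatasetBase attributes above), preprocessing can be composed from small steps applied in order. The sketch below uses a batch represented as a plain dict of lists; real batches hold tensors, and scale_price / add_clicked_mask are hypothetical helpers:

```python
def scale_price(batch):
    # Convert prices from cents to currency units.
    batch["price"] = [p / 100 for p in batch["price"]]
    return batch

def add_clicked_mask(batch):
    # Derive a binary feature from the click counts.
    batch["clicked"] = [1 if c > 0 else 0 for c in batch["clicks"]]
    return batch

transforms = [scale_price, add_clicked_mask]  # applied in order

batch = {"price": [1999, 2999], "clicks": [0, 9]}
for fn in transforms:
    batch = fn(batch)
# batch["price"] == [19.99, 29.99]; batch["clicked"] == [0, 1]
```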
Q: How to optimize data reading performance?
A: You can tune performance along several dimensions:
Increase read_threads_num and prefetch to overlap reading with training
Choose a batch_size that balances throughput and memory usage
Set device='cuda' so output batches are assembled directly on the GPU
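When tuning these knobs, it helps to measure throughput for each candidate configuration. The helper below is a generic sketch, not a RecIS utility; it works with any iterable of batches, including a configured dataset:

```python
import time

def measure_throughput(batches, batch_size, max_batches=100):
    # Iterate up to max_batches and report samples per second. Pass a
    # configured dataset (or any iterable of batches) as `batches`.
    start = time.perf_counter()
    n = 0
    for _ in batches:
        n += 1
        if n >= max_batches:
            break
    elapsed = time.perf_counter() - start
    return (n * batch_size / elapsed) if elapsed > 0 else 0.0

# Simulated run with a trivial iterable standing in for a dataset:
rate = measure_throughput(range(1000), batch_size=512, max_batches=100)
```

Comparing the reported rate across settings of read_threads_num, prefetch, and batch_size gives a concrete basis for choosing a configuration.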