IO Module

RecIS’s IO module provides efficient and flexible data loading and preprocessing capabilities, supporting multiple data formats and optimized data pipelines for deep learning model training. With RecIS’s IO module, you can achieve high performance directly, without wrapping the dataset in a traditional DataLoader.

Core Features

Data Format Support
  • ORC Files: Support for Optimized Row Columnar format, suitable for large-scale offline data processing

High-Performance Data Processing
  • Multi-threaded parallel reading and data preprocessing

  • Configurable prefetching and buffering mechanisms

  • Direct data organization on different devices (CPU/GPU/Pin Memory)

Flexible Feature Configuration
  • Support for sparse features (variable-length) and dense features (fixed-length)

  • Hash feature processing with FarmHash and MurmurHash algorithms

  • RaggedTensor format for variable-length features
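
The hash bucketing behind these features maps each raw categorical value to an integer id in [0, hash_bucket). A minimal conceptual sketch follows; it uses Python’s hashlib for illustration rather than the FarmHash/MurmurHash kernels RecIS ships, so the exact bucket values will differ:

```python
import hashlib

def hash_bucket_id(value: str, hash_bucket: int) -> int:
    # Hash the raw categorical value to a stable 64-bit integer,
    # then fold it into [0, hash_bucket). RecIS does this with
    # FarmHash or MurmurHash; md5 is used here only for illustration.
    digest = hashlib.md5(value.encode("utf-8")).digest()
    h = int.from_bytes(digest[:8], "little")
    return h % hash_bucket

ids = [hash_bucket_id(v, 1000000) for v in ["item_42", "item_43", "item_42"]]
# Equal raw values always land in the same bucket.
assert ids[0] == ids[2]
```

The bucket id, not the raw string, is what the embedding table is indexed by, which is why hash_bucket bounds the table size.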

Distributed Training Optimization
  • Multi-worker data sharding

  • State saving and recovery mechanisms
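
A common way to realize multi-worker sharding (a conceptual sketch; RecIS’s internal assignment policy may differ) is to split the input files round-robin by worker index:

```python
def shard_files(files, worker_idx, worker_num):
    # Round-robin assignment: worker k reads files k, k + worker_num, ...
    # Every file is read by exactly one worker, so workers never overlap.
    return [f for i, f in enumerate(files) if i % worker_num == worker_idx]

files = ["part1", "part2", "part3", "part4", "part5"]
worker0 = shard_files(files, worker_idx=0, worker_num=2)
worker1 = shard_files(files, worker_idx=1, worker_num=2)
# worker0 -> ["part1", "part3", "part5"], worker1 -> ["part2", "part4"]
```

This is the role the worker_idx and worker_num constructor parameters play in the distributed example later in this page.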

Dataset Classes

OrcDataset

class recis.io.OrcDataset(batch_size, worker_idx=0, worker_num=1, read_threads_num=4, pack_threads_num=None, prefetch=1, is_compressed=False, drop_remainder=False, worker_slice_batch_num=None, shuffle=False, ragged_format=True, transform_fn=None, save_interval=100, dtype=torch.float32, device='cpu')[source]

ORC Dataset for reading Optimized Row Columnar format files.

This class provides functionality to read ORC files efficiently with support for both sparse (variable-length) and dense (fixed-length) features. It extends DatasetBase to provide ORC-specific optimizations including hash feature processing, data sharding, and batch processing.

The OrcDataset supports distributed training by allowing multiple workers to process different shards of the data concurrently. It also provides flexible feature configuration with hash bucketing for categorical features.

hash_types (List[str])
  List of hash algorithms used for features.

hash_buckets (List[int])
  List of hash bucket sizes for features.

hash_features (List[str])
  List of feature names that use hashing.

Example

Creating and configuring an ORC dataset:

# Initialize dataset
dataset = OrcDataset(
    batch_size=512, worker_idx=0, worker_num=4, shuffle=True, ragged_format=True
)

# Add data sources
dataset.add_paths(["/data/train/part1", "/data/train/part2"])

# Configure sparse features with hashing
dataset.varlen_feature("item_id", hash_type="farm", hash_bucket=1000000)
dataset.varlen_feature("category_id", hash_type="murmur", hash_bucket=10000)

# Configure dense features
dataset.fixedlen_feature("price", default_value=0.0)
dataset.fixedlen_feature("rating", default_value=3.0)
Members:

__init__, add_path, add_paths, varlen_feature, fixedlen_feature

Best Practices

  1. Reasonable Parameter Settings:

    dataset = OrcDataset(
        batch_size=1024,
        read_threads_num=2,
        prefetch=1,           # Number of prefetch batches
        device="cuda"         # Put batch results directly on cuda
    )
    
  2. Using Data Preprocessing:

    def transform_batch(batch):
        # Custom batch processing logic; must return the processed batch dict
        return batch
    
    dataset = OrcDataset(
        batch_size=1024,
        transform_fn=transform_batch
    )
    
  3. Distributed Data Reading:

    import os
    
    rank = int(os.environ.get("RANK", 0))
    world_size = int(os.environ.get("WORLD_SIZE", 1))
    
    dataset = OrcDataset(
        batch_size=1024,
        worker_idx=rank,
        worker_num=world_size
    )
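
For reference, a transform_fn (item 2 above) is just a callable that takes the assembled batch and returns it. A minimal self-contained sketch on a plain dict follows; the "price" feature and the log-scaling are illustrative assumptions, not part of the RecIS API:

```python
import math

def transform_batch(batch):
    # Example preprocessing: log-scale a dense 'price' feature in place.
    # In a real pipeline the values would be tensors rather than lists.
    batch["price"] = [math.log1p(p) for p in batch["price"]]
    return batch

batch = {"price": [0.0, 9.0, 99.0]}
transform_batch(batch)
# batch["price"] now holds log1p-scaled values: [0.0, log(10), log(100)]
```

The function runs once per batch on the IO threads, so keeping it lightweight preserves read throughput.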
    

Common Questions

Q: How to handle variable-length sequences?

A: Use varlen_feature to define variable-length features; RecIS will automatically process them into RaggedTensor format:

dataset.varlen_feature("sequence_ids")
# Data will be processed as RaggedTensor, containing values and offsets
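
To see what the values/offsets representation looks like, here is the standard encoding of a ragged batch of three sequences in plain Python (RecIS builds the equivalent tensors for you):

```python
# Three variable-length sequences in one batch
sequences = [[101, 102], [201], [301, 302, 303]]

# Flatten all rows into a single values array ...
values = [x for seq in sequences for x in seq]

# ... and record row boundaries as cumulative offsets.
offsets = [0]
for seq in sequences:
    offsets.append(offsets[-1] + len(seq))

# values  -> [101, 102, 201, 301, 302, 303]
# offsets -> [0, 2, 3, 6]; row i is values[offsets[i]:offsets[i+1]]
assert values[offsets[1]:offsets[2]] == [201]
```

This encoding stores no padding, which is why ragged_format=True is efficient for sparse features with highly variable lengths.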

Q: How to customize data preprocessing?

A: Pass a custom processing function through the transform_fn parameter:

def custom_transform(batch):
    # Custom processing logic; process_feature is your own function
    batch['processed_feature'] = process_feature(batch['raw_feature'])
    return batch

dataset = OrcDataset(batch_size=1024, transform_fn=custom_transform)

Q: How to optimize data reading performance?

A: You can tune the following:

  1. Adjust read_threads_num and prefetch to balance throughput against memory use

  2. Choose a batch_size that matches your hardware and model

  3. Set device='cuda' so batches are assembled directly on the GPU