Source code for recis.io.orc_dataset

import os

import torch

from recis.io.dataset_base import DatasetBase
from recis.utils.logger import Logger


if os.environ.get("BUILD_DOCUMENT", None) != "1":
    import column_io.dataset.dataset as column_io_dataset
    from column_io.dataset.file_sharding import OrcFileSharding


logger = Logger(__name__)


def get_dir_size(dir_name):
    """Get the size of a directory.

    Args:
        dir_name (str): The directory path to measure.

    Returns:
        int: The size of the directory. Currently returns 0 as a placeholder.

    Todo:
        Implement actual file size calculation logic.
    """
    # TODO(yuhuan.zh) get file real size
    return 0
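

# Illustrative sketch (not part of the original module): one way the TODO in
# ``get_dir_size`` could be filled in, assuming "size" means total bytes on
# disk. The helper name ``_get_dir_size_bytes`` is hypothetical.
def _get_dir_size_bytes(dir_name):
    """Sum the on-disk byte sizes of all regular files under ``dir_name`` (sketch)."""
    total = 0
    for root, _dirs, files in os.walk(dir_name):
        for file_name in files:
            total += os.path.getsize(os.path.join(root, file_name))
    return total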


class OrcDataset(DatasetBase):
    """ORC Dataset for reading Optimized Row Columnar format files.

    This class provides functionality to read ORC files efficiently with
    support for both sparse (variable-length) and dense (fixed-length)
    features. It extends DatasetBase to provide ORC-specific optimizations
    including hash feature processing, data sharding, and batch processing.

    The OrcDataset supports distributed training by allowing multiple workers
    to process different shards of the data concurrently. It also provides
    flexible feature configuration with hash bucketing for categorical
    features.

    Attributes:
        hash_types (List[str]): List of hash algorithms used for features.
        hash_buckets (List[int]): List of hash bucket sizes for features.
        hash_features (List[str]): List of feature names that use hashing.

    Example:
        Creating and configuring an ORC dataset:

        .. code-block:: python

            # Initialize dataset
            dataset = OrcDataset(
                batch_size=512,
                worker_idx=0,
                worker_num=4,
                shuffle=True,
                ragged_format=True,
            )

            # Add data sources
            dataset.add_paths(["/data/train/part1", "/data/train/part2"])

            # Configure sparse features with hashing
            dataset.varlen_feature("item_id", hash_type="farm", hash_bucket=1000000)
            dataset.varlen_feature("category_id", hash_type="murmur", hash_bucket=10000)

            # Configure dense features
            dataset.fixedlen_feature("price", default_value=0.0)
            dataset.fixedlen_feature("rating", default_value=3.0)
    """

    def __init__(
        self,
        batch_size,
        worker_idx=0,
        worker_num=1,
        read_threads_num=4,
        pack_threads_num=None,
        prefetch=1,
        is_compressed=False,
        drop_remainder=False,
        worker_slice_batch_num=None,
        shuffle=False,
        ragged_format=True,
        transform_fn=None,
        save_interval=100,
        dtype=torch.float32,
        device="cpu",
    ) -> None:
        """Initialize OrcDataset with configuration parameters.

        Args:
            batch_size (int): Number of samples per batch.
            worker_idx (int, optional): Index of the current worker. Defaults to 0.
            worker_num (int, optional): Total number of workers. Defaults to 1.
            read_threads_num (int, optional): Number of reading threads. Defaults to 4.
            pack_threads_num (int, optional): Number of packing threads. Defaults to None.
            prefetch (int, optional): Number of batches to prefetch. Defaults to 1.
            is_compressed (bool, optional): Whether data is compressed. Defaults to False.
            drop_remainder (bool, optional): Whether to drop incomplete batches.
                Defaults to False.
            worker_slice_batch_num (int, optional): Number of batches per worker slice.
                Defaults to None.
            shuffle (bool, optional): Whether to shuffle the data. Defaults to False.
            ragged_format (bool, optional): Whether to use ragged tensor format.
                Defaults to True.
            transform_fn (callable, optional): Data transformation function.
                Defaults to None.
            save_interval (int, optional): Interval for saving checkpoints.
                Defaults to 100.
            dtype (torch.dtype, optional): Data type for tensors.
                Defaults to torch.float32.
            device (str, optional): Device for tensor operations. Defaults to "cpu".

        Raises:
            AssertionError: If is_compressed is True (not supported yet).

        Note:
            Compressed data is not currently supported for ORC datasets.
        """
        super().__init__(
            batch_size,
            worker_idx,
            worker_num,
            read_threads_num,
            pack_threads_num,
            prefetch,
            is_compressed,
            drop_remainder,
            worker_slice_batch_num,
            ragged_format,
            transform_fn,
            save_interval,
            dtype,
            device,
        )
        assert not is_compressed, "OrcDataset does not support compressed data yet."
        self._shuffle = shuffle
        self._dir_sizes = []
        self._total_row_count = 0
        self.hash_types = []
        self.hash_buckets = []
        self.hash_features = []
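
    # Distributed-training sketch (illustrative, not part of the original
    # module): ``worker_idx`` / ``worker_num`` are typically derived from the
    # process group so that each rank reads a disjoint shard. Assumes
    # ``torch.distributed`` has already been initialized.
    #
    #     rank = torch.distributed.get_rank()
    #     world_size = torch.distributed.get_world_size()
    #     dataset = OrcDataset(
    #         batch_size=512,
    #         worker_idx=rank,
    #         worker_num=world_size,
    #         shuffle=True,
    #     )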

    def add_path(self, file_path):
        """Add a single file path to the dataset.

        Args:
            file_path (str): Path to the ORC file or directory to be added.

        Example:
            .. code-block:: python

                dataset.add_path("/data/train/part_001.orc")
        """
        self._paths.append(file_path)
        self._dir_sizes.append(0)

    def add_paths(self, file_paths):
        """Add multiple file paths to the dataset.

        Args:
            file_paths (List[str]): List of paths to ORC files or directories.

        Example:
            .. code-block:: python

                dataset.add_paths(
                    [
                        "/data/train/part_001.orc",
                        "/data/train/part_002.orc",
                        "/data/train/part_003.orc",
                    ]
                )
        """
        for file_path in file_paths:
            self.add_path(file_path)

    def varlen_feature(self, name, hash_type=None, hash_bucket=0, trans_int8=False):
        """Configure a variable-length (sparse) feature with optional hashing.

        Variable-length features are columns that contain sequences or lists of
        values with varying lengths across samples. These features can
        optionally be processed with hash functions for dimensionality
        reduction and categorical encoding.

        Args:
            name (str): Name of the feature column in the ORC files.
            hash_type (str, optional): Hash algorithm to use for the feature.
                Supported values are "farm" (FarmHash) and "murmur" (MurmurHash).
                If None, no hashing is applied. Defaults to None.
            hash_bucket (int, optional): Size of the hash bucket (vocabulary size).
                Only used when hash_type is specified. Defaults to 0.
            trans_int8 (bool, optional): Whether to convert string data directly
                to int8 tensors without hashing. Only effective when hash_type
                is None. Defaults to False.

        Example:
            .. code-block:: python

                # Sparse feature with FarmHash
                dataset.varlen_feature(
                    "item_sequence", hash_type="farm", hash_bucket=100000
                )

                # Sparse feature with MurmurHash
                dataset.varlen_feature(
                    "category_ids", hash_type="murmur", hash_bucket=50000
                )

                # Raw sparse feature without hashing
                dataset.varlen_feature("user_tags")

                # String feature converted to int8
                dataset.varlen_feature("text_tokens", trans_int8=True)

        Raises:
            AssertionError: If hash_type is not "farm" or "murmur" when specified.

        Note:
            Hash functions are useful for handling large categorical vocabularies
            by mapping them to a fixed-size space. FarmHash generally provides
            better distribution properties, while MurmurHash is faster.
        """
        if name not in self._select_column:
            self._select_column.append(name)
        if hash_type:
            assert hash_type in [
                "farm",
                "murmur",
            ], "hash_type must be farm / murmur"
            self.hash_features.append(name)
            self.hash_buckets.append(hash_bucket)
            self.hash_types.append(hash_type)
        elif trans_int8:
            self.hash_features.append(name)
            self.hash_buckets.append(hash_bucket)
            self.hash_types.append("no_hash")

    def fixedlen_feature(self, name, default_value):
        """Configure a fixed-length (dense) feature.

        Fixed-length features are typically used for numerical data where each
        sample has exactly one value, such as user age, item price, or ratings.

        Args:
            name (str): Name of the feature column.
            default_value (float): Default value to use when feature is missing.

        Example:
            .. code-block:: python

                # Numerical features with default values
                dataset.fixedlen_feature("user_age", default_value=25.0)
                dataset.fixedlen_feature("item_price", default_value=0.0)
                dataset.fixedlen_feature("rating", default_value=3.0)

        Note:
            Default values are important for handling missing data gracefully
            and ensuring consistent tensor shapes across batches.
""" if name not in self._select_column: self._select_column.append(name) if name not in self._dense_column: self._dense_column.append(name) self._dense_default_value.append(default_value) def _shard_path(self, sub_id, sub_num): """Create data shards for distributed processing. This method partitions the input files across multiple workers and threads to enable parallel data loading. It uses OrcFileSharding to ensure balanced distribution of data. Args: sub_id (int): Sub-process identifier within the worker. sub_num (int): Total number of sub-processes per worker. Note: This is an internal method used by the dataset creation process. The sharding strategy ensures that each worker processes a unique subset of the data while maintaining load balance. """ file_shard = OrcFileSharding() file_shard.add_paths(self._paths) self._shard_paths = file_shard.partition( self._worker_idx * sub_num + sub_id, self._worker_num * sub_num, self._read_threads_num, shuffle=self._shuffle, ) def make_dataset_fn(self): """Create a dataset factory function for ORC file processing. This method returns a lambda function that creates a column_io Dataset from ORC files with the configured features and processing parameters. Returns: callable: A function that takes a file path and returns a Dataset object. Note: The returned function is used internally by the data loading pipeline to create dataset instances for each shard of data. """ return lambda x: column_io_dataset.Dataset.from_orc_files( [x.decode() if isinstance(x, bytes) else x], self._is_compressed, self._batch_size, self._select_column, self.hash_features, self.hash_types, self.hash_buckets, self._dense_column, self._dense_default_value, ) def get_dir_size(self): """Calculate and return the sizes of all configured data directories. This method iterates through all added paths and calculates their sizes, updating the internal tracking of total row count for the dataset. Returns: List[int]: List of directory sizes corresponding to each path. Note: Currently uses a placeholder implementation that returns 0 for all directories. The actual size calculation logic needs to be implemented. """ for i, dir_name in enumerate(self._paths): dir_size = get_dir_size(dir_name) self._dir_sizes[i] = dir_size self._total_row_count += dir_size return self._dir_sizes