Feature Processing Module
RecIS’s Feature Processing module provides efficient and flexible feature engineering and preprocessing, supporting complex feature transformation pipelines and operator fusion optimization for high-performance feature processing.
Core Features
- High-Performance Feature Processing
Operator Fusion Optimization: Automatically identifies fusible operators and performs batch processing for significant performance improvements
GPU Accelerated Computing: Support for CUDA-accelerated core operators including hashing, bucketing, and cutoff
- Feature Operators
Hash Operators: Provide FarmHash and MurmurHash algorithms for large-scale categorical feature processing
Bucketing Operators: Support for numerical feature discretization and boundary bucketing
Sequence Processing: Provide sequence truncation, padding, and length control functionality
Feature Crossing: Support for multi-feature cross combinations to generate new features
- Flexible Execution Engine
Dynamic Compilation: Dynamic compilation and optimization of feature pipelines
Caching Mechanism: Feature computation result caching and reuse
Operator Fusion Optimization
RecIS’s feature processing module provides advanced operator fusion optimization mechanisms:
- Automatic Fusion Recognition
Automatically identifies fusible operators of the same type
Batch processing improves GPU utilization
Reduces memory copying and kernel launch overhead
- Supported Fusion Operators
FusedHashOP: Batch hash processing
FusedBoundaryOP: Batch bucketing processing
FusedModOP: Batch modulo operation processing
FusedCutoffOP: Batch sequence truncation processing
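The grouping step behind automatic fusion recognition can be illustrated with a small sketch: ops of the same type are collected so they can be dispatched as one batched kernel instead of many. This is an illustrative sketch only; the `op_type` field and `group_for_fusion` name are assumptions, not the RecIS implementation.

```python
from collections import defaultdict

def group_for_fusion(ops):
    """Group pipeline ops by type so same-type ops can be fused."""
    groups = defaultdict(list)
    for op in ops:
        groups[op["op_type"]].append(op)
    return dict(groups)

ops = [
    {"op_type": "hash", "feature": "user_id"},
    {"op_type": "mod", "feature": "user_id"},
    {"op_type": "hash", "feature": "item_id"},
    {"op_type": "cutoff", "feature": "seq_item_id"},
]
fused = group_for_fusion(ops)
# The two hash ops land in one group and could be executed as a
# single fused launch (e.g. FusedHashOP) instead of two kernels.
```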
Core Components
FeatureEngine
- class recis.features.FeatureEngine(feature_list: List[Feature])[source]
Main feature processing engine with automatic optimization.
The FeatureEngine manages a collection of features and their processing pipelines. It automatically optimizes execution through operation fusion, deduplication of identical features, and efficient step-by-step processing.
- Key Features:
Automatic operation fusion for improved performance
Feature deduplication based on hash values
Step-by-step execution with dependency management
Support for both fused and individual operation execution
Example:
import torch

from recis.features.feature import Feature
from recis.features.op import SelectField, Mod, SequenceTruncate

# simple feature
user_feature = (
    Feature("user_id")
    .add_op(SelectField("user_id"))
    .add_op(Mod(10000))
)

# sequence feature
seq_feature = (
    Feature("seq_item_id")
    .add_op(SelectField("seq_item_id", dim=2))
    .add_op(
        SequenceTruncate(
            seq_len=20,
            truncate=True,
            truncate_side="right",
            check_length=False,
            n_dims=3,
            dtype=torch.int64,
        )
    )
    .add_op(Mod(10000))
)
- __init__(feature_list: List[Feature])[source]
Initialize the feature engine with a list of features.
The engine automatically deduplicates identical features based on their hash values and compiles the features into optimized execution steps.
- Parameters:
feature_list (List[Feature]) – List of features to process.
- forward(data: Dict, remain_no_use_data: bool = True) Dict [source]
Process input data through all compiled feature pipelines.
Executes all features through their compiled execution steps, applying automatic operation fusion and managing data flow between steps.
- Parameters:
data (Dict) – Input data dictionary with feature names as keys.
remain_no_use_data (bool) – Whether to include unused input data in the output. Defaults to True.
- Returns:
Processed output data with feature results and, optionally, unused input data.
- Return type:
Dict
- Raises:
AssertionError – If input data is not a dictionary.
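The hash-based deduplication described above can be sketched in plain Python: identical feature definitions collapse to a single entry before compilation. The `pipeline_key` helper and dict-based dedup are assumptions for illustration, not the actual RecIS hashing scheme.

```python
def pipeline_key(feature):
    # Identify a feature by its name and its op sequence (a stand-in
    # for the hash value RecIS computes per feature).
    return (feature["name"], feature["ops"])

def dedup_features(features):
    seen = {}
    for f in features:
        seen.setdefault(pipeline_key(f), f)
    return list(seen.values())

features = [
    {"name": "user_id", "ops": ("select:user_id", "mod:10000")},
    {"name": "user_id", "ops": ("select:user_id", "mod:10000")},  # duplicate
    {"name": "age", "ops": ("select:age", "bucketize")},
]
unique = dedup_features(features)  # duplicate definition removed
```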
Feature
- class recis.features.feature.Feature(name: str)[source]
A feature processing pipeline that encapsulates a sequence of operations.
The Feature class represents a single feature in a machine learning pipeline, containing a sequence of operations that transform input data. Features can be compiled for optimization and provide hash-based caching for efficiency.
For example:
import torch

from recis.features import FeatureEngine
from recis.features.feature import Feature
from recis.features.op import SelectField, Mod, Bucketize

# Define features
features = [
    Feature("user_id").add_op(SelectField("user_id")).add_op(Mod(10000)),
    Feature("age")
    .add_op(SelectField("age"))
    .add_op(Bucketize(boundary=[18, 25, 35, 45, 55])),
]

# Create feature engine
feature_engine = FeatureEngine(features)

# Data processing
input_data = {
    "user_id": torch.LongTensor([1, 2, 3]),
    "age": torch.FloatTensor([20, 30, 40]),
}
output_data = feature_engine(input_data)
- __init__(name: str)[source]
Initialize a new feature with the given name.
- Parameters:
name (str) – The unique identifier name for this feature.
- add_op(op: _OP)[source]
Add an operation to this feature’s processing pipeline.
Operations are executed in the order they are added. The first operation must be a SelectField or SelectFields operation. Dependencies of the added operation are automatically included.
- Parameters:
op (_OP) – The operation to add to the pipeline.
- Returns:
This feature instance for method chaining.
- Return type:
Feature
- Raises:
ValueError – If the feature has already been compiled or if the first operation is not a SelectField/SelectFields operation.
- forward(data)[source]
Execute the feature processing pipeline on input data.
Applies all operations in the pipeline sequentially to transform the input data according to the feature definition.
- Parameters:
data – Input data to be processed. The format depends on the first operation in the pipeline.
- Returns:
The transformed data after applying all operations in sequence.
Feature Operations
Basic Operations
SelectField
- class recis.features.op.SelectField(name: str, dtype=torch.int64, dim=None, from_dict=True)[source]
Data input operation for extracting fields from input dictionaries.
This operation serves as the entry point for feature pipelines, extracting specific fields from input data dictionaries and optionally applying sequence processing operations.
Examples:
from recis.features.op import SelectField

# ID feature
data_input = SelectField("user_id")

# Sequence feature
data_input_sequence = SelectField("user_seq", dim=2)
- __init__(name: str, dtype=torch.int64, dim=None, from_dict=True)[source]
Initialize the data input operation.
- Parameters:
name (str) – Name of the input field to extract.
dtype (torch.dtype) – Expected data type. Defaults to torch.int64.
dim (int, optional) – Dimension specification for RaggedTensor.
from_dict (bool) – Whether to extract from dictionary. Defaults to True.
- forward(data)[source]
Extract and process input data.
- Parameters:
data (Union[dict, torch.Tensor, RaggedTensor]) – Input data to process.
- Returns:
Processed input data.
- Return type:
Union[torch.Tensor, RaggedTensor]
SelectFields
- class recis.features.op.SelectFields(input_list: List[SelectField])[source]
Multi-field data input operation for processing multiple inputs simultaneously.
This operation applies multiple SelectField operations and returns their results as a list, enabling parallel processing of multiple input fields.
- input_list
List of SelectField operations to apply.
- Type:
List[SelectField]
Examples:
from recis.features.op import SelectField, SelectFields

multi_input = SelectFields(
    [SelectField("user_id"), SelectField("item_id"), SelectField("category_id")]
)
- __init__(input_list: List[SelectField])[source]
Initialize the multi-data input operation.
- Parameters:
input_list (List[SelectField]) – List of SelectField operations to execute.
Hash Operations
Hash
- class recis.features.op.Hash(hash_type: str)[source]
Hash operation for applying hash functions to sequence data.
This operation applies either FarmHash or MurmurHash algorithms to RaggedTensor data, commonly used for feature hashing and dimensionality reduction in recommendation systems.
Examples:
from recis.features.op import Hash

# FarmHash
hash_op = Hash(hash_type="farm")

# MurmurHash
murmur_hash = Hash(hash_type="murmur")
- __init__(hash_type: str)[source]
Initialize the hash operation.
- Parameters:
hash_type (str) – Hash algorithm to use (“farm” or “murmur”).
- Raises:
AssertionError – If hash_type is not “farm” or “murmur”.
- forward(x: RaggedTensor | Tensor)[source]
Apply hash function to input RaggedTensor.
- Parameters:
x (Union[RaggedTensor, torch.Tensor]) – Input RaggedTensor to hash.
- Returns:
Hashed output with reduced dimensionality.
- Return type:
RaggedTensor
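For intuition about what a 64-bit hash op does to IDs, here is the well-known MurmurHash3 64-bit finalizer (fmix64) in pure Python. It is only a stand-in for illustration; the actual FarmHash/MurmurHash kernels used by RecIS differ.

```python
MASK64 = (1 << 64) - 1

def fmix64(k: int) -> int:
    """MurmurHash3 64-bit finalizer: avalanche-mixes k modulo 2**64."""
    k ^= k >> 33
    k = (k * 0xFF51AFD7ED558CCD) & MASK64
    k ^= k >> 33
    k = (k * 0xC4CEB9FE1A85EC53) & MASK64
    k ^= k >> 33
    return k

# Nearby IDs are scattered across the full 64-bit space.
hashed_ids = [fmix64(i) for i in [1, 2, 3]]
```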
IDMultiHash
- class recis.features.op.IDMultiHash(num_buckets: List[int])[source]
Multi-hash operation for generating multiple hash values.
This operation applies multiple hash functions with different parameters to generate several hash values from a single input, useful for techniques like feature hashing and locality-sensitive hashing.
Examples:
from recis.features.op import IDMultiHash

multi_hash = IDMultiHash(num_buckets=[20000, 20000, 10000, 500])
- __init__(num_buckets: List[int])[source]
Initialize the multi-hash operation.
- Parameters:
num_buckets (List[int]) – List of bucket counts for each hash function. Must contain at least one element.
- Raises:
AssertionError – If num_buckets is empty.
- forward(x: RaggedTensor | Tensor)[source]
Apply multi-hash operation to input data.
- Parameters:
x (Union[RaggedTensor, torch.Tensor]) – Input data to hash.
- Returns:
Dictionary with keys ‘multi_hash_0’, ‘multi_hash_1’, etc., containing the results of each hash function.
- Return type:
Dict
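A plain-Python sketch of the multi-hash idea: derive one bucket index per configured bucket count from a single ID. The per-slot salt and the mixing function below are assumptions for illustration, not the RecIS kernels.

```python
MASK64 = (1 << 64) - 1

def mix(k: int) -> int:
    # Simple 64-bit mixer (xorshift-multiply), a stand-in hash.
    k ^= k >> 33
    k = (k * 0xFF51AFD7ED558CCD) & MASK64
    k ^= k >> 33
    return k

def id_multi_hash(x: int, num_buckets):
    # Salt the input differently per slot, then reduce into each
    # slot's bucket range.
    return {
        f"multi_hash_{i}": mix(x ^ (i * 0x9E3779B97F4A7C15)) % n
        for i, n in enumerate(num_buckets)
    }

out = id_multi_hash(12345, [20000, 20000, 10000, 500])
```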
Integer Modulo
- class recis.features.op.Mod(mod_value)[source]
Unsigned 64-bit integer modulo operation.
This operation applies modulo arithmetic to input values, treating them as unsigned 64-bit integers. Commonly used for hash bucketing and ID space reduction.
Examples:
from recis.features.op import Mod

mod_op = Mod(mod_value=1000000)
- __init__(mod_value)[source]
Initialize the modulo operation.
- Parameters:
mod_value (int) – The modulo value for the operation.
- forward(x: RaggedTensor | Tensor)[source]
Apply modulo operation to input data.
- Parameters:
x (Union[RaggedTensor, torch.Tensor]) – Input tensor data.
- Returns:
Output with modulo applied to values.
- Return type:
Union[RaggedTensor, torch.Tensor]
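The unsigned 64-bit semantics can be shown with a minimal sketch: a negative ID is reinterpreted as its uint64 bit pattern before the modulo is taken.

```python
MASK64 = (1 << 64) - 1

def mod_u64(x: int, mod_value: int) -> int:
    # Reinterpret a (possibly negative) 64-bit id as unsigned before
    # taking the modulo, mirroring the documented uint64 behavior.
    return (x & MASK64) % mod_value

result = mod_u64(-1, 1000)  # -1 reinterpreted as 2**64 - 1 → 615
```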
Float Bucketing Operations
- class recis.features.op.Bucketize(boundary)[source]
Boundary-based bucketing operation for continuous value discretization.
This operation maps continuous values to discrete bucket indices based on predefined boundary values. Values are assigned to buckets according to which boundaries they fall between.
- boundary
Sorted tensor of boundary values defining buckets.
- Type:
torch.Tensor
Examples:
from recis.features.op import Bucketize

# age bucketing
age_boundary = Bucketize(boundary=[18, 25, 35, 45, 55, 65])
# inputs:  [20, 30, 40, 50, 60]
# outputs: [1, 2, 3, 4, 5]  (bucket indices)
- __init__(boundary)[source]
Initialize the boundary operation.
- Parameters:
boundary (Union[List[float], torch.Tensor]) – Boundary values for bucketing. Must be sorted in ascending order.
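The bucketing rule can be sketched with the standard library’s bisect. The boundary-inclusion convention (bisect_right, i.e. values equal to a boundary go to the higher bucket) is an assumption, though it reproduces the age-bucketing example above.

```python
import bisect

def bucketize(values, boundaries):
    # Each value maps to the number of boundaries <= value,
    # so outputs range over [0, len(boundaries)].
    return [bisect.bisect_right(boundaries, v) for v in values]

buckets = bucketize([20, 30, 40, 50, 60], [18, 25, 35, 45, 55, 65])
```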
Sequence Truncation
- class recis.features.op.SequenceTruncate(seq_len=64, check_length=True, truncate=True, truncate_side='left', n_dims=2, dtype=torch.int64)[source]
Sequence processing operation for truncation.
This operation handles sequence data by applying truncation to ensure sequences meet specified length requirements. Supports both 2D and 3D sequence data with configurable truncation sides.
Examples:
import torch

from recis.features.op import SequenceTruncate

truncate_op = SequenceTruncate(
    seq_len=20,
    truncate=True,
    truncate_side="right",
    check_length=False,
    n_dims=3,
    dtype=torch.int64,
)
- __init__(seq_len=64, check_length=True, truncate=True, truncate_side='left', n_dims=2, dtype=torch.int64)[source]
Initialize the sequence processing operation.
- Parameters:
seq_len (int) – Target sequence length. Defaults to 64.
check_length (bool) – Whether to validate sequence length. Defaults to True.
truncate (bool) – Whether to apply truncation. Defaults to True.
truncate_side (str) – Truncation side (“left” or “right”). Defaults to “left”.
n_dims (int) – Number of input dimensions (2 or 3). Defaults to 2.
dtype (torch.dtype) – Data type of sequences. Defaults to torch.int64.
- Raises:
AssertionError – If n_dims is not 2 or 3.
- forward(x: RaggedTensor | Tensor)[source]
Process sequences with truncation and padding.
- Parameters:
x (Union[RaggedTensor, torch.Tensor]) – Input sequence data.
- Returns:
Processed sequence data with target length.
- Return type:
RaggedTensor
- Raises:
AssertionError – If check_length is True and sequence exceeds target length.
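A minimal sketch of the truncation behavior, assuming “left” drops the oldest (leading) items and “right” drops the trailing ones; that side convention is an assumption for illustration.

```python
def truncate(seq, seq_len=64, truncate_side="left"):
    """Keep at most seq_len items from one sequence row."""
    if len(seq) <= seq_len:
        return list(seq)
    if truncate_side == "left":
        # Drop leading items, keep the most recent tail.
        return list(seq[-seq_len:])
    # Drop trailing items, keep the head.
    return list(seq[:seq_len])
```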
Cross Features
FeatureCross
- class recis.features.op.FeatureCross[source]
Feature crossing operation for generating interaction features.
This operation creates cross features by combining two RaggedTensor inputs, generating new features that capture interactions between the original features.
Examples:
from recis.features.op import FeatureCross

cross_op = FeatureCross()
- forward(data: List[RaggedTensor])[source]
Create cross features from two RaggedTensor inputs.
- Parameters:
data (List[RaggedTensor]) – List containing exactly two RaggedTensor inputs.
- Returns:
Cross feature tensor combining the input features.
- Return type:
RaggedTensor
- Raises:
AssertionError – If inputs are not RaggedTensors or if there aren’t exactly two inputs.
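A plain-Python sketch of crossing two ragged rows: pair every value of one row with every value of the other and combine each pair into a single ID. The combine function (multiply-and-xor) is an assumption, not the RecIS kernel.

```python
MASK64 = (1 << 64) - 1

def cross_row(row_a, row_b):
    # Cartesian product of the two rows; each (a, b) pair is combined
    # into one 64-bit id with an illustrative mix function.
    return [((a * 0x9E3779B97F4A7C15) ^ b) & MASK64
            for a in row_a for b in row_b]

crossed = cross_row([1, 2], [10, 20])  # 2 x 2 = 4 combined ids
```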
Advanced Usage
Custom Operations
You can inherit from base operation classes to implement custom feature processing:
from recis.features.feature import Feature
from recis.features.op import SelectField, _OP


class CustomNormalize(_OP):
    def __init__(self, mean=0.0, std=1.0):
        super().__init__()
        self.mean = mean
        self.std = std

    def forward(self, x):
        return (x - self.mean) / self.std


# Use the custom operation in a feature pipeline
custom_feature = (
    Feature("normalized_score")
    .add_op(SelectField("score"))
    .add_op(CustomNormalize(mean=0.5, std=0.2))
)