Feature Processing Module

RecIS’s Feature Processing module provides efficient, flexible feature engineering and preprocessing, supporting complex feature transformation pipelines and operator fusion optimization for high-performance feature processing.

Core Features

High-Performance Feature Processing
  • Operator Fusion Optimization: Automatically identifies fusible operators and performs batch processing for significant performance improvements

  • GPU Accelerated Computing: Support for CUDA-accelerated core operators including hashing, bucketing, and cutoff

Feature Operators
  • Hash Operators: Provide FarmHash and MurmurHash algorithms for large-scale categorical feature processing

  • Bucketing Operators: Support for numerical feature discretization and boundary bucketing

  • Sequence Processing: Provide sequence truncation, padding, and length control functionality

  • Feature Crossing: Support for multi-feature cross combinations to generate new features

Flexible Execution Engine
  • Dynamic Compilation: Dynamic compilation and optimization of feature pipelines

  • Caching Mechanism: Feature computation result caching and reuse

Operator Fusion Optimization

RecIS’s feature processing module provides advanced operator fusion optimization mechanisms:

Automatic Fusion Recognition
  • Automatically identifies fusible operators of the same type

  • Batch processing improves GPU utilization

  • Reduces memory copying and kernel launch overhead

Supported Fusion Operators
  • FusedHashOP: Batch hash processing

  • FusedBoundaryOP: Batch bucketing processing

  • FusedModOP: Batch modulo operation processing

  • FusedCutoffOP: Batch sequence truncation processing
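
The fusion pass can be pictured as grouping same-typed pipeline steps so they run as one batched call instead of many small ones. A minimal, hypothetical plain-Python sketch (the `group_for_fusion` helper and the step tuples are illustrative, not RecIS API):

```python
from collections import defaultdict

# Hypothetical sketch of fusion grouping: steps of the same op type,
# coming from different features, are collected into one batch that a
# fused kernel (e.g. a FusedModOP-style op) could execute at once.
def group_for_fusion(steps):
    groups = defaultdict(list)
    for feature_name, op_type, payload in steps:
        groups[op_type].append((feature_name, payload))
    return dict(groups)

steps = [
    ("user_id", "mod", 10000),
    ("item_id", "mod", 20000),
    ("age", "boundary", [18, 25, 35]),
]

grouped = group_for_fusion(steps)
# Both "mod" steps land in one group: one batched launch instead of two.
```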

Core Components

FeatureEngine

class recis.features.FeatureEngine(feature_list: List[Feature])[source]

Main feature processing engine with automatic optimization.

The FeatureEngine manages a collection of features and their processing pipelines. It automatically optimizes execution through operation fusion, deduplication of identical features, and efficient step-by-step processing.

Key Features:
  • Automatic operation fusion for improved performance

  • Feature deduplication based on hash values

  • Step-by-step execution with dependency management

  • Support for both fused and individual operation execution

Example:

import torch

from recis.features.feature import Feature
from recis.features.op import SelectField, Mod, SequenceTruncate

# simple feature
user_feature = Feature("user_id").\
    add_op(SelectField("user_id")).\
    add_op(Mod(10000))

# sequence feature
seq_feature = Feature("seq_item_id").\
    add_op(SelectField("seq_item_id")).\
    add_op(SequenceTruncate(seq_len=20,
                            truncate=True,
                            truncate_side="right",
                            check_length=False,
                            n_dims=3,
                            dtype=torch.int64)).\
    add_op(Mod(10000))
__init__(feature_list: List[Feature])[source]

Initialize the feature engine with a list of features.

The engine automatically deduplicates identical features based on their hash values and compiles the features into optimized execution steps.

Parameters:

feature_list (List[Feature]) – List of features to process.

forward(data: Dict, remain_no_use_data: bool = True) → Dict[source]

Process input data through all compiled feature pipelines.

Executes all features through their compiled execution steps, applying automatic operation fusion and managing data flow between steps.

Parameters:
  • data (Dict) – Input data dictionary with feature names as keys.

  • remain_no_use_data (bool) – Whether to include unused input data in the output. Defaults to True.

Returns:

Processed output data with feature results and, optionally, unused input data.

Return type:

Dict

Raises:

AssertionError – If input data is not a dictionary.
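
The remain_no_use_data behavior can be sketched as a simple dict merge (a hypothetical helper, assuming processed feature results take precedence over raw pass-through fields):

```python
def finalize_output(processed, raw_input, remain_no_use_data=True):
    # Sketch of the documented flag: processed feature results always
    # appear in the output; raw input fields are carried through only
    # when remain_no_use_data is True.
    out = dict(processed)
    if remain_no_use_data:
        for key, value in raw_input.items():
            out.setdefault(key, value)
    return out

out = finalize_output({"user_id": [1, 2]}, {"user_id": [9, 9], "clicks": [0, 1]})
# out keeps the processed "user_id" and carries through the unused "clicks"
```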

Feature

class recis.features.feature.Feature(name: str)[source]

A feature processing pipeline that encapsulates a sequence of operations.

The Feature class represents a single feature in a machine learning pipeline, containing a sequence of operations that transform input data. Features can be compiled for optimization and provide hash-based caching for efficiency.

For example:

import torch

from recis.features import FeatureEngine
from recis.features.feature import Feature
from recis.features.op import SelectField, Mod, Bucketize

# Define features
features = [
    Feature("user_id").add_op(SelectField("user_id")).add_op(Mod(10000)),
    Feature("age")
    .add_op(SelectField("age", dtype=torch.float32))
    .add_op(Bucketize(boundary=[18, 25, 35, 45, 55])),
]

# Create feature engine
feature_engine = FeatureEngine(features)

# Data processing
input_data = {
    "user_id": torch.LongTensor([1, 2, 3]),
    "age": torch.FloatTensor([20, 30, 40]),
}

output_data = feature_engine(input_data)
__init__(name: str)[source]

Initialize a new feature with the given name.

Parameters:

name (str) – The unique identifier name for this feature.

add_op(op: _OP)[source]

Add an operation to this feature’s processing pipeline.

Operations are executed in the order they are added. The first operation must be a SelectField or SelectFields operation. Dependencies of the added operation are automatically included.

Parameters:

op (_OP) – The operation to add to the pipeline.

Returns:

This feature instance for method chaining.

Return type:

Feature

Raises:

ValueError – If the feature has already been compiled or if the first operation is not a SelectField/SelectFields operation.

forward(data)[source]

Execute the feature processing pipeline on input data.

Applies all operations in the pipeline sequentially to transform the input data according to the feature definition.

Parameters:

data – Input data to be processed. The format depends on the first operation in the pipeline.

Returns:

The transformed data after applying all operations in sequence.

Feature Operations

Basic Operations

SelectField

class recis.features.op.SelectField(name: str, dtype=torch.int64, dim=None, from_dict=True)[source]

Data input operation for extracting fields from input dictionaries.

This operation serves as the entry point for feature pipelines, extracting specific fields from input data dictionaries and optionally applying sequence processing operations.

Examples:

from recis.features.op import SelectField

# ID Feature
data_input = SelectField("user_id")

# Sequence Feature
data_input_sequence = SelectField("user_seq", dim=2)
__init__(name: str, dtype=torch.int64, dim=None, from_dict=True)[source]

Initialize the data input operation.

Parameters:
  • name (str) – Name of the input field to extract.

  • dtype (torch.dtype) – Expected data type. Defaults to torch.long.

  • dim (int, optional) – Dimension specification for RaggedTensor.

  • from_dict (bool) – Whether to extract from dictionary. Defaults to True.

forward(data)[source]

Extract and process input data.

Parameters:

data (Union[dict, torch.Tensor, RaggedTensor]) – Input data to process.

Returns:

Processed input data.

Return type:

Union[torch.Tensor, RaggedTensor]

SelectFields

class recis.features.op.SelectFields(input_list: List[SelectField])[source]

Multi-field data input operation for processing multiple inputs simultaneously.

This operation applies multiple SelectField operations and returns their results as a list, enabling parallel processing of multiple input fields.

input_list

List of SelectField operations to apply.

Type:

List[SelectField]

Examples:

from recis.features.op import SelectField, SelectFields

multi_input = SelectFields(
    [SelectField("user_id"), SelectField("item_id"), SelectField("category_id")]
)
__init__(input_list: List[SelectField])[source]

Initialize the multi-data input operation.

Parameters:

input_list (List[SelectField]) – List of SelectField operations to execute.

forward(data)[source]

Process input data through multiple SelectField operations.

Parameters:

data – Input data to be processed by all SelectField operations.

Returns:

List of results from each SelectField operation.

Return type:

List

Hash Operations

Hash

class recis.features.op.Hash(hash_type: str)[source]

Hash operation for applying hash functions to sequence data.

This operation applies either FarmHash or MurmurHash algorithms to RaggedTensor data, commonly used for feature hashing and dimensionality reduction in recommendation systems.

hash_type

Type of hash function (“farm” or “murmur”).

Type:

str

Examples:

from recis.features.op import Hash

# Farm Hash
hash_op = Hash(hash_type="farm")

# Murmur Hash
murmur_hash = Hash(hash_type="murmur")
__init__(hash_type: str)[source]

Initialize the hash operation.

Parameters:

hash_type (str) – Hash algorithm to use (“farm” or “murmur”).

Raises:

AssertionError – If hash_type is not “farm” or “murmur”.

forward(x: RaggedTensor | Tensor)[source]

Apply hash function to input RaggedTensor.

Parameters:

x (Union[RaggedTensor, torch.Tensor]) – Input RaggedTensor to hash.

Returns:

Hashed output with reduced dimensionality.

Return type:

RaggedTensor
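
FarmHash and MurmurHash themselves are not in the Python standard library; the idea — mapping arbitrary ids into a fixed 64-bit space before a Mod-style bucketing — can be sketched with a stdlib stand-in (`stable_hash64` is illustrative, not RecIS API):

```python
import hashlib

def stable_hash64(token: str) -> int:
    # Stand-in for FarmHash/MurmurHash: any deterministic 64-bit hash
    # illustrates mapping string ids into a fixed integer space.
    digest = hashlib.blake2b(token.encode("utf-8"), digest_size=8).digest()
    return int.from_bytes(digest, "little")

bucket = stable_hash64("user_42") % 10000  # hash, then Mod-style bucketing
```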

IDMultiHash

class recis.features.op.IDMultiHash(num_buckets: List[int])[source]

Multi-hash operation for generating multiple hash values.

This operation applies multiple hash functions with different parameters to generate several hash values from a single input, useful for techniques like feature hashing and locality-sensitive hashing.

Examples:

from recis.features.op import IDMultiHash

multi_hash = IDMultiHash(num_buckets=[20000, 20000, 10000, 500])
__init__(num_buckets: List[int])[source]

Initialize the multi-hash operation.

Parameters:

num_buckets (List[int]) – List of bucket counts for each hash function. Must contain at least one element.

Raises:

AssertionError – If num_buckets is empty.

forward(x: RaggedTensor | Tensor)[source]

Apply multi-hash operation to input data.

Parameters:

x (Union[RaggedTensor, torch.Tensor]) – Input data to hash.

Returns:

Dictionary with keys ‘multi_hash_0’, ‘multi_hash_1’, etc., containing the results of each hash function.

Return type:

dict
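
The per-slot behavior can be sketched in plain Python (the mixing formula and constant below are illustrative only; the real kernels use their own hash parameters):

```python
def id_multi_hash(x: int, num_buckets):
    # Each slot mixes the id differently, then reduces it into that
    # slot's bucket range; keys follow the documented naming scheme.
    out = {}
    for i, n in enumerate(num_buckets):
        mixed = (x ^ ((i + 1) * 0x9E3779B97F4A7C15)) & 0xFFFFFFFFFFFFFFFF
        out[f"multi_hash_{i}"] = mixed % n
    return out

result = id_multi_hash(12345, [20000, 20000, 10000, 500])
```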

Integer Modulo

class recis.features.op.Mod(mod_value)[source]

Unsigned 64-bit integer modulo operation.

This operation applies modulo arithmetic to input values, treating them as unsigned 64-bit integers. Commonly used for hash bucketing and ID space reduction.

mod

The modulo value to apply.

Type:

int

Examples:

from recis.features.op import Mod

mod_op = Mod(mod_value=1000000)
__init__(mod_value)[source]

Initialize the modulo operation.

Parameters:

mod_value (int) – The modulo value for the operation.

forward(x: RaggedTensor | Tensor)[source]

Apply modulo operation to input data.

Parameters:

x (Union[RaggedTensor, torch.Tensor]) – Input tensor data.

Returns:

Output with modulo applied to values.

Return type:

Union[RaggedTensor, torch.Tensor]
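
The unsigned-64-bit interpretation matters for negative hash outputs; a plain-Python sketch of the semantics (a `uint64_mod` helper name assumed for illustration):

```python
def uint64_mod(x: int, mod_value: int) -> int:
    # Reinterpret the (possibly negative) value as an unsigned 64-bit
    # integer before taking the modulo, so bucket ids are always >= 0.
    return (x & 0xFFFFFFFFFFFFFFFF) % mod_value

uint64_mod(-1, 1000)  # -1 reads as 2**64 - 1 under uint64, giving 615
```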

Float Bucketing Operations

class recis.features.op.Bucketize(boundary)[source]

Boundary-based bucketing operation for continuous value discretization.

This operation maps continuous values to discrete bucket indices based on predefined boundary values. Values are assigned to buckets according to which boundaries they fall between.

boundary

Sorted tensor of boundary values defining buckets.

Type:

torch.Tensor

Examples:

from recis.features.op import Bucketize

# age bucketing
age_boundary = Bucketize(
    boundary=[18, 25, 35, 45, 55, 65],
)

# inputs: [20, 30, 40, 50, 60]
# outputs: [1, 2, 3, 4, 5]  (bucket ID)
__init__(boundary)[source]

Initialize the boundary operation.

Parameters:

boundary (Union[List[float], torch.Tensor]) – Boundary values for bucketing. Must be sorted in ascending order.
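
The bucket assignment in the example above can be sketched with the standard library (assuming a value equal to a boundary falls into the bucket to its right, as `bisect_right` does; the real op's edge behavior is an implementation detail):

```python
import bisect

def bucketize(values, boundary):
    # Each value maps to the index of the first boundary strictly
    # greater than it, reproducing the age-bucketing example above.
    return [bisect.bisect_right(boundary, v) for v in values]

bucketize([20, 30, 40, 50, 60], [18, 25, 35, 45, 55, 65])  # -> [1, 2, 3, 4, 5]
```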

Sequence Truncation

class recis.features.op.SequenceTruncate(seq_len=64, check_length=True, truncate=True, truncate_side='left', n_dims=2, dtype=torch.int64)[source]

Sequence processing operation for truncation.

This operation handles sequence data by applying truncation to ensure sequences meet specified length requirements. Supports both 2D and 3D sequence data with configurable truncation sides.

Examples:

from recis.features.op import SequenceTruncate

SequenceTruncate(
    seq_len=20,
    truncate=True,
    truncate_side="right",
    check_length=False,
    n_dims=3,
    dtype=torch.int64,
)
__init__(seq_len=64, check_length=True, truncate=True, truncate_side='left', n_dims=2, dtype=torch.int64)[source]

Initialize the sequence processing operation.

Parameters:
  • seq_len (int) – Target sequence length. Defaults to 64.

  • check_length (bool) – Whether to validate sequence length. Defaults to True.

  • truncate (bool) – Whether to apply truncation. Defaults to True.

  • truncate_side (str) – Truncation side (“left” or “right”). Defaults to “left”.

  • n_dims (int) – Number of input dimensions (2 or 3). Defaults to 2.

  • dtype (torch.dtype) – Data type of sequences. Defaults to torch.long.

Raises:

AssertionError – If n_dims is not 2 or 3.

forward(x: RaggedTensor | Tensor)[source]

Process sequences, applying truncation to the target length.

Parameters:

x (Union[RaggedTensor, torch.Tensor]) – Input sequence data.

Returns:

Processed sequence data with target length.

Return type:

RaggedTensor

Raises:

AssertionError – If check_length is True and sequence exceeds target length.
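
The truncate_side semantics can be sketched on plain lists (an assumption for illustration: "left" drops elements from the front, keeping the most recent tail; the real op works on RaggedTensor/Tensor data):

```python
def truncate_sequence(seq, seq_len=64, truncate_side="left"):
    # Assumed semantics: "left" drops leading elements (keeps the
    # tail), "right" drops trailing elements (keeps the head).
    if len(seq) <= seq_len:
        return seq
    return seq[-seq_len:] if truncate_side == "left" else seq[:seq_len]

truncate_sequence([1, 2, 3, 4, 5], seq_len=3)           # -> [3, 4, 5]
truncate_sequence([1, 2, 3, 4, 5], 3, "right")          # -> [1, 2, 3]
```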

Cross Features

FeatureCross

class recis.features.op.FeatureCross[source]

Feature crossing operation for generating interaction features.

This operation creates cross features by combining two RaggedTensor inputs, generating new features that capture interactions between the original features.

Examples:

from recis.features.op import FeatureCross

cross_op = FeatureCross()
__init__()[source]

Initialize the feature cross operation.

forward(data: List[RaggedTensor])[source]

Create cross features from two RaggedTensor inputs.

Parameters:

data (List[RaggedTensor]) – List containing exactly two RaggedTensor inputs.

Returns:

Cross feature tensor combining the input features.

Return type:

RaggedTensor

Raises:

AssertionError – If inputs are not RaggedTensors or if there aren’t exactly two inputs.
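
The pairwise-combination idea can be sketched on plain lists (a toy stand-in; the real op operates row-wise on RaggedTensors and uses its own combiner):

```python
def cross_ids(a_values, b_values):
    # Toy combiner: every (a, b) pair becomes one deterministic
    # 64-bit cross-feature id.
    return [
        hash((a, b)) & 0xFFFFFFFFFFFFFFFF
        for a in a_values
        for b in b_values
    ]

crossed = cross_ids([1, 2], [10, 20])  # 2 x 2 = 4 cross ids
```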

Advanced Usage

Custom Operations

You can inherit from base operation classes to implement custom feature processing:

import torch

from recis.features.feature import Feature
from recis.features.op import SelectField, _OP

class CustomNormalize(_OP):
    def __init__(self, mean=0.0, std=1.0):
        super().__init__()
        self.mean = mean
        self.std = std

    def forward(self, x):
        # Standardize values with the configured mean and std
        return (x - self.mean) / self.std

# Use custom operation
custom_feature = Feature("normalized_score").\
    add_op(SelectField("score", dtype=torch.float32)).\
    add_op(CustomNormalize(mean=0.5, std=0.2))