# Source code for recis.features.feature

import hashlib
from typing import Dict, List

from torch import nn

from .op import _OP, SelectField, SelectFields


class Feature(nn.Module):
    """A feature processing pipeline that encapsulates a sequence of operations.

    The Feature class represents a single feature in a machine learning
    pipeline, containing a sequence of operations that transform input data.
    Features can be compiled for optimization and provide hash-based caching
    for efficiency.

    For example:

    .. code-block:: python

        from recis.features import FeatureEngine
        from recis.features.feature import Feature
        from recis.features.op import SelectField, Mod, Bucketize

        # Define features
        features = [
            Feature("user_id").add_op(SelectField("user_id")).add_op(Mod(10000)),
            Feature("age")
            .add_op(SelectField("age"))
            .add_op(Bucketize(boundaries=[18, 25, 35, 45, 55])),
        ]

        # Create feature engine
        feature_engine = FeatureEngine(features)

        # Data processing
        input_data = {
            "user_id": torch.LongTensor([1, 2, 3]),
            "age": torch.FloatTensor([20, 30, 40]),
        }
        output_data = feature_engine(input_data)
    """

    def __init__(self, name: str):
        """Initialize a new feature with the given name.

        Args:
            name (str): The unique identifier name for this feature.
        """
        super().__init__()
        self._name = name
        # Once compiled, the op pipeline is frozen (see add_op).
        self._compiled = False
        # nn.ModuleList so the ops are registered as submodules.
        self._ops = nn.ModuleList()
        self._buffers_data = None
        # True once the pipeline starts with a SelectField/SelectFields op.
        self._input_flag = False

    def set_buffers_data(self, data):
        """Attach externally managed buffer data to this feature."""
        self._buffers_data = data

    @property
    def id(self):
        """Identity-based id of this feature instance (CPython ``id()``)."""
        return id(self)

    @property
    def name(self):
        """str: The unique name this feature was constructed with."""
        return self._name

    @property
    def compiled(self):
        """bool: Whether this feature has been marked as compiled."""
        return self._compiled

    def compiled_(self, value: bool):
        """Set the compiled flag (trailing underscore = in-place setter)."""
        assert isinstance(value, bool)
        self._compiled = value

    @property
    def ops(self) -> List[nn.Module]:
        """List[nn.Module]: The ordered operation pipeline."""
        return self._ops

    def add_op(self, op: "_OP"):
        """Add an operation to this feature's processing pipeline.

        Operations are executed in the order they are added. The first
        operation must be a SelectField or SelectFields operation.

        Args:
            op (_OP): The operation to add to the pipeline.

        Returns:
            Feature: This feature instance for method chaining.

        Raises:
            ValueError: If the feature has already been compiled or if the
                first operation is not a SelectField/SelectFields operation.
        """
        if self._compiled:
            raise ValueError(f"feature {self.name} has been compiled")
        if len(self._ops) == 0:
            # Exact-type check (not isinstance): subclasses are deliberately
            # rejected as pipeline heads — TODO confirm this is intended.
            if type(op) in [SelectField, SelectFields]:
                self._input_flag = True
            else:
                raise ValueError(f"feature {self.name} must start with DataInputOP")
        self._ops.append(op)
        return self

    @staticmethod
    def from_json(json_obj: Dict):
        """Deserialize a feature from a JSON object. Not implemented yet."""
        raise NotImplementedError

    def get_hash(self) -> int:
        """Return a stable 64-bit signed hash of this feature definition.

        Combines the feature name with the hashes of all ops, then folds the
        first 8 bytes of a SHA-256 digest into a signed int.

        Returns:
            int: 0 for an empty pipeline, otherwise the derived hash value.

        NOTE(review): ``sorted(op_hashes)`` makes the hash insensitive to op
        order, so two pipelines with the same ops in different order collide —
        confirm this is intentional for the caching use case.
        """
        if len(self._ops) == 0:
            return 0
        op_hashes = []
        for op in self._ops:
            op_hashes.append(op.get_hash())
        combined_str = f"{self._name}:{sorted(op_hashes)}"
        hash_bytes = hashlib.sha256(combined_str.encode("utf-8")).digest()
        hash_value = int.from_bytes(hash_bytes[:8], byteorder="big", signed=True)
        return hash_value

    def forward(self, data):
        """Execute the feature processing pipeline on input data.

        Applies all operations in the pipeline sequentially to transform the
        input data according to the feature definition.

        Args:
            data: Input data to be processed. The format depends on the first
                operation in the pipeline.

        Returns:
            The transformed data after applying all operations in sequence.
        """
        x = data
        for op in self._ops:
            x = op(x)
        return x