Source code for recis.fg.fg_parser

import copy
import json
from collections import OrderedDict
from dataclasses import dataclass
from enum import IntEnum
from typing import Dict, List, Optional

import torch

from recis.fg.utils import dict_lower_case
from recis.io.dataset_config import FeatureIOConf
from recis.utils.logger import Logger


FG_FEATURE_KEY = "features"
FEATURE_NAME_KEY = "feature_name"
SEQUENCE_NAME_KEY = "sequence_name"
SEQUENCE_LENGTH_KEY = "sequence_length"

value_type_key = "value_type"
hash_bucket_key = "hash_bucket_size"
hash_type_key = "hash_type"
shared_name_key = "shared_name"
feature_type_key = "feature_type"
value_dim_key = "value_dimension"
boundaries_key = "boundaries"
compress_strategy_key = "compress_strategy"
combiner_key = "combiner"
emb_dim_key = "embedding_dimension"
trainable_key = "trainable"
emb_device_key = "emb_device"
emb_type_key = "emb_type"
admit_hook_key = "admit_hook"
filter_hook_key = "filter_hook"

gen_key_type_key = "gen_key_type"
gen_value_type_key = "gen_val_type"
from_feature_key = "from_feature"

HASH_TYPE_MAP = {"farmhash": "farm", "murmur": "murmur"}
VALUE_TYPE_MAP = {"string": torch.int8, "double": torch.float32, "integer": torch.int64}
EMB_TYPE_MAP = {
    "float": torch.float32,
    "fp16": torch.float16,
    "bf16": torch.bfloat16,
    "int8": torch.int8,
}

logger = Logger(__name__)


class IdTransformType(IntEnum):
    """Enumeration of ID transformation types for feature processing.

    This enum defines the different ways feature IDs can be transformed
    during the feature processing pipeline. Each transformation type
    corresponds to a specific operation applied to input feature values.

    Attributes:
        RAW (int): No transformation, use raw values as-is.
        BUCKETIZE (int): Apply bucketization based on predefined boundaries.
        HASH (int): Apply hash function to convert values to integers.
        MOD (int): Apply modulo operation for bucket assignment.
        MASK (int): Apply mask operation (not currently supported).
        MULTIHASH (int): Apply multiple hash functions for advanced embedding.
        HASH_MULTIHASH (int): Combine hash and multi-hash transformations.
        MOD_MULTIHASH (int): Combine modulo and multi-hash transformations.
    """

    RAW = 1
    BUCKETIZE = 2
    HASH = 3
    MOD = 4
    MASK = 5  # TODO(yuhuan.zh) not support
    MULTIHASH = 6
    HASH_MULTIHASH = 7
    MOD_MULTIHASH = 8


class EmbTransformType(IntEnum):
    """Enumeration of embedding transformation types.

    This enum defines how features are transformed into embeddings
    after ID transformation. It determines the final representation
    format for features in the embedding space.

    Attributes:
        RAW (int): No embedding transformation, use raw values.
        LOOKUP (int): Standard embedding lookup from embedding tables.
        MULTIHASH_LOOKUP (int): Multi-hash embedding lookup for advanced strategies.
    """

    RAW = 1
    LOOKUP = 2
    MULTIHASH_LOOKUP = 3


@dataclass
class FGConf:
    """Configuration dataclass for feature generation settings.

    This dataclass holds all configuration parameters needed for feature
    generation, including transformation settings, embedding parameters,
    and various feature-specific options.

    Attributes:
        name (str): Name of the feature.
        is_sparse (bool): Whether the feature is sparse.
        gen_key_type (str): Type of key generation.
        gen_value_type (str): Type of value generation.
        need_hash (bool): Whether hashing is needed. Defaults to False.
        hash_type (str): Type of hash function to use. Defaults to "farm".
        hash_bucket_size (int): Size of hash buckets. Defaults to 0.
        is_seq (bool): Whether this is a sequence feature. Defaults to False.
        seq_length (int): Length of sequences. Defaults to 0.
        value_dimension (int): Dimension of feature values. Defaults to 1.
        combiner (str): Combiner strategy for embeddings. Defaults to "mean".
        embedding_dim (Optional[int]): Embedding dimension. Defaults to None.
        value_type (Optional[str]): Type of feature values. Defaults to None.
        boundaries (Optional[List[float]]): Boundaries for bucketization. Defaults to None.
        compress_strategy (Optional[str]): Compression strategy. Defaults to None.
        shared_name (Optional[str]): Shared embedding name. Defaults to None.
        from_feature (Optional[str]): Source feature for copying. Defaults to None.
        emb_device (Optional[str]): Device for embeddings. Defaults to None.
        emb_type (Optional[str]): Embedding data type. Defaults to None.
        trainable (bool): Whether embeddings are trainable. Defaults to True.
        admit_hook (Optional[Dict[str, str]]): Admission hook configuration. Defaults to None.
        filter_hook (Optional[Dict[str, str]]): Filter hook configuration. Defaults to None.
    """

    name: str
    is_sparse: bool
    gen_key_type: str
    gen_value_type: str
    need_hash: bool = False
    hash_type: str = "farm"
    hash_bucket_size: int = 0
    is_seq: bool = False
    seq_length: int = 0
    value_dimension: int = 1
    combiner: str = "mean"
    embedding_dim: Optional[int] = None
    value_type: Optional[str] = None
    boundaries: Optional[List[float]] = None
    compress_strategy: Optional[str] = None
    shared_name: Optional[str] = None
    from_feature: Optional[str] = None
    emb_device: Optional[str] = None
    emb_type: Optional[str] = None
    trainable: bool = True
    admit_hook: Optional[Dict[str, str]] = None
    filter_hook: Optional[Dict[str, str]] = None


@dataclass
class FeatureEmbConf:
    """Configuration dataclass for feature embedding settings.

    This dataclass contains all parameters needed to configure feature
    embeddings, including transformation types, dimensions, and various
    embedding-specific options.

    Attributes:
        io_name (str): Name used for I/O operations.
        out_name (str): Output name for the feature.
        id_transform_type (IdTransformType): Type of ID transformation.
        emb_transform_type (EmbTransformType): Type of embedding transformation.
        hash_type (str): Hash function type. Defaults to "farm".
        hash_bucket_size (int): Hash bucket size. Defaults to 0.
        seq_length (int): Sequence length for sequence features. Defaults to 0.
        raw_dim (Optional[int]): Raw dimension of the feature. Defaults to None.
        combiner (str): Embedding combiner strategy. Defaults to "mean".
        embedding_dim (Optional[int]): Embedding dimension. Defaults to None.
        dtype (Optional[torch.dtype]): Data type for the feature. Defaults to None.
        boundaries (Optional[List[float]]): Bucketization boundaries. Defaults to None.
        compress_strategy (Optional[str]): Compression strategy. Defaults to None.
        shared_name (Optional[str]): Shared embedding name. Defaults to None.
        emb_device (Optional[str]): Embedding device. Defaults to None.
        emb_type (Optional[torch.dtype]): Embedding data type. Defaults to None.
        trainable (bool): Whether embedding is trainable. Defaults to True.
        admit_hook (Optional[Dict[str, str]]): Admission hook config. Defaults to None.
        filter_hook (Optional[Dict[str, str]]): Filter hook config. Defaults to None.
    """

    io_name: str
    out_name: str
    id_transform_type: IdTransformType
    emb_transform_type: EmbTransformType
    hash_type: str = "farm"
    hash_bucket_size: int = 0
    seq_length: int = 0
    raw_dim: Optional[int] = None
    combiner: str = "mean"
    embedding_dim: Optional[int] = None
    dtype: Optional["torch.dtype"] = None
    boundaries: Optional[List[float]] = None
    compress_strategy: Optional[str] = None
    shared_name: Optional[str] = None
    emb_device: Optional[str] = None
    emb_type: Optional["torch.dtype"] = None
    trainable: bool = True
    admit_hook: Optional[Dict[str, str]] = None
    filter_hook: Optional[Dict[str, str]] = None
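
# Illustrative sketch (not part of the original module): the FeatureEmbConf that
# _init_emb_conf would derive for the hypothetical hashed string feature sketched
# above FGConf, assuming already_hashed=False and hash_in_io=False.
#
#   FeatureEmbConf(
#       io_name="user_click_item_id",
#       out_name="user_click_item_id",
#       id_transform_type=IdTransformType.HASH,      # gen_key_type "hash"
#       emb_transform_type=EmbTransformType.LOOKUP,  # gen_value_type "lookup"
#       hash_type="farm",
#       embedding_dim=8,
#       dtype=torch.int8,    # VALUE_TYPE_MAP["string"]
#       raw_dim=None,        # string inputs have no fixed raw dimension
#   )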


class FGParser:
    """Feature Generation configuration parser and processor.

    The FGParser class is responsible for parsing feature generation
    configuration files, processing feature definitions, and creating
    structured configurations for the feature generation pipeline. It handles
    both regular and sequence features, applies various transformations, and
    manages feature filtering based on model configuration.

    Key Features:
        - Parse JSON configuration files for feature definitions
        - Filter features based on model configuration requirements
        - Handle sequence features with proper length and structure
        - Support feature copying and inheritance
        - Generate I/O and embedding configurations
        - Validate and transform feature parameters

    Attributes:
        already_hashed (bool): Whether input features are already hashed.
        hash_in_io (bool): Whether to perform hashing in the I/O layer.
        mc_parser: Model configuration parser instance.
        devel_mode (bool): Whether development mode is enabled.
        multihash_conf_ (dict): Multi-hash configuration dictionary.
        fg_conf (list): Parsed feature generation configuration.
        parsed_conf_ (list): Processed feature configurations.
        io_conf_ (dict): I/O configuration dictionary.
        emb_conf_ (dict): Embedding configuration dictionary.
    """
    def __init__(
        self,
        conf_file_path,
        mc_parser,
        already_hashed=False,
        hash_in_io=False,
        lower_case=False,
        devel_mode=False,
    ):
        """Initialize the FG Parser.

        Args:
            conf_file_path (str): Path to the feature generation configuration file.
            mc_parser: Model configuration parser instance.
            already_hashed (bool, optional): Whether features are already hashed.
                Defaults to False.
            hash_in_io (bool, optional): Whether to hash in the I/O layer.
                Defaults to False.
            lower_case (bool, optional): Whether to convert keys to lowercase.
                Defaults to False.
            devel_mode (bool, optional): Whether to enable development mode.
                Defaults to False.
        """
        self.already_hashed = already_hashed
        self.hash_in_io = hash_in_io
        self.mc_parser = mc_parser
        self.devel_mode = devel_mode
        self.multihash_conf_ = {}
        self.fg_path = conf_file_path
        self.lower_case = lower_case
        self.fg_conf = self._init_fg(conf_file_path, lower_case)
        self.parsed_conf_ = self._parse_feature_conf()
        self.io_conf_ = self._init_io_conf()
        self.emb_conf_ = self._init_emb_conf()
    @property
    def feature_blocks(self):
        """Get feature blocks from the model configuration parser.

        Returns:
            dict: Dictionary mapping block names to feature lists.
        """
        return self.mc_parser.feature_blocks

    @property
    def io_configs(self):
        """Get I/O configurations for all features.

        Returns:
            dict: Dictionary mapping feature names to I/O configurations.
        """
        return self.io_conf_

    @property
    def emb_configs(self):
        """Get embedding configurations for all features.

        Returns:
            dict: Dictionary mapping feature names to embedding configurations.
        """
        return self.emb_conf_

    @property
    def seq_block_names(self):
        """Get sequence block names from the model configuration parser.

        Returns:
            list: List of sequence block names.
        """
        return self.mc_parser.seq_block_names

    @property
    def multihash_conf(self):
        """Get multi-hash configuration dictionary.

        Returns:
            dict: Multi-hash configuration settings.
        """
        return self.multihash_conf_

    def get_mc_conf(self):
        """Get the model configuration (MC) dictionary.

        Returns:
            dict: Model configuration settings.
        """
        return self.mc_parser.mc_conf

    def get_fg_conf(self):
        """Get the feature generation (FG) configuration dictionary, reloaded from file.

        Returns:
            dict: Feature generation configuration settings.
        """
        return self._load_fg_conf(self.fg_path, self.lower_case)

    def _init_fg(self, fg_path, lower_case):
        """Initialize feature generation configuration.

        Args:
            fg_path (str): Path to the FG configuration file.
            lower_case (bool): Whether to convert keys to lowercase.

        Returns:
            list: Processed and filtered feature configuration list.
        """
        fg = self._load_fg_conf(fg_path, lower_case)
        fg = fg[FG_FEATURE_KEY]
        self._build_mc(fg)
        fg = self._filter_fg(fg)
        return fg

    def _load_fg_conf(self, fg_path, lower_case):
        """Load feature generation configuration from file.

        Args:
            fg_path (str): Path to the configuration file.
            lower_case (bool): Whether to convert keys to lowercase.

        Returns:
            dict: Raw feature generation configuration dictionary.
        """
        with open(fg_path) as f:
            fg = json.load(f)
        fg = dict_lower_case(fg, lower_case)
        return fg

    def _build_mc(self, fg):
        """Build model configuration from feature generation config.

        Args:
            fg (list): Feature generation configuration list.
        """
        candidate_seq_blocks = {}
        for fea_conf in fg:
            if self._is_seq(fea_conf):
                candidate_seq_blocks[fea_conf[SEQUENCE_NAME_KEY]] = fea_conf[
                    FEATURE_NAME_KEY
                ]
        self.mc_parser.init_blocks(candidate_seq_blocks)

    def _filter_fg(self, fg):
        """Filter feature configuration based on model configuration.

        Args:
            fg (list): Raw feature configuration list.

        Returns:
            list: Filtered feature configuration list.
        """
        filter_fg = []
        for fea_conf in fg:
            if self._is_seq(fea_conf):
                fea_name = fea_conf[SEQUENCE_NAME_KEY]
            else:
                fea_name = fea_conf[FEATURE_NAME_KEY]
            if self.mc_parser.has_fea(fea_name):
                if self._is_seq(fea_conf):
                    seq_fg = copy.deepcopy(fea_conf)
                    seq_fg[FG_FEATURE_KEY] = []
                    for seq_fea_conf in fea_conf[FG_FEATURE_KEY]:
                        seq_fea_name = seq_fea_conf[FEATURE_NAME_KEY]
                        if self.mc_parser.has_seq_fea(fea_name, seq_fea_name):
                            seq_fg[FG_FEATURE_KEY].append(seq_fea_conf)
                    filter_fg.append(seq_fg)
                else:
                    filter_fg.append(fea_conf)
        return filter_fg

    def _is_feature_copy(self, fea_conf):
        """Check if a feature configuration is a copy of another feature.

        Args:
            fea_conf (dict): Feature configuration dictionary.

        Returns:
            tuple: (is_copy: bool, copy_name: str or None)
        """
        copy_name = fea_conf.get(from_feature_key, None)
        is_copy = copy_name is not None
        return is_copy, copy_name

    def _is_seq(self, fea_conf):
        """Check if a feature configuration represents a sequence feature.

        Args:
            fea_conf (dict): Feature configuration dictionary.

        Returns:
            bool: True if it's a sequence feature, False otherwise.
        """
        return SEQUENCE_NAME_KEY in fea_conf
    def get_seq_len(self, fea_name):
        """Get sequence length for a sequence feature.

        Args:
            fea_name (str): Name of the sequence feature.

        Returns:
            int: Sequence length of the feature.

        Raises:
            RuntimeError: If the feature is not a sequence feature.
        """
        if fea_name in self.emb_conf_:
            return self.emb_conf_[fea_name].seq_length
        else:
            raise RuntimeError(f"feature: {fea_name} is not a seq feature")
    def _parse_feature_conf(self):
        """Parse all feature configurations into structured format.

        Returns:
            list: List of parsed feature configuration objects.
        """
        parsed_conf = []
        for fea_conf in self.fg_conf:
            if self._is_seq(fea_conf):
                seq_len = fea_conf[SEQUENCE_LENGTH_KEY]
                seq_name = fea_conf[SEQUENCE_NAME_KEY]
                for sub_fea_conf in fea_conf[FG_FEATURE_KEY]:
                    fc = self._parse_fg(
                        sub_fea_conf, seq_len=seq_len, seq_prefix=seq_name
                    )
                    parsed_conf.append(fc)
            else:
                fc = self._parse_fg(fea_conf, seq_len=0)
                parsed_conf.append(fc)
        return parsed_conf

    def _parse_fg(self, fea_conf, seq_len=0, seq_prefix=""):
        """Parse a single feature configuration.

        Args:
            fea_conf (dict): Feature configuration dictionary.
            seq_len (int, optional): Sequence length. Defaults to 0.
            seq_prefix (str, optional): Sequence name prefix. Defaults to "".

        Returns:
            FGConf: Parsed feature configuration object.
        """
        name = fea_conf[FEATURE_NAME_KEY]
        if seq_len > 0:
            name = seq_prefix + "_" + name
        is_sparse = fea_conf[feature_type_key].lower() != "raw_feature"
        gen_key_type = fea_conf[gen_key_type_key]
        gen_val_type = fea_conf[gen_value_type_key]
        # TODO(yuhuan.zh) maybe no need hash?
        need_hash = fea_conf[value_type_key].lower() == "string"
        hash_type = fea_conf.get(hash_type_key, "farmhash")
        hash_type = HASH_TYPE_MAP[hash_type]
        # TODO(yuhuan.zh) support change conflict to non-conf
        hash_bucket_size = fea_conf.get(hash_bucket_key, 0)
        is_seq = seq_len > 0
        seq_length = seq_len
        value_dimension = fea_conf.get(value_dim_key, 1)
        combiner = fea_conf.get(combiner_key, "mean")
        embedding_dim = fea_conf.get(emb_dim_key, None)
        value_type = fea_conf[value_type_key].lower()
        boundaries = fea_conf.get(boundaries_key, None)
        boundaries = (
            boundaries
            if boundaries is None
            else list(map(float, boundaries.split(",")))
        )
        compress_strategy = fea_conf.get(compress_strategy_key, None)
        # add to multihash configs
        if compress_strategy is not None:
            self.multihash_conf_[name] = compress_strategy
        shared_name = fea_conf.get(shared_name_key, name)
        _, from_feature = self._is_feature_copy(fea_conf)
        emb_device = fea_conf.get(emb_device_key, None)
        emb_type = fea_conf.get(emb_type_key, None)
        trainable = fea_conf.get(trainable_key, True)
        admit_hook = fea_conf.get(admit_hook_key, None)
        filter_hook = fea_conf.get(filter_hook_key, None)
        return FGConf(
            name,
            is_sparse,
            gen_key_type,
            gen_val_type,
            need_hash,
            hash_type,
            hash_bucket_size,
            is_seq=is_seq,
            seq_length=seq_length,
            value_dimension=value_dimension,
            combiner=combiner,
            embedding_dim=embedding_dim,
            value_type=value_type,
            boundaries=boundaries,
            compress_strategy=compress_strategy,
            shared_name=shared_name,
            from_feature=from_feature,
            emb_device=emb_device,
            emb_type=emb_type,
            trainable=trainable,
            admit_hook=admit_hook,
            filter_hook=filter_hook,
        )

    def _parse_emb_type(self, fea_conf):
        """Parse embedding transformation type from feature configuration.

        Args:
            fea_conf (FGConf): Feature configuration object.

        Returns:
            EmbTransformType: Parsed embedding transformation type.

        Raises:
            RuntimeError: If the gen_value_type is not supported.
        """
        trans_t = None
        if fea_conf.gen_value_type == "lookup":
            trans_t = EmbTransformType.LOOKUP
        elif fea_conf.gen_value_type == "multihash_lookup":
            trans_t = EmbTransformType.MULTIHASH_LOOKUP
        elif fea_conf.gen_value_type == "idle":
            trans_t = EmbTransformType.RAW
        else:
            raise RuntimeError(
                f"Not support gen_value_type: {fea_conf.gen_value_type} in feature {fea_conf}"
            )
        return trans_t

    def _parse_id_type(self, fea_conf):
        """Parse ID transformation type from feature configuration.

        This method determines the appropriate ID transformation type based on
        the feature's gen_key_type and the parser's configuration settings
        (already_hashed, hash_in_io, devel_mode).

        Args:
            fea_conf (FGConf): Feature configuration object.

        Returns:
            IdTransformType: Parsed ID transformation type.

        Raises:
            NotImplementedError: If mask type is used in non-development mode.
            RuntimeError: If the gen_key_type is not supported.
        """
        trans_t = None
        if fea_conf.gen_key_type == "idle":
            trans_t = IdTransformType.RAW
        elif fea_conf.gen_key_type == "boundary":
            trans_t = IdTransformType.BUCKETIZE
        elif fea_conf.gen_key_type == "hash":
            if self.already_hashed:
                if fea_conf.hash_bucket_size > 0:
                    trans_t = IdTransformType.MOD
                else:
                    trans_t = IdTransformType.RAW
            elif self.hash_in_io:
                trans_t = IdTransformType.RAW
            else:
                trans_t = IdTransformType.HASH
        elif fea_conf.gen_key_type == "mask":
            # TODO(yuhuan.zh) support mask feature
            if self.devel_mode:
                trans_t = IdTransformType.RAW
            else:
                trans_t = IdTransformType.MASK
                raise NotImplementedError("not support gen_key type: mask yet!")
        elif fea_conf.gen_key_type == "multihash":
            if self.already_hashed:
                if fea_conf.hash_bucket_size > 0:
                    trans_t = IdTransformType.MOD_MULTIHASH
                else:
                    trans_t = IdTransformType.MULTIHASH
            elif self.hash_in_io:
                trans_t = IdTransformType.MULTIHASH
            else:
                trans_t = IdTransformType.HASH_MULTIHASH
        else:
            raise RuntimeError(
                f"Not support gen_key_type: {fea_conf.gen_key_type} in feature {fea_conf}"
            )
        return trans_t

    def _parse_dtype_dim(self, fea_conf):
        """Parse data type and dimension from feature configuration.

        This method determines the appropriate PyTorch data type and dimension
        for a feature based on its value type and configuration settings.

        Args:
            fea_conf (FGConf): Feature configuration object.

        Returns:
            tuple: A tuple containing (dtype: torch.dtype, dim: int or None).

        Raises:
            NotImplementedError: If a string type feature uses idle or mask
                gen_key_type in non-development mode.
        """
        dtype = VALUE_TYPE_MAP[fea_conf.value_type]
        dim = fea_conf.value_dimension
        if fea_conf.value_type == "string":
            dim = None
            if self.already_hashed or self.hash_in_io:
                dtype = torch.int64
            # TODO(yuhuan.zh) support string input raw / mask feature
            if fea_conf.gen_key_type in ["idle", "mask"]:
                dim = fea_conf.value_dimension
                if self.devel_mode:
                    logger.warning(
                        f"String type feature: {fea_conf} not support idle or mask yet, maybe get wrong value"
                    )
                else:
                    raise NotImplementedError(
                        f"String type feature: {fea_conf} not support idle or mask yet."
                    )
        return dtype, dim

    def _init_emb_conf(self):
        """Initialize embedding configurations for all parsed features.

        This method creates FeatureEmbConf objects for each parsed feature
        configuration, determining the appropriate transformation types,
        data types, and other embedding parameters.

        Returns:
            OrderedDict: Dictionary mapping feature names to FeatureEmbConf objects.
        """
        emb_conf = OrderedDict()
        for fea_conf in self.parsed_conf_:
            id_type = self._parse_id_type(fea_conf)
            emb_type = self._parse_emb_type(fea_conf)
            dtype, dim = self._parse_dtype_dim(fea_conf)
            ec = FeatureEmbConf(
                io_name=fea_conf.from_feature
                if fea_conf.from_feature is not None
                else fea_conf.name,
                out_name=fea_conf.name,
                id_transform_type=id_type,
                emb_transform_type=emb_type,
                embedding_dim=fea_conf.embedding_dim,
                raw_dim=dim,
                shared_name=fea_conf.shared_name,
                hash_bucket_size=fea_conf.hash_bucket_size,
                hash_type=fea_conf.hash_type,
                boundaries=fea_conf.boundaries,
                compress_strategy=fea_conf.compress_strategy,
                combiner=fea_conf.combiner,
                seq_length=fea_conf.seq_length,
                dtype=dtype,
                emb_device=fea_conf.emb_device,
                emb_type=EMB_TYPE_MAP[fea_conf.emb_type]
                if fea_conf.emb_type is not None
                else None,
                trainable=fea_conf.trainable,
                admit_hook=fea_conf.admit_hook,
                filter_hook=fea_conf.filter_hook,
            )
            emb_conf[fea_conf.name] = ec
        return emb_conf

    def _init_io_conf(self):
        """Initialize I/O configurations for all parsed features.

        This method creates FeatureIOConf objects for each parsed feature
        configuration, determining the appropriate I/O parameters such as
        variable length format, hash settings, and dimensions.

        Returns:
            OrderedDict: Dictionary mapping feature names to FeatureIOConf objects.
        """
        io_conf = OrderedDict()
        for fea_conf in self.parsed_conf_:
            real_name = (
                fea_conf.name
                if fea_conf.from_feature is None
                else fea_conf.from_feature
            )
            varlen = self._is_io_sparse(fea_conf)
            hash_type, trans_int, hash_bucket = self._get_io_hash_args(fea_conf)
            fc = FeatureIOConf(
                name=real_name,
                varlen=varlen,
                hash_type=hash_type,
                hash_bucket_size=hash_bucket,
                trans_int=trans_int,
                dim=fea_conf.value_dimension,
            )
            io_conf[real_name] = fc
        return io_conf

    def _is_io_sparse(self, conf):
        """Determine if a feature should use sparse I/O format.

        Args:
            conf (FGConf): Feature configuration object.

        Returns:
            bool: True if the feature should use sparse format, False otherwise.
        """
        # cannot convert bucketize features to sparse
        sparse_format = conf.is_sparse or conf.is_seq
        return sparse_format

    def _get_io_hash_args(self, conf):
        """Get I/O hash arguments for a feature configuration.

        This method determines the appropriate hash settings for I/O operations
        based on the feature configuration and parser settings.

        Args:
            conf (FGConf): Feature configuration object.

        Returns:
            tuple: A tuple containing
                (hash_type: str or None, trans_int: bool, hash_bucket: int).
        """
        need_hash = conf.need_hash
        hash_bucket = conf.hash_bucket_size
        hash_type = conf.hash_type if need_hash else None
        trans_int = False
        if self.already_hashed:
            hash_type = None
            hash_bucket = 0
            trans_int = False
        elif need_hash and (not self.hash_in_io):
            hash_type = None
            hash_bucket = 0
            trans_int = True
        return hash_type, trans_int, hash_bucket