Skip to main content
Version: Next

Flink Lookup Joins

Flink lookup joins are important because they enable efficient, real-time enrichment of streaming data with reference data, a common requirement in many real-time analytics and processing scenarios.

Lookup

Instructions

  • Use a primary key table as a dimension table, and the join condition must include all primary keys of the dimension table.
  • Fluss lookup join is in asynchronous mode by default for higher throughput. You can change the mode of lookup join as synchronous mode by setting the SQL Hint 'lookup.async' = 'false'.

Examples

  1. Create two tables.
Flink SQL
CREATE TABLE `fluss_catalog`.`my_db`.`orders` (
`o_orderkey` INT NOT NULL,
`o_custkey` INT NOT NULL,
`o_orderstatus` CHAR(1) NOT NULL,
`o_totalprice` DECIMAL(15, 2) NOT NULL,
`o_orderdate` DATE NOT NULL,
`o_orderpriority` CHAR(15) NOT NULL,
`o_clerk` CHAR(15) NOT NULL,
`o_shippriority` INT NOT NULL,
`o_comment` STRING NOT NULL,
`o_dt` STRING NOT NULL,
PRIMARY KEY (o_orderkey) NOT ENFORCED
);
Flink SQL
CREATE TABLE `fluss_catalog`.`my_db`.`customer` (
`c_custkey` INT NOT NULL,
`c_name` STRING NOT NULL,
`c_address` STRING NOT NULL,
`c_nationkey` INT NOT NULL,
`c_phone` CHAR(15) NOT NULL,
`c_acctbal` DECIMAL(15, 2) NOT NULL,
`c_mktsegment` CHAR(10) NOT NULL,
`c_comment` STRING NOT NULL,
PRIMARY KEY (c_custkey) NOT ENFORCED
);
  1. Perform lookup join.
Flink SQL
USE CATALOG fluss_catalog;
Flink SQL
USE my_db;
Flink SQL
CREATE TEMPORARY TABLE lookup_join_sink
(
order_key INT NOT NULL,
order_totalprice DECIMAL(15, 2) NOT NULL,
customer_name STRING NOT NULL,
customer_address STRING NOT NULL
) WITH ('connector' = 'blackhole');
Flink SQL
-- look up join in asynchronous mode.
INSERT INTO lookup_join_sink
SELECT `o`.`o_orderkey`, `o`.`o_totalprice`, `c`.`c_name`, `c`.`c_address`
FROM
(SELECT `orders`.*, proctime() AS ptime FROM `orders`) AS `o`
LEFT JOIN `customer`
FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
ON `o`.`o_custkey` = `c`.`c_custkey`;
Flink SQL
-- look up join in synchronous mode.
INSERT INTO lookup_join_sink
SELECT `o`.`o_orderkey`, `o`.`o_totalprice`, `c`.`c_name`, `c`.`c_address`
FROM
(SELECT `orders`.*, proctime() AS ptime FROM `orders`) AS `o`
LEFT JOIN `customer` /*+ OPTIONS('lookup.async' = 'false') */
FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
ON `o`.`o_custkey` = `c`.`c_custkey`;

Examples (Partitioned Table)

Continuing from the previous example, if our dimension table is a Fluss partitioned primary key table, as follows:

Flink SQL
CREATE TABLE `fluss_catalog`.`my_db`.`customer_partitioned` (
`c_custkey` INT NOT NULL,
`c_name` STRING NOT NULL,
`c_address` STRING NOT NULL,
`c_nationkey` INT NOT NULL,
`c_phone` CHAR(15) NOT NULL,
`c_acctbal` DECIMAL(15, 2) NOT NULL,
`c_mktsegment` CHAR(10) NOT NULL,
`c_comment` STRING NOT NULL,
`dt` STRING NOT NULL,
PRIMARY KEY (`c_custkey`, `dt`) NOT ENFORCED
)
PARTITIONED BY (`dt`)
WITH (
'table.auto-partition.enabled' = 'true',
'table.auto-partition.time-unit' = 'year'
);

To do a lookup join with the Fluss partitioned primary key table, we need to specify the primary keys (including partition key) in the join condition.

Flink SQL
INSERT INTO lookup_join_sink
SELECT `o`.`o_orderkey`, `o`.`o_totalprice`, `c`.`c_name`, `c`.`c_address`
FROM
(SELECT `orders`.*, proctime() AS ptime FROM `orders`) AS `o`
LEFT JOIN `customer_partitioned`
FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
ON `o`.`o_custkey` = `c`.`c_custkey` AND `o`.`o_dt` = `c`.`dt`;

For more details about Fluss partitioned table, see Partitioned Tables.

Prefix Lookup

Instructions

  • Use a primary key table as a dimension table, and the join condition must a prefix subset of the primary keys of the dimension table.
  • The bucket key of Fluss dimension table need to set as the join key when creating Fluss table.
  • Fluss prefix lookup join is in asynchronous mode by default for higher throughput. You can change the mode of prefix lookup join as synchronous mode by setting the SQL Hint 'lookup.async' = 'false'.

Examples

  1. Create two tables.
Flink SQL
CREATE TABLE `fluss_catalog`.`my_db`.`orders` (
`o_orderkey` INT NOT NULL,
`o_custkey` INT NOT NULL,
`o_orderstatus` CHAR(1) NOT NULL,
`o_totalprice` DECIMAL(15, 2) NOT NULL,
`o_orderdate` DATE NOT NULL,
`o_orderpriority` CHAR(15) NOT NULL,
`o_clerk` CHAR(15) NOT NULL,
`o_shippriority` INT NOT NULL,
`o_comment` STRING NOT NULL,
`o_dt` STRING NOT NULL,
PRIMARY KEY (o_orderkey) NOT ENFORCED
);
Flink SQL
-- primary keys are (c_custkey, c_nationkey)
-- bucket key is (c_custkey)
CREATE TABLE `fluss_catalog`.`my_db`.`customer` (
`c_custkey` INT NOT NULL,
`c_name` STRING NOT NULL,
`c_address` STRING NOT NULL,
`c_nationkey` INT NOT NULL,
`c_phone` CHAR(15) NOT NULL,
`c_acctbal` DECIMAL(15, 2) NOT NULL,
`c_mktsegment` CHAR(10) NOT NULL,
`c_comment` STRING NOT NULL,
PRIMARY KEY (`c_custkey`, `c_nationkey`) NOT ENFORCED
) WITH (
'bucket.key' = 'c_custkey'
);
  1. Perform prefix lookup.
Flink SQL
USE CATALOG fluss_catalog;
Flink SQL
USE my_db;
Flink SQL
CREATE TEMPORARY TABLE lookup_join_sink
(
order_key INT NOT NULL,
order_totalprice DECIMAL(15, 2) NOT NULL,
customer_name STRING NOT NULL,
customer_address STRING NOT NULL
) WITH ('connector' = 'blackhole');
Flink SQL
-- prefix look up join in asynchronous mode.
INSERT INTO lookup_join_sink
SELECT `o`.`o_orderkey`, `o`.`o_totalprice`, `c`.`c_name`, `c`.`c_address`
FROM
(SELECT `orders`.*, proctime() AS ptime FROM `orders`) AS `o`
LEFT JOIN `customer`
FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
ON `o`.`o_custkey` = `c`.`c_custkey`;

-- join key is a prefix set of dimension table primary keys.
Flink SQL
-- prefix look up join in synchronous mode.
INSERT INTO lookup_join_sink
SELECT `o`.`o_orderkey`, `o`.`o_totalprice`, `c`.`c_name`, `c`.`c_address`
FROM
(SELECT `orders`.*, proctime() AS ptime FROM `orders`) AS `o`
LEFT JOIN `customer` /*+ OPTIONS('lookup.async' = 'false') */
FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
ON `o`.`o_custkey` = `c`.`c_custkey`;

Examples (Partitioned Table)

Continuing from the previous prefix lookup example, if our dimension table is a Fluss partitioned primary key table, as follows:

Flink SQL
-- primary keys are (c_custkey, c_nationkey, dt)
-- bucket key is (c_custkey)
CREATE TABLE `fluss_catalog`.`my_db`.`customer_partitioned` (
`c_custkey` INT NOT NULL,
`c_name` STRING NOT NULL,
`c_address` STRING NOT NULL,
`c_nationkey` INT NOT NULL,
`c_phone` CHAR(15) NOT NULL,
`c_acctbal` DECIMAL(15, 2) NOT NULL,
`c_mktsegment` CHAR(10) NOT NULL,
`c_comment` STRING NOT NULL,
`dt` STRING NOT NULL,
PRIMARY KEY (`c_custkey`, `c_nationkey`, `dt`) NOT ENFORCED
)
PARTITIONED BY (`dt`)
WITH (
'bucket.key' = 'c_custkey',
'table.auto-partition.enabled' = 'true',
'table.auto-partition.time-unit' = 'year'
);

To do a prefix lookup with the Fluss partitioned primary key table, the prefix lookup join key is in pattern of a prefix subset of primary keys (excluding partition key) + partition key.

Flink SQL
INSERT INTO lookup_join_sink
SELECT `o`.`o_orderkey`, `o`.`o_totalprice`, `c`.`c_name`, `c`.`c_address`
FROM
(SELECT `orders`.*, proctime() AS ptime FROM `orders`) AS `o`
LEFT JOIN `customer_partitioned`
FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
ON `o`.`o_custkey` = `c`.`c_custkey` AND `o`.`o_dt` = `c`.`dt`;

-- join key is a prefix set of dimension table primary keys (excluding partition key) + partition key.

For more details about Fluss partitioned table, see Partitioned Tables.

Lookup Options

Fluss lookup join supports various configuration options. For more details, please refer to the Connector Options page.