Skip to main content
Version: Next

Flink Writes

You can directly insert or update data into a Fluss table using the INSERT INTO statement. Fluss primary key tables can accept all types of messages (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE), while Fluss log table can only accept INSERT type messages.

INSERT INTO

INSERT INTO statements are used to write data to Fluss tables. They support both streaming and batch modes and are compatible with primary-key tables (for upserting data) as well as log tables (for appending data).

Appending Data to the Log Table

Create a Log table.

Flink SQL
CREATE TABLE log_table (
order_id BIGINT,
item_id BIGINT,
amount INT,
address STRING
);

Insert data into the Log table.

Flink SQL
CREATE TEMPORARY TABLE source (
order_id BIGINT,
item_id BIGINT,
amount INT,
address STRING
) WITH ('connector' = 'datagen');
Flink SQL
INSERT INTO log_table
SELECT * FROM source;

Perform Data Upserts to the PrimaryKey Table.

Create a primary key table.

Flink SQL
CREATE TABLE pk_table (
shop_id BIGINT,
user_id BIGINT,
num_orders INT,
total_amount INT,
PRIMARY KEY (shop_id, user_id) NOT ENFORCED
);

Updates All Columns

Flink SQL
CREATE TEMPORARY TABLE source (
shop_id BIGINT,
user_id BIGINT,
num_orders INT,
total_amount INT
) WITH ('connector' = 'datagen');
Flink SQL
INSERT INTO pk_table
SELECT * FROM source;

Partial Updates

Flink SQL
CREATE TEMPORARY TABLE source (
shop_id BIGINT,
user_id BIGINT,
num_orders INT,
total_amount INT
) WITH ('connector' = 'datagen');
Flink SQL
-- only partial-update the num_orders column
INSERT INTO pk_table (shop_id, user_id, num_orders)
SELECT shop_id, user_id, num_orders FROM source;

DELETE FROM

Fluss supports deleting data for primary-key tables in batch mode via DELETE FROM statement. Currently, only single data deletions based on the primary key are supported.

  • the primary key table
Flink SQL
-- DELETE statement requires batch mode
SET 'execution.runtime-mode' = 'batch';
Flink SQL
-- The condition must include all primary key equality conditions.
DELETE FROM pk_table WHERE shop_id = 10000 and user_id = 123456;

UPDATE

Fluss enables data updates for primary-key tables in batch mode using the UPDATE statement. Currently, only single-row updates based on the primary key are supported.

Flink SQL
-- Execute the flink job in batch mode for current session context
SET execution.runtime-mode = batch;
Flink SQL
-- The condition must include all primary key equality conditions.
UPDATE pk_table SET total_amount = 2 WHERE shop_id = 10000 and user_id = 123456;