Version: 0.7

Flink DDL

Create Catalog

Fluss supports creating and managing tables through the Fluss Catalog.

Flink SQL
CREATE CATALOG fluss_catalog WITH (
'type' = 'fluss',
'bootstrap.servers' = 'fluss-server-1:9123'
);
Flink SQL
USE CATALOG fluss_catalog;

The following properties can be set when using the Fluss catalog:

| Option | Required | Default | Description |
|---|---|---|---|
| type | required | (none) | Catalog type; must be 'fluss'. |
| bootstrap.servers | required | (none) | Comma-separated list of Fluss servers. |
| default-database | optional | fluss | The default database to use when switching to this catalog. |
| client.security.protocol | optional | PLAINTEXT | The security protocol used to communicate with brokers. Currently, only PLAINTEXT and SASL are supported; the value is case-insensitive. |
| client.security.{protocol}.* | optional | (none) | Client-side configuration properties for a specific authentication protocol, e.g., client.security.sasl.jaas.config. For more details, see Authentication. |
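The following sketch shows several of the options above used together. It registers a catalog against two servers and selects a non-default database. The catalog name fluss_catalog_2, the second server address and the database my_db are placeholders. my_db is assumed to exist already (see Create Database).

```sql
-- Hypothetical second catalog; server addresses are placeholders
CREATE CATALOG fluss_catalog_2 WITH (
  'type' = 'fluss',
  -- comma-separated list of Fluss servers
  'bootstrap.servers' = 'fluss-server-1:9123,fluss-server-2:9123',
  -- database selected by USE CATALOG fluss_catalog_2 (assumed to exist)
  'default-database' = 'my_db',
  -- the default protocol, spelled out explicitly
  'client.security.protocol' = 'PLAINTEXT'
);
```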

The statements below assume that the current catalog has been switched to the Fluss catalog with the USE CATALOG <catalog_name> statement.

Create Database

By default, FlussCatalog uses the fluss database in Flink. To avoid creating tables under the default fluss database, create a separate database as in the following example:

Flink SQL
CREATE DATABASE my_db;
Flink SQL
USE my_db;
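Flink SQL also accepts IF NOT EXISTS on CREATE DATABASE, which makes the statement safe to rerun. SHOW CURRENT DATABASE confirms which database subsequent CREATE TABLE statements will use. A minimal sketch with the same my_db name:

```sql
-- No error if my_db already exists
CREATE DATABASE IF NOT EXISTS my_db;
USE my_db;
-- Expected to report my_db
SHOW CURRENT DATABASE;
```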

Drop Database

To delete a database, run the following statements. Note that this also drops all the tables in the database:

Flink SQL
-- Flink doesn't allow dropping the current database, so switch to the Fluss default database first
USE fluss;
Flink SQL
-- drop the database
DROP DATABASE my_db;
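Flink SQL's DROP DATABASE also accepts IF EXISTS and an explicit RESTRICT or CASCADE keyword. In Flink, RESTRICT is the default and rejects a database that still contains tables. If a plain DROP DATABASE is refused for a non-empty database, adding CASCADE explicitly requests that its tables be dropped too. A sketch, still assuming the current database is fluss:

```sql
-- No error if my_db is already gone; CASCADE also drops the tables it contains
DROP DATABASE IF EXISTS my_db CASCADE;
```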

Create Table

PrimaryKey Table

The following SQL statement will create a PrimaryKey Table with a primary key consisting of shop_id and user_id.

Flink SQL
CREATE TABLE my_pk_table (
shop_id BIGINT,
user_id BIGINT,
num_orders INT,
total_amount INT,
PRIMARY KEY (shop_id, user_id) NOT ENFORCED
) WITH (
'bucket.num' = '4'
);

Log Table

The following SQL statement creates a Log Table by omitting the primary key clause.

Flink SQL
CREATE TABLE my_log_table (
order_id BIGINT,
item_id BIGINT,
amount INT,
address STRING
) WITH (
'bucket.num' = '8'
);

Partitioned (PrimaryKey/Log) Table

note
  1. Currently, Fluss only supports partition fields of STRING type.
  2. For a Partitioned PrimaryKey Table, the partition fields (dt in this case) must be a subset of the primary key columns (dt, shop_id, user_id in this case).

The following SQL statement creates a Partitioned PrimaryKey Table in Fluss.

Flink SQL
CREATE TABLE my_part_pk_table (
dt STRING,
shop_id BIGINT,
user_id BIGINT,
num_orders INT,
total_amount INT,
PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt);

The following SQL statement creates a Partitioned Log Table in Fluss.

Flink SQL
CREATE TABLE my_part_log_table (
order_id BIGINT,
item_id BIGINT,
amount INT,
address STRING,
dt STRING
) PARTITIONED BY (dt);
info

Fluss partitioned tables support dynamic partition creation, which means you can write data into a partition without pre-creating it. When you write to a partitioned table with an INSERT INTO statement, Fluss automatically creates any partition that does not exist yet. See Dynamic Partitioning for more details. You can still use the Add Partition statement to add partitions manually if needed.
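For example, writing a row into my_part_log_table (defined above) with a dt value that has no partition yet should create that partition. A minimal sketch; the row values are made up for illustration:

```sql
-- Partition dt = '2025-03-05' is created on write if it does not exist yet
INSERT INTO my_part_log_table VALUES (1001, 2001, 30, 'Hangzhou', '2025-03-05');

-- List the partitions to confirm the new one
SHOW PARTITIONS my_part_log_table;
```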

Multi-Fields Partitioned Table

Fluss also supports multi-field partitioning. The following SQL statement creates a Multi-Fields Partitioned Log Table in Fluss:

Flink SQL
CREATE TABLE my_multi_fields_part_log_table (
order_id BIGINT,
item_id BIGINT,
amount INT,
address STRING,
dt STRING,
nation STRING
) PARTITIONED BY (dt, nation);
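The note above requires the partition field of a Partitioned PrimaryKey Table to be part of the primary key. Presumably the same rule applies to every field when a primary key table is partitioned by several fields. The following sketch of a multi-field partitioned PrimaryKey Table follows that rule; the table name is illustrative:

```sql
CREATE TABLE my_multi_fields_part_pk_table (
  dt STRING,
  nation STRING,
  shop_id BIGINT,
  user_id BIGINT,
  num_orders INT,
  total_amount INT,
  -- both partition fields (dt, nation) are included in the primary key
  PRIMARY KEY (dt, nation, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt, nation);
```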

Auto Partitioned (PrimaryKey/Log) Table

Fluss also supports creating Auto Partitioned (PrimaryKey/Log) Tables. The following SQL statement creates an Auto Partitioned PrimaryKey Table in Fluss.

Flink SQL
CREATE TABLE my_auto_part_pk_table (
dt STRING,
shop_id BIGINT,
user_id BIGINT,
num_orders INT,
total_amount INT,
PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
'bucket.num' = '4',
'table.auto-partition.enabled' = 'true',
'table.auto-partition.time-unit' = 'day'
);

The following SQL statement creates an Auto Partitioned Log Table in Fluss.

Flink SQL
CREATE TABLE my_auto_part_log_table (
order_id BIGINT,
item_id BIGINT,
amount INT,
address STRING,
dt STRING
) PARTITIONED BY (dt) WITH (
'bucket.num' = '8',
'table.auto-partition.enabled' = 'true',
'table.auto-partition.time-unit' = 'hour'
);

For more details about Auto Partitioned (PrimaryKey/Log) Tables, refer to Auto Partitioning.

Options

The options supported in the WITH clause when creating a table are listed on the Connector Options page.

Create Table Like

To create a table with the same schema, partitioning, and table properties as another table, use CREATE TABLE LIKE.

Flink SQL
-- a temporary datagen table whose schema will be reused
CREATE TEMPORARY TABLE datagen (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10'
);
Flink SQL
-- creates Fluss table which derives the metadata from the temporary table excluding options
CREATE TABLE my_table LIKE datagen (EXCLUDING OPTIONS);

For more details, refer to the Flink CREATE TABLE documentation.
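CREATE TABLE LIKE can also be combined with a WITH clause. The new Fluss table then gets its own options while still deriving its schema from datagen. A sketch, assuming a bucket count of 4 suits your workload; the table name is hypothetical:

```sql
-- Copy the schema of datagen, drop its datagen-specific options, and set a Fluss option instead
CREATE TABLE my_table_4_buckets WITH (
  'bucket.num' = '4'
) LIKE datagen (EXCLUDING OPTIONS);
```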

Drop Table

To delete a table, run:

Flink SQL
DROP TABLE my_table;

This permanently removes all of the table's data from the Fluss cluster.
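If the table may already have been removed, add Flink SQL's IF EXISTS clause so the statement does not fail:

```sql
-- No error if my_table does not exist
DROP TABLE IF EXISTS my_table;
```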

Show Partitions

To show all the partitions of a partitioned table, run:

Flink SQL
SHOW PARTITIONS my_part_pk_table;

For multi-field partitioned tables, you can use the SHOW PARTITIONS command with either partial or full partition field conditions to list matching partitions.

Flink SQL
-- Show partitions using a partial partition filter
SHOW PARTITIONS my_multi_fields_part_log_table PARTITION (dt = '2025-03-05');

-- Show partitions using a full partition filter
SHOW PARTITIONS my_multi_fields_part_log_table PARTITION (dt = '2025-03-05', nation = 'US');

For more details, refer to the Flink SHOW PARTITIONS documentation.

Add Partition

Fluss supports manually adding partitions to an existing partitioned table through the Fluss Catalog. If the specified partition does not exist, Fluss creates it. If the specified partition already exists, Fluss either ignores the request or throws an exception.

To add partitions, run:

Flink SQL
-- Add a partition to a single field partitioned table
ALTER TABLE my_part_pk_table ADD PARTITION (dt = '2025-03-05');

-- Add a partition to a multi-field partitioned table
ALTER TABLE my_multi_fields_part_log_table ADD PARTITION (dt = '2025-03-05', nation = 'US');

For more details, refer to the Flink ALTER TABLE(ADD) documentation.
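Flink's ALTER TABLE ... ADD PARTITION syntax also accepts IF NOT EXISTS. If the Fluss catalog maps this clause to the "ignore" behavior described above, re-adding an existing partition becomes a no-op instead of an error. That mapping is an assumption, not something this page confirms:

```sql
-- Assumed to be ignored instead of failing if the partition already exists
ALTER TABLE my_part_pk_table ADD IF NOT EXISTS PARTITION (dt = '2025-03-05');
```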

Drop Partition

Fluss also supports manually dropping partitions from an existing partitioned table through the Fluss Catalog. If the specified partition does not exist, Fluss either ignores the request or throws an exception.

To drop partitions, run:

Flink SQL
-- Drop a partition from a single field partitioned table
ALTER TABLE my_part_pk_table DROP PARTITION (dt = '2025-03-05');

-- Drop a partition from a multi-field partitioned table
ALTER TABLE my_multi_fields_part_log_table DROP PARTITION (dt = '2025-03-05', nation = 'US');

For more details, refer to the Flink ALTER TABLE(DROP) documentation.
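Similarly, Flink's ALTER TABLE ... DROP PARTITION syntax accepts IF EXISTS. If the Fluss catalog maps it to the "ignore" behavior, dropping a missing partition will not raise an exception. This sketch rests on that assumption:

```sql
-- Assumed to be ignored instead of failing if the partition does not exist
ALTER TABLE my_part_pk_table DROP IF EXISTS PARTITION (dt = '2025-03-05');
```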