Flink DDL
Create Catalog
Fluss supports creating and managing tables through the Fluss Catalog.
CREATE CATALOG fluss_catalog WITH (
'type' = 'fluss',
'bootstrap.servers' = 'fluss-server-1:9123'
);
USE CATALOG fluss_catalog;
The Fluss catalog supports the following options:
Option | Required | Default | Description |
---|---|---|---|
type | required | (none) | Catalog type; must be 'fluss'. |
bootstrap.servers | required | (none) | Comma-separated list of Fluss server addresses. |
default-database | optional | fluss | The default database to use when switching to this catalog. |
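As a minimal sketch of how the options in the table combine, the statement below lists two bootstrap servers and sets a non-default database. The second host name (fluss-server-2) and the database name my_db are hypothetical, and the sketch assumes that my_db already exists in the Fluss cluster.

```sql
-- Hypothetical catalog definition using all three options from the table.
-- 'fluss-server-2' and 'my_db' are illustrative names, not part of the original example.
CREATE CATALOG fluss_catalog_with_db WITH (
  'type' = 'fluss',
  'bootstrap.servers' = 'fluss-server-1:9123,fluss-server-2:9123',
  'default-database' = 'my_db'
);

USE CATALOG fluss_catalog_with_db;

-- Standard Flink SQL: shows which database is current after the switch.
SHOW CURRENT DATABASE;
```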
The statements below assume that the current catalog has been switched to the Fluss catalog with the USE CATALOG <catalog_name>
statement.
Create Database
By default, FlussCatalog uses the fluss
database in Flink. Use the following example to create a separate database so that tables are not created under the default fluss
database:
CREATE DATABASE my_db;
USE my_db;
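To confirm that the switch took effect, the standard Flink SQL SHOW statements can be used. This is a small sketch that assumes the Fluss catalog is the current catalog. The IF NOT EXISTS form comes from Flink's general CREATE DATABASE syntax and makes the statement safe to re-run.

```sql
-- Re-runnable variant: does nothing if my_db already exists.
CREATE DATABASE IF NOT EXISTS my_db;
USE my_db;

-- List all databases in the current catalog, then show the active one.
SHOW DATABASES;
SHOW CURRENT DATABASE;
```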
Drop Database
To delete a database, run the following statements. Dropping a database also drops all the tables in it:
-- Flink does not allow dropping the current database, so switch to the Fluss default database first
USE fluss;
-- drop the database
DROP DATABASE my_db;
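Flink's general DROP DATABASE syntax also accepts IF EXISTS and an explicit RESTRICT or CASCADE modifier. The sketch below spells out the cascade. How the Fluss catalog treats a non-empty database when no modifier is given is not covered here, so naming the intent explicitly is an assumption-free way to express it.

```sql
-- Switch away from the database that is about to be dropped.
USE fluss;

-- IF EXISTS avoids an error when my_db is already gone;
-- CASCADE explicitly requests that the tables in my_db be dropped too.
DROP DATABASE IF EXISTS my_db CASCADE;
```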
Create Table
PrimaryKey Table
The following SQL statement will create a PrimaryKey Table with a primary key consisting of shop_id and user_id.
CREATE TABLE my_pk_table (
shop_id BIGINT,
user_id BIGINT,
num_orders INT,
total_amount INT,
PRIMARY KEY (shop_id, user_id) NOT ENFORCED
) WITH (
'bucket.num' = '4'
);
Log Table
The following SQL statement creates a Log Table by omitting the PRIMARY KEY clause.
CREATE TABLE my_log_table (
order_id BIGINT,
item_id BIGINT,
amount INT,
address STRING
) WITH (
'bucket.num' = '8'
);
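After creating the tables, their definitions can be inspected with standard Flink SQL statements. This is a minimal sketch that assumes both my_pk_table and my_log_table were created in the current database as shown above.

```sql
-- List the tables in the current database.
SHOW TABLES;

-- Show column names and types; the PrimaryKey Table also reports its key columns.
DESCRIBE my_pk_table;
DESCRIBE my_log_table;
```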
Partitioned (PrimaryKey/Log) Table
- Currently, Fluss supports only one partition field, and it must be of STRING type.
- For a Partitioned PrimaryKey Table, the partition field (dt in this case) must be a subset of the primary key (dt, shop_id, user_id in this case).
The following SQL statement creates a Partitioned PrimaryKey Table in Fluss.
CREATE TABLE my_part_pk_table (
dt STRING,
shop_id BIGINT,
user_id BIGINT,
num_orders INT,
total_amount INT,
PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt);
The following SQL statement creates a Partitioned Log Table in Fluss.
CREATE TABLE my_part_log_table (
order_id BIGINT,
item_id BIGINT,
amount INT,
address STRING,
dt STRING
) PARTITIONED BY (dt);
After a Partitioned (PrimaryKey/Log) Table is created, you must first manually create the corresponding partition with the Add Partition statement before writing data to or reading data from that partition.
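As a sketch of that step, and assuming the Add Partition statement follows Flink's ALTER TABLE ... ADD PARTITION syntax, a partition for one day could be created as shown below. The partition value '2025-01-01' is a hypothetical example.

```sql
-- Create the partition before writing to or reading from it.
-- The dt value is illustrative; use the value your data will carry.
ALTER TABLE my_part_pk_table ADD PARTITION (dt = '2025-01-01');

-- Standard Flink SQL: list the partitions that now exist for the table.
SHOW PARTITIONS my_part_pk_table;
```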
Auto Partitioned (PrimaryKey/Log) Table
Fluss also supports creating Auto Partitioned (PrimaryKey/Log) Tables. The following SQL statement creates an Auto Partitioned PrimaryKey Table in Fluss.
CREATE TABLE my_auto_part_pk_table (
dt STRING,
shop_id BIGINT,
user_id BIGINT,
num_orders INT,
total_amount INT,
PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
'bucket.num' = '4',
'table.auto-partition.enabled' = 'true',
'table.auto-partition.time-unit' = 'day'
);
The following SQL statement creates an Auto Partitioned Log Table in Fluss.
CREATE TABLE my_auto_part_log_table (
order_id BIGINT,
item_id BIGINT,
amount INT,
address STRING,
dt STRING
) PARTITIONED BY (dt) WITH (
'bucket.num' = '8',
'table.auto-partition.enabled' = 'true',
'table.auto-partition.time-unit' = 'hour'
);
For more details about Auto Partitioned (PrimaryKey/Log) Table, refer to Auto Partitioning Options.
Options
The options supported in the WITH
clause when creating a table are listed on the Connector Options page.
Create Table Like
To create a table with the same schema, partitioning, and table properties as another table, use CREATE TABLE LIKE.
-- define a temporary datagen table to serve as the source of the schema
CREATE TEMPORARY TABLE datagen (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10'
);
-- create a Fluss table that derives its metadata from the temporary table, excluding its options
CREATE TABLE my_table LIKE datagen (EXCLUDING OPTIONS);
For more details, refer to the Flink CREATE TABLE documentation.
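Because EXCLUDING OPTIONS drops the source table's connector options, the new table's own options can be supplied in a WITH clause placed before LIKE, following Flink's CREATE TABLE syntax. The sketch below assumes the datagen table defined above; the table name my_table_with_buckets is hypothetical.

```sql
-- Derive the schema from datagen, skip its options, and set a Fluss option instead.
CREATE TABLE my_table_with_buckets WITH (
  'bucket.num' = '8'
) LIKE datagen (EXCLUDING OPTIONS);

-- Standard Flink SQL: print the resulting definition to check what was inherited.
SHOW CREATE TABLE my_table_with_buckets;
```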
Drop Table
To delete a table, run:
DROP TABLE my_table;
This removes all of the table's data from the Fluss cluster.
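For scripts that may run more than once, Flink's DROP TABLE syntax also accepts IF EXISTS. A minimal sketch:

```sql
-- Does not raise an error if my_table has already been dropped.
DROP TABLE IF EXISTS my_table;
```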