Real-Time Analytics With Flink

This guide gets you up and running with Apache Flink for real-time analytics and covers some powerful features of Fluss, including its integration with Paimon. The example is derived from TPC-H Q5.

For more information on working with Flink, refer to the Apache Flink Engine section.

Environment Setup

Prerequisites

Before proceeding with this guide, ensure that Docker and the Docker Compose plugin are installed on your machine. All commands were tested with Docker version 27.4.0 and Docker Compose version v2.30.3.

note

We encourage you to use a recent version of Docker and Compose v2 (Compose v1 might work with a few adaptations).

Starting required components

We will use docker compose to spin up the required components for this tutorial.

  1. Create a working directory for this guide.
mkdir fluss-quickstart-flink
cd fluss-quickstart-flink
  2. Create a docker-compose.yml file with the following content:
services:
  coordinator-server:
    image: fluss/fluss:0.5.0
    command: coordinatorServer
    depends_on:
      - zookeeper
    environment:
      - |
        FLUSS_PROPERTIES=
        zookeeper.address: zookeeper:2181
        coordinator.host: coordinator-server
        remote.data.dir: /tmp/fluss/remote-data
        lakehouse.storage: paimon
        paimon.catalog.metastore: filesystem
        paimon.catalog.warehouse: /tmp/paimon
  tablet-server:
    image: fluss/fluss:0.5.0
    command: tabletServer
    depends_on:
      - coordinator-server
    environment:
      - |
        FLUSS_PROPERTIES=
        zookeeper.address: zookeeper:2181
        tablet-server.host: tablet-server
        data.dir: /tmp/fluss/data
        remote.data.dir: /tmp/fluss/remote-data
        kv.snapshot.interval: 0s
        lakehouse.storage: paimon
        paimon.catalog.metastore: filesystem
        paimon.catalog.warehouse: /tmp/paimon
  zookeeper:
    restart: always
    image: zookeeper:3.8.4

  jobmanager:
    image: fluss/quickstart-flink:1.20-0.5
    ports:
      - "8083:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
    volumes:
      - shared-tmpfs:/tmp/paimon
  taskmanager:
    image: fluss/quickstart-flink:1.20-0.5
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 10
        taskmanager.memory.process.size: 2048m
        taskmanager.memory.framework.off-heap.size: 256m
    volumes:
      - shared-tmpfs:/tmp/paimon

volumes:
  shared-tmpfs:
    driver: local
    driver_opts:
      type: "tmpfs"
      device: "tmpfs"

The Docker Compose environment consists of the following containers:

  • Fluss Cluster: a Fluss CoordinatorServer, a Fluss TabletServer and a ZooKeeper server.
  • Flink Cluster: a Flink JobManager and a Flink TaskManager container to execute queries.

Note: The fluss/quickstart-flink image is based on flink:1.20.0-java17 and includes the fluss-connector-flink, paimon-flink and flink-connector-faker to simplify this guide.

  3. To start all containers, run:
docker compose up -d

This command automatically starts all the containers defined in the Docker Compose configuration in detached mode.

Run

docker ps

to check whether all containers are running properly.

You can also visit http://localhost:8083/ to see if Flink is running normally.
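If you prefer a scripted check over opening the Web UI, the Flink REST API exposes a cluster overview at /overview on the same port. The following is a minimal Python sketch, assuming the port mapping from the docker-compose.yml above (host port 8083); the helper names fetch_overview and cluster_ready are hypothetical, and the sample payload is made up for illustration.

```python
import json
from urllib.request import urlopen


def fetch_overview(base_url: str = "http://localhost:8083") -> dict:
    """Fetch the cluster overview from the Flink REST API (GET /overview)."""
    with urlopen(f"{base_url}/overview", timeout=5) as resp:
        return json.load(resp)


def cluster_ready(overview: dict, min_taskmanagers: int = 1) -> bool:
    """Return True once enough TaskManagers have registered and slots exist."""
    return (
        overview.get("taskmanagers", 0) >= min_taskmanagers
        and overview.get("slots-total", 0) > 0
    )


# Made-up payload shaped like a /overview response (one TaskManager with
# 10 slots, as configured in this guide); real values will differ.
sample = {"taskmanagers": 1, "slots-total": 10, "slots-available": 10, "jobs-running": 0}
print(cluster_ready(sample))
print(cluster_ready({"taskmanagers": 0, "slots-total": 0}))
```

Against the running environment, you would call cluster_ready(fetch_overview()) instead of passing the sample payload.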

note
  • If you want to run with your own Flink environment, remember to download the fluss-connector-flink, flink-connector-faker, and paimon-flink connector jars and place them in FLINK_HOME/lib/.
  • All the following commands involving docker compose should be executed in the created working directory that contains the docker-compose.yml file.

Congratulations, you are all set!

Enter the SQL Client

First, use the following command to enter the Flink SQL CLI Container:

docker compose exec jobmanager ./sql-client

Note: To simplify this guide, three temporary tables have been pre-created with the faker connector to generate data. You can view their schemas by running the following commands:

Flink SQL Client
SHOW CREATE TABLE source_customer;
Flink SQL Client
SHOW CREATE TABLE source_order;
Flink SQL Client
SHOW CREATE TABLE source_nation;

Create Fluss Tables

Create Fluss Catalog

Use the following SQL to create a Fluss catalog:

Flink SQL Client
CREATE CATALOG my_fluss WITH (
    'type' = 'fluss',
    'bootstrap.servers' = 'coordinator-server:9123'
);
Flink SQL Client
USE CATALOG my_fluss;

Create Tables

Run the following SQL to create the Fluss tables used in this guide:

Flink SQL Client
CREATE TABLE fluss_order (
    `order_key` BIGINT,
    `cust_key` INT NOT NULL,
    `total_price` DECIMAL(15, 2),
    `order_date` DATE,
    `order_priority` STRING,
    `clerk` STRING,
    `ptime` AS PROCTIME(),
    PRIMARY KEY (`order_key`) NOT ENFORCED
);
Flink SQL Client
CREATE TABLE fluss_customer (
    `cust_key` INT NOT NULL,
    `name` STRING,
    `phone` STRING,
    `nation_key` INT NOT NULL,
    `acctbal` DECIMAL(15, 2),
    `mktsegment` STRING,
    PRIMARY KEY (`cust_key`) NOT ENFORCED
);
Flink SQL Client
CREATE TABLE `fluss_nation` (
    `nation_key` INT NOT NULL,
    `name` STRING,
    PRIMARY KEY (`nation_key`) NOT ENFORCED
);
Flink SQL Client
CREATE TABLE enriched_orders (
    `order_key` BIGINT,
    `cust_key` INT NOT NULL,
    `total_price` DECIMAL(15, 2),
    `order_date` DATE,
    `order_priority` STRING,
    `clerk` STRING,
    `cust_name` STRING,
    `cust_phone` STRING,
    `cust_acctbal` DECIMAL(15, 2),
    `cust_mktsegment` STRING,
    `nation_name` STRING,
    PRIMARY KEY (`order_key`) NOT ENFORCED
);

Streaming into Fluss

First, run the following SQL to sync data from source tables to Fluss tables:

Flink SQL Client
EXECUTE STATEMENT SET
BEGIN
    INSERT INTO fluss_nation SELECT * FROM `default_catalog`.`default_database`.source_nation;
    INSERT INTO fluss_customer SELECT * FROM `default_catalog`.`default_database`.source_customer;
    INSERT INTO fluss_order SELECT * FROM `default_catalog`.`default_database`.source_order;
END;

Fluss primary-key tables support high-QPS point lookups on primary keys. This makes lookup joins efficient, and you can use one to enrich the fluss_order table with information from the fluss_customer and fluss_nation primary-key tables.

Flink SQL Client
INSERT INTO enriched_orders
SELECT o.order_key,
       o.cust_key,
       o.total_price,
       o.order_date,
       o.order_priority,
       o.clerk,
       c.name,
       c.phone,
       c.acctbal,
       c.mktsegment,
       n.name
FROM fluss_order o
LEFT JOIN fluss_customer FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
    ON o.cust_key = c.cust_key
LEFT JOIN fluss_nation FOR SYSTEM_TIME AS OF `o`.`ptime` AS `n`
    ON c.nation_key = n.nation_key;
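To make the semantics of the chained lookup joins concrete, here is a small Python sketch. It is only a conceptual model, not how Flink or Fluss execute the join: plain dictionaries stand in for the fluss_customer and fluss_nation primary-key tables, and all names and values are made up for illustration. Each incoming order triggers a point lookup by key, and because both joins are LEFT joins, an order with no matching customer or nation is kept with empty (None) enrichment columns.

```python
from typing import Optional

# Made-up dimension data keyed by primary key
# (stand-ins for fluss_customer and fluss_nation).
customers = {
    1: {"name": "customer-a", "nation_key": 7},
    2: {"name": "customer-b", "nation_key": 99},  # nation 99 does not exist
}
nations = {7: {"name": "nation-x"}}


def enrich(order: dict) -> dict:
    """Enrich one order the way the two chained LEFT lookup joins do."""
    # Point lookup on cust_key; None models "no matching row".
    customer: Optional[dict] = customers.get(order["cust_key"])
    # The second lookup depends on the first one having found a customer.
    nation: Optional[dict] = nations.get(customer["nation_key"]) if customer else None
    return {
        **order,
        "cust_name": customer["name"] if customer else None,
        "nation_name": nation["name"] if nation else None,
    }


orders = [
    {"order_key": 100, "cust_key": 1},
    {"order_key": 101, "cust_key": 2},
    {"order_key": 102, "cust_key": 3},  # unknown customer
]
for row in map(enrich, orders):
    print(row)
```

In the SQL above, the lookup is evaluated at the order's processing time (ptime), so each order sees whatever dimension row exists at the moment it is processed.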

Run Ad-hoc Queries on Fluss Tables

You can now perform real-time analytics directly on Fluss tables. For instance, to preview the enriched orders, switch to batch mode and run a LIMIT query, which returns results immediately.

Flink SQL Client
-- use tableau result mode
SET 'sql-client.execution.result-mode' = 'tableau';
Flink SQL Client
-- switch to batch mode
SET 'execution.runtime-mode' = 'batch';
Flink SQL Client
-- use limit to query the enriched_orders table
SELECT * FROM enriched_orders LIMIT 2;

Sample Output

+-----------+----------+-------------+------------+----------------+--------+------------+----------------+--------------+-----------------+-------------+
| order_key | cust_key | total_price | order_date | order_priority | clerk | cust_name | cust_phone | cust_acctbal | cust_mktsegment | nation_name |
+-----------+----------+-------------+------------+----------------+--------+------------+----------------+--------------+-----------------+-------------+
| 23199744 | 9 | 266.44 | 2024-08-29 | high | Clerk1 | Joe King | 908.207.8513 | 124.28 | FURNITURE | JORDAN |
| 10715776 | 2 | 924.43 | 2024-11-04 | medium | Clerk3 | Rita Booke | (925) 775-0717 | 172.39 | FURNITURE | UNITED |
+-----------+----------+-------------+------------+----------------+--------+------------+----------------+--------------+-----------------+-------------+

If you are interested in a specific customer, you can retrieve their details by performing a lookup on the cust_key.

Flink SQL Client
-- lookup by primary key
SELECT * FROM fluss_customer WHERE `cust_key` = 1;

Sample Output

+----------+---------------+--------------+------------+---------+------------+
| cust_key | name | phone | nation_key | acctbal | mktsegment |
+----------+---------------+--------------+------------+---------+------------+
| 1 | Al K. Seltzer | 817-617-7960 | 1 | 533.41 | AUTOMOBILE |
+----------+---------------+--------------+------------+---------+------------+

Note: The query results are returned quickly because Fluss supports efficient lookups by primary key on primary-key tables.

Update/Delete rows on Fluss Tables

You can use UPDATE and DELETE statements to update/delete rows on Fluss tables.

Update

Flink SQL Client
-- update by primary key
UPDATE fluss_customer SET `name` = 'fluss_updated' WHERE `cust_key` = 1;

Then you can look up the specific row:

Flink SQL Client
SELECT * FROM fluss_customer WHERE `cust_key` = 1;

Sample Output

+----------+---------------+--------------+------------+---------+------------+
| cust_key | name | phone | nation_key | acctbal | mktsegment |
+----------+---------------+--------------+------------+---------+------------+
| 1 | fluss_updated | 817-617-7960 | 1 | 533.41 | AUTOMOBILE |
+----------+---------------+--------------+------------+---------+------------+

Notice that the name column has been updated to fluss_updated.

Delete

DELETE FROM fluss_customer WHERE `cust_key` = 1;

The following SQL query should return an empty result.

Flink SQL Client
SELECT * FROM fluss_customer WHERE `cust_key` = 1;
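The update, lookup, and delete steps above follow ordinary primary-key semantics: at most one row exists per key, an update changes that row in place, and a delete removes it. The sketch below models this with a Python dictionary. It is a simplified illustration only; the PrimaryKeyTable class is hypothetical and says nothing about how Fluss stores data. The row values are taken from the sample output above.

```python
class PrimaryKeyTable:
    """Toy in-memory model of a table with a single-column primary key."""

    def __init__(self, key_column: str):
        self.key_column = key_column
        self._rows = {}

    def upsert(self, row: dict) -> None:
        # Insert the row, or replace the existing row with the same key.
        self._rows[row[self.key_column]] = dict(row)

    def update(self, key, **changes) -> None:
        # Like UPDATE ... WHERE key = ?: a no-op if the key is absent.
        if key in self._rows:
            self._rows[key].update(changes)

    def delete(self, key) -> None:
        # Like DELETE ... WHERE key = ?: a no-op if the key is absent.
        self._rows.pop(key, None)

    def lookup(self, key) -> list:
        # Like SELECT * ... WHERE key = ?: zero or one row.
        row = self._rows.get(key)
        return [dict(row)] if row else []


table = PrimaryKeyTable("cust_key")
table.upsert({"cust_key": 1, "name": "Al K. Seltzer", "phone": "817-617-7960"})
table.update(1, name="fluss_updated")
print(table.lookup(1))
table.delete(1)
print(table.lookup(1))
```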

Integrate with Paimon

Start the Lakehouse Tiering Service

To integrate with Apache Paimon, you need to start the Lakehouse Tiering Service. Open a new terminal, navigate to the fluss-quickstart-flink directory, and run the following command to start the service:

docker compose exec coordinator-server ./bin/lakehouse.sh -D flink.rest.address=jobmanager -D flink.rest.port=8081 -D flink.execution.checkpointing.interval=30s

You should see a Flink Job named fluss-paimon-tiering-service running in the Flink Web UI.
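The same check can be scripted against the Flink REST API, which lists jobs at /jobs/overview. This is a minimal Python sketch, assuming the host port 8083 from the docker-compose.yml and the job name given above; the helper names fetch_jobs and tiering_job_running are hypothetical, and the sample job list is made up.

```python
import json
from urllib.request import urlopen

TIERING_JOB_NAME = "fluss-paimon-tiering-service"


def fetch_jobs(base_url: str = "http://localhost:8083") -> list:
    """Fetch the job list from the Flink REST API (GET /jobs/overview)."""
    with urlopen(f"{base_url}/jobs/overview", timeout=5) as resp:
        return json.load(resp).get("jobs", [])


def tiering_job_running(jobs: list, name: str = TIERING_JOB_NAME) -> bool:
    """Return True if a job with the given name is in the RUNNING state."""
    return any(j.get("name") == name and j.get("state") == "RUNNING" for j in jobs)


# Made-up job list shaped like the "jobs" array of a /jobs/overview response.
sample_jobs = [
    {"name": "insert-into_my_fluss.fluss.enriched_orders", "state": "RUNNING"},
    {"name": "fluss-paimon-tiering-service", "state": "RUNNING"},
]
print(tiering_job_running(sample_jobs))
```

Against the running environment, you would call tiering_job_running(fetch_jobs()).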

Streaming into Fluss datalake-enabled tables

By default, tables are created with data lake integration disabled, meaning the Lakehouse Tiering Service will not tier the table's data to the data lake.

To enable lakehouse functionality as a tiered storage solution for a table, you must create the table with the configuration option table.datalake.enabled = true. Return to the SQL client and execute the following SQL statement to create a table with data lake integration enabled:

Flink SQL Client
CREATE TABLE datalake_enriched_orders (
    `order_key` BIGINT,
    `cust_key` INT NOT NULL,
    `total_price` DECIMAL(15, 2),
    `order_date` DATE,
    `order_priority` STRING,
    `clerk` STRING,
    `cust_name` STRING,
    `cust_phone` STRING,
    `cust_acctbal` DECIMAL(15, 2),
    `cust_mktsegment` STRING,
    `nation_name` STRING,
    PRIMARY KEY (`order_key`) NOT ENFORCED
) WITH ('table.datalake.enabled' = 'true');

Next, stream data into the datalake-enabled table datalake_enriched_orders:

Flink SQL Client
-- switch to streaming mode
SET 'execution.runtime-mode' = 'streaming';
Flink SQL Client
-- insert tuples into datalake_enriched_orders
INSERT INTO datalake_enriched_orders
SELECT o.order_key,
       o.cust_key,
       o.total_price,
       o.order_date,
       o.order_priority,
       o.clerk,
       c.name,
       c.phone,
       c.acctbal,
       c.mktsegment,
       n.name
FROM fluss_order o
LEFT JOIN fluss_customer FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
    ON o.cust_key = c.cust_key
LEFT JOIN fluss_nation FOR SYSTEM_TIME AS OF `o`.`ptime` AS `n`
    ON c.nation_key = n.nation_key;

Real-Time Analytics on Fluss datalake-enabled Tables

The data for the datalake_enriched_orders table is stored in Fluss (for real-time data) and Paimon (for historical data).

When you query the datalake_enriched_orders table, Fluss performs a union read that combines data from both Fluss and Paimon, so the result set covers both real-time and historical data.

If you want to query only the data stored in Paimon, which offers high-performance access without the overhead of unioning data, use the datalake_enriched_orders$lake table by appending the $lake suffix to the table name. This approach also enables all the optimizations and features of a Flink Paimon table source, including system tables such as datalake_enriched_orders$lake$snapshots.
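The difference between a lake-only read and a union read can be pictured with a small Python sketch. This is a conceptual model under simplifying assumptions, not the actual Fluss implementation: the lake snapshot is a dictionary of total_price values keyed by order_key, the data that has not been tiered yet is a list of changes, and every value is made up. The union read applies the recent changes on top of the snapshot, so a sum over it is fresher than a sum over the lake data alone.

```python
from decimal import Decimal

# Made-up data: total_price per order_key already tiered to the lake ...
lake_snapshot = {1: Decimal("100.00"), 2: Decimal("250.50")}
# ... and changes that so far exist only in the real-time layer.
recent_changes = [
    ("upsert", 3, Decimal("40.25")),   # new order
    ("upsert", 2, Decimal("260.50")),  # newer version of order 2
]


def union_read(snapshot: dict, changes: list) -> dict:
    """Apply not-yet-tiered changes on top of the lake snapshot, by key."""
    merged = dict(snapshot)
    for op, key, value in changes:
        if op == "upsert":
            merged[key] = value
        elif op == "delete":
            merged.pop(key, None)
    return merged


lake_only_sum = sum(lake_snapshot.values())
union_sum = sum(union_read(lake_snapshot, recent_changes).values())
print(lake_only_sum, union_sum)
```

The lake-only sum lags behind until the next tiering checkpoint completes, which is the trade-off between the $lake table and the plain table name.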

To query the snapshots directly from Paimon, use the following SQL:

Flink SQL Client
-- switch to batch mode
SET 'execution.runtime-mode' = 'batch';
Flink SQL Client
-- query snapshots in paimon
SELECT snapshot_id, total_record_count FROM datalake_enriched_orders$lake$snapshots;

Sample Output:

+-------------+--------------------+
| snapshot_id | total_record_count |
+-------------+--------------------+
| 1 | 650 |
+-------------+--------------------+

Note: Make sure to wait for the checkpoints (~30s) to complete before querying the snapshots, otherwise the result will be empty.
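If you script this step, a simple polling loop avoids guessing how long to wait. The sketch below is a generic Python helper, not part of Fluss or Flink: wait_for is a hypothetical name, and the check callable is a placeholder for whatever runs the snapshot query and returns its rows. The demo simulates two empty results before a row appears, using the values from the sample output above.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def wait_for(
    check: Callable[[], Optional[T]],
    timeout_s: float = 90.0,
    interval_s: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[T]:
    """Call check() until it returns a non-empty result or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while True:
        result = check()
        if result:
            return result
        if time.monotonic() >= deadline:
            return None
        sleep(interval_s)


# Simulated query results: empty until the first checkpoint has completed.
attempts = iter([[], [], [(1, 650)]])
rows = wait_for(lambda: next(attempts), sleep=lambda _seconds: None)
print(rows)
```

With the default arguments, the helper checks every 5 seconds for up to 90 seconds, which leaves room for a few 30-second checkpoint intervals.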

Run the following SQL to do analytics on Paimon data:

Flink SQL Client
-- to sum prices of all orders in paimon
SELECT sum(total_price) as sum_price FROM datalake_enriched_orders$lake;

Sample Output:

+------------+
| sum_price |
+------------+
| 1669519.92 |
+------------+

To achieve results with sub-second data freshness, you can query the table directly, which seamlessly unifies data from both Fluss and Paimon:

Flink SQL Client
-- to sum prices of all orders in fluss and paimon
SELECT sum(total_price) as sum_price FROM datalake_enriched_orders;

The result looks like:

+------------+
| sum_price |
+------------+
| 1777908.36 |
+------------+

You can execute the real-time analytics query multiple times, and the results will vary with each run as new data is continuously written to Fluss in real-time.

Finally, you can use the following command to view the files stored in Paimon:

docker compose exec taskmanager tree /tmp/paimon/fluss.db

Sample Output:

/tmp/paimon/fluss.db
└── datalake_enriched_orders
    ├── bucket-0
    │   ├── changelog-aef1810f-85b2-4eba-8eb8-9b136dec5bdb-0.orc
    │   └── data-aef1810f-85b2-4eba-8eb8-9b136dec5bdb-1.orc
    ├── manifest
    │   ├── manifest-aaa007e1-81a2-40b3-ba1f-9df4528bc402-0
    │   ├── manifest-aaa007e1-81a2-40b3-ba1f-9df4528bc402-1
    │   ├── manifest-list-ceb77e1f-7d17-4160-9e1f-f334918c6e0d-0
    │   ├── manifest-list-ceb77e1f-7d17-4160-9e1f-f334918c6e0d-1
    │   └── manifest-list-ceb77e1f-7d17-4160-9e1f-f334918c6e0d-2
    ├── schema
    │   └── schema-0
    └── snapshot
        ├── EARLIEST
        ├── LATEST
        └── snapshot-1

The files adhere to Paimon's standard format, enabling seamless querying with other engines such as StarRocks.

Clean up

After finishing the tutorial, run exit to leave the Flink SQL CLI container, and then run

docker compose down -v

to stop all containers and remove their volumes.

Learn more

Now that you're up and running with Fluss and Flink, check out the Apache Flink Engine docs to learn about more features available with Flink!