
Connector Options

The following tables list the options that can be used to configure a Fluss table in Flink, including the storage format and the read/write behaviors.

How to Configure Options

You can configure Fluss table options in Flink SQL, either statically when creating the table or dynamically via SQL hints; altering the options of an existing table is planned but not yet supported. The following subsections describe each approach.

Configure options when creating a table

All the options on this page can be set when creating a table. Once a table is created, the defined options are stored as part of the metadata of the Fluss table and are used when reading or writing the table. For example, the following SQL statement creates a Fluss table with table.log.ttl set to 7 days and disables the CRC check for log reading:

CREATE TABLE log_table (
  order_id BIGINT,
  item_id BIGINT,
  amount INT,
  address STRING
) WITH (
  'table.log.ttl' = '7d',
  'client.scanner.log.check-crc' = 'false'
);

Dynamically apply options via SQL hints

You can dynamically apply options via SQL hints. Dynamically applied options are not stored as table metadata; they only take effect for the current query and do not affect other queries on the table. They have a higher priority than the options stored in the table metadata.

note

The Storage Options cannot be dynamically configured via SQL hints, because the storage behavior is determined when the table is created.

For example, the following SQL statements temporarily disable the CRC check for log reading and ignore deletes when writing:

-- SQL hints on source tables
SELECT * FROM log_table /*+ OPTIONS('client.scanner.log.check-crc' = 'false') */;

-- SQL hints on sink tables
INSERT INTO pk_table2 /*+ OPTIONS('sink.ignore-delete'='true') */ SELECT * FROM pk_table1;

Configure options by altering table

Altering table options is not supported yet, but is planned for the near future. For example, the following SQL statement would alter the Fluss table to set table.log.ttl to 7 days:

ALTER TABLE log_table SET ('table.log.ttl' = '7d');

Storage Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| bucket.num | Integer | the bucket number of the Fluss cluster | The number of buckets of a Fluss table. |
| bucket.key | String | (none) | Specifies the distribution policy of the Fluss table. Data will be distributed to each bucket according to the hash value of the bucket key. If you specify multiple fields, use ',' as the delimiter. If the table has a primary key, you currently cannot specify a bucket key: the bucket keys will always be the primary key (excluding the partition key). If the table has no primary key, you can specify a bucket key; when the bucket key is not specified, the data will be distributed to each bucket randomly. |
| table.log.ttl | Duration | 7 days | The time to live for log segments. The configuration controls the maximum time we will retain a log before old segments are deleted to free up space. If set to -1, the log will not be deleted. |
| table.auto-partition.enabled | Boolean | false | Whether to enable auto partitioning for the table. Disabled by default. When auto partitioning is enabled, the partitions of the table will be created automatically. |
| table.auto-partition.time-unit | Enum | DAY | The time granularity for auto-created partitions. The default value is 'DAY'. Valid values are 'HOUR', 'DAY', 'MONTH', 'QUARTER', 'YEAR'. If the value is 'HOUR', the partition format for auto-created partitions is yyyyMMddHH. If 'DAY', the format is yyyyMMdd. If 'MONTH', the format is yyyyMM. If 'QUARTER', the format is yyyyQ. If 'YEAR', the format is yyyy. |
| table.auto-partition.num-precreate | Integer | 2 | The number of partitions to pre-create in each auto-partition check. For example, if the current check time is 2024-11-11 and the value is configured as 3, then the partitions 20241111, 20241112, 20241113 will be pre-created. If a partition already exists, creating it is skipped. The default value is 2, which means 2 partitions will be pre-created. If 'table.auto-partition.time-unit' is 'DAY' (the default), one pre-created partition is for today and the other is for tomorrow. |
| table.auto-partition.num-retention | Integer | 7 | The number of history partitions to retain in each auto-partition check. For example, if the current check time is 2024-11-11, the time unit is DAY, and the value is configured as 3, then the history partitions 20241108, 20241109, 20241110 will be retained, and partitions earlier than 20241108 will be deleted. The default value is 7, which means 7 history partitions will be retained. |
| table.auto-partition.time-zone | String | the system time zone | The time zone for auto-created partitions, which is by default the same as the system time zone. |
| table.replication.factor | Integer | (none) | The replication factor for the log of the new table. When it is not set, Fluss will use the cluster's default replication factor configured by default.replication.factor. It should be a positive number and not larger than the number of tablet servers in the Fluss cluster; a value larger than the number of tablet servers will result in an error when the new table is created. |
| table.log.format | Enum | ARROW | The format of the log records in the log store. The default value is 'ARROW'. The supported formats are 'ARROW' and 'INDEXED'. |
| table.log.arrow.compression.type | Enum | ZSTD | The compression type of the log records if the log format is set to 'ARROW'. The candidate compression types are 'NONE', 'LZ4_FRAME' and 'ZSTD'. The default value is 'ZSTD'. |
| table.log.arrow.compression.zstd.level | Integer | 3 | The compression level of the log records if the log format is set to 'ARROW' and the compression type is set to 'ZSTD'. The valid range is 1 to 22. The default value is 3. |
| table.kv.format | Enum | COMPACTED | The format of the kv records in the kv store. The default value is 'COMPACTED'. The supported formats are 'COMPACTED' and 'INDEXED'. |
| table.log.tiered.local-segments | Integer | 2 | The number of log segments to retain locally for each table when log tiered storage is enabled. It must be greater than 0. The default is 2. |
| table.datalake.enabled | Boolean | false | Whether to enable lakehouse storage for the table. Disabled by default. When this option is set to true and the datalake tiering service is up, the table will be tiered and compacted into the datalake format stored on lakehouse storage. |
| table.datalake.format | Enum | (none) | The data lake format of the table, which specifies the tiered lakehouse storage format, such as Paimon, Iceberg, DeltaLake, or Hudi. Currently, only 'paimon' is supported. Once the table.datalake.format property is configured, Fluss adopts the key encoding and bucketing strategy used by the corresponding data lake format. This ensures consistency in key encoding and bucketing, enabling seamless Union Read functionality across Fluss and the lakehouse. table.datalake.format can be pre-defined before enabling table.datalake.enabled; this allows the data lake feature to be dynamically enabled on the table without requiring table recreation. If table.datalake.format is not explicitly set during table creation, the table will default to the format specified by the datalake.format configuration of the Fluss cluster. |
| table.merge-engine | Enum | (none) | Defines the merge engine for the primary key table. By default, a primary key table doesn't have a merge engine. The supported merge engines are 'first_row' and 'versioned'. The first_row merge engine keeps the first row with the same primary key. The versioned merge engine keeps the row with the largest version for the same primary key. |
| table.merge-engine.versioned.ver-column | String | (none) | The column name of the version column for the 'versioned' merge engine. If the merge engine is set to 'versioned', the version column must be set. |
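
For example, several of the storage options above can be combined in a single DDL. The following sketch creates an auto-partitioned primary key table; the table and column names are hypothetical and the option values are illustrative only.

-- hypothetical partitioned primary key table with auto partitioning enabled
CREATE TABLE orders (
  order_id BIGINT,
  amount INT,
  dt STRING,
  PRIMARY KEY (order_id, dt) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
  'bucket.num' = '4',
  'table.auto-partition.enabled' = 'true',
  'table.auto-partition.time-unit' = 'DAY',   -- partitions named yyyyMMdd
  'table.auto-partition.num-precreate' = '2',
  'table.auto-partition.num-retention' = '7'
);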

Read Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| scan.startup.mode | Enum | full | The scan startup mode enables you to specify the starting point for data consumption. Fluss currently supports the following scan.startup.mode options: full (default), earliest, latest, timestamp. See Start Reading Position for more details. |
| scan.startup.timestamp | Long | (none) | The timestamp to start reading the data from. This option is only valid when scan.startup.mode is set to timestamp. The format is milliseconds since epoch or 'yyyy-MM-dd HH:mm:ss', like '1678883047356' or '2023-12-09 23:09:12'. |
| scan.partition.discovery.interval | Duration | 10s | The time interval for the Fluss source to discover new partitions of a partitioned table while scanning. A non-positive value disables partition discovery. |
| client.scanner.log.check-crc | Boolean | true | Automatically check the CRC32 of the read records for LogScanner. This ensures no on-the-wire or on-disk corruption of the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. |
| client.scanner.log.max-poll-records | Integer | 500 | The maximum number of records returned in a single call to poll() for LogScanner. Note that this config doesn't impact the underlying fetching behavior. The scanner will cache the records from each fetch request and return them incrementally from each poll. |
| client.scanner.log.fetch.max-bytes | MemorySize | 16mb | The maximum amount of data the server should return for a fetch request from the client. Records are fetched in batches, and if the first record batch in the first non-empty bucket of the fetch is larger than this value, the record batch will still be returned to ensure that the fetch can make progress. As such, this is not an absolute maximum. |
| client.scanner.log.fetch.max-bytes-for-bucket | MemorySize | 1mb | The maximum amount of data the server should return for a table bucket in a fetch request from the client. Records are fetched in batches, and the maximum bytes size is configured by this option. |
| client.scanner.log.fetch.min-bytes | MemorySize | 1b | The minimum number of bytes expected for a fetch log request from the client before responding. If not enough bytes are available, the server waits up to client.scanner.log.fetch.wait-max-time before returning. |
| client.scanner.log.fetch.wait-max-time | Duration | 500ms | The maximum time to wait for enough bytes to be available before responding to a fetch log request from the client. |
| client.scanner.io.tmpdir | String | System.getProperty("java.io.tmpdir") + "/fluss" | Local directory used by the client for temporarily storing data files (like kv snapshots and log segment files) to read. |
| client.scanner.remote-log.prefetch-num | Integer | 4 | The number of remote log segments to keep in local temp files for LogScanner, downloaded from remote storage. The default setting is 4. |
| client.remote-file.download-thread-num | Integer | 3 | The number of threads the client uses to download remote files. |
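
The read options can be applied per query via SQL hints, as shown earlier. The sketch below reuses the log_table from the creation example and an arbitrary, illustrative timestamp value.

-- start reading from a given timestamp instead of a full snapshot
SELECT * FROM log_table
/*+ OPTIONS('scan.startup.mode' = 'timestamp',
            'scan.startup.timestamp' = '2023-12-09 23:09:12') */;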

Lookup Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| lookup.async | Boolean | true | Whether to use asynchronous lookup. Asynchronous lookup has better throughput performance than synchronous lookup. |
| lookup.cache | Enum | NONE | The caching strategy for the lookup table: NONE (no caching) or PARTIAL (cache entries looked up from the table). |
| lookup.max-retries | Integer | 3 | The maximum number of retries if a lookup operation fails. |
| lookup.partial-cache.expire-after-access | Duration | (none) | Duration after which a cache entry expires since it was last accessed. Only takes effect when the partial cache is used. |
| lookup.partial-cache.expire-after-write | Duration | (none) | Duration after which a cache entry expires since it was written. Only takes effect when the partial cache is used. |
| lookup.partial-cache.cache-missing-key | Boolean | true | Whether to cache keys that match no row in the table. Only takes effect when the partial cache is used. |
| lookup.partial-cache.max-rows | Long | (none) | The maximum number of rows to keep in the partial cache. Only takes effect when the partial cache is used. |
| client.lookup.queue-size | Integer | 25600 | The maximum number of pending lookup operations. |
| client.lookup.max-batch-size | Integer | 128 | The maximum batch size for merging lookup operations into one lookup request. |
| client.lookup.max-inflight-requests | Integer | 128 | The maximum number of unacknowledged lookup requests for lookup operations. |
| client.lookup.batch-timeout | Duration | 100ms | The maximum time to wait for a lookup batch to become full; if this timeout is reached, the batch is closed and sent. |
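
As a sketch of how these options are typically exercised, the following lookup join applies the cache options via SQL hints. The orders and customers table names, their columns, and the proc_time processing-time attribute are hypothetical.

-- lookup join against a Fluss primary key table, with a partial cache
SELECT o.order_id, c.customer_name
FROM orders AS o
JOIN customers /*+ OPTIONS('lookup.cache' = 'PARTIAL',
                           'lookup.partial-cache.max-rows' = '10000') */
  FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.customer_id = c.customer_id;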

Write Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| sink.ignore-delete | Boolean | false | If set to true, the sink will ignore DELETE and UPDATE_BEFORE changelog events. |
| client.writer.buffer.memory-size | MemorySize | 64mb | The total bytes of memory the writer can use to buffer internal rows. |
| client.writer.buffer.page-size | MemorySize | 128kb | Size of every page in the memory buffers ('client.writer.buffer.memory-size'). |
| client.writer.buffer.per-request-memory-size | MemorySize | 16mb | The minimum number of bytes that will be allocated by the writer, rounded down to the closest multiple of client.writer.buffer.page-size. It must be greater than or equal to client.writer.buffer.page-size. This option allows allocating memory in batches, giving better CPU-cache friendliness due to contiguous segments. |
| client.writer.batch-size | MemorySize | 2mb | The writer or walBuilder will attempt to batch records together into one batch for the same bucket. This helps performance on both the client and the server. |
| client.writer.buffer.wait-timeout | Duration | 2^63-1 ns | Defines how long the writer will block when waiting for buffer segments to become available. |
| client.writer.batch-timeout | Duration | 100ms | The writer groups any rows that arrive between request sends into a single batched request. Normally this occurs only under load, when rows arrive faster than they can be sent out. However, in some circumstances the writer may want to reduce the number of requests even under moderate load. This setting accomplishes that by adding a small amount of artificial delay: rather than immediately sending out a row, the writer waits for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get client.writer.batch-size worth of rows for a bucket, they will be sent immediately regardless of this setting; however, if fewer bytes than this have accumulated for the bucket, we will delay for the specified time waiting for more records to show up. |
| client.writer.bucket.no-key-assigner | Enum | STICKY | The bucket assigner for tables without a key. For tables with a bucket key or primary key, a bucket is chosen based on a hash of the key. For tables without a bucket key and primary key, this option specifies the bucket assigner. The candidate assigners are ROUND_ROBIN and STICKY; the default is STICKY. ROUND_ROBIN: assigns the bucket id for input rows in a round-robin fashion. STICKY: assigns a new bucket id only if the batch changes in the record accumulator; otherwise the bucket id stays the same as for the previous record. |
| client.writer.acks | String | all | The number of acknowledgments the writer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed. acks=0: the writer will not wait for any acknowledgment from the server at all; no guarantee can be made that the server has received the record in this case. acks=1: the leader will write the record to its local log and respond without waiting for full acknowledgment from all followers; in this case, if the leader fails immediately after acknowledging the record but before the followers have replicated it, the record will be lost. acks=-1 (all): the leader will wait for the full set of in-sync replicas to acknowledge the record; this guarantees that the record will not be lost as long as at least one in-sync replica remains alive, and is the strongest available guarantee. |
| client.writer.request-max-size | MemorySize | 10mb | The maximum size of a request in bytes. This setting will limit the number of record batches the writer will send in a single request to avoid sending huge requests. |
| client.writer.retries | Integer | Integer.MAX_VALUE | Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the writer resent the row upon receiving the error. |
| client.writer.enable-idempotence | Boolean | true | Writer idempotence is enabled by default if no conflicting configurations are set. If conflicting configurations are set and writer idempotence is not explicitly enabled, idempotence is disabled. If idempotence is explicitly enabled and conflicting configurations are set, a ConfigException is thrown. |
| client.writer.max-inflight-requests-per-bucket | Integer | 5 | The maximum number of unacknowledged requests per bucket for the writer. This configuration only takes effect if 'client.writer.enable-idempotence' is set to true. When the number of in-flight requests per bucket exceeds this setting, the writer will wait for the in-flight requests to complete before sending out new requests. |
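
As with the read options, write options can either be stored in the table DDL or applied per statement via SQL hints. The sketch below reuses the pk_table1 and pk_table2 names from the hint example above; the option values are illustrative only.

-- tune writer behavior for this INSERT only
INSERT INTO pk_table2 /*+ OPTIONS('client.writer.acks' = 'all',
                                  'client.writer.batch-timeout' = '200ms',
                                  'sink.ignore-delete' = 'true') */
SELECT * FROM pk_table1;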

Other Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| bootstrap.servers | List | (none) | A list of host/port pairs to use for establishing the initial connection to the Fluss cluster. The list should be in the form host1:port1,host2:port2,.... Since these servers are only used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down). |
| client.id | String | "" | An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging. |
| client.connect-timeout | Duration | 120s | The Netty client connect timeout. |
| client.request-timeout | Duration | 30s | The timeout for a request to complete. If the writer's acks is set to -1, this timeout is the maximum time a delayed write will try to complete. The default setting is 30 seconds. |
| client.filesystem.security.token.renewal.backoff | Duration | 1h | How long to wait before retrying to obtain new security tokens for the filesystem after a failure. |
| client.filesystem.security.token.renewal.time-ratio | Double | 0.75 | Ratio of the token's expiration time at which new credentials for accessing the filesystem should be re-obtained. |
| client.metrics.enabled | Boolean | true | Whether to enable metrics for the client. When metrics are enabled, the client will collect and report metrics via the JMX metrics reporter. |
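
Client-level options such as bootstrap.servers are typically supplied when connecting to the cluster, for example when defining a Fluss catalog in Flink SQL. The following is a minimal sketch; the catalog name and server address are placeholders.

-- connect Flink to a Fluss cluster (the address is a placeholder)
CREATE CATALOG fluss_catalog WITH (
  'type' = 'fluss',
  'bootstrap.servers' = 'localhost:9123'
);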