Configuration
All configuration options can be set in the Fluss configuration file conf/server.yaml.
The configuration is parsed and evaluated when the Fluss processes are started. Changes to the configuration file require restarting the relevant processes.
Options are written in the format key: value, for example:
conf/server.yaml
default.bucket.number: 8
default.replication.factor: 3
remote.data.dir: /home/fluss/data
remote.fs.write-buffer-size: 10mb
auto-partition.check.interval: 5min
Server Configuration
Server configuration is the set of options that define the runtime parameters of a server. These options can only be set when the cluster starts and cannot be changed dynamically while the Fluss cluster is running.
Fluss Cluster
Option | Type | Default | Description |
---|---|---|---|
default.bucket.number | Integer | 1 | The default number of buckets for a table in the Fluss cluster. It is a cluster-level parameter: every table created without an explicit bucket number uses this value. |
default.replication.factor | Integer | 1 | The default replication factor for the log of a table in the Fluss cluster. It is a cluster-level parameter: every table created without an explicit replication factor uses this value. |
remote.data.dir | String | (none) | The directory used for storing the kv snapshot data files and remote log for log tiered storage in a Fluss supported filesystem. |
remote.fs.write-buffer-size | MemorySize | 4kb | The default size of the write buffer for writing the local files to remote file systems. |
plugin.classloader.parent-first-patterns.default | String | java., com.alibaba.fluss., javax.annotation., org.slf4j, org.apache.log4j, org.apache.logging, org.apache.commons.logging, ch.qos.logback | A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the plugin parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. This setting should generally not be modified. |
auto-partition.check.interval | Duration | 10min | The interval of the auto partition check. The default value is 10 minutes. |
CoordinatorServer
Option | Type | Default | Description |
---|---|---|---|
coordinator.host | String | (None) | The config parameter defining the network address to connect to for communication with the coordinator server. If the coordinator server is used as a bootstrap server (discover all the servers in the cluster), the value of this config option should be a static hostname or address. |
coordinator.port | String | 9123 | The config parameter defining the network port to connect to for communication with the coordinator server. Like 'coordinator.host', if the coordinator server is used as a bootstrap server (discover all the servers in the cluster), the value of this config option should be a static port. Otherwise, the value can be set to "0" for a dynamic service name resolution. The value accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. |
coordinator.io-pool.size | Integer | 1 | The size of the IO thread pool used to run blocking operations for the coordinator server, such as discarding unnecessary snapshot files. Increase this value if the cleanup of unnecessary snapshot files is slow. The default value is 1. |
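
The port syntax described for coordinator.port (single ports, comma-separated lists, ranges, or a combination) can be illustrated with a small Python sketch. The helper name expand_port_spec is hypothetical and is not part of Fluss; it only mirrors the syntax described in the table above and makes no claim about how Fluss itself parses the value.

```python
def expand_port_spec(spec: str) -> list:
    """Expand a spec such as "50100,50105-50107" into a list of port numbers.

    Hypothetical helper for illustration only; not Fluss's own parser.
    """
    ports = []
    for part in spec.split(","):
        part = part.strip()
        if not part:
            continue  # tolerate stray commas
        if "-" in part:
            # A range such as "50100-50200" includes both endpoints.
            start_text, end_text = part.split("-", 1)
            start, end = int(start_text), int(end_text)
            if start > end:
                raise ValueError(f"invalid port range: {part!r}")
            ports.extend(range(start, end + 1))
        else:
            ports.append(int(part))
    return ports
```

With this sketch, a value that combines both forms, such as "50100,50105-50107", expands to port 50100 followed by ports 50105 through 50107.
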
TabletServer
Option | Type | Default | Description |
---|---|---|---|
tablet-server.host | String | (None) | The external address of the network interface where the TabletServer is exposed. Because different TabletServers need different values for this option, it is usually specified in an additional, non-shared, TabletServer-specific config file. |
tablet-server.port | String | 0 | The external RPC port where the TabletServer is exposed. |
tablet-server.id | Integer | (None) | The id for the tablet server. |
data.dir | String | /tmp/fluss-data | The directory where Fluss stores its data. The default value is /tmp/fluss-data. |
server.writer-id.expiration-time | Duration | 7d | The time that the tablet server will wait without receiving any write request from a client before expiring the related status. The default value is 7 days. |
server.writer-id.expiration-check-interval | Duration | 10min | The interval at which to remove writer ids that have expired due to 'server.writer-id.expiration-time' passing. The default value is 10 minutes. |
server.background.threads | Integer | 10 | The number of threads to use for various background processing tasks. The default value is 10. |
Zookeeper
Option | Type | Default | Description |
---|---|---|---|
zookeeper.address | String | (None) | The ZooKeeper address to use, when running Fluss with ZooKeeper. |
zookeeper.path.root | String | /fluss | The root path under which Fluss stores its entries in ZooKeeper. |
zookeeper.client.session-timeout | Integer | 60000 | Defines the session timeout for the ZooKeeper session in ms. |
zookeeper.client.connection-timeout | Integer | 15000 | Defines the connection timeout for ZooKeeper in ms. |
zookeeper.client.retry-wait | Integer | 5000 | Defines the pause between consecutive retries in ms. |
zookeeper.client.max-retry-attempts | Integer | 3 | Defines the number of connection retries before the client gives up. |
zookeeper.client.tolerate-suspended-connections | Boolean | false | Defines whether a suspended ZooKeeper connection will be treated as an error that causes the leader information to be invalidated or not. If you set this option to true, Fluss will wait until a ZooKeeper connection is marked as lost before it revokes the leadership of components. This makes Fluss more resilient against temporary connection instabilities, at the cost of being more likely to run into timing issues with ZooKeeper. |
zookeeper.client.ensemble-tracker | Boolean | true | Defines whether Curator should enable the ensemble tracker. This can be useful in certain scenarios in which CuratorFramework is accessing ZK clusters via a load balancer or Virtual IPs. The default Curator EnsembleTracking logic watches CuratorEventType.GET_CONFIG events and changes the ZooKeeper connection string. This is not the desired behaviour when ZooKeeper is running behind Virtual IPs. Under certain configurations, EnsembleTracking can lead to the ZooKeeper connection string being set with unresolvable hostnames. |
Netty
Option | Type | Default | Description |
---|---|---|---|
netty.server.num-network-threads | Integer | 3 | The number of threads that the server uses for receiving requests from the network and sending responses to the network. |
netty.server.num-worker-threads | Integer | 8 | The number of threads that the server uses for processing requests, which may include disk and remote I/O. |
netty.server.max-queued-requests | Integer | 500 | The number of queued requests allowed for worker threads, before blocking the I/O threads. |
netty.connection.max-idle-time | Duration | 10min | Close connections that have been idle for the duration specified by this config. |
netty.client.num-network-threads | Integer | 1 | The number of threads that the client uses for sending requests to the network and receiving responses from the network. The default value is 1. |
Log
Option | Type | Default | Description |
---|---|---|---|
log.segment.file-size | MemorySize | 1024m | This configuration controls the segment file size for the log. Retention and cleaning is always done a file at a time so a larger segment size means fewer files but less granular control over retention. |
log.index.file-size | MemorySize | 10m | This configuration controls the size of the index that maps offsets to file positions. We preallocate this index file and shrink it only after log rolls. You generally should not need to change this setting. |
log.index.interval-size | MemorySize | 4k | This setting controls how frequently Fluss adds an index entry to its offset index. The default setting ensures that we index a message roughly every 4096 bytes. More indexing allows reads to jump closer to the exact position in the log but makes the index larger. You probably don't need to change this. |
log.file-preallocate | Boolean | false | True if we should preallocate the file on disk when creating a new log segment. |
log.flush.interval-messages | Long | Long.MAX_VALUE | This setting allows specifying an interval at which we will force a fsync of data written to the log. For example if this was set to 1, we would fsync after every message; if it were 5 we would fsync after every five messages. |
log.replica.high-watermark.checkpoint-interval | Duration | 5s | The frequency with which the high watermark is saved out to disk. The default setting is 5 seconds. |
log.replica.max-lag-time | Duration | 30s | If a follower replica hasn't sent any fetch log requests or hasn't caught up to the leader's log end offset for at least this time, the leader will remove the follower replica from the ISR. |
log.replica.write-operation-purge-number | Integer | 1000 | The purge number (in number of requests) of the write operation manager. The default value is 1000. |
log.replica.fetcher-number | Integer | 1 | Number of fetcher threads used to replicate log records from each source tablet server. The total number of fetchers on each tablet server is bound by this parameter multiplied by the number of tablet servers in the cluster. Increasing this value can increase the degree of I/O parallelism in the follower and leader tablet server at the cost of higher CPU and memory utilization. |
log.replica.fetch-backoff-interval | Duration | 1s | The amount of time to sleep when a fetch bucket error occurs. |
log.fetch.max-bytes | MemorySize | 16mb | The maximum amount of data the server should return for a fetch request. Records are fetched in batches for the log scanner or follower. If the first record batch in the first non-empty bucket of the fetch is larger than this value, the record batch will still be returned to ensure that the fetch can make progress. As such, this is not an absolute maximum. Note that the fetcher performs multiple fetches in parallel. |
log.fetch.max-bytes-for-bucket | MemorySize | 1mb | The maximum amount of data the server should return for a table bucket in a fetch request. Records are fetched in batches for the consumer or follower; this option limits the bytes returned per bucket in one request. |
log.replica.min-in-sync-replicas-number | Integer | 1 | When a producer sets acks to all (-1), this configuration specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, the producer will raise an exception (NotEnoughReplicas). When used together, this config and 'acks' allow you to enforce greater durability guarantees. A typical scenario would be to create a table with a replication factor of 3, set this config to 2, and produce with acks = -1. This will ensure that the producer raises an exception if a majority of replicas don't receive a write. |
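
The interaction between acks and log.replica.min-in-sync-replicas-number described above can be modelled with a short Python sketch. This is a simplified illustration, not Fluss's implementation: the function check_min_in_sync and the class NotEnoughReplicasError are hypothetical names, and the only rule modelled is the one stated in the table (the minimum applies when acks is -1).

```python
class NotEnoughReplicasError(Exception):
    """Stand-in for the NotEnoughReplicas exception named in the table above."""


def check_min_in_sync(acks: int, in_sync_replicas: int, min_in_sync_replicas: int) -> None:
    """Raise if an acks=-1 write cannot reach the required number of in-sync replicas.

    Hypothetical model for illustration; the real server-side check may differ.
    """
    # The minimum only applies when the writer waits for all in-sync replicas.
    if acks == -1 and in_sync_replicas < min_in_sync_replicas:
        raise NotEnoughReplicasError(
            f"{in_sync_replicas} in-sync replicas, need at least {min_in_sync_replicas}"
        )
```

In the typical scenario from the table (replication factor 3, minimum of 2, acks = -1), a write is accepted while two or three replicas are in sync and rejected once only one remains.
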
Log Tiered Storage
Option | Type | Default | Description |
---|---|---|---|
remote.log.task-interval-duration | Duration | 1min | Interval at which remote log manager runs the scheduled tasks like copy segments, clean up remote log segments, delete local log segments etc. If the value is set to 0s, it means that the remote log storage is disabled. |
remote.log.index-file-cache-size | MemorySize | 1gb | The total size of the space allocated to store index files fetched from remote storage in the local storage. |
remote.log-manager.thread-pool-size | Integer | 4 | Size of the thread pool used in scheduling tasks to copy segments, fetch remote log indexes and clean up remote log segments. |
remote.log.data-transfer-thread-num | Integer | 4 | The number of threads the server uses to transfer (download and upload) remote log files, which can be data files, index files, and remote log metadata files. |
Kv
Option | Type | Default | Description |
---|---|---|---|
kv.snapshot.interval | Duration | 10min | The interval to perform periodic snapshot for kv data. The default setting is 10 minutes. |
kv.snapshot.scheduler-thread-num | Integer | 1 | The number of threads that the server uses to schedule snapshot kv data for all the replicas in the server. |
kv.snapshot.transfer-thread-num | Integer | 4 | The number of threads the server uses to transfer (download and upload) kv snapshot files. |
kv.snapshot.num-retained | Integer | 1 | The maximum number of completed snapshots to retain. |
kv.rocksdb.thread.num | Integer | 2 | The maximum number of concurrent background flush and compaction jobs (per bucket of table). The default value is '2'. |
kv.rocksdb.files.open | Integer | -1 | The maximum number of open files (per bucket of table) that can be used by the DB, '-1' means no limit. The default value is '-1'. |
kv.rocksdb.log.max-file-size | MemorySize | 25mb | The maximum size of RocksDB's file used for information logging. If the log files becomes larger than this, a new file will be created. If 0, all logs will be written to one log file. The default maximum file size is '25MB'. |
kv.rocksdb.log.file-num | Integer | 4 | The maximum number of files RocksDB should keep for information logging (Default setting: 4). |
kv.rocksdb.log.dir | String | (None) | The directory for RocksDB's information logging files. If empty (Fluss default setting), log files will be in the same directory as the Fluss log. If non-empty, this directory will be used and the data directory's absolute path will be used as the prefix of the log file name. If this option is set to a non-existing location, e.g. '/dev/null', RocksDB will then create the log under its own database folder as before. |
kv.rocksdb.log.level | Enum | INFO_LEVEL | The specified information logging level for RocksDB. Candidate log levels are 'DEBUG_LEVEL', 'INFO_LEVEL', 'WARN_LEVEL', 'ERROR_LEVEL', 'FATAL_LEVEL', 'HEADER_LEVEL', and 'NUM_INFO_LOG_LEVELS'. If unset, Fluss will use INFO_LEVEL. Note: RocksDB info logs will not be written to the Fluss tablet server logs and there is no rolling strategy, unless you configure 'kv.rocksdb.log.dir', 'kv.rocksdb.log.max-file-size' and 'kv.rocksdb.log.file-num' accordingly. Without a rolling strategy, it may lead to uncontrolled disk space usage if configured with increased log levels! There is no need to modify the RocksDB log level, except when troubleshooting RocksDB. |
kv.rocksdb.write-batch-size | MemorySize | 2mb | The maximum memory consumed by a RocksDB batch write. If this config is set to 0, batches are flushed based on item count only. |
kv.rocksdb.compaction.style | Enum | LEVEL | The specified compaction style for DB. Candidate compaction style is LEVEL, FIFO, UNIVERSAL, or NONE, and Fluss chooses 'LEVEL' as default style. |
kv.rocksdb.compaction.level.use-dynamic-size | Boolean | false | If true, RocksDB will pick the target size of each level dynamically. From an empty DB, RocksDB would make the last level the base level, which means merging L0 data into the last level, until it exceeds max_bytes_for_level_base. It then repeats this process for the second last level and so on. The default value is 'false'. For more information, please refer to RocksDB's documentation: https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#level_compaction_dynamic_level_bytes-is-true |
kv.rocksdb.compression.per.level | Enum | LZ4,LZ4,LZ4,LZ4,LZ4,ZSTD,ZSTD | A comma-separated list of compression types. Different levels can have different compression policies. In many cases, lower levels use fast compression algorithms, while higher levels with more data use slower but more effective compression algorithms. The Nth element in the list corresponds to the compression type of level N-1. When 'kv.rocksdb.compaction.level.use-dynamic-size' is true, compression_per_level[0] still determines L0, but other elements are based on the base level and may not match the level seen in the info log. Note: If the list is shorter than the number of levels, the levels not covered use the last compression type in the list. The supported values are NO, SNAPPY, LZ4, ZSTD. For more information about compression types, please refer to https://github.com/facebook/rocksdb/wiki/Compression. The default value is 'LZ4,LZ4,LZ4,LZ4,LZ4,ZSTD,ZSTD', which means LZ4 compression is used from level 0 to level 4 and ZSTD compression from level 5 to level 6. LZ4 is a lightweight compression algorithm, so it usually strikes a good balance between space and CPU usage. ZSTD saves more space than LZ4 but is more CPU-intensive. Choose the compression types according to the CPU and I/O resources of your machines. The default value targets the scenario where CPU resources are adequate. If the I/O pressure of the system is low when writing a lot of data but CPU resources are inadequate, you can trade I/O resources for CPU resources and change the value to 'NO,NO,NO,LZ4,LZ4,ZSTD,ZSTD'. |
kv.rocksdb.compaction.level.target-file-size-base | MemorySize | 64mb | The target file size for compaction, which determines a level-1 file size. The default value is '64MB'. |
kv.rocksdb.compaction.level.max-size-level-base | MemorySize | 256mb | The upper-bound of the total size of level base files in bytes. The default value is '256MB'. |
kv.rocksdb.writebuffer.size | MemorySize | 64mb | The amount of data built up in memory (backed by an unsorted log on disk) before converting to a sorted on-disk files. The default writebuffer size is '64MB'. |
kv.rocksdb.writebuffer.count | Integer | 2 | The maximum number of write buffers that are built up in memory. The default value is '2'. |
kv.rocksdb.writebuffer.number-to-merge | Integer | 1 | The minimum number of write buffers that will be merged together before writing to storage. The default value is '1'. |
kv.rocksdb.block.blocksize | MemorySize | 4kb | The approximate size (in bytes) of user data packed per block. The default blocksize is '4KB'. |
kv.rocksdb.block.cache-size | MemorySize | 8mb | The amount of the cache for data blocks in RocksDB. The default block-cache size is '8MB'. |
kv.rocksdb.use-bloom-filter | Boolean | true | If true, every newly created SST file will contain a Bloom filter. It is enabled by default. |
kv.rocksdb.bloom-filter.bits-per-key | Double | 10.0 | The number of bits per key that the Bloom filter will use. This only takes effect when the Bloom filter is used. The default value is 10.0. |
kv.rocksdb.bloom-filter.block-based-mode | Boolean | false | If true, RocksDB will use a block-based filter instead of a full filter. This only takes effect when the Bloom filter is used. The default value is 'false'. |
kv.recover.log-record-batch.max-size | MemorySize | 16mb | The max fetch size for fetching log to apply to kv during recovering kv. |
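
The mapping from kv.rocksdb.compression.per.level to individual levels can be sketched in Python as follows. The function compression_for_level is a hypothetical helper written for this page, not a Fluss or RocksDB API. It models only the two rules stated above (element N applies to level N-1, and levels beyond the end of the list reuse the last entry) and deliberately ignores the dynamic-size case.

```python
def compression_for_level(per_level: str, level: int) -> str:
    """Return the compression type that a comma-separated list assigns to a level.

    Hypothetical helper for illustration; assumes use-dynamic-size is false.
    """
    types = [item.strip() for item in per_level.split(",") if item.strip()]
    if not types:
        raise ValueError("the compression list must not be empty")
    if level < 0:
        raise ValueError("level must be non-negative")
    # Levels past the end of the list fall back to the last configured type.
    return types[min(level, len(types) - 1)]
```

With the default value, levels 0 through 4 resolve to LZ4 and levels 5 and 6 to ZSTD; with a shorter list such as 'LZ4,ZSTD', every level from 1 upward resolves to ZSTD.
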
Metrics
Option | Type | Default | Description |
---|---|---|---|
metrics.reporters | List | (None) | An optional list of reporter names. If configured, only reporters whose names match an entry in the list will be started. |
metrics.reporter.prometheus.port | String | 9249 | The port the Prometheus reporter listens on, defaults to 9249. In order to be able to run several instances of the reporter on one host (e.g. when one TabletServer is colocated with the CoordinatorServer) it is advisable to use a port range like 9250-9260. |
metrics.reporter.jmx.port | String | (None) | The port for the JMXServer that JMX clients can connect to. If not set, the JMXServer won't start. In order to be able to run several instances of the reporter on one host (e.g. when one TabletServer is colocated with the CoordinatorServer) it is advisable to use a port range like 9990-9999. |
Lakehouse
Option | Type | Default | Description |
---|---|---|---|
lakehouse.storage | String | (None) | The kind of lakehouse storage used by Fluss, such as Paimon, Iceberg, or Hudi. Currently, only Paimon is supported. |
Table Configuration
Option | Type | Default | Description |
---|---|---|---|
bucket.num | Integer | The cluster's default bucket number (default.bucket.number) | The number of buckets of a Fluss table. |
bucket.key | String | (none) | Specifies the distribution policy of the Fluss table. Data will be distributed to each bucket according to the hash value of the bucket key. If you specify multiple fields, separate them with ','. If the table has a primary key, you currently cannot specify a bucket key; the bucket key will always be the primary key (excluding the partition key). If the table has no primary key, you can specify a bucket key; when the bucket key is not specified, the data will be distributed to the buckets randomly. |
table.log.ttl | Duration | 7 days | The time to live for log segments. The configuration controls the maximum time we will retain a log before we will delete old segments to free up space. If set to -1, the log will not be deleted. |
table.auto-partition.enabled | Boolean | false | Whether to enable auto partition for the table. Disabled by default. When auto partition is enabled, the partitions of the table will be created automatically. |
table.auto-partition.time-unit | Enum | (none) | The time granularity for auto-created partitions. Valid values are 'HOUR', 'DAY', 'MONTH', 'QUARTER', 'YEAR'. The format of auto-created partitions is yyyyMMddHH for 'HOUR', yyyyMMdd for 'DAY', yyyyMM for 'MONTH', yyyyQ for 'QUARTER', and yyyy for 'YEAR'. |
table.auto-partition.num-precreate | Integer | 4 | The number of partitions to pre-create in each auto partition check. For example, if the current check time is 2024-11-11, the time-unit is DAY, and the value is configured as 3, then partitions 20241111, 20241112, 20241113 will be pre-created. Partitions that already exist are skipped. |
table.auto-partition.num-retention | Integer | -1 | The number of history partitions to retain for auto created partitions in each check for auto partition. The default value is -1 which means retain all partitions. For example, if the current check time is 2024-11-11, time-unit is DAY, and the value is configured as 3, then the history partitions 20241108, 20241109, 20241110 will be retained. The partitions earlier than 20241108 will be deleted. |
table.auto_partitioning.time-zone | String | the system time zone | The time zone for auto partitions, which is by default the same as the system time zone. |
table.replication.factor | Integer | (None) | The replication factor for the log of the new table. When it's not set, Fluss will use the cluster's default replication factor configured by default.replication.factor. It should be a positive number and not larger than the number of tablet servers in the Fluss cluster. A value larger than the number of tablet servers in Fluss cluster will result in an error when the new table is created. |
table.log.format | Enum | ARROW | The format of the log records in log store. The default value is 'ARROW'. The supported formats are 'ARROW' and 'INDEXED'. |
table.kv.format | Enum | COMPACTED | The format of the kv records in kv store. The default value is 'COMPACTED'. The supported formats are 'COMPACTED' and 'INDEXED'. |
table.log.tiered.local-segments | Integer | 2 | The number of log segments to retain locally for each table when log tiered storage is enabled. It must be greater than 0. The default is 2. |
table.datalake.enabled | Boolean | false | Whether to enable lakehouse storage for the table. Disabled by default. When this option is set to true and the datalake tiering service is up, the table will be tiered and compacted into datalake format stored on lakehouse storage. |
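
The partition naming and the pre-create and retention examples in the table above can be reproduced with a short Python sketch. The function names are hypothetical and the code is only an illustration of the documented formats, not Fluss's implementation; in particular it ignores the configured time zone and handles pre-creation and retention for the DAY unit only.

```python
from datetime import datetime, timedelta


def partition_name(moment: datetime, time_unit: str) -> str:
    """Format a partition name using the patterns listed for table.auto-partition.time-unit."""
    if time_unit == "HOUR":
        return moment.strftime("%Y%m%d%H")  # yyyyMMddHH
    if time_unit == "DAY":
        return moment.strftime("%Y%m%d")  # yyyyMMdd
    if time_unit == "MONTH":
        return moment.strftime("%Y%m")  # yyyyMM
    if time_unit == "QUARTER":
        quarter = (moment.month - 1) // 3 + 1
        return f"{moment.year}{quarter}"  # yyyyQ
    if time_unit == "YEAR":
        return moment.strftime("%Y")  # yyyy
    raise ValueError(f"unsupported time unit: {time_unit!r}")


def day_partitions_to_precreate(check_time: datetime, num_precreate: int) -> list:
    """Names of the DAY partitions pre-created at check_time, starting with the current day."""
    return [partition_name(check_time + timedelta(days=i), "DAY") for i in range(num_precreate)]


def day_partitions_to_retain(check_time: datetime, num_retention: int) -> list:
    """Names of the history DAY partitions kept at check_time, oldest first."""
    if num_retention < 0:
        # -1 means "retain all partitions", which this sketch cannot enumerate.
        raise ValueError("a negative value means all history partitions are retained")
    return [partition_name(check_time - timedelta(days=i), "DAY") for i in range(num_retention, 0, -1)]
```

For a check time of 2024-11-11 and a value of 3, the two helper functions return the same partition names as the examples given for table.auto-partition.num-precreate and table.auto-partition.num-retention.
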
Config by Flink SQL
When you create a table in Fluss through Flink, you can specify the table configuration in the WITH options of the Flink DDL, like:
CREATE TABLE my_table (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'bucket.num' = '2',
  'table.log.ttl' = '7 days',
  'table.auto-partition.enabled' = 'true',
  'table.auto-partition.time-unit' = 'DAY'
);
Note: Altering table configuration through Flink is not currently supported. This will be supported soon.
Client Configuration
Option | Type | Default | Description |
---|---|---|---|
client.id | String | "" | An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging. |
client.connect-timeout | Duration | 120s | The Netty client connect timeout. |
bootstrap.servers | List | (None) | A list of host/port pairs to use for establishing the initial connection to the Fluss cluster. The list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down). |
client.writer.buffer.memory-size | MemorySize | 64mb | The total bytes of memory the writer can use to buffer internal rows. |
client.writer.buffer.page-size | MemorySize | 128kb | Size of every page in memory buffers ('client.writer.buffer.memory-size'). |
client.writer.batch-size | MemorySize | 2mb | The writer or walBuilder will attempt to batch records together into one batch for the same bucket. This helps performance on both the client and the server. |
client.writer.legacy.batch-size | MemorySize | 64kb | The writer or walBuilder will attempt to batch records together into one batch for the same bucket. This helps performance on both the client and the server. |
client.writer.batch-timeout | Duration | 100ms | The writer groups any rows that arrive in between request sends into a single batched request. Normally this occurs only under load, when rows arrive faster than they can be sent out. However, in some circumstances the writer may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay: rather than immediately sending out a row, the writer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get client.writer.batch-size worth of rows for a bucket, it will be sent immediately regardless of this setting; however, if we have fewer than this many bytes accumulated for this bucket, we will delay for the specified time waiting for more records to show up. This setting defaults to 100ms. |
client.writer.bucket.no-key-assigner | Enum | STICKY | The bucket assigner for tables without a key. For tables with a bucket key or primary key, a bucket is chosen based on a hash of the key. For tables without a bucket key and primary key, this option specifies the bucket assigner. The candidate assigners are ROUND_ROBIN and STICKY; the default is STICKY. ROUND_ROBIN: assigns the bucket id for each input row in round-robin order. STICKY: assigns a new bucket id only when the batch changes in the record accumulator; otherwise the bucket id is the same as that of the previous record. |
client.writer.acks | String | all | The number of acknowledgments the writer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed: acks=0: The writer will not wait for any acknowledgment from the server at all. No guarantee can be made that the server has received the record in this case. acks=1: The leader will write the record to its local log but will respond without awaiting full acknowledgment from all followers. In this case, should the leader fail immediately after acknowledging the record but before the followers have replicated it, the record will be lost. acks=-1 (all): The leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. |
client.writer.request-max-size | MemorySize | 10mb | The maximum size of a request in bytes. This setting will limit the number of record batches the writer will send in a single request to avoid sending huge requests. |
client.writer.retries | Integer | Integer.MAX_VALUE | Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the writer resent the row upon receiving the error. |
client.writer.enable-idempotence | Boolean | true | Writer idempotence is enabled by default if no conflicting configs are set. If conflicting configs are set and writer idempotence is not explicitly enabled, idempotence is disabled. If idempotence is explicitly enabled and conflicting configs are set, a ConfigException is thrown. |
client.writer.max-inflight-requests-per-bucket | Integer | 5 | The maximum number of unacknowledged requests per bucket for the writer. This configuration only takes effect if 'client.writer.enable-idempotence' is set to true. When the number of inflight requests per bucket exceeds this setting, the writer will wait for the inflight requests to complete before sending out new requests. This setting defaults to 5. |
client.request-timeout | Duration | 30s | The timeout for a request to complete. If the user sets the write ack to -1, this timeout is the maximum time that a delayed write waits to complete. The default setting is 30 seconds. |
client.scanner.log.check-crc | Boolean | true | Automatically check the CRC32 of the read records for LogScanner. This ensures that no on-the-wire or on-disk corruption of the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. |
client.scanner.log.max-poll-records | Integer | 500 | The maximum number of records returned in a single call to poll() for LogScanner. Note that this config doesn't impact the underlying fetching behavior. The scanner will cache the records from each fetch request and return them incrementally from each poll. |
client.lookup.queue-size | Integer | 256 | The maximum number of pending lookup operations. |
client.lookup.max-batch-size | Integer | 128 | The maximum batch size of merging lookup operations to one lookup request. |
client.lookup.max-inflight-requests | Integer | 128 | The maximum number of unacknowledged lookup requests for lookup operations. |
client.scanner.remote-log.prefetch-num | Integer | 4 | The number of remote log segments, downloaded from remote storage, to keep in local temp files for LogScanner. The default setting is 4. |
client.scanner.io.tmpdir | String | System.getProperty("java.io.tmpdir") + "/fluss" | Local directory that is used by the client for temporarily storing the data files (like kv snapshot and log segment files) to read. |
client.remote-file.download-thread-num | Integer | 3 | The number of threads the client uses to download remote files. |
client.filesystem.security.token.renewal.backoff | Duration | 1h | How long to wait before retrying to obtain new security tokens for the filesystem after a failure. |
client.filesystem.security.token.renewal.time-ratio | Double | 0.75 | The ratio of the token's expiration time at which new credentials for accessing the filesystem should be re-obtained. |
client.metrics.enabled | Boolean | true | Enable metrics for the client. When metrics are enabled, the client will collect metrics and report them through the JMX metrics reporter. |
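
The interplay between client.writer.batch-size and client.writer.batch-timeout can be summarised as a small decision function. The Python sketch below is a simplified model under stated assumptions, not the writer's actual code: the name should_send_batch is hypothetical, the defaults mirror the 2mb and 100ms values from the table, and 2mb is assumed to mean 2 * 1024 * 1024 bytes.

```python
def should_send_batch(
    batch_bytes: int,
    waited_ms: int,
    batch_size_bytes: int = 2 * 1024 * 1024,  # assumed byte value of the 2mb default
    batch_timeout_ms: int = 100,  # default client.writer.batch-timeout
) -> bool:
    """Decide whether a per-bucket batch is ready to be sent.

    Hypothetical model for illustration only.
    """
    # A full batch is sent immediately, regardless of the timeout.
    if batch_bytes >= batch_size_bytes:
        return True
    # A partially filled batch waits for more rows, but never longer than the timeout.
    return batch_bytes > 0 and waited_ms >= batch_timeout_ms
```

Under this model, raising the timeout trades latency for fewer, larger requests, while the batch size caps how long a busy bucket ever has to wait.
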
Config by Flink SQL
- When you create a table in Fluss through Flink, you can specify the client configuration in the WITH options of the Flink DDL, like:
CREATE TABLE my_table (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'bucket.num' = '2',
  'client.writer.acks' = 'all'
);
- You can also change the client configuration using Flink SQL hints, like:
INSERT INTO my_table /*+ OPTIONS('client.writer.acks' = '0') */
SELECT *
FROM my_source