Fluss Java Client
Overview
The Fluss Admin API supports asynchronous operations for managing and inspecting Fluss resources. It communicates with the Fluss cluster and provides methods for:
- Managing databases (create, drop, list)
- Managing tables (create, drop, list)
- Managing partitions (create, drop, list)
- Retrieving metadata (schemas, snapshots, server information)
The Fluss Table API allows you to interact with Fluss tables for reading and writing data.
Dependency
To use the client, add the following dependency to your pom.xml file:
<!-- https://mvnrepository.com/artifact/com.alibaba.fluss/fluss-client -->
<dependency>
<groupId>com.alibaba.fluss</groupId>
<artifactId>fluss-client</artifactId>
<version>0.6.0</version>
</dependency>
Initialization
Connection is the main entry point for the Fluss Java client. It is used to create Admin and Table instances.
The Connection object is created using the ConnectionFactory class, which takes a Configuration object as an argument.
The Configuration object contains the parameters required to connect to the Fluss cluster, such as the bootstrap servers.
The Connection object is thread-safe and can be shared across multiple threads. It is recommended to create a single Connection instance per application and use it to create multiple Admin and Table instances.
Table and Admin instances, on the other hand, are not thread-safe and should be created for each thread that needs to access them. Caching or pooling of Table and Admin is not recommended.
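The sharing pattern described above can be sketched with plain Java concurrency utilities. SharedConnection and PerThreadHandle are hypothetical stand-ins for Connection and Table/Admin; they are not Fluss classes. The sketch shows one shared object reused by every task, while each task creates its own short-lived handle instead of borrowing a pooled one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the thread-safe Connection; not a Fluss class.
final class SharedConnection {
    private final AtomicInteger handlesCreated = new AtomicInteger();

    // Returns a fresh handle on every call, mirroring getTable()/getAdmin() being called per thread.
    PerThreadHandle newHandle(String owner) {
        handlesCreated.incrementAndGet();
        return new PerThreadHandle(this, owner);
    }

    int handlesCreated() {
        return handlesCreated.get();
    }
}

// Hypothetical stand-in for a non-thread-safe Table or Admin instance.
final class PerThreadHandle {
    final SharedConnection connection;
    final String owner;

    PerThreadHandle(SharedConnection connection, String owner) {
        this.connection = connection;
        this.owner = owner;
    }
}

final class ThreadingSketch {
    // Every task shares one connection but creates its own handle rather than using a pool.
    static List<PerThreadHandle> runTasks(SharedConnection conn, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<PerThreadHandle>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                String owner = "task-" + i;
                Callable<PerThreadHandle> task = () -> conn.newHandle(owner);
                futures.add(pool.submit(task));
            }
            List<PerThreadHandle> handles = new ArrayList<>();
            for (Future<PerThreadHandle> f : futures) {
                handles.add(f.get());
            }
            return handles;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```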
Create a Connection, then obtain Admin and Table instances from it:
// creating Connection object to connect with Fluss cluster
Configuration conf = new Configuration();
conf.setString("bootstrap.servers", "localhost:9123");
Connection connection = ConnectionFactory.createConnection(conf);
// obtain Admin instance from the Connection
Admin admin = connection.getAdmin();
admin.listDatabases().get().forEach(System.out::println);
// obtain Table instance from the Connection
Table table = connection.getTable(TablePath.of("my_db", "my_table"));
System.out.println(table.getTableInfo());
Working with Operations
All methods in FlussAdmin return CompletableFuture objects. You can handle these in two ways:
Blocking Operations
For synchronous behavior, use the get() method:
// Blocking call
List<String> databases = admin.listDatabases().get();
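A bare get() can block indefinitely, and it wraps any failure in an ExecutionException. The sketch below bounds the wait with get(timeout, unit) and unwraps the underlying cause.

- listDatabases(boolean) is a hypothetical stand-in that returns a prebuilt future instead of contacting a real cluster.
- The timeout value is an arbitrary choice for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BlockingSketch {
    // Hypothetical stand-in for admin.listDatabases(); completes immediately (or fails) here.
    static CompletableFuture<List<String>> listDatabases(boolean fail) {
        if (fail) {
            CompletableFuture<List<String>> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("cluster unavailable"));
            return failed;
        }
        return CompletableFuture.completedFuture(List.of("my_db", "other_db"));
    }

    // Blocks for at most timeoutMs and reports the real failure cause instead of the wrapper.
    static List<String> listBlocking(CompletableFuture<List<String>> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // get() wraps the original exception; surface its message.
            throw new RuntimeException("listDatabases failed: " + e.getCause().getMessage(), e.getCause());
        } catch (TimeoutException e) {
            throw new RuntimeException("listDatabases timed out after " + timeoutMs + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new RuntimeException("interrupted while waiting", e);
        }
    }
}
```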
Asynchronous Operations
For non-blocking behavior, use thenAccept, thenApply, or other CompletableFuture methods:
admin.listDatabases()
.thenAccept(databases -> {
System.out.println("Available databases:");
databases.forEach(System.out::println);
})
.exceptionally(ex -> {
System.err.println("Failed to list databases: " + ex.getMessage());
return null;
});
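A failure can pass through a dependent stage such as thenAccept or thenApply. In that case, the Throwable handed to exceptionally is usually a CompletionException wrapping the original error. So ex.getMessage() may print the wrapper's text (for example "java.lang.IllegalStateException: ...") rather than the underlying message.

The following minimal sketch unwraps it, using a plain CompletableFuture in place of admin.listDatabases():

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class AsyncErrorSketch {
    // Strips the CompletionException wrapper that dependent stages add around the original failure.
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    // Counts databases asynchronously; on failure records the underlying message and yields -1.
    static CompletableFuture<Integer> countDatabases(CompletableFuture<List<String>> source, StringBuilder errors) {
        return source
                .thenApply(List::size)
                .exceptionally(ex -> {
                    errors.append(unwrap(ex).getMessage());
                    return -1;
                });
    }
}
```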
Creating Databases and Tables
Creating a Database
// Create database descriptor
DatabaseDescriptor descriptor = DatabaseDescriptor.builder()
.comment("This is a test database")
.customProperty("owner", "data-team")
.build();
// Create database (true means ignore if exists)
admin.createDatabase("my_db", descriptor, true) // non-blocking call
.thenAccept(unused -> System.out.println("Database created successfully"))
.exceptionally(ex -> {
System.err.println("Failed to create database: " + ex.getMessage());
return null;
});
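Creating a table usually depends on its database existing first. Instead of blocking between the two calls, the futures can be chained with thenCompose.

The sketch assumes a hypothetical SimpleAdmin interface and an in-memory RecordingAdmin fake. They mirror the shape of the two Admin calls with the descriptors omitted, and they are not the Fluss API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for the createDatabase/createTable calls (descriptors omitted).
interface SimpleAdmin {
    CompletableFuture<Void> createDatabase(String db, boolean ignoreIfExists);
    CompletableFuture<Void> createTable(String db, String table, boolean ignoreIfExists);
}

// In-memory fake that records which calls were made, in order.
final class RecordingAdmin implements SimpleAdmin {
    final List<String> calls = new ArrayList<>();
    boolean failDatabase; // set to true to simulate a failed createDatabase call

    @Override
    public CompletableFuture<Void> createDatabase(String db, boolean ignoreIfExists) {
        calls.add("createDatabase " + db);
        if (failDatabase) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("database creation failed"));
            return failed;
        }
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletableFuture<Void> createTable(String db, String table, boolean ignoreIfExists) {
        calls.add("createTable " + db + "." + table);
        return CompletableFuture.completedFuture(null);
    }
}

final class ChainSketch {
    // Creates the table only after the database future succeeds; if it fails, createTable is never called.
    static CompletableFuture<Void> setUp(SimpleAdmin admin) {
        return admin.createDatabase("my_db", true)
                .thenCompose(unused -> admin.createTable("my_db", "user_table", false));
    }
}
```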
Creating a Table
Schema schema = Schema.newBuilder()
.column("id", DataTypes.STRING())
.column("age", DataTypes.INT())
.column("created_at", DataTypes.TIMESTAMP())
.column("is_active", DataTypes.BOOLEAN())
.primaryKey("id")
.build();
// Use the schema in a table descriptor
TableDescriptor tableDescriptor = TableDescriptor.builder()
.schema(schema)
.distributedBy(1, "id") // Distribute by the id column into 1 bucket
// .partitionedBy("") // Partition by the partition key
.build();
TablePath tablePath = TablePath.of("my_db", "user_table");
admin.createTable(tablePath, tableDescriptor, false).get(); // blocking call
TableInfo tableInfo = admin.getTableInfo(tablePath).get(); // blocking call
System.out.println(tableInfo);
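distributedBy(1, "id") routes each row to a bucket based on the value of its id column. The sketch below illustrates the general idea of hash-based bucket assignment: equal keys always map to the same bucket, and with one bucket every row lands in bucket 0.

It uses String.hashCode and Math.floorMod purely for illustration. Fluss's actual bucketing hash function is different and is not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class BucketSketch {
    // Illustrative only: maps a bucket-key value into [0, numBuckets); floorMod avoids negative results.
    static int bucketFor(String bucketKey, int numBuckets) {
        if (numBuckets <= 0) {
            throw new IllegalArgumentException("numBuckets must be positive");
        }
        return Math.floorMod(bucketKey.hashCode(), numBuckets);
    }

    // Groups keys by bucket, showing that equal keys always land in the same bucket.
    static Map<Integer, List<String>> assign(List<String> keys, int numBuckets) {
        Map<Integer, List<String>> buckets = new TreeMap<>();
        for (String key : keys) {
            buckets.computeIfAbsent(bucketFor(key, numBuckets), b -> new ArrayList<>()).add(key);
        }
        return buckets;
    }
}
```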
Table API
Writers
To write data to a Fluss table, first obtain a Table instance from the Connection.
TablePath tablePath = TablePath.of("my_db", "user_table");
Table table = connection.getTable(tablePath);
Fluss supports both Primary Key Tables and Log Tables, so the client provides a different writer for each table type.
Use an UpsertWriter to write data to a Primary Key Table, and an AppendWriter to write data to a Log Table.
table.newUpsert().createWriter();
table.newAppend().createWriter();
Let's take a look at how to write data to a Primary Key table.
List<User> users = List.of(
new User("1", 20, LocalDateTime.now(), true),
new User("2", 22, LocalDateTime.now(), true),
new User("3", 23, LocalDateTime.now(), true),
new User("4", 24, LocalDateTime.now(), true),
new User("5", 25, LocalDateTime.now(), true)
);
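The list above assumes an application-defined User POJO, which is not part of the Fluss client. The following minimal sketch of such a class follows the schema column order (id, age, created_at, is_active). Its getter names are the ones the row-conversion code calls.

```java
import java.time.LocalDateTime;

// Hypothetical application POJO assumed by the example; not part of the Fluss client.
// Field order mirrors the table schema: id, age, created_at, is_active.
final class User {
    private final String id;
    private final int age;
    private final LocalDateTime createdAt;
    private final boolean active;

    public User(String id, int age, LocalDateTime createdAt, boolean active) {
        this.id = id;
        this.age = age;
        this.createdAt = createdAt;
        this.active = active;
    }

    public String getId() { return id; }

    public int getAge() { return age; }

    public LocalDateTime getCreatedAt() { return createdAt; }

    public boolean isActive() { return active; }
}
```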
Note: Currently, data in Fluss is written in the form of rows, so we need to convert our POJO to GenericRow. The Fluss community is working on a more user-friendly API for writing data.
List<GenericRow> rows = users.stream().map(user -> {
GenericRow row = new GenericRow(4);
row.setField(0, BinaryString.fromString(user.getId()));
row.setField(1, user.getAge());
row.setField(2, TimestampNtz.fromLocalDateTime(user.getCreatedAt()));
row.setField(3, user.isActive());
return row;
}).collect(Collectors.toList());
System.out.println("Upserting rows to the table");
UpsertWriter writer = table.newUpsert().createWriter();
// upsert() is a non-blocking call that sends data to the Fluss server with batching and timeouts
rows.forEach(writer::upsert);
// call flush() to block the thread until all data is written successfully
writer.flush();
For a Log Table, use the AppendWriter API to write data.
table.newAppend().createWriter().append(row);
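The practical difference between the two writers is what the table retains. As a conceptual in-memory model only (this is not how Fluss stores data):

- A Primary Key Table keeps one current row per key, so a later upsert with the same key replaces the earlier one.
- A Log Table keeps every appended row in arrival order.

Rows are modeled here as hypothetical {key, value} string pairs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Conceptual model only; rows are hypothetical {key, value} pairs, not Fluss rows.
final class TableSemanticsSketch {
    // Primary Key Table view: one current value per key; later upserts overwrite earlier ones.
    static Map<String, String> applyUpserts(List<String[]> rows) {
        Map<String, String> byKey = new LinkedHashMap<>();
        for (String[] row : rows) {
            byKey.put(row[0], row[1]);
        }
        return byKey;
    }

    // Log Table view: every appended row is kept, in arrival order.
    static List<String> applyAppends(List<String[]> rows) {
        List<String> log = new ArrayList<>();
        for (String[] row : rows) {
            log.add(row[0] + "=" + row[1]);
        }
        return log;
    }
}
```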
Scanner
To read data from a Fluss table, first create a LogScanner instance. Then subscribe to the table buckets and start polling for records.
LogScanner logScanner = table.newScan()
.createLogScanner();
int numBuckets = table.getTableInfo().getNumBuckets();
System.out.println("Number of buckets: " + numBuckets);
for (int i = 0; i < numBuckets; i++) {
System.out.println("Subscribing to bucket " + i);
logScanner.subscribeFromBeginning(i);
}
long scanned = 0;
Map<Integer, List<String>> rowsMap = new HashMap<>();
while (true) {
System.out.println("Polling for records...");
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
for (TableBucket bucket : scanRecords.buckets()) {
for (ScanRecord record : scanRecords.records(bucket)) {
InternalRow row = record.getRow();
// Process the row here, e.g. read fields with row.getString(0) or row.getInt(1)
}
}
scanned += scanRecords.count();
}
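The loop above never terminates. For batch-style reads or tests, one option is to stop after several consecutive empty polls.

- poll is a hypothetical Supplier standing in for logScanner.poll(timeout), with the records flattened into a list.
- With the real API, the equivalent emptiness check would be scanRecords.count() == 0.
- The idle threshold is an arbitrary assumption. An idle period does not guarantee that no more data will arrive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class PollLoopSketch {
    // Stops after maxEmptyPolls consecutive empty polls instead of looping forever.
    // `poll` is a hypothetical stand-in for logScanner.poll(timeout) flattened to a list.
    static <T> List<T> pollUntilIdle(Supplier<List<T>> poll, int maxEmptyPolls) {
        List<T> collected = new ArrayList<>();
        int emptyPolls = 0;
        while (emptyPolls < maxEmptyPolls) {
            List<T> batch = poll.get();
            if (batch.isEmpty()) {
                emptyPolls++;
            } else {
                emptyPolls = 0; // data is still arriving, so reset the idle counter
                collected.addAll(batch);
            }
        }
        return collected;
    }
}
```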
Lookup
You can also use the Fluss API to perform lookups on a table. This is useful for querying specific records based on their primary key.
// rowKey is an InternalRow (e.g. a GenericRow) holding the primary key value(s) to look up
LookupResult lookup = table.newLookup().createLookuper().lookup(rowKey).get();
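Because lookup(...) returns a future, several lookups can be issued before waiting on any of them. The following minimal sketch shows that pattern.

- lookup is a hypothetical Function standing in for lookuper.lookup(rowKey).
- Optional<V> is a simplified stand-in for LookupResult.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class BatchLookupSketch {
    // Issues every lookup first (non-blocking), then waits once and collects results in key order.
    // `lookup` is a hypothetical stand-in for lookuper.lookup(rowKey).
    static <K, V> List<Optional<V>> lookupAll(List<K> keys, Function<K, CompletableFuture<Optional<V>>> lookup) {
        List<CompletableFuture<Optional<V>>> futures = new ArrayList<>();
        for (K key : keys) {
            futures.add(lookup.apply(key)); // requests can be in flight at the same time
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        List<Optional<V>> results = new ArrayList<>();
        for (CompletableFuture<Optional<V>> f : futures) {
            results.add(f.join()); // already complete, so join() does not block here
        }
        return results;
    }
}
```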