public class SimpleCanalConnector extends Object implements CanalConnector
构造器和说明 |
---|
SimpleCanalConnector(SocketAddress address,
String username,
String password,
String destination) |
SimpleCanalConnector(SocketAddress address,
String username,
String password,
String destination,
int soTimeout) |
限定符和类型 | 方法和说明 |
---|---|
void |
ack(long batchId)
进行 batch id 的确认。
|
boolean |
checkValid()
检查下链接是否合法
几种case下链接不合法:
1.
|
void |
connect()
链接对应的canal server
|
void |
disconnect()
释放链接
|
com.alibaba.otter.canal.protocol.Message |
get(int batchSize)
获取数据,自动进行确认,该方法返回的条件:尝试拿batchSize条记录,有多少取多少,不会阻塞等待
|
com.alibaba.otter.canal.protocol.Message |
get(int batchSize,
Long timeout,
TimeUnit unit)
获取数据,自动进行确认
该方法返回的条件:
a.
|
SocketAddress |
getAddress() |
String |
getPassword() |
int |
getSoTimeout() |
String |
getUsername() |
com.alibaba.otter.canal.protocol.Message |
getWithoutAck(int batchSize)
不指定 position 获取事件,该方法返回的条件: 尝试拿batchSize条记录,有多少取多少,不会阻塞等待
canal 会记住此 client 最新的position。 |
com.alibaba.otter.canal.protocol.Message |
getWithoutAck(int batchSize,
Long timeout,
TimeUnit unit)
不指定 position 获取事件.
|
void |
rollback()
回滚到未进行
ack 的地方,下次fetch的时候,可以从最后一个没有 ack 的地方开始拿 |
void |
rollback(long batchId)
回滚到未进行
ack 的地方,指定回滚具体的batchId |
void |
setRollbackOnConnect(boolean rollbackOnConnect) |
void |
setRollbackOnDisConnect(boolean rollbackOnDisConnect) |
void |
setSoTimeout(int soTimeout) |
void |
setZkClientx(com.alibaba.otter.canal.common.zookeeper.ZkClientx zkClientx) |
void |
subscribe()
客户端订阅,不提交客户端filter,以服务端的filter为准
|
void |
subscribe(String filter)
客户端订阅,重复订阅时会更新对应的filter信息
说明:
a.
|
void |
unsubscribe()
取消订阅
|
public SimpleCanalConnector(SocketAddress address, String username, String password, String destination)
public SimpleCanalConnector(SocketAddress address, String username, String password, String destination, int soTimeout)
public void connect() throws com.alibaba.otter.canal.protocol.exception.CanalClientException
CanalConnector
connect
在接口中 CanalConnector
com.alibaba.otter.canal.protocol.exception.CanalClientException
public void disconnect() throws com.alibaba.otter.canal.protocol.exception.CanalClientException
CanalConnector
disconnect
在接口中 CanalConnector
com.alibaba.otter.canal.protocol.exception.CanalClientException
public void subscribe() throws com.alibaba.otter.canal.protocol.exception.CanalClientException
CanalConnector
subscribe
在接口中 CanalConnector
com.alibaba.otter.canal.protocol.exception.CanalClientException
public void subscribe(String filter) throws com.alibaba.otter.canal.protocol.exception.CanalClientException
CanalConnector
说明: a. 如果本次订阅中filter信息为空,则直接使用canal server服务端配置的filter信息 b. 如果本次订阅中filter信息不为空,目前会直接替换canal server服务端配置的filter信息,以本次提交的为准 TODO: 后续可以考虑,如果本次提交的filter不为空,在执行过滤时,是对canal server filter + 本次filter的交集处理,达到只取1份binlog数据,多个客户端消费不同的表
subscribe
在接口中 CanalConnector
com.alibaba.otter.canal.protocol.exception.CanalClientException
public void unsubscribe() throws com.alibaba.otter.canal.protocol.exception.CanalClientException
CanalConnector
unsubscribe
在接口中 CanalConnector
com.alibaba.otter.canal.protocol.exception.CanalClientException
public com.alibaba.otter.canal.protocol.Message get(int batchSize) throws com.alibaba.otter.canal.protocol.exception.CanalClientException
CanalConnector
get
在接口中 CanalConnector
com.alibaba.otter.canal.protocol.exception.CanalClientException
public com.alibaba.otter.canal.protocol.Message get(int batchSize, Long timeout, TimeUnit unit) throws com.alibaba.otter.canal.protocol.exception.CanalClientException
CanalConnector
该方法返回的条件: a. 拿够batchSize条记录或者超过timeout时间 b. 如果timeout=0,则阻塞至拿到batchSize记录才返回
get
在接口中 CanalConnector
com.alibaba.otter.canal.protocol.exception.CanalClientException
public com.alibaba.otter.canal.protocol.Message getWithoutAck(int batchSize) throws com.alibaba.otter.canal.protocol.exception.CanalClientException
CanalConnector
getWithoutAck
在接口中 CanalConnector
com.alibaba.otter.canal.protocol.exception.CanalClientException
public com.alibaba.otter.canal.protocol.Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws com.alibaba.otter.canal.protocol.exception.CanalClientException
CanalConnector
该方法返回的条件: a. 拿够batchSize条记录或者超过timeout时间 b. 如果timeout=0,则阻塞至拿到batchSize记录才返回canal 会记住此 client 最新的position。
getWithoutAck
在接口中 CanalConnector
com.alibaba.otter.canal.protocol.exception.CanalClientException
public void ack(long batchId) throws com.alibaba.otter.canal.protocol.exception.CanalClientException
CanalConnector
ack
在接口中 CanalConnector
com.alibaba.otter.canal.protocol.exception.CanalClientException
public void rollback(long batchId) throws com.alibaba.otter.canal.protocol.exception.CanalClientException
CanalConnector
ack
的地方,指定回滚具体的batchIdrollback
在接口中 CanalConnector
com.alibaba.otter.canal.protocol.exception.CanalClientException
public void rollback() throws com.alibaba.otter.canal.protocol.exception.CanalClientException
CanalConnector
ack
的地方,下次fetch的时候,可以从最后一个没有 ack
的地方开始拿rollback
在接口中 CanalConnector
com.alibaba.otter.canal.protocol.exception.CanalClientException
public boolean checkValid()
CanalConnector
几种case下链接不合法: 1. 链接canal server失败,一直没有一个可用的链接,返回false 2. 当前客户端在进行running抢占的时候,做为备份节点存在,非处于工作节点,返回false 说明: a. 当前客户端一旦做为备份节点存在,当前所有的对CanalConnector的操作都会处于阻塞状态,直到转为工作节点 b. 所以业务方最好定时调用checkValid()方法用,比如调用CanalConnector所在线程的interrupt,直接退出CanalConnector,并根据自己的需要退出自己的资源
checkValid
在接口中 CanalConnector
public SocketAddress getAddress()
public String getUsername()
public String getPassword()
public int getSoTimeout()
public void setSoTimeout(int soTimeout)
public void setZkClientx(com.alibaba.otter.canal.common.zookeeper.ZkClientx zkClientx)
public void setRollbackOnConnect(boolean rollbackOnConnect)
public void setRollbackOnDisConnect(boolean rollbackOnDisConnect)
Copyright © 2013. All rights reserved.