public class ClusterCanalConnector extends Object implements CanalConnector
构造器和说明 |
---|
ClusterCanalConnector(String username,
String password,
String destination,
CanalNodeAccessStrategy accessStrategy) |
限定符和类型 | 方法和说明 |
---|---|
void |
ack(long batchId)
进行 batch id 的确认。
|
boolean |
checkValid()
检查下链接是否合法
几种case下链接不合法:
1.
|
void |
connect()
链接对应的canal server
|
void |
disconnect()
释放链接
|
com.alibaba.otter.canal.protocol.Message |
get(int batchSize)
获取数据,自动进行确认,该方法返回的条件:尝试拿batchSize条记录,有多少取多少,不会阻塞等待
|
com.alibaba.otter.canal.protocol.Message |
get(int batchSize,
Long timeout,
TimeUnit unit)
获取数据,自动进行确认
该方法返回的条件:
a.
|
CanalNodeAccessStrategy |
getAccessStrategy() |
SimpleCanalConnector |
getCurrentConnector() |
String |
getPassword() |
int |
getRetryInterval() |
int |
getRetryTimes() |
int |
getSoTimeout() |
String |
getUsername() |
com.alibaba.otter.canal.protocol.Message |
getWithoutAck(int batchSize)
不指定 position 获取事件,该方法返回的条件: 尝试拿batchSize条记录,有多少取多少,不会阻塞等待
canal 会记住此 client 最新的position。 |
com.alibaba.otter.canal.protocol.Message |
getWithoutAck(int batchSize,
Long timeout,
TimeUnit unit)
不指定 position 获取事件.
|
void |
rollback()
回滚到未进行
ack 的地方,下次fetch的时候,可以从最后一个没有 ack 的地方开始拿 |
void |
rollback(long batchId)
回滚到未进行
ack 的地方,指定回滚具体的batchId |
void |
setAccessStrategy(CanalNodeAccessStrategy accessStrategy) |
void |
setPassword(String password) |
void |
setRetryInterval(int retryInterval) |
void |
setRetryTimes(int retryTimes) |
void |
setSoTimeout(int soTimeout) |
void |
setUsername(String username) |
void |
subscribe()
客户端订阅,不提交客户端filter,以服务端的filter为准
|
void |
subscribe(String filter)
客户端订阅,重复订阅时会更新对应的filter信息
说明:
a.
|
void |
unsubscribe()
取消订阅
|
public ClusterCanalConnector(String username, String password, String destination, CanalNodeAccessStrategy accessStrategy)
public void connect() throws com.alibaba.otter.canal.protocol.exception.CanalClientException
CanalConnector
connect
在接口中 CanalConnector
com.alibaba.otter.canal.protocol.exception.CanalClientException
public boolean checkValid()
CanalConnector
几种case下链接不合法: 1. 链接canal server失败,一直没有一个可用的链接,返回false 2. 当前客户端在进行running抢占的时候,做为备份节点存在,非处于工作节点,返回false 说明: a. 当前客户端一旦做为备份节点存在,当前所有的对CanalConnector的操作都会处于阻塞状态,直到转为工作节点 b. 所以业务方最好定时调用checkValid()方法用,比如调用CanalConnector所在线程的interrupt,直接退出CanalConnector,并根据自己的需要退出自己的资源
checkValid
在接口中 CanalConnector
public void disconnect() throws com.alibaba.otter.canal.protocol.exception.CanalClientException
CanalConnector
disconnect
在接口中 CanalConnector
com.alibaba.otter.canal.protocol.exception.CanalClientException
public void subscribe() throws com.alibaba.otter.canal.protocol.exception.CanalClientException
CanalConnector
subscribe
在接口中 CanalConnector
com.alibaba.otter.canal.protocol.exception.CanalClientException
public void subscribe(String filter) throws com.alibaba.otter.canal.protocol.exception.CanalClientException
CanalConnector
说明: a. 如果本次订阅中filter信息为空,则直接使用canal server服务端配置的filter信息 b. 如果本次订阅中filter信息不为空,目前会直接替换canal server服务端配置的filter信息,以本次提交的为准 TODO: 后续可以考虑,如果本次提交的filter不为空,在执行过滤时,是对canal server filter + 本次filter的交集处理,达到只取1份binlog数据,多个客户端消费不同的表
subscribe
在接口中 CanalConnector
com.alibaba.otter.canal.protocol.exception.CanalClientException
public void unsubscribe() throws com.alibaba.otter.canal.protocol.exception.CanalClientException
CanalConnector
unsubscribe
在接口中 CanalConnector
com.alibaba.otter.canal.protocol.exception.CanalClientException
public com.alibaba.otter.canal.protocol.Message get(int batchSize) throws com.alibaba.otter.canal.protocol.exception.CanalClientException
CanalConnector
get
在接口中 CanalConnector
com.alibaba.otter.canal.protocol.exception.CanalClientException
public com.alibaba.otter.canal.protocol.Message get(int batchSize, Long timeout, TimeUnit unit) throws com.alibaba.otter.canal.protocol.exception.CanalClientException
CanalConnector
该方法返回的条件: a. 拿够batchSize条记录或者超过timeout时间 b. 如果timeout=0,则阻塞至拿到batchSize记录才返回
get
在接口中 CanalConnector
com.alibaba.otter.canal.protocol.exception.CanalClientException
public com.alibaba.otter.canal.protocol.Message getWithoutAck(int batchSize) throws com.alibaba.otter.canal.protocol.exception.CanalClientException
CanalConnector
getWithoutAck
在接口中 CanalConnector
com.alibaba.otter.canal.protocol.exception.CanalClientException
public com.alibaba.otter.canal.protocol.Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws com.alibaba.otter.canal.protocol.exception.CanalClientException
CanalConnector
该方法返回的条件: a. 拿够batchSize条记录或者超过timeout时间 b. 如果timeout=0,则阻塞至拿到batchSize记录才返回canal 会记住此 client 最新的position。
getWithoutAck
在接口中 CanalConnector
com.alibaba.otter.canal.protocol.exception.CanalClientException
public void rollback(long batchId) throws com.alibaba.otter.canal.protocol.exception.CanalClientException
CanalConnector
ack
的地方,指定回滚具体的batchIdrollback
在接口中 CanalConnector
com.alibaba.otter.canal.protocol.exception.CanalClientException
public void rollback() throws com.alibaba.otter.canal.protocol.exception.CanalClientException
CanalConnector
ack
的地方,下次fetch的时候,可以从最后一个没有 ack
的地方开始拿rollback
在接口中 CanalConnector
com.alibaba.otter.canal.protocol.exception.CanalClientException
public void ack(long batchId) throws com.alibaba.otter.canal.protocol.exception.CanalClientException
CanalConnector
ack
在接口中 CanalConnector
com.alibaba.otter.canal.protocol.exception.CanalClientException
public String getUsername()
public void setUsername(String username)
public String getPassword()
public void setPassword(String password)
public int getSoTimeout()
public void setSoTimeout(int soTimeout)
public int getRetryTimes()
public void setRetryTimes(int retryTimes)
public int getRetryInterval()
public void setRetryInterval(int retryInterval)
public CanalNodeAccessStrategy getAccessStrategy()
public void setAccessStrategy(CanalNodeAccessStrategy accessStrategy)
public SimpleCanalConnector getCurrentConnector()
Copyright © 2013. All rights reserved.