#YStream客户端使用介绍

# 连接YStream服务端

# 创建YStream客户端配置类

YStream提供一系列柯里化(Currying)构造方法,用于生成连接配置。

YstreamConfig.<String>builder().build()

参数说明如下表所示。

参数 必填/选填 描述
serverName 必填 数据库实例所在服务器IP地址或域名。
port 必填 数据库服务端口。
user 必填 数据库用户名。
password 必填 数据库用户密码。
database 必填 数据库名称(保留参数,但不校验内容)。
deserializer 必填 反序列化器实现,用于指定YStream客户端输出的内容,提供默认实现 DefaultDeserializer。
startMode 选填 启动模式,分为第一次启动(StartMode.START)和断点续传(StartMode.RECOVER),默认值为StartMode.START。
queueSize 选填 YStream客户端内置阻塞队列的长度,获取增量逻辑日志时直接从该队列获取,默认值为128。
pollTimeout 选填 从阻塞队列中获取下一个结果的超时时间(单位:秒),默认值为10。
clientResponseTimeout 选填 YStream服务端等待YStream客户端响应的最长时间(单位:秒),默认值为60。
recoverPosition 选填 StartMode.RECOVER模式必填,用于指定断点续传的点位。

# 实现Deserializer接口

YStream客户端提供将逻辑日志的内容反序列化为调用者指定类型的接口,调用者可根据实际情况自行实现该接口或直接使用YStream客户端提供的接口。

# 与YStream服务端建立连接

YStream客户端提供如下方法建立与YStream服务端的连接:

YstreamClientBoot.getClient().open(YstreamConfig<T> ystreamConfig)

示例

public static YstreamClientBoot getClient() throws YstreamException {
  // 获取YStream客户端实例
  YstreamClientBoot<String> client = YstreamClientBoot.getClient();
  // 创建YStream客户端配置类
  YstreamConfig<String> config = YstreamConfig.<String>builder()
      .setHost(HOST)
      .setPort(PORT)
      .setUser(USER)
      .setPassword(PASSWORD)
      .setServerName(CONNECTION_NAME)
      .setDeserializer(new StringDeserializer())
      .build();
  // 创建与YStream服务端的连接
  client.open(config);
  return client;
}

# 获取增量逻辑日志

调用YStream客户端的next方法获取增量逻辑日志。

client.next();

# 设置已提交的position

每一条逻辑日志都带有一个唯一且递增的position。逻辑日志的position不会随着进程重启而变化,可以作为断点续传的接续点。

YStream客户端收到增量逻辑日志并确定无需再使用该逻辑日志后(例如该逻辑日志已在目标端应用提交),可以调用setAppliedPosition方法标记已经提交的逻辑日志位置,以便数据库推进YStream重启恢复点,清理无用的归档日志。若长时间未调用setAppliedPosition更新applied position,会导致数据库无法推进YStream重启恢复点,无法及时清理无用的归档日志。

client.setAppliedPosition(appliedPosition);

# 关闭YStream服务端连接

当不再需要获取增量逻辑日志时,可调用YStream客户端的close方法关闭与YStream服务端的连接。

client.close();