# YStream客户端使用示例
# 配置数据库YStream Server
创建数据库用户ystream_user,并赋予YSTREAM_CAPTURE权限。
在YashanDB主库开启附加日志,附加日志的开启有两种方式:
表级附加日志:
ALTER TABLE tablename ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY或ALL) COLUMNS;
库级附加日志:
ALTER DATABASE ADD SUPPLEMENTAL LOG TABLE TYPE (HEAP); ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
创建YStream Server,此处使用数据库当前SCN。
EXEC DBMS_YSTREAM_ADM.CREATE('server_1', 'ystream_user', null);
指定要解析的表名,此处指定2张表。
EXEC DBMS_YSTREAM_ADM.ADD_TABLES('server_1', '"sales"."employees","sales"."department"', null);
启动YStream Server。
EXEC DBMS_YSTREAM_ADM.START('server_1');
配置客户端参数,启动YStream解析,客户端示例如下。
# 基于YStream接口开发应用程序
package ystream0;
// Ystreamexample.java
// 演示基于YStream开发的主要步骤
import com.sics.ystream.YstreamClientBoot;
import com.sics.ystream.conf.StartMode;
import com.sics.ystream.conf.YstreamConfig;
import com.sics.ystream.result.LogPosition;
import com.sics.ystream.result.Position;
import com.sics.ystream.result.SystemChangeNumber;
import java.util.Arrays;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.stream.Collectors;
/**
 * Demonstrates the main steps of building an application on the YStream client API: build a
 * configuration, open the connection, pull deserialized change records in a loop and periodically
 * report the applied position back through the client.
 *
 * <p>Command line: {@code Ystreamexample [isRecover [scn logPos1 logPos2 logPos3 logPos4]]}. With
 * no arguments, or when the first argument is not {@code true}, the client starts in {@code
 * StartMode.START}. When the first argument is {@code true}, the five following arguments are
 * required and are used to build the recover {@link Position}.
 */
public class Ystreamexample {
  private static final String HOST = "192.168.1.2";
  private static final String PORT = "1688";
  private static final String USER = "ystream_user";
  private static final String PASSWORD = "ystream_password";
  private static final String SERVER_NAME = "server_1";

  /** Arguments required in recover mode: the flag itself plus five position fields. */
  private static final int RECOVER_ARG_COUNT = 6;

  /** Interval, in milliseconds, between two applied-position reports. */
  private static final long REPORT_PERIOD_MILLIS = 3000L;

  private static final String USAGE =
      "Usage: Ystreamexample [isRecover [scn logPos1 logPos2 logPos3 logPos4]]";

  /**
   * Runs the sample client until an exception interrupts the read loop.
   *
   * @param args optional command-line arguments, see the class description
   * @throws IllegalArgumentException if recover mode is requested but the position arguments are
   *     missing or not numeric
   */
  public static void main(String[] args) {
    // Parse the command line first so that a bad invocation fails before anything is created.
    // No arguments at all means a normal (non-recover) start.
    List<String> argList = Arrays.asList(args);
    boolean isRecover = !argList.isEmpty() && Boolean.parseBoolean(argList.get(0));
    Position recoverPosition = isRecover ? parseRecoverPosition(argList) : null;

    // Obtain the YStream client instance.
    YstreamClientBoot<String> client = YstreamClientBoot.getClient();

    // Build the client configuration.
    CustomDeserializer customDeserializer = new CustomDeserializer();
    YstreamConfig.Builder<String> configBuilder = YstreamConfig.<String>builder();
    configBuilder
        .setHost(HOST)
        .setPort(PORT)
        .setUser(USER)
        .setPassword(PASSWORD)
        .setServerName(SERVER_NAME)
        .setStartMode(isRecover ? StartMode.RECOVER : StartMode.START)
        .setDeserializer(customDeserializer);
    if (isRecover) {
      configBuilder.setRecoverPosition(recoverPosition);
    }
    YstreamConfig<String> config = configBuilder.build();

    // Every 3 seconds hand the last deserialized position to the client. The original sample
    // describes this as cleaning up logical logs that have already been consumed.
    // The timer thread is a daemon so it can never keep the JVM alive on its own.
    Timer timer = new Timer("ystream-applied-position", true);
    TimerTask task =
        new TimerTask() {
          @Override
          public void run() {
            Position currentPosition = customDeserializer.getCurrentPosition();
            // Nothing has been deserialized yet, so there is nothing to report.
            if (currentPosition != null) {
              client.setAppliedPosition(currentPosition);
            }
          }
        };
    timer.scheduleAtFixedRate(task, 0L, REPORT_PERIOD_MILLIS);

    try {
      // Open the connection to the YStream server.
      client.open(config);
      customDeserializer.setCharset(client.getCharset());
      customDeserializer.setNationalCharset(client.getNationalCharset());
      while (true) {
        // Fetch the next incremental logical log record and print it when present.
        String next = client.next();
        if (next != null) {
          System.out.println(next);
        }
      }
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      // Cancelling the timer also discards its scheduled task.
      timer.cancel();
      // Close the connection to the YStream server.
      client.close();
    }
  }

  /**
   * Builds the recover position from command-line arguments 1 to 5.
   *
   * @param argList the full argument list; index 0 is the recover flag
   * @return the position parsed from the arguments
   * @throws IllegalArgumentException if fewer than six arguments are present or one of the
   *     position fields is not a valid number of the expected width
   */
  private static Position parseRecoverPosition(List<String> argList) {
    if (argList.size() < RECOVER_ARG_COUNT) {
      throw new IllegalArgumentException(
          "Recover mode needs "
              + RECOVER_ARG_COUNT
              + " arguments but got "
              + argList.size()
              + ". "
              + USAGE);
    }
    try {
      return new Position(
          new SystemChangeNumber(Long.parseLong(argList.get(1))),
          new LogPosition(
              Byte.parseByte(argList.get(2)),
              Long.parseLong(argList.get(3)),
              Integer.parseInt(argList.get(4)),
              Integer.parseInt(argList.get(5))));
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException(
          "Recover position arguments must be numeric: " + argList + ". " + USAGE, e);
    }
  }
}
package ystream0;
// CustomDeserializer
// 演示实现Deserializer接口的自定义反序列化器反序列化为SQL
import com.sics.ystream.deserializer.Deserializer;
import com.sics.ystream.exception.YstreamException;
import com.sics.ystream.exception.YstreamIoException;
import com.sics.ystream.exception.YstreamSqlException;
import com.sics.ystream.metadata.Column;
import com.sics.ystream.metadata.ColumnType;
import com.sics.ystream.metadata.TableMetadata;
import com.sics.ystream.result.Position;
import com.sics.ystream.result.YstreamChunk;
import com.sics.ystream.result.YstreamColumn;
import com.sics.ystream.result.YstreamColumns;
import com.sics.ystream.result.YstreamDdl;
import com.sics.ystream.result.YstreamDml;
import com.sics.ystream.result.YstreamLcrInterface;
import com.sics.ystream.result.YstreamXactInterface;
import com.sics.ystream.util.ChunkUtil;
import com.sics.ystream.util.YstreamUtil;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
/**
 * Sample {@link Deserializer} that turns YStream logical change records into SQL text.
 *
 * <p>DDL and DML records without chunk data are rendered immediately. A DML record that reports
 * chunk data is cached; the chunk records that follow are accumulated per column and the statement
 * is rendered once every chunked column has been completed.
 *
 * <p>The instance keeps per-row state in plain fields, so {@link #deserialize} is not safe to call
 * concurrently. Only {@link #getCurrentPosition()} is meant to be read from another thread (the
 * sample main class polls it from a {@code Timer} thread), which is why that field is volatile.
 */
public class CustomDeserializer implements Deserializer<String> {
  private Charset charset;
  private Charset nationalCharset;

  /** DML record waiting for its chunk data, or {@code null} when no row is pending. */
  private YstreamDml lcrCache = null;

  /** Raw pieces collected so far for the chunked column currently being received. */
  private final List<byte[]> chunkList = new ArrayList<>();

  /** Rendered new-value literals of the pending row, ordered by column id. */
  private final Map<Integer, String> map = new TreeMap<>();

  /** Number of chunked columns of the pending row that have not been completed yet. */
  private int chunkDataCount;

  /** Position of the most recently processed record; read by a different thread. */
  private volatile Position currentPosition;

  public void setCharset(Charset charset) {
    this.charset = charset;
  }

  public void setNationalCharset(Charset nationalCharset) {
    this.nationalCharset = nationalCharset;
  }

  /**
   * Returns the position of the last record seen by {@link #deserialize}, or {@code null} if no
   * position-carrying record has been processed yet.
   */
  public Position getCurrentPosition() {
    return currentPosition;
  }

  /**
   * Converts one logical change record into SQL text.
   *
   * @param lcr the record to convert
   * @param tableMetadata metadata passed by the client; not used by this sample
   * @return the SQL text for the record, or an empty string when the record produces no output on
   *     its own (metadata, a DML still waiting for chunks, or an intermediate chunk)
   * @throws YstreamException if a column value cannot be rendered or chunk bytes cannot be joined
   * @throws UnsupportedOperationException for an unknown record or transaction type
   * @throws IllegalStateException if a chunk arrives while no DML record is pending
   */
  @Override
  public String deserialize(YstreamLcrInterface lcr, TableMetadata tableMetadata)
      throws YstreamException {
    switch (lcr.getLcrType()) {
      case YSTREAM_DDL:
        YstreamDdl ddl = (YstreamDdl) lcr;
        currentPosition = ddl.getPosition();
        return ddl.getDdlText() + ";";
      case YSTREAM_DML:
        return handleDml((YstreamDml) lcr);
      case YSTREAM_CHUNK:
        return handleChunk((YstreamChunk) lcr);
      case YSTREAM_XACT:
        final YstreamXactInterface xact = (YstreamXactInterface) lcr;
        switch (xact.getXactType()) {
          case BEGIN:
            return "begin;";
          case COMMIT:
            currentPosition = xact.getPosition();
            return "commit;";
          default:
            throw new UnsupportedOperationException(String.format("Not support data, %s.", xact));
        }
      case YSTREAM_METADATA:
        return "";
      default:
        throw new UnsupportedOperationException(
            String.format("lcr type error: %s.", lcr.getLcrType().toString()));
    }
  }

  /** Renders a DML record directly, or caches it when its values arrive later as chunks. */
  private String handleDml(YstreamDml dml) throws YstreamException {
    currentPosition = dml.getPosition();
    if (!dml.hasChunkData()) {
      return dml.getStatement(false) + ";";
    }
    // Start from a clean slate so leftovers of an unfinished earlier row cannot leak in.
    resetPendingRow();
    lcrCache = dml;
    chunkDataCount = dml.getChunkDataCount();
    assert chunkDataCount > 0;
    return "";
  }

  /**
   * Accumulates one chunk record and renders the pending statement once all chunked columns are
   * complete.
   */
  private String handleChunk(YstreamChunk chunk) throws YstreamException {
    currentPosition = chunk.getPosition();
    if (lcrCache == null) {
      throw new IllegalStateException(
          "Received a chunk record but no DML record with chunk data is pending.");
    }
    if (chunkDataCount > 0) {
      if (!chunk.isEnd()) {
        chunkList.add(chunk.getBytes());
      } else {
        // NOTE(review): as in the original sample, the bytes of the end chunk itself are not
        // appended; this assumes the end chunk is a pure terminator — confirm against the SDK.
        try {
          map.put(
              chunk.getColumn().getColumnId(),
              concat(chunkList, chunk.getColumn(), charset, nationalCharset));
        } catch (IOException e) {
          throw new YstreamIoException(e);
        }
        // The pieces belong to the column just finished; the next column starts empty.
        chunkList.clear();
        chunkDataCount--;
      }
    }
    if (chunkDataCount > 0) {
      // Some chunked column is still incomplete, so the statement cannot be rendered yet.
      // NOTE(review): assumes getChunkDataCount() is the number of chunked columns, matching the
      // one-decrement-per-end-chunk logic — confirm against the SDK.
      return "";
    }
    try {
      return renderPendingStatement();
    } finally {
      resetPendingRow();
    }
  }

  /** Fills the placeholders of the cached DML with the new values followed by the old values. */
  private String renderPendingStatement() throws YstreamException {
    String template = lcrCache.getStatement(true);
    YstreamColumns newValues = lcrCache.getNewValues();
    for (YstreamColumn column : newValues.getColumns()) {
      // Out-of-row columns were already filled in from their chunks.
      if (!column.isOutRow()) {
        map.put(column.getColumn().getColumnId(), getDataString(column));
      }
    }
    // TreeMap iteration yields the new values in ascending column-id order.
    List<String> values = new ArrayList<>(map.values());
    for (YstreamColumn column : lcrCache.getOldValues().getColumns()) {
      values.add(getDataString(column));
    }
    return bindPlaceholders(template, values) + "\n";
  }

  /**
   * Replaces the {@code ?} characters of {@code template}, left to right, with {@code values}.
   *
   * <p>The template is scanned exactly once, so a {@code ?} inside an inserted value is never
   * treated as a placeholder, and values are inserted literally. Surplus {@code ?} characters are
   * kept and surplus values are ignored. A {@code null} value is rendered as the text {@code null}.
   */
  private static String bindPlaceholders(String template, List<String> values) {
    StringBuilder bound = new StringBuilder(template.length());
    int nextValue = 0;
    for (int i = 0; i < template.length(); i++) {
      char c = template.charAt(i);
      if (c == '?' && nextValue < values.size()) {
        bound.append(values.get(nextValue++));
      } else {
        bound.append(c);
      }
    }
    return bound.toString();
  }

  /** Drops all state that belongs to the pending row. */
  private void resetPendingRow() {
    lcrCache = null;
    chunkDataCount = 0;
    chunkList.clear();
    map.clear();
  }

  /** Renders a column value, single-quoting character, BLOB and CLOB types. */
  private static String getDataString(YstreamColumn column) throws YstreamSqlException {
    int dataType = column.getColumn().getDataType();
    if (ColumnType.isCharacterType(dataType)
        || ColumnType.isBlobType(dataType)
        || ColumnType.isClobType(dataType)) {
      return YstreamUtil.singleQuotation(column.getString());
    }
    return column.getString();
  }

  /** Joins the collected chunk pieces, decodes them for {@code column} and single-quotes them. */
  private static String concat(
      List<byte[]> chunkList, Column column, Charset charset, Charset nCharset)
      throws IOException, YstreamSqlException {
    ByteArrayOutputStream stream = new ByteArrayOutputStream();
    for (byte[] bytes : chunkList) {
      stream.write(bytes);
    }
    return YstreamUtil.singleQuotation(
        ChunkUtil.parseString(stream.toByteArray(), column, charset, nCharset));
  }
}