聊聊claudb的SlaveReplication
时间:2022-07-24
本文章向大家介绍聊聊claudb的SlaveReplication,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
序
本文主要研究一下claudb的SlaveReplication
SlaveReplication
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/replication/SlaveReplication.java
public class SlaveReplication implements RespCallback {
private static final DatabaseKey MASTER_KEY = safeKey("master");
private static final Logger LOGGER = LoggerFactory.getLogger(SlaveReplication.class);
private static final String SYNC_COMMAND = "SYNC";
private final RespClient client;
private final DBServerContext server;
private final DBCommandProcessor processor;
private final String host;
private final int port;
public SlaveReplication(DBServerContext server, Session session, String host, int port) {
this.server = server;
this.host = host;
this.port = port;
this.client = new RespClient(host, port, this);
this.processor = new DBCommandProcessor(server, session);
}
public void start() {
client.start();
server.setMaster(false);
server.getAdminDatabase().put(MASTER_KEY, createState(false));
}
public void stop() {
client.stop();
server.setMaster(true);
}
@Override
public void onConnect() {
LOGGER.info("Connected with master");
client.send(array(string(SYNC_COMMAND)));
server.getAdminDatabase().put(MASTER_KEY, createState(true));
}
@Override
public void onDisconnect() {
LOGGER.info("Disconnected from master");
server.getAdminDatabase().put(MASTER_KEY, createState(false));
}
@Override
public void onMessage(RedisToken token) {
token.accept(RedisTokenVisitor.builder()
.onString(string -> {
processRDB(string);
return null;
})
.onArray(array -> {
processor.processCommand(array);
return null;
}).build());
}
private void processRDB(StringRedisToken token) {
try {
SafeString value = token.getValue();
server.importRDB(toStream(value));
LOGGER.info("loaded RDB file from master");
} catch (IOException e) {
LOGGER.error("error importing RDB file", e);
}
}
private InputStream toStream(SafeString value) {
return new ByteBufferInputStream(value.getBytes());
}
private DatabaseValue createState(boolean connected) {
return hash(entry(safeString("host"), safeString(host)),
entry(safeString("port"), safeString(valueOf(port))),
entry(safeString("state"), safeString(connected ? "connected" : "disconnected")));
}
}
- SlaveReplication实现了RespCallback接口,其构造器创建RespClient、DBCommandProcessor;其start方法执行client.start(),标记server为slave及master connect为false;其stop方法执行client.stop()及标记server为master;其onConnect方法执行client.send(array(string(SYNC_COMMAND)))并标记master connect为true;其onDisconnect标记master connect为false;其onMessage方法在onString时执行processRDB,在onArray时执行processor.processCommand(array);processRDB方法执行server.importRDB(toStream(value))
DBCommandProcessor
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/DBCommandProcessor.java
public class DBCommandProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(DBCommandProcessor.class);
private final DBServerContext server;
private final Session session;
public DBCommandProcessor(DBServerContext server) {
this(server, new DefaultSession("dummy", null));
}
public DBCommandProcessor(DBServerContext server, Session session) {
this.server = server;
this.session = session;
}
public void processCommand(ArrayRedisToken token) {
Sequence<RedisToken> array = token.getValue();
StringRedisToken commandToken = (StringRedisToken) array.stream().findFirst().orElse(nullString());
List<RedisToken> paramTokens = array.stream().skip(1).collect(toList());
LOGGER.debug("new command recieved: {}", commandToken);
RespCommand command = server.getCommand(commandToken.getValue().toString());
if (command != null) {
command.execute(request(commandToken, paramTokens));
}
}
private Request request(StringRedisToken commandToken, List<RedisToken> array) {
return new DefaultRequest(server, session, commandToken.getValue(), arrayToList(array));
}
private ImmutableArray<SafeString> arrayToList(List<RedisToken> request) {
RedisTokenVisitor<SafeString> visitor = RedisTokenVisitor.<SafeString>builder()
.onString(StringRedisToken::getValue).build();
return ImmutableArray.from(visit(request.stream(), visitor));
}
}
- DBCommandProcessor的processCommand方法解析commandToken及paramTokens,然后通过server.getCommand(commandToken.getValue().toString())找到对应的RespCommand,然后执行command.execute(request(commandToken, paramTokens))
小结
SlaveReplication实现了RespCallback接口,其构造器创建RespClient、DBCommandProcessor;其start方法执行client.start(),标记server为slave及master connect为false;其stop方法执行client.stop()及标记server为master;其onConnect方法执行client.send(array(string(SYNC_COMMAND)))并标记master connect为true;其onDisconnect标记master connect为false;其onMessage方法在onString时执行processRDB,在onArray时执行processor.processCommand(array);processRDB方法执行server.importRDB(toStream(value))
doc
- SlaveReplication
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 用 Redis 散列实现短网址生成器|文末福利
- 原创|面试官:Java对象一定分配在堆上吗?
- 频繁FGC的真凶原来是它
- 类加载器知识点吐血整理
- ThreadPoolExecutor 线程池"源码分析"
- 一起刷 leetcode 之螺旋矩阵(头条和美团真题)
- 如何快速判断一个用户是否访问过我们的 APP?
- replication-manager之switchover剖析
- 组复制安装部署 | 全方位认识 MySQL 8.0 Group Replication
- 提升低端设备的 Web 性能
- TypeScript 4.0 RC发布,带来诸多更新
- istio mcp实现探究
- K8S 生态周报| Helm v2 进入维护期倒计时
- Halcon实例转OpenCV:计算回形针方向
- OpenCV检测轮廓极点(Python C++)