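Analysis of ONOS Cluster Leader Election
First, a brief introduction: I previously worked on Floodlight controller development. With ODL and ONOS both growing so quickly, I felt I would fall behind if I didn't learn something about them, so I found some spare time to read parts of the ONOS source. I got a lot out of it and don't want to keep it to myself, so consider this post a starting point for discussion.
The only part of ONOS I have read carefully is clustering, though I have skimmed the source of the other modules as well. Some of the elegant code in ONOS can be reused directly in other projects. Take shortest-path computation: Floodlight embeds its implementation in a specific module and does not support multipath, whereas ONOS provides three shortest-path algorithms, supports multipath natively, and is cleanly modularized. I have borrowed parts of the ONOS design for projects at my company. In addition, the gain in expressiveness from Java 7 to Java 8 is on full display in ONOS: for modules with similar functionality, the Floodlight implementation is far more verbose than the ONOS one.
In short, the overall code quality of ONOS is much higher than Floodlight's.
I plan to write this up as a series. A rough outline: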
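- Cluster leader election
- Overview of the Raft implementation in ONOS
- The gossip protocol implementation in ONOS
- Basic cluster primitives: ONOS supports a distributed ConcurrentHashMap, AtomicCounter, Set, and so on
- Designs and code that can be reused in other projects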
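This post traces the code path of ONOS cluster leader election.
Overview of Cluster Protocols
For cluster election, ONOS uses the Raft protocol. I don't know why it doesn't use Paxos, but there is a growing tendency to choose Raft over Paxos, mainly because Raft is simpler.
By "tendency" I mean the number of existing Raft implementations compared with Paxos implementations. I also noticed that Harvard, Stanford, and CMU have replaced Paxos with Raft in their distributed systems courses (see here for the reasons). Raft also has an official website with a wealth of resources, whereas Paxos only has papers. So overall, Raft is gradually becoming mainstream.
As for Paxos-based implementations, the ones usually cited are ZooKeeper and Ceph. ZooKeeper, however, is not a strict Paxos implementation; it uses ZAB, a modified protocol. Tencent has only recently open-sourced phxpaxos, its own Paxos implementation. With usable implementations this scarce, it is no surprise that a large number of distributed projects depend on ZooKeeper.
As for Raft, I went through the list of implementations on the official Raft website and found that the only complete Java implementations are copycat, jraft, jgroups-raft, RaftKVDatabase/JSimpleDB, and C5 replicator. (C++ and Go have five each as well.)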
- jraft: lacks documentation
- JSimpleDB: is not solely a Raft implementation
- C5 replicator: implements the Raft protocol
- jgroups-raft: implements the Raft protocol
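Note that these projects all have few stars and are probably not mature enough for production use. Unlike the others, copycat also tests its Raft implementation with Jepsen. Evidently, building a complete, usable Raft implementation is still very challenging.
ONOS chose copycat as its Raft implementation, and based on the analysis above that choice is sound.
ONOS Cluster Election
Note: this post is based on the onos 1.6 branch.
ONOS exposes a set of interfaces for cluster election, shown below.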
public interface LeaderElector extends DistributedPrimitive {
Leadership run(String topic, NodeId nodeId);
void withdraw(String topic);
boolean anoint(String topic, NodeId nodeId);
boolean promote(String topic, NodeId nodeId);
void evict(NodeId nodeId);
Leadership getLeadership(String topic);
Map<String, Leadership> getLeaderships();
void addChangeListener(Consumer<Change<Leadership>> consumer);
void removeChangeListener(Consumer<Change<Leadership>> consumer);
}
That is: run, withdraw, anoint, promote, and evict. The documentation explains their usage very clearly, so I quote it directly here.
Distributed mutual exclusion primitive.
AsyncLeaderElector facilitates mutually exclusive access to a shared
resource by various cluster members.
Each resource is identified by a unique topic name and members register
their desire to access the resource by calling the AsyncLeaderElector's
run method. Access is granted on a FIFO basis.
An instance can unregister itself from the leadership election by calling
AsyncLeaderElector's withdraw method.
If an instance currently holding the resource dies then the next instance
waiting to be leader (in FIFO order) will be automatically granted access
to the resource.
One can register listeners to be notified when a leadership change occurs.
The Listeners are notified via a Leadership Change subject.
Additionally, AsyncLeaderElector provides methods to query the current
state of leadership for topics.
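To make the FIFO semantics in the quoted documentation concrete, here is a minimal single-process sketch. It is not ONOS code: the class FifoElectionSketch and its methods are hypothetical, node ids are plain strings, and withdraw takes the node id explicitly (the real withdraw(String topic) infers the caller). It only illustrates that the first candidate to run for a topic leads, and that the next one in line takes over when the leader withdraws.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Toy, single-process model of FIFO leader election; an illustration, not ONOS code. */
public class FifoElectionSketch {
    // topic -> candidates in arrival order; the head of the queue is the leader
    private final Map<String, Deque<String>> candidates = new HashMap<>();

    /** Registers nodeId as a candidate for topic and returns the current leader. */
    public String run(String topic, String nodeId) {
        Deque<String> queue = candidates.computeIfAbsent(topic, t -> new ArrayDeque<>());
        if (!queue.contains(nodeId)) {
            queue.addLast(nodeId);
        }
        return queue.peekFirst();
    }

    /** Removes nodeId from the election for topic; the next candidate in line takes over. */
    public void withdraw(String topic, String nodeId) {
        Deque<String> queue = candidates.get(topic);
        if (queue != null) {
            queue.remove(nodeId);
        }
    }

    /** Returns the current leader for topic, or null if nobody is running. */
    public String leader(String topic) {
        Deque<String> queue = candidates.get(topic);
        return queue == null ? null : queue.peekFirst();
    }

    public static void main(String[] args) {
        FifoElectionSketch elector = new FifoElectionSketch();
        elector.run("device:1", "node-a");
        elector.run("device:1", "node-b");
        System.out.println(elector.leader("device:1")); // node-a
        elector.withdraw("device:1", "node-a");
        System.out.println(elector.leader("device:1")); // node-b
    }
}
```

In the real system this state lives in a replicated state machine rather than a local map, which is what the rest of this post works its way down to.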
Creating an Elector
NewDistributedLeadershipStore.java contains the following code:
LeaderElector leaderElector;
StorageService storageService;
leaderElector = storageService.leaderElectorBuilder()
.withName("onos-leadership-elections")
.build()
.asLeaderElector();
This creates a leaderElector; the rest of this post is essentially an annotation of this snippet. StorageManager implements the StorageService interface.
StorageManager.leaderElectorBuilder()
.withName("onos-leadership-elections")
.build()
.asLeaderElector();
StorageManager.leaderElectorBuilder() calls new DefaultLeaderElectorBuilder(federatedPrimitiveCreator):
public class StorageManager implements StorageService, StorageAdminService {
protected PartitionService partitionService;
private DistributedPrimitiveCreator federatedPrimitiveCreator;
@Activate
public void activate() {
Map<PartitionId, DistributedPrimitiveCreator> partitionMap = Maps.newHashMap();
partitionService.getAllPartitionIds().stream()
.filter(id -> !id.equals(PartitionId.from(0)))
.forEach(id -> partitionMap.put(id, partitionService.getDistributedPrimitiveCreator(id)));
federatedPrimitiveCreator = new FederatedDistributedPrimitiveCreator(partitionMap);
transactions = this.<TransactionId, Transaction.State>consistentMapBuilder()
.withName("onos-transactions")
.withSerializer(Serializer.using(KryoNamespaces.API,
Transaction.class,
Transaction.State.class))
.buildAsyncMap();
transactionCoordinator = new TransactionCoordinator(transactions);
log.info("Started");
}
@Override
public LeaderElectorBuilder leaderElectorBuilder() {
checkPermission(STORAGE_WRITE);
return new DefaultLeaderElectorBuilder(federatedPrimitiveCreator);
}
....
}
In addition, DefaultLeaderElectorBuilder extends LeaderElectorBuilder:
LeaderElectorBuilder leaderElectorBuilder = new DefaultLeaderElectorBuilder(federatedPrimitiveCreator);
leaderElectorBuilder
.withName("onos-leadership-elections")
.build()
.asLeaderElector();
leaderElectorBuilder.withName("onos-leadership-elections").build() returns an object of type AsyncLeaderElector, call it asyncLeaderElector:
asyncLeaderElector.asLeaderElector()
AsyncLeaderElector's asLeaderElector() calls its asLeaderElector(long timeoutMillis) overload as asyncLeaderElector.asLeaderElector(Long.MAX_VALUE), that is, with an operation timeout of Long.MAX_VALUE. That overload in turn calls the DefaultLeaderElector constructor DefaultLeaderElector(AsyncLeaderElector asyncElector, long operationTimeoutMillis):
new DefaultLeaderElector(this, timeoutMillis), where this is the AsyncLeaderElector instance asyncLeaderElector.
Note: DefaultLeaderElector implements every LeaderElector method by delegating to the AsyncLeaderElector and waiting on the returned CompletableFuture, so the asynchronous operations are exposed as blocking ones.
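The relationship between the blocking and asynchronous interfaces can be sketched as follows. This is a minimal illustration under my own assumptions, not the ONOS source: AsyncElector and BlockingElector are hypothetical stand-ins for AsyncLeaderElector and DefaultLeaderElector, and the exception mapping is simplified.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BlockingFacadeSketch {
    /** Hypothetical async API standing in for AsyncLeaderElector. */
    interface AsyncElector {
        CompletableFuture<String> run(String topic, String nodeId);
    }

    /** Blocking facade: each call waits for the async result, up to a timeout. */
    static class BlockingElector {
        private final AsyncElector async;
        private final long timeoutMillis;

        BlockingElector(AsyncElector async, long timeoutMillis) {
            this.async = async;
            this.timeoutMillis = timeoutMillis;
        }

        String run(String topic, String nodeId) {
            return complete(async.run(topic, nodeId));
        }

        // Waits for the future and converts checked exceptions to unchecked ones.
        private <T> T complete(CompletableFuture<T> future) {
            try {
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted", e);
            } catch (TimeoutException e) {
                throw new IllegalStateException("timed out", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException(e.getCause());
            }
        }
    }

    public static void main(String[] args) {
        AsyncElector async = (topic, nodeId) -> CompletableFuture.supplyAsync(() -> nodeId);
        BlockingElector elector = new BlockingElector(async, 1_000);
        System.out.println(elector.run("device:1", "node-a")); // node-a
    }
}
```

With a timeout of Long.MAX_VALUE, as in the call chain described here, the blocking call effectively waits indefinitely.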
DefaultLeaderElector implements LeaderElector, and every one of its methods depends on the AsyncLeaderElector passed to its constructor. So the question comes back to what object leaderElectorBuilder.withName("onos-leadership-elections").build() actually instantiates, that is, what new DefaultLeaderElectorBuilder(federatedPrimitiveCreator).withName("onos-leadership-elections").build() really does. Here is the builder:
public class DefaultLeaderElectorBuilder extends LeaderElectorBuilder {
public DefaultLeaderElectorBuilder(DistributedPrimitiveCreator primitiveCreator) {
this.primitiveCreator = primitiveCreator;
}
public AsyncLeaderElector build() {
return primitiveCreator.newAsyncLeaderElector(name());
}
}
So the result is determined by new FederatedDistributedPrimitiveCreator(partitionMap).newAsyncLeaderElector("onos-leadership-elections"):
public class FederatedDistributedPrimitiveCreator implements DistributedPrimitiveCreator {
private final TreeMap<PartitionId, DistributedPrimitiveCreator> members;
private final List<PartitionId> sortedMemberPartitionIds;
public FederatedDistributedPrimitiveCreator(Map<PartitionId, DistributedPrimitiveCreator> members) {
this.members = Maps.newTreeMap();
this.members.putAll(checkNotNull(members));
this.sortedMemberPartitionIds = Lists.newArrayList(members.keySet());
}
@Override
public AsyncLeaderElector newAsyncLeaderElector(String name) {
checkNotNull(name);
Map<PartitionId, AsyncLeaderElector> leaderElectors =
Maps.transformValues(members,
partition -> partition.newAsyncLeaderElector(name));
Hasher<String> hasher = topic -> {
int hashCode = Hashing.sha256().hashString(topic, Charsets.UTF_8).asInt();
return sortedMemberPartitionIds.get(Math.abs(hashCode) % members.size());
};
return new PartitionedAsyncLeaderElector(name, leaderElectors, hasher);
}
}
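The Hasher above decides which partition owns a topic. The following standalone sketch shows the same idea using only the JDK; it swaps Guava's Hashing.sha256() for java.security.MessageDigest, uses plain integers as partition ids, and is not guaranteed to pick the same partition as the ONOS code for a given topic. TopicPartitionSketch and partitionFor are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;

public class TopicPartitionSketch {
    /** Maps a topic to one of the given partition ids via SHA-256. */
    static int partitionFor(String topic, List<Integer> sortedPartitionIds) {
        byte[] digest;
        try {
            digest = MessageDigest.getInstance("SHA-256")
                    .digest(topic.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required on every Java platform, so this should not happen.
            throw new IllegalStateException(e);
        }
        // Fold the first four digest bytes into an int.
        int hash = ((digest[0] & 0xFF) << 24) | ((digest[1] & 0xFF) << 16)
                | ((digest[2] & 0xFF) << 8) | (digest[3] & 0xFF);
        // floorMod keeps the index non-negative even when hash is Integer.MIN_VALUE.
        int index = Math.floorMod(hash, sortedPartitionIds.size());
        return sortedPartitionIds.get(index);
    }

    public static void main(String[] args) {
        List<Integer> partitions = Arrays.asList(1, 2, 3);
        int first = partitionFor("device:of:0000000000000001", partitions);
        int second = partitionFor("device:of:0000000000000001", partitions);
        System.out.println(first == second); // true
    }
}
```

The point of hashing is that every node computes the same partition for a given topic without coordination. The sketch uses Math.floorMod rather than Math.abs(...) % n because Math.abs(Integer.MIN_VALUE) is still negative.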
PartitionedAsyncLeaderElector is implemented as follows:
public class PartitionedAsyncLeaderElector implements AsyncLeaderElector {
private final String name;
private final TreeMap<PartitionId, AsyncLeaderElector> partitions = Maps.newTreeMap();
private final Hasher<String> topicHasher;
public PartitionedAsyncLeaderElector(String name,
Map<PartitionId, AsyncLeaderElector> partitions,
Hasher<String> topicHasher) {
this.name = name;
this.partitions.putAll(checkNotNull(partitions));
this.topicHasher = checkNotNull(topicHasher);
}
@Override
public String name() {
return name;
}
@Override
public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
return getLeaderElector(topic).run(topic, nodeId);
}
@Override
public CompletableFuture<Void> withdraw(String topic) {
return getLeaderElector(topic).withdraw(topic);
}
@Override
public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
return getLeaderElector(topic).anoint(topic, nodeId);
}
@Override
public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
return getLeaderElector(topic).promote(topic, nodeId);
}
@Override
public CompletableFuture<Void> evict(NodeId nodeId) {
return CompletableFuture.allOf(getLeaderElectors().stream()
.map(le -> le.evict(nodeId))
.toArray(CompletableFuture[]::new));
}
@Override
public CompletableFuture<Leadership> getLeadership(String topic) {
return getLeaderElector(topic).getLeadership(topic);
}
@Override
public CompletableFuture<Map<String, Leadership>> getLeaderships() {
Map<String, Leadership> leaderships = Maps.newConcurrentMap();
return CompletableFuture.allOf(getLeaderElectors().stream()
.map(le -> le.getLeaderships().thenAccept(m -> leaderships.putAll(m)))
.toArray(CompletableFuture[]::new))
.thenApply(v -> leaderships);
}
@Override
public CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> listener) {
return CompletableFuture.allOf(getLeaderElectors().stream()
.map(map -> map.addChangeListener(listener))
.toArray(CompletableFuture[]::new));
}
@Override
public CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> listener) {
return CompletableFuture.allOf(getLeaderElectors().stream()
.map(map -> map.removeChangeListener(listener))
.toArray(CompletableFuture[]::new));
}
/**
* Returns the leaderElector (partition) to which the specified topic maps.
* @param topic topic name
* @return AsyncLeaderElector to which topic maps
*/
private AsyncLeaderElector getLeaderElector(String topic) {
return partitions.get(topicHasher.hash(topic));
}
}
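Methods such as evict and getLeaderships do not target a single partition; they fan out to every partition and combine the results with CompletableFuture.allOf. Here is a minimal standalone sketch of that pattern, with plain string maps standing in for Map<String, Leadership>; FanOutSketch and mergeAll are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class FanOutSketch {
    /** Completes with the union of all per-partition maps once every future is done. */
    static CompletableFuture<Map<String, String>> mergeAll(
            List<CompletableFuture<Map<String, String>>> perPartition) {
        // Callbacks may run on different threads, so collect into a concurrent map.
        Map<String, String> merged = new ConcurrentHashMap<>();
        CompletableFuture<?>[] pending = perPartition.stream()
                .map(f -> f.thenAccept(merged::putAll))
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(pending).thenApply(v -> merged);
    }

    public static void main(String[] args) {
        CompletableFuture<Map<String, String>> p1 =
                CompletableFuture.completedFuture(Collections.singletonMap("topic-a", "node-1"));
        CompletableFuture<Map<String, String>> p2 =
                CompletableFuture.completedFuture(Collections.singletonMap("topic-b", "node-2"));
        Map<String, String> all = mergeAll(Arrays.asList(p1, p2)).join();
        System.out.println(all.size()); // 2
    }
}
```

Because each topic hashes to exactly one partition, the per-partition maps should not share keys, so a plain putAll is enough to merge them.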
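So a LeaderElector actually delegates to PartitionedAsyncLeaderElector, which implements the AsyncLeaderElector interface. At this point the elector looks complete. But if you set out to study how ONOS actually carries out an election and look at the implementations of withdraw, anoint, and promote, you will be exasperated: they only delegate once more.
So let's keep going and see how the election methods are really implemented and where the details are hidden. Most of the methods defined by AsyncLeaderElector are implemented through getLeaderElector(String topic), which looks the topic up in partitions; the rest fan out to every partition.
So what does partitions actually contain? From the StorageManager analysis above, the partitions argument is built from partitionMap. partitionMap is in turn populated from PartitionService partitionService, and PartitionManager implements the PartitionService interface.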
public class PartitionManager extends AbstractListenerManager<PartitionEvent, PartitionEventListener>
implements PartitionService, PartitionAdminService {
protected ClusterMetadataService metadataService;
private final AtomicReference<ClusterMetadata> currentClusterMetadata = new AtomicReference<>();
private final Map<PartitionId, StoragePartition> partitions = Maps.newConcurrentMap();
@Activate
public void activate() {
currentClusterMetadata.set(metadataService.getClusterMetadata());
currentClusterMetadata.get()
.getPartitions()
.stream()
.filter(partition -> !partition.getId().equals(PartitionId.from(0))) // exclude p0
.forEach(partition -> partitions.put(partition.getId(), new StoragePartition(partition,
messagingService,
clusterService,
CatalystSerializers.getSerializer(),
new File(System.getProperty("karaf.data") + "/partitions/" + partition.getId()))));
}
@Override
public DistributedPrimitiveCreator getDistributedPrimitiveCreator(PartitionId partitionId) {
checkPermission(PARTITION_READ);
return partitions.get(partitionId).client();
}
@Override
public Set<PartitionId> getAllPartitionIds() {
checkPermission(PARTITION_READ);
return partitions.keySet();
}
}
So it looks as if the object behind partitions.get(topicHasher.hash(topic)) comes from StoragePartition.
public class StoragePartition implements Managed<StoragePartition> {
private final MessagingService messagingService;
private final Serializer serializer;
private final ClusterService clusterService;
private final File logFolder;
private Partition partition;
private NodeId localNodeId;
private StoragePartitionServer server;
private StoragePartitionClient client;
public StoragePartition(Partition partition,
MessagingService messagingService,
ClusterService clusterService,
Serializer serializer,
File logFolder) {
this.partition = partition;
this.messagingService = messagingService;
this.clusterService = clusterService;
this.localNodeId = clusterService.getLocalNode().id();
this.serializer = serializer;
this.logFolder = logFolder;
}
/**
* Returns the partition client instance.
* @return client
*/
public StoragePartitionClient client() {
return client;
}
@Override
public CompletableFuture<Void> open() {
if (partition.getMembers().contains(localNodeId)) {
openServer();
}
return openClient().thenAccept(v -> isOpened.set(true))
.thenApply(v -> null);
}
private CompletableFuture<StoragePartitionClient> openClient() {
client = new StoragePartitionClient(this,
serializer,
new CopycatTransport(CopycatTransport.Mode.CLIENT,
partition.getId(),
messagingService));
return client.open().thenApply(v -> client);
}
}
From StorageManager's activate() method, each value in partitionMap is the return value of StoragePartition's client() method. From client() and openClient() above, what client() returns is a StoragePartitionClient. So what is StoragePartitionClient?
public class StoragePartitionClient implements DistributedPrimitiveCreator, Managed<StoragePartitionClient> {
private final StoragePartition partition;
private final Transport transport;
private final io.atomix.catalyst.serializer.Serializer serializer;
public StoragePartitionClient(StoragePartition partition,
io.atomix.catalyst.serializer.Serializer serializer,
Transport transport) {
this.partition = partition;
this.serializer = serializer;
this.transport = transport;
}
@Override
public CompletableFuture<Void> open() {
if (client != null && client.isOpen()) {
return CompletableFuture.completedFuture(null);
}
synchronized (StoragePartitionClient.this) {
copycatClient = new CopycatClient(partition.getMemberAddresses(),
transport,
serializer.clone(),
StoragePartition.RESOURCE_TYPES);
copycatClient.onStateChange(state -> log.debug("Partition {} client state"
+ " changed to {}", partition.getId(), state));
client = new AtomixClient(new ResourceClient(copycatClient));
}
return client.open().whenComplete((r, e) -> {
if (e == null) {
log.info("Successfully started client for partition {}", partition.getId());
} else {
log.info("Failed to start client for partition {}", partition.getId(), e);
}
}).thenApply(v -> null);
}
@Override
public AsyncLeaderElector newAsyncLeaderElector(String name) {
return client.getResource(name, AtomixLeaderElector.class).join();
}
}
Now it is finally clear that the values of partitionMap are StoragePartitionClient instances.
From FederatedDistributedPrimitiveCreator's newAsyncLeaderElector, leaderElectors holds the return value of newAsyncLeaderElector(name) called on each value of partitionMap, that is, StoragePartitionClient's newAsyncLeaderElector(name) method.
From newAsyncLeaderElector() and open(), client.getResource(name, AtomixLeaderElector.class).join() shows that the AsyncLeaderElector is ultimately instantiated as an AtomixLeaderElector.
public abstract class Atomix implements ResourceManager<Atomix> {
protected Atomix(ResourceClient client) {
this.client = Assert.notNull(client, "client");
}
@Override
public <T extends Resource> CompletableFuture<T> getResource(String key, Class<? super T> type) {
Assert.argNot(key.trim().length() == 0, "invalid resource key: key must be of non-zero length");
return client.getResource(key, type, new Resource.Config(), new Resource.Options());
}
}
public class AtomixClient extends Atomix {
}
So client.getResource(name, AtomixLeaderElector.class).join() is effectively ResourceClient(copycatClient).getResource(name, AtomixLeaderElector.class, new Resource.Config(), new Resource.Options()).join():
public class ResourceClient implements ResourceManager<ResourceClient> {
/**
* @throws NullPointerException if {@code client} or {@code registry} are null
*/
public ResourceClient(CopycatClient client) {
this.client = Assert.notNull(client, "client");
}
@Override
@SuppressWarnings("unchecked")
public <T extends Resource> CompletableFuture<T> getResource(String key, Class<? super T> type, Resource.Config config, Resource.Options options) {
return this.<T>getResource(key, type((Class<? extends Resource<?>>) type), config, options);
}
@Override
@SuppressWarnings("unchecked")
public synchronized <T extends Resource> CompletableFuture<T> getResource(String key, ResourceType type, Resource.Config config, Resource.Options options) {
Assert.notNull(key, "key");
Assert.notNull(type, "type");
Assert.notNull(config, "config");
Assert.notNull(options, "options");
T resource;
// Determine whether a singleton instance of the given resource key already exists.
Resource<?> check = instances.get(key);
if (check == null) {
ResourceInstance instance = new ResourceInstance(key, type, config, this::close);
InstanceClient client = new InstanceClient(instance, this.client);
try {
check = type.factory().newInstance().createInstance(client, options);
instances.put(key, check);
} catch (InstantiationException | IllegalAccessException e) {
return Futures.exceptionalFuture(e);
}
}
// Ensure the existing singleton instance type matches the requested instance type. If the instance
// was created new, this condition will always pass. If there was another instance created of a
// different type, an exception will be returned without having to make a request to the cluster.
if (check.type().id() != type.id()) {
return Futures.exceptionalFuture(new IllegalArgumentException("inconsistent resource type: " + type));
}
resource = (T) check;
// Ensure if a singleton instance is already being created, the existing open future is returned.
CompletableFuture<T> future = futures.get(key);
if (future == null) {
future = resource.open();
futures.put(key, future);
}
return future;
}
}
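The last part of getResource caches the open future per key, so repeated requests for the same resource name share a single open operation. A minimal sketch of that idea, with a string standing in for the resource and a hypothetical open method that counts how often it runs:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class ResourceCacheSketch {
    // key -> future of the opened resource; one entry per resource name
    private final Map<String, CompletableFuture<String>> futures = new HashMap<>();
    private final AtomicInteger opens = new AtomicInteger();

    /** Returns the cached open future for key, opening the resource on first use. */
    public synchronized CompletableFuture<String> getResource(String key) {
        CompletableFuture<String> future = futures.get(key);
        if (future == null) {
            future = open(key);
            futures.put(key, future);
        }
        return future;
    }

    // Hypothetical stand-in for resource.open(); completes immediately here.
    private CompletableFuture<String> open(String key) {
        opens.incrementAndGet();
        return CompletableFuture.completedFuture("resource:" + key);
    }

    public int openCount() {
        return opens.get();
    }

    public static void main(String[] args) {
        ResourceCacheSketch cache = new ResourceCacheSketch();
        cache.getResource("onos-leadership-elections");
        cache.getResource("onos-leadership-elections");
        System.out.println(cache.openCount()); // 1
    }
}
```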
I won't go through this line by line. In short, the end result is:
AtomixLeaderElector atomix = AtomixLeaderElectorFactory.createInstance().open().join()
public class AtomixLeaderElectorFactory implements ResourceFactory<AtomixLeaderElector> {
@Override
public AtomixLeaderElector createInstance(CopycatClient client, Properties options) {
return new AtomixLeaderElector(client, options);
}
}
public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
implements AsyncLeaderElector {
public AtomixLeaderElector(CopycatClient client, Properties properties) {
super(client, properties);
}
@Override
public CompletableFuture<AtomixLeaderElector> open() {
return super.open().thenApply(result -> {
client.onEvent(CHANGE_SUBJECT, this::handleEvent);
return result;
});
}
}
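At last the implementations of withdraw, anoint, and promote surface: they are implemented by AtomixLeaderElectorCommands, which in turn implement io.atomix.copycat.Command. And that is still far from the end, because Command is only an interface.
At this point we can be fairly sure that ONOS does not implement the Raft protocol itself; it relies on copycat, a third-party library from the atomix project, for Raft. The details of that implementation will be covered in the next post.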