EventBus源码学习笔记(二)
EventBus深入学习二
开始研究源码的设计思路,从
Listener
注册出发,EventBus
如何维护监听者信息,到Publisher
发送消息,消息以怎样的渠道分发给所有的Listener
, 顺序如何保证,传递性如何保证,出现异常如何处理,找不到监听者怎么处理等等
EventBus
这个类相当于一个中转站,
Publisher
调用它的post(Object)
来推送事件;然后将事件一次推送给注册的Listener
1. 注册关系的维护
在初始化s时, EventBus
对象会维护一个 private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
实例, 这个就是维护订阅关系的核心类
注册方法如下
/**
* Registers all subscriber methods on {@code object} to receive events.
*
* @param object object whose subscriber methods should be registered.
*/
public void register(Object object) {
subscribers.register(object);
}
接着我们看下这个类的具体实现
SubscriberRegistry.java
/**
* All registered subscribers, indexed by event type.
*
* <p>The {@link CopyOnWriteArraySet} values make it easy and relatively lightweight to get an
* immutable snapshot of all current subscribers to an event without any locking.
*/
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
Maps.newConcurrentMap();
/**
* Registers all subscriber methods on the given listener object.
*/
void register(Object listener) {
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> eventMethodsInListener = entry.getValue();
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
eventSubscribers = MoreObjects.firstNonNull(
subscribers.putIfAbsent(eventType, newSet), newSet);
}
eventSubscribers.addAll(eventMethodsInListener);
}
}
subscribers : - 对象初始化时创建 - 维护的是EventType
-> Listener
的映射关系,value为一个集合,说明一个事件可以推送给多个Listener
- 监听者,可以有可以监听多个不同类型的事件
注册流程: - 根据注册的对象,将其中所有的回调方法都捞出来 - 将上步的结果塞入 subscribers
集合中; key为 Listener
的类名
注册流程详解
注册目的就是发布消息后,
EventBus
可以将这个Event
传递”Listener
(即订阅方)
为了实现上面的目的,如果要我们自己实现,会怎么做?
- 将类中,所有包含
@Subscribe
注解的方法捞出来 - 方法的第一个参数就是
Event
, 因为注册的目的是为了实现回调, 所以封装一个类,包含这个Listener
对象的引用 + 要执行的方法
上面注册的实际实现和上面的步骤差不多
- 获取所有包含注解的方法
- 实际的代码如下
private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
for (Class<?> supertype : supertypes) {
for (Method method : supertype.getDeclaredMethods()) {
if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
// TODO(cgdecker): Should check for a generic parameter type and error out
Class<?>[] parameterTypes = method.getParameterTypes();
checkArgument(parameterTypes.length == 1,
"Method %s has @Subscribe annotation but has %s parameters."
+ "Subscriber methods must have exactly 1 parameter.",
method, parameterTypes.length);
MethodIdentifier ident = new MethodIdentifier(method);
if (!identifiers.containsKey(ident)) {
identifiers.put(ident, method);
}
}
}
}
return ImmutableList.copyOf(identifiers.values());
}
- 将上面的方法转换为Map, 看这个
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
- key为事件类型
Event.class
; value为一个包含Listener
,Method
,EventBus
实例的对象Subscriber
- key为事件类型
- 将上面的map塞入
subscribers
集合-
subscribers
集合包含的是所有的 (事件 -> 监听者回调集合)Event
->Set<Listener.Method>
- 简单的迭代即可实现塞数据了
- 根据
subscribers
的数据结构,其实可以看到,一个Listener
对象,如果注册多次,最终的效果其实是一样的,这个监听者,并不会被调用多次; 如果一个Lisntener
类,有多个对象,则注册后,每个对象的回调都会执行到;- 实现原因: Set 集合存储
Subscriber
对象 -
Subscriber
的 hashcode & equals 方法没有重写
- 实现原因: Set 集合存储
-
到此, 注册完毕;注销的方法和上面差不多,唯一的区别是最后一个是向 subscribers
塞数据,一个是从其中删数据而已
题外话
如果我们想获取工程中所有包含某个注解的类可以怎么办?
- 如果是用spring的话, 可以考虑 `ApplicationContext.getBeansWithAnnotation()`
获取工程中,所有包含某个注解的方法,除了上面的主动注册,有什么其他的方法?
###2. 推送事件
发布方,调用
EventBus.post(Object)
方法实现消息的推送
预测
正式开始之前,我们可以先预测一下,当发布方调用了这个方法之后,会执行那些action
- 根据事件类型,可以从注册关系表
subscribers
中获取出所有的监听者,以及对应的回调方法, 放在一个集合中 - 因为监听的先后顺序可能有要求,那么将上面的集合进行排序
- 循环遍历上面,依次执行
上面是正向的操作流程,接着一些异常情况和边界也需要考虑下
- 如果一个事件找不到订阅者如何处理
- 如果某个监听者执行完毕之后,希望其之后的监听者都不能接受这个事件(类似web应用中的拦截器,如果被拦截了,如果被拦截了,后面的拦截器也没必要继续执行)
- 某个监听者很遗憾的抛出了个异常,会不会整调链路都挂掉
深入解读
带着上面的臆测,来实际看下EventBus
自己是怎么玩的
/**
* 将事件推送给所有的监听者,不管监听者是否正常处理,都是正确返回
* Posts an event to all registered subscribers. This method will return
* successfully after the event has been posted to all subscribers, and
* regardless of any exceptions thrown by subscribers.
*
* 如果一个事件没有监听者,且该事件不是 DeadEvent, 则转为 DeadEvent并重新推送
* <p>If no subscribers have been subscribed for {@code event}'s class, and
* {@code event} is not already a {@link DeadEvent}, it will be wrapped in a
* DeadEvent and reposted.
*
* @param event event to post.
*/
public void post(Object event) {
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
上面的解释比较清楚, 基本上核心的推送就是 dispatcher.dispatch(event, eventSubscribers);
实际的使用的是 PerThreadQueuedDispatcher
推送代码如下,逻辑比较清晰,将Event
塞入队列, 然后将队列中的所有消息依次推送给所有的订阅者
- 可以参考下面的设计,优雅的避免重复推送的问题
- 考虑为什么要先写入队列,然后再依次推送队列中的事件
/**
* Per-thread queue of events to dispatch.
*/
private final ThreadLocal<Queue<Event>> queue =
new ThreadLocal<Queue<Event>>() {
@Override
protected Queue<Event> initialValue() {
return Queues.newArrayDeque();
}
};
/**
* Per-thread dispatch state, used to avoid reentrant event dispatching.
*/
private final ThreadLocal<Boolean> dispatching =
new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return false;
}
};
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
checkNotNull(subscribers);
Queue<Event> queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}
最终真正执行推送Event
的是这个方法 com.google.common.eventbus.Subscriber#dispatchEvent
/**
* Dispatches {@code event} to this subscriber using the proper executor.
*/
final void dispatchEvent(final Object event) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}
/**
* Invokes the subscriber method. This method can be overridden to make the invocation
* synchronized.
*/
@VisibleForTesting
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
try {
method.invoke(target, checkNotNull(event));
} catch (IllegalArgumentException e) {
throw new Error("Method rejected target/argument: " + event, e);
} catch (IllegalAccessException e) {
throw new Error("Method became inaccessible: " + event, e);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
}
throw e;
}
}
3. 图解
上面从源码的角度,对整个流程顺了一遍,下面的图对几个主要的类结构进行了抽取,并对上面的几个方法进行了简要的说明
图一, 将上面说明的几个类属性 + 方法进行了说明
图二, 对逻辑进行列举
4. 新技能
1.根据class,获取所有超类集合 (EventBus的实际使用中,Event的超类集合都塞入了缓存,加快查询速度)
TypeToken.of(concreteClass).getTypes().rawTypes());
2.获取类中标有注解的方法
private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz, Class annotationClz) {
Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
for (Class<?> supertype : supertypes) {
for (Method method : supertype.getDeclaredMethods()) {
if (method.isAnnotationPresent(annotationClz) && !method.isSynthetic()) {
// TODO(cgdecker): Should check for a generic parameter type and error out
Class<?>[] parameterTypes = method.getParameterTypes();
MethodIdentifier ident = new MethodIdentifier(method);
if (!identifiers.containsKey(ident)) {
identifiers.put(ident, method);
}
}
}
}
return ImmutableList.copyOf(identifiers.values());
}
private static final class MethodIdentifier {
private final String name;
private final List<Class<?>> parameterTypes;
MethodIdentifier(Method method) {
this.name = method.getName();
this.parameterTypes = Arrays.asList(method.getParameterTypes());
}
@Override
public int hashCode() {
return Objects.hashCode(name, parameterTypes);
}
@Override
public boolean equals(@Nullable Object o) {
if (o instanceof MethodIdentifier) {
MethodIdentifier ident = (MethodIdentifier) o;
return name.equals(ident.name) && parameterTypes.equals(ident.parameterTypes);
}
return false;
}
}
- Python爬虫(十五)_案例:使用bs4的爬虫
- Python爬虫(十六)_JSON模块与JsonPath
- 多类好米齐交易:域名776.cn近10万元结拍
- Python爬虫(八)_Requests的使用
- Python爬虫(十一)_案例:使用正则表达式的爬虫
- Python爬虫(十二)_XPath与lxml类库
- 区块链域名热度不减 健康类英文米近六位交易
- Python爬虫(九)_非结构化数据与结构化数据
- Python爬虫(十)_正则表达式
- python爬虫(四)_urllib2库的基本使用
- 投资人榴莲又卖出一枚三拼域名
- python爬虫(五)_urllib2:Get请求和Post请求
- python爬虫(七)_urllib2:urlerror和httperror
- 双拼市场好!米友售出域名chuijia.com
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法