Netty高级篇
时间:2022-07-23
本文章向大家介绍Netty高级篇,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
本文将用netty实现一个简单的RPC框架。
一、什么叫RPC?
RPC,远程调用,就是A程序部署在1号机器上,B程序部署在2号机器上,A可以像调本地方法一样地去调用B程序,而不需要程序员额外地编写这个交互过程,这就叫RPC远程调用。dubbo、Ribbon、openfeign都是RPC框架。
二、RPC调用过程
- 请求:服务消费者发送请求 ---> 编码 ---> 通过网络发送 ---> 服务提供者接收请求 ---> 解码 ---> 处理请求
- 响应:服务提供者处理完请求进行响应 ---> 编码 ---> 通过网络发送 ---> 消费者接收响应 ---> 解码 ---> 处理响应
这两个流程中,我们只需要关心发送请求和处理响应即可,中间那些过程对程序员来说都是透明的。所以,要实现RPC框架,要处理的就是封装中间那些步骤。
三、实现思路
假如现在我们需要调用服务提供者HelloServiceImpl的hello方法,调用流程如下:
- 客户端创建代理对象,通过Netty,远程连接服务端,将参数发送过去。
- 服务端收到参数,传给hello方法,接收到返回值,再发送给客户端。
四、代码实操
1、HelloService接口:
public interface HelloService {
String hello(String msg);
}
2、客户端:
- HelloServiceImpl:**
public class HelloServiceImpl implements HelloService {
@Override
public String hello(String msg) {
if (StringUtils.isNotBlank(msg)) {
return"收到客户端消息:[" + msg + "]";
}
else return "收到客户端消息";
}
}
- NettyServerHandler:
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("msg = " + msg);
if (msg.toString().startsWith("HelloService#hello#")){
String param = msg.toString().substring(msg.toString().lastIndexOf("#") + 1);
String result = new HelloServiceImpl().hello(param);
ctx.writeAndFlush(result);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
- NettyServer:
public class NettyServer {
public static void startService(String host, int port){
startServer0(host, port);
}
private static void startServer0(String host, int port){
EventLoopGroup bossgroup = new NioEventLoopGroup(1);
EventLoopGroup workergroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossgroup, workergroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(host, port).sync();
System.out.println("provider is in active");
channelFuture.channel().closeFuture().sync();
} catch (Exception e){
e.printStackTrace();
} finally {
bossgroup.shutdownGracefully();
workergroup.shutdownGracefully();
}
}
}
- ServerBootStrap
public class ServerBootStrap {
public static void main(String[] args){
NettyServer.startService("127.0.0.1", 8848);
}
}
3、客户端:
- NettyClientHandler:
/**
* 流程:业务代码远程调用某个方法,被代理对象拦截,进而执行call方法,
* call方法请求服务端,然后在waiting着,服务器处理完请求将响应返回给另外一个方法,
* 即read方法,read方法收到响应后,所以就应该唤醒call方法中等待的线程,继续往下执行。
* @author: zhu
* @date: 2020/8/22 16:09
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
private ChannelHandlerContext context;
private String result;
private String param;
/**
* 被代理对象调用,发送数据给服务器,然后等待被唤醒
* @return
* @throws Exception
*/
@Override
public synchronized Object call() throws Exception {
// 将参数发送给服务端
context.writeAndFlush(param);
// 发送之后就等待被唤醒
wait();
// 被唤醒的时候read方法已经拿到result了,这里直接return即可
return result;
}
/**
* 与服务器连接创建成功后被调用
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
context = ctx;
}
/**
* 收到服务器数据后被调用
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
result = msg.toString();
notify(); // 唤醒call方法
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
public void setParam(String param) {
this.param = param;
}
}
- NettyClient :
public class NettyClient {
private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static NettyClientHandler clientHandler;
/**
* 初始化
*/
private static void initClient(){
clientHandler = new NettyClientHandler();
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(clientHandler);
}
});
try {
bootstrap.connect("127.0.0.1", 8848).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public Object getBean(final Class<?> serviceClass, final String providerName){
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class<?>[] {serviceClass}, (proxy, method, args) -> {
if (clientHandler == null){
initClient();
}
// 设置要发给服务器端的参数
clientHandler.setParam(providerName + args[0]);
return executorService.submit(clientHandler).get();
});
}
}
- ClientBootStrap:
public class ClientBootStrap {
// 定义协议头
private static final String providerName = "HelloService#hello#";
public static void main(String[] args){
// 创建消费者
NettyClient customer = new NettyClient();
HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);
String result = service.hello("扣你吉瓦");
System.out.println(result);
}
}
先启动ServerBootStrap,然后启动ClientBootStrap去调用服务端的hello方法,最后可以成功拿到返回结果。
- hdu----149850 years, 50 colors(最小覆盖点)
- hdu------1281 棋盘游戏(最小覆盖点)
- hdu-----(1179)Ollivanders: Makers of Fine Wands since 382 BC.(二分匹配)
- hdu-----(1151)Air Raid(最小覆盖路径)
- hdu-----(1150)Machine Schedule(最小覆盖点)
- 【重磅】微软Facebook联手发布AI生态系统,CNTK+Caffe2+PyTorch挑战TensorFlow
- hduoj-----(1068)Girls and Boys(二分匹配)
- 使用Django suit或Bootstrap美化admin模板
- hdu---------(1026)Ignatius and the Princess I(bfs+dfs)
- hdu-----(1113)Word Amalgamation(字符串排序)
- HDUoj-------(1128)Self Numbers
- cf------(round 2)A. Winner
- cf------(round)#1 C. Ancient Berland Circus(几何)
- MySQL配置TokuDB的简单总结
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 编写第二个页面:新闻阅读列表页面
- jQuery用于请求服务器的函数
- jQuery的简单使用
- 编写第一个小程序页面
- 在CentOS7下安装MongoDB
- redis集群搭建
- redis慢查询日志,php安装redis扩展,redis存储session,redis主从配置
- jQuery介绍与常见选择器的使用
- redis常用操作,redis操作键值,redis安全设置
- redis介绍,redis安装,redis持久化,redis数据类型
- AJAX的post请求与上传文件
- memcached的一些简单使用
- nosql介绍,memrcached介绍,安装memcached,查看memcachedq状态
- 如何在IDEA2017创建Maven的Web工程
- JSP上传文件与导出Excel表