摊牌了,我要手写一个RPC

   日期:2020-10-19     浏览:161    评论:0    
核心提示:文章目录前言需要解决的问题手写RPC实战1、定义通信协议2、自定义注解3、定义接口4、实现接口5、暴露服务并监听处理请求6、生成RPC动态代理对象7、消费者注入RPC动态代理对象功能测试尾巴前言RPC是远程过程调用(Remote Procedure Call)的缩写形式。SAP系统RPC调用的原理其实很简单,有一些类似于三层构架的C/S系统,第三方的客户程序通过接口调用SAP内部的标准或自定义函数,获得函数返回的数据进行处理后显示或打印。随着微服务、分布式的大热,开发者慢慢趋向于将一个大的服务拆分

文章目录

  • 前言
  • 需要解决的问题
  • 手写RPC实战
    • 1、定义通信协议
    • 2、自定义注解
    • 3、定义接口
    • 4、实现接口
    • 5、暴露服务并监听处理请求
    • 6、生成RPC动态代理对象
    • 7、消费者注入RPC动态代理对象
  • 功能测试
  • 尾巴

前言

RPC是远程过程调用(Remote Procedure Call)的缩写形式。SAP系统RPC调用的原理其实很简单,有一些类似于三层构架的C/S系统,第三方的客户程序通过接口调用SAP内部的标准或自定义函数,获得函数返回的数据进行处理后显示或打印。

随着微服务、分布式的大热,开发者慢慢趋向于将一个大的服务拆分成多个独立的小的服务。
服务经过拆分后,服务与服务之间的通信就变得至关重要。

RPC说白了就是节点A去调用节点B的服务,站在Java的角度看,就是像调用本地函数一样调用远程函数

需要解决的问题

要想实现RPC,首先需要解决以下几个问题:

  1. 服务之间如何通信?
    Socket 网络IO。
  2. 请求参数、返回结果如何传输?
    Java将对象序列化为字节数组通过网络IO传输。
  3. 接口没有实现类,该如何调用?
    JDK动态代理生成代理对象。
  4. 如何发起远程调用?
    在代理对象中发起Socket请求远程服务器。

手写RPC实战

首先看下目录结构:

1、定义通信协议

消费者发起一个调用请求,服务者必须知道你要调哪个服务,参数是什么,这些需要封装好。

@Data
public class RpcMessage implements Serializable { 
	private static final long serialVersionUID = 1L;

	private String interfaceName;//调用的Service接口名
	private String methodName;//调用的方法名
	private Class<?>[] argsType;//参数类型列表
	private Object[] args;//参数
}

2、自定义注解

分别是服务的提供者和消费者。

@Target({ ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Service//引入Spring Service,自动注入IOC容器
// 服务提供者
public @interface MyRpcService { 

}


@Target({ ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
// 服务消费者
public @interface MyRpcReference { 

}

3、定义接口

public interface UserService { 

	// 根据UserId查找用户
	R<UserResp> findById(Long userId);
}

4、实现接口

加上自定义注解@MyRpcService,后续需要扫描这些实现类,并暴露服务。

@MyRpcService
public class UserServiceImpl implements UserService{ 

	@Override
	public R<UserResp> findById(Long userId) { 
		UserResp userResp = new UserResp();
		userResp.setId(userId);
		userResp.setName("张三");
		userResp.setPwd("root@abc");
		return R.ok(userResp);
	}
}

5、暴露服务并监听处理请求

应用程序启动后,从Spring的IOC容器中,找到加了@MyRpcService注解的服务,并暴露出去。


@Component
public class ProviderListener implements ApplicationListener<ApplicationStartedEvent> { 

	@Override
	public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent) { 
		ConfigurableApplicationContext context = applicationStartedEvent.getApplicationContext();
		for (Object bean : context.getBeansWithAnnotation(MyRpcService.class).values()) { 
			ProviderHolder.addService(bean);
		}
		try { 
			ProviderHolder.start();
		} catch (Exception e) { 
			e.printStackTrace();
		}
		System.err.println("provider...启动");
	}
}

暴露服务,处理消费者请求的核心代码


public class ProviderHolder { 
	// 缓存所有的服务提供者
	private static final Map<String, Provider> SERVICES = new ConcurrentHashMap<>();
	// 起一个线程池,处理消费者的请求
	private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

	// 添加服务
	public static void addService(Object bean) { 
		Class<?> beanClass = bean.getClass();
		String interfaceName = beanClass.getInterfaces()[0].getName();
		SERVICES.put(interfaceName, new Provider(bean));
	}

	
	public static void start() throws Exception { 
		if (SERVICES.isEmpty()) { 
			return;
		}
		// 开启ServerSocket,端口3333,监听消费者发起的请求。
		ServerSocket serverSocket = new ServerSocket(3333);
		while (true) { 
			// 当有请求到达,提交一个任务到线程池
			Socket socket = serverSocket.accept();
			EXECUTOR_SERVICE.submit(() -> { 
				try { 
					// 从网络IO中读取消费者发送的参数
					Object o = new ObjectInputStream(socket.getInputStream()).readObject();
					if (o instanceof RpcMessage) { 
						RpcMessage message = (RpcMessage) o;
						// 找到消费者要调用的服务
						Provider provider = SERVICES.get(message.getInterfaceName());
						if (provider == null) { 
							return;
						}
						// 利用反射调用服务
						Object result = provider.invoke(message.getMethodName(), message.getArgsType(), message.getArgs());
						OutputStream outputStream = socket.getOutputStream();
						// 将返回结果序列化为字节数组并通过Socket写回
						outputStream.write(ObjectUtil.serialize(result));
						outputStream.flush();
					}
				} catch (Exception e) { 
					e.printStackTrace();
				}
			});
		}
	}
}

6、生成RPC动态代理对象


public class RpcProxy implements InvocationHandler { 
	private Object origin = new Object();

	@Override
	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 
		if (Object.class.equals(method.getDeclaringClass())) { 
			return method.invoke(origin, args);
		}
		// 开启一个Socket
		Socket socket = new Socket("127.0.0.1", 3333);
		// 封装请求协议
		RpcMessage message = new RpcMessage();
		message.setInterfaceName(method.getDeclaringClass().getName());
		message.setMethodName(method.getName());
		message.setArgsType(method.getParameterTypes());
		message.setArgs(args);
		// 将请求参数序列化成字节数组通过网络IO写回
		OutputStream outputStream = socket.getOutputStream();
		outputStream.write(ObjectUtil.serialize(message));
		outputStream.flush();
		// 阻塞,等待服务端处理完毕返回结果
		Object o = new ObjectInputStream(socket.getInputStream()).readObject();
		// 返回给调用者
		return o;
	}
}

7、消费者注入RPC动态代理对象


@Component
public class RpcBeanPostProcessor implements BeanPostProcessor { 

	@Override
	public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { 
		Class<?> beanClass = bean.getClass();
		Field[] fields = ClassUtil.getDeclaredFields(beanClass);
		for (Field field : fields) { 
			if (field.getAnnotation(MyRpcReference.class) == null) { 
				continue;
			}
			Object proxy = Proxy.newProxyInstance(beanClass.getClassLoader(), new Class[]{ field.getType()}, new RpcProxy());
			field.setAccessible(true);
			try { 
				field.set(bean, proxy);
			} catch (IllegalAccessException e) { 
				e.printStackTrace();
			}
		}
		return bean;
	}
}

功能测试

核心代码写好了,那就可以开始测试功能是否符合预期了。

1、启动服务提供者

2、启动消费者,并发起一个请求

尾巴

基于篇幅原因,本文只是实现了RPC最基本最简单的功能,主要是理解RPC的思想。
当然,还有很多可以优化的点:

  1. Service暴露的所有方法缓存起来,每次调用再反射查找开销还是很大的。
  2. 使用Netty提升网络IO的通信性能。
  3. 连接池的引入。
  4. 注册中心的加入。
  5. 写回的数据没有包装协议。
  6. 数据格式的扩展,请求头的加入。
 
打赏
 本文转载自:网络 
所有权利归属于原作者,如文章来源标示错误或侵犯了您的权利请联系微信13520258486
更多>最近资讯中心
更多>最新资讯中心
0相关评论

推荐图文
推荐资讯中心
点击排行
最新信息
新手指南
采购商服务
供应商服务
交易安全
关注我们
手机网站:
新浪微博:
微信关注:

13520258486

周一至周五 9:00-18:00
(其他时间联系在线客服)

24小时在线客服