目录

目录1. 背景2. 远程过程调用的定义3. 远程过程调用的优点4. 远程过程调用的缺点5. 实例5.1. 客户端5.2. 远程对象5.3. 服务端6. 参考文献

1. 背景

这学期上了《分布式系统》课程,内容主要是基于Java实现分布式计算,所以老师前几节课主要在给我们讲用Java做分布式可能会用到的一些技术。为了方便学习和记录,我将老师讲的内容结合书籍和资料做了一些整理,这一篇主要讨论远程过程调用。

2. 远程过程调用的定义

远程过程调用是一种进程间通信技术,用于基于客户端-服务器的应用程序。它也被称为子例程调用或函数调用。

客户端有一个请求消息,RPC将其转换并发送给服务器。此请求可以是对远程服务器的过程或函数调用。当服务器接收到请求时,它将所需的响应发送回客户机。客户端在服务器处理调用时被阻塞,只有在服务器完成后才恢复执行。[1]

远程过程调用的顺序如下:

客户端存根由客户端调用。客户端存根发出一个系统调用,将消息发送到服务器,并将参数放入消息中。客户端操作系统将消息从客户端发送到服务器。消息由服务器操作系统传递到服务器存根。服务器存根从消息中删除参数。然后,服务器存根调用服务器执行操作。执行完成后,结果原路返回到客户端。

过程如下图所示

3. 远程过程调用的优点

RPC的一些优点如下:

远程过程调用支持面向过程和面向线程的模型。RPC的内部消息传递机制对用户隐藏。重写和重新开发代码的工作量在远程过程调用中是最小的。远程过程调用可以在分布式环境中使用,也可以在本地环境中使用。为了提高性能,RPC省略了许多协议层。

4. 远程过程调用的缺点

RPC的一些缺点如下:

远程过程调用是一个可以用不同方式实现的概念。它不是一个标准。RPC对于硬件架构没有灵活性。它只基于交互。远程过程调用而增加了成本。

5. 实例

Java 中可以通过使用Socket、反射机制和动态代理来实现一个RPC框架。

5.1. 客户端

5.1.1. 客户端实现

package service;import rpc.DynamicProxyFactory;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class clienTest {static void test(int threadId) {// 这里实现了一个登陆的操作 // 实现细节并不重要,所以没有给出 loginService 的代码 // 通过动态代理与服务器建立连接 loginService loginservice =DynamicProxyFactory.getProxy(loginService.class, "localhost", 8000);// 执行远程操作 String Result = loginservice.login("123", "123");// 输出结果 System.out.println(String.format("Client Thread %d result= %s",threadId, Result));}public static void main(String[] args) throws Exception {int threadNum = 4;// 创建多线程 ExecutorService executor = Executors.newFixedThreadPool(4);// 多线程调用 for (int i = 0; i < 4; i++) {int finalI = i;executor.submit(() -> {test(finalI + 1);});}}}

5.1.2. 动态代理类

package rpc;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Proxy;// 动态代理类public class DynamicProxyFactory {@SuppressWarnings("unchecked")public static <T> T getProxy(final Class<T> classType, final String host, final int port) {//泛型方法,传入什么类型返回什么类型InvocationHandler handler = (proxy, method, args) -> {Connector connector = null;try {connector = new Connector(host, port);RemoteCall call = new RemoteCall(classType.getName(), method.getName(), method.getParameterTypes(), args);connector.send(call);call = (RemoteCall) connector.receive();return call.getResult();} finally {if (connector != null) connector.close();}};System.out.println("代理开始执行");return (T) Proxy.newProxyInstance(classType.getClassLoader(),new Class<?>[]{classType}, handler);}}

5.1.3. 连接器

package rpc;import java.io.InputStream;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.io.OutputStream;import java.net.Socket;public class Connector {private String host;private int port;private Socket skt;private InputStream is;private ObjectInputStream ois;private OutputStream os;private ObjectOutputStream oos;public Connector(String host, int port) throws Exception {this.host = host;this.port = port;connect(host, port);}// 发送对象方法 public void send(Object obj) throws Exception {oos.writeObject(obj);}// 接收对象方法 public Object receive() throws Exception {return ois.readObject();}// 建立与远程服务器的连接 public void connect() throws Exception {connect(host, port);}// 建立与远程服务器的连接 public void connect(String host, int port) throws Exception {skt = new Socket(host, port);os = skt.getOutputStream();oos = new ObjectOutputStream(os);is = skt.getInputStream();ois = new ObjectInputStream(is);}// 关闭连接 public void close() {try {ois.close();oos.close();skt.close();} catch (Exception e) {System.out.println(" Conector.close :" + e);}}}

5.2. 远程对象

package rpc;import java.io.Serializable;//相当于一个javaBeanpublic class RemoteCall implements Serializable {private static final long serialVersionUID = 1L;private String className;// 表示服务类名或接口名private String methodName; // 表示功能方法名private Class<?>[] paramTypes;//表示方法参数类型 private Object[] params;//表示方法参数值/如果方法正常执行,则resul 为方法返回值,如果方法抛出异常,则resul 为该异常 private Object result;public RemoteCall() {}public RemoteCall(String className, String methodName, Class<?>[] paramTypes, Object[] params) {this.className = className;this.methodName = methodName;this.paramTypes = paramTypes;this.params = params;}public String getClassName() {return className;}public void setClassName(String className) {this.className = className;}public String getMethodName() {return methodName;}public void setMethodName(String methodName) {this.methodName = methodName;}public Class<?>[] getParamTypes() {return paramTypes;}public void setParamTypes(Class<?>[] paramTypes) {this.paramTypes = paramTypes;}public Object[] getParams() {return params;}public void setParams(Object[] params) {this.params = params;}public Object getResult() {return result;}public void setResult(Object result) {this.result = result;}public String toString() {return "className=" + className + " methodName=" + methodName;}}

5.3. 服务端

package rpc;import java.io.InputStream;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.io.OutputStream;import java.lang.reflect.Method;import java.net.Proxy;import java.net.ServerSocket;import java.net.Socket;import java.util.HashMap;import java.util.Map;import service.loginServiceImp;import java.util.Random;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;public class Server {private final static String CLASS_PATH = "service.";// 存放远程对象的缓存 private HashMap<String, Object> remoteObjects = new HashMap<>();//注册服务:把一个远程对象放到缓存中 public void register(String className, Object remoteObject) {remoteObjects.put(className, remoteObject);}// 8000端口负载均衡 public void transferService(ServerSocket serverSocket, int threadId) throws Exception {//随机数生成器 final Random random = new Random();// 循环运行 while (true) {System.out.println(String.format("Thread:%d 负载均衡启动......", threadId));// 创建hashMap HashMap<String, Object> map = new HashMap<>();// 等待服务 Socket socket = serverSocket.accept();// 获取客户端输入流 ObjectInputStream clientOis = new ObjectInputStream(socket.getInputStream());// 获取客户端输出流 ObjectOutputStream clientOos = new ObjectOutputStream(socket.getOutputStream());// 将客户端传输的对象存入HashMap map.put("clientObject", (RemoteCall) clientOis.readObject());// 随机获取一个端口 int serverIndex = 8001 + random.nextInt(2);// 打开端口 Socket targetSocket = new Socket("localhost", serverIndex);// 输出 System.out.println(String.format("Thread:%d 负载均衡传输任务到端口 %d......", threadId, serverIndex));// 获取输出对象流 ObjectOutputStream oos = new ObjectOutputStream(targetSocket.getOutputStream());// 传输hashMap oos.writeObject(map);// 获取输入对象流 ObjectInputStream serverOis = new ObjectInputStream(targetSocket.getInputStream());// 接收服务器发送的Call 对象 RemoteCall remotecallobj = (RemoteCall) serverOis.readObject();// 向客户端发送对象 clientOos.writeObject(remotecallobj);// 关闭资源 oos.close();socket.close();clientOos.close();}}// 暴露服务,创建基于流的Socket,并在8001、8002 端口监听 public void exportService(ServerSocket serverSocket, int threadId) throws Exception {// 循环运行 while (true) {System.out.println(String.format("Thread:%d 服务器启动......", threadId));Socket socket = serverSocket.accept();// 获取负载服务器输入流 ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());// 获取负载服务器输出流 ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());// 获取负载服务器传入的hashMap HashMap<String, Object> map = (HashMap<String, Object>) ois.readObject();// 接收客户发送的Call 对象 RemoteCall remotecallobj = (RemoteCall) map.get("clientObject");// 输出 System.out.println(remotecallobj);// 调用相关对象的方法 System.out.println(String.format("Thread:%d calling......", threadId));remotecallobj = invoke(remotecallobj);// 向客户发送包含了执行结果的remotecallobj 对象 oos.writeObject(remotecallobj);ois.close();oos.close();socket.close();}}public RemoteCall invoke(RemoteCall call) {Object result = null;try {String className = call.getClassName();String methodName = call.getMethodName();Object[] params = call.getParams();Class<?> classType = Class.forName(className);Class<?>[] paramTypes = call.getParamTypes();Method method = classType.getMethod(methodName, paramTypes);// 从hashmap缓存中取出相关的远程对象Object Object remoteObject = remoteObjects.get(className);if (remoteObject == null) {throw new Exception(className + " 的远程对象不存在");} else {result = method.invoke(remoteObject, params);System.out.println("远程调用结束:remotObject:" + remoteObject.toString() + ",params:" + params.toString());}} catch (Exception e) {System.out.println("错误:" + e.getMessage());}call.setResult(result);return call;}public static void main(String args[]) throws Exception {// 线程数量 int threadNum = 6;// 初始化 Server server = new Server();// 创建多线程 ExecutorService executor = Executors.newFixedThreadPool(threadNum);//把事先创建的RemoteServceImpl 对象加人到服务器的缓存中 //在服务注册中心注册服务 server.register(CLASS_PATH + "loginService", new loginServiceImp());// 多线程运行 Future[] future = new Future[threadNum];// 创建端口数组 ServerSocket[] serverSocket = new ServerSocket[threadNum];// 创建8000端口 ServerSocket port8000 = new ServerSocket(8000);// 创建8001端口 ServerSocket port8001 = new ServerSocket(8001);// 创建8002端口 ServerSocket port8002 = new ServerSocket(8002);for (int i = 0; i < threadNum; i++) {// 第一、二个线程端口为8000,负责调度资源,负载均衡 if (i <= 1) {serverSocket[i] = port8000;} else if (i <= 3) {// 第三、四个线程端口为8001,模拟第一台主机 serverSocket[i] = port8001;} else {// 第五、六个线程端口为8002,模拟第二台主机 serverSocket[i] = port8002;}}// 循环创建线程 for (int i = 0; i < threadNum; i++) {int finalI = i;future[i] = executor.submit(() -> {try {// 8000端口负载均衡 if (finalI <= 1) {server.transferService(serverSocket[finalI], finalI + 1);} else {//打开网络端口,接受外部请求,执行服务功能,返回结果 server.exportService(serverSocket[finalI], finalI + 1);}} catch (Exception e) {e.printStackTrace();}return 1;});}// 循环获取线程结果,阻塞同步 for (int i = 0; i < threadNum; i++) {future[i].get();}// 关闭端口 for (int i = 0; i < threadNum; i++) {serverSocket[i].close();}}}

6. 参考文献

[1] https://www.tutorialspoint.com/remote-procedure-call-rpc

联系邮箱:curren_wong@163.com

Github:https://github.com/CurrenWong

欢迎转载/Star/Fork,有问题欢迎通过邮箱交流。

分类: 百科知识 标签: 暂无标签

评论

暂无评论数据

暂无评论数据

目录