Thrift源码分析– 方法调用模型分析

RPC调用本质上就是一种网络编程,客户端向服务器发送消息,服务器拿到消息之后做后续动作。只是RPC这种消息比较特殊,它封装了方法调用,包括方法名,方法参数。服务端拿到这个消息之后,解码消息,然后要通过方法调用模型来完成实际服务器端业务方法的调用。

这篇讲讲Thrfit的方法调用模型。Thrift的方法调用模型很简单,就是通过方法名和实际方法实现类的注册完成,没有使用反射机制,类加载机制。

和方法调用相关的几个核心类:

  1. 自动生成的Iface接口,是远程方法的顶层接口

  2. 自动生成的Processor类及相关父类,包括TProcessor接口,TBaseProcess抽象类

  3. ProcessFunction抽象类,抽象了一个具体的方法调用,包含了方法名信息,调用方法的抽象过程等

  4. TNonblcokingServer,是NIO服务器的默认实现,通过Args参数来配置Processor等信息

  5. FrameBuffer类,服务器NIO的缓冲区对象,这个对象在服务器端收到全包并解码后,会调用Processor去完成实际的方法调用

  6. 服务器端的方法的具体实现类,实现Iface接口

下面逐个来分析相关的类。

Iface接口是自动生成的,描述了方法的接口。 服务器端服务提供方DemoService要实现Iface接口

public class DemoService {

 public interface Iface {

   public int demoMethod(String param1, Parameter param2, Map<String,String> param3) throws org.apache.thrift.TException;

 }
}

public class DemoServiceImpl implements DemoService.Iface{

@Override
public int demoMethod(String param1, Parameter param2,
        Map<String, String> param3) throws TException {

    return 0;
}

}

来看TProcess相关类和接口

  1. TProcessor就定义了一个顶层的调用方法process,参数是输入流和输出流

  2. 抽象类TBaseProcessor提供了TProcessor的process的默认实现,先读消息头,拿到要调用的方法名,然后从维护的一个Map中取ProcessFunction对象。ProcessFunction对象是实际方法的抽象,调用它的process方法,实际是调用了实际的方法。

  3. Processor类是自动生成了,它依赖Iface接口,负责把实际的方法实现和方法的key关联起来,放到Map中维护

public interface TProcessor {
 public boolean process(TProtocol in, TProtocol out)
   throws TException;
}

public abstract class TBaseProcessor<I> implements TProcessor {
private final I iface;
private final Map<String,ProcessFunction<I, ? extends TBase>> processMap;

protected TBaseProcessor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processFunctionMap) {
  this.iface = iface;
  this.processMap = processFunctionMap;
}

@Override
public boolean process(TProtocol in, TProtocol out) throws TException {
  TMessage msg = in.readMessageBegin();
  ProcessFunction fn = processMap.get(msg.name);
  if (fn == null) {
    TProtocolUtil.skip(in, TType.STRUCT);
    in.readMessageEnd();
    TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");
    out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
    x.write(out);
    out.writeMessageEnd();
    out.getTransport().flush();
    return true;
  }
  fn.process(msg.seqid, in, out, iface);
  return true;
}
}
public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
   public Processor(I iface) {
     super(iface, getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
   }

   protected Processor(I iface, Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
     super(iface, getProcessMap(processMap));
   }

   private static <I extends Iface> Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> getProcessMap(Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
     processMap.put("demoMethod", new demoMethod());
     return processMap;
   }

   private static class demoMethod<I extends Iface> extends org.apache.thrift.ProcessFunction<I, demoMethod_args> {
     public demoMethod() {
       super("demoMethod");
     }

     protected demoMethod_args getEmptyArgsInstance() {
       return new demoMethod_args();
     }

     protected demoMethod_result getResult(I iface, demoMethod_args args) throws org.apache.thrift.TException {
       demoMethod_result result = new demoMethod_result();
       result.success = iface.demoMethod(args.param1, args.param2, args.param3);
       result.setSuccessIsSet(true);
       return result;
     }
   }

 }

自动生成的demoMethod类继承了ProcessFunction类,它负载把方法参数,iface, 方法返回值这些抽象的概念组合在一起,通过抽象模型来完成实际方法的调用。实际方法的实现者实现了Iface接口。

TNonblockingServer是NIO服务器的实现,它通过Selector来检查IO就绪状态,进而调用相关的Channel。就方法调用而言,它处理的是读事件,用handelRead()来进一步处理

private void select() {
     try {
       // wait for io events.
       selector.select();

       // process the io events we received
       Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
       while (!stopped_ && selectedKeys.hasNext()) {
         SelectionKey key = selectedKeys.next();
         selectedKeys.remove();

         // skip if not valid
         if (!key.isValid()) {
           cleanupSelectionKey(key);
           continue;
         }

         // if the key is marked Accept, then it has to be the server
         // transport.
         if (key.isAcceptable()) {
           handleAccept();
         } else if (key.isReadable()) {
           // deal with reads
           handleRead(key);
         } else if (key.isWritable()) {
           // deal with writes
           handleWrite(key);
         } else {
           LOGGER.warn("Unexpected state in select! " + key.interestOps());
         }
       }
     } catch (IOException e) {
       LOGGER.warn("Got an IOException while selecting!", e);
     }
   }

  protected void handleRead(SelectionKey key) {
    FrameBuffer buffer = (FrameBuffer) key.attachment();
    if (!buffer.read()) {
      cleanupSelectionKey(key);
      return;
    }

    // if the buffer's frame read is complete, invoke the method.
    <strong>if (buffer.isFrameFullyRead()) {
      if (!requestInvoke(buffer)) {
        cleanupSelectionKey(key);
      }
    }</strong>
  }

  protected boolean requestInvoke(FrameBuffer frameBuffer) {
  frameBuffer.invoke();
  return true;
}

非阻塞同步IO的NIO服务器都会使用缓冲区来存放读写的中间状态。FrameBuffer就是这样的一个缓冲区,它由于涉及到方法调用,所以提供了invoke()方法来实现对Processor的调用。

public void invoke() {
     TTransport inTrans = getInputTransport();
     TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans);
     TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());

     try {
       processorFactory_.getProcessor(inTrans).process(inProt, outProt);
       responseReady();
       return;
     } catch (TException te) {
       LOGGER.warn("Exception while invoking!", te);
     } catch (Throwable t) {
       LOGGER.error("Unexpected throwable while invoking!", t);
     }
     // This will only be reached when there is a throwable.
     state_ = FrameBufferState.AWAITING_CLOSE;
     requestSelectInterestChange();
   }

FrameBuffer使用了processorFactory来获得Processor。ProcessorFactory是在创建服务器的时候传递过来的,只是对Processor的简单封装。

protected TServer(AbstractServerArgs args) {
   processorFactory_ = args.processorFactory;
   serverTransport_ = args.serverTransport;
   inputTransportFactory_ = args.inputTransportFactory;
   outputTransportFactory_ = args.outputTransportFactory;
   inputProtocolFactory_ = args.inputProtocolFactory;
   outputProtocolFactory_ = args.outputProtocolFactory;
 }

public class TProcessorFactory {

private final TProcessor processor_;

public TProcessorFactory(TProcessor processor) {
  processor_ = processor;
}

public TProcessor getProcessor(TTransport trans) {
  return processor_;
}
}

public T processor(TProcessor processor) {
    this.processorFactory = new TProcessorFactory(processor);
    return (T) this;
  }

下面是一个实际的TNonblockingServer的配置实例

除了配置服务器运行的基本参数,最重要的就是把实际的服务提供者通过服务器参数的方式作为Processor传递给TNonblockingServer,供FrameBuffer调用。

public class DemoServiceImpl implements DemoService.Iface{

 @Override
 public int demoMethod(String param1, Parameter param2,
     Map<String, String> param3) throws TException {

   return 0;
 }

 public static void main(String[] args){
   TNonblockingServerSocket socket;
   try {
     socket = new TNonblockingServerSocket(9090);
     TNonblockingServer.Args options = new TNonblockingServer.Args(socket);
     TProcessor processor = new DemoService.Processor(new DemoServiceImpl());
     options.processor(processor);
     options.protocolFactory(new TCompactProtocol.Factory());
     TServer server = new TNonblockingServer(options);
     server.serve();            
   } catch (Exception e) {
     throw new RuntimeException(e);
   }
 }

}
赞(0) 打赏

如未加特殊说明,此网站文章均为原创,转载必须注明出处。Java 技术驿站 » Thrift源码分析– 方法调用模型分析
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

关注【Java 技术驿站】公众号,每天早上 8:10 为你推送一篇技术文章

扫描二维码关注我!


关注【Java 技术驿站】公众号 回复 “VIP”,获取 VIP 地址永久关闭弹出窗口

免费获取资源

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏