TProcessor
这一层对应应用层,也就是平常需要我们自己实现的那一层,它内部封装了 Handler 类。在 thrift 生成的代码中,我们一般只需要编写 Handler 类的逻辑,Handler 中的逻辑就是我们自己定义的服务逻辑。
分析 demo
// Demo service: a single RPC named `put` that takes an i32 and returns a string.
service Serv {
  string put(1: i32 value),
}
Handler
可以看到 Handler 类继承了 ServIf 类。thrift 利用 C++ 中基类(Base)指针可以指向派生类(Derived)对象这一特性,使 Processor 能够通过 ServIf 指针调用我们自己实现的 Handler 类。
Processor
thrift 内部文件代码:
/**
 * Callback interface whose hooks are invoked around the stages of handling a
 * call (see the per-method notes). Every hook has an empty default body, and
 * the constructor is protected, so the class is only usable through a subclass
 * that overrides the hooks it cares about.
 */
class TProcessorEventHandler {
public:
  virtual ~TProcessorEventHandler() {}

  /**
   * Called before calling other callback methods.
   * Expected to return some sort of context object.
   * The return value is passed to all other callbacks
   * for that function invocation.
   *
   * The default implementation ignores both arguments and returns NULL.
   */
  virtual void* getContext(const char* /*fn_name*/, void* /*serverContext*/) {
    return NULL;
  }

  /**
   * Expected to free resources associated with a context.
   */
  virtual void freeContext(void* /*ctx*/, const char* /*fn_name*/) {}

  /**
   * Called before reading arguments.
   */
  virtual void preRead(void* /*ctx*/, const char* /*fn_name*/) {}

  /**
   * Called between reading arguments and calling the handler.
   */
  virtual void postRead(void* /*ctx*/, const char* /*fn_name*/, uint32_t /*bytes*/) {}

  /**
   * Called between calling the handler and writing the response.
   */
  virtual void preWrite(void* /*ctx*/, const char* /*fn_name*/) {}

  /**
   * Called after writing the response.
   */
  virtual void postWrite(void* /*ctx*/, const char* /*fn_name*/, uint32_t /*bytes*/) {}

  /**
   * Called when an async function call completes successfully.
   */
  virtual void asyncComplete(void* /*ctx*/, const char* /*fn_name*/) {}

  /**
   * Called if the handler throws an undeclared exception.
   */
  virtual void handlerError(void* /*ctx*/, const char* /*fn_name*/) {}

protected:
  // Protected so the class can only be instantiated through a derived type.
  TProcessorEventHandler() {}
};
class TProcessor {
public:virtual ~TProcessor() {
}virtual bool process(boost::shared_ptr<protocol::TProtocol> in,boost::shared_ptr<protocol::TProtocol> out,void* connectionContext) = 0;bool process(boost::shared_ptr<apache::thrift::protocol::TProtocol> io, void* connectionContext) {
return process(io, io, connectionContext);}boost::shared_ptr<TProcessorEventHandler> getEventHandler() {
return eventHandler_; }void setEventHandler(boost::shared_ptr<TProcessorEventHandler> eventHandler) {
eventHandler_ = eventHandler;}protected:TProcessor() {
}boost::shared_ptr<TProcessorEventHandler> eventHandler_;
};class TDispatchProcessor : public TProcessor {
public:virtual bool process(boost::shared_ptr<protocol::TProtocol> in,boost::shared_ptr<protocol::TProtocol> out,void* connectionContext) {
std::string fname;protocol::TMessageType mtype;int32_t seqid;in->readMessageBegin(fname, mtype, seqid);if (mtype != protocol::T_CALL && mtype != protocol::T_ONEWAY) {
GlobalOutput.printf("received invalid message type %d from client", mtype);return false;}return dispatchCall(in.get(), out.get(), fname, seqid, connectionContext);}
protected:virtual bool dispatchCall(apache::thrift::protocol::TProtocol* in,apache::thrift::protocol::TProtocol* out,const std::string& fname,int32_t seqid,void* callContext) = 0;
};业务生成代码:class ServProcessor : public ::apache::thrift::TDispatchProcessor {
protected:boost::shared_ptr<ServIf> iface_;virtual bool dispatchCall(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid, void* callContext);private:typedef void (ServProcessor::*ProcessFunction)(int32_t,::apache::thrift::protocol::TProtocol*, ::apache::thrift::protocol::TProtocol*, void*);typedef std::map<std::string, ProcessFunction> ProcessMap;ProcessMap processMap_;void process_put(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);public:ServProcessor(boost::shared_ptr<ServIf> iface) :iface_(iface) {
processMap_["put"] = &ServProcessor::process_put; //这个put是我们Service中的函数名}virtual ~ServProcessor() {
}
};bool ServProcessor::dispatchCall(::apache::thrift::protocol::TProtocol* iprot,
::apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid,
void* callContext) {
ProcessMap::iterator pfn;pfn = processMap_.find(fname);if (pfn == processMap_.end()) {
iprot->skip(::apache::thrift::protocol::T_STRUCT);iprot->readMessageEnd();iprot->getTransport()->readEnd();::apache::thrift::TApplicationException x(::apache::thrift::TApplicationException::UNKNOWN_METHOD, "Invalid method name: '"+fname+"'");oprot->writeMessageBegin(fname, ::apache::thrift::protocol::T_EXCEPTION, seqid);x.write(oprot);oprot->writeMessageEnd();oprot->getTransport()->writeEnd();oprot->getTransport()->flush();return true;}(this->*(pfn->second))(seqid, iprot, oprot, callContext); //Processor::process_put return true;
}
/**
 * Handles one `put` call: reads the arguments from `iprot`, invokes the user's
 * ServIf::put implementation, and writes either the reply or an exception
 * message to `oprot`.
 *
 * Event-handler hooks (getContext, preRead, postRead, handlerError, preWrite,
 * postWrite) are invoked only when an event handler has been set.
 *
 * @param seqid        sequence id copied into the response message header
 * @param iprot        protocol to read the call arguments from
 * @param oprot        protocol to write the reply / exception to
 * @param callContext  opaque value passed through to getContext()
 */
void ServProcessor::process_put(int32_t seqid,
                                ::apache::thrift::protocol::TProtocol* iprot,
                                ::apache::thrift::protocol::TProtocol* oprot,
                                void* callContext)
{
  void* ctx = NULL;
  if (this->eventHandler_.get() != NULL) {
    ctx = this->eventHandler_->getContext("Serv.put", callContext);
  }
  // NOTE(review): TProcessorContextFreer is not shown here; presumably it calls
  // freeContext(ctx, "Serv.put") on scope exit - confirm against its definition.
  ::apache::thrift::TProcessorContextFreer freer(this->eventHandler_.get(), ctx, "Serv.put");

  if (this->eventHandler_.get() != NULL) {
    this->eventHandler_->preRead(ctx, "Serv.put");
  }

  Serv_put_args args;
  args.read(iprot);
  iprot->readMessageEnd();
  uint32_t bytes = iprot->getTransport()->readEnd();

  if (this->eventHandler_.get() != NULL) {
    this->eventHandler_->postRead(ctx, "Serv.put", bytes);
  }

  Serv_put_result result;
  try {
    // This is where the user-written handler logic is invoked.
    iface_->put(result.success, args.value);
    result.__isset.success = true;
  } catch (const std::exception& e) {
    if (this->eventHandler_.get() != NULL) {
      this->eventHandler_->handlerError(ctx, "Serv.put");
    }

    // Report the failure to the client as a T_EXCEPTION message and stop.
    ::apache::thrift::TApplicationException x(e.what());
    oprot->writeMessageBegin("put", ::apache::thrift::protocol::T_EXCEPTION, seqid);
    x.write(oprot);
    oprot->writeMessageEnd();
    oprot->getTransport()->writeEnd();
    oprot->getTransport()->flush();
    return;
  }

  if (this->eventHandler_.get() != NULL) {
    this->eventHandler_->preWrite(ctx, "Serv.put");
  }

  oprot->writeMessageBegin("put", ::apache::thrift::protocol::T_REPLY, seqid);
  result.write(oprot);  // send the result back
  oprot->writeMessageEnd();
  bytes = oprot->getTransport()->writeEnd();
  oprot->getTransport()->flush();

  if (this->eventHandler_.get() != NULL) {
    this->eventHandler_->postWrite(ctx, "Serv.put", bytes);
  }
}
调用逻辑流
TDispatchProcessor::process -> ServProcessor::dispatchCall -> ServProcessor::process_put
-> Handler::put
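从上面的源码可以看出,ServProcessor 中的 processMap_(类型为 ProcessMap)主要用来按函数名查找处理方法:当 client 发起 RPC 时,会把要调用的函数名一并传过来;Server 收到函数名后在 processMap_ 中查找是否有对应的处理函数,如果有,就通过相应的成员函数指针调用它(例如 process_put),该处理函数再调用 Handler 中的业务函数进行业务处理,最后把结果返回给 client;如果没有,则向 client 返回 UNKNOWN_METHOD 异常。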