使用Thrift RPC编写程序

1. 概述

本文以C++语言为例介绍了thrift RPC的使用方法,包括对象序列化和反序列化,数据传输和信息交换等。

本文采用了一个示例进行说明,该示例主要完成传输(上报日志或者报表)功能,该示例会贯穿本文,内容涉及thrift定义,代码生成,thrift类说明,client编写方法,server编写方法等。

关于Thrift架构分析,可参考:Thrift架构介绍

关于Thrift文件编写方法,可参考:Thrift使用指南。

关于Thrift内部实现原理,可参考:浅谈Thrift内部实现原理

2. 示例描述

假设我们要使用thrift RPC完成一个数据传输任务,数据格式和PRC接口用一个thrift文件描述,具体如下:

(1) book.thrift,用于描述书籍信息的thrift接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//book.thrift,
namespace cpp example
struct Book_Info {
1: i32 book_id,
2: string book_name,
3: string book_author,
4: double book_price,
5: string book_publisher,
}

(2) rpc.thrift,client向server传输数据(上报日志或者报表)的RPC接口

1
2
3
4
5
6
7
8
9
10
11
12
13
//rpc.thrift
namespace cpp example
include "book.thrift"
service BookServlet {
bool Sender(1: list<book.Book_Info> books);
oneway void Sender2(1: list<book.Book_Info> books);
}

说明:该thrift文件定义了一个service,它包含两个接口,server端需要实现这两个接口以对client提供服务。其中,第一个接口函数是阻塞式的,即要等待server返回值以后才能继续,另外一个声明为oneway类型(返回值为void),表明该函数是非阻塞式的,将数据发给server后不必等待返回结果,但使用该函数时,需要考虑server的承受能力,适度的调整发送频率。

3. Thrift文件与生成的代码对应关系

每个thrift文件会产生四个文件,分别为:${thrift_name}_constants.h,${thrift_name}_constants.cpp,${thrift_name}_types.h,${thrift_name}_types.cpp

对于含有service的thrift文件,会额外生成两个文件,分别为:${service_name}.h,${service_name}.cpp

对于含有service的thrift文件,会生成一个可用的server桩:${service_name}._server.skeleton.cpp

对于本文中的例子,会产生以下文件:

book_constants.h book_constants.cpp

book_types.h book_types.cpp

rpc_constants.h rpc_constants.cpp

rpc_types.h rpc_types.cpp

BookServlet.h BookServlet.cpp

BookServlet_server.skeleton.cpp

4. Thrift类介绍

Thrift代码包(位于thrift-0.6.1/lib/cpp/src)有以下几个目录:

concurrency:并发和时钟管理方面的库

processor:Processor相关类

protocal:Protocal相关类

transport:transport相关类

server:server相关类

4.1 Transport类(how is transmitted?)

负责数据传输,有以下几个可用类:

TFileTransport:文件(日志)传输类,允许client将文件传给server,允许server将收到的数据写到文件中。

THttpTransport:采用Http传输协议进行数据传输

TSocket:采用TCP Socket进行数据传输

TZlibTransport:压缩后对数据进行传输,或者将收到的数据解压

下面几个类主要是对上面几个类地装饰(采用了装饰模式),以提高传输效率。

TBufferedTransport:对某个Transport对象操作的数据进行buffer,即从buffer中读取数据进行传输,或者将数据直接写入buffer

TFramedTransport:同TBufferedTransport类似,也会对相关数据进行buffer,同时,它支持定长数据发送和接收。

TMemoryBuffer:从一个缓冲区中读写数据

4.2 Protocol类(what is transmitted?)

负责数据编码,主要有以下几个可用类:

TBinaryProtocol:二进制编码

TJSONProtocol:JSON编码

TCompactProtocol:密集二进制编码

TDebugProtocol:以用户易读的方式组织数据

4.3 Server类(providing service for clients)

TSimpleServer:简单的单线程服务器,主要用于测试

TThreadPoolServer:使用标准阻塞式IO的多线程服务器

TNonblockingServer:使用非阻塞式IO的多线程服务器,TFramedTransport必须使用该类型的server

5. 对象序列化和反序列化

Thrift中的Protocol负责对数据进行编码,因而可使用Protocol相关对象进行序列化和反序列化。

由于对象序列化和反序列化不设计传输相关的问题,所以,可使用TBinaryProtocol和TMemoryBuffer,具体如下:

(1) 使用thrift进行对象序列化

//对对象object进行序列化,保存到str中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
template <typename Type>
void Object2String(Type& object, string &str) {
  shared_ptr<TMemoryBuffer> membuffer(new TMemoryBuffer());
  shared_ptr<TProtocol> protocol(new TBinaryProtocol(membuffer));
  object.write(protocol.get());
  str.clear();
  str = membuffer.getBufferAsString();
}

(2)使用thrift进行对象反序列化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//对str中保存的对象进行反序列化,保存到object中
template <typename Type>
void String2Object(string& buffer, Type &object) {
  shared_ptr<TMemoryBuffer> membuffer(new TMemoryBuffer(
  reinterpret_cast<uint*>(buffer.data())));
  shared_ptr<TProtocol> protocol(new TBinaryProtocol(membuffer));
  object.read(protocol.get());
}

6. 编写client和server

6.1 client端代码编写

Client编写的方法分为以下几个步骤:

(1) 定义TTransport,为你的client设置传输方式(如socket, http等)。

(2) 定义Protocal,使用装饰模式(Decorator设计模式)封装TTransport,为你的数据设置编码格式(如二进制格式,JSON格式等)

(3) 实例化client对象,调用服务接口。

说明:如果用户在thrift文件中定义了一个叫${server_name}的service,则会生成一个叫${server_name}Client的对象,比如,我给出的例子中,thrift会自动生成一个叫BookServletClient的类,Client端的代码编写如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include " gen-cpp/BookServlet.h" //一定要包含该头文件
//其头文件,其他using namespace …….
int main(int argc, char** argv) {
  shared_ptr<TTransport> socket(new TSocket("localhost", 9090));
  shared_ptr<TTransport> transport(new TBufferedTransport(socket));
  shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
  example::BookServletClient client(protocol);
try {
  transport->open();
  vector<example::Book_Info> books;
  …...
  client.Sender(books);//RPC函数,调用serve端的该函数
  transport->close();
} catch (TException &tx) {
  printf("ERROR: %s\n", tx.what());
}
}

6.2 Server端代码编写

(1) 定义一个TProcess,这个是thrift根据用户定义的thrift文件自动生成的类

(2) 使用TServerTransport获得一个TTransport

(3) 使用TTransportFactory,可选地将原始传输转换为一个适合的应用传输(典型的是使用TBufferedTransportFactory)

(4) 使用TProtocolFactory,为TTransport创建一个输入和输出

(5) 创建TServer对象(单线程,可以使用TSimpleServer;对于多线程,用户可使用TThreadPoolServer或者TNonblockingServer),调用它的server()函数。

说明:thrift会为每一个带service的thrift文件生成一个简单的server代码(桩),在例子中,thrift会生成BookServlet_server.skeleton.cpp,用户可以在这个文件基础上实现自己的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#include "gen-cpp/BookServlet.h"
#include <protocol/TBinaryProtocol.h>
#include <server/TSimpleServer.h>
#include <transport/TServerSocket.h>
#include <transport/TBufferTransports.h>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using boost::shared_ptr;
using namespace example;
class BookServletHandler : virtual public BookServletIf {
public:
BookServletHandler() {
// Your initialization goes here
}
//用户需实现这个接口
bool Sender(const std::vector<example::Book_Info> & books) {
  // Your implementation goes here
  printf("Sender\n");
}
//用户需实现这个接口
void Sender2(const std::vector<example::Book_Info> & books) {
  // Your implementation goes here
  printf("Sender2\n");
}
};
int main(int argc, char **argv) {
  int port = 9090;
  shared_ptr<BookServletHandler> handler(new BookServletHandler());
  shared_ptr<TProcessor> processor(new BookServletProcessor(handler));
  shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
  shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
  shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
  TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
  server.serve();
  return 0;
}

7. 总结

至此,关于thrift框架的三篇文章已经全部完成,包括:

(1) Thrift框架介绍: Thrift框架介绍

(2) Thrift文件编写方法: Thrift使用指南

(3) Thrift RPC使用方法:利用Thrift RPC编写程序

与thrift类似的开源RPC框架还有google的protocal buffer,它虽然支持的语言比较少,但效率更高,因而受到越来越多的关注。

由于thrift开源时间很早,经受了时间的验证,因而许多系统更愿意采用thrift,如Hadoop,Cassandra等。

附:thrift与protocal buffer比较

从上面的比较可以看出,thrift胜在“丰富的特性“上,而protocal buffer胜在“文档化”非常好上。在具体实现上,它们非常类似,都是使用唯一整数标记字段域,这就使得增加和删除字段与不会破坏已有的代码。

它们的最大区别是thrift支持完整的client/server RPC框架,而protocal buffer只会产生接口,具体实现,还需要用户做大量工作。

另外,从序列化性能上比较,Protocal Buffer要远远优于thrift,具体可参考:http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/?ca=drs-tp4608 

8. 参考资料

(1) http://stuartsierra.com/2008/07/10/thrift-vs-protocol-buffers

(2) Thrift: Scalable Cross-Language Services Implementation. Mark Slee, Aditya Agarwal and Marc Kwiatkowski. Facebook

(3) Thrift网站:http://thrift.apache.org/

(4) Protocal Buffer网站:

http://code.google.com/intl/zh-CN/apis/protocolbuffers/docs/overview.html

原创文章,转载请注明: 转载自董的博客

本文链接地址: http://dongxicheng.org/search-engine/thrift-rpc/

THRIFT RPC的一个简单C++ DEMO

Thrift是一种开源的跨语言的RPC服务框架,最初由facebook公司开发的,在2007年facebook将其提交apache基金会开源了。对于当时的facebook来说创造thrift是为了解决facebook系统中各系统间大数据量的传输通信以及系统之间语言环境不同需要跨平台的特性。

 

Thrift代码包(位于thrift-0.6.1/lib/cpp/src)有以下几个目录:

concurrency:并发和时钟管理方面的库

processor:Processor相关类

protocal:Protocal相关类

transport:transport相关类

server:server相关类

Transport类(how is transmitted?)

负责数据传输,有以下几个可用类:

TFileTransport:文件(日志)传输类,允许client将文件传给server,允许server将收到的数据写到文件中。

THttpTransport:采用Http传输协议进行数据传输

TSocket:采用TCP Socket进行数据传输

TZlibTransport:压缩后对数据进行传输,或者将收到的数据解压

下面几个类主要是对上面几个类地装饰(采用了装饰模式),以提高传输效率。

TBufferedTransport:对某个Transport对象操作的数据进行buffer,即从buffer中读取数据进行传输,或者将数据直接写入buffer

TFramedTransport:同TBufferedTransport类似,也会对相关数据进行buffer,同时,它支持定长数据发送和接收。

TMemoryBuffer:从一个缓冲区中读写数据

Protocol类(what is transmitted?)

负责数据编码,主要有以下几个可用类:

TBinaryProtocol:二进制编码

TJSONProtocol:JSON编码

TCompactProtocol:密集二进制编码

TDebugProtocol:以用户易读的方式组织数据

Server类(providing service for clients)

TSimpleServer:简单的单线程服务器,主要用于测试

TThreadPoolServer:使用标准阻塞式IO的多线程服务器

TNonblockingServer:使用非阻塞式IO的多线程服务器,TFramedTransport必须使用该类型的server

 

首先需要定义.thrift接口文件:

复制代码
namespace cpp project
struct CompanyInfo{
1: i32 id;
2: string name;
3: string desc;
4: string country;
}

namespace cpp project
include "define.thrift"
service CompanyServlet {
bool Sender(1: list<define.CompanyInfo> companies);
oneway void Sender2(1: list<define.CompanyInfo> companies);
}
复制代码

然后使用thrift –gen cpp rpc.thrift生成代码,生成的代码里有一个服务端框架文件(CompanyServlet_server.skeleton.cpp),可以以此来构建server。

编写server端代码

复制代码
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.

#include "./gen-cpp/CompanyServlet.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/server/TThreadPoolServer.h>
#include <thrift/server/TNonblockingServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>

#include <cstdio>
#include <cstdlib>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;

using boost::shared_ptr;

using namespace  ::project;

class CompanyServletHandler : virtual public CompanyServletIf {
 public:
  CompanyServletHandler() {
    // Your initialization goes here
  }

  bool Sender(const std::vector< ::project::CompanyInfo> & companies) {
    // Your implementation goes here
    printf("Sender\n");
    for(const ::project::CompanyInfo &info : companies){
        printf("id[%d], name[%s], desc[%s], country[%s]\n", info.id, info.name.c_str(), info.desc.c_str(), info.country.c_str());
    }
    return true;
  }

  void Sender2(const std::vector< ::project::CompanyInfo> & companies) {
    // Your implementation goes here
    printf("Sender2\n");
  }

};

int main(int argc, char **argv) {
  int port = 9090;
  shared_ptr<CompanyServletHandler> handler(new CompanyServletHandler());
  shared_ptr<TProcessor> processor(new CompanyServletProcessor(handler));
  shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
  shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
  shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

  TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);

  server.serve();
  return 0;
}
复制代码

这里只实现了简单的打印。

编写client代码

复制代码
#include "./gen-cpp/CompanyServlet.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>

#include <cstdio>
#include <cstdlib>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;

using boost::shared_ptr;

using namespace  ::project;

int main(int argc, char *argv[])
{
    int port = 9090;
    shared_ptr<TTransport> tsocket(new TSocket("localhost", port));
    shared_ptr<TTransport> transport(new TBufferedTransport(tsocket));
    shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
    project::CompanyServletClient client(protocol);

    for(;;){
        transport->open();
        CompanyInfo info;
        info.id = 700;
        info.name = "Tencent";
        info.desc = "Integrated Internet Service Provider";
        info.country = "China";

        std::vector<CompanyInfo> v;
        v.push_back(info);
        if(!client.Sender(v))
            printf("Sender failed!\n");
        transport->close();

        sleep(1);
    }

    return 0;
}
复制代码

贴上编译命令吧

g++ -W -g -Wno-unused -std=c++11 -o thrift_test_server.run thrift_test_server.cpp  ./gen-cpp/define_types.o  ./gen-cpp/rpc_types.o  ./gen-cpp/define_constants.o  ./gen-cpp/rpc_constants.o  ./gen-cpp/CompanyServlet.o -lthrift

https://www.cnblogs.com/jasonkyle/p/7009878.html