mprpc: A Distributed Communication Framework Based on Muduo
0. Project Setup
Building the CMakeLists.txt files
Top-level CMakeLists.txt
# Set the minimum CMake version and the project name
cmake_minimum_required(VERSION 3.0)
project(mprpc)
# Produce a Debug build so the binaries can be debugged with gdb
set(CMAKE_BUILD_TYPE "Debug")
# Output path for the project's executables
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)
# Output path for the project's libraries
set(LIBRARY_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/lib)
# Header search paths used at compile time (-I)
include_directories(${PROJECT_SOURCE_DIR}/src/include)
include_directories(${PROJECT_SOURCE_DIR}/example)
# Library search path used at link time (-L)
link_directories(${PROJECT_SOURCE_DIR}/lib)
# src contains all the code of the mprpc framework itself
add_subdirectory(src)
# example contains sample code that uses the mprpc framework
add_subdirectory(example)
src-level CMakeLists.txt
#aux_source_directory(. SRC_LIST)
set(SRC_LIST
mprpcapplication.cc
mprpcconfig.cc
rpcheader.pb.cc
rpcprovider.cc
mprpcchannel.cc
mprpccontroller.cc
logger.cc
zookeeperutil.cc)
add_library(mprpc ${SRC_LIST})
target_link_libraries(mprpc muduo_net muduo_base pthread zookeeper_mt)
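With the two CMakeLists.txt files above (plus the example-level ones shown later), a typical out-of-source build from the project root looks like this (a sketch; the build directory name is arbitrary):
mkdir -p build && cd build
cmake ..
make
Per the output paths set above, the executables land in bin/ and the mprpc library in lib/.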
1. Using protobuf
1. Write the .proto file
test.proto
syntax = "proto3"; // 声明了protobuf的版本
package fixbug; // 声明了代码所在的包(对于C++来说是namespace)
// 定义下面的选项,表示生成service服务类和rpc方法描述,默认不生成
option cc_generic_services = true;
message ResultCode
{
int32 errcode = 1;
bytes errmsg = 2; // bytes maps to a C++ std::string
}
// protobuf supports scalar data, lists (repeated), and maps
// Login request message: name, pwd
message LoginRequest
{
bytes name = 1;
bytes pwd = 2;
}
// Login response message
message LoginResponse
{
ResultCode result = 1;
bool success = 2;
}
message GetFriendListsRequest
{
uint32 userid = 1;
}
message User
{
bytes name = 1;
uint32 age = 2;
enum Sex
{
MAN = 0;
WOMAN = 1;
}
Sex sex = 3;
}
message GetFriendListsResponse
{
ResultCode result = 1;
repeated User friend_list = 2; // defines a list-typed field
}
// How rpc methods are described in protobuf: with a service definition
service UserServiceRpc
{
rpc Login(LoginRequest) returns(LoginResponse);
rpc GetFriendLists(GetFriendListsRequest) returns(GetFriendListsResponse);
}
2. Compile the .proto file to generate the C++ sources
protoc test.proto --cpp_out=./
3. The program must include the generated files and link against the protobuf shared library (-lprotobuf)
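Running protoc as above produces test.pb.h and test.pb.cc alongside test.proto. The demo program below can then be built roughly like this (a sketch; depending on the platform, protobuf may also require -lpthread):
g++ main.cpp test.pb.cc -o main -lprotobuf -lpthread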
main.cpp
#include "test.pb.h"
#include <iostream>
#include <string>
using namespace fixbug;
int main1() // serialization demo, renamed so it does not clash with main() below
{
// Package the data of a login request object
LoginRequest req;
req.set_name("zhang san");
req.set_pwd("123456");
// Serialize the object's data => char*
std::string send_str;
if (req.SerializeToString(&send_str))
{
std::cout << send_str.c_str() << std::endl;
}
// Deserialize a login request object from send_str
LoginRequest reqB;
if (reqB.ParseFromString(send_str))
{
std::cout << reqB.name() << std::endl;
std::cout << reqB.pwd() << std::endl;
}
return 0;
}
int main()
{
// LoginResponse rsp;
// ResultCode *rc = rsp.mutable_result();
// rc->set_errcode(1);
// rc->set_errmsg("login failed");
GetFriendListsResponse rsp;
ResultCode *rc = rsp.mutable_result();
rc->set_errcode(0);
User *user1 = rsp.add_friend_list();
user1->set_name("zhang san");
user1->set_age(20);
user1->set_sex(User::MAN);
User *user2 = rsp.add_friend_list();
user2->set_name("li si");
user2->set_age(22);
user2->set_sex(User::MAN);
std::cout << rsp.friend_list_size() << std::endl;
return 0;
}
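As with the flat LoginRequest earlier, a message with nested and repeated fields round-trips through serialization the same way. A minimal sketch that could be appended to main() above:

// Serialize the response (nested ResultCode + repeated User) into a byte string...
std::string buf;
if (rsp.SerializeToString(&buf))
{
    // ...and parse it back into a fresh object; the repeated field is restored too
    GetFriendListsResponse rsp2;
    if (rsp2.ParseFromString(buf))
    {
        std::cout << rsp2.friend_list(0).name() << std::endl; // prints "zhang san"
    }
}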
2. Example - callee
CMakeLists.txt
# set(SRC_LIST userservice.cc ../user.pb.cc)
set(SRC_LIST friendservice.cc ../friend.pb.cc)
add_executable(provider ${SRC_LIST})
target_link_libraries(provider mprpc protobuf)
userservice.cc
#include <iostream>
#include <string>
#include "user.pb.h"
#include "mprpcapplication.h"
#include "rpcprovider.h"
/*
UserService was originally a local service, offering in-process local methods such as Login and Register
*/
class UserService : public fixbug::UserServiceRpc // used on the rpc publishing side (the rpc service provider)
{
public:
bool Login(std::string name, std::string pwd)
{
std::cout << "doing local service: Login" << std::endl;
std::cout << "name:" << name << " pwd:" << pwd << std::endl;
return false;
}
bool Register(uint32_t id, std::string name, std::string pwd)
{
std::cout << "doing local service: Register" << std::endl;
std::cout << "id:" << id << "name:" << name << " pwd:" << pwd << std::endl;
return true;
}
/*
Overrides of the virtual functions of the base class UserServiceRpc; the framework calls these directly:
1. caller ===> Login(LoginRequest) => muduo => callee
2. callee ===> Login(LoginRequest) => dispatched to the overridden Login below
*/
void Login(::google::protobuf::RpcController* controller,
const ::fixbug::LoginRequest* request,
::fixbug::LoginResponse* response,
::google::protobuf::Closure* done)
{
// The framework hands the LoginRequest parameters up to the application, which reads them to run the local business logic
std::string name = request->name();
std::string pwd = request->pwd();
// Run the local business logic
bool login_result = Login(name, pwd);
// Fill in the response: error code, error message, and return value
fixbug::ResultCode *code = response->mutable_result();
code->set_errcode(0);
code->set_errmsg("");
response->set_sucess(login_result);
// Run the callback: serializes the response object and sends it over the network (both done by the framework)
done->Run();
}
void Register(::google::protobuf::RpcController* controller,
const ::fixbug::RegisterRequest* request,
::fixbug::RegisterResponse* response,
::google::protobuf::Closure* done)
{
uint32_t id = request->id();
std::string name = request->name();
std::string pwd = request->pwd();
bool ret = Register(id, name, pwd);
response->mutable_result()->set_errcode(0);
response->mutable_result()->set_errmsg("");
response->set_sucess(ret);
done->Run();
}
};
int main(int argc, char **argv)
{
// Initialize the framework
MprpcApplication::Init(argc, argv);
// provider is an rpc network service object; it publishes the UserService object on this rpc node
RpcProvider provider;
provider.NotifyService(new UserService());
// Start the rpc service node; after Run() the process blocks, waiting for remote rpc call requests
provider.Run();
return 0;
}
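After building, the provider is launched with -i pointing at the framework config file that MprpcApplication::Init parses (section 5). A hypothetical test.conf using the keys the framework actually reads, followed by the launch command:

# test.conf (the file name is arbitrary)
rpcserverip=127.0.0.1
rpcserverport=8000
zookeeperip=127.0.0.1
zookeeperport=2181

./provider -i test.conf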
friendservice.cc
#include <iostream>
#include <string>
#include "friend.pb.h"
#include "mprpcapplication.h"
#include "rpcprovider.h"
#include <vector>
#include "logger.h"
class FriendService : public fixbug::FiendServiceRpc
{
public:
std::vector<std::string> GetFriendsList(uint32_t userid)
{
std::cout << "do GetFriendsList service! userid:" << userid << std::endl;
std::vector<std::string> vec;
vec.push_back("gao yang");
vec.push_back("liu hong");
vec.push_back("wang shuo");
return vec;
}
// Override of the base-class method
void GetFriendsList(::google::protobuf::RpcController* controller,
const ::fixbug::GetFriendsListRequest* request,
::fixbug::GetFriendsListResponse* response,
::google::protobuf::Closure* done)
{
uint32_t userid = request->userid();
std::vector<std::string> friendsList = GetFriendsList(userid);
response->mutable_result()->set_errcode(0);
response->mutable_result()->set_errmsg("");
for (std::string &name : friendsList)
{
std::string *p = response->add_friends();
*p = name;
}
done->Run();
}
};
int main(int argc, char **argv)
{
LOG_ERR("ddddd");
LOG_INFO("ddddd");
// Initialize the framework
MprpcApplication::Init(argc, argv);
// provider is an rpc network service object; it publishes the FriendService object on this rpc node
RpcProvider provider;
provider.NotifyService(new FriendService());
// Start the rpc service node; after Run() the process blocks, waiting for remote rpc call requests
provider.Run();
return 0;
}
3. Example - caller
CMakeLists.txt
# set(SRC_LIST calluserservice.cc ../user.pb.cc)
set(SRC_LIST callfriendservice.cc ../friend.pb.cc)
add_executable(consumer ${SRC_LIST})
target_link_libraries(consumer mprpc protobuf)
calluserservice.cc
#include <iostream>
#include "mprpcapplication.h"
#include "user.pb.h"
#include "mprpcchannel.h"
int main(int argc, char **argv)
{
// Once the program starts, using the mprpc framework for rpc calls requires calling the framework's init function first (only once)
MprpcApplication::Init(argc, argv);
// Demonstrate calling the remotely published rpc method Login
fixbug::UserServiceRpc_Stub stub(new MprpcChannel());
// Request parameters for the rpc method
fixbug::LoginRequest request;
request.set_name("zhang san");
request.set_pwd("123456");
// Response object for the rpc method
fixbug::LoginResponse response;
// Make the rpc call; a synchronous call that goes through MprpcChannel::CallMethod
stub.Login(nullptr, &request, &response, nullptr); // RpcChannel::CallMethod centralizes parameter serialization and network sending for every rpc method call
// The rpc call has finished; read the result
if (0 == response.result().errcode())
{
std::cout << "rpc login response success:" << response.sucess() << std::endl;
}
else
{
std::cout << "rpc login response error : " << response.result().errmsg() << std::endl;
}
// Demonstrate calling the remotely published rpc method Register
fixbug::RegisterRequest req;
req.set_id(2000);
req.set_name("mprpc");
req.set_pwd("666666");
fixbug::RegisterResponse rsp;
// Make the rpc call synchronously and wait for the result
stub.Register(nullptr, &req, &rsp, nullptr);
// The rpc call has finished; read the result
if (0 == rsp.result().errcode())
{
std::cout << "rpc register response success:" << rsp.sucess() << std::endl;
}
else
{
std::cout << "rpc register response error : " << rsp.result().errmsg() << std::endl;
}
return 0;
}
callfriendservice.cc
#include <iostream>
#include "mprpcapplication.h"
#include "friend.pb.h"
int main(int argc, char **argv)
{
// Once the program starts, using the mprpc framework for rpc calls requires calling the framework's init function first (only once)
MprpcApplication::Init(argc, argv);
// Demonstrate calling the remotely published rpc method GetFriendsList
fixbug::FiendServiceRpc_Stub stub(new MprpcChannel());
// Request parameters for the rpc method
fixbug::GetFriendsListRequest request;
request.set_userid(1000);
// Response object for the rpc method
fixbug::GetFriendsListResponse response;
// Make the rpc call; a synchronous call that goes through MprpcChannel::CallMethod
MprpcController controller;
stub.GetFriendsList(&controller, &request, &response, nullptr); // RpcChannel::CallMethod centralizes parameter serialization and network sending for every rpc method call
// The rpc call has finished; read the result
if (controller.Failed())
{
std::cout << controller.ErrorText() << std::endl;
}
else
{
if (0 == response.result().errcode())
{
std::cout << "rpc GetFriendsList response success!" << std::endl;
int size = response.friends_size();
for (int i=0; i < size; ++i)
{
std::cout << "index:" << (i+1) << " name:" << response.friends(i) << std::endl;
}
}
else
{
std::cout << "rpc GetFriendsList response error : " << response.result().errmsg() << std::endl;
}
}
return 0;
}
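The consumer is launched the same way, against the same hypothetical config file:
./consumer -i test.conf
Note that the caller resolves the provider's address through zookeeper (see MprpcChannel::CallMethod in section 6.5), so only the zookeeperip/zookeeperport entries matter on the caller side.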
4. Example - proto
user.proto
syntax = "proto3";
package fixbug;
option cc_generic_services = true;
message ResultCode
{
int32 errcode = 1;
bytes errmsg = 2;
}
message LoginRequest
{
bytes name = 1;
bytes pwd = 2;
}
message LoginResponse
{
ResultCode result = 1;
bool sucess = 2;
}
message RegisterRequest
{
uint32 id = 1;
bytes name = 2;
bytes pwd = 3;
}
message RegisterResponse
{
ResultCode result = 1;
bool sucess = 2;
}
service UserServiceRpc
{
rpc Login(LoginRequest) returns(LoginResponse);
rpc Register(RegisterRequest) returns(RegisterResponse);
}
friend.proto
syntax = "proto3";
package fixbug;
option cc_generic_services = true;
message ResultCode
{
int32 errcode = 1;
bytes errmsg = 2;
}
message GetFriendsListRequest
{
uint32 userid = 1;
}
message GetFriendsListResponse
{
ResultCode result = 1;
repeated bytes friends = 2;
}
// friend module
service FiendServiceRpc
{
rpc GetFriendsList(GetFriendsListRequest) returns(GetFriendsListResponse);
}
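The user.pb.cc and friend.pb.cc files referenced by the callee and caller CMakeLists.txt files come from compiling these two definitions, e.g. (run in the example directory):
protoc user.proto --cpp_out=./
protoc friend.proto --cpp_out=./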
5. mprpcapplication
mprpcapplication.h
#pragma once
#include "mprpcconfig.h"
#include "mprpcchannel.h"
#include "mprpccontroller.h"
// Base class of the mprpc framework, responsible for initializing the framework
class MprpcApplication
{
public:
static void Init(int argc, char **argv);
static MprpcApplication& GetInstance();
static MprpcConfig& GetConfig();
private:
static MprpcConfig m_config;
MprpcApplication(){}
MprpcApplication(const MprpcApplication&) = delete;
MprpcApplication(MprpcApplication&&) = delete;
};
mprpcapplication.cc
#include "mprpcapplication.h"
#include <iostream>
#include <unistd.h>
#include <string>
MprpcConfig MprpcApplication::m_config;
void ShowArgsHelp()
{
std::cout<<"format: command -i <configfile>" << std::endl;
}
void MprpcApplication::Init(int argc, char **argv)
{
if (argc < 2)
{
ShowArgsHelp();
exit(EXIT_FAILURE);
}
int c = 0;
std::string config_file;
while((c = getopt(argc, argv, "i:")) != -1)
{
switch (c)
{
case 'i':
config_file = optarg;
break;
case '?':
ShowArgsHelp();
exit(EXIT_FAILURE);
case ':':
ShowArgsHelp();
exit(EXIT_FAILURE);
default:
break;
}
}
// Load the config file: rpcserverip= rpcserverport= zookeeperip= zookeeperport=
m_config.LoadConfigFile(config_file.c_str());
// std::cout << "rpcserverip:" << m_config.Load("rpcserverip") << std::endl;
// std::cout << "rpcserverport:" << m_config.Load("rpcserverport") << std::endl;
// std::cout << "zookeeperip:" << m_config.Load("zookeeperip") << std::endl;
// std::cout << "zookeeperport:" << m_config.Load("zookeeperport") << std::endl;
}
MprpcApplication& MprpcApplication::GetInstance()
{
static MprpcApplication app;
return app;
}
MprpcConfig& MprpcApplication::GetConfig()
{
return m_config;
}
mprpcconfig.h
#pragma once
#include <unordered_map>
#include <string>
// rpcserverip rpcserverport zookeeperip zookeeperport
// Class that reads the framework's config file
class MprpcConfig
{
public:
// Parses and loads the config file
void LoadConfigFile(const char *config_file);
// Looks up a config entry by key
std::string Load(const std::string &key);
private:
std::unordered_map<std::string, std::string> m_configMap;
// Strips leading and trailing spaces from a string
void Trim(std::string &src_buf);
};
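A minimal standalone usage sketch of this class (the config file path is hypothetical); as mprpcconfig.cc below shows, Load returns an empty string for a missing key:

MprpcConfig config;
config.LoadConfigFile("test.conf");          // exits the process if the file cannot be opened
std::string ip = config.Load("rpcserverip"); // "" if the key is absent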
mprpcconfig.cc
#include "mprpcconfig.h"
#include <iostream>
#include <string>
// Parses and loads the config file
void MprpcConfig::LoadConfigFile(const char *config_file)
{
FILE *pf = fopen(config_file, "r");
if (nullptr == pf)
{
std::cout << config_file << " does not exist!" << std::endl;
exit(EXIT_FAILURE);
}
// Handle: 1. comment lines 2. valid key=value entries 3. extra leading whitespace
while(!feof(pf))
{
char buf[512] = {0};
fgets(buf, 512, pf);
// Strip extra spaces at the front of the string
std::string read_buf(buf);
Trim(read_buf);
// Skip comment lines starting with #
if (read_buf[0] == '#' || read_buf.empty())
{
continue;
}
// Parse the config entry
int idx = read_buf.find('=');
if (idx == -1)
{
// Invalid config entry
continue;
}
std::string key;
std::string value;
key = read_buf.substr(0, idx);
Trim(key);
// e.g. rpcserverip=127.0.0.1\n
int endidx = read_buf.find('\n', idx);
value = read_buf.substr(idx+1, endidx-idx-1);
Trim(value);
m_configMap.insert({key, value});
}
fclose(pf);
}
// Look up a config entry
std::string MprpcConfig::Load(const std::string &key)
{
auto it = m_configMap.find(key);
if (it == m_configMap.end())
{
return "";
}
return it->second;
}
// Strips leading and trailing spaces from a string
void MprpcConfig::Trim(std::string &src_buf)
{
int idx = src_buf.find_first_not_of(' ');
if (idx != -1)
{
// There are leading spaces to remove
src_buf = src_buf.substr(idx, src_buf.size()-idx);
}
// Strip extra trailing spaces
idx = src_buf.find_last_not_of(' ');
if (idx != -1)
{
// There are trailing spaces to remove
src_buf = src_buf.substr(0, idx+1);
}
}
6. rpcprovider
rpcprovider.h
#pragma once
#include "google/protobuf/service.h"
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <muduo/net/TcpConnection.h>
#include <string>
#include <functional>
#include <google/protobuf/descriptor.h>
#include <unordered_map>
// Network object class the framework provides for publishing rpc services
class RpcProvider
{
public:
// Interface the framework exposes to the outside for publishing rpc methods
void NotifyService(google::protobuf::Service *service);
// Start the rpc service node and begin serving remote rpc calls
void Run();
private:
// Composed EventLoop
muduo::net::EventLoop m_eventLoop;
// Type information of one service
struct ServiceInfo
{
google::protobuf::Service *m_service; // the service object
std::unordered_map<std::string, const google::protobuf::MethodDescriptor*> m_methodMap; // the service's methods
};
// Stores all information about the registered service objects and their methods
std::unordered_map<std::string, ServiceInfo> m_serviceMap;
// Callback for new socket connections
void OnConnection(const muduo::net::TcpConnectionPtr&);
// Read/write event callback for established connections
void OnMessage(const muduo::net::TcpConnectionPtr&, muduo::net::Buffer*, muduo::Timestamp);
// Callback passed to the Closure: serializes the rpc response and sends it over the network
void SendRpcResponse(const muduo::net::TcpConnectionPtr&, google::protobuf::Message*);
};
rpcprovider.cc
#include "rpcprovider.h"
#include "mprpcapplication.h"
#include "rpcheader.pb.h"
#include "logger.h"
#include "zookeeperutil.h"
/*
service_name => service description
=> service* (records the service object)
method_name => method descriptor object
json / protobuf
*/
// Interface the framework exposes to the outside for publishing rpc methods
void RpcProvider::NotifyService(google::protobuf::Service *service)
{
ServiceInfo service_info;
// Get the service object's descriptor
const google::protobuf::ServiceDescriptor *pserviceDesc = service->GetDescriptor();
// Get the service's name
std::string service_name = pserviceDesc->name();
// Get the number of methods the service object offers
int methodCnt = pserviceDesc->method_count();
// std::cout << "service_name:" << service_name << std::endl;
LOG_INFO("service_name:%s", service_name.c_str());
for (int i=0; i < methodCnt; ++i)
{
// Get the descriptor (abstract description) of the method at the given index, e.g. UserService Login
const google::protobuf::MethodDescriptor* pmethodDesc = pserviceDesc->method(i);
std::string method_name = pmethodDesc->name();
service_info.m_methodMap.insert({method_name, pmethodDesc});
LOG_INFO("method_name:%s", method_name.c_str());
}
service_info.m_service = service;
m_serviceMap.insert({service_name, service_info});
}
// Start the rpc service node and begin serving remote rpc calls
void RpcProvider::Run()
{
// Read the rpcserver settings from the config file
std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");
uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());
muduo::net::InetAddress address(ip, port);
// Create the TcpServer object
muduo::net::TcpServer server(&m_eventLoop, address, "RpcProvider");
// Bind the connection callback and the message read/write callback, separating network code from business code
server.setConnectionCallback(std::bind(&RpcProvider::OnConnection, this, std::placeholders::_1));
server.setMessageCallback(std::bind(&RpcProvider::OnMessage, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
// Set the number of muduo worker threads
server.setThreadNum(4);
// Register every service published on this rpc node with zk, so that rpc clients can discover the services from zk
// session timeout is 30s; the zkclient network I/O thread sends a ping every 1/3 * timeout
ZkClient zkCli;
zkCli.Start();
// service_name is a permanent znode; method_name is an ephemeral znode
for (auto &sp : m_serviceMap)
{
// /service_name, e.g. /UserServiceRpc
std::string service_path = "/" + sp.first;
zkCli.Create(service_path.c_str(), nullptr, 0);
for (auto &mp : sp.second.m_methodMap)
{
// /service_name/method_name, e.g. /UserServiceRpc/Login; stores the ip and port of this rpc node
std::string method_path = service_path + "/" + mp.first;
char method_path_data[128] = {0};
sprintf(method_path_data, "%s:%d", ip.c_str(), port);
// ZOO_EPHEMERAL marks the znode as ephemeral
zkCli.Create(method_path.c_str(), method_path_data, strlen(method_path_data), ZOO_EPHEMERAL);
}
}
// The rpc server is about to start; print its info
std::cout << "RpcProvider start service at ip:" << ip << " port:" << port << std::endl;
// Start the network service
server.start();
m_eventLoop.loop();
}
// Callback for new socket connections
void RpcProvider::OnConnection(const muduo::net::TcpConnectionPtr &conn)
{
if (!conn->connected())
{
// The connection with the rpc client has been closed
conn->shutdown();
}
}
/*
Inside the framework, RpcProvider and RpcConsumer agree on the protobuf data types used for their communication:
service_name method_name args -- defined as a proto message type so the data header can be serialized/deserialized
header: service_name method_name args_size
wire example: 16UserServiceLoginzhang san123456
frame layout: header_size (4 raw bytes) + header_str + args_str
header_size is sent as 4 raw bytes instead of text, because as text 10 would be "10" (2 bytes)
while 10000 would be "10000" (5 bytes), so the prefix length would be ambiguous
packed and unpacked with std::string's insert and copy methods
*/
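A condensed sketch of this framing, using the same std::string insert/copy calls that MprpcChannel::CallMethod and OnMessage below use (copying the raw uint32_t assumes both ends share the same byte order):

// Pack (sender side): 4 raw bytes of header_size, then the header, then the args
uint32_t header_size = rpc_header_str.size();
std::string send_rpc_str;
send_rpc_str.insert(0, std::string((char*)&header_size, 4));
send_rpc_str += rpc_header_str;
send_rpc_str += args_str;
// Unpack (receiver side): copy the first 4 bytes back into an integer, then slice
uint32_t peek_size = 0;
recv_buf.copy((char*)&peek_size, 4, 0);
std::string header_str = recv_buf.substr(4, peek_size);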
// Read/write event callback for established connections; whenever a remote rpc call request arrives, OnMessage responds
void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr &conn,
muduo::net::Buffer *buffer,
muduo::Timestamp)
{
// The byte stream of the remote rpc call request received from the network, e.g. Login + args
std::string recv_buf = buffer->retrieveAllAsString();
// Read the first 4 bytes of the stream (the raw header_size)
uint32_t header_size = 0;
recv_buf.copy((char*)&header_size, 4, 0);
// Using header_size, read the raw header bytes and deserialize them to get the details of the rpc request
std::string rpc_header_str = recv_buf.substr(4, header_size);
mprpc::RpcHeader rpcHeader;
std::string service_name;
std::string method_name;
uint32_t args_size;
if (rpcHeader.ParseFromString(rpc_header_str))
{
// Header deserialized successfully
service_name = rpcHeader.service_name();
method_name = rpcHeader.method_name();
args_size = rpcHeader.args_size();
}
else
{
// Header deserialization failed
std::cout << "rpc_header_str:" << rpc_header_str << " parse error!" << std::endl;
return;
}
// Get the byte stream of the rpc method's arguments
std::string args_str = recv_buf.substr(4 + header_size, args_size);
// Print debugging info
std::cout << "============================================" << std::endl;
std::cout << "header_size: " << header_size << std::endl;
std::cout << "rpc_header_str: " << rpc_header_str << std::endl;
std::cout << "service_name: " << service_name << std::endl;
std::cout << "method_name: " << method_name << std::endl;
std::cout << "args_str: " << args_str << std::endl;
std::cout << "============================================" << std::endl;
// Look up the service object and the method object
auto it = m_serviceMap.find(service_name);
if (it == m_serviceMap.end())
{
std::cout << service_name << " does not exist!" << std::endl;
return;
}
auto mit = it->second.m_methodMap.find(method_name);
if (mit == it->second.m_methodMap.end())
{
std::cout << service_name << ":" << method_name << " does not exist!" << std::endl;
return;
}
google::protobuf::Service *service = it->second.m_service; // the service object, e.g. new UserService
const google::protobuf::MethodDescriptor *method = mit->second; // the method object, e.g. Login
// Create the request and response objects for the rpc method call
google::protobuf::Message *request = service->GetRequestPrototype(method).New();
if (!request->ParseFromString(args_str))
{
std::cout << "request parse error, content:" << args_str << std::endl;
return;
}
google::protobuf::Message *response = service->GetResponsePrototype(method).New();
// Bind a Closure callback to the method call below
google::protobuf::Closure *done = google::protobuf::NewCallback<RpcProvider,
const muduo::net::TcpConnectionPtr&,
google::protobuf::Message*>
(this,
&RpcProvider::SendRpcResponse,
conn, response);
// Based on the remote rpc request, invoke the corresponding method published on this rpc node
// e.g. (new UserService())->Login(controller, request, response, done)
service->CallMethod(method, nullptr, request, response, done);
}
// Callback passed to the Closure: serializes the rpc response and sends it over the network
void RpcProvider::SendRpcResponse(const muduo::net::TcpConnectionPtr& conn, google::protobuf::Message *response)
{
std::string response_str;
if (response->SerializeToString(&response_str)) // serialize the response
{
// On success, send the rpc method's result back to the caller over the network
conn->send(response_str);
}
else
{
std::cout << "serialize response_str error!" << std::endl;
}
conn->shutdown(); // mimic HTTP short connections: the rpcprovider actively closes the connection
}
6.5 MprpcChannel
mprpcchannel.h
#pragma once
#include <google/protobuf/service.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/message.h>
class MprpcChannel : public google::protobuf::RpcChannel
{
public:
// All rpc methods invoked through a stub proxy end up here; it centralizes serialization of the rpc call data and network sending
void CallMethod(const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller,
const google::protobuf::Message* request,
google::protobuf::Message* response,
google::protobuf::Closure* done);
};
mprpcchannel.cc
#include "mprpcchannel.h"
#include <string>
#include "rpcheader.pb.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <unistd.h>
#include <errno.h>
#include "mprpcapplication.h"
#include "mprpccontroller.h"
#include "zookeeperutil.h"
/*
frame: header_size + (service_name method_name args_size) + args
*/
// All rpc methods invoked through a stub proxy end up here; it centralizes serialization of the rpc call data and network sending
void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller,
const google::protobuf::Message* request,
google::protobuf::Message* response,
google::protobuf::Closure* done)
{
const google::protobuf::ServiceDescriptor* sd = method->service();
std::string service_name = sd->name(); // service_name
std::string method_name = method->name(); // method_name
// Get the length of the serialized argument string (args_size)
uint32_t args_size = 0;
std::string args_str;
if (request->SerializeToString(&args_str))
{
args_size = args_str.size();
}
else
{
controller->SetFailed("serialize request error!");
return;
}
// Build the rpc request header
mprpc::RpcHeader rpcHeader;
rpcHeader.set_service_name(service_name);
rpcHeader.set_method_name(method_name);
rpcHeader.set_args_size(args_size);
uint32_t header_size = 0;
std::string rpc_header_str;
if (rpcHeader.SerializeToString(&rpc_header_str))
{
header_size = rpc_header_str.size();
}
else
{
controller->SetFailed("serialize rpc header error!");
return;
}
// Assemble the rpc request string to be sent
std::string send_rpc_str;
send_rpc_str.insert(0, std::string((char*)&header_size, 4)); // header_size
send_rpc_str += rpc_header_str; // rpcheader
send_rpc_str += args_str; // args
// Print debugging info
std::cout << "============================================" << std::endl;
std::cout << "header_size: " << header_size << std::endl;
std::cout << "rpc_header_str: " << rpc_header_str << std::endl;
std::cout << "service_name: " << service_name << std::endl;
std::cout << "method_name: " << method_name << std::endl;
std::cout << "args_str: " << args_str << std::endl;
std::cout << "============================================" << std::endl;
// Use plain TCP sockets to carry out the remote rpc call
int clientfd = socket(AF_INET, SOCK_STREAM, 0);
if (-1 == clientfd)
{
char errtxt[512] = {0};
sprintf(errtxt, "create socket error! errno:%d", errno);
controller->SetFailed(errtxt);
return;
}
// Read the rpcserver settings from the config file
// std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");
// uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());
// The caller wants service_name's method_name; query zk for the host that provides the service
ZkClient zkCli;
zkCli.Start();
// /UserServiceRpc/Login
std::string method_path = "/" + service_name + "/" + method_name;
// 127.0.0.1:8000
std::string host_data = zkCli.GetData(method_path.c_str());
if (host_data == "")
{
controller->SetFailed(method_path + " does not exist!");
return;
}
int idx = host_data.find(":");
if (idx == -1)
{
controller->SetFailed(method_path + " address is invalid!");
return;
}
std::string ip = host_data.substr(0, idx);
uint16_t port = atoi(host_data.substr(idx+1, host_data.size()-idx).c_str());
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = inet_addr(ip.c_str());
// Connect to the rpc service node
if (-1 == connect(clientfd, (struct sockaddr*)&server_addr, sizeof(server_addr)))
{
close(clientfd);
char errtxt[512] = {0};
sprintf(errtxt, "connect error! errno:%d", errno);
controller->SetFailed(errtxt);
return;
}
// Send the rpc request
if (-1 == send(clientfd, send_rpc_str.c_str(), send_rpc_str.size(), 0))
{
close(clientfd);
char errtxt[512] = {0};
sprintf(errtxt, "send error! errno:%d", errno);
controller->SetFailed(errtxt);
return;
}
// Receive the response of the rpc request
char recv_buf[1024] = {0};
int recv_size = 0;
if (-1 == (recv_size = recv(clientfd, recv_buf, 1024, 0)))
{
close(clientfd);
char errtxt[512] = {0};
sprintf(errtxt, "recv error! errno:%d", errno);
controller->SetFailed(errtxt);
return;
}
// Deserialize the response data of the rpc call
// std::string response_str(recv_buf, 0, recv_size); // bug: if recv_buf contains a \0, everything after it is dropped and deserialization fails
// if (!response->ParseFromString(response_str))
if (!response->ParseFromArray(recv_buf, recv_size))
{
close(clientfd);
char errtxt[512] = {0};
sprintf(errtxt, "parse error! response_str:%s", recv_buf);
controller->SetFailed(errtxt);
return;
}
close(clientfd);
}
7. mprpccontroller
mprpccontroller.h
#pragma once
#include <google/protobuf/service.h>
#include <string>
class MprpcController : public google::protobuf::RpcController
{
public:
MprpcController();
void Reset();
bool Failed() const;
std::string ErrorText() const;
void SetFailed(const std::string& reason);
// The following are not implemented yet
void StartCancel();
bool IsCanceled() const;
void NotifyOnCancel(google::protobuf::Closure* callback);
private:
bool m_failed; // status of the rpc method execution
std::string m_errText; // error text from the rpc method execution
};
mprpccontroller.cc
#include "mprpccontroller.h"
MprpcController::MprpcController()
{
m_failed = false;
m_errText = "";
}
void MprpcController::Reset()
{
m_failed = false;
m_errText = "";
}
bool MprpcController::Failed() const
{
return m_failed;
}
std::string MprpcController::ErrorText() const
{
return m_errText;
}
void MprpcController::SetFailed(const std::string& reason)
{
m_failed = true;
m_errText = reason;
}
// Not implemented yet
void MprpcController::StartCancel(){}
bool MprpcController::IsCanceled() const {return false;}
void MprpcController::NotifyOnCancel(google::protobuf::Closure* callback) {}
8. logger
lockqueue.h
#pragma once
#include <queue>
#include <thread>
#include <mutex> // cf. pthread_mutex_t
#include <condition_variable> // cf. pthread_cond_t
// Log queue for asynchronous logging
template<typename T>
class LockQueue
{
public:
// Multiple worker threads write to the log queue
void Push(const T &data)
{
std::lock_guard<std::mutex> lock(m_mutex);
m_queue.push(data);
m_condvariable.notify_one();
}
// A single thread reads the log queue and writes the log file
T Pop()
{
std::unique_lock<std::mutex> lock(m_mutex);
while (m_queue.empty())
{
// The log queue is empty; the thread waits
m_condvariable.wait(lock);
}
T data = m_queue.front();
m_queue.pop();
return data;
}
private:
std::queue<T> m_queue;
std::mutex m_mutex;
std::condition_variable m_condvariable;
};
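A standalone sketch (not part of the framework) showing how the queue behaves: the consumer thread blocks inside Pop() until a producer pushes data.

#include "lockqueue.h"
#include <iostream>
#include <string>
int main()
{
    LockQueue<std::string> que;
    std::thread consumer([&que]() {
        for (int i = 0; i < 3; ++i)
            std::cout << que.Pop() << std::endl; // blocks while the queue is empty
    });
    for (int i = 0; i < 3; ++i)
        que.Push("log line " + std::to_string(i));
    consumer.join();
    return 0;
}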
logger.h
#pragma once
#include "lockqueue.h"
#include <string>
// Macro definition; usage: LOG_INFO("xxx %d %s", 20, "xxxx")
#define LOG_INFO(logmsgformat, ...) \
do \
{ \
Logger &logger = Logger::GetInstance(); \
logger.SetLogLevel(INFO); \
char c[1024] = {0}; \
snprintf(c, 1024, logmsgformat, ##__VA_ARGS__); \
logger.Log(c); \
} while(0)
#define LOG_ERR(logmsgformat, ...) \
do \
{ \
Logger &logger = Logger::GetInstance(); \
logger.SetLogLevel(ERROR); \
char c[1024] = {0}; \
snprintf(c, 1024, logmsgformat, ##__VA_ARGS__); \
logger.Log(c); \
} while(0)
// Log levels
enum LogLevel
{
INFO, // informational messages
ERROR, // error messages
};
// Logging system provided by the mprpc framework
class Logger
{
public:
// Get the logger singleton
static Logger& GetInstance();
// Set the log level
void SetLogLevel(LogLevel level);
// Write a log message
void Log(std::string msg);
private:
int m_loglevel; // current log level
LockQueue<std::string> m_lckQue; // log buffer queue
Logger();
Logger(const Logger&) = delete;
Logger(Logger&&) = delete;
};
logger.cc
#include "logger.h"
#include <time.h>
#include <iostream>
// Get the logger singleton
Logger& Logger::GetInstance()
{
static Logger logger;
return logger;
}
Logger::Logger()
{
// Start the dedicated log-writing thread
std::thread writeLogTask([&](){
for (;;)
{
// Get the current date, pop a log message from the queue, and append it to that day's log file (mode a+)
time_t now = time(nullptr);
tm *nowtm = localtime(&now);
char file_name[128];
sprintf(file_name, "%d-%d-%d-log.txt", nowtm->tm_year+1900, nowtm->tm_mon+1, nowtm->tm_mday);
FILE *pf = fopen(file_name, "a+");
if (pf == nullptr)
{
std::cout << "logger file : " << file_name << " open error!" << std::endl;
exit(EXIT_FAILURE);
}
std::string msg = m_lckQue.Pop();
char time_buf[128] = {0};
sprintf(time_buf, "%d:%d:%d =>[%s] ",
nowtm->tm_hour,
nowtm->tm_min,
nowtm->tm_sec,
(m_loglevel == INFO ? "info" : "error"));
msg.insert(0, time_buf);
msg.append("\n");
fputs(msg.c_str(), pf);
fclose(pf);
}
});
// Detach the thread so it runs as a daemon-style background thread
writeLogTask.detach();
}
// Set the log level
void Logger::SetLogLevel(LogLevel level)
{
m_loglevel = level;
}
// Write a log message: push it into the lockqueue buffer
void Logger::Log(std::string msg)
{
m_lckQue.Push(msg);
}
9. zookeeperutil
zookeeperutil.h
#pragma once
#include <semaphore.h>
#include <zookeeper/zookeeper.h>
#include <string>
// Wrapper class around the zk client
class ZkClient
{
public:
ZkClient();
~ZkClient();
// Start the zkclient and connect to the zkserver
void Start();
// Create a znode at the given path on the zkserver
void Create(const char *path, const char *data, int datalen, int state=0);
// Get the value of the znode at the given path
std::string GetData(const char *path);
private:
// zk client handle
zhandle_t *m_zhandle;
};
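End-to-end usage of the wrapper, mirroring rpcprovider.cc and mprpcchannel.cc (assumes MprpcApplication::Init has already loaded the zookeeperip/zookeeperport settings that Start() reads):

ZkClient zkCli;
zkCli.Start();                               // blocks until the session to the zkserver is up
zkCli.Create("/UserServiceRpc", nullptr, 0); // permanent znode
const char *host_data = "127.0.0.1:8000";
zkCli.Create("/UserServiceRpc/Login", host_data, strlen(host_data), ZOO_EPHEMERAL); // ephemeral znode storing ip:port
std::string host = zkCli.GetData("/UserServiceRpc/Login"); // "127.0.0.1:8000"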
zookeeperutil.cc
#include "zookeeperutil.h"
#include "mprpcapplication.h"
#include <semaphore.h>
#include <iostream>
// Global watcher: handles notifications sent from the zkserver to the zkclient
void global_watcher(zhandle_t *zh, int type,
int state, const char *path, void *watcherCtx)
{
if (type == ZOO_SESSION_EVENT) // the callback's message type is session-related
{
if (state == ZOO_CONNECTED_STATE) // the zkclient has connected to the zkserver successfully
{
sem_t *sem = (sem_t*)zoo_get_context(zh);
sem_post(sem);
}
}
}
ZkClient::ZkClient() : m_zhandle(nullptr)
{
}
ZkClient::~ZkClient()
{
if (m_zhandle != nullptr)
{
zookeeper_close(m_zhandle); // close the handle and release resources, like closing a MySQL_Conn
}
}
// Connect to the zkserver
void ZkClient::Start()
{
std::string host = MprpcApplication::GetInstance().GetConfig().Load("zookeeperip");
std::string port = MprpcApplication::GetInstance().GetConfig().Load("zookeeperport");
std::string connstr = host + ":" + port;
/*
zookeeper_mt: the multithreaded version
the zookeeper client API runs three threads:
- the API calling thread
- a network I/O thread (pthread_create, poll)
- a watcher callback thread (pthread_create)
*/
m_zhandle = zookeeper_init(connstr.c_str(), global_watcher, 30000, nullptr, nullptr, 0);
if (nullptr == m_zhandle)
{
std::cout << "zookeeper_init error!" << std::endl;
exit(EXIT_FAILURE);
}
sem_t sem;
sem_init(&sem, 0, 0);
zoo_set_context(m_zhandle, &sem);
sem_wait(&sem); // zookeeper_init returns immediately; block here until global_watcher signals that the session is connected
std::cout << "zookeeper_init success!" << std::endl;
}
void ZkClient::Create(const char *path, const char *data, int datalen, int state)
{
char path_buffer[128];
int bufferlen = sizeof(path_buffer);
int flag;
// First check whether the znode at path already exists; if it does, do not create it again
flag = zoo_exists(m_zhandle, path, 0, nullptr);
if (ZNONODE == flag) // the znode at path does not exist
{
// Create the znode at the given path
flag = zoo_create(m_zhandle, path, data, datalen,
&ZOO_OPEN_ACL_UNSAFE, state, path_buffer, bufferlen);
if (flag == ZOK)
{
std::cout << "znode create success... path:" << path << std::endl;
}
else
{
std::cout << "flag:" << flag << std::endl;
std::cout << "znode create error... path:" << path << std::endl;
exit(EXIT_FAILURE);
}
}
}
// Get the value of the znode at the given path
std::string ZkClient::GetData(const char *path)
{
char buffer[64];
int bufferlen = sizeof(buffer);
int flag = zoo_get(m_zhandle, path, 0, buffer, &bufferlen, nullptr);
if (flag != ZOK)
{
std::cout << "get znode error... path:" << path << std::endl;
return "";
}
else
{
return buffer;
}
}