集群聊天服务器项目
1.项目简介
chatserver
├── autobuild.sh
├── bin
│ ├── ChatClient
│ └── ChatServer
├── build
│ └── compile.md
├── CMakeLists.txt
├── include
│ ├── public.hpp
│ └── server
│ ├── chatserver.hpp
│ ├── chatservice.hpp
│ ├── db
│ │ └── db.h
│ ├── model
│ │ ├── friendmodel.hpp
│ │ ├── group.hpp
│ │ ├── groupmodel.hpp
│ │ ├── groupuser.hpp
│ │ ├── offlinemessagemodel.hpp
│ │ ├── user.hpp
│ │ └── usermodel.hpp
│ └── redis
│ └── redis.hpp
├── README.md
├── src
│ ├── client
│ │ ├── CMakeLists.txt
│ │ └── main.cpp
│ ├── CMakeLists.txt
│ └── server
│ ├── chatserver.cpp
│ ├── chatservice.cpp
│ ├── CMakeLists.txt
│ ├── db
│ │ └── db.cpp
│ ├── main.cpp
│ ├── model
│ │ ├── friendmoel.cpp
│ │ ├── groupmodel.cpp
│ │ ├── offlinemessagemodel.cpp
│ │ └── usermodel.cpp
│ └── redis
│ └── redis.cpp
├── test
│ ├── testjson
│ │ ├── json.hpp
│ │ ├── testjson
│ │ └── testjson.cpp
│ └── testmuduo
│ ├── CMakeLists.txt
│ └── muduo_server.cpp
└── thirdparty
└── json.hpp
2.chatserver类
chatserver是服务器层,主要提供服务器网络层的代码。
chatserver.h
#ifndef CHATSERVER_H
#define CHATSERVER_H
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
using namespace muduo;
using namespace muduo::net;
// 聊天服务器的主类
class ChatServer
{
public:
// 初始化聊天服务器对象
ChatServer(EventLoop *loop,
const InetAddress &listenAddr,
const string &nameArg);
// 启动服务
void start();
private:
// 上报链接相关信息的回调函数
void onConnection(const TcpConnectionPtr &);
// 上报读写事件相关信息的回调函数
void onMessage(const TcpConnectionPtr &,
Buffer *,
Timestamp);
TcpServer _server; // 组合的muduo库,实现服务器功能的类对象
EventLoop *_loop; // 指向事件循环对象的指针
};
#endif
chatserver.cpp
#include "chatserver.hpp"
#include "json.hpp"
#include "chatservice.hpp"
#include <iostream>
#include <functional>
#include <string>
using namespace std;
using namespace placeholders;
using json = nlohmann::json;
// 初始化聊天服务器对象
ChatServer::ChatServer(EventLoop *loop,
const InetAddress &listenAddr,
const string &nameArg)
: _server(loop, listenAddr, nameArg), _loop(loop)
{
// 注册链接回调
_server.setConnectionCallback(std::bind(&ChatServer::onConnection, this, _1));
// 注册消息回调
_server.setMessageCallback(std::bind(&ChatServer::onMessage, this, _1, _2, _3));
// 设置线程数量
_server.setThreadNum(4);
}
// 启动服务
void ChatServer::start()
{
_server.start();
}
// 上报链接相关信息的回调函数
void ChatServer::onConnection(const TcpConnectionPtr &conn)
{
// 客户端断开链接
if (!conn->connected())
{
ChatService::instance()->clientCloseException(conn);
conn->shutdown();
}
}
// 上报读写事件相关信息的回调函数
void ChatServer::onMessage(const TcpConnectionPtr &conn,
Buffer *buffer,
Timestamp time)
{
string buf = buffer->retrieveAllAsString();
// 测试,添加json打印代码
cout << buf << endl;
// 数据的反序列化
json js = json::parse(buf);
// 达到的目的:完全解耦网络模块的代码和业务模块的代码
// 通过js["msgid"] 获取=》业务handler=》conn js time
auto msgHandler = ChatService::instance()->getHandler(js["msgid"].get<int>());
// 回调消息绑定好的事件处理器,来执行相应的业务处理
msgHandler(conn, js, time);
}
3.chatservice类
提供业务方法
chatservice.h
#ifndef CHATSERVICE_H
#define CHATSERVICE_H
#include <muduo/net/TcpConnection.h>
#include <unordered_map>
#include <functional>
#include <mutex>
using namespace std;
using namespace muduo;
using namespace muduo::net;
#include "redis.hpp"
#include "groupmodel.hpp"
#include "friendmodel.hpp"
#include "usermodel.hpp"
#include "offlinemessagemodel.hpp"
#include "json.hpp"
using json = nlohmann::json;
// 表示处理消息的事件回调方法类型
using MsgHandler = std::function<void(const TcpConnectionPtr &conn, json &js, Timestamp)>;
// 聊天服务器业务类
class ChatService
{
public:
// 获取单例对象的接口函数
static ChatService *instance();
// 处理登录业务
void login(const TcpConnectionPtr &conn, json &js, Timestamp time);
// 处理注册业务
void reg(const TcpConnectionPtr &conn, json &js, Timestamp time);
// 一对一聊天业务
void oneChat(const TcpConnectionPtr &conn, json &js, Timestamp time);
// 添加好友业务
void addFriend(const TcpConnectionPtr &conn, json &js, Timestamp time);
// 创建群组业务
void createGroup(const TcpConnectionPtr &conn, json &js, Timestamp time);
// 加入群组业务
void addGroup(const TcpConnectionPtr &conn, json &js, Timestamp time);
// 群组聊天业务
void groupChat(const TcpConnectionPtr &conn, json &js, Timestamp time);
// 处理注销业务
void loginout(const TcpConnectionPtr &conn, json &js, Timestamp time);
// 处理客户端异常退出
void clientCloseException(const TcpConnectionPtr &conn);
// 服务器异常,业务重置方法
void reset();
// 获取消息对应的处理器
MsgHandler getHandler(int msgid);
// 从redis消息队列中获取订阅的消息
void handleRedisSubscribeMessage(int, string);
private:
ChatService();
// 存储消息id和其对应的业务处理方法
unordered_map<int, MsgHandler> _msgHandlerMap;
// 存储在线用户的通信连接
unordered_map<int, TcpConnectionPtr> _userConnMap;
// 定义互斥锁,保证_userConnMap的线程安全
mutex _connMutex;
// 数据操作类对象
UserModel _userModel;
OfflineMsgModel _offlineMsgModel;
FriendModel _friendModel;
GroupModel _groupModel;
// redis操作对象
Redis _redis;
};
#endif
chatservice.cpp
#include "chatservice.hpp"
#include "public.hpp"
#include <muduo/base/Logging.h>
#include <vector>
using namespace std;
using namespace muduo;
// 获取单例对象的接口函数
ChatService *ChatService::instance()
{
static ChatService service;
return &service;
}
// 注册消息以及对应的Handler回调操作
ChatService::ChatService()
{
// 用户基本业务管理相关事件处理回调注册
_msgHandlerMap.insert({LOGIN_MSG, std::bind(&ChatService::login, this, _1, _2, _3)});
_msgHandlerMap.insert({LOGINOUT_MSG, std::bind(&ChatService::loginout, this, _1, _2, _3)});
_msgHandlerMap.insert({REG_MSG, std::bind(&ChatService::reg, this, _1, _2, _3)});
_msgHandlerMap.insert({ONE_CHAT_MSG, std::bind(&ChatService::oneChat, this, _1, _2, _3)});
_msgHandlerMap.insert({ADD_FRIEND_MSG, std::bind(&ChatService::addFriend, this, _1, _2, _3)});
// 群组业务管理相关事件处理回调注册
_msgHandlerMap.insert({CREATE_GROUP_MSG, std::bind(&ChatService::createGroup, this, _1, _2, _3)});
_msgHandlerMap.insert({ADD_GROUP_MSG, std::bind(&ChatService::addGroup, this, _1, _2, _3)});
_msgHandlerMap.insert({GROUP_CHAT_MSG, std::bind(&ChatService::groupChat, this, _1, _2, _3)});
// 连接redis服务器
if (_redis.connect())
{
// 设置上报消息的回调
_redis.init_notify_handler(std::bind(&ChatService::handleRedisSubscribeMessage, this, _1, _2));
}
}
// 服务器异常,业务重置方法
void ChatService::reset()
{
// 把online状态的用户,设置成offline
_userModel.resetState();
}
// 获取消息对应的处理器
MsgHandler ChatService::getHandler(int msgid)
{
// 记录错误日志,msgid没有对应的事件处理回调
auto it = _msgHandlerMap.find(msgid);
if (it == _msgHandlerMap.end())
{
// 返回一个默认的处理器,空操作
return [=](const TcpConnectionPtr &conn, json &js, Timestamp) {
LOG_ERROR << "msgid:" << msgid << " can not find handler!";
};
}
else
{
return _msgHandlerMap[msgid];
}
}
// 处理登录业务 id pwd pwd
void ChatService::login(const TcpConnectionPtr &conn, json &js, Timestamp time)
{
int id = js["id"].get<int>();
string pwd = js["password"];
User user = _userModel.query(id);
if (user.getId() == id && user.getPwd() == pwd)
{
if (user.getState() == "online")
{
// 该用户已经登录,不允许重复登录
json response;
response["msgid"] = LOGIN_MSG_ACK;
response["errno"] = 2;
response["errmsg"] = "this account is using, input another!";
conn->send(response.dump());
}
else
{
// 登录成功,记录用户连接信息
{
lock_guard<mutex> lock(_connMutex);
_userConnMap.insert({id, conn});
}
// id用户登录成功后,向redis订阅channel(id)
_redis.subscribe(id);
// 登录成功,更新用户状态信息 state offline=>online
user.setState("online");
_userModel.updateState(user);
json response;
response["msgid"] = LOGIN_MSG_ACK;
response["errno"] = 0;
response["id"] = user.getId();
response["name"] = user.getName();
// 查询该用户是否有离线消息
vector<string> vec = _offlineMsgModel.query(id);
if (!vec.empty())
{
response["offlinemsg"] = vec;
// 读取该用户的离线消息后,把该用户的所有离线消息删除掉
_offlineMsgModel.remove(id);
}
// 查询该用户的好友信息并返回
vector<User> userVec = _friendModel.query(id);
if (!userVec.empty())
{
vector<string> vec2;
for (User &user : userVec)
{
json js;
js["id"] = user.getId();
js["name"] = user.getName();
js["state"] = user.getState();
vec2.push_back(js.dump());
}
response["friends"] = vec2;
}
// 查询用户的群组信息
vector<Group> groupuserVec = _groupModel.queryGroups(id);
if (!groupuserVec.empty())
{
// group:[{groupid:[xxx, xxx, xxx, xxx]}]
vector<string> groupV;
for (Group &group : groupuserVec)
{
json grpjson;
grpjson["id"] = group.getId();
grpjson["groupname"] = group.getName();
grpjson["groupdesc"] = group.getDesc();
vector<string> userV;
for (GroupUser &user : group.getUsers())
{
json js;
js["id"] = user.getId();
js["name"] = user.getName();
js["state"] = user.getState();
js["role"] = user.getRole();
userV.push_back(js.dump());
}
grpjson["users"] = userV;
groupV.push_back(grpjson.dump());
}
response["groups"] = groupV;
}
conn->send(response.dump());
}
}
else
{
// 该用户不存在,用户存在但是密码错误,登录失败
json response;
response["msgid"] = LOGIN_MSG_ACK;
response["errno"] = 1;
response["errmsg"] = "id or password is invalid!";
conn->send(response.dump());
}
}
// 处理注册业务 name password
void ChatService::reg(const TcpConnectionPtr &conn, json &js, Timestamp time)
{
string name = js["name"];
string pwd = js["password"];
User user;
user.setName(name);
user.setPwd(pwd);
bool state = _userModel.insert(user);
if (state)
{
// 注册成功
json response;
response["msgid"] = REG_MSG_ACK;
response["errno"] = 0;
response["id"] = user.getId();
conn->send(response.dump());
}
else
{
// 注册失败
json response;
response["msgid"] = REG_MSG_ACK;
response["errno"] = 1;
conn->send(response.dump());
}
}
// 处理注销业务
void ChatService::loginout(const TcpConnectionPtr &conn, json &js, Timestamp time)
{
int userid = js["id"].get<int>();
{
lock_guard<mutex> lock(_connMutex);
auto it = _userConnMap.find(userid);
if (it != _userConnMap.end())
{
_userConnMap.erase(it);
}
}
// 用户注销,相当于就是下线,在redis中取消订阅通道
_redis.unsubscribe(userid);
// 更新用户的状态信息
User user(userid, "", "", "offline");
_userModel.updateState(user);
}
// 处理客户端异常退出
void ChatService::clientCloseException(const TcpConnectionPtr &conn)
{
User user;
{
lock_guard<mutex> lock(_connMutex);
for (auto it = _userConnMap.begin(); it != _userConnMap.end(); ++it)
{
if (it->second == conn)
{
// 从map表删除用户的链接信息
user.setId(it->first);
_userConnMap.erase(it);
break;
}
}
}
// 用户注销,相当于就是下线,在redis中取消订阅通道
_redis.unsubscribe(user.getId());
// 更新用户的状态信息
if (user.getId() != -1)
{
user.setState("offline");
_userModel.updateState(user);
}
}
// 一对一聊天业务
void ChatService::oneChat(const TcpConnectionPtr &conn, json &js, Timestamp time)
{
int toid = js["toid"].get<int>();
{
lock_guard<mutex> lock(_connMutex);
auto it = _userConnMap.find(toid);
if (it != _userConnMap.end())
{
// toid在线,转发消息 服务器主动推送消息给toid用户
it->second->send(js.dump());
return;
}
}
// 查询toid是否在线
User user = _userModel.query(toid);
if (user.getState() == "online")
{
_redis.publish(toid, js.dump());
return;
}
// toid不在线,存储离线消息
_offlineMsgModel.insert(toid, js.dump());
}
// 添加好友业务 msgid id friendid
void ChatService::addFriend(const TcpConnectionPtr &conn, json &js, Timestamp time)
{
int userid = js["id"].get<int>();
int friendid = js["friendid"].get<int>();
// 存储好友信息
_friendModel.insert(userid, friendid);
}
// 创建群组业务
void ChatService::createGroup(const TcpConnectionPtr &conn, json &js, Timestamp time)
{
int userid = js["id"].get<int>();
string name = js["groupname"];
string desc = js["groupdesc"];
// 存储新创建的群组信息
Group group(-1, name, desc);
if (_groupModel.createGroup(group))
{
// 存储群组创建人信息
_groupModel.addGroup(userid, group.getId(), "creator");
}
}
// 加入群组业务
void ChatService::addGroup(const TcpConnectionPtr &conn, json &js, Timestamp time)
{
int userid = js["id"].get<int>();
int groupid = js["groupid"].get<int>();
_groupModel.addGroup(userid, groupid, "normal");
}
// 群组聊天业务
void ChatService::groupChat(const TcpConnectionPtr &conn, json &js, Timestamp time)
{
int userid = js["id"].get<int>();
int groupid = js["groupid"].get<int>();
vector<int> useridVec = _groupModel.queryGroupUsers(userid, groupid);
lock_guard<mutex> lock(_connMutex);
for (int id : useridVec)
{
auto it = _userConnMap.find(id);
if (it != _userConnMap.end())
{
// 转发群消息
it->second->send(js.dump());
}
else
{
// 查询toid是否在线
User user = _userModel.query(id);
if (user.getState() == "online")
{
_redis.publish(id, js.dump());
}
else
{
// 存储离线群消息
_offlineMsgModel.insert(id, js.dump());
}
}
}
}
// 从redis消息队列中获取订阅的消息
void ChatService::handleRedisSubscribeMessage(int userid, string msg)
{
lock_guard<mutex> lock(_connMutex);
auto it = _userConnMap.find(userid);
if (it != _userConnMap.end())
{
it->second->send(msg);
return;
}
// 存储该用户的离线消息
_offlineMsgModel.insert(userid, msg);
}
4.数据库DB类
db.h
#ifndef DB_H
#define DB_H
#include <mysql/mysql.h>
#include <string>
using namespace std;
// 数据库操作类
class MySQL
{
public:
// 初始化数据库连接
MySQL();
// 释放数据库连接资源
~MySQL();
// 连接数据库
bool connect();
// 更新操作
bool update(string sql);
// 查询操作
MYSQL_RES *query(string sql);
// 获取连接
MYSQL* getConnection();
private:
MYSQL *_conn;
};
#endif
db.cpp
#include "db.h"
#include <muduo/base/Logging.h>
// 数据库配置信息
static string server = "127.0.0.1";
static string user = "root";
static string password = "123456";
static string dbname = "chat";
// 初始化数据库连接
MySQL::MySQL()
{
_conn = mysql_init(nullptr);
}
// 释放数据库连接资源
MySQL::~MySQL()
{
if (_conn != nullptr)
mysql_close(_conn);
}
// 连接数据库
bool MySQL::connect()
{
MYSQL *p = mysql_real_connect(_conn, server.c_str(), user.c_str(),
password.c_str(), dbname.c_str(), 3306, nullptr, 0);
if (p != nullptr)
{
// C和C++代码默认的编码字符是ASCII,如果不设置,从MySQL上拉下来的中文显示?
mysql_query(_conn, "set names gbk");
LOG_INFO << "connect mysql success!";
}
else
{
LOG_INFO << "connect mysql fail!";
}
return p;
}
// 更新操作
bool MySQL::update(string sql)
{
if (mysql_query(_conn, sql.c_str()))
{
LOG_INFO << __FILE__ << ":" << __LINE__ << ":"
<< sql << "更新失败!";
return false;
}
return true;
}
// 查询操作
MYSQL_RES *MySQL::query(string sql)
{
if (mysql_query(_conn, sql.c_str()))
{
LOG_INFO << __FILE__ << ":" << __LINE__ << ":"
<< sql << "查询失败!";
return nullptr;
}
return mysql_use_result(_conn);
}
// 获取连接
MYSQL* MySQL::getConnection()
{
return _conn;
}
5.public.h序列号
#ifndef PUBLIC_H
#define PUBLIC_H
/*
server和client的公共文件
*/
enum EnMsgType
{
LOGIN_MSG = 1, // 登录消息
LOGIN_MSG_ACK, // 登录响应消息
LOGINOUT_MSG, // 注销消息
REG_MSG, // 注册消息
REG_MSG_ACK, // 注册响应消息
ONE_CHAT_MSG, // 聊天消息
ADD_FRIEND_MSG, // 添加好友消息
CREATE_GROUP_MSG, // 创建群组
ADD_GROUP_MSG, // 加入群组
GROUP_CHAT_MSG, // 群聊天
};
#endif
6.User和User_model
User.h
#ifndef USER_H
#define USER_H
#include <string>
using namespace std;
// User表的ORM类
class User
{
public:
User(int id = -1, string name = "", string pwd = "", string state = "offline")
{
this->id = id;
this->name = name;
this->password = pwd;
this->state = state;
}
void setId(int id) { this->id = id; }
void setName(string name) { this->name = name; }
void setPwd(string pwd) { this->password = pwd; }
void setState(string state) { this->state = state; }
int getId() { return this->id; }
string getName() { return this->name; }
string getPwd() { return this->password; }
string getState() { return this->state; }
protected:
int id;
string name;
string password;
string state;
};
#endif
UserModel.h
#ifndef USERMODEL_H
#define USERMODEL_H
#include "user.hpp"
// User表的数据操作类
class UserModel {
public:
// User表的增加方法
bool insert(User &user);
// 根据用户号码查询用户信息
User query(int id);
// 更新用户的状态信息
bool updateState(User user);
// 重置用户的状态信息
void resetState();
};
#endif
UserModel.cpp
#include "usermodel.hpp"
#include "db.h"
#include <iostream>
using namespace std;
// User表的增加方法
bool UserModel::insert(User &user)
{
// 1.组装sql语句
char sql[1024] = {0};
sprintf(sql, "insert into user(name, password, state) values('%s', '%s', '%s')",
user.getName().c_str(), user.getPwd().c_str(), user.getState().c_str());
MySQL mysql;
if (mysql.connect())
{
if (mysql.update(sql))
{
// 获取插入成功的用户数据生成的主键id
user.setId(mysql_insert_id(mysql.getConnection()));
return true;
}
}
return false;
}
// 根据用户号码查询用户信息
User UserModel::query(int id)
{
// 1.组装sql语句
char sql[1024] = {0};
sprintf(sql, "select * from user where id = %d", id);
MySQL mysql;
if (mysql.connect())
{
MYSQL_RES *res = mysql.query(sql);
if (res != nullptr)
{
MYSQL_ROW row = mysql_fetch_row(res);
if (row != nullptr)
{
User user;
user.setId(atoi(row[0]));
user.setName(row[1]);
user.setPwd(row[2]);
user.setState(row[3]);
mysql_free_result(res);
return user;
}
}
}
return User();
}
// 更新用户的状态信息
bool UserModel::updateState(User user)
{
// 1.组装sql语句
char sql[1024] = {0};
sprintf(sql, "update user set state = '%s' where id = %d", user.getState().c_str(), user.getId());
MySQL mysql;
if (mysql.connect())
{
if (mysql.update(sql))
{
return true;
}
}
return false;
}
// 重置用户的状态信息
void UserModel::resetState()
{
// 1.组装sql语句
char sql[1024] = "update user set state = 'offline' where state = 'online'";
MySQL mysql;
if (mysql.connect())
{
mysql.update(sql);
}
}
7.离线消息offlinemessagemodel
offlinemessagemodel.h
#ifndef OFFLINEMESSAGEMODEL_H
#define OFFLINEMESSAGEMODEL_H
#include <string>
#include <vector>
using namespace std;
// 提供离线消息表的操作接口方法
class OfflineMsgModel
{
public:
// 存储用户的离线消息
void insert(int userid, string msg);
// 删除用户的离线消息
void remove(int userid);
// 查询用户的离线消息
vector<string> query(int userid);
};
#endif
offlinemessagemodel.cpp
#include "offlinemessagemodel.hpp"
#include "db.h"
// 存储用户的离线消息
void OfflineMsgModel::insert(int userid, string msg)
{
// 1.组装sql语句
char sql[1024] = {0};
sprintf(sql, "insert into offlinemessage values(%d, '%s')", userid, msg.c_str());
MySQL mysql;
if (mysql.connect())
{
mysql.update(sql);
}
}
// 删除用户的离线消息
void OfflineMsgModel::remove(int userid)
{
// 1.组装sql语句
char sql[1024] = {0};
sprintf(sql, "delete from offlinemessage where userid=%d", userid);
MySQL mysql;
if (mysql.connect())
{
mysql.update(sql);
}
}
// 查询用户的离线消息
vector<string> OfflineMsgModel::query(int userid)
{
// 1.组装sql语句
char sql[1024] = {0};
sprintf(sql, "select message from offlinemessage where userid = %d", userid);
vector<string> vec;
MySQL mysql;
if (mysql.connect())
{
MYSQL_RES *res = mysql.query(sql);
if (res != nullptr)
{
// 把userid用户的所有离线消息放入vec中返回
MYSQL_ROW row;
while((row = mysql_fetch_row(res)) != nullptr)
{
vec.push_back(row[0]);
}
mysql_free_result(res);
return vec;
}
}
return vec;
}
8.friendmodel
friendmodel.hpp
#ifndef FRIENDMODEL_H
#define FRIENDMODEL_H
#include "user.hpp"
#include <vector>
using namespace std;
// 维护好友信息的操作接口方法
class FriendModel
{
public:
// 添加好友关系
void insert(int userid, int friendid);
// 返回用户好友列表
vector<User> query(int userid);
};
#endif
friendmodel.cpp
#include "friendmodel.hpp"
#include "db.h"
// 添加好友关系
void FriendModel::insert(int userid, int friendid)
{
// 1.组装sql语句
char sql[1024] = {0};
sprintf(sql, "insert into friend values(%d, %d)", userid, friendid);
MySQL mysql;
if (mysql.connect())
{
mysql.update(sql);
}
}
// 返回用户好友列表
vector<User> FriendModel::query(int userid)
{
// 1.组装sql语句
char sql[1024] = {0};
sprintf(sql, "select a.id,a.name,a.state from user a inner join friend b on b.friendid = a.id where b.userid=%d", userid);
vector<User> vec;
MySQL mysql;
if (mysql.connect())
{
MYSQL_RES *res = mysql.query(sql);
if (res != nullptr)
{
// 把userid用户的所有离线消息放入vec中返回
MYSQL_ROW row;
while((row = mysql_fetch_row(res)) != nullptr)
{
User user;
user.setId(atoi(row[0]));
user.setName(row[1]);
user.setState(row[2]);
vec.push_back(user);
}
mysql_free_result(res);
return vec;
}
}
return vec;
}
9.群聊业务
groupuser.hpp
#ifndef GROUPUSER_H
#define GROUPUSER_H
#include "user.hpp"
// 群组用户,多了一个role角色信息,从User类直接继承,复用User的其它信息
class GroupUser : public User
{
public:
void setRole(string role) { this->role = role; }
string getRole() { return this->role; }
private:
string role;
};
#endif
group.hpp
#ifndef GROUP_H
#define GROUP_H
#include "groupuser.hpp"
#include <string>
#include <vector>
using namespace std;
// User表的ORM类
class Group
{
public:
Group(int id = -1, string name = "", string desc = "")
{
this->id = id;
this->name = name;
this->desc = desc;
}
void setId(int id) { this->id = id; }
void setName(string name) { this->name = name; }
void setDesc(string desc) { this->desc = desc; }
int getId() { return this->id; }
string getName() { return this->name; }
string getDesc() { return this->desc; }
vector<GroupUser> &getUsers() { return this->users; }
private:
int id;
string name;
string desc;
vector<GroupUser> users;
};
#endif
groupmodel.hpp
#ifndef GROUPMODEL_H
#define GROUPMODEL_H
#include "group.hpp"
#include <string>
#include <vector>
using namespace std;
// 维护群组信息的操作接口方法
class GroupModel
{
public:
// 创建群组
bool createGroup(Group &group);
// 加入群组
void addGroup(int userid, int groupid, string role);
// 查询用户所在群组信息
vector<Group> queryGroups(int userid);
// 根据指定的groupid查询群组用户id列表,除userid自己,主要用户群聊业务给群组其它成员群发消息
vector<int> queryGroupUsers(int userid, int groupid);
};
#endif
groupmodel.cpp
#include "groupmodel.hpp"
#include "db.h"
// 创建群组
bool GroupModel::createGroup(Group &group)
{
// 1.组装sql语句
char sql[1024] = {0};
sprintf(sql, "insert into allgroup(groupname, groupdesc) values('%s', '%s')",
group.getName().c_str(), group.getDesc().c_str());
MySQL mysql;
if (mysql.connect())
{
if (mysql.update(sql))
{
group.setId(mysql_insert_id(mysql.getConnection()));
return true;
}
}
return false;
}
// 加入群组
void GroupModel::addGroup(int userid, int groupid, string role)
{
// 1.组装sql语句
char sql[1024] = {0};
sprintf(sql, "insert into groupuser values(%d, %d, '%s')",
groupid, userid, role.c_str());
MySQL mysql;
if (mysql.connect())
{
mysql.update(sql);
}
}
// 查询用户所在群组信息
vector<Group> GroupModel::queryGroups(int userid)
{
/*
1. 先根据userid在groupuser表中查询出该用户所属的群组信息
2. 在根据群组信息,查询属于该群组的所有用户的userid,并且和user表进行多表联合查询,查出用户的详细信息
*/
char sql[1024] = {0};
sprintf(sql, "select a.id,a.groupname,a.groupdesc from allgroup a inner join \
groupuser b on a.id = b.groupid where b.userid=%d",
userid);
vector<Group> groupVec;
MySQL mysql;
if (mysql.connect())
{
MYSQL_RES *res = mysql.query(sql);
if (res != nullptr)
{
MYSQL_ROW row;
// 查出userid所有的群组信息
while ((row = mysql_fetch_row(res)) != nullptr)
{
Group group;
group.setId(atoi(row[0]));
group.setName(row[1]);
group.setDesc(row[2]);
groupVec.push_back(group);
}
mysql_free_result(res);
}
}
// 查询群组的用户信息
for (Group &group : groupVec)
{
sprintf(sql, "select a.id,a.name,a.state,b.grouprole from user a \
inner join groupuser b on b.userid = a.id where b.groupid=%d",
group.getId());
MYSQL_RES *res = mysql.query(sql);
if (res != nullptr)
{
MYSQL_ROW row;
while ((row = mysql_fetch_row(res)) != nullptr)
{
GroupUser user;
user.setId(atoi(row[0]));
user.setName(row[1]);
user.setState(row[2]);
user.setRole(row[3]);
group.getUsers().push_back(user);
}
mysql_free_result(res);
}
}
return groupVec;
}
// 根据指定的groupid查询群组用户id列表,除userid自己,主要用户群聊业务给群组其它成员群发消息
vector<int> GroupModel::queryGroupUsers(int userid, int groupid)
{
char sql[1024] = {0};
sprintf(sql, "select userid from groupuser where groupid = %d and userid != %d", groupid, userid);
vector<int> idVec;
MySQL mysql;
if (mysql.connect())
{
MYSQL_RES *res = mysql.query(sql);
if (res != nullptr)
{
MYSQL_ROW row;
while ((row = mysql_fetch_row(res)) != nullptr)
{
idVec.push_back(atoi(row[0]));
}
mysql_free_result(res);
}
}
return idVec;
}
10.Client
#include "json.hpp"
#include <iostream>
#include <thread>
#include <string>
#include <vector>
#include <chrono>
#include <ctime>
#include <unordered_map>
#include <functional>
using namespace std;
using json = nlohmann::json;
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <semaphore.h>
#include <atomic>
#include "group.hpp"
#include "user.hpp"
#include "public.hpp"
// 记录当前系统登录的用户信息
User g_currentUser;
// 记录当前登录用户的好友列表信息
vector<User> g_currentUserFriendList;
// 记录当前登录用户的群组列表信息
vector<Group> g_currentUserGroupList;
// 控制主菜单页面程序
bool isMainMenuRunning = false;
// 用于读写线程之间的通信
sem_t rwsem;
// 记录登录状态
atomic_bool g_isLoginSuccess{false};
// 接收线程
void readTaskHandler(int clientfd);
// 获取系统时间(聊天信息需要添加时间信息)
string getCurrentTime();
// 主聊天页面程序
void mainMenu(int);
// 显示当前登录成功用户的基本信息
void showCurrentUserData();
// 聊天客户端程序实现,main线程用作发送线程,子线程用作接收线程
int main(int argc, char **argv)
{
if (argc < 3)
{
cerr << "command invalid! example: ./ChatClient 127.0.0.1 6000" << endl;
exit(-1);
}
// 解析通过命令行参数传递的ip和port
char *ip = argv[1];
uint16_t port = atoi(argv[2]);
// 创建client端的socket
int clientfd = socket(AF_INET, SOCK_STREAM, 0);
if (-1 == clientfd)
{
cerr << "socket create error" << endl;
exit(-1);
}
// 填写client需要连接的server信息ip+port
sockaddr_in server;
memset(&server, 0, sizeof(sockaddr_in));
server.sin_family = AF_INET;
server.sin_port = htons(port);
server.sin_addr.s_addr = inet_addr(ip);
// client和server进行连接
if (-1 == connect(clientfd, (sockaddr *)&server, sizeof(sockaddr_in)))
{
cerr << "connect server error" << endl;
close(clientfd);
exit(-1);
}
// 初始化读写线程通信用的信号量
sem_init(&rwsem, 0, 0);
// 连接服务器成功,启动接收子线程
std::thread readTask(readTaskHandler, clientfd); // pthread_create
readTask.detach(); // pthread_detach
// main线程用于接收用户输入,负责发送数据
for (;;)
{
// 显示首页面菜单 登录、注册、退出
cout << "========================" << endl;
cout << "1. login" << endl;
cout << "2. register" << endl;
cout << "3. quit" << endl;
cout << "========================" << endl;
cout << "choice:";
int choice = 0;
cin >> choice;
cin.get(); // 读掉缓冲区残留的回车
switch (choice)
{
case 1: // login业务
{
int id = 0;
char pwd[50] = {0};
cout << "userid:";
cin >> id;
cin.get(); // 读掉缓冲区残留的回车
cout << "userpassword:";
cin.getline(pwd, 50);
json js;
js["msgid"] = LOGIN_MSG;
js["id"] = id;
js["password"] = pwd;
string request = js.dump();
g_isLoginSuccess = false;
int len = send(clientfd, request.c_str(), strlen(request.c_str()) + 1, 0);
if (len == -1)
{
cerr << "send login msg error:" << request << endl;
}
sem_wait(&rwsem); // 等待信号量,由子线程处理完登录的响应消息后,通知这里
if (g_isLoginSuccess)
{
// 进入聊天主菜单页面
isMainMenuRunning = true;
mainMenu(clientfd);
}
}
break;
case 2: // register业务
{
char name[50] = {0};
char pwd[50] = {0};
cout << "username:";
cin.getline(name, 50);
cout << "userpassword:";
cin.getline(pwd, 50);
json js;
js["msgid"] = REG_MSG;
js["name"] = name;
js["password"] = pwd;
string request = js.dump();
int len = send(clientfd, request.c_str(), strlen(request.c_str()) + 1, 0);
if (len == -1)
{
cerr << "send reg msg error:" << request << endl;
}
sem_wait(&rwsem); // 等待信号量,子线程处理完注册消息会通知
}
break;
case 3: // quit业务
close(clientfd);
sem_destroy(&rwsem);
exit(0);
default:
cerr << "invalid input!" << endl;
break;
}
}
return 0;
}
// 处理注册的响应逻辑
void doRegResponse(json &responsejs)
{
if (0 != responsejs["errno"].get<int>()) // 注册失败
{
cerr << "name is already exist, register error!" << endl;
}
else // 注册成功
{
cout << "name register success, userid is " << responsejs["id"]
<< ", do not forget it!" << endl;
}
}
// 处理登录的响应逻辑
void doLoginResponse(json &responsejs)
{
if (0 != responsejs["errno"].get<int>()) // 登录失败
{
cerr << responsejs["errmsg"] << endl;
g_isLoginSuccess = false;
}
else // 登录成功
{
// 记录当前用户的id和name
g_currentUser.setId(responsejs["id"].get<int>());
g_currentUser.setName(responsejs["name"]);
// 记录当前用户的好友列表信息
if (responsejs.contains("friends"))
{
// 初始化
g_currentUserFriendList.clear();
vector<string> vec = responsejs["friends"];
for (string &str : vec)
{
json js = json::parse(str);
User user;
user.setId(js["id"].get<int>());
user.setName(js["name"]);
user.setState(js["state"]);
g_currentUserFriendList.push_back(user);
}
}
// 记录当前用户的群组列表信息
if (responsejs.contains("groups"))
{
// 初始化
g_currentUserGroupList.clear();
vector<string> vec1 = responsejs["groups"];
for (string &groupstr : vec1)
{
json grpjs = json::parse(groupstr);
Group group;
group.setId(grpjs["id"].get<int>());
group.setName(grpjs["groupname"]);
group.setDesc(grpjs["groupdesc"]);
vector<string> vec2 = grpjs["users"];
for (string &userstr : vec2)
{
GroupUser user;
json js = json::parse(userstr);
user.setId(js["id"].get<int>());
user.setName(js["name"]);
user.setState(js["state"]);
user.setRole(js["role"]);
group.getUsers().push_back(user);
}
g_currentUserGroupList.push_back(group);
}
}
// 显示登录用户的基本信息
showCurrentUserData();
// 显示当前用户的离线消息 个人聊天信息或者群组消息
if (responsejs.contains("offlinemsg"))
{
vector<string> vec = responsejs["offlinemsg"];
for (string &str : vec)
{
json js = json::parse(str);
// time + [id] + name + " said: " + xxx
if (ONE_CHAT_MSG == js["msgid"].get<int>())
{
cout << js["time"].get<string>() << " [" << js["id"] << "]" << js["name"].get<string>()
<< " said: " << js["msg"].get<string>() << endl;
}
else
{
cout << "群消息[" << js["groupid"] << "]:" << js["time"].get<string>() << " [" << js["id"] << "]" << js["name"].get<string>()
<< " said: " << js["msg"].get<string>() << endl;
}
}
}
g_isLoginSuccess = true;
}
}
// 子线程 - 接收线程
void readTaskHandler(int clientfd)
{
for (;;)
{
char buffer[1024] = {0};
int len = recv(clientfd, buffer, 1024, 0); // 阻塞了
if (-1 == len || 0 == len)
{
close(clientfd);
exit(-1);
}
// 接收ChatServer转发的数据,反序列化生成json数据对象
json js = json::parse(buffer);
int msgtype = js["msgid"].get<int>();
if (ONE_CHAT_MSG == msgtype)
{
cout << js["time"].get<string>() << " [" << js["id"] << "]" << js["name"].get<string>()
<< " said: " << js["msg"].get<string>() << endl;
continue;
}
if (GROUP_CHAT_MSG == msgtype)
{
cout << "群消息[" << js["groupid"] << "]:" << js["time"].get<string>() << " [" << js["id"] << "]" << js["name"].get<string>()
<< " said: " << js["msg"].get<string>() << endl;
continue;
}
if (LOGIN_MSG_ACK == msgtype)
{
doLoginResponse(js); // 处理登录响应的业务逻辑
sem_post(&rwsem); // 通知主线程,登录结果处理完成
continue;
}
if (REG_MSG_ACK == msgtype)
{
doRegResponse(js);
sem_post(&rwsem); // 通知主线程,注册结果处理完成
continue;
}
}
}
// 显示当前登录成功用户的基本信息
void showCurrentUserData()
{
cout << "======================login user======================" << endl;
cout << "current login user => id:" << g_currentUser.getId() << " name:" << g_currentUser.getName() << endl;
cout << "----------------------friend list---------------------" << endl;
if (!g_currentUserFriendList.empty())
{
for (User &user : g_currentUserFriendList)
{
cout << user.getId() << " " << user.getName() << " " << user.getState() << endl;
}
}
cout << "----------------------group list----------------------" << endl;
if (!g_currentUserGroupList.empty())
{
for (Group &group : g_currentUserGroupList)
{
cout << group.getId() << " " << group.getName() << " " << group.getDesc() << endl;
for (GroupUser &user : group.getUsers())
{
cout << user.getId() << " " << user.getName() << " " << user.getState()
<< " " << user.getRole() << endl;
}
}
}
cout << "======================================================" << endl;
}
// "help" command handler
void help(int fd = 0, string str = "");
// "chat" command handler
void chat(int, string);
// "addfriend" command handler
void addfriend(int, string);
// "creategroup" command handler
void creategroup(int, string);
// "addgroup" command handler
void addgroup(int, string);
// "groupchat" command handler
void groupchat(int, string);
// "loginout" command handler
void loginout(int, string);
// 系统支持的客户端命令列表
unordered_map<string, string> commandMap = {
{"help", "显示所有支持的命令,格式help"},
{"chat", "一对一聊天,格式chat:friendid:message"},
{"addfriend", "添加好友,格式addfriend:friendid"},
{"creategroup", "创建群组,格式creategroup:groupname:groupdesc"},
{"addgroup", "加入群组,格式addgroup:groupid"},
{"groupchat", "群聊,格式groupchat:groupid:message"},
{"loginout", "注销,格式loginout"}};
// 注册系统支持的客户端命令处理
unordered_map<string, function<void(int, string)>> commandHandlerMap = {
{"help", help},
{"chat", chat},
{"addfriend", addfriend},
{"creategroup", creategroup},
{"addgroup", addgroup},
{"groupchat", groupchat},
{"loginout", loginout}};
// 主聊天页面程序
void mainMenu(int clientfd)
{
help();
char buffer[1024] = {0};
while (isMainMenuRunning)
{
cin.getline(buffer, 1024);
string commandbuf(buffer);
string command; // 存储命令
int idx = commandbuf.find(":");
if (-1 == idx)
{
command = commandbuf;
}
else
{
command = commandbuf.substr(0, idx);
}
auto it = commandHandlerMap.find(command);
if (it == commandHandlerMap.end())
{
cerr << "invalid input command!" << endl;
continue;
}
// 调用相应命令的事件处理回调,mainMenu对修改封闭,添加新功能不需要修改该函数
it->second(clientfd, commandbuf.substr(idx + 1, commandbuf.size() - idx)); // 调用命令处理方法
}
}
// "help" command handler
void help(int, string)
{
cout << "show command list >>> " << endl;
for (auto &p : commandMap)
{
cout << p.first << " : " << p.second << endl;
}
cout << endl;
}
// "addfriend" command handler
void addfriend(int clientfd, string str)
{
int friendid = atoi(str.c_str());
json js;
js["msgid"] = ADD_FRIEND_MSG;
js["id"] = g_currentUser.getId();
js["friendid"] = friendid;
string buffer = js.dump();
int len = send(clientfd, buffer.c_str(), strlen(buffer.c_str()) + 1, 0);
if (-1 == len)
{
cerr << "send addfriend msg error -> " << buffer << endl;
}
}
// "chat" command handler
void chat(int clientfd, string str)
{
int idx = str.find(":"); // friendid:message
if (-1 == idx)
{
cerr << "chat command invalid!" << endl;
return;
}
int friendid = atoi(str.substr(0, idx).c_str());
string message = str.substr(idx + 1, str.size() - idx);
json js;
js["msgid"] = ONE_CHAT_MSG;
js["id"] = g_currentUser.getId();
js["name"] = g_currentUser.getName();
js["toid"] = friendid;
js["msg"] = message;
js["time"] = getCurrentTime();
string buffer = js.dump();
int len = send(clientfd, buffer.c_str(), strlen(buffer.c_str()) + 1, 0);
if (-1 == len)
{
cerr << "send chat msg error -> " << buffer << endl;
}
}
// "creategroup" command handler groupname:groupdesc
void creategroup(int clientfd, string str)
{
int idx = str.find(":");
if (-1 == idx)
{
cerr << "creategroup command invalid!" << endl;
return;
}
string groupname = str.substr(0, idx);
string groupdesc = str.substr(idx + 1, str.size() - idx);
json js;
js["msgid"] = CREATE_GROUP_MSG;
js["id"] = g_currentUser.getId();
js["groupname"] = groupname;
js["groupdesc"] = groupdesc;
string buffer = js.dump();
int len = send(clientfd, buffer.c_str(), strlen(buffer.c_str()) + 1, 0);
if (-1 == len)
{
cerr << "send creategroup msg error -> " << buffer << endl;
}
}
// "addgroup" command handler
void addgroup(int clientfd, string str)
{
int groupid = atoi(str.c_str());
json js;
js["msgid"] = ADD_GROUP_MSG;
js["id"] = g_currentUser.getId();
js["groupid"] = groupid;
string buffer = js.dump();
int len = send(clientfd, buffer.c_str(), strlen(buffer.c_str()) + 1, 0);
if (-1 == len)
{
cerr << "send addgroup msg error -> " << buffer << endl;
}
}
// "groupchat" command handler groupid:message
void groupchat(int clientfd, string str)
{
int idx = str.find(":");
if (-1 == idx)
{
cerr << "groupchat command invalid!" << endl;
return;
}
int groupid = atoi(str.substr(0, idx).c_str());
string message = str.substr(idx + 1, str.size() - idx);
json js;
js["msgid"] = GROUP_CHAT_MSG;
js["id"] = g_currentUser.getId();
js["name"] = g_currentUser.getName();
js["groupid"] = groupid;
js["msg"] = message;
js["time"] = getCurrentTime();
string buffer = js.dump();
int len = send(clientfd, buffer.c_str(), strlen(buffer.c_str()) + 1, 0);
if (-1 == len)
{
cerr << "send groupchat msg error -> " << buffer << endl;
}
}
// "loginout" command handler
void loginout(int clientfd, string)
{
json js;
js["msgid"] = LOGINOUT_MSG;
js["id"] = g_currentUser.getId();
string buffer = js.dump();
int len = send(clientfd, buffer.c_str(), strlen(buffer.c_str()) + 1, 0);
if (-1 == len)
{
cerr << "send loginout msg error -> " << buffer << endl;
}
else
{
isMainMenuRunning = false;
}
}
// 获取系统时间(聊天信息需要添加时间信息)
string getCurrentTime()
{
auto tt = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
struct tm *ptm = localtime(&tt);
char date[60] = {0};
sprintf(date, "%d-%02d-%02d %02d:%02d:%02d",
(int)ptm->tm_year + 1900, (int)ptm->tm_mon + 1, (int)ptm->tm_mday,
(int)ptm->tm_hour, (int)ptm->tm_min, (int)ptm->tm_sec);
return std::string(date);
}