Cached memory pool
main.cc
#include <iostream>
#include <libgen.h>
#include <cstdlib>
#include "benchmark.h"
#include "concurrentalloc.h"
using namespace std;
using namespace apollo;
int main(int argc, char* argv[]) {
#ifdef TCMALLOC
cout << "defined!" << endl;
#else
cout << "undefined!" << endl;
#endif
if (argc <= 2) {
cout << "Usage: " << basename(argv[0]) << " malloc_free_count thread_count" << endl;
return 0;
}
cout << "=============" << endl;
size_t n = atoi(argv[1]);
size_t threadcnt = atoi(argv[2]);
BenchmarkMalloc(n, threadcnt, 10);
cout << endl
<< endl;
BenchmarkConcurrentMalloc(n, threadcnt, 10);
cout << "=============" << endl;
return 0;
}
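Whether operator new/delete route through the custom allocator is controlled by the TCMALLOC macro (see concurrentalloc.cc below). A build along these lines exercises that path (the command is illustrative; adjust to your toolchain):
g++ -std=c++11 -O2 -DTCMALLOC *.cc -o bench -pthread
./bench 10000 4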
Utility classes: utilis
utilis.h
#ifndef __APOLLO_UTILTS_H__
#define __APOLLO_UTILTS_H__
#include <cassert>
#include <mutex>
#include <new>
#ifdef _WIN32
#include <Windows.h>
#elif __linux__
#include <unistd.h>
#endif
namespace apollo {
/// Upper bound on a single ThreadCache allocation: 256KB
static const size_t kMaxBytes = 256 * 1024;
/// Number of hash buckets in ThreadCache and CentralCache
static const size_t kBucketSize = 208;
/// Number of hash buckets in PageCache
static const size_t kPageBucketSize = 129;
/// Page-size shift: 2^12 = 4096 bytes, i.e. one page
static const size_t kPageShift = 12;
/// Page number: address = page number * 4K
#ifdef _WIN64
using page_t = unsigned long long;
#elif _WIN32
using page_t = size_t;
#elif __linux__
using page_t = unsigned long long;
#endif
/// Thread-local storage
#ifdef _WIN32
#define TLS __declspec(thread)
#elif __linux__
#define TLS __thread
#endif
/**
* @brief Allocate memory straight from the OS
*/
inline static void* systemAlloc(size_t npage) {
#ifdef _WIN32
void* ptr = VirtualAlloc(0, npage * (1 << kPageShift), MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
if (ptr == nullptr) {
throw std::bad_alloc();
}
#elif __linux__
void* ptr = sbrk(npage * (1 << kPageShift));
if (ptr == (void*)-1) { // sbrk signals failure with (void*)-1, not nullptr
throw std::bad_alloc();
}
#endif
return ptr;
}
/**
* @brief Return memory to the OS
*/
inline static void systemFree(void* ptr) {
#ifdef _WIN32
VirtualFree(ptr, 0, MEM_RELEASE);
#elif __linux__
brk(ptr); // shrink the program break; only valid for the most recently sbrk'ed block
#endif
}
/**
* @brief Access the next object in a free list (stored in the block's first word)
*/
inline static void*& nextObj(void* ptr) {
return (*(void**)ptr);
}
/**
* @brief Helpers for byte alignment and hash-bucket mapping
* @details Each size range uses a different alignment:
* ======================================
* size range      alignment   bucket index
* ======================================
* [1, 128]        8           [0, 16)
* [129, 1024]     16          [16, 72)
* [1025, 8K]      128         [72, 128)
* [8K+1, 64K]     1024        [128, 184)
* [64K+1, 256K]   8K          [184, 208)
* ======================================
*/
class AlignHelper {
public:
/**
* @brief Round a byte count up to its alignment
*/
static inline size_t roundUp(size_t bytes) {
if (bytes <= 128) {
return _roundUp(bytes, 8);
} else if (bytes <= 1024) {
return _roundUp(bytes, 16);
} else if (bytes <= 8 * 1024) {
return _roundUp(bytes, 128);
} else if (bytes <= 64 * 1024) {
return _roundUp(bytes, 1024);
} else if (bytes <= 256 * 1024) {
return _roundUp(bytes, 8 * 1024);
} else {
// requests above 256KB are page-aligned
return _roundUp(bytes, 1 << kPageShift);
}
}
/**
* @brief Map a byte count to its hash-bucket index
*/
static inline size_t index(size_t bytes) {
// number of free lists in each preceding range
static size_t group[4] = { 16, 56, 56, 56 };
if (bytes <= 128) {
return _index(bytes, 3);
} else if (bytes <= 1024) {
return _index(bytes - 128, 4) + group[0];
} else if (bytes <= 8 * 1024) {
return _index(bytes - 1024, 7) + group[0] + group[1];
} else if (bytes <= 64 * 1024) {
return _index(bytes - 8 * 1024, 10) + group[0] + group[1] + group[2];
} else if (bytes <= 256 * 1024) {
return _index(bytes - 64 * 1024, 13) + group[0] + group[1] + group[2] + group[3];
} else {
assert(false);
return -1;
}
}
/**
* @brief Cap on how many objects CentralCache should hand to ThreadCache at once
* @details Combined with slow-start feedback, keeps the batch between 2 and 512 objects
*/
static size_t numMoveSize(size_t size) {
assert(size > 0);
// the smaller the object, the higher the cap;
// the larger the object, the lower the cap
size_t num = kMaxBytes / size;
if (num < 2)
num = 2;
if (num > 512)
num = 512;
return num;
}
static size_t numMovePage(size_t size) {
size_t num = numMoveSize(size); // cap on how many objects ThreadCache fetches from CentralCache at once
// bytes needed for num objects of this size, converted to pages
size_t npage = (num * size) >> kPageShift;
if (npage == 0) // hand out at least one page
{
npage = 1;
}
return npage;
}
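// Worked example (illustrative): for size = 8, numMoveSize caps at 512, so
// numMovePage returns (512 * 8) >> 12 = 1 page; for size = 256KB, numMoveSize
// floors at 2, so numMovePage returns (2 * 256 * 1024) >> 12 = 128 pages.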
private:
/**
* @brief Round a byte count up to a power-of-two alignment
*
* @param bytes the byte count to adjust
* @param align the alignment (a power of two)
* @return the rounded-up byte count
*/
static inline size_t _roundUp(size_t bytes, size_t align) {
return ((bytes + align - 1) & ~(align - 1));
}
/**
* @brief Compute the bucket index within one alignment range
*
* @param bytes byte count, relative to the start of the range
* @param alignShift the alignment expressed as a power of two, e.g. 3 for alignment 8
* @return the hash-bucket index for this byte count
*/
static inline size_t _index(size_t bytes, size_t alignShift) {
return ((bytes + (1 << alignShift) - 1) >> alignShift) - 1;
}
};
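// Worked example (illustrative): a 130-byte request falls in [129, 1024], so
// AlignHelper::roundUp(130) = (130 + 15) & ~15 = 144, and
// AlignHelper::index(130) = ((130 - 128 + 15) >> 4) - 1 + 16 = 16, i.e. the
// request is served from bucket 16 with 144-byte blocks.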
/**
* @brief Free list of small memory blocks
*/
class FreeList {
public:
FreeList()
: freelist_(nullptr)
, maxSize_(1)
, size_(0) { }
/**
* @brief Push an object onto the head of the free list
*/
void push(void* obj) {
assert(obj);
nextObj(obj) = freelist_;
freelist_ = obj;
++size_; // keep the length in sync so deallocate can detect a full batch
}
/**
* @brief Pop the object at the head of the free list
*/
void* pop() {
assert(freelist_);
void* obj = freelist_;
freelist_ = nextObj(freelist_);
--size_;
return obj;
}
/**
* @brief Insert the chain of objects between start and end into the free list
*
* @param start address of the first inserted object
* @param end address of the last inserted object
* @param cnt number of inserted objects
*/
void pushRange(void* start, void* end, size_t cnt) {
assert(start);
assert(end);
// insert at the head
nextObj(end) = freelist_;
freelist_ = start;
size_ += cnt;
}
/**
* @brief Remove cnt objects from the head of the free list
*
* @param start out parameter, address of the first removed object
* @param end out parameter, address of the last removed object
* @param cnt number of objects to remove
*/
void popRange(void*& start, void*& end, size_t cnt) {
assert(cnt <= size_);
// remove from the head
start = freelist_;
end = start;
for (size_t i = 0; i < cnt - 1; i++) {
end = nextObj(end);
}
freelist_ = nextObj(end); // the list now starts at the object after end
nextObj(end) = nullptr; // terminate the detached sublist
size_ -= cnt;
}
void* clear() {
size_ = 0;
void* list = freelist_;
freelist_ = nullptr;
return list;
}
bool empty() const { return freelist_ == nullptr; }
size_t maxSize() const { return maxSize_; }
void setMaxSize(size_t size) { maxSize_ = size; }
size_t size() const { return size_; }
private:
void* freelist_;
size_t maxSize_; // slow-start cap on the batch size
size_t size_; // number of blocks currently in the list
};
/**
* @brief Fixed-type object pool
*/
template <class T>
class ObjectPool {
public:
/// Allocate one object
T* alloc() {
T* obj = nullptr;
// reuse returned objects first
if (freelist_ != nullptr) {
// take one object off the head of the free list
obj = (T*)freelist_;
freelist_ = nextObj(freelist_);
} else {
// make sure each slot is big enough to hold a pointer
size_t objsize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
// when the remaining chunk cannot fit one more object, grab a new big chunk
if (remainBytes_ < objsize) {
// widen the request if the object is bigger than the default chunk size
size_t bytes = sizeof(T) > kMemorySize_ ? sizeof(T) : kMemorySize_;
// round up to whole pages so a large T is never under-allocated
memory_ = (char*)systemAlloc((bytes + (1 << kPageShift) - 1) >> kPageShift);
if (memory_ == nullptr) {
throw std::bad_alloc();
}
remainBytes_ = bytes;
}
// carve objsize bytes off the big chunk
obj = (T*)memory_;
memory_ += objsize;
remainBytes_ -= objsize;
}
// placement new: run the constructor explicitly
new (obj) T;
return obj;
}
/// Return one object to the pool
void free(T* obj) {
// run the destructor explicitly to clean up the object
obj->~T();
// push the freed object onto the head of the free list
nextObj(obj) = freelist_;
freelist_ = obj;
}
private:
char* memory_ = nullptr; // current big chunk
size_t remainBytes_ = 0; // bytes left in the chunk while carving
void* freelist_ = nullptr; // free list formed by returned objects
const size_t kMemorySize_ = 128 * 1024;
};
/**
* @brief A span: a large block of memory managed in whole pages
*/
struct Span {
Span()
: pageId_(0)
, cnt_(0)
, next_(nullptr)
, prev_(nullptr)
, useCnt_(0)
, freelist_(nullptr)
, used_(false)
, blockSize_(0) { }
page_t pageId_; // first page number of the span
size_t cnt_; // number of pages
Span* next_; // next span in the list
Span* prev_; // previous span in the list
size_t useCnt_; // count of carved blocks currently handed out to ThreadCaches
void* freelist_; // free list of the small blocks carved from this span
bool used_; // whether the span is currently in use
size_t blockSize_; // size of the blocks this span is carved into
};
/**
* @brief Doubly linked list of spans with a sentinel head node
*/
class SpanList {
public:
SpanList() {
head_ = spanpool_.alloc();
head_->next_ = head_;
head_->prev_ = head_;
}
/**
* @brief Insert newspan in front of pos
*
* @param pos the position to insert before
* @param newspan the span being inserted
*/
void insert(Span* pos, Span* newspan) {
assert(pos && newspan);
Span* prev = pos->prev_;
prev->next_ = newspan;
newspan->prev_ = prev;
newspan->next_ = pos;
pos->prev_ = newspan;
}
/**
* @brief Unlink the element at pos from the doubly linked list
*/
void erase(Span* pos) {
assert(pos && pos != head_);
Span* prev = pos->prev_;
Span* next = pos->next_;
prev->next_ = next;
next->prev_ = prev;
}
Span* begin() { return head_->next_; }
Span* end() { return head_; }
bool empty() { return head_ == head_->next_; }
void pushFront(Span* span) { insert(begin(), span); }
Span* popFront() {
Span* front = head_->next_;
erase(front);
return front;
}
private:
Span* head_;
static ObjectPool<Span> spanpool_;
public:
std::mutex mtx_;
};
} // namespace apollo
#endif // !__APOLLO_UTILTS_H__
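The two building blocks above can be exercised on their own. A minimal sketch (illustrative, not part of the project sources):
#include "utilis.h"
#include <cstdlib>
using namespace apollo;
int main() {
// ObjectPool carves fixed-type objects out of a big chunk and recycles them
ObjectPool<Span> pool;
Span* s = pool.alloc(); // placement-new constructs the Span
pool.free(s); // destructor runs, slot goes onto the pool's free list
// FreeList threads small blocks together through their first word
FreeList fl;
void* block = malloc(64);
fl.push(block);
void* got = fl.pop(); // hands back the same block
free(got);
return 0;
}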
utilis.cc
#include "utilis.h"
using namespace apollo;
ObjectPool<Span> SpanList::spanpool_;
Benchmarking: benchmark
benchmark.h
#ifndef _TEST_H_
#define _TEST_H_
#include <stddef.h>
struct TreeNode {
int _val;
TreeNode* _left;
TreeNode* _right;
TreeNode()
: _val(0)
, _left(nullptr)
, _right(nullptr) { }
};
/// benchmark malloc/free
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds);
/// benchmark concurrentAlloc/concurrentFree
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds);
#endif // !_TEST_H_
benchmark.cc
#include "benchmark.h"
#include "concurrentalloc.h"
#include "utilis.h"
#include <atomic>
#include <ctime>
#include <iostream>
#include <thread>
#include <vector>
using namespace std;
using namespace apollo;
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds) {
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime { 0 };
std::atomic<size_t> free_costtime { 0 };
for (size_t k = 0; k < nworks; ++k) {
vthread[k] = std::thread([&]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j) {
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++) {
// v.push_back(malloc(16));
v.push_back(malloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++) {
free(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread) {
t.join();
}
cout << nworks << " threads ran " << rounds << " rounds concurrently, with " << ntimes
<< " mallocs per round, costing " << (double)malloc_costtime / CLOCKS_PER_SEC << " s" << endl;
cout << nworks << " threads ran " << rounds << " rounds concurrently, with " << ntimes
<< " frees per round, costing " << (double)free_costtime / CLOCKS_PER_SEC << " s" << endl;
cout << nworks << " threads ran malloc&free " << nworks * rounds * ntimes
<< " times in total, costing " << ((double)malloc_costtime + (double)free_costtime) / CLOCKS_PER_SEC << " s" << endl;
}
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds) {
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime { 0 };
std::atomic<size_t> free_costtime { 0 };
for (size_t k = 0; k < nworks; ++k) {
vthread[k] = std::thread([&]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j) {
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++) {
// v.push_back(ConcurrentAlloc(16));
v.push_back(concurrentAlloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++) {
concurrentFree(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread) {
t.join();
}
cout << nworks << " threads ran " << rounds << " rounds concurrently, with " << ntimes
<< " allocs per round, costing " << (double)malloc_costtime / CLOCKS_PER_SEC << " s" << endl;
cout << nworks << " threads ran " << rounds << " rounds concurrently, with " << ntimes
<< " deallocs per round, costing " << (double)free_costtime / CLOCKS_PER_SEC << " s" << endl;
cout << nworks << " threads ran alloc&dealloc " << nworks * rounds * ntimes
<< " times in total, costing " << ((double)malloc_costtime + (double)free_costtime) / CLOCKS_PER_SEC << " s" << endl;
}
Memory allocator: concurrentalloc
concurrentalloc.h
#ifndef __APOLLO_CONCURRENT_ALLOC_H__
#define __APOLLO_CONCURRENT_ALLOC_H__
#include "pagecache.h"
#include "threadcache.h"
#include <cassert>
void* operator new(size_t size);
void* operator new[](size_t size);
void operator delete(void* ptr) noexcept;
void operator delete[](void* ptr) noexcept;
namespace apollo {
static TLS ThreadCache* s_tlsthreadcache = nullptr;
static void* concurrentAlloc(size_t size) {
if (size > kMaxBytes) // allocations above 256KB
{
// pages needed after page alignment
size_t alignsize = AlignHelper::roundUp(size);
size_t npage = alignsize >> kPageShift;
// ask PageCache directly for an npage-page span
Span* span = nullptr;
{
PageCache* cache = PageCache::getInstance();
std::lock_guard<std::mutex> lock(cache->mtx_);
span = cache->newSpan(npage);
span->used_ = true;
span->blockSize_ = size;
}
assert(span);
void* ptr = (void*)(span->pageId_ << kPageShift);
return ptr;
} else {
// via TLS, each thread reaches its own ThreadCache without locking
if (s_tlsthreadcache == nullptr) {
static std::mutex mtx;
static ObjectPool<ThreadCache> tcpool;
{
std::lock_guard<std::mutex> lock(mtx);
s_tlsthreadcache = tcpool.alloc();
}
}
return s_tlsthreadcache->allocate(size);
}
}
static void concurrentFree(void* ptr) {
if (ptr != nullptr) {
Span* span = PageCache::getInstance()->mapToSpan(ptr);
size_t size = span->blockSize_;
if (size > kMaxBytes) // frees above 256KB
{
PageCache* cache = PageCache::getInstance();
std::lock_guard<std::mutex> lock(cache->mtx_);
cache->revertSpanToPageCache(span);
} else {
assert(s_tlsthreadcache);
s_tlsthreadcache->deallocate(ptr, size);
}
}
}
} // namespace apollo
#endif // !__APOLLO_CONCURRENT_ALLOC_H__
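The two entry points can also be called directly, bypassing operator new/delete. A minimal sketch (illustrative):
#include "concurrentalloc.h"
int main() {
void* p = apollo::concurrentAlloc(100); // served by the thread-local ThreadCache
apollo::concurrentFree(p);
void* big = apollo::concurrentAlloc(512 * 1024); // above 256KB, served by PageCache directly
apollo::concurrentFree(big);
return 0;
}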
concurrentalloc.cc
#include "concurrentalloc.h"
using namespace apollo;
void* operator new(size_t size) {
#ifdef TCMALLOC
return apollo::concurrentAlloc(size);
#else
return malloc(size);
#endif
}
void* operator new[](size_t size) {
#ifdef TCMALLOC
return apollo::concurrentAlloc(size);
#else
return malloc(size);
#endif
}
void operator delete(void* ptr) noexcept {
#ifdef TCMALLOC
return apollo::concurrentFree(ptr);
#else
return free(ptr);
#endif
}
void operator delete[](void* ptr) noexcept {
#ifdef TCMALLOC
return apollo::concurrentFree(ptr);
#else
return free(ptr);
#endif
}
Radix tree: radixtree.h
#ifndef __APOLLO_RADIX_TREE_H__
#define __APOLLO_RADIX_TREE_H__
#include "utilis.h"
#include <cassert>
#include <cstdint>
#include <cstring>
namespace apollo {
template <int BITS>
class RadixTree {
public:
using idx_t = uintptr_t;
private:
static const int kInteriorBits = (BITS + 2) / 3; // page-number bits consumed by levels 1 and 2
static const int kInteriorLength = 1 << kInteriorBits; // entries per node on levels 1 and 2
static const int kLeafBits = BITS - 2 * kInteriorBits; // page-number bits consumed by level 3
static const int kLeafLength = 1 << kLeafBits; // entries per leaf on level 3
struct Node {
Node* ptrs_[kInteriorLength];
};
struct Leaf {
void* values_[kLeafLength];
};
Node* newNode() {
static ObjectPool<Node> node_pool;
Node* res = node_pool.alloc();
if (res != nullptr) {
memset(res, 0, sizeof(*res));
}
return res;
}
Node* root_;
public:
explicit RadixTree() {
root_ = newNode();
}
void* get(idx_t idx) const {
const idx_t idx_first = idx >> (kLeafBits + kInteriorBits); // index into level 1
const idx_t idx_second = (idx >> kLeafBits) & (kInteriorLength - 1); // index into level 2
const idx_t idx_third = idx & (kLeafLength - 1); // index into level 3
// the page number is out of range, or its mapping nodes were never allocated
if ((idx >> BITS) > 0 || root_->ptrs_[idx_first] == nullptr
|| root_->ptrs_[idx_first]->ptrs_[idx_second] == nullptr) {
return nullptr;
}
return reinterpret_cast<Leaf*>(root_->ptrs_[idx_first]->ptrs_[idx_second])->values_[idx_third];
}
void set(idx_t idx, void* ptr) {
assert(idx >> BITS == 0);
const idx_t idx_first = idx >> (kLeafBits + kInteriorBits); // index into level 1
const idx_t idx_second = (idx >> kLeafBits) & (kInteriorLength - 1); // index into level 2
const idx_t idx_third = idx & (kLeafLength - 1); // index into level 3
ensure(idx, 1); // make sure the nodes covering page idx are allocated
// record the mapping from this page number to its span
reinterpret_cast<Leaf*>(root_->ptrs_[idx_first]->ptrs_[idx_second])->values_[idx_third] = ptr;
}
private:
/**
* @brief Ensure the mapping nodes covering pages [start, start+n-1] are allocated
*/
bool ensure(idx_t start, size_t n) {
for (idx_t key = start; key <= start + n - 1;) {
const idx_t idx_first = key >> (kLeafBits + kInteriorBits); // index into level 1
const idx_t idx_second = (key >> kLeafBits) & (kInteriorLength - 1); // index into level 2
// index out of range
if (idx_first >= kInteriorLength || idx_second >= kInteriorLength)
return false;
if (root_->ptrs_[idx_first] == nullptr) // the level-1 slot idx_first is not yet allocated
{
// allocate an interior node
Node* node = newNode();
if (node == nullptr)
return false;
root_->ptrs_[idx_first] = node;
}
if (root_->ptrs_[idx_first]->ptrs_[idx_second] == nullptr) // the level-2 slot idx_second is not yet allocated
{
// allocate a leaf
static ObjectPool<Leaf> leaf_pool;
Leaf* leaf = leaf_pool.alloc();
if (leaf == nullptr)
return false;
memset(leaf, 0, sizeof(*leaf));
root_->ptrs_[idx_first]->ptrs_[idx_second] = reinterpret_cast<Node*>(leaf);
}
key = ((key >> kLeafBits) + 1) << kLeafBits; // jump to the first page covered by the next leaf
}
return true;
}
};
#ifdef _WIN64
using PageMap = RadixTree<64 - kPageShift>;
#elif _WIN32
using PageMap = RadixTree<32 - kPageShift>;
#elif __linux__
using PageMap = RadixTree<64 - kPageShift>;
#endif
} // namespace apollo
#endif // !__APOLLO_RADIX_TREE_H__
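PageMap can be exercised directly to see the three-level lookup at work. A minimal sketch (illustrative):
#include "radixtree.h"
#include <cassert>
int main() {
apollo::PageMap map;
apollo::Span span;
map.set(0x12345, &span); // interior and leaf nodes are allocated on demand
assert(map.get(0x12345) == &span); // a mapped page resolves to its span
assert(map.get(0x54321) == nullptr); // an untouched page resolves to nullptr
return 0;
}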
Thread cache: threadcache
threadcache.h
#ifndef __APOLLO_THREAD_CACHE_H__
#define __APOLLO_THREAD_CACHE_H__
#include "utilis.h"
namespace apollo {
/**
* @brief Per-thread cache
* @details Owned exclusively by one thread, so no locking is needed. Serves requests of up to 256KB.
*/
class ThreadCache {
public:
ThreadCache() = default;
ThreadCache(const ThreadCache&) = delete;
ThreadCache& operator=(const ThreadCache&) = delete;
/**
* @brief Allocate a memory object
*/
void* allocate(size_t size);
/**
* @brief Free a memory object
*
* @param ptr the object being freed
* @param size the object's size
*/
void deallocate(void* ptr, size_t size);
private:
/**
* @brief Fetch objects from CentralCache
*
* @param index the object's hash-bucket index
* @param size the object's size
*/
void* fetchFromCentralCache(size_t index, size_t size);
/**
* @brief Return a FreeList's objects to CentralCache
*
* @param list the free list being returned
* @param size the size of the blocks on the list
*/
void revertListToCentralCache(FreeList& list, size_t size);
private:
FreeList freelists_[kBucketSize];
};
} // namespace apollo
#endif // !__APOLLO_THREAD_CACHE_H__
threadcache.cc
#include "threadcache.h"
#include "centralcache.h"
using namespace apollo;
void* ThreadCache::allocate(size_t size) {
assert(size <= kMaxBytes);
size_t align = AlignHelper::roundUp(size);
size_t index = AlignHelper::index(size);
if (!freelists_[index].empty()) {
return freelists_[index].pop();
} else {
return fetchFromCentralCache(index, align);
}
}
void ThreadCache::deallocate(void* ptr, size_t size) {
assert(ptr && size <= kMaxBytes);
// find the matching free-list bucket and push the object in
size_t index = AlignHelper::index(size);
freelists_[index].push(ptr);
// once the list holds at least one batch's worth of objects, return a chunk to CentralCache
if (freelists_[index].size() >= freelists_[index].maxSize()) {
revertListToCentralCache(freelists_[index], size);
}
}
void* ThreadCache::fetchFromCentralCache(size_t index, size_t size) {
// slow-start feedback tuning:
// at first only a few objects are requested from CentralCache at a time, since too many would sit unused;
// sustained demand for this size grows fetchnum steadily until it reaches the cap
size_t limit = freelists_[index].maxSize();
size_t fetchnum = std::min(limit, AlignHelper::numMoveSize(size));
if (fetchnum == limit) {
freelists_[index].setMaxSize(limit + 1);
}
void * start = nullptr, *end = nullptr;
size_t actualnum = CentralCache::getInstance()->fetchRangeObj(start, end, fetchnum, size);
assert(actualnum >= 1); // at least one object comes back
if (actualnum == 1) // exactly one object: hand it straight to the caller
{
assert(start == end);
return start;
} else // several objects: hang all but the first on the matching bucket
{
freelists_[index].pushRange(nextObj(start), end, actualnum - 1);
return start;
}
}
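// Worked example of the slow start (illustrative): for a stream of 8-byte
// requests, maxSize() begins at 1, so the first miss fetches 1 object and bumps
// the limit to 2; the next miss fetches 2, and so on, until the batch reaches
// min(maxSize, numMoveSize(8)) = 512 objects.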
void ThreadCache::revertListToCentralCache(FreeList& list, size_t size) {
// void* start = nullptr, *end = nullptr;
// // take one batch's worth of objects out of list:
// list.popRange(start, end, list.maxSize());
void* start = list.clear();
// hand the objects back to their owning spans in CentralCache
CentralCache::getInstance()->releaseList(start, size);
}
Central cache: centralcache
centralcache.h
#ifndef __APOLLO_CENTRAL_CACHE_H__
#define __APOLLO_CENTRAL_CACHE_H__
#include "utilis.h"
namespace apollo {
/**
* @brief Central cache
* @details Shared by all threads, so each bucket carries its own lock; internally structured like ThreadCache
*/
class CentralCache {
public:
static CentralCache* getInstance() {
static CentralCache cache;
return &cache;
}
/**
* @brief Fetch a number of objects from CentralCache
*
* @param start out parameter, address of the first object in the run
* @param end out parameter, address of the last object in the run
* @param cnt number of objects requested
* @param size size of a single object
* @return the number of objects actually fetched
*/
size_t fetchRangeObj(void*& start, void*& end, size_t cnt, size_t size);
/**
* @brief Return a run of free-list objects to their owning Spans
*
* @param start address of the first object in the run
* @param size size of a single object
*/
void releaseList(void* start, size_t size);
private:
/**
* @brief Get a non-empty Span
*
* @param list the SpanList to take the span from
* @param size the object size
* @param bucket_lock the bucket lock held on list
* @return a non-empty Span
*/
Span* getOneSpan(SpanList& list, size_t size, std::unique_lock<std::mutex>& bucket_lock);
private:
CentralCache() = default;
CentralCache(const CentralCache&) = delete;
CentralCache& operator=(const CentralCache&) = delete;
private:
SpanList spanlists_[kBucketSize];
};
} // namespace apollo
#endif // !__APOLLO_CENTRAL_CACHE_H__
centralcache.cc
#include "centralcache.h"
#include "pagecache.h"
using namespace apollo;
size_t CentralCache::fetchRangeObj(void*& start, void*& end, size_t cnt, size_t size) {
size_t index = AlignHelper::index(size);
std::unique_lock<std::mutex> lock(spanlists_[index].mtx_); // lock this bucket
// grab a non-empty span from the matching hash bucket
Span* span = getOneSpan(spanlists_[index], size, lock);
// the span and its free list must both be non-empty
assert(span && span->freelist_);
// take cnt objects from the span; if fewer remain, take as many as there are
start = span->freelist_;
end = span->freelist_;
size_t actualnum = 1;
while (nextObj(end) && (cnt - 1)) {
end = nextObj(end);
actualnum++;
cnt--;
}
span->freelist_ = nextObj(end); // whatever is left stays on the span's free list
nextObj(end) = nullptr; // terminate the detached sublist
span->useCnt_ += actualnum; // update the count of blocks handed out to ThreadCaches
return actualnum;
}
void CentralCache::releaseList(void* start, size_t size) {
size_t index = AlignHelper::index(size);
std::unique_lock<std::mutex> bucket_lock(spanlists_[index].mtx_); // lock this bucket
while (start) {
void* next = nextObj(start); // remember the next object
Span* span = PageCache::getInstance()->mapToSpan(start);
// push the object onto its span's free list
nextObj(start) = span->freelist_;
span->freelist_ = start;
span->useCnt_--; // one fewer block out with ThreadCaches
if (span->useCnt_ == 0) // every block carved from this span has come back
{
// the span can now go back to PageCache, which may merge it with neighboring pages
spanlists_[index].erase(span);
span->freelist_ = nullptr; // clear the free list
span->next_ = nullptr;
span->prev_ = nullptr;
// PageCache's own lock suffices while the span is handed back, so drop the bucket lock
bucket_lock.unlock();
{
PageCache* cache = PageCache::getInstance();
std::lock_guard<std::mutex> lock(cache->mtx_);
cache->revertSpanToPageCache(span);
}
bucket_lock.lock(); // retake the bucket lock
}
start = next;
}
}
Span* CentralCache::getOneSpan(SpanList& list, size_t size, std::unique_lock<std::mutex>& bucket_lock) {
// first look for a non-empty span in list; if one exists, return it directly
Span* it = list.begin();
while (it != list.end()) {
if (it->freelist_ != nullptr) {
return it;
} else {
it = it->next_;
}
}
bucket_lock.unlock(); // drop the bucket lock
Span* span = nullptr;
{
std::lock_guard<std::mutex> lock(PageCache::getInstance()->mtx_);
// list has no non-empty span, so ask PageCache for a new one
span = PageCache::getInstance()->newSpan(AlignHelper::numMovePage(size));
span->used_ = true;
span->blockSize_ = size;
}
assert(span);
// compute the start address and the size in bytes of the span's block
char* start = (char*)(span->pageId_ << kPageShift);
size_t bytes = span->cnt_ << kPageShift;
// slice the big block into size-byte objects and link them together
char* end = start + bytes;
// carve one block first to serve as the tail, which makes tail insertion easy
span->freelist_ = start;
start += size;
void* tail = span->freelist_;
// tail-insert the remaining blocks
while (start < end) {
nextObj(tail) = start;
tail = nextObj(tail);
start += size;
}
nextObj(tail) = nullptr; // terminate the list
bucket_lock.lock(); // retake the bucket lock
// push the freshly sliced span onto the front of list
list.pushFront(span);
return span;
}
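// Slicing example (illustrative): for size = 8, numMovePage(8) = 1, so a
// one-page (4KB) span is fetched from PageCache and cut into 4096 / 8 = 512
// blocks, linked through their first word and hung on span->freelist_.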
Page cache: pagecache
pagecache.h
#ifndef __APOLLO_PAGE_CACHE_H__
#define __APOLLO_PAGE_CACHE_H__
#include "utilis.h"
#include "radixtree.h"
#include <mutex>
// #include <unordered_map>
namespace apollo {
/**
* @brief Page cache
* @details Shared by all threads and protected by a single object-wide lock
*/
class PageCache {
public:
static PageCache* getInstance() {
static PageCache cache;
return &cache;
}
/**
* @brief Get a Span covering npage pages
*/
Span* newSpan(size_t npage);
/**
* @brief Look up the Span that owns an object
*/
Span* mapToSpan(void* obj);
/**
* @brief Return an idle Span to PageCache and merge adjacent Spans
*/
void revertSpanToPageCache(Span* span);
private:
PageCache() = default;
PageCache(const PageCache&) = delete;
PageCache& operator=(const PageCache&) = delete;
public:
std::mutex mtx_;
private:
SpanList spanlists_[kPageBucketSize];
// std::unordered_map<page_t, Span*> hash_;
PageMap hash_;
ObjectPool<Span> span_pool_;
};
} // namespace apollo
#endif // !__APOLLO_PAGE_CACHE_H__
pagecache.cc
#include "pagecache.h"
#include <cassert>
using namespace apollo;
Span* PageCache::newSpan(size_t npage) {
assert(npage > 0);
if (npage > kPageBucketSize - 1) // more than 128 pages: ask the heap directly
{
void* ptr = systemAlloc(npage);
Span* span = span_pool_.alloc();
span->pageId_ = (page_t)ptr >> kPageShift;
span->cnt_ = npage;
// map the page number to the span
// hash_[span->page_id_] = span;
hash_.set(span->pageId_, span);
return span;
}
// first check bucket npage; if it holds a span, return it directly
if (!spanlists_[npage].empty()) {
Span* res = spanlists_[npage].popFront();
// map every page to the span so CentralCache can find it when recycling small blocks
for (page_t i = 0; i < res->cnt_; i++) {
// hash_[res->page_id_ + i] = res;
hash_.set(res->pageId_ + i, res);
}
return res;
}
// otherwise check the larger buckets for a span that can be split
for (size_t i = npage + 1; i < kPageBucketSize; i++) {
if (!spanlists_[i].empty()) {
Span* nSpan = spanlists_[i].popFront();
Span* kSpan = span_pool_.alloc();
// carve npage pages off the head of nSpan
kSpan->pageId_ = nSpan->pageId_;
kSpan->cnt_ = npage;
nSpan->pageId_ += npage;
nSpan->cnt_ -= npage;
// hang the remainder on the bucket matching its new page count
spanlists_[nSpan->cnt_].pushFront(nSpan);
// map nSpan's first and last page numbers so PageCache can find it when merging spans
// hash_[nSpan->page_id_] = nSpan;
// hash_[nSpan->page_id_ + nSpan->cnt_ - 1] = nSpan;
hash_.set(nSpan->pageId_, nSpan);
hash_.set(nSpan->pageId_ + nSpan->cnt_ - 1, nSpan);
// map every page to the span so CentralCache can find it when recycling small blocks
for (page_t i = 0; i < kSpan->cnt_; i++) {
// hash_[kSpan->page_id_ + i] = kSpan;
hash_.set(kSpan->pageId_ + i, kSpan);
}
return kSpan;
}
}
// nothing larger is available, so ask the heap for a fresh 128-page span
Span* largespan = span_pool_.alloc();
void* ptr = systemAlloc(kPageBucketSize - 1);
largespan->pageId_ = (page_t)ptr >> kPageShift;
largespan->cnt_ = kPageBucketSize - 1;
spanlists_[largespan->cnt_].pushFront(largespan);
// recurse so the splitting logic above is not duplicated
return newSpan(npage);
}
Span* PageCache::mapToSpan(void* obj) {
page_t id = (page_t)obj >> kPageShift; // page number of the object
// The earlier unordered_map-based lookup needed the lock:
// std::lock_guard<std::mutex> lock(mtx_);
// auto ret = hash_.find(id);
// if (ret != hash_.end()) {
//     return ret->second;
// } else {
//     assert(false);
//     return nullptr;
// }
Span* ret = (Span*)hash_.get(id);
assert(ret);
return ret;
}
void PageCache::revertSpanToPageCache(Span* span) {
if (span->cnt_ > kPageBucketSize - 1) // more than 128 pages: release straight to the heap
{
void* ptr = (void*)(span->pageId_ << kPageShift);
systemFree(ptr);
span_pool_.free(span);
return;
}
// try to merge the span with its neighbors to ease external fragmentation
// merge with the preceding pages
while (1) {
page_t prev_id = span->pageId_ - 1;
// the preceding page is unmapped (never requested from the OS): stop merging
// auto ret = hash_.find(prev_id);
// if (ret == hash_.end())
//{
// break;
// }
Span* ret = (Span*)hash_.get(prev_id);
if (ret == nullptr)
break;
// the span owning the preceding page is still in use: stop merging
// Span* prev_span = ret->second;
Span* prev_span = ret;
if (prev_span->used_ == true) {
break;
}
// a merged span above 128 pages could not be managed: stop merging
if (prev_span->cnt_ + span->cnt_ > kPageBucketSize - 1) {
break;
}
// merge with the preceding span
span->pageId_ = prev_span->pageId_;
span->cnt_ += prev_span->cnt_;
// unlink prev_span from its doubly linked list
spanlists_[prev_span->cnt_].erase(prev_span);
span_pool_.free(prev_span);
}
// merge with the following pages
while (1) {
page_t next_id = span->pageId_ + span->cnt_;
// the following page is unmapped (never requested from the OS): stop merging
// auto ret = hash_.find(next_id);
// if (ret == hash_.end())
//{
// break;
// }
Span* ret = (Span*)hash_.get(next_id);
if (ret == nullptr)
break;
// the span owning the following page is still in use: stop merging
// Span* next_span = ret->second;
Span* next_span = ret;
if (next_span->used_ == true) {
break;
}
// a merged span above 128 pages could not be managed: stop merging
if (next_span->cnt_ + span->cnt_ > kPageBucketSize - 1) {
break;
}
// merge with the following span
span->cnt_ += next_span->cnt_;
// unlink next_span from its doubly linked list
spanlists_[next_span->cnt_].erase(next_span);
span_pool_.free(next_span);
}
// hang the merged span on the bucket matching its page count
spanlists_[span->cnt_].pushFront(span);
// map the span's first and last page numbers
// hash_[span->page_id_] = span;
// hash_[span->page_id_ + span->cnt_ - 1] = span;
hash_.set(span->pageId_, span);
hash_.set(span->pageId_ + span->cnt_ - 1, span);
// mark the span as no longer in use
span->used_ = false;
}
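// Merge example (illustrative): returning a 2-page span starting at page 100
// while pages 98-99 form a free span and pages 102-105 form another yields a
// single 8-page span covering pages 98-105, hung on spanlists_[8].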