Skip to main content

9-基于跳表实现的一致性哈希算法

跳表

#pragma once
#include <iostream>
#include <cstdlib>
#include <cmath>
#include <cstring>
#include <mutex>
#include <fstream>
#include <cstdlib>
#include <memory>
#include <vector>
#define STORE_FILE "store/dumpFile"

// Lock taken by SkipList::insert_element and SkipList::delete_element.
// It is a namespace-scope static rather than a class member, so all SkipList
// objects that see this definition contend on the same mutex.
static std::mutex mtx; // mutex for critical section
// Separator searched for when a "key:value" line is validated and split
// (see is_valid_string / get_key_value_from_string).
static std::string delimiter = ":";

// One skip-list node: a key/value pair plus one forward link per level.
template<typename K, typename V>
class Node {

public:

    // Creates an empty node. The default member initializers below guarantee
    // that key, value and node_level hold defined values even for scalar K/V.
    Node() {}

    // Creates a node holding (k, v) with forward links for levels 0..level.
    Node(K k, V v, int);

    ~Node();

    // Returns a copy of the stored key.
    K get_key() const;

    // Returns a copy of the stored value.
    V get_value() const;

    // Replaces the stored value.
    void set_value(V);

    // forward[i] is this node's successor on level i (nullptr at the tail).
    std::vector<std::shared_ptr<Node<K, V>>> forward;

    // Highest level index this node takes part in.
    int node_level{0};

private:
    K key{};
    V value{};
};

// Builds a node for (k, v) that takes part in levels 0..level.
//
// Members are set through the initializer list (written in declaration order:
// forward, node_level, key, value) so key and value are copy-constructed once
// instead of being default-constructed and then assigned.
template<typename K, typename V>
Node<K, V>::Node(const K k, const V v, int level)
    : forward(level + 1),   // one slot per level (index 0..level), all nullptr
      node_level(level),
      key(k),
      value(v) {
}

// Nothing to release by hand: the forward links are shared_ptr objects stored
// in a std::vector, and both clean up after themselves.
template<typename K, typename V>
Node<K, V>::~Node() = default;

// Returns a copy of the key stored in this node.
template<typename K, typename V>
K Node<K, V>::get_key() const {
    return this->key;
}

// Returns a copy of the value stored in this node.
template<typename K, typename V>
V Node<K, V>::get_value() const {
    return this->value;
}
// Overwrites the value stored in this node with a copy of `value`.
template<typename K, typename V>
void Node<K, V>::set_value(V value) {
    V& stored = this->value;   // the member, not the shadowing parameter
    stored = value;
}

// Skip list mapping keys of type K to values of type V.
template <typename K, typename V>
class SkipList {

public:
    // max_level is the highest level index any node may use.
    SkipList(int max_level);
    ~SkipList();

    // Draws the level for a newly inserted node.
    int get_random_level();

    // Allocates a node for (key, value) with forward links for levels 0..level.
    std::shared_ptr<Node<K, V>> create_node(K key, V value, int level);

    // Returns 0 after inserting the pair, 1 if the key is already present.
    int insert_element(K key, V value);

    // Prints every level of the list to std::cout.
    void display_list();

    // Returns the value of the first node whose key is >= key, falling back
    // to the first node of the list when no such node exists.
    V search_element(K key);

    // Removes the node whose key equals key, if there is one.
    void delete_element(K key);

    // Writes the level-0 contents to STORE_FILE.
    void dump_file();

    // Reads "key:value" lines from STORE_FILE.
    void load_file();

    // Number of elements currently stored.
    int size();

private:
    void get_key_value_from_string(const std::string& str, std::string key, std::string value);
    bool is_valid_string(const std::string& str);

    // Highest level index allowed in this list.
    int _max_level;

    // Highest level index currently in use.
    int _skip_list_level;

    // Sentinel node that starts every level.
    std::shared_ptr<Node<K, V>> _header;

    // Streams used by dump_file / load_file.
    std::ofstream _file_writer;
    std::ifstream _file_reader;

    // Current element count.
    int _element_count;
};

// Allocates a shared node for (k, v) with forward links for levels 0..level.
template<typename K, typename V>
std::shared_ptr<Node<K, V>> SkipList<K, V>::create_node(const K k, const V v, int level) {
    std::shared_ptr<Node<K, V>> fresh_node = std::make_shared<Node<K, V>>(k, v, level);
    return fresh_node;
}

// Insert the given key and value into the skip list.
//   returns 1: the key already exists (the list is left unchanged)
//   returns 0: the pair was inserted
/*
                           +------------+
                           |  insert 50 |
                           +------------+
level 4     +-->1+                                                      100
                 |
                 |                      insert +----+
level 3         1+-------->10+---------------> | 50 |          70       100
                                               |    |
                                               |    |
level 2         1          10         30       | 50 |          70       100
                                               |    |
                                               |    |
level 1         1    4     10         30       | 50 |          70       100
                                               |    |
                                               |    |
level 0         1    4   9 10         30   40  | 50 |  60      70       100
                                               +----+
*/
template<typename K, typename V>
int SkipList<K, V>::insert_element(const K key, const V value) {

    // RAII lock: released on every return path, and also if an allocation
    // below throws (a manual lock()/unlock() pair would leave mtx locked).
    std::lock_guard<std::mutex> guard(mtx);

    std::shared_ptr<Node<K, V>> current = this->_header;

    // update[i] is the last node on level i whose key is < key, i.e. the node
    // whose forward[i] must be re-linked when the new node is spliced in.
    std::vector<std::shared_ptr<Node<K, V>>> update(_max_level + 1);

    // Descend from the highest level currently in use.
    for (int i = _skip_list_level; i >= 0; i--) {
        while (current->forward[i] != nullptr && current->forward[i]->get_key() < key) {
            current = current->forward[i];
        }
        update[i] = current;
    }

    // Level-0 successor: the first node whose key is >= key (or nullptr).
    current = current->forward[0];

    if (current != nullptr && current->get_key() == key) {
        return 1;
    }

    // Pick the level and allocate the node before touching the list, so a
    // failed allocation leaves the list exactly as it was.
    const int random_level = get_random_level();
    std::shared_ptr<Node<K, V>> inserted_node = create_node(key, value, random_level);

    // Levels that did not exist before start at the header.
    if (random_level > _skip_list_level) {
        for (int i = _skip_list_level + 1; i <= random_level; i++) {
            update[i] = _header;
        }
        _skip_list_level = random_level;
    }

    // Splice the node in on every level it takes part in.
    for (int i = 0; i <= random_level; i++) {
        inserted_node->forward[i] = update[i]->forward[i];
        update[i]->forward[i] = inserted_node;
    }
    _element_count++;
    return 0;
}

// Prints the list to std::cout, one line per level from level 0 upwards,
// each entry formatted as "key:value;".
template<typename K, typename V>
void SkipList<K, V>::display_list() {

    std::cout << "\n*****Skip List*****" << "\n";
    int level = 0;
    while (level <= _skip_list_level) {
        std::cout << "Level " << level << ": ";
        for (std::shared_ptr<Node<K, V>> cursor = this->_header->forward[level];
             cursor != nullptr;
             cursor = cursor->forward[level]) {
            std::cout << cursor->get_key() << ":" << cursor->get_value() << ";";
        }
        std::cout << std::endl;
        ++level;
    }
}

// Dumps every element (level 0, ascending key order) to STORE_FILE as
// "key<delimiter>value" lines, echoing each pair to std::cout.
// If STORE_FILE cannot be opened, an error is reported on std::cerr and
// nothing is written.
template<typename K, typename V>
void SkipList<K, V>::dump_file() {

    std::cout << "dump_file-----------------" << std::endl;
    _file_writer.open(STORE_FILE);
    if (!_file_writer.is_open()) {
        // Typically the target directory is missing; do not pretend the dump worked.
        std::cerr << "dump_file: cannot open " << STORE_FILE << " for writing" << std::endl;
        return;
    }

    std::shared_ptr<Node<K, V>> node = this->_header->forward[0];
    while (node != nullptr) {
        // Use the shared delimiter so the written format matches what the
        // line parser splits on.
        _file_writer << node->get_key() << delimiter << node->get_value() << "\n";
        std::cout << node->get_key() << ":" << node->get_value() << ";\n";
        node = node->forward[0];
    }

    _file_writer.flush();
    _file_writer.close();
}

// Loads "key<delimiter>value" lines from STORE_FILE and inserts each pair.
// Lines without a delimiter, or with an empty key or value, are skipped.
//
// The line is split here rather than through get_key_value_from_string:
// that helper receives its key/value strings by value, so whatever it parses
// never reaches the caller and every line would be skipped as empty.
template<typename K, typename V>
void SkipList<K, V>::load_file() {

    _file_reader.open(STORE_FILE);
    std::cout << "load_file-----------------" << std::endl;
    if (!_file_reader.is_open()) {
        std::cerr << "load_file: cannot open " << STORE_FILE << " for reading" << std::endl;
        return;
    }

    std::string line;
    while (std::getline(_file_reader, line)) {
        const std::string::size_type split_at = line.find(delimiter);
        if (split_at == std::string::npos) {
            continue;
        }
        // Fresh strings per line, so a bad line can never reuse stale data.
        const std::string key = line.substr(0, split_at);
        const std::string value = line.substr(split_at + delimiter.size());
        if (key.empty() || value.empty()) {
            continue;
        }
        insert_element(key, value);
        std::cout << "key:" << key << "value:" << value << std::endl;
    }
    _file_reader.close();
}

// Returns the number of elements currently stored in the skip list.
template<typename K, typename V>
int SkipList<K, V>::size() {
    return this->_element_count;
}

// Splits `str` at the first delimiter into a key part and a value part.
// NOTE(review): `key` and `value` are received by value, so the assignments
// below only change this function's local copies; the caller's strings are
// left untouched.
template<typename K, typename V>
void SkipList<K, V>::get_key_value_from_string(const std::string& str, std::string key, std::string value) {

    if (is_valid_string(str)) {
        const std::string::size_type split_at = str.find(delimiter);
        key = str.substr(0, split_at);
        value = str.substr(split_at + 1, str.length());
    }
}

// A line is valid when it is non-empty and contains the delimiter.
template<typename K, typename V>
bool SkipList<K, V>::is_valid_string(const std::string& str) {
    return !str.empty() && str.find(delimiter) != std::string::npos;
}

// Removes the node whose key equals `key`; does nothing when the key is absent.
template<typename K, typename V>
void SkipList<K, V>::delete_element(K key) {

    // RAII lock: released on every exit path, including exceptions thrown by
    // the vector allocation or by key copies/comparisons.
    std::lock_guard<std::mutex> guard(mtx);

    std::shared_ptr<Node<K, V>> current = this->_header;

    // update[i] is the last node on level i whose key is < key.
    std::vector<std::shared_ptr<Node<K, V>>> update(_max_level + 1);

    // Descend from the highest level currently in use.
    for (int i = _skip_list_level; i >= 0; i--) {
        while (current->forward[i] != nullptr && current->forward[i]->get_key() < key) {
            current = current->forward[i];
        }
        update[i] = current;
    }

    current = current->forward[0];
    if (current == nullptr || !(current->get_key() == key)) {
        return;
    }

    // Unlink bottom-up; the first level whose predecessor does not point at
    // the target is above the target's own height, so stop there.
    for (int i = 0; i <= _skip_list_level; i++) {
        if (update[i]->forward[i] != current) {
            break;
        }
        update[i]->forward[i] = current->forward[i];
    }

    // Drop top levels that no longer hold any element.
    while (_skip_list_level > 0 && _header->forward[_skip_list_level] == nullptr) {
        _skip_list_level--;
    }

    _element_count--;
}

// Looks up `key` and returns a value as follows:
//   - the value of the first node whose key is >= key, if such a node exists;
//   - otherwise the value of the first node in the list (wrap-around);
//   - if the list is empty, prints "no data!!" and terminates with exit(1).
/*
                           +------------+
                           |  select 60 |
                           +------------+
level 4     +-->1+                                                      100
                 |
                 |
level 3         1+-------->10+------------------>50+           70       100
                                                   |
                                                   |
level 2         1          10         30         50|           70       100
                                                   |
                                                   |
level 1         1    4     10         30         50|           70       100
                                                   |
                                                   |
level 0         1    4   9 10         30   40    50+-->60      70       100
*/
template<typename K, typename V>
V SkipList<K, V>::search_element(K key) {

    std::shared_ptr<Node<K, V>> cursor = _header;

    // Walk down from the top level, moving right while the next key is smaller.
    int level = _skip_list_level;
    while (level >= 0) {
        while (cursor->forward[level] && cursor->forward[level]->get_key() < key) {
            cursor = cursor->forward[level];
        }
        --level;
    }

    // Level-0 successor: an exact match or the next larger key.
    std::shared_ptr<Node<K, V>> successor = cursor->forward[0];
    if (successor) {
        return successor->get_value();
    }

    // Ran off the end of the list: wrap around to the first element.
    std::shared_ptr<Node<K, V>> first = this->_header->forward[0];
    if (!first) {
        std::cout << "no data!!" << std::endl;
        exit(1);
    }
    return first->get_value();
}


// construct skip list
template<typename K, typename V>
SkipList<K, V>::SkipList(int max_level) {

this->_max_level = max_level;
this->_skip_list_level = 0;
this->_element_count = 0;

// create header node and initialize key and value to nullptr
K k;
V v;
this->_header = std::make_shared<Node<K, V>>(k, v, _max_level);
};

// Closes any open dump/load stream and releases all nodes.
//
// Nodes are linked through shared_ptr, so simply dropping the header would
// destroy the chain recursively (one nested destructor call per node), which
// can exhaust the stack for a long list. Walking level 0 and clearing each
// node's links first makes every node destruction shallow.
template<typename K, typename V>
SkipList<K, V>::~SkipList() {

    if (_file_writer.is_open()) {
        _file_writer.close();
    }
    if (_file_reader.is_open()) {
        _file_reader.close();
    }

    std::shared_ptr<Node<K, V>> node = _header;
    _header.reset();
    while (node != nullptr) {
        // Keep the level-0 successor alive before cutting this node's links.
        // Every later node stays referenced by its own level-0 predecessor,
        // so clear() never triggers a destruction cascade.
        std::shared_ptr<Node<K, V>> next;
        if (!node->forward.empty()) {
            next = node->forward[0];
        }
        node->forward.clear();
        node = next;   // the previous node is released here with no links left
    }
}

// Draws a node level: starts at 1 and adds one for every consecutive odd
// rand() result, then caps the result at _max_level.
template<typename K, typename V>
int SkipList<K, V>::get_random_level() {

    int level = 1;
    for (; rand() % 2 != 0; ++level) {
        // keep climbing while the coin flip comes up odd
    }
    if (level < _max_level) {
        return level;
    }
    return _max_level;
}
// vim: et tw=100 ts=4 sw=4 cc=120

一致性哈希

consistent_hash.h

#pragma once
#include "string"
#include "SkipList.h"
#include "unordered_map"
#include "unordered_set"
#include "vector"
#include "iostream"

// Consistent-hash ring: each real node is placed on the ring through the
// hashes of its virtual nodes, and a name is resolved by a lookup on the ring.
class consistent_hash {
public:
unsigned int real_node_sum; // number of real nodes
unsigned int virtual_node_sum; // number of virtual nodes
std::unordered_map<std::string, std::unordered_set<unsigned int>> real_node_map; // real node -> hash values of its virtual nodes
std::unique_ptr<SkipList<unsigned int, std::string>> consistent_hash_list; // the hash ring
consistent_hash();
~consistent_hash();
// Returns the ring entry found for the hash of `name`.
std::string search_by_name(std::string name);
// Add a real node -- the second parameter is the number of virtual nodes
void add_real_node(std::string ip, unsigned int virtual_node_num);
// Remove a real node
void drop_real_node(std::string ip);
// Print all nodes
void print();
};


consistent_hash.cpp

#include "consistent_hash.h"

// Fixed byte count for hash input.
// NOTE(review): passing this as the length for a std::string's buffer reads
// past the end of any string shorter than 32 bytes; the string's own size()
// is the safe length to hash.
int HASH_LEN = 32;


// MurmurHash2-style 32-bit hash of the `len` bytes starting at `key`, using a
// fixed seed of 97. The result depends on the host byte order, because each
// 4-byte group is loaded as a native unsigned int.
//
// key: start of the input bytes (not dereferenced when len <= 0)
// len: number of bytes to hash
unsigned int my_getMurMurHash(const void *key, int len) {
    static_assert(sizeof(unsigned int) == 4, "the mixing loop consumes 4-byte words");

    const unsigned int m = 0x5bd1e995;
    const int r = 24;
    const int seed = 97;
    unsigned int h = seed ^ len;

    // Mix 4 bytes at a time into the hash.
    const unsigned char *data = static_cast<const unsigned char *>(key);
    while (len >= 4) {
        // memcpy instead of a pointer cast: the input may be unaligned, and
        // reading it through an unsigned int* would violate strict aliasing.
        unsigned int k = 0;
        std::memcpy(&k, data, 4);

        k *= m;
        k ^= k >> r;
        k *= m;
        h *= m;
        h ^= k;

        data += 4;
        len -= 4;
    }

    // Handle the last few bytes of the input array.
    switch (len) {
        case 3:
            h ^= static_cast<unsigned int>(data[2]) << 16;
            // fall through
        case 2:
            h ^= static_cast<unsigned int>(data[1]) << 8;
            // fall through
        case 1:
            h ^= data[0];
            h *= m;
            break;
        default:
            break;
    }

    // Do a few final mixes of the hash to ensure the last few
    // bytes are well-incorporated.
    h ^= h >> 13;
    h *= m;
    h ^= h >> 15;
    return h;
}


// Starts with no real or virtual nodes and an empty ring backed by a skip
// list whose maximum level is 10.
consistent_hash::consistent_hash()
    : real_node_sum(0),
      virtual_node_sum(0),
      consistent_hash_list(std::make_unique<SkipList<unsigned int, std::string>>(10))
{
}


// Hashes `name` and returns the ring entry the skip list finds for that hash.
//
// Exactly name.size() bytes are hashed. Hashing a fixed 32 bytes would read
// past the end of shorter strings, which is undefined behavior and makes the
// hash depend on whatever memory follows the string.
std::string consistent_hash::search_by_name(std::string name)
{
    const unsigned int key_hash =
        my_getMurMurHash(name.data(), static_cast<int>(name.size()));
    return this->consistent_hash_list->search_element(key_hash);
}


//添加真实节点 -- 第二个参数为虚拟节点个数
void consistent_hash::add_real_node(std::string ip, unsigned int num)
{
std::unordered_set<unsigned int> umap_hash;
for(unsigned int i = 0; i < num ; i++ )
{
std::string ip_port = ip + std::to_string(i);
unsigned int hash_value = my_getMurMurHash(ip_port.c_str(), HASH_LEN);
umap_hash.insert(hash_value);
consistent_hash_list->insert_element(hash_value,ip);
}
real_node_map[ip] = umap_hash;
real_node_sum++;
this->virtual_node_sum+=num;
}


//删除真实节点
void consistent_hash::drop_real_node(std::string ip)
{
unsigned int val_count = 0;
if(real_node_map.find(ip) != real_node_map.end())
{
val_count = real_node_map[ip].size();
for(unsigned int value: real_node_map[ip])
{
consistent_hash_list->delete_element(value);
}
real_node_map.erase(ip);
real_node_sum--;
virtual_node_sum-=val_count;
}
}

//打印所有节点
void consistent_hash::print()
{
std::cout << "True Node num: " << real_node_map.size() << std::endl;
for(auto x: real_node_map)
{
std::cout << x.first <<" The nums of virtual: " << x.second.size()<<std::endl;
}
std::cout << "All Node num: " << virtual_node_sum << std::endl;
std::cout << "<<<<<<<<<<<<<<<<<<<<<<<end>>>>>>>>>>>>>>>>>>>>>>>" << std::endl;
}

// Nothing to do by hand: the ring is owned by a unique_ptr and the map is a
// standard container.
consistent_hash::~consistent_hash() = default;

测试函数

//
// Created by li on 2023-05-21.
//
//#include "SkipList.h"
#include "iostream"
#include "string"
#include "consistent_hash.h"
using namespace std;


int main() {
consistent_hash hash;
hash.add_real_node("192.168.2.136", 10);
hash.add_real_node("192.168.2.137", 10);
hash.add_real_node("192.168.2.138", 10);
hash.add_real_node("192.168.2.139", 10);
hash.print();
for (int i = 0; i < 20; ++i) {
string p = "stringnames";
string res = hash.search_by_name(p+ to_string(i));
cout << p+ to_string(i) << "->"<< res << endl;
}
hash.drop_real_node("192.168.2.136");
hash.print();
for (int i = 0; i < 20; ++i) {
string p = "stringnames";
string res = hash.search_by_name(p+ to_string(i));
cout << p+ to_string(i) << "->"<< res << endl;
}
return 0;
}