实现 memcached 客户端:TCP连接池一致性哈希自定义协议
Posted Node Python Go全栈开发
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了实现 memcached 客户端:TCP连接池一致性哈希自定义协议相关的知识,希望对你有一定的参考价值。
const HashRing = require('hashring');
// 输入集群地址构造 hash ring
const ring = new HashRing(['127.0.0.1:11211', '127.0.0.2:11211']);
// 输入 key 获取指定节点
const host = ring.get(key);
02
—
TCP 编程
const net = require('net');
const socket = new net.Socket();
socket.connect({
host: host, // 目标主机
port: port, // 目标端口
// localAddress: localAddress, // 本地地址
// localPort: localPort, // 本地端口
});
socket.setKeepAlive(true); // 保活
// 连接相关
socket.on('connect', () => {
console.log(`socket connected`);
});
socket.on('error', error => {
console.log(`socket error: ${error}`);
});
socket.on('close', hadError => {
console.log(`socket closed, transmission error: ${hadError}`);
});
socket.on('data', data => {
// 接受数据
});
socket.write(data); // 发送数据
03
—
连接池
-
建立连接,将连接放入池中。 -
需要使用连接时(进行数据收发),从连接池中取出连接。 -
连接使用完毕后,将连接放回到池中。 -
其它。
const net = require('net');
const genericPool = require('generic-pool');
// 自定义创建连接池的函数
function _buildPool(remote_server) {
const factory = {
create: function () {
return new Promise((resolve, reject) => {
const host = remote_server.split(':')[0];
const port = remote_server.split(':')[1];
const socket = new net.Socket();
socket.connect({
host: host, // 目标主机
port: port, // 目标端口
});
socket.setKeepAlive(true);
socket.on('connect', () => {
console.log(`socket connected: ${remote_server} , local: ${socket.localAddress}:${socket.localPort}`);
resolve(socket);
});
socket.on('error', error => {
console.log(`socket error: ${remote_server} , ${error}`);
reject(error);
});
socket.on('close', hadError => {
console.log(`socket closed: ${remote_server} , transmission error: ${hadError}`);
});
});
},
destroy: function (socket) {
return new Promise((resolve) => {
socket.destroy();
resolve();
});
},
validate: function (socket) { // validate socket
return new Promise((resolve) => {
if (socket.connecting || socket.destroyed || !socket.readable || !socket.writable) {
return resolve(false);
} else {
return resolve(true);
}
});
}
};
const pool = genericPool.createPool(factory, {
max: 10, // 最大连接数
min: 0, // 最小连接数
testOnBorrow: true, // 从池中取连接时进行 validate 函数验证
});
return pool;
}
// 连接池基本使用
const pool = _buildPool('127.0.0.1:11211'); // 构建连接池
const s = await pool.acquire(); // 从连接池中取连接
await pool.release(s); // 使用完成后释放连接
04
—
自定义协议
get <key>\r\n
VALUE <key> <flags> <bytes>\r\n
<data block>\r\n
// 定义一个请求方法并返回响应数据
function _request(command) {
return new Promise(async (resolve, reject) => {
try {
// ...这里省略了连接池构建相关部分
const s = await pool.acquire(); // 取连接
const bufs = [];
s.on('data', async buf => { // 监听 data 事件接受响应数据
bufs.push(buf);
const END_BUF = Buffer.from('\r\n'); // 数据接受完成的结束位
if (END_BUF.equals(buf.slice(-2))) {
s.removeAllListeners('data'); // 移除监听
try {
await pool.release(s); // 释放连接
} catch (error) { }
const data = Buffer.concat(bufs).toString();
return resolve(data);
}
});
s.write(command);
} catch (error) {
return reject(error);
}
});
}
// get
function get(key) {
return new Promise(async (resolve, reject) => {
try {
const command = `get ${key}\r\n`;
const data = await _request(key, command);
// ...响应数据的处理,注意有省略
// key not exist
if (data === 'END\r\n') {
return resolve(undefined);
}
/*
VALUE <key> <flags> <bytesLength>\r\n
<data block>\r\n
*/
const data_arr = data.split('\r\n');
const response_line = data_arr[0].split(' ');
const value_flag = response_line[2];
const value_length = Number(response_line[3]);
let value = data_arr.slice(1, -2).join('');
value = unescapeValue(value); // unescape \r\n
// ...有省略
return resolve(value);
} catch (error) {
return reject(error);
}
});
}
class Memcached {
constructor(serverLocations, options) {
this._configs = {
...{
pool: {
max: 1,
min: 0,
idle: 30000, // 30000 ms.
},
timeout: 5000, // timeout for every command, 5000 ms.
retries: 5, // max retry times for failed request.
maxWaitingClients: 10000, // maximum number of queued requests allowed
}, ...options
};
this._hashring = new HashRing(serverLocations);
this._pools = {}; // 通过 k-v 的形式存储具体的地址及它的连接池
}
_buildPool(remote_server) {
// ...
}
_request(key, command) {
// ...
}
// get
async get(key) {
// ...
}
// ... 其他方法
}
// 使用实例
const memcached = new Memcached(['127.0.0.1:11211'], {
pool: {
max: 10,
min: 0
}
});
const key = 'testkey';
const result = await memcached.get(key);
以上是关于实现 memcached 客户端:TCP连接池一致性哈希自定义协议的主要内容,如果未能解决你的问题,请参考以下文章