联邦学习实战——基于同态加密和差分隐私混合加密机制的FedAvg
Posted HERODING23
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了联邦学习实战——基于同态加密和差分隐私混合加密机制的FedAvg相关的知识,希望对你有一定的参考价值。
联邦学习实战——基于同态加密和差分隐私混合加密机制的FedAvg
前言
好久都没更新联邦学习相关内容了,这也是我更新这篇我认为非常硬核的文章的原因,这也算是实现了我在学习联邦学习半年以来的一个目标,基于混合加密机制实现联邦学习任务,这次任务使用的框架是FedAvg,在github上非常热门的联邦学习模拟实现方案,FedAvg的代码还是非常好理解的,本文的结构将主要分为三个部分,第一部分是对FedAvg代码的讲解和修改,第二部分将差分隐私机制加入到FedAvg中,包括高斯机制和拉普拉斯机制,第三部分将同态加密算法Paillier加入到第二部分中,实现基于同态加密和差分隐私混合加密机制的FedAvg,话不多说,就让我们开始吧!
1. FedAvg
联邦学习有两种更新方式,对服务器端,一种是共享梯度方式,一种是共享模型参数。共享模型参数是做了几轮梯度下降,针对共享梯度,它的一大优势是通信代价会低;同时,对整个梯度信息的保护也会更好,因此本次联邦学习实战的内容也是基于共享参数形式实现的。本次实战使用的FedAvg是github上非常热门的联邦学习实现方案,它模拟了多个参与方进行联邦学习的场景,并且支持Non-IID和独立同分布数据集,代码逻辑十分清晰,对于联邦学习初学者入门再适合不过了。
FedAvg实现的是联邦学习场景下的手写数字识别模型的训练,它的结构如下图所示,包括数据集,模型,参与方和服务器端代码,以及数据集导入的代码,除了TensorFlow版本,它还包括PyTorch版本,更便于支持gpu运行代码,本文介绍的代码部分也是基于PyTorch实现的。下面就对各个部分进行简单的介绍。
1.1 getData.py
getData.py
文件是对MNIST数据集的数据集提取和预处理,获得训练数据集和测试数据集,MNIST数据集的数据格式为-ubyte.gz,因此需要编写函数对图片数据进行提取,如下所示:
def extract_images(filename):
"""Extract the images into a 4D uint8 numpy array [index, y, x, depth]."""
print('Extracting', filename)
with gzip.open(filename) as bytestream:
magic = _read32(bytestream)
if magic != 2051:
raise ValueError(
'Invalid magic number %d in MNIST image file: %s' %
(magic, filename))
num_images = _read32(bytestream)
rows = _read32(bytestream)
cols = _read32(bytestream)
buf = bytestream.read(rows * cols * num_images)
data = np.frombuffer(buf, dtype=np.uint8)
data = data.reshape(num_images, rows, cols, 1)
return data
本质上是将比特流数据转换为[index, y, x, depth]维度的数组形式,index为下标,y和x分别表示每张图像行列像素点数目,depth是图像的通道,对于MNIST黑白图片,通道为1。接着编写图片标签提取函数,将标签提取出并转为one-hot形式,便于模型训练时计算损失函数并反向传播,代码如下:
# 标签one-hot编码
def dense_to_one_hot(labels_dense, num_classes=10):
"""Convert class labels from scalars to one-hot vectors."""
num_labels = labels_dense.shape[0]
index_offset = np.arange(num_labels) * num_classes
labels_one_hot = np.zeros((num_labels, num_classes))
labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1
return labels_one_hot
# 提取标签
def extract_labels(filename):
"""Extract the labels into a 1D uint8 numpy array [index]."""
print('Extracting', filename)
with gzip.open(filename) as bytestream:
magic = _read32(bytestream)
if magic != 2049:
raise ValueError(
'Invalid magic number %d in MNIST label file: %s' %
(magic, filename))
num_items = _read32(bytestream)
buf = bytestream.read(num_items)
labels = np.frombuffer(buf, dtype=np.uint8)
return dense_to_one_hot(labels)
图片和标签信息提取完成后,多次assert以验证提取图片过程没有出现问题。接着将图片展平,即每张图片28 * 28被展平为784 * 1,这里的目的是用于全连接神经网络的使用,如果只是用卷积神经网络,大可不必将其展平(在模型部分还要复原回去)。紧接着就是对数据标准化处理,如果不进行这一步,模型训练的结果可能惨不忍睹(比如预测所有图像都是7),这是由于发生了梯度爆炸现象。标准化处理后是两个数据处理方式:IID和Non-IID,熟悉联邦学习的读者应该了解这个概念,这也是联邦学习面临的一个巨大问题——数据非独立同分布。
在传统应用场景中,数据存储在中心,ML model 可以获取所有数据的整体信息,但是在联邦学习中,由于数据仅存储在本地,导致数据之间分布的不一致性,例如,美国西海岸针叶林茂盛,东海岸阔叶林分布广泛,又因为用户之间喜好和生活习惯不同,因此产生了数据分布的不一致性; 另一方面,以家庭或亲属关系的群体之间会相互影响,产生了数据分布的不独立性,以FedAvg中MNIST为例,Non-IID的数据应该是这样表现的:参与方1只有手写数字1,参与方2只有手写数字2,以此类推。在Non-IID数据场景下,模型训练会异常缓慢,效率很低,这将在之后的实验所有介绍。
FedAvg提供了IID和Non-IID两种数据处理形式,帮助用户理解实际应用场景下联邦学习的过程,Non-IID数据就是不对数据进行操作,IID数据就是将所有数据打乱,这样各个参与方数据的分布就满足独立同分布了,代码如下:
class GetDataSet(object):
def __init__(self, dataSetName, isIID):
self.name = dataSetName
self.train_data = None
self.train_label = None
self.train_data_size = None
self.test_data = None
self.test_label = None
self.test_data_size = None
self._index_in_train_epoch = 0
if self.name == 'mnist':
self.mnistDataSetConstruct(isIID)
else:
pass
def mnistDataSetConstruct(self, isIID):
data_dir = r'./data/MNIST'
# 选定图片路径
train_images_path = os.path.join(data_dir, 'train-images-idx3-ubyte.gz')
train_labels_path = os.path.join(data_dir, 'train-labels-idx1-ubyte.gz')
test_images_path = os.path.join(data_dir, 't10k-images-idx3-ubyte.gz')
test_labels_path = os.path.join(data_dir, 't10k-labels-idx1-ubyte.gz')
# 从.gz中提取图片
train_images = extract_images(train_images_path)
train_labels = extract_labels(train_labels_path)
test_images = extract_images(test_images_path)
test_labels = extract_labels(test_labels_path)
assert train_images.shape[0] == train_labels.shape[0]
assert test_images.shape[0] == test_labels.shape[0]
self.train_data_size = train_images.shape[0]
self.test_data_size = test_images.shape[0]
# mnist黑白图片通道为1
assert train_images.shape[3] == 1
assert test_images.shape[3] == 1
# 图片展平
train_images = train_images.reshape(train_images.shape[0], train_images.shape[1] * train_images.shape[2])
test_images = test_images.reshape(test_images.shape[0], test_images.shape[1] * test_images.shape[2])
# 标准化处理
train_images = train_images.astype(np.float32)
train_images = np.multiply(train_images, 1.0 / 255.0)
test_images = test_images.astype(np.float32)
test_images = np.multiply(test_images, 1.0 / 255.0)
# 是否独立同分布
if isIID:
# 打乱顺序
order = np.arange(self.train_data_size)
np.random.shuffle(order)
self.train_data = train_images[order]
self.train_label = train_labels[order]
else:
# 按照0——9顺序排列
labels = np.argmax(train_labels, axis=1)
order = np.argsort(labels)
self.train_data = train_images[order]
self.train_label = train_labels[order]
self.test_data = test_images
self.test_label = test_labels
1.2 Models.py
在Models.py
文件中,定义了执行任务所使用的模型,这里提供了两种模型,简单全连接层模型和简单卷积神经网络模型,每个模型都继承了torch的nn.Module,表明是神经网络模型,每个模型类中都定义了__init__(self)和forward()两个函数,初始化函数中定义了每个模型的组件,比如简单全连接层模型中定义了三个卷积模块,两个池化模块,两个线性模块,并在forward()函数中组装起来。代码如下:
import torch
import torch.nn as nn
import torch.nn.functional as F
'''
简单全连接模型
'''
class Mnist_2NN(nn.Module):
def __init__(self):
super().__init__()
self.fc1 = nn.Linear(784, 200)
self.fc2 = nn.Linear(200, 200)
self.fc3 = nn.Linear(200, 10)
def forward(self, inputs):
tensor = F.relu(self.fc1(inputs))
tensor = F.relu(self.fc2(tensor))
tensor = self.fc3(tensor)
return tensor
'''
简单卷积神经网络模型
'''
class Mnist_CNN(nn.Module):
def __init__(self):
super().__init__()
# 定义每一层模型
self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, stride=1, padding=0)
self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=0)
self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
self.conv3 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1, padding=0)
self.fc1 = nn.Linear(3*3*64, 64)
self.fc2 = nn.Linear(64, 10)
def forward(self, inputs):
# 构造模型
tensor = inputs.view(-1, 1, 28, 28)
tensor = F.relu(self.conv1(tensor))
tensor = self.pool1(tensor)
tensor = F.relu(self.conv2(tensor))
tensor = self.pool2(tensor)
tensor = F.relu(self.conv3(tensor))
tensor = tensor.view(-1, 3*3*64)
tensor = F.relu(self.fc1(tensor))
tensor = self.fc2(tensor)
return tensor
至于每一个模块中具体的参数信息,包括channel的大小,卷积核的大小,步长,padding,神经元个数的设计,就不具体细说了,请各位自行阅读,并修改成自己想用的模型。
1.3 client.py
参与方的代码都存放在client.py
文件中,里面包括了两个类的实现,一个是单个参与方类client,另一个是ClientsGroup,即参与方的集合类,首先分析一下client类,它定义了一个初始化函数,一个本地更新函数,一个本地评估函数(由于评估部分在server.py实现,这里就直接pass了,可以自定义修改)。
初始化函数输入参数有trainDataSet和dev,前者是输入到该参与方的数据集,dev是计算环境(cpu or gpu)。初始化函数中定义了训练数据,计算环境,本地参数等。
本地更新函数输入了本地迭代次数,本地BatchSize,神经网络模型,损失函数,优化器,全局参数。在本地更新函数中,首先向模型中导入全局参数,根据localBatchSize转换本地数据,接着执行localEpoch次本地更新。本地更新的过程就是普通的正向传播,梯度下降,反向传播的过程,这里就不赘述了。
import numpy as np
import torch
from torch.utils.data import TensorDataset
from torch.utils.data import DataLoader
from getData import GetDataSet
class client(object):
def __init__(self, trainDataSet, dev):
self.train_ds = trainDataSet
self.dev = dev
self.train_dl = None
self.local_parameters = None
def localUpdate(self, localEpoch, localBatchSize, Net, lossFun, opti, global_parameters):
Net.load_state_dict(global_parameters, strict=True)
self.train_dl = DataLoader(self.train_ds, batch_size=localBatchSize, shuffle=True)
for epoch in range(localEpoch):
for data, label in self.train_dl:
data, label = data.to(self.dev), label.to(self.dev)
preds = Net(data)
loss = lossFun(preds, label)
loss.backward()
opti.step()
opti.zero_grad()
return Net.state_dict()
def local_val(self):
pass
1.4 server.py
server.py
是FedAvg实现的核心部分,项目的执行也是直接运行该文件,原github运行的代码为:
python server.py -nc 100 -cf 0.1 -E 5 -B 10 -mn mnist_cnn -ncomm 1000 -iid 0 -lr 0.01 -vf 20 -g 0
可见参数繁多,执行起来过于繁琐,因此在介绍前,我对该部分进行了简单的修改,原代码为:
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, description="FedAvg")
parser.add_argument('-g', '--gpu', type=str, default='0', help='gpu id to use(e.g. 0,1,2,3)')
parser.add_argument('-nc', '--num_of_clients', type=int, default=100, help='numer of the clients')
parser.add_argument('-cf', '--cfraction', type=float, default=0.1, help='C fraction, 0 means 1 client, 1 means total clients')
parser.add_argument('-E', '--local_epoch', type=int, default=5, help='local train epoch')
parser.add_argument('-B', '--batch_size', type=int, default=10, help='local train batch size')
parser.add_argument('-mn', '--model_name', type=str, default='mnist_2nn', help='the model to train')
parser.add_argument('-lr', "--learning_rate", type=float, default=0.01, help="learning rate, \\
use value from origin paper as default")
parser.add_argument('-vf', "--val_freq", type=int, default=5, help="model validation frequency(of communications)")
parser.add_argument('-sf', '--save_freq', type=int, default=20, help='global model save frequency(of communication)')
parser.add_argument('-ncomm', '--num_comm', type=int, default=1000, help='number of communications')
parser.add_argument('-sp', '--save_path', type=str, default='./checkpoints', help='the saving path of checkpoints')
parser.add_argument('-iid', '--IID', type=int, default=0, help='the way to allocate data to clients')
我将该部分代码改为了conf.json
文件,这样之后所有的超参数调整都在该文件中进行,执行起来只需要链接到该文件即可,conf.json
内容如下:
// 是否使用gpu
"gpu" : "0",
// 参与方个数
"num_of_clients" : 10,
// 参与方百分比
"cfraction" : 1,
// 本地迭代训练次数
"local_epoch" : 3,
// mini-batch
"batch_size" : 100,
// 模型名称
"model_name" : "mnist_cnn",
// 学习率
"learning_rate" : 0.005,
// 评估频率 次/epoch
"val_freq" : 1,
// 模型参数存储频率 次/epoch
"save_freq" : 1000,
// 全局迭代次数
"num_comm" : 100,
// 模型参数存储路径
"save_path" : "./checkpoints",
// 是否独立同分布
"IID" : false,
// 差分隐私噪声幅度
"sigma" : 0.1,
// 是否添加差分隐私
"noise" : 1
执行代码为:
python server.py -c ./utils/conf.json
当然也要在main函数中添加解析json文件的代码,代码如下:
# 定义解析器
parser = argparse.ArgumentParser(description='FedAvg')
parser.add_argument('-c', '--conf', dest='conf')
arg = parser.parse_args()
# 解析器解析json文件
with open(arg.conf, 'r') as f:
args = json.load(f)
读取到json文件数据后,首先将这些超参数取出,定义相关的模型实例,参与方实例,计算环境,损失函数和优化器,代码如下:
# 创建中间参数保存目录
test_mkdir(args['save_path'])
# 使用gpu or cpu
os.environ['CUDA_VISIBLE_DEVICES'] = args['gpu']
dev = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# 定义使用模型(全连接 or 简单卷积)
net = None
if args['model_name'] == 'mnist_2nn':
net = Mnist_2NN()
elif args['model_name'] == 'mnist_cnn':
net = Mnist_CNN()
# 如果gpu设备不止一个,并行计算
if torch.cuda.device_count() > 1:
print("Let's use"以上是关于联邦学习实战——基于同态加密和差分隐私混合加密机制的FedAvg的主要内容,如果未能解决你的问题,请参考以下文章