联邦学习实战——基于同态加密和差分隐私混合加密机制的FedAvg

Posted HERODING23

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了联邦学习实战——基于同态加密和差分隐私混合加密机制的FedAvg相关的知识,希望对你有一定的参考价值。

联邦学习实战——基于同态加密和差分隐私混合加密机制的FedAvg

前言

好久都没更新联邦学习相关内容了,这也是我更新这篇我认为非常硬核的文章的原因,这也算是实现了我在学习联邦学习半年以来的一个目标,基于混合加密机制实现联邦学习任务,这次任务使用的框架是FedAvg,在github上非常热门的联邦学习模拟实现方案,FedAvg的代码还是非常好理解的,本文的结构将主要分为三个部分,第一部分是对FedAvg代码的讲解和修改,第二部分将差分隐私机制加入到FedAvg中,包括高斯机制和拉普拉斯机制,第三部分将同态加密算法Paillier加入到第二部分中,实现基于同态加密和差分隐私混合加密机制的FedAvg,话不多说,就让我们开始吧!


1. FedAvg

联邦学习有两种更新方式,对服务器端,一种是共享梯度方式,一种是共享模型参数。共享模型参数是做了几轮梯度下降,针对共享梯度,它的一大优势是通信代价会低;同时,对整个梯度信息的保护也会更好,因此本次联邦学习实战的内容也是基于共享参数形式实现的。本次实战使用的FedAvg是github上非常热门的联邦学习实现方案,它模拟了多个参与方进行联邦学习的场景,并且支持Non-IID和独立同分布数据集,代码逻辑十分清晰,对于联邦学习初学者入门再适合不过了。

FedAvg实现的是联邦学习场景下的手写数字识别模型的训练,它的结构如下图所示,包括数据集,模型,参与方和服务器端代码,以及数据集导入的代码,除了TensorFlow版本,它还包括PyTorch版本,更便于支持gpu运行代码,本文介绍的代码部分也是基于PyTorch实现的。下面就对各个部分进行简单的介绍。

1.1 getData.py

getData.py文件是对MNIST数据集的数据集提取和预处理,获得训练数据集和测试数据集,MNIST数据集的数据格式为-ubyte.gz,因此需要编写函数对图片数据进行提取,如下所示:

def extract_images(filename):
    """Extract the images into a 4D uint8 numpy array [index, y, x, depth]."""
    print('Extracting', filename)
    with gzip.open(filename) as bytestream:
        magic = _read32(bytestream)
        if magic != 2051:
            raise ValueError(
                    'Invalid magic number %d in MNIST image file: %s' %
                    (magic, filename))
        num_images = _read32(bytestream)
        rows = _read32(bytestream)
        cols = _read32(bytestream)
        buf = bytestream.read(rows * cols * num_images)
        data = np.frombuffer(buf, dtype=np.uint8)
        data = data.reshape(num_images, rows, cols, 1)
        return data

本质上是将比特流数据转换为[index, y, x, depth]维度的数组形式,index为下标,y和x分别表示每张图像行列像素点数目,depth是图像的通道,对于MNIST黑白图片,通道为1。接着编写图片标签提取函数,将标签提取出并转为one-hot形式,便于模型训练时计算损失函数并反向传播,代码如下:

# 标签one-hot编码
def dense_to_one_hot(labels_dense, num_classes=10):
    """Convert class labels from scalars to one-hot vectors."""
    num_labels = labels_dense.shape[0]
    index_offset = np.arange(num_labels) * num_classes
    labels_one_hot = np.zeros((num_labels, num_classes))
    labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1
    return labels_one_hot

# 提取标签
def extract_labels(filename):
    """Extract the labels into a 1D uint8 numpy array [index]."""
    print('Extracting', filename)
    with gzip.open(filename) as bytestream:
        magic = _read32(bytestream)
        if magic != 2049:
            raise ValueError(
                    'Invalid magic number %d in MNIST label file: %s' %
                    (magic, filename))
        num_items = _read32(bytestream)
        buf = bytestream.read(num_items)
        labels = np.frombuffer(buf, dtype=np.uint8)
        return dense_to_one_hot(labels)

图片和标签信息提取完成后,多次assert以验证提取图片过程没有出现问题。接着将图片展平,即每张图片28 * 28被展平为784 * 1,这里的目的是用于全连接神经网络的使用,如果只是用卷积神经网络,大可不必将其展平(在模型部分还要复原回去)。紧接着就是对数据标准化处理,如果不进行这一步,模型训练的结果可能惨不忍睹(比如预测所有图像都是7),这是由于发生了梯度爆炸现象。标准化处理后是两个数据处理方式:IID和Non-IID,熟悉联邦学习的读者应该了解这个概念,这也是联邦学习面临的一个巨大问题——数据非独立同分布。
在传统应用场景中,数据存储在中心,ML model 可以获取所有数据的整体信息,但是在联邦学习中,由于数据仅存储在本地,导致数据之间分布的不一致性,例如,美国西海岸针叶林茂盛,东海岸阔叶林分布广泛,又因为用户之间喜好和生活习惯不同,因此产生了数据分布的不一致性; 另一方面,以家庭或亲属关系的群体之间会相互影响,产生了数据分布的不独立性,以FedAvg中MNIST为例,Non-IID的数据应该是这样表现的:参与方1只有手写数字1,参与方2只有手写数字2,以此类推。在Non-IID数据场景下,模型训练会异常缓慢,效率很低,这将在之后的实验所有介绍。
FedAvg提供了IID和Non-IID两种数据处理形式,帮助用户理解实际应用场景下联邦学习的过程,Non-IID数据就是不对数据进行操作,IID数据就是将所有数据打乱,这样各个参与方数据的分布就满足独立同分布了,代码如下:

class GetDataSet(object):
    def __init__(self, dataSetName, isIID):
        self.name = dataSetName
        self.train_data = None
        self.train_label = None
        self.train_data_size = None
        self.test_data = None
        self.test_label = None
        self.test_data_size = None

        self._index_in_train_epoch = 0

        if self.name == 'mnist':
            self.mnistDataSetConstruct(isIID)
        else:
            pass


    def mnistDataSetConstruct(self, isIID):
        data_dir = r'./data/MNIST'
        # 选定图片路径
        train_images_path = os.path.join(data_dir, 'train-images-idx3-ubyte.gz')
        train_labels_path = os.path.join(data_dir, 'train-labels-idx1-ubyte.gz')
        test_images_path = os.path.join(data_dir, 't10k-images-idx3-ubyte.gz')
        test_labels_path = os.path.join(data_dir, 't10k-labels-idx1-ubyte.gz')
        # 从.gz中提取图片
        train_images = extract_images(train_images_path)
        train_labels = extract_labels(train_labels_path)
        test_images = extract_images(test_images_path)
        test_labels = extract_labels(test_labels_path)

        assert train_images.shape[0] == train_labels.shape[0]
        assert test_images.shape[0] == test_labels.shape[0]

        self.train_data_size = train_images.shape[0]
        self.test_data_size = test_images.shape[0]

        # mnist黑白图片通道为1
        assert train_images.shape[3] == 1
        assert test_images.shape[3] == 1
        # 图片展平
        train_images = train_images.reshape(train_images.shape[0], train_images.shape[1] * train_images.shape[2])
        test_images = test_images.reshape(test_images.shape[0], test_images.shape[1] * test_images.shape[2])
        
        # 标准化处理
        train_images = train_images.astype(np.float32)
        train_images = np.multiply(train_images, 1.0 / 255.0)
        test_images = test_images.astype(np.float32)
        test_images = np.multiply(test_images, 1.0 / 255.0)

        # 是否独立同分布
        if isIID:
            # 打乱顺序
            order = np.arange(self.train_data_size)
            np.random.shuffle(order)
            self.train_data = train_images[order]
            self.train_label = train_labels[order]
        else:
            # 按照0——9顺序排列
            labels = np.argmax(train_labels, axis=1)
            order = np.argsort(labels)
            self.train_data = train_images[order]
            self.train_label = train_labels[order]

        self.test_data = test_images
        self.test_label = test_labels

1.2 Models.py

Models.py文件中,定义了执行任务所使用的模型,这里提供了两种模型,简单全连接层模型和简单卷积神经网络模型,每个模型都继承了torch的nn.Module,表明是神经网络模型,每个模型类中都定义了__init__(self)和forward()两个函数,初始化函数中定义了每个模型的组件,比如简单全连接层模型中定义了三个卷积模块,两个池化模块,两个线性模块,并在forward()函数中组装起来。代码如下:

import torch
import torch.nn as nn
import torch.nn.functional as F

'''
简单全连接模型
'''
class Mnist_2NN(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(784, 200)
        self.fc2 = nn.Linear(200, 200)
        self.fc3 = nn.Linear(200, 10)

    def forward(self, inputs):
        tensor = F.relu(self.fc1(inputs))
        tensor = F.relu(self.fc2(tensor))
        tensor = self.fc3(tensor)
        return tensor

'''
简单卷积神经网络模型
'''
class Mnist_CNN(nn.Module):
    def __init__(self):
        super().__init__()
        # 定义每一层模型
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, stride=1, padding=0)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=0)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1, padding=0)
        self.fc1 = nn.Linear(3*3*64, 64)
        self.fc2 = nn.Linear(64, 10)

    def forward(self, inputs):
        # 构造模型
        tensor = inputs.view(-1, 1, 28, 28)
        tensor = F.relu(self.conv1(tensor))
        tensor = self.pool1(tensor)
        tensor = F.relu(self.conv2(tensor))
        tensor = self.pool2(tensor)
        tensor = F.relu(self.conv3(tensor))     
        tensor = tensor.view(-1, 3*3*64)
        tensor = F.relu(self.fc1(tensor))
        tensor = self.fc2(tensor)
        return tensor


至于每一个模块中具体的参数信息,包括channel的大小,卷积核的大小,步长,padding,神经元个数的设计,就不具体细说了,请各位自行阅读,并修改成自己想用的模型。

1.3 client.py

参与方的代码都存放在client.py文件中,里面包括了两个类的实现,一个是单个参与方类client,另一个是ClientsGroup,即参与方的集合类,首先分析一下client类,它定义了一个初始化函数,一个本地更新函数,一个本地评估函数(由于评估部分在server.py实现,这里就直接pass了,可以自定义修改)。
初始化函数输入参数有trainDataSet和dev,前者是输入到该参与方的数据集,dev是计算环境(cpu or gpu)。初始化函数中定义了训练数据,计算环境,本地参数等。
本地更新函数输入了本地迭代次数,本地BatchSize,神经网络模型,损失函数,优化器,全局参数。在本地更新函数中,首先向模型中导入全局参数,根据localBatchSize转换本地数据,接着执行localEpoch次本地更新。本地更新的过程就是普通的正向传播,梯度下降,反向传播的过程,这里就不赘述了。

import numpy as np
import torch
from torch.utils.data import TensorDataset
from torch.utils.data import DataLoader
from getData import GetDataSet


class client(object):
    def __init__(self, trainDataSet, dev):
        self.train_ds = trainDataSet
        self.dev = dev
        self.train_dl = None
        self.local_parameters = None

    def localUpdate(self, localEpoch, localBatchSize, Net, lossFun, opti, global_parameters):
        Net.load_state_dict(global_parameters, strict=True)
        self.train_dl = DataLoader(self.train_ds, batch_size=localBatchSize, shuffle=True)
        for epoch in range(localEpoch):
            for data, label in self.train_dl:
                data, label = data.to(self.dev), label.to(self.dev)
                preds = Net(data)
                loss = lossFun(preds, label)
                loss.backward()
                opti.step()
                opti.zero_grad()

        return Net.state_dict()

    def local_val(self):
        pass

1.4 server.py

server.py是FedAvg实现的核心部分,项目的执行也是直接运行该文件,原github运行的代码为:

python server.py -nc 100 -cf 0.1 -E 5 -B 10 -mn mnist_cnn  -ncomm 1000 -iid 0 -lr 0.01 -vf 20 -g 0

可见参数繁多,执行起来过于繁琐,因此在介绍前,我对该部分进行了简单的修改,原代码为:

parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, description="FedAvg")
parser.add_argument('-g', '--gpu', type=str, default='0', help='gpu id to use(e.g. 0,1,2,3)')
parser.add_argument('-nc', '--num_of_clients', type=int, default=100, help='numer of the clients')
parser.add_argument('-cf', '--cfraction', type=float, default=0.1, help='C fraction, 0 means 1 client, 1 means total clients')
parser.add_argument('-E', '--local_epoch', type=int, default=5, help='local train epoch')
parser.add_argument('-B', '--batch_size', type=int, default=10, help='local train batch size')
parser.add_argument('-mn', '--model_name', type=str, default='mnist_2nn', help='the model to train')
parser.add_argument('-lr', "--learning_rate", type=float, default=0.01, help="learning rate, \\
                    use value from origin paper as default")
parser.add_argument('-vf', "--val_freq", type=int, default=5, help="model validation frequency(of communications)")
parser.add_argument('-sf', '--save_freq', type=int, default=20, help='global model save frequency(of communication)')
parser.add_argument('-ncomm', '--num_comm', type=int, default=1000, help='number of communications')
parser.add_argument('-sp', '--save_path', type=str, default='./checkpoints', help='the saving path of checkpoints')
parser.add_argument('-iid', '--IID', type=int, default=0, help='the way to allocate data to clients')

我将该部分代码改为了conf.json文件,这样之后所有的超参数调整都在该文件中进行,执行起来只需要链接到该文件即可,conf.json内容如下:


	// 是否使用gpu
	"gpu" : "0",
	// 参与方个数
	"num_of_clients" : 10,
	// 参与方百分比
	"cfraction" : 1,
	// 本地迭代训练次数
	"local_epoch" : 3,
	// mini-batch
	"batch_size" : 100,
	// 模型名称
	"model_name" : "mnist_cnn",
	// 学习率
	"learning_rate" : 0.005,
	// 评估频率 次/epoch
	"val_freq" : 1,
	// 模型参数存储频率 次/epoch
	"save_freq" : 1000,
	// 全局迭代次数
	"num_comm" : 100,
	// 模型参数存储路径
	"save_path" : "./checkpoints",
	// 是否独立同分布
	"IID" : false,
  // 差分隐私噪声幅度
	"sigma" : 0.1,
  // 是否添加差分隐私
	"noise" : 1
	


执行代码为:

python server.py -c ./utils/conf.json

当然也要在main函数中添加解析json文件的代码,代码如下:

# 定义解析器
parser = argparse.ArgumentParser(description='FedAvg')
parser.add_argument('-c', '--conf', dest='conf')
arg = parser.parse_args()

# 解析器解析json文件
with open(arg.conf, 'r') as f:
    args = json.load(f)

读取到json文件数据后,首先将这些超参数取出,定义相关的模型实例,参与方实例,计算环境,损失函数和优化器,代码如下:

# 创建中间参数保存目录
test_mkdir(args['save_path'])

# 使用gpu or cpu
os.environ['CUDA_VISIBLE_DEVICES'] = args['gpu']
dev = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# 定义使用模型(全连接 or 简单卷积)
net = None
if args['model_name'] == 'mnist_2nn':
    net = Mnist_2NN()
elif args['model_name'] == 'mnist_cnn':
    net = Mnist_CNN()

# 如果gpu设备不止一个,并行计算
if torch.cuda.device_count() > 1:
    print("Let's use"以上是关于联邦学习实战——基于同态加密和差分隐私混合加密机制的FedAvg的主要内容,如果未能解决你的问题,请参考以下文章

联邦学习安全防御之同态加密

京东云开发者|经典同态加密算法Paillier解读 - 原理实现和应用

同态加密实现数据隐私计算,能让你的小秘密更加秘密

什么是隐私计算,它是怎样保护我们的隐私安全?

CVPR2021 | 批量恢复加密图像,联邦学习危机?

CVPR2021 | 批量恢复加密图像,联邦学习危机?