Aspect-Level Sentiment Analysis with R-GAT
Posted by Icy Hunter
Preface
This post mainly records the full workflow of a deep-learning model, so that it is easy to copy from later.
The code is as follows.
Data Preprocessing
Data preprocessing actually matters quite a bit, but once you look at the input format the model expects, it is clear what the data needs to be turned into. The details vary from case to case, so they are not covered here; a rough sketch of the expected per-sentence graph follows.
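The model and the batcher further down expect one DGL graph per sentence: "word" nodes carrying token, pos, label, tag_word and from_to features, and edges carrying a deprel (dependency-relation) id. Purely as an illustration, and not the author's actual preprocessing, a minimal builder for such a graph might look like the sketch below; the function name, the (head, dependent, deprel_id) edge encoding, the single "dep" relation and the exact from_to convention are all assumptions.
import dgl
import torch

def build_sentence_graph(token_ids, pos_ids, dep_edges, aspects):
    """Hypothetical per-sentence graph builder (a sketch, not the author's code).

    token_ids, pos_ids : word / POS-tag ids for the n tokens of the sentence
    dep_edges          : list of (head, dependent, deprel_id) from the dependency parse
    aspects            : list of (start, end, polarity), end exclusive
    """
    n = len(token_ids)
    src = torch.tensor([e[0] for e in dep_edges])
    dst = torch.tensor([e[1] for e in dep_edges])
    # A single generic "dep" relation is assumed here; in the post, g[0].etypes
    # below is what actually drives the heterograph convolution.
    g = dgl.heterograph({("word", "dep", "word"): (src, dst)},
                        num_nodes_dict={"word": n})
    g.nodes["word"].data["token"] = torch.tensor(token_ids)
    g.nodes["word"].data["pos"] = torch.tensor(pos_ids)
    g.edges["dep"].data["deprel"] = torch.tensor([e[2] for e in dep_edges])
    label = torch.full((n,), -1, dtype=torch.long)   # -1 = not an aspect word
    tag_word = torch.zeros(n, dtype=torch.long)      # 1 on aspect words
    from_to = torch.zeros(n, dtype=torch.long)       # nonzero at aspect span boundaries
    for start, end, polarity in aspects:
        label[start:end] = polarity
        tag_word[start:end] = 1
        # Model.forward below pairs consecutive nonzero positions and slices h[from:to],
        # so the second marker goes at the exclusive end (assumes end < n, spans don't touch).
        from_to[start] = 1
        from_to[end] = 1
    g.nodes["word"].data["label"] = label
    g.nodes["word"].data["tag_word"] = tag_word
    g.nodes["word"].data["from_to"] = from_to
    return g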
EarlyStopping
This lets training stop itself based on the validation results: if the validation loss keeps rising, there is no point in continuing to train.
import torch
import numpy as np
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)
class EarlyStopping:
"""Early stops the training if validation loss doesn't improve after a given patience."""
def __init__(self, patience=7, verbose=False, delta=0, name="checkpoint.pt"):
"""
Args:
patience (int): How long to wait after last time validation loss improved.
Default: 7
verbose (bool): If True, prints a message for each validation loss improvement.
Default: False
            delta (float): Minimum change in the monitored quantity to qualify as an improvement.
                            Default: 0
            name (str): Filename used when saving the best checkpoint under ./models/.
                            Default: "checkpoint.pt"
        """
self.patience = patience
self.verbose = verbose
self.counter = 0
self.best_score = None
self.early_stop = False
self.val_loss_min = np.Inf
self.delta = delta
self.name = name
def __call__(self, val_loss, model):
score = -val_loss
if self.best_score is None:
self.best_score = score
self.save_checkpoint(val_loss, model)
elif score < self.best_score + self.delta:
self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
if self.counter >= self.patience:
self.early_stop = True
else:
self.best_score = score
self.save_checkpoint(val_loss, model)
self.counter = 0
def save_checkpoint(self, val_loss, model):
'''Saves model when validation loss decrease.'''
if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        torch.save(model.state_dict(), './models/' + self.name)  # save the parameters of the best model so far
self.val_loss_min = val_loss
patience = 7
early_stopping = EarlyStopping(patience, verbose=True, name="R-GAT.pt")
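As a rough, hedged illustration of how this object is typically driven from a training loop (the names num_epochs, train_one_epoch and compute_val_loss are placeholders, not code from this post):
# Hedged usage sketch; the helper names below are placeholders.
for epoch in range(num_epochs):
    train_one_epoch(model)               # run the training steps for this epoch
    val_loss = compute_val_loss(model)   # average loss on the validation set
    early_stopping(val_loss, model)      # saves ./models/R-GAT.pt whenever the loss improves
    if early_stopping.early_stop:        # patience epochs passed without improvement
        break
model.load_state_dict(torch.load("./models/R-GAT.pt"))  # restore the best checkpoint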
The R-GAT Model
This was covered in an earlier post; it is just heterogeneous graph convolution, so for the details see 基于注意力机制的图神经网络且考虑关系的R-GAT的一些理解以及DGL代码实现 (notes on the relation-aware graph attention network R-GAT and its DGL implementation).
For aspect-level sentiment prediction, though, an aspect may span more than one word in a sentence, so that part needs a bit of extra handling (see how the aspect spans are pooled in Model.forward below).
from dgl.nn.pytorch import HeteroGraphConv
import torch
import torch as th
import torch.nn as nn
import torch.nn.functional as F
from dgl import function as fn
from dgl.ops import edge_softmax
from dgl.utils import expand_as_pair
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
class GATConv(nn.Module):
def __init__(
self,
        in_feats,              # input node feature size
        out_feats,             # output node feature size (per attention head)
        edge_feats,            # input edge feature size
        num_heads=1,           # number of attention heads
        feat_drop=0.0,         # dropout on node features
        attn_drop=0.0,         # dropout on attention weights
        edge_drop=0.0,         # fraction of edges dropped from the attention softmax
negative_slope=0.2,
activation=None,
allow_zero_in_degree=False,
use_symmetric_norm=False,
):
super(GATConv, self).__init__()
self._num_heads = num_heads
self._in_src_feats, self._in_dst_feats = expand_as_pair(in_feats)
self._out_feats = out_feats
self._allow_zero_in_degree = allow_zero_in_degree
self._use_symmetric_norm = use_symmetric_norm
if isinstance(in_feats, tuple):
self.fc_src = nn.Linear(self._in_src_feats, out_feats * num_heads, bias=False)
self.fc_dst = nn.Linear(self._in_dst_feats, out_feats * num_heads, bias=False)
else:
self.fc = nn.Linear(self._in_src_feats, out_feats * num_heads, bias=False)
self.fc_edge = nn.Linear(edge_feats, out_feats * num_heads, bias=False)
self.attn_l = nn.Parameter(torch.FloatTensor(size=(1, num_heads, out_feats)))
self.attn_edge=nn.Parameter(torch.FloatTensor(size=(1, num_heads, out_feats)))
self.attn_r = nn.Parameter(torch.FloatTensor(size=(1, num_heads, out_feats)))
self.feat_drop = nn.Dropout(feat_drop)
self.attn_drop = nn.Dropout(attn_drop)
self.edge_drop = edge_drop
self.leaky_relu = nn.LeakyReLU(negative_slope)
self.reset_parameters()
self._activation = activation
    # parameter initialization
def reset_parameters(self):
gain = nn.init.calculate_gain("relu")
if hasattr(self, "fc"):
nn.init.xavier_normal_(self.fc.weight, gain=gain)
else:
nn.init.xavier_normal_(self.fc_src.weight, gain=gain)
nn.init.xavier_normal_(self.fc_dst.weight, gain=gain)
nn.init.xavier_normal_(self.fc_edge.weight, gain=gain)
nn.init.xavier_normal_(self.attn_l, gain=gain)
nn.init.xavier_normal_(self.attn_r, gain=gain)
nn.init.xavier_normal_(self.attn_edge, gain=gain)
def set_allow_zero_in_degree(self, set_value):
self._allow_zero_in_degree = set_value
def forward(self, graph, feat):
with graph.local_scope():
if not self._allow_zero_in_degree:
if (graph.in_degrees() == 0).any():
                    assert False, "graph contains 0-in-degree nodes; set allow_zero_in_degree=True to skip this check"
            # feat[0]: source-node features
            # feat[1]: destination-node features
            # h_edge:  edge features
h_src = self.feat_drop(feat[0])
h_dst = self.feat_drop(feat[1])
h_edge = self.feat_drop(graph.edata['deprel'])
if not hasattr(self, "fc_src"):
self.fc_src, self.fc_dst = self.fc, self.fc
            # assign the raw features
            feat_src, feat_dst, feat_edge = h_src, h_dst, h_edge
            # project and reshape to (num_nodes_or_edges, num_heads, out_feats)
feat_src = self.fc_src(h_src).view(-1, self._num_heads, self._out_feats)
feat_dst = self.fc_dst(h_dst).view(-1, self._num_heads, self._out_feats)
feat_edge = self.fc_edge(h_edge).view(-1, self._num_heads, self._out_feats)
# NOTE: GAT paper uses "first concatenation then linear projection"
# to compute attention scores, while ours is "first projection then
# addition", the two approaches are mathematically equivalent:
# We decompose the weight vector a mentioned in the paper into
# [a_l || a_r], then
# a^T [Wh_i || Wh_j] = a_l Wh_i + a_r Wh_j
            # Our implementation is much more efficient because we do not need to
            # save [Wh_i || Wh_j] on edges, which is not memory-efficient. Plus,
# addition could be optimized with DGL's built-in function u_add_v,
# which further speeds up computation and saves memory footprint.
            # In short: concatenating then multiplying is mathematically the same as
            # projecting separately and adding, but the decomposed form is more efficient.
            # attention contribution of the source ("left") nodes
el = (feat_src * self.attn_l).sum(dim=-1).unsqueeze(-1)
            graph.srcdata.update({"ft": feat_src, "el": el})
            # attention contribution of the destination ("right") nodes
er = (feat_dst * self.attn_r).sum(dim=-1).unsqueeze(-1)
# print("er", er.shape)
            graph.dstdata.update({"er": er})
            # source contribution + destination contribution = node-based attention score e
graph.apply_edges(fn.u_add_v("el", "er", "e"))
            # attention contribution of the edge features
ee = (feat_edge * self.attn_edge).sum(dim=-1).unsqueeze(-1)
# print("ee", ee.shape)
            # add the edge contribution to the node contribution to get the final attention score
            # (presumably this is again equivalent to the concatenation formulation)
            graph.edata.update({"e": graph.edata["e"] + ee})
            # apply the LeakyReLU activation to the combined score
e = self.leaky_relu(graph.edata["e"])
            # normalize the attention weights with an edge softmax (optionally dropping a fraction of edges during training)
if self.training and self.edge_drop > 0:
perm = torch.randperm(graph.number_of_edges(), device=graph.device)
bound = int(graph.number_of_edges() * self.edge_drop)
eids = perm[bound:]
a = torch.zeros_like(e)
a[eids] = self.attn_drop(edge_softmax(graph, e[eids], eids=eids))
                graph.edata.update({"a": a})
else:
graph.edata["a"] = self.attn_drop(edge_softmax(graph, e))
            # message passing: each node sums its in-neighbors' features weighted by attention
graph.update_all(fn.u_mul_e("ft", "a", "m"), fn.sum("m", "ft"))
rst = graph.dstdata["ft"]
            # normalize by in-degree
degs = graph.in_degrees().float().clamp(min=1)
norm = torch.pow(degs, -1)
shp = norm.shape + (1,) * (feat_dst.dim() - 1)
norm = torch.reshape(norm, shp)
rst = rst * norm
return rst
class RGAT(nn.Module):
def __init__(
self,
        in_feats,    # input feature size (the same for nodes and edges here)
        hid_feats,   # hidden size
        out_feats,   # output size
        num_heads,   # number of attention heads
        rel_names,   # relation names (for the heterograph convolution)
        rel_feats,   # relation (edge) feature size
):
super().__init__()
        self.conv1 = HeteroGraphConv({rel: GATConv(in_feats, hid_feats // num_heads, rel_feats, num_heads) for rel in rel_names}, aggregate='sum')
        self.conv2 = HeteroGraphConv({rel: GATConv(hid_feats, out_feats, rel_feats, num_heads) for rel in rel_names}, aggregate='sum')
self.hid_feats = hid_feats
    def forward(self, graph, inputs):
        # graph:  the input heterogeneous graph
        # inputs: the input node features
        h = self.conv1(graph, inputs)  # first heterograph-convolution layer
        h = {k: F.relu(v).view(-1, self.hid_feats) for k, v in h.items()}  # activation, then flatten the attention heads
        h = self.conv2(graph, h)  # second heterograph-convolution layer
        return h
class Model(nn.Module):
def __init__(self, in_features, hidden_features, out_features, num_heads, rel_names, vocab_size, rel_feats, rel_size, num_classes):
super().__init__()
self.rgat = RGAT(in_features, hidden_features, out_features, num_heads, rel_names, rel_feats)
self.embed = nn.Embedding(vocab_size, in_features)
self.dep_embed = nn.Embedding(rel_size, rel_feats)
self.fc = nn.Linear(num_heads * out_features, num_classes)
self.num_heads = num_heads
self.out_features = out_features
def forward(self, g, x):
        x = {k: self.embed(v) for k, v in x.items()}
g.edata["deprel"] = self.dep_embed(g.edata["deprel"])
# print(x)
h = self.rgat(g, x)
        # h now holds the post-R-GAT features of every node
        # for k, v in h.items():
        #     print(k, v.shape)
        # there is only one node type ("word") here
h = h["word"].view(-1, self.num_heads * self.out_features)
        aspects_words_pos = torch.nonzero(g.ndata["from_to"] != 0).squeeze()  # positions of the aspect span boundaries (from/to pairs)
        aspects_num = len(aspects_words_pos) // 2  # number of aspects
        # one pooled vector per aspect; an aspect may span several words, so its word vectors are pooled
        aspects_words_h = torch.zeros(aspects_num, self.out_features * self.num_heads, device=h.device)
        for i in range(aspects_num):
            h_i = h[aspects_words_pos[i * 2]:aspects_words_pos[i * 2 + 1]]  # word vectors of the i-th aspect span
            h_i = torch.sum(h_i, dim=0)  # pool the span into a single vector
            aspects_words_h[i] = h_i
return self.fc(aspects_words_h)
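To make the tensor shapes concrete, here is a rough walkthrough of one forward pass using the hyperparameter values set in the next section (in_features=300, hidden_features=8, out_features=300, num_heads=4, num_classes=3); N (number of word nodes in the batched graph) and A (number of aspects) are illustrative symbols only.
# Hypothetical shape trace for a batched graph with N word nodes and A aspects:
#   self.embed(token)            -> (N, 300)      word embeddings (in_features)
#   conv1 (GATConv per relation) -> (N, 4, 2)     hid_feats // num_heads per head
#   view in RGAT.forward         -> (N, 8)        heads flattened back to hid_feats
#   conv2 (GATConv per relation) -> (N, 4, 300)   out_feats per head
#   view in Model.forward        -> (N, 1200)     num_heads * out_features
#   aspect-span pooling          -> (A, 1200)     one vector per aspect
#   self.fc                      -> (A, 3)        one class distribution per aspect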
Model Hyperparameters
The meaning of each value is noted in the comments where the model is instantiated below.
batch_size = 4
epochs = 5
lr = 1e-3
vocab_size = 7873
rel_size = 45
in_features = 300
hidden_features = 8
out_features = 300
num_heads = 4
rel_feats = 32
num_classes = 3
assert in_features % num_heads == 0
assert rel_feats % num_heads == 0
assert hidden_features % num_heads == 0
Initializing the Model and Loading the Data
g = th.load(r"C:\\Users\\ASUS\\OneDrive\\桌面\\sentiment_classfication\\R-GAT\\github_data\\MAMS\\processed\\valid.pt")
model = Model(in_features,      # embedding size
              hidden_features,  # hidden size
              out_features,     # output size
              num_heads,        # number of attention heads
              g[0].etypes,      # relation names
              vocab_size,       # vocabulary size
              rel_feats,        # relation embedding size (rel_embedding_size)
              rel_size,         # number of distinct relations
              num_classes)      # number of sentiment classes
# print(model)
Batched Training
import dgl
from collections import namedtuple
from torch.utils.data import DataLoader
import numpy as np
DataBatch = namedtuple('SSTBatch', ['graph', 'wordid', 'label', "pos", "tag_word","from_to"])
def batcher(dev):
def batcher_dev(batch):
batch_trees = dgl.batch(batch)
return DataBatch(graph=batch_trees.to(device),
                         # mask=batch_trees.ndata['mask'].to(device),
                         wordid=batch_trees.nodes['word'].data['token'].to(device),       # word ids
                         label=batch_trees.nodes['word'].data['label'].to(device),        # word labels (-1 = no label)
                         pos=batch_trees.nodes['word'].data['pos'].to(device),            # POS-tag ids
                         tag_word=batch_trees.nodes['word'].data["tag_word"].to(device),  # positions of the aspect (tag) words
                         from_to=batch_trees.nodes['word'].data["from_to"].to(device))    # aspect span boundaries
    return batcher_dev
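The original post is cut off at this point by the aggregator. For completeness, the following is a rough, hedged sketch of how the pieces above could be wired together; trainset/validset (lists of per-sentence graphs loaded the same way as g above), the way aspect labels are read out of a batch, and the loss/optimizer choices are all assumptions, not the author's original code.
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader

# Assumption: trainset / validset are lists of per-sentence DGL graphs,
# loaded the same way as `g` above (e.g. train.pt / valid.pt).
train_loader = DataLoader(trainset, batch_size=batch_size, shuffle=True, collate_fn=batcher(device))
valid_loader = DataLoader(validset, batch_size=batch_size, shuffle=False, collate_fn=batcher(device))

model = model.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

def aspect_labels(batch):
    # One label per aspect: read the label at each span-start position.
    # This mirrors how Model.forward pairs the nonzero from_to positions
    # and is an assumption about how the labels are stored.
    pos = torch.nonzero(batch.from_to != 0).squeeze(-1)
    return batch.label[pos[0::2]]

for epoch in range(epochs):
    model.train()
    for batch in train_loader:
        logits = model(batch.graph, {"word": batch.wordid})
        loss = F.cross_entropy(logits, aspect_labels(batch))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    model.eval()
    val_loss, n_batches = 0.0, 0
    with torch.no_grad():
        for batch in valid_loader:
            logits = model(batch.graph, {"word": batch.wordid})
            val_loss += F.cross_entropy(logits, aspect_labels(batch)).item()
            n_batches += 1
    early_stopping(val_loss / max(n_batches, 1), model)  # saves ./models/R-GAT.pt on improvement
    if early_stopping.early_stop:
        print("early stopping")
        break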