BERT+BiLSTM命名实体识别

Posted HELLO-Zhang先森

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了BERT+BiLSTM命名实体识别相关的知识,希望对你有一定的参考价值。

这是第一次在CSDN上记录自己的学习过程,加油。

本文是记录b站博主“手写AI”的命名实体识别系列课程的学习笔记,构建五个py文件,直接运行即可。

目录

一、前言

二、数据处理

数据案例

读取数据

 构建label2index

三、数据迭代器

接下来看一下如何构建数据类

对三个函数进行一一解释:

四、模型

训练和验证

当label不为None的时候,就是训练: 

当label为None的时候,就是验证: 

五、训练

六、预测 

六、完整代码

config.py

  utils.py

 model.py

train.py 

predict.py 


一、前言

文章比较简单,就是利用BERT+BiLSTM,然后后面直接连上Linear层进行分类,比较简单。

二、数据处理

数据案例

高 B-NAME 勇 E-NAME : O 男 O , O 中 B-CONT 国 I-CONT 国 I-CONT 籍 E-CONT , O 无 O 境 O 外 O 居 O 留 O 权 O , O

读取数据

def read_data(filename):
    with open(filename, 'r', encoding='utf8') as f:
        all_data = f.read().split('\\n')

    all_text = []  # 用来保存所有的文本
    all_label = [] # 用来保存所有的标签
    text = []      # 用来保存一段文本
    labels = []    # 用来保存一段文本的标签
    for data in all_data:
        if data == '':
            all_text.append(text)
            all_label.append(labels)
            text = []
            labels = []
        else:
            t, l = data.split(' ')
            text.append(t)
            labels.append(l)
    return all_text, all_label

 构建label2index

def build_label_2_index(all_label):
    label_2_index = 'PAD': 0, 'UNK': 1
    for labels in all_label:
        for label in labels:
            if label not in label_2_index:
                label_2_index[label] = len(label_2_index)
    return label_2_index, list(label_2_index)

因为会设置模型输出的最大长度,所有,当句子不够长的时候,我们需要对标签进行填充[PAD],当遇到不认识的标签时[UNK].

返回值:

label_2_index:是字典,类似于'PAD': 0, 'UNK': 1

list(label_2_index):是列表,['PAD', 'UNK']

 

三、数据迭代器

在pytorch里面,Dataset和DataLoader这两个类很重要,可以将数据处理好,然后就可以直接读取了。具体的操作流程都是固定的,主要是以下三个函数:

def __init__(self):  # 初始化函数
    pass
def __getitem__(self, item): # 读取一个数据
    pass
def __len__(self)    # 返回整个数据的长度
    pass

接下来看一下如何构建数据类

class Data(Dataset):
    def __init__(self, all_text, all_label, tokenizer, label2index, max_len):
        self.all_text = all_text
        self.all_label = all_label
        self.tokenizer = tokenizer
        self.label2index = label2index
        self.max_len = max_len

    def __getitem__(self, item):
        text = self.all_text[item]
        labels = self.all_label[item][:self.max_len]

        # 需要对text编码,让bert可以接受
        text_index = self.tokenizer.encode(text,
                                           add_special_tokens=True,
                                           max_length=self.max_len + 2,
                                           padding='max_length',
                                           truncation=True,
                                           return_tensors='pt',
                                           )
        # 也需要将label进行编码
        # 那么我们需要构建一个函数来传入label2index
        # labels_index = [self.label2index.get(label, 1) for label in labels]
        # 上面那个就仅仅是转化,我们需要将label和text对齐
        labels_index = [0] + [self.label2index.get(label, 1) for label in labels] + [0] + [0] * (
                self.max_len - len(text))

        # 这里需要注意text_index.squeeze(),squeeze()是默认去掉维度为1的那个维度
        # text_index的原始维度是:batch_size,1,seq_len
        # 在后续操作的过程中,将输入数据喂入模型时,如果不做处理,就会报错
        # 这里多输出一个len(text)!目的是在验证的时候,用的上,后面会介绍用处

        return text_index.squeeze(), torch.tensor(labels_index), len(text)

    def __len__(self):
        return len(self.all_text)

对三个函数进行一一解释:

1、 def __init__(self, all_text, all_label, tokenizer, label2index, max_len)

需要在初始化函数中传入需要的参数,比如:

all_text和all_label:你读取的所有文本和标签(数据处理部分);

tokenizer:因为要将文本传入BERT模型中,直接传入肯定是不行的,需要将文本转成数字(这是transformers封装好的,直接调用就行);

label2index:与上面的tokenizer相似,也需要将标签转成数字,这里直接编写代码即可(数据处理部分)

max_len:设置你想要的最大长度

四、模型

class MyModel(nn.Module):
    def __init__(self, class_num):
        super(MyModel, self).__init__()
        self.class_num = class_num

        self.bert = BertModel.from_pretrained(BERT_PATH)

        self.lstm = nn.LSTM(768,
                            768 // 2,
                            bidirectional=True,
                            batch_first=True)

        self.linear = nn.Linear(768, class_num)
        self.loss_fn = nn.CrossEntropyLoss()

    def forward(self, batch_text, batch_label=None):
        output = self.bert(batch_text)
        bert_out0, bert_out1 = output[0], output[1]
        output1, _ = self.lstm(bert_out0)
        pre = self.linear(output1)

        if batch_label is not None:
            loss = self.loss_fn(pre.reshape(-1, pre.shape[-1]), batch_label.reshape(-1))
            return loss
        else:
            return torch.argmax(pre, dim=-1)

 将输入数据喂入模型,然后得到输出。

当模型有标签数据的时候,那么就会返回损失值,然后反向传播,更新,梯度清零;

当模型没有标签数据的时候,那么就是预测了,模型的输出应该就是标签类别,所以要在初始化函数中传入整个类别数(len(label2index))

注意:此时,这个标签值,应该是数字,后续还需要将其转换为真是标签进行计算。

所以在初始化函数中:

def __init__(self, class_num),设置了一class_num==整个类别数(len(label2index))

训练和验证

当label不为None的时候,就是训练: 

 if batch_label is not None:
            loss = self.loss_fn(pre.reshape(-1, pre.shape[-1]),batch_label.reshape(-1))
            return loss
  为什么要将pre和batch_label的维度进行改变?

首先,需要看pre的原始维度:

 pre.shape == torch.Size([batch_size, max_len, class_num])

batch_label.shape == torch.Size([batch_size, max_len])

其次,loss_fn = nn.CrossEntropyLoss(),需要输入的向量维度是二维的,所以我们需要对维度进行改变!

最后,

pre.reshape(-1, pre.shape[-1]) == (batch_size*max_len, class_num)

batch_label.reshape(-1) == (batch_size*max_len)

当label为None的时候,就是验证: 

直接返回:return torch.argmax(pre, dim=-1)。

五、训练

def train():
    
    # 读取训练文件夹
    train_filename = os.path.join('data', 'train.txt')
    # 返回训练数据的文本和标签
    train_text, train_label = read_data(train_filename)

    # 验证集
    dev_filename = os.path.join('data', 'dev.txt')
    dev_text, dev_label = read_data(dev_filename)
    # print(train_filename)
    
    # 得到label2index, index2label
    label2index, index2label = build_label_2_index(train_label)

    # 数据迭代器
    train_data = Data(train_text, train_label, tokenizer, label2index, MAX_LEN)
    train_loader = DataLoader(train_data, batch_size=32, shuffle=False)

    dev_data = Data(dev_text, dev_label, tokenizer, label2index, MAX_LEN)
    dev_loader = DataLoader(dev_data, batch_size=32, shuffle=False)

    # 模型
    model = MyModel(len(label2index)).to(DEVICE)
    optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)

    # 训练

    for epoch in range(EPOCHS):
        model.train()
        for batch_idx, data in enumerate(train_loader):
            batch_text, batch_label, batch_len = data
            # 将数据放到GPU上
            loss = model(batch_text.to(DEVICE), batch_label.to(DEVICE))
            loss.backward()

            optimizer.step()
            optimizer.zero_grad()

            if batch_idx % 10 == 0:
                print(f'Epoch: epoch, BATCH: batch_idx, Training Loss:  loss.item()')
        # torch.save(model, MODEL_DIR + f'model_epoch.pth')

        model.eval()
        
        # 用来存放预测标签和真实标签
        all_pre = []
        all_tag = []
        
        for batch_text, batch_label, batch_len in dev_loader:
            
            # 因为是预测,所以在模型输入的地方,没有加入batch_label
            pre = model(batch_text.to(DEVICE))
            
            # 将pre从GPU上读下来,转成list
            pre = pre.cpu().numpy().tolist()
            batch_label = batch_label.cpu().numpy().tolist()

            # 还有一点要注意, from seqeval.metrics import f1_score
            # 在使用 f1_score的时候,所需要的标签应该是完整的,而不是经过填充过的
            # 所以我们需要将填充过的标签信息进行拆分怎么做呢?
            # 就需要将最开始没有填充过的文本长度记录下来,在__getitem__的返回量中增加一个长度量,那样我们就能知道文本真实长度
            # 然后就此进行切分,因为左边增加了一个开始符,需要去掉一个即可;右边按照长度来切分

            for p, t, l in zip(pre, batch_label, batch_len):
                p = p[1: l + 1]
                t = t[1: l + 1]

                pre = [index2label[j] for j in p]
                tag = [index2label[j] for j in t]
                all_pre.append(pre)
                all_tag.append(tag)
        f1_score_ = f1_score(all_pre, all_tag)
        p_score = precision_score(all_pre, all_tag)
        r_score = recall_score(all_pre, all_tag)
        # f1_score(batch_label_index, pre)
        print(f'p值=p_score, r值=r_score, f1=f1_score_')

六、预测 

就没有跑那么多了,直接保存模型,读取一条数据进行预测。

def predict():
    train_filename = os.path.join('data', 'train.txt')
    train_text, train_label = read_data(train_filename)

    test_filename = os.path.join('data', 'test.txt')
    test_text, _ = read_data(test_filename)
    text = test_text[1]

    print(text)

    inputs = tokenizer.encode(text,
                              return_tensors='pt')
    inputs = inputs.to(DEVICE)
    model = torch.load(MODEL_DIR + 'model_1.pth')
    y_pre = model(inputs).reshape(-1)  # 或者是y_pre[0]也行,因为y_pre是一个batch,需要进行reshape

    _, id2label = build_label_2_index(train_label)

    label = [id2label[l] for l in y_pre[1:-1]]
    print(text)
    print(label)


if __name__ == '__main__':
    predict()

 

六、完整代码

 完整代码分为5部分:config.py, utils.py, model.py, train.py, predict.py

config.py

import torch
from transformers import BertModel, BertTokenizer
from torch.utils.data import DataLoader, Dataset
EPOCHS = 2
BATCH_SIZE = 64
LEARNING_RATE = 2e-5
MAX_LEN = 50
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'  # 调用GPU

BERT_PATH = r'BERT_MODEL\\roberta'  # 你自己的bert模型地址

tokenizer = BertTokenizer.from_pretrained(BERT_PATH)
MODEL_DIR = 'model/'   # 这是保存模型的地址,建在你代码的同一级即可

  utils.py

 

import torch
from torch.utils.data import DataLoader, Dataset


def read_data(filename):
    with open(filename, 'r', encoding='utf8') as f:
        all_data = f.read().split('\\n')

    all_text = []
    all_label = []
    text = []
    labels = []
    for data in all_data:
        if data == '':
            all_text.append(text)
            all_label.append(labels)
            text = []
            labels = []
        else:
            t, l = data.split(' ')
            text.append(t)
            labels.append(l)
    return all_text, all_label


def build_label_2_index(all_label):
    label_2_index = 'PAD': 0, 'UNK': 1
    for labels in all_label:
        for label in labels:
            if label not in label_2_index:
                label_2_index[label] = len(label_2_index)
    return label_2_index, list(label_2_index)


class Data(Dataset):
    def __init__(self, all_text, all_label, tokenizer, label2index, max_len):
        self.all_text = all_text
        self.all_label = all_label
        self.tokenizer = tokenizer
        self.label2index = label2index
        self.max_len = max_len

    def __getitem__(self, item):
        text = self.all_text[item]
        labels = self.all_label[item][:self.max_len]

        # 需要对text编码,让bert可以接受
        text_index = self.tokenizer.encode(text,
                                           add_special_tokens=True,
                                           max_length=self.max_len + 2,
                                           padding='max_length',
                                           truncation=True,
                                           return_tensors='pt',
                                           )
        # 也需要将label进行编码
        # 那么我们需要构建一个函数来传入label2index
        # labels_index = [self.label2index.get(label, 1) for label in labels]
        # 上面那个就仅仅是转化,我们需要将label和text对齐
        labels_index = [0] + [self.label2index.get(label, 1) for label in labels] + [0] + [0] * (
                self.max_len - len(text))

        return text_index.squeeze(), torch.tensor(labels_index), len(text)

    def __len__(self):
        return len(self.all_text)

 model.py

import torch.nn as nn
from config import *


class MyModel(nn.Module):
    def __init__(self, class_num):
        super(MyModel, self).__init__()
        self.class_num = class_num

        self.bert = BertModel.from_pretrained(BERT_PATH)

        self.lstm = nn.LSTM(768,
                            768 // 2,
                            bidirectional=True,
                            batch_first=True)

        self.linear = nn.Linear(768, class_num)
        self.loss_fn = nn.CrossEntropyLoss()

    def forward(self, batch_text, batch_label=None):
        output = self.bert(batch_text)
        bert_out0, bert_out1 = output[0], output[1]
        output1, _ = self.lstm(bert_out0)
        pre = self.linear(output1)

        if batch_label is not None:
            loss = self.loss_fn(pre.reshape(-1, pre.shape[-1]), batch_label.reshape(-1))
            return loss
        else:
            return torch.argmax(pre, dim=-1)

train.py 

from utils import *
from model import *
from config import *
from seqeval.metrics import f1_score, precision_score, recall_score
import os


def train():

    # 读取训练文件夹
    train_filename = os.path.join('data', 'train.txt')
    # 返回训练数据的文本和标签
    train_text, train_label = read_data(train_filename)

    # 验证集
    dev_filename = os.path.join('data', 'dev.txt')
    dev_text, dev_label = read_data(dev_filename)
    # print(train_filename)

    # 得到label2index, index2label
    label2index, index2label = build_label_2_index(train_label)

    # 数据迭代器
    train_data = Data(train_text, train_label, tokenizer, label2index, MAX_LEN)
    train_loader = DataLoader(train_data, batch_size=32, shuffle=False)

    dev_data = Data(dev_text, dev_label, tokenizer, label2index, MAX_LEN)
    dev_loader = DataLoader(dev_data, batch_size=32, shuffle=False)

    # 模型
    model = MyModel(len(label2index)).to(DEVICE)
    optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)

    # 训练

    for epoch in range(EPOCHS):
        model.train()
        for batch_idx, data in enumerate(train_loader):
            batch_text, batch_label, batch_len = data
            # 将数据放到GPU上
            loss = model(batch_text.to(DEVICE), batch_label.to(DEVICE))
            loss.backward()

            optimizer.step()
            optimizer.zero_grad()

            if batch_idx % 10 == 0:
                print(f'Epoch: epoch, BATCH: batch_idx, Training Loss:  loss.item()')
        # torch.save(model, MODEL_DIR + f'model_epoch.pth')

        model.eval()

        # 用来存放预测标签和真实标签
        all_pre = []
        all_tag = []

        for batch_text, batch_label, batch_len in dev_loader:

            # 因为是预测,所以在模型输入的地方,没有加入batch_label
            pre = model(batch_text.to(DEVICE))

            # 将pre从GPU上读下来,转成list
            pre = pre.cpu().numpy().tolist()
            batch_label = batch_label.cpu().numpy().tolist()

            # 还有一点要注意, from seqeval.metrics import f1_score
            # 在使用 f1_score的时候,所需要的标签应该是完整的,而不是经过填充过的
            # 所以我们需要将填充过的标签信息进行拆分怎么做呢?
            # 就需要将最开始没有填充过的文本长度记录下来,在__getitem__的返回量中增加一个长度量,那样我们就能知道文本真实长度
            # 然后就此进行切分,因为左边增加了一个开始符,需要去掉一个即可;右边按照长度来切分

            for p, t, l in zip(pre, batch_label, batch_len):
                p = p[1: l + 1]
                t = t[1: l + 1]

                pre = [index2label[j] for j in p]
                tag = [index2label[j] for j in t]
                all_pre.append(pre)
                all_tag.append(tag)
        f1_score_ = f1_score(all_pre, all_tag)
        p_score = precision_score(all_pre, all_tag)
        r_score = recall_score(all_pre, all_tag)
        # f1_score(batch_label_index, pre)
        print(f'p值=p_score, r值=r_score, f1=f1_score_')
        # print(2*p_score*r_score/(p_score+r_score))


if __name__ == '__main__':
    train()

predict.py 

from utils import *
from model import *
from config import *
import os


def predict():
    train_filename = os.path.join('data', 'train.txt')
    train_text, train_label = read_data(train_filename)

    test_filename = os.path.join('data', 'test.txt')
    test_text, _ = read_data(test_filename)
    text = test_text[1]

    print(text)

    inputs = tokenizer.encode(text,
                              return_tensors='pt')
    inputs = inputs.to(DEVICE)
    model = torch.load(MODEL_DIR + 'model_1.pth')
    y_pre = model(inputs).reshape(-1)  # 或者是y_pre[0]也行,因为y_pre是一个batch,需要进行reshape

    _, id2label = build_label_2_index(train_label)

    label = [id2label[l] for l in y_pre[1:-1]]
    print(text)
    print(label)


if __name__ == '__main__':
    predict()

 

 

 

以上是关于BERT+BiLSTM命名实体识别的主要内容,如果未能解决你的问题,请参考以下文章

NLP命名体识别bilstm+crf

基于tensorflow的bilstm_crf的命名实体识别(数据集是msra命名实体识别数据集)

BiLSTM-CRF命名实体识别模型中的CRF层

NLP屠夫系列- NER之实战BILSTM

基于BiLSTM-CNN-CRF的中文分词(一)

文本数据挖掘中文命名实体识别:HMM模型+BiLSTM_CRF模型(Pytorch)调研与实验分析