from typing import List, Optional, Tuple

import torch
import torch.nn as nn
from fastNLP.modules.torch import MLP,ConditionalRandomField,allowed_transitions
from torch.nn import CrossEntropyLoss
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from transformers.models.bert import BertModel
|
|
|
|
class ConvFeatureExtractionModel(nn.Module):
    """Stack of 1-D convolution blocks applied one after another.

    Every block is ``Conv1d -> Dropout -> ReLU``. Each convolution pads its
    input by ``kernel_size // 2``, so an odd kernel with stride 1 keeps the
    sequence length unchanged.

    Args:
        conv_layers: One ``(out_channels, kernel_size, stride)`` triple per
            block, listed in the order the blocks are applied.
        conv_dropout: Dropout probability used inside every block.
        conv_bias: Whether the convolutions carry a bias term.
        in_channels: Number of channels of the tensor fed to the first
            block. Defaults to 1, the value that was previously hard-coded.

    Raises:
        ValueError: If an entry of ``conv_layers`` does not have exactly
            three elements.
    """

    def __init__(
        self,
        conv_layers: List[Tuple[int, int, int]],
        conv_dropout: float = 0.0,
        conv_bias: bool = False,
        in_channels: int = 1,
    ):
        super().__init__()

        def block(n_in, n_out, k, stride=1, conv_bias=False):
            # Conv/Dropout/ReLU order and the Sequential wrapper are kept
            # as-is so parameter names in the state dict do not change.
            return nn.Sequential(
                nn.Conv1d(
                    in_channels=n_in,
                    out_channels=n_out,
                    kernel_size=k,
                    stride=stride,
                    padding=k // 2,
                    bias=conv_bias,
                ),
                nn.Dropout(conv_dropout),
                nn.ReLU(),
            )

        self.conv_layers = nn.ModuleList()
        in_d = in_channels
        for cl in conv_layers:
            # Explicit raise instead of ``assert``: asserts vanish under -O.
            if len(cl) != 3:
                raise ValueError("invalid conv definition: " + str(cl))
            dim, k, stride = cl
            self.conv_layers.append(
                block(in_d, dim, k, stride=stride, conv_bias=conv_bias)
            )
            # The next block consumes what this one produces.
            in_d = dim

    def forward(self, x):
        """Run ``x`` (``(batch, channels, length)``) through every block."""
        for conv in self.conv_layers:
            x = conv(x)
        return x
|
|
|
|
class ModelWiseCNNClassifier(nn.Module):
    """Per-position tagger: shared 1-D CNN features plus a linear head.

    The input carries four feature channels per position. Each channel is
    pushed separately through the same convolution stack (64 output
    channels), the four results are concatenated to a 256-dim vector per
    position, layer-normalised, and classified.

    Training optimises token-level cross-entropy; positions labelled ``-1``
    are ignored. In eval mode the logits are decoded with the CRF's Viterbi
    search. The CRF transition matrix is zeroed at construction and is not
    part of the training loss, so decoding is shaped only by the transition
    constraints derived from ``id2labels``.

    Args:
        id2labels: Mapping passed to ``allowed_transitions``; its length is
            the number of tags.
        dropout_rate: Dropout probability applied before the classifier.
    """

    def __init__(self, id2labels, dropout_rate=0.1):
        super(ModelWiseCNNClassifier, self).__init__()
        # All kernels are odd with stride 1, so the sequence length is
        # preserved and logits stay aligned with the labels.
        feature_enc_layers = [(64, 5, 1)] + [(128, 3, 1)] * 3 + [(64, 3, 1)]
        self.conv = ConvFeatureExtractionModel(
            conv_layers=feature_enc_layers,
            conv_dropout=0.0,
            conv_bias=False,
        )

        # 4 input channels x 64 conv output channels each.
        embedding_size = 4 * 64
        self.norm = nn.LayerNorm(embedding_size)

        self.label_num = len(id2labels)
        self.dropout = nn.Dropout(dropout_rate)
        self.classifier = nn.Sequential(nn.Linear(embedding_size, self.label_num))
        self.crf = ConditionalRandomField(
            num_tags=self.label_num,
            allowed_transitions=allowed_transitions(id2labels),
        )
        # Start from an all-zero transition matrix.
        self.crf.trans_m.data *= 0

    def conv_feat_extract(self, x):
        """Apply the CNN to ``(batch, 1, seq)`` and return ``(batch, seq, 64)``."""
        out = self.conv(x)
        return out.transpose(1, 2)

    def forward(self, x, labels: Optional[torch.Tensor] = None):
        """Compute the training loss or decode tag predictions.

        Args:
            x: Features of shape ``(batch, seq, 4)``.
            labels: Tag ids of shape ``(batch, seq)`` with ``-1`` marking
                padding. Required in training mode. In eval mode it is only
                used to build the padding mask; when omitted, every position
                is treated as valid.

        Returns:
            ``{'loss', 'logits'}`` in training mode, ``{'preds', 'logits'}``
            in eval mode. Padded positions in ``preds`` are set to ``-1``.

        Raises:
            ValueError: If ``labels`` is missing in training mode.
        """
        if self.training and labels is None:
            raise ValueError("labels are required in training mode")

        x = x.transpose(1, 2)
        # Each of the four channels goes through the same CNN on its own.
        channel_feats = [
            self.conv_feat_extract(x[:, idx:idx + 1, :]) for idx in range(4)
        ]
        outputs = torch.cat(channel_feats, dim=2)

        outputs = self.norm(outputs)
        logits = self.classifier(self.dropout(outputs))

        if self.training:
            loss_fct = CrossEntropyLoss(ignore_index=-1)
            loss = loss_fct(logits.view(-1, self.label_num), labels.view(-1))
            return {'loss': loss, 'logits': logits}

        if labels is None:
            mask = torch.ones(
                logits.shape[:2], dtype=torch.bool, device=logits.device
            )
        else:
            mask = labels.gt(-1)
        paths, _ = self.crf.viterbi_decode(logits=logits, mask=mask)
        paths[mask == 0] = -1
        return {'preds': paths, 'logits': logits}
| |
|
|
class ModelWiseTransformerClassifier(nn.Module):
    """Per-position tagger: shared 1-D CNN features, Transformer encoder, linear head.

    The input carries four feature channels per position. Each channel is
    pushed separately through the same convolution stack (64 output
    channels); the results are concatenated to 256 dims, a fixed positional
    table is added, and the sum is layer-normalised and passed through a
    Transformer encoder before classification.

    Training optimises token-level cross-entropy; positions labelled ``-1``
    are ignored. In eval mode the logits are decoded with the CRF's Viterbi
    search. The CRF transition matrix is zeroed at construction and is not
    part of the training loss.

    Args:
        id2labels: Mapping passed to ``allowed_transitions``; its length is
            the number of tags.
        seq_len: Number of rows in the positional table, i.e. the longest
            sequence ``forward`` can handle.
        intermediate_size: Feed-forward width of each encoder layer.
        num_layers: Number of encoder layers.
        dropout_rate: Dropout used in the encoder and before the classifier.
    """

    def __init__(self, id2labels, seq_len, intermediate_size = 512, num_layers=2, dropout_rate=0.1):
        super(ModelWiseTransformerClassifier, self).__init__()

        # All kernels are odd with stride 1, so the sequence length is
        # preserved and logits stay aligned with the labels.
        feature_enc_layers = [(64, 5, 1)] + [(128, 3, 1)] * 3 + [(64, 3, 1)]
        self.conv = ConvFeatureExtractionModel(
            conv_layers=feature_enc_layers,
            conv_dropout=0.0,
            conv_bias=False,
        )

        self.seq_len = seq_len
        # 4 input channels x 64 conv output channels each.
        embedding_size = 4 * 64
        # NOTE: nn.TransformerEncoder clones the layer it is handed, so the
        # parameters of this attribute are not the ones used in forward().
        # The attribute is kept so existing checkpoints keep their keys.
        self.encoder_layer = TransformerEncoderLayer(
            d_model=embedding_size,
            nhead=16,
            dim_feedforward=intermediate_size,
            dropout=dropout_rate,
            batch_first=True)
        self.encoder = TransformerEncoder(encoder_layer=self.encoder_layer,
                                          num_layers=num_layers)

        # Non-persistent buffer: follows .to()/.cuda() with the module but
        # stays out of the state dict, so checkpoint keys are unchanged.
        self.register_buffer(
            "position_encoding",
            self._build_position_encoding(seq_len, embedding_size),
            persistent=False,
        )

        self.norm = nn.LayerNorm(embedding_size)

        self.label_num = len(id2labels)
        self.dropout = nn.Dropout(dropout_rate)
        self.classifier = nn.Sequential(nn.Linear(embedding_size, self.label_num))
        self.crf = ConditionalRandomField(
            num_tags=self.label_num,
            allowed_transitions=allowed_transitions(id2labels),
        )
        # Start from an all-zero transition matrix.
        self.crf.trans_m.data *= 0

    @staticmethod
    def _build_position_encoding(seq_len, embedding_size):
        """Return the fixed ``(seq_len, embedding_size)`` positional table.

        Column ``j`` holds ``f(pos / 10000 ** (2 * j / embedding_size))``
        with ``f = sin`` for even ``j`` and ``f = cos`` for odd ``j``.

        NOTE(review): this is not the textbook sinusoidal encoding (there
        the exponent uses the pair index, so sin/cos share a frequency).
        The formula is reproduced unchanged so that weights trained with
        it remain valid.
        """
        dtype = torch.get_default_dtype()
        positions = torch.arange(seq_len, dtype=torch.float64).unsqueeze(1)
        columns = torch.arange(embedding_size, dtype=torch.float64)
        # Angles are formed in float64 and only then cast down, mirroring
        # the Python-float arithmetic of the former element-wise loop.
        divisors = torch.pow(10000.0, 2.0 * columns / embedding_size)
        angles = (positions / divisors).to(dtype)

        encoding = torch.zeros((seq_len, embedding_size), dtype=dtype)
        encoding[:, 0::2] = torch.sin(angles[:, 0::2])
        encoding[:, 1::2] = torch.cos(angles[:, 1::2])
        return encoding

    def conv_feat_extract(self, x):
        """Apply the CNN to ``(batch, 1, seq)`` and return ``(batch, seq, 64)``."""
        out = self.conv(x)
        return out.transpose(1, 2)

    def forward(self, x, labels: Optional[torch.Tensor] = None):
        """Compute the training loss or decode tag predictions.

        Args:
            x: Features of shape ``(batch, seq, 4)``; ``seq`` may be any
                length up to the ``seq_len`` given at construction.
            labels: Tag ids of shape ``(batch, seq)`` with ``-1`` marking
                padding. Required in training mode. In eval mode it is only
                used to build the padding mask; when omitted, every position
                is treated as valid.

        Returns:
            ``{'loss', 'logits'}`` in training mode, ``{'preds', 'logits'}``
            in eval mode. Padded positions in ``preds`` are set to ``-1``.

        Raises:
            ValueError: If ``labels`` is missing in training mode.
        """
        if self.training and labels is None:
            raise ValueError("labels are required in training mode")

        x = x.transpose(1, 2)
        # Each of the four channels goes through the same CNN on its own.
        channel_feats = [
            self.conv_feat_extract(x[:, idx:idx + 1, :]) for idx in range(4)
        ]
        out = torch.cat(channel_feats, dim=2)

        if labels is None:
            mask = torch.ones(out.shape[:2], dtype=torch.bool, device=out.device)
        else:
            mask = labels.gt(-1)
        padding_mask = ~mask

        # Slice the table to the actual length so shorter batches work.
        position_encoding = self.position_encoding[:out.size(1)].to(out.device)
        outputs = out + position_encoding
        outputs = self.norm(outputs)
        outputs = self.encoder(outputs, src_key_padding_mask=padding_mask)
        logits = self.classifier(self.dropout(outputs))

        if self.training:
            loss_fct = CrossEntropyLoss(ignore_index=-1)
            loss = loss_fct(logits.view(-1, self.label_num), labels.view(-1))
            return {'loss': loss, 'logits': logits}

        paths, _ = self.crf.viterbi_decode(logits=logits, mask=mask)
        paths[mask == 0] = -1
        return {'preds': paths, 'logits': logits}
| |
|
|
class TransformerOnlyClassifier(nn.Module):
    """Per-position tagger: Transformer encoder applied directly to the input features.

    A fixed positional table is added to the input, the sum is
    layer-normalised, passed through a Transformer encoder, and classified
    per position.

    Training optimises token-level cross-entropy; positions labelled ``-1``
    are ignored. In eval mode the logits are decoded with the CRF's Viterbi
    search. The CRF transition matrix is zeroed at construction and is not
    part of the training loss.

    Args:
        id2labels: Mapping passed to ``allowed_transitions``; its length is
            the number of tags.
        seq_len: Number of rows in the positional table, i.e. the longest
            sequence ``forward`` can handle.
        embedding_size: Feature dimension of the input (encoder ``d_model``).
        num_heads: Attention heads per encoder layer.
        intermediate_size: Feed-forward width of each encoder layer.
        num_layers: Number of encoder layers.
        dropout_rate: Dropout used in the encoder and before the classifier.
    """

    def __init__(self, id2labels, seq_len, embedding_size=4, num_heads=2, intermediate_size=64, num_layers=2, dropout_rate=0.1):
        super(TransformerOnlyClassifier, self).__init__()

        # NOTE: nn.TransformerEncoder clones the layer it is handed, so the
        # parameters of this attribute are not the ones used in forward().
        # The attribute is kept so existing checkpoints keep their keys.
        self.encoder_layer = TransformerEncoderLayer(
            d_model=embedding_size,
            nhead=num_heads,
            dim_feedforward=intermediate_size,
            dropout=dropout_rate,
            batch_first=True)
        self.encoder = TransformerEncoder(encoder_layer=self.encoder_layer,
                                          num_layers=num_layers)

        # Non-persistent buffer: follows .to()/.cuda() with the module but
        # stays out of the state dict, so checkpoint keys are unchanged.
        self.register_buffer(
            "position_encoding",
            self._build_position_encoding(seq_len, embedding_size),
            persistent=False,
        )

        self.norm = nn.LayerNorm(embedding_size)

        self.label_num = len(id2labels)
        self.dropout = nn.Dropout(dropout_rate)
        self.classifier = nn.Sequential(nn.Linear(embedding_size, self.label_num))
        self.crf = ConditionalRandomField(
            num_tags=self.label_num,
            allowed_transitions=allowed_transitions(id2labels),
        )
        # Start from an all-zero transition matrix.
        self.crf.trans_m.data *= 0

    @staticmethod
    def _build_position_encoding(seq_len, embedding_size):
        """Return the fixed ``(seq_len, embedding_size)`` positional table.

        Column ``j`` holds ``f(pos / 10000 ** (2 * j / embedding_size))``
        with ``f = sin`` for even ``j`` and ``f = cos`` for odd ``j``. An
        odd ``embedding_size`` simply ends on a sin column.

        NOTE(review): this is not the textbook sinusoidal encoding (there
        the exponent uses the pair index, so sin/cos share a frequency).
        The formula is reproduced unchanged so that weights trained with
        it remain valid.
        """
        dtype = torch.get_default_dtype()
        positions = torch.arange(seq_len, dtype=torch.float64).unsqueeze(1)
        columns = torch.arange(embedding_size, dtype=torch.float64)
        # Angles are formed in float64 and only then cast down, mirroring
        # the Python-float arithmetic of the former element-wise loop.
        divisors = torch.pow(10000.0, 2.0 * columns / embedding_size)
        angles = (positions / divisors).to(dtype)

        encoding = torch.zeros((seq_len, embedding_size), dtype=dtype)
        encoding[:, 0::2] = torch.sin(angles[:, 0::2])
        encoding[:, 1::2] = torch.cos(angles[:, 1::2])
        return encoding

    def forward(self, inputs, labels: Optional[torch.Tensor] = None):
        """Compute the training loss or decode tag predictions.

        Args:
            inputs: Features of shape ``(batch, seq, embedding_size)``;
                ``seq`` may be any length up to the ``seq_len`` given at
                construction.
            labels: Tag ids of shape ``(batch, seq)`` with ``-1`` marking
                padding. Required in training mode. In eval mode it is only
                used to build the padding mask; when omitted, every position
                is treated as valid.

        Returns:
            ``{'loss', 'logits'}`` in training mode, ``{'preds', 'logits'}``
            in eval mode. Padded positions in ``preds`` are set to ``-1``.

        Raises:
            ValueError: If ``labels`` is missing in training mode.
        """
        if self.training and labels is None:
            raise ValueError("labels are required in training mode")

        if labels is None:
            mask = torch.ones(
                inputs.shape[:2], dtype=torch.bool, device=inputs.device
            )
        else:
            mask = labels.gt(-1)
        padding_mask = ~mask

        # Slice the table to the actual length so shorter batches work.
        position_encoding = self.position_encoding[:inputs.size(1)].to(inputs.device)
        outputs = inputs + position_encoding
        outputs = self.norm(outputs)
        outputs = self.encoder(outputs, src_key_padding_mask=padding_mask)
        logits = self.classifier(self.dropout(outputs))

        if self.training:
            loss_fct = CrossEntropyLoss(ignore_index=-1)
            loss = loss_fct(logits.view(-1, self.label_num), labels.view(-1))
            return {'loss': loss, 'logits': logits}

        paths, _ = self.crf.viterbi_decode(logits=logits, mask=mask)
        paths[mask == 0] = -1
        return {'preds': paths, 'logits': logits}