MultiheadAttention

Init

torch.nn.MultiheadAttention(embed_dim, 
num_heads,
dropout=0.0,
bias=True,
add_bias_kv=False,
add_zero_attn=False,
kdim=None,
vdim=None,
batch_first=False,
device=None,
dtype=None)
  • embed_dim (int): total feature dimension of the input and output.
  • num_heads (int): number of parallel attention heads; embed_dim must be divisible by num_heads.
  • dropout (float, optional): dropout probability applied to the attention weights. Default: 0.0
  • bias (bool, optional): whether the input/output projections use a bias. Default: True
  • add_bias_kv (bool, optional): whether to add a learnable bias to the key and value sequences. Default: False
  • add_zero_attn (bool, optional): whether to append a row of zeros to the key and value sequences. Default: False
  • kdim (int, optional): feature dimension of the keys; uses embed_dim if None (a sketch follows this list).
  • vdim (int, optional): feature dimension of the values; uses embed_dim if None.
  • batch_first (bool, optional): if True, inputs and outputs are (batch, seq, feature) instead of (seq, batch, feature). Default: False
  • device / dtype (optional): device and data type the module's parameters are created with. Default: None
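
A minimal sketch of the kdim/vdim and batch_first options (the tensor sizes here are illustrative, not from the original post): keys and values may live in a smaller feature space than the query.

import torch
import torch.nn as nn

# query has embed_dim=256 features, keys/values only 64 (kdim/vdim)
mha = nn.MultiheadAttention(embed_dim=256, num_heads=8, kdim=64, vdim=64, batch_first=True)
q = torch.randn(2, 10, 256)   # (batch, tgt_len, embed_dim)
k = torch.randn(2, 20, 64)    # (batch, src_len, kdim)
v = torch.randn(2, 20, 64)    # (batch, src_len, vdim)
out, w = mha(q, k, v)
print(out.shape)              # torch.Size([2, 10, 256])
print(w.shape)                # torch.Size([2, 10, 20])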

Forward

forward(query, 
key,
value,
key_padding_mask=None,
need_weights=True,
attn_mask=None,
average_attn_weights=True)
  • query (Tensor): shape (seq_len, batch_size, embed_dim), or (batch_size, seq_len, embed_dim) when batch_first=True.
  • key (Tensor): shape (seq_len, batch_size, embed_dim).
  • value (Tensor): shape (seq_len, batch_size, embed_dim).
  • key_padding_mask (BoolTensor, optional): marks padding positions in the keys so they are ignored; shape (batch_size, seq_len). See the mask sketch after this list.
  • need_weights (bool, optional): whether to also return the attention weights. Default: True
  • attn_mask (Tensor, optional): mask that blocks attention between certain positions; shape (seq_len, seq_len) or (batch_size * num_heads, seq_len, seq_len).
  • average_attn_weights (bool, optional): if True, the returned attention weights are averaged across heads. Default: True
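
A small illustrative sketch (sizes chosen here, not taken from the post) of how the two masks are built and passed to forward: key_padding_mask hides padded tokens per batch element, while attn_mask blocks position pairs, here a causal pattern.

import torch
import torch.nn as nn

mha = nn.MultiheadAttention(embed_dim=32, num_heads=4, batch_first=True)
x = torch.randn(2, 6, 32)                              # (batch_size, seq_len, embed_dim)
# True = this key position is padding and must be ignored
key_padding_mask = torch.tensor([[False, False, False, False, False, False],
                                 [False, False, False, False, True,  True]])
# True = attention between this (query, key) pair is blocked (causal mask)
attn_mask = torch.triu(torch.ones(6, 6, dtype=torch.bool), diagonal=1)
out, w = mha(x, x, x, key_padding_mask=key_padding_mask, attn_mask=attn_mask)
print(out.shape, w.shape)    # torch.Size([2, 6, 32]) torch.Size([2, 6, 6])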

Code

import torch
import torch.nn.functional as F
import torch.nn as nn
import math

class MultiheadAttention(nn.Module):
    def __init__(self, embed_dim, num_heads, dropout=0.0):
        super(MultiheadAttention, self).__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads

        self.q_linear = nn.Linear(embed_dim, embed_dim)
        self.k_linear = nn.Linear(embed_dim, embed_dim)
        self.v_linear = nn.Linear(embed_dim, embed_dim)

        self.out_linear = nn.Linear(embed_dim, embed_dim)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, query, key, value, mask=None):
        # linear projections
        query = self.q_linear(query)
        key = self.k_linear(key)
        value = self.v_linear(value)

        # split into heads: (batch, seq, embed) -> (batch, num_heads, seq, head_dim)
        query = query.view(query.size(0), -1, self.num_heads, self.head_dim).transpose(1, 2)
        key = key.view(key.size(0), -1, self.num_heads, self.head_dim).transpose(1, 2)
        value = value.view(value.size(0), -1, self.num_heads, self.head_dim).transpose(1, 2)

        # scaled dot-product attention
        scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(self.head_dim)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, float('-inf'))

        attention_weights = F.softmax(scores, dim=-1)
        attention_weights = self.dropout(attention_weights)

        # weight the values and merge the heads back together
        output = torch.matmul(attention_weights, value)
        output = output.transpose(1, 2).contiguous().view(output.size(0), -1, self.embed_dim)
        output = self.out_linear(output)

        return output, attention_weights
  • embed_dim is the embedding dimension of both the input and the output tensors
  • num_heads is the number of attention heads
  • embed_dim is split evenly across the heads, so each head works with embed_dim // num_heads features
  • With the defaults (need_weights=True, average_attn_weights=True), att_weight has shape (batch_size, seq_len, seq_len)
# (batch_size, seq_len, embed_dim), since batch_first=True below
x = torch.randn((5, 10, 256))
# call nn.MultiheadAttention
multihead_attention = nn.MultiheadAttention(embed_dim=256, num_heads=4, batch_first=True)
output, att_weight = multihead_attention(x, x, x)
print(output.shape)
print(att_weight.shape)
>>>
torch.Size([5, 10, 256])
torch.Size([5, 10, 10])
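
For comparison, the hand-written MultiheadAttention above (which assumes batch-first inputs) returns per-head weights rather than the head-averaged matrix:

x = torch.randn(5, 10, 256)                       # (batch, seq_len, embed_dim)
my_mha = MultiheadAttention(embed_dim=256, num_heads=4)
out, w = my_mha(x, x, x)
print(out.shape)   # torch.Size([5, 10, 256])
print(w.shape)     # torch.Size([5, 4, 10, 10]), one (10, 10) map per head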

EncoderLayer

Init

torch.nn.TransformerEncoderLayer(d_model, 
nhead,
dim_feedforward=2048,
dropout=0.1,
activation=<function relu>,
layer_norm_eps=1e-05,
batch_first=False,
norm_first=False,
bias=True)
  • d_model
    • feature dimension of the layer's input and output
    • int
  • nhead
    • number of heads in the multi-head self-attention
    • int
  • dim_feedforward
    • hidden dimension of the position-wise feed-forward network
    • int, default 2048
  • dropout
    • dropout probability used throughout the layer
    • float, default 0.1
  • activation
    • activation function used in the feed-forward network
    • callable, default ReLU
  • layer_norm_eps
    • epsilon used by the layer normalizations
    • float, default 1e-05
  • batch_first
    • whether inputs and outputs put the batch dimension first (True) or the sequence dimension first (False)
    • bool, default False
  • norm_first
    • whether to apply layer normalization before the self-attention and feed-forward blocks (pre-norm) instead of after them (post-norm); see the sketch after this list
    • bool, default False
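
A quick sketch of the norm_first and batch_first flags (values here are assumed for illustration); both variants keep the input and output shapes identical:

import torch
import torch.nn as nn

# pre-norm (norm_first=True), batch-first layer
layer = nn.TransformerEncoderLayer(d_model=128, nhead=4, dim_feedforward=512,
                                   batch_first=True, norm_first=True)
x = torch.randn(10, 5, 128)          # (batch, seq_len, d_model) because batch_first=True
print(layer(x).shape)                # torch.Size([10, 5, 128])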

Forward

forward(src, 
src_mask=None,
src_key_padding_mask=None,
is_causal=False)
  • src
    • input sequence, shaped (seq_len, batch, embed_dim) or (batch, seq_len, embed_dim) depending on batch_first
    • torch.Tensor
  • src_mask
    • masks specific position pairs of the input sequence; a True entry means that pair is blocked (excluded from attention). Boolean tensor of shape (seq_len, seq_len), or None. A causal example is sketched after this list.
    • torch.Tensor or None
  • src_key_padding_mask
    • masks specific positions of the input sequence; unlike src_mask, this is a per-sample key padding mask. Boolean tensor of shape (batch, seq_len), or None
    • torch.Tensor or None
  • is_causal
    • whether to treat the attention as causal, i.e. each position may only attend to itself and earlier positions. Useful for time series and other autoregressive settings
    • bool
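
As referenced in the src_mask bullet, a causal mask can be built with torch.triu; this sketch uses assumed sizes. True entries are blocked, so each position only attends to itself and earlier positions.

import torch
import torch.nn as nn

seq_len, batch = 6, 2
layer = nn.TransformerEncoderLayer(d_model=32, nhead=4)
src = torch.randn(seq_len, batch, 32)       # (seq_len, batch, d_model), batch_first=False
src_mask = torch.triu(torch.ones(seq_len, seq_len, dtype=torch.bool), diagonal=1)
out = layer(src, src_mask=src_mask)
print(out.shape)                            # torch.Size([6, 2, 32])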

Code

  • Calling torch.nn.TransformerEncoderLayer directly
# (sequence_length, batch_size, features)
x = torch.randn((5, 10, 128))
encoder_layer = torch.nn.TransformerEncoderLayer(d_model=128, nhead=4)
output = encoder_layer(x)
output.shape
>>>
torch.Size([5, 10, 128])
  • To extract the attention weights, define a custom encoder layer:
class MyTransformerEncoderLayer(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1, activation="relu"):
        super(MyTransformerEncoderLayer, self).__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.activation = nn.ReLU() if activation == "relu" else nn.GELU()

    def forward(self, src, src_mask=None, src_key_padding_mask=None):
        src2, att_weight = self.self_attn(src, src, src, attn_mask=src_mask,
                                          key_padding_mask=src_key_padding_mask)
        src = src + self.dropout1(src2)
        src = self.norm1(src)

        src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))
        src = src + self.dropout2(src2)
        src = self.norm2(src)

        return src, att_weight
# (sequence_length, batch_size, features)
x = torch.randn((5, 10, 128))
encoder_layer = MyTransformerEncoderLayer(d_model=128, nhead=4)
output, att_weight = encoder_layer(x)
print(output.shape)
print(att_weight.shape)
>>>
torch.Size([5, 10, 128])
torch.Size([5, 10, 10])

DecoderLayer

Init

torch.nn.TransformerDecoderLayer(d_model, 
nhead,
dim_feedforward=2048,
dropout=0.1,
activation=<function relu>,
layer_norm_eps=1e-05,
batch_first=False,
norm_first=False,
bias=True,
device=None,
dtype=None)
  • d_model
    • feature dimension of the layer's input and output
    • int
  • nhead
    • number of heads in the multi-head self-attention
    • int
  • dim_feedforward
    • hidden dimension of the position-wise feed-forward network
    • int, default 2048
  • dropout
    • dropout probability used throughout the layer
    • float, default 0.1
  • activation
    • activation function used in the feed-forward network
    • callable, default ReLU
  • layer_norm_eps
    • epsilon used by the layer normalizations
    • float, default 1e-05
  • batch_first
    • whether inputs and outputs put the batch dimension first (True) or the sequence dimension first (False); a batch-first sketch follows this list
    • bool, default False
  • norm_first
    • whether to apply layer normalization before the attention and feed-forward blocks (pre-norm) instead of after them (post-norm)
    • bool, default False
  • bias
    • whether the attention and feed-forward linear layers use a bias
    • bool, default True
  • device
    • device the module's parameters are created on (e.g. 'cuda' or 'cpu')
    • default None
  • dtype
    • data type of the module's parameters
    • torch.dtype, default None
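
A batch-first construction sketch (sizes assumed); with batch_first=True both tgt and memory are (batch, length, d_model):

import torch
import torch.nn as nn

layer = nn.TransformerDecoderLayer(d_model=64, nhead=4, batch_first=True)
tgt = torch.randn(2, 5, 64)        # (batch, tgt_len, d_model)
memory = torch.randn(2, 9, 64)     # (batch, src_len, d_model)
print(layer(tgt, memory).shape)    # torch.Size([2, 5, 64])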

Forward

forward(tgt, 
memory,
tgt_mask=None,
memory_mask=None,
tgt_key_padding_mask=None,
memory_key_padding_mask=None,
tgt_is_causal=False,
memory_is_causal=False)
  • tgt
    • target sequence
    • torch.Tensor, shape (tgt_len, batch_size, d_model)
  • memory
    • output of the encoder (the "memory")
    • torch.Tensor, shape (src_len, batch_size, d_model)
  • tgt_mask
    • mask over the target sequence, typically causal so that each output position only depends on earlier positions. If None, no mask is applied. A sketch follows this list.
    • torch.Tensor, shape (tgt_len, tgt_len)
  • memory_mask
    • mask over the encoder output, blocking the decoder from attending to selected memory positions. If None, no mask is applied.
    • torch.Tensor, shape (tgt_len, src_len)
  • tgt_key_padding_mask
    • padding mask for the target sequence, masking padded target positions. If None, no padding mask is applied
    • torch.Tensor, shape (batch_size, tgt_len)
  • memory_key_padding_mask
    • padding mask for the encoder output, masking padded memory positions. If None, no padding mask is applied
    • torch.Tensor, shape (batch_size, src_len)
  • tgt_is_causal
    • hint that tgt_mask is a causal mask, so the self-attention can take the optimized causal path
    • bool, default False
  • memory_is_causal
    • hint that memory_mask is a causal mask for the cross-attention
    • bool, default False
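
As mentioned in the tgt_mask bullet, the causal mask can be produced with nn.Transformer.generate_square_subsequent_mask; the sizes below are illustrative.

import torch
import torch.nn as nn

tgt_len, src_len, batch = 4, 7, 2
layer = nn.TransformerDecoderLayer(d_model=32, nhead=4)
tgt = torch.randn(tgt_len, batch, 32)         # (tgt_len, batch, d_model)
memory = torch.randn(src_len, batch, 32)      # (src_len, batch, d_model)
tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt_len)   # float mask, -inf above the diagonal
out = layer(tgt, memory, tgt_mask=tgt_mask)
print(out.shape)                              # torch.Size([4, 2, 32])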

Code

import torch
import torch.nn as nn

class TransformerDecoderLayer(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1, activation="relu"):
        super(TransformerDecoderLayer, self).__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        self.multihead_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)
        self.activation = nn.ReLU() if activation == "relu" else nn.GELU()

    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None, tgt_key_padding_mask=None, memory_key_padding_mask=None):
        # masked self-attention over the target sequence
        tgt2 = self.self_attn(tgt, tgt, tgt, attn_mask=tgt_mask, key_padding_mask=tgt_key_padding_mask)[0]
        tgt = tgt + self.dropout1(tgt2)
        tgt = self.norm1(tgt)

        # cross-attention over the encoder memory
        tgt2 = self.multihead_attn(tgt, memory, memory, attn_mask=memory_mask, key_padding_mask=memory_key_padding_mask)[0]
        tgt = tgt + self.dropout2(tgt2)
        tgt = self.norm2(tgt)

        # position-wise feed-forward network
        tgt2 = self.linear2(self.dropout(self.activation(self.linear1(tgt))))
        tgt = tgt + self.dropout3(tgt2)
        tgt = self.norm3(tgt)

        return tgt
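
A quick shape check of the custom layer above (same sizes as the library demo that follows):

memory = torch.randn(5, 2, 128)                 # (src_len, batch, d_model)
tgt = torch.randn(5, 2, 128)                    # (tgt_len, batch, d_model)
my_decoder_layer = TransformerDecoderLayer(d_model=128, nhead=4)
print(my_decoder_layer(tgt, memory).shape)      # torch.Size([5, 2, 128])
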
# call the PyTorch library implementation directly
memory = torch.randn((5, 2, 128))
tgt = torch.randn((5, 2, 128))
decoder_layer = torch.nn.TransformerDecoderLayer(d_model=128, nhead=4)
output2 = decoder_layer(tgt, memory)
output2.shape
>>>
torch.Size([5, 2, 128])

Demo1

import torch
from torch import nn
import numpy as np
import math

class TransformerEncoderLayer(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1, activation="relu"):
        super(TransformerEncoderLayer, self).__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.activation = nn.ReLU() if activation == "relu" else nn.GELU()

    def forward(self, src, src_mask=None, src_key_padding_mask=None):
        src2, att_weight = self.self_attn(src, src, src, attn_mask=src_mask,
                                          key_padding_mask=src_key_padding_mask)
        src = src + self.dropout1(src2)
        src = self.norm1(src)

        src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))
        src = src + self.dropout2(src2)
        src = self.norm2(src)

        return src, att_weight

class PositionalEncoding(nn.Module):

    def __init__(self, d_model, dropout=0.1, max_len=2000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        # precompute the sinusoidal table: (max_len, 1, d_model)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (seq_len, batch, d_model)
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

class MyNet(nn.Module):
    def __init__(self, vocab_size, d_model, nlayers, nhead, dropout, dim_feedforward):
        super(MyNet, self).__init__()

        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model)

        self.transformer_encoder = []
        for i in range(nlayers):
            self.transformer_encoder.append(TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout))
        self.transformer_encoder = nn.ModuleList(self.transformer_encoder)

    def forward(self, x):
        # x: (batch_size, sequence_length) token ids
        x = self.embedding(x)                 # (batch_size, sequence_length, d_model)
        # (sequence_length, batch_size, d_model): both the positional encoding
        # and the encoder layers expect the sequence dimension first
        x = x.permute(1, 0, 2)
        x = self.pos_encoder(x)

        attention_weights = []
        for layer in self.transformer_encoder:
            x, attention_weights_layer = layer(x)
            attention_weights.append(attention_weights_layer)
        attention_weights = torch.stack(attention_weights)

        # (sequence_length, batch_size, features) -> (batch_size, sequence_length, features)
        # [10, 5, 128] -> [5, 10, 128]
        x = x.permute(1, 0, 2)
        # (nlayers, batch_size, seq_len, seq_len) -> (batch_size, nlayers, seq_len, seq_len)
        # [3, 5, 10, 10] -> [5, 3, 10, 10]
        attention_weights = attention_weights.permute(1, 0, 2, 3)

        return x, attention_weights
vocab_size=20
d_model=128
nlayers=3
nhead=4
dropout=0.2
dim_feedforward=1024

net = MyNet(vocab_size,d_model,nlayers,nhead,dropout,dim_feedforward)

x = torch.rand((5, 10))*10
x = x.long()
out, att_weight = net(x)
print(out.shape)
print(att_weight.shape)
>>>
torch.Size([5, 10, 128])
torch.Size([5, 3, 10, 10])

Demo2

import copy
import math

import torch
import torch.nn as nn
import torch.nn.functional as F

class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff=2048, dropout=0.1):
        super().__init__()
        # set d_ff as a default to 2048
        self.linear_1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        self.linear_2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        x = self.dropout(F.relu(self.linear_1(x)))
        x = self.linear_2(x)
        return x

class EncoderLayer(nn.Module):
    def __init__(self, d_model, heads, dropout=0.1):
        super().__init__()
        self.norm_1 = nn.LayerNorm(d_model)
        self.norm_2 = nn.LayerNorm(d_model)
        self.attn = nn.MultiheadAttention(d_model, heads, dropout=dropout)
        self.ff = FeedForward(d_model, dropout=dropout)
        self.dropout_1 = nn.Dropout(dropout)
        self.dropout_2 = nn.Dropout(dropout)

    def forward(self, x, mask):
        x2, att_weight = self.attn(x, x, x, attn_mask=mask)
        x = x + self.dropout_1(x2)
        x = self.norm_1(x)
        x2 = self.ff(x)
        x = x + self.dropout_2(x2)
        x = self.norm_2(x)
        return x, att_weight

class PositionalEncoder(nn.Module):
    def __init__(self, d_model, max_seq_len=200, dropout=0.1):
        super().__init__()
        self.d_model = d_model
        self.dropout = nn.Dropout(dropout)
        # create constant 'pe' matrix with values dependent on pos and i
        pe = torch.zeros(max_seq_len, d_model)
        for pos in range(max_seq_len):
            for i in range(0, d_model, 2):
                pe[pos, i] = math.sin(pos / (10000 ** ((2 * i) / d_model)))
                pe[pos, i + 1] = math.cos(pos / (10000 ** ((2 * (i + 1)) / d_model)))
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # make embeddings relatively larger
        x = x * math.sqrt(self.d_model)
        # add the constant positional encoding (the buffer moves with the module's device)
        seq_len = x.size(1)
        x = x + self.pe[:, :seq_len]
        return self.dropout(x)

class K_mer_aggregate(nn.Module):
    def __init__(self, kmers, in_dim, out_dim, dropout=0.1):
        '''
        x: (batch_size, sequence_length, features)
        return: (batch_size, sum(sequence_length - k + 1 for k in kmers), features)
        '''
        super(K_mer_aggregate, self).__init__()
        self.dropout = nn.Dropout(dropout)
        self.convs = []
        for i in kmers:
            # sequence_length -> sequence_length - i + 1
            self.convs.append(nn.Conv1d(in_dim, out_dim, i, padding=0))
        self.convs = nn.ModuleList(self.convs)
        self.activation = nn.ReLU(inplace=True)
        self.norm = nn.LayerNorm(out_dim)

    def forward(self, x):
        # Conv1d convolves over the last dimension, so move features to dim 1:
        # (batch_size, sequence_length, features) -> (batch_size, features, sequence_length)
        x = x.permute(0, 2, 1)
        outputs = []
        for conv in self.convs:
            outputs.append(conv(x))
        outputs = torch.cat(outputs, dim=2)
        outputs = self.norm(outputs.permute(0, 2, 1))
        return outputs

class LinearDecoder(nn.Module):
    def __init__(self, d_model, n_class, dropout):
        super(LinearDecoder, self).__init__()
        self.global_avg_pool = nn.AdaptiveAvgPool1d(1)
        self.d_ff = n_class * 8
        self.linear_1 = nn.Linear(d_model, self.d_ff)
        self.relu = nn.ReLU(inplace=True)
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(self.d_ff, n_class)

    def forward(self, x):
        x = x.permute(0, 2, 1)
        x = self.global_avg_pool(x).squeeze(2)
        x = self.dropout(self.relu(self.linear_1(x)))
        x = self.classifier(x)
        return x

def get_clones(module, n_layers):
    return nn.ModuleList([copy.deepcopy(module) for i in range(n_layers)])

class Encoder(nn.Module):
    def __init__(self, vocab_size, d_model, n_layers, heads, n_class, kmers, dropout):
        super().__init__()
        self.n_layers = n_layers
        self.embed = nn.Embedding(vocab_size, d_model)
        self.pe = PositionalEncoder(d_model, dropout=dropout)
        self.kmer_aggregation = K_mer_aggregate(kmers, d_model, d_model)
        self.layers = get_clones(EncoderLayer(d_model, heads, dropout), n_layers)
        self.norm = nn.LayerNorm(d_model)
        self.decoder = LinearDecoder(d_model, n_class, dropout)

    def forward(self, src, mask=None):
        # (batch_size, sequence_length, features)
        x = self.embed(src)
        x = self.pe(x)

        # sequence_length -> sum over kmers of (sequence_length - kmer + 1)
        x = self.kmer_aggregation(x)

        # (batch_size, sequence_length, features) -> (sequence_length, batch_size, features)
        x = x.permute(1, 0, 2)
        attention_weights = []
        for i in range(self.n_layers):
            x, attention_weights_layer = self.layers[i](x, mask)
            attention_weights.append(attention_weights_layer)

        # (n_layers, batch_size, seq_len, seq_len) -> (batch_size, n_layers, seq_len, seq_len)
        attention_weights = torch.stack(attention_weights).permute(1, 0, 2, 3)

        # (sequence_length, batch_size, features) -> (batch_size, sequence_length, features)
        x = self.norm(x).permute(1, 0, 2)
        x = self.decoder(x)

        return x, attention_weights
net = Encoder(vocab_size=40, d_model=128, n_layers=2, heads=4, n_class=2, kmers=[3, 4], dropout=0.1)
x = torch.rand((5, 10)).long()   # (batch_size=5, sequence_length=10) token ids
out, att = net(x)
out.shape
>>>
torch.Size([5, 2])

att.shape
>>>
torch.Size([5, 2, 15, 15])

Ref

torch.nn — PyTorch 2.1 documentation