国外海报设计网站团购网站切换城市js特效代码
 # 规范化技术,旨在替代传统的 Layer Normalization(LN)
 # 核心思想是对输入张量的每个样本的每个特征进行规范化,使其均值为 0,方差为 1
 class Qwen2RMSNorm(nn.Module):
     def __init__(self, hidden_size, eps=1e-6): # 隐藏层的大小
         super().__init__()
         # 一个可学习的权重参数,初始化为全 1 张量。
         self.weight = nn.Parameter(torch.ones(hidden_size))
         # 用于防止除零错误的小常数。
         self.variance_epsilon = eps
     def forward(self, hidden_states):
         # 记录输入张量的数据类型,以便最终转换回原始类型。
         input_dtype = hidden_states.dtype
         # 转换为 torch.float32 类型,以确保数值稳定性。
         hidden_states = hidden_states.to(torch.float32)
         # 计算每个样本的方差
         variance = hidden_states.pow(2).mean(-1, keepdim=True)
         # 计算每个样本的 RMS 值,并对每个样本进行规范化
         hidden_states = hidden_states * torch.rsqrt(variance + self.variance_epsilon)
         # 应用可学习的权重,其中 γγ 是一个可学习的参数,用于缩放规范化后的张量。
         return self.weight * hidden_states.to(input_dtype)
 # 用于生成旋转位置嵌入。这种嵌入方法在 Transformer 模型中用于捕捉序列中的位置信息,尤其适用于长序列任务。
 # 通过旋转的方式将位置信息编码到嵌入向量中。具体步骤如下:
 # 生成频率:通过指数函数生成一系列频率值。计算正弦和余弦:利用生成的频率计算正弦和余弦值
 # ,旋转嵌入:将输入向量按一定规则旋转,以嵌入位置信息。
 class Qwen2RotaryEmbedding(nn.Module):
     def __init__(self, dim, max_position_embeddings=2048, base=10000, device=None):
         super().__init__()
         self.dim = dim
         # 最大位置嵌入的长度,默认为 2048,base:基数,默认为 10000。。
         self.max_position_embeddings = max_position_embeddings
         self.base = base
         # inv_freq:计算频率的逆值。
         # 位置列表先归一化(从绝对位置变成相对位置),之后取指数(1--接近10000),之后取倒数,位置从1--越来越小
         inv_freq = 1.0 / (self.base ** (torch.arange(0, self.dim, 2, dtype=torch.int64).float().to(device) / self.dim))
         # register_buffer:将 inv_freq 注册为缓冲区,以便在模型保存和加载时保持不变。
         # register_buffer 方法用于注册一个非训练的缓冲区(buffer),这意味着它不会被梯度更新。当你使用 register_buffer 注册一个缓
         # 冲区时,它会被保存在模型的状态字典(state dict)中,并且在模型保存和加载时也会被序列化。
         # persistent=True:缓冲区会出现在模型的状态字典中,并且会被序列化和加载。
         # persistent=False:缓冲区不会出现在模型的状态字典中,但在实际保存和加载时,仍然会被序列化并加载。
         self.register_buffer("inv_freq", inv_freq, persistent=False)
         # Build here to make `torch.jit.trace` work.生成正弦和余弦缓存
         self._set_cos_sin_cache(
             seq_len=max_position_embeddings, device=self.inv_freq.device, dtype=torch.get_default_dtype()
         )
     def _set_cos_sin_cache(self, seq_len, device, dtype):
         self.max_seq_len_cached = seq_len
         # t 是一个包含位置索引的张量,形状为 (seq_len,)。
         t = torch.arange(self.max_seq_len_cached, device=device, dtype=torch.int64).type_as(self.inv_freq)
         # torch.outer:计算外积,得到一个形状为 (seq_len, dim/2) 的张量
         freqs = torch.outer(t, self.inv_freq) # 计算频率。
         # Different from paper, but it uses a different permutation in order to obtain the same calculation
         # 拼接频率。emb 的形状为 (seq_len, dim)。
         # 在旋转位置嵌入(RoPE)中,我们通常将嵌入向量分为两个部分,并分别应用正弦和余弦变换。具体来说:
         # 对于每个位置 tt,计算频率 ff,得到一个形状为 (seq_len, dim/2) 的张量。
         # 将频率张量拼接两次,得到一个形状为 (seq_len, dim) 的张量。
         # 这样做的原因是,我们将嵌入向量分为两部分,每部分对应一个频率值。
         emb = torch.cat((freqs, freqs), dim=-1)
         # cos_cached 和 sin_cached:注册正弦和余弦缓存。
         self.register_buffer("cos_cached", emb.cos().to(dtype), persistent=False)
         self.register_buffer("sin_cached", emb.sin().to(dtype), persistent=False)
     def forward(self, x, seq_len=None): # x:输入张量。
         # x: [bs, num_attention_heads, seq_len, head_size]
         # 如果 seq_len 大于已缓存的最大长度,则重新生成缓存。
         if seq_len > self.max_seq_len_cached:
             self._set_cos_sin_cache(seq_len=seq_len, device=x.device, dtype=x.dtype)
         return ( # 返回正弦和余弦缓存的切片。
             self.cos_cached[:seq_len].to(dtype=x.dtype),
             self.sin_cached[:seq_len].to(dtype=x.dtype),
         )
class Qwen2MLP(nn.Module):
     def __init__(self, config):
         super().__init__()
         self.hidden_size = config.hidden_size # d
         self.intermediate_size = config.intermediate_size # hd
         self.gate_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False) # d-->hd
         self.up_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False)# d-->hd
         self.down_proj = nn.Linear(self.intermediate_size, self.hidden_size, bias=False) # hd-->d
         self.act_fn = ACT2FN[config.hidden_act]
     def forward(self, hidden_state): # (h,s,d)
         # 门控信号生成:gate_proj(hidden_state) 生成门控信号
         # 特征调整:gate_output 与 up_output 相乘,将门控信号应用于特征表示。
         # 门控机制的作用:通过门控信号动态调整哪些特征应该通过哪些特征应该被抑制。
         # 激活函数的选择:如果 config.hidden_act 是 "sigmoid",那么激活函数将是 sigmoid
         return self.down_proj(self.act_fn(self.gate_proj(hidden_state)) * self.up_proj(hidden_state))
class Qwen2Attention(nn.Module):
     def __init__(self, config: Qwen2Config, layer_idx: Optional[int] = None):
         super().__init__() # 调用父类的初始化方法
         self.config = config # 配置类实例
         self.layer_idx = layer_idx # 层索引
         if layer_idx is None:
             logger.warning_once(
                 f"Instantiating {self.__class__.__name__} without passing `layer_idx` is not recommended and will "
                 "to errors during the forward call, if caching is used. Please make sure to provide a `layer_idx` "
                 "when creating this class."
             )
         
         self.hidden_size = config.hidden_size # d
         self.num_heads = config.num_attention_heads # q_h
         self.head_dim = self.hidden_size // self.num_heads # dk
         self.num_key_value_heads = config.num_key_value_heads # kv_h
         self.num_key_value_groups = self.num_heads // self.num_key_value_heads # 比例
         self.max_position_embeddings = config.max_position_embeddings # p
         self.rope_theta = config.rope_theta # base
         self.is_causal = True # 是否用因果掩码
         self.attention_dropout = config.attention_dropout # dropout
         # 嵌入维度必须能被整除
         if (self.head_dim * self.num_heads) != self.hidden_size:
             raise ValueError(
                 f"hidden_size must be divisible by num_heads (got `hidden_size`: {self.hidden_size}"
                 f" and `num_heads`: {self.num_heads})."
             )
         # 线性投影
         self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=True)
         #需要注意的是这里的投影维度可能和q的投影维度不同
         self.k_proj = nn.Linear(self.hidden_size, self.num_key_value_heads * self.head_dim, bias=True)
         self.v_proj = nn.Linear(self.hidden_size, self.num_key_value_heads * self.head_dim, bias=True)
         # 最后一个线性转换层
         self.o_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=False)
         # 旋转位置嵌入层
         self.rotary_emb = Qwen2RotaryEmbedding(
             self.head_dim, # dk
             max_position_embeddings=self.max_position_embeddings,# max_position
             base=self.rope_theta, # base
         )
     def forward(
         self,
         hidden_states: torch.Tensor,
         attention_mask: Optional[torch.Tensor] = None,# 可选
         position_ids: Optional[torch.LongTensor] = None,# 可选
         past_key_value: Optional[Cache] = None, # 可选参数:缓存
         output_attentions: bool = False,# 是否输出注意力权重
         use_cache: bool = False, # 是否使用缓存
         cache_position: Optional[torch.LongTensor] = None, # 缓存位置
     ) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]:
         bsz, q_len, _ = hidden_states.size() # b,s,d
         # 投影
         query_states = self.q_proj(hidden_states)
         key_states = self.k_proj(hidden_states)
         value_states = self.v_proj(hidden_states)
         # (b,q_len,q_h,dk)-->(b,q_h,q_len,dk),transpose:换轴(转置)
         query_states = query_states.view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)
         # (b,k_h,k_len,dk)
         key_states = key_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2)
         value_states = value_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2)
         kv_seq_len = key_states.shape[-2] # k_len
         # 缓存上个时间步的key,value表示
         if past_key_value is not None: # 如果设置了缓存
             if self.layer_idx is None: # 就必须有layer_idx,不然报错
                 raise ValueError(
                     f"The cache structure has changed since version v4.36. If you are using {self.__class__.__name__} "
                     "for auto-regressive decoding with k/v caching, please make sure to initialize the attention class "
                     "with a layer index."
                 )
             kv_seq_len += past_key_value.get_usable_length(kv_seq_len, self.layer_idx)
         # 旋转位置嵌入,传kv_len
         # 键/值序列长度:kv_seq_len 是键和值向量的长度,这是因为键和值向量代表的是相同的序列。
         # 查询序列长度:q_len 是查询向量的长度,这可能不同于键/值向量的长度。
         # 旋转位置嵌入:在计算旋转位置嵌入时,使用键/值序列长度是为了确保位置信息与键和值向量一致。
         cos, sin = self.rotary_emb(value_states, seq_len=kv_seq_len)
         # 返回带位置信息的嵌入表示
         query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin, position_ids)
         # 如果past_key_value is not None
         if past_key_value is not None:
             cache_kwargs = {"sin": sin, "cos": cos, "cache_position": cache_position}  # Specific to RoPE models
             # 更新
             key_states, value_states = past_key_value.update(key_states, value_states, self.layer_idx, cache_kwargs)
         # repeat k/v heads if n_kv_heads < n_heads
         # 如果键值头数量少于查询头数量,则重复键值头以匹配查询头数量。
         key_states = repeat_kv(key_states, self.num_key_value_groups)
         value_states = repeat_kv(value_states, self.num_key_value_groups)
         # (b,q_h,q_len,dk)@(b,k_h,dk,k_len)-->(b,h,q_len,k_len)
         attn_weights = torch.matmul(query_states, key_states.transpose(2, 3)) / math.sqrt(self.head_dim)
         if attn_weights.size() != (bsz, self.num_heads, q_len, kv_seq_len):
             raise ValueError(
                 f"Attention weights should be of size {(bsz, self.num_heads, q_len, kv_seq_len)}, but is"
                 f" {attn_weights.size()}"
             )
         # 切片,在最后一个维度切出q_len的长度
         if attention_mask is not None:  # no matter the length, we just slice it
             causal_mask = attention_mask[:, :, :, : key_states.shape[-2]]
             # 相加,一般遮挡的地方是很大的负数
             attn_weights = attn_weights + causal_mask
         # upcast attention to fp32
         # 在q_len上归一化,得到query序列中每个token对应key中token的一系列权重,这些权重中较大的值表示和当前query中的token
         # 相似度较近,较小的表示离当前query中token较远
         attn_weights = nn.functional.softmax(attn_weights, dim=-1, dtype=torch.float32).to(query_states.dtype)
         # dropout
         attn_weights = nn.functional.dropout(attn_weights, p=self.attention_dropout, training=self.training)
         # (b,h,q_len,k_len)@(b,h,v_len,dk)-->(b,h,q_len,dk)
         attn_output = torch.matmul(attn_weights, value_states)
         if attn_output.size() != (bsz, self.num_heads, q_len, self.head_dim):
             raise ValueError(
                 f"`attn_output` should be of size {(bsz, self.num_heads, q_len, self.head_dim)}, but is"
                 f" {attn_output.size()}"
             )
         # (b,h,q_len,dk)-->(b,h,q_len,h,dk),之后.contiguous()转为内存连续存储
         attn_output = attn_output.transpose(1, 2).contiguous()
         # (b,h,q_len,h,dk)-->(b,h,d)
         attn_output = attn_output.reshape(bsz, q_len, self.hidden_size)
         # 最后经过线性转换
         attn_output = self.o_proj(attn_output)
         # 不输出注意力权重
         if not output_attentions:
             attn_weights = None
         # 返回多头注意力的输出,注意力权重,上个时间步的key_value的缓存
         return attn_output, attn_weights, past_key_value
