Training an LSTM Stacked AutoEncoder on Feature Maps with PyTorch

Table of Contents

  • Step 1: Construct the training data
  • Step 2: Build the LSTM model
  • Step 3: Build the DataLoader
    • The three modeling essentials
  • Step 4: Train and save the model
  • Full code

 

Step 1: Construct the training data

def get_train_data(cluster_shape=(2000, 50)):
    """得到训练数据,这里使用随机数生成训练数据,由此导致最终结果并不好"""

    def get_tensor_from_pd(dataframe_series) -> torch.Tensor:
        return torch.tensor(data=dataframe_series.values)

    # Generate x, apply min-max normalization, wrap it in a DataFrame, then convert to a tensor
    df = pd.DataFrame(data=preprocessing.MinMaxScaler().fit_transform(np.random.randint(0, 10, size=cluster_shape)))
    y = pd.Series(np.random.randint(0, 10, cluster_shape[0]))
    return get_tensor_from_pd(df).float(), get_tensor_from_pd(y).float()
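
A quick shape check (an illustrative sketch, not part of the original post) confirms what the function returns: x is a [2000, 50] float tensor of normalized features and y is a [2000] float tensor of labels, which the autoencoder itself never uses:

x, y = get_train_data()
print(x.shape)  # torch.Size([2000, 50])
print(y.shape)  # torch.Size([2000])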

Step 2: Build the LSTM model

class LstmStackedAutoEncoder(nn.Module):

    def __init__(self, embedding_size, feature_length, batch_size, hidden_layer_size=500, encode_size=200):
        super().__init__()
        self.embedding_size = embedding_size  # input feature size per time step
        self.feature_length = feature_length  # sequence length (number of time steps)
        self.hidden_layer_size = hidden_layer_size  # user-defined hidden size
        self.n_layers = 1
        self.batch_size = batch_size

        self.lstm_en = nn.LSTM(embedding_size, hidden_layer_size, batch_first=True)
        self.linear_en = nn.Linear(self.hidden_layer_size * self.feature_length, encode_size)

        self.linear_de = nn.Linear(encode_size, self.hidden_layer_size * self.feature_length)
        self.lstm_de = nn.LSTM(hidden_layer_size, embedding_size, batch_first=True)

        self.sigmoid = nn.Sigmoid()
        self.relu = nn.ReLU()

    def forward(self, input_x):
        # encoder
        en_lstm, (n, c) = self.lstm_en(input_x,  # input_x: [batch_size, feature_length, embedding_size]
                                       (torch.zeros(self.n_layers, self.batch_size, self.hidden_layer_size),
                                        torch.zeros(self.n_layers, self.batch_size, self.hidden_layer_size)))
        en_lstm = en_lstm.contiguous().view(self.batch_size, -1)  # flatten to [batch_size, feature_length * hidden_layer_size]
        en_linear = self.linear_en(en_lstm)  # [batch_size, encode_size]
        en_out = self.relu(en_linear)
        # decoder
        de_linear = self.linear_de(en_out)
        de_sigmoid = self.sigmoid(de_linear)  # [batch_size, feature_length * hidden_layer_size]
        # reshape to (batch, seq_len, features) for the decoder LSTM
        de_sigmoid = de_sigmoid.view([self.batch_size, self.feature_length, self.hidden_layer_size])
        de_out, (n, c) = self.lstm_de(de_sigmoid,
                                      # the decoder's hidden size equals the output (embedding) size
                                      (torch.zeros(self.n_layers, self.batch_size, self.embedding_size),
                                       torch.zeros(self.n_layers, self.batch_size, self.embedding_size)))
        return de_out
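
As a sanity check (an illustrative sketch, not part of the original post), the model can be instantiated with the hyperparameters used later in the article and fed a random batch; since the autoencoder reconstructs its input, the output shape should match the input shape:

model = LstmStackedAutoEncoder(embedding_size=5, feature_length=10, batch_size=30)
dummy = torch.randn(30, 10, 5)  # [batch_size, feature_length, embedding_size]
print(model(dummy).shape)       # torch.Size([30, 10, 5]), same shape as the input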

Step 3: Build the DataLoader

    logging.basicConfig(format='%(asctime)s - [line:%(lineno)d] - %(levelname)s: %(message)s',
                        level=logging.DEBUG)
    x, y = get_train_data()
    epochs = 2
    batch_size = 30
    packet_code = 10
    packet_length = 5
    train_loader = Data.DataLoader(
        dataset=Data.TensorDataset(x, y),  # data wrapped in Data.TensorDataset() can have any dimensionality
        batch_size=batch_size,  # size of each mini-batch
        shuffle=True,  # whether to shuffle the data (shuffling is recommended)
        num_workers=6,  # number of worker processes used to load the data
        drop_last=True
    )
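
A quick look at one mini-batch (an illustrative sketch, not in the original) also shows why drop_last=True matters: the model hard-codes batch_size in its forward pass, so every batch must contain exactly 30 samples:

seq, labels = next(iter(train_loader))
print(seq.shape)     # torch.Size([30, 50]); each sample is still a flat 50-dim feature vector
print(labels.shape)  # torch.Size([30])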

The three modeling essentials

    # The three modeling essentials: loss function, optimizer, epochs
    model = LstmStackedAutoEncoder(packet_length, packet_code, batch_size)  # the LSTM stacked autoencoder
    loss_function = nn.MSELoss()  # loss
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)  # optimizer
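
Printing the model (an illustrative check, not from the original post) makes the layer sizes explicit: with packet_length=5 and packet_code=10, the flattened encoder output has 500 * 10 = 5000 features:

print(model)
# LstmStackedAutoEncoder(
#   (lstm_en): LSTM(5, 500, batch_first=True)
#   (linear_en): Linear(in_features=5000, out_features=200, bias=True)
#   (linear_de): Linear(in_features=200, out_features=5000, bias=True)
#   (lstm_de): LSTM(500, 5, batch_first=True)
#   (sigmoid): Sigmoid()
#   (relu): ReLU()
# )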

Step 4: Train and save the model

    # Start training
    logging.debug("begin train")
    model.train()
    for ep in range(epochs):
        i = 0
        for seq, labels in train_loader:
            optimizer.zero_grad()
            input_seq = seq.view([batch_size, packet_code, packet_length])
            y_pred = model(input_seq).squeeze()  # get the reconstruction and drop any size-1 dimensions
            single_loss = loss_function(y_pred, input_seq)  # reconstruction loss against the input itself
            single_loss.backward()
            optimizer.step()
            logging.debug("TRAIN batch " + str(i) + " loss " + str(single_loss.item()))
            i += 1

Save the model:

    logging.debug("train finish")
    torch.save(model, 'stacked_auto_encoder.pt')
    logging.debug("save finish")

Full code

import torch
import torch.nn as nn
import torch.utils.data as Data
import os
import logging
import numpy as np
import pandas as pd
from sklearn import preprocessing


def get_train_data(cluster_shape=(2000, 50)):
    """得到训练数据,这里使用随机数生成训练数据,由此导致最终结果并不好"""

    def get_tensor_from_pd(dataframe_series) -> torch.Tensor:
        return torch.tensor(data=dataframe_series.values)

    # Generate x, apply min-max normalization, wrap it in a DataFrame, then convert to a tensor
    df = pd.DataFrame(data=preprocessing.MinMaxScaler().fit_transform(np.random.randint(0, 10, size=cluster_shape)))
    y = pd.Series(np.random.randint(0, 10, cluster_shape[0]))
    return get_tensor_from_pd(df).float(), get_tensor_from_pd(y).float()


class LstmStackedAutoEncoder(nn.Module):

    def __init__(self, embedding_size, feature_length, batch_size, hidden_layer_size=500, encode_size=200):
        super().__init__()
        self.embedding_size = embedding_size  # input feature size per time step
        self.feature_length = feature_length  # sequence length (number of time steps)
        self.hidden_layer_size = hidden_layer_size  # user-defined hidden size
        self.n_layers = 1
        self.batch_size = batch_size

        self.lstm_en = nn.LSTM(embedding_size, hidden_layer_size, batch_first=True)
        self.linear_en = nn.Linear(self.hidden_layer_size * self.feature_length, encode_size)

        self.linear_de = nn.Linear(encode_size, self.hidden_layer_size * self.feature_length)
        self.lstm_de = nn.LSTM(hidden_layer_size, embedding_size, batch_first=True)

        self.sigmoid = nn.Sigmoid()
        self.relu = nn.ReLU()

    def forward(self, input_x):
        # encoder
        en_lstm, (n, c) = self.lstm_en(input_x,  # input_x: [batch_size, feature_length, embedding_size]
                                       (torch.zeros(self.n_layers, self.batch_size, self.hidden_layer_size),
                                        torch.zeros(self.n_layers, self.batch_size, self.hidden_layer_size)))
        en_lstm = en_lstm.contiguous().view(self.batch_size, -1)  # flatten to [batch_size, feature_length * hidden_layer_size]
        en_linear = self.linear_en(en_lstm)  # [batch_size, encode_size]
        en_out = self.relu(en_linear)
        # decoder
        de_linear = self.linear_de(en_out)
        de_sigmoid = self.sigmoid(de_linear)  # [batch_size, feature_length * hidden_layer_size]
        # reshape to (batch, seq_len, features) for the decoder LSTM
        de_sigmoid = de_sigmoid.view([self.batch_size, self.feature_length, self.hidden_layer_size])
        de_out, (n, c) = self.lstm_de(de_sigmoid,
                                      # the decoder's hidden size equals the output (embedding) size
                                      (torch.zeros(self.n_layers, self.batch_size, self.embedding_size),
                                       torch.zeros(self.n_layers, self.batch_size, self.embedding_size)))
        return de_out


if __name__ == '__main__':
    # Train the stacked LSTM autoencoder on the raw training data and save it locally
    logging.basicConfig(format='%(asctime)s - [line:%(lineno)d] - %(levelname)s: %(message)s',
                        level=logging.DEBUG)
    x, y = get_train_data()
    epochs = 2
    batch_size = 30
    packet_code = 10
    packet_length = 5
    train_loader = Data.DataLoader(
        dataset=Data.TensorDataset(x, y),  # data wrapped in Data.TensorDataset() can have any dimensionality
        batch_size=batch_size,  # size of each mini-batch
        shuffle=True,  # whether to shuffle the data (shuffling is recommended)
        num_workers=6,  # number of worker processes used to load the data
        drop_last=True
    )
    # The three modeling essentials: loss function, optimizer, epochs
    model = LstmStackedAutoEncoder(packet_length, packet_code, batch_size)  # the LSTM stacked autoencoder
    loss_function = nn.MSELoss()  # loss
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)  # optimizer
    # Start training
    logging.debug("begin train")
    model.train()
    for ep in range(epochs):
        i = 0
        for seq, labels in train_loader:
            optimizer.zero_grad()
            input_seq = seq.view([batch_size, packet_code, packet_length])
            y_pred = model(input_seq).squeeze()  # get the reconstruction and drop any size-1 dimensions
            single_loss = loss_function(y_pred, input_seq)  # reconstruction loss against the input itself
            single_loss.backward()
            optimizer.step()
            logging.debug("TRAIN batch " + str(i) + " loss " + str(single_loss.item()))
            i += 1
    logging.debug("train finish")
    torch.save(model, 'stacked_auto_encoder.pt')
    logging.debug("save finish")