3層ニューラルネットワーク

パターン認識とか機械学習の本を開くと、ニューラルネットワークの章の最初に出てくる「3層ニューラルネットワーク」です。

この回では、3層ニューラルネットワークを実際に作成しながら、ニューラルネットワークの基本を学んでいきます。

参考になるサイト

準備

ここでは、ダミーデータとして以下のようなデータを1000件作成した。

  • データの特徴量(要素)が10個
  • 1つの特徴量は0以上1未満の値
  • 10個の特徴量の合計値が5以上ならクラス1、それ以外ならクラス0

このデータセットにおいて、10個の特徴量を入力としてニューラルネットワークに与え、ニューラルネットワークは入力されたデータがクラス1なのか、クラス0なのかを判定するようなクラス分類の学習を実施します。

まずは、以下のコードをコピーしてPythonファイルを作成してください。

import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
import numpy as np
from torch.utils.data import DataLoader, TensorDataset
from collections import Counter
from sklearn.metrics import classification_report

def generate_data(seed=42):
    '''
    10個の特徴量を持つダミーデータXを作成し、
    特徴量の合計値が5以上ならクラス1、
    そうでなければクラス0とする。
    '''
    np.random.seed(seed) # 乱数のシード値を設定する
    X = np.random.rand(1000, 10) # ランダムな数値データを生成
    y = (np.sum(X, axis=1) >= 5.0).astype(int)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=seed) # データを訓練データとテストデータに分割
    
    # PyTorchのテンソルに変換して返す
    return (torch.tensor(X_train, dtype=torch.float32),
            torch.tensor(y_train, dtype=torch.long),
            torch.tensor(X_test, dtype=torch.float32),
            torch.tensor(y_test, dtype=torch.long))


def print_class_distribution(y_train, y_test):
    def print_distribution(label, counts, total):
        print(f"{label} class distribution:")
        for key, count in counts.items():
            print(f"  Class {key}: {count} samples ({count / total:.2%})")
    
    print_distribution("Training set", Counter(y_train.numpy()), len(y_train))
    print_distribution("Test set", Counter(y_test.numpy()), len(y_test))

def get_dataloader(X_train, y_train, batch_size=32):
    train_data = TensorDataset(X_train, y_train)
    return DataLoader(train_data, batch_size=batch_size, shuffle=True)

# ここにニューラルネットワークを定義する
class SimpleNN(nn.Module):
    def __init__(self):
        super(SimpleNN, self).__init__()
        # ここに必要なレイヤーを定義する
        
    def forward(self, x):
        # ここにネットワークの順伝搬を記載する
        


# ここに学習用のコードを書く
def train_loop(model, train_loader, criterion, optimizer, device, epochs=100):
    # 学習ループの中身を書く

# ここに推論用のコードを書く
def evaluate_loop(model, X_test, y_test, device):
    model.eval()
    with torch.no_grad(): # 勾配計算を無効化
        X_test, y_test = X_test.to(device), y_test.to(device)
        outputs = nn.functional.softmax(model(X_test), dim=-1)
        _, predicted = torch.max(outputs, 1)
    print(classification_report(y_test.cpu().numpy(), predicted.cpu().numpy(), target_names=['Class 0', 'Class 1'], digits=3))
    print(f"予測値: \n クラス0      クラス1\n{np.array2string(outputs[:5].cpu().detach().numpy(), suppress_small=True)}")

def main():
    X_train, y_train, X_test, y_test = generate_data()
    print_class_distribution(y_train, y_test)
    train_dataloader = get_dataloader(X_train, y_train)
    
    device = torch.device("cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu")
    model = SimpleNN()
    model.to(device)
    criterion = nn.NLLLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
    
    train_loop(model, train_dataloader, criterion, optimizer, device, epochs=100)
    evaluate_loop(model, X_test, y_test, device)

if __name__ == "__main__":
    main()

Main関数の概要

データの生成 (generate_data):

  • generate_data関数を呼び出し、ダミーデータを作成します。このデータは10個の特徴量を持つサンプル1000個から構成され、特徴量の合計が5以上であればクラス1、そうでなければクラス0のラベルが付けられます。
  • このデータは、訓練データとテストデータに分割され、それぞれX_train, y_trainX_test, y_testに格納されます。

クラス分布の表示 (print_class_distribution):

  • 訓練データとテストデータにおける各クラスのサンプル数を確認し、その分布を表示します。これによりデータセットのバランスを確認できます。

データローダの準備 (get_dataloader):

  • X_trainy_trainTensorDatasetに変換し、DataLoaderを使ってミニバッチを作成します。これにより、訓練データが効率的にニューラルネットワークに供給されるようになります。

デバイスの設定:

  • 使用するデバイス(GPUまたはCPU)を確認し、PyTorchのdeviceを適切に設定します。これにより、トレーニングが可能な場合にはGPUを使用し、そうでなければCPUを使用します。

モデル、損失関数、最適化手法の準備:

  • SimpleNNというニューラルネットワークモデルをインスタンス化し、選択したデバイスに転送します。
  • 損失関数としてNLLLoss(負の対数尤度損失)を、最適化手法としてSGD(確率的勾配降下法)を使用します。

モデルのトレーニング (train_loop):

  • 訓練ループを開始します。指定したエポック数(ここでは100回)を通じて、モデルは訓練データを学習します。この部分の詳細な実装はtrain_loop関数に記載されています。

モデルの評価 (evaluate_loop):

  • 訓練後、テストデータを用いてモデルの評価を行います。evaluate_loop関数は、モデルの予測結果を計算し、classification_reportを使って精度、再現率、F1スコアなどを表示します。

ニューラルネットワークの定義

まずは、学習させたいニューラルネットワークを定義します。
今回は、3層ニューラルネットワークを作成します。

全結合層の定義

ニューラルネットワークと言っても実は色々と種類があるのですが、ここでは最も基本的な「全結合層」を使用します。全結合層は名前の通り、前と後ろのユニットが全て繋がっているネットワークです。ここで定義するネットワークには、必ず守らなければならない制約があります。
それは…「入力層の次元が10」・「出力層の次元が2」である必要があります。これは簡単な話で、ネットワークに入力するデータの特徴量(要素)が10個あり、ネットワークが判定するクラスが2個あるからです。
一方で、中間層の次元には特に制約はありませんので、好きな値に設定してみましょう。ここでは5次元に設定します。

ここで、注意しなければならないのが、レイヤーの定義では重み層を定義することになるため、1つ目は入力層なので10次元、2つ目は中間層なので5次元、3つ目は出力層なので2次元のようには定義できません。重み層を定義するので、最初のレイヤー1は入力が10次元、出力が5次元のself.layer1 = nn.Linear(10, 5)のような重み層として定義します。

では、SimpleNNクラスの__init__メソッドのsuper(SimpleNN, self).__init__()の後に、「入力層の次元が10」・「中間層の次元が5」・「出力層の次元が2」になるような重み層を定義してみましょう。

答え
# ここにニューラルネットワークを定義する
class SimpleNN(nn.Module):
    def __init__(self):
        super(SimpleNN, self).__init__()
        # ここに必要なレイヤーを定義する
        self.layer1 = nn.Linear(10, 5)
        self.layer2 = nn.Linear(5, 2)

    def forward(self, x):
        # ここにネットワークの順伝搬を記載する

おまけ:バイアス項

ニューラルネットワークの図を見ていると、前方のユニットと繋がっておらず、いきなり現れたバイアス項と書かれたユニットが登場したと思います。このバイアスですが、実は先ほど定義した重みそうには既に設定されています。nn.Linearにはバイアスを持つかどうかを設定できる引数があり、デフォルトでTrueになっているからです。もし、バイアスを無しにしたい場合はnn.Linear(10, 5, bias=False)のように設定することができます。

活性化関数の定義

重み層を定義できたら、次は活性化関数を定義してみましょう。
活性化関数といえば、「活性化関数は非線形な変換を行う関数で、ニューラルネットワークの各層に非線形変換を行うことで、線形変換だけでは解けない複雑なパターンを学習できるようにする」みたいな説明がどこかに出てくると思いますが、非線形な関数だと捉えておけばひとまず良いと思います。
活性化関数にもたくさん種類がありますが、詳しい説明をされている方の記事を参照してください。
ここでは、ReLU 関数を使いますので、nn.ReLU()を使って重み層と同じ箇所に定義してみてください。

答え
class SimpleNN(nn.Module):
    def __init__(self):
        super(SimpleNN, self).__init__()
        # ここに必要なレイヤーを定義する
        self.layer1 = nn.Linear(10, 5)
        self.layer2 = nn.Linear(5, 2)
        self.act_fn = nn.ReLU()

    def forward(self, x):
        # ここにネットワークの順伝搬を記載する

重み層の時とは違って、活性化関数はact_fn1 = nn.ReLU() act_fn2 = nn.ReLU()のように同じ種類の活性化関数を複数定義する必要はありません。なぜなら、重み層とは異なり、活性化関数(例: nn.ReLU())は学習するパラメータを持たず、単なる数値変換を行う関数だからです。そのため、複数の層で同じインスタンスを使い回しても、新たに定義しても、動作は変わりません。一方で、nn.Linear などの重み層は学習するパラメータを持つため、各層ごとに異なるインスタンスを定義する必要があります。

順伝搬の計算を定義

ここまでで、重み層・活性化関数の定義が完了しました。
あとは、ネットワークで実行される計算の順番を定義すれば、ニューラルネットワークの定義は完了です。
ニューラルネットワークでは、入力層で入力データを受け取り、1層目の重み→活性化関数→2層目の重み→活性化関数の順で進み、最後に出力層から結果が出力されるという流れだったと思います。
このような計算をforwardメソッドに定義していきます。forward(self, x)xは入力データです。例えば、重みと入力ベクトルを掛けてバイアスを足すというWx+bの計算はx = self.layer1(x)のようにすることで実行できます。また、活性化関数の計算はx = self.act_fn(x)で実行できます。
それでは、forwardメソッドにニューラルネットワークの計算を定義して、最終結果を返すコードを書いてください。ただし、出力層(一番最後)の活性化関数の計算はここではまだ定義しないでください。

答え
# ここにニューラルネットワークを定義する
class SimpleNN(nn.Module):
    def __init__(self):
        super(SimpleNN, self).__init__()
        # ここに必要なレイヤーを定義する
        self.layer1 = nn.Linear(10, 5)
        self.layer2 = nn.Linear(5, 2)
        self.act_fn = nn.ReLU()

    def forward(self, x):
        # ここにネットワークの順伝搬を記載する
        x = self.layer1(x)
        x = self.act_fn(x)
        x = self.layer2(x)
        # x = self.act_fn(x) 出力層の活性化関数はここでは定義しない
        return x

おまけ

nn.Sequentialを使用することで、レイヤー・活性化関数と順伝搬の流れを同時に定義できます。
ネットワークがシンプルで、各レイヤー・活性化関数の出力を次の入力に順番に渡すだけで良い時は、nn.Sequentialを使用すると簡単に書けます。
この回では扱いませんが、今後の回では各レイヤー・活性化関数の出力を整形したり、別の処理を行ってから次に渡すようなネットワークが登場します。そのような際には、上で挙げたようにforward部分で計算を定義する必要があります。

class SimpleNN(nn.Module):
    def __init__(self, device=None):
        super(SimpleNN, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(10, 5),
            nn.ReLU(),
            nn.Linear(5, 2)
        )

    def forward(self, x):
        return self.model(x)

ニューラクネットワークの学習を行うコードを作成

いよいよ、学習のコードを書いていきます。
このコードはtrain_loop関数の中に書いていきます。
まず、学習の前にしなければならないのはmodel.train()という設定です。この設定でモデルを学習モードにします。(学習モードの詳細に触れるとややこしくなるのでここでは触れません/興味のある方は調べてみてください)

def train_loop(model, train_loader, criterion, optimizer, device, epochs=100):
    model.train()

続いて、学習の基本の形…エポックのループとミニバッチのループを作成します。

まず、機械学習ではデータセットを使ってモデルを学習するわけですが、データセットを何回学習するかを設定することができます。小学生の漢字の勉強に例えると、漢字ドリル1冊が学習データセットであり、この漢字ドリルを何周解くか、という感じです。1周しかしないなら1エポック、3周するなら3エポックといった感じでエポック数というのを決めます。

次に登場するのが、ミニバッチという概念です。
先ほどの例で説明すると、漢字ドリルを1冊解く際に1日で全部解くことは稀だと思います。普通は1日3ページ解く、といった風に決めて何日もかけて1冊を解き終わると思います。この1日(回)あたりに解くページ数(データのサンプル数)のことをミニバッチサイズと言います。
つまり、学習データセットに含まれるサンプル数が160個あった際に、ミニバッチサイズ16で学習すると1エポック(1周)に10個のミニバッチがあるという風になります。モデルの学習はミニバッチ単位で行うので、1エポックあたり10回学習し、1回の学習には16個のサンプルを使用するという風に解釈できます。

ここで、「エポック=データセットを何周学習するか」という指標は直感的だと思いますが、なぜミニバッチにわざわざ分けるのか?という疑問を持った方もいると思います。これにはいくつかの理由があります。
まず最も単純な理由は、「1回でデータセット全部を学習するのは大変だから」という理由です。何が大変かというと、1回の学習に使用するサンプル数が増えるほど、メモリ使用量が大きくなるのが大変なのです。ニューラルネットワークの学習ではパラメータを更新する際に、「(多少乱暴に言うと)元のパラメータから勾配を引く」と言う計算をしていたと思います。この計算に必要な勾配情報を1回の学習が終わるまでメモリに保持しておく必要があるため、1回の学習に使用するサンプル数が多くなればなるほどメモリ消費量が増えてしまいます。

では、エポックとミニバッチのループ処理を以下のように作成します。
train_loaderはループ処理の中で、1バッチ分の入力データxと正解ラベルyを順番に取り出します。

def train_loop(model, train_loader, criterion, optimizer, device, epochs=100):
    model.train()
    for epoch in range(epochs):
        for batch_X, batch_y in train_loader:
       # ここに学習の中身を書く

学習ループの中身は以下のような処理を記述します。

  1. 取り出した入力データxと正解ラベルyをモデルと同じデバイスに転送します
  2. optimizer.zero_grad()でモデルのパラメータが持つ勾配情報をゼロにリセットします
  3. modelbatch_Xを入力として与えて、順伝播(forward)の出力をoutputsとして受け取ります
  4. nn.functional.log_softmaxをモデルの出力に適用してクラス分類の結果を0〜1の確率分布にします
  5. criterionにモデルの出力(確率分布)と正解ラベルbatch_yを与えて、損失(誤差)を計算します
  6. 計算した損失をloss.backward()で逆伝播(バックプロパゲーション)を実行し、各パラメータの勾配を計算します
  7. 計算した勾配情報を用いてoptimizer.step()でパラメータを更新します
答え

def train_loop(model, train_loader, criterion, optimizer, device, epochs=100):
    model.train()
    for epoch in range(epochs):
        for batch_X, batch_y in train_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(nn.functional.log_softmax(outputs, dim=-1), batch_y)
            loss.backward()
            optimizer.step()
        # 10エポックごとにLossを表示する
        if (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item():.4f}")

実行とその後の改良

ここまででコードは完成したので、実行してみましょう。
実行がうまくいったら、モデルのネットワーク構造やエポック数、optimizerなどを変更してみて分類性能を改善してみましょう。

発展①

以下のコードを導入し、Bag-of-Words(BoW)を用いてテキストのクラス分類を実行するコードに書き換えてみてください。

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import classification_report
from collections import Counter
from transformers import AutoTokenizer
import pandas as pd
import numpy as np

# BERTトークナイザと語彙取得
tokenizer = AutoTokenizer.from_pretrained("tohoku-nlp/bert-base-japanese-v3")
vocab = tokenizer.get_vocab()  # 語彙は {token: index} の辞書

# BoWベクトルを作成する関数(語彙は固定でtokenizerのものを使う)
def build_bow_features(texts, vocab):
    bow_vectors = []

    for text in texts:
        tokens = tokenizer.tokenize(text)
        vec = np.zeros(len(vocab), dtype=np.float32)
        for token in tokens:
            if token in vocab:
                vec[vocab[token]] += 1
        bow_vectors.append(vec)

    return np.array(bow_vectors)

# CSVファイルを読み込んでBoWとラベルを返す
def load_csv_bow(csv_path, vocab):
    label_mapping = {'a': 0, 'e': 1, 'm': 2}
    df = pd.read_csv(csv_path, header=None, names=["label", "text"])
    texts = df['text'].astype(str).tolist()
    labels = (df['label'].astype(str)).map(label_mapping).tolist()
    X_bow = build_bow_features(texts, vocab)
    return torch.tensor(X_bow, dtype=torch.float32), torch.tensor(labels, dtype=torch.long)

Main関数の変更例は以下のようになります。
この他の主な変更箇所は、モデルのネットワークとtrain_loop、evaluate_loopの部分です。

def main():
    train_file = '../data/train.csv'
    dev_file = '../data/dev.csv'
    test_file = '../data/test.csv'

    # 語彙はすでに上で定義済み(tokenizer.get_vocab())
    print("Loading data...")
    X_train, y_train = load_csv_bow(train_file, vocab)
    X_dev, y_dev = load_csv_bow(dev_file, vocab)
    X_test, y_test = load_csv_bow(test_file, vocab)

    print_class_distribution(y_train, "Train")
    print_class_distribution(y_dev, "Dev")
    print_class_distribution(y_test, "Test")

    train_loader = get_dataloader(X_train, y_train)
    dev_loader = get_dataloader(X_dev, y_dev, batch_size=32, shuffle=False)
    test_loader = get_dataloader(X_test, y_test, batch_size=32, shuffle=False)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = SimpleNN(input_dim=X_train.shape[1]).to(device)
    criterion = nn.NLLLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

    print("Training model...")
    train_loop(model, train_loader, criterion, optimizer, device,
               epochs=20)

    
    print("Evaluating on Test set:")
    evaluate(model, test_loader, device)

発展②

早期終了(early stopping)と呼ばれる手法を導入してみましょう。

答え

# 早期終了の処理を関数にする
def early_stopping(model, X_dev, y_dev, criterion, patience=3, device='cpu'):
    best_loss = float('inf')
    counter = 0
    best_model_state = None

    model.eval()
    with torch.no_grad():
        # 検証損失を計算
        X_dev, y_dev = X_dev.to(device), y_dev.to(device)
        outputs = model(X_dev)
        val_loss = criterion(nn.functional.log_softmax(outputs, dim=-1), y_dev).item()

    if val_loss < best_loss:
        best_loss = val_loss
        best_model_state = model.state_dict()
        counter = 0
    else:
        counter += 1
        print(f"  🔸 No improvement. EarlyStopping counter: {counter}/{patience}")
        if counter >= patience:
            print("  ⏹️ Early stopping triggered.")
            return True, best_model_state

    return False, best_model_state

# 学習ループ(早期終了機能を関数化)
def train_loop(model, train_loader, criterion, optimizer, device,
               X_dev, y_dev, patience=3, epochs=20):
    best_loss = float('inf')
    best_model_state = None

    for epoch in range(epochs):
        model.train()
        total_loss = 0.0

        for batch_X, batch_y in train_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(nn.functional.log_softmax(outputs, dim=-1), batch_y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        avg_train_loss = total_loss / len(train_loader)

        # Early stoppingチェック
        stop, best_model_state = early_stopping(model, X_dev, y_dev, criterion, patience, device)
        
        print(f"Epoch {epoch+1}/{epochs} | Train Loss: {avg_train_loss:.4f}")

        if stop:
            model.load_state_dict(best_model_state)
            break