# -*- coding: utf-8 -*-
"""mnistds_optuna.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1WUpEzyGNdy9V2ff8gOhNOtsnmSuEazAy

https://colab.research.google.com/drive/1JIuTDS5JFXy21B-VG7fK5WDMkrWTyLhe#scrollTo=_qEl_XlRHaO3
"""

!pip install optuna

import copy

import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

import optuna

# Get cpu or gpu device for training.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")


transform = transforms.ToTensor()
training_data = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
test_data = datasets.MNIST(root='./data', train=False, download=True, transform=transform)

batch_size = 50

# Create data loaders. Shuffle the training set so each epoch sees the
# batches in a different order; the test set stays in a fixed order.
train_dataloader = DataLoader(training_data, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=batch_size)

for X, y in test_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

# Define model
class NeuralNetwork1(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28*28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

class NeuralNetwork2(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28*28, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits
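
# The two candidate networks differ only in the width of the second hidden
# layer (512 vs. 256), so NeuralNetwork2 is the smaller option. A quick
# sketch to compare their sizes (count_params is a helper defined here,
# not part of the original notebook):
def count_params(m: nn.Module) -> int:
    return sum(p.numel() for p in m.parameters())

print(f"NeuralNetwork1: {count_params(NeuralNetwork1()):,} parameters")
print(f"NeuralNetwork2: {count_params(NeuralNetwork2()):,} parameters")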


model1 = NeuralNetwork1().to(device)
print(model1)

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model1.parameters(), lr=1e-3)

def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            # Avoid shadowing the loss tensor; report samples seen so far.
            loss_val, current = loss.item(), (batch + 1) * len(X)
            print(f"loss: {loss_val:>7f}  [{current:>5d}/{size:>5d}]")

def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float32).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    return correct

def train_model(model, loss_fn, optimizer, epochs=10):
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    for t in range(epochs):
        print(f"Epoch {t+1}\n-------------------------------")
        train(train_dataloader, model, loss_fn, optimizer)
        epoch_acc = test(test_dataloader, model, loss_fn)
        if epoch_acc > best_acc:
            best_acc = epoch_acc
            best_model_wts = copy.deepcopy(model.state_dict())
    # Restore the weights from the best epoch so the model is left in the
    # state that achieved best_acc, not whatever the final epoch produced.
    model.load_state_dict(best_model_wts)
    print("Done!")
    return best_acc

best_acc = train_model(model1, loss_fn, optimizer, 5)

print(f'Best accuracy: {100*best_acc:6.3f}%')
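
# A minimal sketch of checkpointing the result: since train_model restores
# the best-epoch weights before returning, saving the state_dict here keeps
# the best model. The filename "mnist_model1.pt" is illustrative, not from
# the original notebook.
torch.save(model1.state_dict(), "mnist_model1.pt")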

def objective(trial):
    params = {
        "model": trial.suggest_categorical('model', ["model1", "model2"]),
        "optimizer_name": trial.suggest_categorical('optimizer_name', ["SGD", "Adam"]),
        # suggest_float(..., log=True) replaces the deprecated suggest_loguniform.
        "lr": trial.suggest_float('lr', 1e-4, 1e-2, log=True),
    }
    if params["model"] == "model1":
        model = NeuralNetwork1().to(device)
    else:
        model = NeuralNetwork2().to(device)
    # Look up the optimizer class by name (torch.optim.SGD or torch.optim.Adam).
    optimizer = getattr(
        torch.optim, params["optimizer_name"]
    )(model.parameters(), lr=params["lr"])
    best_acc = train_model(model, loss_fn, optimizer, epochs=5)
    return best_acc
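
# The MedianPruner configured below only acts on trials that report
# intermediate values. A minimal pruning-aware sketch (objective_with_pruning
# and its per-epoch loop are illustrative additions, not part of the original
# notebook): it reports test accuracy after each epoch and lets Optuna stop
# unpromising trials early.
def objective_with_pruning(trial):
    lr = trial.suggest_float('lr', 1e-4, 1e-2, log=True)
    model = NeuralNetwork1().to(device)
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)
    best_acc = 0.0
    for epoch in range(5):
        train(train_dataloader, model, loss_fn, optimizer)
        acc = test(test_dataloader, model, loss_fn)
        best_acc = max(best_acc, acc)
        trial.report(acc, epoch)   # per-epoch feedback for the pruner
        if trial.should_prune():   # median rule says this trial is lagging
            raise optuna.TrialPruned()
    return best_acc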

sampler = optuna.samplers.TPESampler()
# Note: the pruner only affects objectives that call trial.report(), such as
# the pruning-aware sketch above; the plain objective always runs all epochs.
pruner = optuna.pruners.MedianPruner(n_startup_trials=3, n_warmup_steps=5, interval_steps=3)

study = optuna.create_study(
    sampler=sampler,
    pruner=pruner,
    direction='maximize')
study.optimize(func=objective, n_trials=10)

print('Number of finished trials:', len(study.trials))
print('Best trial:', study.best_trial)
print('Best params:', study.best_params)
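
# A minimal sketch of making the study persistent so trials survive a runtime
# restart, assuming a local SQLite file (study_name and the .db path are
# illustrative, not from the original notebook):
persistent_study = optuna.create_study(
    study_name="mnist_optuna",
    storage="sqlite:///optuna_mnist.db",
    load_if_exists=True,
    sampler=sampler,
    pruner=pruner,
    direction='maximize',
)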

# The optuna.visualization plots return Plotly figures; Colab renders the
# figure when it is the last expression in a cell. In a plain script, assign
# the return value and call .show() on it.
optuna.visualization.plot_contour(study, params=['optimizer_name', 'model'])

optuna.visualization.plot_slice(study)

optuna.visualization.plot_param_importances(study)

optuna.visualization.plot_optimization_history(study)