# View file src/colab/universal_model.py - Download

# -*- coding: utf-8 -*-
"""universal_model.ipynb

Automatically generated by Colaboratory.

Original file is located at
    https://colab.research.google.com/drive/1wlODMxtNSnZH8DceZqGFpOts_lDRTqDW

Universal model

Artificial intelligence, and more specifically deep learning, uses neural networks with a particular structure called a model. The model specifies the number of layers and the connections between them.

Any model can be seen as a particular case of a universal model in which every neuron is connected to every other neuron, where the connections that do not exist in the model have a weight of zero.

Sources :

https://pytorch.org/tutorials/beginner/basics/buildmodel_tutorial.html

https://pytorch.org/tutorials/beginner/introyt/modelsyt_tutorial.html

Import modules
"""

import os
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

"""Get Device for Training

We want to be able to train our model on a hardware accelerator like the GPU or MPS, if available. Let’s check to see if torch.cuda or torch.backends.mps are available, otherwise we use the CPU.

"""

# Pick the best available backend: CUDA first, then Apple MPS, then the CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using {device} device")

"""Example of simple model with two linear layers and Sigmoid activation function"""

class TinyModel(torch.nn.Module):
    """Two-layer perceptron: Linear(3, 4) -> Sigmoid -> Linear(4, 2) -> Sigmoid."""

    def __init__(self):
        super().__init__()

        # Layers are created in the order linear1, activation, linear2 so the
        # parameter ordering and the random initialisation stay the same.
        self.linear1 = torch.nn.Linear(in_features=3, out_features=4)
        self.activation = torch.nn.Sigmoid()
        self.linear2 = torch.nn.Linear(in_features=4, out_features=2)

    def forward(self, x):
        """Map a ``(..., 3)`` tensor to a ``(..., 2)`` tensor of sigmoid outputs."""
        hidden = self.activation(self.linear1(x))
        return self.activation(self.linear2(hidden))

# Build the tiny model, then dump its structure and its randomly initialised
# parameters.
model = TinyModel()
print(model)
for param in model.parameters():
    print(param)

# Push a single random 3-feature sample through the network.
X = torch.rand(1, 3)
print(X)
Y = model(X)
print(Y)

"""Y1 = activation (W1 X + B1)

Y2 = activation (W2 Y1 + B2)

J code :



```

sigmoid =: 3 : 0
 1 % (1 + ^ -y)
)

W1 =: 0 0 $ 0
W1 =: W1,  0.3896 _0.1911  0.1587
W1 =: W1, _0.0114  0.2474  0.0941
W1 =: W1, _0.3643  0.4872 _0.3196
W1 =: W1,  0.1010  0.4664 _0.2655

B1 =: 0.2310 _0.0823 _0.3589 _0.1669

W2 =: 0 0 $ 0
W2 =: W2,  0.3852  0.3764  0.4004  0.3440
W2 =: W2,  0.3021 _0.4176  0.2904 _0.4338

B2 =: _0.1048 0.1486

X =: 0.4064 0.2629 0.7527

Y =: sigmoid B2 + W2 +/ . * sigmoid B1 + W1 +/ . * X
echo Y

```
Result : 0.649924 0.50661

Example of a simple recurrent model which can be used as universal model
"""

class RecurrentModel(torch.nn.Module):
    """One 4x4 linear layer applied twice, with a sigmoid after each pass."""

    def __init__(self):
        super().__init__()

        self.linear1 = torch.nn.Linear(in_features=4, out_features=4)
        self.activation = torch.nn.Sigmoid()

    def forward(self, x):
        """Run two passes of the shared layer over a ``(..., 4)`` tensor."""
        # Both passes reuse the same weights and bias.
        for _ in range(2):
            x = self.activation(self.linear1(x))
        return x

# Build the recurrent model, then dump its structure and its randomly
# initialised parameters.
rmodel = RecurrentModel()
print(rmodel)
for param in rmodel.parameters():
    print(param)

# Push a single random 4-feature sample through the network.
X = torch.rand(1, 4)
print(X)
Y = rmodel(X)
print(Y)

"""Y = sigmoid (B + W (sigmoid (B + W X)))

```
Wrm =: 0 0 $ 0
Wrm =: Wrm,  0.0832  0.3243  0.3291 _0.1202
Wrm =: Wrm, _0.2429 _0.4229 _0.0540 _0.0469
Wrm =: Wrm,  0.3151  0.3024  0.1318 _0.3459
Wrm =: Wrm, _0.1136 _0.0378  0.3753 _0.4263

Brm =: _0.4079 _0.4827 0.1531 _0.2981

Xrm =: 0.2864 0.9614 0.9418 0.5663

Yrm =: sigmoid Brm + Wrm +/ . * sigmoid Brm + Wrm +/ . * Xrm
echo Yrm

```
Result : 0.467941 0.314583 0.582492 0.419346

Simulate a two layer model with a recurrent model



```

sigmoid =: 3 : 0
 1 % (1 + ^ -y)
)

W1 =: 0 0 $ 0
W1 =: W1,  0.3896 _0.1911  0.1587
W1 =: W1, _0.0114  0.2474  0.0941
W1 =: W1, _0.3643  0.4872 _0.3196
W1 =: W1,  0.1010  0.4664 _0.2655

B1 =: 0.2310 _0.0823 _0.3589 _0.1669

W2 =: 0 0 $ 0
W2 =: W2,  0.3852  0.3764  0.4004  0.3440
W2 =: W2,  0.3021 _0.4176  0.2904 _0.4338

B2 =: _0.1048 0.1486

X =: 0.4064 0.2629 0.7527

Y =: sigmoid B2 + W2 +/ . * sigmoid B1 + W1 +/ . * X
echo Y

Wr =: ((3 3 $ 0),W1,(2 3 $ 0)),.((7 4 $ 0),W2),.(9 2 $ 0)
echo Wr

Br =: 0 0 0, B1, B2
echo Br

Xr =: X, (6 $ 0)
echo Xr

Yr =: sigmoid Br + Wr +/ . * sigmoid Br + Wr +/ . * Xr
echo Yr


```

Results :    



```

   Xr
0.4064 0.2629 0.7527 0 0 0 0 0 0
   Wr
      0       0       0      0       0      0       0 0 0
      0       0       0      0       0      0       0 0 0
      0       0       0      0       0      0       0 0 0
 0.3896 _0.1911  0.1587      0       0      0       0 0 0
_0.0114  0.2474  0.0941      0       0      0       0 0 0
_0.3643  0.4872 _0.3196      0       0      0       0 0 0
  0.101  0.4664 _0.2655      0       0      0       0 0 0
      0       0       0 0.3852  0.3764 0.4004   0.344 0 0
      0       0       0 0.3021 _0.4176 0.2904 _0.4338 0 0
   Br
0 0 0 0.231 _0.0823 _0.3589 _0.1669 _0.1048 0.1486
   Yr
0.5 0.5 0.5 0.600992 0.520676 0.387638 0.496013 0.649924 0.50661
   

```

Example of a layered network with MNIST data

```
Y1[512] = ReLU (W1[512,28*28] X[28*28] + B1[512])
Y2[512] = ReLU (W2[512,512] Y1[512] + B2[512])
Y[10] = W3[10,512] Y2[512] + B3[10]
```  

equivalent of

```
X' = X , 0 0 ... 0 (512+512+10 zeros)

             [28*28] [512] [512] [10]      
W' = [28*28]    0      0    0     0
     [512]      W1     0    0     0
     [512]      0      W2   0     0
     [10]       0      0    W3    0

B' = 0 ... 0 , B1 , B2 , B3

n = 28*28 + 512 + 512 + 10
Y'1[n] = ReLU (W'[n,n] X'[n] + B'[n])
Y'2[n] = ReLU (W'[n,n] Y'1[n] + B'[n])
Y' = W'[n,n] Y'2[n] + B'[n]
```
"""

import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.transforms import ToTensor
from torch.utils.data import TensorDataset

import numpy

# Rebind the device for this section: CUDA when available, otherwise CPU
# (unlike the selection at the top of the file, MPS is not considered here).
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

# MNIST train and test splits stored under ./data (download=True), with each
# image converted to a tensor by ToTensor.
transform = transforms.ToTensor()
training_data = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
test_data = datasets.MNIST(root='./data', train=False, download=True, transform=transform)

batch_size = 50

# Create data loaders. No shuffle argument is passed, so the DataLoader
# default ordering is used for both splits.
train_dataloader = DataLoader(training_data, batch_size=batch_size)
test_dataloader = DataLoader(test_data, batch_size=batch_size)

# Peek at the first test batch only, to report tensor shapes and label dtype.
for X, y in test_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

# Same device selection as above, repeated (the message is printed a second time).
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

# Define model
class NeuralNetwork(nn.Module):
    """Fully connected classifier: 784 -> 512 -> 512 -> 10 with ReLU in between."""

    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        # Positions 0, 2 and 4 hold the linear layers, so the state-dict keys
        # remain linear_relu_stack.{0,2,4}.{weight,bias}.
        layers = [
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
        ]
        self.linear_relu_stack = nn.Sequential(*layers)

    def forward(self, x):
        """Flatten ``x`` per sample and return the raw logits, ``(batch, 10)``."""
        return self.linear_relu_stack(self.flatten(x))

# Create the network and move it to the selected device; the optimizer below
# is built from the parameters of this already-placed instance.
model = NeuralNetwork().to(device)
print(model)

# Cross-entropy loss on the model's raw outputs, optimised with plain SGD.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

def train(dataloader, model, loss_fn, optimizer):
    """Run one optimisation pass over ``dataloader``, logging every 100th batch.

    Args:
        dataloader: yields ``(inputs, targets)`` batches and exposes ``.dataset``.
        model: module to train; it is put in training mode.
        loss_fn: callable ``loss_fn(predictions, targets)`` returning a tensor.
        optimizer: optimizer stepping the model's parameters.
    """
    n_samples = len(dataloader.dataset)
    model.train()
    for step, (inputs, targets) in enumerate(dataloader):
        inputs = inputs.to(device)
        targets = targets.to(device)

        # Forward pass and loss for this batch.
        batch_loss = loss_fn(model(inputs), targets)

        # Clear stale gradients, backpropagate, update the weights.
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        if step % 100 != 0:
            continue
        seen = step * len(inputs)
        print(f"loss: {batch_loss.item():>7f}  [{seen:>5d}/{n_samples:>5d}]")

def test(dataloader, model, loss_fn):
    """Evaluate ``model`` on ``dataloader`` and print accuracy and mean batch loss.

    Args:
        dataloader: yields ``(inputs, targets)`` batches and exposes ``.dataset``.
        model: module to evaluate; it is put in evaluation mode.
        loss_fn: callable ``loss_fn(predictions, targets)`` returning a tensor.
    """
    n_samples = len(dataloader.dataset)
    n_batches = len(dataloader)
    model.eval()

    loss_sum = 0
    hits = 0
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            logits = model(inputs)
            loss_sum += loss_fn(logits, targets).item()
            # A sample counts as a hit when its highest-scoring class matches.
            hits += (logits.argmax(1) == targets).type(torch.float32).sum().item()

    mean_loss = loss_sum / n_batches
    accuracy = hits / n_samples
    print(f"Test Error: \n Accuracy: {(100*accuracy):>0.1f}%, Avg loss: {mean_loss:>8f} \n")

# Train for a fixed number of epochs, evaluating on the test split after each.
epochs = 10
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)
    test(test_dataloader, model, loss_fn)
print("Done!")

# Persist the learned weights, then restore them into a fresh network.
torch.save(model.state_dict(), "model.pth")
print("Saved PyTorch Model State to model.pth")

model = NeuralNetwork()
model.load_state_dict(torch.load("model.pth"))

# Labels are padded to five characters so the report lines up.
classes = [
    "Zero ", "One  ", "Two  ", "Three", "Four ",
    "Five ", "Six  ", "Seven", "Eight", "Nine ",
]

model.eval()

ngood = 0
nbad = 0

# Spot-check the first 100 test images, one at a time.
for i in range(100):
    x, y = test_data[i]
    with torch.no_grad():
        pred = model(x)
        predicted = classes[pred[0].argmax(0)]
        actual = classes[y]
        if predicted == actual:
            result = "Good"
            ngood += 1
        else:
            result = "Bad"
            nbad += 1
        print(f'Predicted: "{predicted}", Actual: "{actual}", Result: {result}')

print(f'{ngood} good, {nbad} bad')

"""Equivalent with universal model

"""

import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.transforms import ToTensor
from torch.utils.data import TensorDataset

import numpy

# Device for the universal-model experiment: CUDA when available, otherwise CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

# Reload the MNIST train and test splits from ./data (download=True), with
# each image converted to a tensor by ToTensor.
transform = transforms.ToTensor()
training_data = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
test_data = datasets.MNIST(root='./data', train=False, download=True, transform=transform)

batch_size = 50

# Create data loaders. No shuffle argument is passed, so the DataLoader
# default ordering is used for both splits.
train_dataloader = DataLoader(training_data, batch_size=batch_size)
test_dataloader = DataLoader(test_data, batch_size=batch_size)

# Peek at the first test batch only, to report tensor shapes and label dtype.
for X, y in test_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

# Same device selection as above, repeated (the message is printed a second time).
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

# Define model
class NeuralNetwork(nn.Module):
    """Universal-model form of the 784-512-512-10 network (ReLU variant).

    A single square ``Linear(n, n)`` layer, with ``n = 28*28 + 512 + 512 + 10``,
    is applied three times. The input occupies the first ``28*28`` positions
    of the state vector and the logits are read from the last 10.

    NOTE: a later class of the same name in this file rebinds ``NeuralNetwork``.
    """

    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        n = 28*28 + 512 + 512 + 10
        self.linear = nn.Linear(n, n)

    def forward(self, x):
        """Return logits of shape ``(batch, 10)`` for a batch of 28x28 images."""
        x = self.flatten(x)
        # Zero slots for the hidden and output neurons. new_zeros inherits
        # dtype and device from x, so the forward pass no longer depends on
        # the module-level `device` name matching where the input lives.
        z = x.new_zeros(x.shape[0], 512 + 512 + 10)
        x = torch.cat((x, z), 1)
        y = torch.relu(self.linear(x))
        y = torch.relu(self.linear(y))
        y = self.linear(y)
        # Only the trailing 10 positions are the class scores.
        return y[:, 28*28 + 512 + 512:]

# Define model
class NeuralNetwork(nn.Module):
    """Universal-model form of the 784-512-512-10 network (LeakyReLU variant).

    The state vector holds ``ninput`` input values, then ``ninterm`` hidden
    values, then ``noutput`` outputs. One square linear layer over the whole
    state is applied three times and the last ``noutput`` entries are the logits.
    """

    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.ninput = 28*28
        self.ninterm = 512 + 512
        self.noutput = 10
        self.ntotal = self.ninput + self.ninterm + self.noutput
        self.linear = nn.Linear(self.ntotal, self.ntotal)

    def forward(self, x):
        """Return logits of shape ``(batch, noutput)`` for a batch of images."""
        x = self.flatten(x)
        # Zero slots for the hidden and output neurons. new_zeros inherits
        # dtype and device from x, so the forward pass no longer depends on
        # the module-level `device` name matching where the input lives.
        z = x.new_zeros(x.shape[0], self.ninterm + self.noutput)
        x = torch.cat((x, z), 1)
        y = nn.functional.leaky_relu(self.linear(x), 0.01)
        y = nn.functional.leaky_relu(self.linear(y), 0.01)
        y = self.linear(y)
        # Only the trailing `noutput` positions are the class scores.
        return y[:, self.ninput + self.ninterm:]

# Create the universal-model network and move it to the selected device; the
# optimizer below is built from the parameters of this already-placed instance.
model = NeuralNetwork().to(device)
print(model)

# Cross-entropy loss on the model's raw outputs, optimised with plain SGD.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

def train(dataloader, model, loss_fn, optimizer):
    """Run one optimisation pass over ``dataloader``, logging every 100th batch.

    Args:
        dataloader: yields ``(inputs, targets)`` batches and exposes ``.dataset``.
        model: module to train; it is put in training mode.
        loss_fn: callable ``loss_fn(predictions, targets)`` returning a tensor.
        optimizer: optimizer stepping the model's parameters.
    """
    n_samples = len(dataloader.dataset)
    model.train()
    for step, (inputs, targets) in enumerate(dataloader):
        inputs = inputs.to(device)
        targets = targets.to(device)

        # Forward pass and loss for this batch.
        batch_loss = loss_fn(model(inputs), targets)

        # Clear stale gradients, backpropagate, update the weights.
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        if step % 100 != 0:
            continue
        seen = step * len(inputs)
        print(f"loss: {batch_loss.item():>7f}  [{seen:>5d}/{n_samples:>5d}]")

def test(dataloader, model, loss_fn):
    """Evaluate ``model`` on ``dataloader`` and print accuracy and mean batch loss.

    Args:
        dataloader: yields ``(inputs, targets)`` batches and exposes ``.dataset``.
        model: module to evaluate; it is put in evaluation mode.
        loss_fn: callable ``loss_fn(predictions, targets)`` returning a tensor.
    """
    n_samples = len(dataloader.dataset)
    n_batches = len(dataloader)
    model.eval()

    loss_sum = 0
    hits = 0
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            logits = model(inputs)
            loss_sum += loss_fn(logits, targets).item()
            # A sample counts as a hit when its highest-scoring class matches.
            hits += (logits.argmax(1) == targets).type(torch.float32).sum().item()

    mean_loss = loss_sum / n_batches
    accuracy = hits / n_samples
    print(f"Test Error: \n Accuracy: {(100*accuracy):>0.1f}%, Avg loss: {mean_loss:>8f} \n")

# Train for a fixed number of epochs, evaluating on the test split after each.
epochs = 10
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)
    test(test_dataloader, model, loss_fn)
print("Done!")

torch.save(model.state_dict(), "model.pth")
print("Saved PyTorch Model State to model.pth")

# The trained instance, which is already on `device`, is reused here rather
# than a fresh NeuralNetwork(); the saved weights are loaded back into it.
model.load_state_dict(torch.load("model.pth"))

classes = [
    "Zero ",
    "One  ",
    "Two  ",
    "Three",
    "Four ",
    "Five ",
    "Six  ",
    "Seven",
    "Eight",
    "Nine ",
]

model.eval()

ngood = 0
nbad = 0

# Spot-check the first 100 test images, one at a time.
for i in range(100):
    x, y = test_data[i]
    # x is already a tensor (ToTensor transform): move it with .to() instead
    # of re-wrapping it in torch.tensor(), which copies and emits a
    # UserWarning. The label stays a plain int, which indexes `classes`
    # directly.
    x = x.to(device)
    with torch.no_grad():
        pred = model(x)
        predicted, actual = classes[pred[0].argmax(0)], classes[y]
        if predicted == actual:
            result = "Good"
            ngood += 1
        else:
            result = "Bad"
            nbad += 1
        print(f'Predicted: "{predicted}", Actual: "{actual}", Result: {result}')

print(f'{ngood} good, {nbad} bad')

"""Other example : image classification

Original version :    

"""

from torchvision.datasets import CIFAR10
from torchvision.transforms import transforms
from torch.utils.data import DataLoader

import torch

# Device for the CIFAR10 experiment: CUDA when available, otherwise CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

# Loading and normalizing the data.
# Both splits share one pipeline: convert to a tensor, then normalise each of
# the three channels with mean 0.5 and standard deviation 0.5.
transformations = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# CIFAR10 has 50K training images; a batch size of 10 gives 5,000 batches per epoch.
batch_size = 10
number_of_labels = 10

# Training split. download=True: the dataset is fetched into ./data when it is
# not already present there.
train_set =CIFAR10(root="./data",train=True,transform=transformations,download=True)

# Loader for the training split, reshuffled (shuffle=True) and loaded in the
# main process (num_workers=0).
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=0)
# Batches times batch size: equals the dataset size only when the latter is a
# multiple of batch_size.
print("The number of images in a training set is: ", len(train_loader)*batch_size)

# Test split (train=False), fetched the same way.
test_set = CIFAR10(root="./data", train=False, transform=transformations, download=True)

# Loader for the test split; shuffle is disabled here.
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=0)
print("The number of images in a test set is: ", len(test_loader)*batch_size)

print("The number of batches per epoch is: ", len(train_loader))
# Human-readable names, indexed by the integer class label.
classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')


import torch
import torch.nn as nn
import torchvision
import torch.nn.functional as F

# Define a convolution neural network
class Network(nn.Module):
    """Small CNN for 3x32x32 images producing 10 class scores.

    Four 5x5 convolutions (padding 1), each followed by batch norm and ReLU,
    with one 2x2 max-pool after the second. For a 32x32 input the spatial
    size goes 32 -> 30 -> 28 -> (pool) 14 -> 12 -> 10, which is why the
    classifier takes ``24*10*10`` features.
    """

    def __init__(self):
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels=3, out_channels=12, kernel_size=5, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(12)
        self.conv2 = nn.Conv2d(in_channels=12, out_channels=12, kernel_size=5, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(12)
        self.pool = nn.MaxPool2d(2,2)
        self.conv4 = nn.Conv2d(in_channels=12, out_channels=24, kernel_size=5, stride=1, padding=1)
        self.bn4 = nn.BatchNorm2d(24)
        self.conv5 = nn.Conv2d(in_channels=24, out_channels=24, kernel_size=5, stride=1, padding=1)
        self.bn5 = nn.BatchNorm2d(24)
        self.fc1 = nn.Linear(24*10*10, 10)

    def forward(self, input):
        """Return raw class scores of shape ``(batch, 10)``.

        The parameter keeps its original name ``input`` (which shadows the
        builtin) so that keyword callers keep working.
        """
        # Move the batch to wherever this module's weights live, instead of
        # relying on the module-level `device` name agreeing with the model's
        # actual placement.
        input = input.to(self.conv1.weight.device)
        output = F.relu(self.bn1(self.conv1(input)))
        output = F.relu(self.bn2(self.conv2(output)))
        output = self.pool(output)
        output = F.relu(self.bn4(self.conv4(output)))
        output = F.relu(self.bn5(self.conv5(output)))
        # Flatten the 24x10x10 feature maps for the linear classifier.
        output = output.view(-1, 24*10*10)
        return self.fc1(output)

# Instantiate the network. It is not moved to a device here; train(),
# testAccuracy() and testBatch() each call model.to(...) themselves.
model = Network()


from torch.optim import Adam

# Cross-entropy loss for classification, optimised with Adam (learning rate
# 1e-3, weight decay 1e-4).
loss_fn = nn.CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=0.001, weight_decay=0.0001)


from torch.autograd import Variable

def saveModel():
    """Write the global model's state dict to ``./myFirstModel.pth``."""
    torch.save(model.state_dict(), "./myFirstModel.pth")

# Function to test the model with the test dataset and print the accuracy for the test images
def testAccuracy():
    """Return the global model's accuracy, in percent, over ``test_loader``.

    Moves the model to the module-level ``device`` and leaves it in eval mode.
    """
    model.to(device)
    model.eval()

    correct = 0.0
    seen = 0.0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            # torch.max over dim 1 yields (values, indices); the indices are
            # the predicted classes.
            predicted = torch.max(model(images).data, 1)[1]
            seen += labels.size(0)
            correct += (predicted == labels).sum().item()

    # Share of correct predictions over all test images, as a percentage.
    return 100 * correct / seen


# Training function. We simply have to loop over our data iterator and feed the inputs to the network and optimize.
def train(num_epochs):
    """Train the global ``model`` for ``num_epochs`` epochs, keeping the best checkpoint.

    Uses the module-level ``train_loader``, ``loss_fn`` and ``optimizer``.
    After each epoch the model is scored with ``testAccuracy()`` and saved
    through ``saveModel()`` whenever the accuracy improves on the best so far.

    Args:
        num_epochs: number of full passes over ``train_loader``.
    """
    best_accuracy = 0.0

    # Define your execution device
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print("The model will be running on", device, "device")
    # Convert model parameters and buffers to CPU or Cuda
    model.to(device)

    for epoch in range(num_epochs):  # loop over the dataset multiple times
        # testAccuracy() leaves the model in eval mode at the end of every
        # epoch; switch back so mode-dependent layers (e.g. batch norm) train
        # in every epoch, not just the first.
        model.train()
        running_loss = 0.0

        for i, (images, labels) in enumerate(train_loader, 0):

            # Tensors carry autograd state themselves, so the deprecated
            # Variable wrapper is not needed.
            images = images.to(device)
            labels = labels.to(device)

            # zero the parameter gradients
            optimizer.zero_grad()
            # predict classes using images from the training set
            outputs = model(images)
            # compute the loss based on model output and real labels
            loss = loss_fn(outputs, labels)
            # backpropagate the loss
            loss.backward()
            # adjust parameters based on the calculated gradients
            optimizer.step()

            running_loss += loss.item()     # extract the loss value
            # Report the mean loss of the last 1,000 batches, then reset it.
            if i % 1000 == 999:
                print('[%d, %5d] loss: %.3f' %
                      (epoch + 1, i + 1, running_loss / 1000))
                running_loss = 0.0

        # Accuracy for this epoch, measured over the whole test loader.
        accuracy = testAccuracy()
        print('For epoch', epoch+1,'the test accuracy over the whole test set is %d %%' % (accuracy))

        # we want to save the model if the accuracy is the best
        if accuracy > best_accuracy:
            saveModel()
            best_accuracy = accuracy


import matplotlib.pyplot as plt
import numpy as np

# Function to show the images
def imageshow(img):
    """Display a channel-first image tensor with matplotlib."""
    # Undo the mean-0.5 / std-0.5 normalisation applied by the dataset transform.
    restored = img / 2 + 0.5
    # matplotlib wants height x width x channels.
    as_hwc = restored.cpu().numpy().transpose(1, 2, 0)
    plt.imshow(as_hwc)
    plt.show()


# Function to test the model with a batch of images and show the labels predictions
def testBatch():
    """Show the first test batch with its true and predicted labels.

    Uses the module-level ``model``, ``test_loader``, ``classes`` and
    ``device``. The model is moved to ``device`` and left in eval mode.
    """
    model.to(device)
    # Inference only: a freshly constructed model starts in training mode,
    # where mode-dependent layers (e.g. batch norm) use per-batch statistics.
    model.eval()

    # get batch of images from the test DataLoader
    images, labels = next(iter(test_loader))
    images = images.to(device)
    labels = labels.to(device)

    # show all images as one image grid
    imageshow(torchvision.utils.make_grid(images))

    # Iterate over the batch itself rather than range(batch_size), so a batch
    # shorter than the global batch_size cannot index out of range.
    print('Real labels: ', ' '.join('%5s' % classes[label]
                               for label in labels))

    # No gradients are needed just to read off predictions.
    with torch.no_grad():
        outputs = model(images)

    # One score per class; the index of the highest score is the prediction.
    _, predicted = torch.max(outputs, 1)

    print('Predicted: ', ' '.join('%5s' % classes[index]
                              for index in predicted))


if __name__ == "__main__":

    # Train for 5 epochs; train() saves a checkpoint whenever test accuracy improves.
    train(5)
    print('Finished Training')

    # NOTE(review): the accuracy returned by testAccuracy() is discarded here;
    # the commented-out testModelAccuracy() is not defined in the visible
    # part of this file.
    # testModelAccuracy()
    testAccuracy()

    # Rebuild the network and restore the saved checkpoint into it (the same
    # file that saveModel() writes as "./myFirstModel.pth").
    model = Network()
    path = "myFirstModel.pth"
    model.load_state_dict(torch.load(path))

    # Show one test batch with real and predicted labels.
    testBatch()

"""Image classification, version with universal model :    

"""

from torchvision.datasets import CIFAR10
from torchvision.transforms import transforms
from torch.utils.data import DataLoader

import torch

# Device for the universal-model CIFAR10 experiment: CUDA when available, otherwise CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

# Loading and normalizing the data.
# Both splits share one pipeline: convert to a tensor, then normalise each of
# the three channels with mean 0.5 and standard deviation 0.5.
transformations = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# CIFAR10 has 50K training images; a batch size of 10 gives 5,000 batches per epoch.
batch_size = 10
number_of_labels = 10

# Training split. download=True: the dataset is fetched into ./data when it is
# not already present there.
train_set =CIFAR10(root="./data",train=True,transform=transformations,download=True)

# Loader for the training split, reshuffled (shuffle=True) and loaded in the
# main process (num_workers=0).
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=0)
# Batches times batch size: equals the dataset size only when the latter is a
# multiple of batch_size.
print("The number of images in a training set is: ", len(train_loader)*batch_size)

# Test split (train=False), fetched the same way.
test_set = CIFAR10(root="./data", train=False, transform=transformations, download=True)

# Loader for the test split; shuffle is disabled here.
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=0)
print("The number of images in a test set is: ", len(test_loader)*batch_size)

print("The number of batches per epoch is: ", len(train_loader))
# Human-readable names, indexed by the integer class label.
classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')


import torch
import torch.nn as nn
import torchvision
import torch.nn.functional as F

# Define a convolution neural network
class Network1(nn.Module):
    """Convolutional reference network for 3x32x32 images (10 class scores).

    Four 5x5 convolutions (padding 1), each followed by batch norm and ReLU,
    with one 2x2 max-pool after the second. For a 32x32 input the spatial
    size goes 32 -> 30 -> 28 -> (pool) 14 -> 12 -> 10, which is why the
    classifier takes ``24*10*10`` features.
    """

    def __init__(self):
        # Zero-argument super(): the previous super(Network, self) named a
        # different class, so constructing Network1 raised TypeError.
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels=3, out_channels=12, kernel_size=5, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(12)
        self.conv2 = nn.Conv2d(in_channels=12, out_channels=12, kernel_size=5, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(12)
        self.pool = nn.MaxPool2d(2,2)
        self.conv4 = nn.Conv2d(in_channels=12, out_channels=24, kernel_size=5, stride=1, padding=1)
        self.bn4 = nn.BatchNorm2d(24)
        self.conv5 = nn.Conv2d(in_channels=24, out_channels=24, kernel_size=5, stride=1, padding=1)
        self.bn5 = nn.BatchNorm2d(24)
        self.fc1 = nn.Linear(24*10*10, 10)

    def forward(self, input):
        """Return raw class scores of shape ``(batch, 10)``.

        The parameter keeps its original name ``input`` (which shadows the
        builtin) so that keyword callers keep working.
        """
        # Move the batch to wherever this module's weights live, instead of
        # relying on the module-level `device` name.
        input = input.to(self.conv1.weight.device)
        output = F.relu(self.bn1(self.conv1(input)))
        output = F.relu(self.bn2(self.conv2(output)))
        output = self.pool(output)
        output = F.relu(self.bn4(self.conv4(output)))
        output = F.relu(self.bn5(self.conv5(output)))
        # Flatten the 24x10x10 feature maps for the linear classifier.
        output = output.view(-1, 24*10*10)
        return self.fc1(output)

# Define model
class Network(nn.Module):
    """Universal-model classifier for 3x32x32 images.

    The state vector has ``ntotal = ninput + noutput + 512`` entries. The
    flattened image fills the first ``ninput`` positions and the class scores
    are read from the last ``noutput``. One square linear layer over the
    whole state is applied five times, with ReLU after each of the first four.
    """

    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.ninput = 3*32*32
        self.noutput = 10
        self.ntotal = self.ninput + self.noutput + 512
        print(f"ntotal = {self.ntotal}")
        self.linear = nn.Linear(self.ntotal, self.ntotal)

    def forward(self, x):
        """Return class scores of shape ``(batch, noutput)`` for image batch ``x``."""
        x = self.flatten(x)
        # Zero slots for every non-input neuron. new_zeros inherits dtype and
        # device from x, so the forward pass no longer depends on the
        # module-level `device` name matching where the input lives.
        z = x.new_zeros(x.shape[0], self.ntotal - self.ninput)
        y = torch.cat((x, z), 1)
        # Four shared-weight passes with ReLU, then a final linear pass.
        for _ in range(4):
            y = torch.relu(self.linear(y))
        y = self.linear(y)
        # Only the trailing `noutput` positions are the class scores.
        return y[:, self.ntotal - self.noutput:]

# Instantiate the network. It is not moved to a device here; train(),
# testAccuracy() and testBatch() each call model.to(...) themselves.
model = Network()


from torch.optim import Adam

# Cross-entropy loss for classification, optimised with Adam (learning rate
# 1e-3, weight decay 1e-4).
loss_fn = nn.CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=0.001, weight_decay=0.0001)


from torch.autograd import Variable

def saveModel():
    """Write the global model's state dict to ``./myFirstModel.pth``."""
    torch.save(model.state_dict(), "./myFirstModel.pth")

# Function to test the model with the test dataset and print the accuracy for the test images
def testAccuracy():
    """Return the global model's accuracy, in percent, over ``test_loader``.

    Moves the model to the module-level ``device`` and leaves it in eval mode.
    """
    model.to(device)
    model.eval()

    correct = 0.0
    seen = 0.0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            # torch.max over dim 1 yields (values, indices); the indices are
            # the predicted classes.
            predicted = torch.max(model(images).data, 1)[1]
            seen += labels.size(0)
            correct += (predicted == labels).sum().item()

    # Share of correct predictions over all test images, as a percentage.
    return 100 * correct / seen


# Training function. We simply have to loop over our data iterator and feed the inputs to the network and optimize.
def train(num_epochs):
    """Train the global ``model`` for ``num_epochs`` epochs, keeping the best checkpoint.

    Uses the module-level ``train_loader``, ``loss_fn`` and ``optimizer``.
    After each epoch the model is scored with ``testAccuracy()`` and saved
    through ``saveModel()`` whenever the accuracy improves on the best so far.

    Args:
        num_epochs: number of full passes over ``train_loader``.
    """
    best_accuracy = 0.0

    # Define your execution device
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print("The model will be running on", device, "device")
    # Convert model parameters and buffers to CPU or Cuda
    model.to(device)

    for epoch in range(num_epochs):  # loop over the dataset multiple times
        # testAccuracy() leaves the model in eval mode at the end of every
        # epoch; switch back to training mode before each new pass.
        model.train()
        running_loss = 0.0

        for i, (images, labels) in enumerate(train_loader, 0):

            # Tensors carry autograd state themselves, so the deprecated
            # Variable wrapper is not needed.
            images = images.to(device)
            labels = labels.to(device)

            # zero the parameter gradients
            optimizer.zero_grad()
            # predict classes using images from the training set
            outputs = model(images)
            # compute the loss based on model output and real labels
            loss = loss_fn(outputs, labels)
            # backpropagate the loss
            loss.backward()
            # adjust parameters based on the calculated gradients
            optimizer.step()

            running_loss += loss.item()     # extract the loss value
            # Report the mean loss of the last 1,000 batches, then reset it.
            if i % 1000 == 999:
                print('[%d, %5d] loss: %.3f' %
                      (epoch + 1, i + 1, running_loss / 1000))
                running_loss = 0.0

        # Accuracy for this epoch, measured over the whole test loader.
        accuracy = testAccuracy()
        print('For epoch', epoch+1,'the test accuracy over the whole test set is %d %%' % (accuracy))

        # we want to save the model if the accuracy is the best
        if accuracy > best_accuracy:
            saveModel()
            best_accuracy = accuracy


import matplotlib.pyplot as plt
import numpy as np

# Function to show the images
def imageshow(img):
    """Display a channel-first image tensor with matplotlib."""
    # Undo the mean-0.5 / std-0.5 normalisation applied by the dataset transform.
    restored = img / 2 + 0.5
    # matplotlib wants height x width x channels.
    as_hwc = restored.cpu().numpy().transpose(1, 2, 0)
    plt.imshow(as_hwc)
    plt.show()


# Function to test the model with a batch of images and show the labels predictions
def testBatch():
    """Show the first test batch with its true and predicted labels.

    Uses the module-level ``model``, ``test_loader``, ``classes`` and
    ``device``. The model is moved to ``device`` and left in eval mode.
    """
    model.to(device)
    # Inference only: a freshly constructed model starts in training mode.
    model.eval()

    # get batch of images from the test DataLoader
    images, labels = next(iter(test_loader))
    images = images.to(device)
    labels = labels.to(device)

    # show all images as one image grid
    imageshow(torchvision.utils.make_grid(images))

    # Iterate over the batch itself rather than range(batch_size), so a batch
    # shorter than the global batch_size cannot index out of range.
    print('Real labels: ', ' '.join('%5s' % classes[label]
                               for label in labels))

    # No gradients are needed just to read off predictions.
    with torch.no_grad():
        outputs = model(images)

    # One score per class; the index of the highest score is the prediction.
    _, predicted = torch.max(outputs, 1)

    print('Predicted: ', ' '.join('%5s' % classes[index]
                              for index in predicted))


if __name__ == "__main__":

    # Train for 5 epochs; train() saves a checkpoint whenever test accuracy improves.
    train(5)
    print('Finished Training')

    # NOTE(review): the accuracy returned by testAccuracy() is discarded here;
    # the commented-out testModelAccuracy() is not defined in the visible
    # part of this file.
    # testModelAccuracy()
    testAccuracy()

    # Rebuild the network and restore the saved checkpoint into it (the same
    # file that saveModel() writes as "./myFirstModel.pth").
    model = Network()
    path = "myFirstModel.pth"
    model.load_state_dict(torch.load(path))

    # Show one test batch with real and predicted labels.
    testBatch()

"""Other example : GAN (generative adversarial network) generating fake handwritten digits.

Original version :    

"""

import numpy as np
import matplotlib.pyplot as plt
import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.utils import make_grid
from tqdm import tqdm


# Configuration
epochs      = 100
batch_size  = 64
sample_size = 100    # Number of random values to sample
g_lr        = 1.0e-4 # Generator's learning rate
d_lr        = 1.0e-4 # Discriminator's learning rate


# DataLoader for MNIST (training split, images converted to tensors).
transform = transforms.ToTensor()
dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
# drop_last=True discards a trailing incomplete batch, so every batch has
# exactly batch_size images, matching the fixed-size target tensors created
# further down.
dataloader = DataLoader(dataset, batch_size=batch_size, drop_last=True)


# Generator Network
class Generator(nn.Sequential):
    """Two-layer MLP that turns random noise into 1x28x28 greyscale images.

    Layers: ``Linear(sample_size, 128) -> LeakyReLU(0.01) ->
    Linear(128, 784) -> Sigmoid``. The sigmoid keeps every pixel in [0, 1].

    Unlike a regular ``nn.Sequential``, ``forward`` takes the number of images
    to produce and samples the noise itself.

    Args:
        sample_size: Length of the noise vector fed to the first layer.
    """

    def __init__(self, sample_size: int):
        super().__init__(
            nn.Linear(sample_size, 128),
            nn.LeakyReLU(0.01),
            nn.Linear(128, 784),
            nn.Sigmoid())

        # Random value vector size
        self.sample_size = sample_size

    def forward(self, batch_size: int) -> torch.Tensor:
        """Sample noise and generate ``batch_size`` images.

        Args:
            batch_size: Number of images to generate.

        Returns:
            Tensor of shape ``(batch_size, 1, 28, 28)``.
        """
        # Sample the noise on the same device and dtype as the weights so the
        # generator keeps working after ``.to(device)`` or ``.double()``.
        reference = self[0].weight
        z = torch.randn(batch_size, self.sample_size,
                        device=reference.device, dtype=reference.dtype)

        # Generator output: (batch_size, 784)
        output = super().forward(z)

        # Convert the output into a greyscale image (1x28x28)
        generated_images = output.reshape(batch_size, 1, 28, 28)
        return generated_images


# Discriminator Network
class Discriminator(nn.Sequential):
    """Two-layer MLP scoring flattened 28x28 images as real or fake.

    Layers: ``Linear(784, 128) -> LeakyReLU(0.01) -> Linear(128, 1)``.
    The network outputs one raw logit per image; ``forward`` turns those
    logits directly into a loss against the supplied targets.
    """

    def __init__(self):
        layers = (
            nn.Linear(784, 128),
            nn.LeakyReLU(0.01),
            nn.Linear(128, 1),
        )
        super().__init__(*layers)

    def forward(self, images: torch.Tensor, targets: torch.Tensor):
        """Return the binary cross-entropy (with logits) of ``images`` vs ``targets``.

        Args:
            images: Image batch; reshaped to ``(-1, 784)`` before the layers.
            targets: Target values compared against the logits.
        """
        flattened = images.reshape(-1, 784)
        logits = super().forward(flattened)
        return F.binary_cross_entropy_with_logits(logits, targets)


# To save images in grid layout
def save_image_grid(epoch: int, images: torch.Tensor, ncol: int):
    """Tile a batch of images into one grid, save it as a JPEG, and sometimes show it.

    The figure is always written to ``generated_<epoch, 3 digits>.jpg`` in the
    current directory and is additionally displayed when ``epoch % 10 == 9``.

    Args:
        epoch: Epoch index used for the file name and the display decision.
        images: Batch of images handed to ``make_grid``.
        ncol: Passed as the second positional argument of ``make_grid``.
    """
    # Grid -> channel-last layout -> CPU NumPy array, ready for imshow
    grid_array = make_grid(images, ncol).permute(1, 2, 0).cpu().numpy()

    plt.imshow(grid_array)
    plt.xticks([])
    plt.yticks([])
    plt.savefig(f'generated_{epoch:03d}.jpg')

    # Only display on epochs 9, 19, 29, ...
    show_figure = epoch % 10 == 9
    if show_figure:
        plt.show()
    plt.close()


# Real and fake labels
# Fixed (batch_size, 1) tensors reused for every batch: ones mark a batch as
# real, zeros as fake. This relies on the DataLoader yielding only full
# batches (drop_last=True).
real_targets = torch.ones(batch_size, 1)
fake_targets = torch.zeros(batch_size, 1)


# Generator and Discriminator networks
generator = Generator(sample_size)
discriminator = Discriminator()


# Optimizers
# One Adam optimizer per network, so each step() updates only that network's
# parameters.
d_optimizer = torch.optim.Adam(discriminator.parameters(), lr=d_lr)
g_optimizer = torch.optim.Adam(generator.parameters(), lr=g_lr)


# Training loop
# Each batch performs one discriminator update followed by one generator
# update; after every epoch the mean losses are printed and a grid of freshly
# generated images is saved.
for epoch in range(epochs):

    d_losses = []
    g_losses = []

    for images, labels in tqdm(dataloader):

        #===============================
        # Discriminator Network Training
        #===============================

        # Loss with MNIST image inputs and real_targets as labels
        discriminator.train()
        d_loss = discriminator(images, real_targets)

        # Generate images in eval mode; no_grad keeps this pass out of the
        # autograd graph so the discriminator update cannot reach the generator
        generator.eval()
        with torch.no_grad():
            generated_images = generator(batch_size)

        # Loss with generated image inputs and fake_targets as labels
        d_loss += discriminator(generated_images, fake_targets)

        # Optimizer updates the discriminator parameters
        d_optimizer.zero_grad()
        d_loss.backward()
        d_optimizer.step()

        #===============================
        # Generator Network Training
        #===============================

        # Generate images in train mode
        generator.train()
        generated_images = generator(batch_size)

        # Loss with generated image inputs and real_targets as labels:
        # the generator is rewarded when its images are scored as real
        discriminator.eval() # eval but we still need gradients
        g_loss = discriminator(generated_images, real_targets)

        # Optimizer updates the generator parameters
        g_optimizer.zero_grad()
        g_loss.backward()
        g_optimizer.step()

        # Keep losses for logging
        d_losses.append(d_loss.item())
        g_losses.append(g_loss.item())

    # Print average losses
    print(epoch, np.mean(d_losses), np.mean(g_losses))

    # Save images. The samples are only plotted, so generate them without
    # autograd recording: no unused graph is built and the tensor handed to
    # save_image_grid does not require grad.
    with torch.no_grad():
        generated_images = generator(batch_size)
    save_image_grid(epoch, generated_images, ncol=8)

"""Version vith universal models :    

"""

import numpy as np
import matplotlib.pyplot as plt
import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.utils import make_grid
from tqdm import tqdm


# Configuration: module-level hyperparameters read by the GAN script below
epochs      = 100    # Number of passes over the dataloader
batch_size  = 64     # Images per batch; also the row count of the target tensors
sample_size = 100    # Number of random values to sample
g_lr        = 1.0e-4 # Generator's learning rate
d_lr        = 1.0e-4 # Discriminator's learning rate


# DataLoader for MNIST
# Images are converted to tensors by ToTensor; the training split is
# downloaded into ./data when it is not already there.
transform = transforms.ToTensor()
dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
# drop_last=True discards a final incomplete batch, so every batch has exactly
# batch_size images and matches the fixed-size target tensors built later.
# NOTE(review): shuffle is not enabled, so batches come in dataset order on
# every epoch -- confirm this is intended.
dataloader = DataLoader(dataset, batch_size=batch_size, drop_last=True)

# Get cpu or gpu device for training.
# NOTE(review): in the rest of this script `device` is only printed; the
# models, targets and batches are never moved to it -- confirm whether GPU
# training was intended.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

# Generator Network
class Generator(nn.Sequential):
    """Generator written as a "universal model": one square linear map applied twice.

    It replaces the layered generator
    ``Linear(sample_size, 128) -> LeakyReLU(0.01) -> Linear(128, 784) -> Sigmoid``.
    All ``ntotal = ninput + ninterm + noutput`` neurons share a single
    ``ntotal x ntotal`` linear layer. The noise fills the first ``ninput``
    slots of the state vector and the remaining slots start at zero. The same
    linear layer is applied twice (LeakyReLU after the first pass, Sigmoid
    after the second) and the last ``noutput`` slots are read out as pixels.

    No child modules are passed to ``nn.Sequential``; ``forward`` is fully
    overridden and takes the number of images to generate.

    Args:
        sample_size: Length of the noise vector (``ninput``).
    """

    def __init__(self, sample_size: int):
        super().__init__()
        self.ninput = sample_size
        self.ninterm = 128
        self.noutput = 784
        self.ntotal = self.ninput + self.ninterm + self.noutput
        self.linear = nn.Linear(self.ntotal, self.ntotal)

        # Random value vector size
        self.sample_size = sample_size

    def forward(self, batch_size: int) -> torch.Tensor:
        """Sample noise and generate ``batch_size`` images.

        Args:
            batch_size: Number of images to generate.

        Returns:
            Tensor of shape ``(batch_size, 1, 28, 28)`` with values in [0, 1].
        """
        # Create the noise and the zero padding on the weights' device and
        # dtype so the model keeps working after ``.to(device)``/``.double()``.
        weight = self.linear.weight
        noise = torch.randn(batch_size, self.sample_size,
                            device=weight.device, dtype=weight.dtype)
        padding = noise.new_zeros(batch_size, self.ninterm + self.noutput)

        # State vector layout: [input | intermediate | output]
        state = torch.cat((noise, padding), 1)

        # Two passes through the same linear map, mirroring the two layers of
        # the layered model; functional activations avoid building a new
        # module object on every call.
        state = nn.functional.leaky_relu(self.linear(state), 0.01)
        state = torch.sigmoid(self.linear(state))

        # Keep only the output neurons
        output = state[:, self.ninput + self.ninterm:]

        # Convert the output into a greyscale image (1x28x28)
        generated_images = output.reshape(batch_size, 1, 28, 28)
        return generated_images


# Discriminator Network
class Discriminator(nn.Sequential):
    """Discriminator written as a "universal model": one square linear map applied twice.

    It replaces the layered discriminator
    ``Linear(784, 128) -> LeakyReLU(0.01) -> Linear(128, 1)``.
    All ``ntotal = ninput + ninterm + noutput`` neurons share a single
    ``ntotal x ntotal`` linear layer. The flattened image fills the first
    ``ninput`` slots of the state vector and the remaining slots start at
    zero. The same linear layer is applied twice with a LeakyReLU in between
    and the last ``noutput`` slot is read out as the logit.

    No child modules are passed to ``nn.Sequential``; ``forward`` is fully
    overridden and returns a loss rather than the raw prediction.
    """

    def __init__(self):
        super().__init__()
        self.ninput = 784
        self.ninterm = 128
        self.noutput = 1
        self.ntotal = self.ninput + self.ninterm + self.noutput
        self.linear = nn.Linear(self.ntotal, self.ntotal)

    def forward(self, images: torch.Tensor, targets: torch.Tensor):
        """Return the binary cross-entropy (with logits) of ``images`` vs ``targets``.

        Args:
            images: Image batch; reshaped to ``(-1, ninput)``.
            targets: Target values compared against the ``(N, noutput)`` logits.

        Returns:
            The loss computed by ``F.binary_cross_entropy_with_logits``.
        """
        pixels = images.reshape(-1, self.ninput)

        # Zero padding for the intermediate and output neurons, created on the
        # same device and dtype as the images so nothing is pinned to the CPU.
        padding = pixels.new_zeros(pixels.shape[0], self.ninterm + self.noutput)

        # State vector layout: [input | intermediate | output]
        state = torch.cat((pixels, padding), 1)

        # Two passes through the same linear map, mirroring the two layers of
        # the layered model; no activation after the second pass (raw logits).
        state = F.leaky_relu(self.linear(state), 0.01)
        state = self.linear(state)

        # Keep only the output neuron(s)
        prediction = state[:, self.ninput + self.ninterm:]

        loss = F.binary_cross_entropy_with_logits(prediction, targets)
        return loss


# To save images in grid layout
def save_image_grid(epoch: int, images: torch.Tensor, ncol: int):
    """Tile a batch of images into one grid, save it as a JPEG, and sometimes show it.

    The figure is always written to ``generated_<epoch, 3 digits>.jpg`` in the
    current directory and is additionally displayed when ``epoch % 10 == 9``.

    Args:
        epoch: Epoch index used for the file name and the display decision.
        images: Batch of images handed to ``make_grid``.
        ncol: Passed as the second positional argument of ``make_grid``.
    """
    # Grid -> channel-last layout -> CPU NumPy array, ready for imshow
    grid_array = make_grid(images, ncol).permute(1, 2, 0).cpu().numpy()

    plt.imshow(grid_array)
    plt.xticks([])
    plt.yticks([])
    plt.savefig(f'generated_{epoch:03d}.jpg')

    # Only display on epochs 9, 19, 29, ...
    show_figure = epoch % 10 == 9
    if show_figure:
        plt.show()
    plt.close()


# Real and fake labels
# Fixed (batch_size, 1) tensors reused for every batch: ones mark a batch as
# real, zeros as fake. This relies on the DataLoader yielding only full
# batches (drop_last=True).
real_targets = torch.ones(batch_size, 1)
fake_targets = torch.zeros(batch_size, 1)


# Generator and Discriminator networks
generator = Generator(sample_size)
discriminator = Discriminator()


# Optimizers
# One Adam optimizer per network, so each step() updates only that network's
# parameters.
d_optimizer = torch.optim.Adam(discriminator.parameters(), lr=d_lr)
g_optimizer = torch.optim.Adam(generator.parameters(), lr=g_lr)


# Training loop
# Each batch performs one discriminator update followed by one generator
# update; after every epoch the mean losses are printed and a grid of freshly
# generated images is saved.
for epoch in range(epochs):

    d_losses = []
    g_losses = []

    for images, labels in tqdm(dataloader):

        #===============================
        # Discriminator Network Training
        #===============================

        # Loss with MNIST image inputs and real_targets as labels
        discriminator.train()
        d_loss = discriminator(images, real_targets)

        # Generate images in eval mode; no_grad keeps this pass out of the
        # autograd graph so the discriminator update cannot reach the generator
        generator.eval()
        with torch.no_grad():
            generated_images = generator(batch_size)

        # Loss with generated image inputs and fake_targets as labels
        d_loss += discriminator(generated_images, fake_targets)

        # Optimizer updates the discriminator parameters
        d_optimizer.zero_grad()
        d_loss.backward()
        d_optimizer.step()

        #===============================
        # Generator Network Training
        #===============================

        # Generate images in train mode
        generator.train()
        generated_images = generator(batch_size)

        # Loss with generated image inputs and real_targets as labels:
        # the generator is rewarded when its images are scored as real
        discriminator.eval() # eval but we still need gradients
        g_loss = discriminator(generated_images, real_targets)

        # Optimizer updates the generator parameters
        g_optimizer.zero_grad()
        g_loss.backward()
        g_optimizer.step()

        # Keep losses for logging
        d_losses.append(d_loss.item())
        g_losses.append(g_loss.item())

    # Print average losses
    print(epoch, np.mean(d_losses), np.mean(g_losses))

    # Save images. The samples are only plotted, so generate them without
    # autograd recording: no unused graph is built and the tensor handed to
    # save_image_grid does not require grad.
    with torch.no_grad():
        generated_images = generator(batch_size)
    save_image_grid(epoch, generated_images, ncol=8)