# Source: src/ia/pytorch/neural_trading.py
# -*- coding: utf-8 -*-
"""neural_trading.ipynb
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/185BNtBV7OeD3WVXG53GVknhe4XqOyJDR
# NEURAL TRADING
Imports
"""
import torch
from torch import nn
"""Functions"""
def lirecourslf(lnf):
    """Read tab-separated quote files and group their rows by first field.

    Each file named in ``lnf`` is read in turn and its first line is skipped
    as a header.  Every other line is split on tabs; when it has at least
    eight fields, fields 3 to 7 are converted to ``float`` (a decimal comma
    is accepted).  Rows sharing the same first field are gathered in one
    group, in reading order across all files.

    Args:
        lnf: iterable of file paths.

    Returns:
        The groups (lists of rows) in order of first appearance of their
        key, keeping only the groups that hold more than one row.

    Raises:
        OSError: if a file cannot be opened.
        ValueError: if one of fields 3 to 7 of a complete row is not numeric.
    """
    groups = []
    position = {}  # first field -> index of its group in ``groups``
    for nf in lnf:
        print(nf)
        # ``with`` guarantees the file is closed, even if a row fails to parse.
        with open(nf, "r") as f:
            f.readline()  # skip the header line
            for raw in f:
                # Remove only the line terminator: a last line without a
                # trailing newline must not lose its final character.
                r = raw.rstrip("\n").split("\t")
                complete = len(r) > 7
                if complete:
                    for i in range(3, 8):
                        r[i] = float(r[i].replace(",", "."))
                pos = position.get(r[0])
                if pos is None:
                    # First row seen for this key: it opens the group, even
                    # when the row itself is incomplete.
                    position[r[0]] = len(groups)
                    groups.append([r])
                elif complete:
                    groups[pos].append(r)
    return [g for g in groups if len(g) > 1]
def coursval(cours, v):
    """Return the first series whose second field starts with ``v``.

    Only the first row of each series is inspected, and only when that row
    has more than one field.

    Args:
        cours: list of series, each a non-empty list of rows.
        v: prefix to look for.

    Returns:
        The matching series, or an empty list when none matches.
    """
    for serie in cours:
        head = serie[0]
        if len(head) > 1 and head[1].startswith(v):
            return serie
    return []
def mme(lcs, p):
    """Simulate a moving-average trading rule on one series.

    The price of each row is read from index 6.  An exponential moving
    average ``m`` with smoothing factor ``2 / (p + 1)`` is updated at every
    step, and the sign of ``(price - m) / m`` decides the order:

    * negative: sell the ``nt`` shares held, minus ``vd`` times the number
      of shares the current total value would buy;
    * positive: buy as many whole shares as the cash allows;
    * zero: no order.

    ``fraisa`` is subtracted from the cash on every buy and ``fraisv`` on
    every sell.  ``t1``, ``vd``, ``fraisa`` and ``fraisv`` are module-level
    names looked up when the function runs.

    NOTE(review): in the source as received, the statements between the
    signal test and the fee handling were garbled (``if t 0:``).  They are
    reconstructed here as "no order during the first ``t1`` steps, then
    apply the order to the position and to the cash" -- confirm against the
    original notebook.

    Args:
        lcs: non-empty sequence of rows; index 6 of each row is the price.
        p: period of the moving average.

    Returns:
        Cash plus value of the shares held after the last row, starting
        from a cash of 1000.
    """
    a = 2 / float(p + 1)
    l = 1000  # cash
    nt = 0  # number of shares held
    m = lcs[0][6]  # the average starts at the first price
    for t, cs in enumerate(lcs):
        c = cs[6]
        m = a * c + (1 - a) * m
        s = (c - m) / m
        if s < 0:
            acte = -nt - vd * int((l + c * nt) / float(c))
        elif s > 0:
            acte = int(l / float(c))
        else:
            acte = 0
        if t < t1:
            acte = 0  # reconstructed: no trading during the warm-up steps
        # Reconstructed: apply the order to the position and to the cash.
        nt = nt + acte
        l = l - acte * c
        if acte > 0:
            l = l - fraisa
        if acte < 0:
            l = l - fraisv
        k = l + nt * c
    return k
# Quote files to load: 'cours/srd-XXYY.xls' with XX = 08 and YY = 04..12,
# then XX = 09 and YY = 01..11 (20 files, in that order).
lnf = [
    f"cours/srd-{prefix:02d}{suffix:02d}.xls"
    for prefix, suffix in (
        [(8, n) for n in range(4, 13)] + [(9, n) for n in range(1, 12)]
    )
]
"""Download data files"""
# Commented out IPython magic to ensure Python compatibility.
# %%script bash
# mkdir cours
# cd cours
# for f in srd-0804.xls srd-0805.xls srd-0806.xls srd-0807.xls srd-0808.xls srd-0809.xls srd-0810.xls srd-0811.xls srd-0812.xls srd-0901.xls srd-0902.xls srd-0903.xls srd-0904.xls srd-0905.xls srd-0906.xls srd-0907.xls srd-0908.xls srd-0909.xls srd-0910.xls srd-0911.xls
# do
# echo $f
# wget http://log.chez.com/src/pytrade/cours/$f
# done
# echo
# ls
#
# IPython shell magic, not valid Python: kept as a comment like the other
# magics of this export.
# !head cours/srd-0804.xls
"""Example of trading with MME"""
# Load every quote file, then pick the series whose second field starts
# with 'ALTEN'.
cours = lirecourslf(lnf)
cours_alten = coursval(cours, 'ALTEN')
# Module-level settings looked up by mme() when it runs.
t1 = 40  # NOTE(review): presumably the number of initial steps without orders
vd = 0  # factor applied to the extra quantity sold on a sell signal
fraisa = 7.5  # subtracted from the cash on every buy
fraisv = 7.5  # subtracted from the cash on every sell
print(mme(cours_alten, 26))
"""Other functions"""
import math
def cloture1(cv):
    """Return the value at index 6 of every row of ``cv``, in order."""
    values = []
    for row in cv:
        values.append(row[6])
    return values
def norm1(y):
    """Return the elements of ``y`` divided by its first element.

    Args:
        y: sequence of numbers.

    Returns:
        A new list; empty when ``y`` is empty.

    Raises:
        ZeroDivisionError: if the first element is zero.
    """
    if len(y) == 0:
        return []
    first = y[0]  # hoisted: the divisor is the same for every element
    return [z / first for z in y]
def nc1(cv):
    """Return index 6 of every row of ``cv`` divided by index 6 of the first row.

    Args:
        cv: sequence of rows, each with at least seven fields.

    Returns:
        A new list of ratios (the first one is 1.0); empty when ``cv`` is
        empty.

    Raises:
        ZeroDivisionError: if index 6 of the first row is zero.
    """
    if len(cv) == 0:
        return []
    base = cv[0][6]  # hoisted: the reference value is the same for every row
    return [row[6] / base for row in cv]
def lnc1(cv):
    """Return the natural log of index 6 of each row relative to the first row.

    Each result is ``math.log(row[6] / cv[0][6])``.

    Args:
        cv: sequence of rows, each with at least seven fields.

    Returns:
        A new list of logarithms (the first one is 0.0); empty when ``cv``
        is empty.

    Raises:
        ZeroDivisionError: if index 6 of the first row is zero.
        ValueError: if a ratio is not strictly positive.
    """
    if len(cv) == 0:
        return []
    base = cv[0][6]  # hoisted: the reference value is the same for every row
    return [math.log(row[6] / base) for row in cv]
# Index-6 values of the ALTEN series, each divided by the first one.
ca = nc1(cours_alten)
print(ca)
from matplotlib import pyplot as plt
# Plot the normalised series on a wide figure.
plt.figure(figsize=(15,6))
plt.plot(ca)
plt.title('ALTEN')
plt.show()
"""Trading functions"""
def trade1(cv, t1, signal):
    """Simulate all-in / all-out trading driven by a discrete signal.

    For every step ``t`` from ``t1`` on, ``signal`` receives the ``t1``
    previous values ``cv[t - t1:t]``.  A result >= 1 converts all the cash
    into shares at ``cv[t]``; a result <= -1 converts all the shares into
    cash.  The price, the quantity held and the total value at each step
    are plotted once the simulation is over.

    Args:
        cv: sequence of prices.
        t1: window length, also the first step traded.
        signal: callable taking a window of prices and returning a number.

    Returns:
        Cash plus value of the shares held at the last step; the initial
        cash (1.0) when ``cv`` has no step at or after ``t1``.
    """
    tt = len(cv)  # total number of steps
    l = 1.0  # cash
    q = 0.0  # quantity of shares held
    k = l  # total value; keeps this value when the loop below never runs
    graph = []
    for t in range(t1, tt):
        s = signal(cv[t - t1:t])
        if s >= 1:  # buy
            q = q + l / cv[t]
            l = 0
        elif s <= -1:  # sell
            l = l + q * cv[t]
            q = 0
        k = l + q * cv[t]
        graph.append([cv[t], q, k])
    plt.figure(figsize=(15, 6))
    plt.plot(graph)
    plt.show()
    return k
def trade2(cv, t1, signal):
    """Simulate fractional trading driven by a signal clamped to [-1, 1].

    Half of the initial cash (1.0) is invested at ``cv[t1]`` before the
    loop.  For every step ``t`` from ``t1`` on, ``signal`` receives
    ``cv[t - t1:t]``; a positive result ``s`` spends the fraction ``s`` of
    the cash on shares, a negative one sells the fraction ``-s`` of the
    shares.  The price, the quantity held and the total value at each step
    are plotted once the simulation is over.

    Args:
        cv: sequence of prices with more than ``t1`` elements.
        t1: window length, also the first step traded.
        signal: callable taking a window of prices and returning a number
            or a 0-d tensor.

    Returns:
        Cash plus value of the shares held at the last step.

    Raises:
        IndexError: if ``cv`` has no element at index ``t1``.
    """
    tt = len(cv)  # total number of steps
    l = 1.0  # cash
    q = 0.0  # quantity of shares held
    graph = []
    q = q + 0.5 * l / cv[t1]
    l = l - 0.5 * l
    for t in range(t1, tt):
        s = signal(cv[t - t1:t])
        if s > 1:
            s = 1
        if s < -1:
            s = -1
        if s > 0:  # buy
            q = q + s * l / cv[t]
            l = l - s * l
        elif s < 0:  # sell
            l = l - s * q * cv[t]
            q = q + s * q
        k = l + q * cv[t]
        # q and k are plain floats until a tensor-valued signal has been
        # applied, so ``.to("cpu")`` is not always available; float() handles
        # both cases and yields values matplotlib can plot.
        graph.append([cv[t], float(q), float(k)])
    plt.figure(figsize=(15, 6))
    plt.plot(graph)
    plt.show()
    return k
def trade(cv, t1, signal, display_graph=False):
    """Differentiable trading simulation driven by a signal clamped to [-1, 1].

    Cash and quantity held are 0-d tensors, so the result can be
    back-propagated through a tensor-valued signal.  Half of the initial
    cash (1.0) is invested at ``cv[t1]`` before the loop.  For every step
    ``t`` from ``t1`` on, ``signal`` receives ``cv[t - t1:t]``; a positive
    result ``s`` spends the fraction ``s`` of the cash on shares, a negative
    one sells the fraction ``-s`` of the shares.

    Args:
        cv: sequence of prices with more than ``t1`` elements.
        t1: window length, also the first step traded.
        signal: callable taking a window of prices and returning a number
            or a 0-d tensor.
        display_graph: when true, plot price, quantity held and total value
            for every step.

    Returns:
        0-d tensor: cash plus value of the shares held at the last step.
    """
    cash = torch.tensor(1.0, requires_grad=True)
    held = torch.tensor(0.0, requires_grad=True)
    history = []

    # Opening position: half of the cash goes into shares.
    held = held + 0.5 * cash / cv[t1]
    cash = cash - 0.5 * cash

    for t in range(t1, len(cv)):
        s = signal(cv[t - t1:t])
        price = cv[t]
        # Clamp the signal to [-1, 1].
        if s > 1:
            s = 1
        elif s < -1:
            s = -1
        if s > 0:  # buy
            held = held + s * cash / price
            cash = cash - s * cash
        elif s < 0:  # sell
            cash = cash - s * held * price
            held = held + s * held
        k = cash + held * price
        if display_graph:
            history.append([
                price,
                held.to("cpu").detach().numpy(),
                k.to("cpu").detach().numpy(),
            ])

    if display_graph:
        plt.figure(figsize=(15, 6))
        plt.plot(history)
        plt.show()
    return k
def signal0(cv):
    """Neutral signal: return 0 whatever the window ``cv`` holds."""
    return 0
def signal1(cv):
    """Constant buy signal: return 1 whatever the window ``cv`` holds."""
    return 1
def signal2(cv):
    """Threshold signal based on the last value of the window.

    Returns 1 when the last value is below 0.6, -1 when it is above 0.8,
    and 0 otherwise.
    """
    last = cv[-1]
    if last < 0.6:
        return 1
    if last > 0.8:
        return -1
    return 0
"""Some parameters"""
# Length of the price window handed to the signal function; also the input
# width of the network.
N = 20

# Training uses ca[A:B], testing uses ca[C:D].
# Earlier settings: A = 130, B = 400, C = 0, D = 110.
# NOTE(review): both starts are moved back by N, presumably so that the first
# traded step (index N of the slice) falls on 160 and 30 -- confirm.
A, B = 160 - N, 400
C, D = 30 - N, 110
"""Trading example : buy under 0.6, sell above 0.8."""
# Run the threshold rule (signal2) over the normalised series with a window
# of N values, and display the resulting graph.
k = trade(ca, N, signal2, True)
print(k)
"""Imports and device"""
import torch
from torch import nn
# Get cpu or gpu device for training.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")
"""Model"""
# Commented out IPython magic to ensure Python compatibility.
# %%script echo Disabled
# # Define model
# class NeuralNetwork(nn.Module):
# def __init__(self):
# super().__init__()
# self.flatten = nn.Flatten()
# self.linear_relu_stack = nn.Sequential(
# nn.Linear(N, 60),
# nn.ReLU(),
# nn.Linear(60, 30),
# nn.ReLU(),
# nn.Linear(30, 15),
# nn.ReLU(),
# nn.Linear(15, 1)
# )
# def forward(self, x):
# x = self.flatten(x)
# logits = self.linear_relu_stack(x)
# return logits
#
# model = NeuralNetwork().to(device)
# print(model)
# Define model
class NeuralNetwork(nn.Module):
    """Fully connected network mapping a window of N values to one output.

    Layer widths are N -> 2*N -> N -> N//2 -> 1, with a ReLU after every
    linear layer except the last one.  ``N`` is the module-level window
    length, read when the network is built.
    """

    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        widths = [N, 2 * N, N, N // 2, 1]
        layers = []
        for n_in, n_out in zip(widths, widths[1:]):
            layers.append(nn.Linear(n_in, n_out))
            layers.append(nn.ReLU())
        layers.pop()  # no activation after the output layer
        self.linear_relu_stack = nn.Sequential(*layers)

    def forward(self, x):
        """Flatten ``x`` per sample and run it through the linear stack."""
        return self.linear_relu_stack(self.flatten(x))
# Build the network, move it to the selected device and show its layers.
model = NeuralNetwork().to(device)
print(model)
# Commented out IPython magic to ensure Python compatibility.
# %%script echo Disabled
# # Define model
# class NeuralNetwork(nn.Module):
# def __init__(self):
# super().__init__()
# self.flatten = nn.Flatten()
# self.linear_relu_stack = nn.Sequential(
# nn.Linear(N, 30),
# nn.ReLU(),
# #nn.Linear(30, 30),
# #nn.ReLU(),
# nn.Linear(30, 15),
# nn.ReLU(),
# nn.Linear(15, 1)
# )
# def forward(self, x):
# x = self.flatten(x)
# logits = self.linear_relu_stack(x)
# return logits
#
# model = NeuralNetwork().to(device)
# print(model)
# Commented out IPython magic to ensure Python compatibility.
# %%script echo Disabled
# # Define model
# class NeuralNetwork(nn.Module):
# def __init__(self):
# super().__init__()
# self.flatten = nn.Flatten()
# self.linear_relu_stack = nn.Sequential(
# nn.Linear(N, 10),
# nn.ReLU(),
# nn.Linear(10, 5),
# nn.ReLU(),
# nn.Linear(5, 1)
# )
# def forward(self, x):
# x = self.flatten(x)
# logits = self.linear_relu_stack(x)
# return logits
#
# model = NeuralNetwork().to(device)
# print(model)
# Commented out IPython magic to ensure Python compatibility.
# %%script echo Disabled
# # Define model
# class NeuralNetwork(nn.Module):
# def __init__(self):
# super().__init__()
# self.flatten = nn.Flatten()
# self.linear_relu_stack = nn.Sequential(
# nn.Linear(N, 1),
# )
# def forward(self, x):
# x = self.flatten(x)
# logits = self.linear_relu_stack(x)
# return logits
#
# model = NeuralNetwork().to(device)
# print(model)
"""Loss function"""
# NOTE(review): loss_fn is not referenced by the training functions visible in
# this file; they use the negated trade result as loss instead.
loss_fn = nn.CrossEntropyLoss()
# optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
# optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
# optimizer = torch.optim.RMSprop(model.parameters(), lr=1e-3)
# optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
# Smoke test: feed the first N normalised values to the untrained model, as a
# batch of one.
cv = ca[0:N]
t = torch.FloatTensor([cv]).to(device)
s = model(t)
print(f"s = {s}")
def signal_neur(cv):
    """Return the output of the global ``model`` for the window ``cv``.

    The window is wrapped in a batch of one and moved to ``device``;
    element ``[0][0]`` of the model output is returned.
    """
    batch = torch.FloatTensor([cv]).to(device)
    return model(batch)[0][0]
# Same window as above, this time through the signal wrapper.
s = signal_neur(cv)
print(f"signal_neur -> {s}")
# Trade the whole normalised series with the untrained network and display
# the graph.
k = trade(ca, N, signal_neur, True)
print(f"k = {k}")
"""Train the model from random parameters with one value"""
# Plain SGD over the parameters of the current model.
# Alternatives tried: SGD(lr=0.01, momentum=0.9), RMSprop(lr=1e-3),
# Adam(lr=1e-3).
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

# Training and test slices of the normalised series.
# Other splits tried: train ca[100:300] / test ca[0:100];
# train ca[0:160] / test ca[160:400].
ca_train = ca[A:B]
ca_test = ca[C:D]

# Reference result on the training slice before any update.
with torch.no_grad():
    k = trade(ca_train, N, signal_neur)
print("Result before training:", k, sep="\n")
def train():
    """Run one optimiser step that increases the trade result on ``ca_train``.

    The loss is the negated final value returned by ``trade``; the global
    ``model`` and ``optimizer`` are used.
    """
    model.train()
    optimizer.zero_grad()
    final_value = trade(ca_train, N, signal_neur)
    (-final_value).backward()
    optimizer.step()
epochs = 200
for e in range(epochs):
    train()
    # Measure both slices after the step, without recording gradients.
    with torch.no_grad():
        k_train = trade(ca_train, N, signal_neur)
        k_test = trade(ca_test, N, signal_neur)
    if e % 10 == 0:
        print(f"Epoch {e:3d}: k_train={k_train:6.4f}, k_test={k_test:6.4f}")

# Display the behaviour of the trained network on both slices.
with torch.no_grad():
    trade(ca_train, N, signal_neur, True)
    trade(ca_test, N, signal_neur, True)
# NOTE(review): `model` is re-created a few statements below and the optimizer
# is built again after the seed search, so this instance does not appear to
# perform any step in the visible code -- confirm before removing.
optimizer = torch.optim.SGD(model.parameters(), lr=1e-2)
# optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
# optimizer = torch.optim.RMSprop(model.parameters(), lr=1e-3)
# optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
# Training and test slices of the normalised series.
ca_train = ca[A:B]
ca_test = ca[C:D]
def train_mv():
    """Run one optimiser step per series, over the first 26 series of ``cours``.

    Each series is normalised with ``nc1`` and restricted to ``[130:400]``;
    the loss is the negated final value returned by ``trade``.  The global
    ``model`` and ``optimizer`` are used.
    """
    for serie in cours[0:26]:
        window = nc1(serie)[130:400]
        model.train()
        optimizer.zero_grad()
        final_value = trade(window, N, signal_neur)
        (-final_value).backward()
        optimizer.step()
# Start again from a freshly initialised network.
model = NeuralNetwork().to(device)
print(model)
# Commented out IPython magic to ensure Python compatibility.
# %%script echo Disabled
# # print parameters
# print(model.parameters())
# for p in model.parameters():
# print(p)
# Small demonstration: wrap a 0-d tensor in an nn.Parameter and print both.
x = torch.tensor(0.123).to(device)
print(x)
p = torch.nn.Parameter(x)
print(p)
def test():
    """Return the mean trade result of the current model on the training windows.

    The first 26 series of ``cours`` (fewer when fewer exist) are
    normalised with ``nc1`` and restricted to ``[130:400]``; each one is
    traded with ``signal_neur`` and the results are averaged over the
    number of series actually evaluated.

    Only the training-window mean is returned, so the ``[0:100]`` windows
    are not traded here.

    Returns:
        The mean final value, or 0.0 when ``cours`` is empty.
    """
    series = cours[0:26]
    sk_train = 0
    for cv in series:
        ncv_train = nc1(cv)[130:400]
        sk_train = sk_train + trade(ncv_train, N, signal_neur)
    # Divide by the number of series actually evaluated; ``or 1`` keeps the
    # 0.0 result for an empty list.
    return sk_train / (len(series) or 1)
"""Search for a good seed to initialize model parameters"""
best_seed, best_km = 0, 0
for seed in range(10):
    # Seed before building the network, so that the initial weights depend
    # only on the seed.
    torch.manual_seed(seed)
    model = NeuralNetwork().to(device)
    km = test()
    print(f"seed {seed}: km={km}")
    if km > best_km:
        best_seed, best_km = seed, km
print(f"best seed: {best_seed}; best km:{best_km}")
"""Initialize model parameters with the best seed found"""
# Rebuild the network from the best seed found.
torch.manual_seed(best_seed)
model = NeuralNetwork().to(device)

# Train the model starting from good parameters with several values

# Plain SGD over the parameters of the rebuilt model.
# Alternatives tried: SGD(lr=0.01, momentum=0.9), RMSprop(lr=1e-3),
# Adam(lr=1e-3), Adagrad().
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

# Training and test slices of the normalised series.
# Other splits tried: train ca[100:300] / test ca[0:100];
# train ca[0:160] / test ca[160:400].
ca_train = ca[A:B]
ca_test = ca[C:D]

# Reference result on the training slice before any update.
with torch.no_grad():
    k = trade(ca_train, N, signal_neur)
print("Result before training:", k, sep="\n")
def train():
    """Run one optimiser step per series, over the first 26 series of ``cours``.

    This definition replaces the earlier single-series ``train``.  Each
    series is normalised with ``nc1`` and restricted to ``[130:400]``; the
    loss is the negated final value returned by ``trade``.
    """
    for quotes in cours[0:26]:
        train_window = nc1(quotes)[130:400]
        model.train()
        optimizer.zero_grad()
        outcome = trade(train_window, N, signal_neur)
        (-outcome).backward()
        optimizer.step()
epochs = 10
for e in range(epochs):
    train()
    with torch.no_grad():
        #k_train = trade(ca_train, 30, signal_neur)
        #k_test = trade(ca_test, 30, signal_neur)
        if e % 1 == 0:  # always true: report after every epoch
            sk_train = 0
            sk_test = 0
            for cv in cours[0:26]:
                ncv = nc1(cv)
                ncv_train = ncv[130:400]
                ncv_test = ncv[0:100]
                k_train = trade(ncv_train, N, signal_neur)
                k_test = trade(ncv_test, N, signal_neur)
                sk_train = sk_train + k_train
                sk_test = sk_test + k_test
                #print(f"Epoch {e:3d} {cv[0][1]}: k_test={k_test:6.4f}")
            # Average over the series actually evaluated rather than a fixed
            # 26; ``or 1`` keeps the 0 result when there is none.
            n_series = len(cours[0:26]) or 1
            km_train = sk_train / n_series
            km_test = sk_test / n_series
            print(f"Epoch {e:3d}: train:{km_train:7.5f} test:{km_test:7.5f}")

# Result on the training slice of the reference series after training.
with torch.no_grad():
    k = trade(ca_train, N, signal_neur)
print("Result after training:")
print(k)

# Display the behaviour of the trained network on both slices.
with torch.no_grad():
    trade(ca_train, N, signal_neur, True)
    trade(ca_test, N, signal_neur, True)
# Commented out IPython magic to ensure Python compatibility.
# %%script echo Disabled
#
# with torch.no_grad():
# k = trade(ca_train, N, signal_neur)
# print("Result before training:")
# print(k)
#
# for e in range(10):
# print(e)
# train()
#
# with torch.no_grad():
# k = trade(ca_train, N, signal_neur)
# print("Result after training:")
# print(k)
"""List of values"""
# Show the first row of every loaded series.
for cv in cours:
    print(cv[0])

# One more pass of multi-series training, then measure both slices of the
# reference series.
epochs = 1
for e in range(epochs):
    train_mv()
    with torch.no_grad():
        k_train = trade(ca_train, N, signal_neur)
        k_test = trade(ca_test, N, signal_neur)
    if e % 1 == 0:  # always true
        print(f"Epoch {e:3d}: k_train={k_train:6.4f}, k_test={k_test:6.4f}")

# Display the final behaviour on both slices.
with torch.no_grad():
    trade(ca_train, N, signal_neur, True)
    trade(ca_test, N, signal_neur, True)