# -*- coding: utf-8 -*-
"""huobi_ai.ipynb
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1U385BdwkfCxVVLCbHGUHMGwUIkhyaE1I
"""
# Commented out IPython magic to ensure Python compatibility.
# %matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
# Fetch the huobi REST client library from source.
!git clone https://github.com/ericls/huobi.git
# Commented out IPython magic to ensure Python compatibility.
# %cd huobi
# Commented out IPython magic to ensure Python compatibility.
# %%writefile market_tickers.py
#
# market_tickers = Endpoint(
#     method='get',
#     path='/market/tickers',
#     auth_required=False,
#     params={},
# )
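# Not documented in the library itself, but consistent with how the fields
# are used below: each entry in the 'data' list returned by /market/tickers
# is expected to look roughly like
#   {'symbol': 'btcusdt', 'open': ..., 'close': ..., 'high': ..., 'low': ...,
#    'vol': ..., 'count': ..., ...}
# Only 'symbol' is consumed from it here.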
# Append the new endpoint to the library's market endpoint definitions.
!cat market_tickers.py >> huobi/rest/endpoints/market.py
from huobi import HuobiRestClient

# The endpoints used here are public (the new endpoint sets
# auth_required=False), so empty credentials are enough; real API keys
# could be loaded with google.colab.userdata instead.
client = HuobiRestClient("", "")
tickers = client.market_tickers().data['data']
symbols = [ticker['symbol'] for ticker in tickers]
print(f"{len(symbols)} symbols")
# Download the most recent 2000 one-minute candles for every symbol,
# stored oldest-first (the API returns them newest-first).
klines = dict()
for i, symbol in enumerate(symbols, start=1):
    print(f"{i:3}) Get klines for {symbol}")
    response = client.market_history_kline(symbol=symbol, period='1min', size=2000)
    klines[symbol] = list(reversed(response.data['data']))
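# Not in the original notebook: pulling klines for every listed symbol in a
# tight loop can hit exchange rate limits. A minimal retry sketch, assuming
# failures surface as ordinary exceptions from the client
# (fetch_klines_with_retry is a hypothetical helper, not used above):
import time

def fetch_klines_with_retry(sym, retries=3, delay=1.0):
    # Retry the kline request a few times with a fixed delay between tries.
    for attempt in range(retries):
        try:
            resp = client.market_history_kline(symbol=sym, period='1min', size=2000)
            return list(reversed(resp.data['data']))
        except Exception:
            if attempt == retries - 1:
                raise
            time.sleep(delay)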
# Window sizes: npast candles of history are used to predict the best high
# over the next nfuture candles. Each consecutive pair of past candles
# yields 6 features, so with npast = 5 the input has 6 * 4 = 24 values.
npast = 5
nfuture = 5
input_size = 6 * (npast - 1)
print(f"Input size: {input_size}")
intermediate_size_1 = 4 * (npast - 1)  # 16 hidden units
intermediate_size_2 = 2 * (npast - 1)  # 8 hidden units
import torch
from torch import nn
# Get cpu or gpu device for training.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")
# Define model
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(input_size, intermediate_size_1),
            nn.ReLU(),
            nn.Linear(intermediate_size_1, intermediate_size_2),
            nn.ReLU(),
            nn.Linear(intermediate_size_2, 1),
        )

    def forward(self, x):
        return self.linear_relu_stack(x)

model = NeuralNetwork().to(device)
print(model)
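# Quick shape sanity check (not in the original): a dummy batch of three
# feature windows should map to three scalar predictions.
with torch.no_grad():
    dummy = torch.randn(3, input_size, device=device)
    assert model(dummy).shape == (3, 1)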
train_begin = 0
train_end = 1000

def variation(cur, prev):
    # Symmetric relative change; the explicit zero check avoids a division
    # by zero when both values are zero.
    if cur == 0 and prev == 0:
        return 0
    return 2.0 * (cur - prev) / (cur + prev)
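# For example, variation(110, 100) = 2 * 10 / 210 ≈ 0.0952 and
# variation(100, 110) ≈ -0.0952, so the measure is symmetric around zero
# and, for non-negative inputs such as volume and trade count, bounded
# in [-2, 2].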
loss_function = nn.MSELoss()
def make_dataset(klines_symbol):
    # Build (inputs, targets) pairs from a contiguous run of candles; shared
    # by train(), test() and test_graph() below.
    inputs = []
    targets = []
    for shift in range(len(klines_symbol) - npast - nfuture):
        past_klines = klines_symbol[shift : shift + npast]
        future_klines = klines_symbol[shift + npast : shift + npast + nfuture]
        features = []
        for prev, cur in zip(past_klines, past_klines[1:]):
            # Prices are normalized by the previous close; volume and trade
            # count use the symmetric variation above.
            features += [
                cur['open'] / prev['close'],
                cur['close'] / prev['close'],
                cur['low'] / prev['close'],
                cur['high'] / prev['close'],
                variation(cur['vol'], prev['vol']),
                variation(cur['count'], prev['count']),
            ]
        inputs.append(features)
        # Target: best high over the future window, relative to its first high.
        best_high = max(kline['high'] for kline in future_klines)
        targets.append([best_high / future_klines[0]['high']])
    return inputs, targets

def train():
    model.train()
    total_loss = 0.0
    for symbol in symbols:
        klines_symbol = klines[symbol][train_begin:train_end]
        # Skip symbols whose history is shorter than the training window.
        if len(klines_symbol) != train_end - train_begin:
            continue
        inputs, targets = make_dataset(klines_symbol)
        tinputs = torch.FloatTensor(inputs).to(device)
        ttargets = torch.FloatTensor(targets).to(device)
        predictions = model(tinputs)
        loss = loss_function(predictions, ttargets)
        total_loss += loss.item()
        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    print(f" Train loss: {total_loss}")
test_begin = 1000
test_end = 2000

def test():
    model.eval()
    total_loss = 0.0
    with torch.no_grad():
        for symbol in symbols:
            klines_symbol = klines[symbol][test_begin:test_end]
            if len(klines_symbol) != test_end - test_begin:
                continue
            inputs, targets = make_dataset(klines_symbol)
            tinputs = torch.FloatTensor(inputs).to(device)
            ttargets = torch.FloatTensor(targets).to(device)
            predictions = model(tinputs)
            loss = loss_function(predictions, ttargets)
            total_loss += loss.item()
    print(f" Test loss : {total_loss}")
def test_graph():
    # Same evaluation pass as test(), but also scatter-plot predictions
    # against targets; a perfect model would put every point on y = x.
    model.eval()
    total_loss = 0.0
    with torch.no_grad():
        for symbol in symbols:
            klines_symbol = klines[symbol][test_begin:test_end]
            if len(klines_symbol) != test_end - test_begin:
                continue
            inputs, targets = make_dataset(klines_symbol)
            tinputs = torch.FloatTensor(inputs).to(device)
            ttargets = torch.FloatTensor(targets).to(device)
            predictions = model(tinputs)
            nptargets = ttargets.cpu().numpy()[:, 0]
            nppredictions = predictions.cpu().numpy()[:, 0]
            plt.scatter(nppredictions, nptargets, s=1)
            # cc = np.corrcoef(nptargets, nppredictions)
            # print(f"Correlations: \n{cc}")
            loss = loss_function(predictions, ttargets)
            total_loss += loss.item()
    print(f" Test loss : {total_loss}")
# Re-initialize the model, then train and evaluate for a few epochs.
model = NeuralNetwork().to(device)
print(model)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
epochs = 5
for e in range(epochs):
    print(f"Epoch {e}:")
    train()
    test()
test_graph()
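# A minimal sketch (not in the original) for persisting the trained weights,
# using the standard PyTorch state_dict API; "huobi_ai.pt" is a hypothetical
# file name.
torch.save(model.state_dict(), "huobi_ai.pt")
# To restore later:
#   restored = NeuralNetwork().to(device)
#   restored.load_state_dict(torch.load("huobi_ai.pt", map_location=device))
#   restored.eval()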