View file src/colab/huobi_ai.py - Download

# -*- coding: utf-8 -*-
"""huobi_ai.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1U385BdwkfCxVVLCbHGUHMGwUIkhyaE1I
"""

# Commented out IPython magic to ensure Python compatibility.
# %matplotlib inline
import matplotlib.pyplot as plt
import numpy as np

!git clone https://github.com/ericls/huobi.git

# Commented out IPython magic to ensure Python compatibility.
# %cd huobi

# Commented out IPython magic to ensure Python compatibility.
# %%writefile market_tickers.py
# 
#     market_tickers = Endpoint(
#         method='get',
#         path='/market/tickers',
#         auth_required=False,
#         params={
#         }
#     )

!cat market_tickers.py >> huobi/rest/endpoints/market.py

from huobi import HuobiRestClient
from google.colab import userdata

client = HuobiRestClient("", "")

tickers = client.market_tickers().data['data']
symbols = [ticker['symbol'] for ticker in tickers]
print(f"{len(symbols)} symbols")

i = 0
klines = dict()
for symbol in symbols:
  i = i + 1
  print(f"{i:3}) Get klines for {symbol}")
  klines1 = client.market_history_kline(symbol=symbol, period='1min', size=2000)
  klines[symbol] = list(reversed(klines1.data['data']))

npast = 5
nfuture = 5

input_size = 6 * (npast - 1)
print(f"Input size: {input_size}")
intermediate_size_1 = 4 * (npast - 1)
intermediate_size_2 = 2 * (npast - 1)

import torch
from torch import nn

# Get cpu or gpu device for training.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

# Define model
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        # self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(input_size, intermediate_size_1),
            nn.ReLU(),
            nn.Linear(intermediate_size_1, intermediate_size_2),
            nn.ReLU(),
            nn.Linear(intermediate_size_2, 1)
        )

    def forward(self, x):
        # x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

model = NeuralNetwork().to(device)
print(model)

train_begin = 0
train_end = 1000

def variation(cur, prev):
  if cur == 0 and prev == 0:
    return 0
  else:
    return 2.0 * (cur - prev) / (cur + prev)

loss_function = nn.MSELoss()

def train():

  model.train()
  n = 0
  total_loss = 0

  for symbol in symbols:

    n = n + 1
    klines_symbol = klines[symbol][train_begin:train_end]
    nklines = len(klines_symbol)

    if nklines == train_end - train_begin:

      inputs = []
      targets = []

      for shift in range(nklines - npast - nfuture):
        past_klines = klines_symbol[shift : shift+npast]
        future_klines = klines_symbol[shift+npast : shift+npast+nfuture]
        input = []
        for i in range(len(past_klines) - 1):
          prev = past_klines[i]
          cur = past_klines[i+1]
          item = [cur['open']/prev['close'], cur['close']/prev['close'], cur['low']/prev['close'], cur['high']/prev['close'], variation(cur['vol'], prev['vol']), variation(cur['count'], prev['count'])]
          input = input + item
        inputs = inputs + [input]
        max = 0
        for kline in future_klines:
          if kline['high'] > max:
            max = kline['high']
        progression = max / future_klines[0]['high']
        targets = targets + [[progression]]

      tinputs = torch.FloatTensor(inputs).to(device)
      ttargets = torch.FloatTensor(targets).to(device)
      predictions = model(tinputs)
      loss = loss_function(predictions, ttargets)
      total_loss = total_loss + loss

      # Backpropagation
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

  print(f" Train loss: {total_loss}")

test_begin = 1000
test_end = 2000

def test():

  model.eval()
  n = 0
  total_loss = 0

  for symbol in symbols:

    n = n + 1
    klines_symbol = klines[symbol][test_begin:test_end]
    nklines = len(klines_symbol)

    if nklines == test_end - test_begin:

      inputs = []
      targets = []

      for shift in range(nklines - npast - nfuture):
        past_klines = klines_symbol[shift : shift+npast]
        future_klines = klines_symbol[shift+npast : shift+npast+nfuture]
        input = []
        for i in range(len(past_klines) - 1):
          prev = past_klines[i]
          cur = past_klines[i+1]
          item = [cur['open']/prev['close'], cur['close']/prev['close'], cur['low']/prev['close'], cur['high']/prev['close'], variation(cur['vol'], prev['vol']), variation(cur['count'], prev['count'])]
          input = input + item
        inputs = inputs + [input]
        max = 0
        for kline in future_klines:
          if kline['high'] > max:
            max = kline['high']
        progression = max / future_klines[0]['high']
        targets = targets + [[progression]]

      tinputs = torch.FloatTensor(inputs).to(device)
      ttargets = torch.FloatTensor(targets).to(device)
      predictions = model(tinputs)
      loss = loss_function(predictions, ttargets)
      total_loss = total_loss + loss

  print(f" Test loss : {total_loss}")

def test_graph():

  model.eval()
  n = 0
  total_loss = 0

  for symbol in symbols:

    n = n + 1
    klines_symbol = klines[symbol][test_begin:test_end]
    nklines = len(klines_symbol)

    if nklines == test_end - test_begin:

      inputs = []
      targets = []

      for shift in range(nklines - npast - nfuture):
        past_klines = klines_symbol[shift : shift+npast]
        future_klines = klines_symbol[shift+npast : shift+npast+nfuture]
        input = []
        for i in range(len(past_klines) - 1):
          prev = past_klines[i]
          cur = past_klines[i+1]
          item = [cur['open']/prev['close'], cur['close']/prev['close'], cur['low']/prev['close'], cur['high']/prev['close'], variation(cur['vol'], prev['vol']), variation(cur['count'], prev['count'])]
          input = input + item
        inputs = inputs + [input]
        max = 0
        for kline in future_klines:
          if kline['high'] > max:
            max = kline['high']
        progression = max / future_klines[0]['high']
        targets = targets + [[progression]]

      tinputs = torch.FloatTensor(inputs).to(device)
      ttargets = torch.FloatTensor(targets).to(device)
      predictions = model(tinputs)

      nptargets = [item[0] for item in ttargets.cpu().detach().numpy()]
      nppredictions = [item[0] for item in predictions.cpu().detach().numpy()]

      plt.scatter(nppredictions, nptargets, s=1)

      # cc = np.corrcoef(nptargets, nppredictions)
      # print(f"Correlations: \n{cc}")

      loss = loss_function(predictions, ttargets)
      total_loss = total_loss + loss

  print(f" Test loss : {total_loss}")

model = NeuralNetwork().to(device)
print(model)

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

epochs = 5
for e in range(epochs):
  print(f"Epoch {e}:")
  train()
  test()

test_graph()