Avertissement: Ce code vise à partager publiquement des exemples de code. Nous déclinons toutes responsabilité en cas de perte fiancière.
Créer un dossier nommé "csv".
Dans le dossier "csv", mettre ces fichiers. Ceux-ci sont les données de prix de la bourse japonaise.
Télécharger les données: https://github.com/shunakanishi/japanese_stockprice
Maintenant, aller dans le premier répertoire et créer un fichier de python 3.
Et insérer le code suivant:
#-*- coding: utf-8 -*-
import numpy
import pandas
import matplotlib.pyplot as plt
from sklearn import preprocessing
from keras.models import Sequential
from keras.models import model_from_json
from keras.layers.core import Dense, Activation
from keras.layers.recurrent import LSTM
import keras.backend.tensorflow_backend as KTF
import os.path
class Prediction :
def __init__(self):
self.length_of_sequences = 10
self.in_out_neurons = 1
self.hidden_neurons = 300
def load_data(self, data, n_prev=10):
X, Y = [], []
for i in range(len(data) - n_prev):
X.append(data.iloc[i:(i+n_prev)].as_matrix())
Y.append(data.iloc[i+n_prev].as_matrix())
retX = numpy.array(X)
retY = numpy.array(Y)
return retX, retY
def create_model(self, f_model, model_filename, weights_filename) :
print(os.path.join(f_model,model_filename))
if os.path.isfile(os.path.join(f_model,model_filename)):
print('Saved parameters found. I will use this file...')
model = Sequential()
model.add(LSTM(self.hidden_neurons, \
batch_input_shape=(None, self.length_of_sequences, self.in_out_neurons), \
return_sequences=False))
model.add(Dense(self.in_out_neurons))
model.add(Activation("linear"))
model.compile(loss="mape", optimizer="adam")
model.load_weights(os.path.join(f_model,weights_filename))
else:
print('Saved parameters Not found. Creating new one...')
model = Sequential()
model.add(LSTM(self.hidden_neurons, \
batch_input_shape=(None, self.length_of_sequences, self.in_out_neurons), \
return_sequences=False))
model.add(Dense(self.in_out_neurons))
model.add(Activation("linear"))
model.compile(loss="mape", optimizer="adam")
return model
def train(self, f_model, model_filename, weights_filename, X_train, y_train) :
model = self.create_model(f_model, model_filename, weights_filename)
# Learn
model.fit(X_train, y_train, batch_size=10, epochs=15)
return model
if __name__ == "__main__":
f_log = './log'
f_model = './model/stockprice'
model_filename = 'stockprice_model.json'
yaml_filename = 'stockprice_model.yaml'
weights_filename = 'stockprice_model_weights.hdf5'
prediction = Prediction()
# Data
data = None
for year in range(2007, 2017):
data_ = pandas.read_csv('csv/indices_I101_1d_' + str(year) + '.csv')
data = data_ if (data is None) else pandas.concat([data, data_])
data.columns = ['date', 'open', 'high', 'low', 'close']
data['date'] = pandas.to_datetime(data['date'], format='%Y-%m-%d')
# Data of closing price
data['close'] = preprocessing.scale(data['close'])
data = data.sort_values(by='date')
data = data.reset_index(drop=True)
data = data.loc[:, ['date', 'close']]
# 20% of the data is used as test data.
split_pos = int(len(data) * 0.8)
x_train, y_train = prediction.load_data(data[['close']].iloc[0:split_pos], prediction.length_of_sequences)
x_test, y_test = prediction.load_data(data[['close']].iloc[split_pos:], prediction.length_of_sequences)
old_session = KTF.get_session()
model = prediction.train(f_model, model_filename, weights_filename, x_train, y_train)
predicted = model.predict(x_test)
json_string = model.to_json()
open(os.path.join(f_model,model_filename), 'w').write(json_string)
yaml_string = model.to_yaml()
open(os.path.join(f_model,yaml_filename), 'w').write(yaml_string)
print('save weights')
model.save_weights(os.path.join(f_model,weights_filename))
KTF.set_session(old_session)
result = pandas.DataFrame(predicted)
result.columns = ['predict']
result['actual'] = y_test
result.plot()
plt.show()
import numpy
import pandas
import matplotlib.pyplot as plt
from sklearn import preprocessing
from keras.models import Sequential
from keras.models import model_from_json
from keras.layers.core import Dense, Activation
from keras.layers.recurrent import LSTM
import keras.backend.tensorflow_backend as KTF
import os.path
class Prediction :
def __init__(self):
self.length_of_sequences = 10
self.in_out_neurons = 1
self.hidden_neurons = 300
def load_data(self, data, n_prev=10):
X, Y = [], []
for i in range(len(data) - n_prev):
X.append(data.iloc[i:(i+n_prev)].as_matrix())
Y.append(data.iloc[i+n_prev].as_matrix())
retX = numpy.array(X)
retY = numpy.array(Y)
return retX, retY
def create_model(self, f_model, model_filename, weights_filename) :
print(os.path.join(f_model,model_filename))
if os.path.isfile(os.path.join(f_model,model_filename)):
print('Saved parameters found. I will use this file...')
model = Sequential()
model.add(LSTM(self.hidden_neurons, \
batch_input_shape=(None, self.length_of_sequences, self.in_out_neurons), \
return_sequences=False))
model.add(Dense(self.in_out_neurons))
model.add(Activation("linear"))
model.compile(loss="mape", optimizer="adam")
model.load_weights(os.path.join(f_model,weights_filename))
else:
print('Saved parameters Not found. Creating new one...')
model = Sequential()
model.add(LSTM(self.hidden_neurons, \
batch_input_shape=(None, self.length_of_sequences, self.in_out_neurons), \
return_sequences=False))
model.add(Dense(self.in_out_neurons))
model.add(Activation("linear"))
model.compile(loss="mape", optimizer="adam")
return model
def train(self, f_model, model_filename, weights_filename, X_train, y_train) :
model = self.create_model(f_model, model_filename, weights_filename)
# Learn
model.fit(X_train, y_train, batch_size=10, epochs=15)
return model
if __name__ == "__main__":
f_log = './log'
f_model = './model/stockprice'
model_filename = 'stockprice_model.json'
yaml_filename = 'stockprice_model.yaml'
weights_filename = 'stockprice_model_weights.hdf5'
prediction = Prediction()
# Data
data = None
for year in range(2007, 2017):
data_ = pandas.read_csv('csv/indices_I101_1d_' + str(year) + '.csv')
data = data_ if (data is None) else pandas.concat([data, data_])
data.columns = ['date', 'open', 'high', 'low', 'close']
data['date'] = pandas.to_datetime(data['date'], format='%Y-%m-%d')
# Data of closing price
data['close'] = preprocessing.scale(data['close'])
data = data.sort_values(by='date')
data = data.reset_index(drop=True)
data = data.loc[:, ['date', 'close']]
# 20% of the data is used as test data.
split_pos = int(len(data) * 0.8)
x_train, y_train = prediction.load_data(data[['close']].iloc[0:split_pos], prediction.length_of_sequences)
x_test, y_test = prediction.load_data(data[['close']].iloc[split_pos:], prediction.length_of_sequences)
old_session = KTF.get_session()
model = prediction.train(f_model, model_filename, weights_filename, x_train, y_train)
predicted = model.predict(x_test)
json_string = model.to_json()
open(os.path.join(f_model,model_filename), 'w').write(json_string)
yaml_string = model.to_yaml()
open(os.path.join(f_model,yaml_filename), 'w').write(yaml_string)
print('save weights')
model.save_weights(os.path.join(f_model,weights_filename))
KTF.set_session(old_session)
result = pandas.DataFrame(predicted)
result.columns = ['predict']
result['actual'] = y_test
result.plot()
plt.show()
Pour enregistrer le model et les paramètres, créer un dossier nommé "model" et un dossier nommé "log".
Et dans le dossier "model", créer un dossier nommé "stockprice".
Et faire ce commande:
$ sudo python3 stockprice.py
Le résultat
Le model et les paramètres entraîné sont enregistré dans le dossier "model" -> "stockprice".
Vous pouvez obtenir des donnes de bourse:
Des données de Nikkei
https://finance.yahoo.com/quote/%5EN225/history?ltr=1
Des données de NY Dow
https://finance.yahoo.com/quote/%5EDJI/history?ltr=1
Des données de Nasdaq
https://finance.yahoo.com/quote/%5EIXIC/history?ltr=1
Et enregistrez les donnes comme "stock.csv" dans le dossier de csv.
Maintenant ouvrez le fichier de "stockprice.py" et copier-coller ça dans le fichier:
#-*- coding: utf-8 -*-
import numpy
import pandas
import matplotlib.pyplot as plt
from decimal import *
import sys
from keras.models import load_model
from sklearn import preprocessing
from sklearn.preprocessing import StandardScaler
from keras.models import Sequential
from keras.models import model_from_json
from keras.layers.core import Dense, Activation
from keras.layers.recurrent import LSTM
import keras.backend.tensorflow_backend as KTF
import os.path
from datetime import datetime, timedelta
class Prediction :
def load_data(self, data, scaler):
# Data of closing price
scaler.fit(data[['Close']])
price_data = data[['Close']]
data['Close'] = scaler.transform(data[['Close']])
data = data.sort_values(by='Date')
data = data.reset_index(drop=True)
data = data.loc[:, ['Date', 'Close']]
X, Y = [], []
Dates = []
Prices = []
close_data = data[['Close']]
date_data = data[['Date']]
for i in range(len(close_data) - 10):
if i+11 < len(close_data):
Dates.append(date_data.iloc[[i+11]].iloc[0]['Date'])
Prices.append(price_data.iloc[[i+11]].iloc[0]['Close'])
else:
Dates.append(date_data.iloc[[i+10]].iloc[0]['Date']+timedelta(days=1))
Prices.append('Not applicable.')
X.append(close_data.iloc[i:(i+10)].as_matrix())
Y.append(close_data.iloc[i+10].as_matrix())
retX = numpy.array(X)
retY = numpy.array(Y)
return retX, retY, Dates, Prices
def create_model(self, f_model, model_filename):
print(os.path.join(f_model,model_filename))
if os.path.isfile(os.path.join(f_model,model_filename)):
print('Saved parameters found. I will use this file...')
model = load_model(os.path.join(f_model,model_filename))
else:
print('Saved parameters weren\'t found')
return
return model
if __name__ == "__main__":
f_log = './'
f_model = './'
model_filename = 'stockprice_model.hdf5'
prediction = Prediction()
# Data
data = None
try:
csv_loc = str(sys.argv[1])
except NameError:
print("Please give a location of the csv file.")
if(csv_loc == ""):
print("Please give a location of the csv file.")
print(csv_loc)
data = pandas.read_csv(csv_loc)
data = data.drop('Volume',axis=1)
data = data.drop('Adj Close',axis=1)
data.columns = ['Date', 'Open', 'High', 'Low', 'Close']
data['Date'] = pandas.to_datetime(data['Date'])
print(data)
scaler = StandardScaler()
x_test, y_test, Dates, Prices = prediction.load_data(data, scaler)
model = prediction.create_model(f_model, model_filename)
predicted = model.predict(x_test, verbose=1)
FalseResult = 0
TrueResult = 0
for idx,p in enumerate(predicted):
print('Date:' + str(Dates[idx].year) + '/' + str(Dates[idx].month) + '/' + str(Dates[idx].day) + ', Closing price (Predicted): '+ str(float(scaler.inverse_transform(p))))
print('Date:' + str(Dates[idx].year) + '/' + str(Dates[idx].month) + '/' + str(Dates[idx].day) + ', Closing price (Actual): '+ str(Prices[idx]))
dif1 = 0
dif2 = 0
dif3 = 0
was_high_low_correct = False
if idx > 0 and not isinstance(Prices[idx], str) :
dif1 = float(scaler.inverse_transform(p)) - float(Prices[idx])
dif2 = float(Prices[idx-1]) - float(Prices[idx])
dif3 = float(Prices[idx-1]) - float(scaler.inverse_transform(p))
if (dif2 < 0 and dif3 < 0) or (dif2 > 0 and dif3 > 0) or (dif2 == 0 and dif3 == 0):
was_high_low_correct = True
else:
dif1 = 'Not applicable.'
dif2 = 'Not applicable.'
dif3 = 'Not applicable.'
was_high_low_correct = 'Not applicable.'
print('Difference between actual and previous price :' + str(dif2))
print('Difference between predicted and previous price :' + str(dif3))
print('Prediction of high and low was correct? : ' + str(was_high_low_correct))
print('Difference between predicted and actual price : ' + str(dif1))
print('')
if was_high_low_correct :
TrueResult = TrueResult + 1
else:
FalseResult = FalseResult + 1
print('Num of True: ' + str(TrueResult))
print('Num of False: ' + str(FalseResult))
print('Rate of true: ' + str((TrueResult/(FalseResult+TrueResult))*100) + '%')
result = pandas.DataFrame(scaler.inverse_transform(predicted))
result.columns = ['predict']
result['actual'] = scaler.inverse_transform(y_test)
result.plot()
plt.show()
import numpy
import pandas
import matplotlib.pyplot as plt
from decimal import *
import sys
from keras.models import load_model
from sklearn import preprocessing
from sklearn.preprocessing import StandardScaler
from keras.models import Sequential
from keras.models import model_from_json
from keras.layers.core import Dense, Activation
from keras.layers.recurrent import LSTM
import keras.backend.tensorflow_backend as KTF
import os.path
from datetime import datetime, timedelta
class Prediction :
def load_data(self, data, scaler):
# Data of closing price
scaler.fit(data[['Close']])
price_data = data[['Close']]
data['Close'] = scaler.transform(data[['Close']])
data = data.sort_values(by='Date')
data = data.reset_index(drop=True)
data = data.loc[:, ['Date', 'Close']]
X, Y = [], []
Dates = []
Prices = []
close_data = data[['Close']]
date_data = data[['Date']]
for i in range(len(close_data) - 10):
if i+11 < len(close_data):
Dates.append(date_data.iloc[[i+11]].iloc[0]['Date'])
Prices.append(price_data.iloc[[i+11]].iloc[0]['Close'])
else:
Dates.append(date_data.iloc[[i+10]].iloc[0]['Date']+timedelta(days=1))
Prices.append('Not applicable.')
X.append(close_data.iloc[i:(i+10)].as_matrix())
Y.append(close_data.iloc[i+10].as_matrix())
retX = numpy.array(X)
retY = numpy.array(Y)
return retX, retY, Dates, Prices
def create_model(self, f_model, model_filename):
print(os.path.join(f_model,model_filename))
if os.path.isfile(os.path.join(f_model,model_filename)):
print('Saved parameters found. I will use this file...')
model = load_model(os.path.join(f_model,model_filename))
else:
print('Saved parameters weren\'t found')
return
return model
if __name__ == "__main__":
f_log = './'
f_model = './'
model_filename = 'stockprice_model.hdf5'
prediction = Prediction()
# Data
data = None
try:
csv_loc = str(sys.argv[1])
except NameError:
print("Please give a location of the csv file.")
if(csv_loc == ""):
print("Please give a location of the csv file.")
print(csv_loc)
data = pandas.read_csv(csv_loc)
data = data.drop('Volume',axis=1)
data = data.drop('Adj Close',axis=1)
data.columns = ['Date', 'Open', 'High', 'Low', 'Close']
data['Date'] = pandas.to_datetime(data['Date'])
print(data)
scaler = StandardScaler()
x_test, y_test, Dates, Prices = prediction.load_data(data, scaler)
model = prediction.create_model(f_model, model_filename)
predicted = model.predict(x_test, verbose=1)
FalseResult = 0
TrueResult = 0
for idx,p in enumerate(predicted):
print('Date:' + str(Dates[idx].year) + '/' + str(Dates[idx].month) + '/' + str(Dates[idx].day) + ', Closing price (Predicted): '+ str(float(scaler.inverse_transform(p))))
print('Date:' + str(Dates[idx].year) + '/' + str(Dates[idx].month) + '/' + str(Dates[idx].day) + ', Closing price (Actual): '+ str(Prices[idx]))
dif1 = 0
dif2 = 0
dif3 = 0
was_high_low_correct = False
if idx > 0 and not isinstance(Prices[idx], str) :
dif1 = float(scaler.inverse_transform(p)) - float(Prices[idx])
dif2 = float(Prices[idx-1]) - float(Prices[idx])
dif3 = float(Prices[idx-1]) - float(scaler.inverse_transform(p))
if (dif2 < 0 and dif3 < 0) or (dif2 > 0 and dif3 > 0) or (dif2 == 0 and dif3 == 0):
was_high_low_correct = True
else:
dif1 = 'Not applicable.'
dif2 = 'Not applicable.'
dif3 = 'Not applicable.'
was_high_low_correct = 'Not applicable.'
print('Difference between actual and previous price :' + str(dif2))
print('Difference between predicted and previous price :' + str(dif3))
print('Prediction of high and low was correct? : ' + str(was_high_low_correct))
print('Difference between predicted and actual price : ' + str(dif1))
print('')
if was_high_low_correct :
TrueResult = TrueResult + 1
else:
FalseResult = FalseResult + 1
print('Num of True: ' + str(TrueResult))
print('Num of False: ' + str(FalseResult))
print('Rate of true: ' + str((TrueResult/(FalseResult+TrueResult))*100) + '%')
result = pandas.DataFrame(scaler.inverse_transform(predicted))
result.columns = ['predict']
result['actual'] = scaler.inverse_transform(y_test)
result.plot()
plt.show()
Et exécutez le script:
$ sudo python3 stockprice.py downloaded_stockprice.csv
Et l'apprentissage va démarrer.
Aucun commentaire:
Enregistrer un commentaire