|
|
import gradio as gr |
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
import matplotlib.pyplot as plt |
|
|
import tensorflow as tf |
|
|
from sklearn.preprocessing import MinMaxScaler |
|
|
import os |
|
|
|
|
|
|
|
|
TIME_STEP = 100 |
|
|
MODEL_PATH = 'stock_prediction_model.h5' |
|
|
|
|
|
|
|
|
def create_lstm_model(): |
|
|
"""Defines the Bidirectional LSTM model architecture used for training.""" |
|
|
model = tf.keras.models.Sequential() |
|
|
model.add(tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(128, return_sequences=True), input_shape=(TIME_STEP, 1))) |
|
|
model.add(tf.keras.layers.Dropout(0.3)) |
|
|
model.add(tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(64, return_sequences=True))) |
|
|
model.add(tf.keras.layers.Dropout(0.3)) |
|
|
model.add(tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(32))) |
|
|
model.add(tf.keras.layers.Dense(64, activation='relu')) |
|
|
model.add(tf.keras.layers.Dense(1)) |
|
|
model.compile(loss='mse', optimizer='adam') |
|
|
return model |
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
model = tf.keras.models.load_model(MODEL_PATH) |
|
|
print(f"Successfully loaded model from {MODEL_PATH}") |
|
|
except Exception as e: |
|
|
|
|
|
|
|
|
print(f"Warning: Could not load {MODEL_PATH}. Error: {e}") |
|
|
print("Initializing a dummy model. Please ensure your 'stock_prediction_model.h5' is uploaded.") |
|
|
model = create_lstm_model() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def forecast_stock(csv_file, days_to_predict=30): |
|
|
""" |
|
|
Takes an uploaded CSV file containing stock data, extracts 'Close' prices, |
|
|
and forecasts the next 'days_to_predict' using the loaded LSTM model. |
|
|
""" |
|
|
if csv_file is None: |
|
|
return None, "Error: Please upload a CSV file.", None |
|
|
|
|
|
try: |
|
|
|
|
|
df = pd.read_csv(csv_file.name) |
|
|
|
|
|
|
|
|
if 'Close' not in df.columns: |
|
|
return None, "Error: CSV must contain a 'Close' price column.", None |
|
|
|
|
|
|
|
|
ds_close = df.reset_index()['Close'].values.reshape(-1, 1) |
|
|
scaler = MinMaxScaler(feature_range=(0, 1)) |
|
|
ds_close_scaled = scaler.fit_transform(ds_close) |
|
|
|
|
|
|
|
|
if len(ds_close_scaled) < TIME_STEP: |
|
|
return None, f"Error: Dataset must contain at least {TIME_STEP} entries for initial prediction.", None |
|
|
|
|
|
|
|
|
x_input = ds_close_scaled[-TIME_STEP:].reshape(1, -1) |
|
|
temp_input = list(x_input[0]) |
|
|
|
|
|
lst_output = [] |
|
|
i = 0 |
|
|
|
|
|
|
|
|
while i < days_to_predict: |
|
|
if len(temp_input) > TIME_STEP: |
|
|
|
|
|
x_input = np.array(temp_input[1:]) |
|
|
x_input = x_input.reshape(1, TIME_STEP, 1) |
|
|
temp_input = temp_input[1:] |
|
|
else: |
|
|
x_input = np.array(temp_input).reshape(1, TIME_STEP, 1) |
|
|
|
|
|
|
|
|
yhat = model.predict(x_input, verbose=0) |
|
|
|
|
|
|
|
|
temp_input.extend(yhat[0].tolist()) |
|
|
lst_output.extend(yhat.tolist()) |
|
|
i = i + 1 |
|
|
|
|
|
|
|
|
predicted_prices = scaler.inverse_transform(lst_output) |
|
|
|
|
|
|
|
|
plt.figure(figsize=(10, 6)) |
|
|
|
|
|
|
|
|
actual_prices = scaler.inverse_transform(ds_close_scaled) |
|
|
day_actual = np.arange(len(actual_prices) - TIME_STEP, len(actual_prices)) |
|
|
|
|
|
plt.plot(day_actual, actual_prices[-TIME_STEP:], label='Last 100 Actual Days', color='blue') |
|
|
|
|
|
|
|
|
day_pred = np.arange(TIME_STEP, TIME_STEP + days_to_predict) |
|
|
plt.plot(day_pred, predicted_prices, label=f'Forecasted {days_to_predict} Days', color='red', linestyle='--') |
|
|
|
|
|
|
|
|
plt.plot([day_actual[-1], day_pred[0]], [actual_prices[-1], predicted_prices[0]], color='red', linestyle='--') |
|
|
|
|
|
plt.title('Stock Price Forecast (LSTM)') |
|
|
plt.xlabel('Days') |
|
|
plt.ylabel('Close Price') |
|
|
plt.legend() |
|
|
plt.grid(True) |
|
|
plot_output = plt |
|
|
|
|
|
|
|
|
|
|
|
last_date = pd.to_datetime(df.iloc[-1]['Date']) if 'Date' in df.columns else pd.to_datetime(df.index[-1]) |
|
|
|
|
|
|
|
|
future_dates = pd.date_range(start=last_date + pd.Timedelta(days=1), periods=days_to_predict) |
|
|
|
|
|
df_forecast = pd.DataFrame({ |
|
|
'Date': future_dates.strftime('%Y-%m-%d'), |
|
|
'Forecasted Price': np.round(predicted_prices.flatten(), 2) |
|
|
}) |
|
|
|
|
|
return plot_output, "Forecast successful!", df_forecast |
|
|
|
|
|
except Exception as e: |
|
|
return None, f"An unexpected error occurred: {e}", None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
input_csv = gr.File(label="1. Upload Historical Stock CSV (must include 'Date' and 'Close' columns)") |
|
|
input_days = gr.Slider(minimum=10, maximum=180, value=30, step=1, label="2. Number of days to forecast") |
|
|
|
|
|
|
|
|
output_plot = gr.Plot(label="Price Forecast Visualization") |
|
|
output_message = gr.Textbox(label="Status / Notes", value="Waiting for file upload...") |
|
|
output_df = gr.Dataframe(label="Forecasted Prices Table") |
|
|
|
|
|
|
|
|
iface = gr.Interface( |
|
|
fn=forecast_stock, |
|
|
inputs=[input_csv, input_days], |
|
|
outputs=[output_plot, output_message, output_df], |
|
|
title="LSTM Stock Price Prediction", |
|
|
description="Upload a CSV file of historical stock data and use the pre-trained Bidirectional LSTM model to forecast future closing prices. The model requires the latest 100 data points to make the initial forecast.", |
|
|
allow_flagging='never' |
|
|
) |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
iface.launch() |
|
|
|