Team I.A. Italia

6 min

Implementare le reti neurali ricorrenti (RNN) con Python e Keras

Questo tutorial è progettato per chiunque stia cercando di capire il funzionamento delle reti neurali ricorrenti (RNN) e di come utilizzarle tramite la libreria di deep learning Keras.

Implementare le reti neurali ricorrenti (RNN) con Python e Keras

Sebbene tutti i metodi necessari per risolvere problemi e creare applicazioni siano forniti dalla libreria Keras, è anche importante ottenere un'idea di come funziona tutto. In questo articolo, i calcoli che avvengono nel modello RNN sono mostrati passo dopo passo. Successivamente, vedremo come sviluppare un sistema end-to-end completo per la previsione delle serie temporali.

Dopo aver completato questo tutorial, saprai:

  • La struttura di RNN

  • Come RNN calcola l'output quando viene fornito un input

  • Come preparare i dati per un SimpleRNN in Keras

  • Come addestrare un modello SimpleRNN

Iniziamo a capire e Implementare le reti neurali ricorrenti (RNN) con Python e Keras

Panoramica dell'esercitazione

Questo tutorial è diviso in due parti; sono:

  1. La struttura della RNN

    1. Pesi e pregiudizi diversi associati a diversi livelli della RNN.

    2. Come vengono eseguiti i calcoli per calcolare l'output quando viene fornito un input.

  2. Un'applicazione completa per la previsione di serie temporali.

Prerequisiti :

Passiamo ora alla parte di implementazione.

Iniziamo importando le Librerie necessarie

Per avviare l'implementazione delle RNN, aggiungiamo la sezione di importazione.

from pandas import read_csv
 
import numpy as np
 
from keras.models import Sequential
 
from keras.layers import Dense, SimpleRNN
 
from sklearn.preprocessing import MinMaxScaler
 
from sklearn.metrics import mean_squared_error
 
import math
 
import matplotlib.pyplot as plt

Keras SimpleRNN

La funzione seguente restituisce un modello che include un livello SimpleRNN e un livello Dense per l'apprendimento dei dati sequenziali. Specifica il parametro input_shape (time_steps x features). Semplificheremo tutto e utilizzeremo dati univariati, ovvero con una sola caratteristica; i time_steps sono discussi di seguito.

def create_RNN(hidden_units, dense_units, input_shape, activation):
 
model = Sequential()
 
model.add(SimpleRNN(hidden_units, input_shape=input_shape,
 
activation=activation[0]))
 
model.add(Dense(units=dense_units, activation=activation[1]))
 
model.compile(loss='mean_squared_error', optimizer='adam')
 
return model
 

 
demo_model = create_RNN(2, 1, (3,1), activation=['linear', 'linear'])

L'oggetto demo_model viene restituito con 2 unità nascoste create tramite il livello SimpleRNN e 1 unità densa creata tramite il livello Dense. input_shape È impostato su 3×1 e una funzione linear di attivazione viene utilizzata in entrambi i livelli per semplicità. Solo per richiamare la funzione di attivazione lineare F(X)=X non cambia l'input. La rete si presenta come segue:

Se abbiamo m unità nascoste (m=2 nel caso precedente), quindi:

  • Ingresso: X∈R

  • Unità nascosta: h∈Rm

  • Pesi per unità di input: wX∈Rm

  • Pesi per unità nascoste: wh∈RmXm

  • Bias per le unità nascoste: Bh∈Rm

  • Peso per lo strato denso: wy∈Rm

  • Bias per lo strato denso: By∈R

Diamo un'occhiata ai pesi sopra. Nota: poiché i pesi vengono inizializzati in modo casuale, i risultati incollati qui saranno diversi dai tuoi. L'importante è imparare che aspetto ha la struttura di ogni oggetto utilizzato e come interagisce con gli altri per produrre l'output finale.

wx = demo_model.get_weights()[0]
 
wh = demo_model.get_weights()[1]
 
bh = demo_model.get_weights()[2]
 
wy = demo_model.get_weights()[3]
 
by = demo_model.get_weights()[4]
 

 
print('wx = ', wx, ' wh = ', wh, ' bh = ', bh, ' wy =', wy, 'by = ', by)
 

 
Output
 
wx = [[ 0.18662322 -1.2369459 ]]
 
wh = [[ 0.86981213 -0.49338293] [ 0.49338293 0.8698122 ]]
 
bh = [0. 0.]
 
wy = [[-0.4635998] [ 0.6538409]]
 
by = [0.]

Ora facciamo un semplice esperimento per vedere come i livelli di un livello SimpleRNN e Dense producono un output. Tieni presente questa figura.

Implementare le reti neurali ricorrenti (RNN) con Python e Keras

Nell'immagine vengono evidenziati gli Strati di una rete neurale ricorrente

Inseriremo x per tre passaggi temporali e consentiremo alla rete di generare un output. Verranno calcolati i valori delle unità nascoste ai passaggi temporali 1, 2 e 3. h0 è inizializzato al vettore zero. Il risultato O3 è calcolato da h3 e wy. Non è richiesta una funzione di attivazione poiché utilizziamo unità lineari.

x = np.array([1, 2, 3])
 
#Rimodelliamo l'input come richiedono sample_size x time_steps x features
 
x_input = np.reshape(x,(1, 3, 1))
 
y_pred_model = demo_model.predict(x_input)
 

 

 
m = 2
 
h0 = np.zeros(m)
 
h1 = np.dot(x[0], wx) + h0 + bh
 
h2 = np.dot(x[1], wx) + np.dot(h1,wh) + bh
 
h3 = np.dot(x[2], wx) + np.dot(h2,wh) + bh
 
o3 = np.dot(h3, wy) + by
 

 
print('h1 = ', h1,'h2 = ', h2,'h3 = ', h3)
 

 
print("Prediction from network ", y_pred_model)
 
print("Prediction from our computation ", o3)
 

 
Output
 

 
h1 = [[ 0.18662322 -1.23694587]]
 
h2 = [[-0.07471441 -3.64187904]]
 
h3 = [[-1.30195881 -6.84172557]]
 
Prediction from network [[-3.8698118]]
 
Prediction from our computation [[-3.86981216]]

Esecuzione dell' RNN su un set di dati

Ora che capiamo come vengono messi insieme i livelli SimpleRNN e Dense. Eseguiamo un RNN completo su un semplice set di dati di serie temporali. Dovremo seguire questi passaggi

  1. Leggi il set di dati da un determinato URL

  2. Suddividi i dati in training e test set

  3. Preparare l'input nel formato Keras richiesto

  4. Crea un modello RNN e addestralo

  5. Fare le previsioni sugli insiemi di allenamento e di prova e stampare l'errore quadratico medio della radice su entrambi gli insiemi

  6. Visualizza il risultato

Passaggio 1, 2: lettura dei dati e suddivisione in treno e test

La seguente funzione legge i dati del treno e del test da un determinato URL e lo divide in una determinata percentuale di dati del train e del test. Restituisce array unidimensionali per i dati di training e test dopo aver ridimensionato i dati tra 0 e 1 utilizzando MinMaxScalerda scikit-learn.

# il paramentro split_percent definisce il rapporto dei dati di addestramento (80%)
 
def get_train_test(url, split_percent=0.8):
 
df = read_csv(url, usecols=[1], engine='python')
 
data = np.array(df.values.astype('float32'))
 
scaler = MinMaxScaler(feature_range=(0, 1))
 
data = scaler.fit_transform(data).flatten()
 
n = len(data)
 
# dividiamo i dati in test e train
 
split = int(n*split_percent)
 
train_data = data[range(split)]
 
test_data = data[split:]
 
return train_data, test_data, data
 

 
sunspots_url = 'https://raw.githubusercontent.com/jbrownlee/Datasets/master/monthly-sunspots.csv'
 
train_data, test_data, data = get_train_test(sunspots_url)

Passaggio 3: rimodellamento dei dati per Keras

Il passaggio successivo consiste nel preparare i dati per l'addestramento del modello Keras. L'array di input dovrebbe avere la forma di: total_samples x time_steps x features.

Esistono molti modi per preparare i dati delle serie temporali per l'addestramento. Creeremo righe di input con passaggi temporali non sovrapposti. Un esempio per time_steps = 2 è mostrato nella figura seguente. Qui time_steps indica il numero di passaggi temporali precedenti da utilizzare per prevedere il valore successivo dei dati delle serie temporali.

Implementare le reti neurali ricorrenti (RNN) con Python e Keras

La funzione seguente get_XY() prende un array unidimensionale come input e lo converte negli array di input X e di destinazione richiesti Y . Useremo 12 time_stepsper il set di dati delle macchie solari poiché le macchie solari generalmente hanno un ciclo di 12 mesi. Puoi sperimentare altri valori di time_steps.

# Prepariamo i dati di input X e di Output Y
 
def get_XY(dat, time_steps):
 
# Target Y
 
Y_ind = np.arange(time_steps, len(dat), time_steps)
 
Y = dat[Y_ind]
 
# Variabili X
 
rows_x = len(Y)
 
X = dat[range(time_steps*rows_x)]
 
X = np.reshape(X, (rows_x, time_steps, 1))
 
return X, Y
 

 
time_steps = 12
 
trainX, trainY = get_XY(train_data, time_steps)
 
testX, testY = get_XY(test_data, time_steps)

Passaggio 4: crea modello RNN e addestriamolo

Per questo passaggio, possiamo riutilizzare la nostra create_RNN()funzione definita sopra.

model = create_RNN(hidden_units=3, dense_units=1, input_shape=(time_steps,1), activation=['tanh', 'tanh'])
 
model.fit(trainX, trainY, epochs=20, batch_size=1, verbose=2)

Passaggio 5: calcola e stampa l'errore quadratico medio della radice

La funzione print_error()calcola l'errore quadratico medio tra i valori effettivi e quelli previsti.

def print_error(trainY, testY, train_predict, test_predict):
 
# Calcoliamo l'errore nelle previsioni
 
train_rmse = math.sqrt(mean_squared_error(trainY, train_predict))
 
test_rmse = math.sqrt(mean_squared_error(testY, test_predict))
 
# stampiamo l'RMSE
 
print('Train RMSE: %.3f RMSE' % (train_rmse))
 
print('Test RMSE: %.3f RMSE' % (test_rmse))
 

 
# Facciamo la previsione
 
train_predict = model.predict(trainX)
 
test_predict = model.predict(testX)
 
# stampiamo il Mean square error
 
print_error(trainY, testY, train_predict, test_predict)
 

 
Output
 

 
Train RMSE: 0.058 RMSE
 
Test RMSE: 0.077 RMSE

Passaggio 6: visualizza il risultato

La seguente funzione traccia i valori target effettivi e il valore previsto. La linea rossa separa i punti dati di addestramento e test.
 

# Grafico del risultato
 
def plot_result(trainY, testY, train_predict, test_predict):
 
actual = np.append(trainY, testY)
 
predictions = np.append(train_predict, test_predict)
 
rows = len(actual)
 
plt.figure(figsize=(15, 6), dpi=80)
 
plt.plot(range(rows), actual)
 
plt.plot(range(rows), predictions)
 
plt.axvline(x=len(trainY), color='r')
 
plt.legend(['Actual', 'Predictions'])
 
plt.xlabel('Observation number after given time steps')
 
plt.ylabel('Sunspots scaled')
 
plt.title('Actual and Predicted Values. The Red Line Separates The Training And Test Examples')
 
plot_result(trainY, testY, train_predict, test_predict)

Viene generato il seguente grfico:

Implementare le reti neurali ricorrenti (RNN) con Python e Keras

Mettiamo tutto insieme in un unico file .py

Di seguito è riportato l'intero codice per questo tutorial. Prova questo alla tua fine e sperimenta diverse unità nascoste e passaggi temporali. Puoi aggiungere un secondo strato SimpleRNN alla rete e vedere come si comporta. Puoi anche utilizzare l' oggetto scaler per ridimensionare i dati al suo intervallo normale.

Qui sotto ti riportiamo il codice per copiarlo e incollarlo sul tuo editor

from pandas import read_csv
 
import numpy as np
 
from keras.models import Sequential
 
from keras.layers import Dense, SimpleRNN
 
from sklearn.preprocessing import MinMaxScaler
 
from sklearn.metrics import mean_squared_error
 
import math
 
import matplotlib.pyplot as plt
 

 
def get_train_test(url, split_percent=0.8):
 
df = read_csv(url, usecols=[1], engine='python')
 
data = np.array(df.values.astype('float32'))
 
scaler = MinMaxScaler(feature_range=(0, 1))
 
data = scaler.fit_transform(data).flatten()
 
n = len(data)
 
split = int(n*split_percent)
 
train_data = data[range(split)]
 
test_data = data[split:]
 
return train_data, test_data, data
 

 
def get_XY(dat, time_steps):
 
Y_ind = np.arange(time_steps, len(dat), time_steps)
 
Y = dat[Y_ind]
 
rows_x = len(Y)
 
X = dat[range(time_steps*rows_x)]
 
X = np.reshape(X, (rows_x, time_steps, 1))
 
return X, Y
 

 
def create_RNN(hidden_units, dense_units, input_shape, activation):
 
model = Sequential()
 
model.add(SimpleRNN(hidden_units, input_shape=input_shape, activation=activation[0]))
 
model.add(Dense(units=dense_units, activation=activation[1]))
 
model.compile(loss='mean_squared_error', optimizer='adam')
 
return model
 

 
def print_error(trainY, testY, train_predict, test_predict):
 
train_rmse = math.sqrt(mean_squared_error(trainY, train_predict))
 
test_rmse = math.sqrt(mean_squared_error(testY, test_predict))
 
print('Train RMSE: %.3f RMSE' % (train_rmse))
 
print('Test RMSE: %.3f RMSE' % (test_rmse))
 

 
def plot_result(trainY, testY, train_predict, test_predict):
 
actual = np.append(trainY, testY)
 
predictions = np.append(train_predict, test_predict)
 
rows = len(actual)
 
plt.figure(figsize=(15, 6), dpi=80)
 
plt.plot(range(rows), actual)
 
plt.plot(range(rows), predictions)
 
plt.axvline(x=len(trainY), color='r')
 
plt.legend(['Actual', 'Predictions'])
 
plt.xlabel('Observation number after given time steps')
 
plt.ylabel('Sunspots scaled')
 
plt.title('Actual and Predicted Values. The Red Line Separates The Training And Test Examples')
 

 
sunspots_url = 'https://raw.githubusercontent.com/jbrownlee/Datasets/master/monthly-sunspots.csv'
 
time_steps = 12
 
train_data, test_data, data = get_train_test(sunspots_url)
 
trainX, trainY = get_XY(train_data, time_steps)
 
testX, testY = get_XY(test_data, time_steps)
 

 
model = create_RNN(hidden_units=3, dense_units=1, input_shape=(time_steps,1),
 
activation=['tanh', 'tanh'])
 
model.fit(trainX, trainY, epochs=20, batch_size=1, verbose=2)
 

 
train_predict = model.predict(trainX)
 
test_predict = model.predict(testX)
 

 
print_error(trainY, testY, train_predict, test_predict)
 

 
plot_result(trainY, testY, train_predict, test_predict)

Riepilogo

In questo tutorial, hai scoperto le reti neurali ricorrenti e le loro varie architetture.

Nello specifico hai imparato:

  • La struttura delle RNN

  • Come l'RNN calcola un output dagli input precedenti

  • Come implementare un sistema end-to-end per la previsione di serie temporali utilizzando un RNN

Condividi l'articolo se lo hai trovato utile.