# -*- coding: utf-8 -*-
"""
Preprocessing:
    ICA - Independent Component Analysis
Features:
    PSD - Power Spectral Density
    PSD Mean
    PSD Sum
    Spectrogram
    Valence and Arousal calculation
    AVG Power
"""
import numpy as np
from scipy import signal
from sklearn.decomposition import FastICA
from sklearn.preprocessing import MinMaxScaler
from mne.time_frequency import tfr_array_morlet

#%% Preprocessing Steps
""" -------------------------- Preprocess the signals ------------------------------------"""
#
# ICA preprocessing, applied epoch by epoch
#
def preprocess_epochs(EEG):
    # EEG: (n_epochs, n_channels, n_times)
    data3d = np.zeros((EEG.shape[0], EEG.shape[1], EEG.shape[2]))
    # ICA (EOG) artifact removal
    ica = FastICA()
    for index in range(EEG.shape[0]):
        data2d = EEG[index, :, :]
        S_ = ica.fit_transform(data2d)
        print(S_.shape)
        # Reconstruct signals from the estimated mixing matrix
        A_ = ica.mixing_.T
        data3d[index, :, :] = A_
    return data3d

def preprocess(EEG):
    # ICA (EOG) artifact removal
    ica = FastICA()
    S_ = ica.fit_transform(EEG)
    print(S_.shape)
    # Reconstruct signals from the estimated mixing matrix
    A_ = ica.mixing_.T
    # Remove the baseline (per-channel mean) of the signals
    x, y = np.shape(A_)
    A = np.zeros((x, y))
    base = np.mean(A_, axis=1)
    for i in range(x):
        A[i, :] = A_[i, :] - base[i]
    return A

""" -------------------------- Features calculations ------------------------------------"""
#
# Extract spectral features from an EEG data file
#
def extract_PSD(EEG1):  # (1, 14, 512)
    # Calculate PSD (Power Spectral Density)
    print("Power Spectral Density")
    print("eeg shape: %s" % str(EEG1.shape))
    win = signal.get_window('hann', int(1 * 128))
    # Perform the PSD function -- modified 4 instead of 1 in (win and welch)
    # https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.welch.html
    freqs, psds = signal.welch(x=EEG1, fs=128.0, window=win, nperseg=1 * 128,
                               nfft=None, detrend='constant',
                               return_onesided=True, scaling='density', axis=-1)
    print("psds %s" % str(psds.shape))
    print("psd freqs %s" % str(freqs.shape))
    # Calculate PSD sum over frequencies
    psdsSum = np.sum(psds, axis=1)
    print("psds sum %s" % str(psdsSum.shape))
    # Calculate PSD average (mean) over frequencies
    psdMean = np.mean(psds, axis=1)
    print("psds mean %s" % str(psdMean.shape))
    return psds, freqs, psdsSum, psdMean

#
# Extract avg power features from an EEG data
#
def extract_avgpower(eeg_rad):
    # Perform time-frequency transform with Morlet wavelets
    nchannels = eeg_rad.shape[0]  # the original relied on an undefined module-level `nchannels`
    freqs = np.logspace(*np.log10([1, 64]), num=4)
    n_cycles = freqs / 2
    # First argument passed positionally: older MNE versions named it
    # `epoch_data`, newer ones name it `data`.
    power = tfr_array_morlet(eeg_rad.reshape(1, nchannels, 128), sfreq=128,
                             freqs=freqs, n_cycles=n_cycles, use_fft=True,
                             output="power", decim=1, n_jobs=1,
                             zero_mean=False, verbose=None)
    # Calculate AVG power per event (average over the frequency axis)
    avgpower1 = np.mean(power, axis=2)
    print("power %s" % str(power.shape))
    print("avgpower1 %s" % str(avgpower1.shape))
    return avgpower1

#
# Extract spectrogram (Hann window) features from an EEG data
#
def spectrogram_hanning(eeg):
    win = signal.get_window('hann', 128)
    # nperseg and noverlap must be integers for scipy.signal.spectrogram
    freqs, segTimes, Specs = signal.spectrogram(eeg, 128.0, window=win,
                                                nperseg=128, noverlap=64)
    print("hann Specs %s" % str(Specs.shape))
    print("hann freqs %s" % str(freqs.shape))
    print("hann segTimes %s" % str(segTimes.shape))
    # Sum the spectrogram over channels
    spec_feat = np.sum(Specs, axis=0)
    print("spec_feat %s" % str(spec_feat.shape))
    return spec_feat
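#
# Example (illustrative sketch, not part of the original pipeline): how the
# preprocessing and spectral feature functions above chain together. The
# shapes are assumptions (14 channels at 128 Hz, a 4 s segment) and the data
# is random noise, purely to show the calls; `extract_avgpower` needs MNE.
#
def _demo_spectral_features():
    rng = np.random.RandomState(0)
    eeg = rng.randn(14, 512)                      # hypothetical (channels, samples)
    clean = preprocess(eeg)                       # ICA + baseline removal
    psds, freqs, psd_sum, psd_mean = extract_PSD(clean)
    spec_feat = spectrogram_hanning(clean)
    avgpower = extract_avgpower(clean[:, :128])   # one 1-second window
    return psds, psd_sum, psd_mean, spec_feat, avgpower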
#
# Extract valence and arousal features from an EEG data
#
def compute_valence_arousal(avgpower1):
    # Scale power into the 1-9 range
    avgpower1 = ((avgpower1 - np.min(avgpower1)) * (9 - 1)) / (np.max(avgpower1) - np.min(avgpower1)) + 1
    # Compute averaged powers; channel indices follow
    # ['AF3','F7','F3','F4','F8','AF4','FC6','FC5','P7','P8'].
    # Frequency bins 7:14 approximate the alpha band, 15:30 the beta band.
    # AF3, F3, alpha
    left_alpha = np.mean(avgpower1[:, [0, 2], 7:14], axis=(1, 2))
    # AF3, F3, beta
    left_beta = np.mean(avgpower1[:, [0, 2], 15:30], axis=(1, 2))
    # AF4, F4, alpha
    right_alpha = np.mean(avgpower1[:, [5, 3], 7:14], axis=(1, 2))
    # AF4, F4, beta
    right_beta = np.mean(avgpower1[:, [5, 3], 15:30], axis=(1, 2))
    # Frontal asymmetry (AF4, F4 minus AF3, F3), alpha
    front_alpha = np.mean(avgpower1[:, [3, 5], 7:14], axis=(1, 2)) - np.mean(avgpower1[:, [0, 2], 7:14], axis=(1, 2))
    # Frontal asymmetry (AF4, F4 minus AF3, F3), beta
    front_beta = np.mean(avgpower1[:, [3, 5], 15:30], axis=(1, 2)) - np.mean(avgpower1[:, [0, 2], 15:30], axis=(1, 2))
    # F4, alpha
    alpha_F4 = np.mean(avgpower1[:, [3], 7:14], axis=(1, 2))
    # F3, alpha
    alpha_F3 = np.mean(avgpower1[:, [2], 7:14], axis=(1, 2))
    # All 10 channels, alpha
    alpha_all = np.mean(avgpower1[:, 0:10, 7:14], axis=(1, 2))
    # F4, beta
    beta_F4 = np.mean(avgpower1[:, [3], 15:30], axis=(1, 2))
    # F3, beta
    beta_F3 = np.mean(avgpower1[:, [2], 15:30], axis=(1, 2))
    # All 10 channels, beta
    beta_all = np.mean(avgpower1[:, 0:10, 15:30], axis=(1, 2))

    vamv_valence = left_beta / left_alpha - right_beta / right_alpha
    # Log scale makes ratios below 1.0 negative and above 1.0 positive
    vamv_arousal = np.log2(front_beta / front_alpha)
    kirk_valence = np.log(left_alpha) - np.log(right_alpha)
    kirk_arousal = -(np.log(right_alpha) + np.log(left_alpha))
    ram12_valence = alpha_F4 / beta_F4 - alpha_F3 / beta_F3
    ram12_arousal = alpha_all / beta_all
    ram15_valence = alpha_F4 - beta_F3
    ram15_arousal = beta_all / alpha_all
    # Stack the different predictions to add them to the features
    valence = np.hstack((vamv_valence, kirk_valence, ram12_valence, ram15_valence))
    arousal = np.hstack((vamv_arousal, kirk_arousal, ram12_arousal, ram15_arousal))
    return valence, arousal
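#
# Example (illustrative sketch): compute_valence_arousal expects per-trial
# power with per-Hz frequency bins, e.g. (n_trials, 10 channels, n_freqs)
# with the channel order listed above. The array here is synthetic, built so
# the right-frontal (F4, AF4) power exceeds the left side and the logs stay finite.
#
def _demo_valence_arousal():
    rng = np.random.RandomState(1)
    power = rng.rand(2, 10, 64) * 0.1 + 1.0   # hypothetical (trials, channels, Hz bins)
    power[:, [3, 5], :] += 1.0                # boost F4/AF4 -> positive frontal asymmetry
    valence, arousal = compute_valence_arousal(power)
    print("valence:", valence)                # 4 estimators stacked per trial
    print("arousal:", arousal)
    return valence, arousal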
#
# Extract valence and arousal features from DWT coefficients
#
def get_v_a_DWT(cof):
    # cof[channel][band]: channels follow
    # ['AF3','F7','F3','F4','F8','AF4','FC6','FC5','P7','P8'],
    # bands follow the wavedec order cA4, cD4, cD3, cD2, cD1,
    # interpreted as delta, theta, alpha, beta, gamma.
    ALPHA, BETA = 2, 3

    # Scale all coefficients jointly into [0, 1] (modifies `cof` in place)
    scaler = MinMaxScaler()
    scal = []
    for channel in range(10):
        for band in range(5):
            scal.append(cof[channel][band].ravel())
    a = np.concatenate(scal, axis=0)
    scaler.fit(a[:, np.newaxis])
    for channel in range(10):
        for band in range(5):
            cof[channel][band] = scaler.transform(cof[channel][band][:, np.newaxis]).ravel()

    def band_mean(channels, band):
        # Mean coefficient of one band, averaged over the given channels
        return np.mean([np.mean(cof[ch][band]) for ch in channels])

    # Compute averaged powers
    left_alpha = band_mean([0, 2], ALPHA)       # AF3, F3
    left_beta = band_mean([0, 2], BETA)         # AF3, F3
    right_alpha = band_mean([3, 5], ALPHA)      # F4, AF4
    right_beta = band_mean([3, 5], BETA)        # F4, AF4
    alpha_F4 = band_mean([3], ALPHA)
    alpha_F3 = band_mean([2], ALPHA)
    beta_F4 = band_mean([3], BETA)
    beta_F3 = band_mean([2], BETA)
    alpha_all = band_mean([0, 2, 3, 5], ALPHA)  # AF3, F3, F4, AF4
    beta_all = band_mean([0, 2, 3, 5], BETA)    # AF3, F3, F4, AF4
    # Frontal asymmetry: (F4 + AF4) - (AF3 + F3), i.e. twice the difference of means
    front_alpha = 2 * (right_alpha - left_alpha)
    front_beta = 2 * (right_beta - left_beta)

    vamv_valence = left_beta / left_alpha - right_beta / right_alpha
    # Log scale makes ratios below 1.0 negative and above 1.0 positive
    vamv_arousal = np.log2(abs(front_beta / front_alpha))
    print(front_beta)
    print(front_alpha)
    print(vamv_arousal)
    kirk_valence = np.log(left_alpha) - np.log(right_alpha)
    kirk_arousal = -(np.log(right_alpha) + np.log(left_alpha))
    ram12_valence = alpha_F4 / beta_F4 - alpha_F3 / beta_F3
    ram12_arousal = alpha_all / beta_all
    ram15_valence = alpha_F4 - beta_F3
    ram15_arousal = beta_all / alpha_all
    # Stack the different predictions to add them to the features
    valence = np.hstack((vamv_valence, kirk_valence, ram12_valence, ram15_valence))
    arousal = np.hstack((vamv_arousal, kirk_arousal, ram12_arousal, ram15_arousal))
    # Optional rescaling into the 1-9 range:
    # valence = ((valence - np.min(valence)) * (9 - 1)) / (np.max(valence) - np.min(valence)) + 1
    # arousal = ((arousal - np.min(arousal)) * (9 - 1)) / (np.max(arousal) - np.min(arousal)) + 1
    return valence, arousal

from collections import Counter
import scipy.stats

def calculate_entropy(list_values):
    # Shannon entropy of the empirical value distribution
    counter_values = Counter(list_values).most_common()
    probabilities = [elem[1] / len(list_values) for elem in counter_values]
    entropy = scipy.stats.entropy(probabilities)
    return entropy

def calculate_statistics(list_values):
    values = np.asarray(list_values)
    n5 = np.nanpercentile(values, 5)
    n25 = np.nanpercentile(values, 25)
    n75 = np.nanpercentile(values, 75)
    n95 = np.nanpercentile(values, 95)
    median = np.nanpercentile(values, 50)
    mean = np.nanmean(values)
    std = np.nanstd(values)
    var = np.nanvar(values)
    # Root mean square: sqrt of the mean of the squares
    # (the original np.nanmean(np.sqrt(values**2)) computed the mean absolute value)
    rms = np.sqrt(np.nanmean(np.square(values)))
    return [n5, n25, n75, n95, median, mean, std, var, rms]

def calculate_crossings(list_values):
    # Cast the boolean masks to int so np.diff works across NumPy versions
    zero_crossing_indices = np.nonzero(np.diff((np.array(list_values) > 0).astype(int)))[0]
    no_zero_crossings = len(zero_crossing_indices)
    mean_crossing_indices = np.nonzero(np.diff((np.array(list_values) > np.nanmean(list_values)).astype(int)))[0]
    no_mean_crossings = len(mean_crossing_indices)
    return [no_zero_crossings, no_mean_crossings]

def get_features(list_values):
    entropy = calculate_entropy(list_values)
    crossings = calculate_crossings(list_values)
    statistics = calculate_statistics(list_values)
    return [entropy] + crossings + statistics

import pywt
#
# Decompose each channel of a signal window into DWT coefficient levels
# and compute statistical features per level
#
def get_DWT_features(dataset, waveletname):
    # dataset: (length of signal, no. of channels), e.g. (128, 10)
    uci_har_features = []
    features = []
    cof = {}
    for signal_comp in range(dataset.shape[1]):  # loop over channels
        channel_signal = dataset[:, signal_comp]
        # https://pywavelets.readthedocs.io/en/latest/ref/dwt-discrete-wavelet-transform.html#multilevel-decomposition-using-wavedec
        list_coeff = pywt.wavedec(channel_signal, waveletname)
        cof[signal_comp] = list_coeff
        for coeff in list_coeff:
            features += get_features(coeff)
    uci_har_features.append(features)
    X = np.array(uci_har_features)
    return X, cof
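#
# Example (illustrative sketch): end-to-end DWT feature extraction for one
# synthetic 10-channel window. With 128 samples, pywt.wavedec with 'db4'
# reaches level 4 by default, giving the 5 coefficient arrays
# (cA4, cD4, cD3, cD2, cD1) that get_v_a_DWT indexes as its 5 bands.
# The shapes and the wavelet name are assumptions, not the project's real data.
#
def _demo_dwt_features():
    rng = np.random.RandomState(2)
    window = rng.randn(128, 10)            # hypothetical (samples, channels)
    X, cof = get_DWT_features(window, 'db4')
    print("DWT feature matrix:", X.shape)  # (1, 600): 10 ch * 5 bands * 12 features
    valence, arousal = get_v_a_DWT(cof)
    print("valence:", valence)
    print("arousal:", arousal)
    return X, valence, arousal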