!pip install siml
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
import seaborn as sns
from scipy.fftpack import fft
from scipy.stats import kstest as ks
from siml.sk_utils import *
from siml.signal_analysis_utils import *
from collections import defaultdict, Counter
from scipy.signal import chirp, spectrogram, stft
from functools import reduce
import heapq as hp
import scipy.signal as signal
df=pd.read_csv(r'sinaisExemplo.csv',delimiter=";")
br2=pd.read_csv(r'BR23_5118_TB23_P2.csv', delimiter=';')
br3=pd.read_csv(r'BR24_5120_TB24_P3.csv', delimiter=';')
df
br2
br3
def plot_signal(signal_data, x, xlabel, ylabel):
ax = sns.lineplot(y=signal_data, x=x)
ax.set(xlabel=xlabel, ylabel=ylabel)
plt.show()
plot_signal(df['Channel1WaveformData_1'], df['Unnamed: 0'], 'Time (s)', 'Velocity (mm/s)')
plot_signal(br2['Channel: 1'], br2['Time (ms)'], 'Time (ms)', 'Velocity (mm/s)')
def get_fft_values(y_values, T, N, f_s):
f_values = np.linspace(0.0, 1.0/(2.0*T), N//2)
fft_values_ = fft(y_values)
fft_values = 2.0/N * np.abs(fft_values_[0:N//2])
return f_values, fft_values
def get_and_plot_fft(x, y, plot=False):
signal_y = y.to_numpy()
signal_x = x.to_numpy()
max_x = signal_x[-1]
if (max_x > 1000):
max_x = max_x/1000
t_n = max_x # sampling rate
N = len(signal_y) # sample size
T = t_n / N #sampling time
f_s = 1/T #sampling freq
x_value = signal_x
composite_y_value = signal_y
t_n = 10
N = 1000
T = t_n / N
f_s = 1/T
f_values, fft_values = get_fft_values(composite_y_value, T, N, f_s)
if plot:
plt.plot(f_values, fft_values, linestyle='-', color='blue')
plt.xlabel('Frequency [Hz]', fontsize=16)
plt.ylabel('Amplitude', fontsize=16)
plt.title("Frequency domain of the signal", fontsize=22)
plt.grid()
plt.show()
return (f_values, fft_values)
def get_n_highest(freq, n=1):
h = []
for i in range(len(freq)):
if (len(h) == n):
if(h[0][0] < freq[i]):
hp.heappop(h)
hp.heappush(h, (freq[i], i))
else:
hp.heappush(h, (freq[i], i))
return h
plot_signal(df['Channel1WaveformData_1'], df['Unnamed: 0'], 'Time (s)', 'Velocity (mm/s)')
t, v = get_and_plot_fft(df['Unnamed: 0'], df['Channel1WaveformData_1'], plot=True)
dominant_frequency = get_n_highest(v)
def plot_stft(x, y, plot=False):
signal_y = y.to_numpy()
signal_x = x.to_numpy()
max_x = round(signal_x[-1])
if (max_x > 1000):
max_x = round(max_x/1000)
fs = round(len(signal_x)/max_x)
t_final = max_x
t = np.linspace(1, t_final, fs*t_final)
f, t, Zxx = stft(signal_y, fs, nperseg=256)
if plot:
plt.figure(figsize=(30, 10))
plt.pcolormesh(t, f, np.abs(Zxx), shading='gouraud')
plt.title('STFT Magnitude', fontsize=22)
plt.ylabel('Frequency [Hz]')
plt.xlabel('Time [sec]')
plt.axis([0, max_x, 0, 300])
plt.show()
df_f = pd.DataFrame(f,columns = ['freq']).reset_index()
matriz = pd.DataFrame(np.abs(Zxx))
df_matriz = pd.DataFrame(pd.DataFrame(np.abs(matriz)).idxmax(),columns=['idx'])\
.reset_index(drop=True)\
.merge(df_f,left_on = 'idx',right_on = 'index')
ln_matriz = np.log(df_matriz.loc[df_matriz['freq']!=0])['freq']
mean = ln_matriz.mean()
var = ln_matriz.std()**2
if plot:
df_matriz.freq.hist()
mean = ln_matriz.mean()
var = ln_matriz.std()**2
return mean,var
plot_signal(df['Channel1WaveformData_1'], df['Unnamed: 0'], 'Time (s)', 'Velocity (mm/s)')
t, v = get_and_plot_fft(df['Unnamed: 0'], df['Channel1WaveformData_1'], plot=True)
dominant_frequency = get_n_highest(v)
mean,var = plot_stft(df['Unnamed: 0'], df['Channel1WaveformData_1'], plot=True)
mean,var = plot_stft(br2['Time (ms)'], br2['Channel: 1'], plot=True)
mean,var = plot_stft(br3['Time (ms)'], br3['Channel: 3'], plot=True)
# Filtering a noisy signal.
def pass_band_filter(signal_x, signal_y, highcut, plot=False):
max_x = signal_x.iloc[-1] #final time
if (max_x > 1000):
max_x = round(max_x/1000)
fs = round(len(signal_x)/max_x)
order = 9
x= signal_y.to_numpy()
a = 0.8
nsamples = len(signal_x.to_numpy())
t = np.linspace(0, max_x, nsamples, endpoint=True) #time array
# and now let's filter it
sys = signal.butter(order,[highcut], btype='lowpass', fs=fs, output='sos')
y = signal.sosfilt(sys, x) # compute the output signal
if plot:
plt.figure()
plt.clf()
plt.plot(t, y, label='Filtered signal')
plt.xlabel('Time (seconds)')
plt.hlines([-a, a], 0, max_x, linestyles='--')
plt.grid(True)
plt.axis('tight')
plt.legend(loc='upper left')
plt.show()
return y
highcut = 20
plot_signal(df['Channel1WaveformData_1'], df['Unnamed: 0'], 'Time (s)', 'Velocity (mm/s)')
pass_band_filter(df['Unnamed: 0'], df['Channel1WaveformData_1'], highcut,plot = True)
plot_signal(br2['Channel: 1'],br2['Time (ms)'], 'Time (ms)', 'Velocity (mm/s)')
pass_band_filter(br2['Time (ms)'], br2['Channel: 1'], highcut,plot = True)
plot_signal(br3['Channel: 2'],br3['Time (ms)'], 'Time (ms)', 'Velocity (mm/s)')
pass_band_filter(br3['Time (ms)'], br3['Channel: 2'], highcut,plot = True)
plot_signal(br3['Channel: 3'],br3['Time (ms)'], 'Time (ms)', 'Velocity (mm/s)')
pass_band_filter(br3['Time (ms)'], br3['Channel: 3'], highcut,plot = True)
plot_signal(df['Channel1WaveformData_5'],df['Unnamed: 0'], 'Time (s)', 'Velocity (mm/s)')
pass_band_filter(df['Unnamed: 0'], df['Channel1WaveformData_5'],highcut,plot = True)
dfFinal = df.iloc[:,:5]
dfFinal.rename(columns=
{'Unnamed: 0': 'time',
'Channel1WaveformData_1':'wave1',
'Channel2WaveformData_1':'wave2',
'Channel3WaveformData_1':'wave3',
'Channel1WaveformData_2':'wave4'}
,inplace = True
)
br2.rename(
columns={
'Time (ms)':'time',
'Channel: 1':'wave1',
'Channel: 2':'wave2',
'Channel: 3':'wave3'
},inplace = True
)
br3.rename(
columns={
'Time (ms)':'time',
'Channel: 1':'wave4',
'Channel: 2':'wave5',
'Channel: 3':'wave6'
},inplace = True
)
br = br2.merge(br3,on = 'time')
for i in range(len(dfFinal.columns)-1):
dfFinal.iloc[:,i+1]= pass_band_filter(dfFinal.iloc[:,0], dfFinal.iloc[:,i+1],highcut)
for i in range(len(br.columns)-1):
br.iloc[:,i+1]= pass_band_filter(br.iloc[:,0], br.iloc[:,i+1],highcut)
#df
featuresNames = ['Amplitude','Time','Frequency_domain','Mean','Variance']
features_row = []
for i in range(len(dfFinal.columns)-1):
#Amplitude
wave = dfFinal.iloc[:,i+1]
amplitude = wave.max()
#Time
mom = dfFinal.iloc[dfFinal.iloc[:,i+1].idxmax(),0]
#Frequency domain
t, v = get_and_plot_fft(dfFinal['time'], dfFinal.iloc[:,i+1], plot=False)
highest = get_n_highest(v)
dominant_frequency= t[highest[0][1]]
#Mean and variance
mean,var = plot_stft(dfFinal['time'], dfFinal.iloc[:,i+1], plot=False)
features_row = features_row + [[amplitude,mom,dominant_frequency,mean,var]]
#--------------------------------------------------------
#br - Underwater
for i in range(len(br.columns)-1):
#Amplitude
wave = br.iloc[:,i+1]
amplitude = wave.max()
#Time
mom = br.iloc[br.iloc[:,i+1].idxmax(),0]/1000
#Frequency domain
t, v = get_and_plot_fft(br['time'], br.iloc[:,i+1], plot=False)
highest = get_n_highest(v)
dominant_frequency= t[highest[0][1]]
#Mean and variance
mean,var = plot_stft(br['time'], br.iloc[:,i+1], plot=False)
features_row = features_row + [[amplitude,mom,dominant_frequency,mean,var]]
dfFeatures = pd.DataFrame(features_row,columns=featuresNames)
dfFeatures