-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmainTest.py
More file actions
157 lines (140 loc) · 6.75 KB
/
mainTest.py
File metadata and controls
157 lines (140 loc) · 6.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import pyaudio
import numpy as np
import librosa
import sklearn
import peakutils
import time
import wave
import joblib
# -------------------------------------sia Training che Test------------------------------------------------------------
# flag per decidere modello: K = Kmeans, G = GMM
flagAlg = 'K'
# Setup
FORMAT = pyaudio.paInt16 # formato dati (opp. FORMAT = pyaudio.paInt16) #<--------------------------------!!!!!!
CHANNELS = 1 # numero canali
RATE = 44100 # 44100 # Sample Rate
CHUNK = 1024 # Block Size
RECORD_SECONDS = 5 # tempo registrazione in secondi
WAVE_OUTPUT_FILENAME = "output.wav" # nome con cui salverò il file audio
n_oggetti = 3
frames = [] # dichiaro var per frame
sgn = np.array([]) # var per segnale audio
dir = '.\\datasetRT\\' # con linux poi i separatori saranno diversi
# listo i file .wav contenuti in quella cartella in una list filelist
# tr_sub_dirs è una lista di nomi delle sottocartelle di training, solo una per ora
tr_sub_dirs = ["training\\"]
# carico il modello di ML addestrato e lo scaler per normalizzare le istanze di test
model_filename = 'modelKmeansRTAttack600Mano.sav' if flagAlg == 'K' else 'modelGmmRT.sav'
try:
model = joblib.load(model_filename) # carico il modello ML
scaler = joblib.load("scalerRT.sav")
print("Caricamento del modello salvato come ", model_filename) # stampo a video la conferma di aver caricato il modello
except Exception as e:
print("Errore nel trovare il file in cui è stato salvato il modello, chiamato: ", model_filename) # qualora il modello non venga caricato, stampo un messaggio di errore
print(e) # stampo messaggio di errore
# Startup - istanza pyaudio
audio = pyaudio.PyAudio()
print(audio.get_default_input_device_info())
print(audio.get_default_output_device_info())
# inizializzo le variabili globali che utilizzo all'interno della funzione di callback
f = 0 # flag per indicare quanti blocchi/chunk ancora dovrò prendere e per far sì che siano di 2048
ind = 0 # variabile che conterrà l'indice del picco rilevato e che mi consentirà di selezionare esattamente 2048 campioni
buff = np.zeros(600) # buffer che conterrà 2048 campioni audio, a partire dal picco rilevato, su cui calcolo le mfcc e predico un'etichetta
start_time = 0 # variabile per calcolare quanto tempo viene impiegato dal rilevamento del picco al rilascio del suono di output triggerato dall'etichettatura
prev_chunk = np.array([])
kick = wave.open('samples/action-kick.wav', 'rb')
kick_nframes = kick.getnframes()
kick_bytes = kick.readframes(kick_nframes)
hat = wave.open('samples/ec-hat002.wav', 'rb')
hat_nframes = hat.getnframes()
hat_bytes = hat.readframes(hat_nframes)
snare = wave.open('samples/ec-sn004.wav', 'rb')
snare_nframes = snare.getnframes()
snare_bytes = snare.readframes(snare_nframes)
step = 0
incr = 2048
zeros_bytes = bytes(2048)
prev_out = bytes(2048)
flagSample = 0
nframes = 0
def callback(in_data, frame_count, time_info, status):
'''
:param in_data: segnale audio in ingresso dal microfono
:param frame_count: numero di frame
:param time_info: informazioni sul tempo
:param status: stato - PaCallbackFlags
:return: dovrebbe essere (frame_count * channels * bytes-per-channel), paContinue= c'è ancora audio in ingresso
La funzione permette di prendere l'audio in input da microfono ed elaborarlo.
'''
# if status:
# print("Stato della callback: %i" % status)
# variabili globali presentate anche nelle precedenti righe di codice
global model, f, ind, buff, start_time, process, prev_chunk, step, incr, zeros_bytes, flagSample, prev_out, kick_nframes, hat_nframes, snare_nframes, nframes
out_data = bytes()
data_input = np.fromstring(in_data, dtype=np.int16) # prendo in ingresso i byte
data_input = data_input.astype(np.float32, order='C') / 32768.0 # converto audio a float32
data = np.nan_to_num(data_input) # controllo non vi siano nan
indici = peakutils.indexes(data, thres=0.2, min_dist=1024, thres_abs=True) # rilevo gli indici dei picchi
if indici.size != 0:
if indici[0] > 200:
buff = data[indici[0]-200:indici[0] + 400]
else:
prec = 200 - indici[0]
buff = np.concatenate((prev_chunk[-prec:], data[0:indici[0]+400]))
mfcc = np.mean(librosa.feature.mfcc(y=buff, sr=RATE, n_mfcc=13).T, axis=0) # su tale buffer, di 2048, calcolo le 13 mfcc
X_test = scaler.transform(list(mfcc.reshape(1, -1))) # normalizzo l'istanza di test
label = model.predict(X_test) # predico un'etichetta per il picco rilevato => buffer
#print("label: ", label) # stampo etichetta
#print("time: ", time.time() - start_time)
fn = kick if label == 0 else hat if label == 1 else snare # in base all'etichetta ottenuta seleziono il nome del file audio da caricare
if label == 0:
out_data = kick_bytes[step:incr]
step = incr
incr = incr + 2048
prev_out = kick_bytes[step:incr]
elif label == 1:
out_data = hat_bytes[step:incr]
step = incr
incr = incr + 2048
prev_out = hat_bytes[step:incr]
else:
out_data = snare_bytes[step:incr]
step = incr
incr = incr + 2048
prev_out = snare_bytes[step:incr]
prev_out = kick_bytes if label == 0 else hat_bytes if label == 1 else snare_bytes
nframes = kick_nframes if label == 0 else hat_nframes if label == 1 else snare_nframes
flagSample = 1
else: # se non ho rilevato indici e non devo continuare il campione audio
if flagSample == 1:
if incr < nframes:
out_data = prev_out[step:incr]
step = incr
incr = incr + 2048
else:
incr = nframes
out_data = prev_out[step:incr]
out_data = out_data + bytes(2048 - len(out_data))
step = 0
incr = 2048
flagSample = 0
else:
out_data = zeros_bytes
prev_chunk = data
return (out_data, pyaudio.paContinue) # restituisco dati audio e flag per dire di continuare a restare in ascolto
# inizio la registrazione dal mic
stream = audio.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, output=True, frames_per_buffer=CHUNK,
stream_callback=callback)
# inizia lo stream
stream.start_stream()
# stampo l'avviso che sto registrando
print("recording...")
# aspetta che lo stream termini (15)
while stream.is_active():
time.sleep(0.1)
# Fermo la registrazione
stream.stop_stream()
# Chiudo lo stream
stream.close()
# Chiudo istanza PyAudio
audio.terminate()