from pydub import AudioSegment
from scipy import signal
from operator import itemgetter
import pyaudio
import numpy as np
import utils
import os
import sys
import warnings
import operator
import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
# PyAudio sample format for 16-bit signed integer samples
FORMAT = pyaudio.paInt16

# Number of audio channels in the recording
CHANNELS = 2

# Original sample rate of the recordings
SAMPLE_RATE = 44100

# Sampling rate (after downsampling)
FS = 8000

# Factor by which the original signal will be downsampled
# (NOTE(review): 44100/8000 = 5.5125 is not an integer; downstream code
# truncates it, so the effective output rate is not exactly FS)
DECIMATION_FACTOR = SAMPLE_RATE / FS

# Size of the FFT window, affects frequency granularity (we saw this in class!)
WINDOW_SIZE = 1024

# Degree to which a fingerprint can be paired with its neighbors --
# higher will cause more fingerprints, but potentially better accuracy.
FAN_VALUE = 15

# Ratio by which each window overlaps the previous and next window --
# higher will cause more fingerprints, but higher granularity of offset matching
OVERLAP_RATIO = 0.5

# Working directory; mp3s are expected under <cwd>/mp3s/
path = os.getcwd()

# log10(0) warnings are expected when building log spectrograms; silence them
warnings.filterwarnings("ignore", message="divide by zero encountered in log10")
warnings.filterwarnings("ignore", category=DeprecationWarning)

# Database with key=songname, value=[channel1, channel2]
SongDb = {}
# Goes through mp3s folder and adds each song to database
for filename in os.listdir(path + "/mp3s/"):
    audiofile = AudioSegment.from_file(path + "/mp3s/" + filename)
    # np.fromstring is deprecated/removed for binary input; frombuffer is
    # the supported way to view the raw PCM bytes as int16 samples
    data = np.frombuffer(audiofile._data, np.int16)
    channels = []
    # de-interleave: sample k of channel c lives at data[c + k*channels]
    for chn in range(audiofile.channels):
        channels.append(data[chn::audiofile.channels])
    # strip the ".mp3" extension; the original used [:-3] here, which kept
    # the dot and disagreed with the [:-4] used in the print below
    SongDb[filename[:-4]] = channels
    print("Added to song database: " + str(filename[:-4]))
def Preprocess(channels, decimation_factor=None):
    """Mix a 2-channel recording to mono, remove DC offset, and downsample.

    Parameters:
        channels: sequence [channel1, channel2] of equal-length sample arrays
            (int16 as loaded from the mp3s).
        decimation_factor: integer downsampling factor; defaults to the
            module-level DECIMATION_FACTOR truncated to int.

    Returns:
        1-D float array: the decimated, mean-subtracted mono mix.
    """
    if decimation_factor is None:
        # NOTE(review): DECIMATION_FACTOR = 44100/8000 = 5.5125; int()
        # truncates to 5, so the effective output rate is 8820 Hz, not the
        # FS = 8000 assumed elsewhere -- confirm the intended rates.
        decimation_factor = DECIMATION_FACTOR
    # Cast to float BEFORE summing: adding two int16 arrays wraps around on
    # overflow (int16 max is 32767), silently corrupting the mono mix.
    channel1 = np.asarray(channels[0], dtype=np.float64)
    channel2 = np.asarray(channels[1], dtype=np.float64)
    mono_sum = channel1 + channel2
    # NOTE(review): the mean of the SUM is subtracted from the half-sum,
    # exactly as the original did -- this only shifts the DC level.
    channelmean = mono_sum / 2 - np.mean(mono_sum)
    return signal.decimate(channelmean, int(decimation_factor))
# Database with key=songname, value=processed signal
ProcessedDb = {}

# Processes each song and adds it to ProcessedDb.
# Prints a table of the number of samples for each song.
print('{0:65}{1:22}{2:20}\n'.format('Song Name', 'Original #Samples', 'Processed #Samples'))
for name, chans in SongDb.items():
    result = Preprocess(chans)
    ProcessedDb[name] = result
    print('{0:50}{1:32d}{2:20d}'.format(name, len(chans[0]), len(result)))
def getSpectrogram(signal):
    """Compute a log-magnitude spectrogram of a 1-D sample array.

    NOTE(review): the parameter name shadows scipy's `signal` module
    imported at file level; scipy.signal is not used inside this function,
    but a rename would be clearer (kept to preserve the interface).

    Parameters:
        signal: 1-D array of audio samples (assumed to be at rate FS --
            see the decimation-factor note in Preprocess).

    Returns:
        2-D array (frequency bins x time windows) of 10*log10 power, with
        +/-inf entries (from zero-power bins) replaced by 0.
    """
    spect = mlab.specgram(signal, NFFT=WINDOW_SIZE, Fs=FS,
                          noverlap=int(OVERLAP_RATIO * WINDOW_SIZE))[0]
    log_mag_spect = 10 * np.log10(spect)
    # log10(0) yields -inf (warning filtered at module level); np.isinf is
    # the robust form of the original (x == inf) | (x == -inf) mask
    log_mag_spect[np.isinf(log_mag_spect)] = 0
    return log_mag_spect
''' TODO '''
# Database with key=songname, value=spectrogram
Spectrograms = {}

# Gets the spectrogram for each song and adds it to the Spectrograms database
# Plots each spectrogram
for title, processed_sig in ProcessedDb.items():
    Spectrograms[title] = getSpectrogram(processed_sig)
    plt.title(title)
    plt.imshow(Spectrograms[title])
    plt.show()
''' TODO '''
# Database with key=songname, value=array of local peaks
Peaks = {}

# Gets the local peaks for each song and adds it to the Peaks database
# Plots the peaks over the original spectrogram
for song_name, spect in Spectrograms.items():
    # call get_2D_peaks once per song -- the original called it twice,
    # doing the peak detection work twice for the same spectrogram
    y_coords, x_coords, peak_list = utils.get_2D_peaks(spect)
    Peaks[song_name] = list(peak_list)
    plt.title(song_name)
    plt.plot(x_coords, y_coords, "*", markersize=1)
    plt.imshow(spect)
    plt.show()
''' TODO '''
def getPairs(peaks, fan_value=None, max_time_delta=150):
    """Build fingerprint pairs from a song's peak list.

    Each peak (frequency, time) is paired with later peaks whose time
    offset is strictly between 0 and ``max_time_delta``.  A peak may
    participate in at most ``fan_value`` pairs, matching the documented
    intent of FAN_VALUE.  (The original only checked the limit once per
    anchor peak and reset the partner's counter to 1 instead of
    incrementing it, so the cap was not actually enforced.)

    Parameters:
        peaks: iterable of (frequency, time) tuples.
        fan_value: max pairs per peak; defaults to the module FAN_VALUE.
        max_time_delta: exclusive upper bound on a pair's time offset.

    Returns:
        list of (anchor_freq, partner_freq, time_delta) tuples.
    """
    if fan_value is None:
        fan_value = FAN_VALUE
    peaks = list(peaks)
    pairs = []
    # pair_counts[i] = number of pairs peak i already participates in
    pair_counts = {}
    for i in range(len(peaks)):
        pair_counts.setdefault(i, 0)
        for j in range(i + 1, len(peaks)):
            # stop anchoring once this peak has reached its fan-out limit
            if pair_counts[i] >= fan_value:
                break
            t_diff = abs(peaks[j][1] - peaks[i][1])
            if 0 < t_diff < max_time_delta:
                # fingerprint: (anchor freq, partner freq, time delta)
                pairs.append((peaks[i][0], peaks[j][0], t_diff))
                pair_counts[i] += 1
                pair_counts[j] = pair_counts.get(j, 0) + 1
    return pairs
''' TODO '''
# Database with key=fingerprint (f1, f2, tdelta), value=songname
LookUpTable = {}

# Get fingerprints for each song and store them in the LookUpTable database
for track, track_peaks in Peaks.items():
    for fingerprint in getPairs(track_peaks):
        # NOTE(review): if two songs produce the same fingerprint, the
        # later song silently overwrites the earlier one
        LookUpTable[fingerprint] = track
# Database with key=songname, value=[channel1, channel2] for a snippet of the song
TestDb = {}

# Goes through test_mp3s folder and adds a snippet of each song to database
for filename in os.listdir("./test_mp3s/"):
    audiofile = AudioSegment.from_file("./test_mp3s/" + filename)
    # np.fromstring is deprecated/removed for binary input; frombuffer is
    # the supported replacement
    data = np.frombuffer(audiofile._data, np.int16)
    channels = []
    for chn in range(audiofile.channels):
        # de-interleave FIRST, then slice, so the snippet covers 60s-75s of
        # each channel.  The original sliced the interleaved stream, which
        # for a stereo file yielded only half the intended snippet length.
        channels.append(data[chn::audiofile.channels][SAMPLE_RATE * 60:SAMPLE_RATE * 75])
    TestDb[filename] = channels
    print("Added to test database. : " + str(filename))
# Goes through test snippets and runs same fingerprinting process
# Prints out the number of matches for each song and confidence of prediction
for test in TestDb.keys():
    print('\033[1mTesting: ' + test + '\033[0m \n')
    # one match counter per known song
    Matches = {song: 0 for song in SongDb.keys()}
    channels = TestDb[test]
    # same pipeline as the database build: preprocess -> spectrogram ->
    # peaks -> fingerprint pairs
    preprocessed = Preprocess(channels)
    spectrogram = getSpectrogram(preprocessed)
    _, _, peaks = utils.get_2D_peaks(spectrogram)
    pairs = getPairs(peaks)
    # tally every fingerprint that hits a known song
    for p in pairs:
        match = LookUpTable.get(p, None)
        if match is not None:
            Matches[match] += 1
    # single max() call; the original recomputed this a second time below
    prediction, count = max(Matches.items(), key=itemgetter(1))
    for k, v in Matches.items():
        if k == prediction:
            print('\033[1m{0:50} ==> {1:10d} \033[0m'.format(k, v))
        else:
            print('{0:50} ==> {1:10d}'.format(k, v))
    # guard against zero total matches (original raised ZeroDivisionError)
    total = sum(Matches.values())
    confidence = (str(float(count) / total * 100)[:5] + "%") if total else "0%"
    print(f'\nPrediction: {prediction}')
    print('\033[1m{0:10}: {1:10}\033[0m\n-----------------------------------------------------------------------\n\n'.format('Confidence', confidence))