from pydub import AudioSegment
from scipy import signal
from operator import itemgetter
import pyaudio
import numpy as np
import utils
import os
import warnings
import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
FORMAT = pyaudio.paInt16
'''
Number of audio channels in the recording
'''
CHANNELS = 2
'''
Original sample rate of the recordings
'''
SAMPLE_RATE = 44100
'''
Sampling rate (after downsampling)
'''
FS = 8000
'''
Factor by which the original signal will be downsampled.
Note that 44100/8000 is not an integer, so decimating by
int(44100/8000) = 5 leaves an effective rate of 8820 Hz, slightly above FS.
'''
DECIMATION_FACTOR = SAMPLE_RATE // FS
'''
Size of the FFT window, affects frequency granularity (we saw this in class!)
'''
WINDOW_SIZE = 1024
'''
Degree to which a fingerprint can be paired with its neighbors --
higher will cause more fingerprints, but potentially better accuracy.
'''
FAN_VALUE = 15
'''
Ratio by which each window overlaps the previous and next window --
higher will cause more fingerprints, but higher granularity of offset matching
'''
OVERLAP_RATIO = 0.5
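# A quick sanity computation (an illustration, not part of the original
# assignment): these constants imply a hop of WINDOW_SIZE * (1 - OVERLAP_RATIO)
# samples between FFT frames, which fixes the time and frequency resolution.
HOP_SIZE = int(WINDOW_SIZE * (1 - OVERLAP_RATIO))  # 512 samples between frames
TIME_RESOLUTION = HOP_SIZE / FS                    # 0.064 s per spectrogram frame
FREQ_RESOLUTION = FS / WINDOW_SIZE                 # ~7.8 Hz per FFT bin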
path = os.getcwd()
warnings.filterwarnings("ignore", message="divide by zero encountered in log10")
warnings.filterwarnings("ignore", category=DeprecationWarning)
# Database with key=songname, value=[channel1, channel2]
SongDb = {}
#Goes through mp3s folder and adds each song to database
for filename in os.listdir(path + "/mp3s/"):
    audiofile = AudioSegment.from_file(path + "/mp3s/" + filename)
    # np.fromstring is deprecated for raw bytes; np.frombuffer is the modern equivalent
    data = np.frombuffer(audiofile._data, np.int16)
    channels = []
    for chn in range(audiofile.channels):
        channels.append(data[chn::audiofile.channels])
    SongDb[filename[:-4]] = channels  # strip the ".mp3" extension
    print("Added to song database: " + str(filename[:-4]))
def Preprocess(channels):
    # cast to float to avoid int16 overflow when summing the channels
    channel1 = channels[0].astype(np.float64)
    channel2 = channels[1].astype(np.float64)
    # average the two channels and remove the DC offset
    channelmean = (channel1 + channel2) / 2
    channelmean = channelmean - np.mean(channelmean)
    resampled = signal.decimate(channelmean, DECIMATION_FACTOR)
    return resampled
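# Minimal sanity check on synthetic data (a hypothetical example, not part of
# the original assignment): a one-second stereo 440 Hz tone should come back
# mono and DECIMATION_FACTOR times shorter.
_t = np.arange(SAMPLE_RATE) / SAMPLE_RATE
_tone = (10000 * np.sin(2 * np.pi * 440 * _t)).astype(np.int16)
assert len(Preprocess([_tone, _tone])) == len(_tone) // DECIMATION_FACTOR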
# Database with key=songname, value=processed signal
ProcessedDb = {}
#Processes each song and adds it to ProcessedDb
#Prints table of number of samples in for each song
print('{0:50}{1:22}{2:20}\n'.format('Song Name', 'Original #Samples', 'Processed #Samples'))
for song, sig in SongDb.items():
    processed = Preprocess(sig)
    ProcessedDb[song] = processed
    original_duration = len(sig[0])
    processed_duration = len(processed)
    print('{0:50}{1:22d}{2:20d}'.format(song, original_duration, processed_duration))
def getSpectrogram(samples):
    # number of overlapping samples between consecutive FFT windows
    noverlap = int(WINDOW_SIZE * OVERLAP_RATIO)
    # spectral magnitudes
    spec, freqs, t = mlab.specgram(samples, NFFT=WINDOW_SIZE, noverlap=noverlap, Fs=FS)
    # convert magnitudes to decibels
    log_spec = 10 * np.log10(spec)
    # replace infinities (from the log of zero) and NaNs with 0;
    # note that `x == np.nan` is always False, so np.isnan/np.isinf are required
    log_spec[np.isinf(log_spec)] = 0
    log_spec[np.isnan(log_spec)] = 0
    return log_spec
# Database with key=songname, value=spectrogram
Spectrograms = {}
# Gets the spectrogram for each song and adds it to the Spectrograms database
for song in ProcessedDb.keys():
    Spectrograms[song] = getSpectrogram(ProcessedDb[song])
# Plots each spectrogram
for song in ProcessedDb.keys():
    plt.figure(figsize=(10, 10))
    plt.title(song)
    plt.xlabel("Time")
    plt.ylabel("Frequency")
    plt.imshow(abs(Spectrograms[song]), cmap='Spectral')
    plt.show()
# Database with key=songname, value=array of local peaks
Peaks = {}
# Gets the local peaks for each song and adds it to the Peaks database
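# Assumption, based on how the helper is used here and in the test loop below:
# utils.get_2D_peaks (provided with the assignment) returns
# (frequency_indices, time_indices, peaks), where peaks is an iterable of
# (frequency_bin, time_bin) tuples.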
for song in Spectrograms.keys():
    # call get_2D_peaks once and reuse the result instead of recomputing it
    freq_idx, time_idx, peaks = utils.get_2D_peaks(Spectrograms[song])
    Peaks[song] = list(peaks)
    # Plots the peaks over the original spectrogram
    plt.figure(figsize=(20, 2))
    plt.ylabel("Frequency")
    plt.title(song)
    plt.xlabel("Time")
    # superimpose the peaks on the song's spectrogram
    implot = plt.imshow(abs(Spectrograms[song]), cmap='Greens')
    plt.scatter(time_idx, freq_idx, c='green', s=0.4)
    plt.show()
def getPairs(peaks):
    # The second peak must occur within a certain time interval of the first peak
    times = {p: [] for p in peaks}  # candidate pairs for each anchor peak
    peak_pairs = []                 # final list of (freq1, freq2, tdelta) fingerprints
    for peak in peaks:
        for peak_comparison in peaks:  # compare all pairs of peaks
            time_diff = abs(peak[1] - peak_comparison[1])
            # skip self-pairs and time differences greater than 150 frames
            if (time_diff <= 150) and (peak_comparison != peak):
                # tuple (freq. peak 1, freq. peak 2, abs time difference)
                times[peak].append((peak[0], peak_comparison[0], time_diff))
        # Each peak can only be part of up to FAN_VALUE pairs:
        # keep only the FAN_VALUE pairs with the smallest time differences
        times[peak].sort(key=lambda x: x[2])
        times[peak] = times[peak][:FAN_VALUE]
        peak_pairs.extend(times[peak])
    return peak_pairs
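# Tiny worked example with hand-made (hypothetical) peaks: the third peak is
# more than 150 frames away from the others, so only the first two pair up,
# once in each direction.
_demo = getPairs([(100, 0), (200, 50), (300, 400)])
assert _demo == [(100, 200, 50), (200, 100, 50)]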
# Database with key=fingerprint (f1, f2, tdelta), value=songname
LookUpTable = {}
# Gets the fingerprints for each song and stores them in the LookUpTable database
# Prints a sample of the LookUpTable entries
for song in Spectrograms.keys():
    for pair in getPairs(Peaks[song]):
        LookUpTable[pair] = song
print(LookUpTable.get((34, 45, 140)))  # sanity check; None if this fingerprint is absent
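# Design note: a plain dict keeps only the last song seen for each fingerprint,
# so fingerprints shared by two songs are silently overwritten. A variant (not
# what the assignment asks for) could map each fingerprint to a list of songs
# via collections.defaultdict(list) and vote over all owners at query time.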
# Database with key=songname, value=[channel1, channel2] for a snippet of the song
TestDb = {}
# Goes through test_mp3s folder and adds a snippet of each song to database
for filename in os.listdir("./test_mp3s/"):
    audiofile = AudioSegment.from_file("./test_mp3s/" + filename)
    # take the 15-second snippet from 1:00 to 1:15; the raw data is interleaved,
    # so each second of audio spans SAMPLE_RATE * channels values
    values_per_sec = SAMPLE_RATE * audiofile.channels
    data = np.frombuffer(audiofile._data, np.int16)[values_per_sec*60:values_per_sec*75]
    channels = []
    for chn in range(audiofile.channels):
        channels.append(data[chn::audiofile.channels])
    TestDb[filename] = channels
    print("Added to test database: " + str(filename))
# Goes through test snippets and runs same fingerprinting process
# Prints out the number of matches for each song and confidence of prediction
for test in TestDb.keys():
    print('\033[1mTesting: ' + test + '\033[0m \n')
    Matches = {}
    for song in SongDb.keys():
        Matches[song] = 0
    channels = TestDb[test]
    preprocessed = Preprocess(channels)
    spectrogram = getSpectrogram(preprocessed)
    freq, time, peaks = utils.get_2D_peaks(spectrogram)
    pairs = getPairs(list(peaks))
    # count how many of the snippet's fingerprints point at each song
    for p in pairs:
        match = LookUpTable.get(p, None)
        if match:
            Matches[match] += 1
    prediction, count = max(Matches.items(), key=itemgetter(1))
    for k, v in Matches.items():
        if k == prediction:
            print('\033[1m{0:50} ==> {1:10d} \033[0m'.format(k, v))
        else:
            print('{0:50} ==> {1:10d}'.format(k, v))
    # confidence = fraction of matched fingerprints that point to the predicted song
    total = sum(Matches.values())
    confidence = str(float(count)/total*100)[:5] + "%" if total else "n/a"
    print(f'\nPrediction: {prediction}')
    print('\033[1m{0:10}: {1:10}\033[0m\n-----------------------------------------------------------------------\n\n'.format('Confidence', confidence))