# Utils
import re
import json
import yaml
import pprint as pp
import importlib
import time
from thefuzz import fuzz
from thefuzz import process
import textwrap as tw
# Data
import numpy as np
import pandas as pd
pd.options.display.max_columns = None
# Visualization
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.colors import LinearSegmentedColormap  # used by the heatmap below
import vapeplot
import dufte
plt.style.use(dufte.style)
vapeplot.set_palette('jazzcup')
palette = vapeplot.palette("jazzcup")
# Spotify
import spotipy
from spotipy.oauth2 import SpotifyOAuth
from spotify_functions import offset_api_limit
# Genius
import genius_functions as genius
importlib.reload(genius)
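# reload picks up edits to genius_functions.py without restarting the kernel,
# which helps while iterating on the scraping helpers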
# Scraping
import requests
import urllib.request
import urllib.parse
from bs4 import BeautifulSoup
# Models
from sentence_transformers import SentenceTransformer
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.metrics import silhouette_score, silhouette_samples
from sklearn.preprocessing import MinMaxScaler
from pysentimiento import create_analyzer
import nltk  # stopwords + tokenization for the keyword summaries below; may need nltk.download('stopwords') and nltk.download('punkt')
with open("spotify/spotify_details.yml", 'r') as stream:
spotify_details = yaml.safe_load(stream)
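# spotify/spotify_details.yml is expected to hold the app credentials; a
# minimal sketch with placeholder values (the real file is not part of this
# notebook):
#   SPOTIPY_CLIENT_ID: "<client id>"
#   SPOTIPY_CLIENT_SECRET: "<client secret>"
#   SPOTIPY_REDIRECT_URI: "http://localhost:8888/callback"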
# https://developer.spotify.com/web-api/using-scopes/
scope = "user-library-read user-follow-read user-top-read playlist-read-private"
sp = spotipy.Spotify(auth_manager=SpotifyOAuth(
client_id=spotify_details['SPOTIPY_CLIENT_ID'],
client_secret=spotify_details['SPOTIPY_CLIENT_SECRET'],
redirect_uri=spotify_details['SPOTIPY_REDIRECT_URI'],
scope=scope,
))
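# On first run SpotifyOAuth opens a browser window for user consent and caches
# the token locally (spotipy's default .cache file), so later runs are
# non-interactive.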
badbunny_uri = 'spotify:artist:4q3ewBCX7sLwd24euuV69X'
artist = sp.artist(badbunny_uri)
# album_type must be a comma-separated string, not a Python list
albums = offset_api_limit(sp, sp.artist_albums(artist_id=badbunny_uri,
                                               album_type='album,single',
                                               country=None,
                                               limit=20))
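# offset_api_limit is a project-local helper from spotify_functions.py whose
# implementation is not shown here. A minimal sketch of what such a paging
# helper presumably does, assuming spotipy's standard paging contract
# (result dicts carrying 'items' and 'next'):
#
#   def offset_api_limit(sp, result):
#       items = result['items']
#       while result['next']:
#           result = sp.next(result)
#           items.extend(result['items'])
#       return items
#
# Note the two call sites below disagree about the return type (albums['items']
# here vs iterating album_tracks directly), so check the helper's actual return.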
pp.pprint(artist)
df_albums = pd.DataFrame(columns=['album_name', 'artists', 'total_tracks',
'album_url', 'cover_url', 'album_uri'])
for i, album in enumerate(albums['items']):
df_albums.loc[i] = {
'album_name': album['name'],
'artists': ', '.join(art['name'] for art in album['artists']),
'total_tracks': album['total_tracks'],
'album_url': album['external_urls']['spotify'],
        'cover_url': album['images'][0]['url'],
'album_uri': album['uri']
}
df_albums
df_tracks = pd.DataFrame(columns=['song_name', 'song_artists', 'song_duration',
'song_is_explicit', 'song_url',
'song_preview_url', 'song_uri',
'album_uri'])
i = 0
for idx, row in df_albums.iterrows():
album_uri = row['album_uri']
album_tracks = offset_api_limit(
sp, sp.album_tracks(album_id=album_uri,
limit=50, offset=0, market=None)
)
for track in album_tracks:
df_tracks.loc[i] = {
'song_name': track['name'],
'song_artists': ', '.join(art['name'] for art in track['artists']),
'song_duration': (track['duration_ms']/1000)/60,
'song_is_explicit': track['explicit'],
'song_url': track['external_urls']['spotify'],
'song_preview_url': track['preview_url'],
'song_uri': track['uri'],
'album_uri': album_uri
}
i += 1
# The audio-features endpoint accepts at most 100 ids per request, so fetch in batches
uris = df_tracks['song_uri'].tolist()
df_tracks['audio_features'] = [f for i in range(0, len(uris), 100) for f in sp.audio_features(uris[i:i + 100])]
df_tracks = df_tracks.drop('audio_features', axis=1).assign(**df_tracks['audio_features'].apply(pd.Series))
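# Caveat: sp.audio_features can return None for tracks Spotify has not
# analyzed; apply(pd.Series) expands those entries into all-NaN rows, which
# the isna() check further below will surface.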
drop_cols = ['type', 'id', 'uri', 'track_href', 'analysis_url',
'duration_ms', 'time_signature']
df_tracks.drop(columns=drop_cols, inplace=True)
df_audio = pd.merge(df_albums, df_tracks, how='left', on='album_uri')
df_audio.sample(5)
artist_name = 'Bad Bunny'
results = genius.search_artist(artist_name)
pp.pprint(results[2]['result']['primary_artist'])
artist_id = results[2]['result']['primary_artist']['id']
artist_data = genius.get_artist(artist_id)
%%time
songs_ids = genius.get_songs_ids(artist_id)
len(songs_ids)
df_audio.sample(5)
def get_genius_id(song_name, songs_ids):
song_distances = []
for genius_name, genius_id in songs_ids:
distance = fuzz.ratio(song_name, genius_name)
song_distances.append((genius_name, genius_id, distance))
song_distances = sorted(song_distances,
key=lambda x: x[2],
reverse=True)
id_most_similar = song_distances[0][1]
print(f'{song_name} -> {song_distances[0][0]}')
return id_most_similar
df_audio['genius_id'] = df_audio['song_name'].apply(get_genius_id,
songs_ids=songs_ids)
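# get_genius_id always takes the closest Genius title, even when the best
# fuzz.ratio score is low. A hedged variant with a hypothetical min_score
# cutoff (not part of the original helper) could skip weak matches instead:
#
#   def get_genius_id_safe(song_name, songs_ids, min_score=60):
#       best = max(((name, gid, fuzz.ratio(song_name, name))
#                   for name, gid in songs_ids), key=lambda x: x[2])
#       return best[1] if best[2] >= min_score else None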
df_audio['lyrics'] = np.nan
%%time
for i in df_audio.index:
    if pd.isna(df_audio.at[i, 'lyrics']):
        df_audio.at[i, 'lyrics'] = genius.retrieve_lyrics(
            df_audio.at[i, 'genius_id'])
df_audio.sample(3)
df_audio.isna().sum()
df_audio.to_csv('data/df_songs.csv', index=False)
df_songs = pd.read_csv('data/df_songs.csv')
df_songs.info()
df_songs.sample(5)
print(df_songs.iloc[11]['lyrics'])
# Patterns stripped from the raw Genius lyrics
regex_bracket = re.compile(r'\[.*?\]')  # section headers like [Coro] or [Verso 1]
regex_newline = re.compile(r'\n')
regex_word = re.compile(r'\u2005')      # four-per-em space used by Genius
regex_word2 = re.compile(r'\u205f')     # medium mathematical space
lyrics_clean = (
    df_songs.lyrics
    .str.replace(regex_newline, ' ', regex=True)  # a space, so line breaks don't fuse words
    .str.replace(regex_bracket, '', regex=True)
    .str.replace(regex_word, ' ', regex=True)
    .str.replace(regex_word2, ' ', regex=True)
    .str.replace(r'[(),-]', ' ', regex=True)
    .str.replace(r'\s+', ' ', regex=True)
    .str.strip()
    .str.lower()
)
df_songs['lyrics_clean'] = lyrics_clean
print(tw.fill(lyrics_clean.iloc[11], 90))
embedding_model = SentenceTransformer('hiiamsid/sentence_similarity_spanish_es')
embeddings = embedding_model.encode(list(lyrics_clean))
print(embeddings.shape)
pca = PCA(n_components=2)
pca_embeddings = pca.fit_transform(embeddings)
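# Worth checking how much of the embedding variance the 2D projection keeps
# before reading too much into the scatter plots
print(pca.explained_variance_ratio_, pca.explained_variance_ratio_.sum())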
sentiment_analyzer = create_analyzer(task="sentiment", lang="es")
emotion_analyzer = create_analyzer(task="emotion", lang="es")
happy_song = df_songs.iloc[90]['lyrics']
sad_song = df_songs.iloc[20]['lyrics']
print(
'Estamos bien (happy song):',
sentiment_analyzer.predict(happy_song),
emotion_analyzer.predict(happy_song),
sep='\n',
end='\n\n'
)
print(
    'Un verano sin ti (sad song):',
sentiment_analyzer.predict(sad_song),
emotion_analyzer.predict(sad_song),
sep='\n'
)
df_lyrics = pd.DataFrame(data=pca_embeddings, columns=['component1', 'component2'])
df_lyrics['is_explicit'] = df_songs.song_is_explicit
%%time
for idx in df_lyrics.index:
if idx % 5 == 0: print(idx)
lyric = df_songs.at[idx, 'lyrics']
sentiment = sentiment_analyzer.predict(lyric)
emotion = emotion_analyzer.predict(lyric)
df_lyrics.at[idx, 'sentiment'] = sentiment.probas['POS']
df_lyrics.at[idx, 'emotion'] = sorted(
[emo for emo in emotion.probas.keys() if emo != 'others'],
key=lambda x: emotion.probas[x],
reverse=True
)[0]
df_lyrics.is_explicit = df_lyrics.is_explicit.astype(np.int32)
df_lyrics_pre = pd.concat([df_lyrics.drop('emotion', axis=1),
pd.get_dummies(df_lyrics.emotion)],
axis=1)
df_lyrics_pre
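# MinMax scaling puts the PCA components, the 0/1 explicit flag and the one-hot
# emotions on a common [0, 1] range, so no single feature dominates the
# Euclidean distances that KMeans minimizes.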
scaler = MinMaxScaler()
x_scaled = scaler.fit_transform(df_lyrics_pre.values)
df_lyrics_pre_scaled = pd.DataFrame(x_scaled, columns=df_lyrics_pre.columns)
inertia = []
sil = []
k_list = range(2, 11)
for k in k_list:
km = KMeans(n_clusters=k, random_state=0)
km.fit(df_lyrics_pre_scaled)
inertia.append(km.inertia_)
sil.append(silhouette_score(df_lyrics_pre_scaled, km.labels_))
plt.figure(figsize=(8, 6))
sns.lineplot(x=k_list, y=inertia, marker='o')
plt.xlim(min(k_list) - max(k_list) * 0.05, max(k_list) * 1.05)
plt.ylim(min(inertia) - max(inertia) * 0.05, max(inertia) * 1.05)
plt.xlabel('Clusters')
plt.ylabel('Inercia')
plt.title('Inercia por número de clusters')
plt.show()
plt.figure(figsize=(8, 6))
sns.lineplot(x=k_list, y=sil, marker='o', color=palette[1])
plt.xlim(min(k_list) - max(k_list) * 0.05, max(k_list) * 1.05)
plt.ylim(min(sil) - max(sil) * 0.05, max(sil) * 1.05)
plt.xlabel('Clusters')
plt.ylabel('Silhouette media')
plt.title('Silhouette media para el dataset')
plt.show()
X_std = df_lyrics_pre_scaled
k_list = [5, 6, 7]
for k in k_list:
fig, (ax1, ax2) = plt.subplots(1, 2)
fig.set_size_inches(18, 7)
# Run the Kmeans algorithm
km = KMeans(n_clusters=k, random_state=0)
labels = km.fit_predict(X_std)
centroids = km.cluster_centers_
# Get silhouette samples
silhouette_vals = silhouette_samples(X_std, labels)
silho_score = silhouette_score(X_std, km.labels_)
# Silhouette plot
y_ticks = []
y_lower, y_upper = 0, 0
for i, cluster in enumerate(np.unique(labels)):
cluster_silhouette_vals = silhouette_vals[labels == cluster]
cluster_silhouette_vals.sort()
y_upper += len(cluster_silhouette_vals)
ax1.barh(range(y_lower, y_upper),
cluster_silhouette_vals,
edgecolor='none',
height=1)
ax1.text(-0.03, (y_lower + y_upper) / 2, str(i))
y_lower += len(cluster_silhouette_vals)
# Get the average silhouette score and plot it
avg_score = np.mean(silhouette_vals)
ax1.axvline(avg_score, linestyle='--', linewidth=2, color='0.7')
ax1.set_yticks([])
ax1.set_xlim([-0.1, 1.0])
ax1.set_xlabel('Coeficiente Silhouette')
ax1.set_ylabel('Etiquetas de los clusters')
ax1.set_title('Silhouette para cada cluster', y=1.00)
# Scatter plot of data colored with labels
sns.scatterplot(x=X_std.iloc[:, 0],
y=X_std.iloc[:, 1],
hue=labels,
palette=(palette*2)[:k],
ax=ax2)
ax2.scatter(centroids[:, 0],
centroids[:, 1],
marker='+',
c='0.2',
s=1000)
ax2.set_xlim([0, 1])
ax2.set_ylim([0, 1])
ax2.set_xlabel('Dimensión 1 del embedding')
ax2.set_ylabel('Dimensión 2 del embedding')
ax2.set_title('Visualización de los clusters', y=1.00)
ax2.set_aspect('equal')
plt.suptitle(f'Silhouette usando {k} clusters: ({silho_score:.3f})',
fontsize=18,
fontweight=550,
y=1.05)
plt.show()
lyrics_clustering = KMeans(n_clusters=5, random_state=0)
lyrics_clustering.fit(df_lyrics_pre_scaled)
lyrics_clusters = lyrics_clustering.labels_
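# Quick sanity check on how balanced the lyric clusters turned out
print(np.bincount(lyrics_clusters))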
features_audio = [
    # Confidence measures
#"acousticness",
#"liveness",
#"speechiness",
#"instrumentalness",
    # Perceptual measures
"energy",
#"loudness",
"danceability",
"valence",
    # Musical descriptors
#"key",
#"mode",
"tempo"
]
df_audio = df_songs[features_audio].copy()
# A 2-component PCA of the audio features was explored but not used; the
# clustering below runs on the scaled raw features instead.
#n_components = 2
#pca = PCA(n_components=n_components)
#pca_audio = pca.fit_transform(df_audio)
#df_audio = pd.DataFrame(data=pca_audio, columns=list(range(n_components)))
scaler = MinMaxScaler()
x_scaled = scaler.fit_transform(df_audio.values)
df_audio_scaled = pd.DataFrame(x_scaled, columns=df_audio.columns)
inertia = []
sil = []
k_list = range(2, 11)
for k in k_list:
km = KMeans(n_clusters=k, random_state=0)
km.fit(df_audio_scaled)
inertia.append(km.inertia_)
sil.append(silhouette_score(df_audio_scaled, km.labels_))
plt.figure(figsize=(8, 6))
sns.lineplot(x=k_list, y=inertia, marker='o', color=palette[2])
plt.xlim(min(k_list) - max(k_list) * 0.05, max(k_list) * 1.05)
plt.ylim(min(inertia) - max(inertia) * 0.05, max(inertia) * 1.05)
plt.xlabel('Clusters')
plt.ylabel('Inercia')
plt.title('Inercia por número de clusters')
plt.show()
plt.figure(figsize=(8, 6))
sns.lineplot(x=k_list, y=sil, marker='o', color=palette[3])
plt.xlim(min(k_list) - max(k_list) * 0.05, max(k_list) * 1.05)
plt.ylim(min(sil) - max(sil) * 0.05, max(sil) * 1.05)
plt.xlabel('Clusters')
plt.ylabel('Silhouette media')
plt.title('Silhouette media para el dataset')
plt.show()
audio_clustering = KMeans(n_clusters=4, random_state=0)
audio_clustering.fit(df_audio_scaled)
audio_clusters = audio_clustering.labels_
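# Characterize each audio cluster by mapping its center back to the original
# feature units (inverse of the MinMax scaling)
print(pd.DataFrame(scaler.inverse_transform(audio_clustering.cluster_centers_),
                   columns=df_audio_scaled.columns))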
df_songs[['sentiment', 'emotion']] = df_lyrics[['sentiment', 'emotion']]
df_songs['audio_clusters'] = audio_clusters
df_songs['lyrics_clusters'] = lyrics_clusters
crosstab = pd.crosstab(df_songs['audio_clusters'],
df_songs['lyrics_clusters'],
normalize='columns')
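# normalize='columns' makes each lyrics-cluster column sum to 100%, so the
# heatmap reads: given a lyrics cluster, how do its songs spread over the
# audio clusters.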
plt.figure(figsize=(8, 5))
sns.heatmap(
data=crosstab,
cbar=False,
square=True,
annot=True,
    fmt='.0%',
annot_kws={'size': 14},
xticklabels=crosstab.columns,
yticklabels=crosstab.index,
vmin=0,
vmax=None,
cmap=LinearSegmentedColormap.from_list('cmap', [
'#ffffff',
palette[0]
], N=256, gamma=1.0)
)
plt.title('Correlación entre clusters')
plt.xlabel('Clusters de la letras')
plt.ylabel('Clusters del audio')
plt.xticks(rotation=0)
plt.yticks(rotation=0)
plt.show()
for c_audio in range(df_songs.audio_clusters.max()+1):
cluster = df_songs.query('audio_clusters==@c_audio')
print(f'Cluster {c_audio} de audio')
print(f'\tEnergía: {cluster.energy.mean():.2f} (+/-{cluster.energy.std():.2f})')
print(f'\tTempo: {cluster.tempo.mean():.2f} (+/-{cluster.tempo.std():.2f})')
print(f'\tBailabilidad: {cluster.danceability.mean():.2f} (+/-{cluster.danceability.std():.2f})')
print(f'\tValencia: {cluster.valence.mean():.2f} (+/-{cluster.valence.std():.2f})\n')
stopwords = nltk.corpus.stopwords.words('spanish')
stopwords.append('yeah')
for c_lyric in range(df_songs.lyrics_clusters.max()+1):
cluster = df_songs.query('lyrics_clusters==@c_lyric')
all_words = nltk.tokenize.word_tokenize(
' '.join(list(cluster.lyrics_clean.str.lower())))
all_words_dist = nltk.FreqDist(
w.lower() for w in all_words
if (w not in stopwords) and (len(w) > 3))
most_common = [x[0] for x in all_words_dist.most_common(10)]
print(f'Cluster {c_lyric} de la letra')
    print(f'\tCanciones explícitas: {cluster.song_is_explicit.mean():.0%}')
print(f'\tSentimiento POS: {cluster.sentiment.mean():.3f} (+/-{cluster.sentiment.std():.3f})')
print(f'\tEmoción: {cluster.emotion.mode()[0]}')
print(f'\tKeywords: {", ".join(most_common)}\n')
for c_audio in range(df_songs.audio_clusters.max()+1):
for c_lyric in range(df_songs.lyrics_clusters.max()+1):
print(f'Cluster {c_audio}-{c_lyric}')
cluster = (
df_songs.query('audio_clusters==@c_audio & lyrics_clusters==@c_lyric')
[['song_name', 'song_preview_url']]
)
        try:
            cluster_sample = cluster.sample(5, random_state=42)
        except ValueError:  # fewer than 5 songs in this cluster
            cluster_sample = cluster
#display(df_songs.query('audio_clusters == @c')[features_audio].describe())
for idx, row in cluster_sample.iterrows():
print(row['song_name'], '==>', row['song_preview_url'])
print('-' * 100)
df_complete = pd.concat([df_audio_scaled, df_lyrics_pre_scaled], axis=1)
n_components = 4
pca = PCA(n_components=n_components)
pca_complete = pca.fit_transform(df_complete)
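# Again, check how much variance the 4 retained components actually explain
print(pca.explained_variance_ratio_.sum())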
inertia = []
sil = []
k_list = range(2, 11)
for k in k_list:
km = KMeans(n_clusters=k, random_state=0)
km.fit(pca_complete)
inertia.append(km.inertia_)
sil.append(silhouette_score(pca_complete, km.labels_))
plt.figure(figsize=(8, 6))
sns.lineplot(x=k_list, y=inertia, marker='o', color=palette[2])
plt.xlim(min(k_list) - max(k_list) * 0.05, max(k_list) * 1.05)
plt.ylim(min(inertia) - max(inertia) * 0.05, max(inertia) * 1.05)
plt.xlabel('Clusters')
plt.ylabel('Inercia')
plt.title('Inercia por número de clusters')
plt.show()
plt.figure(figsize=(8, 6))
sns.lineplot(x=k_list, y=sil, marker='o', color=palette[3])
plt.xlim(min(k_list) - max(k_list) * 0.05, max(k_list) * 1.05)
plt.ylim(min(sil) - max(sil) * 0.05, max(sil) * 1.05)
plt.xlabel('Clusters')
plt.ylabel('Silhouette media')
plt.title('Silhouette media para el dataset')
plt.show()
k_full = 4
km = KMeans(n_clusters=k_full, random_state=0)
km.fit(pca_complete)
full_clusters = km.labels_
df_songs['full_clusters'] = full_clusters
stopwords = nltk.corpus.stopwords.words('spanish')
stopwords.append('yeah')
for c_full in range(df_songs.full_clusters.max()+1):
cluster = df_songs.query('full_clusters==@c_full')
all_words = nltk.tokenize.word_tokenize(
' '.join(list(cluster.lyrics_clean.str.lower())))
all_words_dist = nltk.FreqDist(
w.lower() for w in all_words
if (w not in stopwords) and (len(w) > 3))
most_common = [x[0] for x in all_words_dist.most_common(10)]
print(f'Cluster {c_full}')
print(f'\tNúmero de canciones: {len(cluster)}')
    print(f'\tCanciones explícitas: {cluster.song_is_explicit.mean():.0%}')
print(f'\tSentimiento POS: {cluster.sentiment.mean():.3f}')
print(f'\tEmoción: {cluster.emotion.mode()[0]}')
print(f'\tEnergía: {cluster.energy.mean():.2f}')
print(f'\tTempo: {cluster.tempo.mean():.2f}')
print(f'\tBailabilidad: {cluster.danceability.mean():.2f}')
print(f'\tValencia: {cluster.valence.mean():.2f}')
print(f'\tKeywords: {", ".join(most_common)}')
print(f'\tAlbum: {cluster.album_name.mode()[0]}')
    print('\tEjemplares:')
    try:
        cluster_sample = cluster.sample(10, random_state=42)
    except ValueError:  # fewer than 10 songs in this cluster
        cluster_sample = cluster
    for idx, row in cluster_sample.iterrows():
        print(f'\t\t{row["song_name"]} ==> {row["song_preview_url"]}')
print('-'*150)