import math
import tempfile
from typing import Optional
import whisper
from fastapi import UploadFile
from google.cloud import texttospeech
from pydub import AudioSegment
from databases.bucket import save_bucket_audio_content
from agents.conversation.conversation import Conversation
from databases.firebase import set_audio_file_response_gpt
from agents.conversation.translate import detect_language, translate_text_from_to
from utils import init_speech_client
class AudioFile:
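    """Wrap an uploaded audio file and split it into fixed-length chunks.

    The upload is written to a temporary .wav file and archived in the storage
    bucket; `split_audio_file` then slices it into chunks of `chunk_duration`
    milliseconds (pydub indexes AudioSegment objects by millisecond).
    """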
    def __init__(self, file: UploadFile, user_id: str, chunk_duration: int):
        self.file = file
        self.user_id = user_id
        self.chunk_duration = chunk_duration
def save_chunk_to_tempfile(self, chunk):
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_audio_file:
            # Write the chunk to the temporary file
chunk.export(tmp_audio_file.name, format="wav")
            # Return the path of the temporary file
return tmp_audio_file.name
def load_file_audio(self):
        # Persist the upload to a temporary .wav file and archive the original in the bucket
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_audio_file:
            # Read the contents of the uploaded file
contents = self.file.file.read()
file_path_name = save_bucket_audio_content(
file=contents, user_id=self.user_id, assistant=False
)
            # Write the contents to the temporary file
tmp_audio_file.write(contents)
            # Return the temporary file path and the bucket path
return tmp_audio_file.name, file_path_name
def split_audio_file(self):
file_read, file_path_name = self.load_file_audio()
sound = AudioSegment.from_file(file=file_read)
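        # pydub measures AudioSegment length in milliseconds, so chunk_duration is in ms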
num_chunks = math.ceil(len(sound) / self.chunk_duration)
chunks_paths = []
for i in range(num_chunks):
start = i * self.chunk_duration
end = start + self.chunk_duration
if end > len(sound):
end = len(sound)
chunk = sound[start:end]
chunks_paths.append(self.save_chunk_to_tempfile(chunk))
return chunks_paths, file_path_name
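# A minimal usage sketch (illustrative only; `upload` and `uid` are assumed to come
# from a FastAPI endpoint and are not defined in this module):
#
#   audio_file = AudioFile(file=upload, user_id=uid, chunk_duration=30_000)  # 30 s in ms
#   chunk_paths, original_path = audio_file.split_audio_file()
#   # chunk_paths -> temporary .wav files of at most 30 s each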
class Audio:
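    """Transcribe uploaded audio with Whisper and synthesize a spoken reply.

    Transcription runs over 30-second chunks produced by `AudioFile`; the reply
    text is translated when its detected language differs from the target, then
    rendered with Google Cloud Text-to-Speech and stored in the bucket.
    """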
def __init__(self, file: UploadFile, user_id: str):
self.user_id = user_id
self.file = file
self.audio_file = AudioFile(
            file=self.file, user_id=self.user_id, chunk_duration=30000  # 30 s, in ms
)
self.text = ""
self.language_to_translate = ""
def add_attributes(self, text: str, language_to_translate: str):
self.text = text
self.language_to_translate = language_to_translate
    def language_to_idiom(self, language: str):
        # Map ISO 639-1 language codes to full language names (default: English)
        idioms = {"es": "Spanish", "en": "English", "pt": "Portuguese", "fr": "French"}
        return idioms.get(language, "English")
    def voice_to_select_voice(self, voice: str):
        # Map language names to Google Cloud Text-to-Speech voice parameters (default: US English)
        voices = {
            "English": {"language_code": "en-US", "name": "en-US-Standard-A"},
            "Spanish": {"language_code": "es-US", "name": "es-US-Standard-A"},
            "French": {"language_code": "fr-FR", "name": "fr-FR-Standard-A"},
            "Portuguese": {"language_code": "pt-BR", "name": "pt-BR-Standard-A"},
        }
        return voices.get(voice, {"language_code": "en-US", "name": "en-US-Standard-A"})
def transcribe_audio(self):
model = whisper.load_model("base")
chunks_paths, file_path_name = self.audio_file.split_audio_file()
full_text = ""
for chunk_path in chunks_paths:
audio = whisper.load_audio(file=chunk_path)
audio = whisper.pad_or_trim(audio)
# make log-Mel spectrogram and move to the same device as the model
mel = whisper.log_mel_spectrogram(audio).to(model.device)
# decode the audio
options = whisper.DecodingOptions(
fp16=False, temperature=0.5, suppress_blank=True
)
result = whisper.decode(model, mel, options)
full_text += result.text
return full_text, file_path_name
    # Generate an audio file from a given text using Google Cloud Text-to-Speech
def text_to_speech(self):
translation = ""
client_speech = init_speech_client()
source_lang = detect_language(self.text)
if source_lang != self.language_to_translate:
translation = (
translate_text_from_to(
self.language_to_idiom(source_lang),
self.language_to_translate,
self.text,
)
.choices[0]
.text.strip()
)
else:
translation = self.text
input_text = texttospeech.SynthesisInput(text=translation)
voice_selected = self.voice_to_select_voice(
self.language_to_idiom(self.language_to_translate)
)
# Note: the voice can also be specified by name.
# Names of voices can be retrieved with client.list_voices().
voice = texttospeech.VoiceSelectionParams(
language_code=voice_selected["language_code"],
name=voice_selected["name"],
ssml_gender=texttospeech.SsmlVoiceGender.FEMALE,
)
audio_config = texttospeech.AudioConfig(
audio_encoding=texttospeech.AudioEncoding.MP3
)
response = client_speech.synthesize_speech(
request={"input": input_text, "voice": voice, "audio_config": audio_config}
)
file_path_name = save_bucket_audio_content(
file=response.audio_content, user_id=self.user_id, assistant=True
)
return file_path_name
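# A minimal sketch of the full Audio round trip (hypothetical names; assumes Google
# Cloud credentials and the translate/bucket helpers imported above are configured):
#
#   audio = Audio(file=upload, user_id=uid)
#   text, user_audio_path = audio.transcribe_audio()            # Whisper transcription
#   audio.add_attributes(text=text, language_to_translate="es")
#   reply_audio_path = audio.text_to_speech()                   # Google TTS + bucket upload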
class AudioConversation:
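    """Orchestrate a speech-to-speech exchange.

    Transcribe the user's audio, generate a reply through `Conversation`, and
    store and return the synthesized answer as an audio file in the bucket.
    """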
def __init__(
self,
user_id: str,
file: UploadFile,
language: str,
conversation_id: Optional[str] = None,
):
self.user_id = user_id
self.file = file
self.language = language
self.conversation_id = conversation_id
def speech_to_speech_conversation(self):
        # Transcribe the audio file using Whisper
audio = Audio(file=self.file, user_id=self.user_id)
text, file_path_name = audio.transcribe_audio()
        # Generate a response using OpenAI's GPT-3.5
conversation = Conversation(
human_prompt=text,
user_id=self.user_id,
conversation_id=self.conversation_id,
audio_file_path=file_path_name,
)
result = conversation.conversation_int()
audio.add_attributes(
text=result["response"], language_to_translate=self.language
)
        # Generate an audio file from the response using Google Cloud Text-to-Speech
file_response_path_name = audio.text_to_speech()
set_audio_file_response_gpt(
user_id=self.user_id,
conversation_id=conversation.conversation_id,
file_name_path_response=file_response_path_name,
)
return {
"conversation_id": conversation.conversation_id,
"user_id": self.user_id,
"file_name_user": file_path_name,
"file_name_assistant_response": file_response_path_name,
}
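# A hedged sketch of how AudioConversation might be exposed over HTTP. The route path,
# the `app` object, and the form-field names are assumptions for illustration and are
# not defined in this module:
#
#   from fastapi import FastAPI, File, Form
#
#   app = FastAPI()
#
#   @app.post("/conversation/audio")
#   async def audio_conversation_endpoint(
#       file: UploadFile = File(...),
#       user_id: str = Form(...),
#       language: str = Form("en"),
#       conversation_id: Optional[str] = Form(None),
#   ):
#       conversation = AudioConversation(
#           user_id=user_id, file=file, language=language, conversation_id=conversation_id
#       )
#       return conversation.speech_to_speech_conversation()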