Skip to content
Closed
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 86 additions & 41 deletions TTS/engine_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
#!/usr/bin/env python3
import os
from pathlib import Path
from typing import Tuple
import re

# import sox
# from mutagen import MutagenError
# from mutagen.mp3 import MP3, HeaderNotFoundError
import numpy as np
import translators as ts
from moviepy.audio.AudioClip import AudioClip
from moviepy.audio.fx.volumex import volumex
from rich.progress import track
from moviepy.editor import AudioFileClip, CompositeAudioClip, concatenate_audioclips
from utils.console import print_step, print_substep
Expand All @@ -17,7 +21,6 @@


class TTSEngine:

"""Calls the given TTS engine to reduce code duplication and allow multiple TTS engines.

Args:
Expand All @@ -31,11 +34,11 @@ class TTSEngine:
"""

def __init__(
self,
tts_module,
reddit_object: dict,
path: str = "assets/temp/mp3",
max_length: int = DEFUALT_MAX_LENGTH,
self,
tts_module,
reddit_object: dict,
path: str = "assets/temp/mp3",
max_length: int = DEFUALT_MAX_LENGTH,
):
self.tts_module = tts_module()
self.reddit_object = reddit_object
Expand All @@ -57,8 +60,8 @@ def run(self) -> Tuple[int, int]:

self.call_tts("title", self.reddit_object["thread_title"])
if (
self.reddit_object["thread_post"] != ""
and settings.config["settings"]["storymode"] == True
self.reddit_object["thread_post"] != ""
and settings.config["settings"]["storymode"] == True
):
self.call_tts("posttext", self.reddit_object["thread_post"])

Expand All @@ -68,7 +71,7 @@ def run(self) -> Tuple[int, int]:
if self.length > self.max_length:
break
if (
len(comment["comment_body"]) > self.tts_module.max_chars
len(comment["comment_body"]) > self.tts_module.max_chars
): # Split the comment if it is too long
self.split_post(comment["comment_body"], idx) # Split the comment
else: # If the comment is not too long, just call the tts engine
Expand All @@ -85,42 +88,84 @@ def split_post(self, text: str, idx: int):
r" *(((.|\n){0," + str(self.tts_module.max_chars) + "})(\.|.$))", text
)
]
offset = 0
try:
silence_duration = settings.config["settings"]["tts"]["silence_duration"]
except AttributeError:
silence_duration = 0.3
silence = AudioClip(make_frame=lambda t: np.sin(440 * 2 * np.pi * t), duration=silence_duration, fps=44100)
silence = volumex(silence, 0)
silence.write_audiofile(f"{self.path}/silence.mp3", fps=44100, verbose=False, logger=None)

idy = None
for idy, text_cut in enumerate(split_text):
# print(f"{idx}-{idy}: {text_cut}\n")
if not text_cut or text_cut.isspace():
offset += 1
continue

self.call_tts(f"{idx}-{idy - offset}.part", text_cut)
split_files.append(AudioFileClip(f"{self.path}/{idx}-{idy - offset}.part.mp3"))

CompositeAudioClip([concatenate_audioclips(split_files)]).write_audiofile(
f"{self.path}/{idx}.mp3", fps=44100, verbose=False, logger=None
)

for i in split_files:
name = i.filename
i.close()
Path(name).unlink()
newtext = process_text(text_cut)
#print(f"{idx}-{idy}: {newtext}\n")

# for i in range(0, idy + 1):
# print(f"Cleaning up {self.path}/{idx}-{i}.part.mp3")

# Path(f"{self.path}/{idx}-{i}.part.mp3").unlink()
if not newtext or newtext.isspace():
print("newtext was blank because sanitized split text resulted in none")
continue
else:
self.call_tts(f"{idx}-{idy}.part", newtext)
with open(f"{self.path}/list.txt", 'w') as f:
for idz in range(0, len(split_text)):
f.write("file " + f"'{idx}-{idz}.part.mp3'" + "\n")
split_files.append(str(f"{self.path}/{idx}-{idy}.part.mp3"))
f.write("file " + f"'silence.mp3'" + "\n")
f.close()

os.system("ffmpeg -f concat -y -hide_banner -loglevel panic -safe 0 " +
"-i " + f"{self.path}/list.txt " +
"-c copy " + f"{self.path}/{idx}.mp3")
try:
for i in range(0, len(split_files)):
os.unlink(split_files[i])
except FileNotFoundError:
print("file not found error")
except OSError:
print("OSError")

def call_tts(self, filename: str, text: str):
self.tts_module.run(text=process_text(text), filepath=f"{self.path}/{filename}.mp3")
# try:
# self.length += MP3(f"{self.path}/{filename}.mp3").info.length
# except (MutagenError, HeaderNotFoundError):
# self.length += sox.file_info.duration(f"{self.path}/{filename}.mp3")
try:
clip = AudioFileClip(f"{self.path}/{filename}.mp3")
self.length += clip.duration
clip.close()
except:
self.length = 0

if filename == "title":
try:
self.tts_module.run(text, filepath=f"{self.path}/title_no_silence.mp3")
try:
silence_duration = settings.config["settings"]["tts"]["silence_duration"]
except AttributeError:
silence_duration = 0.3
silence = AudioClip(make_frame=lambda t: np.sin(440 * 2 * np.pi * t), duration=silence_duration,
fps=44100)
silence = volumex(silence, 0)
silence.write_audiofile(f"{self.path}/silence.mp3", fps=44100, verbose=False, logger=None)

with open(f"{self.path}/title.txt", 'w') as f:
f.write("file " + f"'title_no_silence.mp3'" + "\n")
f.write("file " + f"'silence.mp3'" + "\n")
f.close()
os.system("ffmpeg -f concat -y -hide_banner -loglevel panic -safe 0 " +
"-i " + f"{self.path}/title.txt " +
"-c copy " + f"{self.path}/title.mp3")
clip = AudioFileClip(f"{self.path}/title.mp3")
self.length += clip.duration
clip.close()
try:
name = ["title_no_silence.mp3", "silence.mp3", "title.txt"]
for i in range(0, len(name)):
os.unlink(str(rf"{self.path}/" + name[i]))
except FileNotFoundError:
print("file not found error")
except OSError:
print("OSError")
except:
self.length = 0
else:
try:
self.tts_module.run(text=text, filepath=f"{self.path}/{filename}.mp3")
clip = AudioFileClip(f"{self.path}/{filename}.mp3")
self.length += clip.duration
clip.close()
except:
self.length = 0


def process_text(text: str):
Expand Down