-
Notifications
You must be signed in to change notification settings - Fork 0
/
texttospeech.py
143 lines (118 loc) · 5.17 KB
/
texttospeech.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import os
import base64
import requests
from pathlib import Path
from moviepy.editor import VideoFileClip, AudioFileClip, concatenate_videoclips
from textprocess import identifyUnicode
def createTTS(post_id,input_text):
# Limit of 300 characters per request.
max_length = 300
input_file = os.path.join("posts", f"{post_id}.txt")
output_file = f"{post_id}.mp3"
# Split the input text into chunks of maximum length 300 characters
input_chunks = [input_text[i:i+max_length] for i in range(0, len(input_text), max_length)]
# Set the speaker ID
speaker_id = "en_us_010"
# Set the API endpoint and headers
url = "https://api22-normal-c-useast1a.tiktokv.com/media/api/text/speech/invoke/"
headers = {
'User-Agent': 'com.zhiliaoapp.musically/2022600030 (Linux; U; Android 7.1.2; es_ES; SM-G988N; Build/NRD90M;tt-ok/3.12.13.1)',
'Cookie': 'sessionid=90c38a59d8076ea0fbc01c8643efbe47'
}
# Loop through the input chunks and make API requests for each chunk
audio_data_list = []
for input_chunk in input_chunks:
# Set the request parameters
params = {
"text_speaker": speaker_id,
"req_text": input_chunk,
"speaker_map_type": "0",
"aid": "1233"
}
# Send the request and get the response
r = requests.post(url, headers=headers, params=params)
# Check for errors
if r.json()["message"] == "Couldn't load speech. Try again.":
print("Session ID was not correct, or the service is unavailable")
exit()
if r.json()["message"] == "This voice is unavailable now":
print("Voice ID entered does not exist or was entered incorrectly.")
exit()
# Extract the audio data from the response
audio_data = base64.b64decode(r.json()["data"]["v_str"])
audio_data_list.append(audio_data)
# Concatenate the audio data from all the chunks
concatenated_audio_data = b"".join(audio_data_list)
if os.path.exists(os.path.join("mp3", output_file)):
print(f"{post_id} - Skipping MP3 creation for {post_id}. File already exists.")
return None
path = Path("mp3") / output_file
with open(path, "wb") as f:
f.write(concatenated_audio_data)
#upload_to_drive(output_file,"1w4IUHvVJ2S_qJRCceSokCMu9IrDc5a9P")
print(f"Conversion complete. MP3 file saved as {output_file}.")
return concatenated_audio_data
def wcreateTTS(post_id,input_text):
# Limit of 300 characters per request.
max_length = 300
input_file = os.path.join("posts", f"{post_id}.txt")
output_file = f"{post_id}.mp3"
# Load the input text
with open(input_file, "r") as f:
input_text = f.read()
# Split the input text into chunks of maximum length 300 characters
input_chunks = []
current_chunk = ""
for word in input_text.split():
if len(current_chunk) + len(word) < max_length:
current_chunk += word + " "
elif current_chunk != "":
if current_chunk[-1] in (".", " "):
input_chunks.append(current_chunk.strip())
current_chunk = word + " "
else:
last_space_index = max(current_chunk.rfind("."), current_chunk.rfind(" "))
if last_space_index == -1:
input_chunks.append(current_chunk.strip())
current_chunk = word + " "
else:
input_chunks.append(current_chunk[:last_space_index].strip())
current_chunk = current_chunk[last_space_index+1:] + word + " "
if current_chunk != "":
input_chunks.append(current_chunk.strip())
# Set the speaker ID
speaker_id = "en_us_001"
# Set the API endpoint and headers
url = "https://tiktok-tts.weilnet.workers.dev/api/generation"
headers = {
'Content-Type': 'application/json'
}
# Loop through the input chunks and make API requests for each chunk
audio_data_list = []
for input_chunk in input_chunks:
# Set the request body
data = {
"text": input_chunk,
"voice": speaker_id
}
# Send the request and get the response
r = requests.post(url, headers=headers, json=data)
# Check for errors
if r.status_code != 200:
print(f"Error {r.status_code} occurred for input chunk: {input_chunk}")
identifyUnicode(input_chunk) # Call the count_unicode_chars function
exit()
# Extract the audio data from the response
audio_data = base64.b64decode(r.json()["data"])
audio_data_list.append(audio_data)
# Concatenate the audio data from all the chunks
concatenated_audio_data = b"".join(audio_data_list)
if os.path.exists(os.path.join("mp3", output_file)):
print(f"{post_id} - Skipping MP3 creation for {post_id}. File already exists.")
return None
path = Path("mp3") / output_file
with open(path, "wb") as f:
f.write(concatenated_audio_data)
#upload_to_drive(output_file,"1w4IUHvVJ2S_qJRCceSokCMu9IrDc5a9P")
print(f"Conversion complete. MP3 file saved as {output_file}.")
return concatenated_audio_data