16 Commits

Author SHA1 Message Date
Harry
0bfec956c5 Merge pull request #658 from harry0703/dev
bump version to 1.2.6
2025-05-10 14:14:42 +08:00
harry
fec3a8b6bd Merge branch 'add-siliconflow-tts' into dev 2025-05-10 14:13:37 +08:00
harry
3108c2e4e5 perf: bump version to 1.2.6 2025-05-10 14:13:18 +08:00
Harry
d8dd1f1acf Merge pull request #657 from harry0703/add-siliconflow-tts
feat: update SiliconFlow API Key descriptions in localization files
2025-05-10 14:12:11 +08:00
Harry
208ea5c11b Merge pull request #653 from yyhhyyyyyy/add-siliconflow-tts
feat: Increase SiliconFlow TTS services.
2025-05-10 14:11:26 +08:00
harry
71d791a9af feat: update SiliconFlow API Key descriptions in localization files 2025-05-10 14:10:42 +08:00
Harry
03a06f141c Merge pull request #655 from harry0703/dev
Dev
2025-05-10 13:27:27 +08:00
harry
4c9ac5e6df feat: loop video clips to match audio duration 2025-05-10 13:26:24 +08:00
harry
4a64e211f9 fix: correct condition for subclipping 2025-05-10 12:35:45 +08:00
harry
97c631e696 feat: improve file extension parsing using pathlib 2025-05-10 12:34:53 +08:00
harry
a601705bf4 feat: add unit tests 2025-05-10 12:34:37 +08:00
yyhhyyyyyy
45f32756a3 feat: increase siliconflow TTS services 2025-05-09 23:31:04 +08:00
yyhhyyyyyy
22f47d90de feat: add TTS services provider selection list 2025-05-09 22:14:43 +08:00
Harry
c03dc9c984 Merge pull request #652 from harry0703/dev
perf: optimize memory usage and processing performance, bump version to 1.2.5
2025-05-09 20:56:14 +08:00
harry
7569c08a62 perf: bump version to 1.2.5 2025-05-09 20:55:36 +08:00
harry
f07e5802f7 perf: optimize memory usage and processing performance 2025-05-09 20:55:12 +08:00
28 changed files with 773 additions and 193 deletions

View File

@@ -36,6 +36,7 @@ def save_config():
with open(config_file, "w", encoding="utf-8") as f:
_cfg["app"] = app
_cfg["azure"] = azure
_cfg["siliconflow"] = siliconflow
_cfg["ui"] = ui
f.write(toml.dumps(_cfg))
@@ -45,9 +46,13 @@ app = _cfg.get("app", {})
whisper = _cfg.get("whisper", {})
proxy = _cfg.get("proxy", {})
azure = _cfg.get("azure", {})
ui = _cfg.get("ui", {
"hide_log": False,
})
siliconflow = _cfg.get("siliconflow", {})
ui = _cfg.get(
"ui",
{
"hide_log": False,
},
)
hostname = socket.gethostname()
@@ -59,7 +64,7 @@ project_description = _cfg.get(
"project_description",
"<a href='https://github.com/harry0703/MoneyPrinterTurbo'>https://github.com/harry0703/MoneyPrinterTurbo</a>",
)
project_version = _cfg.get("project_version", "1.2.4")
project_version = _cfg.get("project_version", "1.2.6")
reload_debug = False
imagemagick_path = app.get("imagemagick_path", "")

View File

@@ -1,8 +1,10 @@
import glob
import itertools
import os
import random
import gc
import shutil
from typing import List
from loguru import logger
from moviepy import (
AudioFileClip,
@@ -29,6 +31,72 @@ from app.models.schema import (
from app.services.utils import video_effects
from app.utils import utils
class SubClippedVideoClip:
def __init__(self, file_path, start_time=None, end_time=None, width=None, height=None, duration=None):
self.file_path = file_path
self.start_time = start_time
self.end_time = end_time
self.width = width
self.height = height
if duration is None:
self.duration = end_time - start_time
else:
self.duration = duration
def __str__(self):
return f"SubClippedVideoClip(file_path={self.file_path}, start_time={self.start_time}, end_time={self.end_time}, duration={self.duration}, width={self.width}, height={self.height})"
audio_codec = "aac"
video_codec = "libx264"
fps = 30
def close_clip(clip):
if clip is None:
return
try:
# close main resources
if hasattr(clip, 'reader') and clip.reader is not None:
clip.reader.close()
# close audio resources
if hasattr(clip, 'audio') and clip.audio is not None:
if hasattr(clip.audio, 'reader') and clip.audio.reader is not None:
clip.audio.reader.close()
del clip.audio
# close mask resources
if hasattr(clip, 'mask') and clip.mask is not None:
if hasattr(clip.mask, 'reader') and clip.mask.reader is not None:
clip.mask.reader.close()
del clip.mask
# handle child clips in composite clips
if hasattr(clip, 'clips') and clip.clips:
for child_clip in clip.clips:
if child_clip is not clip: # avoid possible circular references
close_clip(child_clip)
# clear clip list
if hasattr(clip, 'clips'):
clip.clips = []
except Exception as e:
logger.error(f"failed to close clip: {str(e)}")
del clip
gc.collect()
def delete_files(files: List[str] | str):
if isinstance(files, str):
files = [files]
for file in files:
try:
os.remove(file)
except:
pass
def get_bgm_file(bgm_type: str = "random", bgm_file: str = ""):
if not bgm_type:
@@ -58,84 +126,75 @@ def combine_videos(
) -> str:
audio_clip = AudioFileClip(audio_file)
audio_duration = audio_clip.duration
logger.info(f"max duration of audio: {audio_duration} seconds")
logger.info(f"audio duration: {audio_duration} seconds")
# Required duration of each clip
req_dur = audio_duration / len(video_paths)
req_dur = max_clip_duration
logger.info(f"each clip will be maximum {req_dur} seconds long")
logger.info(f"maximum clip duration: {req_dur} seconds")
output_dir = os.path.dirname(combined_video_path)
aspect = VideoAspect(video_aspect)
video_width, video_height = aspect.to_resolution()
clips = []
processed_clips = []
subclipped_items = []
video_duration = 0
raw_clips = []
for video_path in video_paths:
clip = VideoFileClip(video_path).without_audio()
clip = VideoFileClip(video_path)
clip_duration = clip.duration
clip_w, clip_h = clip.size
close_clip(clip)
start_time = 0
while start_time < clip_duration:
end_time = min(start_time + max_clip_duration, clip_duration)
split_clip = clip.subclipped(start_time, end_time)
raw_clips.append(split_clip)
# logger.info(f"splitting from {start_time:.2f} to {end_time:.2f}, clip duration {clip_duration:.2f}, split_clip duration {split_clip.duration:.2f}")
if clip_duration - start_time >= max_clip_duration:
subclipped_items.append(SubClippedVideoClip(file_path= video_path, start_time=start_time, end_time=end_time, width=clip_w, height=clip_h))
start_time = end_time
if video_concat_mode.value == VideoConcatMode.sequential.value:
break
# random video_paths order
# random subclipped_items order
if video_concat_mode.value == VideoConcatMode.random.value:
random.shuffle(raw_clips)
random.shuffle(subclipped_items)
logger.debug(f"total subclipped items: {len(subclipped_items)}")
# Add downloaded clips over and over until the duration of the audio (max_duration) has been reached
while video_duration < audio_duration:
for clip in raw_clips:
# Check if clip is longer than the remaining audio
if (audio_duration - video_duration) < clip.duration:
clip = clip.subclipped(0, (audio_duration - video_duration))
# Only shorten clips if the calculated clip length (req_dur) is shorter than the actual clip to prevent still image
elif req_dur < clip.duration:
clip = clip.subclipped(0, req_dur)
clip = clip.with_fps(30)
for i, subclipped_item in enumerate(subclipped_items):
if video_duration > audio_duration:
break
logger.debug(f"processing clip {i+1}: {subclipped_item.width}x{subclipped_item.height}, current duration: {video_duration:.2f}s, remaining: {audio_duration - video_duration:.2f}s")
try:
clip = VideoFileClip(subclipped_item.file_path).subclipped(subclipped_item.start_time, subclipped_item.end_time)
clip_duration = clip.duration
# Not all videos are same size, so we need to resize them
clip_w, clip_h = clip.size
if clip_w != video_width or clip_h != video_height:
clip_ratio = clip.w / clip.h
video_ratio = video_width / video_height
logger.debug(f"resizing clip, source: {clip_w}x{clip_h}, ratio: {clip_ratio:.2f}, target: {video_width}x{video_height}, ratio: {video_ratio:.2f}")
if clip_ratio == video_ratio:
# Resize proportionally
clip = clip.resized((video_width, video_height))
clip = clip.resized(new_size=(video_width, video_height))
else:
# Resize proportionally
if clip_ratio > video_ratio:
# Resize proportionally based on the target width
scale_factor = video_width / clip_w
else:
# Resize proportionally based on the target height
scale_factor = video_height / clip_h
new_width = int(clip_w * scale_factor)
new_height = int(clip_h * scale_factor)
clip_resized = clip.resized(new_size=(new_width, new_height))
background = ColorClip(
size=(video_width, video_height), color=(0, 0, 0)
)
clip = CompositeVideoClip(
[
background.with_duration(clip.duration),
clip_resized.with_position("center"),
]
)
background = ColorClip(size=(video_width, video_height), color=(0, 0, 0)).with_duration(clip_duration)
clip_resized = clip.resized(new_size=(new_width, new_height)).with_position("center")
clip = CompositeVideoClip([background, clip_resized])
logger.info(
f"resizing video to {video_width} x {video_height}, clip size: {clip_w} x {clip_h}"
)
close_clip(clip_resized)
close_clip(background)
shuffle_side = random.choice(["left", "right", "top", "bottom"])
if video_transition_mode.value == VideoTransitionMode.none.value:
@@ -161,23 +220,92 @@ def combine_videos(
if clip.duration > max_clip_duration:
clip = clip.subclipped(0, max_clip_duration)
clips.append(clip)
# wirte clip to temp file
clip_file = f"{output_dir}/temp-clip-{i+1}.mp4"
clip.write_videofile(clip_file, logger=None, fps=fps, codec=video_codec)
close_clip(clip)
processed_clips.append(SubClippedVideoClip(file_path=clip_file, duration=clip.duration, width=clip_w, height=clip_h))
video_duration += clip.duration
clips = [CompositeVideoClip([clip]) for clip in clips]
video_clip = concatenate_videoclips(clips)
video_clip = video_clip.with_fps(30)
logger.info("writing")
# https://github.com/harry0703/MoneyPrinterTurbo/issues/111#issuecomment-2032354030
video_clip.write_videofile(
filename=combined_video_path,
threads=threads,
logger=None,
temp_audiofile_path=output_dir,
audio_codec="aac",
fps=30,
)
video_clip.close()
logger.success("completed")
except Exception as e:
logger.error(f"failed to process clip: {str(e)}")
# loop processed clips until the video duration matches or exceeds the audio duration.
if video_duration < audio_duration:
logger.warning(f"video duration ({video_duration:.2f}s) is shorter than audio duration ({audio_duration:.2f}s), looping clips to match audio length.")
base_clips = processed_clips.copy()
for clip in itertools.cycle(base_clips):
if video_duration >= audio_duration:
break
processed_clips.append(clip)
video_duration += clip.duration
logger.info(f"video duration: {video_duration:.2f}s, audio duration: {audio_duration:.2f}s, looped {len(processed_clips)-len(base_clips)} clips")
# merge video clips progressively, avoid loading all videos at once to avoid memory overflow
logger.info("starting clip merging process")
if not processed_clips:
logger.warning("no clips available for merging")
return combined_video_path
# if there is only one clip, use it directly
if len(processed_clips) == 1:
logger.info("using single clip directly")
shutil.copy(processed_clips[0].file_path, combined_video_path)
delete_files(processed_clips)
logger.info("video combining completed")
return combined_video_path
# create initial video file as base
base_clip_path = processed_clips[0].file_path
temp_merged_video = f"{output_dir}/temp-merged-video.mp4"
temp_merged_next = f"{output_dir}/temp-merged-next.mp4"
# copy first clip as initial merged video
shutil.copy(base_clip_path, temp_merged_video)
# merge remaining video clips one by one
for i, clip in enumerate(processed_clips[1:], 1):
logger.info(f"merging clip {i}/{len(processed_clips)-1}, duration: {clip.duration:.2f}s")
try:
# load current base video and next clip to merge
base_clip = VideoFileClip(temp_merged_video)
next_clip = VideoFileClip(clip.file_path)
# merge these two clips
merged_clip = concatenate_videoclips([base_clip, next_clip])
# save merged result to temp file
merged_clip.write_videofile(
filename=temp_merged_next,
threads=threads,
logger=None,
temp_audiofile_path=output_dir,
audio_codec=audio_codec,
fps=fps,
)
close_clip(base_clip)
close_clip(next_clip)
close_clip(merged_clip)
# replace base file with new merged file
delete_files(temp_merged_video)
os.rename(temp_merged_next, temp_merged_video)
except Exception as e:
logger.error(f"failed to merge clip: {str(e)}")
continue
# after merging, rename final result to target file name
os.rename(temp_merged_video, combined_video_path)
# clean temp files
clip_files = [clip.file_path for clip in processed_clips]
delete_files(clip_files)
logger.info("video combining completed")
return combined_video_path
@@ -194,8 +322,6 @@ def wrap_text(text, max_width, font="Arial", fontsize=60):
if width <= max_width:
return text, height
# logger.warning(f"wrapping text, max_width: {max_width}, text_width: {width}, text: {text}")
processed = True
_wrapped_lines_ = []
@@ -218,7 +344,6 @@ def wrap_text(text, max_width, font="Arial", fontsize=60):
_wrapped_lines_ = [line.strip() for line in _wrapped_lines_]
result = "\n".join(_wrapped_lines_).strip()
height = len(_wrapped_lines_) * height
# logger.warning(f"wrapped text: {result}")
return result, height
_wrapped_lines_ = []
@@ -235,7 +360,6 @@ def wrap_text(text, max_width, font="Arial", fontsize=60):
_wrapped_lines_.append(_txt_)
result = "\n".join(_wrapped_lines_).strip()
height = len(_wrapped_lines_) * height
# logger.warning(f"wrapped text: {result}")
return result, height
@@ -249,7 +373,7 @@ def generate_video(
aspect = VideoAspect(params.video_aspect)
video_width, video_height = aspect.to_resolution()
logger.info(f"start, video size: {video_width} x {video_height}")
logger.info(f"generating video: {video_width} x {video_height}")
logger.info(f" ① video: {video_path}")
logger.info(f" ② audio: {audio_path}")
logger.info(f" ③ subtitle: {subtitle_path}")
@@ -268,7 +392,7 @@ def generate_video(
if os.name == "nt":
font_path = font_path.replace("\\", "/")
logger.info(f"using font: {font_path}")
logger.info(f" font: {font_path}")
def create_text_clip(subtitle_item):
params.font_size = int(params.font_size)
@@ -314,7 +438,7 @@ def generate_video(
_clip = _clip.with_position(("center", "center"))
return _clip
video_clip = VideoFileClip(video_path)
video_clip = VideoFileClip(video_path).without_audio()
audio_clip = AudioFileClip(audio_path).with_effects(
[afx.MultiplyVolume(params.voice_volume)]
)
@@ -353,15 +477,14 @@ def generate_video(
video_clip = video_clip.with_audio(audio_clip)
video_clip.write_videofile(
output_file,
audio_codec="aac",
audio_codec=audio_codec,
temp_audiofile_path=output_dir,
threads=params.n_threads or 2,
logger=None,
fps=30,
fps=fps,
)
video_clip.close()
del video_clip
logger.success("completed")
def preprocess_video(materials: List[MaterialInfo], clip_duration=4):
@@ -378,7 +501,7 @@ def preprocess_video(materials: List[MaterialInfo], clip_duration=4):
width = clip.size[0]
height = clip.size[1]
if width < 480 or height < 480:
logger.warning(f"video is too small, width: {width}, height: {height}")
logger.warning(f"low resolution material: {width}x{height}, minimum 480x480 required")
continue
if ext in const.FILE_TYPE_IMAGES:
@@ -405,68 +528,7 @@ def preprocess_video(materials: List[MaterialInfo], clip_duration=4):
# Output the video to a file.
video_file = f"{material.url}.mp4"
final_clip.write_videofile(video_file, fps=30, logger=None)
final_clip.close()
del final_clip
close_clip(clip)
material.url = video_file
logger.success(f"completed: {video_file}")
logger.success(f"image processed: {video_file}")
return materials
if __name__ == "__main__":
m = MaterialInfo()
m.url = "/Users/harry/Downloads/IMG_2915.JPG"
m.provider = "local"
materials = preprocess_video([m], clip_duration=4)
print(materials)
# txt_en = "Here's your guide to travel hacks for budget-friendly adventures"
# txt_zh = "测试长字段这是您的旅行技巧指南帮助您进行预算友好的冒险"
# font = utils.resource_dir() + "/fonts/STHeitiMedium.ttc"
# for txt in [txt_en, txt_zh]:
# t, h = wrap_text(text=txt, max_width=1000, font=font, fontsize=60)
# print(t)
#
# task_id = "aa563149-a7ea-49c2-b39f-8c32cc225baf"
# task_dir = utils.task_dir(task_id)
# video_file = f"{task_dir}/combined-1.mp4"
# audio_file = f"{task_dir}/audio.mp3"
# subtitle_file = f"{task_dir}/subtitle.srt"
# output_file = f"{task_dir}/final.mp4"
#
# # video_paths = []
# # for file in os.listdir(utils.storage_dir("test")):
# # if file.endswith(".mp4"):
# # video_paths.append(os.path.join(utils.storage_dir("test"), file))
# #
# # combine_videos(combined_video_path=video_file,
# # audio_file=audio_file,
# # video_paths=video_paths,
# # video_aspect=VideoAspect.portrait,
# # video_concat_mode=VideoConcatMode.random,
# # max_clip_duration=5,
# # threads=2)
#
# cfg = VideoParams()
# cfg.video_aspect = VideoAspect.portrait
# cfg.font_name = "STHeitiMedium.ttc"
# cfg.font_size = 60
# cfg.stroke_color = "#000000"
# cfg.stroke_width = 1.5
# cfg.text_fore_color = "#FFFFFF"
# cfg.text_background_color = "transparent"
# cfg.bgm_type = "random"
# cfg.bgm_file = ""
# cfg.bgm_volume = 1.0
# cfg.subtitle_enabled = True
# cfg.subtitle_position = "bottom"
# cfg.n_threads = 2
# cfg.paragraph_number = 1
#
# cfg.voice_volume = 1.0
#
# generate_video(video_path=video_file,
# audio_path=audio_file,
# subtitle_path=subtitle_file,
# output_file=output_file,
# params=cfg
# )

View File

@@ -6,6 +6,7 @@ from typing import Union
from xml.sax.saxutils import unescape
import edge_tts
import requests
from edge_tts import SubMaker, submaker
from edge_tts.submaker import mktimestamp
from loguru import logger
@@ -15,8 +16,34 @@ from app.config import config
from app.utils import utils
def get_siliconflow_voices() -> list[str]:
"""
获取硅基流动的声音列表
Returns:
声音列表,格式为 ["siliconflow:FunAudioLLM/CosyVoice2-0.5B:alex", ...]
"""
# 硅基流动的声音列表和对应的性别(用于显示)
voices_with_gender = [
("FunAudioLLM/CosyVoice2-0.5B", "alex", "Male"),
("FunAudioLLM/CosyVoice2-0.5B", "anna", "Female"),
("FunAudioLLM/CosyVoice2-0.5B", "bella", "Female"),
("FunAudioLLM/CosyVoice2-0.5B", "benjamin", "Male"),
("FunAudioLLM/CosyVoice2-0.5B", "charles", "Male"),
("FunAudioLLM/CosyVoice2-0.5B", "claire", "Female"),
("FunAudioLLM/CosyVoice2-0.5B", "david", "Male"),
("FunAudioLLM/CosyVoice2-0.5B", "diana", "Female"),
]
# 添加siliconflow:前缀,并格式化为显示名称
return [
f"siliconflow:{model}:{voice}-{gender}"
for model, voice, gender in voices_with_gender
]
def get_all_azure_voices(filter_locals=None) -> list[str]:
voices_str = """
azure_voices_str = """
Name: af-ZA-AdriNeural
Gender: Female
@@ -1015,7 +1042,7 @@ Gender: Female
# 定义正则表达式模式,用于匹配 Name 和 Gender 行
pattern = re.compile(r"Name:\s*(.+)\s*Gender:\s*(.+)\s*", re.MULTILINE)
# 使用正则表达式查找所有匹配项
matches = pattern.findall(voices_str)
matches = pattern.findall(azure_voices_str)
for name, gender in matches:
# 应用过滤条件
@@ -1045,11 +1072,37 @@ def is_azure_v2_voice(voice_name: str):
return ""
def is_siliconflow_voice(voice_name: str):
"""检查是否是硅基流动的声音"""
return voice_name.startswith("siliconflow:")
def tts(
text: str, voice_name: str, voice_rate: float, voice_file: str
text: str,
voice_name: str,
voice_rate: float,
voice_file: str,
voice_volume: float = 1.0,
) -> Union[SubMaker, None]:
if is_azure_v2_voice(voice_name):
return azure_tts_v2(text, voice_name, voice_file)
elif is_siliconflow_voice(voice_name):
# 从voice_name中提取模型和声音
# 格式: siliconflow:model:voice-Gender
parts = voice_name.split(":")
if len(parts) >= 3:
model = parts[1]
# 移除性别后缀,例如 "alex-Male" -> "alex"
voice_with_gender = parts[2]
voice = voice_with_gender.split("-")[0]
# 构建完整的voice参数格式为 "model:voice"
full_voice = f"{model}:{voice}"
return siliconflow_tts(
text, model, full_voice, voice_rate, voice_file, voice_volume
)
else:
logger.error(f"Invalid siliconflow voice name format: {voice_name}")
return None
return azure_tts_v1(text, voice_name, voice_rate, voice_file)
@@ -1098,6 +1151,144 @@ def azure_tts_v1(
return None
def siliconflow_tts(
text: str,
model: str,
voice: str,
voice_rate: float,
voice_file: str,
voice_volume: float = 1.0,
) -> Union[SubMaker, None]:
"""
使用硅基流动的API生成语音
Args:
text: 要转换为语音的文本
model: 模型名称,如 "FunAudioLLM/CosyVoice2-0.5B"
voice: 声音名称,如 "FunAudioLLM/CosyVoice2-0.5B:alex"
voice_rate: 语音速度,范围[0.25, 4.0]
voice_file: 输出的音频文件路径
voice_volume: 语音音量,范围[0.6, 5.0],需要转换为硅基流动的增益范围[-10, 10]
Returns:
SubMaker对象或None
"""
text = text.strip()
api_key = config.siliconflow.get("api_key", "")
if not api_key:
logger.error("SiliconFlow API key is not set")
return None
# 将voice_volume转换为硅基流动的增益范围
# 默认voice_volume为1.0对应gain为0
gain = voice_volume - 1.0
# 确保gain在[-10, 10]范围内
gain = max(-10, min(10, gain))
url = "https://api.siliconflow.cn/v1/audio/speech"
payload = {
"model": model,
"input": text,
"voice": voice,
"response_format": "mp3",
"sample_rate": 32000,
"stream": False,
"speed": voice_rate,
"gain": gain,
}
headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
for i in range(3): # 尝试3次
try:
logger.info(
f"start siliconflow tts, model: {model}, voice: {voice}, try: {i + 1}"
)
response = requests.post(url, json=payload, headers=headers)
if response.status_code == 200:
# 保存音频文件
with open(voice_file, "wb") as f:
f.write(response.content)
# 创建一个空的SubMaker对象
sub_maker = SubMaker()
# 获取音频文件的实际长度
try:
# 尝试使用moviepy获取音频长度
from moviepy import AudioFileClip
audio_clip = AudioFileClip(voice_file)
audio_duration = audio_clip.duration
audio_clip.close()
# 将音频长度转换为100纳秒单位与edge_tts兼容
audio_duration_100ns = int(audio_duration * 10000000)
# 使用文本分割来创建更准确的字幕
# 将文本按标点符号分割成句子
sentences = utils.split_string_by_punctuations(text)
if sentences:
# 计算每个句子的大致时长(按字符数比例分配)
total_chars = sum(len(s) for s in sentences)
char_duration = (
audio_duration_100ns / total_chars if total_chars > 0 else 0
)
current_offset = 0
for sentence in sentences:
if not sentence.strip():
continue
# 计算当前句子的时长
sentence_chars = len(sentence)
sentence_duration = int(sentence_chars * char_duration)
# 添加到SubMaker
sub_maker.subs.append(sentence)
sub_maker.offset.append(
(current_offset, current_offset + sentence_duration)
)
# 更新偏移量
current_offset += sentence_duration
else:
# 如果无法分割,则使用整个文本作为一个字幕
sub_maker.subs = [text]
sub_maker.offset = [(0, audio_duration_100ns)]
except Exception as e:
logger.warning(f"Failed to create accurate subtitles: {str(e)}")
# 回退到简单的字幕
sub_maker.subs = [text]
# 使用音频文件的实际长度如果无法获取则假设为10秒
sub_maker.offset = [
(
0,
audio_duration_100ns
if "audio_duration_100ns" in locals()
else 10000000,
)
]
logger.success(f"siliconflow tts succeeded: {voice_file}")
print("s", sub_maker.subs, sub_maker.offset)
return sub_maker
else:
logger.error(
f"siliconflow tts failed with status code {response.status_code}: {response.text}"
)
except Exception as e:
logger.error(f"siliconflow tts failed: {str(e)}")
return None
def azure_tts_v2(text: str, voice_name: str, voice_file: str) -> Union[SubMaker, None]:
voice_name = is_azure_v2_voice(voice_name)
if not voice_name:
@@ -1219,7 +1410,7 @@ def create_subtitle(sub_maker: submaker.SubMaker, text: str, subtitle_file: str)
"""
start_t = mktimestamp(start_time).replace(".", ",")
end_t = mktimestamp(end_time).replace(".", ",")
return f"{idx}\n" f"{start_t} --> {end_t}\n" f"{sub_text}\n"
return f"{idx}\n{start_t} --> {end_t}\n{sub_text}\n"
start_time = -1.0
sub_items = []

View File

@@ -1,6 +1,7 @@
import json
import locale
import os
from pathlib import Path
import threading
from typing import Any
from uuid import uuid4
@@ -226,4 +227,4 @@ def load_locales(i18n_dir):
def parse_extension(filename):
return os.path.splitext(filename)[1].strip().lower().replace(".", "")
return Path(filename).suffix.lower().lstrip('.')

View File

@@ -193,6 +193,11 @@ compute_type = "int8"
speech_key = ""
speech_region = ""
[siliconflow]
# SiliconFlow API Key
# Get your API key at https://siliconflow.cn
api_key = ""
[ui]
# UI related settings
# 是否隐藏日志信息

39
test/README.md Normal file
View File

@@ -0,0 +1,39 @@
# MoneyPrinterTurbo Test Directory
This directory contains unit tests for the **MoneyPrinterTurbo** project.
## Directory Structure
- `services/`: Tests for components in the `app/services` directory
- `test_video.py`: Tests for the video service
- `test_task.py`: Tests for the task service
## Running Tests
You can run the tests using Pythons built-in `unittest` framework:
```bash
# Run all tests
python -m unittest discover -s test
# Run a specific test file
python -m unittest test/services/test_video.py
# Run a specific test class
python -m unittest test.services.test_video.TestVideoService
# Run a specific test method
python -m unittest test.services.test_video.TestVideoService.test_preprocess_video
````
## Adding New Tests
To add tests for other components, follow these guidelines:
1. Create test files prefixed with `test_` in the appropriate subdirectory
2. Use `unittest.TestCase` as the base class for your test classes
3. Name test methods with the `test_` prefix
## Test Resources
Place any resource files required for testing in the `test/resources` directory.

1
test/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Unit test package for test

BIN
test/resources/1.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.6 KiB

BIN
test/resources/1.png.mp4 Normal file

Binary file not shown.

BIN
test/resources/2.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 8.1 KiB

BIN
test/resources/2.png.mp4 Normal file

Binary file not shown.

BIN
test/resources/3.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 8.8 KiB

BIN
test/resources/3.png.mp4 Normal file

Binary file not shown.

BIN
test/resources/4.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.6 KiB

BIN
test/resources/5.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.8 KiB

BIN
test/resources/6.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 8.8 KiB

BIN
test/resources/7.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.4 KiB

BIN
test/resources/8.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.3 KiB

BIN
test/resources/9.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.0 KiB

View File

@@ -0,0 +1 @@
# Unit test package for services

View File

@@ -0,0 +1,66 @@
import unittest
import os
import sys
from pathlib import Path
# add project root to python path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from app.services import task as tm
from app.models.schema import MaterialInfo, VideoParams
resources_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "resources")
class TestTaskService(unittest.TestCase):
def setUp(self):
pass
def tearDown(self):
pass
def test_task_local_materials(self):
task_id = "00000000-0000-0000-0000-000000000000"
video_materials=[]
for i in range(1, 4):
video_materials.append(MaterialInfo(
provider="local",
url=os.path.join(resources_dir, f"{i}.png"),
duration=0
))
params = VideoParams(
video_subject="金钱的作用",
video_script="金钱不仅是交换媒介,更是社会资源的分配工具。它能满足基本生存需求,如食物和住房,也能提供教育、医疗等提升生活品质的机会。拥有足够的金钱意味着更多选择权,比如职业自由或创业可能。但金钱的作用也有边界,它无法直接购买幸福、健康或真诚的人际关系。过度追逐财富可能导致价值观扭曲,忽视精神层面的需求。理想的状态是理性看待金钱,将其作为实现目标的工具而非终极目的。",
video_terms="money importance, wealth and society, financial freedom, money and happiness, role of money",
video_aspect="9:16",
video_concat_mode="random",
video_transition_mode="None",
video_clip_duration=3,
video_count=1,
video_source="local",
video_materials=video_materials,
video_language="",
voice_name="zh-CN-XiaoxiaoNeural-Female",
voice_volume=1.0,
voice_rate=1.0,
bgm_type="random",
bgm_file="",
bgm_volume=0.2,
subtitle_enabled=True,
subtitle_position="bottom",
custom_position=70.0,
font_name="MicrosoftYaHeiBold.ttc",
text_fore_color="#FFFFFF",
text_background_color=True,
font_size=60,
stroke_color="#000000",
stroke_width=1.5,
n_threads=2,
paragraph_number=1
)
result = tm.start(task_id=task_id, params=params)
print(result)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,85 @@
import unittest
import os
import sys
from pathlib import Path
from moviepy import (
VideoFileClip,
)
# add project root to python path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from app.models.schema import MaterialInfo
from app.services import video as vd
from app.utils import utils
resources_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "resources")
class TestVideoService(unittest.TestCase):
def setUp(self):
self.test_img_path = os.path.join(resources_dir, "1.png")
def tearDown(self):
pass
def test_preprocess_video(self):
if not os.path.exists(self.test_img_path):
self.fail(f"test image not found: {self.test_img_path}")
# test preprocess_video function
m = MaterialInfo()
m.url = self.test_img_path
m.provider = "local"
print(m)
materials = vd.preprocess_video([m], clip_duration=4)
print(materials)
# verify result
self.assertIsNotNone(materials)
self.assertEqual(len(materials), 1)
self.assertTrue(materials[0].url.endswith(".mp4"))
# moviepy get video info
clip = VideoFileClip(materials[0].url)
print(clip)
# clean generated test video file
if os.path.exists(materials[0].url):
os.remove(materials[0].url)
def test_wrap_text(self):
"""test text wrapping function"""
try:
font_path = os.path.join(utils.font_dir(), "STHeitiMedium.ttc")
if not os.path.exists(font_path):
self.fail(f"font file not found: {font_path}")
# test english text wrapping
test_text_en = "This is a test text for wrapping long sentences in english language"
wrapped_text_en, text_height_en = vd.wrap_text(
text=test_text_en,
max_width=300,
font=font_path,
fontsize=30
)
print(wrapped_text_en, text_height_en)
# verify text is wrapped
self.assertIn("\n", wrapped_text_en)
# test chinese text wrapping
test_text_zh = "这是一段用来测试中文长句换行的文本内容,应该会根据宽度限制进行换行处理"
wrapped_text_zh, text_height_zh = vd.wrap_text(
text=test_text_zh,
max_width=300,
font=font_path,
fontsize=30
)
print(wrapped_text_zh, text_height_zh)
# verify chinese text is wrapped
self.assertIn("\n", wrapped_text_zh)
except Exception as e:
self.fail(f"test wrap_text failed: {str(e)}")
if __name__ == "__main__":
unittest.main()

View File

@@ -107,6 +107,7 @@ support_locales = [
"th-TH",
]
def get_all_fonts():
fonts = []
for root, dirs, files in os.walk(font_dir):
@@ -197,7 +198,8 @@ def tr(key):
loc = locales.get(st.session_state["ui_language"], {})
return loc.get("Translation", {}).get(key, key)
# 创建基础设置折叠框
# 创建基础设置折叠框
if not config.app.get("hide_config", False):
with st.expander(tr("Basic Settings"), expanded=False):
config_panels = st.columns(3)
@@ -423,31 +425,31 @@ if not config.app.get("hide_config", False):
# 右侧面板 - API 密钥设置
with right_config_panel:
def get_keys_from_config(cfg_key):
api_keys = config.app.get(cfg_key, [])
if isinstance(api_keys, str):
api_keys = [api_keys]
api_key = ", ".join(api_keys)
return api_key
def get_keys_from_config(cfg_key):
api_keys = config.app.get(cfg_key, [])
if isinstance(api_keys, str):
api_keys = [api_keys]
api_key = ", ".join(api_keys)
return api_key
def save_keys_to_config(cfg_key, value):
value = value.replace(" ", "")
if value:
config.app[cfg_key] = value.split(",")
def save_keys_to_config(cfg_key, value):
value = value.replace(" ", "")
if value:
config.app[cfg_key] = value.split(",")
st.write(tr("Video Source Settings"))
st.write(tr("Video Source Settings"))
pexels_api_key = get_keys_from_config("pexels_api_keys")
pexels_api_key = st.text_input(
tr("Pexels API Key"), value=pexels_api_key, type="password"
)
save_keys_to_config("pexels_api_keys", pexels_api_key)
pexels_api_key = get_keys_from_config("pexels_api_keys")
pexels_api_key = st.text_input(
tr("Pexels API Key"), value=pexels_api_key, type="password"
)
save_keys_to_config("pexels_api_keys", pexels_api_key)
pixabay_api_key = get_keys_from_config("pixabay_api_keys")
pixabay_api_key = st.text_input(
tr("Pixabay API Key"), value=pixabay_api_key, type="password"
)
save_keys_to_config("pixabay_api_keys", pixabay_api_key)
pixabay_api_key = get_keys_from_config("pixabay_api_keys")
pixabay_api_key = st.text_input(
tr("Pixabay API Key"), value=pixabay_api_key, type="password"
)
save_keys_to_config("pixabay_api_keys", pixabay_api_key)
llm_provider = config.app.get("llm_provider", "").lower()
panel = st.columns(3)
@@ -615,42 +617,103 @@ with middle_panel:
with st.container(border=True):
st.write(tr("Audio Settings"))
# tts_providers = ['edge', 'azure']
# tts_provider = st.selectbox(tr("TTS Provider"), tts_providers)
# 添加TTS服务器选择下拉框
tts_servers = [
("azure-tts-v1", "Azure TTS V1"),
("azure-tts-v2", "Azure TTS V2"),
("siliconflow", "SiliconFlow TTS"),
]
# 获取保存的TTS服务器默认为v1
saved_tts_server = config.ui.get("tts_server", "azure-tts-v1")
saved_tts_server_index = 0
for i, (server_value, _) in enumerate(tts_servers):
if server_value == saved_tts_server:
saved_tts_server_index = i
break
selected_tts_server_index = st.selectbox(
tr("TTS Servers"),
options=range(len(tts_servers)),
format_func=lambda x: tts_servers[x][1],
index=saved_tts_server_index,
)
selected_tts_server = tts_servers[selected_tts_server_index][0]
config.ui["tts_server"] = selected_tts_server
# 根据选择的TTS服务器获取声音列表
filtered_voices = []
if selected_tts_server == "siliconflow":
# 获取硅基流动的声音列表
filtered_voices = voice.get_siliconflow_voices()
else:
# 获取Azure的声音列表
all_voices = voice.get_all_azure_voices(filter_locals=None)
# 根据选择的TTS服务器筛选声音
for v in all_voices:
if selected_tts_server == "azure-tts-v2":
# V2版本的声音名称中包含"v2"
if "V2" in v:
filtered_voices.append(v)
else:
# V1版本的声音名称中不包含"v2"
if "V2" not in v:
filtered_voices.append(v)
voices = voice.get_all_azure_voices(filter_locals=None)
friendly_names = {
v: v.replace("Female", tr("Female"))
.replace("Male", tr("Male"))
.replace("Neural", "")
for v in voices
for v in filtered_voices
}
saved_voice_name = config.ui.get("voice_name", "")
saved_voice_name_index = 0
# 检查保存的声音是否在当前筛选的声音列表中
if saved_voice_name in friendly_names:
saved_voice_name_index = list(friendly_names.keys()).index(saved_voice_name)
else:
for i, v in enumerate(voices):
if (
v.lower().startswith(st.session_state["ui_language"].lower())
and "V2" not in v
):
# 如果不在则根据当前UI语言选择一个默认声音
for i, v in enumerate(filtered_voices):
if v.lower().startswith(st.session_state["ui_language"].lower()):
saved_voice_name_index = i
break
selected_friendly_name = st.selectbox(
tr("Speech Synthesis"),
options=list(friendly_names.values()),
index=saved_voice_name_index,
)
# 如果没有找到匹配的声音,使用第一个声音
if saved_voice_name_index >= len(friendly_names) and friendly_names:
saved_voice_name_index = 0
voice_name = list(friendly_names.keys())[
list(friendly_names.values()).index(selected_friendly_name)
]
params.voice_name = voice_name
config.ui["voice_name"] = voice_name
# 确保有声音可选
if friendly_names:
selected_friendly_name = st.selectbox(
tr("Speech Synthesis"),
options=list(friendly_names.values()),
index=min(saved_voice_name_index, len(friendly_names) - 1)
if friendly_names
else 0,
)
if st.button(tr("Play Voice")):
voice_name = list(friendly_names.keys())[
list(friendly_names.values()).index(selected_friendly_name)
]
params.voice_name = voice_name
config.ui["voice_name"] = voice_name
else:
# 如果没有声音可选,显示提示信息
st.warning(
tr(
"No voices available for the selected TTS server. Please select another server."
)
)
params.voice_name = ""
config.ui["voice_name"] = ""
# 只有在有声音可选时才显示试听按钮
if friendly_names and st.button(tr("Play Voice")):
play_content = params.video_subject
if not play_content:
play_content = params.video_script
@@ -664,6 +727,7 @@ with middle_panel:
voice_name=voice_name,
voice_rate=params.voice_rate,
voice_file=audio_file,
voice_volume=params.voice_volume,
)
# if the voice file generation failed, try again with a default content.
if not sub_maker:
@@ -673,6 +737,7 @@ with middle_panel:
voice_name=voice_name,
voice_rate=params.voice_rate,
voice_file=audio_file,
voice_volume=params.voice_volume,
)
if sub_maker and os.path.exists(audio_file):
@@ -680,7 +745,10 @@ with middle_panel:
if os.path.exists(audio_file):
os.remove(audio_file)
if voice.is_azure_v2_voice(voice_name):
# 当选择V2版本或者声音是V2声音时显示服务区域和API key输入框
if selected_tts_server == "azure-tts-v2" or (
voice_name and voice.is_azure_v2_voice(voice_name)
):
saved_azure_speech_region = config.azure.get("speech_region", "")
saved_azure_speech_key = config.azure.get("speech_key", "")
azure_speech_region = st.text_input(
@@ -697,6 +765,32 @@ with middle_panel:
config.azure["speech_region"] = azure_speech_region
config.azure["speech_key"] = azure_speech_key
# 当选择硅基流动时显示API key输入框和说明信息
if selected_tts_server == "siliconflow" or (
voice_name and voice.is_siliconflow_voice(voice_name)
):
saved_siliconflow_api_key = config.siliconflow.get("api_key", "")
siliconflow_api_key = st.text_input(
tr("SiliconFlow API Key"),
value=saved_siliconflow_api_key,
type="password",
key="siliconflow_api_key_input",
)
# 显示硅基流动的说明信息
st.info(
tr("SiliconFlow TTS Settings")
+ ":\n"
+ "- "
+ tr("Speed: Range [0.25, 4.0], default is 1.0")
+ "\n"
+ "- "
+ tr("Volume: Uses Speech Volume setting, default 1.0 maps to gain 0")
)
config.siliconflow["api_key"] = siliconflow_api_key
params.voice_volume = st.selectbox(
tr("Speech Volume"),
options=[0.6, 0.8, 1.0, 1.2, 1.5, 2.0, 3.0, 4.0, 5.0],

View File

@@ -91,6 +91,12 @@
"Voice Example": "Dies ist ein Beispieltext zum Testen der Sprachsynthese",
"Synthesizing Voice": "Sprachsynthese läuft, bitte warten...",
"TTS Provider": "Sprachsynthese-Anbieter auswählen",
"TTS Servers": "TTS-Server",
"No voices available for the selected TTS server. Please select another server.": "Keine Stimmen für den ausgewählten TTS-Server verfügbar. Bitte wählen Sie einen anderen Server.",
"SiliconFlow API Key": "SiliconFlow API-Schlüssel",
"SiliconFlow TTS Settings": "SiliconFlow TTS-Einstellungen",
"Speed: Range [0.25, 4.0], default is 1.0": "Geschwindigkeit: Bereich [0.25, 4.0], Standardwert ist 1.0",
"Volume: Uses Speech Volume setting, default 1.0 maps to gain 0": "Lautstärke: Verwendet die Sprachlautstärke-Einstellung, Standardwert 1.0 entspricht Verstärkung 0",
"Hide Log": "Protokoll ausblenden",
"Hide Basic Settings": "Basis-Einstellungen ausblenden\n\nWenn diese Option deaktiviert ist, wird die Basis-Einstellungen-Leiste nicht auf der Seite angezeigt.\n\nWenn Sie sie erneut anzeigen möchten, setzen Sie `hide_config = false` in `config.toml`",
"LLM Settings": "**LLM-Einstellungen**",

View File

@@ -91,6 +91,12 @@
"Voice Example": "This is an example text for testing speech synthesis",
"Synthesizing Voice": "Synthesizing voice, please wait...",
"TTS Provider": "Select the voice synthesis provider",
"TTS Servers": "TTS Servers",
"No voices available for the selected TTS server. Please select another server.": "No voices available for the selected TTS server. Please select another server.",
"SiliconFlow API Key": "SiliconFlow API Key [Click to get](https://cloud.siliconflow.cn/account/ak)",
"SiliconFlow TTS Settings": "SiliconFlow TTS Settings",
"Speed: Range [0.25, 4.0], default is 1.0": "Speed: Range [0.25, 4.0], default is 1.0",
"Volume: Uses Speech Volume setting, default 1.0 maps to gain 0": "Volume: Uses Speech Volume setting, default 1.0 maps to gain 0",
"Hide Log": "Hide Log",
"Hide Basic Settings": "Hide Basic Settings\n\nHidden, the basic settings panel will not be displayed on the page.\n\nIf you need to display it again, please set `hide_config = false` in `config.toml`",
"LLM Settings": "**LLM Settings**",

View File

@@ -91,6 +91,12 @@
"Voice Example": "Este é um exemplo de texto para testar a síntese de fala",
"Synthesizing Voice": "Sintetizando voz, por favor aguarde...",
"TTS Provider": "Selecione o provedor de síntese de voz",
"TTS Servers": "Servidores TTS",
"No voices available for the selected TTS server. Please select another server.": "Não há vozes disponíveis para o servidor TTS selecionado. Por favor, selecione outro servidor.",
"SiliconFlow API Key": "Chave API do SiliconFlow",
"SiliconFlow TTS Settings": "Configurações do SiliconFlow TTS",
"Speed: Range [0.25, 4.0], default is 1.0": "Velocidade: Intervalo [0.25, 4.0], o padrão é 1.0",
"Volume: Uses Speech Volume setting, default 1.0 maps to gain 0": "Volume: Usa a configuração de Volume de Fala, o padrão 1.0 corresponde ao ganho 0",
"Hide Log": "Ocultar Log",
"Hide Basic Settings": "Ocultar Configurações Básicas\n\nOculto, o painel de configurações básicas não será exibido na página.\n\nSe precisar exibi-lo novamente, defina `hide_config = false` em `config.toml`",
"LLM Settings": "**Configurações do LLM**",

View File

@@ -91,6 +91,12 @@
"Voice Example": "Đây là văn bản mẫu để kiểm tra tổng hợp giọng nói",
"Synthesizing Voice": "Đang tổng hợp giọng nói, vui lòng đợi...",
"TTS Provider": "Chọn nhà cung cấp tổng hợp giọng nói",
"TTS Servers": "Máy chủ TTS",
"No voices available for the selected TTS server. Please select another server.": "Không có giọng nói nào cho máy chủ TTS đã chọn. Vui lòng chọn máy chủ khác.",
"SiliconFlow API Key": "Khóa API SiliconFlow",
"SiliconFlow TTS Settings": "Cài đặt SiliconFlow TTS",
"Speed: Range [0.25, 4.0], default is 1.0": "Tốc độ: Phạm vi [0.25, 4.0], mặc định là 1.0",
"Volume: Uses Speech Volume setting, default 1.0 maps to gain 0": "Âm lượng: Sử dụng cài đặt Âm lượng Giọng nói, mặc định 1.0 tương ứng với tăng ích 0",
"Hide Log": "Ẩn Nhật Ký",
"Hide Basic Settings": "Ẩn Cài Đặt Cơ Bản\n\nẨn, thanh cài đặt cơ bản sẽ không hiển thị trên trang web.\n\nNếu bạn muốn hiển thị lại, vui lòng đặt `hide_config = false` trong `config.toml`",
"LLM Settings": "**Cài Đặt LLM**",

View File

@@ -91,6 +91,12 @@
"Voice Example": "这是一段测试语音合成的示例文本",
"Synthesizing Voice": "语音合成中,请稍候...",
"TTS Provider": "语音合成提供商",
"TTS Servers": "TTS服务器",
"No voices available for the selected TTS server. Please select another server.": "当前选择的TTS服务器没有可用的声音请选择其他服务器。",
"SiliconFlow API Key": "硅基流动API密钥 [点击获取](https://cloud.siliconflow.cn/account/ak)",
"SiliconFlow TTS Settings": "硅基流动TTS设置",
"Speed: Range [0.25, 4.0], default is 1.0": "语速范围 [0.25, 4.0]默认值为1.0",
"Volume: Uses Speech Volume setting, default 1.0 maps to gain 0": "音量使用朗读音量设置默认值1.0对应增益0",
"Hide Log": "隐藏日志",
"Hide Basic Settings": "隐藏基础设置\n\n隐藏后基础设置面板将不会显示在页面中。\n\n如需要再次显示请在 `config.toml` 中设置 `hide_config = false`",
"LLM Settings": "**大模型设置**",