1, 支持AI生成文案预览

2, 支持自定义视频文案,关键词
3, 可选择是否启用字幕
4, UI优化
5, 一些其他bug修复和优化
This commit is contained in:
harry
2024-03-22 17:46:56 +08:00
parent 4a800eab4b
commit ce4b3771b6
10 changed files with 301 additions and 186 deletions

View File

@@ -1,4 +1,4 @@
punctuations = [
"?", ",", ".", "", ";",
"", "", "", "", "",
"?", ",", ".", "", ";", ":",
"", "", "", "", "", "",
]

View File

@@ -8,6 +8,11 @@ import warnings
warnings.filterwarnings("ignore", category=UserWarning, message="Field name.*shadows an attribute in parent.*")
# Ordering strategy for the clips that are stitched into the final video.
# Inherits from str, so every member is also a plain string equal to its value.
class VideoConcatMode(str, Enum):
# Callers that check for this mode shuffle the clip list with random.shuffle.
random = "random"
# No shuffle is applied for this mode; presumably clips keep the order in which they were collected.
sequential = "sequential"
class VideoAspect(str, Enum):
landscape = "16:9"
portrait = "9:16"
@@ -23,6 +28,12 @@ class VideoAspect(str, Enum):
return 1080, 1920
# Simple record describing one candidate video found by a material search.
class MaterialInfo:
# Name of the source the material came from; defaults to "pexels".
provider: str = "pexels"
# Link used to download the video file; empty by default.
url: str = ""
# Length of the video; the download log reports it in seconds.
# NOTE(review): confirm the unit against the provider's API response.
duration: int = 0
VoiceNames = [
# zh-CN
"female-zh-CN-XiaoxiaoNeural",
@@ -77,11 +88,21 @@ class VideoParams:
}
"""
video_subject: str
video_script: str = "" # 用于生成视频的脚本
video_terms: str = "" # 用于生成视频的关键词
video_aspect: Optional[VideoAspect] = VideoAspect.portrait.value
video_concat_mode: Optional[VideoConcatMode] = VideoConcatMode.random.value
video_clip_duration: Optional[int] = 5
voice_name: Optional[str] = VoiceNames[0]
bgm_name: Optional[str] = "random"
bgm_type: Optional[str] = "random"
bgm_file: Optional[str] = ""
subtitle_enabled: Optional[bool] = True
font_name: Optional[str] = "STHeitiMedium.ttc"
text_fore_color: Optional[str] = "#FFFFFF"
text_background_color: Optional[str] = "transparent"
font_size: int = 60
stroke_color: Optional[str] = "#000000"
stroke_width: float = 1.5

View File

@@ -1,12 +1,13 @@
import random
import time
from urllib.parse import urlencode
import requests
from typing import List
from loguru import logger
from app.config import config
from app.models.schema import VideoAspect
from app.models.schema import VideoAspect, VideoConcatMode, MaterialInfo
from app.utils import utils
requested_count = 0
@@ -22,11 +23,9 @@ def round_robin_api_key():
def search_videos(search_term: str,
wanted_count: int,
minimum_duration: int,
video_aspect: VideoAspect = VideoAspect.portrait,
locale: str = "zh-CN"
) -> List[str]:
) -> List[MaterialInfo]:
aspect = VideoAspect(video_aspect)
video_orientation = aspect.name
video_width, video_height = aspect.to_resolution()
@@ -36,37 +35,45 @@ def search_videos(search_term: str,
}
proxies = config.pexels.get("proxies", None)
# Build URL
query_url = f"https://api.pexels.com/videos/search?query={search_term}&per_page=15&orientation={video_orientation}&locale={locale}"
params = {
"query": search_term,
"per_page": 20,
"orientation": video_orientation
}
query_url = f"https://api.pexels.com/videos/search?{urlencode(params)}"
logger.info(f"searching videos: {query_url}, with proxies: {proxies}")
# Send the request
r = requests.get(query_url, headers=headers, proxies=proxies, verify=False)
# Parse the response
response = r.json()
video_urls = []
try:
videos_count = min(len(response["videos"]), wanted_count)
r = requests.get(query_url, headers=headers, proxies=proxies, verify=False)
response = r.json()
video_items = []
if "videos" not in response:
logger.error(f"search videos failed: {response}")
return video_items
videos = response["videos"]
# loop through each video in the result
for i in range(videos_count):
for v in videos:
duration = v["duration"]
# check if video has desired minimum duration
if response["videos"][i]["duration"] < minimum_duration:
if duration < minimum_duration:
continue
video_files = response["videos"][i]["video_files"]
video_files = v["video_files"]
# loop through each url to determine the best quality
for video in video_files:
# Check if video has a valid download link
# if ".com/external" in video["link"]:
w = int(video["width"])
h = int(video["height"])
if w == video_width and h == video_height:
video_urls.append(video["link"])
item = MaterialInfo()
item.provider = "pexels"
item.url = video["link"]
item.duration = duration
video_items.append(item)
break
return video_items
except Exception as e:
logger.error(f"search videos failed: {e}")
return video_urls
return []
def save_video(video_url: str, save_dir: str) -> str:
@@ -82,41 +89,46 @@ def save_video(video_url: str, save_dir: str) -> str:
def download_videos(task_id: str,
                    search_terms: List[str],
                    video_aspect: VideoAspect = VideoAspect.portrait,
                    video_contact_mode: VideoConcatMode = VideoConcatMode.random,
                    audio_duration: float = 0.0,
                    max_clip_duration: int = 5,
                    ) -> List[str]:
    """Search for stock videos and download enough of them to cover the audio.

    Every search term is queried through ``search_videos``; results are
    de-duplicated by URL. The unique items are then downloaded one by one
    until the accumulated usable duration exceeds ``audio_duration``.

    Args:
        task_id: Identifier of the task; used to resolve the save directory.
        search_terms: Terms passed one at a time to ``search_videos``.
        video_aspect: Desired aspect ratio, forwarded to ``search_videos``.
        video_contact_mode: Clip ordering mode. Either the enum member or its
            raw string value ("random" / "sequential") is accepted.
        audio_duration: Duration to cover; downloading stops once exceeded.
        max_clip_duration: Upper bound credited per clip towards the total.
            It is also forwarded as the minimum duration a search result
            must have.

    Returns:
        Paths of the successfully downloaded video files (possibly empty).

    Raises:
        ValueError: If ``video_contact_mode`` is not a valid mode value.
    """
    # Normalise up front: the mode may arrive as a bare string, on which
    # ``.value`` would fail. This mirrors how search_videos normalises
    # its aspect argument.
    concat_mode = VideoConcatMode(video_contact_mode)

    valid_video_items = []
    seen_urls = set()  # O(1) duplicate check instead of scanning a list
    found_duration = 0.0
    for search_term in search_terms:
        video_items = search_videos(search_term=search_term,
                                    minimum_duration=max_clip_duration,
                                    video_aspect=video_aspect)
        logger.info(f"found {len(video_items)} videos for '{search_term}'")

        for item in video_items:
            if item.url in seen_urls:
                continue
            seen_urls.add(item.url)
            valid_video_items.append(item)
            found_duration += item.duration

    logger.info(
        f"found total videos: {len(valid_video_items)}, required duration: {audio_duration} seconds, found duration: {found_duration} seconds")
    video_paths = []
    save_dir = utils.task_dir(task_id)

    if concat_mode == VideoConcatMode.random:
        random.shuffle(valid_video_items)

    total_duration = 0.0
    for item in valid_video_items:
        try:
            logger.info(f"downloading video: {item.url}")
            saved_video_path = save_video(item.url, save_dir)
            video_paths.append(saved_video_path)
            # A clip contributes at most max_clip_duration to the total.
            seconds = min(max_clip_duration, item.duration)
            total_duration += seconds
            if total_duration > audio_duration:
                logger.info(f"total duration of downloaded videos: {total_duration} seconds, skip downloading more")
                break
        except Exception as e:
            # Best effort: a failed download is logged and the next item is
            # tried. Log the URL rather than the object repr so the failing
            # video can be identified.
            logger.error(f"failed to download video: {item.url}, {e}")
    logger.success(f"downloaded {len(video_paths)} videos")
    return video_paths

View File

@@ -106,7 +106,7 @@ def create(audio_file, subtitle_file: str = ""):
idx += 1
sub = "\n".join(lines)
with open(subtitle_file, "w") as f:
with open(subtitle_file, "w", encoding="utf-8") as f:
f.write(sub)
logger.info(f"subtitle file created: {subtitle_file}")
@@ -116,7 +116,7 @@ def file_to_subtitles(filename):
current_times = None
current_text = ""
index = 0
with open(filename, 'r') as f:
with open(filename, 'r', encoding="utf-8") as f:
for line in f:
times = re.findall("([0-9]*:[0-9]*:[0-9]*,[0-9]*)", line)
if times:
@@ -145,7 +145,7 @@ def correct(subtitle_file, video_script):
corrected = True
if corrected:
with open(subtitle_file, "w") as fd:
with open(subtitle_file, "w", encoding="utf-8") as fd:
for item in subtitle_items:
fd.write(f"{item[0]}\n{item[1]}\n{item[2]}\n\n")
logger.info(f"subtitle corrected")

View File

@@ -1,4 +1,5 @@
import os.path
import re
from os import path
from loguru import logger
@@ -41,77 +42,101 @@ def start(task_id, params: VideoParams):
voice_name, language = _parse_voice(params.voice_name)
paragraph_number = params.paragraph_number
n_threads = params.n_threads
max_clip_duration = params.video_clip_duration
logger.info("\n\n## generating video script")
script = llm.generate_script(video_subject=video_subject, language=language, paragraph_number=paragraph_number)
video_script = params.video_script.strip()
if not video_script:
video_script = llm.generate_script(video_subject=video_subject, language=language,
paragraph_number=paragraph_number)
else:
logger.debug(f"video script: \n{video_script}")
logger.info("\n\n## generating video terms")
search_terms = llm.generate_terms(video_subject=video_subject, video_script=script, amount=5)
video_terms = params.video_terms
if not video_terms:
video_terms = llm.generate_terms(video_subject=video_subject, video_script=video_script, amount=5)
else:
video_terms = [term.strip() for term in re.split(r'[,]', video_terms)]
logger.debug(f"video terms: {utils.to_json(video_terms)}")
script_file = path.join(utils.task_dir(task_id), f"script.json")
script_data = {
"script": script,
"search_terms": search_terms
"script": video_script,
"search_terms": video_terms
}
with open(script_file, "w") as f:
with open(script_file, "w", encoding="utf-8") as f:
f.write(utils.to_json(script_data))
audio_file = path.join(utils.task_dir(task_id), f"audio.mp3")
subtitle_path = path.join(utils.task_dir(task_id), f"subtitle.srt")
logger.info("\n\n## generating audio")
sub_maker = voice.tts(text=script, voice_name=voice_name, voice_file=audio_file)
audio_file = path.join(utils.task_dir(task_id), f"audio.mp3")
sub_maker = voice.tts(text=video_script, voice_name=voice_name, voice_file=audio_file)
if sub_maker is None:
logger.error(
"failed to generate audio, maybe the network is not available. if you are in China, please use a VPN.")
return
subtitle_provider = config.app.get("subtitle_provider", "").strip().lower()
logger.info(f"\n\n## generating subtitle, provider: {subtitle_provider}")
subtitle_fallback = False
if subtitle_provider == "edge":
voice.create_subtitle(text=script, sub_maker=sub_maker, subtitle_file=subtitle_path)
if not os.path.exists(subtitle_path):
subtitle_fallback = True
logger.warning("subtitle file not found, fallback to whisper")
audio_duration = voice.get_audio_duration(sub_maker)
subtitle_path = ""
if params.subtitle_enabled:
subtitle_path = path.join(utils.task_dir(task_id), f"subtitle.srt")
subtitle_provider = config.app.get("subtitle_provider", "").strip().lower()
logger.info(f"\n\n## generating subtitle, provider: {subtitle_provider}")
subtitle_fallback = False
if subtitle_provider == "edge":
voice.create_subtitle(text=video_script, sub_maker=sub_maker, subtitle_file=subtitle_path)
if not os.path.exists(subtitle_path):
subtitle_fallback = True
logger.warning("subtitle file not found, fallback to whisper")
else:
subtitle_lines = subtitle.file_to_subtitles(subtitle_path)
if not subtitle_lines:
logger.warning(f"subtitle file is invalid: {subtitle_path}")
subtitle_fallback = True
if subtitle_provider == "whisper" or subtitle_fallback:
subtitle.create(audio_file=audio_file, subtitle_file=subtitle_path)
logger.info("\n\n## correcting subtitle")
subtitle.correct(subtitle_file=subtitle_path, video_script=script)
if subtitle_provider == "whisper" or subtitle_fallback:
subtitle.create(audio_file=audio_file, subtitle_file=subtitle_path)
logger.info("\n\n## correcting subtitle")
subtitle.correct(subtitle_file=subtitle_path, video_script=video_script)
subtitle_lines = subtitle.file_to_subtitles(subtitle_path)
if not subtitle_lines:
logger.warning(f"subtitle file is invalid: {subtitle_path}")
subtitle_path = ""
logger.info("\n\n## downloading videos")
video_paths = material.download_videos(task_id=task_id, search_terms=search_terms, video_aspect=params.video_aspect,
wanted_count=20,
minimum_duration=5)
downloaded_videos = material.download_videos(task_id=task_id,
search_terms=video_terms,
video_aspect=params.video_aspect,
video_contact_mode=params.video_concat_mode,
audio_duration=audio_duration,
max_clip_duration=max_clip_duration,
)
if not downloaded_videos:
logger.error(
"failed to download videos, maybe the network is not available. if you are in China, please use a VPN.")
return
logger.info("\n\n## combining videos")
combined_video_path = path.join(utils.task_dir(task_id), f"combined.mp4")
video.combine_videos(combined_video_path=combined_video_path,
video_paths=video_paths,
video_paths=downloaded_videos,
audio_file=audio_file,
video_aspect=params.video_aspect,
max_clip_duration=5,
video_concat_mode=params.video_concat_mode,
max_clip_duration=max_clip_duration,
threads=n_threads)
final_video_path = path.join(utils.task_dir(task_id), f"final.mp4")
bgm_file = video.get_bgm_file(bgm_name=params.bgm_name)
logger.info("\n\n## generating video")
# Put everything together
video.generate_video(video_path=combined_video_path,
audio_path=audio_file,
subtitle_path=subtitle_path,
output_file=final_video_path,
video_aspect=params.video_aspect,
threads=n_threads,
font_name=params.font_name,
fontsize=params.font_size,
text_fore_color=params.text_fore_color,
stroke_color=params.stroke_color,
stroke_width=params.stroke_width,
bgm_file=bgm_file
params=params,
)
logger.start(f"task {task_id} finished")
return {

View File

@@ -7,22 +7,22 @@ from moviepy.editor import *
from moviepy.video.fx.crop import crop
from moviepy.video.tools.subtitles import SubtitlesClip
from app.models.schema import VideoAspect
from app.models.schema import VideoAspect, VideoParams, VideoConcatMode
from app.utils import utils
def get_bgm_file(bgm_type: str = "random", bgm_file: str = ""):
    """Resolve the background-music file to mix into the video.

    Args:
        bgm_type: "random" picks an mp3 from the song directory; an empty
            value disables background music; any other value means
            ``bgm_file`` should be used.
        bgm_file: Path of a user-chosen music file. May be empty or None.

    Returns:
        Path of the music file, or "" when no usable file is available.
    """
    if not bgm_type:
        return ""

    if bgm_type == "random":
        suffix = "*.mp3"
        song_dir = utils.song_dir()
        files = glob.glob(os.path.join(song_dir, suffix))
        # random.choice raises IndexError on an empty sequence; an empty
        # song directory should simply mean "no background music".
        if not files:
            return ""
        return random.choice(files)

    # Guard against None/"" before touching the filesystem:
    # os.path.exists(None) raises TypeError.
    if bgm_file and os.path.exists(bgm_file):
        return bgm_file

    return ""
@@ -30,6 +30,7 @@ def combine_videos(combined_video_path: str,
video_paths: List[str],
audio_file: str,
video_aspect: VideoAspect = VideoAspect.portrait,
video_concat_mode: VideoConcatMode = VideoConcatMode.random,
max_clip_duration: int = 5,
threads: int = 2,
) -> str:
@@ -48,6 +49,10 @@ def combine_videos(combined_video_path: str,
tot_dur = 0
# Add downloaded clips over and over until the duration of the audio (max_duration) has been reached
while tot_dur < max_duration:
# random video_paths order
if video_concat_mode.value == VideoConcatMode.random.value:
random.shuffle(video_paths)
for video_path in video_paths:
clip = VideoFileClip(video_path)
clip = clip.without_audio()
@@ -127,20 +132,9 @@ def generate_video(video_path: str,
audio_path: str,
subtitle_path: str,
output_file: str,
video_aspect: VideoAspect = VideoAspect.portrait,
threads: int = 2,
font_name: str = "",
fontsize: int = 60,
stroke_color: str = "#000000",
stroke_width: float = 1.5,
text_fore_color: str = "white",
text_background_color: str = "transparent",
bgm_file: str = "",
params: VideoParams,
):
aspect = VideoAspect(video_aspect)
aspect = VideoAspect(params.video_aspect)
video_width, video_height = aspect.to_resolution()
logger.info(f"start, video size: {video_width} x {video_height}")
@@ -149,31 +143,33 @@ def generate_video(video_path: str,
logger.info(f" ③ subtitle: {subtitle_path}")
logger.info(f" ④ output: {output_file}")
if not font_name:
font_name = "STHeitiMedium.ttc"
font_path = os.path.join(utils.font_dir(), font_name)
if os.name == 'nt':
font_path = font_path.replace("\\", "/")
font_path = ""
if params.subtitle_enabled:
if not params.font_name:
params.font_name = "STHeitiMedium.ttc"
font_path = os.path.join(utils.font_dir(), params.font_name)
if os.name == 'nt':
font_path = font_path.replace("\\", "/")
logger.info(f"using font: {font_path}")
logger.info(f"using font: {font_path}")
def generator(txt):
wrapped_txt = wrap_text(txt, max_width=video_width - 100,
font=font_path,
fontsize=fontsize) # 调整max_width以适应你的视频
fontsize=params.font_size) # 调整max_width以适应你的视频
return TextClip(
wrapped_txt,
font=font_path,
fontsize=fontsize,
color=text_fore_color,
bg_color=text_background_color,
stroke_color=stroke_color,
stroke_width=stroke_width,
fontsize=params.font_size,
color=params.text_fore_color,
bg_color=params.text_background_color,
stroke_color=params.stroke_color,
stroke_width=params.stroke_width,
print_cmd=False,
)
position_height = video_height - 200
if video_aspect == VideoAspect.landscape:
if params.video_aspect == VideoAspect.landscape:
position_height = video_height - 100
clips = [
@@ -191,9 +187,11 @@ def generate_video(video_path: str,
temp_output_file = f"{output_file}.temp.mp4"
logger.info(f"writing to temp file: {temp_output_file}")
result.write_videofile(temp_output_file, threads=threads or 2)
result.write_videofile(temp_output_file, threads=params.n_threads or 2)
video_clip = VideoFileClip(temp_output_file)
bgm_file = get_bgm_file(bgm_type=params.bgm_type, bgm_file=params.bgm_file)
if bgm_file:
logger.info(f"adding background music: {bgm_file}")
# Add song to video at 30% volume using moviepy
@@ -209,35 +207,7 @@ def generate_video(video_path: str,
video_clip = video_clip.set_duration(original_duration)
logger.info(f"encoding audio codec to aac")
video_clip.write_videofile(output_file, audio_codec="aac", threads=threads)
video_clip.write_videofile(output_file, audio_codec="aac", threads=params.n_threads or 2)
os.remove(temp_output_file)
logger.success(f"completed")
if __name__ == "__main__":
txt = "hello 幸福经常被描述为最终人生目标和人类追求的核心 但它通常涉及对个人生活中意义和目的的深刻感悟"
font = utils.resource_dir() + "/fonts/STHeitiMedium.ttc"
t = wrap_text(text=txt, max_width=1000, font=font, fontsize=60)
print(t)
task_id = "c12fd1e6-4b0a-4d65-a075-c87abe35a072"
task_dir = utils.task_dir(task_id)
video_file = f"{task_dir}/combined.mp4"
audio_file = f"{task_dir}/audio.mp3"
subtitle_file = f"{task_dir}/subtitle.srt"
output_file = f"{task_dir}/final.mp4"
generate_video(video_path=video_file,
audio_path=audio_file,
subtitle_path=subtitle_file,
output_file=output_file,
video_aspect=VideoAspect.portrait,
threads=2,
font_name="STHeitiMedium.ttc",
fontsize=60,
stroke_color="#000000",
stroke_width=1.5,
text_fore_color="white",
text_background_color="transparent",
bgm_file=""
)

View File

@@ -8,23 +8,26 @@ import edge_tts
from app.utils import utils
def tts(text: str, voice_name: str, voice_file: str) -> "SubMaker | None":
    """Synthesize ``text`` to speech and write the audio to ``voice_file``.

    Audio chunks from the stream are written to ``voice_file``; word-boundary
    events are collected into a SubMaker.

    Args:
        text: Text to synthesize.
        voice_name: Voice identifier passed to ``edge_tts.Communicate``.
        voice_file: Output path; opened in binary write mode.

    Returns:
        The populated SubMaker, or None if synthesis failed for any reason
        (the error is logged, not raised).
    """
    logger.info(f"start, voice name: {voice_name}")

    async def _do() -> SubMaker:
        communicate = edge_tts.Communicate(text, voice_name)
        sub_maker = edge_tts.SubMaker()
        with open(voice_file, "wb") as file:
            async for chunk in communicate.stream():
                if chunk["type"] == "audio":
                    file.write(chunk["data"])
                elif chunk["type"] == "WordBoundary":
                    sub_maker.create_sub((chunk["offset"], chunk["duration"]), chunk["text"])
        return sub_maker

    # Only the actual run is guarded: failures here are reported to the
    # caller as None so the task can abort gracefully.
    try:
        sub_maker = asyncio.run(_do())
    except Exception as e:
        logger.error(f"failed, error: {e}")
        return None

    logger.info(f"completed, output file: {voice_file}")
    return sub_maker
def create_subtitle(sub_maker: submaker.SubMaker, text: str, subtitle_file: str):
@@ -78,6 +81,15 @@ def create_subtitle(sub_maker: submaker.SubMaker, text: str, subtitle_file: str)
file.write("\n".join(sub_items))
def get_audio_duration(sub_maker: "submaker.SubMaker | None"):
    """Return the audio duration derived from the last recorded offset.

    Args:
        sub_maker: SubMaker filled during synthesis, or None when synthesis
            failed (``tts`` returns None in that case).

    Returns:
        The second element of the last ``offset`` entry divided by
        10,000,000, or 0.0 when there is no SubMaker or no offsets.
    """
    # tts() may hand back None on failure; treat that like "no audio".
    if sub_maker is None or not sub_maker.offset:
        return 0.0
    # presumably offsets are in 100-nanosecond ticks, making the result
    # seconds — TODO confirm against the edge-tts SubMaker documentation
    return sub_maker.offset[-1][1] / 10000000
if __name__ == "__main__":
async def _do():
@@ -102,6 +114,8 @@ if __name__ == "__main__":
subtitle_file = f"{temp_dir}/tts.mp3.srt"
sub_maker = tts(text=text, voice_name=voice_name, voice_file=voice_file)
create_subtitle(sub_maker=sub_maker, text=text, subtitle_file=subtitle_file)
audio_duration = get_audio_duration(sub_maker)
print(f"voice: {voice_name}, audio duration: {audio_duration}s")
loop = asyncio.get_event_loop_policy().get_event_loop()