Merge pull request #655 from harry0703/dev

Dev
This commit is contained in:
Harry
2025-05-10 13:27:27 +08:00
committed by GitHub
19 changed files with 227 additions and 18 deletions

View File

@@ -1,4 +1,5 @@
import glob import glob
import itertools
import os import os
import random import random
import gc import gc
@@ -31,15 +32,19 @@ from app.services.utils import video_effects
from app.utils import utils from app.utils import utils
class SubClippedVideoClip: class SubClippedVideoClip:
def __init__(self, file_path, start_time, end_time, width=None, height=None): def __init__(self, file_path, start_time=None, end_time=None, width=None, height=None, duration=None):
self.file_path = file_path self.file_path = file_path
self.start_time = start_time self.start_time = start_time
self.end_time = end_time self.end_time = end_time
self.width = width self.width = width
self.height = height self.height = height
if duration is None:
self.duration = end_time - start_time
else:
self.duration = duration
def __str__(self): def __str__(self):
return f"SubClippedVideoClip(file_path={self.file_path}, start_time={self.start_time}, end_time={self.end_time}, width={self.width}, height={self.height})" return f"SubClippedVideoClip(file_path={self.file_path}, start_time={self.start_time}, end_time={self.end_time}, duration={self.duration}, width={self.width}, height={self.height})"
audio_codec = "aac" audio_codec = "aac"
@@ -131,7 +136,7 @@ def combine_videos(
aspect = VideoAspect(video_aspect) aspect = VideoAspect(video_aspect)
video_width, video_height = aspect.to_resolution() video_width, video_height = aspect.to_resolution()
clip_files = [] processed_clips = []
subclipped_items = [] subclipped_items = []
video_duration = 0 video_duration = 0
for video_path in video_paths: for video_path in video_paths:
@@ -144,7 +149,7 @@ def combine_videos(
while start_time < clip_duration: while start_time < clip_duration:
end_time = min(start_time + max_clip_duration, clip_duration) end_time = min(start_time + max_clip_duration, clip_duration)
if clip_duration - start_time > max_clip_duration: if clip_duration - start_time >= max_clip_duration:
subclipped_items.append(SubClippedVideoClip(file_path= video_path, start_time=start_time, end_time=end_time, width=clip_w, height=clip_h)) subclipped_items.append(SubClippedVideoClip(file_path= video_path, start_time=start_time, end_time=end_time, width=clip_w, height=clip_h))
start_time = end_time start_time = end_time
if video_concat_mode.value == VideoConcatMode.sequential.value: if video_concat_mode.value == VideoConcatMode.sequential.value:
@@ -171,7 +176,7 @@ def combine_videos(
if clip_w != video_width or clip_h != video_height: if clip_w != video_width or clip_h != video_height:
clip_ratio = clip.w / clip.h clip_ratio = clip.w / clip.h
video_ratio = video_width / video_height video_ratio = video_width / video_height
logger.debug(f"resizing to {video_width}x{video_height}, source: {clip_w}x{clip_h}, ratio: {clip_ratio:.2f}, target ratio: {video_ratio:.2f}") logger.debug(f"resizing clip, source: {clip_w}x{clip_h}, ratio: {clip_ratio:.2f}, target: {video_width}x{video_height}, ratio: {video_ratio:.2f}")
if clip_ratio == video_ratio: if clip_ratio == video_ratio:
clip = clip.resized(new_size=(video_width, video_height)) clip = clip.resized(new_size=(video_width, video_height))
@@ -221,28 +226,39 @@ def combine_videos(
close_clip(clip) close_clip(clip)
clip_files.append(clip_file) processed_clips.append(SubClippedVideoClip(file_path=clip_file, duration=clip.duration, width=clip_w, height=clip_h))
video_duration += clip.duration video_duration += clip.duration
except Exception as e: except Exception as e:
logger.error(f"failed to process clip: {str(e)}") logger.error(f"failed to process clip: {str(e)}")
# loop processed clips until the video duration matches or exceeds the audio duration.
if video_duration < audio_duration:
logger.warning(f"video duration ({video_duration:.2f}s) is shorter than audio duration ({audio_duration:.2f}s), looping clips to match audio length.")
base_clips = processed_clips.copy()
for clip in itertools.cycle(base_clips):
if video_duration >= audio_duration:
break
processed_clips.append(clip)
video_duration += clip.duration
logger.info(f"video duration: {video_duration:.2f}s, audio duration: {audio_duration:.2f}s, looped {len(processed_clips)-len(base_clips)} clips")
# merge video clips progressively, avoid loading all videos at once to avoid memory overflow # merge video clips progressively, avoid loading all videos at once to avoid memory overflow
logger.info("starting clip merging process") logger.info("starting clip merging process")
if not clip_files: if not processed_clips:
logger.warning("no clips available for merging") logger.warning("no clips available for merging")
return combined_video_path return combined_video_path
# if there is only one clip, use it directly # if there is only one clip, use it directly
if len(clip_files) == 1: if len(processed_clips) == 1:
logger.info("using single clip directly") logger.info("using single clip directly")
shutil.copy(clip_files[0], combined_video_path) shutil.copy(processed_clips[0].file_path, combined_video_path)
delete_files(clip_files) delete_files(processed_clips)
logger.info("video combining completed") logger.info("video combining completed")
return combined_video_path return combined_video_path
# create initial video file as base # create initial video file as base
base_clip_path = clip_files[0] base_clip_path = processed_clips[0].file_path
temp_merged_video = f"{output_dir}/temp-merged-video.mp4" temp_merged_video = f"{output_dir}/temp-merged-video.mp4"
temp_merged_next = f"{output_dir}/temp-merged-next.mp4" temp_merged_next = f"{output_dir}/temp-merged-next.mp4"
@@ -250,13 +266,13 @@ def combine_videos(
shutil.copy(base_clip_path, temp_merged_video) shutil.copy(base_clip_path, temp_merged_video)
# merge remaining video clips one by one # merge remaining video clips one by one
for i, clip_path in enumerate(clip_files[1:], 1): for i, clip in enumerate(processed_clips[1:], 1):
logger.info(f"merging clip {i}/{len(clip_files)-1}") logger.info(f"merging clip {i}/{len(processed_clips)-1}, duration: {clip.duration:.2f}s")
try: try:
# load current base video and next clip to merge # load current base video and next clip to merge
base_clip = VideoFileClip(temp_merged_video) base_clip = VideoFileClip(temp_merged_video)
next_clip = VideoFileClip(clip_path) next_clip = VideoFileClip(clip.file_path)
# merge these two clips # merge these two clips
merged_clip = concatenate_videoclips([base_clip, next_clip]) merged_clip = concatenate_videoclips([base_clip, next_clip])
@@ -286,6 +302,7 @@ def combine_videos(
os.rename(temp_merged_video, combined_video_path) os.rename(temp_merged_video, combined_video_path)
# clean temp files # clean temp files
clip_files = [clip.file_path for clip in processed_clips]
delete_files(clip_files) delete_files(clip_files)
logger.info("video combining completed") logger.info("video combining completed")
@@ -511,8 +528,7 @@ def preprocess_video(materials: List[MaterialInfo], clip_duration=4):
# Output the video to a file. # Output the video to a file.
video_file = f"{material.url}.mp4" video_file = f"{material.url}.mp4"
final_clip.write_videofile(video_file, fps=30, logger=None) final_clip.write_videofile(video_file, fps=30, logger=None)
final_clip.close() close_clip(clip)
del final_clip
material.url = video_file material.url = video_file
logger.success(f"image processed: {video_file}") logger.success(f"image processed: {video_file}")
return materials return materials

View File

@@ -1,6 +1,7 @@
import json import json
import locale import locale
import os import os
from pathlib import Path
import threading import threading
from typing import Any from typing import Any
from uuid import uuid4 from uuid import uuid4
@@ -226,4 +227,4 @@ def load_locales(i18n_dir):
def parse_extension(filename): def parse_extension(filename):
return os.path.splitext(filename)[1].strip().lower().replace(".", "") return Path(filename).suffix.lower().lstrip('.')

39
test/README.md Normal file
View File

@@ -0,0 +1,39 @@
# MoneyPrinterTurbo Test Directory
This directory contains unit tests for the **MoneyPrinterTurbo** project.
## Directory Structure
- `services/`: Tests for components in the `app/services` directory
- `test_video.py`: Tests for the video service
- `test_task.py`: Tests for the task service
## Running Tests
You can run the tests using Pythons built-in `unittest` framework:
```bash
# Run all tests
python -m unittest discover -s test
# Run a specific test file
python -m unittest test/services/test_video.py
# Run a specific test class
python -m unittest test.services.test_video.TestVideoService
# Run a specific test method
python -m unittest test.services.test_video.TestVideoService.test_preprocess_video
````
## Adding New Tests
To add tests for other components, follow these guidelines:
1. Create test files prefixed with `test_` in the appropriate subdirectory
2. Use `unittest.TestCase` as the base class for your test classes
3. Name test methods with the `test_` prefix
## Test Resources
Place any resource files required for testing in the `test/resources` directory.

1
test/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Unit test package for test

BIN
test/resources/1.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.6 KiB

BIN
test/resources/1.png.mp4 Normal file

Binary file not shown.

BIN
test/resources/2.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 8.1 KiB

BIN
test/resources/2.png.mp4 Normal file

Binary file not shown.

BIN
test/resources/3.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 8.8 KiB

BIN
test/resources/3.png.mp4 Normal file

Binary file not shown.

BIN
test/resources/4.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.6 KiB

BIN
test/resources/5.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.8 KiB

BIN
test/resources/6.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 8.8 KiB

BIN
test/resources/7.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.4 KiB

BIN
test/resources/8.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.3 KiB

BIN
test/resources/9.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.0 KiB

View File

@@ -0,0 +1 @@
# Unit test package for services

View File

@@ -0,0 +1,66 @@
import unittest
import os
import sys
from pathlib import Path
# add project root to python path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from app.services import task as tm
from app.models.schema import MaterialInfo, VideoParams
resources_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "resources")
class TestTaskService(unittest.TestCase):
def setUp(self):
pass
def tearDown(self):
pass
def test_task_local_materials(self):
task_id = "00000000-0000-0000-0000-000000000000"
video_materials=[]
for i in range(1, 4):
video_materials.append(MaterialInfo(
provider="local",
url=os.path.join(resources_dir, f"{i}.png"),
duration=0
))
params = VideoParams(
video_subject="金钱的作用",
video_script="金钱不仅是交换媒介,更是社会资源的分配工具。它能满足基本生存需求,如食物和住房,也能提供教育、医疗等提升生活品质的机会。拥有足够的金钱意味着更多选择权,比如职业自由或创业可能。但金钱的作用也有边界,它无法直接购买幸福、健康或真诚的人际关系。过度追逐财富可能导致价值观扭曲,忽视精神层面的需求。理想的状态是理性看待金钱,将其作为实现目标的工具而非终极目的。",
video_terms="money importance, wealth and society, financial freedom, money and happiness, role of money",
video_aspect="9:16",
video_concat_mode="random",
video_transition_mode="None",
video_clip_duration=3,
video_count=1,
video_source="local",
video_materials=video_materials,
video_language="",
voice_name="zh-CN-XiaoxiaoNeural-Female",
voice_volume=1.0,
voice_rate=1.0,
bgm_type="random",
bgm_file="",
bgm_volume=0.2,
subtitle_enabled=True,
subtitle_position="bottom",
custom_position=70.0,
font_name="MicrosoftYaHeiBold.ttc",
text_fore_color="#FFFFFF",
text_background_color=True,
font_size=60,
stroke_color="#000000",
stroke_width=1.5,
n_threads=2,
paragraph_number=1
)
result = tm.start(task_id=task_id, params=params)
print(result)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,85 @@
import unittest
import os
import sys
from pathlib import Path
from moviepy import (
VideoFileClip,
)
# add project root to python path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from app.models.schema import MaterialInfo
from app.services import video as vd
from app.utils import utils
resources_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "resources")
class TestVideoService(unittest.TestCase):
def setUp(self):
self.test_img_path = os.path.join(resources_dir, "1.png")
def tearDown(self):
pass
def test_preprocess_video(self):
if not os.path.exists(self.test_img_path):
self.fail(f"test image not found: {self.test_img_path}")
# test preprocess_video function
m = MaterialInfo()
m.url = self.test_img_path
m.provider = "local"
print(m)
materials = vd.preprocess_video([m], clip_duration=4)
print(materials)
# verify result
self.assertIsNotNone(materials)
self.assertEqual(len(materials), 1)
self.assertTrue(materials[0].url.endswith(".mp4"))
# moviepy get video info
clip = VideoFileClip(materials[0].url)
print(clip)
# clean generated test video file
if os.path.exists(materials[0].url):
os.remove(materials[0].url)
def test_wrap_text(self):
"""test text wrapping function"""
try:
font_path = os.path.join(utils.font_dir(), "STHeitiMedium.ttc")
if not os.path.exists(font_path):
self.fail(f"font file not found: {font_path}")
# test english text wrapping
test_text_en = "This is a test text for wrapping long sentences in english language"
wrapped_text_en, text_height_en = vd.wrap_text(
text=test_text_en,
max_width=300,
font=font_path,
fontsize=30
)
print(wrapped_text_en, text_height_en)
# verify text is wrapped
self.assertIn("\n", wrapped_text_en)
# test chinese text wrapping
test_text_zh = "这是一段用来测试中文长句换行的文本内容,应该会根据宽度限制进行换行处理"
wrapped_text_zh, text_height_zh = vd.wrap_text(
text=test_text_zh,
max_width=300,
font=font_path,
fontsize=30
)
print(wrapped_text_zh, text_height_zh)
# verify chinese text is wrapped
self.assertIn("\n", wrapped_text_zh)
except Exception as e:
self.fail(f"test wrap_text failed: {str(e)}")
if __name__ == "__main__":
unittest.main()