This commit is contained in:
kevin.zhang
2024-04-12 17:47:14 +08:00
14 changed files with 197 additions and 98 deletions

View File

@@ -5,6 +5,8 @@ from typing import List
from loguru import logger
from openai import OpenAI
from openai import AzureOpenAI
from openai.types.chat import ChatCompletion
from app.config import config
@@ -57,6 +59,11 @@ def _generate_response(prompt: str) -> str:
api_key = config.app.get("qwen_api_key")
model_name = config.app.get("qwen_model_name")
base_url = "***"
elif llm_provider == "cloudflare":
api_key = config.app.get("cloudflare_api_key")
model_name = config.app.get("cloudflare_model_name")
account_id = config.app.get("cloudflare_account_id")
base_url = "***"
else:
raise ValueError("llm_provider is not set, please set it in the config.toml file.")
@@ -69,17 +76,31 @@ def _generate_response(prompt: str) -> str:
if llm_provider == "qwen":
import dashscope
from dashscope.api_entities.dashscope_response import GenerationResponse
dashscope.api_key = api_key
response = dashscope.Generation.call(
model=model_name,
messages=[{"role": "user", "content": prompt}]
)
content = response["output"]["text"]
return content.replace("\n", "")
if response:
if isinstance(response, GenerationResponse):
status_code = response.status_code
if status_code != 200:
raise Exception(
f"[{llm_provider}] returned an error response: \"{response}\"")
content = response["output"]["text"]
return content.replace("\n", "")
else:
raise Exception(
f"[{llm_provider}] returned an invalid response: \"{response}\"")
else:
raise Exception(
f"[{llm_provider}] returned an empty response")
if llm_provider == "gemini":
import google.generativeai as genai
genai.configure(api_key=api_key)
genai.configure(api_key=api_key, transport='rest')
generation_config = {
"temperature": 0.5,
@@ -111,10 +132,30 @@ def _generate_response(prompt: str) -> str:
generation_config=generation_config,
safety_settings=safety_settings)
convo = model.start_chat(history=[])
try:
response = model.generate_content(prompt)
candidates = response.candidates
generated_text = candidates[0].content.parts[0].text
except (AttributeError, IndexError) as e:
print("Gemini Error:", e)
convo.send_message(prompt)
return convo.last.text
return generated_text
if llm_provider == "cloudflare":
import requests
response = requests.post(
f"https://api.cloudflare.com/client/v4/accounts/{account_id}/ai/run/{model_name}",
headers={"Authorization": f"Bearer {api_key}"},
json={
"messages": [
{"role": "system", "content": "You are a friendly assistant"},
{"role": "user", "content": prompt}
]
}
)
result = response.json()
logger.info(result)
return result["result"]["response"]
if llm_provider == "azure":
client = AzureOpenAI(
@@ -133,7 +174,15 @@ def _generate_response(prompt: str) -> str:
messages=[{"role": "user", "content": prompt}]
)
if response:
content = response.choices[0].message.content
if isinstance(response, ChatCompletion):
content = response.choices[0].message.content
else:
raise Exception(
f"[{llm_provider}] returned an invalid response: \"{response}\", please check your network "
f"connection and try again.")
else:
raise Exception(
f"[{llm_provider}] returned an empty response, please check your network connection and try again.")
return content.replace("\n", "")
@@ -149,9 +198,9 @@ Generate a script for a video, depending on the subject of the video.
1. the script is to be returned as a string with the specified number of paragraphs.
2. do not under any circumstance reference this prompt in your response.
3. get straight to the point, don't start with unnecessary things like, "welcome to this video".
4. you must not include any type of markdown or formatting in the script, never use a title.
5. only return the raw content of the script.
6. do not include "voiceover", "narrator" or similar indicators of what should be spoken at the beginning of each paragraph or line.
4. you must not include any type of markdown or formatting in the script, never use a title.
5. only return the raw content of the script.
6. do not include "voiceover", "narrator" or similar indicators of what should be spoken at the beginning of each paragraph or line.
7. you must not mention the prompt, or anything about the script itself. also, never talk about the amount of paragraphs or lines. just write the script.
8. respond in the same language as the video subject.

View File

@@ -1,4 +1,5 @@
import json
import os.path
import re
from faster_whisper import WhisperModel
@@ -17,8 +18,13 @@ model = None
def create(audio_file, subtitle_file: str = ""):
global model
if not model:
logger.info(f"loading model: {model_size}, device: {device}, compute_type: {compute_type}")
model = WhisperModel(model_size_or_path=model_size,
model_path = f"{utils.root_dir()}/models/whisper-{model_size}"
model_bin_file = f"{model_path}/model.bin"
if not os.path.isdir(model_path) or not os.path.isfile(model_bin_file):
model_path = model_size
logger.info(f"loading model: {model_path}, device: {device}, compute_type: {compute_type}")
model = WhisperModel(model_size_or_path=model_path,
device=device,
compute_type=compute_type)

View File

@@ -124,7 +124,7 @@ def wrap_text(text, max_width, font='Arial', fontsize=60):
width, height = get_text_size(text)
if width <= max_width:
return text
return text, height
logger.warning(f"wrapping text, max_width: {max_width}, text_width: {width}, text: {text}")
@@ -149,8 +149,9 @@ def wrap_text(text, max_width, font='Arial', fontsize=60):
if processed:
_wrapped_lines_ = [line.strip() for line in _wrapped_lines_]
result = '\n'.join(_wrapped_lines_).strip()
height = len(_wrapped_lines_) * height
logger.warning(f"wrapped text: {result}")
return result
return result, height
_wrapped_lines_ = []
chars = list(text)
@@ -165,8 +166,9 @@ def wrap_text(text, max_width, font='Arial', fontsize=60):
_txt_ = ''
_wrapped_lines_.append(_txt_)
result = '\n'.join(_wrapped_lines_).strip()
height = len(_wrapped_lines_) * height
logger.warning(f"wrapped text: {result}")
return result
return result, height
def generate_video(video_path: str,
@@ -199,23 +201,15 @@ def generate_video(video_path: str,
logger.info(f"using font: {font_path}")
if params.subtitle_position == "top":
position_height = video_height * 0.1
elif params.subtitle_position == "bottom":
position_height = video_height * 0.9
else:
position_height = "center"
def generator(txt, **kwargs):
def create_text_clip(subtitle_item):
phrase = subtitle_item[1]
max_width = video_width * 0.9
# logger.debug(f"rendering text: {txt}")
wrapped_txt = wrap_text(txt,
max_width=max_width,
font=font_path,
fontsize=params.font_size
) # 调整max_width以适应你的视频
clip = TextClip(
wrapped_txt, txt_height = wrap_text(phrase,
max_width=max_width,
font=font_path,
fontsize=params.font_size
)
_clip = TextClip(
wrapped_txt,
font=font_path,
fontsize=params.font_size,
@@ -225,15 +219,28 @@ def generate_video(video_path: str,
stroke_width=params.stroke_width,
print_cmd=False,
)
return clip
duration = subtitle_item[0][1] - subtitle_item[0][0]
_clip = _clip.set_start(subtitle_item[0][0])
_clip = _clip.set_end(subtitle_item[0][1])
_clip = _clip.set_duration(duration)
if params.subtitle_position == "bottom":
_clip = _clip.set_position(('center', video_height * 0.95 - _clip.h))
elif params.subtitle_position == "top":
_clip = _clip.set_position(('center', video_height * 0.1))
else:
_clip = _clip.set_position(('center', 'center'))
return _clip
video_clip = VideoFileClip(video_path)
audio_clip = AudioFileClip(audio_path).volumex(params.voice_volume)
if subtitle_path and os.path.exists(subtitle_path):
sub = SubtitlesClip(subtitles=subtitle_path, make_textclip=generator, encoding='utf-8')
sub_clip = sub.set_position(lambda _t: ('center', position_height))
video_clip = CompositeVideoClip([video_clip, sub_clip])
sub = SubtitlesClip(subtitles=subtitle_path, encoding='utf-8')
text_clips = []
for item in sub.subtitles:
clip = create_text_clip(subtitle_item=item)
text_clips.append(clip)
video_clip = CompositeVideoClip([video_clip, *text_clips])
bgm_file = get_bgm_file(bgm_type=params.bgm_type, bgm_file=params.bgm_file)
if bgm_file:
@@ -258,7 +265,7 @@ if __name__ == "__main__":
txt_zh = "测试长字段这是您的旅行技巧指南帮助您进行预算友好的冒险"
font = utils.resource_dir() + "/fonts/STHeitiMedium.ttc"
for txt in [txt_en, txt_zh]:
t = wrap_text(text=txt, max_width=1000, font=font, fontsize=60)
t, h = wrap_text(text=txt, max_width=1000, font=font, fontsize=60)
print(t)
task_id = "aa563149-a7ea-49c2-b39f-8c32cc225baf"