关于 Open-Sora-Plan 数据集——清洗篇
删除不合格分辨率的视频
import os
import subprocess
import shlex
def is_file_complete(filepath):
    """Report whether *filepath* can be sized and holds at least one byte.

    Only the on-disk size is inspected; the content is never opened.
    If the size cannot be read (``OSError``), the error is printed and the
    file is treated as incomplete.
    """
    has_content = False
    try:
        has_content = os.path.getsize(filepath) > 0
    except OSError as e:
        print(f"Error checking file size: {e}")
    return has_content
def get_video_resolution(file_path):
    """Return the resolution of the first video stream of *file_path*.

    Runs ``ffprobe`` asking for the ``width`` and ``height`` of stream
    ``v:0`` formatted as ``WIDTHxHEIGHT``.

    Returns:
        ``(width, height)`` as ints when the output can be parsed,
        ``(0, 0)`` when ffprobe succeeded but printed nothing parseable,
        ``None`` when ffprobe exited with a non-zero status.

    Raises:
        FileNotFoundError: if the ``ffprobe`` executable is not on ``PATH``
            (deliberately not swallowed, so a missing tool is not mistaken
            for an unreadable video).
    """
    # Pass the arguments as a list: building a quoted shell string and
    # re-splitting it breaks on any path that contains a single quote.
    cmd = [
        "ffprobe", "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=width,height",
        "-of", "csv=s=x:p=0",
        file_path,
    ]
    try:
        result = subprocess.run(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True
        )
    except subprocess.CalledProcessError:
        # check=True turns every non-zero exit status into this exception.
        return None

    output = result.stdout.decode('utf-8').strip()
    # Only the first line is parsed, so output that spans several lines
    # does not defeat the integer conversion below.
    first_line = output.splitlines()[0] if output else ""
    try:
        parts = first_line.split('x')
        return int(parts[0]), int(parts[1])
    except (ValueError, IndexError):
        # Empty or malformed output: keep the historical (0, 0) sentinel.
        return 0, 0
def delete_low_resolution_videos(folder_path, min_resolution=720):
    """Walk *folder_path* recursively and delete files that fail the checks.

    For every file found:
      * an empty file (per ``is_file_complete``) is deleted;
      * a file whose width AND height are both below *min_resolution*
        is deleted;
      * a file for which ``get_video_resolution`` returns ``None``
        (ffprobe failed) is deleted as well.

    Args:
        folder_path: root directory to scan.
        min_resolution: threshold in pixels applied to both dimensions.
    """
    count_ = 0  # running index of examined files, used only in log lines
    for root, _dirs, files in os.walk(folder_path):
        for filename in files:
            count_ += 1
            file_path = os.path.join(root, filename)

            if not is_file_complete(file_path):
                os.remove(file_path)
                # The file is gone now. Without this `continue` the code
                # below would probe the deleted path and then call
                # os.remove() on it a second time, raising FileNotFoundError.
                continue

            resolution = get_video_resolution(file_path)
            if resolution:
                width, height = resolution
                if width < min_resolution and height < min_resolution:
                    print(f"{count_} Deleting low-resolution video file: {file_path}")
                    os.remove(file_path)
            else:
                # NOTE(review): the message says "Skipping" but the file is
                # removed. Behaviour is kept as-is; confirm whether files
                # ffprobe cannot read (including non-video files) are really
                # meant to be deleted.
                print(f"{count_} Skipping non-video file or error reading file: {file_path}")
                os.remove(file_path)
if __name__ == '__main__':
    # Root folder of the dataset to clean; fill this in before running.
    folder_path = r''
    delete_low_resolution_videos(folder_path=folder_path)
视频质量评分
重新分配文件夹
- 将视频文件按每 5000 个一组分到不同的文件夹,防止检查视频时程序假死
import os
import shutil
from tqdm import tqdm
def is_file_complete(filepath):
    """Return True if *filepath* has a non-zero size on disk.

    The check looks at the file size only. When the size cannot be
    obtained (``OSError``), the error is printed and False is returned.
    """
    try:
        byte_count = os.path.getsize(filepath)
    except OSError as e:
        print(f"Error checking file size: {e}")
        return False
    else:
        return byte_count > 0
def organize_files(src_folder, dest_folder, files_per_folder=5000, start_index=4):
    """Move videos (and their same-named ``.txt`` files) into numbered sub-folders.

    Every video found under *src_folder* is moved into
    ``dest_folder/5000-<index>``; a new sub-folder is started once the
    current one has received *files_per_folder* videos. A ``.txt`` file
    sharing the video's base name in the same directory is moved with it.
    Empty videos (per ``is_file_complete``) are deleted together with their
    ``.txt`` file instead of being moved.

    Args:
        src_folder: directory tree to scan for videos.
        dest_folder: directory that receives the numbered sub-folders;
            created if missing.
        files_per_folder: maximum number of videos per sub-folder.
        start_index: number used for the first sub-folder. Defaults to 4,
            the value that used to be hard-coded.

    Per-file ``OSError``s are printed and the run continues with the next
    file.
    """
    video_extensions = ('.mp4', '.mov', '.avi', '.mkv', '.wmv', '.flv', '.webm')

    # Snapshot the work list before moving anything. This gives an exact
    # progress total (videos only, same extension list as the filter) and
    # avoids mutating the tree while os.walk is still iterating over it.
    video_files = []
    for root, _dirs, names in os.walk(src_folder):
        for name in names:
            if name.lower().endswith(video_extensions):
                video_files.append((root, name))

    os.makedirs(dest_folder, exist_ok=True)

    folder_index = start_index
    file_count = 0  # videos placed in the current sub-folder
    # NOTE(review): the "5000-" prefix is kept literal for compatibility
    # even when files_per_folder is not 5000.
    current_subfolder = os.path.join(dest_folder, f'5000-{folder_index}')
    os.makedirs(current_subfolder, exist_ok=True)

    # The context manager closes the bar even if an unexpected error escapes.
    with tqdm(total=len(video_files), unit='file') as progress_bar:
        for root, file in video_files:
            base_name = os.path.splitext(file)[0]
            video_path = os.path.join(root, file)
            txt_path = os.path.join(root, f'{base_name}.txt')
            try:
                if not is_file_complete(video_path):
                    # Drop the empty video and its .txt file, if any.
                    os.remove(video_path)
                    if os.path.exists(txt_path):
                        os.remove(txt_path)
                    print(f'Removed incomplete files: {video_path} and corresponding txt file')
                    continue

                # Roll over to a fresh sub-folder once the current one is full.
                if file_count >= files_per_folder:
                    folder_index += 1
                    current_subfolder = os.path.join(dest_folder, f'5000-{folder_index}')
                    os.makedirs(current_subfolder, exist_ok=True)
                    file_count = 0

                dest_video_path = os.path.join(current_subfolder, file)
                if os.path.exists(dest_video_path):
                    # Same file name coming from another source directory:
                    # leave the source in place rather than silently
                    # overwriting the video already moved.
                    print(f'Skipped {video_path}: {dest_video_path} already exists')
                    continue

                shutil.move(video_path, dest_video_path)
                file_count += 1

                if os.path.exists(txt_path):
                    dest_txt_path = os.path.join(current_subfolder, f'{base_name}.txt')
                    shutil.move(txt_path, dest_txt_path)
            except OSError as e:
                # shutil.Error is an OSError subclass, so move failures land here too.
                print(f"Error processing file {file}: {e}")
            finally:
                # Count every examined video so the bar reaches its total.
                progress_bar.update(1)
if __name__ == '__main__':
    # Source directory holding the original files; fill in before running.
    source_directory = r''
    # Destination directory that will receive the organized sub-folders.
    destination_directory = r''
    # Start the reorganization.
    organize_files(
        src_folder=source_directory,
        dest_folder=destination_directory,
    )
评论0