关于 Open-Sora Plan 数据集——清洗篇

关于 Open-Sora Plan 数据集——清洗篇

删除不合格分辨率的视频

import os
import subprocess
import shlex

# Helper used to check file integrity.
def is_file_complete(filepath):
    """Return True when *filepath* exists and holds at least one byte.

    Any ``OSError`` raised while reading the size (missing file, permission
    problem, ...) is reported on stdout and treated as "not complete".
    """
    try:
        size_in_bytes = os.path.getsize(filepath)
    except OSError as err:
        print(f"Error checking file size: {err}")
        return False
    # An empty file is considered incomplete.
    return size_in_bytes > 0

def get_video_resolution(file_path):
    """Query ffprobe for the size of the first video stream of *file_path*.

    Args:
        file_path: Path of the media file to inspect.

    Returns:
        ``(width, height)`` as integers on success, ``(0, 0)`` when ffprobe
        succeeded but its output could not be parsed as ``WIDTHxHEIGHT``
        (e.g. the file has no video stream), or ``None`` when ffprobe exited
        with a non-zero status.

    Raises:
        FileNotFoundError: If the ``ffprobe`` executable is not on ``PATH``.
            This is deliberately not swallowed: callers delete files for
            which ``None`` is returned, so a missing tool must not look
            like an unreadable video.
    """
    # Pass the arguments as a list instead of building a quoted string and
    # re-splitting it: paths containing quotes or other shell metacharacters
    # are then handed to ffprobe untouched.
    cmd = [
        "ffprobe",
        "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=width,height",
        "-of", "csv=s=x:p=0",
        file_path,
    ]
    try:
        result = subprocess.run(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True
        )
    except subprocess.CalledProcessError:
        # check=True turns every non-zero exit status into this exception.
        return None

    output = result.stdout.decode('utf-8').strip()
    try:
        # Expected format is "WIDTHxHEIGHT"; only the first two fields are used.
        parts = output.split('x')
        return int(parts[0]), int(parts[1])
    except (ValueError, IndexError):
        return 0, 0

def delete_low_resolution_videos(folder_path, min_resolution=720):
    """Walk *folder_path* and delete files that are not usable videos.

    A file is deleted when any of the following holds:

    * it is empty or its size cannot be read (``is_file_complete``);
    * ffprobe cannot read it (``get_video_resolution`` returns ``None``);
    * both its width and its height are below *min_resolution*.

    WARNING: every file under *folder_path* is examined, so non-video files
    (for example caption ``.txt`` files) are deleted as well, because
    ffprobe cannot read them.

    Args:
        folder_path: Root directory that is scanned recursively.
        min_resolution: Threshold in pixels; a video is kept if at least one
            of its dimensions reaches this value.
    """
    count_ = 0
    for root, dirs, files in os.walk(folder_path):
        for filename in files:
            count_ += 1
            file_path = os.path.join(root, filename)

            if not is_file_complete(file_path):
                print(f"{count_} Deleting empty or unreadable file: {file_path}")
                os.remove(file_path)
                # The file is gone: probing it again would fail and trigger
                # a second os.remove() that raises FileNotFoundError.
                continue

            resolution = get_video_resolution(file_path)
            if resolution is None:
                print(f"{count_} Skipping non-video file or error reading file: {file_path}")
                os.remove(file_path)
                continue

            width, height = resolution
            if width < min_resolution and height < min_resolution:
                print(f"{count_} Deleting low-resolution video file: {file_path}")
                os.remove(file_path)

if __name__ == '__main__':
    # Set this to the folder that should be cleaned.
    folder_path = r''
    delete_low_resolution_videos(folder_path=folder_path)

视频质量评分

重新分配文件夹

  • 将视频文件按每 5000 个一组分别放入独立的子文件夹,避免单个文件夹内视频过多导致检查时程序假死
import os
import shutil
from tqdm import tqdm

# Helper used to check file integrity.
def is_file_complete(filepath):
    """Tell whether *filepath* is a non-empty, readable file.

    Returns False (after printing the error) when the size cannot be
    obtained, and False for zero-byte files.
    """
    try:
        byte_count = os.stat(filepath).st_size
    except OSError as exc:
        print(f"Error checking file size: {exc}")
        return False
    else:
        return byte_count > 0

def _collect_video_files(src_folder, dest_folder, video_extensions):
    """Return ``(directory, filename)`` pairs for every video under *src_folder*.

    When *dest_folder* is a sub-directory of *src_folder* it is pruned from
    the walk so files that were already organized are not picked up again.
    """
    src_abs = os.path.abspath(src_folder)
    dest_abs = os.path.abspath(dest_folder)
    found = []
    for root, dirs, files in os.walk(src_folder):
        if dest_abs != src_abs:
            # In-place edit of ``dirs`` tells os.walk not to descend there.
            dirs[:] = [
                d for d in dirs
                if os.path.abspath(os.path.join(root, d)) != dest_abs
            ]
        found.extend(
            (root, name) for name in files
            if name.lower().endswith(video_extensions)
        )
    return found


# Helper used to organize the files.
def organize_files(src_folder, dest_folder, files_per_folder=5000, start_index=4):
    """Move videos (and their caption files) into fixed-size sub-folders.

    Every video found under *src_folder* is moved into
    ``dest_folder/5000-<index>``; a new sub-folder is started each time the
    current one has received *files_per_folder* videos. A ``.txt`` file with
    the same base name in the same directory is moved along with its video.
    Videos that are empty or unreadable are deleted together with their
    ``.txt`` file.

    Args:
        src_folder: Directory that is scanned recursively for videos.
        dest_folder: Directory that receives the numbered sub-folders;
            created if missing.
        files_per_folder: Maximum number of videos per sub-folder.
        start_index: Number of the first sub-folder. Defaults to 4, the
            value that used to be hard-coded.

    Raises:
        ValueError: If *files_per_folder* is smaller than 1.
    """
    if files_per_folder < 1:
        raise ValueError("files_per_folder must be at least 1")

    # Recognised video file extensions (lower-case).
    video_extensions = ('.mp4', '.mov', '.avi', '.mkv', '.wmv', '.flv', '.webm')

    # Snapshot the work list before anything is moved: this gives an exact
    # total for the progress bar and keeps files moved during this run from
    # being visited a second time.
    pending = _collect_video_files(src_folder, dest_folder, video_extensions)

    os.makedirs(dest_folder, exist_ok=True)

    folder_index = start_index
    file_count = 0

    # Create the first destination sub-folder.
    current_subfolder = os.path.join(dest_folder, f'5000-{folder_index}')
    os.makedirs(current_subfolder, exist_ok=True)

    # The context manager closes the progress bar even if an error escapes.
    with tqdm(total=len(pending), unit='file') as progress_bar:
        for root, file in pending:
            base_name = os.path.splitext(file)[0]
            video_path = os.path.join(root, file)
            txt_path = os.path.join(root, f'{base_name}.txt')
            try:
                if not is_file_complete(video_path):
                    # Drop the broken video and its caption file, if any.
                    os.remove(video_path)
                    if os.path.exists(txt_path):
                        os.remove(txt_path)
                    print(f'Removed incomplete files: {video_path} and corresponding txt file')
                    continue

                # Start a new sub-folder once the current one is full.
                if file_count >= files_per_folder:
                    folder_index += 1
                    current_subfolder = os.path.join(dest_folder, f'5000-{folder_index}')
                    os.makedirs(current_subfolder, exist_ok=True)
                    file_count = 0

                dest_video_path = os.path.join(current_subfolder, file)
                if os.path.exists(dest_video_path):
                    # Same file name coming from another source directory:
                    # moving would silently overwrite the earlier video.
                    print(f"Skipping {video_path}: {dest_video_path} already exists")
                    continue

                shutil.move(video_path, dest_video_path)
                file_count += 1

                # Keep the caption file next to its video.
                if os.path.exists(txt_path):
                    dest_txt_path = os.path.join(current_subfolder, f'{base_name}.txt')
                    shutil.move(txt_path, dest_txt_path)
            except OSError as e:
                # Best effort: report the failure and go on with the next file.
                print(f"Error processing file {file}: {e}")
            finally:
                # One tick per examined video, whatever happened to it.
                progress_bar.update(1)

if __name__ == '__main__':
    # Source directory holding the original files.
    source_directory = r''
    # Destination directory that will receive the organized files.
    destination_directory = r''

    # Kick off the reorganization.
    organize_files(src_folder=source_directory, dest_folder=destination_directory)
0

评论0

显示验证码
没有账号?注册  忘记密码?