Newer
Older
medialib-manager / mediascan.py
import os
import json
import ffmpeg
import subprocess

def load_config(config_file):
    """
    Read the JSON configuration stored at ``config_file``.

    :param config_file: Path of the configuration file.
    :return: The parsed JSON content when the path exists; otherwise a new
        default configuration ``{"directories": []}``.
    """
    # Guard clause: a missing file yields a fresh default on every call.
    if not os.path.exists(config_file):
        return {"directories": []}

    with open(config_file, 'r') as handle:
        parsed = json.load(handle)
    return parsed

def save_config(config_file, config):
    """
    Write ``config`` to ``config_file`` as JSON indented by four spaces.

    The configuration is serialized in memory before the file is opened, so
    a value that cannot be encoded raises without truncating or partially
    overwriting an existing configuration file.

    :param config_file: Path of the file to (over)write.
    :param config: JSON-serializable configuration object.
    :raises TypeError: If ``config`` contains a value JSON cannot encode.
    :raises OSError: If the file cannot be opened for writing.
    """
    # Serialize first: opening with 'w' truncates immediately, so encoding
    # must succeed before the existing file is touched.
    serialized = json.dumps(config, indent=4)
    with open(config_file, 'w') as f:
        f.write(serialized)

def media_create_cache(cache_dir, data):
    """
    Store ``data`` as JSON (two-space indent) in ``<cache_dir>/mediascan.json``.

    The data is serialized in memory before the cache file is opened, so a
    value that cannot be encoded raises without leaving a truncated or
    half-written cache file behind.

    :param cache_dir: Directory that holds the cache file.
    :param data: JSON-serializable scan result.
    :raises TypeError: If ``data`` contains a value JSON cannot encode.
    :raises OSError: If the cache file cannot be opened for writing.
    """
    filepath = f"{cache_dir}/mediascan.json"
    # Encode before opening: mode 'w' truncates the previous cache at once.
    serialized = json.dumps(data, indent=2)
    with open(filepath, 'w') as f:
        f.write(serialized)

def media_remove_cache(cache_dir):
    """
    Delete ``<cache_dir>/mediascan.json`` if it is present.

    :param cache_dir: Directory that holds the cache file.
    :return: Always ``True``, whether or not a file had to be removed.
    """
    cache_file = f"{cache_dir}/mediascan.json"
    if os.path.exists(cache_file):
        os.remove(cache_file)
    return True


def media_get_from_cache(cache_dir):
    """
    Load the cached scan stored in ``<cache_dir>/mediascan.json``.

    :param cache_dir: Directory that holds the cache file.
    :return: The parsed JSON content, or ``None`` when the cache file does
        not exist.
    """
    cache_file = f"{cache_dir}/mediascan.json"
    if os.path.exists(cache_file):
        with open(cache_file, 'r') as handle:
            return json.load(handle)
    return None


def _parse_ffprobe_number(raw_value, cast):
    """Return ``cast(raw_value)``, or ``None`` when the value is missing or not numeric."""
    if raw_value is None:
        return None
    try:
        return cast(raw_value)
    except (TypeError, ValueError):
        return None


def _bits_to_kbit(raw_value):
    """Convert a bits-per-second field to whole Kbit/s; ``None`` when unknown or below 1 Kbit/s."""
    bits = _parse_ffprobe_number(raw_value, int)
    if not bits:
        return None
    return bits // 1000 or None


def get_media_info_with_ffprobe(file_path):
    """
    Probe a media file with the ``ffprobe`` command-line tool.

    Two ffprobe invocations are made (stream entries, then format entries),
    both with JSON output, and the results are condensed into display
    strings.

    :param file_path: Path of the media file to inspect.
    :return: A dict with the keys ``"video"`` (list of dicts with
        ``resolution``, ``bitrate``, ``codec``), ``"audio"`` (list of dicts
        with ``bitrate``, ``channels``, ``layout``, ``language``, ``title``,
        ``codec``) and ``"container"`` (``duration``, ``bitrate``, ``size``).
        Values that cannot be determined are reported as ``"Unknown"``.
        Returns ``None`` when the file does not exist, when ffprobe exits
        with a non-zero status, or when any exception occurs (a message is
        printed in each case).
    """
    try:
        if not os.path.exists(file_path):
            print(f"File not found: {file_path}")
            return None

        # Command to fetch streams
        streams_command = [
            "ffprobe", "-v", "error",
            "-show_entries", "stream=index,codec_type,codec_name,width,height,bit_rate,channels,channel_layout",
            "-show_entries", "stream_tags=language,title",
            "-of", "json", file_path
        ]

        # Command to fetch format information
        format_command = [
            "ffprobe", "-v", "error",
            "-show_entries", "format=duration,bit_rate,size",
            "-of", "json", file_path
        ]

        streams_result = subprocess.run(streams_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        if streams_result.returncode != 0:
            print(f"ffprobe stream error: {streams_result.stderr}")
            return None

        # Only probe the container once the stream probe has succeeded.
        format_result = subprocess.run(format_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        if format_result.returncode != 0:
            print(f"ffprobe format error: {format_result.stderr}")
            return None

        streams_data = json.loads(streams_result.stdout)
        format_data = json.loads(format_result.stdout)

        format_info = format_data.get("format", {})
        container_duration = _parse_ffprobe_number(format_info.get("duration"), float)
        # Kept as a number or None (never the string "Unknown"), so the
        # truthiness checks below cannot produce "Unknown Kbit/s".
        container_bitrate = _bits_to_kbit(format_info.get("bit_rate"))
        file_size = _parse_ffprobe_number(format_info.get("size"), int)

        media_info = {
            "video": [],
            "audio": [],
            "container": {
                "duration": f"{container_duration:.2f} seconds" if container_duration else "Unknown",
                "bitrate": f"{container_bitrate} Kbit/s" if container_bitrate else "Unknown",
                "size": f"{file_size / (1024**2):.2f} MB" if file_size else "Unknown"
            }
        }

        for stream in streams_data.get("streams", []):
            codec_type = stream.get("codec_type")
            codec_name = stream.get("codec_name")
            stream_bitrate = _bits_to_kbit(stream.get("bit_rate"))
            tags = stream.get("tags", {})

            if codec_type == "video":
                # Only the first video stream reports a bitrate, falling back
                # to the container bitrate when the stream has none.
                # NOTE(review): later video streams are always "Unknown",
                # even if they carry their own bit_rate - confirm intent.
                bitrate = stream_bitrate if stream_bitrate else container_bitrate
                if not media_info["video"] and bitrate:
                    bitrate_str = f"{bitrate} Kbit/s"
                else:
                    bitrate_str = "Unknown"

                width = stream.get("width")
                height = stream.get("height")
                media_info["video"].append({
                    "resolution": f"{width}x{height}" if width and height else "Unknown",
                    "bitrate": bitrate_str,
                    "codec": codec_name
                })
            elif codec_type == "audio":
                media_info["audio"].append({
                    "bitrate": f"{stream_bitrate} Kbit/s" if stream_bitrate else "Unknown",
                    "channels": stream.get("channels", "Unknown"),
                    "layout": stream.get("channel_layout", "Unknown"),
                    "language": tags.get("language", "Unknown"),
                    "title": tags.get("title", "Unknown"),
                    "codec": codec_name
                })

        return media_info

    except Exception as e:
        # Deliberate best-effort boundary: any failure (ffprobe missing,
        # malformed JSON, ...) is reported and turned into None.
        print(f"Error processing file {file_path}: {e}")
        return None




def list_media_files(directories):
    """
    Recursively collect every file found below the given directories.

    :param directories: Iterable of directory paths to walk.
    :return: List of dicts with the keys ``name`` (file name), ``path``
        (root joined with the file name) and ``size`` (bytes, from
        ``os.path.getsize``). Paths that do not exist and files whose size
        cannot be read are reported with ``print`` and skipped.
    """
    collected = []
    for directory in directories:
        if not os.path.exists(directory):
            print(f"Directory does not exist: {directory}")
            continue

        for root, _, files in os.walk(directory):
            for file_name in files:
                full_path = os.path.join(root, file_name)
                try:
                    size = os.path.getsize(full_path)
                except OSError as e:
                    print(f"Error accessing file {full_path}: {e}")
                    continue
                collected.append({
                    "name": file_name,
                    "path": full_path,
                    "size": size
                })
    return collected

def filter_media_files(media_files, allowed_formats):
    """
    Filter a list of files by allowed formats.

    The comparison is case-insensitive and checks whether each file name
    ends with one of the given suffixes.

    :param media_files: List of file entries (dicts with a ``'name'`` key),
        such as the one returned by list_media_files.
    :param allowed_formats: Iterable of allowed formats, e.g. [".mp3", ".jpg"].
    :return: The filtered list of files.
    """
    # Normalize the suffixes once instead of once per file; materializing
    # them also keeps a one-shot iterable usable for every file.
    suffixes = tuple(fmt.lower() for fmt in allowed_formats)
    return [
        entry for entry in media_files
        if entry['name'].lower().endswith(suffixes)
    ]

def human_readable_size(size_bytes):
    """
    Express a byte count in the largest fitting unit up to GB.

    :param size_bytes: Size in bytes.
    :return: A three-item list ``[value, unit, raw]`` of strings: the scaled
        value (two decimals, or the plain number for bytes), the unit
        (``"B"``, ``"KB"``, ``"MB"`` or ``"GB"``, using powers of 1024) and
        the unscaled byte count.
    """
    raw = f"{size_bytes}"
    if size_bytes < 1024:
        return [raw, "B", raw]

    # Each unit applies while the size stays below the next power of 1024.
    for exponent, unit in ((1, "KB"), (2, "MB")):
        if size_bytes < 1024 ** (exponent + 1):
            return [f"{size_bytes / 1024 ** exponent:.2f}", unit, raw]

    return [f"{size_bytes / 1024 ** 3:.2f}", "GB", raw]

def get_single_media_by_path(path):
    """
    Build the description of a single media file.

    :param path: Path of the media file.
    :return: Dict with the keys ``path``, ``name`` (base name of the path),
        ``size_bytes``, ``size`` and ``size_unit`` (as produced by
        human_readable_size) and ``info`` (result of
        get_media_info_with_ffprobe).
    """
    # Probe first, then read the size, keeping the original call order.
    probe_result = get_media_info_with_ffprobe(path)
    readable = human_readable_size(os.path.getsize(path))
    scaled_value, unit, raw_bytes = readable

    return {
        "path": path,
        "name": os.path.basename(path),
        "size_bytes": raw_bytes,
        "size": scaled_value,
        "size_unit": unit,
        "info": probe_result
    }