import os import json import ffmpeg import subprocess def get_media_info_with_ffprobe(file_path): try: if not os.path.exists(file_path): print(f"File not found: {file_path}") return None # Command to fetch streams streams_command = [ "ffprobe", "-v", "error", "-show_entries", "stream=index,codec_type,codec_name,width,height,bit_rate,channels,channel_layout", "-show_entries", "stream_tags=language", "-of", "json", file_path ] # Command to fetch format information format_command = [ "ffprobe", "-v", "error", "-show_entries", "format=duration,bit_rate,size", "-of", "json", file_path ] streams_result = subprocess.run(streams_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) format_result = subprocess.run(format_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) if streams_result.returncode != 0: print(f"ffprobe stream error: {streams_result.stderr}") return None if format_result.returncode != 0: print(f"ffprobe format error: {format_result.stderr}") return None streams_data = json.loads(streams_result.stdout) format_data = json.loads(format_result.stdout) format_info = format_data.get("format", {}) container_duration = float(format_info.get("duration", 0)) container_bitrate = int(format_info.get("bit_rate", 0)) // 1000 if format_info.get("bit_rate") else "Unknown" file_size = int(format_info.get("size", 0)) media_info = { "video": [], "audio": [], "container": { "duration": f"{container_duration:.2f} seconds" if container_duration else "Unknown", "bitrate": f"{container_bitrate} Kbit/s" if container_bitrate else "Unknown", "size": f"{file_size / (1024**2):.2f} MB" if file_size else "Unknown" } } for stream in streams_data.get("streams", []): codec_type = stream.get("codec_type") codec_name = stream.get("codec_name") stream_bitrate = int(stream.get("bit_rate", 0)) // 1000 if stream.get("bit_rate") else None width = stream.get("width") height = stream.get("height") channels = stream.get("channels", "Unknown") channel_layout = stream.get("channel_layout", "Unknown") language = stream.get("tags", {}).get("language", "Unknown") bitrate = stream_bitrate if stream_bitrate else container_bitrate if codec_type == "video": media_info["video"].append({ "resolution": f"{width}x{height}" if width and height else "Unknown", "bitrate": f"{bitrate} Kbit/s" if bitrate else "Unknown", "codec": codec_name }) elif codec_type == "audio": media_info["audio"].append({ "bitrate": f"{stream_bitrate} Kbit/s" if stream_bitrate else "Unknown", "channels": channels, "layout": channel_layout, "language": language, "codec": codec_name }) return media_info except Exception as e: print(f"Error processing file {file_path}: {e}") return None # Function to list media files def list_media_files(directories): media_files = [] for directory in directories: if os.path.exists(directory): for root, _, files in os.walk(directory): for file in files: file_path = os.path.join(root, file) try: file_info = { "name": file, "path": file_path, "size": os.path.getsize(file_path) } media_files.append(file_info) except OSError as e: print(f"Error accessing file {file_path}: {e}") else: print(f"Directory does not exist: {directory}") return media_files def filter_media_files(media_files, allowed_formats): """ Фильтрует список файлов по допустимым форматам. :param media_files: Список файлов, полученный из list_media_files. :param allowed_formats: Список допустимых форматов, например [".mp3", ".jpg"]. :return: Отфильтрованный список файлов. """ filtered_files = [ file for file in media_files if any(file['name'].lower().endswith(fmt.lower()) for fmt in allowed_formats) ] return filtered_files def human_readable_size(size_bytes): if size_bytes < 1024: return [f"{size_bytes}", "B"] elif size_bytes < 1024**2: return [f"{size_bytes / 1024:.2f}", "KB"] elif size_bytes < 1024**3: return [f"{size_bytes / 1024**2:.2f}", "MB"] else: return [f"{size_bytes / 1024**3:.2f}", "GB"]