from flask import Flask, render_template, request, jsonify import os import json import ffmpeg import subprocess app = Flask(__name__) # Load configuration CONFIG_FILE = 'config.json' def load_config(): if os.path.exists(CONFIG_FILE): with open(CONFIG_FILE, 'r') as f: return json.load(f) else: return {"directories": []} def save_config(config): with open(CONFIG_FILE, 'w') as f: json.dump(config, f, indent=4) config = load_config() def get_media_info_with_ffprobe(file_path): """ Извлекает информацию о видео- и аудиопотоках, а также контейнере. :param file_path: Путь к файлу :return: Словарь с информацией о видео, аудио и контейнере """ try: if not os.path.exists(file_path): print(f"File not found: {file_path}") return None # Команда для потоков streams_command = [ "ffprobe", "-v", "error", "-show_entries", "stream=index,codec_type,codec_name,width,height,bit_rate,channels,channel_layout", "-of", "json", file_path ] # Команда для контейнера format_command = [ "ffprobe", "-v", "error", "-show_entries", "format=duration,bit_rate,size", "-of", "json", file_path ] # Выполняем команды streams_result = subprocess.run(streams_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) format_result = subprocess.run(format_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) # Проверяем ошибки if streams_result.returncode != 0: print(f"ffprobe stream error: {streams_result.stderr}") return None if format_result.returncode != 0: print(f"ffprobe format error: {format_result.stderr}") return None # Парсим выводы streams_data = json.loads(streams_result.stdout) format_data = json.loads(format_result.stdout) # Извлекаем данные контейнера format_info = format_data.get("format", {}) container_duration = float(format_info.get("duration", 0)) container_bitrate = int(format_info.get("bit_rate", 0)) // 1000 # Перевод в Кбит/с file_size = int(format_info.get("size", 0)) # Обрабатываем потоки media_info = { "video": [], "audio": [], "container": { "duration": f"{container_duration:.2f} seconds" if container_duration else "Unknown", "bitrate": f"{container_bitrate} Kbit/s" if container_bitrate else "Unknown", "size": f"{file_size / (1024**2):.2f} MB" if file_size else "Unknown" } } for stream in streams_data.get("streams", []): codec_type = stream.get("codec_type") codec_name = stream.get("codec_name") stream_bitrate = int(stream.get("bit_rate", 0)) // 1000 if stream.get("bit_rate") else None width = stream.get("width") height = stream.get("height") channels = stream.get("channels", "Unknown") channel_layout = stream.get("channel_layout", "Unknown") # Если битрейт потока отсутствует, используем битрейт контейнера bitrate = stream_bitrate if stream_bitrate else container_bitrate if codec_type == "video": media_info["video"].append({ "resolution": f"{width}x{height}" if width and height else "Unknown", "bitrate": f"{bitrate} Kbit/s" if bitrate else "Unknown", "codec": codec_name }) elif codec_type == "audio": media_info["audio"].append({ "bitrate": f"{bitrate} Kbit/s" if bitrate else "Unknown", "channels": channels, "layout": channel_layout, "codec": codec_name }) return media_info except Exception as e: print(f"Error processing file {file_path}: {e}") return None # Function to list media files def list_media_files(directories): media_files = [] for directory in directories: if os.path.exists(directory): for root, _, files in os.walk(directory): for file in files: file_path = os.path.join(root, file) try: file_info = { "name": file, "path": file_path, "size": os.path.getsize(file_path) } media_files.append(file_info) except OSError as e: print(f"Error accessing file {file_path}: {e}") else: print(f"Directory does not exist: {directory}") return media_files def filter_media_files(media_files, allowed_formats): """ Фильтрует список файлов по допустимым форматам. :param media_files: Список файлов, полученный из list_media_files. :param allowed_formats: Список допустимых форматов, например [".mp3", ".jpg"]. :return: Отфильтрованный список файлов. """ filtered_files = [ file for file in media_files if any(file['name'].lower().endswith(fmt.lower()) for fmt in allowed_formats) ] return filtered_files def human_readable_size(size_bytes): if size_bytes < 1024: return f"{size_bytes} B" elif size_bytes < 1024**2: return f"{size_bytes / 1024:.2f} KB" elif size_bytes < 1024**3: return f"{size_bytes / 1024**2:.2f} MB" else: return f"{size_bytes / 1024**3:.2f} GB" @app.route('/') def index(): # Получаем список медиафайлов media_files = list_media_files(config.get("directories", [])) # Фильтруем медиафайлы по формату allowed_formats = config.get("allowed_formats", []) filtered_files = filter_media_files(media_files, allowed_formats) for file in filtered_files: file["size"] = human_readable_size(file["size"]) # file["info"] = get_media_info(file["path"]) file["info"] = get_media_info_with_ffprobe(file["path"]) return render_template('index.html', media_files=filtered_files) @app.route('/configure', methods=['GET', 'POST']) def configure(): if request.method == 'POST': directories = request.json.get('directories', []) config['directories'] = directories save_config(config) return jsonify({"status": "success"}) return jsonify(config) if __name__ == '__main__': app.run(debug=True)