Newer
Older
simple_task_manager / manager.py
import asyncio
import aiohttp
import json
import os
from flask import Flask, request, jsonify
from datetime import datetime
import threading

TASKS_FILE = "tasks.json"
LOG_FILE = "log.txt"
MAX_LOG_LINES = 100

app = Flask(__name__)
tasks = []  # список задач


class Task:
    def __init__(self, url: str, json_data: dict, run_at: str, created_at: str):
        self.url = url
        self.json_data = json_data
        self.run_at = datetime.strptime(run_at, "%Y-%m-%d %H:%M:%S")
        self.created_at = datetime.strptime(created_at, "%Y-%m-%d %H:%M:%S")
        self.executed = False

    def to_dict(self):
        return {
            "url": self.url,
            "json_data": self.json_data,
            "run_at": self.run_at.strftime("%Y-%m-%d %H:%M:%S"),
            "created_at": self.created_at.strftime("%Y-%m-%d %H:%M:%S"),
            "executed": self.executed
        }

    @staticmethod
    def from_dict(data):
        task = Task(
            url=data["url"],
            json_data=data["json_data"],
            run_at=data["run_at"],
            created_at=data["created_at"]
        )
        task.executed = data.get("executed", False)
        return task

    async def execute(self):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(self.url, json=self.json_data) as resp:
                    print(f"[{datetime.now()}] Executed task to {self.url}, status: {resp.status}")
            self.executed = True
            log_task(self)
        except Exception as e:
            print(f"[{datetime.now()}] Failed to execute task to {self.url}: {e}")


def save_tasks():
    with open(TASKS_FILE, "w") as f:
        json.dump([task.to_dict() for task in tasks if not task.executed], f, indent=2)


def load_tasks():
    if os.path.exists(TASKS_FILE):
        with open(TASKS_FILE, "r") as f:
            data = json.load(f)
            return [Task.from_dict(t) for t in data]
    return []


def log_task(task: Task):
    log_line = f"[{task.created_at.strftime('%Y-%m-%d %H:%M:%S')}]: {task.url}; {json.dumps(task.json_data)}; выполнено в {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
    try:
        if os.path.exists(LOG_FILE):
            with open(LOG_FILE, "r") as f:
                lines = f.readlines()
        else:
            lines = []

        lines.append(log_line)
        lines = lines[-MAX_LOG_LINES:]

        with open(LOG_FILE, "w") as f:
            f.writelines(lines)
    except Exception as e:
        print(f"[!] Ошибка записи лога: {e}")


@app.route('/schedule', methods=['POST'])
def schedule_task():
    try:
        data = request.get_json()
        url = data['url']
        json_data = data.get('json_data', {})
        run_at_str = data['run_at']  # "YYYY-MM-DD HH:MM:SS"

        # Проверка формата
        run_at = datetime.strptime(run_at_str, "%Y-%m-%d %H:%M:%S")
        created_at = datetime.now()

        task = Task(url, json_data, run_at_str, created_at.strftime("%Y-%m-%d %H:%M:%S"))
        tasks.append(task)
        save_tasks()

        return jsonify({"status": "scheduled", "run_at": run_at_str})
    except Exception as e:
        return jsonify({"error": str(e)}), 400


async def task_scheduler():
    while True:
        now = datetime.now()
        changed = False
        for task in tasks:
            if not task.executed and now >= task.run_at:
                await task.execute()
                changed = True
        if changed:
            save_tasks()
        await asyncio.sleep(1)


def start_async_loop():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(task_scheduler())


if __name__ == '__main__':
    tasks = load_tasks()
    threading.Thread(target=start_async_loop, daemon=True).start()
    app.run(host='0.0.0.0', port=5000)