import logging
from datetime import datetime, timedelta
from typing import Optional

import aiohttp

from config import CONFIG

logger = logging.getLogger(__name__)

# Multiplier applied to every ``data_limit`` argument before it is sent
# (presumably GiB -> bytes; confirm against the Marzban API schema).
_BYTES_PER_GIB = 1024 * 1024 * 1024


class MarzbanAPI:
    """Small async client for the Marzban HTTP API.

    A bearer token is fetched on the first request and refreshed once when
    the server answers 401. The aiohttp session is created by
    ``init_session()`` or lazily on first use; call ``close_session()`` when
    done with the client.
    """

    def __init__(self, url: str, username: str, password: str):
        """Store connection settings; no network I/O happens here.

        Args:
            url: Base URL of the panel; a trailing slash is stripped.
            username: Admin username used by ``login()``.
            password: Admin password used by ``login()``.
        """
        self.url = url.rstrip('/')
        self.username = username
        self.password = password
        self.token = None
        self.session = None

    async def init_session(self):
        """Create the aiohttp session unless an open one already exists.

        Reusing an open session avoids leaking the previous one when this
        method is awaited more than once.
        """
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()

    async def close_session(self):
        """Close the aiohttp session if one was created."""
        if self.session:
            await self.session.close()

    async def _ensure_session(self):
        """Create the session on first use if ``init_session`` was skipped."""
        # Only the "never initialised" case is handled here; a session that
        # was explicitly closed is left alone so use-after-close still fails.
        if self.session is None:
            await self.init_session()

    def _auth_headers(self) -> dict:
        """Return the Authorization header built from the current token."""
        return {"Authorization": f"Bearer {self.token}"}

    @staticmethod
    def _expire_in_days(days: int) -> int:
        """Return the integer timestamp of ``datetime.now()`` plus *days*."""
        return int((datetime.now() + timedelta(days=days)).timestamp())

    async def login(self):
        """Request a token from ``/api/admin/token`` and cache it.

        Returns:
            The value of ``access_token`` from the JSON response.

        Raises:
            KeyError: If the response JSON has no ``access_token`` key.
        """
        await self._ensure_session()
        async with self.session.post(
            f"{self.url}/api/admin/token",
            data={"username": self.username, "password": self.password},
        ) as resp:
            data = await resp.json()
            self.token = data["access_token"]
            return self.token

    async def _request(self, method: str, endpoint: str, **kwargs):
        """Send an authenticated request and return the decoded JSON body.

        Logs in first when no token is cached. If the server answers 401,
        logs in again and repeats the request exactly once; the retry's body
        is returned whatever its status.

        Args:
            method: HTTP method name, e.g. ``"GET"``.
            endpoint: Path appended to ``{url}/api``; starts with ``/``.
            **kwargs: Passed through to ``session.request`` (e.g. ``json=``).

        Returns:
            The JSON-decoded response body.
        """
        await self._ensure_session()
        if not self.token:
            await self.login()

        url = f"{self.url}/api{endpoint}"
        logger.debug(
            "Marzban Request: %s %s Payload: %s",
            method, url, kwargs.get('json'),
        )

        async with self.session.request(
            method, url, headers=self._auth_headers(), **kwargs
        ) as resp:
            data = await resp.json()
            logger.info("Marzban Response [%s]: %s", resp.status, data)
            if resp.status != 401:
                return data

        # 401: refresh the token and retry once. The first response has been
        # released above, so only one connection is held during the retry.
        await self.login()
        async with self.session.request(
            method, url, headers=self._auth_headers(), **kwargs
        ) as retry_resp:
            retry_data = await retry_resp.json()
            logger.info(
                "Marzban Retry Response [%s]: %s",
                retry_resp.status, retry_data,
            )
            return retry_data

    async def create_user(self, username: str, data_limit: int,
                          expire_days: int, note: str = ""):
        """Create a user via ``POST /user``.

        Args:
            username: Name of the user to create.
            data_limit: Traffic limit; multiplied by 1024**3 before sending.
            expire_days: Days from now until expiry; a value <= 0 sends
                ``expire`` as ``None``.
            note: Free-form note stored with the user.

        Returns:
            The JSON-decoded response body.
        """
        if expire_days > 0:
            expire_timestamp = self._expire_in_days(expire_days)
        else:
            expire_timestamp = None

        payload = {
            "username": username,
            "proxies": {
                "vless": {}
            },
            "inbounds": {},
            "excluded_inbounds": {},
            "data_limit": data_limit * _BYTES_PER_GIB,
            "data_limit_reset_strategy": "month",
            "expire": expire_timestamp,
            "status": "active",
            "note": note,
        }
        return await self._request("POST", "/user", json=payload)

    async def get_user(self, username: str):
        """Fetch a user via ``GET /user/{username}``."""
        return await self._request("GET", f"/user/{username}")

    async def modify_user(self, username: str, data_limit: int,
                          expire_days: Optional[int] = None,
                          status: str = "active", note: str = "",
                          expire_timestamp: Optional[int] = None):
        """Update a user via ``PUT /user/{username}``.

        Expiry precedence: an explicit ``expire_timestamp`` wins; otherwise a
        positive ``expire_days`` is converted to a timestamp relative to now;
        otherwise ``expire`` is sent as ``None``.

        Args:
            username: Name of the user to modify.
            data_limit: Traffic limit; multiplied by 1024**3 before sending.
            expire_days: Optional days from now until expiry.
            status: Value sent in the ``status`` field.
            note: Free-form note stored with the user.
            expire_timestamp: Optional absolute expiry timestamp.

        Returns:
            The JSON-decoded response body.
        """
        if expire_timestamp is not None:
            final_expire = expire_timestamp
        elif expire_days is not None and expire_days > 0:
            final_expire = self._expire_in_days(expire_days)
        else:
            final_expire = None

        payload = {
            "data_limit": data_limit * _BYTES_PER_GIB,
            "data_limit_reset_strategy": "month",
            "expire": final_expire,
            "excluded_inbounds": {},
            "status": status,
            "note": note,
        }
        return await self._request("PUT", f"/user/{username}", json=payload)

    async def delete_user(self, username: str):
        """Delete a user via ``DELETE /user/{username}``."""
        return await self._request("DELETE", f"/user/{username}")

    async def get_system_stats(self):
        """Return the response of ``GET /system``."""
        return await self._request("GET", "/system")

    async def get_users_stats(self):
        """Return the response of ``GET /users``."""
        return await self._request("GET", "/users")

    async def reset_user_traffic(self, username: str):
        """Call ``POST /user/{username}/reset`` for the given user."""
        return await self._request("POST", f"/user/{username}/reset")


# Shared module-level client configured from the project's CONFIG mapping.
marzban = MarzbanAPI(
    CONFIG["MARZBAN_URL"],
    CONFIG["MARZBAN_USERNAME"],
    CONFIG["MARZBAN_PASSWORD"],
)