Files
marzban_tg_bot/marzban.py

117 lines
4.0 KiB
Python

import logging
import aiohttp
from datetime import datetime, timedelta
from config import CONFIG
logger = logging.getLogger(__name__)
class MarzbanAPI:
def __init__(self, url: str, username: str, password: str):
self.url = url.rstrip('/')
self.username = username
self.password = password
self.token = None
self.session = None
async def init_session(self):
self.session = aiohttp.ClientSession()
async def close_session(self):
if self.session:
await self.session.close()
async def login(self):
async with self.session.post(
f"{self.url}/api/admin/token",
data={"username": self.username, "password": self.password}
) as resp:
data = await resp.json()
self.token = data["access_token"]
return self.token
async def _request(self, method: str, endpoint: str, **kwargs):
if not self.token:
await self.login()
headers = {"Authorization": f"Bearer {self.token}"}
url = f"{self.url}/api{endpoint}"
logger.debug(f"Marzban Request: {method} {url} Payload: {kwargs.get('json')}")
async with self.session.request(
method, url, headers=headers, **kwargs
) as resp:
data = await resp.json()
logger.info(f"Marzban Response [{resp.status}]: {data}")
if resp.status == 401:
await self.login()
headers = {"Authorization": f"Bearer {self.token}"}
async with self.session.request(
method, url, headers=headers, **kwargs
) as retry_resp:
retry_data = await retry_resp.json()
logger.info(f"Marzban Retry Response [{retry_resp.status}]: {retry_data}")
return retry_data
return data
async def create_user(self, username: str, data_limit: int, expire_days: int, note: str = ""):
if expire_days > 0:
expire_timestamp = int((datetime.now() + timedelta(days=expire_days)).timestamp())
else:
expire_timestamp = None
payload = {
"username": username,
"proxies": {
"vless": {}
},
"inbounds": {},
"excluded_inbounds": {},
"data_limit": data_limit * 1024 * 1024 * 1024,
"data_limit_reset_strategy": "month",
"expire": expire_timestamp,
"status": "active",
"note": note
}
return await self._request("POST", "/user", json=payload)
async def get_user(self, username: str):
return await self._request("GET", f"/user/{username}")
async def modify_user(self, username: str, data_limit: int, expire_days: int = None, status: str = "active", note: str = "", expire_timestamp: int = None):
if expire_timestamp is not None:
final_expire = expire_timestamp
elif expire_days is not None and expire_days > 0:
final_expire = int((datetime.now() + timedelta(days=expire_days)).timestamp())
else:
final_expire = None
payload = {
"data_limit": data_limit * 1024 * 1024 * 1024,
"data_limit_reset_strategy": "month",
"expire": final_expire,
"excluded_inbounds": {},
"status": status,
"note": note
}
return await self._request("PUT", f"/user/{username}", json=payload)
async def delete_user(self, username: str):
return await self._request("DELETE", f"/user/{username}")
async def get_system_stats(self):
return await self._request("GET", "/system")
async def get_users_stats(self):
return await self._request("GET", "/users")
async def reset_user_traffic(self, username: str):
return await self._request("POST", f"/user/{username}/reset")
marzban = MarzbanAPI(
CONFIG["MARZBAN_URL"],
CONFIG["MARZBAN_USERNAME"],
CONFIG["MARZBAN_PASSWORD"]
)