Files
marzban_tg_bot/main.py
2026-01-08 19:36:50 +03:00

988 lines
40 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
import json
import random
import string
from qrcode import QRCode
import io
from aiogram import Bot, Dispatcher, Router, F
from aiogram.filters import Command, StateFilter
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from aiogram.fsm.storage.memory import MemoryStorage
from aiogram.types import (
Message, CallbackQuery, InlineKeyboardMarkup,
InlineKeyboardButton, LabeledPrice, PreCheckoutQuery,
BufferedInputFile
)
import aiohttp
import aiosqlite
from config import CONFIG, PLANS
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# FSM States
class InviteStates(StatesGroup):
waiting_for_code = State()
class PromoStates(StatesGroup):
waiting_for_promo = State()
creating_promo = State()
promo_code = State()
promo_discount = State()
promo_uses = State()
class BroadcastStates(StatesGroup):
waiting_for_message = State()
# Marzban API Client
class MarzbanAPI:
def __init__(self, url: str, username: str, password: str):
self.url = url.rstrip('/')
self.username = username
self.password = password
self.token = None
self.session = None
async def init_session(self):
self.session = aiohttp.ClientSession()
async def close_session(self):
if self.session:
await self.session.close()
async def login(self):
async with self.session.post(
f"{self.url}/api/admin/token",
data={"username": self.username, "password": self.password}
) as resp:
data = await resp.json()
self.token = data["access_token"]
return self.token
async def _request(self, method: str, endpoint: str, **kwargs):
if not self.token:
await self.login()
headers = {"Authorization": f"Bearer {self.token}"}
url = f"{self.url}/api{endpoint}"
logger.debug(f"Marzban Request: {method} {url} Payload: {kwargs.get('json')}")
async with self.session.request(
method, url, headers=headers, **kwargs
) as resp:
data = await resp.json()
logger.info(f"Marzban Response [{resp.status}]: {data}")
if resp.status == 401:
await self.login()
headers = {"Authorization": f"Bearer {self.token}"}
async with self.session.request(
method, url, headers=headers, **kwargs
) as retry_resp:
retry_data = await retry_resp.json()
logger.info(f"Marzban Retry Response [{retry_resp.status}]: {retry_data}")
return retry_data
return data
async def create_user(self, username: str, data_limit: int, expire_days: int):
expire_timestamp = int((datetime.now() + timedelta(days=expire_days)).timestamp())
payload = {
"username": username,
"proxies": {
"vless": {}
},
"inbounds": {}, # Разрешить все входящие
"excluded_inbounds": {}, # Ничего не исключать
"data_limit": data_limit * 1024 * 1024 * 1024, # GB to bytes
"expire": expire_timestamp,
"status": "active"
}
return await self._request("POST", "/user", json=payload)
async def get_user(self, username: str):
return await self._request("GET", f"/user/{username}")
async def modify_user(self, username: str, data_limit: int, expire_days: int):
expire_timestamp = int((datetime.now() + timedelta(days=expire_days)).timestamp())
payload = {
"data_limit": data_limit * 1024 * 1024 * 1024,
"expire": expire_timestamp,
"excluded_inbounds": {}, # Снимаем ограничения при продлении
"status": "active"
}
return await self._request("PUT", f"/user/{username}", json=payload)
async def delete_user(self, username: str):
return await self._request("DELETE", f"/user/{username}")
async def get_system_stats(self):
return await self._request("GET", "/system")
async def get_users_stats(self):
return await self._request("GET", "/users")
# Database Manager
class Database:
def __init__(self, url: Optional[str]):
self.url = url
self.is_sqlite = not url or not url.startswith("postgresql://")
self.pool = None
self.conn = None # For SQLite
async def init_pool(self):
if self.is_sqlite:
db_path = "bot.db"
self.conn = await aiosqlite.connect(db_path)
self.conn.row_factory = aiosqlite.Row
logger.info(f"Using SQLite database: {db_path}")
else:
self.pool = await asyncpg.create_pool(self.url)
logger.info("Using PostgreSQL database")
await self.create_tables()
async def execute(self, query: str, *args):
if self.is_sqlite:
query = query.replace("$1", "?").replace("$2", "?").replace("$3", "?").replace("$4", "?")
async with self.conn.execute(query, args) as cursor:
await self.conn.commit()
return cursor
else:
async with self.pool.acquire() as conn:
return await conn.execute(query, *args)
async def fetchrow(self, query: str, *args):
if self.is_sqlite:
query = query.replace("$1", "?").replace("$2", "?").replace("$3", "?").replace("$4", "?")
async with self.conn.execute(query, args) as cursor:
return await cursor.fetchone()
else:
async with self.pool.acquire() as conn:
return await conn.fetchrow(query, *args)
async def fetchval(self, query: str, *args):
if self.is_sqlite:
query = query.replace("$1", "?").replace("$2", "?").replace("$3", "?").replace("$4", "?")
async with self.conn.execute(query, args) as cursor:
row = await cursor.fetchone()
return row[0] if row else None
else:
async with self.pool.acquire() as conn:
return await conn.fetchval(query, *args)
async def fetch(self, query: str, *args):
if self.is_sqlite:
query = query.replace("$1", "?").replace("$2", "?").replace("$3", "?").replace("$4", "?")
async with self.conn.execute(query, args) as cursor:
return await cursor.fetchall()
else:
async with self.pool.acquire() as conn:
return await conn.fetch(query, *args)
async def create_tables(self):
now_default = "CURRENT_TIMESTAMP" if self.is_sqlite else "NOW()"
serial_type = "INTEGER PRIMARY KEY AUTOINCREMENT" if self.is_sqlite else "SERIAL PRIMARY KEY"
queries = [
f"""CREATE TABLE IF NOT EXISTS users (
user_id BIGINT PRIMARY KEY,
username TEXT,
marzban_username TEXT UNIQUE,
subscription_until TIMESTAMP,
data_limit INTEGER,
invited_by BIGINT,
created_at TIMESTAMP DEFAULT {now_default}
)""",
f"""CREATE TABLE IF NOT EXISTS invite_codes (
code TEXT PRIMARY KEY,
created_by BIGINT,
used_by BIGINT,
used_at TIMESTAMP,
created_at TIMESTAMP DEFAULT {now_default}
)""",
f"""CREATE TABLE IF NOT EXISTS promo_codes (
code TEXT PRIMARY KEY,
discount INTEGER,
uses_left INTEGER,
created_by BIGINT,
created_at TIMESTAMP DEFAULT {now_default}
)""",
f"""CREATE TABLE IF NOT EXISTS payments (
id {serial_type},
user_id BIGINT,
plan TEXT,
amount INTEGER,
promo_code TEXT,
paid_at TIMESTAMP DEFAULT {now_default}
)"""
]
for q in queries:
await self.execute(q)
async def get_user(self, user_id: int):
return await self.fetchrow("SELECT * FROM users WHERE user_id = $1", user_id)
async def create_user(self, user_id: int, username: str, marzban_username: str, invited_by: int = None):
await self.execute(
"INSERT INTO users (user_id, username, marzban_username, invited_by) VALUES ($1, $2, $3, $4)",
user_id, username, marzban_username, invited_by
)
async def update_subscription(self, user_id: int, days: int, data_limit: int):
user = await self.get_user(user_id)
# SQLite returns Row object, datetime handling might need care
sub_until = user['subscription_until']
if isinstance(sub_until, str): # SQLite might return it as string
sub_until = datetime.strptime(sub_until, '%Y-%m-%d %H:%M:%S') if '.' not in sub_until else datetime.strptime(sub_until, '%Y-%m-%d %H:%M:%S.%f')
if user and sub_until and sub_until > datetime.now():
new_date = sub_until + timedelta(days=days)
else:
new_date = datetime.now() + timedelta(days=days)
await self.execute(
"UPDATE users SET subscription_until = $1, data_limit = $2 WHERE user_id = $3",
new_date, data_limit, user_id
)
async def create_invite_code(self, created_by: int):
code = ''.join(random.choices(string.ascii_uppercase + string.digits, k=8))
await self.execute(
"INSERT INTO invite_codes (code, created_by) VALUES ($1, $2)",
code, created_by
)
return code
async def use_invite_code(self, code: str, user_id: int):
invite = await self.fetchrow(
"SELECT * FROM invite_codes WHERE code = $1 AND used_by IS NULL",
code
)
if not invite:
return False
now_val = datetime.now()
await self.execute(
"UPDATE invite_codes SET used_by = $1, used_at = $2 WHERE code = $3",
user_id, now_val, code
)
return invite['created_by']
async def create_promo_code(self, code: str, discount: int, uses: int, created_by: int):
await self.execute(
"INSERT INTO promo_codes (code, discount, uses_left, created_by) VALUES ($1, $2, $3, $4)",
code, discount, uses, created_by
)
async def get_promo_code(self, code: str):
return await self.fetchrow(
"SELECT * FROM promo_codes WHERE code = $1 AND uses_left > 0",
code
)
async def decrement_promo_usage(self, code: str):
await self.execute(
"UPDATE promo_codes SET uses_left = uses_left - 1 WHERE code = $1",
code
)
async def add_payment(self, user_id: int, plan: str, amount: int, promo_code: str = None):
await self.execute(
"INSERT INTO payments (user_id, plan, amount, promo_code) VALUES ($1, $2, $3, $4)",
user_id, plan, amount, promo_code
)
async def get_all_users(self):
return await self.fetch("SELECT user_id FROM users")
async def get_stats(self):
now_expr = "CURRENT_TIMESTAMP" if self.is_sqlite else "NOW()"
total = await self.fetchval("SELECT COUNT(*) FROM users")
active = await self.fetchval(
f"SELECT COUNT(*) FROM users WHERE subscription_until > {now_expr}"
)
revenue = await self.fetchval("SELECT SUM(amount) FROM payments")
return {"total": total, "active": active, "revenue": revenue or 0}
# Initialize
bot = Bot(token=CONFIG["BOT_TOKEN"])
storage = MemoryStorage()
dp = Dispatcher(storage=storage)
router = Router()
dp.include_router(router)
marzban = MarzbanAPI(CONFIG["MARZBAN_URL"], CONFIG["MARZBAN_USERNAME"], CONFIG["MARZBAN_PASSWORD"])
db = Database(CONFIG["DATABASE_URL"])
# Keyboards
def main_keyboard(is_admin: bool = False):
buttons = [
[InlineKeyboardButton(text="📊 Моя подписка", callback_data="my_subscription")],
[InlineKeyboardButton(text="💎 Купить подписку", callback_data="buy_subscription")],
[InlineKeyboardButton(text="🎟️ Использовать промокод", callback_data="use_promo")],
[InlineKeyboardButton(text=" Помощь", callback_data="help")],
]
if is_admin:
buttons.append([InlineKeyboardButton(text="👑 Админ-панель", callback_data="admin_panel")])
return InlineKeyboardMarkup(inline_keyboard=buttons)
def admin_keyboard():
return InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="📊 Статистика", callback_data="admin_stats")],
[InlineKeyboardButton(text="🎟️ Создать промокод", callback_data="admin_create_promo")],
[InlineKeyboardButton(text="👥 Создать инвайт", callback_data="admin_create_invite")],
[InlineKeyboardButton(text="📢 Рассылка", callback_data="admin_broadcast")],
[InlineKeyboardButton(text="🖥️ Статистика сервера", callback_data="admin_server_stats")],
[InlineKeyboardButton(text="◀️ Назад", callback_data="back_to_main")],
])
def plans_keyboard():
buttons = []
for plan_id, plan_data in PLANS.items():
buttons.append([InlineKeyboardButton(
text=f"{plan_data['name']} - {plan_data['price']}",
callback_data=f"plan_{plan_id}"
)])
buttons.append([InlineKeyboardButton(text="◀️ Назад", callback_data="back_to_main")])
return InlineKeyboardMarkup(inline_keyboard=buttons)
# Handlers
@router.message(Command("start"))
async def cmd_start(message: Message, state: FSMContext):
user_id = message.from_user.id
user = await db.get_user(user_id)
# Автоматическая регистрация админа
if not user and user_id in CONFIG["ADMIN_IDS"]:
marzban_username = f"user_{user_id}"
await db.create_user(user_id, message.from_user.username, marzban_username, invited_by=None)
user = await db.get_user(user_id)
await message.answer("✅ Администратор автоматически зарегистрирован.")
if user:
is_admin = user_id in CONFIG["ADMIN_IDS"]
await message.answer(
f"Привет, {message.from_user.first_name}! 👋\n\n"
"Выберите действие:",
reply_markup=main_keyboard(is_admin)
)
else:
await message.answer(
"👋 Добро пожаловать!\n\n"
"Для использования бота необходим инвайт-код.\n"
"Введите ваш инвайт-код:"
)
await state.set_state(InviteStates.waiting_for_code)
@router.message(InviteStates.waiting_for_code)
async def process_invite_code(message: Message, state: FSMContext):
code = message.text.strip().upper()
invited_by = await db.use_invite_code(code, message.from_user.id)
if not invited_by:
await message.answer("❌ Неверный или использованный инвайт-код. Попробуйте снова:")
return
marzban_username = f"user_{message.from_user.id}"
await db.create_user(message.from_user.id, message.from_user.username, marzban_username, invited_by)
is_admin = message.from_user.id in CONFIG["ADMIN_IDS"]
await message.answer(
"✅ Регистрация успешна!\n\n"
"Теперь вы можете приобрести подписку.",
reply_markup=main_keyboard(is_admin)
)
await state.clear()
@router.callback_query(F.data == "my_subscription")
async def show_subscription(callback: CallbackQuery):
user = await db.get_user(callback.from_user.id)
sub_until = user['subscription_until']
if isinstance(sub_until, str):
try:
sub_until = datetime.strptime(sub_until, '%Y-%m-%d %H:%M:%S')
except ValueError:
# Try with microseconds if previous format fails
sub_until = datetime.strptime(sub_until, '%Y-%m-%d %H:%M:%S.%f')
if not sub_until or sub_until < datetime.now():
await callback.message.edit_text(
"У вас нет активной подписки.\n\n"
"Приобретите подписку, чтобы начать пользоваться VPN.",
reply_markup=main_keyboard(callback.from_user.id in CONFIG["ADMIN_IDS"])
)
return
try:
marzban_user = await marzban.get_user(user['marzban_username'])
# Если пользователя нет в панели, но подписка активна - пробуем создать заново
if isinstance(marzban_user, dict) and marzban_user.get('detail') == 'User not found':
logger.warning(f"User {user['marzban_username']} not found in Marzban, restoring...")
await marzban.create_user(
user['marzban_username'],
user['data_limit'] or 50, # Лимит из базы или 50 по умолчанию
30 # Срок не важен, он обновится при следующей оплате
)
marzban_user = await marzban.get_user(user['marzban_username'])
used_traffic = marzban_user.get('used_traffic', 0) / (1024**3)
sub_url = marzban_user.get('subscription_url', 'Генерируется...')
if sub_url and sub_url.startswith('/'):
# Используем BASE_URL если он есть, иначе MARZBAN_URL
base = CONFIG.get('BASE_URL') or CONFIG['MARZBAN_URL']
sub_url = f"{base.rstrip('/')}{sub_url}"
info_text = (
f"📊 Ваша подписка:\n\n"
f"⏰ Действует до: {sub_until.strftime('%d.%m.%Y %H:%M')}\n"
f"📦 Лимит трафика: {user['data_limit']} ГБ\n"
f"📊 Использовано: {used_traffic:.2f} ГБ\n\n"
f"🎫 Ссылка на подписку (рекомендуется):\n"
f"`{sub_url}`"
)
except Exception as e:
logger.error(f"Error getting user info: {e}")
info_text = "⚠️ Ошибка получения данных. Попробуйте позже."
try:
# Генерация QR-кода
qr = QRCode(version=1, box_size=10, border=5)
qr.add_data(sub_url)
qr.make(fit=True)
qr_img = qr.make_image(fill_color="black", back_color="white")
# Сохранение в буфер
img_buffer = io.BytesIO()
qr_img.save(img_buffer)
img_buffer.seek(0)
qr_file = BufferedInputFile(img_buffer.getvalue(), filename="subscription_qr.png")
# Если это текстовое сообщение - удаляем и шлем фото
# Если это уже фото - пробуем edit_media (но проще удалить и прислать новое для чистоты)
await callback.message.delete()
await callback.message.answer_photo(
photo=qr_file,
caption=info_text,
reply_markup=main_keyboard(callback.from_user.id in CONFIG["ADMIN_IDS"]),
parse_mode="Markdown"
)
except Exception as e:
logger.error(f"Error sending subscription photo: {e}")
# В случае ошибки - шлем текст как раньше
await callback.message.answer(info_text, reply_markup=main_keyboard(callback.from_user.id in CONFIG["ADMIN_IDS"]), parse_mode="Markdown")
@router.callback_query(F.data == "buy_subscription")
async def show_plans(callback: CallbackQuery):
text = (
"💎 Выберите тарифный план:\n\n"
"Все планы включают:\n"
"• Безлимитная скорость\n"
"• Поддержка всех устройств\n"
"• Техподдержка 24/7"
)
kb = plans_keyboard()
if callback.message.photo:
await callback.message.delete()
await callback.message.answer(text, reply_markup=kb)
else:
await callback.message.edit_text(text, reply_markup=kb)
@router.callback_query(F.data.startswith("plan_"))
async def process_plan_selection(callback: CallbackQuery, state: FSMContext):
plan_id = callback.data.replace("plan_", "", 1)
plan = PLANS[plan_id]
await state.update_data(selected_plan=plan_id)
data = await state.get_data()
# Если промокод уже активирован
if 'promo_code' in data and 'discount' in data:
discount = data['discount']
new_price = int(plan['price'] * (100 - discount) / 100)
await state.update_data(final_price=new_price)
await callback.message.edit_text(
f"Вы выбрали: {plan['name']}\n"
f"Цена без скидки: {plan['price']}\n"
f"✅ Применен промокод: {discount}%\n"
f"Итого к оплате: {new_price}\n\n"
f"📦 Трафик: {plan['data_limit']} ГБ\n"
f"⏰ Период: {plan['days']} дней",
reply_markup=InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="💳 Оплатить", callback_data="pay_now")],
[InlineKeyboardButton(text="❌ Сбросить промокод", callback_data="reset_promo")],
[InlineKeyboardButton(text="◀️ Назад", callback_data="buy_subscription")],
])
)
else:
await callback.message.edit_text(
f"Вы выбрали: {plan['name']}\n"
f"Стоимость: {plan['price']}\n\n"
f"📦 Трафик: {plan['data_limit']} ГБ\n"
f"⏰ Период: {plan['days']} дней\n\n"
"Есть промокод на скидку?",
reply_markup=InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="✅ Ввести промокод", callback_data="enter_promo")],
[InlineKeyboardButton(text="💳 Оплатить", callback_data="pay_now")],
[InlineKeyboardButton(text="◀️ Назад", callback_data="buy_subscription")],
])
)
@router.callback_query(F.data == "enter_promo")
async def ask_promo(callback: CallbackQuery, state: FSMContext):
await callback.message.edit_text("Введите промокод:")
await state.set_state(PromoStates.waiting_for_promo)
@router.message(PromoStates.waiting_for_promo)
async def process_promo(message: Message, state: FSMContext):
promo_code = message.text.strip().upper()
promo = await db.get_promo_code(promo_code)
if promo:
discount = promo['discount']
await state.update_data(promo_code=promo_code, discount=discount)
data = await state.get_data()
# Если план уже выбран (переход из покупки)
if 'selected_plan' in data:
plan_id = data['selected_plan']
plan = PLANS[plan_id]
new_price = int(plan['price'] * (100 - discount) / 100)
await state.update_data(final_price=new_price)
await message.answer(
f"✅ Промокод применен! Скидка: {discount}%\n"
f"Новая цена: {new_price}",
reply_markup=InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="💳 Оплатить", callback_data="pay_now")],
[InlineKeyboardButton(text="◀️ Назад", callback_data="buy_subscription")],
])
)
# Если план не выбран (переход из главного меню)
else:
await message.answer(
f"✅ Промокод активирован! Скидка: {discount}%\n"
"Теперь выберите тариф:",
reply_markup=plans_keyboard()
)
else:
await message.answer(
"❌ Промокод недействителен или исчерпан.",
reply_markup=InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🔄 Попробовать снова", callback_data="enter_promo")],
[InlineKeyboardButton(text="◀️ Отмена", callback_data="back_to_main")],
])
)
await state.set_state(None)
@router.callback_query(F.data == "reset_promo")
async def reset_promo(callback: CallbackQuery, state: FSMContext):
await state.update_data(promo_code=None, discount=None, final_price=None)
# Перезагружаем выбор плана
data = await state.get_data()
if 'selected_plan' in data:
# Имитируем повторный выбор плана для обновления текста
callback.data = f"plan_{data['selected_plan']}"
await process_plan_selection(callback, state)
else:
await back_to_main(callback)
async def grant_subscription(user_id: int, plan_id: str, promo_code: str, amount: int):
plan = PLANS[plan_id]
user = await db.get_user(user_id)
sub_until = user['subscription_until']
if isinstance(sub_until, str):
try:
sub_until = datetime.strptime(sub_until, '%Y-%m-%d %H:%M:%S')
except ValueError:
sub_until = datetime.strptime(sub_until, '%Y-%m-%d %H:%M:%S.%f')
# Marzban
try:
marzban_username = user['marzban_username']
resp = None
if sub_until and sub_until > datetime.now():
logger.info(f"Attempting to modify existing user: {marzban_username}")
resp = await marzban.modify_user(
marzban_username,
plan['data_limit'],
plan['days']
)
# Если пользователь не найден в Marzban (хотя в БД бота он есть)
if isinstance(resp, dict) and resp.get('detail') == 'User not found':
logger.info(f"User {marzban_username} missing in Marzban, re-creating...")
resp = await marzban.create_user(
marzban_username,
plan['data_limit'],
plan['days']
)
else:
logger.info(f"Creating/Reactivating user: {marzban_username}")
resp = await marzban.create_user(
marzban_username,
plan['data_limit'],
plan['days']
)
except Exception as e:
logger.error(f"Marzban error in grant_subscription: {e}")
# DB
await db.update_subscription(user_id, plan['days'], plan['data_limit'])
await db.add_payment(
user_id,
plan_id,
amount,
promo_code
)
if promo_code:
await db.decrement_promo_usage(promo_code)
return plan
@router.callback_query(F.data == "pay_now")
async def process_payment(callback: CallbackQuery, state: FSMContext):
data = await state.get_data()
plan_id = data.get('selected_plan')
if not plan_id:
await callback.answer("❌ Сессия истекла. Пожалуйста, выберите тариф снова.", show_alert=True)
await show_plans(callback)
return
plan = PLANS[plan_id]
final_price = int(data.get('final_price', plan['price']))
promo_code = data.get('promo_code')
if final_price <= 0:
await grant_subscription(callback.from_user.id, plan_id, promo_code, 0)
await callback.message.edit_text(
f"✅ Подписка активирована бесплатно!\n\n"
f"План: {plan['name']}\n"
f"Срок: {plan['days']} дней\n"
f"Трафик: {plan['data_limit']} ГБ\n\n"
f"Настройте подключение в меню: 📊 Моя подписка",
reply_markup=main_keyboard(callback.from_user.id in CONFIG["ADMIN_IDS"])
)
await state.clear()
return
# Создаем инвойс для Telegram Stars
await callback.message.answer_invoice(
title=f"Подписка VPN - {plan['name']}",
description=f"Трафик: {plan['data_limit']} ГБ на {plan['days']} дней",
payload=f"{plan_id}:{data.get('promo_code', '')}",
currency="XTR", # Telegram Stars
prices=[LabeledPrice(label=plan['name'], amount=final_price)],
reply_markup=InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="💳 Оплатить", pay=True)]
])
)
@router.pre_checkout_query()
async def pre_checkout_handler(pre_checkout_query: PreCheckoutQuery):
await pre_checkout_query.answer(ok=True)
@router.message(F.successful_payment)
async def successful_payment(message: Message):
payment = message.successful_payment
plan_id, promo_code = payment.invoice_payload.split(":")
if not promo_code:
promo_code = None
plan = await grant_subscription(
message.from_user.id,
plan_id,
promo_code,
payment.total_amount
)
await message.answer(
f"✅ Оплата успешна!\n\n"
f"Ваша подписка активирована на {plan['days']} дней.\n"
f"Трафик: {plan['data_limit']} ГБ\n\n"
f"Получите конфигурацию через: 📊 Моя подписка",
reply_markup=main_keyboard(message.from_user.id in CONFIG["ADMIN_IDS"])
)
# Admin handlers
@router.callback_query(F.data == "admin_panel")
async def admin_panel(callback: CallbackQuery):
if callback.from_user.id not in CONFIG["ADMIN_IDS"]:
await callback.answer("❌ Доступ запрещен", show_alert=True)
return
text = "👑 Панель администратора"
kb = admin_keyboard()
if callback.message.photo:
await callback.message.delete()
await callback.message.answer(text, reply_markup=kb)
else:
await callback.message.edit_text(text, reply_markup=kb)
@router.callback_query(F.data == "admin_stats")
async def admin_stats(callback: CallbackQuery):
if callback.from_user.id not in CONFIG["ADMIN_IDS"]:
await callback.answer("❌ Доступ запрещен", show_alert=True)
return
stats = await db.get_stats()
text = (
"📊 Статистика бота:\n\n"
f"👥 Всего пользователей: {stats['total']}\n"
f"✅ Активных подписок: {stats['active']}\n"
f"💰 Общая выручка: {stats['revenue']}\n"
)
await callback.message.edit_text(text, reply_markup=admin_keyboard())
@router.callback_query(F.data == "admin_server_stats")
async def admin_server_stats(callback: CallbackQuery):
if callback.from_user.id not in CONFIG["ADMIN_IDS"]:
await callback.answer("❌ Доступ запрещен", show_alert=True)
return
try:
system_stats = await marzban.get_system_stats()
users_stats = await marzban.get_users_stats()
text = (
"🖥️ Статистика сервера:\n\n"
f"📊 CPU: {system_stats.get('cpu_usage', 'N/A')}%\n"
f"💾 RAM: {system_stats.get('mem_used', 0) / (1024**3):.2f} / "
f"{system_stats.get('mem_total', 0) / (1024**3):.2f} ГБ\n"
f"💿 Диск: {system_stats.get('disk_used', 0) / (1024**3):.2f} / "
f"{system_stats.get('disk_total', 0) / (1024**3):.2f} ГБ\n\n"
f"👥 Пользователей в Marzban: {users_stats.get('total', 0)}\n"
f"✅ Активных: {users_stats.get('active', 0)}\n"
)
except Exception as e:
text = f"⚠️ Ошибка получения статистики: {str(e)}"
await callback.message.edit_text(text, reply_markup=admin_keyboard())
@router.callback_query(F.data == "admin_create_invite")
async def admin_create_invite(callback: CallbackQuery):
if callback.from_user.id not in CONFIG["ADMIN_IDS"]:
await callback.answer("❌ Доступ запрещен", show_alert=True)
return
code = await db.create_invite_code(callback.from_user.id)
await callback.answer(f"✅ Инвайт-код создан: {code}", show_alert=True)
@router.callback_query(F.data == "admin_create_promo")
async def admin_create_promo_start(callback: CallbackQuery, state: FSMContext):
if callback.from_user.id not in CONFIG["ADMIN_IDS"]:
await callback.answer("❌ Доступ запрещен", show_alert=True)
return
await callback.message.edit_text(
"Создание промокода\n\n"
"Введите код промокода (или 'авто' для генерации):"
)
await state.set_state(PromoStates.promo_code)
@router.message(PromoStates.promo_code)
async def admin_promo_code(message: Message, state: FSMContext):
code = message.text.strip().upper()
if code == 'АВТО' or code == 'AUTO':
code = ''.join(random.choices(string.ascii_uppercase + string.digits, k=8))
await state.update_data(promo_code=code)
await message.answer("Введите размер скидки (%):")
await state.set_state(PromoStates.promo_discount)
@router.message(PromoStates.promo_discount)
async def admin_promo_discount(message: Message, state: FSMContext):
try:
discount = int(message.text.strip())
if discount < 1 or discount > 100:
await message.answer("❌ Скидка должна быть от 1 до 100%. Попробуйте снова:")
return
await state.update_data(discount=discount)
await message.answer("Введите количество использований:")
await state.set_state(PromoStates.promo_uses)
except ValueError:
await message.answer("❌ Введите число. Попробуйте снова:")
@router.message(PromoStates.promo_uses)
async def admin_promo_uses(message: Message, state: FSMContext):
try:
uses = int(message.text.strip())
if uses < 1:
await message.answer("❌ Минимум 1 использование. Попробуйте снова:")
return
data = await state.get_data()
await db.create_promo_code(
data['promo_code'],
data['discount'],
uses,
message.from_user.id
)
await message.answer(
f"✅ Промокод создан!\n\n"
f"Код: `{data['promo_code']}`\n"
f"Скидка: {data['discount']}%\n"
f"Использований: {uses}",
reply_markup=admin_keyboard(),
parse_mode="Markdown"
)
await state.clear()
except ValueError:
await message.answer("❌ Введите число. Попробуйте снова:")
@router.callback_query(F.data == "back_to_main")
async def back_to_main(callback: CallbackQuery):
is_admin = callback.from_user.id in CONFIG["ADMIN_IDS"]
text = "Главное меню:"
kb = main_keyboard(is_admin)
if callback.message.photo:
await callback.message.delete()
await callback.message.answer(text, reply_markup=kb)
else:
await callback.message.edit_text(text, reply_markup=kb)
@router.callback_query(F.data == "use_promo")
async def use_promo_callback(callback: CallbackQuery, state: FSMContext):
text = "Введите промокод для активации скидки:"
kb = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="◀️ Отмена", callback_data="back_to_main")]
])
if callback.message.photo:
await callback.message.delete()
await callback.message.answer(text, reply_markup=kb)
else:
await callback.message.edit_text(text, reply_markup=kb)
await state.set_state(PromoStates.waiting_for_promo)
@router.callback_query(F.data == "help")
async def help_handler(callback: CallbackQuery):
help_text = (
" Помощь по использованию бота:\n\n"
"📱 Как подключиться:\n"
"1. Купите подписку\n"
"2. Получите ссылку конфигурации\n"
"3. Скопируйте ссылку\n"
"4. Вставьте в VPN-клиент\n\n"
"📲 Рекомендуемые клиенты:\n"
"• iOS: Shadowrocket, V2Box\n"
"• Android: V2rayNG, NekoBox\n"
"• Windows: v2rayN, Nekoray\n"
"• macOS: V2rayU, ClashX\n\n"
"❓ Возникли проблемы?\n"
"Напишите администратору: @hoshimach1"
)
kb = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="◀️ Назад", callback_data="back_to_main")]
])
if callback.message.photo:
await callback.message.delete()
await callback.message.answer(help_text, reply_markup=kb)
else:
await callback.message.edit_text(help_text, reply_markup=kb)
@router.callback_query(F.data == "admin_broadcast")
async def admin_broadcast_start(callback: CallbackQuery, state: FSMContext):
if callback.from_user.id not in CONFIG["ADMIN_IDS"]:
await callback.answer("❌ Доступ запрещен", show_alert=True)
return
await callback.message.edit_text(
"📢 Рассылка сообщений\n\n"
"Отправьте сообщение, которое хотите разослать всем пользователям:"
)
await state.set_state(BroadcastStates.waiting_for_message)
@router.message(BroadcastStates.waiting_for_message)
async def admin_broadcast_send(message: Message, state: FSMContext):
if message.from_user.id not in CONFIG["ADMIN_IDS"]:
return
users = await db.get_all_users()
success_count = 0
fail_count = 0
status_msg = await message.answer("📤 Начинаю рассылку...")
for user in users:
try:
await bot.copy_message(
chat_id=user['user_id'],
from_chat_id=message.chat.id,
message_id=message.message_id
)
success_count += 1
await asyncio.sleep(0.05) # Защита от rate limit
except Exception as e:
fail_count += 1
logger.error(f"Broadcast error for user {user['user_id']}: {e}")
await status_msg.edit_text(
f"✅ Рассылка завершена!\n\n"
f"Успешно: {success_count}\n"
f"Ошибок: {fail_count}",
reply_markup=admin_keyboard()
)
await state.clear()
# Команда для получения своего ID
@router.message(Command("myid"), StateFilter("*"))
async def cmd_myid(message: Message):
await message.answer(f"Ваш Telegram ID: `{message.from_user.id}`", parse_mode="Markdown")
# Обработка неизвестных команд
@router.message()
async def unknown_message(message: Message, state: FSMContext):
current_state = await state.get_state()
if current_state is None:
user = await db.get_user(message.from_user.id)
if user:
is_admin = message.from_user.id in CONFIG["ADMIN_IDS"]
await message.answer(
"Используйте кнопки меню для навигации:",
reply_markup=main_keyboard(is_admin)
)
else:
await message.answer(
"Для использования бота начните с команды /start"
)
# Main function
async def main():
# Инициализация
await db.init_pool()
await marzban.init_session()
logger.info("Bot started successfully!")
try:
await dp.start_polling(bot)
finally:
await marzban.close_session()
await bot.session.close()
if __name__ == "__main__":
asyncio.run(main())