# Source: marzban_tg_bot/server.py
# Snapshot: 2026-01-11 07:07:32 +03:00 (516 lines, 18 KiB, Python)

import json
import logging
from datetime import datetime, timedelta
from typing import Optional

import aiohttp
import uvicorn
from fastapi import FastAPI, HTTPException, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

from database import db
from config import PLANS, CONFIG
from marzban import marzban
# Setup logging
# The root logger is configured at INFO level; this module logs under the
# name "server".
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("server")
# The ASGI application. Every route below registers itself on this object
# through its decorator.
app = FastAPI()
# CORS is fully open: any origin, any method and any header is accepted.
# NOTE(review): presumably required by the web front end that calls this API
# -- confirm whether the origin list can be narrowed.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
@app.on_event("startup")
async def startup():
    """Open the database connection when the application starts."""
    await db.connect()
    logger.info("Database connected")
@app.get("/api/plans")
async def get_plans():
    """Return the plans offered in the shop.

    Only the first three entries of ``PLANS`` (in its iteration order) are
    exposed. Each entry is returned as the plan's own fields plus an ``"id"``
    key holding the plan's key in ``PLANS``.
    """
    shop_entries = list(PLANS.items())[:3]
    return [{"id": plan_id, **plan} for plan_id, plan in shop_entries]
from aiogram.types import LabeledPrice
class BuyPlanRequest(BaseModel):
    """Request body for ``POST /api/create-invoice``."""

    # ID of the buying user as sent by the client (create_invoice does not
    # read it when building the invoice).
    user_id: int
    # Key of the requested plan in ``PLANS``.
    plan_id: str
    # Optional promo code. Declared ``Optional`` so that an explicit JSON
    # ``null`` is accepted as well as an omitted field; a bare ``str = None``
    # rejects ``null`` under Pydantic v2.
    promo_code: Optional[str] = None
class PromoCheckRequest(BaseModel):
    """Request body for ``POST /api/check-promo``."""

    # Promo code to look up.
    code: str
@app.post("/api/check-promo")
async def check_promo_code(req: PromoCheckRequest):
    """Look up a promo code and describe what it grants.

    Returns a 404 JSON error when ``db.get_promo_code`` yields nothing for the
    code. Otherwise returns the code, its discount, bonus days and unlimited
    flag, plus a human-readable ``description``.
    """
    promo = await db.get_promo_code(req.code)
    if not promo:
        return JSONResponse(status_code=404, content={"error": "Invalid or expired promo code"})

    # Fields copied through unchanged from the promo record.
    summary = {
        field: promo[field]
        for field in ("code", "discount", "bonus_days", "is_unlimited")
    }

    # The bonus-days suffix is only appended when the value is truthy.
    if promo['bonus_days']:
        bonus_suffix = f" + {promo['bonus_days']} Days"
    else:
        bonus_suffix = ""
    summary["description"] = f"Discount {promo['discount']}%" + bonus_suffix
    return summary
@app.post("/api/create-invoice")
async def create_invoice(req: BuyPlanRequest, request: Request):
    """Create an invoice link (currency ``XTR``) for buying a plan.

    Responses:
        * 500 when no bot instance is stored on ``request.app.state``.
        * 404 when ``req.plan_id`` is not a key of ``PLANS``.
        * 500 with the exception text when building the invoice fails.
        * ``{"invoice_link": ...}`` on success.

    A promo code that ``db.get_promo_code`` does not return is not an error;
    it simply yields no discount. The invoice payload is
    ``"<plan_id>:<promo_code or empty>"``.
    """
    bot = getattr(request.app.state, "bot", None)
    if not bot:
        return JSONResponse(status_code=500, content={"error": "Bot instance not initialized"})

    plan = PLANS.get(req.plan_id)
    if not plan:
        return JSONResponse(status_code=404, content={"error": "Plan not found"})

    base_price = plan['price']

    # Validating Promo
    promo = await db.get_promo_code(req.promo_code) if req.promo_code else None
    discount = promo['discount'] if promo else 0

    # Truncate to a whole amount and never charge less than 1.
    final_price = max(int(base_price * (100 - discount) / 100), 1)

    try:
        if plan['data_limit'] > 0:
            limit_desc = f"{plan['data_limit']}GB"
        else:
            limit_desc = "Unlimited"
        invoice_link = await bot.create_invoice_link(
            title=f"Sub: {plan['name']}",
            description=f"{limit_desc} / {plan['days']} days",
            payload=f"{req.plan_id}:{req.promo_code or ''}",
            provider_token="",  # Empty for Stars
            currency="XTR",
            prices=[LabeledPrice(label=plan['name'], amount=final_price)],
        )
    except Exception as e:
        logger.error(f"Error generating invoice: {e}")
        return JSONResponse(status_code=500, content={"error": str(e)})
    return {"invoice_link": invoice_link}
def _to_datetime(value):
    """Return *value* as a ``datetime``, parsing ISO-format strings.

    A ``datetime`` is returned unchanged. A string is parsed with
    ``datetime.fromisoformat``. Anything else, including an unparseable or
    empty string, yields ``None``.
    """
    if isinstance(value, datetime):
        return value
    if isinstance(value, str):
        try:
            return datetime.fromisoformat(value)
        except ValueError:
            return None
    return None


@app.get("/api/user/{user_id}")
async def get_user_stats(user_id: int):
    """Return the profile/subscription summary for one user.

    Responds with a 404 JSON error when ``db.get_user`` finds nothing.
    Otherwise combines the database row, live usage from Marzban
    (best-effort: failures are logged and leave the defaults in place),
    the referral count and the payment totals into one dictionary.
    """
    user = await db.get_user(user_id)
    if not user:
        return JSONResponse(status_code=404, content={"error": "User not found"})

    # Subscription state; the defaults describe "never had a subscription".
    days_left = 0
    status = "Inactive"
    expire_str = "No active subscription"
    sub_until = _to_datetime(user['subscription_until'])
    if sub_until is not None:
        expire_str = sub_until.strftime("%d.%m.%Y")
        # Take "now" in the stored value's timezone (None for naive values)
        # so that comparing never mixes naive and aware datetimes.
        now = datetime.now(sub_until.tzinfo)
        if sub_until > now:
            days_left = (sub_until - now).days
            status = "Active"
        else:
            status = "Expired"

    # Fetch detailed stats from Marzban
    sub_url = ""
    used_traffic = 0
    if user['marzban_username']:
        try:
            m_user = await marzban.get_user(user['marzban_username'])
            # A dict carrying a truthy 'detail' key is treated as an error reply.
            if isinstance(m_user, dict) and not m_user.get('detail'):
                # ``or`` also covers keys that are present but None.
                used_traffic = m_user.get('used_traffic') or 0
                sub_url = m_user.get('subscription_url') or ""
                # Fix relative URL
                if sub_url.startswith('/'):
                    base = CONFIG.get('BASE_URL') or CONFIG['MARZBAN_URL']
                    sub_url = f"{base.rstrip('/')}{sub_url}"
        except Exception as e:
            logger.error(f"Marzban fetch error: {e}")

    # Registration Date
    created_at = _to_datetime(user['created_at'])
    reg_date = created_at.strftime("%d.%m.%Y") if created_at is not None else "Unknown"

    # Referral and Payment Stats
    ref_count = await db.get_referrals_count(user_id)
    payments_info = await db.get_user_payments_info(user_id)

    return {
        "status": status,
        "days_left": days_left,
        "expire_date": expire_str,
        "data_limit_gb": user['data_limit'] or 0,
        "used_traffic_gb": round(used_traffic / (1024**3), 2),
        "plan": "Premium",
        "subscription_url": sub_url,
        "username": user['username'],
        "marzban_username": user['marzban_username'],
        "photo_url": f"/api/user-photo/{user_id}",
        "reg_date": reg_date,
        "referrals_count": ref_count,
        "total_payments": payments_info["total_count"],
        "total_spent": payments_info["total_amount"],
        "is_admin": is_admin(user_id),
    }
@app.get("/api/user-photo/{user_id}")
async def get_user_photo(user_id: int):
    """Proxy a user's Telegram profile photo without exposing the bot token.

    Three Telegram Bot API requests are made in sequence:
    ``getUserProfilePhotos``, ``getFile``, then the file download. Any
    non-200 status, a reply without ``ok``, or an empty photo list produces
    an empty 404 response. On success the bytes are returned as
    ``image/jpeg``.
    """
    async with aiohttp.ClientSession() as session:
        token = CONFIG['BOT_TOKEN']

        # 1. Get user profile photos
        photos_url = f"https://api.telegram.org/bot{token}/getUserProfilePhotos?user_id={user_id}&limit=1"
        async with session.get(photos_url) as photos_resp:
            if photos_resp.status != 200:
                return Response(status_code=404)
            photos_reply = await photos_resp.json()
        if not photos_reply.get('ok') or not photos_reply['result']['photos']:
            return Response(status_code=404)
        # First entry of the first photo's size list.
        file_id = photos_reply['result']['photos'][0][0]['file_id']

        # 2. Get file path
        file_info_url = f"https://api.telegram.org/bot{token}/getFile?file_id={file_id}"
        async with session.get(file_info_url) as info_resp:
            if info_resp.status != 200:
                return Response(status_code=404)
            info_reply = await info_resp.json()
        if not info_reply.get('ok'):
            return Response(status_code=404)
        file_path = info_reply['result']['file_path']

        # 3. Download the file and hand its bytes back to the client
        download_url = f"https://api.telegram.org/file/bot{token}/{file_path}"
        async with session.get(download_url) as file_resp:
            if file_resp.status != 200:
                return Response(status_code=404)
            image_bytes = await file_resp.read()
        return Response(content=image_bytes, media_type="image/jpeg")
# --- SUPPORT API ---
class SupportRequest(BaseModel):
    """Request body for ``POST /api/support``."""

    # ID of the user asking for help; shown to admins in the forwarded text.
    user_id: int
    # Free-form message body.
    message: str
    # Sender's username, shown after "@"; defaults to "Unknown" when omitted.
    username: str = "Unknown"
@app.post("/api/support")
async def send_support_message(req: SupportRequest, request: Request):
    """Forward a user's support request to every admin in ``CONFIG["ADMIN_IDS"]``.

    Returns a 500 JSON error when no bot instance is stored on
    ``request.app.state``. Delivery is best-effort: a failure for one admin
    is logged and the loop continues, and the endpoint still answers
    ``{"status": "sent"}``.

    The username and message come from the client, so they are HTML-escaped
    and sent with ``parse_mode="HTML"``. Interpolating them raw into Markdown
    made Telegram reject the whole message whenever they contained an
    unbalanced ``_``, ``*``, `````` ` `````` or ``[`` (e.g. ``@john_doe``).
    """
    import html  # local import: only this handler needs it

    bot = getattr(request.app.state, "bot", None)
    if not bot:
        return JSONResponse(status_code=500, content={"error": "Bot not init"})

    # Built once: the text is identical for every admin.
    safe_username = html.escape(req.username, quote=False)
    safe_message = html.escape(req.message, quote=False)
    text = (
        "📩 <b>Support Request</b>\n"
        f"From: @{safe_username} (ID: {req.user_id})\n\n"
        f"{safe_message}"
    )

    # Send to admins
    for admin_id in CONFIG["ADMIN_IDS"]:
        try:
            await bot.send_message(chat_id=admin_id, text=text, parse_mode="HTML")
        except Exception as e:
            logger.error(f"Failed to send support msg to {admin_id}: {e}")
    return {"status": "sent"}
# --- ADMIN API ---
def is_admin(user_id: int):
    """Tell whether *user_id* is listed in ``CONFIG["ADMIN_IDS"]``."""
    admin_ids = CONFIG["ADMIN_IDS"]
    return user_id in admin_ids
@app.get("/api/admin/stats")
async def get_admin_stats(user_id: int):
    """Return bot statistics plus Marzban server statistics for an admin.

    Responds 403 when *user_id* is not an admin. The Marzban part is
    best-effort: if either Marzban call fails, the error is logged and the
    server section falls back to ``'N/A'`` / zero values.

    NOTE(review): the caller's identity is the ``user_id`` query parameter
    as sent by the client -- confirm it is authenticated upstream.
    """
    if not is_admin(user_id):
        return JSONResponse(status_code=403, content={"error": "Forbidden"})

    db_stats = await db.get_stats()

    try:
        sys_stats = await marzban.get_system_stats()
        marz_user_stats = await marzban.get_users_stats()
    except Exception as e:
        # ``Exception`` rather than a bare ``except`` so that task
        # cancellation is not swallowed, and the failure is visible in logs.
        logger.error(f"Marzban stats fetch error: {e}")
        sys_stats = {}
        marz_user_stats = {}

    # Tolerate helpers that return something other than a dict.
    if not isinstance(sys_stats, dict):
        sys_stats = {}
    if not isinstance(marz_user_stats, dict):
        marz_user_stats = {}

    gib = 1024**3
    return {
        "bot": db_stats,
        "server": {
            "cpu": sys_stats.get('cpu_usage', 'N/A'),
            # ``or 0`` also covers keys that are present but None.
            "ram_used": round((sys_stats.get('mem_used') or 0) / gib, 2),
            "ram_total": round((sys_stats.get('mem_total') or 0) / gib, 2),
            "active_users": marz_user_stats.get('active_users', 0),
            "total_traffic_gb": round((marz_user_stats.get('total_usage') or 0) / gib, 2),
        },
    }
@app.get("/api/admin/users")
async def admin_list_users(user_id: int, query: str = None):
    """List users for the admin panel, optionally filtered by *query*.

    Responds 403 when *user_id* is not an admin. With a non-empty *query*
    the rows come from ``db.search_users``, otherwise from
    ``db.get_all_users``. Each row becomes a dict with an added
    ``photo_url``. Rows are sorted case-insensitively by username (or by the
    ID as text when the username is missing) and capped at 50.
    """
    if not is_admin(user_id):
        return JSONResponse(status_code=403, content={"error": "Forbidden"})

    pending_rows = db.search_users(query) if query else db.get_all_users()
    rows = await pending_rows

    def sort_key(entry):
        # Username when present, otherwise the numeric ID rendered as text.
        return (entry.get('username') or str(entry['user_id'])).lower()

    entries = [
        {**dict(row), 'photo_url': f"/api/user-photo/{row['user_id']}"}
        for row in rows
    ]
    # Return first 50 results
    return sorted(entries, key=sort_key)[:50]
@app.get("/api/admin/user/{target_id}")
async def admin_get_user(target_id: int, user_id: int):
    """Return one user's database row together with their Marzban record.

    Responds 403 when *user_id* is not an admin and 404 when *target_id* is
    unknown to the database. The Marzban lookup is best-effort: on failure
    the error is logged and ``"marzban"`` is an empty dict.
    """
    if not is_admin(user_id):
        return JSONResponse(status_code=403, content={"error": "Forbidden"})

    user = await db.get_user(target_id)
    if not user:
        return JSONResponse(status_code=404, content={"error": "User not found"})

    marz_info = {}
    try:
        marz_info = await marzban.get_user(user['marzban_username'])
    except Exception as e:
        # ``Exception`` rather than a bare ``except`` so that task
        # cancellation is not swallowed, and the failure is visible in logs.
        logger.error(f"Marzban fetch error for user {target_id}: {e}")

    return {
        "user": dict(user),
        "marzban": marz_info,
    }
@app.post("/api/admin/user/{target_id}/action")
async def admin_user_action(target_id: int, req: Request):
    """Apply one admin action to the user *target_id*.

    The JSON body carries ``user_id`` (the acting admin), ``action`` and
    action-specific fields:

    * ``reset_traffic`` -- reset the user's traffic in Marzban.
    * ``toggle_status`` -- switch Marzban status between active and disabled.
    * ``add_days`` (``days``) -- extend the subscription, then sync Marzban.
    * ``set_limit`` (``limit_gb``) -- set the data limit; ``<= 0`` is unlimited.
    * ``set_expiry`` (``days``) -- set expiry relative to now; more than
      10000 days is stored as 2099-12-31.
    * ``set_plan`` (``plan_id``) -- apply a plan's limit and duration from now.
    * ``delete_sub`` -- remove the subscription and expire the Marzban user now.

    Responses: 403 for non-admins, 404 for an unknown user or plan, 400 for
    an unknown action, 500 with the exception text when an action fails,
    ``{"status": "ok"}`` otherwise.

    NOTE(review): the acting admin is identified only by the ``user_id``
    field of the request body -- confirm it is authenticated upstream.
    """
    data = await req.json()
    admin_id = data.get("user_id")
    if not is_admin(admin_id):
        return JSONResponse(status_code=403, content={"error": "Forbidden"})

    action = data.get("action")
    user = await db.get_user(target_id)
    if not user:
        return JSONResponse(status_code=404, content={"error": "User not found"})

    gib = 1024**3
    # Byte value stored in the database to stand for "unlimited".
    unlimited_bytes = 999999 * gib

    try:
        if action == "reset_traffic":
            await marzban.reset_user_traffic(user['marzban_username'])

        elif action == "toggle_status":
            marz_user = await marzban.get_user(user['marzban_username'])
            new_status = "disabled" if marz_user.get('status') == 'active' else 'active'
            # Re-send the current limit and expiry so only the status changes.
            limit_gb = (marz_user.get('data_limit') or 0) / gib
            await marzban.modify_user(
                user['marzban_username'],
                limit_gb=limit_gb,
                status=new_status,
                expire_timestamp=marz_user.get('expire'),
            )

        elif action == "add_days":
            days = int(data.get("days", 0))
            current_limit = user['data_limit'] or 0
            await db.update_subscription(target_id, days, current_limit)

            # Sync to Marzban
            u = await db.get_user(target_id)
            sub_until = u['subscription_until']
            if isinstance(sub_until, str):
                sub_until = datetime.fromisoformat(sub_until)
            days_left = (sub_until - datetime.now()).days + 1 if sub_until else 0
            # NOTE(review): when the DB holds the "unlimited" sentinel this
            # sends 999999 GB rather than 0 -- confirm that is intended.
            await marzban.modify_user(u['marzban_username'], current_limit / gib, days_left)

        elif action == "set_limit":
            limit_gb = float(data.get("limit_gb", 0))
            marz_user = await marzban.get_user(user['marzban_username'])
            # 0 and negative in Marzban is unlimited
            marz_limit = limit_gb if limit_gb > 0 else 0
            await marzban.modify_user(
                user['marzban_username'],
                marz_limit,
                status=marz_user.get('status'),
                expire_timestamp=marz_user.get('expire'),
            )
            # If 0, we store a very large number in DB to represent infinity
            limit_bytes = int(limit_gb * gib) if limit_gb > 0 else unlimited_bytes
            await db.execute("UPDATE users SET data_limit = $1 WHERE user_id = $2", limit_bytes, target_id)

        elif action == "set_expiry":
            days = int(data.get("days", 0))
            # One flag drives both the DB date and the Marzban value, so the
            # two can no longer disagree at exactly 10000 days.
            forever = days > 10000
            # Set fixed expiry from NOW
            if forever:
                new_date = datetime(2099, 12, 31)
            else:
                new_date = datetime.now() + timedelta(days=days)
            await db.execute("UPDATE users SET subscription_until = $1 WHERE user_id = $2", new_date, target_id)

            # Sync to Marzban
            u = await db.get_user(target_id)
            if days <= 0:
                # NOTE(review): the original comment says "1 sec if expire",
                # but this value is passed where other branches pass a day
                # count -- confirm the unit expected by marzban.modify_user.
                marz_days = 1
            elif forever:
                # presumably 0 means "no expiry" for marzban.modify_user -- confirm
                marz_days = 0
            else:
                # In Marzban, we pass days left
                marz_days = (new_date - datetime.now()).days + 1
            # ``or 0`` guards a NULL data_limit, as the other branches do.
            await marzban.modify_user(
                u['marzban_username'],
                (u['data_limit'] or 0) / gib,
                marz_days,
            )

        elif action == "set_plan":
            plan_id = data.get("plan_id")
            plan = PLANS.get(plan_id)
            if not plan:
                return JSONResponse(status_code=404, content={"error": "Plan not found"})

            total_days = plan['days']
            data_limit_gb = plan['data_limit']

            limit_bytes = int(data_limit_gb * gib) if data_limit_gb > 0 else unlimited_bytes
            await db.execute("UPDATE users SET data_limit = $1 WHERE user_id = $2", limit_bytes, target_id)

            # Update expiry relative to now
            new_date = datetime.now() + timedelta(days=total_days)
            await db.execute("UPDATE users SET subscription_until = $1 WHERE user_id = $2", new_date, target_id)

            # Sync to Marzban: non-positive values map to limit 0 and days 1.
            marz_limit = data_limit_gb if data_limit_gb > 0 else 0
            marz_days = total_days if total_days > 0 else 1
            await marzban.modify_user(user['marzban_username'], marz_limit, marz_days)

        elif action == "delete_sub":
            await db.remove_subscription(target_id)
            # Expire the Marzban user at the current moment, keeping the limit.
            expire_ts = int(datetime.now().timestamp())
            marz_user = await marzban.get_user(user['marzban_username'])
            limit_gb = (marz_user.get('data_limit') or 0) / gib
            await marzban.modify_user(
                user['marzban_username'],
                limit_gb,
                expire_timestamp=expire_ts,
            )

        else:
            return JSONResponse(status_code=400, content={"error": "Invalid action"})

        return {"status": "ok"}
    except Exception as e:
        # logger.exception records the traceback as well as the message.
        logger.exception(f"Admin action error: {e}")
        return JSONResponse(status_code=500, content={"error": str(e)})
@app.get("/api/admin/promos")
async def admin_list_promos(user_id: int):
    """Return every row of the ``promo_codes`` table as a list of dicts.

    Responds 403 when *user_id* is not an admin.
    """
    if not is_admin(user_id):
        return JSONResponse(status_code=403, content={"error": "Forbidden"})

    rows = await db.fetch("SELECT * FROM promo_codes")
    return list(map(dict, rows))
@app.get("/api/admin/plans_full")
async def admin_get_plans_full(user_id: int):
    """Return all entries of ``PLANS``, each with its key added as ``"id"``.

    Unlike ``/api/plans`` the list is not truncated. Responds 403 when
    *user_id* is not an admin.
    """
    if not is_admin(user_id):
        return JSONResponse(status_code=403, content={"error": "Forbidden"})

    return [{"id": plan_id, **plan} for plan_id, plan in PLANS.items()]
class CreatePromoRequest(BaseModel):
    """Request body for ``POST /api/admin/promos/create``."""

    # ID of the acting admin; checked with is_admin and passed on as creator.
    user_id: int
    # Promo code text; upper-cased and stripped before it is stored.
    code: str
    # Discount value (presumably a percentage, as shown by /api/check-promo).
    discount: int
    # Passed to db.create_promo_code as the usage count -- exact meaning is
    # defined there.
    uses: int
    # Validity in days from creation; a value <= 0 means no expiry date.
    days: int
    is_unlimited: bool = False
    bonus_days: int = 0
    is_sticky: bool = False
@app.post("/api/admin/promos/create")
async def admin_create_promo(req: CreatePromoRequest):
    """Create a promo code on behalf of an admin.

    Responds 403 when ``req.user_id`` is not an admin, 500 with the exception
    text when the database call fails, and ``{"status": "ok"}`` otherwise.
    The code is stored upper-cased and stripped; it expires ``req.days`` days
    from now, or never when ``req.days`` is not positive.
    """
    if not is_admin(req.user_id):
        return JSONResponse(status_code=403, content={"error": "Forbidden"})

    expires_at = datetime.now() + timedelta(days=req.days) if req.days > 0 else None
    normalized_code = req.code.upper().strip()

    try:
        await db.create_promo_code(
            normalized_code,
            req.discount,
            req.uses,
            req.user_id,
            expires_at,
            req.is_unlimited,
            req.bonus_days,
            req.is_sticky,
        )
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})
    return {"status": "ok"}
@app.delete("/api/admin/promo/{code}")
async def admin_delete_promo(code: str, user_id: int):
    """Delete the promo code *code*; only admins may do so.

    Responds 403 for non-admins. Otherwise the code is passed unchanged to
    ``db.delete_promo_code`` and ``{"status": "ok"}`` is returned.
    """
    caller_is_admin = is_admin(user_id)
    if not caller_is_admin:
        return JSONResponse(status_code=403, content={"error": "Forbidden"})

    await db.delete_promo_code(code)
    return {"status": "ok"}
class BroadcastRequest(BaseModel):
    """Request body for ``POST /api/admin/broadcast``."""

    # ID of the acting admin; checked with is_admin.
    user_id: int
    # Text sent unchanged to every broadcast recipient.
    message: str
@app.post("/api/admin/broadcast")
async def admin_broadcast(req: BroadcastRequest, request: Request):
    """Send ``req.message`` to every user returned by ``db.get_users_for_broadcast``.

    Responds 403 when ``req.user_id`` is not an admin and 500 when no bot
    instance is stored on ``request.app.state``. Delivery is best-effort: a
    failed send is logged and skipped. Returns ``{"sent": <count>}`` with the
    number of successful sends.
    """
    if not is_admin(req.user_id):
        return JSONResponse(status_code=403, content={"error": "Forbidden"})

    bot = getattr(request.app.state, "bot", None)
    if not bot:
        return JSONResponse(status_code=500, content={"error": "Bot not init"})

    users = await db.get_users_for_broadcast()
    count = 0
    for u in users:
        chat_id = u['user_id']
        try:
            await bot.send_message(chat_id=chat_id, text=req.message)
        except Exception as e:
            # ``Exception`` rather than a bare ``except`` so that task
            # cancellation still stops the loop; failures are logged so that
            # undelivered messages can be diagnosed.
            logger.warning(f"Broadcast to {chat_id} failed: {e}")
        else:
            count += 1
    return {"sent": count}
# Serve Static Files (must be last)
# Mounted at "/" after every API route has been registered on ``app``.
static_files = StaticFiles(directory="web_app/static", html=True)
app.mount("/", static_files, name="static")

if __name__ == "__main__":
    from config import CONFIG

    # Listen on all interfaces on the configured web-app port.
    listen_port = CONFIG["WEB_APP_PORT"]
    uvicorn.run(app, host="0.0.0.0", port=listen_port)