Install Library
pip install python-telegram-bot boto3 requests

Script Lengkap
Buat file storage_bot.py:
import asyncio
import os
import re
import subprocess

import boto3
import requests
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes
# ============================================
# CONFIGURATION - EDIT THIS SECTION
# ============================================
# Each secret can be supplied through an environment variable so that real
# credentials do not have to be written into the script. When the variable is
# not set, the literal on the right is used, exactly as before.
TOKEN = os.environ.get("STORAGE_BOT_TOKEN", "ISI_TOKEN_BOT_KAMU")
OWNER_ID = int(os.environ.get("STORAGE_BOT_OWNER_ID", "123456789"))  # Your chat ID
# AWS S3
AWS_ACCESS_KEY = os.environ.get("STORAGE_BOT_AWS_ACCESS_KEY", "ISI_AWS_ACCESS_KEY")
AWS_SECRET_KEY = os.environ.get("STORAGE_BOT_AWS_SECRET_KEY", "ISI_AWS_SECRET_KEY")
AWS_REGION = os.environ.get("STORAGE_BOT_AWS_REGION", "ap-southeast-1")  # Change to your region
S3_BUCKET = os.environ.get("STORAGE_BOT_S3_BUCKET", "nama-bucket-kamu")
# Scratch directory for downloads; created at import time if missing.
DOWNLOAD_DIR = "./temp"
os.makedirs(DOWNLOAD_DIR, exist_ok=True)
# ============================================
# Module-wide S3 client, created once at import time with the configured
# region and access keys.
s3 = boto3.client(
    "s3",
    region_name=AWS_REGION,
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
)
# ============================================
# HELPER
# ============================================
def is_owner(update: Update) -> bool:
    """Return True when the update was sent by the configured owner.

    Args:
        update: Incoming Telegram update.

    Returns:
        True only if the update has a sender whose id equals ``OWNER_ID``.
        Updates without a sender are never treated as the owner.
    """
    # effective_user is optional on an Update; guard it so a sender-less
    # update is rejected instead of raising AttributeError.
    user = update.effective_user
    return user is not None and user.id == OWNER_ID
def get_file_ext(filename: str) -> str:
    """Return the lower-cased extension of ``filename`` without the dot.

    Args:
        filename: File name (or path) to inspect.

    Returns:
        The text after the last ``.``, lower-cased, or ``""`` when the name
        contains no dot at all.
    """
    # rpartition distinguishes "no dot" from "dot present"; a plain split
    # would return the whole name for "README" or a file literally named
    # "mp4", which would then be mistaken for a media file.
    _, dot, ext = filename.rpartition(".")
    return ext.lower() if dot else ""
async def send_file(update: Update, filepath: str, filename: str):
    """Send a local file to the chat with the Telegram method matching its type.

    The method is chosen from the extension of ``filename``: video, audio and
    photo extensions use the dedicated reply methods, anything else is sent as
    a generic document.

    Args:
        update: Update whose message is replied to.
        filepath: Path of the file on local disk.
        filename: Name shown to the recipient; its extension selects the type.
    """
    video_exts = ("mp4", "mkv", "avi", "mov", "ts")
    audio_exts = ("mp3", "m4a", "aac", "flac", "wav")
    photo_exts = ("jpg", "jpeg", "png", "webp")
    # The Bot API's sendPhoto rejects photos above 10 MB; larger images are
    # sent as documents instead of failing.
    photo_max_bytes = 10 * 1024 * 1024

    # effective_message also covers edited messages, for which update.message
    # is None.
    message = update.effective_message
    ext = get_file_ext(filename)
    caption = f"✅ {filename}"
    with open(filepath, "rb") as f:
        if ext in video_exts:
            await message.reply_video(
                video=f,
                filename=filename,
                caption=caption,
                supports_streaming=True,
            )
        elif ext in audio_exts:
            await message.reply_audio(
                audio=f,
                filename=filename,
                caption=caption,
            )
        elif ext in photo_exts and os.path.getsize(filepath) <= photo_max_bytes:
            await message.reply_photo(
                photo=f,
                caption=caption,
            )
        else:
            await message.reply_document(
                document=f,
                filename=filename,
                caption=caption,
            )
# ============================================
# COMMAND HANDLERS
# ============================================
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply to /start with the list of sources and commands (owner only).

    Args:
        update: Incoming update carrying the command.
        context: Handler context (unused).
    """
    if not is_owner(update):
        return
    # effective_message is used because update.message is None when the
    # command arrives as an edited message.
    await update.effective_message.reply_text(
        "🤖 *Bot Storage Pribadi*\n\n"
        "📦 *Sumber File:*\n"
        "• AWS S3\n"
        "• Pixeldrain\n"
        "• URL Langsung\n\n"
        "📋 *Perintah:*\n"
        "/s3list - List file di S3\n"
        "/s3list [folder] - List isi folder S3\n"
        "/s3get [nama\\_file] - Ambil file dari S3\n"
        # /s3batch is a registered command, so it is listed here too.
        "/s3batch [file1] [file2] - Ambil banyak file dari S3\n"
        "/pdget [file\\_id] - Ambil file dari Pixeldrain\n"
        "/urlget [url] - Ambil file dari URL langsung\n"
        "/help - Bantuan",
        parse_mode="Markdown"
    )
# ============================================
# AWS S3
# ============================================
async def s3_list(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """List up to 50 objects of the S3 bucket, optionally below a prefix.

    Args:
        update: Incoming update carrying the command.
        context: Handler context; ``context.args[0]``, when present, is used
            as the key prefix ("folder") to list.
    """
    if not is_owner(update):
        return
    # Optional prefix/folder
    prefix = context.args[0] if context.args else ""
    # effective_message also covers edited messages, where update.message is
    # None.
    msg = await update.effective_message.reply_text("⏳ Mengambil list S3...")
    try:
        # boto3 is blocking; run the request in the default executor so the
        # bot's event loop is not stalled while waiting for S3.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            lambda: s3.list_objects_v2(
                Bucket=S3_BUCKET,
                Prefix=prefix,
                MaxKeys=50,  # Show at most 50 files
            ),
        )
        if "Contents" not in response:
            await msg.edit_text("📂 Folder kosong atau tidak ditemukan")
            return
        file_list = []
        for obj in response["Contents"]:
            size = obj["Size"] / (1024 * 1024)  # Convert to MB
            file_list.append(f"📄 `{obj['Key']}` ({size:.1f} MB)")
        header = f"📦 *File di S3 Bucket: {S3_BUCKET}*\n"
        if prefix:
            header += f"📁 Folder: `{prefix}`\n"
        header += f"Total: {len(file_list)} file\n\n"
        text = header + "\n".join(file_list)
        # Telegram messages are limited to 4096 characters.
        if len(text) > 4000:
            # Cut on line boundaries only: slicing mid-line can split a
            # `code` span and make Telegram reject the Markdown.
            suffix = "\n\n... (terpotong, gunakan /s3list [folder])"
            budget = 4000 - len(header) - len(suffix)
            kept = []
            for line in file_list:
                cost = len(line) + 1  # +1 for the joining newline
                if cost > budget:
                    break
                kept.append(line)
                budget -= cost
            text = header + "\n".join(kept) + suffix
        await msg.edit_text(text, parse_mode="Markdown")
    except Exception as e:
        await msg.edit_text(f"❌ Error S3: {str(e)}")
async def s3_get(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Download one object from S3 and send it to the chat (owner only).

    Args:
        update: Incoming update carrying the command.
        context: Handler context; ``context.args[0]`` is the S3 key to fetch.
    """
    if not is_owner(update):
        return
    # effective_message also covers edited messages, where update.message is
    # None.
    message = update.effective_message
    s3_key = context.args[0] if context.args else ""
    filename = s3_key.split("/")[-1]
    # A missing argument or a key ending in "/" yields no usable file name;
    # joining it would point at a directory, which cannot be written or
    # removed as a file.
    if filename in ("", ".", ".."):
        await message.reply_text(
            "❌ Format: `/s3get nama_file.mp4`\n"
            "atau dengan folder: `/s3get folder/nama_file.mp4`",
            parse_mode="Markdown"
        )
        return
    filepath = os.path.join(DOWNLOAD_DIR, filename)
    msg = await message.reply_text(f"⏳ Downloading dari S3: `{s3_key}`", parse_mode="Markdown")
    try:
        # boto3 calls block; run them in the default executor so the event
        # loop stays responsive during large downloads.
        loop = asyncio.get_running_loop()
        # Check the file size first
        head = await loop.run_in_executor(
            None, lambda: s3.head_object(Bucket=S3_BUCKET, Key=s3_key)
        )
        size_mb = head["ContentLength"] / (1024 * 1024)
        # NOTE(review): 2000 MB matches a self-hosted Bot API server; the
        # public Bot API endpoint is documented with a lower upload limit —
        # confirm which one this bot talks to.
        if size_mb > 2000:  # More than 2GB
            await msg.edit_text(
                f"⚠️ File terlalu besar: {size_mb:.1f} MB\n"
                "Telegram free max 2GB\nButuh Telegram Premium untuk > 2GB"
            )
            return
        await msg.edit_text(f"⏳ Downloading {size_mb:.1f} MB dari S3...")
        # Download from S3
        await loop.run_in_executor(None, s3.download_file, S3_BUCKET, s3_key, filepath)
        await msg.edit_text(f"📤 Uploading ke Telegram: `{filename}`", parse_mode="Markdown")
        # Send to Telegram
        await send_file(update, filepath, filename)
        await msg.edit_text(f"✅ Selesai: `{filename}`", parse_mode="Markdown")
    except s3.exceptions.ClientError as e:
        # head_object reports a missing key as a generic ClientError with a
        # 404 code rather than NoSuchKey, so inspect the error code.
        code = str(e.response.get("Error", {}).get("Code", ""))
        if code in ("404", "NoSuchKey", "NotFound"):
            await msg.edit_text(f"❌ File tidak ditemukan di S3: `{s3_key}`", parse_mode="Markdown")
        else:
            await msg.edit_text(f"❌ Error: {str(e)}")
    except Exception as e:
        await msg.edit_text(f"❌ Error: {str(e)}")
    finally:
        # Always remove the temporary copy, whether or not the upload worked.
        if os.path.isfile(filepath):
            os.remove(filepath)
async def s3_batch(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Fetch several S3 objects one after another (owner only).

    Each argument is treated as one S3 key and handed to ``s3_get`` by
    temporarily making it the only entry of ``context.args``.

    Args:
        update: Incoming update carrying the command.
        context: Handler context; ``context.args`` holds the keys to fetch.
    """
    if not is_owner(update):
        return
    # effective_message also covers edited messages, where update.message is
    # None.
    message = update.effective_message
    if not context.args:
        await message.reply_text(
            "❌ Format: `/s3batch file1.mp4 file2.mp4 file3.mp4`",
            parse_mode="Markdown"
        )
        return
    files = context.args
    await message.reply_text(f"⏳ Memproses {len(files)} file dari S3...")
    for s3_key in files:
        # s3_get reads its key from context.args[0].
        context.args = [s3_key]
        await s3_get(update, context)
# ============================================
# PIXELDRAIN
# ============================================
def _pixeldrain_download(url: str, filepath: str) -> None:
    """Stream ``url`` into ``filepath``; blocking, so run it off the event loop."""
    with requests.get(url, stream=True, timeout=30) as response:
        # Fail on HTTP errors instead of saving the error body as the file.
        response.raise_for_status()
        with open(filepath, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)


async def pixeldrain_get(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Download a file from Pixeldrain and send it to the chat (owner only).

    Args:
        update: Incoming update carrying the command.
        context: Handler context; ``context.args[0]`` is either a Pixeldrain
            file id or a URL whose last path segment is the file id.
    """
    if not is_owner(update):
        return
    # effective_message also covers edited messages, where update.message is
    # None.
    message = update.effective_message
    if not context.args:
        await message.reply_text(
            "❌ Format: `/pdget FILE_ID`\n"
            "atau URL: `/pdget https://pixeldrain.com/u/FILE_ID`",
            parse_mode="Markdown"
        )
        return
    input_arg = context.args[0]
    # Extract FILE_ID from the URL when the input is a URL
    if "pixeldrain.com" in input_arg:
        # Drop any query string before taking the last path segment.
        file_id = input_arg.split("?")[0].rstrip("/").split("/")[-1]
    else:
        file_id = input_arg
    msg = await message.reply_text(f"⏳ Mengambil info dari Pixeldrain: `{file_id}`", parse_mode="Markdown")
    # Defined before the try so the cleanup in `finally` never hits an
    # unbound name when we return or fail before the download starts.
    filepath = None
    try:
        # requests is blocking; run it in the default executor so the event
        # loop is not stalled.
        loop = asyncio.get_running_loop()
        # Fetch the file info from the Pixeldrain API
        info_url = f"https://pixeldrain.com/api/file/{file_id}/info"
        info = await loop.run_in_executor(
            None, lambda: requests.get(info_url, timeout=30).json()
        )
        if "success" in info and not info["success"]:
            await msg.edit_text(f"❌ File tidak ditemukan di Pixeldrain: `{file_id}`", parse_mode="Markdown")
            return
        # The name comes from the remote service: keep only its final path
        # component so it cannot escape DOWNLOAD_DIR.
        raw_name = str(info.get("name") or "").replace("\\", "/")
        filename = os.path.basename(raw_name)
        if filename in ("", ".", ".."):
            filename = f"{file_id}.bin"
        size_mb = (info.get("size") or 0) / (1024 * 1024)
        if size_mb > 2000:
            await msg.edit_text(
                f"⚠️ File terlalu besar: {size_mb:.1f} MB\n"
                "Telegram free max 2GB"
            )
            return
        await msg.edit_text(f"⏳ Downloading {filename} ({size_mb:.1f} MB) dari Pixeldrain...")
        # Download the file
        download_url = f"https://pixeldrain.com/api/file/{file_id}"
        filepath = os.path.join(DOWNLOAD_DIR, filename)
        await loop.run_in_executor(None, _pixeldrain_download, download_url, filepath)
        await msg.edit_text(f"📤 Uploading ke Telegram: `{filename}`", parse_mode="Markdown")
        # Send to Telegram
        await send_file(update, filepath, filename)
        await msg.edit_text(f"✅ Selesai: `{filename}`", parse_mode="Markdown")
    except Exception as e:
        await msg.edit_text(f"❌ Error: {str(e)}")
    finally:
        # Remove the temporary copy if one was (partially) written.
        if filepath is not None and os.path.isfile(filepath):
            os.remove(filepath)
# ============================================
# URL LANGSUNG
# ============================================
async def url_get(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Download a file from a direct URL with wget and send it (owner only).

    Args:
        update: Incoming update carrying the command.
        context: Handler context; ``context.args[0]`` is the URL to fetch.
    """
    if not is_owner(update):
        return
    # effective_message also covers edited messages, where update.message is
    # None.
    message = update.effective_message
    if not context.args:
        await message.reply_text("❌ Format: `/urlget https://example.com/file.mp4`", parse_mode="Markdown")
        return
    url = context.args[0]
    filename = url.split("/")[-1].split("?")[0]
    # A URL ending in "/" has no final segment; fall back to a fixed name so
    # the target is a file inside DOWNLOAD_DIR rather than the directory.
    if filename in ("", ".", ".."):
        filename = "download"
    filepath = os.path.join(DOWNLOAD_DIR, filename)
    msg = await message.reply_text(f"⏳ Downloading dari URL: `{filename}`", parse_mode="Markdown")
    try:
        # subprocess.run blocks until wget exits; run it in the default
        # executor so the event loop stays responsive. The "--" stops wget
        # from parsing a URL that starts with "-" as an option.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            None,
            lambda: subprocess.run(
                ["wget", "-O", filepath, "--", url],
                check=True,
                capture_output=True,
            ),
        )
        await msg.edit_text(f"📤 Uploading ke Telegram: `{filename}`", parse_mode="Markdown")
        await send_file(update, filepath, filename)
        await msg.edit_text(f"✅ Selesai: `{filename}`", parse_mode="Markdown")
    except Exception as e:
        await msg.edit_text(f"❌ Error: {str(e)}")
    finally:
        # Remove the temporary copy, including partial downloads.
        if os.path.isfile(filepath):
            os.remove(filepath)
# ============================================
# HELP
# ============================================
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply to /help with the full usage guide (owner only).

    Args:
        update: Incoming update carrying the command.
        context: Handler context (unused).
    """
    if not is_owner(update):
        return
    # effective_message is used because update.message is None when the
    # command arrives as an edited message.
    await update.effective_message.reply_text(
        "📖 *Panduan Lengkap*\n\n"
        "📦 *AWS S3:*\n"
        "`/s3list` - List semua file\n"
        "`/s3list videos/` - List folder videos\n"
        "`/s3get video1.mp4` - Ambil file\n"
        "`/s3get videos/video1.mp4` - Ambil dari folder\n"
        "`/s3batch file1.mp4 file2.mp4` - Ambil banyak file\n\n"
        "🌊 *Pixeldrain:*\n"
        "`/pdget AbCdEfGh` - Pakai File ID\n"
        "`/pdget https://pixeldrain.com/u/AbCdEfGh` - Pakai URL\n\n"
        "🔗 *URL Langsung:*\n"
        "`/urlget https://example.com/file.mp4`\n\n"
        "💡 *Tips:*\n"
        "• File otomatis terhapus dari temp setelah upload\n"
        "• Max 2GB per file (Telegram free)\n"
        "• Gunakan Search Telegram untuk cari file lama",
        parse_mode="Markdown"
    )
# ============================================
# MAIN
# ============================================
def main():
    """Build the bot application, register every command and start polling."""
    app = Application.builder().token(TOKEN).build()
    # Command name -> handler coroutine, registered in this order.
    command_table = (
        ("start", start),
        ("help", help_command),
        ("s3list", s3_list),
        ("s3get", s3_get),
        ("s3batch", s3_batch),
        ("pdget", pixeldrain_get),
        ("urlget", url_get),
    )
    for command_name, callback in command_table:
        app.add_handler(CommandHandler(command_name, callback))
    print("✅ Bot berjalan...")
    app.run_polling()
if __name__ == "__main__":
    main()

Cara Pakai
# Install dependencies
pip install python-telegram-bot boto3 requests
# Edit konfigurasi di script
# Lalu jalankan
python storage_bot.py

Contoh Penggunaan
# Lihat semua file di S3
/s3list
# Lihat isi folder tertentu
/s3list videos/2024/
# Ambil file dari S3
/s3get video1.mp4
/s3get videos/2024/film.mp4
# Ambil dari Pixeldrain
/pdget AbCd
