259 lines
9.5 KiB
Python
Executable File
259 lines
9.5 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import os
|
|
import sys
|
|
import csv
|
|
import argparse
|
|
import asyncio
|
|
from getpass import getpass
|
|
from typing import Optional, Tuple
|
|
|
|
from telethon import TelegramClient, functions, types
|
|
from telethon.errors import (
|
|
ChatAdminRequiredError,
|
|
ChannelPrivateError,
|
|
UsernameNotOccupiedError,
|
|
UsernameInvalidError,
|
|
InviteHashExpiredError,
|
|
InviteHashInvalidError,
|
|
)
|
|
|
|
DEFAULT_SESSION = "export_session"
|
|
|
|
def parse_args():
    """Build the command-line parser and return the parsed arguments.

    Options fall into three families:

    * chat selection: ``--chat`` and ``--chat-id`` (mutually exclusive) and
      the interactive ``--pick g|c``;
    * listing modes: ``--list``, ``--listgroups``, ``--listchannels``;
    * output control: ``--out``, ``--format``, ``--session``, ``--aggressive``.

    Returns:
        The ``argparse.Namespace`` produced from ``sys.argv``.
    """
    parser = argparse.ArgumentParser(
        description="Export Telegram group/supergroup members (and channel subscribers if you are an admin)."
    )

    # A chat is named either by handle/link or by numeric ID, never both.
    target = parser.add_mutually_exclusive_group(required=False)
    target.add_argument("--chat", help="Group/channel @username or t.me link (public or invite).")
    target.add_argument("--chat-id", type=int, help="Numeric chat ID, e.g. -1001234567890.")

    # Listing modes: plain boolean switches, registered in help-output order.
    listing_flags = (
        ("--list", "List ONLY direct chats (1:1 DMs) and exit."),
        ("--listgroups", "List ONLY groups/supergroups and exit."),
        ("--listchannels", "List ONLY channels and exit."),
    )
    for flag, help_text in listing_flags:
        parser.add_argument(flag, action="store_true", help=help_text)

    # Pick mode with filter: g = groups, c = channels
    parser.add_argument(
        "--pick",
        choices=["g", "c"],
        help="Interactively pick a chat: 'g' for groups, 'c' for channels.",
    )

    parser.add_argument("--out", default="members.csv", help="Output file (default: members.csv)")
    parser.add_argument(
        "--format",
        choices=["csv", "txt", "json"],
        default="csv",
        help="Output format (default: csv)",
    )
    parser.add_argument(
        "--session",
        default=DEFAULT_SESSION,
        help=f"Telethon session name (default: {DEFAULT_SESSION})",
    )
    parser.add_argument("--aggressive", action="store_true", help="Use aggressive participant scraping.")

    return parser.parse_args()
|
|
|
|
def ensure_api_creds():
    """Return the Telegram API credentials as ``(api_id, api_hash)``.

    Each value is taken from the environment (``TELEGRAM_API_ID`` /
    ``TELEGRAM_API_HASH``); when a variable is unset or blank the user is
    prompted instead. The hash prompt uses ``getpass`` so it is not echoed.

    Returns:
        A tuple ``(api_id, api_hash)`` where ``api_id`` is an ``int`` and
        ``api_hash`` is a non-empty string without surrounding whitespace.

    Exits:
        With status 1 (after printing to stderr) when the ID is not an
        integer or the hash is empty.
    """
    # Strip environment values too: a stray newline or blank value from a
    # shell export should behave exactly like the interactive input path.
    api_id = (os.environ.get("TELEGRAM_API_ID") or "").strip()
    api_hash = (os.environ.get("TELEGRAM_API_HASH") or "").strip()

    if not api_id:
        api_id = input("Enter TELEGRAM_API_ID: ").strip()
    if not api_hash:
        api_hash = getpass("Enter TELEGRAM_API_HASH (hidden): ").strip()

    try:
        api_id = int(api_id)
    except ValueError:
        # int() on a str can only fail with ValueError; anything else is a
        # genuine bug and should not be hidden behind this message.
        print("ERROR: TELEGRAM_API_ID must be an integer.", file=sys.stderr)
        sys.exit(1)

    if not api_hash:
        # Reject an empty hash here rather than handing it on to the client.
        print("ERROR: TELEGRAM_API_HASH must not be empty.", file=sys.stderr)
        sys.exit(1)

    return api_id, api_hash
|
|
|
|
def user_to_dict(u: types.User):
    """Flatten a Telegram user object into a plain dict of export fields.

    Text attributes (``first_name``, ``last_name``, ``username``, ``phone``)
    are converted to stripped strings, with ``None`` becoming ``""``.
    ``bot`` and ``lang_code`` are read defensively with ``getattr`` so a
    missing attribute yields ``False`` / ``""``.

    Returns:
        A dict with the keys ``user_id``, ``is_bot``, ``username``,
        ``first_name``, ``last_name``, ``full_name``, ``phone`` and
        ``lang_code``, inserted in that order.
    """
    def _text(value):
        # Absent (None) fields become empty strings; outer whitespace is dropped.
        return (value or "").strip()

    given = _text(u.first_name)
    family = _text(u.last_name)
    handle = _text(u.username)
    number = _text(u.phone)

    # Build the record key by key to keep the insertion order explicit.
    record = {"user_id": u.id}
    record["is_bot"] = bool(getattr(u, "bot", False))
    record["username"] = handle
    record["first_name"] = given
    record["last_name"] = family
    # The outer strip removes the joining space when one of the names is empty.
    record["full_name"] = f"{given} {family}".strip()
    record["phone"] = number
    record["lang_code"] = getattr(u, "lang_code", "") or ""
    return record
|
|
|
|
def parse_tme_link(s: str) -> Tuple[Optional[str], Optional[str]]:
    """Split a chat reference into ``(public_username, invite_hash)``.

    At most one element of the result is set:

    * ``@mygroup``                      -> ``('mygroup', None)``
    * ``https://t.me/mygroup``          -> ``('mygroup', None)``
    * ``https://t.me/mygroup/123?x=1``  -> ``('mygroup', None)``
    * ``https://t.me/+AbCdEfGhIj``      -> ``(None, 'AbCdEfGhIj')``
    * ``https://t.me/joinchat/AbCdEf``  -> ``(None, 'AbCdEf')``
    * ``https://t.me/c/...`` or anything unrecognised -> ``(None, None)``

    Trailing path segments, query strings and fragments are removed from
    both usernames and invite hashes, and an empty name is reported as
    ``None`` rather than ``""``.
    """
    def _token(text: str) -> Optional[str]:
        # Keep only what precedes the first path separator, query or fragment.
        for sep in ("/", "?", "#"):
            text = text.split(sep, 1)[0]
        return text.strip() or None

    s = s.strip()
    if s.startswith("@"):
        return (_token(s[1:]), None)

    if "t.me/" not in s:
        return (None, None)

    link = s.split("t.me/", 1)[1].strip()
    if link.startswith("+"):
        return (None, _token(link[1:]))
    if link.startswith("joinchat/"):
        return (None, _token(link[len("joinchat/"):]))
    if link.startswith("c/"):
        # private deep-link; cannot resolve to an entity without being in the chat
        return (None, None)
    return (_token(link), None)
|
|
|
|
def classify_entity(entity) -> str:
    """Map an entity to ``'direct'``, ``'group'``, ``'channel'`` or ``'other'``.

    * ``types.User``    -> ``'direct'``
    * ``types.Chat``    -> ``'group'`` (legacy small group)
    * ``types.Channel`` -> ``'group'`` when its ``megagroup`` attribute is
      truthy, otherwise ``'channel'``
    * anything else     -> ``'other'``
    """
    simple_kinds = ((types.User, "direct"), (types.Chat, "group"))
    for telethon_type, label in simple_kinds:
        if isinstance(entity, telethon_type):
            return label

    if not isinstance(entity, types.Channel):
        return "other"

    # megagroup -> group; every other Channel is reported as a channel
    return "group" if getattr(entity, "megagroup", False) else "channel"
|
|
|
|
def fmt_dialog_line(d) -> str:
    """Render one dialog as a bullet line: ``- <name>[ @<username>] id=<id>``.

    A falsy ``d.name`` is shown as ``(no title)``. The ``@username`` part is
    omitted when the entity has no (or an empty) ``username``; a missing
    ``id`` attribute is rendered as ``None``.
    """
    title = d.name or "(no title)"
    target = d.entity
    ident = getattr(target, "id", None)
    handle = getattr(target, "username", None)

    pieces = [f"- {title}"]
    if handle:
        pieces.append(f" @{handle}")
    pieces.append(f" id={ident}")
    return "".join(pieces)
|
|
|
|
async def list_dialogs(client: TelegramClient, mode: str):
    """Print every dialog of the given kind, one formatted line per dialog.

    Args:
        client: Client used to fetch the dialog list via ``get_dialogs``.
        mode: ``'direct'``, ``'group'`` or ``'channel'``; compared against
            the result of ``classify_entity`` for each dialog.

    A header line is printed first; ``(none)`` is printed when no dialog
    matches. Any other ``mode`` raises ``KeyError`` from the header lookup.
    """
    dialogs = await client.get_dialogs(limit=None)

    headers = {
        "direct": "Your direct chats (DMs):",
        "group": "Your groups & supergroups:",
        "channel": "Your channels:",
    }
    print(headers[mode])

    # Lazy generator: each line is formatted and printed in turn.
    matching_lines = (
        fmt_dialog_line(dialog)
        for dialog in dialogs
        if classify_entity(dialog.entity) == mode
    )
    printed_any = False
    for line in matching_lines:
        print(line)
        printed_any = True

    if not printed_any:
        print("(none)")
|
|
|
|
async def resolve_entity(client: TelegramClient,
                         chat: Optional[str],
                         chat_id: Optional[int],
                         pick_filter: Optional[str]):
    """Turn the CLI chat selection into an entity to export from.

    The selectors are tried in this order and the first one supplied wins:

    1. ``chat_id``     - numeric ID, looked up with ``client.get_entity``.
    2. ``pick_filter`` - ``'g'`` (groups) or ``'c'`` (channels): list the
       matching dialogs and prompt for an index on stdin.
    3. ``chat``        - ``@username`` or t.me link, parsed by
       ``parse_tme_link``. Public names are resolved directly. Invite links
       are checked first and, if the account is not yet a member, imported
       (which joins the chat with this account).

    Returns:
        The resolved entity object.

    Exits:
        With status 2 (after printing to stderr) whenever no entity can be
        resolved: unknown ID or name, bad or expired invite, invalid pick,
        or no selector given at all.
    """
    # numeric chat_id
    if chat_id is not None:
        try:
            return await client.get_entity(chat_id)
        except ValueError:
            # get_entity signals "cannot find this peer" with ValueError.
            print("ERROR: No chat with that ID is known to this session. Use --list* or --pick.",
                  file=sys.stderr)
            sys.exit(2)

    # pick mode with filter
    if pick_filter in ("g", "c"):
        want = "group" if pick_filter == "g" else "channel"
        dialogs = await client.get_dialogs(limit=None)
        filtered = [d for d in dialogs if classify_entity(d.entity) == want]
        if not filtered:
            print(f"No {want}s found.", file=sys.stderr)
            sys.exit(2)

        print(f"Pick a {want}:")
        for idx, d in enumerate(filtered):
            print(f"[{idx}] {fmt_dialog_line(d)}")

        sel = input("Enter number: ").strip()
        try:
            sel_i = int(sel)
        except ValueError:
            sel_i = -1
        # Explicit range check: a negative number must not silently index
        # from the end of the list.
        if not 0 <= sel_i < len(filtered):
            print("Invalid selection.", file=sys.stderr)
            sys.exit(2)
        return filtered[sel_i].entity

    # --chat (username or link)
    if chat:
        public, invite = parse_tme_link(chat)

        if public:
            try:
                return await client.get_entity(public)
            except (UsernameInvalidError, UsernameNotOccupiedError, ValueError):
                # ValueError covers get_entity's own "not found" failure.
                print("ERROR: Public @name not found.", file=sys.stderr)
                sys.exit(2)

        if invite:
            try:
                info = await client(functions.messages.CheckChatInviteRequest(invite))
                # Both result types live directly under `types`, not under
                # `types.messages`.
                if isinstance(info, types.ChatInviteAlready):
                    # Already a member: the chat object is included.
                    return info.chat
                if isinstance(info, types.ChatInvite):
                    # Not a member yet: importing the invite joins the chat.
                    joined = await client(functions.messages.ImportChatInviteRequest(invite))
                    if joined.chats:
                        return joined.chats[0]
            except (InviteHashInvalidError, InviteHashExpiredError):
                print("ERROR: Invite link is invalid or EXPIRED. Generate a fresh invite or use --list* / --pick.",
                      file=sys.stderr)
                sys.exit(2)

        print("ERROR: Could not resolve the link directly. Use --list* or --pick.", file=sys.stderr)
        sys.exit(2)

    print("ERROR: Provide --chat, --chat-id, or use --pick g|c.", file=sys.stderr)
    sys.exit(2)
|
|
|
|
def _write_members(users, out_path, fmt):
    """Write the exported user dicts to ``out_path`` as ``csv``, ``txt`` or JSON."""
    if fmt == "csv":
        fieldnames = ["user_id", "is_bot", "username", "first_name",
                      "last_name", "full_name", "phone", "lang_code"]
        with open(out_path, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(users)
    elif fmt == "txt":
        with open(out_path, "w", encoding="utf-8") as f:
            for u in users:
                # One handle per line: @username, else full name, else numeric ID.
                handle = f"@{u['username']}" if u["username"] else u["full_name"] or str(u["user_id"])
                f.write(f"{handle}\n")
    else:  # json
        import json
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(users, f, ensure_ascii=False, indent=2)


async def export():
    """Run the tool end to end: parse args, connect, then list or export.

    In a listing mode (``--list`` / ``--listgroups`` / ``--listchannels``)
    the requested dialog lists are printed and nothing is written.
    Otherwise the target chat is resolved, its participants are collected
    through ``user_to_dict`` and written to ``args.out`` in ``args.format``.

    The client is disconnected on every exit path, including ``sys.exit``
    raised by helpers and unexpected exceptions.

    Exits:
        With status 3 when participants cannot be listed because admin
        rights are missing or the chat is private and inaccessible.
    """
    args = parse_args()
    api_id, api_hash = ensure_api_creds()

    client = TelegramClient(args.session, api_id, api_hash)
    try:
        await client.start()  # will ask phone+code on first run

        # listing modes
        if args.list or args.listgroups or args.listchannels:
            if args.list:
                await list_dialogs(client, mode="direct")
            if args.listgroups:
                await list_dialogs(client, mode="group")
            if args.listchannels:
                await list_dialogs(client, mode="channel")
            return

        # normal export flow
        entity = await resolve_entity(client, args.chat, args.chat_id, args.pick)

        title = getattr(entity, "title", None) or getattr(entity, "username", None) or getattr(entity, "id", None)
        print(f"[i] Exporting from: {title}")

        try:
            users = [
                user_to_dict(u)
                async for u in client.iter_participants(entity, aggressive=args.aggressive)
            ]
        except ChatAdminRequiredError:
            print("ERROR: Admin rights required to list subscribers for this channel.", file=sys.stderr)
            sys.exit(3)
        except ChannelPrivateError:
            print("ERROR: The chat/channel is private and you don't have access.", file=sys.stderr)
            sys.exit(3)
    finally:
        # Runs for normal completion, early return, sys.exit and errors alike.
        await client.disconnect()

    # write output
    _write_members(users, args.out, args.format)
    print(f"[✓] Exported {len(users)} members to {args.out}")
|
|
|
|
if __name__ == "__main__":
    # Script entry point: create the exporter coroutine and run it to
    # completion on a fresh event loop.
    entry = export()
    asyncio.run(entry)
|