diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7b53123 --- /dev/null +++ b/.gitignore @@ -0,0 +1,68 @@ +# --- Python virtual envs --- +.venv/ +venv/ +ENV/ +env/ + +# --- Telethon session files (SQLite) --- +# keep your login session out of git +*.session +*.session-* +*session-* +*-journal +*-wal +*-shm + +# --- Secrets / environment --- +.env +.env.* +.envrc +.direnv/ + +# --- Python bytecode & caches --- +__pycache__/ +*.py[cod] +*.pyd +*.pyo +*.so +.pytest_cache/ +.mypy_cache/ +.ruff_cache/ +.pyre/ +.coverage +.coverage.* +coverage.xml +htmlcov/ + +# --- Packaging artifacts --- +build/ +dist/ +*.egg-info/ +.eggs/ + +# --- Editors / IDEs --- +.vscode/ # VSCodium/VS Code settings +*.code-workspace +.idea/ +.history/ + +# --- OS cruft --- +.DS_Store +Thumbs.db + +# --- Logs & temp files --- +*.log +*.tmp +*.temp +*~ +.#* +*.swp +*.swo +*.swn +*.bak +_* +._* + +# --- Exports --- +members.* +export.* diff --git a/export_members.py b/export_members.py new file mode 100755 index 0000000..c5a27f2 --- /dev/null +++ b/export_members.py @@ -0,0 +1,258 @@ +#!/usr/bin/env python3 +import os +import sys +import csv +import argparse +import asyncio +from getpass import getpass +from typing import Optional, Tuple + +from telethon import TelegramClient, functions, types +from telethon.errors import ( + ChatAdminRequiredError, + ChannelPrivateError, + UsernameNotOccupiedError, + UsernameInvalidError, + InviteHashExpiredError, + InviteHashInvalidError, +) + +DEFAULT_SESSION = "export_session" + +def parse_args(): + p = argparse.ArgumentParser( + description="Export Telegram group/supergroup members (and channel subscribers if you are an admin)." + ) + g = p.add_mutually_exclusive_group(required=False) + g.add_argument("--chat", help="Group/channel @username or t.me link (public or invite).") + g.add_argument("--chat-id", type=int, help="Numeric chat ID, e.g. -1001234567890.") + + # Listing modes + p.add_argument("--list", action="store_true", + help="List ONLY direct chats (1:1 DMs) and exit.") + p.add_argument("--listgroups", action="store_true", + help="List ONLY groups/supergroups and exit.") + p.add_argument("--listchannels", action="store_true", + help="List ONLY channels and exit.") + + # Pick mode with filter: g = groups, c = channels + p.add_argument("--pick", choices=["g", "c"], + help="Interactively pick a chat: 'g' for groups, 'c' for channels.") + + p.add_argument("--out", default="members.csv", help="Output file (default: members.csv)") + p.add_argument("--format", choices=["csv","txt","json"], default="csv", help="Output format (default: csv)") + p.add_argument("--session", default=DEFAULT_SESSION, help=f"Telethon session name (default: {DEFAULT_SESSION})") + p.add_argument("--aggressive", action="store_true", help="Use aggressive participant scraping.") + return p.parse_args() + +def ensure_api_creds(): + api_id = os.environ.get("TELEGRAM_API_ID") + api_hash = os.environ.get("TELEGRAM_API_HASH") + if not api_id: + api_id = input("Enter TELEGRAM_API_ID: ").strip() + if not api_hash: + api_hash = getpass("Enter TELEGRAM_API_HASH (hidden): ").strip() + try: + api_id = int(api_id) + except Exception: + print("ERROR: TELEGRAM_API_ID must be an integer.", file=sys.stderr) + sys.exit(1) + return api_id, api_hash + +def user_to_dict(u: types.User): + first = (u.first_name or "").strip() + last = (u.last_name or "").strip() + username = (u.username or "").strip() + phone = (u.phone or "").strip() + full = (first + " " + last).strip() + return { + "user_id": u.id, + "is_bot": bool(getattr(u, "bot", False)), + "username": username, + "first_name": first, + "last_name": last, + "full_name": full, + "phone": phone, + "lang_code": getattr(u, "lang_code", "") or "" + } + +def parse_tme_link(s: str) -> Tuple[Optional[str], Optional[str]]: + """ + Returns (public_username, invite_hash) from a t.me link or (@name, None) if it's public. + For invite links like https://t.me/+AbCdEfGhIj -> (None, 'AbCdEfGhIj') + For public like https://t.me/mygroup -> ('mygroup', None) + For @mygroup -> ('mygroup', None) + """ + s = s.strip() + if s.startswith("@"): + return (s[1:], None) + if "t.me/" in s: + link = s.split("t.me/", 1)[1].strip() + if link.startswith("+"): + return (None, link[1:].split("?")[0]) + if link.startswith("joinchat/"): + return (None, link.split("/", 1)[1].split("?")[0]) + if link.startswith("c/"): + # private deep-link; cannot resolve to an entity without being in the chat + return (None, None) + public = link.split("/", 1)[0] + return (public, None) + return (None, None) + +def classify_entity(entity) -> str: + """ + Returns one of: 'direct', 'group', 'channel', or 'other' + """ + if isinstance(entity, types.User): + return "direct" + if isinstance(entity, types.Chat): + # legacy small group + return "group" + if isinstance(entity, types.Channel): + # megagroup -> group, broadcast -> channel + if getattr(entity, "megagroup", False): + return "group" + if getattr(entity, "broadcast", False) or not getattr(entity, "megagroup", False): + return "channel" + return "other" + +def fmt_dialog_line(d) -> str: + name = d.name or "(no title)" + ent = d.entity + eid = getattr(ent, "id", None) + uname = getattr(ent, "username", None) + uname_s = f" @{uname}" if uname else "" + return f"- {name}{uname_s} id={eid}" + +async def list_dialogs(client: TelegramClient, mode: str): + """ + mode: 'direct' | 'group' | 'channel' + """ + dialogs = await client.get_dialogs(limit=None) + print_header = { + "direct": "Your direct chats (DMs):", + "group": "Your groups & supergroups:", + "channel": "Your channels:", + }[mode] + print(print_header) + count = 0 + for d in dialogs: + if classify_entity(d.entity) == mode: + print(fmt_dialog_line(d)) + count += 1 + if count == 0: + print("(none)") + +async def resolve_entity(client: TelegramClient, + chat: Optional[str], + chat_id: Optional[int], + pick_filter: Optional[str]): + # numeric chat_id + if chat_id: + return await client.get_entity(chat_id) + + # pick mode with filter + if pick_filter in ("g", "c"): + want = "group" if pick_filter == "g" else "channel" + dialogs = await client.get_dialogs(limit=None) + filtered = [d for d in dialogs if classify_entity(d.entity) == want] + if not filtered: + print(f"No {want}s found.", file=sys.stderr) + sys.exit(2) + print(f"Pick a {want}:") + for idx, d in enumerate(filtered): + print(f"[{idx}] {fmt_dialog_line(d)}") + sel = input("Enter number: ").strip() + try: + sel_i = int(sel) + return filtered[sel_i].entity + except Exception: + print("Invalid selection.", file=sys.stderr) + sys.exit(2) + + # --chat (username or link) + if chat: + public, invite = parse_tme_link(chat) + if public: + try: + return await client.get_entity(public) + except (UsernameInvalidError, UsernameNotOccupiedError): + print("ERROR: Public @name not found.", file=sys.stderr) + sys.exit(2) + if invite: + try: + info = await client(functions.messages.CheckChatInviteRequest(invite)) + except (InviteHashInvalidError, InviteHashExpiredError): + print("ERROR: Invite link is invalid or EXPIRED. Generate a fresh invite or use --list* / --pick.", file=sys.stderr) + sys.exit(2) + if isinstance(info, types.messages.ChatInvite): + joined = await client(functions.messages.ImportChatInviteRequest(invite)) + if joined.chats: + return joined.chats[0] + if isinstance(info, types.messages.ChatInviteAlready): + return info.chat + print("ERROR: Could not resolve the link directly. Use --list* or --pick.", file=sys.stderr) + sys.exit(2) + + print("ERROR: Provide --chat, --chat-id, or use --pick g|c.", file=sys.stderr) + sys.exit(2) + +async def export(): + args = parse_args() + api_id, api_hash = ensure_api_creds() + + client = TelegramClient(args.session, api_id, api_hash) + await client.start() # will ask phone+code on first run + + # listing modes + if args.list or args.listgroups or args.listchannels: + if args.list: + await list_dialogs(client, mode="direct") + if args.listgroups: + await list_dialogs(client, mode="group") + if args.listchannels: + await list_dialogs(client, mode="channel") + await client.disconnect() + return + + # normal export flow + entity = await resolve_entity(client, args.chat, args.chat_id, args.pick) + + title = getattr(entity, "title", None) or getattr(entity, "username", None) or getattr(entity, "id", None) + print(f"[i] Exporting from: {title}") + + users = [] + try: + async for u in client.iter_participants(entity, aggressive=args.aggressive): + users.append(user_to_dict(u)) + except ChatAdminRequiredError: + print("ERROR: Admin rights required to list subscribers for this channel.", file=sys.stderr) + await client.disconnect() + sys.exit(3) + except ChannelPrivateError: + print("ERROR: The chat/channel is private and you don't have access.", file=sys.stderr) + await client.disconnect() + sys.exit(3) + + # write output + if args.format == "csv": + fieldnames = ["user_id","is_bot","username","first_name","last_name","full_name","phone","lang_code"] + with open(args.out, "w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(users) + elif args.format == "txt": + with open(args.out, "w", encoding="utf-8") as f: + for u in users: + handle = f"@{u['username']}" if u["username"] else u["full_name"] or str(u["user_id"]) + f.write(f"{handle}\n") + else: # json + import json + with open(args.out, "w", encoding="utf-8") as f: + json.dump(users, f, ensure_ascii=False, indent=2) + + print(f"[✓] Exported {len(users)} members to {args.out}") + await client.disconnect() + +if __name__ == "__main__": + asyncio.run(export()) diff --git a/import_contacts.py b/import_contacts.py new file mode 100755 index 0000000..390518d --- /dev/null +++ b/import_contacts.py @@ -0,0 +1,292 @@ +#!/usr/bin/env python3 +import os +import sys +import csv +import argparse +import asyncio +import time +from getpass import getpass +from typing import Dict, Optional, Set, Tuple + +from telethon import TelegramClient, functions, types +from telethon.errors import ( + FloodWaitError, + PeerFloodError, + UsernameInvalidError, + UsernameNotOccupiedError, + RPCError, +) + +DEFAULT_SESSION = "export_session" + +# ---------- CLI ---------- + +def parse_args(): + p = argparse.ArgumentParser( + description="Import contacts from a CSV (from export_members.py) into your Telegram Contacts." + ) + p.add_argument("--csv", required=True, help="Path to CSV exported by export_members.py") + p.add_argument("--session", default=DEFAULT_SESSION, help=f"Telethon session name (default: {DEFAULT_SESSION})") + p.add_argument("--skip-bots", action="store_true", default=True, help="Skip users marked as bots (default: on)") + p.add_argument("--no-skip-bots", dest="skip_bots", action="store_false", help="Do not skip bots") + p.add_argument("--only-with-phone", action="store_true", + help="Only add rows that contain a phone number (safest and most reliable).") + p.add_argument("--dry-run", action="store_true", help="Do not write anything; just print what would happen.") + p.add_argument("--limit", type=int, default=0, help="Stop after adding this many contacts (0 = no limit).") + p.add_argument("--sleep", type=float, default=0.5, help="Delay (seconds) between adds to avoid flood (default: 0.5)") + p.add_argument("--add-phone-privacy-exception", action="store_true", + help="Set the 'add_phone_privacy_exception' flag when adding by @username/user_id.") + p.add_argument("--start-from", type=int, default=0, + help="Skip the first N data rows (0-based) to resume after a previous run.") + return p.parse_args() + +# ---------- Helpers ---------- + +def ensure_api_creds() -> Tuple[int, str]: + api_id = os.environ.get("TELEGRAM_API_ID") + api_hash = os.environ.get("TELEGRAM_API_HASH") + if not api_id: + api_id = input("Enter TELEGRAM_API_ID: ").strip() + if not api_hash: + api_hash = getpass("Enter TELEGRAM_API_HASH (hidden): ").strip() + try: + api_id = int(api_id) + except Exception: + print("ERROR: TELEGRAM_API_ID must be an integer.", file=sys.stderr) + sys.exit(1) + return api_id, api_hash + +def normalize_phone(s: Optional[str]) -> Optional[str]: + if not s: + return None + s = s.strip().replace(" ", "").replace("-", "").replace("(", "").replace(")", "") + if not s: + return None + if s[0] != "+" and s[0].isdigit(): + pass + return s + +def read_csv_rows(path: str): + with open(path, "r", encoding="utf-8") as f: + reader = csv.DictReader(f) + for row in reader: + yield { + "user_id": row.get("user_id", "").strip(), + "is_bot": row.get("is_bot", "").strip(), + "username": (row.get("username") or "").strip(), + "first_name": (row.get("first_name") or "").strip(), + "last_name": (row.get("last_name") or "").strip(), + "full_name": (row.get("full_name") or "").strip(), + "phone": normalize_phone(row.get("phone")), + "lang_code": (row.get("lang_code") or "").strip(), + } + +async def get_existing_contacts_sets(client: TelegramClient) -> Tuple[Set[int], Set[str], Set[str]]: + result = await client(functions.contacts.GetContactsRequest(hash=0)) + ids: Set[int] = set() + usernames: Set[str] = set() + phones: Set[str] = set() + for u in result.users: + if isinstance(u, types.User): + ids.add(u.id) + if u.username: + usernames.add(u.username.lower()) + if u.phone: + phones.add(normalize_phone(u.phone) or "") + return ids, usernames, phones + +async def add_by_phone(client: TelegramClient, phone: str, first: str, last: str, dry: bool) -> Optional[types.User]: + ipc = types.InputPhoneContact(client_id=0, phone=phone, first_name=first or " ", last_name=last or "") + if dry: + print(f"[DRY] contacts.ImportContacts: {phone} ({first} {last})") + return None + resp = await client(functions.contacts.ImportContactsRequest(contacts=[ipc], replace=False)) + return resp.users[0] if resp.users else None + +async def add_by_entity(client: TelegramClient, + entity: types.TypeUser, + first: str, + last: str, + phone_for_label: str, + add_privacy: bool, + dry: bool) -> types.User: + if dry: + uname = getattr(entity, "username", None) + print(f"[DRY] contacts.AddContact: id={entity.id} @{uname or ''} ({first} {last})") + return entity + user = await client(functions.contacts.AddContactRequest( + id=entity, + first_name=first or " ", + last_name=last or "", + phone=phone_for_label or "", + add_phone_privacy_exception=bool(add_privacy), + )) + return user + +async def resolve_user_entity(client: TelegramClient, + username: Optional[str], + user_id: Optional[int]) -> Optional[types.User]: + if username: + try: + ent = await client.get_entity(username) + if isinstance(ent, types.User): + return ent + except (UsernameInvalidError, UsernameNotOccupiedError): + return None + except RPCError: + return None + if user_id: + try: + ent = await client.get_entity(user_id) + if isinstance(ent, types.User): + return ent + except RPCError: + return None + return None + +# ---------- Main flow ---------- + +async def run(): + args = parse_args() + + # NEW: Early CSV validation (fail fast, BEFORE connecting) + csv_abs = os.path.abspath(args.csv) + try: + rows = list(read_csv_rows(csv_abs)) + except FileNotFoundError: + print(f"ERROR: CSV file not found: {csv_abs}", file=sys.stderr) + sys.exit(2) + except IsADirectoryError: + print(f"ERROR: CSV path is a directory, not a file: {csv_abs}", file=sys.stderr) + sys.exit(2) + except PermissionError: + print(f"ERROR: No permission to read CSV: {csv_abs}", file=sys.stderr) + sys.exit(2) + except OSError as e: + print(f"ERROR: Could not open CSV '{csv_abs}': {e}", file=sys.stderr) + sys.exit(2) + + if args.start_from: + rows = rows[args.start_from:] + + api_id, api_hash = ensure_api_creds() + client = TelegramClient(args.session, api_id, api_hash) + await client.start() + + existing_ids, existing_usernames, existing_phones = await get_existing_contacts_sets(client) + print(f"[i] You currently have {len(existing_ids)} contacts.") + + total = 0 + added = 0 + skipped = 0 + failed = 0 + + for idx, row in enumerate(rows): + total += 1 + uid_str = row["user_id"] + user_id = int(uid_str) if uid_str.isdigit() else None + is_bot = str(row["is_bot"]).lower() in ("1", "true", "yes") + username = row["username"] + first = row["first_name"] or (row["full_name"].split(" ")[0] if row["full_name"] else "") + last = row["last_name"] or (" ".join(row["full_name"].split(" ")[1:]) if row["full_name"] else "") + phone = row["phone"] + + if args.skip_bots and is_bot: + print(f"[skip] bot user_id={uid_str} @{username}") + skipped += 1 + continue + + if args.only_with_phone and not phone: + print(f"[skip] no phone for user_id={uid_str} @{username}") + skipped += 1 + continue + + if user_id and user_id in existing_ids: + print(f"[skip] already contact: id={user_id} @{username}") + skipped += 1 + continue + if username and username.lower() in existing_usernames: + print(f"[skip] already contact by username: @{username}") + skipped += 1 + continue + if phone and phone in existing_phones: + print(f"[skip] already contact by phone: {phone}") + skipped += 1 + continue + + try: + if phone: + u = await add_by_phone(client, phone, first, last, args.dry_run) + status = "[ok]" if not args.dry_run else "[would add]" + print(f"{status} by phone: {phone} name='{first} {last}' @{username or ''}") + if not args.dry_run and u and isinstance(u, types.User): + existing_ids.add(u.id) + if u.username: existing_usernames.add(u.username.lower()) + if u.phone: existing_phones.add(normalize_phone(u.phone) or "") + added += 1 + else: + ent = await resolve_user_entity(client, username=username or None, user_id=user_id) + if ent: + _ = await add_by_entity( + client, ent, first, last, phone_for_label="", add_privacy=args.add_phone_privacy_exception, + dry=args.dry_run + ) + status = "[ok]" if not args.dry_run else "[would add]" + print(f"{status} by entity: id={ent.id} @{getattr(ent, 'username', '')} name='{first} {last}'") + if not args.dry_run: + existing_ids.add(ent.id) + if ent.username: existing_usernames.add(ent.username.lower()) + added += 1 + else: + print(f"[fail] cannot resolve user (no phone, bad/unknown username/id): id={uid_str} @{username}") + failed += 1 + + except FloodWaitError as e: + wait_s = int(getattr(e, "seconds", 30)) + print(f"[flood-wait] Telegram asked to wait {wait_s}s; sleeping…") + if args.dry_run: + print("[dry-run] continuing without waiting.") + else: + time.sleep(wait_s + 1) + try: + if phone: + _ = await add_by_phone(client, phone, first, last, args.dry_run) + print(f"[ok] retried by phone: {phone} name='{first} {last}'") + added += 1 + else: + ent = await resolve_user_entity(client, username=username or None, user_id=user_id) + if ent: + _ = await add_by_entity( + client, ent, first, last, phone_for_label="", add_privacy=args.add_phone_privacy_exception, + dry=args.dry_run + ) + print(f"[ok] retried by entity: id={ent.id} @{getattr(ent, 'username', '')}") + added += 1 + else: + print(f"[fail] still cannot resolve after wait: id={uid_str} @{username}") + failed += 1 + except PeerFloodError: + print("[fatal] PeerFloodError: Telegram is rate-limiting hard; stop and try again later.") + break + + except PeerFloodError: + print("[fatal] PeerFloodError: Telegram is rate-limiting hard; stop and try again later.") + break + + except RPCError as e: + print(f"[fail] RPC error for id={uid_str} @{username}: {e.__class__.__name__}: {e}") + failed += 1 + + if args.limit and added >= args.limit: + print(f"[i] Reached --limit={args.limit}.") + break + if args.sleep > 0 and not args.dry_run: + await asyncio.sleep(args.sleep) + + print(f"\nSummary: total rows seen={total} added={added} skipped={skipped} failed={failed}") + await client.disconnect() + +# ---------- Entrypoint ---------- + +if __name__ == "__main__": + asyncio.run(run())