# domain_status_fast — fast multi-threaded CLI domain status checker.

import argparse
import csv
import os
import re
import signal
import sys
import tempfile
import threading
import time
import warnings
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from queue import Queue
from urllib.parse import urljoin, urlparse

import dns.resolver
import requests
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from urllib3.exceptions import InsecureRequestWarning
from urllib3.util.retry import Retry

# Invalid certificates are re-probed with verify=False; mute the warning spam.
warnings.simplefilter("ignore", InsecureRequestWarning)

# =========================
# GLOBALS
# =========================

# Set by the signal handler to stop submitting new jobs (graceful shutdown).
STOP_EVENT = threading.Event()
# Per-thread storage for one cached requests.Session per worker thread.
THREAD_LOCAL = threading.local()

# Browser-like headers so trivial bot filters don't skew the results.
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/124.0 Safari/537.36"
    ),
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,text/plain;q=0.8,*/*;q=0.7",
}

# Tunable defaults; all of these can be overridden from the CLI (see parse_args).
DEFAULT_CONNECT_TIMEOUT = 5
DEFAULT_READ_TIMEOUT = 7
DEFAULT_DNS_TIMEOUT = 2
DEFAULT_MAX_REDIRECTS = 8
DEFAULT_MAX_BYTES = 98304   # 96 KB
DEFAULT_RETRIES = 1
DEFAULT_WORKERS = 25

# Keyword lists scanned case-insensitively against page title + body by
# classify_page(); the order of checks there decides precedence, not the lists.
FOR_SALE_KEYWORDS = [
    "domain is for sale",
    "buy this domain",
    "this domain may be for sale",
    "purchase this domain",
    "afternic",
    "sedo",
    "dan.com",
    "undeveloped",
]

PARKED_KEYWORDS = [
    "domain parked",
    "parked free",
    "parkingcrew",
    "bodis",
    "cashparking",
    "sedo parking",
    "parked domain",
    "this domain is parked",
]

DEFAULT_HOSTING_KEYWORDS = [
    "apache2 ubuntu default page",
    "apache2 debian default page",
    "welcome to nginx",
    "nginx test page",
    "test page for the nginx",
    "default web site page",
    "iis windows server",
]

COMING_SOON_KEYWORDS = [
    "coming soon",
    "under construction",
    "launching soon",
    "website coming soon",
    "site is coming soon",
]

SUSPENDED_KEYWORDS = [
    "this account has been suspended",
    "account suspended",
    "website suspended",
    "site suspended",
    "hosting account has been suspended",
    "suspended due to non-payment",
    "please contact billing",
    "contact your hosting provider",
    "billing issue",
]

EXPIRED_KEYWORDS = [
    "this domain has expired",
    "domain expired",
    "expired domain",
    "renew this domain",
    "renewal required",
    "domain renewal",
    "renew now",
    "expiration notice",
    "registrant verification failed",
    "has expired and may be available",
]

# Column order for every CSV produced by the writer thread.
FIELDNAMES = [
    "checked_at",
    "domain",
    "bucket",
    "page_type",
    "dns_ok",
    "dns_error",
    "status_code",
    "ssl_status",
    "content_type",
    "elapsed_ms",
    "best_start_url",
    "best_final_url",
    "title",
    "notes",
    "error",
    "redirect_chain",
    "A",
    "AAAA",
    "CNAME",
    "all_attempts",
]

# ANSI color escape codes for terminal output.
C_RESET = "\033[0m"
C_RED = "\033[91m"
C_GREEN = "\033[92m"
C_YELLOW = "\033[93m"
C_CYAN = "\033[96m"
C_DIM = "\033[2m"


# =========================
# UTIL
# =========================

def now_utc():
    """Return the current time as a timezone-aware UTC ISO-8601 string."""
    return datetime.now(tz=timezone.utc).isoformat()


def clean_text(s):
    """Collapse every whitespace run in *s* to one space and trim the ends."""
    return re.sub(r"\s+", " ", s).strip() if s else ""


def normalize_domain(raw):
    """Extract a bare, lowercase hostname from one raw input line.

    Comment-only lines (leading '#') and inline '#' comments are ignored;
    a scheme is assumed when missing; path, port, trailing dot and a
    leading 'www.' are stripped.  Returns "" for unusable lines.
    """
    text = raw.strip()
    if not text or text.startswith("#"):
        return ""

    text = text.split("#", 1)[0].strip()
    if not text:
        return ""

    if "://" not in text:
        text = "http://" + text

    try:
        parsed = urlparse(text)
        host = parsed.netloc or parsed.path
        host = host.split("/", 1)[0]
        host = host.split(":", 1)[0]
        host = host.strip().lower().strip(".")
        return host[4:] if host.startswith("www.") else host
    except Exception:
        return ""


def load_domains(input_file):
    """Read *input_file* and return normalized domains, de-duplicated in order."""
    seen = set()
    ordered = []

    with open(input_file, "r", encoding="utf-8") as fh:
        for raw in fh:
            dom = normalize_domain(raw)
            if dom and dom not in seen:
                seen.add(dom)
                ordered.append(dom)

    return ordered


def load_processed_domains(all_results_csv):
    """Return the set of domains already recorded in the all-results CSV.

    Powers --resume: missing, empty or corrupt files yield an empty set
    rather than aborting the run.
    """
    if not (os.path.exists(all_results_csv) and os.path.getsize(all_results_csv) > 0):
        return set()

    processed = set()
    try:
        with open(all_results_csv, "r", encoding="utf-8", newline="") as fh:
            for row in csv.DictReader(fh):
                dom = (row.get("domain") or "").strip().lower()
                if dom:
                    processed.add(dom)
    except Exception:
        # Best effort: a partially written CSV must not break a resume.
        pass

    return processed


def host_of(url):
    """Return the lowercase hostname of *url*, or "" when it cannot be parsed."""
    try:
        hostname = urlparse(url).hostname
    except Exception:
        return ""
    return hostname.lower() if hostname else ""


def is_html_like(content_type):
    """True when *content_type* denotes text-ish content worth scanning for a title."""
    ct = (content_type or "").lower()
    textish = (
        "text/html",
        "application/xhtml+xml",
        "text/plain",
        "application/xml",
        "text/xml",
    )
    for marker in textish:
        if marker in ct:
            return True
    return False


def extract_title(html):
    """Best-effort extraction of the <title> text from an HTML snippet.

    BeautifulSoup is tried first; a regex fallback handles markup too
    broken for the parser.  Returns "" when no title can be found.
    """
    if not html:
        return ""

    try:
        parsed = BeautifulSoup(html, "html.parser")
        if parsed.title and parsed.title.string:
            return clean_text(parsed.title.string)
    except Exception:
        pass

    match = re.search(r"<title[^>]*>(.*?)</title>", html, re.I | re.S)
    return clean_text(match.group(1)) if match else ""


def colorize_bucket(bucket):
    """Wrap *bucket* in ANSI color: green=LIVE_OK, yellow=WARNING, red=anything else."""
    color = {
        "LIVE_OK": C_GREEN,
        "WARNING": C_YELLOW,
    }.get(bucket, C_RED)
    return f"{color}{bucket}{C_RESET}"


# =========================
# SAFE FILE WRITERS
# =========================

def sync_file(f):
    # Flush Python-level buffers and force the OS to commit the file to disk,
    # so already-written results survive an abrupt Ctrl-C or kill.
    f.flush()
    os.fsync(f.fileno())


def atomic_write_text(path, text):
    """Atomically replace *path* with *text* (UTF-8).

    Writes to a temp file in the destination directory, fsyncs it, then
    os.replace()s it over *path*, so readers never observe a partial file.
    """
    # os.path.dirname() is "" for a bare filename, which would make both
    # makedirs and mkstemp fail; fall back to the current directory.
    directory = os.path.dirname(path) or "."
    os.makedirs(directory, exist_ok=True)
    fd, tmp = tempfile.mkstemp(prefix=".tmp_", dir=directory)
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(text)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, path)
    finally:
        # The temp file only still exists if the replace failed; clean it up.
        try:
            if os.path.exists(tmp):
                os.remove(tmp)
        except Exception:
            pass


class SafeCsvWriter:
    """Append-mode CSV writer that fsyncs after every row.

    The header is written only when the file is new or empty, so the same
    file can be appended to across resumed runs.
    """

    def __init__(self, path, fieldnames):
        self.path = path
        # dirname() is "" for a bare filename; makedirs("") would raise.
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        file_exists = os.path.exists(path) and os.path.getsize(path) > 0
        # buffering=1: line-buffered so each row reaches the OS promptly.
        self.f = open(path, "a", newline="", encoding="utf-8", buffering=1)
        self.writer = csv.DictWriter(self.f, fieldnames=fieldnames)
        if not file_exists:
            self.writer.writeheader()
            sync_file(self.f)

    def writerow(self, row):
        """Write one dict row and force it to disk."""
        self.writer.writerow(row)
        sync_file(self.f)

    def close(self):
        """Close the underlying file, ignoring errors (shutdown path)."""
        try:
            self.f.close()
        except Exception:
            pass


class SafeLineWriter:
    """Append-mode text writer that fsyncs after every line (crash-safe logs/lists)."""

    def __init__(self, path):
        self.path = path
        # dirname() is "" for a bare filename; makedirs("") would raise.
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        self.f = open(path, "a", encoding="utf-8", buffering=1)

    def write_line(self, text):
        """Append *text* as exactly one line and force it to disk."""
        self.f.write(text.rstrip("\n") + "\n")
        sync_file(self.f)

    def close(self):
        """Close the underlying file, ignoring errors (shutdown path)."""
        try:
            self.f.close()
        except Exception:
            pass


# =========================
# SIGNAL HANDLER
# =========================

def signal_handler(signum, frame):
    # First SIGINT/SIGTERM: request a graceful stop — no new jobs are
    # submitted, finished results stay on disk.  A second signal escalates
    # to KeyboardInterrupt for a hard abort.
    if not STOP_EVENT.is_set():
        STOP_EVENT.set()
        print(
            f"\n{C_YELLOW}Signal diterima. Menghentikan submit job baru... "
            f"hasil yang sudah selesai tetap disimpan.{C_RESET}"
        )
    else:
        raise KeyboardInterrupt


# =========================
# SESSION PER THREAD
# =========================

def get_session(retries=1, pool_size=100):
    """Return this thread's requests.Session, creating and caching it on first use.

    Retries are restricted to connection failures on GET; redirects are
    followed manually by the probe, so the adapter never follows them.
    """
    cached = getattr(THREAD_LOCAL, "session", None)
    if cached is not None:
        return cached

    session = requests.Session()
    adapter = HTTPAdapter(
        max_retries=Retry(
            total=retries,
            connect=retries,
            read=0,
            redirect=0,
            status=0,
            backoff_factor=0.2,
            allowed_methods=frozenset(["GET"]),
            raise_on_status=False,
        ),
        pool_connections=pool_size,
        pool_maxsize=pool_size,
    )
    for scheme in ("http://", "https://"):
        session.mount(scheme, adapter)

    THREAD_LOCAL.session = session
    return session


# =========================
# DNS
# =========================

def get_dns_info(domain, dns_timeout=2, full_dns=False):
    """Resolve DNS records for *domain*.

    Always queries A, AAAA and CNAME; with full_dns=True it also queries
    NS, MX and TXT (previously the flag was accepted but had no effect).
    Returns a dict with dns_ok, per-type record lists and dns_error.
    """
    result = {
        "dns_ok": False,
        "A": [],
        "AAAA": [],
        "CNAME": [],
        "dns_error": "",
    }

    resolver = dns.resolver.Resolver()
    resolver.timeout = dns_timeout
    resolver.lifetime = dns_timeout

    record_types = ["A", "AAAA", "CNAME"]
    if full_dns:
        # Extra record types for a deeper look; they do not affect dns_ok.
        for extra in ("NS", "MX", "TXT"):
            record_types.append(extra)
            result[extra] = []

    for rtype in record_types:
        try:
            answers = resolver.resolve(domain, rtype)
            vals = []
            for r in answers:
                # CNAME/NS/MX answers expose .target; address records do not.
                if hasattr(r, "target"):
                    vals.append(str(r.target).rstrip("."))
                else:
                    vals.append(str(r).rstrip("."))
            result[rtype] = vals
        except dns.resolver.NXDOMAIN:
            # The whole name does not exist: no further lookups make sense.
            result["dns_error"] = "NXDOMAIN"
            return result
        except (dns.resolver.NoAnswer, dns.resolver.NoNameservers, dns.resolver.LifetimeTimeout):
            # "No data" style failures are normal; try the next record type.
            pass
        except Exception as e:
            if not result["dns_error"]:
                result["dns_error"] = type(e).__name__

    # dns_ok means the domain resolves to something usable for HTTP probing.
    if any(result[k] for k in ["A", "AAAA", "CNAME"]):
        result["dns_ok"] = True
    elif not result["dns_error"]:
        result["dns_error"] = "NO_RECORDS"

    return result


# =========================
# HTTP / PROBE
# =========================

def read_limited_text(resp, max_bytes):
    """Read at most *max_bytes* of the response body and decode it to text.

    Streaming with a hard size cap keeps huge pages from stalling a
    worker; decode errors are replaced so classification never crashes
    on bad bytes or a bogus declared encoding.
    """
    buf = bytearray()
    try:
        for chunk in resp.iter_content(chunk_size=8192, decode_unicode=False):
            if not chunk:
                continue
            room = max_bytes - len(buf)
            if room <= 0:
                break
            buf.extend(chunk[:room])
            if len(buf) >= max_bytes:
                break
    except Exception:
        # Network hiccup mid-body: keep whatever was already read.
        pass

    raw = bytes(buf)
    encoding = resp.encoding or "utf-8"
    try:
        return raw.decode(encoding, errors="replace")
    except Exception:
        return raw.decode("utf-8", errors="replace")

def classify_page(status_code, title, body, content_type):
    """Label a fetched page from its content keywords, then its HTTP status.

    Keyword checks run first because expired/parked/for-sale pages usually
    still answer 200.  Only the first ~7 KB of the body is scanned.
    """
    blob = ((title or "") + "\n" + (body or "")[:7000]).lower()

    keyword_labels = (
        (EXPIRED_KEYWORDS, "EXPIRED / RENEWAL ISSUE"),
        (SUSPENDED_KEYWORDS, "SUSPENDED"),
        (FOR_SALE_KEYWORDS, "FOR SALE"),
        (PARKED_KEYWORDS, "PARKED"),
        (DEFAULT_HOSTING_KEYWORDS, "DEFAULT HOSTING PAGE"),
        (COMING_SOON_KEYWORDS, "COMING SOON"),
    )
    for keywords, label in keyword_labels:
        if any(k in blob for k in keywords):
            return label

    if 200 <= status_code <= 299:
        if content_type and not is_html_like(content_type):
            return "LIVE NON-HTML"
        return "LIVE 200"

    status_labels = {
        301: "REDIRECT",
        302: "REDIRECT",
        303: "REDIRECT",
        307: "REDIRECT",
        308: "REDIRECT",
        401: "UNAUTHORIZED",
        403: "FORBIDDEN",
        404: "NOT FOUND",
        410: "GONE",
        429: "RATE LIMITED",
    }
    if status_code in status_labels:
        return status_labels[status_code]
    if 500 <= status_code <= 599:
        return "SERVER ERROR"

    return f"HTTP {status_code}"


def score_result(r):
    """Rank a probe result; higher = more informative, 0 = failed probe.

    Definitive pages (live, parked, expired, ...) beat auth/server errors,
    which beat 404s and redirects.  Used to pick the best URL variant.
    """
    if not r["ok"]:
        return 0

    page_type = r["page_type"]
    status = r["status_code"]

    definitive_types = {
        "EXPIRED / RENEWAL ISSUE",
        "SUSPENDED",
        "FOR SALE",
        "PARKED",
        "DEFAULT HOSTING PAGE",
        "COMING SOON",
    }

    if page_type == "LIVE 200":
        return 100
    if page_type == "LIVE NON-HTML":
        return 98
    if page_type in definitive_types:
        return 95
    if status == 403:
        return 85
    if status == 401:
        return 84
    if 500 <= status <= 599:
        return 82
    if status in (404, 410):
        return 80
    if 300 <= status < 400:
        return 75
    return 10


def probe_url(url, connect_timeout, read_timeout, max_redirects, max_bytes, retries, pool_size):
    """Fetch *url*, following redirects manually, and classify the final page.

    Redirects are followed by hand (up to *max_redirects* hops) so the full
    chain can be recorded.  An invalid certificate is retried once with
    verification disabled and reported as ssl_status="INVALID".  Always
    returns a result dict; network failures yield ok=False with a label.
    """
    session = get_session(retries=retries, pool_size=pool_size)
    current_url = url
    chain = []  # one "status url" entry per hop
    ssl_status = "N/A"  # stays "N/A" for plain-http chains

    for _ in range(max_redirects):
        resp = None
        try:
            resp = session.get(
                current_url,
                headers=HEADERS,
                timeout=(connect_timeout, read_timeout),
                allow_redirects=False,
                verify=True,
                stream=True,
            )
            if current_url.startswith("https://"):
                ssl_status = "VALID"

        except requests.exceptions.SSLError:
            # Bad/expired cert: retry once without verification so the site
            # can still be classified, but flag the certificate problem.
            ssl_status = "INVALID"
            try:
                resp = session.get(
                    current_url,
                    headers=HEADERS,
                    timeout=(connect_timeout, read_timeout),
                    allow_redirects=False,
                    verify=False,
                    stream=True,
                )
            except requests.exceptions.Timeout:
                return {
                    "ok": False, "start_url": url, "final_url": current_url, "status_code": "",
                    "page_type": "TIMEOUT", "title": "", "chain": " | ".join(chain),
                    "ssl_status": ssl_status, "content_type": "", "elapsed_ms": "", "error": "Timeout",
                }
            except requests.exceptions.ConnectionError:
                return {
                    "ok": False, "start_url": url, "final_url": current_url, "status_code": "",
                    "page_type": "CONNECTION FAILED", "title": "", "chain": " | ".join(chain),
                    "ssl_status": ssl_status, "content_type": "", "elapsed_ms": "", "error": "ConnectionError",
                }
            except Exception as e:
                return {
                    "ok": False, "start_url": url, "final_url": current_url, "status_code": "",
                    "page_type": "SSL ERROR", "title": "", "chain": " | ".join(chain),
                    "ssl_status": ssl_status, "content_type": "", "elapsed_ms": "", "error": type(e).__name__,
                }

        except requests.exceptions.Timeout:
            return {
                "ok": False, "start_url": url, "final_url": current_url, "status_code": "",
                "page_type": "TIMEOUT", "title": "", "chain": " | ".join(chain),
                "ssl_status": ssl_status, "content_type": "", "elapsed_ms": "", "error": "Timeout",
            }

        except requests.exceptions.ConnectionError:
            return {
                "ok": False, "start_url": url, "final_url": current_url, "status_code": "",
                "page_type": "CONNECTION FAILED", "title": "", "chain": " | ".join(chain),
                "ssl_status": ssl_status, "content_type": "", "elapsed_ms": "", "error": "ConnectionError",
            }

        except Exception as e:
            return {
                "ok": False, "start_url": url, "final_url": current_url, "status_code": "",
                "page_type": "REQUEST ERROR", "title": "", "chain": " | ".join(chain),
                "ssl_status": ssl_status, "content_type": "", "elapsed_ms": "", "error": type(e).__name__,
            }

        try:
            elapsed_ms = int(resp.elapsed.total_seconds() * 1000)
        except Exception:
            elapsed_ms = ""

        chain.append(f"{resp.status_code} {current_url}")

        # Redirect with a Location header: record the hop and follow it.
        if 300 <= resp.status_code < 400 and resp.headers.get("Location"):
            next_url = urljoin(current_url, resp.headers.get("Location"))
            try:
                resp.close()
            except Exception:
                pass
            current_url = next_url
            continue

        content_type = resp.headers.get("Content-Type", "")
        body = ""
        # 204/304 have no body by definition; skip the (streamed) read.
        if resp.status_code not in (204, 304):
            body = read_limited_text(resp, max_bytes)

        title = extract_title(body)
        page_type = classify_page(resp.status_code, title, body, content_type)

        final_url = resp.url
        try:
            resp.close()
        except Exception:
            pass

        return {
            "ok": True,
            "start_url": url,
            "final_url": final_url,
            "status_code": resp.status_code,
            "page_type": page_type,
            "title": title,
            "chain": " | ".join(chain),
            "ssl_status": ssl_status,
            "content_type": content_type,
            "elapsed_ms": elapsed_ms,
            "error": "",
        }

    # Loop exhausted without reaching a non-redirect response.
    return {
        "ok": False,
        "start_url": url,
        "final_url": current_url,
        "status_code": "",
        "page_type": "TOO MANY REDIRECTS",
        "title": "",
        "chain": " | ".join(chain),
        "ssl_status": ssl_status,
        "content_type": "",
        "elapsed_ms": "",
        "error": "TooManyRedirects",
    }


def is_good_enough(result):
    """True when *result* is conclusive enough to skip probing further URL variants."""
    if not result["ok"]:
        return False

    conclusive_types = (
        "LIVE 200", "LIVE NON-HTML",
        "PARKED", "FOR SALE", "DEFAULT HOSTING PAGE",
        "COMING SOON", "SUSPENDED", "EXPIRED / RENEWAL ISSUE",
    )
    if result["page_type"] in conclusive_types:
        return True

    # A definite HTTP error still proves a web server answered.
    return result["status_code"] in (401, 403, 404, 410, 500, 501, 502, 503, 504)


def best_probe(domain, connect_timeout, read_timeout, max_redirects, max_bytes, retries, pool_size):
    """Probe URL variants of *domain*, stopping at the first conclusive result.

    Order: https://domain, http://domain, then the www. variants — so the
    www. URLs are only tried when the bare-host results were inconclusive.
    Returns (all_attempts, best_result).
    """
    candidates = [
        f"https://{domain}",
        f"http://{domain}",
        f"https://www.{domain}",
        f"http://www.{domain}",
    ]

    attempts = []
    best = None

    for url in candidates:
        outcome = probe_url(url, connect_timeout, read_timeout, max_redirects, max_bytes, retries, pool_size)
        attempts.append(outcome)
        if best is None or score_result(outcome) > score_result(best):
            best = outcome
        if is_good_enough(best):
            break

    return attempts, best


# =========================
# CLASSIFICATION
# =========================

def classify_bucket(best):
    """Map the best probe result to one of LIVE_OK / WARNING / OFFLINE."""
    if best["page_type"] in ("LIVE 200", "LIVE NON-HTML"):
        return "LIVE_OK"

    # Any HTTP status at all means a server answered: warn, don't bury it.
    if best["status_code"] != "":
        return "WARNING"

    # A redirect loop with a recorded chain also proves a responding server.
    if best["page_type"] == "TOO MANY REDIRECTS" and best["chain"]:
        return "WARNING"

    return "OFFLINE"


def build_notes(domain, dns_info, best):
    """Assemble a human-readable, semicolon-joined list of observations."""
    notes = []
    add = notes.append

    if not dns_info["dns_ok"]:
        add("DNS problem")

    if best["ssl_status"] == "INVALID":
        add("SSL invalid")

    start_url = best["start_url"] or ""
    final_url = best["final_url"] or ""
    final_host = host_of(final_url)

    if start_url and final_url and start_url != final_url:
        add("Redirected")

    if final_url.startswith("http://"):
        add("HTTP only")

    if final_host and final_host not in {domain, f"www.{domain}"}:
        add(f"Redirect to other host: {final_host}")

    # One short remark per known page type.
    page_type_notes = {
        "DEFAULT HOSTING PAGE": "Server default page",
        "PARKED": "Parked domain",
        "FOR SALE": "Domain for sale",
        "COMING SOON": "Coming soon page",
        "SUSPENDED": "Suspended; check billing/hosting",
        "EXPIRED / RENEWAL ISSUE": "Expired or renewal issue",
        "SERVER ERROR": "Website reachable but server error",
        "NOT FOUND": "Host reachable but page not found",
        "FORBIDDEN": "Host reachable but forbidden",
        "CONNECTION FAILED": "Cannot connect to web server",
        "TIMEOUT": "Request timeout",
    }
    remark = page_type_notes.get(best["page_type"])
    if remark:
        add(remark)

    content_type = (best["content_type"] or "").strip()
    if content_type and not is_html_like(content_type):
        add(f"Non-HTML content: {content_type}")

    return "; ".join(notes)


def summarize_domain(domain, args):
    """Run DNS + HTTP checks for one domain and return the full CSV row dict."""
    dns_info = get_dns_info(domain, dns_timeout=args.dns_timeout, full_dns=False)

    attempts, best = best_probe(
        domain=domain,
        connect_timeout=args.connect_timeout,
        read_timeout=args.read_timeout,
        max_redirects=args.max_redirects,
        max_bytes=args.max_bytes,
        retries=args.retries,
        # Pool sized to the worker count so threads don't starve for connections.
        pool_size=max(args.workers, 20),
    )

    bucket = classify_bucket(best)
    notes = build_notes(domain, dns_info, best)

    # Keys here must match FIELDNAMES exactly (DictWriter raises otherwise).
    row = {
        "checked_at": now_utc(),
        "domain": domain,
        "bucket": bucket,
        "page_type": best["page_type"],
        "dns_ok": dns_info["dns_ok"],
        "dns_error": dns_info["dns_error"],
        "status_code": best["status_code"],
        "ssl_status": best["ssl_status"],
        "content_type": best["content_type"],
        "elapsed_ms": best["elapsed_ms"],
        "best_start_url": best["start_url"],
        "best_final_url": best["final_url"],
        "title": best["title"],
        "notes": notes,
        "error": best["error"],
        "redirect_chain": best["chain"],
        "A": ", ".join(dns_info["A"]),
        "AAAA": ", ".join(dns_info["AAAA"]),
        "CNAME": ", ".join(dns_info["CNAME"]),
        "all_attempts": " || ".join(
            f'{r["start_url"]} => {r["page_type"]} ({r["status_code"]}) -> {r["final_url"]}'
            for r in attempts
        ),
    }
    return row


def fallback_error_row(domain, err_msg):
    """Build a CSV row for a domain whose check crashed inside the script itself."""
    row = {name: "" for name in FIELDNAMES}
    row.update(
        checked_at=now_utc(),
        domain=domain,
        bucket="OFFLINE",
        page_type="SCRIPT ERROR",
        notes="Internal script error",
        error=err_msg,
    )
    return row


# =========================
# SUMMARY
# =========================

def update_summary(path, total_input, skipped_resume, processed_now, counts_bucket, counts_type):
    """Atomically rewrite the human-readable summary file with current progress."""
    remaining = max(total_input - skipped_resume - processed_now, 0)

    lines = [
        "DOMAIN STATUS FAST CHECKER",
        "=" * 40,
        f"generated_at   : {now_utc()}",
        f"total_input    : {total_input}",
        f"skipped_resume : {skipped_resume}",
        f"processed_now  : {processed_now}",
        f"remaining_est  : {remaining}",
        "",
        "BUCKET COUNTS",
        "-" * 40,
    ]
    for bucket in ("LIVE_OK", "WARNING", "OFFLINE"):
        lines.append(f"{bucket:12}: {counts_bucket.get(bucket, 0)}")

    lines += ["", "PAGE TYPE COUNTS", "-" * 40]
    for page_type, count in counts_type.most_common():
        lines.append(f"{page_type:28}: {count}")

    atomic_write_text(path, "\n".join(lines) + "\n")


# =========================
# WRITER THREAD
# =========================

def writer_loop(result_queue, args, total_input, skipped_resume):
    """Single consumer thread: drain *result_queue* and persist every row.

    Owns all output files (per-bucket CSVs, plain-text domain lists,
    summary and progress log) so no file is ever written from two threads.
    Exits when it dequeues the None sentinel pushed by main().
    """
    out = args.output
    os.makedirs(out, exist_ok=True)

    all_csv = os.path.join(out, "all_results.csv")
    hidup_csv = os.path.join(out, "hidup_results.csv")
    live_ok_csv = os.path.join(out, "live_ok_results.csv")
    warning_csv = os.path.join(out, "warning_results.csv")
    offline_csv = os.path.join(out, "offline_results.csv")

    hidup_txt = os.path.join(out, "hidup_domains.txt")
    live_ok_txt = os.path.join(out, "live_ok_domains.txt")
    warning_txt = os.path.join(out, "warning_domains.txt")
    offline_txt = os.path.join(out, "offline_domains.txt")

    summary_txt = os.path.join(out, "summary.txt")
    progress_log = os.path.join(out, "progress.log")

    all_writer = SafeCsvWriter(all_csv, FIELDNAMES)
    hidup_writer = SafeCsvWriter(hidup_csv, FIELDNAMES)
    live_ok_writer = SafeCsvWriter(live_ok_csv, FIELDNAMES)
    warning_writer = SafeCsvWriter(warning_csv, FIELDNAMES)
    offline_writer = SafeCsvWriter(offline_csv, FIELDNAMES)

    hidup_list_writer = SafeLineWriter(hidup_txt)
    live_ok_list_writer = SafeLineWriter(live_ok_txt)
    warning_list_writer = SafeLineWriter(warning_txt)
    offline_list_writer = SafeLineWriter(offline_txt)
    log_writer = SafeLineWriter(progress_log)

    counts_bucket = Counter()
    counts_type = Counter()
    processed_now = 0

    update_summary(summary_txt, total_input, skipped_resume, processed_now, counts_bucket, counts_type)
    log_writer.write_line(f"RUN START {now_utc()} total_input={total_input} skipped_resume={skipped_resume}")

    while True:
        item = result_queue.get()
        # None is the shutdown sentinel from main().
        if item is None:
            break

        row = item
        domain = row["domain"]

        all_writer.writerow(row)

        # "hidup" (= alive) aggregates both LIVE_OK and WARNING buckets.
        if row["bucket"] in ("LIVE_OK", "WARNING"):
            hidup_writer.writerow(row)
            hidup_list_writer.write_line(domain)

        if row["bucket"] == "LIVE_OK":
            live_ok_writer.writerow(row)
            live_ok_list_writer.write_line(domain)
        elif row["bucket"] == "WARNING":
            warning_writer.writerow(row)
            warning_list_writer.write_line(domain)
        else:
            offline_writer.writerow(row)
            offline_list_writer.write_line(domain)

        counts_bucket[row["bucket"]] += 1
        counts_type[row["page_type"]] += 1
        processed_now += 1

        # Summary is rewritten after every row so it is always current on disk.
        update_summary(summary_txt, total_input, skipped_resume, processed_now, counts_bucket, counts_type)
        log_writer.write_line(
            f"DONE {now_utc()} {domain} bucket={row['bucket']} type={row['page_type']} code={row['status_code']}"
        )

        code_text = str(row["status_code"]) if row["status_code"] != "" else "-"
        print(
            f"[{processed_now}] "
            f"{domain:30} -> {colorize_bucket(row['bucket']):18} | "
            f"{row['page_type'][:28]:28} | "
            f"{code_text:4} | "
            f"{str(row['elapsed_ms'])[:6]:6} ms | "
            f"{row['best_final_url'][:45]}"
        )

    log_writer.write_line(f"RUN END {now_utc()} processed_now={processed_now}")

    all_writer.close()
    hidup_writer.close()
    live_ok_writer.close()
    warning_writer.close()
    offline_writer.close()

    hidup_list_writer.close()
    live_ok_list_writer.close()
    warning_list_writer.close()
    offline_list_writer.close()
    log_writer.close()


# =========================
# WORKER
# =========================

def worker(domain, args, result_queue):
    """Check one domain and push its result row onto the writer queue.

    Exceptions are converted into a SCRIPT ERROR row so one bad domain
    never kills the run; an optional delay throttles the request rate.
    """
    if STOP_EVENT.is_set():
        return

    try:
        row = summarize_domain(domain, args)
    except Exception as e:
        row = fallback_error_row(domain, f"{type(e).__name__}: {e}")

    result_queue.put(row)

    if args.delay > 0:
        time.sleep(args.delay)


# =========================
# FILE CLEANUP
# =========================

def remove_if_exists(path):
    """Delete *path* if present, swallowing any filesystem error."""
    try:
        os.remove(path)
    except Exception:
        pass


def cleanup_output(outdir):
    """Delete every generated artifact in *outdir* (used by --fresh)."""
    artifacts = (
        "all_results.csv",
        "hidup_results.csv",
        "live_ok_results.csv",
        "warning_results.csv",
        "offline_results.csv",
        "hidup_domains.txt",
        "live_ok_domains.txt",
        "warning_domains.txt",
        "offline_domains.txt",
        "summary.txt",
        "progress.log",
    )
    for name in artifacts:
        remove_if_exists(os.path.join(outdir, name))


# =========================
# CLI
# =========================

def parse_args():
    """Parse command-line options; defaults come from the DEFAULT_* constants."""
    parser = argparse.ArgumentParser(description="Fast CLI Domain Status Checker")
    parser.add_argument("-i", "--input", required=True, help="File domain input (.txt)")
    parser.add_argument("-o", "--output", default="results", help="Folder output")
    parser.add_argument("-w", "--workers", type=int, default=DEFAULT_WORKERS, help="Jumlah worker paralel")
    parser.add_argument("--connect-timeout", type=int, default=DEFAULT_CONNECT_TIMEOUT, help="Connect timeout")
    parser.add_argument("--read-timeout", type=int, default=DEFAULT_READ_TIMEOUT, help="Read timeout")
    parser.add_argument("--dns-timeout", type=int, default=DEFAULT_DNS_TIMEOUT, help="DNS timeout")
    parser.add_argument("--max-redirects", type=int, default=DEFAULT_MAX_REDIRECTS, help="Maks redirect")
    parser.add_argument("--max-bytes", type=int, default=DEFAULT_MAX_BYTES, help="Max body dibaca")
    parser.add_argument("--retries", type=int, default=DEFAULT_RETRIES, help="Retry koneksi ringan")
    parser.add_argument("--delay", type=float, default=0.0, help="Delay per domain")
    parser.add_argument("--fresh", action="store_true", help="Hapus hasil lama")
    parser.add_argument("--no-resume", action="store_true", help="Jangan skip hasil lama")
    return parser.parse_args()


def main():
    """Entry point: load domains, fan out workers, and persist results safely."""
    # Route SIGINT/SIGTERM through the graceful-stop handler.
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    args = parse_args()
    os.makedirs(args.output, exist_ok=True)

    if args.fresh:
        cleanup_output(args.output)

    all_csv = os.path.join(args.output, "all_results.csv")

    domains = load_domains(args.input)
    if not domains:
        print("Tidak ada domain valid di file input.")
        sys.exit(1)

    # Resume support: skip domains already present in the all-results CSV.
    processed = set()
    if not args.no_resume:
        processed = load_processed_domains(all_csv)

    queue_domains = [d for d in domains if d not in processed]

    print(f"{C_CYAN}Total input    : {len(domains)}{C_RESET}")
    print(f"{C_CYAN}Sudah diproses : {len(processed)}{C_RESET}")
    print(f"{C_CYAN}Akan diproses  : {len(queue_domains)}{C_RESET}")
    print(f"{C_CYAN}Workers        : {args.workers}{C_RESET}")
    print(f"{C_CYAN}Output folder  : {args.output}{C_RESET}")
    print("-" * 110)

    # Single writer thread owns all output files; workers only enqueue rows.
    result_queue = Queue()
    writer_thread = threading.Thread(
        target=writer_loop,
        args=(result_queue, args, len(domains), len(processed)),
        daemon=True
    )
    writer_thread.start()

    executor = ThreadPoolExecutor(max_workers=args.workers)

    try:
        futures = []
        for domain in queue_domains:
            if STOP_EVENT.is_set():
                break
            futures.append(executor.submit(worker, domain, args, result_queue))

        for f in futures:
            if STOP_EVENT.is_set():
                break
            try:
                f.result()
            except Exception:
                pass

    except KeyboardInterrupt:
        STOP_EVENT.set()
        print(f"\n{C_RED}Dihentikan paksa oleh user.{C_RESET}")

    finally:
        # cancel_futures only exists on Python >= 3.9; fall back gracefully.
        try:
            executor.shutdown(wait=False, cancel_futures=True)
        except TypeError:
            executor.shutdown(wait=False)

        # beri waktu singkat supaya worker yang sudah selesai sempat push queue
        time.sleep(0.5)
        # None sentinel tells the writer thread to finish and close its files.
        result_queue.put(None)
        writer_thread.join(timeout=10)

        print("-" * 110)
        print(f"{C_GREEN}Selesai / berhenti aman.{C_RESET}")
        print(f"All CSV      : {os.path.join(args.output, 'all_results.csv')}")
        print(f"Hidup CSV    : {os.path.join(args.output, 'hidup_results.csv')}")
        print(f"Live OK CSV  : {os.path.join(args.output, 'live_ok_results.csv')}")
        print(f"Warning CSV  : {os.path.join(args.output, 'warning_results.csv')}")
        print(f"Offline CSV  : {os.path.join(args.output, 'offline_results.csv')}")
        print(f"Summary      : {os.path.join(args.output, 'summary.txt')}")


if __name__ == "__main__":  # allow importing this module without side effects
    main()