import argparse
import csv
import os
import re
import signal
import sys
import tempfile
import threading
import time
import warnings
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from queue import Queue
from urllib.parse import urljoin, urlparse
import dns.resolver
import requests
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from urllib3.exceptions import InsecureRequestWarning
from urllib3.util.retry import Retry
warnings.simplefilter("ignore", InsecureRequestWarning)
# =========================
# GLOBALS
# =========================
STOP_EVENT = threading.Event()
THREAD_LOCAL = threading.local()
HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/124.0 Safari/537.36"
),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,text/plain;q=0.8,*/*;q=0.7",
}
DEFAULT_CONNECT_TIMEOUT = 5
DEFAULT_READ_TIMEOUT = 7
DEFAULT_DNS_TIMEOUT = 2
DEFAULT_MAX_REDIRECTS = 8
DEFAULT_MAX_BYTES = 98304 # 96 KB
DEFAULT_RETRIES = 1
DEFAULT_WORKERS = 25
FOR_SALE_KEYWORDS = [
"domain is for sale",
"buy this domain",
"this domain may be for sale",
"purchase this domain",
"afternic",
"sedo",
"dan.com",
"undeveloped",
]
PARKED_KEYWORDS = [
"domain parked",
"parked free",
"parkingcrew",
"bodis",
"cashparking",
"sedo parking",
"parked domain",
"this domain is parked",
]
DEFAULT_HOSTING_KEYWORDS = [
"apache2 ubuntu default page",
"apache2 debian default page",
"welcome to nginx",
"nginx test page",
"test page for the nginx",
"default web site page",
"iis windows server",
]
COMING_SOON_KEYWORDS = [
"coming soon",
"under construction",
"launching soon",
"website coming soon",
"site is coming soon",
]
SUSPENDED_KEYWORDS = [
"this account has been suspended",
"account suspended",
"website suspended",
"site suspended",
"hosting account has been suspended",
"suspended due to non-payment",
"please contact billing",
"contact your hosting provider",
"billing issue",
]
EXPIRED_KEYWORDS = [
"this domain has expired",
"domain expired",
"expired domain",
"renew this domain",
"renewal required",
"domain renewal",
"renew now",
"expiration notice",
"registrant verification failed",
"has expired and may be available",
]
FIELDNAMES = [
"checked_at",
"domain",
"bucket",
"page_type",
"dns_ok",
"dns_error",
"status_code",
"ssl_status",
"content_type",
"elapsed_ms",
"best_start_url",
"best_final_url",
"title",
"notes",
"error",
"redirect_chain",
"A",
"AAAA",
"CNAME",
"all_attempts",
]
# ANSI color
C_RESET = "\033[0m"
C_RED = "\033[91m"
C_GREEN = "\033[92m"
C_YELLOW = "\033[93m"
C_CYAN = "\033[96m"
C_DIM = "\033[2m"
# =========================
# UTIL
# =========================
def now_utc():
return datetime.now(timezone.utc).isoformat()
def clean_text(s):
if not s:
return ""
return re.sub(r"\s+", " ", s).strip()
def normalize_domain(raw):
s = raw.strip()
if not s or s.startswith("#"):
return ""
s = s.split("#", 1)[0].strip()
if not s:
return ""
if "://" not in s:
s = "http://" + s
try:
p = urlparse(s)
host = p.netloc or p.path
host = host.split("/")[0].split(":")[0].strip().lower().strip(".")
if host.startswith("www."):
host = host[4:]
return host
except Exception:
return ""
def load_domains(input_file):
domains = []
seen = set()
with open(input_file, "r", encoding="utf-8") as f:
for line in f:
d = normalize_domain(line)
if d and d not in seen:
seen.add(d)
domains.append(d)
return domains
def load_processed_domains(all_results_csv):
processed = set()
if not os.path.exists(all_results_csv) or os.path.getsize(all_results_csv) == 0:
return processed
try:
with open(all_results_csv, "r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
for row in reader:
d = (row.get("domain") or "").strip().lower()
if d:
processed.add(d)
except Exception:
pass
return processed
def host_of(url):
try:
return (urlparse(url).hostname or "").lower()
except Exception:
return ""
def is_html_like(content_type):
ct = (content_type or "").lower()
return any(x in ct for x in [
"text/html",
"application/xhtml+xml",
"text/plain",
"application/xml",
"text/xml",
])
def extract_title(html):
if not html:
return ""
try:
soup = BeautifulSoup(html, "html.parser")
if soup.title and soup.title.string:
return clean_text(soup.title.string)
except Exception:
pass
m = re.search(r"<title[^>]*>(.*?)</title>", html, re.I | re.S)
if m:
return clean_text(m.group(1))
return ""
def colorize_bucket(bucket):
if bucket == "LIVE_OK":
return f"{C_GREEN}{bucket}{C_RESET}"
if bucket == "WARNING":
return f"{C_YELLOW}{bucket}{C_RESET}"
return f"{C_RED}{bucket}{C_RESET}"
# =========================
# SAFE FILE WRITERS
# =========================
def sync_file(f):
f.flush()
os.fsync(f.fileno())
def atomic_write_text(path, text):
    dirname = os.path.dirname(path) or "."
    os.makedirs(dirname, exist_ok=True)
    fd, tmp = tempfile.mkstemp(prefix=".tmp_", dir=dirname)
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
f.write(text)
sync_file(f)
os.replace(tmp, path)
finally:
try:
if os.path.exists(tmp):
os.remove(tmp)
except Exception:
pass
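# atomic_write_text works by writing to a temp file in the same directory and
# then os.replace()-ing it over the target, so readers of summary.txt never
# see a half-written file even if the process dies mid-write.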
class SafeCsvWriter:
def __init__(self, path, fieldnames):
self.path = path
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
file_exists = os.path.exists(path) and os.path.getsize(path) > 0
self.f = open(path, "a", newline="", encoding="utf-8", buffering=1)
self.writer = csv.DictWriter(self.f, fieldnames=fieldnames)
if not file_exists:
self.writer.writeheader()
sync_file(self.f)
def writerow(self, row):
self.writer.writerow(row)
sync_file(self.f)
def close(self):
try:
self.f.close()
except Exception:
pass
class SafeLineWriter:
def __init__(self, path):
self.path = path
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
self.f = open(path, "a", encoding="utf-8", buffering=1)
def write_line(self, text):
self.f.write(text.rstrip("\n") + "\n")
sync_file(self.f)
def close(self):
try:
self.f.close()
except Exception:
pass
# =========================
# SIGNAL HANDLER
# =========================
def signal_handler(signum, frame):
if not STOP_EVENT.is_set():
STOP_EVENT.set()
        print(
            f"\n{C_YELLOW}Signal received. Stopping submission of new jobs... "
            f"results that have already completed are still saved.{C_RESET}"
        )
else:
raise KeyboardInterrupt
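# Two-stage shutdown: the first SIGINT/SIGTERM only sets STOP_EVENT, so no new
# jobs are submitted while in-flight probes finish and are written out; a
# second signal raises KeyboardInterrupt for an immediate, forced stop.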
# =========================
# SESSION PER THREAD
# =========================
def get_session(retries=1, pool_size=100):
session = getattr(THREAD_LOCAL, "session", None)
if session is None:
session = requests.Session()
retry_cfg = Retry(
total=retries,
connect=retries,
read=0,
redirect=0,
status=0,
backoff_factor=0.2,
allowed_methods=frozenset(["GET"]),
raise_on_status=False,
)
adapter = HTTPAdapter(
max_retries=retry_cfg,
pool_connections=pool_size,
pool_maxsize=pool_size
)
session.mount("http://", adapter)
session.mount("https://", adapter)
THREAD_LOCAL.session = session
return session
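# One requests.Session per worker thread (held in threading.local) avoids
# contention on a shared connection pool and reuses keep-alive connections.
# Note that the Retry/pool settings are only applied when a thread first
# creates its session; later calls with different arguments reuse the existing
# session, which is harmless here because the CLI options never change mid-run.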
# =========================
# DNS
# =========================
def get_dns_info(domain, dns_timeout=2, full_dns=False):
result = {
"dns_ok": False,
"A": [],
"AAAA": [],
"CNAME": [],
"dns_error": "",
}
resolver = dns.resolver.Resolver()
resolver.timeout = dns_timeout
resolver.lifetime = dns_timeout
    # full_dns is accepted for API compatibility but currently has no effect:
    # the same record types are queried either way.
    record_types = ["A", "AAAA", "CNAME"]
for rtype in record_types:
try:
answers = resolver.resolve(domain, rtype)
vals = []
for r in answers:
if hasattr(r, "target"):
vals.append(str(r.target).rstrip("."))
else:
vals.append(str(r).rstrip("."))
result[rtype] = vals
except dns.resolver.NXDOMAIN:
result["dns_error"] = "NXDOMAIN"
return result
except (dns.resolver.NoAnswer, dns.resolver.NoNameservers, dns.resolver.LifetimeTimeout):
pass
except Exception as e:
if not result["dns_error"]:
result["dns_error"] = type(e).__name__
if any(result[k] for k in ["A", "AAAA", "CNAME"]):
result["dns_ok"] = True
elif not result["dns_error"]:
result["dns_error"] = "NO_RECORDS"
return result
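# Example return value (hypothetical domain, illustrative only):
#   {"dns_ok": True, "A": ["203.0.113.10"], "AAAA": [], "CNAME": [],
#    "dns_error": ""}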
# =========================
# HTTP / PROBE
# =========================
def read_limited_text(resp, max_bytes):
chunks = []
total = 0
try:
for chunk in resp.iter_content(chunk_size=8192, decode_unicode=False):
if not chunk:
continue
remain = max_bytes - total
if remain <= 0:
break
if len(chunk) > remain:
chunk = chunk[:remain]
chunks.append(chunk)
total += len(chunk)
if total >= max_bytes:
break
except Exception:
pass
raw = b"".join(chunks)
enc = resp.encoding or "utf-8"
try:
return raw.decode(enc, errors="replace")
except Exception:
return raw.decode("utf-8", errors="replace")
def classify_page(status_code, title, body, content_type):
blob = ((title or "") + "\n" + (body or "")[:7000]).lower()
if any(k in blob for k in EXPIRED_KEYWORDS):
return "EXPIRED / RENEWAL ISSUE"
if any(k in blob for k in SUSPENDED_KEYWORDS):
return "SUSPENDED"
if any(k in blob for k in FOR_SALE_KEYWORDS):
return "FOR SALE"
if any(k in blob for k in PARKED_KEYWORDS):
return "PARKED"
if any(k in blob for k in DEFAULT_HOSTING_KEYWORDS):
return "DEFAULT HOSTING PAGE"
if any(k in blob for k in COMING_SOON_KEYWORDS):
return "COMING SOON"
if 200 <= status_code <= 299:
if content_type and not is_html_like(content_type):
return "LIVE NON-HTML"
return "LIVE 200"
if status_code in (301, 302, 303, 307, 308):
return "REDIRECT"
if status_code == 401:
return "UNAUTHORIZED"
if status_code == 403:
return "FORBIDDEN"
if status_code == 404:
return "NOT FOUND"
if status_code == 410:
return "GONE"
if status_code == 429:
return "RATE LIMITED"
if 500 <= status_code <= 599:
return "SERVER ERROR"
return f"HTTP {status_code}"
def score_result(r):
if not r["ok"]:
return 0
pt = r["page_type"]
sc = r["status_code"]
if pt == "LIVE 200":
return 100
if pt == "LIVE NON-HTML":
return 98
if pt in [
"EXPIRED / RENEWAL ISSUE",
"SUSPENDED",
"FOR SALE",
"PARKED",
"DEFAULT HOSTING PAGE",
"COMING SOON",
]:
return 95
if sc == 403:
return 85
if sc == 401:
return 84
if 500 <= sc <= 599:
return 82
if sc in (404, 410):
return 80
if 300 <= sc < 400:
return 75
return 10
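# Higher score = more definitive answer: a live page beats a parked/suspended
# page, which beats auth or server errors, which beat unresolved redirects.
# best_probe keeps the highest-scoring attempt as `best`.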
def probe_url(url, connect_timeout, read_timeout, max_redirects, max_bytes, retries, pool_size):
session = get_session(retries=retries, pool_size=pool_size)
current_url = url
chain = []
ssl_status = "N/A"
for _ in range(max_redirects):
resp = None
try:
resp = session.get(
current_url,
headers=HEADERS,
timeout=(connect_timeout, read_timeout),
allow_redirects=False,
verify=True,
stream=True,
)
if current_url.startswith("https://"):
ssl_status = "VALID"
except requests.exceptions.SSLError:
ssl_status = "INVALID"
try:
resp = session.get(
current_url,
headers=HEADERS,
timeout=(connect_timeout, read_timeout),
allow_redirects=False,
verify=False,
stream=True,
)
except requests.exceptions.Timeout:
return {
"ok": False, "start_url": url, "final_url": current_url, "status_code": "",
"page_type": "TIMEOUT", "title": "", "chain": " | ".join(chain),
"ssl_status": ssl_status, "content_type": "", "elapsed_ms": "", "error": "Timeout",
}
except requests.exceptions.ConnectionError:
return {
"ok": False, "start_url": url, "final_url": current_url, "status_code": "",
"page_type": "CONNECTION FAILED", "title": "", "chain": " | ".join(chain),
"ssl_status": ssl_status, "content_type": "", "elapsed_ms": "", "error": "ConnectionError",
}
except Exception as e:
return {
"ok": False, "start_url": url, "final_url": current_url, "status_code": "",
"page_type": "SSL ERROR", "title": "", "chain": " | ".join(chain),
"ssl_status": ssl_status, "content_type": "", "elapsed_ms": "", "error": type(e).__name__,
}
except requests.exceptions.Timeout:
return {
"ok": False, "start_url": url, "final_url": current_url, "status_code": "",
"page_type": "TIMEOUT", "title": "", "chain": " | ".join(chain),
"ssl_status": ssl_status, "content_type": "", "elapsed_ms": "", "error": "Timeout",
}
except requests.exceptions.ConnectionError:
return {
"ok": False, "start_url": url, "final_url": current_url, "status_code": "",
"page_type": "CONNECTION FAILED", "title": "", "chain": " | ".join(chain),
"ssl_status": ssl_status, "content_type": "", "elapsed_ms": "", "error": "ConnectionError",
}
except Exception as e:
return {
"ok": False, "start_url": url, "final_url": current_url, "status_code": "",
"page_type": "REQUEST ERROR", "title": "", "chain": " | ".join(chain),
"ssl_status": ssl_status, "content_type": "", "elapsed_ms": "", "error": type(e).__name__,
}
try:
elapsed_ms = int(resp.elapsed.total_seconds() * 1000)
except Exception:
elapsed_ms = ""
chain.append(f"{resp.status_code} {current_url}")
if 300 <= resp.status_code < 400 and resp.headers.get("Location"):
next_url = urljoin(current_url, resp.headers.get("Location"))
try:
resp.close()
except Exception:
pass
current_url = next_url
continue
content_type = resp.headers.get("Content-Type", "")
body = ""
if resp.status_code not in (204, 304):
body = read_limited_text(resp, max_bytes)
title = extract_title(body)
page_type = classify_page(resp.status_code, title, body, content_type)
final_url = resp.url
try:
resp.close()
except Exception:
pass
return {
"ok": True,
"start_url": url,
"final_url": final_url,
"status_code": resp.status_code,
"page_type": page_type,
"title": title,
"chain": " | ".join(chain),
"ssl_status": ssl_status,
"content_type": content_type,
"elapsed_ms": elapsed_ms,
"error": "",
}
return {
"ok": False,
"start_url": url,
"final_url": current_url,
"status_code": "",
"page_type": "TOO MANY REDIRECTS",
"title": "",
"chain": " | ".join(chain),
"ssl_status": ssl_status,
"content_type": "",
"elapsed_ms": "",
"error": "TooManyRedirects",
}
def is_good_enough(result):
if not result["ok"]:
return False
if result["page_type"] in [
"LIVE 200", "LIVE NON-HTML",
"PARKED", "FOR SALE", "DEFAULT HOSTING PAGE",
"COMING SOON", "SUSPENDED", "EXPIRED / RENEWAL ISSUE"
]:
return True
if result["status_code"] in (401, 403, 404, 410, 500, 501, 502, 503, 504):
return True
return False
def best_probe(domain, connect_timeout, read_timeout, max_redirects, max_bytes, retries, pool_size):
"""
Lebih cepat dari versi lama:
- utamakan https://domain
- lalu http://domain
- hanya coba www jika hasil awal masih jelek
"""
candidates_primary = [
f"https://{domain}",
f"http://{domain}",
]
candidates_www = [
f"https://www.{domain}",
f"http://www.{domain}",
]
results = []
best = None
for url in candidates_primary:
r = probe_url(url, connect_timeout, read_timeout, max_redirects, max_bytes, retries, pool_size)
results.append(r)
if best is None or score_result(r) > score_result(best):
best = r
if is_good_enough(best):
return results, best
for url in candidates_www:
r = probe_url(url, connect_timeout, read_timeout, max_redirects, max_bytes, retries, pool_size)
results.append(r)
if best is None or score_result(r) > score_result(best):
best = r
if is_good_enough(best):
return results, best
return results, best
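# Illustrative call (hypothetical values, mirroring the CLI defaults):
#   attempts, best = best_probe("example.com", connect_timeout=5, read_timeout=7,
#                               max_redirects=8, max_bytes=98304, retries=1,
#                               pool_size=25)
#   best["page_type"]   # e.g. "LIVE 200"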
# =========================
# CLASSIFICATION
# =========================
def classify_bucket(best):
if best["page_type"] in ["LIVE 200", "LIVE NON-HTML"]:
return "LIVE_OK"
if best["status_code"] != "":
return "WARNING"
if best["page_type"] == "TOO MANY REDIRECTS" and best["chain"]:
return "WARNING"
return "OFFLINE"
def build_notes(domain, dns_info, best):
notes = []
if not dns_info["dns_ok"]:
notes.append("DNS problem")
if best["ssl_status"] == "INVALID":
notes.append("SSL invalid")
final_url = best["final_url"] or ""
start_url = best["start_url"] or ""
final_host = host_of(final_url)
if start_url and final_url and start_url != final_url:
notes.append("Redirected")
if final_url.startswith("http://"):
notes.append("HTTP only")
if final_host and final_host not in {domain, f"www.{domain}"}:
notes.append(f"Redirect to other host: {final_host}")
pt = best["page_type"]
if pt == "DEFAULT HOSTING PAGE":
notes.append("Server default page")
elif pt == "PARKED":
notes.append("Parked domain")
elif pt == "FOR SALE":
notes.append("Domain for sale")
elif pt == "COMING SOON":
notes.append("Coming soon page")
elif pt == "SUSPENDED":
notes.append("Suspended; check billing/hosting")
elif pt == "EXPIRED / RENEWAL ISSUE":
notes.append("Expired or renewal issue")
elif pt == "SERVER ERROR":
notes.append("Website reachable but server error")
elif pt == "NOT FOUND":
notes.append("Host reachable but page not found")
elif pt == "FORBIDDEN":
notes.append("Host reachable but forbidden")
elif pt == "CONNECTION FAILED":
notes.append("Cannot connect to web server")
elif pt == "TIMEOUT":
notes.append("Request timeout")
ct = (best["content_type"] or "").strip()
if ct and not is_html_like(ct):
notes.append(f"Non-HTML content: {ct}")
return "; ".join(notes)
def summarize_domain(domain, args):
dns_info = get_dns_info(domain, dns_timeout=args.dns_timeout, full_dns=False)
attempts, best = best_probe(
domain=domain,
connect_timeout=args.connect_timeout,
read_timeout=args.read_timeout,
max_redirects=args.max_redirects,
max_bytes=args.max_bytes,
retries=args.retries,
pool_size=max(args.workers, 20),
)
bucket = classify_bucket(best)
notes = build_notes(domain, dns_info, best)
row = {
"checked_at": now_utc(),
"domain": domain,
"bucket": bucket,
"page_type": best["page_type"],
"dns_ok": dns_info["dns_ok"],
"dns_error": dns_info["dns_error"],
"status_code": best["status_code"],
"ssl_status": best["ssl_status"],
"content_type": best["content_type"],
"elapsed_ms": best["elapsed_ms"],
"best_start_url": best["start_url"],
"best_final_url": best["final_url"],
"title": best["title"],
"notes": notes,
"error": best["error"],
"redirect_chain": best["chain"],
"A": ", ".join(dns_info["A"]),
"AAAA": ", ".join(dns_info["AAAA"]),
"CNAME": ", ".join(dns_info["CNAME"]),
"all_attempts": " || ".join(
f'{r["start_url"]} => {r["page_type"]} ({r["status_code"]}) -> {r["final_url"]}'
for r in attempts
),
}
return row
def fallback_error_row(domain, err_msg):
return {
"checked_at": now_utc(),
"domain": domain,
"bucket": "OFFLINE",
"page_type": "SCRIPT ERROR",
"dns_ok": "",
"dns_error": "",
"status_code": "",
"ssl_status": "",
"content_type": "",
"elapsed_ms": "",
"best_start_url": "",
"best_final_url": "",
"title": "",
"notes": "Internal script error",
"error": err_msg,
"redirect_chain": "",
"A": "",
"AAAA": "",
"CNAME": "",
"all_attempts": "",
}
# =========================
# SUMMARY
# =========================
def update_summary(path, total_input, skipped_resume, processed_now, counts_bucket, counts_type):
lines = []
lines.append("DOMAIN STATUS FAST CHECKER")
lines.append("=" * 40)
lines.append(f"generated_at : {now_utc()}")
lines.append(f"total_input : {total_input}")
lines.append(f"skipped_resume : {skipped_resume}")
lines.append(f"processed_now : {processed_now}")
lines.append(f"remaining_est : {max(total_input - skipped_resume - processed_now, 0)}")
lines.append("")
lines.append("BUCKET COUNTS")
lines.append("-" * 40)
for k in ["LIVE_OK", "WARNING", "OFFLINE"]:
lines.append(f"{k:12}: {counts_bucket.get(k, 0)}")
lines.append("")
lines.append("PAGE TYPE COUNTS")
lines.append("-" * 40)
for k, v in counts_type.most_common():
lines.append(f"{k:28}: {v}")
atomic_write_text(path, "\n".join(lines) + "\n")
# =========================
# WRITER THREAD
# =========================
def writer_loop(result_queue, args, total_input, skipped_resume):
out = args.output
os.makedirs(out, exist_ok=True)
all_csv = os.path.join(out, "all_results.csv")
hidup_csv = os.path.join(out, "hidup_results.csv")
live_ok_csv = os.path.join(out, "live_ok_results.csv")
warning_csv = os.path.join(out, "warning_results.csv")
offline_csv = os.path.join(out, "offline_results.csv")
hidup_txt = os.path.join(out, "hidup_domains.txt")
live_ok_txt = os.path.join(out, "live_ok_domains.txt")
warning_txt = os.path.join(out, "warning_domains.txt")
offline_txt = os.path.join(out, "offline_domains.txt")
summary_txt = os.path.join(out, "summary.txt")
progress_log = os.path.join(out, "progress.log")
all_writer = SafeCsvWriter(all_csv, FIELDNAMES)
hidup_writer = SafeCsvWriter(hidup_csv, FIELDNAMES)
live_ok_writer = SafeCsvWriter(live_ok_csv, FIELDNAMES)
warning_writer = SafeCsvWriter(warning_csv, FIELDNAMES)
offline_writer = SafeCsvWriter(offline_csv, FIELDNAMES)
hidup_list_writer = SafeLineWriter(hidup_txt)
live_ok_list_writer = SafeLineWriter(live_ok_txt)
warning_list_writer = SafeLineWriter(warning_txt)
offline_list_writer = SafeLineWriter(offline_txt)
log_writer = SafeLineWriter(progress_log)
counts_bucket = Counter()
counts_type = Counter()
processed_now = 0
update_summary(summary_txt, total_input, skipped_resume, processed_now, counts_bucket, counts_type)
log_writer.write_line(f"RUN START {now_utc()} total_input={total_input} skipped_resume={skipped_resume}")
while True:
item = result_queue.get()
if item is None:
break
row = item
domain = row["domain"]
all_writer.writerow(row)
if row["bucket"] in ("LIVE_OK", "WARNING"):
hidup_writer.writerow(row)
hidup_list_writer.write_line(domain)
if row["bucket"] == "LIVE_OK":
live_ok_writer.writerow(row)
live_ok_list_writer.write_line(domain)
elif row["bucket"] == "WARNING":
warning_writer.writerow(row)
warning_list_writer.write_line(domain)
else:
offline_writer.writerow(row)
offline_list_writer.write_line(domain)
counts_bucket[row["bucket"]] += 1
counts_type[row["page_type"]] += 1
processed_now += 1
update_summary(summary_txt, total_input, skipped_resume, processed_now, counts_bucket, counts_type)
log_writer.write_line(
f"DONE {now_utc()} {domain} bucket={row['bucket']} type={row['page_type']} code={row['status_code']}"
)
code_text = str(row["status_code"]) if row["status_code"] != "" else "-"
print(
f"[{processed_now}] "
f"{domain:30} -> {colorize_bucket(row['bucket']):18} | "
f"{row['page_type'][:28]:28} | "
f"{code_text:4} | "
f"{str(row['elapsed_ms'])[:6]:6} ms | "
f"{row['best_final_url'][:45]}"
)
log_writer.write_line(f"RUN END {now_utc()} processed_now={processed_now}")
all_writer.close()
hidup_writer.close()
live_ok_writer.close()
warning_writer.close()
offline_writer.close()
hidup_list_writer.close()
live_ok_list_writer.close()
warning_list_writer.close()
offline_list_writer.close()
log_writer.close()
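# All file output funnels through this single writer thread, so the CSV and
# txt writers need no locking, and each completed row is flushed and fsynced
# before the next is handled (results survive an abrupt kill).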
# =========================
# WORKER
# =========================
def worker(domain, args, result_queue):
if STOP_EVENT.is_set():
return
try:
row = summarize_domain(domain, args)
except Exception as e:
row = fallback_error_row(domain, f"{type(e).__name__}: {e}")
result_queue.put(row)
if args.delay > 0:
time.sleep(args.delay)
# =========================
# FILE CLEANUP
# =========================
def remove_if_exists(path):
try:
if os.path.exists(path):
os.remove(path)
except Exception:
pass
def cleanup_output(outdir):
files = [
"all_results.csv",
"hidup_results.csv",
"live_ok_results.csv",
"warning_results.csv",
"offline_results.csv",
"hidup_domains.txt",
"live_ok_domains.txt",
"warning_domains.txt",
"offline_domains.txt",
"summary.txt",
"progress.log",
]
for f in files:
remove_if_exists(os.path.join(outdir, f))
# =========================
# CLI
# =========================
def parse_args():
parser = argparse.ArgumentParser(description="Fast CLI Domain Status Checker")
parser.add_argument("-i", "--input", required=True, help="File domain input (.txt)")
parser.add_argument("-o", "--output", default="results", help="Folder output")
parser.add_argument("-w", "--workers", type=int, default=DEFAULT_WORKERS, help="Jumlah worker paralel")
parser.add_argument("--connect-timeout", type=int, default=DEFAULT_CONNECT_TIMEOUT, help="Connect timeout")
parser.add_argument("--read-timeout", type=int, default=DEFAULT_READ_TIMEOUT, help="Read timeout")
parser.add_argument("--dns-timeout", type=int, default=DEFAULT_DNS_TIMEOUT, help="DNS timeout")
parser.add_argument("--max-redirects", type=int, default=DEFAULT_MAX_REDIRECTS, help="Maks redirect")
parser.add_argument("--max-bytes", type=int, default=DEFAULT_MAX_BYTES, help="Max body dibaca")
parser.add_argument("--retries", type=int, default=DEFAULT_RETRIES, help="Retry koneksi ringan")
parser.add_argument("--delay", type=float, default=0.0, help="Delay per domain")
parser.add_argument("--fresh", action="store_true", help="Hapus hasil lama")
parser.add_argument("--no-resume", action="store_true", help="Jangan skip hasil lama")
return parser.parse_args()
def main():
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
args = parse_args()
os.makedirs(args.output, exist_ok=True)
if args.fresh:
cleanup_output(args.output)
all_csv = os.path.join(args.output, "all_results.csv")
domains = load_domains(args.input)
if not domains:
print("Tidak ada domain valid di file input.")
sys.exit(1)
processed = set()
if not args.no_resume:
processed = load_processed_domains(all_csv)
queue_domains = [d for d in domains if d not in processed]
print(f"{C_CYAN}Total input : {len(domains)}{C_RESET}")
print(f"{C_CYAN}Sudah diproses : {len(processed)}{C_RESET}")
print(f"{C_CYAN}Akan diproses : {len(queue_domains)}{C_RESET}")
print(f"{C_CYAN}Workers : {args.workers}{C_RESET}")
print(f"{C_CYAN}Output folder : {args.output}{C_RESET}")
print("-" * 110)
result_queue = Queue()
writer_thread = threading.Thread(
target=writer_loop,
args=(result_queue, args, len(domains), len(processed)),
daemon=True
)
writer_thread.start()
executor = ThreadPoolExecutor(max_workers=args.workers)
try:
futures = []
for domain in queue_domains:
if STOP_EVENT.is_set():
break
futures.append(executor.submit(worker, domain, args, result_queue))
for f in futures:
if STOP_EVENT.is_set():
break
try:
f.result()
except Exception:
pass
except KeyboardInterrupt:
STOP_EVENT.set()
print(f"\n{C_RED}Dihentikan paksa oleh user.{C_RESET}")
finally:
try:
executor.shutdown(wait=False, cancel_futures=True)
except TypeError:
executor.shutdown(wait=False)
        # give workers that already finished a brief window to push results onto the queue
time.sleep(0.5)
result_queue.put(None)
writer_thread.join(timeout=10)
print("-" * 110)
print(f"{C_GREEN}Selesai / berhenti aman.{C_RESET}")
print(f"All CSV : {os.path.join(args.output, 'all_results.csv')}")
print(f"Hidup CSV : {os.path.join(args.output, 'hidup_results.csv')}")
print(f"Live OK CSV : {os.path.join(args.output, 'live_ok_results.csv')}")
print(f"Warning CSV : {os.path.join(args.output, 'warning_results.csv')}")
print(f"Offline CSV : {os.path.join(args.output, 'offline_results.csv')}")
print(f"Summary : {os.path.join(args.output, 'summary.txt')}")
if __name__ == "__main__":
main()
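# Example invocations (assumed script filename; adjust to the actual name):
#   python domain_checker.py -i domains.txt -o results -w 50 --fresh
#   python domain_checker.py -i domains.txt               # resumes a previous run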