import argparse
import csv
import os
import re
import signal
import sys
import tempfile
import threading
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from queue import Queue
from urllib.parse import urlparse
import dns.resolver
# Set once a stop signal is received; checked before submitting/processing work.
STOP_EVENT = threading.Event()
# Default per-query DNS timeout (seconds) and default thread-pool size.
DEFAULT_DNS_TIMEOUT = 2
DEFAULT_WORKERS = 50
# Column order shared by every CSV file this script writes.
FIELDNAMES = [
    "checked_at",
    "domain",
    "match",
    "bucket",
    "page_type",
    "dns_ok",
    "dns_error",
    "A",
    "AAAA",
    "CNAME",
    "NS",
    "notes",
]
# ANSI escape codes for colored terminal output.
C_RESET = "\033[0m"
C_RED = "\033[91m"
C_GREEN = "\033[92m"
C_YELLOW = "\033[93m"
C_CYAN = "\033[96m"
def now_utc():
    """Return the current UTC time as an ISO-8601 timestamp string."""
    timestamp = datetime.now(tz=timezone.utc)
    return timestamp.isoformat()
def normalize_domain(raw):
    """Extract a bare lowercase hostname from one input line.

    Blank lines, comment lines ("#..."), and unparseable input yield "".
    Inline comments, schemes, ports, paths, surrounding dots, and a
    leading "www." are all stripped.
    """
    text = raw.strip()
    if not text or text.startswith("#"):
        return ""
    # Discard trailing inline comments ("foo.com  # note").
    text = text.split("#", 1)[0].strip()
    if not text:
        return ""
    # Force a scheme so urlparse puts the host into netloc.
    candidate = text if "://" in text else "http://" + text
    try:
        parsed = urlparse(candidate)
        location = parsed.netloc or parsed.path
        location = location.split("/")[0]
        location = location.split(":")[0]
        host = location.strip().lower().strip(".")
        return host[4:] if host.startswith("www.") else host
    except Exception:
        return ""
def load_domains(input_file):
    """Read, normalize, and de-duplicate domains from *input_file*.

    Order of first appearance is preserved; invalid/comment lines are
    dropped by normalize_domain().
    """
    unique = []
    known = set()
    with open(input_file, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            host = normalize_domain(raw_line)
            if host and host not in known:
                known.add(host)
                unique.append(host)
    return unique
def load_processed_domains(csv_path):
    """Return the set of lowercased domains already recorded in *csv_path*.

    Supports resume: a missing, empty, or unreadable file simply yields
    an empty set (the resume data is advisory only).
    """
    done = set()
    if not os.path.exists(csv_path) or os.path.getsize(csv_path) == 0:
        return done
    try:
        with open(csv_path, "r", encoding="utf-8", newline="") as handle:
            for record in csv.DictReader(handle):
                name = (record.get("domain") or "").strip().lower()
                if name:
                    done.add(name)
    except Exception:
        # A corrupt resume file must not abort the run.
        pass
    return done
def remove_if_exists(path):
    """Best-effort delete: remove *path* when present, ignoring any error."""
    if not os.path.exists(path):
        return
    try:
        os.remove(path)
    except Exception:
        pass
def atomic_write_text(path, text):
    """Atomically replace *path* with *text*.

    Writes to a temp file in the destination directory, fsyncs it, then
    os.replace()s it over *path* so readers never observe a partially
    written file.
    """
    # os.path.dirname() is "" for a bare filename, and os.makedirs("")
    # raises FileNotFoundError — fall back to the current directory.
    directory = os.path.dirname(path) or "."
    os.makedirs(directory, exist_ok=True)
    fd, tmp = tempfile.mkstemp(prefix=".tmp_", dir=directory)
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(text)
            f.flush()
            # Ensure the data is on disk before the rename makes it visible.
            os.fsync(f.fileno())
        os.replace(tmp, path)
    finally:
        # On success the temp file was renamed away; on failure remove
        # the leftover best-effort.
        try:
            if os.path.exists(tmp):
                os.remove(tmp)
        except Exception:
            pass
class SafeCsvWriter:
    """Append-only CSV writer that fsyncs after every row.

    The header is written only when the file is new/empty, so appends
    across resumed runs do not duplicate it.
    """

    def __init__(self, path, fieldnames):
        self.path = path
        # os.path.dirname() is "" for a bare filename, and os.makedirs("")
        # raises FileNotFoundError — fall back to the current directory.
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        file_exists = os.path.exists(path) and os.path.getsize(path) > 0
        # Line-buffered so each row reaches the OS promptly.
        self.f = open(path, "a", newline="", encoding="utf-8", buffering=1)
        self.writer = csv.DictWriter(self.f, fieldnames=fieldnames)
        if not file_exists:
            self.writer.writeheader()
            self.f.flush()
            os.fsync(self.f.fileno())

    def writerow(self, row):
        """Write one dict row and force it to disk."""
        self.writer.writerow(row)
        self.f.flush()
        os.fsync(self.f.fileno())

    def close(self):
        """Close the underlying file, ignoring errors (best-effort shutdown)."""
        try:
            self.f.close()
        except Exception:
            pass
class SafeLineWriter:
    """Append-only text writer that fsyncs each line to disk."""

    def __init__(self, path):
        # Line-buffered append handle.
        self.f = open(path, "a", encoding="utf-8", buffering=1)

    def write_line(self, text):
        """Append *text* as exactly one line (trailing newlines normalized)."""
        line = text.rstrip("\n") + "\n"
        self.f.write(line)
        self.f.flush()
        os.fsync(self.f.fileno())

    def close(self):
        """Close the handle; errors are ignored on shutdown."""
        try:
            self.f.close()
        except Exception:
            pass
def signal_handler(signum, frame):
    """First SIGINT/SIGTERM requests a graceful stop; a second one aborts."""
    if STOP_EVENT.is_set():
        # Second signal — escalate so blocking waits unwind immediately.
        raise KeyboardInterrupt
    STOP_EVENT.set()
    print(f"\n{C_YELLOW}Signal diterima. Stop submit job baru...{C_RESET}")
def get_dns_info(domain, dns_timeout=2):
    """Resolve A/AAAA/CNAME (and, best-effort, NS) records for *domain*.

    Returns a dict:
      dns_ok    -- True when at least one A/AAAA/CNAME record was found
      dns_error -- "", "NXDOMAIN", "NO_RECORDS", or an exception class name
      A/AAAA/CNAME/NS -- lists of record strings (trailing dots removed)

    "NO_RECORDS" is reported only when every lookup completed without a
    failure yet returned no web records. Transient failures (timeout,
    SERVFAIL) set dns_error to their exception name instead, so they are
    not misreported downstream as confirmed-dead NO_RECORDS matches.
    """
    result = {
        "dns_ok": False,
        "dns_error": "",
        "A": [],
        "AAAA": [],
        "CNAME": [],
        "NS": [],
    }
    resolver = dns.resolver.Resolver()
    resolver.timeout = dns_timeout
    resolver.lifetime = dns_timeout
    # Primary signal: the web-facing record types.
    for rtype in ["A", "AAAA", "CNAME"]:
        try:
            answers = resolver.resolve(domain, rtype)
            vals = []
            for r in answers:
                # CNAME rdata exposes .target; address rdata stringifies directly.
                if hasattr(r, "target"):
                    vals.append(str(r.target).rstrip("."))
                else:
                    vals.append(str(r).rstrip("."))
            result[rtype] = vals
        except dns.resolver.NXDOMAIN:
            # The whole name does not exist; further lookups (incl. NS) are moot.
            result["dns_error"] = "NXDOMAIN"
            return result
        except dns.resolver.NoAnswer:
            # Authoritative "no data of this type" — not a failure.
            pass
        except (dns.resolver.NoNameservers, dns.resolver.LifetimeTimeout) as e:
            # SERVFAIL / timeout: record it so a transient failure is not
            # later classified as a confirmed NO_RECORDS (dead) result.
            if not result["dns_error"]:
                result["dns_error"] = type(e).__name__
        except Exception as e:
            if not result["dns_error"]:
                result["dns_error"] = type(e).__name__
    if any(result[k] for k in ["A", "AAAA", "CNAME"]):
        result["dns_ok"] = True
    elif not result["dns_error"]:
        result["dns_error"] = "NO_RECORDS"
    # Optional info: NS records, purely informational (helps diagnose
    # "NS exists but no web records" cases).
    try:
        answers = resolver.resolve(domain, "NS")
        result["NS"] = [str(x).rstrip(".") for x in answers]
    except Exception:
        pass
    return result
def classify_row(domain, dns_info):
    """Build one output row for *domain* from its DNS lookup result.

    The exact "match" target is: lookups succeeded but no A/AAAA/CNAME
    exist (dns_ok False and dns_error "NO_RECORDS") — reported as
    OFFLINE / CONNECTION FAILED. Everything else is a non-match bucketed
    as NXDOMAIN, HAS_DNS, or OTHER.
    """
    records = {
        "A": ", ".join(dns_info["A"]),
        "AAAA": ", ".join(dns_info["AAAA"]),
        "CNAME": ", ".join(dns_info["CNAME"]),
        "NS": ", ".join(dns_info["NS"]),
    }
    is_exact = dns_info["dns_ok"] is False and dns_info["dns_error"] == "NO_RECORDS"
    if is_exact:
        if dns_info["NS"]:
            note = "NS ada, tapi tidak ada A/AAAA/CNAME"
        else:
            note = "Tidak ada A/AAAA/CNAME"
        return {
            "checked_at": now_utc(),
            "domain": domain,
            "match": "YES",
            "bucket": "OFFLINE",
            "page_type": "CONNECTION FAILED",
            "dns_ok": False,
            "dns_error": "NO_RECORDS",
            **records,
            "notes": note,
        }
    # Non-match: explain which bucket the domain falls into instead.
    if dns_info["dns_error"] == "NXDOMAIN":
        bucket, page_type = "OFFLINE", "NXDOMAIN"
        note = "Domain tidak ada / tidak terdaftar / tidak resolve"
    elif dns_info["dns_ok"]:
        bucket, page_type = "HAS_DNS", "HAS_DNS_RECORD"
        note = "Ada record DNS web"
    else:
        bucket, page_type = "OTHER", "DNS_NOT_MATCH"
        note = "Bukan NO_RECORDS exact match"
    return {
        "checked_at": now_utc(),
        "domain": domain,
        "match": "NO",
        "bucket": bucket,
        "page_type": page_type,
        "dns_ok": dns_info["dns_ok"],
        "dns_error": dns_info["dns_error"],
        **records,
        "notes": note,
    }
def check_domain(domain, dns_timeout):
    """Resolve *domain* and classify the result into one output row."""
    info = get_dns_info(domain, dns_timeout=dns_timeout)
    return classify_row(domain, info)
def writer_loop(result_queue, output_dir, total_input, skipped_resume):
    """Single consumer thread: drain result rows from *result_queue* and
    persist them under *output_dir*.

    Every row goes to all_checked.csv; exact matches additionally go to
    the matched CSV/TXT, everything else to others.csv. summary.txt is
    atomically rewritten after each row and progress.log is appended.
    Terminates when the ``None`` sentinel is dequeued. Each write is
    fsynced, so a crash loses at most the in-flight row.
    """
    os.makedirs(output_dir, exist_ok=True)
    # Output artifact paths.
    all_csv = os.path.join(output_dir, "all_checked.csv")
    matched_csv = os.path.join(output_dir, "matched_dead_norecords.csv")
    others_csv = os.path.join(output_dir, "others.csv")
    matched_txt = os.path.join(output_dir, "matched_dead_norecords.txt")
    progress_log = os.path.join(output_dir, "progress.log")
    summary_txt = os.path.join(output_dir, "summary.txt")
    all_writer = SafeCsvWriter(all_csv, FIELDNAMES)
    matched_writer = SafeCsvWriter(matched_csv, FIELDNAMES)
    others_writer = SafeCsvWriter(others_csv, FIELDNAMES)
    matched_txt_writer = SafeLineWriter(matched_txt)
    log_writer = SafeLineWriter(progress_log)
    # Running tallies feeding the summary file.
    counts_match = Counter()
    counts_type = Counter()
    processed_now = 0

    def write_summary():
        # Atomically rewrite summary.txt with the current tallies
        # (reads the enclosing scope's counters via closure).
        lines = []
        lines.append("FIND DEAD NO_RECORDS SUMMARY")
        lines.append("=" * 40)
        lines.append(f"generated_at : {now_utc()}")
        lines.append(f"total_input : {total_input}")
        lines.append(f"skipped_resume : {skipped_resume}")
        lines.append(f"processed_now : {processed_now}")
        lines.append(f"remaining_est : {max(total_input - skipped_resume - processed_now, 0)}")
        lines.append("")
        lines.append("MATCH COUNTS")
        lines.append("-" * 40)
        lines.append(f"MATCH YES : {counts_match.get('YES', 0)}")
        lines.append(f"MATCH NO : {counts_match.get('NO', 0)}")
        lines.append("")
        lines.append("PAGE TYPE COUNTS")
        lines.append("-" * 40)
        for k, v in counts_type.most_common():
            lines.append(f"{k:25}: {v}")
        atomic_write_text(summary_txt, "\n".join(lines) + "\n")

    log_writer.write_line(f"RUN START {now_utc()} total_input={total_input} skipped_resume={skipped_resume}")
    write_summary()
    # Drain until the producer side enqueues the None sentinel.
    while True:
        item = result_queue.get()
        if item is None:
            break
        row = item
        domain = row["domain"]
        all_writer.writerow(row)
        if row["match"] == "YES":
            matched_writer.writerow(row)
            matched_txt_writer.write_line(domain)
        else:
            others_writer.writerow(row)
        counts_match[row["match"]] += 1
        counts_type[row["page_type"]] += 1
        processed_now += 1
        write_summary()
        log_writer.write_line(
            f"DONE {now_utc()} {domain} match={row['match']} dns_error={row['dns_error']} ns={row['NS']}"
        )
        # Console progress line (green = exact match, red = non-match).
        color = C_GREEN if row["match"] == "YES" else C_RED
        print(
            f"[{processed_now}] "
            f"{domain:30} -> {color}{row['match']}{C_RESET} | "
            f"{row['page_type'][:22]:22} | "
            f"dns_ok={str(row['dns_ok']):5} | "
            f"dns_error={row['dns_error'][:15]:15} | "
            f"NS={row['NS'][:35]}"
        )
    # Final summary + clean shutdown of all writers.
    write_summary()
    log_writer.write_line(f"RUN END {now_utc()} processed_now={processed_now}")
    all_writer.close()
    matched_writer.close()
    others_writer.close()
    matched_txt_writer.close()
    log_writer.close()
def worker(domain, dns_timeout, result_queue):
    """Check one domain and push its result row onto *result_queue*.

    Skips work entirely once a stop was requested. Any unexpected
    exception is converted into a SCRIPT ERROR row instead of silently
    killing the pool thread.
    """
    if STOP_EVENT.is_set():
        return
    try:
        payload = check_domain(domain, dns_timeout)
    except Exception as exc:
        payload = {
            "checked_at": now_utc(),
            "domain": domain,
            "match": "NO",
            "bucket": "OTHER",
            "page_type": "SCRIPT ERROR",
            "dns_ok": "",
            "dns_error": type(exc).__name__,
            "A": "",
            "AAAA": "",
            "CNAME": "",
            "NS": "",
            "notes": str(exc),
        }
    result_queue.put(payload)
def parse_args():
    """Define and parse the command-line interface for this script."""
    cli = argparse.ArgumentParser(
        description="Cari domain exact match: OFFLINE + CONNECTION FAILED + dns_ok=False + dns_error=NO_RECORDS"
    )
    cli.add_argument("-i", "--input", required=True, help="File domain input")
    cli.add_argument("-o", "--output", default="dead_results", help="Folder output")
    cli.add_argument("-w", "--workers", type=int, default=DEFAULT_WORKERS, help="Jumlah worker")
    cli.add_argument("--dns-timeout", type=int, default=DEFAULT_DNS_TIMEOUT, help="DNS timeout")
    cli.add_argument("--fresh", action="store_true", help="Hapus hasil lama")
    cli.add_argument("--no-resume", action="store_true", help="Jangan resume")
    return cli.parse_args()
def main():
    """Entry point: wire up signal handling, resume state, the worker
    pool, and the single writer thread; then run to completion or until
    a stop signal arrives."""
    # Graceful-stop handling for Ctrl-C / SIGTERM (see signal_handler).
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    args = parse_args()
    os.makedirs(args.output, exist_ok=True)
    all_csv = os.path.join(args.output, "all_checked.csv")
    if args.fresh:
        # --fresh: discard every artifact from a previous run.
        for f in [
            "all_checked.csv",
            "matched_dead_norecords.csv",
            "others.csv",
            "matched_dead_norecords.txt",
            "progress.log",
            "summary.txt",
        ]:
            remove_if_exists(os.path.join(args.output, f))
    domains = load_domains(args.input)
    if not domains:
        print("Tidak ada domain valid di file input.")
        sys.exit(1)
    # Resume support: skip domains already recorded in all_checked.csv.
    processed = set()
    if not args.no_resume:
        processed = load_processed_domains(all_csv)
    queue_domains = [d for d in domains if d not in processed]
    print(f"{C_CYAN}Total input : {len(domains)}{C_RESET}")
    print(f"{C_CYAN}Sudah diproses : {len(processed)}{C_RESET}")
    print(f"{C_CYAN}Akan diproses : {len(queue_domains)}{C_RESET}")
    print(f"{C_CYAN}Workers : {args.workers}{C_RESET}")
    print(f"{C_CYAN}DNS timeout : {args.dns_timeout}{C_RESET}")
    print(f"{C_CYAN}Output folder : {args.output}{C_RESET}")
    print("-" * 110)
    # A single writer thread serializes all file output; workers only
    # push rows onto this queue.
    result_queue = Queue()
    writer_thread = threading.Thread(
        target=writer_loop,
        args=(result_queue, args.output, len(domains), len(processed)),
        daemon=True
    )
    writer_thread.start()
    executor = ThreadPoolExecutor(max_workers=args.workers)
    try:
        futures = []
        for domain in queue_domains:
            # Stop submitting as soon as a stop was requested.
            if STOP_EVENT.is_set():
                break
            futures.append(executor.submit(worker, domain, args.dns_timeout, result_queue))
        for f in futures:
            if STOP_EVENT.is_set():
                break
            try:
                f.result()
            except Exception:
                # Worker errors are already converted to SCRIPT ERROR rows.
                pass
    except KeyboardInterrupt:
        STOP_EVENT.set()
        print(f"\n{C_RED}Dihentikan user.{C_RESET}")
    finally:
        # cancel_futures= needs Python 3.9+; fall back on older runtimes.
        try:
            executor.shutdown(wait=False, cancel_futures=True)
        except TypeError:
            executor.shutdown(wait=False)
        # Sentinel tells writer_loop to flush and finish.
        result_queue.put(None)
        writer_thread.join(timeout=10)
    print("-" * 110)
    print(f"{C_GREEN}Selesai / berhenti aman.{C_RESET}")
    print(f"All checked : {os.path.join(args.output, 'all_checked.csv')}")
    print(f"Matched CSV : {os.path.join(args.output, 'matched_dead_norecords.csv')}")
    print(f"Matched TXT : {os.path.join(args.output, 'matched_dead_norecords.txt')}")
    print(f"Others CSV : {os.path.join(args.output, 'others.csv')}")
    print(f"Summary : {os.path.join(args.output, 'summary.txt')}")
if __name__ == "__main__":
    # Run only when executed as a script, not on import.
    main()