Python / Uncategorized

Find Dead Norecords — a script that finds domains whose DNS returns no A/AAAA/CNAME records.

0 comments
Please log in or register to leave a comment.

Save the script below as find_dead_norecords.py (for example: nano find_dead_norecords.py), then run it with Python 3.

import argparse
import csv
import os
import re
import signal
import sys
import tempfile
import threading
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from queue import Queue
from urllib.parse import urlparse

import dns.resolver

# Set once by the signal handler to request a graceful shutdown; polled by
# workers and the submit loop in main().
STOP_EVENT = threading.Event()

# Defaults for the CLI options: DNS timeout in seconds, thread-pool size.
DEFAULT_DNS_TIMEOUT = 2
DEFAULT_WORKERS = 50

# Column order shared by every output CSV (all_checked / matched / others).
FIELDNAMES = [
    "checked_at",
    "domain",
    "match",
    "bucket",
    "page_type",
    "dns_ok",
    "dns_error",
    "A",
    "AAAA",
    "CNAME",
    "NS",
    "notes",
]

# ANSI escape codes for colored terminal output.
C_RESET = "\033[0m"
C_RED = "\033[91m"
C_GREEN = "\033[92m"
C_YELLOW = "\033[93m"
C_CYAN = "\033[96m"


def now_utc():
    """Return the current UTC time as an ISO-8601 timestamp string."""
    return datetime.now(tz=timezone.utc).isoformat()


def normalize_domain(raw):
    """Normalize one input line into a bare lowercase hostname.

    Returns "" for blank lines, comment lines, or unparseable values.
    Strips any URL scheme, path, port, trailing dots, inline "#" comments,
    and a leading "www." label.
    """
    candidate = raw.strip()
    if not candidate or candidate.startswith("#"):
        return ""

    # Drop a trailing inline comment, if any.
    candidate = candidate.split("#", 1)[0].strip()
    if not candidate:
        return ""

    # urlparse only fills netloc when a scheme is present.
    if "://" not in candidate:
        candidate = "http://" + candidate

    try:
        parsed = urlparse(candidate)
        host = (parsed.netloc or parsed.path).split("/")[0].split(":")[0]
        host = host.strip().lower().strip(".")
        return host[4:] if host.startswith("www.") else host
    except Exception:
        return ""


def load_domains(input_file):
    """Read domains from *input_file*, normalized and de-duplicated.

    First-seen order is preserved; blank/comment/invalid lines are skipped.
    """
    seen = set()
    result = []

    with open(input_file, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            domain = normalize_domain(raw_line)
            if not domain or domain in seen:
                continue
            seen.add(domain)
            result.append(domain)
    return result


def load_processed_domains(csv_path):
    """Return the set of domains already recorded in an existing results CSV.

    Used for resume support. Deliberately best-effort: a missing, empty, or
    unreadable file yields an empty set instead of aborting the run.

    Args:
        csv_path: Path to a CSV with a "domain" column (e.g. all_checked.csv).

    Returns:
        Set of lowercased domain strings found in the file.
    """
    processed = set()
    try:
        with open(csv_path, "r", encoding="utf-8", newline="") as f:
            reader = csv.DictReader(f)
            for row in reader:
                domain = (row.get("domain") or "").strip().lower()
                if domain:
                    processed.add(domain)
    except (OSError, csv.Error, UnicodeDecodeError):
        # Narrowed from a blanket `except Exception`: only expected I/O and
        # parse failures are swallowed (FileNotFoundError is an OSError, so
        # the previous exists()/getsize() pre-check — which was also racy —
        # is no longer needed). Programming errors now surface.
        pass

    return processed


def remove_if_exists(path):
    """Delete *path* if present; silently ignore a missing file or I/O errors.

    Uses EAFP (remove + suppress) rather than the previous exists()+remove()
    pair, which could race with another process deleting the file between
    the check and the removal.
    """
    try:
        os.remove(path)
    except OSError:
        # Missing file (FileNotFoundError) and permission/IO errors alike
        # are deliberately ignored: this is best-effort cleanup.
        pass


def atomic_write_text(path, text):
    """Atomically replace *path* with *text* (temp file + fsync + rename).

    The temp file is created in the destination directory so that
    os.replace() is an atomic same-filesystem rename.

    Fix: os.path.dirname(path) is "" for a bare filename, which made both
    os.makedirs("") and tempfile.mkstemp(dir="") fail; fall back to ".".
    """
    directory = os.path.dirname(path) or "."
    os.makedirs(directory, exist_ok=True)
    fd, tmp = tempfile.mkstemp(prefix=".tmp_", dir=directory)
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(text)
            f.flush()
            os.fsync(f.fileno())  # ensure bytes hit disk before the rename
        os.replace(tmp, path)
    finally:
        # On success the temp file has been renamed away; on failure this
        # removes the leftover. Errors here are non-fatal cleanup noise.
        try:
            if os.path.exists(tmp):
                os.remove(tmp)
        except OSError:
            pass


class SafeCsvWriter:
    """Append-mode CSV writer that flushes and fsyncs after every row.

    Crash-safety is the point: each row survives an abrupt kill, which is
    what makes resume (re-reading all_checked.csv) reliable. The header is
    written only when the file is new or empty, so appending to a previous
    run's file does not duplicate it.
    """

    def __init__(self, path, fieldnames):
        self.path = path
        # dirname is "" for a bare filename; fall back to "." because
        # os.makedirs("") raises FileNotFoundError.
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        file_exists = os.path.exists(path) and os.path.getsize(path) > 0
        # buffering=1: line buffering, so rows are pushed out promptly even
        # between the explicit flush/fsync calls below.
        self.f = open(path, "a", newline="", encoding="utf-8", buffering=1)
        self.writer = csv.DictWriter(self.f, fieldnames=fieldnames)

        if not file_exists:
            self.writer.writeheader()
            self.f.flush()
            os.fsync(self.f.fileno())

    def writerow(self, row):
        """Write one row dict and force it to disk."""
        self.writer.writerow(row)
        self.f.flush()
        os.fsync(self.f.fileno())

    def close(self):
        """Close the underlying file, ignoring teardown errors."""
        try:
            self.f.close()
        except Exception:
            pass


class SafeLineWriter:
    """Append-mode text writer that fsyncs each line for crash safety."""

    def __init__(self, path):
        self.f = open(path, "a", encoding="utf-8", buffering=1)

    def write_line(self, text):
        """Append *text* as exactly one line and force it to disk."""
        # Normalize to a single trailing newline regardless of input.
        line = text.rstrip("\n") + "\n"
        self.f.write(line)
        self.f.flush()
        os.fsync(self.f.fileno())

    def close(self):
        """Close the underlying file, ignoring teardown errors."""
        try:
            self.f.close()
        except Exception:
            pass


def signal_handler(signum, frame):
    """SIGINT/SIGTERM handler: first signal requests a graceful stop,
    a second signal escalates to a hard KeyboardInterrupt."""
    if STOP_EVENT.is_set():
        # Already stopping — the user insists, so abort immediately.
        raise KeyboardInterrupt
    STOP_EVENT.set()
    print(f"\n{C_YELLOW}Signal diterima. Stop submit job baru...{C_RESET}")


def get_dns_info(domain, dns_timeout=2):
    """Resolve A/AAAA/CNAME (and, best-effort, NS) records for *domain*.

    Returns a dict with keys dns_ok, dns_error, A, AAAA, CNAME, NS.
    An NXDOMAIN short-circuits immediately (NS would be pointless then);
    dns_error == "NO_RECORDS" means the lookup finished without a hard
    error but produced none of the web-facing record types.
    """
    info = {
        "dns_ok": False,
        "dns_error": "",
        "A": [],
        "AAAA": [],
        "CNAME": [],
        "NS": [],
    }

    resolver = dns.resolver.Resolver()
    resolver.timeout = dns_timeout
    resolver.lifetime = dns_timeout

    # Only these record types drive the match decision.
    for record_type in ("A", "AAAA", "CNAME"):
        try:
            answers = resolver.resolve(domain, record_type)
        except dns.resolver.NXDOMAIN:
            info["dns_error"] = "NXDOMAIN"
            return info
        except (dns.resolver.NoAnswer, dns.resolver.NoNameservers, dns.resolver.LifetimeTimeout):
            # "No data of this type" / transient failures: not fatal.
            continue
        except Exception as exc:
            # Remember only the first unexpected error class.
            if not info["dns_error"]:
                info["dns_error"] = type(exc).__name__
            continue
        # CNAME answers expose .target; A/AAAA stringify directly.
        info[record_type] = [
            str(getattr(rr, "target", rr)).rstrip(".") for rr in answers
        ]

    if info["A"] or info["AAAA"] or info["CNAME"]:
        info["dns_ok"] = True
    elif not info["dns_error"]:
        info["dns_error"] = "NO_RECORDS"

    # Purely informational NS lookup; ignore any failure.
    try:
        info["NS"] = [str(ns).rstrip(".") for ns in resolver.resolve(domain, "NS")]
    except Exception:
        pass

    return info


def classify_row(domain, dns_info):
    """Turn a DNS lookup result into an output row dict (FIELDNAMES keys).

    The exact target profile is: bucket=OFFLINE, page_type=CONNECTION FAILED,
    dns_ok=False, dns_error=NO_RECORDS -> match=YES. Everything else is
    match=NO, bucketed by NXDOMAIN / has-records / other.
    """

    def build(match, bucket, page_type, dns_ok, dns_error, notes):
        # All rows share the same shape; only the classification fields vary.
        return {
            "checked_at": now_utc(),
            "domain": domain,
            "match": match,
            "bucket": bucket,
            "page_type": page_type,
            "dns_ok": dns_ok,
            "dns_error": dns_error,
            "A": ", ".join(dns_info["A"]),
            "AAAA": ", ".join(dns_info["AAAA"]),
            "CNAME": ", ".join(dns_info["CNAME"]),
            "NS": ", ".join(dns_info["NS"]),
            "notes": notes,
        }

    exact_match = (
        dns_info["dns_ok"] is False and dns_info["dns_error"] == "NO_RECORDS"
    )
    if exact_match:
        if dns_info["NS"]:
            note = "NS ada, tapi tidak ada A/AAAA/CNAME"
        else:
            note = "Tidak ada A/AAAA/CNAME"
        return build("YES", "OFFLINE", "CONNECTION FAILED", False, "NO_RECORDS", note)

    # Everything below fails the exact filter.
    if dns_info["dns_error"] == "NXDOMAIN":
        bucket, page_type = "OFFLINE", "NXDOMAIN"
        notes = "Domain tidak ada / tidak terdaftar / tidak resolve"
    elif dns_info["dns_ok"]:
        bucket, page_type = "HAS_DNS", "HAS_DNS_RECORD"
        notes = "Ada record DNS web"
    else:
        bucket, page_type = "OTHER", "DNS_NOT_MATCH"
        notes = "Bukan NO_RECORDS exact match"

    return build("NO", bucket, page_type, dns_info["dns_ok"], dns_info["dns_error"], notes)


def check_domain(domain, dns_timeout):
    """Resolve *domain* and classify the result into a single output row."""
    return classify_row(domain, get_dns_info(domain, dns_timeout=dns_timeout))


def writer_loop(result_queue, output_dir, total_input, skipped_resume):
    """Consumer thread: drain result rows from *result_queue* and persist
    them to the CSV/TXT/log outputs, rewriting the summary after each row.

    Being the single writer means worker threads never contend on file
    handles. Exits when it receives the ``None`` sentinel.

    Args:
        result_queue: Queue of row dicts keyed by FIELDNAMES; ``None``
            signals shutdown.
        output_dir: Folder where all output files are created.
        total_input: Total number of domains in the input file.
        skipped_resume: Domains skipped because a previous run recorded them.
    """
    os.makedirs(output_dir, exist_ok=True)

    # All output artifacts live under output_dir.
    all_csv = os.path.join(output_dir, "all_checked.csv")
    matched_csv = os.path.join(output_dir, "matched_dead_norecords.csv")
    others_csv = os.path.join(output_dir, "others.csv")
    matched_txt = os.path.join(output_dir, "matched_dead_norecords.txt")
    progress_log = os.path.join(output_dir, "progress.log")
    summary_txt = os.path.join(output_dir, "summary.txt")

    # fsync-per-row writers: results survive an abrupt kill (resume support).
    all_writer = SafeCsvWriter(all_csv, FIELDNAMES)
    matched_writer = SafeCsvWriter(matched_csv, FIELDNAMES)
    others_writer = SafeCsvWriter(others_csv, FIELDNAMES)
    matched_txt_writer = SafeLineWriter(matched_txt)
    log_writer = SafeLineWriter(progress_log)

    counts_match = Counter()  # "YES"/"NO" tallies
    counts_type = Counter()   # page_type tallies
    processed_now = 0         # rows handled in this run only

    def write_summary():
        # Rebuild summary.txt from scratch (atomic replace) so it is always
        # consistent even if the process dies mid-run.
        lines = []
        lines.append("FIND DEAD NO_RECORDS SUMMARY")
        lines.append("=" * 40)
        lines.append(f"generated_at   : {now_utc()}")
        lines.append(f"total_input    : {total_input}")
        lines.append(f"skipped_resume : {skipped_resume}")
        lines.append(f"processed_now  : {processed_now}")
        lines.append(f"remaining_est  : {max(total_input - skipped_resume - processed_now, 0)}")
        lines.append("")
        lines.append("MATCH COUNTS")
        lines.append("-" * 40)
        lines.append(f"MATCH YES : {counts_match.get('YES', 0)}")
        lines.append(f"MATCH NO  : {counts_match.get('NO', 0)}")
        lines.append("")
        lines.append("PAGE TYPE COUNTS")
        lines.append("-" * 40)
        for k, v in counts_type.most_common():
            lines.append(f"{k:25}: {v}")
        atomic_write_text(summary_txt, "\n".join(lines) + "\n")

    log_writer.write_line(f"RUN START {now_utc()} total_input={total_input} skipped_resume={skipped_resume}")
    write_summary()

    while True:
        item = result_queue.get()
        if item is None:
            # Shutdown sentinel from main().
            break

        row = item
        domain = row["domain"]

        # Every row goes to the combined CSV; matched rows additionally go
        # to the matched CSV + plain-text domain list, the rest to others.
        all_writer.writerow(row)

        if row["match"] == "YES":
            matched_writer.writerow(row)
            matched_txt_writer.write_line(domain)
        else:
            others_writer.writerow(row)

        counts_match[row["match"]] += 1
        counts_type[row["page_type"]] += 1
        processed_now += 1

        # Summary and progress log are refreshed per row for live monitoring.
        write_summary()
        log_writer.write_line(
            f"DONE {now_utc()} {domain} match={row['match']} dns_error={row['dns_error']} ns={row['NS']}"
        )

        # Console feedback: green for a match, red otherwise.
        color = C_GREEN if row["match"] == "YES" else C_RED
        print(
            f"[{processed_now}] "
            f"{domain:30} -> {color}{row['match']}{C_RESET} | "
            f"{row['page_type'][:22]:22} | "
            f"dns_ok={str(row['dns_ok']):5} | "
            f"dns_error={row['dns_error'][:15]:15} | "
            f"NS={row['NS'][:35]}"
        )

    # Final summary + log entry, then release all file handles.
    write_summary()
    log_writer.write_line(f"RUN END {now_utc()} processed_now={processed_now}")

    all_writer.close()
    matched_writer.close()
    others_writer.close()
    matched_txt_writer.close()
    log_writer.close()


def worker(domain, dns_timeout, result_queue):
    """Check one domain and enqueue the resulting row for the writer thread.

    Any unexpected exception is converted into a SCRIPT ERROR row so the
    domain is still recorded. Does nothing once a stop was requested.
    """
    if STOP_EVENT.is_set():
        return

    try:
        row = check_domain(domain, dns_timeout)
    except Exception as exc:
        # Fabricate a row in the standard shape so downstream code and the
        # resume logic treat failures like any other result.
        row = {
            "checked_at": now_utc(),
            "domain": domain,
            "match": "NO",
            "bucket": "OTHER",
            "page_type": "SCRIPT ERROR",
            "dns_ok": "",
            "dns_error": type(exc).__name__,
            "A": "",
            "AAAA": "",
            "CNAME": "",
            "NS": "",
            "notes": str(exc),
        }

    result_queue.put(row)


def parse_args():
    """Build and evaluate the command-line interface for the checker."""
    cli = argparse.ArgumentParser(
        description="Cari domain exact match: OFFLINE + CONNECTION FAILED + dns_ok=False + dns_error=NO_RECORDS"
    )
    cli.add_argument("-i", "--input", required=True, help="File domain input")
    cli.add_argument("-o", "--output", default="dead_results", help="Folder output")
    cli.add_argument("-w", "--workers", type=int, default=DEFAULT_WORKERS, help="Jumlah worker")
    cli.add_argument("--dns-timeout", type=int, default=DEFAULT_DNS_TIMEOUT, help="DNS timeout")
    cli.add_argument("--fresh", action="store_true", help="Hapus hasil lama")
    cli.add_argument("--no-resume", action="store_true", help="Jangan resume")
    return cli.parse_args()


def main():
    """CLI entry point: parse args, load/resume the domain list, fan DNS
    checks out to a thread pool, and stream results to one writer thread."""
    # First signal -> graceful stop; second -> hard KeyboardInterrupt.
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    args = parse_args()
    os.makedirs(args.output, exist_ok=True)

    all_csv = os.path.join(args.output, "all_checked.csv")

    # --fresh: wipe every artifact from previous runs before starting.
    if args.fresh:
        for f in [
            "all_checked.csv",
            "matched_dead_norecords.csv",
            "others.csv",
            "matched_dead_norecords.txt",
            "progress.log",
            "summary.txt",
        ]:
            remove_if_exists(os.path.join(args.output, f))

    domains = load_domains(args.input)
    if not domains:
        print("Tidak ada domain valid di file input.")
        sys.exit(1)

    # Resume: skip domains already recorded in all_checked.csv.
    processed = set()
    if not args.no_resume:
        processed = load_processed_domains(all_csv)

    queue_domains = [d for d in domains if d not in processed]

    print(f"{C_CYAN}Total input    : {len(domains)}{C_RESET}")
    print(f"{C_CYAN}Sudah diproses : {len(processed)}{C_RESET}")
    print(f"{C_CYAN}Akan diproses  : {len(queue_domains)}{C_RESET}")
    print(f"{C_CYAN}Workers        : {args.workers}{C_RESET}")
    print(f"{C_CYAN}DNS timeout    : {args.dns_timeout}{C_RESET}")
    print(f"{C_CYAN}Output folder  : {args.output}{C_RESET}")
    print("-" * 110)

    # Single writer thread owns all output files; workers only enqueue rows.
    result_queue = Queue()
    writer_thread = threading.Thread(
        target=writer_loop,
        args=(result_queue, args.output, len(domains), len(processed)),
        daemon=True
    )
    writer_thread.start()

    executor = ThreadPoolExecutor(max_workers=args.workers)

    try:
        futures = []
        for domain in queue_domains:
            # Stop submitting new jobs as soon as a stop was requested.
            if STOP_EVENT.is_set():
                break
            futures.append(executor.submit(worker, domain, args.dns_timeout, result_queue))

        # Wait for completion; worker() already converts its own failures
        # into SCRIPT ERROR rows, so exceptions here are ignored.
        for f in futures:
            if STOP_EVENT.is_set():
                break
            try:
                f.result()
            except Exception:
                pass

    except KeyboardInterrupt:
        STOP_EVENT.set()
        print(f"\n{C_RED}Dihentikan user.{C_RESET}")

    finally:
        # cancel_futures= requires Python 3.9+; fall back on older versions.
        try:
            executor.shutdown(wait=False, cancel_futures=True)
        except TypeError:
            executor.shutdown(wait=False)

        # Sentinel tells writer_loop to write the final summary and exit.
        result_queue.put(None)
        writer_thread.join(timeout=10)

        print("-" * 110)
        print(f"{C_GREEN}Selesai / berhenti aman.{C_RESET}")
        print(f"All checked : {os.path.join(args.output, 'all_checked.csv')}")
        print(f"Matched CSV : {os.path.join(args.output, 'matched_dead_norecords.csv')}")
        print(f"Matched TXT : {os.path.join(args.output, 'matched_dead_norecords.txt')}")
        print(f"Others CSV  : {os.path.join(args.output, 'others.csv')}")
        print(f"Summary     : {os.path.join(args.output, 'summary.txt')}")


# Run only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Related scripts: domain_status_fast, Find Norecords Enriched.

Your email address will not be published. Required fields are marked *.