From 234468cec946662c92dc373cb68da6c9b6a484cb Mon Sep 17 00:00:00 2001 From: WhatDidYouExpect <89535984+WhatDidYouExpect@users.noreply.github.com> Date: Sat, 26 Jul 2025 01:22:08 +0200 Subject: [PATCH] lifes a cruel cruel mistress when your ISP cockblocks you --- main.py | 126 ++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 82 insertions(+), 44 deletions(-) diff --git a/main.py b/main.py index 721984c..be49ae5 100644 --- a/main.py +++ b/main.py @@ -6,6 +6,19 @@ import subprocess import platform from concurrent.futures import ThreadPoolExecutor, as_completed import threading +import os +import queue + +log_queue = queue.Queue() + +def logger_thread(): + while True: + msg = log_queue.get() + if msg is None: + break + with print_lock: + sys.stdout.write("\n" + msg + "\n") + sys.stdout.flush() try: ip_range = input("enter ip range (e.g., 192.168.1.1-192.168.1.20): ").strip() @@ -34,6 +47,28 @@ osname = platform.system().lower() ping_cmd = ["ping", "-c", "1", "-W", "1"] if osname != "windows" else ["ping", "-n", "1", "-w", "1000"] shutdown_event = threading.Event() +ip_line_map = {} +ip_status_lock = threading.Lock() +print_lock = threading.Lock() + +def move_cursor_to_line(line_num): + sys.stdout.write(f"\033[{line_num};0H") + +def clear_screen(): + sys.stdout.write("\033[2J") + sys.stdout.flush() + +def print_overwrite(ip, msg): + with ip_status_lock: + line = ip_line_map.get(ip) + if line is not None: + move_cursor_to_line(line) + sys.stdout.write(f"\033[K[{ip}] {msg}") + sys.stdout.flush() + +def print_open_port(ip, port, hostname): + log_queue.put(f"[{ip}] port {port} is OPEN ({hostname})") + def ping_ip(ip): try: cmd = ping_cmd + [ip] @@ -45,23 +80,18 @@ def ping_ip(ip): def is_private_ip(ip): return ipaddress.IPv4Address(ip).is_private +alive_ips = [] + +def ping_all_ips(ips): + with ThreadPoolExecutor(max_workers=100) as p_exec: + futures = {p_exec.submit(ping_ip, ip): ip for ip in ips} + for future in as_completed(futures): + ip = futures[future] + if future.result(): + alive_ips.append(ip) + open_ports = [] open_ports_lock = threading.Lock() -print_lock = threading.Lock() -ip_status_lines = {} -ip_status_lock = threading.Lock() - -def print_overwrite(ip, msg): - with ip_status_lock: - last = ip_status_lines.get(ip, "") - clear_len = max(len(last), len(msg)) - sys.stdout.write(f"\r[{ip}] {msg}" + " "*(clear_len - len(msg))) - sys.stdout.flush() - ip_status_lines[ip] = msg - -def print_open_port(ip, port, hostname): - with print_lock: - print(f"\n[{ip}] port {port} is OPEN ({hostname})") def scan_port(ip, port): print_overwrite(ip, f"scanning port {port}...") @@ -81,21 +111,6 @@ def scan_port(ip, port): s.close() def scan_ports_for_ip(ip): - scan_it = True - if is_private_ip(ip): - with print_lock: - print(f"\n[~] pinging private ip: {ip}") - if not ping_ip(ip): - with print_lock: - print(f"[-] {ip} no ping reply. skipping.") - scan_it = False - else: - with print_lock: - print(f"\n[~] public ip {ip}, skipping ping") - - if not scan_it: - return - with ThreadPoolExecutor(max_workers=10) as p_exec: futures = [p_exec.submit(scan_port, ip, port) for port in range(port_start, port_end + 1)] for future in as_completed(futures): @@ -106,10 +121,7 @@ def scan_ports_for_ip(ip): except Exception as e: with print_lock: print(f"[!] port scan err on {ip}: {e}") - with ip_status_lock: - sys.stdout.write(f"\r[{ip}] scan done.{' '*40}\n") - sys.stdout.flush() - ip_status_lines.pop(ip, None) + print_overwrite(ip, "scan done.") def scan_ports_for_but_fast(ip): with ThreadPoolExecutor(max_workers=100) as p_exec: @@ -122,20 +134,46 @@ def scan_ports_for_but_fast(ip): except Exception as e: with print_lock: print(f"[!] port scan err on {ip}: {e}") - with ip_status_lock: - sys.stdout.write(f"\r[{ip}] scan done.{' '*40}\n") - sys.stdout.flush() - ip_status_lines.pop(ip, None) + print_overwrite(ip, "scan done.") + +# --- main --- +log_thread = threading.Thread(target=logger_thread, daemon=True) +log_thread.start() try: - ips = [str(ipaddress.IPv4Address(i)) for i in range(int(ip_start), int(ip_end)+1)] - single = ip_start == ip_end + all_ips = [str(ipaddress.IPv4Address(i)) for i in range(int(ip_start), int(ip_end)+1)] + private_ips = [ip for ip in all_ips if is_private_ip(ip)] + + print("[~] pinging private IPs only...") + alive_ips = [] + ping_all_ips(private_ips) + + if not alive_ips: + public_ips = [ip for ip in all_ips if not is_private_ip(ip)] + if public_ips: + print("[~] no private IPs alive, skipping ping on public IPs, assuming they're alive") + alive_ips = public_ips + else: + print("[-] no IPs responded to ping. exiting.") + sys.exit(0) + + if not alive_ips: + print("[-] no IPs responded to ping. exiting.") + sys.exit(0) + + clear_screen() + for idx, ip in enumerate(alive_ips): + ip_line_map[ip] = idx + 1 + move_cursor_to_line(idx + 1) + print(f"[{ip}] preparing...", end='') + + single = len(alive_ips) == 1 if single: - print(f"single ip: {ips[0]} — going full send (unless isp blocks u lol) :p") - scan_ports_for_but_fast(ips[0]) + print_overwrite(alive_ips[0], "single ip: going full send (unless isp blocks u lol) :p") + scan_ports_for_but_fast(alive_ips[0]) else: with ThreadPoolExecutor(max_workers=3) as ip_exec: - futs = {ip_exec.submit(scan_ports_for_ip, ip): ip for ip in ips} + futs = {ip_exec.submit(scan_ports_for_ip, ip): ip for ip in alive_ips} for fut in as_completed(futs): if shutdown_event.is_set(): break @@ -151,7 +189,7 @@ except KeyboardInterrupt: except Exception as e: print(f"[!] error: {e}") -print("-"*60) +print("\n" + "-"*60) print("scan done at:", str(datetime.now())) print("-"*60)