"""Threaded ping sweep and TCP connect port scanner.

Prompts for an IPv4 range and a port range, pings the private addresses in
the range to find live hosts, then connect-scans every requested port on each
live host.  Per-host progress is drawn in place with ANSI cursor escapes,
open ports are logged as they are found, and a summary is written to
``summary.txt`` in the current directory.
"""

import socket
from datetime import datetime
from functools import lru_cache
import ipaddress
import sys
import subprocess
import platform
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import os
import queue

# --- configuration ---------------------------------------------------------

timeout = 0.5  # seconds allowed for each TCP connect attempt
osname = platform.system().lower()
# Send a single echo request; the flag spelling differs on Windows.
ping_cmd = (
    ["ping", "-c", "1", "-W", "1"]
    if osname != "windows"
    else ["ping", "-n", "1", "-w", "1000"]
)

# Port range scanned by scan_port workers; main() overwrites it from user input.
port_start, port_end = 1, 10000

# --- shared state ----------------------------------------------------------

log_queue = queue.Queue()           # open-port messages for logger_thread
shutdown_event = threading.Event()  # set once the user presses Ctrl+C
ip_line_map = {}                    # ip -> terminal row of its status line
ip_status_lock = threading.Lock()   # guards ip_line_map
print_lock = threading.Lock()       # serialises every write to stdout
alive_ips = []                      # hosts selected for port scanning
open_ports = []                     # (ip, port, hostname) tuples
open_ports_lock = threading.Lock()  # guards open_ports


def logger_thread():
    """Print queued log messages until a ``None`` sentinel is received."""
    while True:
        msg = log_queue.get()
        if msg is None:
            break
        with print_lock:
            sys.stdout.write("\n" + msg + "\n")
            sys.stdout.flush()


def parse_port_range(text, default=(1, 10000)):
    """Parse ``"START-END"`` (or a single port) into a ``(start, end)`` tuple.

    Blank input yields *default*.

    Raises:
        ValueError: if the text is not one or two integers, or the range does
            not satisfy ``1 <= start <= end <= 65535``.
    """
    text = text.strip()
    if not text:
        return default
    parts = text.split("-")
    if len(parts) == 1:
        start = end = int(parts[0])
    elif len(parts) == 2:
        start, end = int(parts[0]), int(parts[1])
    else:
        raise ValueError(f"expected START-END, got {text!r}")
    if not 1 <= start <= end <= 65535:
        raise ValueError(
            f"port range must satisfy 1 <= start <= end <= 65535, got {start}-{end}"
        )
    return start, end


def parse_ip_range(text):
    """Parse ``"START-END"`` (or a single address) into two ``IPv4Address`` objects.

    Whitespace around each address is ignored.

    Raises:
        ValueError: if an address is malformed (``AddressValueError`` is a
            ``ValueError``), the text has more than one ``-``, or start > end.
    """
    parts = [part.strip() for part in text.split("-")]
    if len(parts) == 1:
        parts = parts * 2  # a single address is a range of one
    elif len(parts) != 2:
        raise ValueError(f"expected START-END, got {text!r}")
    start = ipaddress.IPv4Address(parts[0])
    end = ipaddress.IPv4Address(parts[1])
    if start > end:
        raise ValueError("start ip > end ip")
    return start, end


def move_cursor_to_line(line_num):
    """Emit the ANSI escape that moves the cursor to row *line_num*."""
    sys.stdout.write(f"\033[{line_num};0H")


def clear_screen():
    """Clear the terminal using the ANSI erase-display escape."""
    sys.stdout.write("\033[2J")
    sys.stdout.flush()


def print_overwrite(ip, msg):
    """Replace the status line registered for *ip* with *msg*.

    Does nothing when *ip* has no row in ``ip_line_map``.
    """
    with ip_status_lock:
        line = ip_line_map.get(ip)
    if line is None:
        return
    # Write under the same lock as logger_thread so the cursor move and the
    # text are never interleaved with a log message.
    with print_lock:
        move_cursor_to_line(line)
        sys.stdout.write(f"\033[K[{ip}] {msg}")
        sys.stdout.flush()


def print_open_port(ip, port, hostname):
    """Queue an "open port" message for the logger thread."""
    log_queue.put(f"[{ip}] port {port} is OPEN ({hostname})")


def ping_ip(ip):
    """Return True when a single ping to *ip* exits with status 0."""
    try:
        result = subprocess.run(
            ping_cmd + [ip],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except (OSError, subprocess.SubprocessError):
        # e.g. the ping executable is missing; treat the host as unreachable.
        return False
    return result.returncode == 0


def is_private_ip(ip):
    """Return True when *ip* (dotted-quad string) is a private IPv4 address."""
    return ipaddress.IPv4Address(ip).is_private


def ping_all_ips(ips):
    """Ping *ips* concurrently and append each responder to ``alive_ips``."""
    with ThreadPoolExecutor(max_workers=100) as p_exec:
        futures = {p_exec.submit(ping_ip, ip): ip for ip in ips}
        for future in as_completed(futures):
            if future.result():
                alive_ips.append(futures[future])


@lru_cache(maxsize=1024)
def _lookup_hostname(ip):
    """Return the reverse-DNS name for *ip*, or ``"unknown"`` if the lookup fails."""
    try:
        return socket.gethostbyaddr(ip)[0]
    except OSError:
        # socket.herror and socket.gaierror are both OSError subclasses; a
        # failed lookup must not prevent the open port from being recorded.
        return "unknown"


def scan_port(ip, port):
    """Try a TCP connect to ``ip:port`` and record it when the port is open.

    Returns immediately, without connecting, once ``shutdown_event`` is set.
    """
    if shutdown_event.is_set():
        return
    print_overwrite(ip, f"scanning port {port}...")
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(timeout)
        if s.connect_ex((ip, port)) != 0:
            return
    hostname = _lookup_hostname(ip)
    with open_ports_lock:
        open_ports.append((ip, port, hostname))
    print_open_port(ip, port, hostname)


def _scan_ports(ip, max_workers):
    """Scan ``port_start..port_end`` on *ip* using *max_workers* threads."""
    futures = []
    with ThreadPoolExecutor(max_workers=max_workers) as p_exec:
        try:
            for port in range(port_start, port_end + 1):
                futures.append(p_exec.submit(scan_port, ip, port))
            for future in as_completed(futures):
                if shutdown_event.is_set():
                    break
                try:
                    future.result()
                except Exception as e:
                    with print_lock:
                        print(f"[!] port scan err on {ip}: {e}")
        except KeyboardInterrupt:
            # Flag the shutdown before the executor's __exit__ runs;
            # otherwise it would block until every queued port was scanned.
            shutdown_event.set()
            raise
        finally:
            if shutdown_event.is_set():
                for future in futures:
                    future.cancel()
    print_overwrite(ip, "scan done.")


def scan_ports_for_ip(ip):
    """Scan the configured port range on *ip* with 10 worker threads."""
    _scan_ports(ip, max_workers=10)


def scan_ports_for_but_fast(ip):
    """Scan the configured port range on *ip* with 100 worker threads."""
    _scan_ports(ip, max_workers=100)


def _select_targets(ip_start, ip_end):
    """Fill ``alive_ips`` with the hosts to scan; exit if there are none."""
    all_ips = [
        str(ipaddress.IPv4Address(i))
        for i in range(int(ip_start), int(ip_end) + 1)
    ]
    private_ips = [ip for ip in all_ips if is_private_ip(ip)]
    print("[~] pinging private IPs only...")
    alive_ips.clear()
    ping_all_ips(private_ips)
    if alive_ips:
        # NOTE(review): when any private IP answers, public IPs in the range
        # are skipped entirely -- confirm this is the intended policy.
        alive_ips.sort(key=ipaddress.IPv4Address)  # stable display order
        return
    public_ips = [ip for ip in all_ips if not is_private_ip(ip)]
    if not public_ips:
        print("[-] no IPs responded to ping. exiting.")
        sys.exit(0)
    print("[~] no private IPs alive, skipping ping on public IPs, assuming they're alive")
    alive_ips.extend(public_ips)


def _scan_targets():
    """Draw one status line per live host and port-scan all of them."""
    clear_screen()
    for row, ip in enumerate(alive_ips, start=1):
        with ip_status_lock:
            ip_line_map[ip] = row
        move_cursor_to_line(row)
        print(f"[{ip}] preparing...", end='')

    if len(alive_ips) == 1:
        only_ip = alive_ips[0]
        print_overwrite(only_ip, "single ip: going full send (unless isp blocks u lol) :p")
        scan_ports_for_but_fast(only_ip)
        return

    with ThreadPoolExecutor(max_workers=3) as ip_exec:
        futs = {}
        try:
            for ip in alive_ips:
                futs[ip_exec.submit(scan_ports_for_ip, ip)] = ip
            for fut in as_completed(futs):
                if shutdown_event.is_set():
                    break
                try:
                    fut.result()
                except Exception as e:
                    with print_lock:
                        print(f"[!] ip {futs[fut]} scan fail: {e}")
        except KeyboardInterrupt:
            # Same reasoning as in _scan_ports: signal the workers before the
            # executor's __exit__ starts waiting on them.
            shutdown_event.set()
            for fut in futs:
                fut.cancel()
            raise


def _write_summary(path="summary.txt"):
    """Write every recorded open port to *path*, ordered by address then port."""
    with open_ports_lock:
        found = sorted(
            open_ports,
            key=lambda rec: (ipaddress.IPv4Address(rec[0]), rec[1]),
        )
    with open(path, "w", encoding="utf-8") as f:
        if found:
            f.write("[+] scan summary: open ports found\n")
            for ip, port, host in found:
                f.write(f" - {ip} : port {port} open - hostname: {host}\n")
        else:
            f.write("no open ports found.\n")


def main():
    """Prompt for the ranges, run the scan and write ``summary.txt``."""
    global port_start, port_end

    try:
        ip_range = input("enter ip range (e.g., 192.168.1.1-192.168.1.20): ").strip()
        port_range = input("port range (e.g., 1-65535) [default 1-10000]: ").strip()
        port_start, port_end = parse_port_range(port_range)
        ip_start, ip_end = parse_ip_range(ip_range)
    except (ValueError, EOFError) as e:
        print(f"[!] bad input: {e}")
        sys.exit(1)

    print("-"*60)
    print(f"scanning {ip_start} to {ip_end}")
    print(f"ports {port_start} to {port_end}")
    print("started at:", str(datetime.now()))
    print("-"*60)

    log_thread = threading.Thread(target=logger_thread, daemon=True)
    log_thread.start()
    try:
        _select_targets(ip_start, ip_end)
        _scan_targets()
    except KeyboardInterrupt:
        shutdown_event.set()
        print("\n[!] ctrl+c caught. stopping...")
    except Exception as e:
        print(f"[!] error: {e}")
    finally:
        # Flush queued open-port messages before the final banner; a daemon
        # thread would otherwise be killed with messages still pending.
        log_queue.put(None)
        log_thread.join()

    print("\n" + "-"*60)
    print("scan done at:", str(datetime.now()))
    print("-"*60)

    _write_summary()
    print("\n[+] scan summary written to summary.txt :p")


if __name__ == "__main__":
    main()