"""Threaded TCP connect scanner for a range of IPv4 addresses.

Run as a script: it prompts for an IP range and a port range, scans every
address/port combination with ``socket.connect_ex`` and writes the open ports
it found to ``summary.txt``.

Private addresses are pinged first and skipped when they do not answer;
public addresses are scanned without a ping.  A single-address range is
scanned with a larger worker pool and no ping.
"""

import functools
import ipaddress
import platform
import socket
import subprocess
import sys
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

# Per-connection timeout, in seconds, applied to every probe socket.
timeout = 0.5
osname = platform.system().lower()
# Send a single echo request; Windows' ping spells its flags differently.
ping_cmd = (
    ["ping", "-c", "1", "-W", "1"]
    if osname != "windows"
    else ["ping", "-n", "1", "-w", "1000"]
)
# Set once the user hits Ctrl+C; workers check it and stop picking up work.
shutdown_event = threading.Event()

# Inclusive port range read by the scan helpers; main() overwrites it from
# the user's answer.
port_start, port_end = 1, 10000

# (ip, port, hostname) tuples for every open port, in discovery order.
open_ports = []
open_ports_lock = threading.Lock()
# Serialises everything written to stdout.
print_lock = threading.Lock()
# Last status message shown per IP, used to blank out leftovers on overwrite.
ip_status_lines = {}
ip_status_lock = threading.Lock()


def parse_port_range(text, default=(1, 10000)):
    """Parse ``"start-end"`` (or a single port) into an inclusive tuple.

    Args:
        text: User input; blank input selects ``default``.
        default: Range returned for blank input.

    Returns:
        ``(start, end)`` as ints.

    Raises:
        ValueError: If the text is not numeric, ``start > end`` or a port is
            outside 0-65535.
    """
    text = text.strip()
    if not text:
        return default
    first, dash, last = text.partition("-")
    start = int(first)
    end = int(last) if dash else start
    if start > end:
        raise ValueError("start port > end port")
    if start < 0 or end > 65535:
        raise ValueError("ports must be within 0-65535")
    return start, end


def parse_ip_range(text):
    """Parse ``"a.b.c.d-e.f.g.h"`` (or a single address) into two addresses.

    Returns:
        ``(ip_start, ip_end)`` as ``ipaddress.IPv4Address`` objects.

    Raises:
        ValueError: If either address is malformed or the start address is
            greater than the end address.
    """
    first, dash, last = text.strip().partition("-")
    ip_start = ipaddress.IPv4Address(first.strip())
    ip_end = ipaddress.IPv4Address(last.strip()) if dash else ip_start
    if ip_start > ip_end:
        raise ValueError("start ip > end ip")
    return ip_start, ip_end


def ping_ip(ip):
    """Return True when one ping to ``ip`` exits with status 0.

    Any failure to run the ping command is treated as "no reply".
    """
    try:
        result = subprocess.run(
            ping_cmd + [ip],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except Exception:
        # Best effort: a missing or unusable ping binary means "unreachable".
        # Unlike a bare except, this lets Ctrl+C and SystemExit through.
        return False
    return result.returncode == 0


def is_private_ip(ip):
    """Return True when ``ip`` is in a private IPv4 range."""
    return ipaddress.IPv4Address(ip).is_private


@functools.lru_cache(maxsize=1024)
def _resolve_hostname(ip):
    """Reverse-resolve ``ip``, returning "unknown" when the lookup fails."""
    try:
        return socket.gethostbyaddr(ip)[0]
    except OSError:
        # Covers socket.herror and socket.gaierror alike, so a failed lookup
        # never prevents an open port from being recorded.
        return "unknown"


def print_overwrite(ip, msg):
    """Rewrite the current terminal line with the status ``msg`` for ``ip``."""
    # Lock order is always ip_status_lock then print_lock; holding print_lock
    # keeps the status line from interleaving with other output.
    with ip_status_lock, print_lock:
        previous = ip_status_lines.get(ip, "")
        # Pad with spaces so a shorter message fully covers the previous one.
        padding = " " * max(len(previous) - len(msg), 0)
        sys.stdout.write(f"\r[{ip}] {msg}{padding}")
        sys.stdout.flush()
        ip_status_lines[ip] = msg


def print_open_port(ip, port, hostname):
    """Report an open port on its own line."""
    with print_lock:
        print(f"\n[{ip}] port {port} is OPEN ({hostname})")


def scan_port(ip, port):
    """Probe one TCP port and record it in ``open_ports`` when it is open."""
    if shutdown_event.is_set():
        # Lets queued tasks drain immediately after Ctrl+C.
        return
    print_overwrite(ip, f"scanning port {port}...")
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(timeout)
        if s.connect_ex((ip, port)) != 0:
            return
    hostname = _resolve_hostname(ip)
    with open_ports_lock:
        open_ports.append((ip, port, hostname))
    print_open_port(ip, port, hostname)


def _scan_port_range(ip, workers):
    """Scan ``port_start``..``port_end`` on ``ip`` using ``workers`` threads."""
    with ThreadPoolExecutor(max_workers=workers) as p_exec:
        futures = [
            p_exec.submit(scan_port, ip, port)
            for port in range(port_start, port_end + 1)
        ]
        try:
            for future in as_completed(futures):
                if shutdown_event.is_set():
                    break
                try:
                    future.result()
                except Exception as e:
                    with print_lock:
                        print(f"[!] port scan err on {ip}: {e}")
        except KeyboardInterrupt:
            # Flag the shutdown before the pool's __exit__ starts waiting,
            # otherwise every queued port would still be scanned.
            shutdown_event.set()
            raise
        finally:
            if shutdown_event.is_set():
                for future in futures:
                    future.cancel()
    with ip_status_lock, print_lock:
        sys.stdout.write(f"\r[{ip}] scan done.{' '*40}\n")
        sys.stdout.flush()
        ip_status_lines.pop(ip, None)


def scan_ports_for_ip(ip):
    """Scan ``ip`` with 10 workers; private addresses must answer a ping."""
    if shutdown_event.is_set():
        return
    if is_private_ip(ip):
        with print_lock:
            print(f"\n[~] pinging private ip: {ip}")
        if not ping_ip(ip):
            with print_lock:
                print(f"[-] {ip} no ping reply. skipping.")
            return
    else:
        with print_lock:
            print(f"\n[~] public ip {ip}, skipping ping")
    _scan_port_range(ip, workers=10)


def scan_ports_for_but_fast(ip):
    """Scan ``ip`` with 100 workers and no ping check."""
    _scan_port_range(ip, workers=100)


def _scan_ip_list(ips):
    """Scan several addresses, three at a time."""
    with ThreadPoolExecutor(max_workers=3) as ip_exec:
        futs = {ip_exec.submit(scan_ports_for_ip, ip): ip for ip in ips}
        try:
            for fut in as_completed(futs):
                if shutdown_event.is_set():
                    break
                ip = futs[fut]
                try:
                    fut.result()
                except Exception as e:
                    with print_lock:
                        print(f"[!] ip {ip} scan fail: {e}")
        except KeyboardInterrupt:
            # Signal the workers before the pool's __exit__ blocks on them.
            shutdown_event.set()
            raise
        finally:
            if shutdown_event.is_set():
                for fut in futs:
                    fut.cancel()


def write_summary(path="summary.txt"):
    """Write the recorded open ports (or a "none found" note) to ``path``."""
    with open(path, "w", encoding="utf-8") as f:
        if open_ports:
            f.write("[+] scan summary: open ports found\n")
            for ip, port, host in open_ports:
                f.write(f" - {ip} : port {port} open - hostname: {host}\n")
        else:
            f.write("no open ports found.\n")


def main():
    """Prompt for the ranges, run the scan and write ``summary.txt``."""
    global port_start, port_end

    try:
        ip_range = input("enter ip range (e.g., 192.168.1.1-192.168.1.20): ").strip()
        port_range = input("port range (e.g., 1-65535) [default 1-10000]: ").strip()
        port_start, port_end = parse_port_range(port_range)
        ip_start, ip_end = parse_ip_range(ip_range)
    except (ValueError, EOFError) as e:
        print(f"[!] bad input: {e}")
        sys.exit(1)

    print("-"*60)
    print(f"scanning {ip_start} to {ip_end}")
    print(f"ports {port_start} to {port_end}")
    print("started at:", str(datetime.now()))
    print("-"*60)

    try:
        ips = [
            str(ipaddress.IPv4Address(i))
            for i in range(int(ip_start), int(ip_end) + 1)
        ]
        if ip_start == ip_end:
            print(f"single ip: {ips[0]} — going full send (unless isp blocks u lol) :p")
            scan_ports_for_but_fast(ips[0])
        else:
            _scan_ip_list(ips)
    except KeyboardInterrupt:
        shutdown_event.set()
        print("\n[!] ctrl+c caught. stopping...")
    except Exception as e:
        print(f"[!] error: {e}")

    print("-"*60)
    print("scan done at:", str(datetime.now()))
    print("-"*60)

    write_summary("summary.txt")
    print("\n[+] scan summary written to summary.txt :p")


if __name__ == "__main__":
    main()