import socket from datetime import datetime import ipaddress import sys import subprocess import platform from concurrent.futures import ThreadPoolExecutor, as_completed import threading import queue import json log_queue = queue.Queue() print_lock = threading.Lock() ip_status_lock = threading.Lock() shutdown_event = threading.Event() ip_line_map = {} open_ports = [] open_ports_lock = threading.Lock() open_port_msgs = [] open_port_msgs_lock = threading.Lock() bottom_log_start_line = 0 def clear_screen(): sys.stdout.write("\033[2J") sys.stdout.flush() def move_cursor(row, col=0): sys.stdout.write(f"\033[{row};{col}H") def clear_line(): sys.stdout.write("\033[K") def print_status_line(ip, msg): with ip_status_lock: line = ip_line_map.get(ip) if line: move_cursor(line) clear_line() sys.stdout.write(f"[{ip}] {msg}") sys.stdout.flush() def print_open_port_line(msg): with open_port_msgs_lock: open_port_msgs.append(msg) bottom_line = len(ip_line_map) + 2 + len(open_port_msgs) move_cursor(bottom_line) clear_line() sys.stdout.write(msg + "\n") sys.stdout.flush() def ping_ip(ip, ping_cmd): try: cmd = ping_cmd + [ip] result = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=1) return result.returncode == 0 except subprocess.TimeoutExpired: return False except: return False def is_private_ip(ip): return ipaddress.IPv4Address(ip).is_private def grab_banner(ip, port): try: s = socket.socket() s.settimeout(1) s.connect((ip, port)) if port == 22: banner = s.recv(1024).decode(errors='ignore').strip() return banner elif port == 5900: banner = s.recv(12).decode(errors='ignore').strip() return banner else: return None except: return None finally: s.close() def identify_service_and_os(ip, port): json_path = "services.json" try: with open(json_path, "r") as f: data = json.load(f) services = {int(k): v for k, v in data.items()} except Exception: services = {} service_info = services.get(port) fingerprint = None if service_info: banner = grab_banner(ip, port) if banner: fingerprint = f"{service_info['name']} banner: {banner}" else: fingerprint = f"{service_info['name']}" return fingerprint def scan_port(ip, port): print_status_line(ip, f"scanning port {port}...") s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(0.5) try: result = s.connect_ex((ip, port)) if result == 0: try: hostname = socket.gethostbyaddr(ip)[0] except socket.herror: hostname = "unknown hostname" fingerprint = identify_service_and_os(ip, port) with open_ports_lock: open_ports.append((ip, port, hostname, fingerprint)) msg = f"[{ip}] port {port} is OPEN ({hostname})" if fingerprint: msg += f" | {fingerprint}" print_open_port_line(msg) print_status_line(ip, f"open port found: {port}") finally: s.close() def scan_ports_for_ip(ip, port_start, port_end): with ThreadPoolExecutor(max_workers=10) as p_exec: futures = [p_exec.submit(scan_port, ip, port) for port in range(port_start, port_end + 1)] for future in as_completed(futures): if shutdown_event.is_set(): break try: future.result() except Exception as e: with print_lock: print(f"[!] port scan err on {ip}: {e}") print_status_line(ip, "scan done.") def ping_ip_threaded(ip, ping_cmd): if shutdown_event.is_set(): return None return ip if ping_ip(ip, ping_cmd) else None def ping_ips_concurrently(ips, ping_cmd, max_workers=50): alive_ips = [] with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = {executor.submit(ping_ip_threaded, ip, ping_cmd): ip for ip in ips} for future in as_completed(futures): if shutdown_event.is_set(): break ip = futures[future] try: result = future.result() if result: alive_ips.append(ip) print(f"[+] found {ip}") else: print(f"[~] no response from {ip}") except Exception as e: print(f"[!] ping error for {ip}: {e}") return alive_ips def main(): try: ip_range = input("enter ip range (e.g., 192.168.1.1-192.168.1.20) [default 192.168.0.0-192.168.255.255]: ").strip() port_range = input("port range (e.g., 1-65535) [default 1-10000]: ").strip() if ip_range == "": ip_start_str, ip_end_str = "192.168.0.0", "192.168.255.255" else: ip_start_str, ip_end_str = ip_range.split('-') if port_range == "": port_start, port_end = 1, 10000 else: port_start, port_end = map(int, port_range.split('-')) ip_start = ipaddress.IPv4Address(ip_start_str) ip_end = ipaddress.IPv4Address(ip_end_str) if ip_start > ip_end: raise ValueError("start ip > end ip") except Exception as e: print(f"[!] bad input: {e}") sys.exit(1) print("-"*60) print(f"scanning {ip_start} to {ip_end}") print(f"ports {port_start} to {port_end}") print("started at:", str(datetime.now())) print("-"*60) osname = platform.system().lower() if osname == "windows": ping_cmd = ["ping", "-n", "1", "-w", "1000"] elif osname == "darwin": ping_cmd = ["ping", "-c", "1"] else: ping_cmd = ["ping", "-c", "1", "-W", "1"] all_ips = [str(ipaddress.IPv4Address(i)) for i in range(int(ip_start), int(ip_end)+1)] any_public = False for ip_str in all_ips: ip_obj = ipaddress.IPv4Address(ip_str) if not (ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_reserved): any_public = True break if any_public: print("[~] public IPs detected, skipping ping and scanning directly...") ips_to_scan = all_ips else: private_ips = [ip for ip in all_ips if is_private_ip(ip)] print("[~] pinging private IPs...") alive_ips = ping_ips_concurrently(private_ips, ping_cmd) if not alive_ips: print("[-] no IPs responded to ping. exiting.") sys.exit(0) ips_to_scan = alive_ips clear_screen() for idx, ip in enumerate(ips_to_scan): ip_line_map[ip] = idx + 1 for ip in ips_to_scan: print_status_line(ip, "waiting to scan...") for ip in ips_to_scan: if shutdown_event.is_set(): break scan_ports_for_ip(ip, port_start, port_end) bottom_line = len(ip_line_map) + 3 + len(open_port_msgs) move_cursor(bottom_line, 0) clear_line() print("\n" + "-"*60) print("scan done at:", str(datetime.now())) print("-"*60 + "\n") with open("summary.txt", "w") as f: if open_ports: f.write("[+] scan summary: open ports found with fingerprints\n") for ip, port, host, fingerprint in open_ports: f.write(f" - {ip} : port {port} open - hostname: {host}") if fingerprint: f.write(f" | {fingerprint}") f.write("\n") else: f.write("no open ports found.\n") print("[+] scan summary written to summary.txt :p\n") if __name__ == "__main__": try: main() except KeyboardInterrupt: print("\n[!] ctrl+c caught. stopping...") sys.exit(1)