portscraper/main.py
WhatDidYouExpect 4277e56865 kill me
2025-07-26 02:19:21 +02:00

249 lines
7.7 KiB
Python

import socket
from datetime import datetime
import ipaddress
import sys
import subprocess
import platform
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import queue
import json
log_queue = queue.Queue()
print_lock = threading.Lock()
ip_status_lock = threading.Lock()
shutdown_event = threading.Event()
ip_line_map = {}
open_ports = []
open_ports_lock = threading.Lock()
open_port_msgs = []
open_port_msgs_lock = threading.Lock()
bottom_log_start_line = 0
def clear_screen():
sys.stdout.write("\033[2J")
sys.stdout.flush()
def move_cursor(row, col=0):
sys.stdout.write(f"\033[{row};{col}H")
def clear_line():
sys.stdout.write("\033[K")
def print_status_line(ip, msg):
with ip_status_lock:
line = ip_line_map.get(ip)
if line:
move_cursor(line)
clear_line()
sys.stdout.write(f"[{ip}] {msg}")
sys.stdout.flush()
def print_open_port_line(msg):
with open_port_msgs_lock:
open_port_msgs.append(msg)
bottom_line = len(ip_line_map) + 2 + len(open_port_msgs)
move_cursor(bottom_line)
clear_line()
sys.stdout.write(msg + "\n")
sys.stdout.flush()
def ping_ip(ip, ping_cmd):
try:
cmd = ping_cmd + [ip]
result = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=1)
return result.returncode == 0
except subprocess.TimeoutExpired:
return False
except:
return False
def is_private_ip(ip):
return ipaddress.IPv4Address(ip).is_private
def grab_banner(ip, port):
try:
s = socket.socket()
s.settimeout(1)
s.connect((ip, port))
if port == 22:
banner = s.recv(1024).decode(errors='ignore').strip()
return banner
elif port == 5900:
banner = s.recv(12).decode(errors='ignore').strip()
return banner
else:
return None
except:
return None
finally:
s.close()
def identify_service_and_os(ip, port):
json_path = "services.json"
try:
with open(json_path, "r") as f:
data = json.load(f)
services = {int(k): v for k, v in data.items()}
except Exception:
services = {}
service_info = services.get(port)
fingerprint = None
if service_info:
banner = grab_banner(ip, port)
if banner:
fingerprint = f"{service_info['name']} banner: {banner}"
else:
fingerprint = f"{service_info['name']}"
return fingerprint
def scan_port(ip, port):
print_status_line(ip, f"scanning port {port}...")
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(0.5)
try:
result = s.connect_ex((ip, port))
if result == 0:
try:
hostname = socket.gethostbyaddr(ip)[0]
except socket.herror:
hostname = "unknown hostname"
fingerprint = identify_service_and_os(ip, port)
with open_ports_lock:
open_ports.append((ip, port, hostname, fingerprint))
msg = f"[{ip}] port {port} is OPEN ({hostname})"
if fingerprint:
msg += f" | {fingerprint}"
print_open_port_line(msg)
print_status_line(ip, f"open port found: {port}")
finally:
s.close()
def scan_ports_for_ip(ip, port_start, port_end):
with ThreadPoolExecutor(max_workers=10) as p_exec:
futures = [p_exec.submit(scan_port, ip, port) for port in range(port_start, port_end + 1)]
for future in as_completed(futures):
if shutdown_event.is_set():
break
try:
future.result()
except Exception as e:
with print_lock:
print(f"[!] port scan err on {ip}: {e}")
print_status_line(ip, "scan done.")
def ping_ip_threaded(ip, ping_cmd):
if shutdown_event.is_set():
return None
return ip if ping_ip(ip, ping_cmd) else None
def ping_ips_concurrently(ips, ping_cmd, max_workers=50):
alive_ips = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(ping_ip_threaded, ip, ping_cmd): ip for ip in ips}
for future in as_completed(futures):
if shutdown_event.is_set():
break
ip = futures[future]
try:
result = future.result()
if result:
alive_ips.append(ip)
print(f"[+] found {ip}")
else:
print(f"[~] no response from {ip}")
except Exception as e:
print(f"[!] ping error for {ip}: {e}")
return alive_ips
def main():
try:
ip_range = input("enter ip range (e.g., 192.168.1.1-192.168.1.20) [default 192.168.0.0-192.168.255.255]: ").strip()
port_range = input("port range (e.g., 1-65535) [default 1-10000]: ").strip()
if ip_range == "":
ip_start_str, ip_end_str = "192.168.0.0", "192.168.255.255"
else:
ip_start_str, ip_end_str = ip_range.split('-')
if port_range == "":
port_start, port_end = 1, 10000
else:
port_start, port_end = map(int, port_range.split('-'))
ip_start = ipaddress.IPv4Address(ip_start_str)
ip_end = ipaddress.IPv4Address(ip_end_str)
if ip_start > ip_end:
raise ValueError("start ip > end ip")
except Exception as e:
print(f"[!] bad input: {e}")
sys.exit(1)
print("-"*60)
print(f"scanning {ip_start} to {ip_end}")
print(f"ports {port_start} to {port_end}")
print("started at:", str(datetime.now()))
print("-"*60)
osname = platform.system().lower()
if osname == "windows":
ping_cmd = ["ping", "-n", "1", "-w", "1000"]
elif osname == "darwin":
ping_cmd = ["ping", "-c", "1"]
else:
ping_cmd = ["ping", "-c", "1", "-W", "1"]
all_ips = [str(ipaddress.IPv4Address(i)) for i in range(int(ip_start), int(ip_end)+1)]
any_public = False
for ip_str in all_ips:
ip_obj = ipaddress.IPv4Address(ip_str)
if not (ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_reserved):
any_public = True
break
if any_public:
print("[~] public IPs detected, skipping ping and scanning directly...")
ips_to_scan = all_ips
else:
private_ips = [ip for ip in all_ips if is_private_ip(ip)]
print("[~] pinging private IPs...")
alive_ips = ping_ips_concurrently(private_ips, ping_cmd)
if not alive_ips:
print("[-] no IPs responded to ping. exiting.")
sys.exit(0)
ips_to_scan = alive_ips
clear_screen()
for idx, ip in enumerate(ips_to_scan):
ip_line_map[ip] = idx + 1
for ip in ips_to_scan:
print_status_line(ip, "waiting to scan...")
for ip in ips_to_scan:
if shutdown_event.is_set():
break
scan_ports_for_ip(ip, port_start, port_end)
print("\n" + "-"*60)
print("scan done at:", str(datetime.now()))
print("-"*60)
with open("summary.txt", "w") as f:
if open_ports:
f.write("[+] scan summary: open ports found with fingerprints\n")
for ip, port, host, fingerprint in open_ports:
f.write(f" - {ip} : port {port} open - hostname: {host}")
if fingerprint:
f.write(f" | {fingerprint}")
f.write("\n")
else:
f.write("no open ports found.\n")
print("\n[+] scan summary written to summary.txt :p")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n[!] ctrl+c caught. stopping...")
sys.exit(1)