Codigo Python para Bloqueos contra DDoS - Source Mu - Mu Server Files
 

Noticias:

SMF - Just Installed!

Menú principal

Codigo Python para Bloqueos contra DDoS

Publicado por Dakosmu, Mar 20, 2026, 04:24 PM

Tema anterior - Siguiente tema

0 Miembros y 1 Visitante están viendo este tema.

Dakosmu

les vengo a traer un codigo que me ha resultado muy bueno a la hora de restringir ciertos ataques contra ddos y con la ayuda de ChatGPT se logro mejorar aun mas el codigo para aplicar mas funciones, que se las hare detallar a continuacion:

changelog

Citar=======================================================================
           CHANGE LOG - SISTEMA DE DEFENSA ANTI-DDoS (v2.0)
=======================================================================

PROYECTO: Monitor de Tráfico y Mitigación de Ataques
FECHA: 2026-01-20
ESTADO: Implementación de Alto Rendimiento (Kernel Level)

-----------------------------------------------------------------------
[VERSIÓN 1.0] - Monitorización Básica (Sockets Raw)
-----------------------------------------------------------------------
* [ESTRUCTURA]: Implementación en Python puro usando hilos (threading).
* [DETECCIÓN]: Uso de 'socket.AF_PACKET' para capturar tráfico.
* [ANÁLISIS]: Desempaquetado manual de cabeceras IP con 'struct.unpack'.
* [LIMITACIÓN]: Procesamiento en "User Space". El sistema es vulnerable
    a saturación de CPU ante ataques de gran volumen.
* [ACCIÓN]: Solo detección lógica; no bloqueaba tráfico real.

-----------------------------------------------------------------------
[VERSIÓN 1.5] - Mitigación con Firewall (iptables)
-----------------------------------------------------------------------
* [NUEVO]: Integración con el sistema operativo mediante 'subprocess'.
* [ACCIÓN]: Bloqueo real de direcciones IP sospechosas usando el comando:
    'iptables -A INPUT -s [IP] -j DROP'.
* [GESTIÓN]: Sistema de desbloqueo automático tras 'BLOCK_TIME'.
* [MEJORA]: La IP atacante ya no puede establecer conexiones.

-----------------------------------------------------------------------
[VERSIÓN 2.0] - Optimización Extrema (eBPF / XDP) - ACTUAL
-----------------------------------------------------------------------
* [ARQUITECTURA]: Migración del procesamiento de "User Space" al "Kernel Space".
* [TECNOLOGÍA]: Implementación de tecnología XDP (eXpress Data Path).
* [NUEVO]: Inyección de código en C directamente en la tarjeta de red.
* [RENDIMIENTO]:
    - Los paquetes se filtran ANTES de que el Kernel de Linux los procese.
    - Consumo de CPU drásticamente reducido durante ataques de inundación.
* [MEMORIA]: Uso de BPF_HASH (Maps) para comunicación ultra-rápida entre
    el código de C y el script de control en Python.
* [SEGURIDAD]: Bloqueo preventivo instantáneo mediante el retorno 'XDP_DROP'
    dentro de la NIC (Network Interface Card).

-----------------------------------------------------------------------
NOTAS TÉCNICAS:
- Requiere privilegios de ROOT para cargar programas en el Kernel.
- Requiere la librería BCC (BPF Compiler Collection).
- Soporte exclusivo para sistemas operativos Linux (Kernel 4.8+).
=======================================================================

El código proporcionado es un buen comienzo para detectar posibles ataques DDoS mediante el monitoreo del tráfico de red y la identificación de direcciones IP que envían un número inusualmente alto de paquetes. Sin embargo, hay varias mejoras que podrían implementarse para fortalecer la protección contra ataques DDoS:

Agregar filtrado de paquetes: Actualmente, el código solo cuenta el número de paquetes recibidos por dirección IP. Podrías considerar agregar filtrado de paquetes para descartar paquetes maliciosos antes de procesarlos más a fondo.

Implementar un mecanismo de bloqueo temporal: En lugar de simplemente imprimir un mensaje cuando se detecta un posible ataque DDoS, podrías implementar un mecanismo para bloquear temporalmente la dirección IP sospechosa durante un período determinado.

Optimizar el rendimiento: Si bien el código actual funciona, podría optimizarse para mejorar el rendimiento y reducir la carga del sistema. Por ejemplo, podrías explorar técnicas para mejorar la eficiencia en la manipulación de paquetes y el acceso concurrente a la estructura de datos del contador de paquetes.


Codigo 1
import socket
import struct
import threading
import time

# Configuración inicial
HOST = '0.0.0.0'
PORT = 80
THRESHOLD = 1000  # Número de paquetes por segundo para considerar un ataque DDoS
BLOCK_TIME = 60  # Duración del bloqueo en segundos

# Estructura de datos para almacenar las direcciones IP bloqueadas temporalmente
blocked_ips = {}

# Contador de paquetes
packet_counts = {}

# Bloqueo para sincronizar el acceso a las estructuras de datos
lock = threading.Lock()

def monitor_traffic():
    global packet_counts
    with socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0003)) as s:
        while True:
            packet, _ = s.recvfrom(65565)
            # Desempaqueta el paquete para obtener la dirección IP de origen
            ip_header = packet[14:34]
            iph = struct.unpack('!BBHHHBBH4s4s', ip_header)
            src_ip = socket.inet_ntoa(iph[8])
           
            # Filtra paquetes maliciosos antes de procesarlos
            if not is_packet_valid(packet):
                continue
           
            # Actualiza el contador de paquetes
            with lock:
                if src_ip in packet_counts:
                    packet_counts[src_ip] += 1
                else:
                    packet_counts[src_ip] = 1

def detect_ddos():
    global packet_counts, blocked_ips
    while True:
        with lock:
            for src_ip, count in packet_counts.items():
                if count > THRESHOLD:
                    if src_ip not in blocked_ips:
                        print(f"Detectado posible ataque DDoS de {src_ip}")
                        block_ip(src_ip)
            packet_counts = {}  # Reinicia el contador
        time.sleep(1)

def is_packet_valid(packet):
    # Aquí puedes implementar la lógica para filtrar paquetes maliciosos
    return True

def block_ip(ip):
    global blocked_ips
    blocked_ips[ip] = time.time() + BLOCK_TIME
    # Aquí podrías realizar acciones adicionales, como bloquear la IP en el firewall

def unblock_expired_ips():
    global blocked_ips
    while True:
        current_time = time.time()
        with lock:
            for ip, block_time in list(blocked_ips.items()):
                if current_time > block_time:
                    del blocked_ips[ip]
        time.sleep(1)

# Inicia las threads de monitorización y detección
traffic_thread = threading.Thread(target=monitor_traffic)
ddos_thread = threading.Thread(target=detect_ddos)
unblock_thread = threading.Thread(target=unblock_expired_ips)

traffic_thread.start()
ddos_thread.start()
unblock_thread.start()


Codigo 2 de prueba

import socket
import struct
import threading
import time
import os
import subprocess

# Configuración inicial
HOST = '0.0.0.0'
PORT = 80
THRESHOLD = 1000  # Paquetes por segundo
BLOCK_TIME = 60   # Segundos

blocked_ips = {}
packet_counts = {}
lock = threading.Lock()

def is_packet_valid(packet):
    # En un sistema real, aquí inspeccionarías si el paquete es malformado
    return True

def block_ip(ip):
    """Ejecuta un comando real de sistema para bloquear la IP"""
    global blocked_ips
    with lock:
        if ip not in blocked_ips:
            print(f"[BLOQUEO] Aplicando baneo real a: {ip}")
            # Comando para insertar una regla de rechazo en el firewall de Linux
            subprocess.run(["iptables", "-A", "INPUT", "-s", ip, "-j", "DROP"])
            blocked_ips[ip] = time.time() + BLOCK_TIME

def unblock_expired_ips():
    """Limpia las reglas de iptables cuando pasa el tiempo"""
    global blocked_ips
    while True:
        current_time = time.time()
        with lock:
            for ip, expiry in list(blocked_ips.items()):
                if current_time > expiry:
                    print(f"[DESBLOQUEO] Liberando IP: {ip}")
                    subprocess.run(["iptables", "-D", "INPUT", "-s", ip, "-j", "DROP"])
                    del blocked_ips[ip]
        time.sleep(1)

def monitor_traffic():
    global packet_counts
    # Se requiere ejecutar como ROOT para abrir un socket RAW
    try:
        s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0003))
    except PermissionError:
        print("Error: Debes ejecutar este script como ROOT (sudo).")
        return

    while True:
        packet, _ = s.recvfrom(65565)
       
        # Extraer IP de origen (Capa de Red - IPv4)
        ip_header = packet[14:34]
        iph = struct.unpack('!BBHHHBBH4s4s', ip_header)
        src_ip = socket.inet_ntoa(iph[8])
       
        if not is_packet_valid(packet):
            continue
           
        with lock:
            packet_counts[src_ip] = packet_counts.get(src_ip, 0) + 1

def detect_ddos():
    global packet_counts
    while True:
        time.sleep(1) # Ventana de tiempo de 1 segundo
        with lock:
            for src_ip, count in packet_counts.items():
                if count > THRESHOLD:
                    block_ip(src_ip)
            packet_counts.clear() # Reiniciamos conteo para el siguiente segundo

# Hilos de ejecución
threads = [
    threading.Thread(target=monitor_traffic, daemon=True),
    threading.Thread(target=detect_ddos, daemon=True),
    threading.Thread(target=unblock_expired_ips, daemon=True)
]

if __name__ == "__main__":
    print("Iniciando Sistema de Monitoreo Activo...")
    for t in threads:
        t.start()
   
    # Mantener el programa principal vivo
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nApagando sistema y limpiando reglas...")
        # Aquí podrías añadir lógica para limpiar todas las IPs bloqueadas antes de salir



Codigo 3 de prueba

from bcc import BPF
import time
import ctypes

# 1. CÓDIGO EN C (Se ejecuta en el Kernel)
# Este programa cuenta paquetes por IP directamente en la tarjeta de red
ebpf_code = """
#include <uapi/linux/bpf.h>
#include <uapi/linux/if_ether.h>
#include <uapi/linux/ip.h>

BPF_HASH(packet_counts, u32, u64); // Diccionario en Kernel: IP -> Contador

int ddos_monitor(struct xdp_md *ctx) {
    void *data = (void *)(long)ctx->data;
    void *data_end = (void *)(long)ctx->data_end;
    struct ethhdr *eth = data;

    // Verificar que el paquete sea IP
    if ((void*)eth + sizeof(*eth) > data_end) return XDP_PASS;
    if (eth->h_proto != __constant_htons(ETH_P_IP)) return XDP_PASS;

    struct iphdr *ip = data + sizeof(*eth);
    if ((void*)ip + sizeof(*ip) > data_end) return XDP_PASS;

    u32 src_ip = ip->saddr;
    u64 *count, one = 1;

    // Incrementar el contador en el mapa del Kernel
    count = packet_counts.lookup(&src_ip);
    if (count) {
        *count += 1;
    } else {
        packet_counts.update(&src_ip, &one);
    }

    return XDP_PASS; // Deja pasar el paquete, Python decidirá si bloquearlo después
}
"""

# 2. CÓDIGO EN PYTHON (Controlador)
try:
    # Cargar el programa en el Kernel
    b = BPF(text=ebpf_code)
    fn = b.load_func("ddos_monitor", BPF.XDP)
   
    # Asociar a una interfaz de red (ejemplo: 'eth0' o 'wlan0')
    device = "eth0"
    b.attach_xdp(device, fn, 0)
   
    print(f"Monitoreando tráfico con XDP en {device}...")
    counts = b.get_table("packet_counts")

    while True:
        try:
            time.sleep(1)
            print("--- Reporte de tráfico (IP | Paquetes/seg) ---")
            for k, v in counts.items():
                # Convertir IP de entero a formato legible
                ip_str = socket.inet_ntoa(struct.pack("<L", k.value))
                print(f"{ip_str}: {v.value}")
               
                # Si supera el umbral, aquí podrías activar un bloqueo XDP_DROP
                if v.value > 1000:
                    print(f"¡ALERTA! {ip_str} superó el umbral.")
           
            # Limpiar contadores para el siguiente segundo
            counts.clear()
           
        except KeyboardInterrupt:
            break
finally:
    # Quitar el programa del Kernel al salir
    b.remove_xdp(device, 0)
    print("\nSistema XDP detenido.")

Codigo 4 prueba

from bcc import BPF
import socket
import struct
import time

# 1. PROGRAMA EN C PARA EL KERNEL
ebpf_program = """
#include <uapi/linux/bpf.h>
#include <uapi/linux/if_ether.h>
#include <uapi/linux/ip.h>

// Tabla para contar paquetes: IP (u32) -> Cantidad (u64)
BPF_HASH(packet_counts, u32, u64);

// Tabla de bloqueo: IP (u32) -> Timestamp de expiración (u64)
BPF_HASH(blacklist, u32, u64);

int xdp_ddos_filter(struct xdp_md *ctx) {
    void *data = (void *)(long)ctx->data;
    void *data_end = (void *)(long)ctx->data_end;
    struct ethhdr *eth = data;

    // Solo procesar paquetes IPv4
    if ((void*)eth + sizeof(*eth) > data_end) return XDP_PASS;
    if (eth->h_proto != __constant_htons(ETH_P_IP)) return XDP_PASS;

    struct iphdr *ip = data + sizeof(*eth);
    if ((void*)ip + sizeof(*ip) > data_end) return XDP_PASS;

    u32 src_ip = ip->saddr;

    // --- PASO 1: REVISAR BLACKLIST ---
    u64 *blocked_until = blacklist.lookup(&src_ip);
    if (blocked_until) {
        // Si la IP está en la lista, descartamos el paquete de inmediato
        // El Kernel ni siquiera se lo pasa al Sistema Operativo
        return XDP_DROP;
    }

    // --- PASO 2: CONTAR PAQUETES ---
    u64 *count, one = 1;
    count = packet_counts.lookup(&src_ip);
    if (count) {
        *count += 1;
    } else {
        packet_counts.update(&src_ip, &one);
    }

    return XDP_PASS;
}
"""

# 2. CONTROLADOR EN PYTHON
def run_antiddos():
    interface = "eth0"  # <--- CAMBIA ESTO por tu interfaz (usa 'ip link')
   
    try:
        # Cargar programa en el Kernel
        b = BPF(text=ebpf_program)
        fn = b.load_func("xdp_ddos_filter", BPF.XDP)
        b.attach_xdp(interface, fn, 0)
       
        print(f"[*] Sistema XDP activo en {interface}. Umbral: 1000 pkt/s")
       
        packet_counts = b.get_table("packet_counts")
        blacklist = b.get_table("blacklist")
       
        # Diccionario local para manejar tiempos de desbloqueo en Python
        local_blocked_ips = {}
        BLOCK_DURATION = 60 # segundos
       
        while True:
            time.sleep(1)
            current_time = time.time()

            # Analizar contadores
            for ip_int, count in packet_counts.items():
                if count.value > 1000:
                    ip_str = socket.inet_ntoa(struct.pack("<L", ip_int.value))
                   
                    if ip_int.value not in local_blocked_ips:
                        print(f"[!] BLOQUEO ACTIVO: {ip_str} ({count.value} pkt/s)")
                        # Avisar al Kernel: "Bloquea esta IP"
                        blacklist[ip_int] = ctypes.c_uint64(int(current_time + BLOCK_DURATION))
                        local_blocked_ips[ip_int.value] = current_time + BLOCK_DURATION

            # Limpiar contadores para el nuevo segundo
            packet_counts.clear()

            # Gestionar desbloqueos
            for ip_val, expiry in list(local_blocked_ips.items()):
                if current_time > expiry:
                    ip_str = socket.inet_ntoa(struct.pack("<L", ip_val))
                    print(f"[*] Desbloqueando IP: {ip_str}")
                    del blacklist[ctypes.c_uint32(ip_val)]
                    del local_blocked_ips[ip_val]

    except KeyboardInterrupt:
        print("\nSaliendo...")
    finally:
        # IMPORTANTE: Desvincular XDP al salir
        if 'b' in locals():
            b.remove_xdp(interface, 0)
            print("[*] Programa XDP removido del Kernel.")

if __name__ == "__main__":
    run_antiddos()

Bon Dia

Dakosmu

ddos_simulator.py


Para probar un sistema de XDP/eBPF, no podemos usar un simple "ping", ya que necesitamos generar un volumen de paquetes que simule un ataque real (inundación o flooding) para ver cómo el Kernel los descarta sin inmutarse.

Para esto, usaremos la librería Scapy en un script separado. Scapy permite construir paquetes desde cero y enviarlos masivamente.

1. Requisitos para la prueba
Necesitas instalar Scapy en la máquina que hará de "atacante" (puede ser la misma máquina apuntando a 127.0.0.1 o una IP de tu red local):

pip install scapy
from scapy.all import IP, UDP, send
import random
import time

def simulate_attack(target_ip, duration_seconds):
    print(f"[*] Iniciando simulación de ataque a {target_ip} por {duration_seconds}s...")
   
    timeout = time.time() + duration_seconds
    packets_sent = 0

    try:
        while time.time() < timeout:
            # Generamos una IP de origen aleatoria para cada paquete
            # O puedes usar una fija para probar el bloqueo específico
            src_ip = "192.168.1.100"
           
            # Construcción del paquete (Capa 3: IP, Capa 4: UDP)
            packet = IP(src=src_ip, dst=target_ip) / UDP(dport=80) / ("X" * 64)
           
            # Enviamos el paquete (verbose=False para no saturar la consola)
            send(packet, verbose=False)
           
            packets_sent += 1
            if packets_sent % 100 == 0:
                print(f"[+] Paquetes enviados: {packets_sent}")

    except KeyboardInterrupt:
        print("\n[!] Simulación detenida por el usuario.")
   
    print(f"--- Simulación terminada. Total enviado: {packets_sent} ---")

if __name__ == "__main__":
    # CONFIGURACIÓN:
    TARGET = "127.0.0.1"  # Cambia por la IP donde corre tu script XDP
    DURATION = 30         # Duración de la ráfaga
   
    simulate_attack(TARGET, DURATION)

3. Cómo realizar el experimento
Para ver la magia en acción, te recomiendo tener dos terminales abiertas:

Terminal A (El Defensor): Ejecuta el script de XDP/eBPF que configuramos antes.

sudo python3 xdp_antiddos.py

Terminal B (El Atacante): Ejecuta el simulador.

sudo python3 ddos_simulator.py

¿Qué deberías observar?
En la Terminal A: Verás que el contador de paquetes de la IP 192.168.1.100 sube rápidamente.

El Bloqueo: En cuanto pase de 1000, aparecerá el mensaje [!] BLOQUEO ACTIVO.

La Prueba de Fuego: Si intentas hacer un ping legítimo desde la IP bloqueada mientras el ataque ocurre, verás que los paquetes ni siquiera llegan al sistema operativo; el Kernel los está "matando" en la puerta (XDP_DROP).


Advertencia de Seguridad
No uses este script de simulación contra servidores que no sean tuyos. Enviar paquetes masivos, incluso si son pequeños, puede ser detectado como un ataque real por los proveedores de internet (ISP) y resultar en la suspensión de tu servicio. Úsalo solo en entornos locales (LAN) o máquinas virtuales controladas.

Bon Dia

🡱 🡳
Real Time Web Analytics