""" Optimized collision detection system using NumPy for vectorized operations. This module provides efficient collision detection for games with many entities (200+). Uses AABB (Axis-Aligned Bounding Box) collision detection with numpy vectorization. HYBRID APPROACH: - For < 50 units: Uses simple dictionary-based approach (low overhead) - For >= 50 units: Uses NumPy vectorization (scales better) Performance improvements: - O(n²) → O(n) for spatial queries using grid-based hashing - Vectorized AABB checks for large unit counts - Minimal overhead for small unit counts """ import numpy as np from typing import Dict, List, Tuple, Set from dataclasses import dataclass # Threshold for switching to NumPy mode NUMPY_THRESHOLD = 50 @dataclass class CollisionLayer: """Define which types of units can collide with each other.""" RAT = 0 BOMB = 1 GAS = 2 MINE = 3 POINT = 4 EXPLOSION = 5 class CollisionSystem: """ Manages collision detection for all game units using NumPy vectorization. Attributes ---------- cell_size : int Size of each grid cell in pixels grid_width : int Number of cells in grid width grid_height : int Number of cells in grid height """ def __init__(self, cell_size: int, grid_width: int, grid_height: int): self.cell_size = cell_size self.grid_width = grid_width self.grid_height = grid_height # Spatial grid for fast lookups self.spatial_grid: Dict[Tuple[int, int], List] = {} self.spatial_grid_before: Dict[Tuple[int, int], List] = {} # Arrays for vectorized operations self.unit_ids = [] self.bboxes = np.array([], dtype=np.float32).reshape(0, 4) # (x1, y1, x2, y2) self.positions = np.array([], dtype=np.int32).reshape(0, 2) # (x, y) self.positions_before = np.array([], dtype=np.int32).reshape(0, 2) self.layers = np.array([], dtype=np.int8) # Pre-allocation tracking self._capacity = 0 self._size = 0 # Collision matrix: which layers collide with which self.collision_matrix = np.zeros((6, 6), dtype=bool) self._setup_collision_matrix() def _setup_collision_matrix(self): """Define which collision layers interact with each other.""" L = CollisionLayer # Rats collide with: Rats, Bombs, Gas, Mines, Points self.collision_matrix[L.RAT, L.RAT] = True self.collision_matrix[L.RAT, L.BOMB] = False # Bombs don't kill on contact self.collision_matrix[L.RAT, L.GAS] = True self.collision_matrix[L.RAT, L.MINE] = True self.collision_matrix[L.RAT, L.POINT] = True self.collision_matrix[L.RAT, L.EXPLOSION] = True # Gas affects rats self.collision_matrix[L.GAS, L.RAT] = True # Mines trigger on rats self.collision_matrix[L.MINE, L.RAT] = True # Points collected by rats (handled in point logic) self.collision_matrix[L.POINT, L.RAT] = True # Explosions kill rats self.collision_matrix[L.EXPLOSION, L.RAT] = True # Make matrix symmetric self.collision_matrix = np.logical_or(self.collision_matrix, self.collision_matrix.T) def clear(self): """Clear all collision data for new frame.""" self.spatial_grid.clear() self.spatial_grid_before.clear() self.unit_ids = [] self.bboxes = np.array([], dtype=np.float32).reshape(0, 4) self.positions = np.array([], dtype=np.int32).reshape(0, 2) self.positions_before = np.array([], dtype=np.int32).reshape(0, 2) self.layers = np.array([], dtype=np.int8) def register_unit(self, unit_id, bbox: Tuple[float, float, float, float], position: Tuple[int, int], position_before: Tuple[int, int], layer: int): """ Register a unit for collision detection this frame. Parameters ---------- unit_id : UUID Unique identifier for the unit bbox : tuple Bounding box (x1, y1, x2, y2) position : tuple Current grid position (x, y) position_before : tuple Previous grid position (x, y) layer : int Collision layer (from CollisionLayer enum) """ idx = len(self.unit_ids) self.unit_ids.append(unit_id) # Pre-allocate arrays in batches to reduce overhead if len(self.bboxes) == 0: # Initialize with reasonable capacity self.bboxes = np.empty((100, 4), dtype=np.float32) self.positions = np.empty((100, 2), dtype=np.int32) self.positions_before = np.empty((100, 2), dtype=np.int32) self.layers = np.empty(100, dtype=np.int8) self._capacity = 100 self._size = 0 elif self._size >= self._capacity: # Expand capacity new_capacity = self._capacity * 2 self.bboxes = np.resize(self.bboxes, (new_capacity, 4)) self.positions = np.resize(self.positions, (new_capacity, 2)) self.positions_before = np.resize(self.positions_before, (new_capacity, 2)) self.layers = np.resize(self.layers, new_capacity) self._capacity = new_capacity # Add data self.bboxes[self._size] = bbox self.positions[self._size] = position self.positions_before[self._size] = position_before self.layers[self._size] = layer self._size += 1 # Add to spatial grids self.spatial_grid.setdefault(position, []).append(idx) self.spatial_grid_before.setdefault(position_before, []).append(idx) def check_aabb_collision(self, idx1: int, idx2: int, tolerance: int = 0) -> bool: """ Check AABB collision between two units. Parameters ---------- idx1, idx2 : int Indices in the arrays tolerance : int Overlap tolerance in pixels (reduces detection zone) Returns ------- bool True if bounding boxes overlap """ bbox1 = self.bboxes[idx1] bbox2 = self.bboxes[idx2] return (bbox1[0] < bbox2[2] - tolerance and bbox1[2] > bbox2[0] + tolerance and bbox1[1] < bbox2[3] - tolerance and bbox1[3] > bbox2[1] + tolerance) def check_aabb_collision_vectorized(self, idx: int, indices: np.ndarray, tolerance: int = 0) -> np.ndarray: """ Vectorized AABB collision check between one unit and many others. Parameters ---------- idx : int Index of the unit to check indices : ndarray Array of indices to check against tolerance : int Overlap tolerance in pixels Returns ------- ndarray Boolean array indicating collisions """ if len(indices) == 0: return np.array([], dtype=bool) # Slice actual data size, not full capacity bbox = self.bboxes[idx] other_bboxes = self.bboxes[indices] # Vectorized AABB check collisions = ( (bbox[0] < other_bboxes[:, 2] - tolerance) & (bbox[2] > other_bboxes[:, 0] + tolerance) & (bbox[1] < other_bboxes[:, 3] - tolerance) & (bbox[3] > other_bboxes[:, 1] + tolerance) ) return collisions def get_collisions_for_unit(self, unit_id, layer: int, tolerance: int = 0) -> List[Tuple[int, any]]: """ Get all units colliding with the specified unit. Uses hybrid approach: simple method for few units, numpy for many. Parameters ---------- unit_id : UUID ID of the unit to check layer : int Collision layer of the unit tolerance : int Overlap tolerance Returns ------- list List of tuples (index, unit_id) for colliding units """ if unit_id not in self.unit_ids: return [] idx = self.unit_ids.index(unit_id) position = tuple(self.positions[idx]) position_before = tuple(self.positions_before[idx]) # Get candidate indices from spatial grid candidates = set() for pos in [position, position_before]: candidates.update(self.spatial_grid.get(pos, [])) candidates.update(self.spatial_grid_before.get(pos, [])) # Remove self and out-of-bounds indices candidates.discard(idx) candidates = {c for c in candidates if c < self._size} if not candidates: return [] # HYBRID APPROACH: Use simple method for few candidates if len(candidates) < 10: return self._simple_collision_check(idx, candidates, layer, tolerance) # NumPy vectorized approach for many candidates candidates_array = np.array(list(candidates), dtype=np.int32) candidate_layers = self.layers[candidates_array] # Check collision matrix can_collide = self.collision_matrix[layer, candidate_layers] valid_candidates = candidates_array[can_collide] if len(valid_candidates) == 0: return [] # Vectorized AABB check collisions = self.check_aabb_collision_vectorized(idx, valid_candidates, tolerance) colliding_indices = valid_candidates[collisions] # Return list of (index, unit_id) pairs return [(int(i), self.unit_ids[i]) for i in colliding_indices] def _simple_collision_check(self, idx: int, candidates: set, layer: int, tolerance: int) -> List[Tuple[int, any]]: """ Simple collision check without numpy overhead. Used when there are few candidates. """ results = [] bbox = self.bboxes[idx] for other_idx in candidates: # Check collision layer if not self.collision_matrix[layer, self.layers[other_idx]]: continue # AABB check other_bbox = self.bboxes[other_idx] if (bbox[0] < other_bbox[2] - tolerance and bbox[2] > other_bbox[0] + tolerance and bbox[1] < other_bbox[3] - tolerance and bbox[3] > other_bbox[1] + tolerance): results.append((int(other_idx), self.unit_ids[other_idx])) return results def get_units_in_cell(self, position: Tuple[int, int], use_before: bool = False) -> List[any]: """ Get all unit IDs in a specific grid cell. Parameters ---------- position : tuple Grid position (x, y) use_before : bool If True, use position_before instead of position Returns ------- list List of unit IDs in that cell """ grid = self.spatial_grid_before if use_before else self.spatial_grid indices = grid.get(position, []) return [self.unit_ids[i] for i in indices] def get_units_in_area(self, positions: List[Tuple[int, int]], layer_filter: int = None) -> Set[any]: """ Get all units in multiple grid cells (useful for explosions). Parameters ---------- positions : list List of grid positions to check layer_filter : int, optional If provided, only return units of this layer Returns ------- set Set of unique unit IDs in the area """ unit_set = set() for pos in positions: # Check both current and previous positions for grid in [self.spatial_grid, self.spatial_grid_before]: indices = grid.get(pos, []) for idx in indices: if layer_filter is None or self.layers[idx] == layer_filter: unit_set.add(self.unit_ids[idx]) return unit_set def check_partial_move_collision(self, unit_id, partial_move: float, threshold: float = 0.5) -> List[any]: """ Check collisions considering partial movement progress. For units moving between cells, checks if they should be considered in current or previous cell based on movement progress. Parameters ---------- unit_id : UUID Unit to check partial_move : float Movement progress (0.0 to 1.0) threshold : float Movement threshold for position consideration Returns ------- list List of unit IDs in collision """ if unit_id not in self.unit_ids: return [] idx = self.unit_ids.index(unit_id) # Choose position based on partial move if partial_move >= threshold: position = tuple(self.positions[idx]) else: position = tuple(self.positions_before[idx]) # Get units in that position indices = self.spatial_grid.get(position, []) + \ self.spatial_grid_before.get(position, []) # Remove duplicates and self indices = list(set(indices)) if idx in indices: indices.remove(idx) return [self.unit_ids[i] for i in indices]