import json
import time

import numpy as np
from rabbit_client import alert, Client
from shapely.geometry import Polygon


class BaseLogic:
    """Evaluate safety rules on per-frame detections and publish the result.

    Each ``check_*`` method inspects person/vehicle detections (dicts keyed
    by object id after :meth:`standardize`) and returns the detections that
    break one rule.  :meth:`apply_rules` runs every check and hands the
    combined result to ``alert``.
    """

    def __init__(self,
                 cam=1,
                 max_objects=1000,
                 man_on_top_thresh=1.0,
                 vicinity_thresh=4.0,
                 overlap_thresh=1.0,
                 pp_thresh=1.0,
                 park_zones=None):
        """Store thresholds and load the camera configuration file.

        Args:
            cam: Camera identifier, stored on the instance.
            max_objects: Accepted for backward compatibility; not used.
            man_on_top_thresh: Person-to-truck distance below which the
                person is treated as being on top of the truck.
            vicinity_thresh: Upper bound of the "in vicinity" distance band,
                which starts at ``man_on_top_thresh``.
            overlap_thresh: Fraction of a vehicle box that must lie inside a
                parking zone; a smaller fraction counts as badly parked.
            pp_thresh: Vehicle-to-vehicle distance below which two vehicles
                are considered for the parallel-parking rule.
                NOTE(review): the original code read ``self.pp_thresh``
                without ever setting it; 1.0 is a placeholder default that
                needs to be confirmed.
            park_zones: Optional list of zone dicts, each with a "polygon"
                list of ``{"x": ..., "y": ...}`` points.  Defaults to a
                single placeholder zone (see the note in the body).
        """
        self.cam = cam
        self.cam_configs = self._load_config()
        self.trucks = ["open_truck", "closed_truck"]
        self.man_on_top_thresh = man_on_top_thresh
        self.vicinity_thresh = vicinity_thresh
        self.overlap_thresh = overlap_thresh
        self.pp_thresh = pp_thresh
        self.last_frames = None
        if park_zones is None:
            # NOTE(review): the original default was a list of three bare
            # points, which check_in_zone cannot consume because it reads
            # zone["polygon"].  The same points are kept here as the
            # vertices of one placeholder zone; confirm the real geometry.
            park_zones = [
                {"polygon": [{"x": 12, "y": 14},
                             {"x": 34, "y": 65},
                             {"x": 92, "y": 114}]},
            ]
        self.park_zones = park_zones

    def _load_config(self):
        """Return the parsed contents of ``configs/cams_config.json``."""
        with open("configs/cams_config.json", "r") as cfg:
            return json.load(cfg)

    def standardize(self, dict_obj):
        """Re-key a detection mapping by each entry's own ``"id"`` field."""
        out = {}
        for key in dict_obj:
            entry = dict_obj[key]
            out[entry["id"]] = entry
        return out

    def check_in_zone(self, zone, bbox):
        """Measure how much of a bounding box lies inside a zone polygon.

        Args:
            zone: Dict whose "polygon" entry is a list of points with "x"
                and "y" keys.
            bbox: Dict with "l", "t", "r" and "b" edges.

        Returns:
            Tuple ``(is_inside, fraction)`` where ``fraction`` is the
            intersection area divided by the box area and ``is_inside`` is
            True when that fraction is greater than zero.
        """
        zone_polygon = Polygon(
            [[coord["x"], coord["y"]] for coord in zone["polygon"]]
        )
        box_polygon = Polygon([
            [bbox["l"], bbox["t"]],
            [bbox["r"], bbox["t"]],
            [bbox["r"], bbox["b"]],
            [bbox["l"], bbox["b"]],
        ])

        box_area = box_polygon.area
        if box_area == 0:
            # A degenerate box cannot overlap anything and would otherwise
            # cause a division by zero below.
            return False, 0.0

        # Intersect with the geometry itself; wrapping it in np.array(), as
        # the original did, does not yield a geometry with an ``area``.
        fraction = zone_polygon.intersection(box_polygon).area / box_area
        return fraction > 0.0, fraction

    def check_person_with_hardhat(self, person_detections, zones):
        """Find persons without a hardhat inside zones whose id has "Non".

        Args:
            person_detections: Mapping of person id to detection; each
                detection provides "bbox" and ``["attributes"]["hardhat"]``.
            zones: Iterable of zone dicts with "id" and "polygon".

        Returns:
            Dict mapping person id to the offending detection.  A person is
            reported at most once even when inside several zones.
        """
        pwh_violation = {}
        for zone in zones:
            # Only zones whose id contains "Non" are subject to this rule.
            if "Non" not in zone["id"]:
                continue
            for p_idx, person in person_detections.items():
                if p_idx in pwh_violation:
                    continue
                # Same comparison as the original (anything that does not
                # compare equal to True counts as "no hardhat").
                if person["attributes"]["hardhat"] == True:  # noqa: E712
                    continue
                is_in_zone, _ = self.check_in_zone(zone, person["bbox"])
                if is_in_zone:
                    pwh_violation[p_idx] = person
        return pwh_violation

    def check_man_on_top_of_truck_or_vicinity(self, distances,
                                              person_detections,
                                              vehicle_detections):
        """Flag persons on top of, or in the vicinity of, a truck.

        Args:
            distances: Iterable of records with a source "id" and a
                "distances" list of ``{"id": ..., "distance": ...}`` items.
            person_detections: Mapping of person id to detection.
            vehicle_detections: Mapping of vehicle id to detection.

        Returns:
            Tuple ``(miv_violation, mot_violation)`` -- vicinity violations
            first (person followed by the vehicle, per hit), then
            man-on-top violations (persons only).
        """
        mot_violation = []
        miv_violation = []
        for dist in distances:
            person_idx = dist["id"]
            # Records whose source is not a known person are skipped, the
            # same way check_parallel_parking skips unknown vehicle ids.
            if person_idx not in person_detections:
                continue
            person = person_detections[person_idx]
            has_hardhat = person["attributes"]["hardhat"]
            is_driver = person["attributes"]["driver"]

            for veh in dist["distances"]:
                if veh["id"] not in vehicle_detections:
                    continue
                vehicle = vehicle_detections[veh["id"]]
                if vehicle["type"]["class"] not in self.trucks:
                    continue
                distance = veh["distance"]

                # Man on top of truck: very close, no hardhat, not driver.
                if (distance < self.man_on_top_thresh
                        and not has_hardhat and not is_driver):
                    mot_violation.append(person)

                # Man in vicinity: inside the band between both thresholds.
                if self.man_on_top_thresh <= distance <= self.vicinity_thresh:
                    miv_violation.append(person)
                    miv_violation.append(vehicle)
        return miv_violation, mot_violation

    def check_forklift_on_slope(self, vehicle_detections):
        """Return forklifts moving front-first while descending a slope."""
        fos_violation = []
        for vehicle in vehicle_detections.values():
            if vehicle["type"]["class"] != "forklift":
                continue
            attrs = vehicle["attributes"]
            if (attrs["isMoving"]
                    and attrs["movementDirection"] == "front"
                    and attrs["slopeMovement"] == "descending"):
                fos_violation.append(vehicle)
        return fos_violation

    def check_parallel_parking(self, distances, vehicle_detections):
        """Flag close vehicle pairs that are not fully inside a park zone.

        For every pair of known vehicles and every zone in
        ``self.park_zones`` the pair is reported when at least one vehicle
        touches the zone, the pair distance is below ``self.pp_thresh`` and
        at least one vehicle's in-zone fraction is below
        ``self.overlap_thresh``.

        Returns:
            List of distance records: the source record followed by the
            target record, once per matching zone.
        """
        pp_violation = []
        for src_dist in distances:
            src_id = src_dist["id"]
            if src_id not in vehicle_detections:
                continue
            src_box = vehicle_detections[src_id]["bbox"]

            for trg_dist in src_dist["distances"]:
                trg_id = trg_dist["id"]
                if trg_id not in vehicle_detections:
                    continue
                trg_box = vehicle_detections[trg_id]["bbox"]

                for zone in self.park_zones:
                    src_in_zone, src_fraction = self.check_in_zone(
                        zone, src_box)
                    trg_in_zone, trg_fraction = self.check_in_zone(
                        zone, trg_box)
                    if ((src_in_zone or trg_in_zone)
                            and trg_dist["distance"] < self.pp_thresh
                            and (src_fraction < self.overlap_thresh
                                 or trg_fraction < self.overlap_thresh)):
                        pp_violation.append(src_dist)
                        pp_violation.append(trg_dist)
        return pp_violation

    def apply_rules(self, meta, zones, distances, person_detections,
                    vehicle_detections):
        """Run every rule on one frame and send the result to ``alert``.

        The combined result is also kept on ``self.violations``.

        Args:
            meta: Value stored under the "metadata" key of the alert.
            zones: Zone dicts used by the hardhat rule.
            distances: Distance records shared by the truck and parking
                rules.
            person_detections: Person detections, re-keyed by id here.
            vehicle_detections: Vehicle detections, re-keyed by id here.
        """
        person_detections = self.standardize(person_detections)
        vehicle_detections = self.standardize(vehicle_detections)

        # case 1: person with hardhat
        pwh_ = self.check_person_with_hardhat(person_detections, zones)

        # case 2&3: man in vicinity and on top of truck.  The helper
        # returns (vicinity, on-top) in that order.
        miv_, mot_ = self.check_man_on_top_of_truck_or_vicinity(
            distances, person_detections, vehicle_detections)

        # case 4: forklift on slope
        fos_ = self.check_forklift_on_slope(vehicle_detections)

        # case 5: parallel parking
        pp_ = self.check_parallel_parking(distances, vehicle_detections)

        # ALERT
        self.violations = {
            "metadata": meta,
            "person with hardhat": list(pwh_.values()),
            "man on top of truck": mot_,
            "man in vicinity": miv_,
            "forklift on slope": fos_,
            "parallel parking": pp_,
        }
        alert(self.violations)


def main():
    """Poll the message queue and apply the safety rules to each message."""
    params = {
        "username": "guest",
        "password": "guest",
        "binding_key": "perceive.aurangabad.cam01",
        "exchange": "safety_exchange",
        "queue": "ai4safety",
    }
    # Pause between polls of an empty queue so the loop does not spin a
    # CPU core at 100%.
    poll_interval_s = 0.01

    conn = Client(params)
    base_logic = BaseLogic(
        cam=1,
        max_objects=1000,
        man_on_top_thresh=1.0,
        vicinity_thresh=4.0,
        overlap_thresh=1.0,
    )

    while True:
        method_name, header, body = conn.client.basic_get(
            queue=conn.queue, auto_ack=True)
        if not method_name:
            # No message was returned by this poll.
            time.sleep(poll_interval_s)
            continue

        msg = json.loads(body.decode("utf-8"))
        summary = msg["summary"]
        base_logic.apply_rules(
            msg["utc-timestamp"],
            zones=summary["zones"],
            distances=summary["distance-set"],
            person_detections=summary["persons"],
            vehicle_detections=summary["vehicles"],
        )


if __name__ == "__main__":
    main()