logic

🧩 Syntax:
import json
import numpy as np
from rabbit_client import alert, Client

from shapely.geometry import Polygon

class BaseLogic:
    """Evaluate safety rules over the detections of a single camera frame.

    Each ``check_*`` method inspects person/vehicle detections (dicts keyed by
    object id, see :meth:`standardize`) and returns the offending records.
    :meth:`apply_rules` runs every check and forwards the combined result to
    ``alert``.
    """

    def __init__(self, cam=1,
                 max_objects=1000,
                 man_on_top_thresh=1.0,
                 vicinity_thresh=4.0,
                 overlap_thresh=1.0,
                 pp_thresh=1.0):
        """Store thresholds and load the camera configuration from disk.

        Args:
            cam: Camera identifier; stored as ``self.cam``.
            max_objects: Stored as ``self.max_objects``; not read by any rule
                in this class.
            man_on_top_thresh: A person closer than this to a truck is treated
                as being on top of it.
            vicinity_thresh: Upper distance bound (inclusive) for the
                "man in vicinity" rule.
            overlap_thresh: Zone-overlap ratio below which a vehicle counts as
                not fully inside a parking zone.
            pp_thresh: Maximum vehicle-to-vehicle distance for the parallel
                parking rule.
                NOTE(review): the original code read ``self.pp_thresh`` without
                ever defining it; the default here is a placeholder — confirm.

        Raises:
            OSError: If ``configs/cams_config.json`` cannot be opened.
            json.JSONDecodeError: If the configuration is not valid JSON.
        """
        self.cam = cam
        self.max_objects = max_objects
        self.cam_configs = self._load_config()
        self.trucks = ["open_truck", "closed_truck"]
        self.man_on_top_thresh = man_on_top_thresh
        self.vicinity_thresh = vicinity_thresh
        self.last_frames = None
        # check_in_zone() requires every zone to carry a "polygon" list of
        # {"x", "y"} vertices, so the three points are wrapped as one zone.
        # NOTE(review): the original stored the bare points, which made
        # check_in_zone raise KeyError("polygon"); confirm that the points are
        # meant to be the vertices of a single parking zone.
        self.park_zones = [
            {"polygon": [{"x": 12, "y": 14}, {"x": 34, "y": 65}, {"x": 92, "y": 114}]},
        ]
        self.overlap_thresh = overlap_thresh
        self.pp_thresh = pp_thresh

    def _load_config(self):
        """Return the parsed contents of ``configs/cams_config.json``.

        The path is relative to the current working directory.
        """
        with open("configs/cams_config.json", "r", encoding="utf-8") as cfg:
            return json.load(cfg)

    def standardize(self, dict_obj):
        """Re-key detection records by their own ``"id"`` field.

        Args:
            dict_obj: Either a dict whose values are detection records, or a
                plain iterable (e.g. list) of detection records. Every record
                must contain an ``"id"`` key.

        Returns:
            dict: ``{record["id"]: record}``. Later records win on duplicate
            ids.
        """
        records = dict_obj.values() if isinstance(dict_obj, dict) else dict_obj
        return {record["id"]: record for record in records}

    def check_in_zone(self, zone, bbox):
        """Measure how much of a bounding box lies inside a zone polygon.

        Args:
            zone: Dict with a ``"polygon"`` list of ``{"x": ..., "y": ...}``
                vertices.
            bbox: Dict with ``"l"``, ``"t"``, ``"r"``, ``"b"`` edges.

        Returns:
            tuple: ``(is_in_zone, ratio)`` where ``ratio`` is the intersection
            area divided by the bounding-box area and ``is_in_zone`` is
            ``ratio > 0``.
        """
        zone_polygon = Polygon([[pt["x"], pt["y"]] for pt in zone["polygon"]])
        box_polygon = Polygon([
            [bbox["l"], bbox["t"]],
            [bbox["r"], bbox["t"]],
            [bbox["r"], bbox["b"]],
            [bbox["l"], bbox["b"]],
        ])

        box_area = box_polygon.area
        if box_area == 0:
            # Degenerate box: nothing can overlap and the ratio is undefined.
            return False, 0.0

        # intersection() takes a geometry; the original wrapped the polygon in
        # np.array(), which is not a valid geometry argument.
        ratio = zone_polygon.intersection(box_polygon).area / box_area
        return ratio > 0.0, ratio

    def check_person_with_hardhat(self, person_detections, zones):
        """Find persons without a hardhat inside zones whose id contains "Non".

        Args:
            person_detections: Dict of person records keyed by id; each record
                needs ``"bbox"`` and ``"attributes"["hardhat"]``.
            zones: Iterable of zone dicts with ``"id"`` and ``"polygon"``.

        Returns:
            dict: Offending person records keyed by person id (each person is
            reported at most once).
        """
        pwh_violation = {}
        for zone in zones:
            if "Non" not in zone["id"]:
                continue
            for p_idx, person in person_detections.items():
                if p_idx in pwh_violation:
                    continue
                # The original passed get_area=False, which check_in_zone does
                # not accept (TypeError); the ratio is simply ignored here.
                is_in_zone, _ = self.check_in_zone(zone, person["bbox"])
                has_hardhat = person["attributes"]["hardhat"]
                # Deliberately "!= True": any value other than a true flag
                # (None, strings, ...) is treated as "no hardhat".
                if has_hardhat != True and is_in_zone:  # noqa: E712
                    pwh_violation[p_idx] = person
        return pwh_violation

    def check_man_on_top_of_truck_or_vicinity(self, distances,
                                              person_detections,
                                              vehicle_detections):
        """Flag persons on top of, or in the vicinity of, a truck.

        Args:
            distances: Iterable of ``{"id": src_id, "distances": [{"id":
                trg_id, "distance": d}, ...]}`` entries.
            person_detections: Dict of person records keyed by id.
            vehicle_detections: Dict of vehicle records keyed by id.

        Returns:
            tuple: ``(miv_violation, mot_violation)`` — note the order.
            ``miv_violation`` alternates person and vehicle records for every
            person/truck pair within ``[man_on_top_thresh, vicinity_thresh]``;
            ``mot_violation`` lists persons closer than ``man_on_top_thresh``
            who are neither wearing a hardhat nor marked as driver.
        """
        mot_violation = []
        miv_violation = []
        for dist in distances:
            # The same distance list also carries vehicle sources (see
            # check_parallel_parking), so skip entries that are not persons.
            person = person_detections.get(dist["id"])
            if person is None:
                continue
            has_hardhat = person["attributes"]["hardhat"]
            is_driver = person["attributes"]["driver"]

            for veh in dist["distances"]:
                vehicle = vehicle_detections.get(veh["id"])
                if vehicle is None:
                    # Target is not a known vehicle (e.g. another person).
                    continue
                if vehicle["type"]["class"] not in self.trucks:
                    continue
                distance = veh["distance"]

                # check for man on truck
                if distance < self.man_on_top_thresh and not has_hardhat and not is_driver:
                    mot_violation.append(person)

                # check for man in vicinity
                if self.man_on_top_thresh <= distance <= self.vicinity_thresh:
                    miv_violation.append(person)
                    miv_violation.append(vehicle)

        return miv_violation, mot_violation

    def check_forklift_on_slope(self, vehicle_detections):
        """Return forklifts that are moving forward while descending a slope.

        Args:
            vehicle_detections: Dict of vehicle records keyed by id; forklift
                records need ``isMoving``, ``movementDirection`` and
                ``slopeMovement`` attributes.

        Returns:
            list: Offending forklift records.
        """
        fos_violation = []
        for vehicle in vehicle_detections.values():
            if vehicle["type"]["class"] != "forklift":
                continue
            attrs = vehicle["attributes"]
            if (attrs["isMoving"]
                    and attrs["movementDirection"] == "front"
                    and attrs["slopeMovement"] == "descending"):
                fos_violation.append(vehicle)
        return fos_violation

    def check_parallel_parking(self, distances, vehicle_detections):
        """Flag close vehicle pairs that are not fully inside a parking zone.

        A pair is reported, once per matching zone, when at least one of the
        two vehicles overlaps the zone, their distance is below
        ``self.pp_thresh`` and at least one overlap ratio is below
        ``self.overlap_thresh``.

        Args:
            distances: Same structure as in
                :meth:`check_man_on_top_of_truck_or_vicinity`.
            vehicle_detections: Dict of vehicle records keyed by id.

        Returns:
            list: The source and target *distance entries* (not the vehicle
            records) of every offending pair, appended in that order.
        """
        pp_violation = []
        for src_dist in distances:
            src_id = src_dist["id"]
            if src_id not in vehicle_detections:
                continue
            src_box = vehicle_detections[src_id]["bbox"]
            for trg_dist in src_dist["distances"]:
                trg_id = trg_dist["id"]
                if trg_id not in vehicle_detections:
                    continue
                trg_box = vehicle_detections[trg_id]["bbox"]
                too_close = trg_dist["distance"] < self.pp_thresh
                for zone in self.park_zones:
                    src_in_zone, src_ratio = self.check_in_zone(zone, src_box)
                    # The original called the non-existent self.check_in_in.
                    trg_in_zone, trg_ratio = self.check_in_zone(zone, trg_box)
                    partly_outside = (src_ratio < self.overlap_thresh
                                      or trg_ratio < self.overlap_thresh)
                    if (src_in_zone or trg_in_zone) and too_close and partly_outside:
                        pp_violation.append(src_dist)
                        pp_violation.append(trg_dist)
        return pp_violation

    def apply_rules(self, meta, zones, distances, person_detections, vehicle_detections):
        """Run every rule on one frame and send the violations to ``alert``.

        Args:
            meta: Opaque metadata stored under the ``"metadata"`` key.
            zones: Zone dicts used by the hardhat rule.
            distances: Pairwise distance entries between detected objects.
            person_detections: Person records (dict or list, see
                :meth:`standardize`).
            vehicle_detections: Vehicle records (dict or list).

        Side effects:
            Sets ``self.violations`` and calls ``alert`` with it.
        """
        person_detections = self.standardize(person_detections)
        vehicle_detections = self.standardize(vehicle_detections)

        # case 1: person with hardhat
        pwh_ = self.check_person_with_hardhat(person_detections, zones)

        # case 2&3: check man in vicinity and top of truck.
        # The helper returns (vicinity, on-top); the original unpacked the pair
        # in the opposite order and so swapped the two alert categories.
        miv_, mot_ = self.check_man_on_top_of_truck_or_vicinity(distances,
                                                                person_detections,
                                                                vehicle_detections)

        # case 4: forklift on slope
        fos_ = self.check_forklift_on_slope(vehicle_detections)

        # case 5: parallel parking (the original omitted ``distances``)
        pp_ = self.check_parallel_parking(distances, vehicle_detections)

        # ALERT
        self.violations = {
            "metadata": meta,
            "person with hardhat": list(pwh_.values()),
            "man on top of truck": mot_,
            "man in vicinity": miv_,
            "forklift on slope": fos_,
            "parallel parking": pp_,
        }
        alert(self.violations)


if __name__ == "__main__":
    # Script entry point: poll the message queue and run the safety rules on
    # every frame summary that arrives.
    import time

    # Connection settings handed to the project's ``Client`` wrapper.
    # NOTE(review): broker credentials are hard-coded; consider reading them
    # from the environment or a config file.
    params = {
        "username": "guest",
        "password": "guest",
        "binding_key": "perceive.aurangabad.cam01",
        "exchange": "safety_exchange",
        "queue": "ai4safety"
    }
    # Pause between polls while the queue is empty, so the loop does not spin
    # at 100% CPU waiting for the next frame.
    IDLE_POLL_SECONDS = 0.05

    conn = Client(params)

    base_logic = BaseLogic(
        cam=1,
        max_objects=1000,
        man_on_top_thresh=1.0,
        vicinity_thresh=4.0,
        overlap_thresh=1.0,
    )

    # Sample geometry for a manual check of check_in_zone(); uncomment the two
    # lines below to run it.
    zone = {"polygon": [{"x": 30, "y": 30}, {"x": 100, "y": 30},
                        {"x": 100, "y": 100}, {"x": 30, "y": 100}]}
    bbox = {"l": 30,
            "t": 0,
            "r": 50,
            "b": 60}
    # in_zone, area = base_logic.check_in_zone(zone, bbox)
    # print(in_zone, area)

    while True:
        # presumably a pika-style basic_get that yields a falsy method frame
        # when the queue is empty — confirm against the Client wrapper.
        method_name, header, body = conn.client.basic_get(queue=conn.queue, auto_ack=True)
        if not method_name:
            time.sleep(IDLE_POLL_SECONDS)
            continue

        msg = json.loads(body.decode("utf-8"))
        summary = msg["summary"]

        # The frame's UTC timestamp is passed as the alert metadata.
        base_logic.apply_rules(msg["utc-timestamp"],
                               zones=summary["zones"],
                               distances=summary["distance-set"],
                               person_detections=summary["persons"],
                               vehicle_detections=summary["vehicles"])