logic
🧩 Syntax:
import json
import numpy as np
from rabbit_client import alert, Client
from shapely.geometry import Polygon
class BaseLogic:
def __init__(self, cam=1,
max_objects=1000,
man_on_top_thresh=1.0,
vicinity_thresh=4.0,
overlap_thresh=1.0
):
self.cam = cam
self.cam_configs = self._load_config()
self.trucks = ["open_truck", "closed_truck"]
self.man_on_top_thresh = man_on_top_thresh
self.vicinity_thresh = vicinity_thresh
self.last_frames = None
self.park_zones = [{"x":12, "y":14}, {"x":34, "y":65}, {"x":92, "y":114}]
self.overlap_thresh = overlap_thresh
def _load_config(self):
config = {}
with open("configs/cams_config.json", "r") as cfg:
config = json.load(cfg)
return config
def standardize(self, dict_obj):
out = {}
for d in dict_obj:
idx = dict_obj[d]["id"]
out[idx] = dict_obj[d]
return out
def check_in_zone(self, zone, bbox):
zone_polygon = []
for coord in zone["polygon"]:
zone_polygon.append([coord["x"], coord["y"]])
zone_polygon = Polygon(zone_polygon)
person_polygon = [ [bbox["l"], bbox["t"]],
[bbox["r"], bbox["t"]],
[bbox["r"], bbox["b"]],
[bbox["l"], bbox["b"]]
]
person_polygon = Polygon(person_polygon)
area_intersection = zone_polygon.intersection(
np.array(person_polygon)
).area
area_intersection = area_intersection/person_polygon.area
return area_intersection > 0.0, area_intersection
def check_person_with_hardhat(self, person_detections, zones):
pwh_violation = {}
for zone in zones:
if "Non" not in zone["id"]:
continue
for p_idx, person in person_detections.items():
if p_idx in pwh_violation:
continue
is_in_zone, _ = self.check_in_zone(zone, person["bbox"], get_area=False)
has_hardhat = person["attributes"]["hardhat"]
if has_hardhat != True and is_in_zone:
pwh_violation[p_idx] = person
return pwh_violation
def check_man_on_top_of_truck_or_vicinity(self, distances,
person_detections,
vehicle_detections):
mot_violation = []
miv_violation = []
for dist in distances:
person_idx = dist["id"]
has_hardhat = person_detections[person_idx]["attributes"]["hardhat"]
is_driver = person_detections[person_idx]["attributes"]["driver"]
vehicles = dist["distances"]
for veh in vehicles:
vehicle = vehicle_detections[veh["id"]]
distance = veh["distance"]
if vehicle["type"]["class"] in self.trucks:
# check for man on truck
if distance < self.man_on_top_thresh and not has_hardhat and not is_driver:
mot_violation += [person_detections[person_idx]]
# check for man in vicinity
if self.man_on_top_thresh <= distance <= self.vicinity_thresh:
miv_violation += [person_detections[person_idx]]
miv_violation += [vehicle]
return miv_violation, mot_violation
def check_forklift_on_slope(self, vehicle_detections):
fos_violation = []
for idx, vehicle in vehicle_detections.items():
if vehicle["type"]["class"] == "forklift":
moving = vehicle["attributes"]["isMoving"]
direction = vehicle["attributes"]["movementDirection"]
slope_move = vehicle["attributes"]["slopeMovement"]
if moving and direction == "front" and slope_move == "descending":
fos_violation += [vehicle]
return fos_violation
def check_parallel_parking(self, distances, vehicle_detections):
pp_violation = []
for src_dist in distances:
src_id = src_dist["id"]
if src_id not in vehicle_detections:
continue
for trg_dist in src_dist["distances"]:
trg_id = trg_dist["id"]
if trg_id not in vehicle_detections:
continue
for zone in self.park_zones:
src_box = vehicle_detections[src_id]["bbox"]
trg_box = vehicle_detections[trg_id]["bbox"]
src_in_zone, src_intersection_area = self.check_in_zone(zone, src_box)
trg_in_zone, trg_intersection_area = self.check_in_in(zone, trg_box)
if (src_in_zone or trg_in_zone) \
and trg_dist["distance"] < self.pp_thresh \
and (
src_intersection_area < self.overlap_thresh or \
trg_intersection_area < self.overlap_thresh):
pp_violation += [src_dist]
pp_violation += [trg_dist]
return pp_violation
def apply_rules(self, meta, zones, distances, person_detections, vehicle_detections):
person_detections = self.standardize(person_detections)
vehicle_detections = self.standardize(vehicle_detections)
# case 1: person with hardhat
pwh_ = self.check_person_with_hardhat(person_detections, zones)
# case 2&3: check man in vicinity and top of truck
mot_, miv_ = self.check_man_on_top_of_truck_or_vicinity(distances,
person_detections,
vehicle_detections)
# case 4: forklift on slope
fos_ = self.check_forklift_on_slope(vehicle_detections)
# case 5: parallel parking
pp_ = self.check_parallel_parking(vehicle_detections)
# ALERT
self.violations = {"metadata": meta,
"person with hardhat" : list(pwh_.values()),
"man on top of truck" : mot_,
"man in vicinity" : miv_,
"forklift on slope" : fos_,
"parallel parking" : pp_
}
alert(self.violations)
if __name__=="__main__":
params = {
"username": "guest",
"password": "guest",
"binding_key": "perceive.aurangabad.cam01",
"exchange": "safety_exchange",
"queue": "ai4safety"
}
THRESH = 30*30
ctr = {}
zone = [(501, 193), (1193, 189), (1193, 615), (509, 611)]
timer = 0
conn = Client(params)
tar_fr=-1
base_logic = BaseLogic(
cam=1,
max_objects=1000,
man_on_top_thresh=1.0,
vicinity_thresh=4.0,
overlap_thresh=1.0
)
zone = {"polygon" : [{"x":30, "y":30}, {"x":100, "y":30}, {"x":100, "y":100}, {"x":30, "y":100}]}
bbox = {"l" : 30,
"t" : 0,
"r" : 50,
"b" : 60}
# in_zone, area = base_logic.check_in_zone(zone, bbox)
# print(in_zone, area)
while True:
method_name, header, body = conn.client.basic_get(queue=conn.queue, auto_ack=True)
if method_name:
msg = json.loads(body.decode("utf-8"))
frame_num = msg["frameNumber"]
vehicles = msg["summary"]["vehicles"]
persons = msg["summary"]["persons"]
distances = msg["summary"]["distance-set"]
zones = msg["summary"]["zones"]
timestamp = msg["utc-timestamp"]
base_logic.apply_rules(timestamp,
zones=zones,
distances=distances,
person_detections=persons,
vehicle_detections=vehicles)