open
🧩 Syntax:
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Joy
import RPi.GPIO as GPIO
from time import time
class PumpControlNode(Node):
def __init__(self):
super().__init__('pump_control_node')
# GPIO pin mappings for pump control (BCM)
# Y button: front inflate, A button: front deflate
# X button: rear inflate, B button: rear deflate
self.PINS = {
'Y': 17,
'A': 27,
'X': 22,
'B': 23,
}
# Controller button indices
self.BUTTONS = {
'A': 0,
'B': 1,
'X': 2,
'Y': 3,
}
# How long a press holds the pump on
self.STOP_DURATION = 2.0 # seconds
# Setup GPIO outputs
GPIO.setmode(GPIO.BCM)
for pin in self.PINS.values():
GPIO.setup(pin, GPIO.OUT)
GPIO.output(pin, GPIO.LOW)
# Track a single pressed key and its start time
self._pressed_key = None
self._press_time = 0
# ROS setup
self.create_subscription(Joy, 'joy', self.joy_callback, 10)
self.get_logger().info(
"PumpControlNode initialized: one key at a time, prints once, stops after 2s"
)
def joy_callback(self, msg: Joy):
now = time()
b = msg.buttons
# Determine which keys are currently pressed (in defined order)
for key in ['Y', 'A', 'X', 'B']:
idx = self.BUTTONS[key]
if b[idx]:
current = key
break
else:
current = None
# If a new key press detected
if current and current != self._pressed_key:
self._pressed_key = current
self._press_time = now
self.get_logger().info(f"Pressed {current} to {self._action_map(current)}")
# Check if the press duration exceeded
if self._pressed_key:
if now - self._press_time < self.STOP_DURATION:
# Keep the single pump active
self._set_gpio(self._pressed_key, True)
else:
# Time exceeded: stop and reset
self._set_all(False)
self._pressed_key = None
else:
# No key pressed: ensure all pumps off
self._set_all(False)
def _set_gpio(self, key, state):
# Turn on the one pin for key, turn off its opposite, and all others off
# Front pumps: Y/A share control; Rear pumps: X/B share control
opposite = {
'Y': 'A',
'A': 'Y',
'X': 'B',
'B': 'X',
}
# Turn off all
for pin in self.PINS.values():
GPIO.output(pin, GPIO.LOW)
# Activate desired
pin = self.PINS[key]
GPIO.output(pin, GPIO.HIGH)
# Ensure opposite is off (redundant, but clear)
opp_pin = self.PINS[opposite[key]]
GPIO.output(opp_pin, GPIO.LOW)
def _set_all(self, state):
level = GPIO.HIGH if state else GPIO.LOW
for pin in self.PINS.values():
GPIO.output(pin, level)
def _action_map(self, key):
return {
'Y': 'inflate front pump',
'A': 'deflate front pump',
'X': 'inflate back pump',
'B': 'deflate back pump',
}[key]
def destroy_node(self):
super().destroy_node()
GPIO.cleanup()
self.get_logger().info("GPIO cleaned up")
def main(args=None):
rclpy.init(args=args)
node = PumpControlNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()