import bpy
import threading
import paho.mqtt.client as mqtt
import time
# Global MQTT state and connection settings
mqtt_client = None  # paho client instance; created by StartMQTTSubOperator
mqtt_running = False  # True while the background MQTT thread should keep running
mqtt_topic = "cabot"  # topic subscribed to in on_connect
# NOTE(review): SendMotionOperator publishes to "cobot/input/targetposition";
# confirm that "cabot" (not "cobot") is the intended subscribe topic.
broker_address = "localhost"
broker_port = 1883
# Path globals
path = []  # recorded path steps (copies of the "Target" object's location)
# MQTT callbacks, invoked by the paho MQTT client
def on_connect(client, userdata, flags, rc):
    """Handle the broker's answer to a connection attempt.

    A result code of 0 is treated as success, in which case the client
    subscribes to ``mqtt_topic``. Any other code is only logged.
    """
    if rc != 0:
        print("Failed to connect with result code "+str(rc))
        return
    print("Connected with result code "+str(rc))
    client.subscribe(mqtt_topic)
def on_message(client, userdata, msg):
    """Parse an ``"x,y,z"`` payload and schedule moving "End-Effector" there.

    The payload must decode as text and contain exactly three
    comma-separated finite numbers. Anything else is logged (where a
    message exists for it) and ignored, so a malformed message can never
    raise out of the MQTT callback.

    Args:
        client: The MQTT client that received the message (unused).
        userdata: User data attached to the client (unused).
        msg: Received message; only ``msg.payload`` (bytes) is read.
    """
    import math  # function-scope import keeps this block self-contained

    try:
        decoded_message = msg.payload.decode()
    except UnicodeDecodeError:
        # Binary or otherwise undecodable payloads are not coordinates.
        print("Error: payload is not valid UTF-8")
        return
    print(f"Message mottatt: {decoded_message}")

    coordinates = decoded_message.split(',')
    if len(coordinates) != 3:  # example of message -m "x,y,z"
        return
    try:
        position = tuple(float(coord) for coord in coordinates)
    except ValueError:
        print("Error: ikke tall LOL")
        return
    # float() accepts "nan" and "inf"; never write those into a location.
    if not all(math.isfinite(value) for value in position):
        print("Error: coordinates must be finite numbers")
        return

    # Hand the scene update over to a Blender timer instead of touching
    # bpy.data directly from the callback. set_object_position returns
    # None, so the timer fires only once.
    bpy.app.timers.register(
        lambda: set_object_position("End-Effector", position),
        first_interval=0.1,
    )
def set_object_position(obj_name, position):
    """Move the Blender object called ``obj_name`` to ``position``.

    Does nothing when no such object exists. Always returns ``None``;
    on_message passes this return value back to ``bpy.app.timers``.
    """
    found = bpy.data.objects.get(obj_name)
    if not found:
        return None
    found.location = position
    return None
# MQTT Client Thread init
def run_mqtt_client():
    """Run the MQTT network loop until ``mqtt_running`` is cleared.

    Used as the target of the thread started by StartMQTTSubOperator.
    Starts paho's network loop, waits for the stop flag, then stops the
    loop and disconnects from the broker.
    """
    global mqtt_running, mqtt_client
    mqtt_client.loop_start()
    while mqtt_running:
        # Sleep between checks instead of spinning: a bare busy-wait pins
        # a CPU core and competes with Blender's main thread for the GIL.
        time.sleep(0.1)
    mqtt_client.loop_stop()
    mqtt_client.disconnect()
# Operator to start MQTT subscription
class StartMQTTSubOperator(bpy.types.Operator):
    """Connect to the MQTT broker and start the background subscriber"""

    bl_idname = "mqtt.start_sub"
    bl_label = "Start MQTT Subscriber"

    def execute(self, context):
        """Create the client if needed, connect, and start the worker thread.

        Returns ``{'FINISHED'}`` on success (or when already running) and
        ``{'CANCELLED'}`` when the broker cannot be reached.
        """
        global mqtt_client, mqtt_running
        if mqtt_running:
            # Subscriber already active; starting twice would spawn a
            # second worker thread on the same client.
            return {'FINISHED'}

        if mqtt_client is None:
            # paho-mqtt 2.x requires an explicit callback API version;
            # VERSION1 matches the (client, userdata, flags, rc) callback
            # signature used in this file. 1.x has no such enum.
            api_version = getattr(mqtt, "CallbackAPIVersion", None)
            if api_version is not None:
                client = mqtt.Client(api_version.VERSION1)
            else:
                client = mqtt.Client()
            client.on_connect = on_connect
            client.on_message = on_message
            mqtt_client = client

        # Connect on every start, not only on creation: the worker thread
        # disconnects the client when stopped, so a restart must reconnect.
        try:
            mqtt_client.connect(broker_address, broker_port, 60)
        except OSError as err:
            self.report(
                {'ERROR'},
                f"Could not connect to MQTT broker "
                f"{broker_address}:{broker_port}: {err}",
            )
            return {'CANCELLED'}

        mqtt_running = True
        # Daemon thread so a forgotten subscriber cannot keep Blender alive.
        threading.Thread(target=run_mqtt_client, daemon=True).start()
        return {'FINISHED'}
class StopMQTTSubOperator(bpy.types.Operator):
    # Asks the background MQTT thread to shut down by clearing the
    # module-level ``mqtt_running`` flag that run_mqtt_client polls.
    bl_idname = "mqtt.stop_sub"
    bl_label = "Stop MQTT Subscriber"

    def execute(self, context):
        global mqtt_running

        mqtt_running = False
        return {'FINISHED'}
class AddStepOperator(bpy.types.Operator):
    # Records a step for the path. The path feature still has to be
    # completed so the path can be animated in Blender under Show Motion.
    bl_idname = "mqtt.add_step"
    bl_label = "Add Step"

    def execute(self, context):
        target_obj = bpy.data.objects.get("Target")
        if not target_obj:
            # No "Target" object in the file: nothing to record.
            return {'FINISHED'}
        # Store a copy of the location rather than the live property.
        step = target_obj.location.copy()
        path.append(step)
        print("Step added: ", target_obj.location)
        return {'FINISHED'}
class RemoveStepOperator(bpy.types.Operator):
    # Drops the most recently recorded step from the path, if there is one.
    bl_idname = "mqtt.remove_step"
    bl_label = "Remove Step"

    def execute(self, context):
        if not path:
            return {'FINISHED'}
        last_step = path.pop()
        print("Step removed: ", last_step)
        return {'FINISHED'}
# TODO (HIGH PRIORITY): animate a continuous path from x1,y1,z1 to xN,yN,zN
# (depending on how many points are in ``path``) instead of jumping from
# step to step.
class ShowMotionOperator(bpy.types.Operator):
    """Replay the recorded path on the "Target" object, one step every 0.5 s"""

    bl_idname = "mqtt.show_motion"
    bl_label = "Show Motion"

    def execute(self, context):
        """Schedule the playback and return immediately.

        The steps are applied from a ``bpy.app.timers`` callback rather
        than a ``time.sleep`` loop: sleeping inside ``execute`` blocks
        Blender's UI thread, which freezes the viewport until the loop ends.
        """
        if not path or bpy.data.objects.get("Target") is None:
            return {'FINISHED'}

        # Snapshot the path so steps added or removed during playback do
        # not affect the motion already in progress.
        remaining = iter(tuple(path))

        def advance():
            # Look the object up on every tick; it may have been deleted
            # since the playback started.
            target = bpy.data.objects.get("Target")
            pos = next(remaining, None)
            if target is None or pos is None:
                return None  # returning None unregisters the timer
            target.location = pos
            return 0.5  # seconds until the next step

        bpy.app.timers.register(advance)
        return {'FINISHED'}
class SendMotionOperator(bpy.types.Operator):
    """Publish the recorded path as one comma-separated MQTT message"""

    bl_idname = "mqtt.send_motion"
    bl_label = "Send Motion"

    def execute(self, context):
        """Send every path step as ``x,y,z`` values with two decimals.

        Returns ``{'FINISHED'}`` when the path is empty or the message was
        handed to the client, and ``{'CANCELLED'}`` when no client exists
        or the client rejects the publish.
        """
        if not path:
            return {'FINISHED'}
        if mqtt_client is None:
            # The client is only created by the Start operator.
            self.report(
                {'ERROR'},
                "MQTT client is not started; run 'Start MQTT Subscriber' first",
            )
            return {'CANCELLED'}

        # All steps are flattened into one string: "x1,y1,z1,x2,y2,z2,..."
        message = ",".join("{:.2f},{:.2f},{:.2f}".format(*p) for p in path)
        result = mqtt_client.publish("cobot/input/targetposition", message)
        if result.rc != mqtt.MQTT_ERR_SUCCESS:
            # For example when the client is not connected to the broker.
            self.report({'ERROR'}, f"MQTT publish failed (rc={result.rc})")
            return {'CANCELLED'}
        print("Published path: ", message)
        return {'FINISHED'}
class CobotSystemInterfacePanel(bpy.types.Panel):
    # UI panel listing the MQTT and path operators as buttons.
    bl_label = "Cobot System Interface"
    bl_idname = "MQTT_PT_cobot_system_interface"
    bl_space_type = 'VIEW_3D'
    bl_region_type = 'UI'
    bl_category = 'Cobot System Interface'

    def draw(self, context):
        # Each section is a heading followed by (operator id, button text).
        sections = (
            ("MQTT", (
                ("mqtt.start_sub", "Start MQTT Subscriber"),
                ("mqtt.stop_sub", "Stop MQTT Subscriber"),
            )),
            ("Operations", (
                ("mqtt.add_step", "Add Step"),
                ("mqtt.remove_step", "Remove Step"),
                ("mqtt.show_motion", "Show Motion"),
                ("mqtt.send_motion", "Send Motion"),
            )),
        )
        ui = self.layout
        for heading, buttons in sections:
            ui.label(text=heading)
            for idname, caption in buttons:
                ui.operator(idname, text=caption)
def register():
    """Register all operators and the panel with Blender."""
    addon_classes = (
        StartMQTTSubOperator,
        StopMQTTSubOperator,
        AddStepOperator,
        RemoveStepOperator,
        ShowMotionOperator,
        SendMotionOperator,
        CobotSystemInterfacePanel,
    )
    for addon_class in addon_classes:
        bpy.utils.register_class(addon_class)
def unregister():
    """Unregister all operators and the panel from Blender."""
    addon_classes = (
        StartMQTTSubOperator,
        StopMQTTSubOperator,
        AddStepOperator,
        RemoveStepOperator,
        ShowMotionOperator,
        SendMotionOperator,
        CobotSystemInterfacePanel,
    )
    for addon_class in addon_classes:
        bpy.utils.unregister_class(addon_class)
# Register the classes when this file is executed as a script.
if __name__ == "__main__":
    register()