-
-
Notifications
You must be signed in to change notification settings - Fork 44
/
send-receive.py
132 lines (99 loc) · 4.22 KB
/
send-receive.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
"""
You can use MAVLink for communicating between processes.
You don't even need an autopilot or a proxy server (e.g. MAVProxy) for that.
One process should act as a "MASTER" and another one is "SLAVE".
"MASTER" process should create a MAVLink connection with "udpout or tcpin".
"SLAVE" process should create a MAVLink connection with "udpin or tcp".
The communication is bidirectional.
You can also use serial devices for interprocess communication, too.
Master: python3 send-receive.py master
Slave: python3 send-receive.py slave
https://mavlink.io/en/messages/common.html#HEARTBEAT
https://ardupilot.org/mavproxy/docs/getting_started/starting.html#master
https://ardupilot.org/mavproxy/docs/getting_started/starting.html#out
"""
import sys
import time
import threading
import pymavlink.mavutil as utility
import pymavlink.dialects.v20.all as dialect
def send_heartbeat(master):
while True:
heartbeat = dialect.MAVLink_heartbeat_message(type=dialect.MAV_TYPE_ONBOARD_CONTROLLER,
autopilot=dialect.MAV_AUTOPILOT_INVALID,
base_mode=0,
custom_mode=0,
system_status=dialect.MAV_STATE_ACTIVE,
mavlink_version=3)
master.mav.send(heartbeat)
time.sleep(1)
# helper variables
MASTER = "master"
SLAVE = "slave"
# get the argument
argument = sys.argv[1]
# check the argument is master
if argument == MASTER:
print("This script is running as master")
# create master MAVLink connection
master = utility.mavlink_connection(device="udpout:127.0.0.1:6000",
source_system=1,
source_component=1)
# start heartbeat thread
threading.Thread(target=send_heartbeat, args=(master,)).start()
# infinite loop
while True:
# receive a message
message = master.recv_match(type=dialect.MAVLink_named_value_float_message.msgname,
blocking=True)
# convert this message to dictionary
message = message.to_dict()
# show the message
print("MASTER", message)
# check the message
if message["name"] == "REQSR":
# calculate the result
result = message["value"] ** 0.5
# create the message
message = dialect.MAVLink_named_value_float_message(time_boot_ms=0,
name="REPSR".encode("utf-8"),
value=result)
# send the message
master.mav.send(message)
# check the argument is slave
elif argument == SLAVE:
print("This script is running as slave")
# create slave MAVLink connection
slave = utility.mavlink_connection(device="udpin:127.0.0.1:6000",
source_system=1,
source_component=2,
force_connected=True)
# wait heartbeat
slave.wait_heartbeat()
# infinite loop
i = 1
while True:
# create the message
message = dialect.MAVLink_named_value_float_message(time_boot_ms=0,
name="REQSR".encode("utf-8"),
value=i)
# send the message
slave.mav.send(message)
# receive a message
message = slave.recv_match(type=dialect.MAVLink_named_value_float_message.msgname,
blocking=True)
# convert this message to dictionary
message = message.to_dict()
# show the message
print("SLAVE", message)
# check the message
if message["name"] == "REPSR":
# show the response
print("Squared root of", i, "is", message["value"])
# increment the counter
i += 1
# sleep a bit
time.sleep(3)
# raise exception on others
else:
raise NotImplementedError