-
Notifications
You must be signed in to change notification settings - Fork 7
/
arduinoserialhal.py
executable file
·137 lines (122 loc) · 3.49 KB
/
arduinoserialhal.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/env python
#
# This code is running on the host and implements the serial protocol
# used to control the display connected to the MCU.
#
# On the MCU side, the serial protocol is either implemented under Arduino
# or under MicroPython.
#
import serial
import time
class ArduinoSerialHAL:
    """
    ArduinoSerialHAL handles the serial protocol (API) used to control
    the display connected to the MCU.

    Every command is a short binary packet: one ASCII command byte followed
    by its arguments. Multi-byte integers are sent low byte first.
    """

    def __init__(self, config):
        """
        Store the link settings and open the serial port.

        config is a mapping that must provide 'port', 'baudrate' and
        'tzOffsetSeconds' (seconds added to the host time before it is
        sent with the RTC command).
        """
        self.port = config['port']
        self.baudrate = config['baudrate']
        self.tz_adjust = config['tzOffsetSeconds']
        self.ser = None  # initialized in reset()
        self.reset()

    def process_input(self):
        """
        Process data coming from the MCU over the serial link, such as any
        captured button presses by the firmware or log messages, and return
        it as input data to the caller (the main game loop)

        Returns None when no bytes are waiting; otherwise returns whatever
        readline() yields, subject to the read timeout configured in reset().
        """
        if not self.ser.in_waiting:
            return None
        return self.ser.readline()

    def reset(self):
        """
        (Re-)open serial ports and resynchronize the protocol

        Closes the current link if there is one, opens the port again,
        sends the resynchronization bytes and then the current host time
        (plus the configured offset) as an RTC update.

        A write timeout during this sequence is not retried; it propagates
        to the caller as serial.SerialTimeoutException.
        """
        if self.ser:
            print('SerialProtocol: closing serial link')
            self.ser.close()
        print('SerialProtocol: opening port {} @ {} baud'.format(self.port, self.baudrate))
        self.ser = serial.Serial(self.port, baudrate=self.baudrate, rtscts=True, timeout=0.1, write_timeout=0.5)
        self.resynchronize_protocol()
        # Write the RTC packet directly instead of going through set_rtc():
        # set_rtc() uses safe_write(), which calls reset() on a timeout, so
        # a link that keeps timing out would recurse without bound.
        self.ser.write(self._rtc_packet(int(time.time()) + self.tz_adjust))

    def resynchronize_protocol(self):
        """
        Resynchronize the protocol by writing a string of zeroes.
        """
        self.ser.write(bytearray(10))

    def safe_write(self, data):
        """
        Write data to the serial link and attempt to handle write timeouts

        On a write timeout the link is reset once and the write is retried
        once. If the reset or the retry fails, that exception propagates.
        """
        try:
            self.ser.write(data)
            return
        except serial.SerialTimeoutException:
            print('SerialProtocol: write timeout, attempting reset..')
            print('WARN: Serial write timed out, attempting reset')
        self.reset()
        print('SerialProtocol: retrying send of {} bytes'.format(len(data)))
        self.ser.write(data)

    def init_display(self, num_pixels=256):
        """
        Send the 'i' command with num_pixels as a 16-bit value.
        """
        # Setup FastLED library
        data = bytearray(3)
        data[0] = ord('i')
        data[1] = num_pixels & 0xff
        data[2] = (num_pixels >> 8) & 0xff
        self.safe_write(data)

    def clear_display(self):
        """
        Send the 'c' command (command byte followed by one zero byte).
        """
        data = bytearray(2)
        data[0] = ord('c')
        self.safe_write(data)

    def update_display(self, num_modified_pixels=None):
        """
        Send the 's' command (command byte followed by one zero byte).

        num_modified_pixels is accepted but not used by this implementation.
        """
        data = bytearray(2)
        data[0] = ord('s')
        self.safe_write(data)

    def put_pixel(self, addr, r, g, b):
        """
        Send the 'l' command: a 16-bit pixel address followed by r, g, b.

        r, g and b must each be integers in 0..255; bytearray assignment
        raises ValueError or TypeError otherwise.
        """
        data = bytearray(6)
        data[0] = ord('l')
        data[1] = (addr >> 0) & 0xff
        data[2] = (addr >> 8) & 0xff
        data[3] = r
        data[4] = g
        data[5] = b
        self.safe_write(data)

    @staticmethod
    def _rtc_packet(t):
        """
        Build the '@' packet carrying t as a 32-bit value, low byte first.
        """
        t = int(t)
        data = bytearray(5)
        data[0] = ord('@')
        data[1] = (t >> 0) & 0xff
        data[2] = (t >> 8) & 0xff
        data[3] = (t >> 16) & 0xff
        data[4] = (t >> 24) & 0xff
        return data

    def set_rtc(self, t):
        """
        Send the '@' command with time value t (truncated to 32 bits).
        """
        # Resynchronize RTC
        self.safe_write(self._rtc_packet(t))

    def set_auto_time(self, enable=True):
        """
        Send the 't' command with a single 0/1 flag byte.
        """
        # Enable or disable automatic rendering of current time
        data = bytearray(2)
        data[0] = ord('t')
        data[1] = int(enable)
        self.safe_write(data)

    def suspend_host(self, restart_timeout_seconds):
        """
        Send the 'S' command with restart_timeout_seconds as a 16-bit value.

        The timeout is truncated to an integer first (as set_rtc() does for
        its argument), so float durations are accepted. Only the low 16 bits
        are transmitted.
        """
        timeout = int(restart_timeout_seconds)
        data = bytearray(3)
        data[0] = ord('S')
        data[1] = (timeout >> 0) & 0xff
        data[2] = (timeout >> 8) & 0xff
        self.safe_write(data)
if __name__ == '__main__':
    # Manual smoke test: open the link, light three pixels, wait a second
    # and clear the display again.
    import os

    # Try the first device path and fall back to the second when it does
    # not exist.
    port = '/dev/tty.usbmodem575711'
    if not os.path.exists(port):
        port = '/dev/ttyACM0'

    # The class defined in this file is ArduinoSerialHAL and its constructor
    # takes a config mapping; the previous SerialProtocol(port, 115200) call
    # referred to a name that does not exist here and raised NameError.
    p = ArduinoSerialHAL({
        'port': port,
        'baudrate': 115200,
        'tzOffsetSeconds': 0,
    })
    p.init_display(256)
    p.clear_display()
    p.put_pixel(0, 8, 0, 0)
    p.put_pixel(8, 0, 8, 0)
    p.put_pixel(16, 0, 0, 8)
    p.update_display()
    time.sleep(1)
    p.clear_display()