-
Notifications
You must be signed in to change notification settings - Fork 10
/
ota.py
128 lines (100 loc) · 4.77 KB
/
ota.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import network
import urequests
import os
import json
import machine
from time import sleep
class OTAUpdater:
""" This class handles OTA updates. It connects to the Wi-Fi, checks for updates, downloads and installs them."""
def __init__(self, ssid, password, repo_url, filename):
self.filename = filename
self.ssid = ssid
self.password = password
self.repo_url = repo_url
if "www.github.com" in self.repo_url :
print(f"Updating {repo_url} to raw.githubusercontent")
self.repo_url = self.repo_url.replace("www.github","raw.githubusercontent")
elif "github.com" in self.repo_url:
print(f"Updating {repo_url} to raw.githubusercontent'")
self.repo_url = self.repo_url.replace("github","raw.githubusercontent")
self.version_url = self.repo_url + 'main/version.json'
print(f"version url is: {self.version_url}")
self.firmware_url = self.repo_url + 'main/' + filename
# get the current version (stored in version.json)
if 'version.json' in os.listdir():
with open('version.json') as f:
self.current_version = int(json.load(f)['version'])
print(f"Current device firmware version is '{self.current_version}'")
else:
self.current_version = 0
# save the current version
with open('version.json', 'w') as f:
json.dump({'version': self.current_version}, f)
def connect_wifi(self):
""" Connect to Wi-Fi."""
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.connect(self.ssid, self.password)
while not sta_if.isconnected():
print('.', end="")
sleep(0.25)
print(f'Connected to WiFi, IP is: {sta_if.ifconfig()[0]}')
def fetch_latest_code(self)->bool:
""" Fetch the latest code from the repo, returns False if not found."""
# Fetch the latest code from the repo.
response = urequests.get(self.firmware_url)
if response.status_code == 200:
print(f'Fetched latest firmware code, status: {response.status_code}, - {response.text}')
# Save the fetched code to memory
self.latest_code = response.text
return True
elif response.status_code == 404:
print(f'Firmware not found - {self.firmware_url}.')
return False
def update_no_reset(self):
""" Update the code without resetting the device."""
# Save the fetched code and update the version file to latest version.
with open('latest_code.py', 'w') as f:
f.write(self.latest_code)
# update the version in memory
self.current_version = self.latest_version
# save the current version
with open('version.json', 'w') as f:
json.dump({'version': self.current_version}, f)
# free up some memory
self.latest_code = None
# Overwrite the old code.
# os.rename('latest_code.py', self.filename)
def update_and_reset(self):
""" Update the code and reset the device."""
print(f"Updating device... (Renaming latest_code.py to {self.filename})", end="")
# Overwrite the old code.
os.rename('latest_code.py', self.filename)
# Restart the device to run the new code.
print('Restarting device...')
machine.reset() # Reset the device to run the new code.
def check_for_updates(self):
""" Check if updates are available."""
# Connect to Wi-Fi
self.connect_wifi()
print(f'Checking for latest version... on {self.version_url}')
response = urequests.get(self.version_url)
data = json.loads(response.text)
print(f"data is: {data}, url is: {self.version_url}")
print(f"data version is: {data['version']}")
# Turn list to dict using dictionary comprehension
# my_dict = {data[i]: data[i + 1] for i in range(0, len(data), 2)}
self.latest_version = int(data['version'])
print(f'latest version is: {self.latest_version}')
# compare versions
newer_version_available = True if self.current_version < self.latest_version else False
print(f'Newer version available: {newer_version_available}')
return newer_version_available
def download_and_install_update_if_available(self):
""" Check for updates, download and install them."""
if self.check_for_updates():
if self.fetch_latest_code():
self.update_no_reset()
self.update_and_reset()
else:
print('No new updates available.')