-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathapi_async.py
More file actions
154 lines (134 loc) · 5.37 KB
/
Copy pathapi_async.py
File metadata and controls
154 lines (134 loc) · 5.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# using asyncio for non blocking web server
import utime
from RequestParser import RequestParser
import uasyncio
from ResponseBuilder import ResponseBuilder
from WiFiConnection import WiFiConnection
from IoHandler import IoHandler
import random
# Join the WiFi network first; nothing below is useful without a link,
# so abort start-up with an exception when the connection attempt fails.
if not WiFiConnection.start_station_mode(True):
    raise RuntimeError('network connection failed')
def _read_data_response():
    """Build the JSON payload for the 'readData' action from the current IO state."""
    return {
        'status': 0,
        'pot_value': IoHandler.get_pot_reading(),
        'temp_value': IoHandler.get_temp_reading(),
        'cled_states': {
            'blue': IoHandler.get_blue_led(),
            'yellow': IoHandler.get_yellow_led(),
            'green': IoHandler.get_green_led(),
        },
        'rgb_leds': IoHandler.rgb_led_colours,
    }


def _set_led_colour_response(request):
    """Switch on the requested coloured LED and return the resulting LED states.

    A missing or unrecognised 'colour' parameter yields status 'Error'; as in
    the recognised 'off' case, all three LEDs are then switched off.
    """
    try:
        led_colour = request.data()['colour']
    except (KeyError, TypeError):
        # parameter absent (or no parameter data at all): report as an error
        led_colour = None

    cled_states = {'blue': 0, 'yellow': 0, 'green': 0}
    status = 'OK'
    if led_colour in ('blue', 'yellow', 'green'):
        cled_states[led_colour] = 1
    elif led_colour != 'off':
        status = 'Error'

    IoHandler.set_coloured_leds(
        [cled_states['blue'], cled_states['yellow'], cled_states['green']]
    )
    return {'status': status, 'cled_states': cled_states}


def _set_rgb_colour_response(request):
    """Apply the requested RGB colour and return it, or status 'Error' on bad input.

    The LEDs are left untouched when 'red', 'green' or 'blue' is missing or
    cannot be converted to an integer.
    """
    try:
        # assumes request.data() returns the same mapping on every call
        params = request.data()
        rgb_red = int(params['red'])
        rgb_green = int(params['green'])
        rgb_blue = int(params['blue'])
    except (KeyError, TypeError, ValueError):
        return {'status': 'Error'}

    IoHandler.set_rgb_leds(rgb_red, rgb_green, rgb_blue)
    return {
        'status': 'OK',
        'rgb_colours': {'red': rgb_red, 'green': rgb_green, 'blue': rgb_blue},
    }


# coroutine to handle HTTP request
async def handle_request(reader, writer):
    """Serve one HTTP request read from `reader` and reply through `writer`.

    URLs matching "/api" are dispatched on the request's action ('readData',
    'setLedColour', 'setRgbColour'); any other action gets a 404 status.
    Every other URL is handed to the static file server with
    "/api_index.html" passed as its second argument.

    OSError raised while talking to the client is printed rather than
    propagated, and the connection is closed on every exit path.
    """
    try:
        # allow other tasks to run while waiting for data
        raw_request = await reader.read(2048)
        request = RequestParser(raw_request)
        response_builder = ResponseBuilder()

        if request.url_match("/api"):
            action = request.get_action()
            if action == 'readData':
                # ajax request for data
                response_builder.set_body_from_dict(_read_data_response())
            elif action == 'setLedColour':
                response_builder.set_body_from_dict(_set_led_colour_response(request))
            elif action == 'setRgbColour':
                response_builder.set_body_from_dict(_set_rgb_colour_response(request))
            else:
                # unknown action
                response_builder.set_status(404)
        else:
            # not an api call: try to serve a static file
            response_builder.serve_static_file(request.url, "/api_index.html")

        # build the response message and send it back to the client
        response_builder.build_response()
        writer.write(response_builder.response)
        # allow other tasks to run while data is being sent
        await writer.drain()
    except OSError as e:
        print('connection error ' + str(e.errno) + " " + str(e))
    finally:
        # Always release the socket, including after a failed read/write or
        # an unexpected exception, so connections are not leaked.
        try:
            writer.close()
            await writer.wait_closed()
        except OSError as e:
            print('connection error ' + str(e.errno) + " " + str(e))
# coroutine that will run as the neopixel update task
async def neopixels():
    """Scroll random colours through RGB pixels 4 to 7, forever.

    On the first pass and then once every 1000 passes of the loop, the
    colours of pixels 4..6 are moved up one position (to 5..7), pixel 4
    receives a new colour whose channels are each either 0 or 128, and the
    strip is refreshed. The coroutine never returns.
    """
    counter = 0
    while True:
        if counter == 0:
            new_colour = (
                random.randint(0, 1) * 128,
                random.randint(0, 1) * 128,
                random.randint(0, 1) * 128,
            )
            # walk from the top down so each colour is copied before it is overwritten
            for pixel in range(7, 4, -1):
                IoHandler.set_rgb_pixel(pixel, IoHandler.get_rgb_pixel(pixel - 1))
            IoHandler.set_rgb_pixel(4, new_colour)
            IoHandler.show_rgb_leds()
        # wrap the counter so it never grows into a heap-allocated big integer
        counter = (counter + 1) % 1000
        # 0 second pause to allow other tasks to run
        await uasyncio.sleep(0)
# main coroutine to boot async tasks
async def main():
    """Start the background tasks, then toggle the red LED forever.

    Schedules the web server (handle_request on 0.0.0.0, port 80) and the
    neopixels() task, then loops without returning: the red LED is toggled
    on the first pass and once every 1000 passes thereafter.
    """
    # start web server task
    print('Setting up webserver...')
    server = uasyncio.start_server(handle_request, "0.0.0.0", 80)
    uasyncio.create_task(server)

    # start top 4 neopixel updating task
    uasyncio.create_task(neopixels())

    # main task control loop pulses red led
    counter = 0
    while True:
        if counter == 0:
            IoHandler.toggle_red_led()
        # wrap the counter so it never grows into a heap-allocated big integer
        counter = (counter + 1) % 1000
        # 0 second pause to allow other tasks to run
        await uasyncio.sleep(0)
# Script entry point: hand control to the scheduler by running main().
try:
    uasyncio.run(main())
finally:
    # Whether main() returned or raised, reset the event loop state so the
    # task scheduler starts clean the next time it is used.
    uasyncio.new_event_loop()