-
Notifications
You must be signed in to change notification settings - Fork 1
/
concurrent_queue.py
72 lines (58 loc) · 2.41 KB
/
concurrent_queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# The example from the Python docs, expanded to take its work from a queue.
# The notable change is that this code uses concurrent.futures.wait instead of
# concurrent.futures.as_completed, so that new work can be started while
# waiting for other work to complete.
#
# How would I go about using concurrent.futures and queues for a real-time scenario?
# https://stackoverflow.com/a/41654240/111424
# type: ignore
import concurrent.futures
import urllib.request
import time
import queue
# Hand-off queue: the feeder thread puts URLs on it and the main loop takes
# them off to schedule downloads.
q = queue.Queue()

# Pages requested by the demo. The last host is presumably there to exercise
# the error-reporting path (it is named as a made-up domain).
URLS = [
    'http://www.foxnews.com/',
    'http://www.cnn.com/',
    'http://europe.wsj.com/',
    'http://www.bbc.co.uk/',
    'http://some-made-up-domain.com/',
]
def feed_the_workers(spacing):
    """Simulate outside actors sending in work: enqueue every URL twice.

    Each entry of ``URLS + URLS`` is put on the module-level queue ``q``,
    with a pause of ``spacing`` seconds before each put.

    Args:
        spacing: Delay in seconds passed to ``time.sleep`` before each URL
            is enqueued.

    Returns:
        The string ``"DONE FEEDING"`` once every URL has been queued.
    """
    for url in URLS + URLS:
        # Space the requests out so work arrives while the consumer is
        # already running, instead of all at once.
        time.sleep(spacing)
        q.put(url)
    return "DONE FEEDING"
def load_url(url, timeout):
    """Retrieve a single page and return its contents.

    Args:
        url: Address to open with ``urllib.request.urlopen``.
        timeout: Timeout in seconds, forwarded to ``urlopen``.

    Returns:
        The response body as returned by the connection's ``read()``.

    Raises:
        Nothing is caught here; any exception raised by ``urlopen`` or
        ``read`` propagates to the caller.
    """
    # The with statement closes the connection even if read() raises.
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()
# The with statement ensures the pool's threads are cleaned up promptly.
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start a future for the thread that sends work in through the queue. It
    # is tracked in the same mapping as the downloads, under a marker label
    # instead of a URL.
    future_to_url = {
        executor.submit(feed_the_workers, 0.25): 'FEEDER DONE'}

    # Keep going until nothing is being tracked any more.
    while future_to_url:
        # Wait at most 0.25 s for a tracked future to finish. wait() is used
        # rather than as_completed() so control comes back regularly and
        # newly queued work can be started while other work is running.
        done, _ = concurrent.futures.wait(
            future_to_url, timeout=0.25,
            return_when=concurrent.futures.FIRST_COMPLETED)

        # Start a download for every URL currently waiting in the queue.
        # This runs BEFORE the completed futures are removed below: if the
        # feeder has just finished, everything it queued is picked up while
        # its future still keeps the mapping non-empty.
        # get_nowait() replaces empty() + get() so this drain can never
        # block on an empty queue.
        while True:
            try:
                url = q.get_nowait()
            except queue.Empty:
                break
            # Start the load operation and mark the future with its URL.
            future_to_url[executor.submit(load_url, url, 60)] = url

        # Report each completed future and stop tracking it.
        for future in done:
            url = future_to_url.pop(future)
            try:
                data = future.result()
            except Exception as exc:
                # Reporting boundary: one failed task must not stop the
                # others, so the error is printed and the loop continues.
                print(f'{url!r} generated an exception: {exc}')
            else:
                if url == 'FEEDER DONE':
                    # The feeder's result is its completion message.
                    print(data)
                else:
                    print(f'{url!r} page is {len(data)} bytes')