Sharing data between distinct tasks running in Python threads is an invaluable skill. Because Python threads operate in a shared memory space, exchanging information between them is straightforward; nonetheless, dedicated structures make it much easier to achieve specific goals safely. Understanding how to share data efficiently between threads is a foundational skill for any Python developer working with concurrency.
The preceding article explored the initiation and synchronization of threads. Now, the focus shifts to expanding the toolkit for proficiently managing information exchange between these threads.
Utilizing Shared Memory in Multithreading
The initial and simplest approach involves employing the same variables across different threads. Although we’ve utilized this feature in the previous tutorial without explicit discussion, let’s delve into how shared memory can be implemented through a straightforward example:
from threading import Thread, Event
from time import sleep

event = Event()

def modify_variable(var):
    while True:
        for i in range(len(var)):
            var[i] += 1
        if event.is_set():
            break
        sleep(.5)
    print('Stop printing')

my_var = [1, 2, 3]
t = Thread(target=modify_variable, args=(my_var, ))
t.start()

while True:
    try:
        print(my_var)
        sleep(1)
    except KeyboardInterrupt:
        event.set()
        break

t.join()
print(my_var)
The example is straightforward, but it contains a crucial pattern: a new thread is started with an argument, my_var, a list of numbers. The thread increases each value by one, with a delay between iterations. An event is used to stop the thread gracefully; further details can be found in the preceding tutorial.
A key element in the code is the print(my_var) statement. This line runs in the main thread, yet it prints the data being modified by the child thread. Such interaction is possible because threads share their memory space. Access to a common memory space is convenient, but it can also introduce risks. In the example above a single thread was created, but nothing prevents starting several threads in the same way:
t = Thread(target=modify_variable, args=(my_var, ))
t2 = Thread(target=modify_variable, args=(my_var, ))
t.start()
t2.start()
Observing the execution, you’ll notice that my_var and its information are shared across all threads. This setup is beneficial for applications like the one mentioned above, where it’s inconsequential which thread increments the variable. Or is it? Let’s make a slight modification to the code running in the thread by removing the sleep:
def modify_variable(var):
    while True:
        for i in range(len(var)):
            var[i] += 1
        if event.is_set():
            break
        # sleep(.5)
    print('Stop printing')
In the upcoming code execution, there won’t be any sleep between one iteration and the next. Let’s run it for a brief period, perhaps 5 seconds. To achieve this, execute the following:
from time import time

[...]

my_var = [1, 2, 3]
t = Thread(target=modify_variable, args=(my_var, ))
t.start()
t0 = time()
while time()-t0 < 5:
    print(my_var)
    sleep(1)
event.set()
t.join()
print(my_var)
The repetitive sections of the code have been omitted. Running this code yields very large numbers as output; in this instance, the results were:
[6563461, 6563462, 6563463]
There is, however, one crucial feature to note: the three numbers are consecutive. That matches expectations, since the initial variable was [1, 2, 3] and one is added to each element on every pass. This time, start a second thread and observe the resulting output:
my_var = [1, 2, 3]
t = Thread(target=modify_variable, args=(my_var, ))
t2 = Thread(target=modify_variable, args=(my_var, ))
t.start()
t2.start()
t0 = time()
while time()-t0 < 5:
    try:
        print(my_var)
        sleep(1)
    except KeyboardInterrupt:
        event.set()
        break
event.set()
t.join()
t2.join()
print(my_var)
The output consists of the following values:
[5738447, 5686971, 5684220]
Two things stand out. First, the operation does not become faster with a second thread; if anything, multi-threading slows this particular operation down. Second, the values are no longer consecutive, an important phenomenon that can arise whenever multiple threads are used in Python. Explaining this behavior requires a deeper look at how threads are managed.
As explained previously, the operating system handles thread scheduling, which is beyond the user's control. Without any delay imposed within the loop, the operating system decides when to switch between threads. Still, scheduling alone does not explain the output, since every pass is supposed to increment each element by exactly one.
The underlying issue lies in the line var[i] += 1, which actually performs two distinct actions: first, the current value of var[i] is read and increased by one; then, the new value is written back to var[i]. Between these two steps the operating system may switch tasks, so both threads can read the same value from the list. The element then ends up incremented once instead of twice. To make the issue even more evident, one can start two threads with opposing operations, one adding to and the other subtracting from the list, to see which thread runs faster. In such a test, the output may look like the following:
[-8832, -168606, 2567]
If the code is executed again, the obtained result is:
[97998, 133432, 186591]
Note: A delay between the start of both threads may be observed, potentially giving a certain advantage to the first thread initiated. However, this factor alone cannot account for the generated output.
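To make the failure mode concrete, here is roughly what var[i] += 1 expands to; the comments mark where a task switch loses an update (an illustrative sketch, not code from the example above):

temp = var[i]       # step 1: read the current value
temp = temp + 1     # step 2: compute the new value
# if the OS switches threads at this point, the other thread still
# reads the old var[i], and both end up writing back the same number
var[i] = temp       # step 3: write the result back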
Implementing Data Access Synchronization
To address the issue identified in the preceding examples, it is crucial to ensure that no two threads attempt to write to the same variable simultaneously. To achieve this, a Lock can be employed:
from threading import Lock

[...]

data_lock = Lock()

def modify_variable(var):
    while True:
        for i in range(len(var)):
            with data_lock:
                var[i] += 1
        if event.is_set():
            break
        # sleep(.5)
    print('Stop printing')
Note the new line, with data_lock:, within the function. Running the code with this modification guarantees that the resulting values are consecutive: the lock ensures that only one thread at a time can access the variable.
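For reference, the with statement above is shorthand for explicitly acquiring and releasing the lock; the context-manager form is preferred because the lock is released even if an exception is raised in between:

data_lock.acquire()
try:
    var[i] += 1
finally:
    data_lock.release()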
The basic examples provided, which involve incrementing or decrementing values in a list, serve as an introduction to the complexities associated with memory management in concurrent programming. While shared memory is a beneficial attribute, it introduces certain risks that must be managed.
Implementing Queues in Multithreading
Threads are often employed for handling tasks that are inherently slow and resist optimization. Consider downloading content from a website: the processor spends most of its time waiting, idle. That downtime is an opportunity to do something else, such as downloading several pages simultaneously, a common technique in web scraping. Without a proper strategy, however, one might inadvertently download the same page multiple times, for the same reasons discussed in the earlier sections.
To address this, queues present a highly effective tool for thread management. A queue operates by storing data elements sequentially, adhering to a First-in-first-out (FIFO) principle. Data is enqueued, or added, one piece at a time and then dequeued, or removed, maintaining the original order of insertion. An illustration of queue usage might look like the following:
from queue import Queue

queue = Queue()
for i in range(20):
    queue.put(i)

while not queue.empty():
    data = queue.get()
    print(data)
In the provided scenario, a Queue object is initialized, and subsequently, numbers 0 through 19 are enqueued. Following this setup, a while loop is introduced to dequeue and display each number. This procedure exemplifies the fundamental functionality of queues in Python, where the sequence of output adheres to the order of entry.
Returning to the initial examples discussed, queues can be utilized to facilitate data sharing across multiple threads. By adjusting the function to accept a queue instead of a list for its input parameter, elements can be retrieved from this queue. The processed results can then be placed into an output queue for further handling.
from threading import Thread, Event
from queue import Queue
from time import sleep, time

event = Event()

def modify_variable(queue_in, queue_out):
    while True:
        if not queue_in.empty():
            var = queue_in.get()
            for i in range(len(var)):
                var[i] += 1
            queue_out.put(var)
        if event.is_set():
            break
    print('Stop printing')
To implement the code above, two queues need to be created. The concept involves setting up two threads, with the input and output queues reversed. In this scenario, one thread places its output on the second thread’s queue, and vice versa. The structure would resemble the following:
my_var = [1, 2, 3]
queue1 = Queue()
queue2 = Queue()
queue1.put(my_var)
t = Thread(target=modify_variable, args=(queue1, queue2))
t2 = Thread(target=modify_variable, args=(queue2, queue1))
t.start()
t2.start()
t0 = time()
while time()-t0 < 5:
    try:
        sleep(1)
    except KeyboardInterrupt:
        event.set()
        break
event.set()
t.join()
t2.join()
if not queue1.empty():
    print(queue1.get())
if not queue2.empty():
    print(queue2.get())
In this case, the output obtained is:
[871, 872, 873]
Much smaller than everything else seen so far, yet data has been successfully shared between two distinct threads without conflicts. The question arises: where does this slowdown originate? Let’s adopt the scientific approach, which involves breaking down the problem and scrutinizing each component. One intriguing aspect is the check for an empty queue before executing the remaining code. It’s valuable to monitor the actual time spent running the crucial part of our program:
def modify_variable(queue_in: Queue, queue_out: Queue):
    internal_t = 0
    while True:
        if not queue_in.empty():
            t0 = time()
            var = queue_in.get()
            for i in range(len(var)):
                var[i] += 1
            queue_out.put(var)
            internal_t += time()-t0
        if event.is_set():
            break
    sleep(0.1)  # short pause before printing the report
    print(f'Running time: {internal_t} seconds\n')
The only alteration is a new variable in the function, internal_t, which accumulates the time spent on the calculation and on placing the data into the output queue. Running the code again should yield an output similar to:
Running time: 0.0006377696990966797 seconds
Running time: 0.0003573894500732422 seconds
This indicates that out of the 5 seconds during which our program runs, we are actively engaged in tasks for approximately 0.9 milliseconds. This equates to 0.01% of the total time! Let’s swiftly examine the impact of altering the code to use only one queue instead of two, where the input and output queues are the same:
t = Thread(target=modify_variable, args=(queue1, queue1))
t2 = Thread(target=modify_variable, args=(queue1, queue1))
The simple alteration produced the following results:
Running time: 4.290639877319336 seconds
Running time: 4.355865955352783 seconds
The improvement is evident. During the roughly 5 seconds of program execution, the two threads accumulate more than 8 seconds of activity between them, which is what one expects when two threads are kept busy concurrently. The output has also grown dramatically:
[710779, 710780, 710781]
The question arises: why does the program operate slowly with two separate queues, but performs efficiently when both input and output share a single queue? It is crucial to acknowledge that indiscriminate thread usage, as in the earlier example, leaves thread management entirely to the discretion of the operating system.
Without the ability to influence the operating system’s scheduling decisions, a situation may arise where a thread is simply idling, awaiting items to process from an empty queue. This can result in the operating system allocating time to an inactive task, leading to inefficient synchronization and prolonged periods of inactivity. Conversely, utilizing a single queue for both input and output ensures there is always data to process, regardless of which thread is active.
To verify this hypothesis, one could measure the actual idle time within the program. By implementing an else clause to accompany the queue.empty() check, it is possible to accumulate and assess the duration when the program is not performing any productive tasks.
def modify_variable(queue_in: Queue, queue_out: Queue):
    internal_t = 0
    sleeping_t = 0
    while True:
        if not queue_in.empty():
            t0 = time()
            var = queue_in.get()
            for i in range(len(var)):
                var[i] += 1
            queue_out.put(var)
            internal_t += time()-t0
        else:
            t0 = time()
            sleep(0.001)
            sleeping_t += time()-t0
        if event.is_set():
            break
    sleep(0.1)  # short pause before printing the report
    print(f'Running time: {internal_t} seconds')
    print(f'Sleeping time: {sleeping_t} seconds')
In the discussed code, when encountering an empty queue, the program is designed to pause for 1 millisecond. Although this is not an ideal solution, it is presumed that such a brief delay will not significantly affect the overall program performance. However, the output from running the code with two separate queues indicates otherwise:
Running time: 0.0 seconds
Sleeping time: 5.001126289367676 seconds
Running time: 0.00018215179443359375 seconds
Sleeping time: 5.001835107803345 seconds
[4126, 4127, 4128]
The data reveals that the program spends most of its time idle, waiting for the queue to be populated with data. The 1 millisecond pause, implemented when the queue is empty, contributes to a considerable slowdown. This outcome serves as an instructive example. In contrast, when a single queue is used for both input and output, the results are markedly different:
Running time: 3.1206254959106445 seconds
Sleeping time: 1.3756272792816162 seconds
Running time: 3.253162145614624 seconds
Sleeping time: 1.136244535446167 seconds
Now, it becomes apparent that despite the time consumed due to sleep, a significant portion of our routine is dedicated to actual calculations.
An important consideration when using the same queue for both input and output is that the other thread may retrieve an item between the check for whether the queue is empty and the actual read. This scenario is pointed out in the Queue documentation: the queue's internal lock makes individual get and put calls safe, but it does not make a check-then-get sequence atomic, so any thread may read from or write to the queue in between.
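The window looks like this (an illustrative sketch; the comment marks where the other thread can slip in):

if not queue_in.empty():   # the queue holds an item at this instant...
    # ...but the other thread may call get() right here...
    var = queue_in.get()   # ...leaving this call to block on an empty queue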
Additional Features of Queue Objects
- Queue objects offer several advanced options, such as setting a limit on the number of items they can hold, or the LIFO queue variant, which operates on a last-in, first-out basis. Both options are well documented, and a short sketch follows after this list;
- One of the advantageous aspects of Queue objects is that they are implemented in pure Python. Examining their source code offers valuable insights into thread synchronization, the handling of custom exceptions, and good documentation practices.
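As a quick illustration of those two options (a minimal sketch, separate from the article's running example):

from queue import Queue, LifoQueue

bounded = Queue(maxsize=5)   # put() blocks once 5 items are stored
stack = LifoQueue()
for i in range(3):
    stack.put(i)
while not stack.empty():
    print(stack.get())       # prints 2, 1, 0: last in, first out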
It is crucial to recognize that when managing multiple threads, there are situations where execution should pause (block) and others where it should continue without delay. In the scenarios above, the existence of items in the Queue was verified before attempting to retrieve them. But what happens if this check is omitted? The get method of a Queue takes two parameters of interest: block and timeout. The block parameter determines whether the program should wait for an item to become available, while timeout limits how long to wait before raising an exception. If block is set to False and the Queue is empty, an Empty exception is raised immediately.
This functionality can be integrated into the modify_variable function to demonstrate its practical application:
def modify_variable(queue_in: Queue, queue_out: Queue):
    internal_t = 0
    while True:
        t0 = time()
        var = queue_in.get()
        for i in range(len(var)):
            var[i] += 1
        queue_out.put(var)
        internal_t += time()-t0
        if event.is_set():
            break
    sleep(0.1)  # short pause before printing the report
    print(f'Running time: {internal_t} seconds\n')
With this code, utilizing distinct queues for input and output, the following output is obtained:
Running time: 4.914130210876465 seconds
Running time: 4.937211513519287 seconds
[179992, 179993, 179994]
The updated approach yields considerably better results than the initial attempts. However, much of the elapsed time is actually the get call's waiting period, which is included in the time tracking. To measure the active running time more accurately, the timestamp t0 = time() can be taken immediately after the get method call returns. The adjusted loop might look like this (only the placement of t0 changes):
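while True:
    var = queue_in.get()       # may block here, waiting for data
    t0 = time()                # start the clock only once data has arrived
    for i in range(len(var)):
        var[i] += 1
    queue_out.put(var)
    internal_t += time()-t0
    if event.is_set():
        break

Running the code again with this change produces: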
Running time: 0.7706246376037598 seconds
Running time: 0.763786792755127 seconds
[177807, 177808, 177809]
This adjustment reveals that the actual computation time is significantly shorter. This insight suggests that perhaps the timing should have been measured differently in earlier examples as well, especially when using a single queue for both input and output.
To prevent the program from blocking on a get call, one can handle it as follows:
from queue import Empty

# ...

try:
    var = queue_in.get(block=False)
except Empty:
    continue
Alternatively, a timeout can be specified:
try:
    var = queue_in.get(block=True, timeout=0.001)
except Empty:
    continue
With these settings, the program either does not wait at all (block=False) and catches the exception, or waits at most 1 millisecond (timeout=0.001) before the exception is raised. Experimenting with these parameters offers insight into their impact on performance.
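As an aside, Queue also provides get_nowait(), which is equivalent to get(block=False):

try:
    var = queue_in.get_nowait()   # same as queue_in.get(block=False)
except Empty:
    continue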
Controlling Thread Termination with Queues
Until now, threads have been stopped by setting an event, which is an elegant solution. An alternative approach uses queues themselves to manage thread execution: a special marker, such as None, is appended to the queue, signaling the thread to stop when it encounters it. The implementation would look like this:
var = queue_in.get()
if var is None:
    break
To initiate the termination of threads in the main script, the special marker is inserted into the queues:
queue1.put(None)
queue2.put(None)
The choice between events and queue markers depends on the requirements of the application. In scenarios where queues handle individual elements and it is important that every item is processed before stopping, inserting a special value into the queue is effective. This is particularly relevant in situations such as downloading data from a website or processing images, where each element is handled independently.
Warning: Make sure a queue is completely emptied before ceasing to use it. If a thread is stopped through an event check, the queue might still contain unprocessed data, meaning that memory is never released. This can be resolved with a simple while-loop that extracts all remaining elements from the queue.
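A minimal sketch of such a drain loop, assuming queue is the Queue that may still hold items:

while not queue.empty():
    queue.get()   # discard the leftover items so their memory can be released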
IO-Bound Threads in Multithreading Applications
The examples in this article focus on computational tasks, deliberately pushing multi-threading into territory where it is not the most effective tool and highlighting concurrency issues along the way. Probing these limits builds a deeper understanding of multi-threading and encourages confident yet cautious programming, especially in scenarios prone to such problems.
Multi-threading, however, shows its strength particularly in managing IO (input-output) tasks. In scenarios where a program needs to perform disk writes while simultaneously executing other operations, delegating the IO task to a separate thread can enhance efficiency. This principle applies equally to situations where a program is awaiting user input, network resource availability, or engaging in tasks like downloading data from the internet. In such cases, multi-threading can significantly improve overall program performance by allowing simultaneous execution of IO operations and other processes.
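As a minimal sketch of the idea (the file name and payload are placeholders), a slow disk write can be delegated to its own thread while the main thread keeps working:

from threading import Thread

def save_to_disk(filename, data):
    # the slow IO work happens in the background thread
    with open(filename, 'w') as f:
        f.write(data)

t = Thread(target=save_to_disk, args=('output.txt', 'some data'))
t.start()
# ... the main thread is free to do other work here ...
t.join()   # wait for the write to finish before exiting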
Implementing a Multithreaded Website Downloader
This article culminates with a demonstration of a multithreaded application for downloading websites, combining threads, queues, and locks. While there is room for performance enhancement, the example lays out the fundamental components of many threading applications.
The objective is to download a list of websites and save the content to disk. Initially, one might consider a simple for-loop for iterating over the website list. However, the focus here is on employing multiple threads for increased efficiency.
The proposed architecture includes: a queue for the URLs to be downloaded, another queue for holding the downloaded data, and threads designated for downloading and saving. The following modules are used in this example:
import os
from queue import Queue
from threading import Lock, Thread
from urllib import request
The queues and lock are set up as follows:
website_queue = Queue()
data_queue = Queue()
file_lock = Lock()
The function for downloading data is defined to check for a special element, indicating when to cease operations:
def download_data():
    while True:
        var = website_queue.get()
        if var is None:
            break
        response = request.urlopen(var)
        data = response.read()
        data_queue.put(data)
In this implementation, the thread blocks on get and checks whether the retrieved element is the special marker, which guarantees that every website placed on the queue is processed before the thread terminates. The downloaded data is placed on another queue for subsequent processing.
The saving process demands additional caution to ensure that two threads do not attempt to write to the same file:
def save_data():
    while True:
        var = data_queue.get()
        if var is None:
            break
        with file_lock:
            i = 0
            while os.path.exists(f'website_data_{i}.dat'):
                i += 1
            open(f'website_data_{i}.dat', 'w').close()
        with open(f'website_data_{i}.dat', 'wb') as f:
            f.write(var)
This method mirrors the data downloading process. It involves pausing until a specific marker indicates the thread should cease. A lock is then utilized to ensure exclusive file access, preventing multiple threads from simultaneously writing to the same file. The loop within the function serves to identify an available file name.
A lock is crucial in this context to avoid a scenario where concurrent threads might identify the same file as available for writing. However, once a unique file is secured for writing, the concern about concurrent access diminishes. Therefore, the file is initially created while the lock is active, ensuring that each thread is associated with a distinct file, eliminating the risk of overlap in file writing.
open(f'website_data_{i}.dat', 'w').close()
The data itself, however, is written outside the lock:
with open(f'website_data_{i}.dat', 'wb') as f:
    f.write(var)
This approach might seem overly complex for our objectives, and it is. However, it illustrates how multiple threads can write to the hard drive concurrently, because each writes to a different file. Note the 'wb' mode used when opening the file: 'w' means writing to the file (not appending), and 'b' means binary, since reading the response yields bytes rather than a string. Next, the threads for downloading and saving data need to be started. To begin, a list of websites slated for download is created, in this instance Wikipedia homepages in various languages:
website_list = [
    'https://www.wikipedia.org/',
    'https://nl.wikipedia.org/',
    'https://de.wikipedia.org/',
    'https://fr.wikipedia.org/',
    'https://pt.wikipedia.org/',
    'https://it.wikipedia.org',
    'https://ru.wikipedia.org',
    'https://es.wikipedia.org',
    'https://en.wikipedia.org',
    'https://ja.wikipedia.org',
    'https://zh.wikipedia.org',
]
Subsequently, the queues are prepared, and the threads are initiated:
for ws in website_list:
    website_queue.put(ws)

threads_download = []
threads_save = []

for i in range(3):
    t = Thread(target=download_data)
    t.start()
    threads_download.append(t)
    t2 = Thread(target=save_data)
    t2.start()
    threads_save.append(t2)
This involves creating lists of running threads for saving and downloading. Naturally, the numbers could have been different. Following this, it’s crucial to ensure the downloading threads are stopped:
for i in range(3):
    website_queue.put(None)
Running three threads for downloading data necessitates appending three ‘None’ to the Queue to ensure all threads stop. Once we are certain that the downloading process is completed, we can proceed to stop the saving:
for t in threads_download:
    t.join()

for i in range(3):
    data_queue.put(None)
Subsequently, we await the completion of the saving process:
for t in threads_save:
    t.join()

print(f'Finished downloading {len(website_list)} websites')
Now it is certain that all threads have finished and the queues are empty. Running the program produces one file per entry in website_list, each containing the HTML of a different Wikipedia homepage.
Conclusions
In the preceding article, the use of threading to run distinct functions concurrently and the essential tools for managing the flow of multiple threads were explored. This article delves into techniques for sharing data between threads, relying on shared memory and on queues. While shared memory speeds up program development, challenges arise as soon as different threads read from or write to the same elements. This concern was addressed early in the article by examining what happens when a simple statement like var[i] += 1 is used to increment the values of an array from several threads. Subsequently, the use of queues for sharing data between threads, both between the main thread and child threads and among child threads, was explored.
To conclude, a straightforward example of utilizing threads for downloading data from a website and saving it to disk was presented. Although basic, this example sets the stage for expansion in subsequent articles. Future articles will explore other Input-Output (IO) tasks, including acquiring data from devices like cameras, awaiting user input, reading from disk, and more.