Non-blocking reading from stdin in Python
One of my projects relies heavily on multiprocessing. Some of its components are chained through Linux pipes in order to process data. A common way to read from stdin is to use sys.stdin.readlines; however, this approach has a flaw: readlines is blocking. I need to be able to stop the individual components, which means that every process needs to check on a regular basis for a multiprocessing event telling it to stop. With a blocking call, this wouldn't work.
A common way, found all over StackOverflow and the rest of the Internet, to read from stdin in a non-blocking fashion is to use select.select. Let's create a script, one.py, which uses it:
#!/usr/bin/python3
import multiprocessing
import select

exit_event = multiprocessing.Event()

with open(0) as f:
    while not exit_event.is_set():
        rlist, _, _ = select.select([f], [], [], 0.1)

        if rlist:
            line = f.readline().strip()

            if line:
                print(f"Received {line}.")
Unfortunately, while it looks like it works perfectly well, sometimes it doesn't. The trick is that select.select tells you that something is available on stdin, but says nothing about how much is actually available. It is up to the caller to read the contents through f.readline(), but this reads only one line. In other words, if two lines are available at the same time, if rlist evaluates to true only once, f.readline() reads one line, and the second one remains in stdin.
There is an easy way to illustrate this. One simply needs to add multiple lines faster than the loop step; running multiple echo commands at the same time usually does the trick. In order to test it yourself, run the previously created one.py like this:
echo "" > buffer; tail -f buffer | ./one.py
In another window, first execute echo "1" >> buffer. The window running the Python script should display: “Received 1.” Now execute echo "2" >> buffer; echo "3" >> buffer. This time, the Python script displays only: “Received 2.” There is no mention of the third line. If, later on, you run echo "4" >> buffer, you'll see “Received 3.” followed by “Received 4.” This means that the third line is not lost; rather, the script is not informed about the line when it becomes available, but only when and if an additional line is appended later on.
This could be very problematic when working with real-time data. In my case, as the scripts were processing both the data coming from the sensors and the events acting on those sensors, it wasn't exactly a good idea to process the events with random delays: I needed them to be processed immediately, or at least in a matter of milliseconds.
There is a different approach which doesn't have this issue. Let's create two.py
:
#!/usr/bin/python3
import fcntl
import multiprocessing
import os
import selectors
import sys


def read_line(f):
    line = f.read().strip()

    if line:
        print(f"Received {line}.")


exit_event = multiprocessing.Event()

orig_fl = fcntl.fcntl(sys.stdin, fcntl.F_GETFL)
fcntl.fcntl(sys.stdin, fcntl.F_SETFL, orig_fl | os.O_NONBLOCK)

selector = selectors.DefaultSelector()
selector.register(sys.stdin, selectors.EVENT_READ, read_line)

while not exit_event.is_set():
    for k, mask in selector.select(0.1):
        k.data(k.fileobj)
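To try it yourself, the same pipeline as before can be reused, simply pointing it at the new script:
echo "" > buffer; tail -f buffer | ./two.py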
When you redo the test this way, the messages are shown with no delay. This would be a perfect solution for scenarios where stdin should be read in parallel, but there is one caveat: it doesn't work when run from a multiprocessing process. To demonstrate the issue, let's put the code above into a function def read_file(file, exit_event, action), and call it like this:
p = multiprocessing.Process(target=read_file, args=(0, exit_event, print))
p.start()
time.sleep(2)
The code raises the following error:
Process Process-1:
Traceback (most recent call last):
  [...]
  File "./demo.py", line 22, in read_file
    selector.register(file, selectors.EVENT_READ, read_line)
  File "/usr/lib/python3.6/selectors.py", line 412, in register
    self._epoll.register(key.fd, epoll_events)
PermissionError: [Errno 1] Operation not permitted
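For reference, here is a self-contained sketch of what such a demo.py could look like; consider it a reconstruction rather than the exact file, but it is enough to reproduce the error:

#!/usr/bin/python3
# demo.py — a sketch reproducing the PermissionError above.
import fcntl
import multiprocessing
import os
import selectors
import sys
import time


def read_file(file, exit_event, action):
    def read_line(f):
        line = f.read().strip()

        if line:
            action(line)

    file = open(file)

    orig_fl = fcntl.fcntl(file, fcntl.F_GETFL)
    fcntl.fcntl(file, fcntl.F_SETFL, orig_fl | os.O_NONBLOCK)

    selector = selectors.DefaultSelector()
    # In the child process, this call fails with PermissionError.
    selector.register(file, selectors.EVENT_READ, read_line)

    while not exit_event.is_set():
        for k, mask in selector.select(0.1):
            k.data(k.fileobj)


if __name__ == "__main__":
    exit_event = multiprocessing.Event()

    p = multiprocessing.Process(target=read_file, args=(0, exit_event, print))
    p.start()

    time.sleep(2)
    exit_event.set()
    p.join()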
In my case, I don't want to have this constraint. It appears, then, that there is a simpler alternative. The fact that f.readline() is blocking should not prevent the process from stopping, as multiprocessing has a process.terminate() method. The method is destructive by nature; that is, if the process is in the middle of something, it doesn't get a chance to finish. However, in the case where the process is simply waiting for bytes from a stream, terminating it is perfectly acceptable.
My approach is therefore to launch a subprocess from a process, and to have a flag indicating whether the subprocess is doing something interesting, or just waiting for the stream. In the first case, the subprocess is kindly asked to finish what it is doing. In the second case, there is no negotiation with the subprocess, as it is terminated immediately by the parent process.
The parent process doesn't do anything interesting: it contains an infinite loop, conditioned by the multiprocessing exit event. It is only when it receives the event that it starts to do useful stuff, that is, checking the flag from the subprocess and deciding how to stop it.
p = ... # Initializing the subprocess.

while not exit_event.is_set():
    time.sleep(0.01)

if can_terminate.is_set():
    p.terminate()
else:
    p.join(2)

    if p.is_alive():
        p.terminate()

if p.exitcode:
    ...
The subprocess also has a loop, likewise conditioned by the multiprocessing exit event. Within this loop, it waits for a line from a file (in a blocking way) and, once it gets one, processes it.
def _read_file_internal(file, exit_event, action, can_terminate):
    with open(file) as f:
        while not exit_event.is_set():
            can_terminate.set()
            line = f.readline().strip()
            can_terminate.clear()

            if line and not exit_event.is_set():
                action(line)
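To give an idea of how the two pieces fit together, here is a possible read_file wrapper acting as the process target: it creates the can_terminate flag, starts the internal subprocess, and then runs the parent loop shown above. This is a simplified sketch rather than the exact code from multiwatch:

import multiprocessing
import time


def read_file(file, exit_event, action):
    # Tells the parent whether the subprocess is merely waiting on the
    # stream (safe to terminate) or busy processing a line.
    can_terminate = multiprocessing.Event()

    p = multiprocessing.Process(
        target=_read_file_internal,
        args=(file, exit_event, action, can_terminate),
    )

    p.start()

    while not exit_event.is_set():
        time.sleep(0.01)

    if can_terminate.is_set():
        # The subprocess is blocked on readline(); no need to negotiate.
        p.terminate()
    else:
        # The subprocess is processing a line; give it a chance to finish.
        p.join(2)

        if p.is_alive():
            p.terminate()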
Although the code is relatively easy to reconstruct from this article, it can also be used through the functions read_file and read_file_into_queue that I added to my library, multiwatch. The second function is particularly interesting: a common scenario (at least in my case) is to read stuff from stdin, transform it somehow, and put the result in a queue. With this function, this becomes a one-liner. Here's a more elaborate example, where a dedicated process is created and stdin is read from that process:
import multiprocessing
import multiwatch

exit_event = multiprocessing.Event()
queue = multiprocessing.Queue()

p = multiprocessing.Process(
    target=multiwatch.read_file_into_queue,
    args=(0, exit_event, queue)
)

p.start()
This is all it takes to receive the lines from stdin into the queue.
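On the other side, the main process can consume the queue and, when it is done, set the exit event to stop the reader. Continuing the example above (the timeout value here is an arbitrary choice):

from queue import Empty

try:
    while True:
        try:
            line = queue.get(timeout=0.1)
        except Empty:
            continue

        print(f"Got {line} from the reader process.")
except KeyboardInterrupt:
    pass
finally:
    exit_event.set()  # Ask the reader process to stop.
    p.join()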