Non-blocking reading from stdin in Python

Arseni Mourzenko
Founder and lead developer
170
articles
July 20, 2021
Tags: python 4

One of my pro­jects re­lies heav­i­ly on mul­ti­pro­cess­ing. Some of its com­po­nents are chained through the Lin­ux pipes in or­der to process data. A com­mon way to read from stdin is to use sys.stdin.readlines; how­ev­er, this ap­proach has a flaw: readlines is block­ing. In fact, I need to be able to stop the in­di­vid­ual com­po­nents, which means that every process needs to check on reg­u­lar ba­sis for a mul­ti­pro­cess­ing event which tells that it needs to stop. With a block­ing call, this wouldn't work.

A com­mon way all over Stack­Over­flow and the In­ter­net to read from stdin in a non-block­ing way con­sists of us­ing select.select. Let's cre­ate a script, one.py, which uses it:

#!/usr/bin/python3

import multiprocessing
import select

exit_event = multiprocessing.Event()
with open(0) as f:
    while not exit_event.is_set():
        rlist, _, _ = select.select([f], [], [], 0.1)
        if rlist:
            line = f.readline().strip()
            if line:
                print(f"Received {line}.")

Un­for­tu­nate­ly, while it looks like it works per­fect­ly well, some­times it doesn't. The trick is that select.select tells that some­thing is avail­able in stdin, but says noth­ing about what is ac­tu­al­ly avail­able. It be­longs to the caller to read the con­tents through f.readline(), but this only reads one line. In oth­er words, if two lines are avail­able at the same time, if rlist will be eval­u­at­ed to true only once, f.readline() will read one line, and the sec­ond one will re­main in stdin.

There is an easy way to il­lus­trate this. One sim­ply needs to add mul­ti­ple lines faster than the loop step. Do­ing mul­ti­ple echo state­ments at the same time usu­al­ly does the trick. In or­der to test it your­self, run the pre­vi­ous­ly cre­at­ed one.py like this:

echo "" > buffer; tail -f buffer | ./one.py

In an­oth­er win­dow, ex­e­cute first echo "1" >> buffer. The win­dow run­ning the Python script should dis­play: “Re­ceived 1.” Now ex­e­cute echo "2" >> buffer; echo "3" >> buffer. This time, Python script dis­plays only: “Re­ceived 2.” There is no men­tion of the third line. If, lat­er on, you run echo "4" >> buffer, you'll see “Re­ceived 3.” fol­lowed by “Re­ceived 4.” This means that the third line is not lost: in­stead, the script is not in­formed about the line when it is avail­able, but only when and if an ad­di­tion­al line is ap­pend­ed lat­er on.

This could be very prob­lem­at­ic when work­ing with real-time data. In my case, as the scripts were pro­cess­ing both the data com­ing from the sen­sors and the events act­ing on those sen­sors, it wasn't ex­act­ly a good idea to process the events with ran­dom de­lays: I need­ed them to be processed im­me­di­ate­ly, or at least in a mat­ter of mil­lisec­onds.

There is a dif­fer­ent ap­proach which doesn't have this is­sue. Let's cre­ate two.py:

#!/usr/bin/python3

import fcntl
import multiprocessing
import os
import selectors
import sys

def read_line(f):
    line = f.read().strip()
    if line:
        print(f"Received {line}.")

exit_event = multiprocessing.Event()
orig_fl = fcntl.fcntl(sys.stdin, fcntl.F_GETFL)
fcntl.fcntl(sys.stdin, fcntl.F_SETFL, orig_fl | os.O_NONBLOCK)
selector = selectors.DefaultSelector()
selector.register(sys.stdin, selectors.EVENT_READ, read_line)

while not exit_event.is_set():
    for k, mask in selector.select(0.1):
        k.data(k.fileobj)

If you redo the test, this time the mes­sages are shown with no de­lay. This would be a per­fect so­lu­tion for sce­nar­ios where stdin should be read in par­al­lel, but there is one caveat: it doesn't work when run­ning from a mul­ti­pro­cess­ing process. To demon­strate the is­sue, let's put the code above into a func­tion def read_file(exit_event), and call it like this:

p = multiprocessing.Process(target=read_file, args=(exit_event, print))
p.start()
time.sleep(2)

The code will raise the fol­low­ing is­sue:

Process Process-1:
Traceback (most recent call last):
  [...]
  File "./demo.py", line 22, in read_file
    selector.register(file, selectors.EVENT_READ, read_line)
  File "/usr/lib/python3.6/selectors.py", line 412, in register
    self._epoll.register(key.fd, epoll_events)
PermissionError: [Errno 1] Operation not permitted

In my case, I don't want to have this con­straint. It ap­pears, then, that there is a sim­pler al­ter­na­tive. The fact that f.readline() is block­ing should not pre­vent the process from stop­ping, as mul­ti­pro­cess­ing has a process.terminate() method. The method is de­struc­tive by its na­ture; that is, if the process is in a mid­dle of some­thing, it doesn't get a chance to fin­ish. How­ev­er, in the case where the process is sim­ply wait­ing for the bytes in a stream, ter­mi­nat­ing the process is per­fect­ly ac­cept­able.

My ap­proach is there­fore to launch a sub­process from a process, and to have a flag in­di­cat­ing whether the sub­process is do­ing some­thing in­ter­est­ing, or just wait­ing for the stream. In the first case, the sub­process is kind­ly asked to fin­ish what it is do­ing. In the sec­ond case, there is no ne­go­ti­a­tion with the sub­process, as it is ter­mi­nat­ed im­me­di­ate­ly by the par­ent process.

The par­ent process doesn't do any­thing in­ter­est­ing: it con­tains an in­fi­nite loop, con­di­tioned by the mul­ti­pro­cess­ing exit event. It is only when it re­ceives the event that it starts to do use­ful stuff, that is, checks for the flag from the sub­process and de­cides how to stop it.

p = ...  # Initializing the subprocess.

while not exit_event.is_set():
    time.sleep(0.01)
                
if can_terminate.is_set():
    p.terminate()
else:
    p.join(2)
    if p.is_alive():
        p.terminate()

    if p.exitcode:
        ...

The sub­process has also a loop, con­di­tioned too by the mul­ti­pro­cess­ing exit event. With­in this loop, it waits for a line from a file (in a block­ing way), and once it gets it, it process­es it.

def _read_file_internal(file, exit_event, action, can_terminate):
    with open(file) as f:
        while not exit_event.is_set():
            can_terminate.set()
            line = f.readline().strip()

            can_terminate.clear()
            if line and not exit_event.is_set():
                action(line)

Al­though the code is rel­a­tive­ly easy to re­con­struct from this ar­ti­cle, it can also be used through the func­tions read_file and read_file_into_queue that I added to my li­brary, mul­ti­watch. The sec­ond func­tion is par­tic­u­lar­ly in­ter­est­ing: a com­mon sce­nario (at least in my case) is to read stuff from stdin, trans­form it some­how, and put the re­sult in a queue. With this func­tion, this be­comes a one-lin­er. Here's a more elab­o­rate ex­am­ple, where a ded­i­cat­ed process is cre­at­ed, and stdin is be­ing read from this process:

import multiprocessing
import multiwatch

exit_event = multiprocessing.Event()
queue = multiprocessing.Queue()

p = multiprocessing.Process(
    target=multiwatch.read_file_into_queue,
    args=(0, exit_event, queue)
)

p.start()

This is all it gets to re­ceive the lines from stdin into the queue.