Streaming large uploads to a backend API
When it comes to uploading large files to a Flask application, the subject seems to be well documented. Flask (or rather Werkzeug) has lots of features to stream not only downloads but also uploads, making it possible to receive huge files with a near-zero memory footprint.
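For instance, a view can consume the raw request body in chunks without ever loading it whole into memory. A minimal sketch, for a raw (non-multipart) body:

import flask

app = flask.Flask(__name__)

@app.route("/raw-upload", methods=["POST"])
def raw_upload():
    total = 0
    # request.stream exposes the body as it arrives, chunk by chunk.
    while True:
        chunk = flask.request.stream.read(8192)
        if not chunk:
            break
        total += len(chunk)
    return "received %d bytes" % total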
There is one case, however, which is not documented.
In Bookshelf, one of the features consists of uploading the books you purchased in PDF/DjVu format. The user can simply drag a file over the <div> containing the book, and an AJAX call transfers the file. While this is happening, a top bar shows the upload progress. Since those files can be pretty large (I had a few going up to 150 MB), the progress bar appears to be rather useful.
Internally, the web application doesn't store anything, but communicates with the Bookshelf API which handles the data. Uploaded files are not an exception: when the web application receives an uploaded file, it uses the requests library to make an HTTP POST to the Bookshelf API. Nothing original here.
user = flask.session.get("user")
book_id = int(flask.request.args["id"])
# request.files only becomes available once the whole body is received.
match = flask.request.files["file"]
storage_api_client.upload_file(user["id"], book_id, match.stream)
The problem is that the transfer of the file from the web application to the API occurs only after the file is entirely received by Flask. Not a problem for small files, but definitely an issue for a 150 MB file. The issue is not limited to the increased total time; it also creates a terrible user experience. The user drops a file; the progress bar goes from zero to one hundred in, say, twenty seconds, and then nothing happens. Only fifteen seconds later, when the AJAX call ends, does the web page show that the file was uploaded.
Therefore, I needed to look for a way to stream the file to the API as soon as it starts arriving. I started by profiling the code. It appeared that the first slow, blocking operation is the access to flask.request.files: it blocks the execution until the file is completely received and stored as a temporary file by Flask. This first duration corresponds approximately to the time during which the progress bar on the client side moves from zero to one hundred. By the time storage_api_client.upload_file is called, the client-side progress bar is already gone, and the time it takes to HTTP POST the large file to the API is time during which the user has no visual feedback.
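For illustration, here is a minimal sketch of how the two durations can be observed, assuming the variables from the snippet above and a standard Flask app object:

import time

start = time.monotonic()
# Blocks until Werkzeug has received and spooled the entire body.
match = flask.request.files["file"]
received = time.monotonic()
storage_api_client.upload_file(user["id"], book_id, match.stream)
done = time.monotonic()
app.logger.info("receive: %.1f s, forward: %.1f s",
                received - start, done - received)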
Given the flexibility of Werkzeug, it is obviously possible for the caller to start processing the stream as soon as it arrives:
stream, form, files = werkzeug.formparser.parse_form_data(
    flask.request.environ, stream_factory=custom_stream_factory)
where custom_stream_factory returns a stream which receives the contents of the file. Nice, I simply have to link it to requests.post, right? I would:
- Prepare the request to the Bookshelf API, without sending a request body yet.
- Fill the body whenever Werkzeug calls write on the stream returned by my custom_stream_factory.
- Terminate the request to the API and receive the response.
However, this is not how the requests library works. Instead, it asks the caller to provide a stream (or generator) that the library will use to read the request data. Therefore, on one side, Werkzeug calls write to feed my application with the incoming file data, and on the other side, the requests library calls read to get the file data to send, and both won't work together unless they run in parallel. In other words, I can't start receiving the file from Werkzeug before I do a requests.post, because the data arriving through the consecutive calls to write would be lost; I can't call requests.post before I receive the incoming file either, because it would attempt to read data which doesn't exist yet.
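To make the pull model concrete, here is a minimal sketch of what the requests library expects; the URL is a placeholder:

import requests

def body():
    # requests pulls chunks from the generator and sends them with
    # chunked transfer encoding, since the total length is unknown.
    yield b"first chunk"
    yield b"second chunk"

# The generator is only consumed once post() runs, which is why the
# producer and the consumer have to execute in parallel.
requests.post("https://example.com/upload", data=body())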
One of the solutions would be to modify the source of the requests library by separating the streaming loop from what happens before and after it. I didn't want to go this way, expecting multiple issues down the road which would force me to copy-paste a large part of the library.
Another solution was to go parallel: one process would receive the file from Werkzeug; another would feed it to the requests library. It appeared to be relatively easy to set up.
Implementing streaming in parallel
To be able to stream the incoming file, Werkzeug expects a writable stream. During its operation, Werkzeug makes multiple calls to the write method of the stream, passing each time a byte array composed of zero or more bytes. Yes, surprisingly, it can pass a zero-length array, and this doesn't mean that the stream has ended. Once Werkzeug finishes transmitting the file, it conveniently rewinds the stream by calling seek with a zero position.
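As a pure illustration of this contract (not part of the final code), a stream which simply logs what it receives would show the pattern:

class LoggingStream:
    def write(self, data):
        # Called many times; data may be empty, which doesn't
        # mean the stream has ended.
        print("write:", len(data), "bytes")

    def seek(self, offset):
        # Called once, with offset 0, after the last write.
        print("seek:", offset)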
The requests library, in turn, iterates over the provided generator until it is exhausted.
When the file is received by one process, it should be transmitted to the process which allows the requests library to read it. In Python, the usual way to transmit binary data between processes is through pipes. When a pipe is constructed, it returns two connections to be used by the two processes. By default, each connection can be used both to read and to write. Although duplex can be set to False, and in our case it probably should be, I couldn't make it work: when disabling duplex mode, I could still write to both connections, and the performance seemed to go down. I'm probably missing something, but never mind; this is not the subject here.
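For reference, this is what the Pipe API looks like in its default duplex mode; both connections support both operations:

import multiprocessing

# Pipe() returns two Connection objects; with duplex=True (the
# default), either end can both send and receive.
first, second = multiprocessing.Pipe()
first.send_bytes(b"hello")
print(second.recv_bytes())  # b'hello'
first.close()
second.close()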
This is the piece of code which creates the pipe and starts two functions in parallel. The write and read functions will be implemented later.
import multiprocessing

write_connection, read_connection = multiprocessing.Pipe()
# write posts the file to the API; read receives it from the client.
p1 = multiprocessing.Process(target=write, args=[write_connection])
p2 = multiprocessing.Process(target=read, args=[read_connection])
p1.start()
p2.start()
p1.join()
p2.join()
I decided to create two classes. One will let Werkzeug write binary data to it and transfer it to the pipe:
class StreamToPipe:
    def __init__(self, pipe):
        self.pipe = pipe

    def seek(self, offset):
        # Werkzeug rewinds the stream once the file is fully
        # transmitted; use that as the end-of-stream signal.
        self.pipe.send_bytes(b'')
        self.pipe.close()

    def write(self, data):
        if len(data) > 0:
            self.pipe.send_bytes(data)
The other one will let the requests library read from the pipe:
class StreamFromPipe:
    def __init__(self, pipe):
        self.pipe = pipe

    def read(self):
        # A generator: requests pulls chunks from it one by one.
        while not self.pipe.closed:
            data = self.pipe.recv_bytes()
            if len(data):
                yield data
            else:
                # The empty message sent by StreamToPipe.seek
                # marks the end of the stream.
                self.pipe.close()
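Before wiring them to Werkzeug and requests, the two classes can be exercised together with a plain pipe; a small self-contained check:

import multiprocessing

def producer(conn):
    stream = StreamToPipe(conn)
    stream.write(b"hello ")
    stream.write(b"")        # zero-length writes are ignored
    stream.write(b"world")
    stream.seek(0)           # signals the end of the stream

if __name__ == "__main__":
    producer_conn, consumer_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=producer, args=[producer_conn])
    p.start()
    for chunk in StreamFromPipe(consumer_conn).read():
        print(chunk)         # b'hello ', then b'world'
    p.join()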
The first class will be used like this:
def read(pipe_connection):
    st = StreamToPipe(pipe_connection)

    def custom_stream_factory(
            total_content_length, filename, content_type,
            content_length=None):
        return st

    stream, form, files = werkzeug.formparser.parse_form_data(
        flask.request.environ, stream_factory=custom_stream_factory)
Here's the second one:
def write(pipe_connection):
    # user and book_id are the ones from the first snippet.
    st = StreamFromPipe(pipe_connection)
    storage_api_client.upload_file_stream(user["id"], book_id, st.read)
The storage_api_client.upload_file_stream method simply constructs a URI to the Bookshelf API and calls requests.post(uri, data=generator()).
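I won't show the actual client here, but a sketch of such a method could look like this; the endpoint path is hypothetical:

import requests

class StorageApiClient:
    def __init__(self, base_uri):
        self.base_uri = base_uri

    def upload_file_stream(self, user_id, book_id, generator):
        # Hypothetical path; the actual Bookshelf API route differs.
        uri = "{0}/users/{1}/books/{2}/file".format(
            self.base_uri, user_id, book_id)
        # requests consumes the generator lazily, streaming the
        # body with chunked transfer encoding.
        return requests.post(uri, data=generator())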
This approach works, but the performance is absolutely terrible. By comparison, the original uploading (the one involving two consecutive steps) takes one second on average. The optimized approach takes more than four seconds. Making a slow operation four to five times slower is hardly a great way to enhance the user experience.
This poor performance is due to the fact that Werkzeug makes a lot of calls to write, passing each time only a very short array of bytes: sometimes up to sixty, but usually no more than ten. Buffering seemed to be unavoidable.
Adding buffering
Back in the StreamToPipe class, instead of calling pipe.send_bytes every time write is called by Werkzeug, one could buffer the bytes and send them only when a threshold is reached. The implementation of the buffering is slightly different from the usual one: instead of enforcing a maximum size of the buffer, I'm using a minimum size. What I mean is that as soon as the threshold is reached, the data is flushed through the pipe, so the size of the flushed data may (and usually will) be greater than the threshold.
class StreamToPipe:
    def __init__(self, pipe):
        self.pipe = pipe
        self.buffer = b''

    def seek(self, offset):
        # Flush whatever is left, then signal the end of the stream.
        if self.buffer:
            self.pipe.send_bytes(self.buffer)
        self.pipe.send_bytes(b'')
        self.pipe.close()

    def write(self, data):
        if len(data) > 0:
            self.buffer += data
            if len(self.buffer) > 4000:
                self.pipe.send_bytes(self.buffer)
                self.buffer = b''
The threshold size is arbitrary, and is based on what seems to achieve the best performance on my machine. With a threshold of 4,000, I can stream the same test document in 0.5 s. Thresholds of 1,000 or 10,000 lead to 0.6 to 0.7 s. YMMV.
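To find a suitable threshold on a different machine, one could run a quick benchmark along these lines; it simulates Werkzeug's many small writes against the buffered logic (the sizes are arbitrary):

import multiprocessing
import time

def drain(conn):
    # Mimic StreamFromPipe: read until the empty sentinel arrives.
    while conn.recv_bytes():
        pass

def benchmark(threshold, total=10 * 1024 * 1024, chunk=10):
    write_conn, read_conn = multiprocessing.Pipe()
    consumer = multiprocessing.Process(target=drain, args=[read_conn])
    consumer.start()
    chunk_bytes = b'x' * chunk
    buffer = b''
    start = time.monotonic()
    for _ in range(total // chunk):
        buffer += chunk_bytes           # what write() does
        if len(buffer) > threshold:
            write_conn.send_bytes(buffer)
            buffer = b''
    if buffer:                          # what seek() does
        write_conn.send_bytes(buffer)
    write_conn.send_bytes(b'')
    consumer.join()
    return time.monotonic() - start

if __name__ == "__main__":
    for threshold in (1000, 4000, 10000):
        print(threshold, "%.2f s" % benchmark(threshold))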