Streaming large uploads to a backend API

Arseni Mourzenko
Founder and lead developer
May 6, 2018
Tags: flask, werkzeug, performance, python

When it comes to uploading large files to a Flask application, the subject seems to be well documented. Flask (or rather Werkzeug) has lots of features to stream not only downloads, but also uploads themselves, in order to receive huge files with near-zero memory footprint.

There is one case, however, which is not documented.

In Bookshelf, one of the features consists of uploading the books you purchased in PDF/DjVu format. The user can simply drag a file over the <div></div> containing the book, and an AJAX call transfers the file. Meanwhile, a top bar shows the upload progress. Since those files can be pretty large (I had a few as large as 150 MB), the progress bar is rather useful.

Internally, the web application doesn't store anything, but communicates with the Bookshelf API which handles the data. Uploaded files are no exception: when the web application receives an uploaded file, it uses the requests library to make an HTTP POST to the Bookshelf API. Nothing original here.

user = flask.session.get("user")
id = int(flask.request.args["id"])
match = flask.request.files["file"]
storage_api_client.upload_file(user["id"], id, match.stream)

The problem is that the transfer of the file from the web application to the API starts only after the file is entirely received by Flask. Not a problem for small files, but definitely an issue for a 150 MB file. Besides the increased total time, this creates a terrible user experience. It happens this way. The user drops a file; the progress bar goes from zero to one hundred in, say, twenty seconds, and then nothing happens. Only fifteen seconds later, when the AJAX call ends, does the web page show that the file was uploaded.

Therefore, I needed to look for a way to stream the file to the API as soon as it starts to be received. I started by profiling the code. It appeared that the first slow blocking operation is the access to flask.request.files. It blocks the execution until the file is completely received and stored as a temporary file by Flask. This first duration corresponds approximately to the time during which the progress bar on the client side moves from zero to one hundred. When storage_api_client.upload_file is called, the client-side progress bar is already removed, and the time it takes to HTTP POST the large file to the API is the time during which the user has no visual feedback.

Given the flexibility of Werkzeug, it is obviously possible for the caller to start processing the stream as soon as it arrives:

stream, form, files = werkzeug.formparser.parse_form_data(
    flask.request.environ, stream_factory=custom_stream_factory)

where custom_stream_factory is a function returning the stream which receives the contents of the file. Nice, I simply have to link it to requests.post, don't I? I would:

  1. Prepare the request to the Bookshelf API, without putting a request body yet.
  2. Fill the body whenever Werkzeug calls write on the stream returned by custom_stream_factory.
  3. Terminate the request to the API and receive the response.

However, this is not how the requests library works. Instead, it asks the caller to provide a stream (or generator) that the library reads from to send the request data. Therefore, on one side, Werkzeug calls write to feed my application with the incoming file data, and on the other side, the requests library calls read to get the file data to send, and the two won't work together unless they run in parallel. In other words, I can't start receiving the file from Werkzeug before I do a requests.post, because the data arriving through the consecutive calls of write will be lost; I can't call requests.post before I receive the incoming file either, because it will attempt to read data which doesn't exist yet.

One solution would be to modify the source of requests by separating the stream loop from what happens before and after the loop. I didn't want to go this way, expecting multiple issues down the road which would force me to copy-paste a large part of the requests library.

Another solution was to go parallel. One process would receive the file from Werkzeug; another one would feed it to the requests library. It turned out to be relatively easy to set up.

Implementing streaming in parallel

To be able to stream the incoming file, Werkzeug expects a writable stream. During its operation, Werkzeug will make multiple calls to the write method of the stream, each time passing a byte array of zero or more bytes. Yes, surprisingly, it can pass a zero-length array, and this doesn't mean that the stream ended. Once Werkzeug finishes transmitting the file, it conveniently rewinds the stream by calling seek with a zero position.

The requests library, in turn, iterates over the specified generator until it is exhausted.

When the file is received by one process, it should be transmitted to the process which lets the requests library read it. In Python, a usual way to transmit binary data between processes is through pipes. Constructing a pipe returns two connections to be used in two processes. By default, each connection can be used both to read and to write. Although duplex can be set to False, and in our case it probably should be, I couldn't make it work: when disabling duplex mode, I can still write to both connections, and the performance seems to go down. I'm probably missing something, but never mind; this is not the subject here.
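For reference, here is a minimal sketch of a one-way pipe, kept in a single process so that it stays self-contained. It relies on one documented detail of multiprocessing.Pipe: with duplex=False, the first returned connection can only receive and the second can only send. The drain helper and the variable names are made up for the illustration, and the empty message used as an end marker is a convention of this article, not something the pipe imposes.

```python
import multiprocessing


def drain(connection):
    """Collect messages from a pipe until an empty one (the end marker) arrives."""
    chunks = []
    while True:
        data = connection.recv_bytes()
        if not data:
            break
        chunks.append(data)
    return chunks


# With duplex=False, the first connection only receives and the second
# one only sends.
receive_end, send_end = multiprocessing.Pipe(duplex=False)

# The messages are small enough to sit in the pipe's buffer until they are
# read, which is why this works without a second process.
send_end.send_bytes(b"first chunk")
send_end.send_bytes(b"second chunk")
send_end.send_bytes(b"")  # Empty message used as the end-of-stream marker.
send_end.close()

received = drain(receive_end)
receive_end.close()
print(received)
```

Each send_bytes call is delivered as one message: recv_bytes returns exactly what a single send_bytes sent, never half a message or two messages glued together.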

This is the piece of code which creates the pipe and starts two functions in parallel. The write and read functions are implemented later.

write_connection, read_connection = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=write, args=[write_connection])
p2 = multiprocessing.Process(target=read, args=[read_connection])
p1.start()
p2.start()
p1.join()
p2.join()

I decided to create two classes. The first one lets Werkzeug write binary data to it and transfers it to the pipe:

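class StreamToPipe:
    """Writable stream handed to Werkzeug; forwards each chunk to the pipe."""

    def __init__(self, pipe):
        self.pipe = pipe

    def seek(self, offset):
        # Werkzeug rewinds the stream once the file is complete, so this
        # call marks the end: send an empty message, then close.
        self.pipe.send_bytes(b'')
        self.pipe.close()

    def write(self, data):
        # Skip zero-length writes: an empty message is the end marker.
        if len(data) > 0:
            self.pipe.send_bytes(data)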
The other one lets the requests library read from the pipe:

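class StreamFromPipe:
    """Reads chunks from the pipe for the requests library."""

    def __init__(self, pipe):
        self.pipe = pipe

    def read(self):
        # Generator: yields chunks until the empty end marker arrives.
        while not self.pipe.closed:
            data = self.pipe.recv_bytes()
            if len(data):
                yield data
            else:
                self.pipe.close()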
The first class will be used like this:

def read(pipe_connection):
    st = StreamToPipe(pipe_connection)

    def custom_stream_factory(
            total_content_length, filename, content_type,
            content_length=None):
        return st

    stream, form, files = werkzeug.formparser.parse_form_data(
        flask.request.environ, stream_factory=custom_stream_factory)

Here's the second one:

def write(pipe_connection):
    st = StreamFromPipe(pipe_connection)
    storage_api_client.upload_file_stream(user["id"], id, st.read)

The storage_api_client.upload_file_stream method simply constructs a URI to the Bookshelf API and calls requests.post(uri, data=generator()).
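To see the two classes cooperate without Flask, Werkzeug or the API, here is a small sketch under a few assumptions. fake_werkzeug and relay are hypothetical helpers: the first imitates the calls Werkzeug makes (several write calls, possibly empty, then seek(0)), while the main thread plays the role of the requests library by iterating over the generator. Threads replace the two processes only to keep the sketch self-contained; the classes are the ones shown above.

```python
import multiprocessing
import threading


class StreamToPipe:
    def __init__(self, pipe):
        self.pipe = pipe

    def seek(self, offset):
        self.pipe.send_bytes(b"")
        self.pipe.close()

    def write(self, data):
        if len(data) > 0:
            self.pipe.send_bytes(data)


class StreamFromPipe:
    def __init__(self, pipe):
        self.pipe = pipe

    def read(self):
        while not self.pipe.closed:
            data = self.pipe.recv_bytes()
            if len(data):
                yield data
            else:
                self.pipe.close()


def fake_werkzeug(stream, chunks):
    """Hypothetical stand-in for Werkzeug: push the chunks, then rewind."""
    for chunk in chunks:
        stream.write(chunk)
    stream.seek(0)


def relay(chunks):
    """Push chunks in on one side of a pipe and pull them out on the other."""
    receive_end, send_end = multiprocessing.Pipe()
    producer = threading.Thread(
        target=fake_werkzeug, args=(StreamToPipe(send_end), chunks))
    producer.start()
    # The requests library would iterate over this generator while sending.
    body = b"".join(StreamFromPipe(receive_end).read())
    producer.join()
    return body


print(relay([b"%PDF", b"", b"-1.4"]))
```

The empty chunk in the middle is dropped by write, so the only empty message that ever crosses the pipe is the end marker sent by seek.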

This approach works, but the performance is absolutely terrible. By comparison, the original upload (the one involving two consecutive steps) takes one second on average. The optimized approach takes more than four seconds. Making a slow operation four to five times slower is hardly a great way to enhance user experience.

This poor performance is due to the fact that Werkzeug makes a lot of calls to write, each passing only a very short array of bytes: sometimes up to sixty, but usually no more than ten. Buffering seemed to be unavoidable.

Adding buffering

Back to the StreamToPipe class: instead of calling pipe.send_bytes every time write is called by Werkzeug, one could buffer the bytes and send them only when a threshold is reached. The implementation of the buffering is slightly different from the usual: instead of enforcing the maximum size of the buffer, I'm using a minimum size. What I mean is that as soon as the threshold is exceeded, the whole buffer is flushed through the pipe, so the size of the data sent may (and usually will) be greater than the threshold.

class StreamToPipe:
    def __init__(self, pipe):
        self.pipe = pipe
        self.buffer = b''

    def seek(self, offset):
        # End of the file: flush what is left, then send the end marker.
        if self.buffer:
            self.pipe.send_bytes(self.buffer)

        self.pipe.send_bytes(b'')
        self.pipe.close()

    def write(self, data):
        if len(data) > 0:
            self.buffer += data
            # Minimum size, not maximum: flush once the buffer exceeds it.
            if len(self.buffer) > 4000:
                self.pipe.send_bytes(self.buffer)
                self.buffer = b''

The threshold size is arbitrary, and is based on what seems to achieve better performance on my machine. With a threshold of 4,000 bytes, I can stream the same test document in 0.5 s. Thresholds of 1,000 or 10,000 lead to 0.6 to 0.7 s. YMMV.
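To get a feeling for how much the buffer reduces the traffic through the pipe, one can count the messages instead of measuring time. The sketch below is an illustration only: RecordingConnection is a hypothetical stand-in for a pipe connection, the threshold is turned into a constructor parameter, and the input is simulated as ten thousand writes of ten bytes each, which roughly matches the chunk sizes mentioned earlier. It says nothing about actual timings.

```python
class RecordingConnection:
    """Hypothetical stand-in for a pipe connection: records every message."""

    def __init__(self):
        self.messages = []
        self.closed = False

    def send_bytes(self, data):
        self.messages.append(bytes(data))

    def close(self):
        self.closed = True


class BufferedStreamToPipe:
    """Same logic as the buffered StreamToPipe, with a configurable threshold."""

    def __init__(self, pipe, threshold=4000):
        self.pipe = pipe
        self.threshold = threshold
        self.buffer = b""

    def seek(self, offset):
        if self.buffer:
            self.pipe.send_bytes(self.buffer)
        self.pipe.send_bytes(b"")
        self.pipe.close()

    def write(self, data):
        if len(data) > 0:
            self.buffer += data
            if len(self.buffer) > self.threshold:
                self.pipe.send_bytes(self.buffer)
                self.buffer = b""


def count_sends(threshold, chunk_size=10, chunk_count=10000):
    """Feed many small writes and return how many messages reach the pipe."""
    connection = RecordingConnection()
    stream = BufferedStreamToPipe(connection, threshold)
    for _ in range(chunk_count):
        stream.write(b"x" * chunk_size)
    stream.seek(0)
    # Nothing may be lost or duplicated on the way.
    assert b"".join(connection.messages) == b"x" * (chunk_size * chunk_count)
    return len(connection.messages)


print(count_sends(0), count_sends(4000))
```

A threshold of zero flushes on every write, which reproduces the unbuffered behavior. In both cases the count includes the final empty message sent by seek.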