25 Apr 2015

Asynchronously Split an io.Reader in Go (golang)

Or the many ways to skin a cat — er — stream

I have fallen in love with the flexibility of io.Reader and io.Writer when dealing with any stream of data in Go. And while I am more or less smitten at this point, the reader interface challenged me with something you might think simple: splitting it in two.

I’m not even certain “split” is the right word. I would like to receive an io.Reader and read over it multiple times, possibly in parallel. But because readers don’t necessarily expose the Seek method to reset them, I need a way to duplicate it. Or would that be clone it? Fork?!

The Situation

Suppose you have a web service that allows a user to upload a file. The service will store the file on “the cloud”, but first it needs a bit of processing. All you have to work with is the io.Reader from the incoming request.

The Solutions

There is, of course, no single way to solve this problem. Depending on the types of files, the throughput of the service, and the kinds of processing required, some options are more practical than others. Below, I lay out five different methods of varying complexity and flexibility. I imagine there are many more, but these are a good starting point.

Solution #1: The Simple bytes.Reader

If the source reader doesn’t have a Seek method, then why not make one? You can pump the input into a bytes.Reader and rewind it as many times as you like:

func handleUpload(u io.Reader) (err error) {
	// capture all bytes from upload
	b, err := ioutil.ReadAll(u)
	if err != nil {
		return
	}

	// wrap the bytes in a ReadSeeker
	r := bytes.NewReader(b)

	// process the meta data
	err = processMetaData(r)
	if err != nil {
		return
	}

	// rewind the reader back to the start
	r.Seek(0, 0)

	// upload the data
	err = uploadFile(r)
	if err != nil {
		return
	}

	return nil
}

If the data is small enough, this might be the most convenient option; you could forgo the bytes.Reader altogether and work off the byte slice instead, as sketched below. But suppose the file is large, such as a video or RAW photo. These behemoths will chew through memory, especially if the service is high-traffic. Not to mention, you cannot perform these actions in parallel.
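A minimal sketch of that slice-only variant, using hypothetical processMetaDataBytes and uploadFileBytes helpers (slice-accepting stand-ins for the processMetaData and uploadFile used throughout this post):

func handleUpload(u io.Reader) error {
	// small payloads can live entirely in memory
	b, err := ioutil.ReadAll(u)
	if err != nil {
		return err
	}

	// hypothetical helpers that accept the slice directly;
	// no bytes.Reader or Seek bookkeeping required
	if err := processMetaDataBytes(b); err != nil {
		return err
	}

	return uploadFileBytes(b)
}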

Pros: Probably the simplest solution.
Cons: Synchronous and not prudent if you expect many or large files.

Solution #2: The Reliable File System

OK, then how about you drop the data into a file on disk (à la ioutil.TempFile) and skip the penalties of storing it in RAM?

func handleUpload(u io.Reader) (err error) {
	// create a temporary file for the upload
	f, err := ioutil.TempFile("", "upload")
	if err != nil {
		return
	}

	// destroy the file once done
	defer func() {
		n := f.Name()
		f.Close()
		os.Remove(n)
	}()

	// transfer the bytes to the file
	_, err = io.Copy(f, u)
	if err != nil {
		return
	}

	// rewind the file
	f.Seek(0, 0)

	// process the meta data
	err = processMetaData(f)
	if err != nil {
		return
	}

	// rewind the file again
	f.Seek(0, 0)

	// upload the file
	err = uploadFile(f)
	if err != nil {
		return
	}

	return nil
}

If the final destination is on the service’s file system, then this is probably your best choice (albeit with a real file), but let’s assume it will end up on the cloud. Again, if the files are large, the IO costs here could be noticeable and unnecessary. You run the risk of bugs or crashes orphaning files on the machine, and I also wouldn’t recommend this if the data is sensitive in any way.

Pros: Keeps the whole file out of RAM.
Cons: Still synchronous, with the potential for lots of IO, consumed disk space, and orphaned data.

Solution #3: The Duct-Tape io.MultiReader

In some cases, the metadata you need exists in the first handful of bytes of the file. Identifying a file as a JPEG, for instance, only requires checking that the first two bytes are 0xFF 0xD8. This can be handled synchronously using an io.MultiReader, which glues together a set of readers as if they were one. Here’s our JPEG example:

It is not categorically true that a file beginning with those two bytes is a valid JPEG, but for the most part it’s enough. If you’re curious, the exiv2 team has documented the metadata structure of the JPEG format.

func handleUpload(u io.Reader) (err error) {
	// read in the first two bytes; ReadFull (unlike a bare
	// Read) errors out if fewer than two bytes arrive
	b := make([]byte, 2)
	_, err = io.ReadFull(u, b)
	if err != nil {
		return
	}

	// check that they match the JPEG header
	jpg := []byte{0xFF, 0xD8}
	if !bytes.Equal(b, jpg) {
		return errors.New("not a JPEG")
	}

	// glue those bytes back onto the reader
	r := io.MultiReader(bytes.NewReader(b), u)

	// upload the file
	err = uploadFile(r)
	if err != nil {
		return
	}

	return nil
}

This is a great technique if you intend to gate the upload to only JPEG files. With only two bytes, you can cancel the transfer without reading the file entirely into memory or writing it to disk. As you might expect, this method falters in situations where you need to read more than a little bit of the file to gather the data, such as calculating a word count across it. Having this processing block the upload may not be ideal for intensive tasks. And finally, most 3rd-party packages (and much of the standard library) entirely consume a reader, preventing you from using an io.MultiReader in this way.

Another solution is bufio.Reader.Peek. It performs essentially the same operation while letting you eschew the MultiReader, and it gives you access to the other useful methods on a buffered reader.
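A minimal sketch of the Peek variant, again relying on the hypothetical uploadFile used throughout this post:

func handleUpload(u io.Reader) error {
	br := bufio.NewReader(u)

	// Peek returns the first two bytes without
	// advancing the reader
	b, err := br.Peek(2)
	if err != nil {
		return err
	}

	if !bytes.Equal(b, []byte{0xFF, 0xD8}) {
		return errors.New("not a JPEG")
	}

	// br still yields the entire stream, header included
	return uploadFile(br)
}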

Pros: Quick and dirty reads off the top of a file; can act as a gate.
Cons: Doesn’t work for unknown-length reads, processing the whole file, intensive tasks, or with most 3rd-party packages.

Solution #4: The Single-Split io.TeeReader and io.Pipe

Back to our scenario of a large video file, let’s change the story a bit. Your users will upload the video in a single format, but you want your service to be able to display those videos in a couple of different formats. You have a 3rd-party transcoder that can take in an io.Reader of (say) MP4 encoded data and return another reader of WebM data. The service will upload the original MP4 and WebM versions to the cloud. The previous solutions must perform these steps synchronously and with overhead; now, you want to do them in parallel.

Take a look at io.TeeReader, which has the following signature: func TeeReader(r Reader, w Writer) Reader. The docs say “TeeReader returns a Reader that writes to w what it reads from r.” This is exactly what you want! Now how do you get the data written into w to be readable? This is where io.Pipe comes into play, yielding a connected io.PipeReader and io.PipeWriter (i.e., writes to the latter are immediately available in the former). Let’s see it in action:

func handleUpload(u io.Reader) (err error) {
	// create the pipe and tee reader
	pr, pw := io.Pipe()
	tr := io.TeeReader(u, pw)

	// create channels to synchronize; they are buffered so a
	// goroutine's final send cannot block or panic if we have
	// already returned due to the other goroutine's error
	done := make(chan bool, 2)
	errs := make(chan error, 2)

	go func() {
		// close the PipeWriter after the
		// TeeReader completes to trigger EOF
		defer pw.Close()

		// upload the original MP4 data
		err := uploadFile(tr)
		if err != nil {
			errs <- err
			return
		}

		done <- true
	}()

	go func() {
		// transcode to WebM
		webmr, err := transcode(pr)
		if err != nil {
			errs <- err
			return
		}

		// upload to storage
		err = uploadFile(webmr)
		if err != nil {
			errs <- err
			return
		}

		done <- true
	}()

	// wait until both are done
	// or an error occurs
	for c := 0; c < 2; {
		select {
		case err := <-errs:
			return err
		case <-done:
			c++
		}
	}

	return nil
}

As the uploader consumes tr, the transcoder receives and processes the same bytes before sending its output off to storage. All without a buffer and in parallel! Be aware of the use of goroutines for both pathways, though. io.Pipe blocks until something both writes to and reads from it; attempting this all in one goroutine results in a fatal “all goroutines are asleep - deadlock!” runtime error. Another point of caution: when using pipes, you need to explicitly trigger an EOF by closing the io.PipeWriter at the appropriate time. In this case, you close it once the TeeReader has been exhausted.

This method also employs channels to communicate “doneness” and any errors that occur. If you expect a value back from these processes, you could replace the chan bool with a more appropriate type.
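For instance, here is a sketch that assumes a hypothetical uploadFile variant returning the stored file’s URL; done then carries those URLs back to the caller:

func handleUpload(u io.Reader) (urls []string, err error) {
	pr, pw := io.Pipe()
	tr := io.TeeReader(u, pw)

	// done now carries a value; buffered for the same
	// early-return reasons as above
	done := make(chan string, 2)
	errs := make(chan error, 2)

	go func() {
		defer pw.Close()

		// assumes a hypothetical uploadFile(io.Reader) (string, error)
		url, err := uploadFile(tr)
		if err != nil {
			errs <- err
			return
		}
		done <- url
	}()

	go func() {
		webmr, err := transcode(pr)
		if err != nil {
			errs <- err
			return
		}

		url, err := uploadFile(webmr)
		if err != nil {
			errs <- err
			return
		}
		done <- url
	}()

	// collect both URLs or bail on the first error
	for len(urls) < 2 {
		select {
		case err = <-errs:
			return nil, err
		case url := <-done:
			urls = append(urls, url)
		}
	}

	return urls, nil
}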

Pros: Completely independent, parallelized streams of the same data!
Cons: Requires the added complexity of goroutines and channels to work.

Solution #5: The Multi-Split io.MultiWriter and io.Copy

The io.TeeReader solution works great when only one other consumer of the stream exists. As the service parallelizes more tasks (e.g., more transcoding), teeing off of tees becomes gross. Enter the io.MultiWriter: “a writer that duplicates its writes to all provided writers.” This method utilizes pipes as in the previous solution to propagate the data, but instead of a TeeReader, you can use io.Copy to split the data across all the pipes:

func handleUpload(u io.Reader) (err error) {
	// create the pipes
	mp4R, mp4W := io.Pipe()
	webmR, webmW := io.Pipe()
	oggR, oggW := io.Pipe()
	wavR, wavW := io.Pipe()

	// create channels to synchronize; buffered so every
	// goroutine can complete its final send even if we
	// return early on an error
	done := make(chan bool, 4)
	errs := make(chan error, 5)

	// spawn all the task goroutines. These look identical to
	// the TeeReader example, but pulled out into separate
	// methods for clarity
	go uploadMP4(mp4R, done, errs)
	go transcodeAndUploadWebM(webmR, done, errs)
	go transcodeAndUploadOgg(oggR, done, errs)
	go transcodeAndUploadWav(wavR, done, errs)

	go func() {
		// after completing the copy, we need to close
		// the PipeWriters to propagate the EOF to all
		// PipeReaders to avoid deadlock
		defer mp4W.Close()
		defer webmW.Close()
		defer oggW.Close()
		defer wavW.Close()

		// build the multiwriter for all the pipes
		mw := io.MultiWriter(mp4W, webmW, oggW, wavW)

		// copy the data into the multiwriter
		_, err := io.Copy(mw, u)
		if err != nil {
			errs <- err
		}
	}()

	// wait until all are done
	// or an error occurs
	for c := 0; c < 4; c++ {
		select {
		case err := <-errs:
			return err
		case <-done:
		}
	}

	return nil
}

This is more or less analogous to the previous method, but noticeably cleaner when the stream needs multiple clones. Because of the pipes, you’ll again require goroutines and synchronizing channels to avoid deadlock, and we defer closing all the PipeWriters until the copy is complete.

Pros: Can make as many forks of the original reader as desired.
Cons: Even more use of goroutines and channels to coordinate.

What About Channels?

Channels are among the most distinctive and powerful concurrency tools Go has to offer. Serving as a bridge between goroutines, they combine communication and synchronization in one. You can allocate a channel with or without a buffer, allowing for many creative ways to share data. So why did I not provide a solution that leverages them for more than synchronization?

Looking through the top-level packages of the standard library, channels rarely appear in function signatures:

  • time: useful for a select with timeout
  • reflect: … ‘cause reflection
  • fmt: for formatting it as a pointer
  • builtin: exposes the close function

The implementation of io.Pipe forgoes a channel in favor of sync.Mutex to move data safely between the reader and writer. My suspicion is that channels are just not as performant, and presumably mutexes prevail for this reason.

When developing a reusable package, I’d avoid channels in my public API to be consistent with the standard library but maybe use them internally for synchronization. If the complexity is low enough, replacing them with mutexes may even be ideal. That said, within an application, channels are wonderful abstractions, easier to grok than locks and more flexible.

Wrapping Up

I’ve only broached a handful of ways to go about processing the data coming from an io.Reader, and without a doubt there are plenty more. Go’s implicit interfaces, plus the standard library’s heavy use of them, permit many creative ways of gluing together components without having to worry about the source of the data. I hope some of the exploration I’ve done here proves as useful for you as it did for me!