ctxio.go 2.65 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
package ctxutil

import (
	"io"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)

type ioret struct {
	n   int
	err error
}

type Writer interface {
	io.Writer
}

type ctxWriter struct {
	w   io.Writer
	ctx context.Context
}

// NewWriter wraps a writer to make it respect given Context.
// If there is a blocking write, the returned Writer will return
// whenever the context is cancelled (the return values are n=0
// and err=ctx.Err().)
//
// Note well: this wrapper DOES NOT ACTUALLY cancel the underlying
// write-- there is no way to do that with the standard go io
// interface. So the read and write _will_ happen or hang. So, use
// this sparingly, make sure to cancel the read or write as necesary
// (e.g. closing a connection whose context is up, etc.)
//
// Furthermore, in order to protect your memory from being read
// _after_ you've cancelled the context, this io.Writer will
// first make a **copy** of the buffer.
func NewWriter(ctx context.Context, w io.Writer) *ctxWriter {
	if ctx == nil {
		ctx = context.Background()
	}
	return &ctxWriter{ctx: ctx, w: w}
}

func (w *ctxWriter) Write(buf []byte) (int, error) {
	buf2 := make([]byte, len(buf))
	copy(buf2, buf)

	c := make(chan ioret)

	go func() {
		n, err := w.w.Write(buf2)
		c <- ioret{n, err}
		close(c)
	}()

	select {
	case r := <-c:
		return r.n, r.err
	case <-w.ctx.Done():
		return 0, w.ctx.Err()
	}
}

type Reader interface {
	io.Reader
}

type ctxReader struct {
	r   io.Reader
	ctx context.Context
}

// NewReader wraps a reader to make it respect given Context.
// If there is a blocking read, the returned Reader will return
// whenever the context is cancelled (the return values are n=0
// and err=ctx.Err().)
//
// Note well: this wrapper DOES NOT ACTUALLY cancel the underlying
// write-- there is no way to do that with the standard go io
// interface. So the read and write _will_ happen or hang. So, use
// this sparingly, make sure to cancel the read or write as necesary
// (e.g. closing a connection whose context is up, etc.)
//
// Furthermore, in order to protect your memory from being read
// _before_ you've cancelled the context, this io.Reader will
// allocate a buffer of the same size, and **copy** into the client's
// if the read succeeds in time.
func NewReader(ctx context.Context, r io.Reader) *ctxReader {
	return &ctxReader{ctx: ctx, r: r}
}

func (r *ctxReader) Read(buf []byte) (int, error) {
	buf2 := make([]byte, len(buf))

	c := make(chan ioret)

	go func() {
		n, err := r.r.Read(buf2)
		c <- ioret{n, err}
		close(c)
	}()

	select {
	case ret := <-c:
		copy(buf, buf2)
		return ret.n, ret.err
	case <-r.ctx.Done():
		return 0, r.ctx.Err()
	}
}