getter.go 3.72 KB
Newer Older
1
package getter
Jeromy's avatar
Jeromy committed
2 3 4 5 6

import (
	"context"
	"errors"

7 8
	notifications "gitlab.dms3.io/dms3/go-bitswap/internal/notifications"
	logging "gitlab.dms3.io/dms3/go-log"
Jeromy's avatar
Jeromy committed
9

10 11 12
	blocks "gitlab.dms3.io/dms3/go-block-format"
	cid "gitlab.dms3.io/dms3/go-cid"
	blockstore "gitlab.dms3.io/dms3/go-dms3-blockstore"
Jeromy's avatar
Jeromy committed
13 14
)

15
// log is the package-level logger, created under the name "bitswap".
var log = logging.Logger("bitswap")
Jeromy's avatar
Jeromy committed
16

17 18 19 20 21 22 23 24
// GetBlocksFunc is any function that can take a context and a slice of CIDs
// and return either a receive-only channel of incoming blocks or an error.
type GetBlocksFunc func(context.Context, []cid.Cid) (<-chan blocks.Block, error)

// SyncGetBlock takes a block cid and an async function for getting several
// blocks that returns a channel, and uses that function to return the
// block syncronously.
func SyncGetBlock(p context.Context, k cid.Cid, gb GetBlocksFunc) (blocks.Block, error) {
25 26
	if !k.Defined() {
		log.Error("undefined cid in GetBlock")
Jeromy's avatar
Jeromy committed
27 28 29 30 31 32 33 34 35 36 37 38
		return nil, blockstore.ErrNotFound
	}

	// Any async work initiated by this function must end when this function
	// returns. To ensure this, derive a new context. Note that it is okay to
	// listen on parent in this scope, but NOT okay to pass |parent| to
	// functions called by this one. Otherwise those functions won't return
	// when this context's cancel func is executed. This is difficult to
	// enforce. May this comment keep you safe.
	ctx, cancel := context.WithCancel(p)
	defer cancel()

39
	promise, err := gb(ctx, []cid.Cid{k})
Jeromy's avatar
Jeromy committed
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
	if err != nil {
		return nil, err
	}

	select {
	case block, ok := <-promise:
		if !ok {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			default:
				return nil, errors.New("promise channel was closed")
			}
		}
		return block, nil
	case <-p.Done():
		return nil, p.Err()
	}
}

60 61
// WantFunc is any function that can express a want for a set of blocks,
// identified by their CIDs. It returns nothing.
type WantFunc func(context.Context, []cid.Cid)
Jeromy's avatar
Jeromy committed
62

63
// AsyncGetBlocks take a set of block cids, a pubsub channel for incoming
64 65 66 67 68 69
// blocks, a want function, and a close function, and returns a channel of
// incoming blocks.
func AsyncGetBlocks(ctx context.Context, sessctx context.Context, keys []cid.Cid, notif notifications.PubSub,
	want WantFunc, cwants func([]cid.Cid)) (<-chan blocks.Block, error) {

	// If there are no keys supplied, just return a closed channel
Jeromy's avatar
Jeromy committed
70 71 72 73 74 75
	if len(keys) == 0 {
		out := make(chan blocks.Block)
		close(out)
		return out, nil
	}

76
	// Use a PubSub notifier to listen for incoming blocks for each key
Jeromy's avatar
Jeromy committed
77 78 79
	remaining := cid.NewSet()
	promise := notif.Subscribe(ctx, keys...)
	for _, k := range keys {
80
		log.Debugw("Bitswap.GetBlockRequest.Start", "cid", k)
Jeromy's avatar
Jeromy committed
81 82 83
		remaining.Add(k)
	}

84
	// Send the want request for the keys to the network
Jeromy's avatar
Jeromy committed
85 86 87
	want(ctx, keys)

	out := make(chan blocks.Block)
88
	go handleIncoming(ctx, sessctx, remaining, promise, out, cwants)
Jeromy's avatar
Jeromy committed
89 90 91
	return out, nil
}

92 93 94 95 96 97
// Listens for incoming blocks, passing them to the out channel.
// If the context is cancelled or the incoming channel closes, calls cfun with
// any keys corresponding to blocks that were never received.
func handleIncoming(ctx context.Context, sessctx context.Context, remaining *cid.Set,
	in <-chan blocks.Block, out chan blocks.Block, cfun func([]cid.Cid)) {

Jeromy's avatar
Jeromy committed
98
	ctx, cancel := context.WithCancel(ctx)
99 100 101

	// Clean up before exiting this function, and call the cancel function on
	// any remaining keys
Jeromy's avatar
Jeromy committed
102 103 104 105 106 107
	defer func() {
		cancel()
		close(out)
		// can't just defer this call on its own, arguments are resolved *when* the defer is created
		cfun(remaining.Keys())
	}()
108

Jeromy's avatar
Jeromy committed
109 110 111
	for {
		select {
		case blk, ok := <-in:
112 113
			// If the channel is closed, we're done (note that PubSub closes
			// the channel once all the keys have been received)
Jeromy's avatar
Jeromy committed
114 115 116 117 118 119 120 121 122
			if !ok {
				return
			}

			remaining.Remove(blk.Cid())
			select {
			case out <- blk:
			case <-ctx.Done():
				return
123 124
			case <-sessctx.Done():
				return
Jeromy's avatar
Jeromy committed
125 126 127
			}
		case <-ctx.Done():
			return
128 129
		case <-sessctx.Done():
			return
Jeromy's avatar
Jeromy committed
130 131 132
		}
	}
}