query.go 1.69 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
package dht

import (
	"sync"

	peer "github.com/jbenet/go-ipfs/peer"
	queue "github.com/jbenet/go-ipfs/peer/queue"
	u "github.com/jbenet/go-ipfs/util"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)

// dhtQuery bundles what is needed to run a single query: the queue of peers
// to contact and the function to run against each of them.
type dhtQuery struct {
	// peers is the PeerQueue holding the peers to query
	peers queue.PeerQueue

	// qfunc is the function to execute per peer
	qfunc queryFunc
}

// queryFunc is a function that runs a particular query with a given peer.
// It returns, in order:
// - the value (nil when this peer did not yield one)
// - a list of peers potentially better able to serve the query
// - an error
type queryFunc func(context.Context, *peer.Peer) (interface{}, []*peer.Peer, error)

// Run executes the query with `concurrency` worker goroutines and blocks
// until the query's context is done.
//
// Each worker repeatedly dequeues a peer and calls q.qfunc on it:
//   - on error, the error is logged and the worker moves on to the next peer;
//   - on a non-nil value, the value is recorded and the context is cancelled
//     so that every worker stops;
//   - otherwise any closer peers that were returned are enqueued.
//
// Run returns the first recorded value with a nil error. If no value was
// recorded it returns nil and ctx.Err(); in that case Run only returns once
// the ctx passed in by the caller is cancelled or expires.
func (q *dhtQuery) Run(ctx context.Context, concurrency int) (interface{}, error) {
	// get own cancel function to signal when we've found the value
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // always release the derived context, whatever woke us up

	// result is the variable waiting to be populated upon success. Several
	// workers may succeed at about the same time, and Run may read it after
	// the parent context ended while a worker is still finishing, so every
	// access is guarded by mu.
	var (
		mu     sync.Mutex
		result interface{}
	)

	// chanQueue is how workers receive their work
	chanQueue := queue.NewChanQueue(ctx, q.peers)

	// worker
	worker := func() {
		for {
			select {
			case p, ok := <-chanQueue.DeqChan:
				if !ok {
					// the dequeue channel was closed: no more work will
					// arrive, and receiving again would only yield zero
					// values in a busy loop.
					return
				}

				val, closer, err := q.qfunc(ctx, p)
				if err != nil {
					u.PErr("error running query: %v\n", err)
					continue
				}

				if val != nil {
					mu.Lock()
					if result == nil { // first value wins
						result = val
					}
					mu.Unlock()
					cancel() // signal we're done.
					return
				}

				// ranging over a nil slice is a no-op, so no nil check needed
				for _, next := range closer {
					select {
					case chanQueue.EnqChan <- next:
					case <-ctx.Done():
						return
					}
				}

			case <-ctx.Done():
				return
			}
		}
	}

	// launch all workers
	for i := 0; i < concurrency; i++ {
		go worker()
	}

	// wait until we're done: either a worker found the value and cancelled,
	// or the caller's context ended.
	<-ctx.Done()

	mu.Lock()
	found := result
	mu.Unlock()

	if found != nil {
		return found, nil
	}

	return nil, ctx.Err()
}