query.go 2.78 KB
Newer Older
1 2 3 4 5 6 7 8 9
package routing

import (
	"context"
	"sync"

	"github.com/libp2p/go-libp2p-core/peer"
)

10
// QueryEventType indicates the query event's type.
11 12 13 14 15 16
type QueryEventType int

// Number of events to buffer.
var QueryEventBufferSize = 16

const (
17
	// Sending a query to a peer.
18
	SendingQuery QueryEventType = iota
19
	// Got a response from a peer.
20
	PeerResponse
21
	// Found a "closest" peer (not currently used).
22
	FinalPeer
23
	// Got an error when querying.
24
	QueryError
25
	// Found a provider.
26
	Provider
27
	// Found a value.
28
	Value
29
	// Adding a peer to the query.
30
	AddingPeer
31
	// Dialing a peer.
32 33 34
	DialingPeer
)

35
// QueryEvent is emitted for every notable event that happens during a DHT query.
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
type QueryEvent struct {
	ID        peer.ID
	Type      QueryEventType
	Responses []*peer.AddrInfo
	Extra     string
}

type routingQueryKey struct{}
type eventChannel struct {
	mu  sync.Mutex
	ctx context.Context
	ch  chan<- *QueryEvent
}

// waitThenClose is spawned in a goroutine when the channel is registered. This
// safely cleans up the channel when the context has been canceled.
func (e *eventChannel) waitThenClose() {
	<-e.ctx.Done()
	e.mu.Lock()
	close(e.ch)
	// 1. Signals that we're done.
	// 2. Frees memory (in case we end up hanging on to this for a while).
	e.ch = nil
	e.mu.Unlock()
}

// send sends an event on the event channel, aborting if either the passed or
// the internal context expire.
func (e *eventChannel) send(ctx context.Context, ev *QueryEvent) {
	e.mu.Lock()
	// Closed.
	if e.ch == nil {
		e.mu.Unlock()
		return
	}
	// in case the passed context is unrelated, wait on both.
	select {
	case e.ch <- ev:
	case <-e.ctx.Done():
	case <-ctx.Done():
	}
	e.mu.Unlock()
}

80 81 82 83 84 85
// RegisterForQueryEvents registers a query event channel with the given
// context. The returned context can be passed to DHT queries to receive query
// events on the returned channels.
//
// The passed context MUST be canceled when the caller is no longer interested
// in query events.
86 87 88 89 90 91 92
func RegisterForQueryEvents(ctx context.Context) (context.Context, <-chan *QueryEvent) {
	ch := make(chan *QueryEvent, QueryEventBufferSize)
	ech := &eventChannel{ch: ch, ctx: ctx}
	go ech.waitThenClose()
	return context.WithValue(ctx, routingQueryKey{}, ech), ch
}

93 94
// PublishQueryEvent publishes a query event to the query event channel
// associated with the given context, if any.
95 96 97 98 99 100 101 102 103 104
func PublishQueryEvent(ctx context.Context, ev *QueryEvent) {
	ich := ctx.Value(routingQueryKey{})
	if ich == nil {
		return
	}

	// We *want* to panic here.
	ech := ich.(*eventChannel)
	ech.send(ctx, ev)
}
105 106 107 108 109 110 111

// SubscribesToQueryEvents returns true if the context subscribes to query
// events. If this function returns falls, calling `PublishQueryEvent` on the
// context will be a no-op.
func SubscribesToQueryEvents(ctx context.Context) bool {
	return ctx.Value(routingQueryKey{}) != nil
}