providers.go 3.11 KB
Newer Older
1 2 3 4 5
package dht

import (
	"time"

6
	key "github.com/ipfs/go-ipfs/blocks/key"
7 8
	goprocess "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess"
	goprocessctx "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess/context"
Lars Gierth's avatar
Lars Gierth committed
9
	peer "gx/ipfs/QmYgaiNVVL7f2nydijAwpDRunRkmxfu3PoK87Y3pH84uAW/go-libp2p/p2p/peer"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10

11
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
12 13 14
)

type ProviderManager struct {
15 16 17
	// all non channel fields are meant to be accessed only within
	// the run method
	providers map[key.Key]*providerSet
18
	local     map[key.Key]struct{}
19
	lpeer     peer.ID
20 21 22 23 24

	getlocal chan chan []key.Key
	newprovs chan *addProv
	getprovs chan *getProv
	period   time.Duration
25
	proc     goprocess.Process
26 27
}

28 29 30 31 32
type providerSet struct {
	providers []peer.ID
	set       map[peer.ID]time.Time
}

33
type addProv struct {
34
	k   key.Key
35
	val peer.ID
36 37 38
}

type getProv struct {
39
	k    key.Key
40
	resp chan []peer.ID
41 42
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
43
func NewProviderManager(ctx context.Context, local peer.ID) *ProviderManager {
44 45 46
	pm := new(ProviderManager)
	pm.getprovs = make(chan *getProv)
	pm.newprovs = make(chan *addProv)
47
	pm.providers = make(map[key.Key]*providerSet)
48 49
	pm.getlocal = make(chan chan []key.Key)
	pm.local = make(map[key.Key]struct{})
50 51
	pm.proc = goprocessctx.WithContext(ctx)
	pm.proc.Go(func(p goprocess.Process) { pm.run() })
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
52

53 54 55 56 57 58 59 60
	return pm
}

func (pm *ProviderManager) run() {
	tick := time.NewTicker(time.Hour)
	for {
		select {
		case np := <-pm.newprovs:
61
			if np.val == pm.lpeer {
62 63
				pm.local[np.k] = struct{}{}
			}
64 65 66 67 68 69
			provs, ok := pm.providers[np.k]
			if !ok {
				provs = newProviderSet()
				pm.providers[np.k] = provs
			}
			provs.Add(np.val)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
70

71
		case gp := <-pm.getprovs:
72
			var parr []peer.ID
73 74 75
			provs, ok := pm.providers[gp.k]
			if ok {
				parr = provs.providers
76
			}
77

78
			gp.resp <- parr
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
79

80
		case lc := <-pm.getlocal:
81
			var keys []key.Key
rht's avatar
rht committed
82
			for k := range pm.local {
83 84 85
				keys = append(keys, k)
			}
			lc <- keys
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86

87
		case <-tick.C:
88 89 90 91 92 93
			for _, provs := range pm.providers {
				var filtered []peer.ID
				for p, t := range provs.set {
					if time.Now().Sub(t) > time.Hour*24 {
						delete(provs.set, p)
					} else {
94 95 96
						filtered = append(filtered, p)
					}
				}
97
				provs.providers = filtered
98
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
99

100
		case <-pm.proc.Closing():
101 102 103 104 105
			return
		}
	}
}

106
func (pm *ProviderManager) AddProvider(ctx context.Context, k key.Key, val peer.ID) {
107
	prov := &addProv{
108
		k:   k,
109 110
		val: val,
	}
111 112 113 114
	select {
	case pm.newprovs <- prov:
	case <-ctx.Done():
	}
115 116
}

117
func (pm *ProviderManager) GetProviders(ctx context.Context, k key.Key) []peer.ID {
118 119
	gp := &getProv{
		k:    k,
120
		resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking
121
	}
122
	select {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
123 124
	case <-ctx.Done():
		return nil
125
	case pm.getprovs <- gp:
Brian Tiger Chow's avatar
Brian Tiger Chow committed
126 127
	}
	select {
128 129
	case <-ctx.Done():
		return nil
Brian Tiger Chow's avatar
Brian Tiger Chow committed
130 131
	case peers := <-gp.resp:
		return peers
132
	}
133
}
134

135 136
func (pm *ProviderManager) GetLocal() []key.Key {
	resp := make(chan []key.Key)
137 138 139
	pm.getlocal <- resp
	return <-resp
}
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154

func newProviderSet() *providerSet {
	return &providerSet{
		set: make(map[peer.ID]time.Time),
	}
}

func (ps *providerSet) Add(p peer.ID) {
	_, found := ps.set[p]
	if !found {
		ps.providers = append(ps.providers, p)
	}

	ps.set[p] = time.Now()
}