providers.go 2.43 KB
Newer Older
1 2 3 4 5 6
package dht

import (
	"time"

	peer "github.com/jbenet/go-ipfs/peer"
7
	u "github.com/jbenet/go-ipfs/util"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
8 9 10
	ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
11 12 13 14
)

type ProviderManager struct {
	providers map[u.Key][]*providerInfo
15 16 17
	local     map[u.Key]struct{}
	lpeer     peer.ID
	getlocal  chan chan []u.Key
18 19
	newprovs  chan *addProv
	getprovs  chan *getProv
20
	period    time.Duration
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
21
	ctxc.ContextCloser
22 23 24
}

type addProv struct {
25
	k   u.Key
26
	val peer.Peer
27 28 29
}

type getProv struct {
30
	k    u.Key
31
	resp chan []peer.Peer
32 33
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
34
func NewProviderManager(ctx context.Context, local peer.ID) *ProviderManager {
35 36 37 38
	pm := new(ProviderManager)
	pm.getprovs = make(chan *getProv)
	pm.newprovs = make(chan *addProv)
	pm.providers = make(map[u.Key][]*providerInfo)
39 40
	pm.getlocal = make(chan chan []u.Key)
	pm.local = make(map[u.Key]struct{})
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
41 42 43
	pm.ContextCloser = ctxc.NewContextCloser(ctx, nil)

	pm.Children().Add(1)
44
	go pm.run()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
45

46 47 48 49
	return pm
}

func (pm *ProviderManager) run() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
50 51
	defer pm.Children().Done()

52 53 54 55
	tick := time.NewTicker(time.Hour)
	for {
		select {
		case np := <-pm.newprovs:
56
			if np.val.ID().Equal(pm.lpeer) {
57 58
				pm.local[np.k] = struct{}{}
			}
59 60 61 62 63
			pi := new(providerInfo)
			pi.Creation = time.Now()
			pi.Value = np.val
			arr := pm.providers[np.k]
			pm.providers[np.k] = append(arr, pi)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
64

65
		case gp := <-pm.getprovs:
66
			var parr []peer.Peer
67 68 69 70 71
			provs := pm.providers[gp.k]
			for _, p := range provs {
				parr = append(parr, p.Value)
			}
			gp.resp <- parr
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
72

73 74 75 76 77 78
		case lc := <-pm.getlocal:
			var keys []u.Key
			for k, _ := range pm.local {
				keys = append(keys, k)
			}
			lc <- keys
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
79

80 81 82 83
		case <-tick.C:
			for k, provs := range pm.providers {
				var filtered []*providerInfo
				for _, p := range provs {
84
					if time.Now().Sub(p.Creation) < time.Hour*24 {
85 86 87 88 89
						filtered = append(filtered, p)
					}
				}
				pm.providers[k] = filtered
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
90 91

		case <-pm.Closing():
92 93 94 95 96
			return
		}
	}
}

97
func (pm *ProviderManager) AddProvider(k u.Key, val peer.Peer) {
98
	pm.newprovs <- &addProv{
99
		k:   k,
100 101 102 103
		val: val,
	}
}

104
func (pm *ProviderManager) GetProviders(ctx context.Context, k u.Key) []peer.Peer {
105 106 107 108
	gp := &getProv{
		k:    k,
		resp: make(chan []peer.Peer, 1), // buffered to prevent sender from blocking
	}
109 110 111 112 113 114
	select {
	case pm.getprovs <- gp:
		return <-gp.resp
	case <-ctx.Done():
		return nil
	}
115
}
116

117 118 119 120 121
// GetLocal returns the keys held in the manager's local set, as reported by
// the run loop. It blocks until the run loop accepts the request and sends
// its answer on the unbuffered reply channel.
func (pm *ProviderManager) GetLocal() []u.Key {
	reply := make(chan []u.Key)
	pm.getlocal <- reply
	keys := <-reply
	return keys
}