From 85273daaa505e4f876ce951513c869c04499a83f Mon Sep 17 00:00:00 2001 From: Jeromy <jeromyj@gmail.com> Date: Thu, 4 Sep 2014 20:32:46 +0000 Subject: [PATCH] allow peers to realize that they are actually a provider for a value --- routing/dht/dht.go | 26 +++++++++++++++++++++++++- routing/dht/providers.go | 23 ++++++++++++++++++++++- routing/dht/providers_test.go | 20 ++++++++++++++++++++ routing/dht/routing.go | 1 + 4 files changed, 68 insertions(+), 2 deletions(-) create mode 100644 routing/dht/providers_test.go diff --git a/routing/dht/dht.go b/routing/dht/dht.go index b1a6e59c9..7a88bebc0 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -2,6 +2,7 @@ package dht import ( "bytes" + "crypto/rand" "fmt" "sync" "time" @@ -59,7 +60,7 @@ func NewDHT(p *peer.Peer, net swarm.Network, dstore ds.Datastore) *IpfsDHT { dht.netChan = net.GetChannel(swarm.PBWrapper_DHT_MESSAGE) dht.datastore = dstore dht.self = p - dht.providers = NewProviderManager() + dht.providers = NewProviderManager(p.ID) dht.shutdown = make(chan struct{}) dht.routingTables = make([]*kb.RoutingTable, 3) @@ -293,7 +294,15 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) { Response: true, } + has, err := dht.datastore.Has(ds.NewKey(pmes.GetKey())) + if err != nil { + dht.netChan.Errors <- err + } + providers := dht.providers.GetProviders(u.Key(pmes.GetKey())) + if has { + providers = append(providers, dht.self) + } if providers == nil || len(providers) == 0 { level := 0 if len(pmes.GetValue()) > 0 { @@ -637,3 +646,18 @@ func (dht *IpfsDHT) peerFromInfo(pbp *PBDHTMessage_PBPeer) (*peer.Peer, error) { return dht.network.GetConnection(peer.ID(pbp.GetId()), maddr) } + +func (dht *IpfsDHT) loadProvidableKeys() error { + kl := dht.datastore.KeyList() + for _, k := range kl { + dht.providers.AddProvider(u.Key(k.Bytes()), dht.self) + } + return nil +} + +// Builds up list of peers by requesting random peer IDs +func (dht *IpfsDHT) Bootstrap() { + id := make([]byte, 16) + rand.Read(id) + dht.FindPeer(peer.ID(id), time.Second*10) +} diff --git a/routing/dht/providers.go b/routing/dht/providers.go index 2e89eea4c..c62755cf2 100644 --- a/routing/dht/providers.go +++ b/routing/dht/providers.go @@ -9,9 +9,13 @@ import ( type ProviderManager struct { providers map[u.Key][]*providerInfo + local map[u.Key]struct{} + lpeer peer.ID + getlocal chan chan []u.Key newprovs chan *addProv getprovs chan *getProv halt chan struct{} + period time.Duration } type addProv struct { @@ -24,11 +28,13 @@ type getProv struct { resp chan []*peer.Peer } -func NewProviderManager() *ProviderManager { +func NewProviderManager(local peer.ID) *ProviderManager { pm := new(ProviderManager) pm.getprovs = make(chan *getProv) pm.newprovs = make(chan *addProv) pm.providers = make(map[u.Key][]*providerInfo) + pm.getlocal = make(chan chan []u.Key) + pm.local = make(map[u.Key]struct{}) pm.halt = make(chan struct{}) go pm.run() return pm @@ -39,6 +45,9 @@ func (pm *ProviderManager) run() { for { select { case np := <-pm.newprovs: + if np.val.ID.Equal(pm.lpeer) { + pm.local[np.k] = struct{}{} + } pi := new(providerInfo) pi.Creation = time.Now() pi.Value = np.val @@ -51,6 +60,12 @@ func (pm *ProviderManager) run() { parr = append(parr, p.Value) } gp.resp <- parr + case lc := <-pm.getlocal: + var keys []u.Key + for k, _ := range pm.local { + keys = append(keys, k) + } + lc <- keys case <-tick.C: for k, provs := range pm.providers { var filtered []*providerInfo @@ -82,6 +97,12 @@ func (pm *ProviderManager) GetProviders(k u.Key) []*peer.Peer { return <-gp.resp } +func (pm *ProviderManager) GetLocal() []u.Key { + resp := make(chan []u.Key) + pm.getlocal <- resp + return <-resp +} + func (pm *ProviderManager) Halt() { pm.halt <- struct{}{} } diff --git a/routing/dht/providers_test.go b/routing/dht/providers_test.go new file mode 100644 index 000000000..0cdfa4fcc --- /dev/null +++ b/routing/dht/providers_test.go @@ -0,0 +1,20 @@ +package dht + +import ( + "testing" + + "github.com/jbenet/go-ipfs/peer" + u "github.com/jbenet/go-ipfs/util" +) + +func TestProviderManager(t *testing.T) { + mid := peer.ID("testing") + p := NewProviderManager(mid) + a := u.Key("test") + p.AddProvider(a, &peer.Peer{}) + resp := p.GetProviders(a) + if len(resp) != 1 { + t.Fatal("Could not retrieve provider.") + } + p.Halt() +} diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 082f737f5..6ceeb9a93 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -166,6 +166,7 @@ func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { // Provide makes this node announce that it can provide a value for the given key func (dht *IpfsDHT) Provide(key u.Key) error { + dht.providers.AddProvider(key, dht.self) peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), PoolSize) if len(peers) == 0 { return kb.ErrLookupFailure -- GitLab