Commit a3f0d585 authored by Jeromy's avatar Jeromy

allow peers to realize that they are actually a provider for a value

parent f81ae37f
...@@ -2,6 +2,7 @@ package dht ...@@ -2,6 +2,7 @@ package dht
import ( import (
"bytes" "bytes"
"crypto/rand"
"fmt" "fmt"
"sync" "sync"
"time" "time"
...@@ -59,7 +60,7 @@ func NewDHT(p *peer.Peer, net swarm.Network, dstore ds.Datastore) *IpfsDHT { ...@@ -59,7 +60,7 @@ func NewDHT(p *peer.Peer, net swarm.Network, dstore ds.Datastore) *IpfsDHT {
dht.netChan = net.GetChannel(swarm.PBWrapper_DHT_MESSAGE) dht.netChan = net.GetChannel(swarm.PBWrapper_DHT_MESSAGE)
dht.datastore = dstore dht.datastore = dstore
dht.self = p dht.self = p
dht.providers = NewProviderManager() dht.providers = NewProviderManager(p.ID)
dht.shutdown = make(chan struct{}) dht.shutdown = make(chan struct{})
dht.routingTables = make([]*kb.RoutingTable, 3) dht.routingTables = make([]*kb.RoutingTable, 3)
...@@ -293,7 +294,15 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) { ...@@ -293,7 +294,15 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) {
Response: true, Response: true,
} }
has, err := dht.datastore.Has(ds.NewKey(pmes.GetKey()))
if err != nil {
dht.netChan.Errors <- err
}
providers := dht.providers.GetProviders(u.Key(pmes.GetKey())) providers := dht.providers.GetProviders(u.Key(pmes.GetKey()))
if has {
providers = append(providers, dht.self)
}
if providers == nil || len(providers) == 0 { if providers == nil || len(providers) == 0 {
level := 0 level := 0
if len(pmes.GetValue()) > 0 { if len(pmes.GetValue()) > 0 {
...@@ -637,3 +646,18 @@ func (dht *IpfsDHT) peerFromInfo(pbp *PBDHTMessage_PBPeer) (*peer.Peer, error) { ...@@ -637,3 +646,18 @@ func (dht *IpfsDHT) peerFromInfo(pbp *PBDHTMessage_PBPeer) (*peer.Peer, error) {
return dht.network.GetConnection(peer.ID(pbp.GetId()), maddr) return dht.network.GetConnection(peer.ID(pbp.GetId()), maddr)
} }
func (dht *IpfsDHT) loadProvidableKeys() error {
kl := dht.datastore.KeyList()
for _, k := range kl {
dht.providers.AddProvider(u.Key(k.Bytes()), dht.self)
}
return nil
}
// Builds up list of peers by requesting random peer IDs
func (dht *IpfsDHT) Bootstrap() {
id := make([]byte, 16)
rand.Read(id)
dht.FindPeer(peer.ID(id), time.Second*10)
}
...@@ -9,9 +9,13 @@ import ( ...@@ -9,9 +9,13 @@ import (
type ProviderManager struct { type ProviderManager struct {
providers map[u.Key][]*providerInfo providers map[u.Key][]*providerInfo
local map[u.Key]struct{}
lpeer peer.ID
getlocal chan chan []u.Key
newprovs chan *addProv newprovs chan *addProv
getprovs chan *getProv getprovs chan *getProv
halt chan struct{} halt chan struct{}
period time.Duration
} }
type addProv struct { type addProv struct {
...@@ -24,11 +28,13 @@ type getProv struct { ...@@ -24,11 +28,13 @@ type getProv struct {
resp chan []*peer.Peer resp chan []*peer.Peer
} }
func NewProviderManager() *ProviderManager { func NewProviderManager(local peer.ID) *ProviderManager {
pm := new(ProviderManager) pm := new(ProviderManager)
pm.getprovs = make(chan *getProv) pm.getprovs = make(chan *getProv)
pm.newprovs = make(chan *addProv) pm.newprovs = make(chan *addProv)
pm.providers = make(map[u.Key][]*providerInfo) pm.providers = make(map[u.Key][]*providerInfo)
pm.getlocal = make(chan chan []u.Key)
pm.local = make(map[u.Key]struct{})
pm.halt = make(chan struct{}) pm.halt = make(chan struct{})
go pm.run() go pm.run()
return pm return pm
...@@ -39,6 +45,9 @@ func (pm *ProviderManager) run() { ...@@ -39,6 +45,9 @@ func (pm *ProviderManager) run() {
for { for {
select { select {
case np := <-pm.newprovs: case np := <-pm.newprovs:
if np.val.ID.Equal(pm.lpeer) {
pm.local[np.k] = struct{}{}
}
pi := new(providerInfo) pi := new(providerInfo)
pi.Creation = time.Now() pi.Creation = time.Now()
pi.Value = np.val pi.Value = np.val
...@@ -51,6 +60,12 @@ func (pm *ProviderManager) run() { ...@@ -51,6 +60,12 @@ func (pm *ProviderManager) run() {
parr = append(parr, p.Value) parr = append(parr, p.Value)
} }
gp.resp <- parr gp.resp <- parr
case lc := <-pm.getlocal:
var keys []u.Key
for k, _ := range pm.local {
keys = append(keys, k)
}
lc <- keys
case <-tick.C: case <-tick.C:
for k, provs := range pm.providers { for k, provs := range pm.providers {
var filtered []*providerInfo var filtered []*providerInfo
...@@ -82,6 +97,12 @@ func (pm *ProviderManager) GetProviders(k u.Key) []*peer.Peer { ...@@ -82,6 +97,12 @@ func (pm *ProviderManager) GetProviders(k u.Key) []*peer.Peer {
return <-gp.resp return <-gp.resp
} }
func (pm *ProviderManager) GetLocal() []u.Key {
resp := make(chan []u.Key)
pm.getlocal <- resp
return <-resp
}
func (pm *ProviderManager) Halt() { func (pm *ProviderManager) Halt() {
pm.halt <- struct{}{} pm.halt <- struct{}{}
} }
package dht
import (
"testing"
"github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)
func TestProviderManager(t *testing.T) {
mid := peer.ID("testing")
p := NewProviderManager(mid)
a := u.Key("test")
p.AddProvider(a, &peer.Peer{})
resp := p.GetProviders(a)
if len(resp) != 1 {
t.Fatal("Could not retrieve provider.")
}
p.Halt()
}
...@@ -166,6 +166,7 @@ func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { ...@@ -166,6 +166,7 @@ func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
// Provide makes this node announce that it can provide a value for the given key // Provide makes this node announce that it can provide a value for the given key
func (dht *IpfsDHT) Provide(key u.Key) error { func (dht *IpfsDHT) Provide(key u.Key) error {
dht.providers.AddProvider(key, dht.self)
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), PoolSize) peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), PoolSize)
if len(peers) == 0 { if len(peers) == 0 {
return kb.ErrLookupFailure return kb.ErrLookupFailure
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment