Commit 35ed8c46 authored by Jeromy, committed by Juan Batiz-Benet

change providers map and lock over to an agent-based approach for managing providers

parent 3c1580e1
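
For context: the "agent based" approach here is the actor pattern. A single goroutine owns the providers map, and every read or write reaches it as a message over a channel, which is why the RWMutex disappears from IpfsDHT. A minimal, self-contained sketch of the pattern (hypothetical names, not code from this commit):

package main

// agent is a minimal sketch of the pattern: one goroutine owns the state,
// and callers send it operations over a channel instead of sharing a map
// behind a mutex. All names here are hypothetical.
type agent struct {
	ops chan func(map[string]int) // each op runs inside the owning goroutine
}

func newAgent() *agent {
	a := &agent{ops: make(chan func(map[string]int))}
	go func() {
		state := make(map[string]int) // touched only by this goroutine
		for op := range a.ops {
			op(state)
		}
	}()
	return a
}

// incr mutates state purely by message passing; no lock is taken anywhere.
func (a *agent) incr(k string) {
	a.ops <- func(m map[string]int) { m[k]++ }
}

// get shows the request/response shape that GetProviders uses below.
func (a *agent) get(k string) int {
	resp := make(chan int)
	a.ops <- func(m map[string]int) { resp <- m[k] }
	return <-resp
}

func main() {
	a := newAgent()
	a.incr("x")
	println(a.get("x")) // prints 1
}

The ProviderManager introduced below is this pattern specialized to provider records, with an extra ticker case for expiring old entries.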
@@ -36,9 +36,7 @@ type IpfsDHT struct {
 	datastore ds.Datastore
 	dslock    sync.Mutex

-	// Map keys to peers that can provide their value
-	providers    map[u.Key][]*providerInfo
-	providerLock sync.RWMutex
+	providers *ProviderManager

 	// Signal to shutdown dht
 	shutdown chan struct{}
@@ -59,7 +57,7 @@ func NewDHT(p *peer.Peer, net swarm.Network) *IpfsDHT {
 	dht.network = net
 	dht.datastore = ds.NewMapDatastore()
 	dht.self = p
-	dht.providers = make(map[u.Key][]*providerInfo)
+	dht.providers = NewProviderManager()
 	dht.shutdown = make(chan struct{})
 	dht.routingTables = make([]*kb.RoutingTable, 3)
@@ -102,7 +100,6 @@ func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
 func (dht *IpfsDHT) handleMessages() {
 	u.DOut("Begin message handling routine")

-	checkTimeouts := time.NewTicker(time.Minute * 5)
 	ch := dht.network.GetChan()
 	for {
 		select {
@@ -146,34 +143,18 @@ func (dht *IpfsDHT) handleMessages() {
 				dht.handlePing(mes.Peer, pmes)
 			case PBDHTMessage_DIAGNOSTIC:
 				dht.handleDiagnostic(mes.Peer, pmes)
 			default:
 				u.PErr("Received invalid message type")
 			}
 		case err := <-ch.Errors:
 			u.PErr("dht err: %s\n", err)
 		case <-dht.shutdown:
-			checkTimeouts.Stop()
 			return
-		case <-checkTimeouts.C:
-			// Time to collect some garbage!
-			dht.cleanExpiredProviders()
 		}
 	}
 }

-func (dht *IpfsDHT) cleanExpiredProviders() {
-	dht.providerLock.Lock()
-	for k, parr := range dht.providers {
-		var cleaned []*providerInfo
-		for _, v := range parr {
-			if time.Since(v.Creation) < time.Hour {
-				cleaned = append(cleaned, v)
-			}
-		}
-		dht.providers[k] = cleaned
-	}
-	dht.providerLock.Unlock()
-}

 func (dht *IpfsDHT) putValueToNetwork(p *peer.Peer, key string, value []byte) error {
 	pmes := Message{
 		Type: PBDHTMessage_PUT_VALUE,
@@ -202,14 +183,10 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) {
 		resp.Value = iVal.([]byte)
 	} else if err == ds.ErrNotFound {
 		// Check if we know any providers for the requested value
-		dht.providerLock.RLock()
-		provs, ok := dht.providers[u.Key(pmes.GetKey())]
-		dht.providerLock.RUnlock()
-		if ok && len(provs) > 0 {
+		provs := dht.providers.GetProviders(u.Key(pmes.GetKey()))
+		if len(provs) > 0 {
 			u.DOut("handleGetValue returning %d provider[s]\n", len(provs))
-			for _, prov := range provs {
-				resp.Peers = append(resp.Peers, prov.Value)
-			}
+			resp.Peers = provs
 			resp.Success = true
 		} else {
 			// No providers?
@@ -313,9 +290,7 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) {
 		Response: true,
 	}

-	dht.providerLock.RLock()
-	providers := dht.providers[u.Key(pmes.GetKey())]
-	dht.providerLock.RUnlock()
+	providers := dht.providers.GetProviders(u.Key(pmes.GetKey()))
 	if providers == nil || len(providers) == 0 {
 		level := 0
 		if len(pmes.GetValue()) > 0 {
@@ -329,9 +304,7 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) {
 			resp.Peers = []*peer.Peer{closer}
 		}
 	} else {
-		for _, prov := range providers {
-			resp.Peers = append(resp.Peers, prov.Value)
-		}
+		resp.Peers = providers
 		resp.Success = true
 	}
@@ -345,9 +318,8 @@ type providerInfo struct {
 }

 func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *PBDHTMessage) {
-	//TODO: need to implement TTLs on providers
 	key := u.Key(pmes.GetKey())
-	dht.addProviderEntry(key, p)
+	dht.providers.AddProvider(key, p)
 }

 // Halt stops all communications from this peer and shuts down
@@ -356,14 +328,6 @@ func (dht *IpfsDHT) Halt() {
 	dht.network.Close()
 }

-func (dht *IpfsDHT) addProviderEntry(key u.Key, p *peer.Peer) {
-	u.DOut("Adding %s as provider for '%s'\n", p.Key().Pretty(), key)
-	dht.providerLock.Lock()
-	provs := dht.providers[key]
-	dht.providers[key] = append(provs, &providerInfo{time.Now(), p})
-	dht.providerLock.Unlock()
-}

 // NOTE: not yet finished, low priority
 func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *PBDHTMessage) {
 	seq := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
@@ -514,7 +478,7 @@ func (dht *IpfsDHT) getFromPeerList(key u.Key, timeout time.Duration,
 			u.DErr("getFromPeers error: %s\n", err)
 			continue
 		}
-		dht.addProviderEntry(key, p)
+		dht.providers.AddProvider(key, p)

 		// Make sure it was a successful get
 		if pmes.GetSuccess() && pmes.Value != nil {
@@ -656,7 +620,7 @@ func (dht *IpfsDHT) addPeerList(key u.Key, peers []*PBDHTMessage_PBPeer) []*peer.Peer {
 				continue
 			}
 		}
-		dht.addProviderEntry(key, p)
+		dht.providers.AddProvider(key, p)
 		provArr = append(provArr, p)
 	}
 	return provArr
package dht

import (
	"time"

	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
)
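
// ProviderManager owns the provider records that previously lived in a
// map guarded by an RWMutex on IpfsDHT. All access is funneled through
// the channels below and serviced by a single goroutine (see run).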
type ProviderManager struct {
	providers map[u.Key][]*providerInfo
	newprovs  chan *addProv
	getprovs  chan *getProv
	halt      chan struct{}
}
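
// addProv and getProv are the request messages consumed by the run loop.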
type addProv struct {
	k   u.Key
	val *peer.Peer
}

type getProv struct {
	k    u.Key
	resp chan []*peer.Peer
}
func NewProviderManager() *ProviderManager {
	pm := new(ProviderManager)
	pm.getprovs = make(chan *getProv)
	pm.newprovs = make(chan *addProv)
	pm.providers = make(map[u.Key][]*providerInfo)
	pm.halt = make(chan struct{})
	go pm.run()
	return pm
}
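
// run is the manager's event loop and the only goroutine that touches the
// providers map, so no locking is needed. Once an hour it also drops
// entries older than 24 hours. The loop exits when pm.halt is closed
// (nothing in this commit does so yet).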
func (pm *ProviderManager) run() {
	tick := time.NewTicker(time.Hour)
	for {
		select {
		case np := <-pm.newprovs:
			pi := new(providerInfo)
			pi.Creation = time.Now()
			pi.Value = np.val
			arr := pm.providers[np.k]
			pm.providers[np.k] = append(arr, pi)
		case gp := <-pm.getprovs:
			var parr []*peer.Peer
			provs := pm.providers[gp.k]
			for _, p := range provs {
				parr = append(parr, p.Value)
			}
			gp.resp <- parr
		case <-tick.C:
			for k, provs := range pm.providers {
				var filtered []*providerInfo
				for _, p := range provs {
					if time.Now().Sub(p.Creation) < time.Hour*24 {
						filtered = append(filtered, p)
					}
				}
				pm.providers[k] = filtered
			}
		case <-pm.halt:
			return
		}
	}
}
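
// AddProvider records val as a provider for k. It only sends a message;
// the run loop performs the actual map mutation.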
func (pm *ProviderManager) AddProvider(k u.Key, val *peer.Peer) {
	pm.newprovs <- &addProv{
		k:   k,
		val: val,
	}
}
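
// GetProviders returns the current providers for k, blocking until the
// run loop services the request on a freshly made response channel.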
func (pm *ProviderManager) GetProviders(k u.Key) []*peer.Peer {
	gp := new(getProv)
	gp.k = k
	gp.resp = make(chan []*peer.Peer)
	pm.getprovs <- gp
	return <-gp.resp
}
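
A hypothetical usage sketch (not part of the commit): both methods are safe to call from any goroutine, because they only exchange messages with the run loop.

// exampleUse is a hypothetical illustration, not code from this commit.
// It assumes a key k and peer p already in hand, as in the dht handlers above.
func exampleUse(pm *ProviderManager, k u.Key, p *peer.Peer) {
	// Sends the entry to the run loop, which timestamps and stores it.
	pm.AddProvider(k, p)

	// Request/response: blocks until the run loop replies on gp.resp.
	for _, prov := range pm.GetProviders(k) {
		u.DOut("provider: %s\n", prov.Key().Pretty())
	}
}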