Commit f054a9ed authored by Jeromy's avatar Jeromy

add files i forgot to last night

parent f85c3338
package dht
import (
"sync"
"time"
swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util"
)
type MesListener struct {
listeners map[uint64]*listenInfo
haltchan chan struct{}
unlist chan uint64
nlist chan *listenInfo
send chan *respMes
}
// The listen info struct holds information about a message that is being waited for
type listenInfo struct {
// Responses matching the listen ID will be sent through resp
resp chan *swarm.Message
// count is the number of responses to listen for
count int
// eol is the time at which this listener will expire
eol time.Time
// sendlock is used to prevent conditions where we try to send on the resp
// channel as its being closed by a timeout in another thread
sendLock sync.Mutex
closed bool
id uint64
}
func NewMesListener() *MesListener {
ml := new(MesListener)
ml.haltchan = make(chan struct{})
ml.listeners = make(map[uint64]*listenInfo)
ml.nlist = make(chan *listenInfo, 16)
ml.send = make(chan *respMes, 16)
ml.unlist = make(chan uint64, 16)
go ml.run()
return ml
}
func (ml *MesListener) Listen(id uint64, count int, timeout time.Duration) <-chan *swarm.Message {
li := new(listenInfo)
li.count = count
li.eol = time.Now().Add(timeout)
li.resp = make(chan *swarm.Message, count)
li.id = id
ml.nlist <- li
return li.resp
}
func (ml *MesListener) Unlisten(id uint64) {
ml.unlist <- id
}
type respMes struct {
id uint64
mes *swarm.Message
}
func (ml *MesListener) Respond(id uint64, mes *swarm.Message) {
ml.send <- &respMes{
id: id,
mes: mes,
}
}
func (ml *MesListener) Halt() {
ml.haltchan <- struct{}{}
}
func (ml *MesListener) run() {
for {
select {
case <-ml.haltchan:
return
case id := <-ml.unlist:
trg, ok := ml.listeners[id]
if !ok {
continue
}
close(trg.resp)
delete(ml.listeners, id)
case li := <-ml.nlist:
ml.listeners[li.id] = li
case s := <-ml.send:
trg, ok := ml.listeners[s.id]
if !ok {
u.DOut("Send with no listener.")
continue
}
if time.Now().After(trg.eol) {
close(trg.resp)
delete(ml.listeners, s.id)
continue
}
trg.resp <- s.mes
trg.count--
if trg.count == 0 {
close(trg.resp)
delete(ml.listeners, s.id)
}
}
}
}
......@@ -21,6 +21,12 @@ import (
// Pool size is the number of nodes used for group find/set RPC calls
var PoolSize = 6
// We put the 'K' in kademlia!
var KValue = 10
// Its in the paper, i swear
var AlphaValue = 3
// TODO: determine a way of creating and managing message IDs
func GenerateMessageID() uint64 {
//return (uint64(rand.Uint32()) << 32) & uint64(rand.Uint32())
......@@ -35,24 +41,25 @@ func GenerateMessageID() uint64 {
// This is the top level "Store" operation of the DHT
func (s *IpfsDHT) PutValue(key u.Key, value []byte) {
complete := make(chan struct{})
count := 0
for _, route := range s.routes {
p := route.NearestPeer(kb.ConvertKey(key))
if p == nil {
s.network.Error(kb.ErrLookupFailure)
go func() {
peers := route.NearestPeers(kb.ConvertKey(key), KValue)
for _, p := range peers {
if p == nil {
s.network.Error(kb.ErrLookupFailure)
continue
}
count++
go func(sp *peer.Peer) {
err := s.putValueToNetwork(sp, string(key), value)
if err != nil {
s.network.Error(err)
}
complete <- struct{}{}
}()
continue
}(p)
}
go func() {
err := s.putValueToNetwork(p, string(key), value)
if err != nil {
s.network.Error(err)
}
complete <- struct{}{}
}()
}
for _, _ = range s.routes {
for i := 0; i < count; i++ {
<-complete
}
}
......@@ -150,15 +157,13 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
c := counter{}
// This limit value is referred to as k in the kademlia paper
limit := 20
count := 0
go func() {
for {
select {
case p := <-npeer_chan:
count++
if count >= limit {
if count >= KValue {
break
}
c.Increment()
......@@ -194,7 +199,7 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
for _, np := range peers {
// TODO: filter out peers that arent closer
if !pset.Contains(np) && pset.Size() < limit {
if !pset.Contains(np) && pset.Size() < KValue {
pset.Add(np) //This is racey... make a single function to do operation
npeer_chan <- np
}
......@@ -204,8 +209,7 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
}
}
concurFactor := 3
for i := 0; i < concurFactor; i++ {
for i := 0; i < AlphaValue; i++ {
go process()
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment