Commit 2f6d94a8 authored by Jeromy's avatar Jeromy Committed by Juan Batiz-Benet

fix swarm message type code, i beleive it works well now

parent d574d518
......@@ -28,6 +28,7 @@ type IpfsDHT struct {
routingTables []*kb.RoutingTable
network swarm.Network
netChan *swarm.Chan
// Local peer (yourself)
self *peer.Peer
......@@ -55,6 +56,7 @@ type IpfsDHT struct {
func NewDHT(p *peer.Peer, net swarm.Network) *IpfsDHT {
dht := new(IpfsDHT)
dht.network = net
dht.netChan = net.GetChannel(swarm.PBWrapper_DHT_MESSAGE)
dht.datastore = ds.NewMapDatastore()
dht.self = p
dht.providers = NewProviderManager()
......@@ -101,10 +103,9 @@ func (dht *IpfsDHT) handleMessages() {
u.DOut("Begin message handling routine\n")
errs := dht.network.GetErrChan()
dhtmes := dht.network.GetChannel(swarm.PBWrapper_DHT_MESSAGE)
for {
select {
case mes, ok := <-dhtmes:
case mes, ok := <-dht.netChan.Incoming:
if !ok {
u.DOut("handleMessages closing, bad recv on incoming\n")
return
......@@ -165,7 +166,7 @@ func (dht *IpfsDHT) putValueToNetwork(p *peer.Peer, key string, value []byte) er
}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
dht.network.Send(mes)
dht.netChan.Outgoing <- mes
return nil
}
......@@ -225,7 +226,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) {
out:
mes := swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Send(mes)
dht.netChan.Outgoing <- mes
}
// Store a value in this peer local storage
......@@ -247,7 +248,7 @@ func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *PBDHTMessage) {
ID: pmes.GetId(),
}
dht.network.Send(swarm.NewMessage(p, resp.ToProtobuf()))
dht.netChan.Outgoing <- swarm.NewMessage(p, resp.ToProtobuf())
}
func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) {
......@@ -258,7 +259,7 @@ func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) {
}
defer func() {
mes := swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Send(mes)
dht.netChan.Outgoing <- mes
}()
level := pmes.GetValue()[0]
u.DOut("handleFindPeer: searching for '%s'\n", peer.ID(pmes.GetKey()).Pretty())
......@@ -310,7 +311,7 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) {
}
mes := swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Send(mes)
dht.netChan.Outgoing <- mes
}
type providerInfo struct {
......@@ -336,7 +337,7 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *PBDHTMessage) {
for _, ps := range seq {
mes := swarm.NewMessage(ps, pmes)
dht.network.Send(mes)
dht.netChan.Outgoing <- mes
}
buf := new(bytes.Buffer)
......@@ -372,7 +373,7 @@ out:
}
mes := swarm.NewMessage(p, resp.ToProtobuf())
dht.network.Send(mes)
dht.netChan.Outgoing <- mes
}
func (dht *IpfsDHT) getValueOrPeers(p *peer.Peer, key u.Key, timeout time.Duration, level int) ([]byte, []*peer.Peer, error) {
......@@ -429,7 +430,7 @@ func (dht *IpfsDHT) getValueSingle(p *peer.Peer, key u.Key, timeout time.Duratio
mes := swarm.NewMessage(p, pmes.ToProtobuf())
t := time.Now()
dht.network.Send(mes)
dht.netChan.Outgoing <- mes
// Wait for either the response or a timeout
timeup := time.After(timeout)
......@@ -545,7 +546,7 @@ func (dht *IpfsDHT) findPeerSingle(p *peer.Peer, id peer.ID, timeout time.Durati
mes := swarm.NewMessage(p, pmes.ToProtobuf())
listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
t := time.Now()
dht.network.Send(mes)
dht.netChan.Outgoing <- mes
after := time.After(timeout)
select {
case <-after:
......@@ -581,7 +582,7 @@ func (dht *IpfsDHT) findProvidersSingle(p *peer.Peer, key u.Key, level int, time
mes := swarm.NewMessage(p, pmes.ToProtobuf())
listenChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
dht.network.Send(mes)
dht.netChan.Outgoing <- mes
after := time.After(timeout)
select {
case <-after:
......
......@@ -70,8 +70,8 @@ func (f *fauxNet) GetErrChan() chan error {
return f.Chan.Errors
}
func (f *fauxNet) GetChannel(t swarm.PBWrapper_MessageType) chan *swarm.Message {
return f.Chan.Incoming
func (f *fauxNet) GetChannel(t swarm.PBWrapper_MessageType) *swarm.Chan {
return f.Chan
}
func (f *fauxNet) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
......
......@@ -240,7 +240,7 @@ func (dht *IpfsDHT) Provide(key u.Key) error {
for _, p := range peers {
mes := swarm.NewMessage(p, pbmes)
dht.network.Send(mes)
dht.netChan.Outgoing <- mes
}
return nil
}
......@@ -352,7 +352,7 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
before := time.Now()
responseChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
dht.network.Send(mes)
dht.netChan.Outgoing <- mes
tout := time.After(timeout)
select {
......@@ -385,7 +385,7 @@ func (dht *IpfsDHT) getDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
pbmes := pmes.ToProtobuf()
for _, p := range targets {
mes := swarm.NewMessage(p, pbmes)
dht.network.Send(mes)
dht.netChan.Outgoing <- mes
}
var out []*diagInfo
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment