Commit b28343cf authored by Brian Tiger Chow's avatar Brian Tiger Chow

refactor(bitswap) meslistener -> notifications

parent 455c6582
...@@ -3,6 +3,7 @@ package bitswap ...@@ -3,6 +3,7 @@ package bitswap
import ( import (
"time" "time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
...@@ -38,7 +39,7 @@ type BitSwap struct { ...@@ -38,7 +39,7 @@ type BitSwap struct {
// routing interface for communication // routing interface for communication
routing *dht.IpfsDHT routing *dht.IpfsDHT
listener *swarm.MessageListener notifications *notifications
// partners is a map of currently active bitswap relationships. // partners is a map of currently active bitswap relationships.
// The Ledger has the peer.ID, and the peer connection works through net. // The Ledger has the peer.ID, and the peer connection works through net.
...@@ -60,15 +61,15 @@ type BitSwap struct { ...@@ -60,15 +61,15 @@ type BitSwap struct {
// NewBitSwap creates a new BitSwap instance. It does not check its parameters. // NewBitSwap creates a new BitSwap instance. It does not check its parameters.
func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsRouting) *BitSwap { func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsRouting) *BitSwap {
bs := &BitSwap{ bs := &BitSwap{
peer: p, peer: p,
net: net, net: net,
datastore: d, datastore: d,
partners: LedgerMap{}, partners: LedgerMap{},
wantList: KeySet{}, wantList: KeySet{},
routing: r.(*dht.IpfsDHT), routing: r.(*dht.IpfsDHT),
meschan: net.GetChannel(swarm.PBWrapper_BITSWAP), meschan: net.GetChannel(swarm.PBWrapper_BITSWAP),
haltChan: make(chan struct{}), haltChan: make(chan struct{}),
listener: swarm.NewMessageListener(), notifications: newNotifications(),
} }
go bs.handleMessages() go bs.handleMessages()
...@@ -83,7 +84,7 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) ( ...@@ -83,7 +84,7 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) (
tleft := timeout - time.Now().Sub(begin) tleft := timeout - time.Now().Sub(begin)
provs_ch := bs.routing.FindProvidersAsync(k, 20, timeout) provs_ch := bs.routing.FindProvidersAsync(k, 20, timeout)
valchan := make(chan []byte) blockChannel := make(chan *blocks.Block)
after := time.After(tleft) after := time.After(tleft)
// TODO: when the data is received, shut down this for loop ASAP // TODO: when the data is received, shut down this for loop ASAP
...@@ -96,7 +97,7 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) ( ...@@ -96,7 +97,7 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) (
return return
} }
select { select {
case valchan <- blk: case blockChannel <- blk:
default: default:
} }
}(p) }(p)
...@@ -104,31 +105,30 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) ( ...@@ -104,31 +105,30 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) (
}() }()
select { select {
case blkdata := <-valchan: case block := <-blockChannel:
close(valchan) close(blockChannel)
return blocks.NewBlock(blkdata) return block, nil
case <-after: case <-after:
return nil, u.ErrTimeout return nil, u.ErrTimeout
} }
} }
func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byte, error) { func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) (*blocks.Block, error) {
u.DOut("[%s] getBlock '%s' from [%s]\n", bs.peer.ID.Pretty(), k.Pretty(), p.ID.Pretty()) u.DOut("[%s] getBlock '%s' from [%s]\n", bs.peer.ID.Pretty(), k.Pretty(), p.ID.Pretty())
ctx, _ := context.WithTimeout(context.Background(), timeout)
blockChannel := bs.notifications.Subscribe(ctx, k)
message := newMessage() message := newMessage()
message.AppendWanted(k) message.AppendWanted(k)
after := time.After(timeout)
resp := bs.listener.Listen(string(k), 1, timeout)
bs.meschan.Outgoing <- message.ToSwarm(p) bs.meschan.Outgoing <- message.ToSwarm(p)
select { block, ok := <-blockChannel
case resp_mes := <-resp: if !ok {
return resp_mes.Data, nil
case <-after:
u.PErr("getBlock for '%s' timed out.\n", k.Pretty()) u.PErr("getBlock for '%s' timed out.\n", k.Pretty())
return nil, u.ErrTimeout return nil, u.ErrTimeout
} }
return block, nil
} }
// HaveBlock announces the existance of a block to BitSwap, potentially sending // HaveBlock announces the existance of a block to BitSwap, potentially sending
...@@ -229,11 +229,7 @@ func (bs *BitSwap) blockReceive(p *peer.Peer, blk *blocks.Block) { ...@@ -229,11 +229,7 @@ func (bs *BitSwap) blockReceive(p *peer.Peer, blk *blocks.Block) {
return return
} }
mes := &swarm.Message{ bs.notifications.Publish(blk)
Peer: p,
Data: blk.Data,
}
bs.listener.Respond(string(blk.Key()), mes)
ledger := bs.getLedger(p) ledger := bs.getLedger(p)
ledger.ReceivedBytes(len(blk.Data)) ledger.ReceivedBytes(len(blk.Data))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment