Commit 16e05fc4 authored by Jeromy's avatar Jeromy Committed by Juan Batiz-Benet

update comments and reintroduce test

parent 5efc7f69
......@@ -13,6 +13,7 @@ import (
blocks "github.com/ipfs/go-ipfs/blocks"
blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
p2ptestutil "github.com/ipfs/go-ipfs/p2p/test/util"
mockrouting "github.com/ipfs/go-ipfs/routing/mock"
delay "github.com/ipfs/go-ipfs/thirdparty/delay"
u "github.com/ipfs/go-ipfs/util"
......@@ -35,6 +36,28 @@ func TestClose(t *testing.T) {
bitswap.Exchange.GetBlock(context.Background(), block.Key())
}
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
rs := mockrouting.NewServer()
net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
g := NewTestSessionGenerator(net)
defer g.Close()
block := blocks.NewBlock([]byte("block"))
pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t)
rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network
solo := g.Next()
defer solo.Exchange.Close()
ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
_, err := solo.Exchange.GetBlock(ctx, block.Key())
if err != context.DeadlineExceeded {
t.Fatal("Expected DeadlineExceeded error")
}
}
func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
......
......@@ -46,8 +46,8 @@ type cancellation struct {
type msgQueue struct {
p peer.ID
lk sync.Mutex
wlmsg bsmsg.BitSwapMessage
outlk sync.Mutex
out bsmsg.BitSwapMessage
work chan struct{}
done chan struct{}
......@@ -106,11 +106,11 @@ func (pm *PeerManager) runQueue(mq *msgQueue) {
// TODO: cant connect, what now?
}
// grab messages from queue
mq.lk.Lock()
wlm := mq.wlmsg
mq.wlmsg = nil
mq.lk.Unlock()
// grab outgoin message
mq.outlk.Lock()
wlm := mq.out
mq.out = nil
mq.outlk.Unlock()
if wlm != nil && !wlm.Empty() {
// send wantlist updates
......@@ -178,26 +178,30 @@ func (pm *PeerManager) Run(ctx context.Context) {
}
func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) {
mq.lk.Lock()
mq.outlk.Lock()
defer func() {
mq.lk.Unlock()
mq.outlk.Unlock()
select {
case mq.work <- struct{}{}:
default:
}
}()
if mq.wlmsg == nil || msg.Full() {
mq.wlmsg = msg
// if we have no message held, or the one we are given is full
// overwrite the one we are holding
if mq.out == nil || msg.Full() {
mq.out = msg
return
}
// TODO: add a msg.Combine(...) method
// otherwise, combine the one we are holding with the
// one passed in
for _, e := range msg.Wantlist() {
if e.Cancel {
mq.wlmsg.Cancel(e.Key)
mq.out.Cancel(e.Key)
} else {
mq.wlmsg.AddEntry(e.Key, e.Priority)
mq.out.AddEntry(e.Key, e.Priority)
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment