Commit 8443b99c authored by Jeromy's avatar Jeromy Committed by Juan Batiz-Benet

update comments and reintroduce test

parent a159e682
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
blocks "github.com/ipfs/go-ipfs/blocks" blocks "github.com/ipfs/go-ipfs/blocks"
blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil" blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet" tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
p2ptestutil "github.com/ipfs/go-ipfs/p2p/test/util"
mockrouting "github.com/ipfs/go-ipfs/routing/mock" mockrouting "github.com/ipfs/go-ipfs/routing/mock"
delay "github.com/ipfs/go-ipfs/thirdparty/delay" delay "github.com/ipfs/go-ipfs/thirdparty/delay"
u "github.com/ipfs/go-ipfs/util" u "github.com/ipfs/go-ipfs/util"
...@@ -35,6 +36,28 @@ func TestClose(t *testing.T) { ...@@ -35,6 +36,28 @@ func TestClose(t *testing.T) {
bitswap.Exchange.GetBlock(context.Background(), block.Key()) bitswap.Exchange.GetBlock(context.Background(), block.Key())
} }
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
rs := mockrouting.NewServer()
net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
g := NewTestSessionGenerator(net)
defer g.Close()
block := blocks.NewBlock([]byte("block"))
pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t)
rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network
solo := g.Next()
defer solo.Exchange.Close()
ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
_, err := solo.Exchange.GetBlock(ctx, block.Key())
if err != context.DeadlineExceeded {
t.Fatal("Expected DeadlineExceeded error")
}
}
func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
......
...@@ -46,8 +46,8 @@ type cancellation struct { ...@@ -46,8 +46,8 @@ type cancellation struct {
type msgQueue struct { type msgQueue struct {
p peer.ID p peer.ID
lk sync.Mutex outlk sync.Mutex
wlmsg bsmsg.BitSwapMessage out bsmsg.BitSwapMessage
work chan struct{} work chan struct{}
done chan struct{} done chan struct{}
...@@ -106,11 +106,11 @@ func (pm *PeerManager) runQueue(mq *msgQueue) { ...@@ -106,11 +106,11 @@ func (pm *PeerManager) runQueue(mq *msgQueue) {
// TODO: cant connect, what now? // TODO: cant connect, what now?
} }
// grab messages from queue // grab outgoin message
mq.lk.Lock() mq.outlk.Lock()
wlm := mq.wlmsg wlm := mq.out
mq.wlmsg = nil mq.out = nil
mq.lk.Unlock() mq.outlk.Unlock()
if wlm != nil && !wlm.Empty() { if wlm != nil && !wlm.Empty() {
// send wantlist updates // send wantlist updates
...@@ -178,26 +178,30 @@ func (pm *PeerManager) Run(ctx context.Context) { ...@@ -178,26 +178,30 @@ func (pm *PeerManager) Run(ctx context.Context) {
} }
func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) { func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) {
mq.lk.Lock() mq.outlk.Lock()
defer func() { defer func() {
mq.lk.Unlock() mq.outlk.Unlock()
select { select {
case mq.work <- struct{}{}: case mq.work <- struct{}{}:
default: default:
} }
}() }()
if mq.wlmsg == nil || msg.Full() { // if we have no message held, or the one we are given is full
mq.wlmsg = msg // overwrite the one we are holding
if mq.out == nil || msg.Full() {
mq.out = msg
return return
} }
// TODO: add a msg.Combine(...) method // TODO: add a msg.Combine(...) method
// otherwise, combine the one we are holding with the
// one passed in
for _, e := range msg.Wantlist() { for _, e := range msg.Wantlist() {
if e.Cancel { if e.Cancel {
mq.wlmsg.Cancel(e.Key) mq.out.Cancel(e.Key)
} else { } else {
mq.wlmsg.AddEntry(e.Key, e.Priority) mq.out.AddEntry(e.Key, e.Priority)
} }
} }
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment