Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
dms3
go-bitswap
Commits
4ba17a0f
Commit
4ba17a0f
authored
May 16, 2015
by
Jeromy
Committed by
Juan Batiz-Benet
May 21, 2015
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
WIP: super awesome bitswap cleanup fixtime
parent
440377e2
Changes
9
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
191 additions
and
209 deletions
+191
-209
bitswap.go
bitswap.go
+26
-108
bitswap_test.go
bitswap_test.go
+11
-3
decision/engine.go
decision/engine.go
+12
-4
decision/peer_request_queue.go
decision/peer_request_queue.go
+12
-6
network/interface.go
network/interface.go
+1
-1
peermanager.go
peermanager.go
+102
-50
testnet/network_test.go
testnet/network_test.go
+6
-10
testnet/virtual.go
testnet/virtual.go
+2
-1
workers.go
workers.go
+19
-26
No files found.
bitswap.go
View file @
4ba17a0f
...
...
@@ -4,7 +4,6 @@ package bitswap
import
(
"errors"
"fmt"
"math"
"sync"
"time"
...
...
@@ -23,7 +22,6 @@ import (
"github.com/ipfs/go-ipfs/thirdparty/delay"
eventlog
"github.com/ipfs/go-ipfs/thirdparty/eventlog"
u
"github.com/ipfs/go-ipfs/util"
pset
"github.com/ipfs/go-ipfs/util/peerset"
// TODO move this to peerstore
)
var
log
=
eventlog
.
Logger
(
"bitswap"
)
...
...
@@ -45,9 +43,7 @@ const (
provideWorkers
=
4
)
var
(
rebroadcastDelay
=
delay
.
Fixed
(
time
.
Second
*
10
)
)
var
rebroadcastDelay
=
delay
.
Fixed
(
time
.
Second
*
10
)
// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
...
...
@@ -86,14 +82,13 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
notifications
:
notif
,
engine
:
decision
.
NewEngine
(
ctx
,
bstore
),
// TODO close the engine with Close() method
network
:
network
,
wantlist
:
wantlist
.
NewThreadSafe
(),
batchRequests
:
make
(
chan
*
blockRequest
,
sizeBatchRequestChan
),
process
:
px
,
newBlocks
:
make
(
chan
*
blocks
.
Block
,
HasBlockBufferSize
),
provideKeys
:
make
(
chan
u
.
Key
),
p
m
:
New
Peer
Manager
(
network
),
w
m
:
New
Want
Manager
(
network
),
}
go
bs
.
p
m
.
Run
(
ctx
)
go
bs
.
w
m
.
Run
(
ctx
)
network
.
SetDelegate
(
bs
)
// Start up bitswaps async worker routines
...
...
@@ -112,7 +107,7 @@ type Bitswap struct {
// the peermanager manages sending messages to peers in a way that
// wont block bitswap operation
p
m
*
Peer
Manager
w
m
*
Want
Manager
// blockstore is the local database
// NB: ensure threadsafety
...
...
@@ -127,8 +122,6 @@ type Bitswap struct {
engine
*
decision
.
Engine
wantlist
*
wantlist
.
ThreadSafe
process
process
.
Process
newBlocks
chan
*
blocks
.
Block
...
...
@@ -233,60 +226,21 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
return
err
}
bs
.
wantlist
.
Remove
(
blk
.
Key
())
bs
.
notifications
.
Publish
(
blk
)
select
{
case
bs
.
newBlocks
<-
blk
:
// send block off to be reprovided
case
<-
ctx
.
Done
()
:
return
ctx
.
Err
()
}
return
nil
}
func
(
bs
*
Bitswap
)
sendWantlistMsgToPeers
(
ctx
context
.
Context
,
m
bsmsg
.
BitSwapMessage
,
peers
<-
chan
peer
.
ID
)
error
{
set
:=
pset
.
New
()
loop
:
for
{
select
{
case
peerToQuery
,
ok
:=
<-
peers
:
if
!
ok
{
break
loop
}
if
!
set
.
TryAdd
(
peerToQuery
)
{
//Do once per peer
continue
}
bs
.
pm
.
Send
(
peerToQuery
,
m
)
case
<-
ctx
.
Done
()
:
return
nil
}
}
return
nil
}
func
(
bs
*
Bitswap
)
sendWantlistToPeers
(
ctx
context
.
Context
,
peers
<-
chan
peer
.
ID
)
error
{
entries
:=
bs
.
wantlist
.
Entries
()
if
len
(
entries
)
==
0
{
return
nil
}
message
:=
bsmsg
.
New
()
message
.
SetFull
(
true
)
for
_
,
wanted
:=
range
entries
{
message
.
AddEntry
(
wanted
.
Key
,
wanted
.
Priority
)
}
return
bs
.
sendWantlistMsgToPeers
(
ctx
,
message
,
peers
)
}
func
(
bs
*
Bitswap
)
sendWantlistToProviders
(
ctx
context
.
Context
,
entries
[]
wantlist
.
Entry
)
{
func
(
bs
*
Bitswap
)
connectToProviders
(
ctx
context
.
Context
,
entries
[]
wantlist
.
Entry
)
{
ctx
,
cancel
:=
context
.
WithCancel
(
ctx
)
defer
cancel
()
// prepare a channel to hand off to sendWantlistToPeers
sendToPeers
:=
make
(
chan
peer
.
ID
)
// Get providers for all entries in wantlist (could take a while)
wg
:=
sync
.
WaitGroup
{}
for
_
,
e
:=
range
entries
{
...
...
@@ -298,97 +252,61 @@ func (bs *Bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli
defer
cancel
()
providers
:=
bs
.
network
.
FindProvidersAsync
(
child
,
k
,
maxProvidersPerRequest
)
for
prov
:=
range
providers
{
sendToPeers
<-
prov
go
func
(
p
peer
.
ID
)
{
bs
.
network
.
ConnectTo
(
ctx
,
p
)
}(
prov
)
}
}(
e
.
Key
)
}
go
func
()
{
wg
.
Wait
()
// make sure all our children do finish.
close
(
sendToPeers
)
}()
err
:=
bs
.
sendWantlistToPeers
(
ctx
,
sendToPeers
)
if
err
!=
nil
{
log
.
Debugf
(
"sendWantlistToPeers error: %s"
,
err
)
}
wg
.
Wait
()
// make sure all our children do finish.
}
// TODO(brian): handle errors
func
(
bs
*
Bitswap
)
ReceiveMessage
(
ctx
context
.
Context
,
p
peer
.
ID
,
incoming
bsmsg
.
BitSwapMessage
)
error
{
func
(
bs
*
Bitswap
)
ReceiveMessage
(
ctx
context
.
Context
,
p
peer
.
ID
,
incoming
bsmsg
.
BitSwapMessage
)
{
// This call records changes to wantlists, blocks received,
// and number of bytes transfered.
bs
.
engine
.
MessageReceived
(
p
,
incoming
)
// TODO: this is bad, and could be easily abused.
// Should only track *useful* messages in ledger
if
len
(
incoming
.
Blocks
())
==
0
{
return
}
// quickly send out cancels, reduces chances of duplicate block receives
var
keys
[]
u
.
Key
for
_
,
block
:=
range
incoming
.
Blocks
()
{
keys
=
append
(
keys
,
block
.
Key
())
}
bs
.
wm
.
CancelWants
(
keys
)
for
_
,
block
:=
range
incoming
.
Blocks
()
{
bs
.
blocksRecvd
++
if
has
,
err
:=
bs
.
blockstore
.
Has
(
block
.
Key
());
err
==
nil
&&
has
{
bs
.
dupBlocksRecvd
++
}
log
.
Debugf
(
"got block %s from %s"
,
block
,
p
)
hasBlockCtx
,
cancel
:=
context
.
WithTimeout
(
ctx
,
hasBlockTimeout
)
if
err
:=
bs
.
HasBlock
(
hasBlockCtx
,
block
);
err
!=
nil
{
return
fmt
.
Error
f
(
"ReceiveMessage HasBlock error: %s"
,
err
)
log
.
Warning
f
(
"ReceiveMessage HasBlock error: %s"
,
err
)
}
cancel
()
keys
=
append
(
keys
,
block
.
Key
())
}
bs
.
cancelBlocks
(
ctx
,
keys
)
return
nil
}
// Connected/Disconnected warns bitswap about peer connections
func
(
bs
*
Bitswap
)
PeerConnected
(
p
peer
.
ID
)
{
// TODO: add to clientWorker??
bs
.
pm
.
Connected
(
p
)
peers
:=
make
(
chan
peer
.
ID
,
1
)
peers
<-
p
close
(
peers
)
err
:=
bs
.
sendWantlistToPeers
(
context
.
TODO
(),
peers
)
if
err
!=
nil
{
log
.
Debugf
(
"error sending wantlist: %s"
,
err
)
}
bs
.
wm
.
Connected
(
p
)
}
// Connected/Disconnected warns bitswap about peer connections
func
(
bs
*
Bitswap
)
PeerDisconnected
(
p
peer
.
ID
)
{
bs
.
p
m
.
Disconnected
(
p
)
bs
.
w
m
.
Disconnected
(
p
)
bs
.
engine
.
PeerDisconnected
(
p
)
}
func
(
bs
*
Bitswap
)
cancelBlocks
(
ctx
context
.
Context
,
bkeys
[]
u
.
Key
)
{
if
len
(
bkeys
)
<
1
{
return
}
message
:=
bsmsg
.
New
()
message
.
SetFull
(
false
)
for
_
,
k
:=
range
bkeys
{
log
.
Debug
(
"cancel block: %s"
,
k
)
message
.
Cancel
(
k
)
}
bs
.
pm
.
Broadcast
(
message
)
return
}
func
(
bs
*
Bitswap
)
wantNewBlocks
(
ctx
context
.
Context
,
bkeys
[]
u
.
Key
)
{
if
len
(
bkeys
)
<
1
{
return
}
message
:=
bsmsg
.
New
()
message
.
SetFull
(
false
)
for
i
,
k
:=
range
bkeys
{
message
.
AddEntry
(
k
,
kMaxPriority
-
i
)
}
bs
.
pm
.
Broadcast
(
message
)
}
func
(
bs
*
Bitswap
)
ReceiveError
(
err
error
)
{
log
.
Debugf
(
"Bitswap ReceiveError: %s"
,
err
)
// TODO log the network error
...
...
@@ -401,7 +319,7 @@ func (bs *Bitswap) Close() error {
func
(
bs
*
Bitswap
)
GetWantlist
()
[]
u
.
Key
{
var
out
[]
u
.
Key
for
_
,
e
:=
range
bs
.
w
antlist
.
Entries
()
{
for
_
,
e
:=
range
bs
.
w
m
.
wl
.
Entries
()
{
out
=
append
(
out
,
e
.
Key
)
}
return
out
...
...
bitswap_test.go
View file @
4ba17a0f
...
...
@@ -120,6 +120,16 @@ func TestLargeFile(t *testing.T) {
PerformDistributionTest
(
t
,
numInstances
,
numBlocks
)
}
func
TestLargeFileTwoPeers
(
t
*
testing
.
T
)
{
if
testing
.
Short
()
{
t
.
SkipNow
()
}
t
.
Parallel
()
numInstances
:=
2
numBlocks
:=
100
PerformDistributionTest
(
t
,
numInstances
,
numBlocks
)
}
func
PerformDistributionTest
(
t
*
testing
.
T
,
numInstances
,
numBlocks
int
)
{
if
testing
.
Short
()
{
t
.
SkipNow
()
...
...
@@ -129,8 +139,6 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
defer
sg
.
Close
()
bg
:=
blocksutil
.
NewBlockGenerator
()
t
.
Log
(
"Test a few nodes trying to get one file with a lot of blocks"
)
instances
:=
sg
.
Instances
(
numInstances
)
blocks
:=
bg
.
Blocks
(
numBlocks
)
...
...
@@ -238,7 +246,7 @@ func TestBasicBitswap(t *testing.T) {
defer
sg
.
Close
()
bg
:=
blocksutil
.
NewBlockGenerator
()
t
.
Log
(
"Test a
few
node
s
trying to get one
file with a lot of blocks
"
)
t
.
Log
(
"Test a
one
node trying to get one
block from another
"
)
instances
:=
sg
.
Instances
(
2
)
blocks
:=
bg
.
Blocks
(
1
)
...
...
decision/engine.go
View file @
4ba17a0f
...
...
@@ -92,7 +92,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
bs
:
bs
,
peerRequestQueue
:
newPRQ
(),
outbox
:
make
(
chan
(
<-
chan
*
Envelope
),
outboxChanBuffer
),
workSignal
:
make
(
chan
struct
{}),
workSignal
:
make
(
chan
struct
{}
,
1
),
}
go
e
.
taskWorker
(
ctx
)
return
e
...
...
@@ -156,7 +156,15 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
return
&
Envelope
{
Peer
:
nextTask
.
Target
,
Block
:
block
,
Sent
:
nextTask
.
Done
,
Sent
:
func
()
{
nextTask
.
Done
()
select
{
case
e
.
workSignal
<-
struct
{}{}
:
// work completing may mean that our queue will provide new
// work to be done.
default
:
}
},
},
nil
}
}
...
...
@@ -202,11 +210,11 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
for
_
,
entry
:=
range
m
.
Wantlist
()
{
if
entry
.
Cancel
{
log
.
Debug
f
(
"cancel %s"
,
entry
.
Key
)
log
.
Error
f
(
"cancel %s"
,
entry
.
Key
)
l
.
CancelWant
(
entry
.
Key
)
e
.
peerRequestQueue
.
Remove
(
entry
.
Key
,
p
)
}
else
{
log
.
Debug
f
(
"wants %s - %d"
,
entry
.
Key
,
entry
.
Priority
)
log
.
Error
f
(
"wants %s - %d"
,
entry
.
Key
,
entry
.
Priority
)
l
.
Wants
(
entry
.
Key
,
entry
.
Priority
)
if
exists
,
err
:=
e
.
bs
.
Has
(
entry
.
Key
);
err
==
nil
&&
exists
{
e
.
peerRequestQueue
.
Push
(
entry
.
Entry
,
p
)
...
...
decision/peer_request_queue.go
View file @
4ba17a0f
...
...
@@ -51,12 +51,6 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) {
tl
.
partners
[
to
]
=
partner
}
if
task
,
ok
:=
tl
.
taskMap
[
taskKey
(
to
,
entry
.
Key
)];
ok
{
task
.
Entry
.
Priority
=
entry
.
Priority
partner
.
taskQueue
.
Update
(
task
.
index
)
return
}
partner
.
activelk
.
Lock
()
defer
partner
.
activelk
.
Unlock
()
_
,
ok
=
partner
.
activeBlocks
[
entry
.
Key
]
...
...
@@ -64,6 +58,12 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) {
return
}
if
task
,
ok
:=
tl
.
taskMap
[
taskKey
(
to
,
entry
.
Key
)];
ok
{
task
.
Entry
.
Priority
=
entry
.
Priority
partner
.
taskQueue
.
Update
(
task
.
index
)
return
}
task
:=
&
peerRequestTask
{
Entry
:
entry
,
Target
:
to
,
...
...
@@ -220,6 +220,12 @@ func partnerCompare(a, b pq.Elem) bool {
if
pb
.
requests
==
0
{
return
true
}
if
pa
.
active
==
pb
.
active
{
// sorting by taskQueue.Len() aids in cleaning out trash entries faster
// if we sorted instead by requests, one peer could potentially build up
// a huge number of cancelled entries in the queue resulting in a memory leak
return
pa
.
taskQueue
.
Len
()
>
pb
.
taskQueue
.
Len
()
}
return
pa
.
active
<
pb
.
active
}
...
...
network/interface.go
View file @
4ba17a0f
...
...
@@ -33,7 +33,7 @@ type Receiver interface {
ReceiveMessage
(
ctx
context
.
Context
,
sender
peer
.
ID
,
incoming
bsmsg
.
BitSwapMessage
)
error
incoming
bsmsg
.
BitSwapMessage
)
ReceiveError
(
error
)
...
...
peermanager.go
View file @
4ba17a0f
...
...
@@ -7,28 +7,36 @@ import (
engine
"github.com/ipfs/go-ipfs/exchange/bitswap/decision"
bsmsg
"github.com/ipfs/go-ipfs/exchange/bitswap/message"
bsnet
"github.com/ipfs/go-ipfs/exchange/bitswap/network"
wantlist
"github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
peer
"github.com/ipfs/go-ipfs/p2p/peer"
u
"github.com/ipfs/go-ipfs/util"
)
type
Peer
Manager
struct
{
type
Want
Manager
struct
{
receiver
bsnet
.
Receiver
incoming
chan
*
msgPair
connect
chan
peer
.
ID
incoming
chan
[]
*
bsmsg
.
Entry
// notification channel for new peers connecting
connect
chan
peer
.
ID
// notification channel for peers disconnecting
disconnect
chan
peer
.
ID
peers
map
[
peer
.
ID
]
*
msgQueue
wl
*
wantlist
.
Wantlist
network
bsnet
.
BitSwapNetwork
}
func
New
Peer
Manager
(
network
bsnet
.
BitSwapNetwork
)
*
Peer
Manager
{
return
&
Peer
Manager
{
incoming
:
make
(
chan
*
msgPair
,
10
),
func
New
Want
Manager
(
network
bsnet
.
BitSwapNetwork
)
*
Want
Manager
{
return
&
Want
Manager
{
incoming
:
make
(
chan
[]
*
bsmsg
.
Entry
,
10
),
connect
:
make
(
chan
peer
.
ID
,
10
),
disconnect
:
make
(
chan
peer
.
ID
,
10
),
peers
:
make
(
map
[
peer
.
ID
]
*
msgQueue
),
wl
:
wantlist
.
New
(),
network
:
network
,
}
}
...
...
@@ -53,37 +61,68 @@ type msgQueue struct {
done
chan
struct
{}
}
func
(
pm
*
PeerManager
)
SendBlock
(
ctx
context
.
Context
,
env
*
engine
.
Envelope
)
{
func
(
pm
*
WantManager
)
WantBlocks
(
ks
[]
u
.
Key
)
{
log
.
Error
(
"WANT: "
,
ks
)
pm
.
addEntries
(
ks
,
false
)
}
func
(
pm
*
WantManager
)
CancelWants
(
ks
[]
u
.
Key
)
{
log
.
Error
(
"CANCEL: "
,
ks
)
pm
.
addEntries
(
ks
,
true
)
}
func
(
pm
*
WantManager
)
addEntries
(
ks
[]
u
.
Key
,
cancel
bool
)
{
var
entries
[]
*
bsmsg
.
Entry
for
i
,
k
:=
range
ks
{
entries
=
append
(
entries
,
&
bsmsg
.
Entry
{
Cancel
:
cancel
,
Entry
:
wantlist
.
Entry
{
Key
:
k
,
Priority
:
kMaxPriority
-
i
,
},
})
}
pm
.
incoming
<-
entries
}
func
(
pm
*
WantManager
)
SendBlock
(
ctx
context
.
Context
,
env
*
engine
.
Envelope
)
{
// Blocks need to be sent synchronously to maintain proper backpressure
// throughout the network stack
defer
env
.
Sent
()
msg
:=
bsmsg
.
New
()
msg
.
AddBlock
(
env
.
Block
)
msg
.
SetFull
(
false
)
err
:=
pm
.
network
.
SendMessage
(
ctx
,
env
.
Peer
,
msg
)
if
err
!=
nil
{
log
.
Error
(
err
)
}
}
func
(
pm
*
Peer
Manager
)
startPeerHandler
(
ctx
context
.
Context
,
p
peer
.
ID
)
*
msgQueue
{
func
(
pm
*
Want
Manager
)
startPeerHandler
(
ctx
context
.
Context
,
p
peer
.
ID
)
*
msgQueue
{
_
,
ok
:=
pm
.
peers
[
p
]
if
ok
{
// TODO: log an error?
return
nil
}
mq
:=
new
(
msgQueue
)
mq
.
done
=
make
(
chan
struct
{})
mq
.
work
=
make
(
chan
struct
{},
1
)
mq
.
p
=
p
mq
:=
newMsgQueue
(
p
)
// new peer, we will want to give them our full wantlist
fullwantlist
:=
bsmsg
.
New
()
for
_
,
e
:=
range
pm
.
wl
.
Entries
()
{
fullwantlist
.
AddEntry
(
e
.
Key
,
e
.
Priority
)
}
fullwantlist
.
SetFull
(
true
)
mq
.
out
=
fullwantlist
mq
.
work
<-
struct
{}{}
pm
.
peers
[
p
]
=
mq
go
pm
.
runQueue
(
ctx
,
mq
)
return
mq
}
func
(
pm
*
Peer
Manager
)
stopPeerHandler
(
p
peer
.
ID
)
{
func
(
pm
*
Want
Manager
)
stopPeerHandler
(
p
peer
.
ID
)
{
pq
,
ok
:=
pm
.
peers
[
p
]
if
!
ok
{
// TODO: log error?
...
...
@@ -94,32 +133,38 @@ func (pm *PeerManager) stopPeerHandler(p peer.ID) {
delete
(
pm
.
peers
,
p
)
}
func
(
pm
*
Peer
Manager
)
runQueue
(
ctx
context
.
Context
,
mq
*
msgQueue
)
{
func
(
pm
*
Want
Manager
)
runQueue
(
ctx
context
.
Context
,
mq
*
msgQueue
)
{
for
{
select
{
case
<-
mq
.
work
:
// there is work to be done
// TODO: this might not need to be done every time, figure out
// a good heuristic
err
:=
pm
.
network
.
ConnectTo
(
ctx
,
mq
.
p
)
if
err
!=
nil
{
log
.
Error
(
err
)
// TODO: cant connect, what now?
}
// grab outgoin message
// grab outgoin
g
message
mq
.
outlk
.
Lock
()
wlm
:=
mq
.
out
mq
.
out
=
nil
mq
.
outlk
.
Unlock
()
if
wlm
!=
nil
&&
!
wlm
.
Empty
()
{
// send wantlist updates
err
=
pm
.
network
.
SendMessage
(
ctx
,
mq
.
p
,
wlm
)
if
err
!=
nil
{
log
.
Error
(
"bitswap send error: "
,
err
)
// TODO: what do we do if this fails?
}
// no message or empty message, continue
if
wlm
==
nil
{
log
.
Error
(
"nil wantlist"
)
continue
}
if
wlm
.
Empty
()
{
log
.
Error
(
"empty wantlist"
)
continue
}
// send wantlist updates
err
=
pm
.
network
.
SendMessage
(
ctx
,
mq
.
p
,
wlm
)
if
err
!=
nil
{
log
.
Error
(
"bitswap send error: "
,
err
)
// TODO: what do we do if this fails?
}
case
<-
mq
.
done
:
return
...
...
@@ -127,46 +172,38 @@ func (pm *PeerManager) runQueue(ctx context.Context, mq *msgQueue) {
}
}
func
(
pm
*
PeerManager
)
Send
(
to
peer
.
ID
,
msg
bsmsg
.
BitSwapMessage
)
{
if
len
(
msg
.
Blocks
())
>
0
{
panic
(
"no blocks here!"
)
}
pm
.
incoming
<-
&
msgPair
{
to
:
to
,
msg
:
msg
}
}
func
(
pm
*
PeerManager
)
Broadcast
(
msg
bsmsg
.
BitSwapMessage
)
{
pm
.
incoming
<-
&
msgPair
{
msg
:
msg
}
}
func
(
pm
*
PeerManager
)
Connected
(
p
peer
.
ID
)
{
func
(
pm
*
WantManager
)
Connected
(
p
peer
.
ID
)
{
pm
.
connect
<-
p
}
func
(
pm
*
Peer
Manager
)
Disconnected
(
p
peer
.
ID
)
{
func
(
pm
*
Want
Manager
)
Disconnected
(
p
peer
.
ID
)
{
pm
.
disconnect
<-
p
}
// TODO: use goprocess here once i trust it
func
(
pm
*
Peer
Manager
)
Run
(
ctx
context
.
Context
)
{
func
(
pm
*
Want
Manager
)
Run
(
ctx
context
.
Context
)
{
for
{
select
{
case
msgp
:=
<-
pm
.
incoming
:
// Broadcast message to all if recipient not set
if
msgp
.
to
==
""
{
for
_
,
p
:=
range
pm
.
peers
{
p
.
addMessage
(
msgp
.
msg
)
case
entries
:=
<-
pm
.
incoming
:
msg
:=
bsmsg
.
New
()
msg
.
SetFull
(
false
)
// add changes to our wantlist
for
_
,
e
:=
range
entries
{
if
e
.
Cancel
{
pm
.
wl
.
Remove
(
e
.
Key
)
msg
.
Cancel
(
e
.
Key
)
}
else
{
pm
.
wl
.
Add
(
e
.
Key
,
e
.
Priority
)
msg
.
AddEntry
(
e
.
Key
,
e
.
Priority
)
}
continue
}
p
,
ok
:=
pm
.
peers
[
msgp
.
to
]
if
!
ok
{
//TODO: decide, drop message? or dial?
p
=
pm
.
startPeerHandler
(
ctx
,
msgp
.
to
)
// broadcast those wantlist changes
for
_
,
p
:=
range
pm
.
peers
{
p
.
addMessage
(
msg
)
}
p
.
addMessage
(
msgp
.
msg
)
case
p
:=
<-
pm
.
connect
:
pm
.
startPeerHandler
(
ctx
,
p
)
case
p
:=
<-
pm
.
disconnect
:
...
...
@@ -177,6 +214,15 @@ func (pm *PeerManager) Run(ctx context.Context) {
}
}
func
newMsgQueue
(
p
peer
.
ID
)
*
msgQueue
{
mq
:=
new
(
msgQueue
)
mq
.
done
=
make
(
chan
struct
{})
mq
.
work
=
make
(
chan
struct
{},
1
)
mq
.
p
=
p
return
mq
}
func
(
mq
*
msgQueue
)
addMessage
(
msg
bsmsg
.
BitSwapMessage
)
{
mq
.
outlk
.
Lock
()
defer
func
()
{
...
...
@@ -187,6 +233,10 @@ func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) {
}
}()
if
msg
.
Full
()
{
log
.
Error
(
"GOt FULL MESSAGE"
)
}
// if we have no message held, or the one we are given is full
// overwrite the one we are holding
if
mq
.
out
==
nil
||
msg
.
Full
()
{
...
...
@@ -199,8 +249,10 @@ func (mq *msgQueue) addMessage(msg bsmsg.BitSwapMessage) {
// one passed in
for
_
,
e
:=
range
msg
.
Wantlist
()
{
if
e
.
Cancel
{
log
.
Error
(
"add message cancel: "
,
e
.
Key
,
mq
.
p
)
mq
.
out
.
Cancel
(
e
.
Key
)
}
else
{
log
.
Error
(
"add message want: "
,
e
.
Key
,
mq
.
p
)
mq
.
out
.
AddEntry
(
e
.
Key
,
e
.
Priority
)
}
}
...
...
testnet/network_test.go
View file @
4ba17a0f
...
...
@@ -29,19 +29,17 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
responder
.
SetDelegate
(
lambda
(
func
(
ctx
context
.
Context
,
fromWaiter
peer
.
ID
,
msgFromWaiter
bsmsg
.
BitSwapMessage
)
error
{
msgFromWaiter
bsmsg
.
BitSwapMessage
)
{
msgToWaiter
:=
bsmsg
.
New
()
msgToWaiter
.
AddBlock
(
blocks
.
NewBlock
([]
byte
(
expectedStr
)))
waiter
.
SendMessage
(
ctx
,
fromWaiter
,
msgToWaiter
)
return
nil
}))
waiter
.
SetDelegate
(
lambda
(
func
(
ctx
context
.
Context
,
fromResponder
peer
.
ID
,
msgFromResponder
bsmsg
.
BitSwapMessage
)
error
{
msgFromResponder
bsmsg
.
BitSwapMessage
)
{
// TODO assert that this came from the correct peer and that the message contents are as expected
ok
:=
false
...
...
@@ -54,9 +52,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
if
!
ok
{
t
.
Fatal
(
"Message not received from the responder"
)
}
return
nil
}))
messageSentAsync
:=
bsmsg
.
New
()
...
...
@@ -71,7 +67,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
}
type
receiverFunc
func
(
ctx
context
.
Context
,
p
peer
.
ID
,
incoming
bsmsg
.
BitSwapMessage
)
error
incoming
bsmsg
.
BitSwapMessage
)
// lambda returns a Receiver instance given a receiver function
func
lambda
(
f
receiverFunc
)
bsnet
.
Receiver
{
...
...
@@ -81,12 +77,12 @@ func lambda(f receiverFunc) bsnet.Receiver {
}
type
lambdaImpl
struct
{
f
func
(
ctx
context
.
Context
,
p
peer
.
ID
,
incoming
bsmsg
.
BitSwapMessage
)
error
f
func
(
ctx
context
.
Context
,
p
peer
.
ID
,
incoming
bsmsg
.
BitSwapMessage
)
}
func
(
lam
*
lambdaImpl
)
ReceiveMessage
(
ctx
context
.
Context
,
p
peer
.
ID
,
incoming
bsmsg
.
BitSwapMessage
)
error
{
return
lam
.
f
(
ctx
,
p
,
incoming
)
p
peer
.
ID
,
incoming
bsmsg
.
BitSwapMessage
)
{
lam
.
f
(
ctx
,
p
,
incoming
)
}
func
(
lam
*
lambdaImpl
)
ReceiveError
(
err
error
)
{
...
...
testnet/virtual.go
View file @
4ba17a0f
...
...
@@ -72,7 +72,8 @@ func (n *network) deliver(
n
.
delay
.
Wait
()
return
r
.
ReceiveMessage
(
context
.
TODO
(),
from
,
message
)
r
.
ReceiveMessage
(
context
.
TODO
(),
from
,
message
)
return
nil
}
type
networkClient
struct
{
...
...
workers.go
View file @
4ba17a0f
...
...
@@ -42,9 +42,11 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
}
// Start up a worker to manage periodically resending our wantlist out to peers
px
.
Go
(
func
(
px
process
.
Process
)
{
bs
.
rebroadcastWorker
(
ctx
)
})
/*
px.Go(func(px process.Process) {
bs.rebroadcastWorker(ctx)
})
*/
// Start up a worker to manage sending out provides messages
px
.
Go
(
func
(
px
process
.
Process
)
{
...
...
@@ -72,7 +74,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context) {
continue
}
bs
.
p
m
.
SendBlock
(
ctx
,
envelope
)
bs
.
w
m
.
SendBlock
(
ctx
,
envelope
)
case
<-
ctx
.
Done
()
:
return
}
...
...
@@ -146,30 +148,19 @@ func (bs *Bitswap) clientWorker(parent context.Context) {
log
.
Warning
(
"Received batch request for zero blocks"
)
continue
}
for
i
,
k
:=
range
keys
{
bs
.
wantlist
.
Add
(
k
,
kMaxPriority
-
i
)
}
done
:=
make
(
chan
struct
{})
go
func
()
{
bs
.
wantNewBlocks
(
req
.
ctx
,
keys
)
close
(
done
)
}()
bs
.
wm
.
WantBlocks
(
keys
)
// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true.
child
,
cancel
:=
context
.
WithTimeout
(
req
.
ctx
,
providerRequestTimeout
)
providers
:=
bs
.
network
.
FindProvidersAsync
(
child
,
keys
[
0
],
maxProvidersPerRequest
)
err
:=
bs
.
sendWantlistToPeers
(
req
.
ctx
,
providers
)
if
err
!=
nil
{
log
.
Debugf
(
"error sending wantlist: %s"
,
err
)
for
p
:=
range
providers
{
go
bs
.
network
.
ConnectTo
(
req
.
ctx
,
p
)
}
cancel
()
// Wait for wantNewBlocks to finish
<-
done
case
<-
parent
.
Done
()
:
return
}
...
...
@@ -180,22 +171,24 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
ctx
,
cancel
:=
context
.
WithCancel
(
parent
)
defer
cancel
()
broadcastSignal
:=
time
.
After
(
rebroadcastDelay
.
Get
())
tick
:=
time
.
Tick
(
10
*
time
.
Second
)
broadcastSignal
:=
time
.
NewTicker
(
rebroadcastDelay
.
Get
())
defer
broadcastSignal
.
Stop
()
tick
:=
time
.
NewTicker
(
10
*
time
.
Second
)
defer
tick
.
Stop
()
for
{
select
{
case
<-
tick
:
n
:=
bs
.
w
antlist
.
Len
()
case
<-
tick
.
C
:
n
:=
bs
.
w
m
.
wl
.
Len
()
if
n
>
0
{
log
.
Debug
(
n
,
"keys in bitswap wantlist"
)
}
case
<-
broadcastSignal
:
// resend unfulfilled wantlist keys
entries
:=
bs
.
w
antlist
.
Entries
()
case
<-
broadcastSignal
.
C
:
// resend unfulfilled wantlist keys
entries
:=
bs
.
w
m
.
wl
.
Entries
()
if
len
(
entries
)
>
0
{
bs
.
sendWantlis
tToProviders
(
ctx
,
entries
)
bs
.
connec
tToProviders
(
ctx
,
entries
)
}
broadcastSignal
=
time
.
After
(
rebroadcastDelay
.
Get
())
case
<-
parent
.
Done
()
:
return
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment