Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
dms3
go-bitswap
Commits
b3005fb2
Commit
b3005fb2
authored
Nov 20, 2015
by
Jeromy
Committed by
Jeromy
Apr 27, 2016
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
wire contexts into bitswap requests more deeply
License: MIT Signed-off-by:
Jeromy
<
jeromyj@gmail.com
>
parent
cba821a8
Changes
6
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
102 additions
and
85 deletions
+102
-85
bitswap.go
bitswap.go
+17
-34
decision/engine.go
decision/engine.go
+1
-1
decision/ledger.go
decision/ledger.go
+4
-2
wantlist/wantlist.go
wantlist/wantlist.go
+22
-8
wantmanager.go
wantmanager.go
+14
-5
workers.go
workers.go
+44
-35
No files found.
bitswap.go
View file @
b3005fb2
...
...
@@ -86,7 +86,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
notifications
:
notif
,
engine
:
decision
.
NewEngine
(
ctx
,
bstore
),
// TODO close the engine with Close() method
network
:
network
,
findKeys
:
make
(
chan
*
blockRequest
,
sizeBatchRequestChan
),
findKeys
:
make
(
chan
*
wantlist
.
Entry
,
sizeBatchRequestChan
),
process
:
px
,
newBlocks
:
make
(
chan
*
blocks
.
Block
,
HasBlockBufferSize
),
provideKeys
:
make
(
chan
key
.
Key
,
provideKeysBufferSize
),
...
...
@@ -129,7 +129,7 @@ type Bitswap struct {
notifications
notifications
.
PubSub
// send keys to a worker to find and connect to providers for them
findKeys
chan
*
blockRequest
findKeys
chan
*
wantlist
.
Entry
engine
*
decision
.
Engine
...
...
@@ -146,8 +146,8 @@ type Bitswap struct {
}
type
blockRequest
struct
{
key
s
[]
key
.
Key
ctx
context
.
Context
key
key
.
Key
ctx
context
.
Context
}
// GetBlock attempts to retrieve a particular block from peers within the
...
...
@@ -208,6 +208,12 @@ func (bs *Bitswap) WantlistForPeer(p peer.ID) []key.Key {
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
func
(
bs
*
Bitswap
)
GetBlocks
(
ctx
context
.
Context
,
keys
[]
key
.
Key
)
(
<-
chan
*
blocks
.
Block
,
error
)
{
if
len
(
keys
)
==
0
{
out
:=
make
(
chan
*
blocks
.
Block
)
close
(
out
)
return
out
,
nil
}
select
{
case
<-
bs
.
process
.
Closing
()
:
return
nil
,
errors
.
New
(
"bitswap is closed"
)
...
...
@@ -219,11 +225,14 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan *block
log
.
Event
(
ctx
,
"Bitswap.GetBlockRequest.Start"
,
&
k
)
}
bs
.
wm
.
WantBlocks
(
keys
)
bs
.
wm
.
WantBlocks
(
ctx
,
keys
)
req
:=
&
blockRequest
{
keys
:
keys
,
ctx
:
ctx
,
// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true.
req
:=
&
wantlist
.
Entry
{
Key
:
keys
[
0
],
Ctx
:
ctx
,
}
select
{
case
bs
.
findKeys
<-
req
:
...
...
@@ -276,32 +285,6 @@ func (bs *Bitswap) tryPutBlock(blk *blocks.Block, attempts int) error {
return
err
}
func
(
bs
*
Bitswap
)
connectToProviders
(
ctx
context
.
Context
,
entries
[]
wantlist
.
Entry
)
{
ctx
,
cancel
:=
context
.
WithCancel
(
ctx
)
defer
cancel
()
// Get providers for all entries in wantlist (could take a while)
wg
:=
sync
.
WaitGroup
{}
for
_
,
e
:=
range
entries
{
wg
.
Add
(
1
)
go
func
(
k
key
.
Key
)
{
defer
wg
.
Done
()
child
,
cancel
:=
context
.
WithTimeout
(
ctx
,
providerRequestTimeout
)
defer
cancel
()
providers
:=
bs
.
network
.
FindProvidersAsync
(
child
,
k
,
maxProvidersPerRequest
)
for
prov
:=
range
providers
{
go
func
(
p
peer
.
ID
)
{
bs
.
network
.
ConnectTo
(
ctx
,
p
)
}(
prov
)
}
}(
e
.
Key
)
}
wg
.
Wait
()
// make sure all our children do finish.
}
func
(
bs
*
Bitswap
)
ReceiveMessage
(
ctx
context
.
Context
,
p
peer
.
ID
,
incoming
bsmsg
.
BitSwapMessage
)
{
// This call records changes to wantlists, blocks received,
// and number of bytes transfered.
...
...
decision/engine.go
View file @
b3005fb2
...
...
@@ -217,7 +217,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
e
.
peerRequestQueue
.
Remove
(
entry
.
Key
,
p
)
}
else
{
log
.
Debugf
(
"wants %s - %d"
,
entry
.
Key
,
entry
.
Priority
)
l
.
Wants
(
entry
.
Key
,
entry
.
Priority
)
l
.
Wants
(
entry
.
Ctx
,
entry
.
Key
,
entry
.
Priority
)
if
exists
,
err
:=
e
.
bs
.
Has
(
entry
.
Key
);
err
==
nil
&&
exists
{
e
.
peerRequestQueue
.
Push
(
entry
.
Entry
,
p
)
newWorkExists
=
true
...
...
decision/ledger.go
View file @
b3005fb2
...
...
@@ -6,6 +6,8 @@ import (
key
"github.com/ipfs/go-ipfs/blocks/key"
wl
"github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
peer
"gx/ipfs/QmZwZjMVGss5rqYsJVGy18gNbkTJffFyq2x1uJ4e4p3ZAt/go-libp2p-peer"
"gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
)
// keySet is just a convenient alias for maps of keys, where we only care
...
...
@@ -68,9 +70,9 @@ func (l *ledger) ReceivedBytes(n int) {
}
// TODO: this needs to be different. We need timeouts.
func
(
l
*
ledger
)
Wants
(
k
key
.
Key
,
priority
int
)
{
func
(
l
*
ledger
)
Wants
(
ctx
context
.
Context
,
k
key
.
Key
,
priority
int
)
{
log
.
Debugf
(
"peer %s wants %s"
,
l
.
Partner
,
k
)
l
.
wantList
.
Add
(
k
,
priority
)
l
.
wantList
.
Add
(
ctx
,
k
,
priority
)
}
func
(
l
*
ledger
)
CancelWant
(
k
key
.
Key
)
{
...
...
wantlist/wantlist.go
View file @
b3005fb2
...
...
@@ -3,9 +3,12 @@
package
wantlist
import
(
key
"github.com/ipfs/go-ipfs/blocks/key"
"sort"
"sync"
key
"github.com/ipfs/go-ipfs/blocks/key"
"gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
)
type
ThreadSafe
struct
{
...
...
@@ -16,7 +19,6 @@ type ThreadSafe struct {
// not threadsafe
type
Wantlist
struct
{
set
map
[
key
.
Key
]
Entry
// TODO provide O(1) len accessor if cost becomes an issue
}
type
Entry
struct
{
...
...
@@ -24,6 +26,7 @@ type Entry struct {
// slices can be copied efficiently.
Key
key
.
Key
Priority
int
Ctx
context
.
Context
}
type
entrySlice
[]
Entry
...
...
@@ -44,22 +47,25 @@ func New() *Wantlist {
}
}
func
(
w
*
ThreadSafe
)
Add
(
k
key
.
Key
,
priority
int
)
{
// TODO rm defer for perf
func
(
w
*
ThreadSafe
)
Add
(
ctx
context
.
Context
,
k
key
.
Key
,
priority
int
)
{
w
.
lk
.
Lock
()
defer
w
.
lk
.
Unlock
()
w
.
Wantlist
.
Add
(
ctx
,
k
,
priority
)
}
func
(
w
*
ThreadSafe
)
AddEntry
(
e
Entry
)
{
w
.
lk
.
Lock
()
defer
w
.
lk
.
Unlock
()
w
.
Wantlist
.
Add
(
k
,
priority
)
w
.
Wantlist
.
Add
Entry
(
e
)
}
func
(
w
*
ThreadSafe
)
Remove
(
k
key
.
Key
)
{
// TODO rm defer for perf
w
.
lk
.
Lock
()
defer
w
.
lk
.
Unlock
()
w
.
Wantlist
.
Remove
(
k
)
}
func
(
w
*
ThreadSafe
)
Contains
(
k
key
.
Key
)
(
Entry
,
bool
)
{
// TODO rm defer for perf
w
.
lk
.
RLock
()
defer
w
.
lk
.
RUnlock
()
return
w
.
Wantlist
.
Contains
(
k
)
...
...
@@ -87,14 +93,22 @@ func (w *Wantlist) Len() int {
return
len
(
w
.
set
)
}
func
(
w
*
Wantlist
)
Add
(
k
key
.
Key
,
priority
int
)
{
func
(
w
*
Wantlist
)
Add
(
ctx
context
.
Context
,
k
key
.
Key
,
priority
int
)
{
if
_
,
ok
:=
w
.
set
[
k
];
ok
{
return
}
w
.
set
[
k
]
=
Entry
{
Key
:
k
,
Priority
:
priority
,
Ctx
:
ctx
,
}
}
func
(
w
*
Wantlist
)
AddEntry
(
e
Entry
)
{
if
_
,
ok
:=
w
.
set
[
e
.
Key
];
ok
{
return
}
w
.
set
[
e
.
Key
]
=
e
}
func
(
w
*
Wantlist
)
Remove
(
k
key
.
Key
)
{
...
...
wantmanager.go
View file @
b3005fb2
...
...
@@ -64,16 +64,16 @@ type msgQueue struct {
done
chan
struct
{}
}
func
(
pm
*
WantManager
)
WantBlocks
(
ks
[]
key
.
Key
)
{
func
(
pm
*
WantManager
)
WantBlocks
(
ctx
context
.
Context
,
ks
[]
key
.
Key
)
{
log
.
Infof
(
"want blocks: %s"
,
ks
)
pm
.
addEntries
(
ks
,
false
)
pm
.
addEntries
(
ctx
,
ks
,
false
)
}
func
(
pm
*
WantManager
)
CancelWants
(
ks
[]
key
.
Key
)
{
pm
.
addEntries
(
ks
,
true
)
pm
.
addEntries
(
context
.
TODO
(),
ks
,
true
)
}
func
(
pm
*
WantManager
)
addEntries
(
ks
[]
key
.
Key
,
cancel
bool
)
{
func
(
pm
*
WantManager
)
addEntries
(
ctx
context
.
Context
,
ks
[]
key
.
Key
,
cancel
bool
)
{
var
entries
[]
*
bsmsg
.
Entry
for
i
,
k
:=
range
ks
{
entries
=
append
(
entries
,
&
bsmsg
.
Entry
{
...
...
@@ -81,6 +81,7 @@ func (pm *WantManager) addEntries(ks []key.Key, cancel bool) {
Entry
:
wantlist
.
Entry
{
Key
:
k
,
Priority
:
kMaxPriority
-
i
,
Ctx
:
ctx
,
},
})
}
...
...
@@ -224,7 +225,7 @@ func (pm *WantManager) Run() {
if
e
.
Cancel
{
pm
.
wl
.
Remove
(
e
.
Key
)
}
else
{
pm
.
wl
.
Add
(
e
.
Key
,
e
.
Priorit
y
)
pm
.
wl
.
Add
Entry
(
e
.
Entr
y
)
}
}
...
...
@@ -237,6 +238,14 @@ func (pm *WantManager) Run() {
// resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY)
var
es
[]
*
bsmsg
.
Entry
for
_
,
e
:=
range
pm
.
wl
.
Entries
()
{
select
{
case
<-
e
.
Ctx
.
Done
()
:
// entry has been cancelled
// simply continue, the entry will be removed from the
// wantlist soon enough
continue
default
:
}
es
=
append
(
es
,
&
bsmsg
.
Entry
{
Entry
:
e
})
}
for
_
,
p
:=
range
pm
.
peers
{
...
...
workers.go
View file @
b3005fb2
package
bitswap
import
(
"sync"
"time"
process
"gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess"
...
...
@@ -8,6 +9,8 @@ import (
context
"gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
key
"github.com/ipfs/go-ipfs/blocks/key"
wantlist
"github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
peer
"gx/ipfs/QmZwZjMVGss5rqYsJVGy18gNbkTJffFyq2x1uJ4e4p3ZAt/go-libp2p-peer"
logging
"gx/ipfs/Qmazh5oNUVsDZTs2g59rq8aYQqwpss8tcUWQzor5sCCEuH/go-log"
)
...
...
@@ -16,7 +19,7 @@ var TaskWorkerCount = 8
func
(
bs
*
Bitswap
)
startWorkers
(
px
process
.
Process
,
ctx
context
.
Context
)
{
// Start up a worker to handle block requests this node is making
px
.
Go
(
func
(
px
process
.
Process
)
{
bs
.
provider
Connecto
r
(
ctx
)
bs
.
provider
QueryManage
r
(
ctx
)
})
// Start up workers to handle requests from other nodes for the data on this node
...
...
@@ -149,37 +152,6 @@ func (bs *Bitswap) provideCollector(ctx context.Context) {
}
}
// connects to providers for the given keys
func
(
bs
*
Bitswap
)
providerConnector
(
parent
context
.
Context
)
{
defer
log
.
Info
(
"bitswap client worker shutting down..."
)
for
{
log
.
Event
(
parent
,
"Bitswap.ProviderConnector.Loop"
)
select
{
case
req
:=
<-
bs
.
findKeys
:
keys
:=
req
.
keys
if
len
(
keys
)
==
0
{
log
.
Warning
(
"Received batch request for zero blocks"
)
continue
}
log
.
Event
(
parent
,
"Bitswap.ProviderConnector.Work"
,
logging
.
LoggableMap
{
"Keys"
:
keys
})
// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true.
child
,
cancel
:=
context
.
WithTimeout
(
req
.
ctx
,
providerRequestTimeout
)
providers
:=
bs
.
network
.
FindProvidersAsync
(
child
,
keys
[
0
],
maxProvidersPerRequest
)
for
p
:=
range
providers
{
go
bs
.
network
.
ConnectTo
(
req
.
ctx
,
p
)
}
cancel
()
case
<-
parent
.
Done
()
:
return
}
}
}
func
(
bs
*
Bitswap
)
rebroadcastWorker
(
parent
context
.
Context
)
{
ctx
,
cancel
:=
context
.
WithCancel
(
parent
)
defer
cancel
()
...
...
@@ -200,12 +172,49 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
}
case
<-
broadcastSignal
.
C
:
// resend unfulfilled wantlist keys
log
.
Event
(
ctx
,
"Bitswap.Rebroadcast.active"
)
entries
:=
bs
.
wm
.
wl
.
Entries
()
if
len
(
entries
)
>
0
{
bs
.
connectToProviders
(
ctx
,
entries
)
for
_
,
e
:=
range
bs
.
wm
.
wl
.
Entries
()
{
bs
.
findKeys
<-
&
e
}
case
<-
parent
.
Done
()
:
return
}
}
}
func
(
bs
*
Bitswap
)
providerQueryManager
(
ctx
context
.
Context
)
{
var
activeLk
sync
.
Mutex
active
:=
make
(
map
[
key
.
Key
]
*
wantlist
.
Entry
)
for
{
select
{
case
e
:=
<-
bs
.
findKeys
:
activeLk
.
Lock
()
if
_
,
ok
:=
active
[
e
.
Key
];
ok
{
activeLk
.
Unlock
()
continue
}
active
[
e
.
Key
]
=
e
activeLk
.
Unlock
()
go
func
(
e
*
wantlist
.
Entry
)
{
child
,
cancel
:=
context
.
WithTimeout
(
e
.
Ctx
,
providerRequestTimeout
)
defer
cancel
()
providers
:=
bs
.
network
.
FindProvidersAsync
(
child
,
e
.
Key
,
maxProvidersPerRequest
)
for
p
:=
range
providers
{
go
func
(
p
peer
.
ID
)
{
err
:=
bs
.
network
.
ConnectTo
(
child
,
p
)
if
err
!=
nil
{
log
.
Debug
(
"failed to connect to provider %s: %s"
,
p
,
err
)
}
}(
p
)
}
activeLk
.
Lock
()
delete
(
active
,
e
.
Key
)
activeLk
.
Unlock
()
}(
e
)
case
<-
ctx
.
Done
()
:
return
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment