Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
dms3
go-bitswap
Commits
5b807cbc
Unverified
Commit
5b807cbc
authored
Feb 05, 2019
by
Hannah Howard
Committed by
GitHub
Feb 05, 2019
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #64 from ipfs/feat/merge-get-blocks
refactor(GetBlocks): Merge session/non-session
parents
bb897895
7643ad2d
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
4 additions
and
197 deletions
+4
-197
bitswap.go
bitswap.go
+2
-92
bitswap_test.go
bitswap_test.go
+2
-2
workers.go
workers.go
+0
-103
No files found.
bitswap.go
View file @
5b807cbc
...
...
@@ -16,7 +16,6 @@ import (
bsmsg
"github.com/ipfs/go-bitswap/message"
bsmq
"github.com/ipfs/go-bitswap/messagequeue"
bsnet
"github.com/ipfs/go-bitswap/network"
notifications
"github.com/ipfs/go-bitswap/notifications"
bspm
"github.com/ipfs/go-bitswap/peermanager"
bspqm
"github.com/ipfs/go-bitswap/providerquerymanager"
bssession
"github.com/ipfs/go-bitswap/session"
...
...
@@ -95,9 +94,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
sentHistogram
:=
metrics
.
NewCtx
(
ctx
,
"sent_all_blocks_bytes"
,
"Histogram of blocks sent by"
+
" this bitswap"
)
.
Histogram
(
metricsBuckets
)
notif
:=
notifications
.
New
()
px
:=
process
.
WithTeardown
(
func
()
error
{
notif
.
Shutdown
()
return
nil
})
...
...
@@ -120,10 +117,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
bs
:=
&
Bitswap
{
blockstore
:
bstore
,
notifications
:
notif
,
engine
:
decision
.
NewEngine
(
ctx
,
bstore
),
// TODO close the engine with Close() method
network
:
network
,
findKeys
:
make
(
chan
*
blockRequest
,
sizeBatchRequestChan
),
process
:
px
,
newBlocks
:
make
(
chan
cid
.
Cid
,
HasBlockBufferSize
),
provideKeys
:
make
(
chan
cid
.
Cid
,
provideKeysBufferSize
),
...
...
@@ -179,12 +174,6 @@ type Bitswap struct {
// NB: ensure threadsafety
blockstore
blockstore
.
Blockstore
// notifications engine for receiving new blocks and routing them to the
// appropriate user requests
notifications
notifications
.
PubSub
// findKeys sends keys to a worker to find and connect to providers for them
findKeys
chan
*
blockRequest
// newBlocks is a channel for newly added blocks to be provided to the
// network. blocks pushed down this channel get buffered and fed to the
// provideKeys channel later on to avoid too much network activity
...
...
@@ -248,86 +237,8 @@ func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
func
(
bs
*
Bitswap
)
GetBlocks
(
ctx
context
.
Context
,
keys
[]
cid
.
Cid
)
(
<-
chan
blocks
.
Block
,
error
)
{
if
len
(
keys
)
==
0
{
out
:=
make
(
chan
blocks
.
Block
)
close
(
out
)
return
out
,
nil
}
select
{
case
<-
bs
.
process
.
Closing
()
:
return
nil
,
errors
.
New
(
"bitswap is closed"
)
default
:
}
promise
:=
bs
.
notifications
.
Subscribe
(
ctx
,
keys
...
)
for
_
,
k
:=
range
keys
{
log
.
Event
(
ctx
,
"Bitswap.GetBlockRequest.Start"
,
k
)
}
mses
:=
bs
.
sm
.
GetNextSessionID
()
bs
.
wm
.
WantBlocks
(
ctx
,
keys
,
nil
,
mses
)
remaining
:=
cid
.
NewSet
()
for
_
,
k
:=
range
keys
{
remaining
.
Add
(
k
)
}
out
:=
make
(
chan
blocks
.
Block
)
go
func
()
{
ctx
,
cancel
:=
context
.
WithCancel
(
ctx
)
defer
cancel
()
defer
close
(
out
)
defer
func
()
{
// can't just defer this call on its own, arguments are resolved *when* the defer is created
bs
.
CancelWants
(
remaining
.
Keys
(),
mses
)
}()
findProvsDelay
:=
time
.
NewTimer
(
findProviderDelay
)
defer
findProvsDelay
.
Stop
()
findProvsDelayCh
:=
findProvsDelay
.
C
req
:=
&
blockRequest
{
Cid
:
keys
[
0
],
Ctx
:
ctx
,
}
var
findProvsReqCh
chan
<-
*
blockRequest
for
{
select
{
case
<-
findProvsDelayCh
:
// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true.
findProvsReqCh
=
bs
.
findKeys
findProvsDelayCh
=
nil
case
findProvsReqCh
<-
req
:
findProvsReqCh
=
nil
case
blk
,
ok
:=
<-
promise
:
if
!
ok
{
return
}
// No need to find providers now.
findProvsDelay
.
Stop
()
findProvsDelayCh
=
nil
findProvsReqCh
=
nil
bs
.
CancelWants
([]
cid
.
Cid
{
blk
.
Cid
()},
mses
)
remaining
.
Remove
(
blk
.
Cid
())
select
{
case
out
<-
blk
:
case
<-
ctx
.
Done
()
:
return
}
case
<-
ctx
.
Done
()
:
return
}
}
}()
return
out
,
nil
session
:=
bs
.
sm
.
NewSession
(
ctx
)
return
session
.
GetBlocks
(
ctx
,
keys
)
}
// CancelWants removes a given key from the wantlist.
...
...
@@ -366,7 +277,6 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {
// is waiting on a GetBlock for that object, they will receive a reference
// to the same node. We should address this soon, but i'm not going to do
// it now as it requires more thought and isnt causing immediate problems.
bs
.
notifications
.
Publish
(
blk
)
bs
.
sm
.
ReceiveBlockFrom
(
from
,
blk
)
...
...
bitswap_test.go
View file @
5b807cbc
...
...
@@ -533,8 +533,8 @@ func TestWantlistCleanup(t *testing.T) {
}
time
.
Sleep
(
time
.
Millisecond
*
50
)
if
len
(
bswap
.
GetWantlist
())
!=
11
{
t
.
Fatal
(
"should have
11
keys in wantlist"
)
if
len
(
bswap
.
GetWantlist
())
!=
5
{
t
.
Fatal
(
"should have
5
keys in wantlist"
)
}
cancel
()
...
...
workers.go
View file @
5b807cbc
...
...
@@ -2,9 +2,6 @@ package bitswap
import
(
"context"
"math/rand"
"sync"
"time"
engine
"github.com/ipfs/go-bitswap/decision"
bsmsg
"github.com/ipfs/go-bitswap/message"
...
...
@@ -12,16 +9,11 @@ import (
logging
"github.com/ipfs/go-log"
process
"github.com/jbenet/goprocess"
procctx
"github.com/jbenet/goprocess/context"
peer
"github.com/libp2p/go-libp2p-peer"
)
var
TaskWorkerCount
=
8
func
(
bs
*
Bitswap
)
startWorkers
(
px
process
.
Process
,
ctx
context
.
Context
)
{
// Start up a worker to handle block requests this node is making
px
.
Go
(
func
(
px
process
.
Process
)
{
bs
.
providerQueryManager
(
ctx
)
})
// Start up workers to handle requests from other nodes for the data on this node
for
i
:=
0
;
i
<
TaskWorkerCount
;
i
++
{
...
...
@@ -31,11 +23,6 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
})
}
// Start up a worker to manage periodically resending our wantlist out to peers
px
.
Go
(
func
(
px
process
.
Process
)
{
bs
.
rebroadcastWorker
(
ctx
)
})
// Start up a worker to manage sending out provides messages
px
.
Go
(
func
(
px
process
.
Process
)
{
bs
.
provideCollector
(
ctx
)
...
...
@@ -188,93 +175,3 @@ func (bs *Bitswap) provideCollector(ctx context.Context) {
}
}
}
func
(
bs
*
Bitswap
)
rebroadcastWorker
(
parent
context
.
Context
)
{
ctx
,
cancel
:=
context
.
WithCancel
(
parent
)
defer
cancel
()
broadcastSignal
:=
time
.
NewTicker
(
rebroadcastDelay
.
Get
())
defer
broadcastSignal
.
Stop
()
tick
:=
time
.
NewTicker
(
10
*
time
.
Second
)
defer
tick
.
Stop
()
for
{
log
.
Event
(
ctx
,
"Bitswap.Rebroadcast.idle"
)
select
{
case
<-
tick
.
C
:
n
:=
bs
.
wm
.
WantCount
()
if
n
>
0
{
log
.
Debugf
(
"%d keys in bitswap wantlist"
,
n
)
}
case
<-
broadcastSignal
.
C
:
// resend unfulfilled wantlist keys
log
.
Event
(
ctx
,
"Bitswap.Rebroadcast.active"
)
entries
:=
bs
.
wm
.
CurrentWants
()
if
len
(
entries
)
==
0
{
continue
}
// TODO: come up with a better strategy for determining when to search
// for new providers for blocks.
i
:=
rand
.
Intn
(
len
(
entries
))
select
{
case
bs
.
findKeys
<-
&
blockRequest
{
Cid
:
entries
[
i
]
.
Cid
,
Ctx
:
ctx
,
}
:
case
<-
ctx
.
Done
()
:
return
}
case
<-
ctx
.
Done
()
:
return
}
}
}
func
(
bs
*
Bitswap
)
providerQueryManager
(
ctx
context
.
Context
)
{
var
activeLk
sync
.
Mutex
kset
:=
cid
.
NewSet
()
for
{
select
{
case
e
:=
<-
bs
.
findKeys
:
select
{
// make sure its not already cancelled
case
<-
e
.
Ctx
.
Done
()
:
continue
default
:
}
activeLk
.
Lock
()
if
kset
.
Has
(
e
.
Cid
)
{
activeLk
.
Unlock
()
continue
}
kset
.
Add
(
e
.
Cid
)
activeLk
.
Unlock
()
go
func
(
e
*
blockRequest
)
{
child
,
cancel
:=
context
.
WithTimeout
(
e
.
Ctx
,
providerRequestTimeout
)
defer
cancel
()
providers
:=
bs
.
network
.
FindProvidersAsync
(
child
,
e
.
Cid
,
maxProvidersPerRequest
)
wg
:=
&
sync
.
WaitGroup
{}
for
p
:=
range
providers
{
wg
.
Add
(
1
)
go
func
(
p
peer
.
ID
)
{
defer
wg
.
Done
()
err
:=
bs
.
network
.
ConnectTo
(
child
,
p
)
if
err
!=
nil
{
log
.
Debugf
(
"failed to connect to provider %s: %s"
,
p
,
err
)
}
}(
p
)
}
wg
.
Wait
()
activeLk
.
Lock
()
kset
.
Remove
(
e
.
Cid
)
activeLk
.
Unlock
()
}(
e
)
case
<-
ctx
.
Done
()
:
return
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment