Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
dms3
go-bitswap
Commits
5792e978
Commit
5792e978
authored
Feb 20, 2015
by
Jeromy
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
rename wantlist to bitswap, add stat command
parent
38a0286f
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
45 additions
and
23 deletions
+45
-23
bitswap.go
bitswap.go
+18
-18
stat.go
stat.go
+22
-0
workers.go
workers.go
+5
-5
No files found.
bitswap.go
View file @
5792e978
...
@@ -79,7 +79,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
...
@@ -79,7 +79,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
px
.
Close
()
px
.
Close
()
}()
}()
bs
:=
&
b
itswap
{
bs
:=
&
B
itswap
{
self
:
p
,
self
:
p
,
blockstore
:
bstore
,
blockstore
:
bstore
,
notifications
:
notif
,
notifications
:
notif
,
...
@@ -97,8 +97,8 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
...
@@ -97,8 +97,8 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
return
bs
return
bs
}
}
//
b
itswap instances implement the bitswap protocol.
//
B
itswap instances implement the bitswap protocol.
type
b
itswap
struct
{
type
B
itswap
struct
{
// the ID of the peer to act on behalf of
// the ID of the peer to act on behalf of
self
peer
.
ID
self
peer
.
ID
...
@@ -133,7 +133,7 @@ type blockRequest struct {
...
@@ -133,7 +133,7 @@ type blockRequest struct {
// GetBlock attempts to retrieve a particular block from peers within the
// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
// deadline enforced by the context.
func
(
bs
*
b
itswap
)
GetBlock
(
parent
context
.
Context
,
k
u
.
Key
)
(
*
blocks
.
Block
,
error
)
{
func
(
bs
*
B
itswap
)
GetBlock
(
parent
context
.
Context
,
k
u
.
Key
)
(
*
blocks
.
Block
,
error
)
{
// Any async work initiated by this function must end when this function
// Any async work initiated by this function must end when this function
// returns. To ensure this, derive a new context. Note that it is okay to
// returns. To ensure this, derive a new context. Note that it is okay to
...
@@ -179,7 +179,7 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err
...
@@ -179,7 +179,7 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err
// NB: Your request remains open until the context expires. To conserve
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
// that lasts throughout the lifetime of the server)
func
(
bs
*
b
itswap
)
GetBlocks
(
ctx
context
.
Context
,
keys
[]
u
.
Key
)
(
<-
chan
*
blocks
.
Block
,
error
)
{
func
(
bs
*
B
itswap
)
GetBlocks
(
ctx
context
.
Context
,
keys
[]
u
.
Key
)
(
<-
chan
*
blocks
.
Block
,
error
)
{
select
{
select
{
case
<-
bs
.
process
.
Closing
()
:
case
<-
bs
.
process
.
Closing
()
:
return
nil
,
errors
.
New
(
"bitswap is closed"
)
return
nil
,
errors
.
New
(
"bitswap is closed"
)
...
@@ -201,7 +201,7 @@ func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.
...
@@ -201,7 +201,7 @@ func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.
// HasBlock announces the existance of a block to this bitswap service. The
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
// service will potentially notify its peers.
func
(
bs
*
b
itswap
)
HasBlock
(
ctx
context
.
Context
,
blk
*
blocks
.
Block
)
error
{
func
(
bs
*
B
itswap
)
HasBlock
(
ctx
context
.
Context
,
blk
*
blocks
.
Block
)
error
{
log
.
Event
(
ctx
,
"hasBlock"
,
blk
)
log
.
Event
(
ctx
,
"hasBlock"
,
blk
)
select
{
select
{
case
<-
bs
.
process
.
Closing
()
:
case
<-
bs
.
process
.
Closing
()
:
...
@@ -221,7 +221,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
...
@@ -221,7 +221,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
return
nil
return
nil
}
}
func
(
bs
*
b
itswap
)
sendWantlistMsgToPeers
(
ctx
context
.
Context
,
m
bsmsg
.
BitSwapMessage
,
peers
<-
chan
peer
.
ID
)
error
{
func
(
bs
*
B
itswap
)
sendWantlistMsgToPeers
(
ctx
context
.
Context
,
m
bsmsg
.
BitSwapMessage
,
peers
<-
chan
peer
.
ID
)
error
{
set
:=
pset
.
New
()
set
:=
pset
.
New
()
wg
:=
sync
.
WaitGroup
{}
wg
:=
sync
.
WaitGroup
{}
for
peerToQuery
:=
range
peers
{
for
peerToQuery
:=
range
peers
{
...
@@ -242,7 +242,7 @@ func (bs *bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMe
...
@@ -242,7 +242,7 @@ func (bs *bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMe
return
nil
return
nil
}
}
func
(
bs
*
b
itswap
)
sendWantlistToPeers
(
ctx
context
.
Context
,
peers
<-
chan
peer
.
ID
)
error
{
func
(
bs
*
B
itswap
)
sendWantlistToPeers
(
ctx
context
.
Context
,
peers
<-
chan
peer
.
ID
)
error
{
message
:=
bsmsg
.
New
()
message
:=
bsmsg
.
New
()
message
.
SetFull
(
true
)
message
.
SetFull
(
true
)
for
_
,
wanted
:=
range
bs
.
wantlist
.
Entries
()
{
for
_
,
wanted
:=
range
bs
.
wantlist
.
Entries
()
{
...
@@ -251,7 +251,7 @@ func (bs *bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID
...
@@ -251,7 +251,7 @@ func (bs *bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID
return
bs
.
sendWantlistMsgToPeers
(
ctx
,
message
,
peers
)
return
bs
.
sendWantlistMsgToPeers
(
ctx
,
message
,
peers
)
}
}
func
(
bs
*
b
itswap
)
sendWantlistToProviders
(
ctx
context
.
Context
,
entries
[]
wantlist
.
Entry
)
{
func
(
bs
*
B
itswap
)
sendWantlistToProviders
(
ctx
context
.
Context
,
entries
[]
wantlist
.
Entry
)
{
ctx
,
cancel
:=
context
.
WithCancel
(
ctx
)
ctx
,
cancel
:=
context
.
WithCancel
(
ctx
)
defer
cancel
()
defer
cancel
()
...
@@ -286,7 +286,7 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli
...
@@ -286,7 +286,7 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli
}
}
// TODO(brian): handle errors
// TODO(brian): handle errors
func
(
bs
*
b
itswap
)
ReceiveMessage
(
ctx
context
.
Context
,
p
peer
.
ID
,
incoming
bsmsg
.
BitSwapMessage
)
(
func
(
bs
*
B
itswap
)
ReceiveMessage
(
ctx
context
.
Context
,
p
peer
.
ID
,
incoming
bsmsg
.
BitSwapMessage
)
(
peer
.
ID
,
bsmsg
.
BitSwapMessage
)
{
peer
.
ID
,
bsmsg
.
BitSwapMessage
)
{
defer
log
.
EventBegin
(
ctx
,
"receiveMessage"
,
p
,
incoming
)
.
Done
()
defer
log
.
EventBegin
(
ctx
,
"receiveMessage"
,
p
,
incoming
)
.
Done
()
...
@@ -325,7 +325,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
...
@@ -325,7 +325,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
}
}
// Connected/Disconnected warns bitswap about peer connections
// Connected/Disconnected warns bitswap about peer connections
func
(
bs
*
b
itswap
)
PeerConnected
(
p
peer
.
ID
)
{
func
(
bs
*
B
itswap
)
PeerConnected
(
p
peer
.
ID
)
{
// TODO: add to clientWorker??
// TODO: add to clientWorker??
peers
:=
make
(
chan
peer
.
ID
,
1
)
peers
:=
make
(
chan
peer
.
ID
,
1
)
peers
<-
p
peers
<-
p
...
@@ -337,11 +337,11 @@ func (bs *bitswap) PeerConnected(p peer.ID) {
...
@@ -337,11 +337,11 @@ func (bs *bitswap) PeerConnected(p peer.ID) {
}
}
// Connected/Disconnected warns bitswap about peer connections
// Connected/Disconnected warns bitswap about peer connections
func
(
bs
*
b
itswap
)
PeerDisconnected
(
p
peer
.
ID
)
{
func
(
bs
*
B
itswap
)
PeerDisconnected
(
p
peer
.
ID
)
{
bs
.
engine
.
PeerDisconnected
(
p
)
bs
.
engine
.
PeerDisconnected
(
p
)
}
}
func
(
bs
*
b
itswap
)
cancelBlocks
(
ctx
context
.
Context
,
bkeys
[]
u
.
Key
)
{
func
(
bs
*
B
itswap
)
cancelBlocks
(
ctx
context
.
Context
,
bkeys
[]
u
.
Key
)
{
if
len
(
bkeys
)
<
1
{
if
len
(
bkeys
)
<
1
{
return
return
}
}
...
@@ -358,7 +358,7 @@ func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
...
@@ -358,7 +358,7 @@ func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
}
}
}
}
func
(
bs
*
b
itswap
)
wantNewBlocks
(
ctx
context
.
Context
,
bkeys
[]
u
.
Key
)
{
func
(
bs
*
B
itswap
)
wantNewBlocks
(
ctx
context
.
Context
,
bkeys
[]
u
.
Key
)
{
if
len
(
bkeys
)
<
1
{
if
len
(
bkeys
)
<
1
{
return
return
}
}
...
@@ -383,7 +383,7 @@ func (bs *bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) {
...
@@ -383,7 +383,7 @@ func (bs *bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) {
wg
.
Wait
()
wg
.
Wait
()
}
}
func
(
bs
*
b
itswap
)
ReceiveError
(
err
error
)
{
func
(
bs
*
B
itswap
)
ReceiveError
(
err
error
)
{
log
.
Debugf
(
"Bitswap ReceiveError: %s"
,
err
)
log
.
Debugf
(
"Bitswap ReceiveError: %s"
,
err
)
// TODO log the network error
// TODO log the network error
// TODO bubble the network error up to the parent context/error logger
// TODO bubble the network error up to the parent context/error logger
...
@@ -391,7 +391,7 @@ func (bs *bitswap) ReceiveError(err error) {
...
@@ -391,7 +391,7 @@ func (bs *bitswap) ReceiveError(err error) {
// send strives to ensure that accounting is always performed when a message is
// send strives to ensure that accounting is always performed when a message is
// sent
// sent
func
(
bs
*
b
itswap
)
send
(
ctx
context
.
Context
,
p
peer
.
ID
,
m
bsmsg
.
BitSwapMessage
)
error
{
func
(
bs
*
B
itswap
)
send
(
ctx
context
.
Context
,
p
peer
.
ID
,
m
bsmsg
.
BitSwapMessage
)
error
{
defer
log
.
EventBegin
(
ctx
,
"sendMessage"
,
p
,
m
)
.
Done
()
defer
log
.
EventBegin
(
ctx
,
"sendMessage"
,
p
,
m
)
.
Done
()
if
err
:=
bs
.
network
.
SendMessage
(
ctx
,
p
,
m
);
err
!=
nil
{
if
err
:=
bs
.
network
.
SendMessage
(
ctx
,
p
,
m
);
err
!=
nil
{
return
errors
.
Wrap
(
err
)
return
errors
.
Wrap
(
err
)
...
@@ -399,11 +399,11 @@ func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage)
...
@@ -399,11 +399,11 @@ func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage)
return
bs
.
engine
.
MessageSent
(
p
,
m
)
return
bs
.
engine
.
MessageSent
(
p
,
m
)
}
}
func
(
bs
*
b
itswap
)
Close
()
error
{
func
(
bs
*
B
itswap
)
Close
()
error
{
return
bs
.
process
.
Close
()
return
bs
.
process
.
Close
()
}
}
func
(
bs
*
b
itswap
)
GetWantlist
()
[]
u
.
Key
{
func
(
bs
*
B
itswap
)
GetWantlist
()
[]
u
.
Key
{
var
out
[]
u
.
Key
var
out
[]
u
.
Key
for
_
,
e
:=
range
bs
.
wantlist
.
Entries
()
{
for
_
,
e
:=
range
bs
.
wantlist
.
Entries
()
{
out
=
append
(
out
,
e
.
Key
)
out
=
append
(
out
,
e
.
Key
)
...
...
stat.go
0 → 100644
View file @
5792e978
package
bitswap
import
(
peer
"github.com/jbenet/go-ipfs/p2p/peer"
u
"github.com/jbenet/go-ipfs/util"
)
type
Stat
struct
{
ProvideBufLen
int
Wantlist
[]
u
.
Key
Peers
[]
peer
.
ID
}
func
(
bs
*
Bitswap
)
Stat
()
(
*
Stat
,
error
)
{
st
:=
new
(
Stat
)
st
.
ProvideBufLen
=
len
(
bs
.
newBlocks
)
st
.
Wantlist
=
bs
.
GetWantlist
()
st
.
Peers
=
bs
.
engine
.
Peers
()
return
st
,
nil
}
workers.go
View file @
5792e978
...
@@ -8,7 +8,7 @@ import (
...
@@ -8,7 +8,7 @@ import (
context
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
context
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
)
func
(
bs
*
b
itswap
)
startWorkers
(
px
process
.
Process
,
ctx
context
.
Context
)
{
func
(
bs
*
B
itswap
)
startWorkers
(
px
process
.
Process
,
ctx
context
.
Context
)
{
// Start up a worker to handle block requests this node is making
// Start up a worker to handle block requests this node is making
px
.
Go
(
func
(
px
process
.
Process
)
{
px
.
Go
(
func
(
px
process
.
Process
)
{
bs
.
clientWorker
(
ctx
)
bs
.
clientWorker
(
ctx
)
...
@@ -34,7 +34,7 @@ func (bs *bitswap) startWorkers(px process.Process, ctx context.Context) {
...
@@ -34,7 +34,7 @@ func (bs *bitswap) startWorkers(px process.Process, ctx context.Context) {
}
}
}
}
func
(
bs
*
b
itswap
)
taskWorker
(
ctx
context
.
Context
)
{
func
(
bs
*
B
itswap
)
taskWorker
(
ctx
context
.
Context
)
{
defer
log
.
Info
(
"bitswap task worker shutting down..."
)
defer
log
.
Info
(
"bitswap task worker shutting down..."
)
for
{
for
{
select
{
select
{
...
@@ -55,7 +55,7 @@ func (bs *bitswap) taskWorker(ctx context.Context) {
...
@@ -55,7 +55,7 @@ func (bs *bitswap) taskWorker(ctx context.Context) {
}
}
}
}
func
(
bs
*
b
itswap
)
provideWorker
(
ctx
context
.
Context
)
{
func
(
bs
*
B
itswap
)
provideWorker
(
ctx
context
.
Context
)
{
for
{
for
{
select
{
select
{
case
blk
,
ok
:=
<-
bs
.
newBlocks
:
case
blk
,
ok
:=
<-
bs
.
newBlocks
:
...
@@ -75,7 +75,7 @@ func (bs *bitswap) provideWorker(ctx context.Context) {
...
@@ -75,7 +75,7 @@ func (bs *bitswap) provideWorker(ctx context.Context) {
}
}
// TODO ensure only one active request per key
// TODO ensure only one active request per key
func
(
bs
*
b
itswap
)
clientWorker
(
parent
context
.
Context
)
{
func
(
bs
*
B
itswap
)
clientWorker
(
parent
context
.
Context
)
{
defer
log
.
Info
(
"bitswap client worker shutting down..."
)
defer
log
.
Info
(
"bitswap client worker shutting down..."
)
for
{
for
{
...
@@ -115,7 +115,7 @@ func (bs *bitswap) clientWorker(parent context.Context) {
...
@@ -115,7 +115,7 @@ func (bs *bitswap) clientWorker(parent context.Context) {
}
}
}
}
func
(
bs
*
b
itswap
)
rebroadcastWorker
(
parent
context
.
Context
)
{
func
(
bs
*
B
itswap
)
rebroadcastWorker
(
parent
context
.
Context
)
{
ctx
,
cancel
:=
context
.
WithCancel
(
parent
)
ctx
,
cancel
:=
context
.
WithCancel
(
parent
)
defer
cancel
()
defer
cancel
()
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment