Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
dms3
go-dms3
Commits
b514478f
Commit
b514478f
authored
10 years ago
by
Jeromy
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
rename wantlist to bitswap, add stat command
parent
559a2415
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
132 additions
and
24 deletions
+132
-24
core/commands/bitswap.go
core/commands/bitswap.go
+86
-0
core/commands/root.go
core/commands/root.go
+1
-1
exchange/bitswap/bitswap.go
exchange/bitswap/bitswap.go
+18
-18
exchange/bitswap/stat.go
exchange/bitswap/stat.go
+22
-0
exchange/bitswap/workers.go
exchange/bitswap/workers.go
+5
-5
No files found.
core/commands/
wantlist
.go
→
core/commands/
bitswap
.go
View file @
b514478f
package
commands
import
cmds
"github.com/jbenet/go-ipfs/commands"
import
(
"bytes"
"encoding/json"
cmds
"github.com/jbenet/go-ipfs/commands"
bitswap
"github.com/jbenet/go-ipfs/exchange/bitswap"
u
"github.com/jbenet/go-ipfs/util"
"io"
)
var
Wantlist
Cmd
=
&
cmds
.
Command
{
var
Bitswap
Cmd
=
&
cmds
.
Command
{
Helptext
:
cmds
.
HelpText
{
Tagline
:
"A set of commands to
work with
the bitswap
wantlis
t"
,
Tagline
:
"A set of commands to
manipulate
the bitswap
agen
t"
,
ShortDescription
:
``
,
},
Subcommands
:
map
[
string
]
*
cmds
.
Command
{
"show"
:
showWantlistCmd
,
"wantlist"
:
showWantlistCmd
,
"stat"
:
bitswapStatCmd
,
},
}
...
...
@@ -32,3 +40,47 @@ Print out all blocks currently on the bitswap wantlist for the local peer`,
cmds
.
Text
:
KeyListTextMarshaler
,
},
}
var
bitswapStatCmd
=
&
cmds
.
Command
{
Helptext
:
cmds
.
HelpText
{
Tagline
:
"show some diagnostic information on the bitswap agent"
,
ShortDescription
:
``
,
},
Type
:
bitswap
.
Stat
{},
Run
:
func
(
req
cmds
.
Request
,
res
cmds
.
Response
)
{
nd
,
err
:=
req
.
Context
()
.
GetNode
()
if
err
!=
nil
{
res
.
SetError
(
err
,
cmds
.
ErrNormal
)
return
}
bs
,
ok
:=
nd
.
Exchange
.
(
*
bitswap
.
Bitswap
)
if
!
ok
{
res
.
SetError
(
u
.
ErrCast
(),
cmds
.
ErrNormal
)
return
}
st
,
err
:=
bs
.
Stat
()
if
err
!=
nil
{
res
.
SetError
(
err
,
cmds
.
ErrNormal
)
return
}
res
.
SetOutput
(
st
)
},
Marshalers
:
cmds
.
MarshalerMap
{
cmds
.
Text
:
func
(
res
cmds
.
Response
)
(
io
.
Reader
,
error
)
{
out
,
ok
:=
res
.
Output
()
.
(
*
bitswap
.
Stat
)
if
!
ok
{
return
nil
,
u
.
ErrCast
()
}
buf
:=
new
(
bytes
.
Buffer
)
enc
:=
json
.
NewEncoder
(
buf
)
err
:=
enc
.
Encode
(
out
)
if
err
!=
nil
{
return
nil
,
err
}
return
buf
,
nil
},
},
}
This diff is collapsed.
Click to expand it.
core/commands/root.go
View file @
b514478f
...
...
@@ -98,7 +98,7 @@ var rootSubcommands = map[string]*cmds.Command{
"swarm"
:
SwarmCmd
,
"update"
:
UpdateCmd
,
"version"
:
VersionCmd
,
"
wantlist
"
:
Wantlist
Cmd
,
"
bitswap
"
:
Bitswap
Cmd
,
}
func
init
()
{
...
...
This diff is collapsed.
Click to expand it.
exchange/bitswap/bitswap.go
View file @
b514478f
...
...
@@ -79,7 +79,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
px
.
Close
()
}()
bs
:=
&
b
itswap
{
bs
:=
&
B
itswap
{
self
:
p
,
blockstore
:
bstore
,
notifications
:
notif
,
...
...
@@ -97,8 +97,8 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
return
bs
}
//
b
itswap instances implement the bitswap protocol.
type
b
itswap
struct
{
//
B
itswap instances implement the bitswap protocol.
type
B
itswap
struct
{
// the ID of the peer to act on behalf of
self
peer
.
ID
...
...
@@ -133,7 +133,7 @@ type blockRequest struct {
// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
func
(
bs
*
b
itswap
)
GetBlock
(
parent
context
.
Context
,
k
u
.
Key
)
(
*
blocks
.
Block
,
error
)
{
func
(
bs
*
B
itswap
)
GetBlock
(
parent
context
.
Context
,
k
u
.
Key
)
(
*
blocks
.
Block
,
error
)
{
// Any async work initiated by this function must end when this function
// returns. To ensure this, derive a new context. Note that it is okay to
...
...
@@ -179,7 +179,7 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
func
(
bs
*
b
itswap
)
GetBlocks
(
ctx
context
.
Context
,
keys
[]
u
.
Key
)
(
<-
chan
*
blocks
.
Block
,
error
)
{
func
(
bs
*
B
itswap
)
GetBlocks
(
ctx
context
.
Context
,
keys
[]
u
.
Key
)
(
<-
chan
*
blocks
.
Block
,
error
)
{
select
{
case
<-
bs
.
process
.
Closing
()
:
return
nil
,
errors
.
New
(
"bitswap is closed"
)
...
...
@@ -201,7 +201,7 @@ func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
func
(
bs
*
b
itswap
)
HasBlock
(
ctx
context
.
Context
,
blk
*
blocks
.
Block
)
error
{
func
(
bs
*
B
itswap
)
HasBlock
(
ctx
context
.
Context
,
blk
*
blocks
.
Block
)
error
{
log
.
Event
(
ctx
,
"hasBlock"
,
blk
)
select
{
case
<-
bs
.
process
.
Closing
()
:
...
...
@@ -221,7 +221,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
return
nil
}
func
(
bs
*
b
itswap
)
sendWantlistMsgToPeers
(
ctx
context
.
Context
,
m
bsmsg
.
BitSwapMessage
,
peers
<-
chan
peer
.
ID
)
error
{
func
(
bs
*
B
itswap
)
sendWantlistMsgToPeers
(
ctx
context
.
Context
,
m
bsmsg
.
BitSwapMessage
,
peers
<-
chan
peer
.
ID
)
error
{
set
:=
pset
.
New
()
wg
:=
sync
.
WaitGroup
{}
for
peerToQuery
:=
range
peers
{
...
...
@@ -242,7 +242,7 @@ func (bs *bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMe
return
nil
}
func
(
bs
*
b
itswap
)
sendWantlistToPeers
(
ctx
context
.
Context
,
peers
<-
chan
peer
.
ID
)
error
{
func
(
bs
*
B
itswap
)
sendWantlistToPeers
(
ctx
context
.
Context
,
peers
<-
chan
peer
.
ID
)
error
{
message
:=
bsmsg
.
New
()
message
.
SetFull
(
true
)
for
_
,
wanted
:=
range
bs
.
wantlist
.
Entries
()
{
...
...
@@ -251,7 +251,7 @@ func (bs *bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID
return
bs
.
sendWantlistMsgToPeers
(
ctx
,
message
,
peers
)
}
func
(
bs
*
b
itswap
)
sendWantlistToProviders
(
ctx
context
.
Context
,
entries
[]
wantlist
.
Entry
)
{
func
(
bs
*
B
itswap
)
sendWantlistToProviders
(
ctx
context
.
Context
,
entries
[]
wantlist
.
Entry
)
{
ctx
,
cancel
:=
context
.
WithCancel
(
ctx
)
defer
cancel
()
...
...
@@ -286,7 +286,7 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli
}
// TODO(brian): handle errors
func
(
bs
*
b
itswap
)
ReceiveMessage
(
ctx
context
.
Context
,
p
peer
.
ID
,
incoming
bsmsg
.
BitSwapMessage
)
(
func
(
bs
*
B
itswap
)
ReceiveMessage
(
ctx
context
.
Context
,
p
peer
.
ID
,
incoming
bsmsg
.
BitSwapMessage
)
(
peer
.
ID
,
bsmsg
.
BitSwapMessage
)
{
defer
log
.
EventBegin
(
ctx
,
"receiveMessage"
,
p
,
incoming
)
.
Done
()
...
...
@@ -325,7 +325,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
}
// Connected/Disconnected warns bitswap about peer connections
func
(
bs
*
b
itswap
)
PeerConnected
(
p
peer
.
ID
)
{
func
(
bs
*
B
itswap
)
PeerConnected
(
p
peer
.
ID
)
{
// TODO: add to clientWorker??
peers
:=
make
(
chan
peer
.
ID
,
1
)
peers
<-
p
...
...
@@ -337,11 +337,11 @@ func (bs *bitswap) PeerConnected(p peer.ID) {
}
// Connected/Disconnected warns bitswap about peer connections
func
(
bs
*
b
itswap
)
PeerDisconnected
(
p
peer
.
ID
)
{
func
(
bs
*
B
itswap
)
PeerDisconnected
(
p
peer
.
ID
)
{
bs
.
engine
.
PeerDisconnected
(
p
)
}
func
(
bs
*
b
itswap
)
cancelBlocks
(
ctx
context
.
Context
,
bkeys
[]
u
.
Key
)
{
func
(
bs
*
B
itswap
)
cancelBlocks
(
ctx
context
.
Context
,
bkeys
[]
u
.
Key
)
{
if
len
(
bkeys
)
<
1
{
return
}
...
...
@@ -358,7 +358,7 @@ func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
}
}
func
(
bs
*
b
itswap
)
wantNewBlocks
(
ctx
context
.
Context
,
bkeys
[]
u
.
Key
)
{
func
(
bs
*
B
itswap
)
wantNewBlocks
(
ctx
context
.
Context
,
bkeys
[]
u
.
Key
)
{
if
len
(
bkeys
)
<
1
{
return
}
...
...
@@ -383,7 +383,7 @@ func (bs *bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) {
wg
.
Wait
()
}
func
(
bs
*
b
itswap
)
ReceiveError
(
err
error
)
{
func
(
bs
*
B
itswap
)
ReceiveError
(
err
error
)
{
log
.
Debugf
(
"Bitswap ReceiveError: %s"
,
err
)
// TODO log the network error
// TODO bubble the network error up to the parent context/error logger
...
...
@@ -391,7 +391,7 @@ func (bs *bitswap) ReceiveError(err error) {
// send strives to ensure that accounting is always performed when a message is
// sent
func
(
bs
*
b
itswap
)
send
(
ctx
context
.
Context
,
p
peer
.
ID
,
m
bsmsg
.
BitSwapMessage
)
error
{
func
(
bs
*
B
itswap
)
send
(
ctx
context
.
Context
,
p
peer
.
ID
,
m
bsmsg
.
BitSwapMessage
)
error
{
defer
log
.
EventBegin
(
ctx
,
"sendMessage"
,
p
,
m
)
.
Done
()
if
err
:=
bs
.
network
.
SendMessage
(
ctx
,
p
,
m
);
err
!=
nil
{
return
errors
.
Wrap
(
err
)
...
...
@@ -399,11 +399,11 @@ func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage)
return
bs
.
engine
.
MessageSent
(
p
,
m
)
}
func
(
bs
*
b
itswap
)
Close
()
error
{
func
(
bs
*
B
itswap
)
Close
()
error
{
return
bs
.
process
.
Close
()
}
func
(
bs
*
b
itswap
)
GetWantlist
()
[]
u
.
Key
{
func
(
bs
*
B
itswap
)
GetWantlist
()
[]
u
.
Key
{
var
out
[]
u
.
Key
for
_
,
e
:=
range
bs
.
wantlist
.
Entries
()
{
out
=
append
(
out
,
e
.
Key
)
...
...
This diff is collapsed.
Click to expand it.
exchange/bitswap/stat.go
0 → 100644
View file @
b514478f
package
bitswap
import
(
peer
"github.com/jbenet/go-ipfs/p2p/peer"
u
"github.com/jbenet/go-ipfs/util"
)
type
Stat
struct
{
ProvideBufLen
int
Wantlist
[]
u
.
Key
Peers
[]
peer
.
ID
}
func
(
bs
*
Bitswap
)
Stat
()
(
*
Stat
,
error
)
{
st
:=
new
(
Stat
)
st
.
ProvideBufLen
=
len
(
bs
.
newBlocks
)
st
.
Wantlist
=
bs
.
GetWantlist
()
st
.
Peers
=
bs
.
engine
.
Peers
()
return
st
,
nil
}
This diff is collapsed.
Click to expand it.
exchange/bitswap/workers.go
View file @
b514478f
...
...
@@ -8,7 +8,7 @@ import (
context
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
func
(
bs
*
b
itswap
)
startWorkers
(
px
process
.
Process
,
ctx
context
.
Context
)
{
func
(
bs
*
B
itswap
)
startWorkers
(
px
process
.
Process
,
ctx
context
.
Context
)
{
// Start up a worker to handle block requests this node is making
px
.
Go
(
func
(
px
process
.
Process
)
{
bs
.
clientWorker
(
ctx
)
...
...
@@ -34,7 +34,7 @@ func (bs *bitswap) startWorkers(px process.Process, ctx context.Context) {
}
}
func
(
bs
*
b
itswap
)
taskWorker
(
ctx
context
.
Context
)
{
func
(
bs
*
B
itswap
)
taskWorker
(
ctx
context
.
Context
)
{
defer
log
.
Info
(
"bitswap task worker shutting down..."
)
for
{
select
{
...
...
@@ -55,7 +55,7 @@ func (bs *bitswap) taskWorker(ctx context.Context) {
}
}
func
(
bs
*
b
itswap
)
provideWorker
(
ctx
context
.
Context
)
{
func
(
bs
*
B
itswap
)
provideWorker
(
ctx
context
.
Context
)
{
for
{
select
{
case
blk
,
ok
:=
<-
bs
.
newBlocks
:
...
...
@@ -75,7 +75,7 @@ func (bs *bitswap) provideWorker(ctx context.Context) {
}
// TODO ensure only one active request per key
func
(
bs
*
b
itswap
)
clientWorker
(
parent
context
.
Context
)
{
func
(
bs
*
B
itswap
)
clientWorker
(
parent
context
.
Context
)
{
defer
log
.
Info
(
"bitswap client worker shutting down..."
)
for
{
...
...
@@ -115,7 +115,7 @@ func (bs *bitswap) clientWorker(parent context.Context) {
}
}
func
(
bs
*
b
itswap
)
rebroadcastWorker
(
parent
context
.
Context
)
{
func
(
bs
*
B
itswap
)
rebroadcastWorker
(
parent
context
.
Context
)
{
ctx
,
cancel
:=
context
.
WithCancel
(
parent
)
defer
cancel
()
...
...
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment