Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
dms3
go-bitswap
Commits
484399b4
Commit
484399b4
authored
Mar 25, 2020
by
Steven Allen
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
fix: avoid copying messages multiple times on send
Also, reduce the overhead from logging.
parent
3895cc0a
Changes
2
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
50 additions
and
41 deletions
+50
-41
bitswap.go
bitswap.go
+1
-0
workers.go
workers.go
+49
-41
No files found.
bitswap.go
View file @
484399b4
...
@@ -37,6 +37,7 @@ import (
...
@@ -37,6 +37,7 @@ import (
)
)
var
log
=
logging
.
Logger
(
"bitswap"
)
var
log
=
logging
.
Logger
(
"bitswap"
)
var
sflog
=
log
.
Desugar
()
var
_
exchange
.
SessionExchange
=
(
*
Bitswap
)(
nil
)
var
_
exchange
.
SessionExchange
=
(
*
Bitswap
)(
nil
)
...
...
workers.go
View file @
484399b4
...
@@ -5,11 +5,11 @@ import (
...
@@ -5,11 +5,11 @@ import (
"fmt"
"fmt"
engine
"github.com/ipfs/go-bitswap/internal/decision"
engine
"github.com/ipfs/go-bitswap/internal/decision"
bsmsg
"github.com/ipfs/go-bitswap/message"
pb
"github.com/ipfs/go-bitswap/message/pb"
pb
"github.com/ipfs/go-bitswap/message/pb"
cid
"github.com/ipfs/go-cid"
cid
"github.com/ipfs/go-cid"
process
"github.com/jbenet/goprocess"
process
"github.com/jbenet/goprocess"
procctx
"github.com/jbenet/goprocess/context"
procctx
"github.com/jbenet/goprocess/context"
"go.uber.org/zap"
)
)
// TaskWorkerCount is the total number of simultaneous threads sending
// TaskWorkerCount is the total number of simultaneous threads sending
...
@@ -52,29 +52,11 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
...
@@ -52,29 +52,11 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
continue
continue
}
}
// update the BS ledger to reflect sent message
// TODO: Should only track *useful* messages in ledger
outgoing
:=
bsmsg
.
New
(
false
)
for
_
,
block
:=
range
envelope
.
Message
.
Blocks
()
{
log
.
Debugw
(
"Bitswap.TaskWorker.Work"
,
"Target"
,
envelope
.
Peer
,
"Block"
,
block
.
Cid
(),
)
outgoing
.
AddBlock
(
block
)
}
for
_
,
blockPresence
:=
range
envelope
.
Message
.
BlockPresences
()
{
outgoing
.
AddBlockPresence
(
blockPresence
.
Cid
,
blockPresence
.
Type
)
}
// TODO: Only record message as sent if there was no error?
// TODO: Only record message as sent if there was no error?
bs
.
engine
.
MessageSent
(
envelope
.
Peer
,
outgoing
)
// Ideally, yes. But we'd need some way to trigger a retry and/or drop
// the peer.
bs
.
engine
.
MessageSent
(
envelope
.
Peer
,
envelope
.
Message
)
bs
.
sendBlocks
(
ctx
,
envelope
)
bs
.
sendBlocks
(
ctx
,
envelope
)
bs
.
counterLk
.
Lock
()
for
_
,
block
:=
range
envelope
.
Message
.
Blocks
()
{
bs
.
counters
.
blocksSent
++
bs
.
counters
.
dataSent
+=
uint64
(
len
(
block
.
RawData
()))
}
bs
.
counterLk
.
Unlock
()
case
<-
ctx
.
Done
()
:
case
<-
ctx
.
Done
()
:
return
return
}
}
...
@@ -84,41 +66,67 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
...
@@ -84,41 +66,67 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
}
}
}
}
func
(
bs
*
Bitswap
)
sendBlocks
(
ctx
context
.
Context
,
env
*
engine
.
Envelope
)
{
func
(
bs
*
Bitswap
)
logOutgoingBlocks
(
env
*
engine
.
Envelope
)
{
// Blocks need to be sent synchronously to maintain proper backpressure
if
ce
:=
sflog
.
Check
(
zap
.
DebugLevel
,
"Bitswap -> send blocks"
);
ce
==
nil
{
// throughout the network stack
return
defer
env
.
Sent
()
}
msgSize
:=
0
msg
:=
bsmsg
.
New
(
false
)
for
_
,
blockPresence
:=
range
env
.
Message
.
BlockPresences
()
{
for
_
,
blockPresence
:=
range
env
.
Message
.
BlockPresences
()
{
c
:=
blockPresence
.
Cid
c
:=
blockPresence
.
Cid
switch
blockPresence
.
Type
{
switch
blockPresence
.
Type
{
case
pb
.
Message_Have
:
case
pb
.
Message_Have
:
log
.
Infof
(
"Sending HAVE %s to %s"
,
c
.
String
()[
2
:
8
],
env
.
Peer
)
log
.
Debugw
(
"sending message"
,
"type"
,
"HAVE"
,
"cid"
,
c
,
"peer"
,
env
.
Peer
,
)
case
pb
.
Message_DontHave
:
case
pb
.
Message_DontHave
:
log
.
Infof
(
"Sending DONT_HAVE %s to %s"
,
c
.
String
()[
2
:
8
],
env
.
Peer
)
log
.
Debugw
(
"sending message"
,
"type"
,
"DONT_HAVE"
,
"cid"
,
c
,
"peer"
,
env
.
Peer
,
)
default
:
default
:
panic
(
fmt
.
Sprintf
(
"unrecognized BlockPresence type %v"
,
blockPresence
.
Type
))
panic
(
fmt
.
Sprintf
(
"unrecognized BlockPresence type %v"
,
blockPresence
.
Type
))
}
}
msgSize
+=
bsmsg
.
BlockPresenceSize
(
c
)
msg
.
AddBlockPresence
(
c
,
blockPresence
.
Type
)
}
}
for
_
,
block
:=
range
env
.
Message
.
Blocks
()
{
for
_
,
block
:=
range
env
.
Message
.
Blocks
()
{
msgSize
+=
len
(
block
.
RawData
())
log
.
Debugw
(
"sending message"
,
msg
.
AddBlock
(
block
)
"type"
,
"BLOCK"
,
log
.
Infof
(
"Sending block %s to %s"
,
block
,
env
.
Peer
)
"cid"
,
block
.
Cid
(),
"peer"
,
env
.
Peer
,
)
}
}
}
func
(
bs
*
Bitswap
)
sendBlocks
(
ctx
context
.
Context
,
env
*
engine
.
Envelope
)
{
// Blocks need to be sent synchronously to maintain proper backpressure
// throughout the network stack
defer
env
.
Sent
()
bs
.
sentHistogram
.
Observe
(
float64
(
msgSize
))
bs
.
logOutgoingBlocks
(
env
)
err
:=
bs
.
network
.
SendMessage
(
ctx
,
env
.
Peer
,
msg
)
err
:=
bs
.
network
.
SendMessage
(
ctx
,
env
.
Peer
,
env
.
Message
)
if
err
!=
nil
{
if
err
!=
nil
{
// log.Infof("sendblock error: %s", err)
log
.
Debugw
(
"failed to send blocks message"
,
log
.
Errorf
(
"SendMessage error: %s. size: %d. block-presence length: %d"
,
err
,
msg
.
Size
(),
len
(
env
.
Message
.
BlockPresences
()))
"peer"
,
env
.
Peer
,
"error"
,
err
,
)
return
}
}
log
.
Infof
(
"Sent message to %s"
,
env
.
Peer
)
dataSent
:=
0
blocks
:=
env
.
Message
.
Blocks
()
for
_
,
b
:=
range
blocks
{
dataSent
+=
len
(
b
.
RawData
())
}
bs
.
counterLk
.
Lock
()
bs
.
counters
.
blocksSent
+=
uint64
(
len
(
blocks
))
bs
.
counters
.
dataSent
+=
uint64
(
dataSent
)
bs
.
counterLk
.
Unlock
()
bs
.
sentHistogram
.
Observe
(
float64
(
env
.
Message
.
Size
()))
log
.
Debugw
(
"sent message"
,
"peer"
,
env
.
Peer
)
}
}
func
(
bs
*
Bitswap
)
provideWorker
(
px
process
.
Process
)
{
func
(
bs
*
Bitswap
)
provideWorker
(
px
process
.
Process
)
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment