Commit cafee57f authored by Jeromy

add in some events to bitswap to emit worker information

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>
parent 373bacac
...@@ -150,7 +150,8 @@ func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (*blocks.Block, e ...@@ -150,7 +150,8 @@ func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (*blocks.Block, e
ctx, cancelFunc := context.WithCancel(parent) ctx, cancelFunc := context.WithCancel(parent)
ctx = eventlog.ContextWithLoggable(ctx, eventlog.Uuid("GetBlockRequest")) ctx = eventlog.ContextWithLoggable(ctx, eventlog.Uuid("GetBlockRequest"))
defer log.EventBegin(ctx, "GetBlockRequest", &k).Done() log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k)
defer log.Event(ctx, "Bitswap.GetBlockRequest.End", &k)
defer func() { defer func() {
cancelFunc() cancelFunc()
...@@ -200,6 +201,10 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan *block ...@@ -200,6 +201,10 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan *block
} }
promise := bs.notifications.Subscribe(ctx, keys...) promise := bs.notifications.Subscribe(ctx, keys...)
for _, k := range keys {
log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k)
}
bs.wm.WantBlocks(keys) bs.wm.WantBlocks(keys)
req := &blockRequest{ req := &blockRequest{
...@@ -310,6 +315,9 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg ...@@ -310,6 +315,9 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
return return
} }
k := b.Key()
log.Event(ctx, "Bitswap.GetBlockRequest.End", &k)
log.Debugf("got block %s from %s (%d,%d)", b, p, brecvd, bdup) log.Debugf("got block %s from %s (%d,%d)", b, p, brecvd, bdup)
hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout)
if err := bs.HasBlock(hasBlockCtx, b); err != nil { if err := bs.HasBlock(hasBlockCtx, b); err != nil {
......
...@@ -7,7 +7,9 @@ import ( ...@@ -7,7 +7,9 @@ import (
process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
key "github.com/ipfs/go-ipfs/blocks/key" key "github.com/ipfs/go-ipfs/blocks/key"
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
) )
var TaskWorkerCount = 8 var TaskWorkerCount = 8
...@@ -36,8 +38,9 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { ...@@ -36,8 +38,9 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
// Start up workers to handle requests from other nodes for the data on this node // Start up workers to handle requests from other nodes for the data on this node
for i := 0; i < TaskWorkerCount; i++ { for i := 0; i < TaskWorkerCount; i++ {
i := i
px.Go(func(px process.Process) { px.Go(func(px process.Process) {
bs.taskWorker(ctx) bs.taskWorker(ctx, i)
}) })
} }
...@@ -55,15 +58,18 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { ...@@ -55,15 +58,18 @@ func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
// consider increasing number if providing blocks bottlenecks // consider increasing number if providing blocks bottlenecks
// file transfers // file transfers
for i := 0; i < provideWorkers; i++ { for i := 0; i < provideWorkers; i++ {
i := i
px.Go(func(px process.Process) { px.Go(func(px process.Process) {
bs.provideWorker(ctx) bs.provideWorker(ctx, i)
}) })
} }
} }
func (bs *Bitswap) taskWorker(ctx context.Context) { func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
idmap := eventlog.LoggableMap{"ID": id}
defer log.Info("bitswap task worker shutting down...") defer log.Info("bitswap task worker shutting down...")
for { for {
log.Event(ctx, "Bitswap.TaskWorker.Loop", idmap)
select { select {
case nextEnvelope := <-bs.engine.Outbox(): case nextEnvelope := <-bs.engine.Outbox():
select { select {
...@@ -71,6 +77,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context) { ...@@ -71,6 +77,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context) {
if !ok { if !ok {
continue continue
} }
log.Event(ctx, "Bitswap.TaskWorker.Work", eventlog.LoggableMap{"ID": id, "Target": envelope.Peer.Pretty(), "Block": envelope.Block.Multihash.B58String()})
bs.wm.SendBlock(ctx, envelope) bs.wm.SendBlock(ctx, envelope)
case <-ctx.Done(): case <-ctx.Done():
...@@ -82,10 +89,13 @@ func (bs *Bitswap) taskWorker(ctx context.Context) { ...@@ -82,10 +89,13 @@ func (bs *Bitswap) taskWorker(ctx context.Context) {
} }
} }
func (bs *Bitswap) provideWorker(ctx context.Context) { func (bs *Bitswap) provideWorker(ctx context.Context, id int) {
idmap := eventlog.LoggableMap{"ID": id}
for { for {
log.Event(ctx, "Bitswap.ProvideWorker.Loop", idmap)
select { select {
case k, ok := <-bs.provideKeys: case k, ok := <-bs.provideKeys:
log.Event(ctx, "Bitswap.ProvideWorker.Work", idmap, &k)
if !ok { if !ok {
log.Debug("provideKeys channel closed") log.Debug("provideKeys channel closed")
return return
...@@ -139,6 +149,7 @@ func (bs *Bitswap) providerConnector(parent context.Context) { ...@@ -139,6 +149,7 @@ func (bs *Bitswap) providerConnector(parent context.Context) {
defer log.Info("bitswap client worker shutting down...") defer log.Info("bitswap client worker shutting down...")
for { for {
log.Event(parent, "Bitswap.ProviderConnector.Loop")
select { select {
case req := <-bs.findKeys: case req := <-bs.findKeys:
keys := req.keys keys := req.keys
...@@ -146,6 +157,7 @@ func (bs *Bitswap) providerConnector(parent context.Context) { ...@@ -146,6 +157,7 @@ func (bs *Bitswap) providerConnector(parent context.Context) {
log.Warning("Received batch request for zero blocks") log.Warning("Received batch request for zero blocks")
continue continue
} }
log.Event(parent, "Bitswap.ProviderConnector.Work", eventlog.LoggableMap{"Keys": keys})
// NB: Optimization. Assumes that providers of key[0] are likely to // NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most // be able to provide for all keys. This currently holds true in most
...@@ -174,6 +186,7 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) { ...@@ -174,6 +186,7 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
defer tick.Stop() defer tick.Stop()
for { for {
log.Event(ctx, "Bitswap.Rebroadcast.idle")
select { select {
case <-tick.C: case <-tick.C:
n := bs.wm.wl.Len() n := bs.wm.wl.Len()
...@@ -181,6 +194,7 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) { ...@@ -181,6 +194,7 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
log.Debug(n, "keys in bitswap wantlist") log.Debug(n, "keys in bitswap wantlist")
} }
case <-broadcastSignal.C: // resend unfulfilled wantlist keys case <-broadcastSignal.C: // resend unfulfilled wantlist keys
log.Event(ctx, "Bitswap.Rebroadcast.active")
entries := bs.wm.wl.Entries() entries := bs.wm.wl.Entries()
if len(entries) > 0 { if len(entries) > 0 {
bs.connectToProviders(ctx, entries) bs.connectToProviders(ctx, entries)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment