Commit 0abc72c0 authored by Jeromy's avatar Jeromy

move some variables into strategy

parent 4d144758
...@@ -96,6 +96,11 @@ func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, er ...@@ -96,6 +96,11 @@ func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, er
} }
} }
func (s *BlockService) GetBlocks(ctx context.Context, ks []u.Key) (<-chan blocks.Block, error) {
// TODO:
return nil, nil
}
// DeleteBlock deletes a block in the blockservice from the datastore // DeleteBlock deletes a block in the blockservice from the datastore
func (s *BlockService) DeleteBlock(k u.Key) error { func (s *BlockService) DeleteBlock(k u.Key) error {
return s.Datastore.Delete(k.DsKey()) return s.Datastore.Delete(k.DsKey())
......
...@@ -157,9 +157,10 @@ func (bs *bitswap) run(ctx context.Context) { ...@@ -157,9 +157,10 @@ func (bs *bitswap) run(ctx context.Context) {
rebroadcastTime := time.Second * 5 rebroadcastTime := time.Second * 5
var providers <-chan peer.Peer // NB: must be initialized to zero value var providers <-chan peer.Peer // NB: must be initialized to zero value
broadcastSignal := time.After(rebroadcastPeriod) broadcastSignal := time.After(bs.strategy.GetRebroadcastDelay())
unsentKeys := 0
// Number of unsent keys for the current batch
unsentKeys := 0
for { for {
select { select {
case <-broadcastSignal: case <-broadcastSignal:
...@@ -170,14 +171,14 @@ func (bs *bitswap) run(ctx context.Context) { ...@@ -170,14 +171,14 @@ func (bs *bitswap) run(ctx context.Context) {
if providers == nil { if providers == nil {
// rely on semi randomness of maps // rely on semi randomness of maps
firstKey := wantlist[0] firstKey := wantlist[0]
providers = bs.routing.FindProvidersAsync(ctx, firstKey, 6) providers = bs.routing.FindProvidersAsync(ctx, firstKey, maxProvidersPerRequest)
} }
err := bs.sendWantListTo(ctx, providers) err := bs.sendWantListTo(ctx, providers)
if err != nil { if err != nil {
log.Errorf("error sending wantlist: %s", err) log.Errorf("error sending wantlist: %s", err)
} }
providers = nil providers = nil
broadcastSignal = time.After(rebroadcastPeriod) broadcastSignal = time.After(bs.strategy.GetRebroadcastDelay())
case k := <-bs.blockRequests: case k := <-bs.blockRequests:
if unsentKeys == 0 { if unsentKeys == 0 {
...@@ -185,19 +186,19 @@ func (bs *bitswap) run(ctx context.Context) { ...@@ -185,19 +186,19 @@ func (bs *bitswap) run(ctx context.Context) {
} }
unsentKeys++ unsentKeys++
if unsentKeys >= numKeysPerBatch { if unsentKeys >= bs.strategy.GetBatchSize() {
// send wantlist to providers // send wantlist to providers
err := bs.sendWantListTo(ctx, providers) err := bs.sendWantListTo(ctx, providers)
if err != nil { if err != nil {
log.Errorf("error sending wantlist: %s", err) log.Errorf("error sending wantlist: %s", err)
} }
unsentKeys = 0 unsentKeys = 0
broadcastSignal = time.After(rebroadcastPeriod) broadcastSignal = time.After(bs.strategy.GetRebroadcastDelay())
providers = nil providers = nil
} else { } else {
// set a timeout to wait for more blocks or send current wantlist // set a timeout to wait for more blocks or send current wantlist
broadcastSignal = time.After(batchDelay) broadcastSignal = time.After(bs.strategy.GetBatchDelay())
} }
case <-ctx.Done(): case <-ctx.Done():
return return
...@@ -217,7 +218,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error { ...@@ -217,7 +218,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
// TODO(brian): handle errors // TODO(brian): handle errors
func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) ( func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) (
peer.Peer, bsmsg.BitSwapMessage) { peer.Peer, bsmsg.BitSwapMessage) {
log.Debugf("ReceiveMessage from %v", p.Key()) log.Debugf("ReceiveMessage from %s", p)
log.Debugf("Message wantlist: %v", incoming.Wantlist()) log.Debugf("Message wantlist: %v", incoming.Wantlist())
if p == nil { if p == nil {
...@@ -239,6 +240,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm ...@@ -239,6 +240,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
for _, block := range incoming.Blocks() { for _, block := range incoming.Blocks() {
// TODO verify blocks? // TODO verify blocks?
if err := bs.blockstore.Put(&block); err != nil { if err := bs.blockstore.Put(&block); err != nil {
log.Criticalf("error putting block: %s", err)
continue // FIXME(brian): err ignored continue // FIXME(brian): err ignored
} }
bs.notifications.Publish(block) bs.notifications.Publish(block)
......
package strategy package strategy
import ( import (
"time"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
...@@ -29,4 +31,9 @@ type Strategy interface { ...@@ -29,4 +31,9 @@ type Strategy interface {
NumBytesSentTo(peer.Peer) uint64 NumBytesSentTo(peer.Peer) uint64
NumBytesReceivedFrom(peer.Peer) uint64 NumBytesReceivedFrom(peer.Peer) uint64
// Values determining bitswap behavioural patterns
GetBatchSize() int
GetBatchDelay() time.Duration
GetRebroadcastDelay() time.Duration
} }
...@@ -3,6 +3,7 @@ package strategy ...@@ -3,6 +3,7 @@ package strategy
import ( import (
"errors" "errors"
"sync" "sync"
"time"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
...@@ -139,3 +140,15 @@ func (s *strategist) ledger(p peer.Peer) *ledger { ...@@ -139,3 +140,15 @@ func (s *strategist) ledger(p peer.Peer) *ledger {
} }
return l return l
} }
func (s *strategist) GetBatchSize() int {
return 10
}
func (s *strategist) GetBatchDelay() time.Duration {
return time.Millisecond * 3
}
func (s *strategist) GetRebroadcastDelay() time.Duration {
return time.Second * 2
}
...@@ -42,6 +42,10 @@ type Network interface { ...@@ -42,6 +42,10 @@ type Network interface {
// the network since it was instantiated // the network since it was instantiated
GetBandwidthTotals() (uint64, uint64) GetBandwidthTotals() (uint64, uint64)
// GetMessageCounts returns the total number of messages passed through
// the network since it was instantiated
GetMessageCounts() (uint64, uint64)
// SendMessage sends given Message out // SendMessage sends given Message out
SendMessage(msg.NetMessage) error SendMessage(msg.NetMessage) error
......
...@@ -45,9 +45,11 @@ type Muxer struct { ...@@ -45,9 +45,11 @@ type Muxer struct {
bwiLock sync.Mutex bwiLock sync.Mutex
bwIn uint64 bwIn uint64
msgIn uint64
bwoLock sync.Mutex bwoLock sync.Mutex
bwOut uint64 bwOut uint64
msgOut uint64
*msg.Pipe *msg.Pipe
ctxc.ContextCloser ctxc.ContextCloser
...@@ -76,6 +78,18 @@ func (m *Muxer) GetPipe() *msg.Pipe { ...@@ -76,6 +78,18 @@ func (m *Muxer) GetPipe() *msg.Pipe {
return m.Pipe return m.Pipe
} }
// GetMessageCounts return the in/out message count measured over this muxer.
func (m *Muxer) GetMessageCounts() (in uint64, out uint64) {
m.bwiLock.Lock()
in = m.msgIn
m.bwiLock.Unlock()
m.bwoLock.Lock()
out = m.msgOut
m.bwoLock.Unlock()
return
}
// GetBandwidthTotals return the in/out bandwidth measured over this muxer. // GetBandwidthTotals return the in/out bandwidth measured over this muxer.
func (m *Muxer) GetBandwidthTotals() (in uint64, out uint64) { func (m *Muxer) GetBandwidthTotals() (in uint64, out uint64) {
m.bwiLock.Lock() m.bwiLock.Lock()
...@@ -125,6 +139,7 @@ func (m *Muxer) handleIncomingMessage(m1 msg.NetMessage) { ...@@ -125,6 +139,7 @@ func (m *Muxer) handleIncomingMessage(m1 msg.NetMessage) {
m.bwiLock.Lock() m.bwiLock.Lock()
// TODO: compensate for overhead // TODO: compensate for overhead
m.bwIn += uint64(len(m1.Data())) m.bwIn += uint64(len(m1.Data()))
m.msgIn++
m.bwiLock.Unlock() m.bwiLock.Unlock()
data, pid, err := unwrapData(m1.Data()) data, pid, err := unwrapData(m1.Data())
...@@ -182,6 +197,7 @@ func (m *Muxer) handleOutgoingMessage(pid pb.ProtocolID, m1 msg.NetMessage) { ...@@ -182,6 +197,7 @@ func (m *Muxer) handleOutgoingMessage(pid pb.ProtocolID, m1 msg.NetMessage) {
// TODO: compensate for overhead // TODO: compensate for overhead
// TODO(jbenet): switch this to a goroutine to prevent sync waiting. // TODO(jbenet): switch this to a goroutine to prevent sync waiting.
m.bwOut += uint64(len(data)) m.bwOut += uint64(len(data))
m.msgOut++
m.bwoLock.Unlock() m.bwoLock.Unlock()
m2 := msg.New(m1.Peer(), data) m2 := msg.New(m1.Peer(), data)
......
...@@ -110,6 +110,11 @@ func (n *IpfsNetwork) GetBandwidthTotals() (in uint64, out uint64) { ...@@ -110,6 +110,11 @@ func (n *IpfsNetwork) GetBandwidthTotals() (in uint64, out uint64) {
return n.muxer.GetBandwidthTotals() return n.muxer.GetBandwidthTotals()
} }
// GetBandwidthTotals returns the total amount of messages transferred
func (n *IpfsNetwork) GetMessageCounts() (in uint64, out uint64) {
return n.muxer.GetMessageCounts()
}
// ListenAddresses returns a list of addresses at which this network listens. // ListenAddresses returns a list of addresses at which this network listens.
func (n *IpfsNetwork) ListenAddresses() []ma.Multiaddr { func (n *IpfsNetwork) ListenAddresses() []ma.Multiaddr {
return n.swarm.ListenAddresses() return n.swarm.ListenAddresses()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment