Commit 65280c14 authored by Brian Tiger Chow's avatar Brian Tiger Chow Committed by Juan Batiz-Benet

rename to strategy.LedgerManager to decision.Engine

License: MIT
Signed-off-by: default avatarBrian Tiger Chow <brian@perfmode.com>
parent b34e4df9
......@@ -7,14 +7,13 @@ import (
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
blocks "github.com/jbenet/go-ipfs/blocks"
blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
exchange "github.com/jbenet/go-ipfs/exchange"
decision "github.com/jbenet/go-ipfs/exchange/bitswap/decision"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
wantlist "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
......@@ -56,7 +55,7 @@ func New(parent context.Context, p peer.Peer, network bsnet.BitSwapNetwork, rout
blockstore: bstore,
cancelFunc: cancelFunc,
notifications: notif,
ledgermanager: strategy.NewLedgerManager(ctx, bstore),
engine: decision.NewEngine(ctx, bstore),
routing: routing,
sender: network,
wantlist: wantlist.NewThreadSafe(),
......@@ -89,11 +88,7 @@ type bitswap struct {
// have more than a single block in the set
batchRequests chan []u.Key
// strategy makes decisions about how to interact with partners.
// TODO: strategy commented out until we have a use for it again
//strategy strategy.Strategy
ledgermanager *strategy.LedgerManager
engine *decision.Engine
wantlist *wantlist.ThreadSafe
......@@ -196,7 +191,7 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e
// FIXME ensure accounting is handled correctly when
// communication fails. May require slightly different API to
// get better guarantees. May need shared sequence numbers.
bs.ledgermanager.MessageSent(p, message)
bs.engine.MessageSent(p, message)
}(peerToQuery)
}
wg.Wait()
......@@ -239,7 +234,7 @@ func (bs *bitswap) taskWorker(ctx context.Context) {
select {
case <-ctx.Done():
return
case envelope := <-bs.ledgermanager.Outbox():
case envelope := <-bs.engine.Outbox():
bs.send(ctx, envelope.Peer, envelope.Message)
}
}
......@@ -305,7 +300,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
// This call records changes to wantlists, blocks received,
// and number of bytes transfered.
bs.ledgermanager.MessageReceived(p, incoming)
bs.engine.MessageReceived(p, incoming)
// TODO: this is bad, and could be easily abused.
// Should only track *useful* messages in ledger
......@@ -334,7 +329,7 @@ func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
for _, k := range bkeys {
message.Cancel(k)
}
for _, p := range bs.ledgermanager.Peers() {
for _, p := range bs.engine.Peers() {
err := bs.send(ctx, p, message)
if err != nil {
log.Errorf("Error sending message: %s", err)
......@@ -354,7 +349,7 @@ func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage
if err := bs.sender.SendMessage(ctx, p, m); err != nil {
return err
}
return bs.ledgermanager.MessageSent(p, m)
return bs.engine.MessageSent(p, m)
}
func (bs *bitswap) Close() error {
......
package strategy
package decision
import (
"sync"
......@@ -11,7 +11,7 @@ import (
u "github.com/jbenet/go-ipfs/util"
)
var log = u.Logger("strategy")
var log = u.Logger("engine")
// Envelope contains a message for a Peer
type Envelope struct {
......@@ -21,7 +21,7 @@ type Envelope struct {
Message bsmsg.BitSwapMessage
}
type LedgerManager struct {
type Engine struct {
// FIXME taskqueue isn't threadsafe nor is it protected by a mutex. consider
// a way to avoid sharing the taskqueue between the worker and the receiver
taskqueue *taskQueue
......@@ -37,32 +37,32 @@ type LedgerManager struct {
ledgerMap map[u.Key]*ledger
}
func NewLedgerManager(ctx context.Context, bs bstore.Blockstore) *LedgerManager {
lm := &LedgerManager{
func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
e := &Engine{
ledgerMap: make(map[u.Key]*ledger),
bs: bs,
taskqueue: newTaskQueue(),
outbox: make(chan Envelope, 4), // TODO extract constant
workSignal: make(chan struct{}),
}
go lm.taskWorker(ctx)
return lm
go e.taskWorker(ctx)
return e
}
func (lm *LedgerManager) taskWorker(ctx context.Context) {
func (e *Engine) taskWorker(ctx context.Context) {
for {
nextTask := lm.taskqueue.Pop()
nextTask := e.taskqueue.Pop()
if nextTask == nil {
// No tasks in the list?
// Wait until there are!
select {
case <-ctx.Done():
return
case <-lm.workSignal:
case <-e.workSignal:
}
continue
}
block, err := lm.bs.Get(nextTask.Entry.Key)
block, err := e.bs.Get(nextTask.Entry.Key)
if err != nil {
continue // TODO maybe return an error
}
......@@ -74,22 +74,22 @@ func (lm *LedgerManager) taskWorker(ctx context.Context) {
select {
case <-ctx.Done():
return
case lm.outbox <- Envelope{Peer: nextTask.Target, Message: m}:
case e.outbox <- Envelope{Peer: nextTask.Target, Message: m}:
}
}
}
func (lm *LedgerManager) Outbox() <-chan Envelope {
return lm.outbox
func (e *Engine) Outbox() <-chan Envelope {
return e.outbox
}
// Returns a slice of Peers with whom the local node has active sessions
func (lm *LedgerManager) Peers() []peer.Peer {
lm.lock.RLock()
defer lm.lock.RUnlock()
func (e *Engine) Peers() []peer.Peer {
e.lock.RLock()
defer e.lock.RUnlock()
response := make([]peer.Peer, 0)
for _, ledger := range lm.ledgerMap {
for _, ledger := range e.ledgerMap {
response = append(response, ledger.Partner)
}
return response
......@@ -97,52 +97,52 @@ func (lm *LedgerManager) Peers() []peer.Peer {
// BlockIsWantedByPeer returns true if peer wants the block given by this
// key
func (lm *LedgerManager) BlockIsWantedByPeer(k u.Key, p peer.Peer) bool {
lm.lock.RLock()
defer lm.lock.RUnlock()
func (e *Engine) BlockIsWantedByPeer(k u.Key, p peer.Peer) bool {
e.lock.RLock()
defer e.lock.RUnlock()
ledger := lm.findOrCreate(p)
ledger := e.findOrCreate(p)
return ledger.WantListContains(k)
}
// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
func (lm *LedgerManager) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
func (e *Engine) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
newWorkExists := false
defer func() {
if newWorkExists {
// Signal task generation to restart (if stopped!)
select {
case lm.workSignal <- struct{}{}:
case e.workSignal <- struct{}{}:
default:
}
}
}()
lm.lock.Lock()
defer lm.lock.Unlock()
e.lock.Lock()
defer e.lock.Unlock()
l := lm.findOrCreate(p)
l := e.findOrCreate(p)
if m.Full() {
l.wantList = wl.New()
}
for _, e := range m.Wantlist() {
if e.Cancel {
l.CancelWant(e.Key)
lm.taskqueue.Remove(e.Key, p)
for _, entry := range m.Wantlist() {
if entry.Cancel {
l.CancelWant(entry.Key)
e.taskqueue.Remove(entry.Key, p)
} else {
l.Wants(e.Key, e.Priority)
l.Wants(entry.Key, entry.Priority)
newWorkExists = true
lm.taskqueue.Push(e.Key, e.Priority, p)
e.taskqueue.Push(entry.Key, entry.Priority, p)
}
}
for _, block := range m.Blocks() {
// FIXME extract blocks.NumBytes(block) or block.NumBytes() method
l.ReceivedBytes(len(block.Data))
for _, l := range lm.ledgerMap {
for _, l := range e.ledgerMap {
if l.WantListContains(block.Key()) {
newWorkExists = true
lm.taskqueue.Push(block.Key(), 1, l.Partner)
e.taskqueue.Push(block.Key(), 1, l.Partner)
}
}
}
......@@ -155,40 +155,40 @@ func (lm *LedgerManager) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) er
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically
func (lm *LedgerManager) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
lm.lock.Lock()
defer lm.lock.Unlock()
func (e *Engine) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
e.lock.Lock()
defer e.lock.Unlock()
l := lm.findOrCreate(p)
l := e.findOrCreate(p)
for _, block := range m.Blocks() {
l.SentBytes(len(block.Data))
l.wantList.Remove(block.Key())
lm.taskqueue.Remove(block.Key(), p)
e.taskqueue.Remove(block.Key(), p)
}
return nil
}
func (lm *LedgerManager) NumBytesSentTo(p peer.Peer) uint64 {
lm.lock.RLock()
defer lm.lock.RUnlock()
func (e *Engine) NumBytesSentTo(p peer.Peer) uint64 {
e.lock.RLock()
defer e.lock.RUnlock()
return lm.findOrCreate(p).Accounting.BytesSent
return e.findOrCreate(p).Accounting.BytesSent
}
func (lm *LedgerManager) NumBytesReceivedFrom(p peer.Peer) uint64 {
lm.lock.RLock()
defer lm.lock.RUnlock()
func (e *Engine) NumBytesReceivedFrom(p peer.Peer) uint64 {
e.lock.RLock()
defer e.lock.RUnlock()
return lm.findOrCreate(p).Accounting.BytesRecv
return e.findOrCreate(p).Accounting.BytesRecv
}
// ledger lazily instantiates a ledger
func (lm *LedgerManager) findOrCreate(p peer.Peer) *ledger {
l, ok := lm.ledgerMap[p.Key()]
func (e *Engine) findOrCreate(p peer.Peer) *ledger {
l, ok := e.ledgerMap[p.Key()]
if !ok {
l = newLedger(p)
lm.ledgerMap[p.Key()] = l
e.ledgerMap[p.Key()] = l
}
return l
}
package strategy
package decision
import (
"strings"
......@@ -14,16 +14,16 @@ import (
testutil "github.com/jbenet/go-ipfs/util/testutil"
)
type peerAndLedgermanager struct {
type peerAndEngine struct {
peer.Peer
ls *LedgerManager
Engine *Engine
}
func newPeerAndLedgermanager(idStr string) peerAndLedgermanager {
return peerAndLedgermanager{
func newPeerAndLedgermanager(idStr string) peerAndEngine {
return peerAndEngine{
Peer: testutil.NewPeerWithIDString(idStr),
//Strategy: New(true),
ls: NewLedgerManager(context.TODO(),
Engine: NewEngine(context.TODO(),
blockstore.NewBlockstore(sync.MutexWrap(ds.NewMapDatastore()))),
}
}
......@@ -39,23 +39,23 @@ func TestConsistentAccounting(t *testing.T) {
content := []string{"this", "is", "message", "i"}
m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " "))))
sender.ls.MessageSent(receiver.Peer, m)
receiver.ls.MessageReceived(sender.Peer, m)
sender.Engine.MessageSent(receiver.Peer, m)
receiver.Engine.MessageReceived(sender.Peer, m)
}
// Ensure sender records the change
if sender.ls.NumBytesSentTo(receiver.Peer) == 0 {
if sender.Engine.NumBytesSentTo(receiver.Peer) == 0 {
t.Fatal("Sent bytes were not recorded")
}
// Ensure sender and receiver have the same values
if sender.ls.NumBytesSentTo(receiver.Peer) != receiver.ls.NumBytesReceivedFrom(sender.Peer) {
if sender.Engine.NumBytesSentTo(receiver.Peer) != receiver.Engine.NumBytesReceivedFrom(sender.Peer) {
t.Fatal("Inconsistent book-keeping. Strategies don't agree")
}
// Ensure sender didn't record receving anything. And that the receiver
// didn't record sending anything
if receiver.ls.NumBytesSentTo(sender.Peer) != 0 || sender.ls.NumBytesReceivedFrom(receiver.Peer) != 0 {
if receiver.Engine.NumBytesSentTo(sender.Peer) != 0 || sender.Engine.NumBytesReceivedFrom(receiver.Peer) != 0 {
t.Fatal("Bert didn't send bytes to Ernie")
}
}
......@@ -69,10 +69,10 @@ func TestBlockRecordedAsWantedAfterMessageReceived(t *testing.T) {
messageFromBeggarToChooser := message.New()
messageFromBeggarToChooser.AddEntry(block.Key(), 1)
chooser.ls.MessageReceived(beggar.Peer, messageFromBeggarToChooser)
chooser.Engine.MessageReceived(beggar.Peer, messageFromBeggarToChooser)
// for this test, doesn't matter if you record that beggar sent
if !chooser.ls.BlockIsWantedByPeer(block.Key(), beggar.Peer) {
if !chooser.Engine.BlockIsWantedByPeer(block.Key(), beggar.Peer) {
t.Fatal("chooser failed to record that beggar wants block")
}
}
......@@ -84,24 +84,24 @@ func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {
m := message.New()
sanfrancisco.ls.MessageSent(seattle.Peer, m)
seattle.ls.MessageReceived(sanfrancisco.Peer, m)
sanfrancisco.Engine.MessageSent(seattle.Peer, m)
seattle.Engine.MessageReceived(sanfrancisco.Peer, m)
if seattle.Peer.Key() == sanfrancisco.Peer.Key() {
t.Fatal("Sanity Check: Peers have same Key!")
}
if !peerIsPartner(seattle.Peer, sanfrancisco.ls) {
if !peerIsPartner(seattle.Peer, sanfrancisco.Engine) {
t.Fatal("Peer wasn't added as a Partner")
}
if !peerIsPartner(sanfrancisco.Peer, seattle.ls) {
if !peerIsPartner(sanfrancisco.Peer, seattle.Engine) {
t.Fatal("Peer wasn't added as a Partner")
}
}
func peerIsPartner(p peer.Peer, ls *LedgerManager) bool {
for _, partner := range ls.Peers() {
func peerIsPartner(p peer.Peer, e *Engine) bool {
for _, partner := range e.Peers() {
if partner.Key() == p.Key() {
return true
}
......
package strategy
package decision
import (
"time"
......
package strategy
package decision
import (
wantlist "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment