Commit 029e305f authored by Jeromy, committed by Juan Batiz-Benet

tasklist queue for bitswap tasks

parent cfbe92bc
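The shape of the change, in short: the timer-driven strategy rounds give way to a work queue. Wantlist entries received from peers become Tasks in a TaskList, a worker goroutine inside LedgerManager drains that list into a buffered channel, and bitswap's roundWorker sends one block per task it receives. Below is a minimal, self-contained sketch of that producer/consumer shape — every name in it is a hypothetical stand-in, not the commit's API, and the one-slot signal buffer is a simplification (the commit itself uses an unbuffered workSignal, discussed further down):

package main

import (
	"context"
	"fmt"
	"sync"
)

type task struct{ key string }

// queue is a mutex-guarded FIFO with a wakeup channel, echoing the
// TaskList + workSignal pair introduced by this commit.
type queue struct {
	mu     sync.Mutex
	tasks  []task
	signal chan struct{} // one-slot buffer so a wakeup can be left pending
}

func (q *queue) add(t task) {
	q.mu.Lock()
	q.tasks = append(q.tasks, t)
	q.mu.Unlock()
	select { // non-blocking: drop the signal if one is already pending
	case q.signal <- struct{}{}:
	default:
	}
}

func (q *queue) next() (task, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.tasks) == 0 {
		return task{}, false
	}
	t := q.tasks[0]
	q.tasks = q.tasks[1:]
	return t, true
}

// worker plays the role of taskWorker: it drains the queue into out,
// parking on the signal channel whenever the queue is empty.
func (q *queue) worker(ctx context.Context, out chan<- task) {
	for {
		t, ok := q.next()
		if !ok {
			select {
			case <-ctx.Done():
				return
			case <-q.signal:
			}
			continue
		}
		select {
		case <-ctx.Done():
			return
		case out <- t:
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	q := &queue{signal: make(chan struct{}, 1)}
	out := make(chan task, 4) // mirrors the commit's taskOut buffer of 4
	go q.worker(ctx, out)

	q.add(task{key: "somekey"})
	fmt.Println(<-out) // the consumer (roundWorker's role) receives {somekey}
}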
 {
 	"ImportPath": "github.com/jbenet/go-ipfs",
-	"GoVersion": "go1.3",
+	"GoVersion": "devel +ffe33f1f1f17 Tue Nov 25 15:41:33 2014 +1100",
 	"Packages": [
 		"./..."
 	],
......
@@ -56,8 +56,7 @@ func New(parent context.Context, p peer.Peer, network bsnet.BitSwapNetwork, rout
 		blockstore:    bstore,
 		cancelFunc:    cancelFunc,
 		notifications: notif,
-		strategy:      strategy.New(nice),
-		ledgerset:     strategy.NewLedgerSet(),
+		ledgermanager: strategy.NewLedgerManager(bstore, ctx),
 		routing:       routing,
 		sender:        network,
 		wantlist:      wl.New(),
@@ -93,7 +92,7 @@ type bitswap struct {
 	// strategy makes decisions about how to interact with partners.
 	strategy strategy.Strategy
-	ledgerset *strategy.LedgerSet
+	ledgermanager *strategy.LedgerManager
 	wantlist *wl.Wantlist
@@ -197,7 +196,7 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e
 			// FIXME ensure accounting is handled correctly when
 			// communication fails. May require slightly different API to
 			// get better guarantees. May need shared sequence numbers.
-			bs.ledgerset.MessageSent(p, message)
+			bs.ledgermanager.MessageSent(p, message)
 		}(peerToQuery)
 	}
 	wg.Wait()
@@ -236,35 +235,24 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wl.Wan
 }
 func (bs *bitswap) roundWorker(ctx context.Context) {
-	roundTicker := time.NewTicker(roundTime)
 	for {
 		select {
 		case <-ctx.Done():
 			return
-		case <-roundTicker.C:
-			alloc, err := bs.strategy.GetTasks(bandwidthPerRound, bs.ledgerset, bs.blockstore)
+		case task := <-bs.ledgermanager.GetTaskChan():
+			block, err := bs.blockstore.Get(task.Key)
 			if err != nil {
-				log.Critical("%s", err)
-			}
-			err = bs.processStrategyAllocation(ctx, alloc)
-			if err != nil {
-				log.Critical("Error processing strategy allocation: %s", err)
+				log.Errorf("Expected to have block %s, but it was not found!", task.Key)
+				continue
 			}
-		}
-	}
-}
-
-func (bs *bitswap) processStrategyAllocation(ctx context.Context, alloc []*strategy.Task) error {
-	for _, t := range alloc {
-		for _, block := range t.Blocks {
 			message := bsmsg.New()
 			message.AddBlock(block)
-			if err := bs.send(ctx, t.Peer, message); err != nil {
-				return err
-			}
+			// TODO: maybe add keys from our wantlist?
+			bs.send(ctx, task.Target, message)
 		}
 	}
-	return nil
 }
 // TODO ensure only one active request per key
@@ -327,7 +315,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
 	// This call records changes to wantlists, blocks received,
 	// and number of bytes transferred.
-	bs.ledgerset.MessageReceived(p, incoming)
+	bs.ledgermanager.MessageReceived(p, incoming)
 	// TODO: this is bad, and could be easily abused.
 	// Should only track *useful* messages in ledger
@@ -352,7 +340,7 @@ func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
 	for _, k := range bkeys {
 		message.AddEntry(k, 0, true)
 	}
-	for _, p := range bs.ledgerset.Peers() {
+	for _, p := range bs.ledgermanager.Peers() {
 		err := bs.send(ctx, p, message)
 		if err != nil {
 			log.Errorf("Error sending message: %s", err)
@@ -372,7 +360,7 @@ func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage
 	if err := bs.sender.SendMessage(ctx, p, m); err != nil {
 		return err
 	}
-	return bs.ledgerset.MessageSent(p, m)
+	return bs.ledgermanager.MessageSent(p, m)
 }
 func (bs *bitswap) Close() error {
......
@@ -26,7 +26,7 @@ func TestClose(t *testing.T) {
 	vnet := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
 	rout := mockrouting.NewServer()
 	sesgen := NewSessionGenerator(vnet, rout)
-	defer sesgen.Stop()
+	defer sesgen.Close()
 	bgen := blocksutil.NewBlockGenerator()
 	block := bgen.Next()
@@ -41,7 +41,7 @@ func TestGetBlockTimeout(t *testing.T) {
 	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
 	rs := mockrouting.NewServer()
 	g := NewSessionGenerator(net, rs)
-	defer g.Stop()
+	defer g.Close()
 	self := g.Next()
@@ -59,7 +59,7 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) {
 	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
 	rs := mockrouting.NewServer()
 	g := NewSessionGenerator(net, rs)
-	defer g.Stop()
+	defer g.Close()
 	block := blocks.NewBlock([]byte("block"))
 	rs.Client(testutil.NewPeerWithIDString("testing")).Provide(context.Background(), block.Key()) // but not on network
@@ -83,7 +83,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
 	rs := mockrouting.NewServer()
 	block := blocks.NewBlock([]byte("block"))
 	g := NewSessionGenerator(net, rs)
-	defer g.Stop()
+	defer g.Close()
 	hasBlock := g.Next()
 	defer hasBlock.Exchange.Close()
@@ -137,7 +137,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
 	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
 	rs := mockrouting.NewServer()
 	sg := NewSessionGenerator(net, rs)
-	defer sg.Stop()
+	defer sg.Close()
 	bg := blocksutil.NewBlockGenerator()
 	t.Log("Test a few nodes trying to get one file with a lot of blocks")
@@ -203,7 +203,7 @@ func TestSendToWantingPeer(t *testing.T) {
 	net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
 	rs := mockrouting.NewServer()
 	sg := NewSessionGenerator(net, rs)
-	defer sg.Stop()
+	defer sg.Close()
 	bg := blocksutil.NewBlockGenerator()
 	oldVal := rebroadcastDelay
......
@@ -8,5 +8,5 @@ type Strategy interface {
 	// Seed initializes the decider to a deterministic state
 	Seed(int64)
-	GetTasks(bandwidth int, ledgers *LedgerSet, bs bstore.Blockstore) ([]*Task, error)
+	GetTasks(bandwidth int, ledgers *LedgerManager, bs bstore.Blockstore) ([]*Task, error)
 }
@@ -3,6 +3,9 @@ package strategy
 import (
 	"sync"
+	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+	bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
 	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
 	wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
 	peer "github.com/jbenet/go-ipfs/peer"
@@ -15,24 +18,62 @@ type ledgerMap map[peerKey]*ledger
 // FIXME share this externally
 type peerKey u.Key
-type LedgerSet struct {
-	lock      sync.RWMutex
-	ledgerMap ledgerMap
+type LedgerManager struct {
+	lock       sync.RWMutex
+	ledgerMap  ledgerMap
+	bs         bstore.Blockstore
+	tasklist   *TaskList
+	taskOut    chan *Task
+	workSignal chan struct{}
+	ctx        context.Context
 }
-func NewLedgerSet() *LedgerSet {
-	return &LedgerSet{
-		ledgerMap: make(ledgerMap),
+func NewLedgerManager(bs bstore.Blockstore, ctx context.Context) *LedgerManager {
+	lm := &LedgerManager{
+		ledgerMap:  make(ledgerMap),
+		bs:         bs,
+		tasklist:   NewTaskList(),
+		taskOut:    make(chan *Task, 4),
+		workSignal: make(chan struct{}),
+		ctx:        ctx,
 	}
+	go lm.taskWorker()
+	return lm
 }
+func (lm *LedgerManager) taskWorker() {
+	for {
+		nextTask := lm.tasklist.GetNext()
+		if nextTask == nil {
+			// No tasks in the list?
+			// Wait until there are!
+			select {
+			case <-lm.ctx.Done():
+				return
+			case <-lm.workSignal:
+			}
+			continue
+		}
+		select {
+		case <-lm.ctx.Done():
+			return
+		case lm.taskOut <- nextTask:
+		}
+	}
+}
+func (lm *LedgerManager) GetTaskChan() <-chan *Task {
+	return lm.taskOut
+}
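As a usage note (the real consumer is bitswap's roundWorker in the first file above): tasks are drained from GetTaskChan under a context, and the channel is never closed — shutdown is signalled through ctx. A toy, runnable consumer against stand-in types, purely for illustration:

package main

import (
	"context"
	"fmt"
)

// Stand-ins for the real u.Key / peer.Peer / Task types; hypothetical,
// not the commit's definitions.
type Task struct{ Key, Target string }

type manager struct{ taskOut chan *Task }

func (m *manager) GetTaskChan() <-chan *Task { return m.taskOut }

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	m := &manager{taskOut: make(chan *Task, 4)}
	m.taskOut <- &Task{Key: "block1", Target: "peerA"}

	// The consumer owns shutdown via ctx; the channel itself is never closed.
	select {
	case <-ctx.Done():
	case t := <-m.GetTaskChan():
		fmt.Printf("send block %s to %s\n", t.Key, t.Target)
	}
}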
 // Returns a slice of Peers with whom the local node has active sessions
-func (ls *LedgerSet) Peers() []peer.Peer {
-	ls.lock.RLock()
-	defer ls.lock.RUnlock()
+func (lm *LedgerManager) Peers() []peer.Peer {
+	lm.lock.RLock()
+	defer lm.lock.RUnlock()
 	response := make([]peer.Peer, 0)
-	for _, ledger := range ls.ledgerMap {
+	for _, ledger := range lm.ledgerMap {
 		response = append(response, ledger.Partner)
 	}
 	return response
@@ -40,43 +81,55 @@ func (ls *LedgerSet) Peers() []peer.Peer {
 // BlockIsWantedByPeer returns true if peer wants the block given by this
 // key
-func (ls *LedgerSet) BlockIsWantedByPeer(k u.Key, p peer.Peer) bool {
-	ls.lock.RLock()
-	defer ls.lock.RUnlock()
+func (lm *LedgerManager) BlockIsWantedByPeer(k u.Key, p peer.Peer) bool {
+	lm.lock.RLock()
+	defer lm.lock.RUnlock()
-	ledger := ls.ledger(p)
+	ledger := lm.ledger(p)
 	return ledger.WantListContains(k)
 }
 // MessageReceived performs book-keeping. Returns error if passed invalid
 // arguments.
-func (ls *LedgerSet) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
-	ls.lock.Lock()
-	defer ls.lock.Unlock()
-	// TODO find a more elegant way to handle this check
-	/*
-		if p == nil {
-			return errors.New("Strategy received nil peer")
-		}
-		if m == nil {
-			return errors.New("Strategy received nil message")
-		}
-	*/
-	l := ls.ledger(p)
+func (lm *LedgerManager) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
+	lm.lock.Lock()
+	defer lm.lock.Unlock()
+	l := lm.ledger(p)
 	if m.Full() {
 		l.wantList = wl.New()
 	}
 	for _, e := range m.Wantlist() {
 		if e.Cancel {
 			l.CancelWant(e.Key)
+			lm.tasklist.Cancel(e.Key, p)
 		} else {
 			l.Wants(e.Key, e.Priority)
+			lm.tasklist.Add(e.Key, e.Priority, p)
+			// Signal task generation to restart (if stopped!)
+			select {
+			case lm.workSignal <- struct{}{}:
+			default:
+			}
 		}
 	}
 	for _, block := range m.Blocks() {
 		// FIXME extract blocks.NumBytes(block) or block.NumBytes() method
 		l.ReceivedBytes(len(block.Data))
+		for _, l := range lm.ledgerMap {
+			if l.WantListContains(block.Key()) {
+				lm.tasklist.Add(block.Key(), 1, l.Partner)
+				// Signal task generation to restart (if stopped!)
+				select {
+				case lm.workSignal <- struct{}{}:
+				default:
+				}
+			}
+		}
 	}
 	return nil
 }
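A remark on the two non-blocking sends above (an observation, not part of the commit): workSignal is unbuffered, so the send only lands if taskWorker is already parked in its receive; a signal raised in the instant between the worker's empty GetNext and its select is silently dropped, and the freshly added task then waits for the next wakeup. Either way the sender never blocks while holding lm.lock, which is the point of the idiom. Giving the channel a one-slot buffer would narrow that window — the buffered variant below is an assumption of mine, shown as a small runnable sketch:

package main

import "fmt"

func main() {
	// Hypothetical variant: buffer one pending wakeup.
	signal := make(chan struct{}, 1)

	notify := func() {
		select {
		case signal <- struct{}{}: // slot free: wakeup recorded even with no receiver yet
		default: // a wakeup is already pending: coalesce
		}
	}

	notify()
	notify() // the second signal is coalesced into the first

	<-signal // the worker would wake here
	select {
	case <-signal:
		fmt.Println("second wakeup") // not reached
	default:
		fmt.Println("signals coalesced into one") // printed
	}
}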
@@ -87,39 +140,40 @@ func (ls *LedgerSet) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error
 // inconsistent. Would need to ensure that Sends and acknowledgement of the
 // send happen atomically
-func (ls *LedgerSet) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
-	ls.lock.Lock()
-	defer ls.lock.Unlock()
+func (lm *LedgerManager) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
+	lm.lock.Lock()
+	defer lm.lock.Unlock()
-	l := ls.ledger(p)
+	l := lm.ledger(p)
 	for _, block := range m.Blocks() {
 		l.SentBytes(len(block.Data))
 		l.wantList.Remove(block.Key())
+		lm.tasklist.Cancel(block.Key(), p)
 	}
 	return nil
 }
-func (ls *LedgerSet) NumBytesSentTo(p peer.Peer) uint64 {
-	ls.lock.RLock()
-	defer ls.lock.RUnlock()
+func (lm *LedgerManager) NumBytesSentTo(p peer.Peer) uint64 {
+	lm.lock.RLock()
+	defer lm.lock.RUnlock()
-	return ls.ledger(p).Accounting.BytesSent
+	return lm.ledger(p).Accounting.BytesSent
 }
-func (ls *LedgerSet) NumBytesReceivedFrom(p peer.Peer) uint64 {
-	ls.lock.RLock()
-	defer ls.lock.RUnlock()
+func (lm *LedgerManager) NumBytesReceivedFrom(p peer.Peer) uint64 {
+	lm.lock.RLock()
+	defer lm.lock.RUnlock()
-	return ls.ledger(p).Accounting.BytesRecv
+	return lm.ledger(p).Accounting.BytesRecv
 }
 // ledger lazily instantiates a ledger
-func (ls *LedgerSet) ledger(p peer.Peer) *ledger {
-	l, ok := ls.ledgerMap[peerKey(p.Key())]
+func (lm *LedgerManager) ledger(p peer.Peer) *ledger {
+	l, ok := lm.ledgerMap[peerKey(p.Key())]
 	if !ok {
 		l = newLedger(p)
-		ls.ledgerMap[peerKey(p.Key())] = l
+		lm.ledgerMap[peerKey(p.Key())] = l
 	}
 	return l
 }
@@ -4,28 +4,30 @@ import (
 	"strings"
 	"testing"
+	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	blocks "github.com/jbenet/go-ipfs/blocks"
 	message "github.com/jbenet/go-ipfs/exchange/bitswap/message"
 	peer "github.com/jbenet/go-ipfs/peer"
 	testutil "github.com/jbenet/go-ipfs/util/testutil"
 )
-type peerAndLedgerset struct {
+type peerAndLedgermanager struct {
 	peer.Peer
-	ls *LedgerSet
+	ls *LedgerManager
 }
-func newPeerAndLedgerset(idStr string) peerAndLedgerset {
-	return peerAndLedgerset{
+func newPeerAndLedgermanager(idStr string) peerAndLedgermanager {
+	return peerAndLedgermanager{
 		Peer: testutil.NewPeerWithIDString(idStr),
 		//Strategy: New(true),
-		ls: NewLedgerSet(),
+		ls: NewLedgerManager(nil, context.TODO()),
 	}
 }
 func TestConsistentAccounting(t *testing.T) {
-	sender := newPeerAndLedgerset("Ernie")
-	receiver := newPeerAndLedgerset("Bert")
+	sender := newPeerAndLedgermanager("Ernie")
+	receiver := newPeerAndLedgermanager("Bert")
 	// Send messages from Ernie to Bert
 	for i := 0; i < 1000; i++ {
@@ -56,8 +58,8 @@ func TestConsistentAccounting(t *testing.T) {
 }
 func TestBlockRecordedAsWantedAfterMessageReceived(t *testing.T) {
-	beggar := newPeerAndLedgerset("can't be chooser")
-	chooser := newPeerAndLedgerset("chooses JIF")
+	beggar := newPeerAndLedgermanager("can't be chooser")
+	chooser := newPeerAndLedgermanager("chooses JIF")
 	block := blocks.NewBlock([]byte("data wanted by beggar"))
@@ -74,8 +76,8 @@ func TestBlockRecordedAsWantedAfterMessageReceived(t *testing.T) {
 func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {
-	sanfrancisco := newPeerAndLedgerset("sf")
-	seattle := newPeerAndLedgerset("sea")
+	sanfrancisco := newPeerAndLedgermanager("sf")
+	seattle := newPeerAndLedgermanager("sea")
 	m := message.New()
@@ -95,7 +97,7 @@ func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {
 	}
 }
-func peerIsPartner(p peer.Peer, ls *LedgerSet) bool {
+func peerIsPartner(p peer.Peer, ls *LedgerManager) bool {
 	for _, partner := range ls.Peers() {
 		if partner.Key() == p.Key() {
 			return true
......
 package strategy
 import (
-	blocks "github.com/jbenet/go-ipfs/blocks"
-	bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
-	wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
-	peer "github.com/jbenet/go-ipfs/peer"
+	//blocks "github.com/jbenet/go-ipfs/blocks"
+	//bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
+	//wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
+	//peer "github.com/jbenet/go-ipfs/peer"
 	u "github.com/jbenet/go-ipfs/util"
 )
 var log = u.Logger("strategy")
+/*
 // TODO niceness should be on a per-peer basis. Use-case: Certain peers are
 // "trusted" and/or controlled by a single human user. The user may want for
 // these peers to exchange data freely
@@ -29,12 +30,7 @@ type strategist struct {
 	strategyFunc
 }
-type Task struct {
-	Peer   peer.Peer
-	Blocks []*blocks.Block
-}
-
-func (s *strategist) GetTasks(bandwidth int, ledgers *LedgerSet, bs bstore.Blockstore) ([]*Task, error) {
+func (s *strategist) GetTasks(bandwidth int, ledgers *LedgerManager, bs bstore.Blockstore) ([]*Task, error) {
 	var tasks []*Task
 	ledgers.lock.RLock()
@@ -87,3 +83,5 @@ func test() {}
 func (s *strategist) Seed(int64) {
 	// TODO
 }
+*/
+package strategy
+import (
+	peer "github.com/jbenet/go-ipfs/peer"
+	u "github.com/jbenet/go-ipfs/util"
+)
+// TODO: at some point, the strategy needs to plug in here
+// to help decide how to sort tasks (on add) and how to select
+// tasks (on getnext). For now, we are assuming a dumb/nice strategy.
+type TaskList struct {
+	tasks   []*Task
+	taskmap map[u.Key]*Task
+}
+func NewTaskList() *TaskList {
+	return &TaskList{
+		taskmap: make(map[u.Key]*Task),
+	}
+}
+type Task struct {
+	Key           u.Key
+	Target        peer.Peer
+	theirPriority int
+}
+// Add currently adds a new task to the end of the list
+// TODO: make this into a priority queue
+func (tl *TaskList) Add(block u.Key, priority int, to peer.Peer) {
+	if task, ok := tl.taskmap[to.Key()+block]; ok {
+		// TODO: when priority queue is implemented,
+		// rearrange this Task
+		task.theirPriority = priority
+		return
+	}
+	task := &Task{
+		Key:           block,
+		Target:        to,
+		theirPriority: priority,
+	}
+	tl.tasks = append(tl.tasks, task)
+	tl.taskmap[to.Key()+block] = task
+}
+// GetNext returns the next task to be performed by bitswap and
+// removes it from the list. It returns nil once only cancelled
+// tasks remain.
+func (tl *TaskList) GetNext() *Task {
+	for len(tl.tasks) > 0 {
+		// TODO: instead of always taking index zero, use an exponential
+		// distribution; it would help reduce the chance of receiving
+		// the same block from multiple peers
+		out := tl.tasks[0]
+		tl.tasks = tl.tasks[1:]
+		delete(tl.taskmap, out.Target.Key()+out.Key)
+		// Skip tasks that have been lazily cancelled, returning nil
+		// (rather than a cancelled task) when the list runs out
+		if out.theirPriority >= 0 {
+			return out
+		}
+	}
+	return nil
+}
+// Cancel lazily cancels the sending of a block to a given peer
+func (tl *TaskList) Cancel(k u.Key, p peer.Peer) {
+	t, ok := tl.taskmap[p.Key()+k]
+	if ok {
+		t.theirPriority = -1
+	}
+}
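On the "make this into a priority queue" TODO above: one way it could go — a sketch under stated assumptions, not part of the commit — is container/heap ordering tasks by the priority peers assigned, while keeping the lazy-cancel trick of marking rather than removing. Stand-in string keys replace u.Key and peer.Peer, and the main function doubles as a usage demo of the cancel-skipping behaviour:

package main

import (
	"container/heap"
	"fmt"
)

// Minimal stand-in for the Task type above (string keys for simplicity).
type Task struct {
	Key           string
	Target        string
	theirPriority int
}

// taskHeap orders tasks highest-priority first.
type taskHeap []*Task

func (h taskHeap) Len() int            { return len(h) }
func (h taskHeap) Less(i, j int) bool  { return h[i].theirPriority > h[j].theirPriority }
func (h taskHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *taskHeap) Push(x interface{}) { *h = append(*h, x.(*Task)) }
func (h *taskHeap) Pop() interface{} {
	old := *h
	t := old[len(old)-1]
	*h = old[:len(old)-1]
	return t
}

// getNext pops the highest-priority live task, skipping entries that
// were lazily cancelled (theirPriority < 0); it returns nil when none remain.
func getNext(h *taskHeap) *Task {
	for h.Len() > 0 {
		t := heap.Pop(h).(*Task)
		if t.theirPriority >= 0 {
			return t
		}
	}
	return nil
}

func main() {
	h := &taskHeap{}
	heap.Push(h, &Task{Key: "a", Target: "p1", theirPriority: 1})
	heap.Push(h, &Task{Key: "b", Target: "p2", theirPriority: 5})
	heap.Push(h, &Task{Key: "c", Target: "p1", theirPriority: 3})

	// Lazy cancel, as in TaskList.Cancel: mark the task rather than remove it.
	(*h)[0].theirPriority = -1 // cancels the root, "b"
	heap.Fix(h, 0)             // restore heap order after the in-place change;
	// a real implementation would track element indices, as in the
	// PriorityQueue example in the container/heap documentation.

	for t := getNext(h); t != nil; t = getNext(h) {
		fmt.Println(t.Key, t.theirPriority) // prints "c 3", then "a 1"
	}
}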