ledgermanager.go 4.1 KB
Newer Older
1 2 3 4 5
package strategy

import (
	"sync"

Jeromy's avatar
Jeromy committed
6 7 8
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

	bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
9 10 11 12 13 14
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
)

15 16
var log = u.Logger("strategy")

17 18 19 20 21 22
// LedgerMap lists Ledgers by their Partner key.
type ledgerMap map[peerKey]*ledger

// FIXME share this externally
type peerKey u.Key

Jeromy's avatar
Jeromy committed
23 24 25 26 27 28 29
type LedgerManager struct {
	lock       sync.RWMutex
	ledgerMap  ledgerMap
	bs         bstore.Blockstore
	tasklist   *TaskList
	taskOut    chan *Task
	workSignal chan struct{}
30 31
}

Jeromy's avatar
Jeromy committed
32 33 34 35 36 37 38
func NewLedgerManager(bs bstore.Blockstore, ctx context.Context) *LedgerManager {
	lm := &LedgerManager{
		ledgerMap:  make(ledgerMap),
		bs:         bs,
		tasklist:   NewTaskList(),
		taskOut:    make(chan *Task, 4),
		workSignal: make(chan struct{}),
39
	}
40
	go lm.taskWorker(ctx)
Jeromy's avatar
Jeromy committed
41 42 43
	return lm
}

44
func (lm *LedgerManager) taskWorker(ctx context.Context) {
Jeromy's avatar
Jeromy committed
45
	for {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
46
		nextTask := lm.tasklist.Pop()
Jeromy's avatar
Jeromy committed
47 48 49 50
		if nextTask == nil {
			// No tasks in the list?
			// Wait until there are!
			select {
51
			case <-ctx.Done():
Jeromy's avatar
Jeromy committed
52 53 54 55 56 57 58
				return
			case <-lm.workSignal:
			}
			continue
		}

		select {
59
		case <-ctx.Done():
Jeromy's avatar
Jeromy committed
60 61 62 63 64 65 66 67
			return
		case lm.taskOut <- nextTask:
		}
	}
}

func (lm *LedgerManager) GetTaskChan() <-chan *Task {
	return lm.taskOut
68 69 70
}

// Returns a slice of Peers with whom the local node has active sessions
Jeromy's avatar
Jeromy committed
71 72 73
func (lm *LedgerManager) Peers() []peer.Peer {
	lm.lock.RLock()
	defer lm.lock.RUnlock()
74 75

	response := make([]peer.Peer, 0)
Jeromy's avatar
Jeromy committed
76
	for _, ledger := range lm.ledgerMap {
77 78 79 80 81 82 83
		response = append(response, ledger.Partner)
	}
	return response
}

// BlockIsWantedByPeer returns true if peer wants the block given by this
// key
Jeromy's avatar
Jeromy committed
84 85 86
func (lm *LedgerManager) BlockIsWantedByPeer(k u.Key, p peer.Peer) bool {
	lm.lock.RLock()
	defer lm.lock.RUnlock()
87

Brian Tiger Chow's avatar
Brian Tiger Chow committed
88
	ledger := lm.findOrCreate(p)
89 90 91 92 93
	return ledger.WantListContains(k)
}

// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
Jeromy's avatar
Jeromy committed
94 95 96 97
func (lm *LedgerManager) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
	lm.lock.Lock()
	defer lm.lock.Unlock()

Brian Tiger Chow's avatar
Brian Tiger Chow committed
98
	l := lm.findOrCreate(p)
99 100 101 102 103 104
	if m.Full() {
		l.wantList = wl.New()
	}
	for _, e := range m.Wantlist() {
		if e.Cancel {
			l.CancelWant(e.Key)
Jeromy's avatar
Jeromy committed
105
			lm.tasklist.Cancel(e.Key, p)
106 107
		} else {
			l.Wants(e.Key, e.Priority)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
108
			lm.tasklist.Push(e.Key, e.Priority, p)
Jeromy's avatar
Jeromy committed
109 110 111 112 113 114

			// Signal task generation to restart (if stopped!)
			select {
			case lm.workSignal <- struct{}{}:
			default:
			}
115 116
		}
	}
Jeromy's avatar
Jeromy committed
117

118 119 120
	for _, block := range m.Blocks() {
		// FIXME extract blocks.NumBytes(block) or block.NumBytes() method
		l.ReceivedBytes(len(block.Data))
Jeromy's avatar
Jeromy committed
121 122
		for _, l := range lm.ledgerMap {
			if l.WantListContains(block.Key()) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
123
				lm.tasklist.Push(block.Key(), 1, l.Partner)
Jeromy's avatar
Jeromy committed
124 125 126 127 128 129 130 131 132

				// Signal task generation to restart (if stopped!)
				select {
				case lm.workSignal <- struct{}{}:
				default:
				}

			}
		}
133 134 135 136 137 138 139 140 141 142
	}
	return nil
}

// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically

Jeromy's avatar
Jeromy committed
143 144 145
func (lm *LedgerManager) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
	lm.lock.Lock()
	defer lm.lock.Unlock()
146

Brian Tiger Chow's avatar
Brian Tiger Chow committed
147
	l := lm.findOrCreate(p)
148 149 150
	for _, block := range m.Blocks() {
		l.SentBytes(len(block.Data))
		l.wantList.Remove(block.Key())
Jeromy's avatar
Jeromy committed
151
		lm.tasklist.Cancel(block.Key(), p)
152 153 154 155 156
	}

	return nil
}

Jeromy's avatar
Jeromy committed
157 158 159
func (lm *LedgerManager) NumBytesSentTo(p peer.Peer) uint64 {
	lm.lock.RLock()
	defer lm.lock.RUnlock()
160

Brian Tiger Chow's avatar
Brian Tiger Chow committed
161
	return lm.findOrCreate(p).Accounting.BytesSent
162 163
}

Jeromy's avatar
Jeromy committed
164 165 166
func (lm *LedgerManager) NumBytesReceivedFrom(p peer.Peer) uint64 {
	lm.lock.RLock()
	defer lm.lock.RUnlock()
167

Brian Tiger Chow's avatar
Brian Tiger Chow committed
168
	return lm.findOrCreate(p).Accounting.BytesRecv
169 170 171
}

// ledger lazily instantiates a ledger
Brian Tiger Chow's avatar
Brian Tiger Chow committed
172
func (lm *LedgerManager) findOrCreate(p peer.Peer) *ledger {
Jeromy's avatar
Jeromy committed
173
	l, ok := lm.ledgerMap[peerKey(p.Key())]
174 175
	if !ok {
		l = newLedger(p)
Jeromy's avatar
Jeromy committed
176
		lm.ledgerMap[peerKey(p.Key())] = l
177 178 179
	}
	return l
}