ledgermanager.go 4.61 KB
Newer Older
1 2 3 4 5
package strategy

import (
	"sync"

Jeromy's avatar
Jeromy committed
6 7 8
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

	bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
9 10 11 12 13 14
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
)

15 16
// log is the package-level logger, obtained from u.Logger under the name
// "strategy".
var log = u.Logger("strategy")

17
// ledgerMap lists ledgers by their partner's key.
18
// Entries are added lazily by findOrCreate, stored under the peer's Key().
type ledgerMap map[u.Key]*ledger
19

20 21 22 23 24
// Envelope pairs a BitSwapMessage with the peer it is addressed to. taskWorker
// builds Envelopes and places them on the LedgerManager's outbox channel.
type Envelope struct {
	// Peer is the recipient; taskWorker fills it from the task's Target.
	Peer    peer.Peer
	// Message is the payload to deliver to Peer.
	Message bsmsg.BitSwapMessage
}

Jeromy's avatar
Jeromy committed
25
type LedgerManager struct {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
26 27 28 29 30
	lock      sync.RWMutex
	ledgerMap ledgerMap
	bs        bstore.Blockstore
	// FIXME tasklist isn't threadsafe nor is it protected by a mutex. consider
	// a way to avoid sharing the tasklist between the worker and the receiver
Brian Tiger Chow's avatar
Brian Tiger Chow committed
31
	tasklist   *taskList
32
	outbox     chan Envelope
Jeromy's avatar
Jeromy committed
33
	workSignal chan struct{}
34 35
}

36
func NewLedgerManager(ctx context.Context, bs bstore.Blockstore) *LedgerManager {
Jeromy's avatar
Jeromy committed
37 38 39
	lm := &LedgerManager{
		ledgerMap:  make(ledgerMap),
		bs:         bs,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
40
		tasklist:   newTaskList(),
41
		outbox:     make(chan Envelope, 4), // TODO extract constant
Jeromy's avatar
Jeromy committed
42
		workSignal: make(chan struct{}),
43
	}
44
	go lm.taskWorker(ctx)
Jeromy's avatar
Jeromy committed
45 46 47
	return lm
}

48
func (lm *LedgerManager) taskWorker(ctx context.Context) {
Jeromy's avatar
Jeromy committed
49
	for {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
50
		nextTask := lm.tasklist.Pop()
Jeromy's avatar
Jeromy committed
51 52 53 54
		if nextTask == nil {
			// No tasks in the list?
			// Wait until there are!
			select {
55
			case <-ctx.Done():
Jeromy's avatar
Jeromy committed
56 57 58 59 60
				return
			case <-lm.workSignal:
			}
			continue
		}
61 62 63 64 65 66 67 68 69
		block, err := lm.bs.Get(nextTask.Key)
		if err != nil {
			continue // TODO maybe return an error
		}
		// construct message here so we can make decisions about any additional
		// information we may want to include at this time.
		m := bsmsg.New()
		m.AddBlock(block)
		// TODO: maybe add keys from our wantlist?
Jeromy's avatar
Jeromy committed
70
		select {
71
		case <-ctx.Done():
Jeromy's avatar
Jeromy committed
72
			return
73
		case lm.outbox <- Envelope{Peer: nextTask.Target, Message: m}:
Jeromy's avatar
Jeromy committed
74 75 76 77
		}
	}
}

78 79
func (lm *LedgerManager) Outbox() <-chan Envelope {
	return lm.outbox
80 81 82
}

// Returns a slice of Peers with whom the local node has active sessions
Jeromy's avatar
Jeromy committed
83 84 85
func (lm *LedgerManager) Peers() []peer.Peer {
	lm.lock.RLock()
	defer lm.lock.RUnlock()
86 87

	response := make([]peer.Peer, 0)
Jeromy's avatar
Jeromy committed
88
	for _, ledger := range lm.ledgerMap {
89 90 91 92 93 94 95
		response = append(response, ledger.Partner)
	}
	return response
}

// BlockIsWantedByPeer returns true if peer wants the block given by this
// key
Jeromy's avatar
Jeromy committed
96 97 98
func (lm *LedgerManager) BlockIsWantedByPeer(k u.Key, p peer.Peer) bool {
	lm.lock.RLock()
	defer lm.lock.RUnlock()
99

Brian Tiger Chow's avatar
Brian Tiger Chow committed
100
	ledger := lm.findOrCreate(p)
101 102 103 104 105
	return ledger.WantListContains(k)
}

// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
Jeromy's avatar
Jeromy committed
106
func (lm *LedgerManager) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
107 108 109 110 111 112 113 114 115 116
	newWorkExists := false
	defer func() {
		if newWorkExists {
			// Signal task generation to restart (if stopped!)
			select {
			case lm.workSignal <- struct{}{}:
			default:
			}
		}
	}()
Jeromy's avatar
Jeromy committed
117 118 119
	lm.lock.Lock()
	defer lm.lock.Unlock()

Brian Tiger Chow's avatar
Brian Tiger Chow committed
120
	l := lm.findOrCreate(p)
121 122 123 124 125 126
	if m.Full() {
		l.wantList = wl.New()
	}
	for _, e := range m.Wantlist() {
		if e.Cancel {
			l.CancelWant(e.Key)
Jeromy's avatar
Jeromy committed
127
			lm.tasklist.Cancel(e.Key, p)
128 129
		} else {
			l.Wants(e.Key, e.Priority)
130
			newWorkExists = true
Brian Tiger Chow's avatar
Brian Tiger Chow committed
131
			lm.tasklist.Push(e.Key, e.Priority, p)
132 133
		}
	}
Jeromy's avatar
Jeromy committed
134

135 136 137
	for _, block := range m.Blocks() {
		// FIXME extract blocks.NumBytes(block) or block.NumBytes() method
		l.ReceivedBytes(len(block.Data))
Jeromy's avatar
Jeromy committed
138 139
		for _, l := range lm.ledgerMap {
			if l.WantListContains(block.Key()) {
140
				newWorkExists = true
Brian Tiger Chow's avatar
Brian Tiger Chow committed
141
				lm.tasklist.Push(block.Key(), 1, l.Partner)
Jeromy's avatar
Jeromy committed
142 143
			}
		}
144 145 146 147 148 149 150 151 152 153
	}
	return nil
}

// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically

Jeromy's avatar
Jeromy committed
154 155 156
func (lm *LedgerManager) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
	lm.lock.Lock()
	defer lm.lock.Unlock()
157

Brian Tiger Chow's avatar
Brian Tiger Chow committed
158
	l := lm.findOrCreate(p)
159 160 161
	for _, block := range m.Blocks() {
		l.SentBytes(len(block.Data))
		l.wantList.Remove(block.Key())
Jeromy's avatar
Jeromy committed
162
		lm.tasklist.Cancel(block.Key(), p)
163 164 165 166 167
	}

	return nil
}

Jeromy's avatar
Jeromy committed
168 169 170
func (lm *LedgerManager) NumBytesSentTo(p peer.Peer) uint64 {
	lm.lock.RLock()
	defer lm.lock.RUnlock()
171

Brian Tiger Chow's avatar
Brian Tiger Chow committed
172
	return lm.findOrCreate(p).Accounting.BytesSent
173 174
}

Jeromy's avatar
Jeromy committed
175 176 177
func (lm *LedgerManager) NumBytesReceivedFrom(p peer.Peer) uint64 {
	lm.lock.RLock()
	defer lm.lock.RUnlock()
178

Brian Tiger Chow's avatar
Brian Tiger Chow committed
179
	return lm.findOrCreate(p).Accounting.BytesRecv
180 181 182
}

// ledger lazily instantiates a ledger
Brian Tiger Chow's avatar
Brian Tiger Chow committed
183
func (lm *LedgerManager) findOrCreate(p peer.Peer) *ledger {
184
	l, ok := lm.ledgerMap[p.Key()]
185 186
	if !ok {
		l = newLedger(p)
187
		lm.ledgerMap[p.Key()] = l
188 189 190
	}
	return l
}