ledgerset.go 4.07 KB
Newer Older
1 2 3 4 5
package strategy

import (
	"sync"

Jeromy's avatar
Jeromy committed
6 7 8
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

	bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
9 10 11 12 13 14 15 16 17 18 19 20
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
)

// LedgerMap lists Ledgers by their Partner key.
type ledgerMap map[peerKey]*ledger

// FIXME share this externally
type peerKey u.Key

Jeromy's avatar
Jeromy committed
21 22 23 24 25 26 27 28
type LedgerManager struct {
	lock       sync.RWMutex
	ledgerMap  ledgerMap
	bs         bstore.Blockstore
	tasklist   *TaskList
	taskOut    chan *Task
	workSignal chan struct{}
	ctx        context.Context
29 30
}

Jeromy's avatar
Jeromy committed
31 32 33 34 35 36 37 38
func NewLedgerManager(bs bstore.Blockstore, ctx context.Context) *LedgerManager {
	lm := &LedgerManager{
		ledgerMap:  make(ledgerMap),
		bs:         bs,
		tasklist:   NewTaskList(),
		taskOut:    make(chan *Task, 4),
		workSignal: make(chan struct{}),
		ctx:        ctx,
39
	}
Jeromy's avatar
Jeromy committed
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
	go lm.taskWorker()
	return lm
}

func (lm *LedgerManager) taskWorker() {
	for {
		nextTask := lm.tasklist.GetNext()
		if nextTask == nil {
			// No tasks in the list?
			// Wait until there are!
			select {
			case <-lm.ctx.Done():
				return
			case <-lm.workSignal:
			}
			continue
		}

		select {
		case <-lm.ctx.Done():
			return
		case lm.taskOut <- nextTask:
		}
	}
}

func (lm *LedgerManager) GetTaskChan() <-chan *Task {
	return lm.taskOut
68 69 70
}

// Returns a slice of Peers with whom the local node has active sessions
Jeromy's avatar
Jeromy committed
71 72 73
func (lm *LedgerManager) Peers() []peer.Peer {
	lm.lock.RLock()
	defer lm.lock.RUnlock()
74 75

	response := make([]peer.Peer, 0)
Jeromy's avatar
Jeromy committed
76
	for _, ledger := range lm.ledgerMap {
77 78 79 80 81 82 83
		response = append(response, ledger.Partner)
	}
	return response
}

// BlockIsWantedByPeer returns true if peer wants the block given by this
// key
Jeromy's avatar
Jeromy committed
84 85 86
func (lm *LedgerManager) BlockIsWantedByPeer(k u.Key, p peer.Peer) bool {
	lm.lock.RLock()
	defer lm.lock.RUnlock()
87

Jeromy's avatar
Jeromy committed
88
	ledger := lm.ledger(p)
89 90 91 92 93
	return ledger.WantListContains(k)
}

// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
Jeromy's avatar
Jeromy committed
94 95 96 97 98
func (lm *LedgerManager) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
	lm.lock.Lock()
	defer lm.lock.Unlock()

	l := lm.ledger(p)
99 100 101 102 103 104
	if m.Full() {
		l.wantList = wl.New()
	}
	for _, e := range m.Wantlist() {
		if e.Cancel {
			l.CancelWant(e.Key)
Jeromy's avatar
Jeromy committed
105
			lm.tasklist.Cancel(e.Key, p)
106 107
		} else {
			l.Wants(e.Key, e.Priority)
Jeromy's avatar
Jeromy committed
108 109 110 111 112 113 114
			lm.tasklist.Add(e.Key, e.Priority, p)

			// Signal task generation to restart (if stopped!)
			select {
			case lm.workSignal <- struct{}{}:
			default:
			}
115 116
		}
	}
Jeromy's avatar
Jeromy committed
117

118 119 120
	for _, block := range m.Blocks() {
		// FIXME extract blocks.NumBytes(block) or block.NumBytes() method
		l.ReceivedBytes(len(block.Data))
Jeromy's avatar
Jeromy committed
121 122 123 124 125 126 127 128 129 130 131 132
		for _, l := range lm.ledgerMap {
			if l.WantListContains(block.Key()) {
				lm.tasklist.Add(block.Key(), 1, l.Partner)

				// Signal task generation to restart (if stopped!)
				select {
				case lm.workSignal <- struct{}{}:
				default:
				}

			}
		}
133 134 135 136 137 138 139 140 141 142
	}
	return nil
}

// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically

Jeromy's avatar
Jeromy committed
143 144 145
func (lm *LedgerManager) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
	lm.lock.Lock()
	defer lm.lock.Unlock()
146

Jeromy's avatar
Jeromy committed
147
	l := lm.ledger(p)
148 149 150
	for _, block := range m.Blocks() {
		l.SentBytes(len(block.Data))
		l.wantList.Remove(block.Key())
Jeromy's avatar
Jeromy committed
151
		lm.tasklist.Cancel(block.Key(), p)
152 153 154 155 156
	}

	return nil
}

Jeromy's avatar
Jeromy committed
157 158 159
func (lm *LedgerManager) NumBytesSentTo(p peer.Peer) uint64 {
	lm.lock.RLock()
	defer lm.lock.RUnlock()
160

Jeromy's avatar
Jeromy committed
161
	return lm.ledger(p).Accounting.BytesSent
162 163
}

Jeromy's avatar
Jeromy committed
164 165 166
func (lm *LedgerManager) NumBytesReceivedFrom(p peer.Peer) uint64 {
	lm.lock.RLock()
	defer lm.lock.RUnlock()
167

Jeromy's avatar
Jeromy committed
168
	return lm.ledger(p).Accounting.BytesRecv
169 170 171
}

// ledger lazily instantiates a ledger
Jeromy's avatar
Jeromy committed
172 173
func (lm *LedgerManager) ledger(p peer.Peer) *ledger {
	l, ok := lm.ledgerMap[peerKey(p.Key())]
174 175
	if !ok {
		l = newLedger(p)
Jeromy's avatar
Jeromy committed
176
		lm.ledgerMap[peerKey(p.Key())] = l
177 178 179
	}
	return l
}