engine.go 4.69 KB
Newer Older
1
package decision
2 3 4 5

import (
	"sync"

6
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Jeromy's avatar
Jeromy committed
7
	bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
8 9 10 11 12 13
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
)

14
var log = u.Logger("engine")
15

16
// Envelope contains a message for a Peer
17
type Envelope struct {
18 19 20
	// Peer is the intended recipient
	Peer peer.Peer
	// Message is the payload
21 22 23
	Message bsmsg.BitSwapMessage
}

24
type Engine struct {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
25 26 27 28
	// FIXME peerRequestQueue isn't threadsafe nor is it protected by a mutex.
	// consider a way to avoid sharing the peerRequestQueue between the worker
	// and the receiver
	peerRequestQueue *taskQueue
29

Jeromy's avatar
Jeromy committed
30
	workSignal chan struct{}
31 32 33 34 35 36 37 38

	outbox chan Envelope

	bs bstore.Blockstore

	lock sync.RWMutex
	// ledgerMap lists Ledgers by their Partner key.
	ledgerMap map[u.Key]*ledger
39 40
}

41 42
func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
	e := &Engine{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
43 44 45 46 47
		ledgerMap:        make(map[u.Key]*ledger),
		bs:               bs,
		peerRequestQueue: newTaskQueue(),
		outbox:           make(chan Envelope, 4), // TODO extract constant
		workSignal:       make(chan struct{}),
48
	}
49 50
	go e.taskWorker(ctx)
	return e
Jeromy's avatar
Jeromy committed
51 52
}

53
func (e *Engine) taskWorker(ctx context.Context) {
Jeromy's avatar
Jeromy committed
54
	for {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
55
		nextTask := e.peerRequestQueue.Pop()
Jeromy's avatar
Jeromy committed
56 57 58 59
		if nextTask == nil {
			// No tasks in the list?
			// Wait until there are!
			select {
60
			case <-ctx.Done():
Jeromy's avatar
Jeromy committed
61
				return
62
			case <-e.workSignal:
Jeromy's avatar
Jeromy committed
63 64 65
			}
			continue
		}
66
		block, err := e.bs.Get(nextTask.Entry.Key)
67 68 69 70 71 72 73 74
		if err != nil {
			continue // TODO maybe return an error
		}
		// construct message here so we can make decisions about any additional
		// information we may want to include at this time.
		m := bsmsg.New()
		m.AddBlock(block)
		// TODO: maybe add keys from our wantlist?
Jeromy's avatar
Jeromy committed
75
		select {
76
		case <-ctx.Done():
Jeromy's avatar
Jeromy committed
77
			return
78
		case e.outbox <- Envelope{Peer: nextTask.Target, Message: m}:
Jeromy's avatar
Jeromy committed
79 80 81 82
		}
	}
}

83 84
func (e *Engine) Outbox() <-chan Envelope {
	return e.outbox
85 86 87
}

// Returns a slice of Peers with whom the local node has active sessions
88 89 90
func (e *Engine) Peers() []peer.Peer {
	e.lock.RLock()
	defer e.lock.RUnlock()
91 92

	response := make([]peer.Peer, 0)
93
	for _, ledger := range e.ledgerMap {
94 95 96 97 98 99 100
		response = append(response, ledger.Partner)
	}
	return response
}

// BlockIsWantedByPeer returns true if peer wants the block given by this
// key
101 102 103
func (e *Engine) BlockIsWantedByPeer(k u.Key, p peer.Peer) bool {
	e.lock.RLock()
	defer e.lock.RUnlock()
104

105
	ledger := e.findOrCreate(p)
106 107 108 109 110
	return ledger.WantListContains(k)
}

// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
111
func (e *Engine) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
112 113 114 115 116
	newWorkExists := false
	defer func() {
		if newWorkExists {
			// Signal task generation to restart (if stopped!)
			select {
117
			case e.workSignal <- struct{}{}:
118 119 120 121
			default:
			}
		}
	}()
122 123
	e.lock.Lock()
	defer e.lock.Unlock()
Jeromy's avatar
Jeromy committed
124

125
	l := e.findOrCreate(p)
126 127 128
	if m.Full() {
		l.wantList = wl.New()
	}
129 130 131
	for _, entry := range m.Wantlist() {
		if entry.Cancel {
			l.CancelWant(entry.Key)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
132
			e.peerRequestQueue.Remove(entry.Key, p)
133
		} else {
134
			l.Wants(entry.Key, entry.Priority)
135
			newWorkExists = true
Brian Tiger Chow's avatar
Brian Tiger Chow committed
136
			e.peerRequestQueue.Push(entry.Key, entry.Priority, p)
137 138
		}
	}
Jeromy's avatar
Jeromy committed
139

140 141 142
	for _, block := range m.Blocks() {
		// FIXME extract blocks.NumBytes(block) or block.NumBytes() method
		l.ReceivedBytes(len(block.Data))
143
		for _, l := range e.ledgerMap {
Jeromy's avatar
Jeromy committed
144
			if l.WantListContains(block.Key()) {
145
				newWorkExists = true
Brian Tiger Chow's avatar
Brian Tiger Chow committed
146
				e.peerRequestQueue.Push(block.Key(), 1, l.Partner)
Jeromy's avatar
Jeromy committed
147 148
			}
		}
149 150 151 152 153 154 155 156 157 158
	}
	return nil
}

// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically

159 160 161
func (e *Engine) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
	e.lock.Lock()
	defer e.lock.Unlock()
162

163
	l := e.findOrCreate(p)
164 165 166
	for _, block := range m.Blocks() {
		l.SentBytes(len(block.Data))
		l.wantList.Remove(block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
167
		e.peerRequestQueue.Remove(block.Key(), p)
168 169 170 171 172
	}

	return nil
}

173 174 175
func (e *Engine) NumBytesSentTo(p peer.Peer) uint64 {
	e.lock.RLock()
	defer e.lock.RUnlock()
176

177
	return e.findOrCreate(p).Accounting.BytesSent
178 179
}

180 181 182
func (e *Engine) NumBytesReceivedFrom(p peer.Peer) uint64 {
	e.lock.RLock()
	defer e.lock.RUnlock()
183

184
	return e.findOrCreate(p).Accounting.BytesRecv
185 186 187
}

// ledger lazily instantiates a ledger
188 189
func (e *Engine) findOrCreate(p peer.Peer) *ledger {
	l, ok := e.ledgerMap[p.Key()]
190 191
	if !ok {
		l = newLedger(p)
192
		e.ledgerMap[p.Key()] = l
193 194 195
	}
	return l
}