package decision

import (
	"sync"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
)

// TODO consider taking responsibility for other types of requests. For
// example, there could be a |cancelQueue| for all of the cancellation
// messages that need to go out. There could also be a |wantlistQueue| for
// the local peer's wantlists. Alternatively, these could all be bundled
// into a single, intelligent global queue that efficiently
// batches/combines and takes all of these into consideration.
//
// Right now, messages go onto the network for four reasons:
// 1. an initial `sendwantlist` message to a provider of the first key in a request
// 2. a periodic full sweep of `sendwantlist` messages to all providers
// 3. upon receipt of blocks, a `cancel` message to all peers
// 4. draining the priority queue of `blockrequests` from peers
//
// Presently, only `blockrequests` are handled by the decision engine.
// However, there is an opportunity to give it more responsibility! If the
// decision engine is given responsibility for all of the others, it can
// intelligently decide how to combine requests efficiently.
//
// Some examples of what would be possible:
//
// * when sending out the wantlists, include `cancel` requests
// * when handling `blockrequests`, include `sendwantlist` and `cancel` as appropriate
// * when handling `cancel`, if we recently received a wanted block from a
//   peer, include a partial wantlist that contains a few other high priority
//   blocks
//
// In a sense, if we treat the decision engine as a black box, it could do
// whatever it sees fit to produce desired outcomes (get wanted keys
// quickly, maintain good relationships with peers, etc).
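
// One way to make the "black box" idea above concrete (a sketch only, under
// the assumption that every outbound intent funnels through a single
// planner; nothing in this package implements it yet):
type planner interface {
	// nextEnvelope blocks until the planner has decided what to send next.
	// It is free to fold wantlist updates and cancels into one message.
	nextEnvelope(ctx context.Context) (Envelope, error)
}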

var log = u.Logger("engine")

const (
	sizeOutboxChan = 4
)

// Envelope contains a message for a Peer
type Envelope struct {
	// Peer is the intended recipient
	Peer peer.ID
	// Message is the payload
	Message bsmsg.BitSwapMessage
}

type Engine struct {
	// peerRequestQueue is a priority queue of requests received from peers.
	// Requests are popped from the queue, packaged up, and placed in the
	// outbox.
	peerRequestQueue *taskQueue

	// FIXME it's a bit odd for the client and the worker to both share memory
	// (both modify the peerRequestQueue) and also to communicate over the
	// workSignal channel. Consider sending requests over the channel and
	// allowing the worker to have exclusive access to the peerRequestQueue. In
	// that case, no lock would be required (see the sketch after this struct).
	workSignal chan struct{}

	// outbox contains outgoing messages to peers
	outbox chan Envelope

	bs bstore.Blockstore

	lock sync.RWMutex // protects the fields immediately below
	// ledgerMap lists Ledgers by their Partner key.
	ledgerMap map[peer.ID]*ledger
}
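
// A sketch of the alternative suggested in the FIXME above (an assumption,
// unused here): requests travel over a channel as values like this one, the
// worker becomes the sole owner of peerRequestQueue, and the lock goes away.
type queuedRequest struct {
	target peer.ID  // peer asking for the block
	entry  wl.Entry // key and priority of the request
}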

func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
	e := &Engine{
		ledgerMap:        make(map[peer.ID]*ledger),
		bs:               bs,
		peerRequestQueue: newTaskQueue(),
		outbox:           make(chan Envelope, sizeOutboxChan),
		workSignal:       make(chan struct{}),
	}
	go e.taskWorker(ctx)
	return e
}

func (e *Engine) taskWorker(ctx context.Context) {
	for {
		nextTask := e.peerRequestQueue.Pop()
		if nextTask == nil {
			// No tasks in the list?
			// Wait until there are!
			select {
			case <-ctx.Done():
				return
			case <-e.workSignal:
			}
			continue
		}
		block, err := e.bs.Get(nextTask.Entry.Key)
		if err != nil {
			log.Warning("engine: task exists to send block, but block is not in blockstore")
			continue
		}
		// construct message here so we can make decisions about any additional
		// information we may want to include at this time.
		m := bsmsg.New()
		m.AddBlock(block)
		// TODO: maybe add keys from our wantlist?
		select {
		case <-ctx.Done():
			return
		case e.outbox <- Envelope{Peer: nextTask.Target, Message: m}:
		}
	}
}

func (e *Engine) Outbox() <-chan Envelope {
	return e.outbox
}
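
// forwardOutbox is a minimal consumer sketch (not part of the original API):
// the send parameter is a stand-in assumption for whatever network layer
// actually delivers messages to peers.
func forwardOutbox(ctx context.Context, e *Engine, send func(peer.ID, bsmsg.BitSwapMessage)) {
	for {
		select {
		case <-ctx.Done():
			return
		case env := <-e.Outbox():
			// each envelope pairs a recipient with a ready-to-send message
			send(env.Peer, env.Message)
		}
	}
}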

// Peers returns a slice of peers with whom the local node has active sessions.
func (e *Engine) Peers() []peer.ID {
	e.lock.RLock()
	defer e.lock.RUnlock()

	response := make([]peer.ID, 0, len(e.ledgerMap))
	for _, ledger := range e.ledgerMap {
		response = append(response, ledger.Partner)
	}
	return response
}

// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
	newWorkExists := false
	defer func() {
		if newWorkExists {
			// Signal task generation to restart (if stopped!)
			select {
			case e.workSignal <- struct{}{}:
			default:
			}
		}
	}()
	e.lock.Lock()
	defer e.lock.Unlock()

	l := e.findOrCreate(p)
	if m.Full() {
		l.wantList = wl.New()
	}
	for _, entry := range m.Wantlist() {
		if entry.Cancel {
			l.CancelWant(entry.Key)
			e.peerRequestQueue.Remove(entry.Key, p)
		} else {
			l.Wants(entry.Key, entry.Priority)
			if exists, err := e.bs.Has(entry.Key); err == nil && exists {
				newWorkExists = true
				e.peerRequestQueue.Push(entry.Entry, p)
			}
		}
	}

	for _, block := range m.Blocks() {
		// FIXME extract blocks.NumBytes(block) or block.NumBytes() method
		l.ReceivedBytes(len(block.Data))
		for _, l := range e.ledgerMap {
			if l.WantListContains(block.Key()) {
				newWorkExists = true
				e.peerRequestQueue.Push(wl.Entry{Key: block.Key(), Priority: 1}, l.Partner)
			}
		}
	}
	return nil
}
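
// handleIncoming is a sketch of the receiving side (an assumption for
// illustration): a network layer would hand each (peer, message) pair to
// MessageReceived, which updates ledgers and queues any sendable blocks.
func handleIncoming(e *Engine, from peer.ID, m bsmsg.BitSwapMessage) {
	if err := e.MessageReceived(from, m); err != nil {
		log.Warning("engine: MessageReceived failed")
	}
}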

// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically

func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error {
	e.lock.Lock()
	defer e.lock.Unlock()

	l := e.findOrCreate(p)
	for _, block := range m.Blocks() {
		l.SentBytes(len(block.Data))
		l.wantList.Remove(block.Key())
		e.peerRequestQueue.Remove(block.Key(), p)
	}

	return nil
}

func (e *Engine) numBytesSentTo(p peer.ID) uint64 {
	// NB not threadsafe
	return e.findOrCreate(p).Accounting.BytesSent
}

func (e *Engine) numBytesReceivedFrom(p peer.ID) uint64 {
	// NB not threadsafe
	return e.findOrCreate(p).Accounting.BytesRecv
}

// findOrCreate returns the ledger for the given partner, lazily instantiating it.
func (e *Engine) findOrCreate(p peer.ID) *ledger {
	l, ok := e.ledgerMap[p]
	if !ok {
		l = newLedger(p)
		e.ledgerMap[p] = l
	}
	return l
}