engine.go 6.55 KB
Newer Older
1
package decision
2 3 4 5

import (
	"sync"

6
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Jeromy's avatar
Jeromy committed
7
	bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
8 9
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10
	peer "github.com/jbenet/go-ipfs/p2p/peer"
11 12 13
	u "github.com/jbenet/go-ipfs/util"
)

14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
// TODO consider taking responsibility for other types of requests. For
// example, there could be a |cancelQueue| for all of the cancellation
// messages that need to go out. There could also be a |wantlistQueue| for
// the local peer's wantlists. Alternatively, these could all be bundled
// into a single, intelligent global queue that efficiently
// batches/combines and takes all of these into consideration.
//
// Right now, messages go onto the network for four reasons:
// 1. an initial `sendwantlist` message to a provider of the first key in a request
// 2. a periodic full sweep of `sendwantlist` messages to all providers
// 3. upon receipt of blocks, a `cancel` message to all peers
// 4. draining the priority queue of `blockrequests` from peers
//
// Presently, only `blockrequests` are handled by the decision engine.
// However, there is an opportunity to give it more responsibility! If the
// decision engine is given responsibility for all of the others, it can
// intelligently decide how to combine requests efficiently.
//
// Some examples of what would be possible:
//
// * when sending out the wantlists, include `cancel` requests
// * when handling `blockrequests`, include `sendwantlist` and `cancel` as appropriate
// * when handling `cancel`, if we recently received a wanted block from a
//   peer, include a partial wantlist that contains a few other high priority
//   blocks
//
// In a sense, if we treat the decision engine as a black box, it could do
// whatever it sees fit to produce desired outcomes (get wanted keys
// quickly, maintain good relationships with peers, etc).

44
// log is the logger used throughout this file, registered under the name
// "engine".
var log = u.Logger("engine")
45

Brian Tiger Chow's avatar
Brian Tiger Chow committed
46 47 48 49
// sizeOutboxChan is the buffer capacity of the Engine's outbox channel.
const sizeOutboxChan = 4

50
// Envelope contains a message for a Peer
51
type Envelope struct {
52
	// Peer is the intended recipient
53
	Peer peer.ID
54
	// Message is the payload
55 56 57
	Message bsmsg.BitSwapMessage
}

58
type Engine struct {
59 60 61
	// peerRequestQueue is a priority queue of requests received from peers.
	// Requests are popped from the queue, packaged up, and placed in the
	// outbox.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
62
	peerRequestQueue *taskQueue
63

64 65 66 67 68
	// FIXME it's a bit odd for the client and the worker to both share memory
	// (both modify the peerRequestQueue) and also to communicate over the
	// workSignal channel. consider sending requests over the channel and
	// allowing the worker to have exclusive access to the peerRequestQueue. In
	// that case, no lock would be required.
Jeromy's avatar
Jeromy committed
69
	workSignal chan struct{}
70

71
	// outbox contains outgoing messages to peers
72 73 74 75
	outbox chan Envelope

	bs bstore.Blockstore

76
	lock sync.RWMutex // protects the fields immediatly below
77
	// ledgerMap lists Ledgers by their Partner key.
78
	ledgerMap map[peer.ID]*ledger
79 80
}

81 82
func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
	e := &Engine{
83
		ledgerMap:        make(map[peer.ID]*ledger),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
84 85
		bs:               bs,
		peerRequestQueue: newTaskQueue(),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
86
		outbox:           make(chan Envelope, sizeOutboxChan),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
87
		workSignal:       make(chan struct{}),
88
	}
89 90
	go e.taskWorker(ctx)
	return e
Jeromy's avatar
Jeromy committed
91 92
}

93
func (e *Engine) taskWorker(ctx context.Context) {
Jeromy's avatar
Jeromy committed
94
	for {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
95
		nextTask := e.peerRequestQueue.Pop()
Jeromy's avatar
Jeromy committed
96 97 98 99
		if nextTask == nil {
			// No tasks in the list?
			// Wait until there are!
			select {
100
			case <-ctx.Done():
Jeromy's avatar
Jeromy committed
101
				return
102
			case <-e.workSignal:
Jeromy's avatar
Jeromy committed
103 104 105
			}
			continue
		}
106
		block, err := e.bs.Get(nextTask.Entry.Key)
107
		if err != nil {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
108 109
			log.Warning("engine: task exists to send block, but block is not in blockstore")
			continue
110 111 112 113 114 115
		}
		// construct message here so we can make decisions about any additional
		// information we may want to include at this time.
		m := bsmsg.New()
		m.AddBlock(block)
		// TODO: maybe add keys from our wantlist?
Jeromy's avatar
Jeromy committed
116
		select {
117
		case <-ctx.Done():
Jeromy's avatar
Jeromy committed
118
			return
119
		case e.outbox <- Envelope{Peer: nextTask.Target, Message: m}:
Jeromy's avatar
Jeromy committed
120 121 122 123
		}
	}
}

124 125
func (e *Engine) Outbox() <-chan Envelope {
	return e.outbox
126 127 128
}

// Returns a slice of Peers with whom the local node has active sessions
129
func (e *Engine) Peers() []peer.ID {
130 131
	e.lock.RLock()
	defer e.lock.RUnlock()
132

133
	response := make([]peer.ID, 0)
134
	for _, ledger := range e.ledgerMap {
135 136 137 138 139 140 141
		response = append(response, ledger.Partner)
	}
	return response
}

// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
142
func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
143 144 145
	newWorkExists := false
	defer func() {
		if newWorkExists {
146
			e.signalNewWork()
147 148
		}
	}()
149

150 151
	e.lock.Lock()
	defer e.lock.Unlock()
Jeromy's avatar
Jeromy committed
152

153
	l := e.findOrCreate(p)
154 155 156
	if m.Full() {
		l.wantList = wl.New()
	}
157 158 159
	for _, entry := range m.Wantlist() {
		if entry.Cancel {
			l.CancelWant(entry.Key)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
160
			e.peerRequestQueue.Remove(entry.Key, p)
161
		} else {
162
			l.Wants(entry.Key, entry.Priority)
163 164
			if exists, err := e.bs.Has(entry.Key); err == nil && exists {
				newWorkExists = true
165
				e.peerRequestQueue.Push(entry.Entry, p)
166
			}
167 168
		}
	}
Jeromy's avatar
Jeromy committed
169

170 171 172
	for _, block := range m.Blocks() {
		// FIXME extract blocks.NumBytes(block) or block.NumBytes() method
		l.ReceivedBytes(len(block.Data))
173
		for _, l := range e.ledgerMap {
Jeromy's avatar
Jeromy committed
174
			if l.WantListContains(block.Key()) {
175
				newWorkExists = true
176
				e.peerRequestQueue.Push(wl.Entry{block.Key(), 1}, l.Partner)
Jeromy's avatar
Jeromy committed
177 178
			}
		}
179 180 181 182 183 184 185 186 187 188
	}
	return nil
}

// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically

189
func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error {
190 191
	e.lock.Lock()
	defer e.lock.Unlock()
192

193
	l := e.findOrCreate(p)
194 195 196
	for _, block := range m.Blocks() {
		l.SentBytes(len(block.Data))
		l.wantList.Remove(block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
197
		e.peerRequestQueue.Remove(block.Key(), p)
198 199 200 201 202
	}

	return nil
}

203
func (e *Engine) numBytesSentTo(p peer.ID) uint64 {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
204
	// NB not threadsafe
205
	return e.findOrCreate(p).Accounting.BytesSent
206 207
}

208
func (e *Engine) numBytesReceivedFrom(p peer.ID) uint64 {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
209
	// NB not threadsafe
210
	return e.findOrCreate(p).Accounting.BytesRecv
211 212 213
}

// ledger lazily instantiates a ledger
214 215
func (e *Engine) findOrCreate(p peer.ID) *ledger {
	l, ok := e.ledgerMap[p]
216 217
	if !ok {
		l = newLedger(p)
218
		e.ledgerMap[p] = l
219 220 221
	}
	return l
}
222 223 224 225 226 227 228 229

// signalNewWork nudges the task worker to resume popping tasks (if it has
// stopped). The send never blocks: when it cannot proceed immediately, the
// default case is taken and nothing is sent.
func (e *Engine) signalNewWork() {
	select {
	case e.workSignal <- struct{}{}:
		// wake-up delivered
	default:
		// send would block; skip it
	}
}