engine.go 7.2 KB
Newer Older
1
package decision
2 3 4 5

import (
	"sync"

6
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Jeromy's avatar
Jeromy committed
7
	bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
8 9
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10
	peer "github.com/jbenet/go-ipfs/p2p/peer"
11
	eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
12 13
)

14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
// TODO consider taking responsibility for other types of requests. For
// example, there could be a |cancelQueue| for all of the cancellation
// messages that need to go out. There could also be a |wantlistQueue| for
// the local peer's wantlists. Alternatively, these could all be bundled
// into a single, intelligent global queue that efficiently
// batches/combines and takes all of these into consideration.
//
// Right now, messages go onto the network for four reasons:
// 1. an initial `sendwantlist` message to a provider of the first key in a request
// 2. a periodic full sweep of `sendwantlist` messages to all providers
// 3. upon receipt of blocks, a `cancel` message to all peers
// 4. draining the priority queue of `blockrequests` from peers
//
// Presently, only `blockrequests` are handled by the decision engine.
// However, there is an opportunity to give it more responsibility! If the
// decision engine is given responsibility for all of the others, it can
// intelligently decide how to combine requests efficiently.
//
// Some examples of what would be possible:
//
// * when sending out the wantlists, include `cancel` requests
// * when handling `blockrequests`, include `sendwantlist` and `cancel` as appropriate
// * when handling `cancel`, if we recently received a wanted block from a
//   peer, include a partial wantlist that contains a few other high priority
//   blocks
//
// In a sense, if we treat the decision engine as a black box, it could do
// whatever it sees fit to produce desired outcomes (get wanted keys
// quickly, maintain good relationships with peers, etc).

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
44
// log is the package-level event logger, created with the name "engine".
// Methods in this file derive prefixed loggers from it via Prefix.
var log = eventlog.Logger("engine")
45

Brian Tiger Chow's avatar
Brian Tiger Chow committed
46 47 48 49
// sizeOutboxChan is the buffer capacity given to an Engine's outbox channel
// when the Engine is constructed.
const sizeOutboxChan = 4

50
// Envelope contains a message for a Peer
51
type Envelope struct {
52
	// Peer is the intended recipient
53
	Peer peer.ID
54
	// Message is the payload
55 56 57
	Message bsmsg.BitSwapMessage
}

58
type Engine struct {
59 60 61
	// peerRequestQueue is a priority queue of requests received from peers.
	// Requests are popped from the queue, packaged up, and placed in the
	// outbox.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
62
	peerRequestQueue *taskQueue
63

64 65 66 67 68
	// FIXME it's a bit odd for the client and the worker to both share memory
	// (both modify the peerRequestQueue) and also to communicate over the
	// workSignal channel. consider sending requests over the channel and
	// allowing the worker to have exclusive access to the peerRequestQueue. In
	// that case, no lock would be required.
Jeromy's avatar
Jeromy committed
69
	workSignal chan struct{}
70

71
	// outbox contains outgoing messages to peers
72 73 74 75
	outbox chan Envelope

	bs bstore.Blockstore

76
	lock sync.RWMutex // protects the fields immediatly below
77
	// ledgerMap lists Ledgers by their Partner key.
78
	ledgerMap map[peer.ID]*ledger
79 80
}

81 82
func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
	e := &Engine{
83
		ledgerMap:        make(map[peer.ID]*ledger),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
84 85
		bs:               bs,
		peerRequestQueue: newTaskQueue(),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
86
		outbox:           make(chan Envelope, sizeOutboxChan),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
87
		workSignal:       make(chan struct{}),
88
	}
89 90
	go e.taskWorker(ctx)
	return e
Jeromy's avatar
Jeromy committed
91 92
}

93
func (e *Engine) taskWorker(ctx context.Context) {
94
	log := log.Prefix("bitswap.Engine.taskWorker")
Jeromy's avatar
Jeromy committed
95
	for {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
96
		nextTask := e.peerRequestQueue.Pop()
Jeromy's avatar
Jeromy committed
97 98 99 100
		if nextTask == nil {
			// No tasks in the list?
			// Wait until there are!
			select {
101
			case <-ctx.Done():
102
				log.Debugf("exiting: %s", ctx.Err())
Jeromy's avatar
Jeromy committed
103
				return
104
			case <-e.workSignal:
105
				log.Debugf("woken up")
Jeromy's avatar
Jeromy committed
106 107 108
			}
			continue
		}
109 110 111
		log := log.Prefix("%s", nextTask)
		log.Debugf("processing")

112
		block, err := e.bs.Get(nextTask.Entry.Key)
113
		if err != nil {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
114 115
			log.Warning("engine: task exists to send block, but block is not in blockstore")
			continue
116 117 118 119 120 121
		}
		// construct message here so we can make decisions about any additional
		// information we may want to include at this time.
		m := bsmsg.New()
		m.AddBlock(block)
		// TODO: maybe add keys from our wantlist?
122
		log.Debugf("sending...")
Jeromy's avatar
Jeromy committed
123
		select {
124
		case <-ctx.Done():
Jeromy's avatar
Jeromy committed
125
			return
126
		case e.outbox <- Envelope{Peer: nextTask.Target, Message: m}:
127
			log.Debugf("sent")
Jeromy's avatar
Jeromy committed
128 129 130 131
		}
	}
}

132 133
func (e *Engine) Outbox() <-chan Envelope {
	return e.outbox
134 135 136
}

// Returns a slice of Peers with whom the local node has active sessions
137
func (e *Engine) Peers() []peer.ID {
138 139
	e.lock.RLock()
	defer e.lock.RUnlock()
140

141
	response := make([]peer.ID, 0)
142
	for _, ledger := range e.ledgerMap {
143 144 145 146 147 148 149
		response = append(response, ledger.Partner)
	}
	return response
}

// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
150
func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
151
	log := log.Prefix("bitswap.Engine.MessageReceived(%s)", p)
152
	log.Debugf("enter. %d entries %d blocks", len(m.Wantlist()), len(m.Blocks()))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
153 154
	defer log.Debugf("exit")

155 156 157 158
	if len(m.Wantlist()) == 0 && len(m.Blocks()) == 0 {
		log.Info("superfluous message")
	}

159 160 161
	newWorkExists := false
	defer func() {
		if newWorkExists {
162
			e.signalNewWork()
163 164
		}
	}()
165

166 167
	e.lock.Lock()
	defer e.lock.Unlock()
Jeromy's avatar
Jeromy committed
168

169
	l := e.findOrCreate(p)
170 171 172
	if m.Full() {
		l.wantList = wl.New()
	}
173

174 175
	for _, entry := range m.Wantlist() {
		if entry.Cancel {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
176
			log.Debug("cancel", entry.Key)
177
			l.CancelWant(entry.Key)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
178
			e.peerRequestQueue.Remove(entry.Key, p)
179
		} else {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
180
			log.Debug("wants", entry.Key, entry.Priority)
181
			l.Wants(entry.Key, entry.Priority)
182 183
			if exists, err := e.bs.Has(entry.Key); err == nil && exists {
				newWorkExists = true
184
				e.peerRequestQueue.Push(entry.Entry, p)
185
			}
186 187
		}
	}
Jeromy's avatar
Jeromy committed
188

189 190
	for _, block := range m.Blocks() {
		// FIXME extract blocks.NumBytes(block) or block.NumBytes() method
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
191
		log.Debug("got block %s %d bytes", block.Key(), len(block.Data))
192
		l.ReceivedBytes(len(block.Data))
193
		for _, l := range e.ledgerMap {
Jeromy's avatar
Jeromy committed
194
			if l.WantListContains(block.Key()) {
195
				newWorkExists = true
196
				e.peerRequestQueue.Push(wl.Entry{block.Key(), 1}, l.Partner)
Jeromy's avatar
Jeromy committed
197 198
			}
		}
199 200 201 202 203 204 205 206 207 208
	}
	return nil
}

// TODO add contents of m.Wantlist() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically

209
func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error {
210 211
	e.lock.Lock()
	defer e.lock.Unlock()
212

213
	l := e.findOrCreate(p)
214 215 216
	for _, block := range m.Blocks() {
		l.SentBytes(len(block.Data))
		l.wantList.Remove(block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
217
		e.peerRequestQueue.Remove(block.Key(), p)
218 219 220 221 222
	}

	return nil
}

223
func (e *Engine) numBytesSentTo(p peer.ID) uint64 {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
224
	// NB not threadsafe
225
	return e.findOrCreate(p).Accounting.BytesSent
226 227
}

228
func (e *Engine) numBytesReceivedFrom(p peer.ID) uint64 {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
229
	// NB not threadsafe
230
	return e.findOrCreate(p).Accounting.BytesRecv
231 232 233
}

// ledger lazily instantiates a ledger
234 235
func (e *Engine) findOrCreate(p peer.ID) *ledger {
	l, ok := e.ledgerMap[p]
236 237
	if !ok {
		l = newLedger(p)
238
		e.ledgerMap[p] = l
239 240 241
	}
	return l
}
242 243 244 245 246 247 248 249

// signalNewWork tells the task worker that new tasks may be available. The
// send on workSignal is attempted without blocking, so the caller never
// stalls here.
func (e *Engine) signalNewWork() {
	wake := struct{}{}
	select {
	case e.workSignal <- wake:
		// Signal handed over.
	default:
		// The send could not proceed immediately; give up rather than block.
	}
}