engine.go 7.63 KB
Newer Older
1
package decision
2 3 4 5

import (
	"sync"

6
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Jeromy's avatar
Jeromy committed
7
	bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
8 9
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10
	peer "github.com/jbenet/go-ipfs/p2p/peer"
11
	eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
12 13
)

14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
// TODO consider taking responsibility for other types of requests. For
// example, there could be a |cancelQueue| for all of the cancellation
// messages that need to go out. There could also be a |wantlistQueue| for
// the local peer's wantlists. Alternatively, these could all be bundled
// into a single, intelligent global queue that efficiently
// batches/combines and takes all of these into consideration.
//
// Right now, messages go onto the network for four reasons:
// 1. an initial `sendwantlist` message to a provider of the first key in a request
// 2. a periodic full sweep of `sendwantlist` messages to all providers
// 3. upon receipt of blocks, a `cancel` message to all peers
// 4. draining the priority queue of `blockrequests` from peers
//
// Presently, only `blockrequests` are handled by the decision engine.
// However, there is an opportunity to give it more responsibility! If the
// decision engine is given responsibility for all of the others, it can
// intelligently decide how to combine requests efficiently.
//
// Some examples of what would be possible:
//
// * when sending out the wantlists, include `cancel` requests
// * when handling `blockrequests`, include `sendwantlist` and `cancel` as appropriate
// * when handling `cancel`, if we recently received a wanted block from a
// 	 peer, include a partial wantlist that contains a few other high priority
//   blocks
//
// In a sense, if we treat the decision engine as a black box, it could do
// whatever it sees fit to produce desired outcomes (get wanted keys
// quickly, maintain good relationships with peers, etc).

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
44
var log = eventlog.Logger("engine")
45

Brian Tiger Chow's avatar
Brian Tiger Chow committed
46
const (
47 48
	// outboxChanBuffer must be 0 to prevent stale messages from being sent
	outboxChanBuffer = 0
Brian Tiger Chow's avatar
Brian Tiger Chow committed
49 50
)

51
// Envelope contains a message for a Peer
52
type Envelope struct {
53
	// Peer is the intended recipient
54
	Peer peer.ID
55
	// Message is the payload
56 57 58
	Message bsmsg.BitSwapMessage
}

59
type Engine struct {
60 61 62
	// peerRequestQueue is a priority queue of requests received from peers.
	// Requests are popped from the queue, packaged up, and placed in the
	// outbox.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
63
	peerRequestQueue peerRequestQueue
64

65 66 67 68 69
	// FIXME it's a bit odd for the client and the worker to both share memory
	// (both modify the peerRequestQueue) and also to communicate over the
	// workSignal channel. consider sending requests over the channel and
	// allowing the worker to have exclusive access to the peerRequestQueue. In
	// that case, no lock would be required.
Jeromy's avatar
Jeromy committed
70
	workSignal chan struct{}
71

72 73
	// outbox contains outgoing messages to peers. This is owned by the
	// taskWorker goroutine
Brian Tiger Chow's avatar
Brian Tiger Chow committed
74
	outbox chan (<-chan *Envelope)
75 76 77

	bs bstore.Blockstore

78
	lock sync.RWMutex // protects the fields immediatly below
79
	// ledgerMap lists Ledgers by their Partner key.
80
	ledgerMap map[peer.ID]*ledger
81 82
}

83 84
func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
	e := &Engine{
85
		ledgerMap:        make(map[peer.ID]*ledger),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
86
		bs:               bs,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
87
		peerRequestQueue: newPRQ(),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
88
		outbox:           make(chan (<-chan *Envelope), outboxChanBuffer),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
89
		workSignal:       make(chan struct{}),
90
	}
91 92
	go e.taskWorker(ctx)
	return e
Jeromy's avatar
Jeromy committed
93 94
}

95
func (e *Engine) taskWorker(ctx context.Context) {
96 97
	defer close(e.outbox) // because taskWorker uses the channel exclusively
	for {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
98
		oneTimeUse := make(chan *Envelope, 1) // buffer to prevent blocking
99 100 101 102 103 104 105 106 107 108 109 110
		select {
		case <-ctx.Done():
			return
		case e.outbox <- oneTimeUse:
		}
		// receiver is ready for an outoing envelope. let's prepare one. first,
		// we must acquire a task from the PQ...
		envelope, err := e.nextEnvelope(ctx)
		if err != nil {
			close(oneTimeUse)
			return // ctx cancelled
		}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
111
		oneTimeUse <- envelope // buffered. won't block
112 113 114 115 116 117 118
		close(oneTimeUse)
	}
}

// nextEnvelope runs in the taskWorker goroutine. Returns an error if the
// context is cancelled before the next Envelope can be created.
func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
Jeromy's avatar
Jeromy committed
119
	for {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
120
		nextTask := e.peerRequestQueue.Pop()
121
		for nextTask == nil {
Jeromy's avatar
Jeromy committed
122
			select {
123
			case <-ctx.Done():
124
				return nil, ctx.Err()
125
			case <-e.workSignal:
126
				nextTask = e.peerRequestQueue.Pop()
Jeromy's avatar
Jeromy committed
127 128
			}
		}
129 130

		// with a task in hand, we're ready to prepare the envelope...
131

132
		block, err := e.bs.Get(nextTask.Entry.Key)
133
		if err != nil {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
134
			continue
135
		}
136 137

		m := bsmsg.New() // TODO: maybe add keys from our wantlist?
138
		m.AddBlock(block)
139
		return &Envelope{Peer: nextTask.Target, Message: m}, nil
Jeromy's avatar
Jeromy committed
140 141 142
	}
}

143
// Outbox returns a channel of one-time use Envelope channels.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
144
func (e *Engine) Outbox() <-chan (<-chan *Envelope) {
145
	return e.outbox
146 147 148
}

// Returns a slice of Peers with whom the local node has active sessions
149
func (e *Engine) Peers() []peer.ID {
150 151
	e.lock.RLock()
	defer e.lock.RUnlock()
152

153
	response := make([]peer.ID, 0)
154
	for _, ledger := range e.ledgerMap {
155 156 157 158 159 160 161
		response = append(response, ledger.Partner)
	}
	return response
}

// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
162
func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
163
	log := log.Prefix("bitswap.Engine.MessageReceived(%s)", p)
164
	log.Debugf("enter. %d entries %d blocks", len(m.Wantlist()), len(m.Blocks()))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
165 166
	defer log.Debugf("exit")

167 168 169 170
	if len(m.Wantlist()) == 0 && len(m.Blocks()) == 0 {
		log.Info("superfluous message")
	}

171 172 173
	newWorkExists := false
	defer func() {
		if newWorkExists {
174
			e.signalNewWork()
175 176
		}
	}()
177

178 179
	e.lock.Lock()
	defer e.lock.Unlock()
Jeromy's avatar
Jeromy committed
180

181
	l := e.findOrCreate(p)
182 183 184
	if m.Full() {
		l.wantList = wl.New()
	}
185

186 187
	for _, entry := range m.Wantlist() {
		if entry.Cancel {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
188
			log.Debug("cancel", entry.Key)
189
			l.CancelWant(entry.Key)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
190
			e.peerRequestQueue.Remove(entry.Key, p)
191
		} else {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
192
			log.Debug("wants", entry.Key, entry.Priority)
193
			l.Wants(entry.Key, entry.Priority)
194
			if exists, err := e.bs.Has(entry.Key); err == nil && exists {
195
				e.peerRequestQueue.Push(entry.Entry, p)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
196
				newWorkExists = true
197
			}
198 199
		}
	}
Jeromy's avatar
Jeromy committed
200

201
	for _, block := range m.Blocks() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
202
		log.Debug("got block %s %d bytes", block.Key(), len(block.Data))
203
		l.ReceivedBytes(len(block.Data))
204
		for _, l := range e.ledgerMap {
205 206
			if entry, ok := l.WantListContains(block.Key()); ok {
				e.peerRequestQueue.Push(entry, l.Partner)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
207
				newWorkExists = true
Jeromy's avatar
Jeromy committed
208 209
			}
		}
210 211 212 213 214 215 216 217 218 219
	}
	return nil
}

// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically

220
func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error {
221 222
	e.lock.Lock()
	defer e.lock.Unlock()
223

224
	l := e.findOrCreate(p)
225 226 227
	for _, block := range m.Blocks() {
		l.SentBytes(len(block.Data))
		l.wantList.Remove(block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
228
		e.peerRequestQueue.Remove(block.Key(), p)
229 230 231 232 233
	}

	return nil
}

234
func (e *Engine) numBytesSentTo(p peer.ID) uint64 {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
235
	// NB not threadsafe
236
	return e.findOrCreate(p).Accounting.BytesSent
237 238
}

239
func (e *Engine) numBytesReceivedFrom(p peer.ID) uint64 {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
240
	// NB not threadsafe
241
	return e.findOrCreate(p).Accounting.BytesRecv
242 243 244
}

// ledger lazily instantiates a ledger
245 246
func (e *Engine) findOrCreate(p peer.ID) *ledger {
	l, ok := e.ledgerMap[p]
247 248
	if !ok {
		l = newLedger(p)
249
		e.ledgerMap[p] = l
250 251 252
	}
	return l
}
253 254 255 256 257 258 259 260

func (e *Engine) signalNewWork() {
	// Signal task generation to restart (if stopped!)
	select {
	case e.workSignal <- struct{}{}:
	default:
	}
}