engine.go 4.54 KB
Newer Older
1
package decision
2 3 4 5

import (
	"sync"

6
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Jeromy's avatar
Jeromy committed
7
	bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
8 9 10 11 12 13
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
)

14
// log is the package-level logger, obtained from u.Logger under the name
// "engine".
var log = u.Logger("engine")
15

Brian Tiger Chow's avatar
Brian Tiger Chow committed
16 17 18 19
// sizeOutboxChan is the buffer capacity used when the Engine's outbox
// channel is created.
const sizeOutboxChan = 4

20
// Envelope contains a message for a Peer
21
type Envelope struct {
22 23 24
	// Peer is the intended recipient
	Peer peer.Peer
	// Message is the payload
25 26 27
	Message bsmsg.BitSwapMessage
}

28
type Engine struct {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
29 30 31 32
	// FIXME peerRequestQueue isn't threadsafe nor is it protected by a mutex.
	// consider a way to avoid sharing the peerRequestQueue between the worker
	// and the receiver
	peerRequestQueue *taskQueue
33

Jeromy's avatar
Jeromy committed
34
	workSignal chan struct{}
35 36 37 38 39 40 41 42

	outbox chan Envelope

	bs bstore.Blockstore

	lock sync.RWMutex
	// ledgerMap lists Ledgers by their Partner key.
	ledgerMap map[u.Key]*ledger
43 44
}

45 46
func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
	e := &Engine{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
47 48 49
		ledgerMap:        make(map[u.Key]*ledger),
		bs:               bs,
		peerRequestQueue: newTaskQueue(),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
50
		outbox:           make(chan Envelope, sizeOutboxChan),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
51
		workSignal:       make(chan struct{}),
52
	}
53 54
	go e.taskWorker(ctx)
	return e
Jeromy's avatar
Jeromy committed
55 56
}

57
func (e *Engine) taskWorker(ctx context.Context) {
Jeromy's avatar
Jeromy committed
58
	for {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
59
		nextTask := e.peerRequestQueue.Pop()
Jeromy's avatar
Jeromy committed
60 61 62 63
		if nextTask == nil {
			// No tasks in the list?
			// Wait until there are!
			select {
64
			case <-ctx.Done():
Jeromy's avatar
Jeromy committed
65
				return
66
			case <-e.workSignal:
Jeromy's avatar
Jeromy committed
67 68 69
			}
			continue
		}
70
		block, err := e.bs.Get(nextTask.Entry.Key)
71
		if err != nil {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
72 73
			log.Warning("engine: task exists to send block, but block is not in blockstore")
			continue
74 75 76 77 78 79
		}
		// construct message here so we can make decisions about any additional
		// information we may want to include at this time.
		m := bsmsg.New()
		m.AddBlock(block)
		// TODO: maybe add keys from our wantlist?
Jeromy's avatar
Jeromy committed
80
		select {
81
		case <-ctx.Done():
Jeromy's avatar
Jeromy committed
82
			return
83
		case e.outbox <- Envelope{Peer: nextTask.Target, Message: m}:
Jeromy's avatar
Jeromy committed
84 85 86 87
		}
	}
}

88 89
func (e *Engine) Outbox() <-chan Envelope {
	return e.outbox
90 91 92
}

// Returns a slice of Peers with whom the local node has active sessions
93 94 95
func (e *Engine) Peers() []peer.Peer {
	e.lock.RLock()
	defer e.lock.RUnlock()
96 97

	response := make([]peer.Peer, 0)
98
	for _, ledger := range e.ledgerMap {
99 100 101 102 103 104 105
		response = append(response, ledger.Partner)
	}
	return response
}

// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
106
func (e *Engine) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
107 108 109 110 111
	newWorkExists := false
	defer func() {
		if newWorkExists {
			// Signal task generation to restart (if stopped!)
			select {
112
			case e.workSignal <- struct{}{}:
113 114 115 116
			default:
			}
		}
	}()
117 118
	e.lock.Lock()
	defer e.lock.Unlock()
Jeromy's avatar
Jeromy committed
119

120
	l := e.findOrCreate(p)
121 122 123
	if m.Full() {
		l.wantList = wl.New()
	}
124 125 126
	for _, entry := range m.Wantlist() {
		if entry.Cancel {
			l.CancelWant(entry.Key)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
127
			e.peerRequestQueue.Remove(entry.Key, p)
128
		} else {
129
			l.Wants(entry.Key, entry.Priority)
130 131
			if exists, err := e.bs.Has(entry.Key); err == nil && exists {
				newWorkExists = true
132
				e.peerRequestQueue.Push(entry.Entry, p)
133
			}
134 135
		}
	}
Jeromy's avatar
Jeromy committed
136

137 138 139
	for _, block := range m.Blocks() {
		// FIXME extract blocks.NumBytes(block) or block.NumBytes() method
		l.ReceivedBytes(len(block.Data))
140
		for _, l := range e.ledgerMap {
Jeromy's avatar
Jeromy committed
141
			if l.WantListContains(block.Key()) {
142
				newWorkExists = true
143
				e.peerRequestQueue.Push(wl.Entry{block.Key(), 1}, l.Partner)
Jeromy's avatar
Jeromy committed
144 145
			}
		}
146 147 148 149 150 151 152 153 154 155
	}
	return nil
}

// TODO add contents of m.Wantlist() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically.

156 157 158
func (e *Engine) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
	e.lock.Lock()
	defer e.lock.Unlock()
159

160
	l := e.findOrCreate(p)
161 162 163
	for _, block := range m.Blocks() {
		l.SentBytes(len(block.Data))
		l.wantList.Remove(block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
164
		e.peerRequestQueue.Remove(block.Key(), p)
165 166 167 168 169
	}

	return nil
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
170 171
func (e *Engine) numBytesSentTo(p peer.Peer) uint64 {
	// NB not threadsafe
172
	return e.findOrCreate(p).Accounting.BytesSent
173 174
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
175 176
func (e *Engine) numBytesReceivedFrom(p peer.Peer) uint64 {
	// NB not threadsafe
177
	return e.findOrCreate(p).Accounting.BytesRecv
178 179 180
}

// ledger lazily instantiates a ledger
181 182
func (e *Engine) findOrCreate(p peer.Peer) *ledger {
	l, ok := e.ledgerMap[p.Key()]
183 184
	if !ok {
		l = newLedger(p)
185
		e.ledgerMap[p.Key()] = l
186 187 188
	}
	return l
}