engine.go 4.6 KB
Newer Older
1
package decision
2 3 4 5

import (
	"sync"

6
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Jeromy's avatar
Jeromy committed
7
	bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
8 9 10 11 12 13
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
)

14
var log = u.Logger("engine")
15

16
// Envelope contains a message for a Peer
17
type Envelope struct {
18 19 20
	// Peer is the intended recipient
	Peer peer.Peer
	// Message is the payload
21 22 23
	Message bsmsg.BitSwapMessage
}

24
type Engine struct {
25 26
	// FIXME taskqueue isn't threadsafe nor is it protected by a mutex. consider
	// a way to avoid sharing the taskqueue between the worker and the receiver
27 28
	taskqueue *taskQueue

Jeromy's avatar
Jeromy committed
29
	workSignal chan struct{}
30 31 32 33 34 35 36 37

	outbox chan Envelope

	bs bstore.Blockstore

	lock sync.RWMutex
	// ledgerMap lists Ledgers by their Partner key.
	ledgerMap map[u.Key]*ledger
38 39
}

40 41
func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
	e := &Engine{
42
		ledgerMap:  make(map[u.Key]*ledger),
Jeromy's avatar
Jeromy committed
43
		bs:         bs,
44
		taskqueue:  newTaskQueue(),
45
		outbox:     make(chan Envelope, 4), // TODO extract constant
Jeromy's avatar
Jeromy committed
46
		workSignal: make(chan struct{}),
47
	}
48 49
	go e.taskWorker(ctx)
	return e
Jeromy's avatar
Jeromy committed
50 51
}

52
func (e *Engine) taskWorker(ctx context.Context) {
Jeromy's avatar
Jeromy committed
53
	for {
54
		nextTask := e.taskqueue.Pop()
Jeromy's avatar
Jeromy committed
55 56 57 58
		if nextTask == nil {
			// No tasks in the list?
			// Wait until there are!
			select {
59
			case <-ctx.Done():
Jeromy's avatar
Jeromy committed
60
				return
61
			case <-e.workSignal:
Jeromy's avatar
Jeromy committed
62 63 64
			}
			continue
		}
65
		block, err := e.bs.Get(nextTask.Entry.Key)
66 67 68 69 70 71 72 73
		if err != nil {
			continue // TODO maybe return an error
		}
		// construct message here so we can make decisions about any additional
		// information we may want to include at this time.
		m := bsmsg.New()
		m.AddBlock(block)
		// TODO: maybe add keys from our wantlist?
Jeromy's avatar
Jeromy committed
74
		select {
75
		case <-ctx.Done():
Jeromy's avatar
Jeromy committed
76
			return
77
		case e.outbox <- Envelope{Peer: nextTask.Target, Message: m}:
Jeromy's avatar
Jeromy committed
78 79 80 81
		}
	}
}

82 83
func (e *Engine) Outbox() <-chan Envelope {
	return e.outbox
84 85 86
}

// Returns a slice of Peers with whom the local node has active sessions
87 88 89
func (e *Engine) Peers() []peer.Peer {
	e.lock.RLock()
	defer e.lock.RUnlock()
90 91

	response := make([]peer.Peer, 0)
92
	for _, ledger := range e.ledgerMap {
93 94 95 96 97 98 99
		response = append(response, ledger.Partner)
	}
	return response
}

// BlockIsWantedByPeer returns true if peer wants the block given by this
// key
100 101 102
func (e *Engine) BlockIsWantedByPeer(k u.Key, p peer.Peer) bool {
	e.lock.RLock()
	defer e.lock.RUnlock()
103

104
	ledger := e.findOrCreate(p)
105 106 107 108 109
	return ledger.WantListContains(k)
}

// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
110
func (e *Engine) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
111 112 113 114 115
	newWorkExists := false
	defer func() {
		if newWorkExists {
			// Signal task generation to restart (if stopped!)
			select {
116
			case e.workSignal <- struct{}{}:
117 118 119 120
			default:
			}
		}
	}()
121 122
	e.lock.Lock()
	defer e.lock.Unlock()
Jeromy's avatar
Jeromy committed
123

124
	l := e.findOrCreate(p)
125 126 127
	if m.Full() {
		l.wantList = wl.New()
	}
128 129 130 131
	for _, entry := range m.Wantlist() {
		if entry.Cancel {
			l.CancelWant(entry.Key)
			e.taskqueue.Remove(entry.Key, p)
132
		} else {
133
			l.Wants(entry.Key, entry.Priority)
134
			newWorkExists = true
135
			e.taskqueue.Push(entry.Key, entry.Priority, p)
136 137
		}
	}
Jeromy's avatar
Jeromy committed
138

139 140 141
	for _, block := range m.Blocks() {
		// FIXME extract blocks.NumBytes(block) or block.NumBytes() method
		l.ReceivedBytes(len(block.Data))
142
		for _, l := range e.ledgerMap {
Jeromy's avatar
Jeromy committed
143
			if l.WantListContains(block.Key()) {
144
				newWorkExists = true
145
				e.taskqueue.Push(block.Key(), 1, l.Partner)
Jeromy's avatar
Jeromy committed
146 147
			}
		}
148 149 150 151 152 153 154 155 156 157
	}
	return nil
}

// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically

158 159 160
func (e *Engine) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
	e.lock.Lock()
	defer e.lock.Unlock()
161

162
	l := e.findOrCreate(p)
163 164 165
	for _, block := range m.Blocks() {
		l.SentBytes(len(block.Data))
		l.wantList.Remove(block.Key())
166
		e.taskqueue.Remove(block.Key(), p)
167 168 169 170 171
	}

	return nil
}

172 173 174
func (e *Engine) NumBytesSentTo(p peer.Peer) uint64 {
	e.lock.RLock()
	defer e.lock.RUnlock()
175

176
	return e.findOrCreate(p).Accounting.BytesSent
177 178
}

179 180 181
func (e *Engine) NumBytesReceivedFrom(p peer.Peer) uint64 {
	e.lock.RLock()
	defer e.lock.RUnlock()
182

183
	return e.findOrCreate(p).Accounting.BytesRecv
184 185 186
}

// ledger lazily instantiates a ledger
187 188
func (e *Engine) findOrCreate(p peer.Peer) *ledger {
	l, ok := e.ledgerMap[p.Key()]
189 190
	if !ok {
		l = newLedger(p)
191
		e.ledgerMap[p.Key()] = l
192 193 194
	}
	return l
}