engine.go 4.4 KB
Newer Older
1
package decision
2 3 4 5

import (
	"sync"

6
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Jeromy's avatar
Jeromy committed
7
	bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
8 9 10 11 12 13
	bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
	wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
)

14
var log = u.Logger("engine")
15

16
// Envelope contains a message for a Peer
17
type Envelope struct {
18 19 20
	// Peer is the intended recipient
	Peer peer.Peer
	// Message is the payload
21 22 23
	Message bsmsg.BitSwapMessage
}

24
type Engine struct {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
25 26 27 28
	// FIXME peerRequestQueue isn't threadsafe nor is it protected by a mutex.
	// consider a way to avoid sharing the peerRequestQueue between the worker
	// and the receiver
	peerRequestQueue *taskQueue
29

Jeromy's avatar
Jeromy committed
30
	workSignal chan struct{}
31 32 33 34 35 36 37 38

	outbox chan Envelope

	bs bstore.Blockstore

	lock sync.RWMutex
	// ledgerMap lists Ledgers by their Partner key.
	ledgerMap map[u.Key]*ledger
39 40
}

41 42
func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
	e := &Engine{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
43 44 45 46 47
		ledgerMap:        make(map[u.Key]*ledger),
		bs:               bs,
		peerRequestQueue: newTaskQueue(),
		outbox:           make(chan Envelope, 4), // TODO extract constant
		workSignal:       make(chan struct{}),
48
	}
49 50
	go e.taskWorker(ctx)
	return e
Jeromy's avatar
Jeromy committed
51 52
}

53
func (e *Engine) taskWorker(ctx context.Context) {
Jeromy's avatar
Jeromy committed
54
	for {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
55
		nextTask := e.peerRequestQueue.Pop()
Jeromy's avatar
Jeromy committed
56 57 58 59
		if nextTask == nil {
			// No tasks in the list?
			// Wait until there are!
			select {
60
			case <-ctx.Done():
Jeromy's avatar
Jeromy committed
61
				return
62
			case <-e.workSignal:
Jeromy's avatar
Jeromy committed
63 64 65
			}
			continue
		}
66
		block, err := e.bs.Get(nextTask.Entry.Key)
67 68 69 70 71 72 73 74
		if err != nil {
			continue // TODO maybe return an error
		}
		// construct message here so we can make decisions about any additional
		// information we may want to include at this time.
		m := bsmsg.New()
		m.AddBlock(block)
		// TODO: maybe add keys from our wantlist?
Jeromy's avatar
Jeromy committed
75
		select {
76
		case <-ctx.Done():
Jeromy's avatar
Jeromy committed
77
			return
78
		case e.outbox <- Envelope{Peer: nextTask.Target, Message: m}:
Jeromy's avatar
Jeromy committed
79 80 81 82
		}
	}
}

83 84
func (e *Engine) Outbox() <-chan Envelope {
	return e.outbox
85 86 87
}

// Returns a slice of Peers with whom the local node has active sessions
88 89 90
func (e *Engine) Peers() []peer.Peer {
	e.lock.RLock()
	defer e.lock.RUnlock()
91 92

	response := make([]peer.Peer, 0)
93
	for _, ledger := range e.ledgerMap {
94 95 96 97 98 99 100
		response = append(response, ledger.Partner)
	}
	return response
}

// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
101
func (e *Engine) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
102 103 104 105 106
	newWorkExists := false
	defer func() {
		if newWorkExists {
			// Signal task generation to restart (if stopped!)
			select {
107
			case e.workSignal <- struct{}{}:
108 109 110 111
			default:
			}
		}
	}()
112 113
	e.lock.Lock()
	defer e.lock.Unlock()
Jeromy's avatar
Jeromy committed
114

115
	l := e.findOrCreate(p)
116 117 118
	if m.Full() {
		l.wantList = wl.New()
	}
119 120 121
	for _, entry := range m.Wantlist() {
		if entry.Cancel {
			l.CancelWant(entry.Key)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
122
			e.peerRequestQueue.Remove(entry.Key, p)
123
		} else {
124
			l.Wants(entry.Key, entry.Priority)
125
			newWorkExists = true
Brian Tiger Chow's avatar
Brian Tiger Chow committed
126
			e.peerRequestQueue.Push(entry.Key, entry.Priority, p)
127 128
		}
	}
Jeromy's avatar
Jeromy committed
129

130 131 132
	for _, block := range m.Blocks() {
		// FIXME extract blocks.NumBytes(block) or block.NumBytes() method
		l.ReceivedBytes(len(block.Data))
133
		for _, l := range e.ledgerMap {
Jeromy's avatar
Jeromy committed
134
			if l.WantListContains(block.Key()) {
135
				newWorkExists = true
Brian Tiger Chow's avatar
Brian Tiger Chow committed
136
				e.peerRequestQueue.Push(block.Key(), 1, l.Partner)
Jeromy's avatar
Jeromy committed
137 138
			}
		}
139 140 141 142 143 144 145 146 147 148
	}
	return nil
}

// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically

149 150 151
func (e *Engine) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
	e.lock.Lock()
	defer e.lock.Unlock()
152

153
	l := e.findOrCreate(p)
154 155 156
	for _, block := range m.Blocks() {
		l.SentBytes(len(block.Data))
		l.wantList.Remove(block.Key())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
157
		e.peerRequestQueue.Remove(block.Key(), p)
158 159 160 161 162
	}

	return nil
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
163 164
func (e *Engine) numBytesSentTo(p peer.Peer) uint64 {
	// NB not threadsafe
165
	return e.findOrCreate(p).Accounting.BytesSent
166 167
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
168 169
func (e *Engine) numBytesReceivedFrom(p peer.Peer) uint64 {
	// NB not threadsafe
170
	return e.findOrCreate(p).Accounting.BytesRecv
171 172 173
}

// ledger lazily instantiates a ledger
174 175
func (e *Engine) findOrCreate(p peer.Peer) *ledger {
	l, ok := e.ledgerMap[p.Key()]
176 177
	if !ok {
		l = newLedger(p)
178
		e.ledgerMap[p.Key()] = l
179 180 181
	}
	return l
}