Commit 3b397e8e authored by Brian Tiger Chow

feat(PQ)

	refactor: peerRequestQueue

	It's a mistake to make one queue fit all. Go's lack of algebraic
	types turns a generalized queue into a monstrosity of type
	checking/casting. Better to have individual queues for individual
	purposes.
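
	As an illustration only (hypothetical code, not from this commit): a
	one-size-fits-all queue must traffic in interface{}, forcing a type
	assertion onto every call site, while a dedicated queue keeps the
	types concrete.

		// generalized queue: every Pop needs a runtime cast
		item := genericQueue.Pop()
		task, ok := item.(*peerRequestTask)
		if !ok {
			// a case the type system should have ruled out
		}

		// dedicated queue: the compiler enforces the type
		task := prq.Pop() // *peerRequestTask, no cast needed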

	Conflicts:
		exchange/bitswap/decision/bench_test.go
		exchange/bitswap/decision/tasks/task_queue.go

	fix(bitswap.decision.PRQ): if peers match, always return result of pri comparison

	fix(bitswap.decision.Engine): push to the queue before notifying

	TOCTOU (time-of-check/time-of-use) bug:

	1. client notifies
	2. worker checks (finds nil)
	3. worker sleeps
	4. client pushes (worker missed the update)
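
	A minimal sketch of the corrected ordering (process is an illustrative
	placeholder; queue and workSignal mirror the engine's fields):

		// client: make the work visible first, then notify
		queue.Push(entry, p)
		select {
		case workSignal <- struct{}{}:
		default: // worker already has a wakeup pending
		}

		// worker: re-check the queue after every wakeup
		for {
			if task := queue.Pop(); task != nil {
				process(task)
				continue
			}
			<-workSignal
		}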

	test(PQ): improve documentation and add test

	test(bitswap.decision.Engine): handling received messages

	License: MIT
Signed-off-by: Brian Tiger Chow <brian@perfmode.com>
parent 6f8835c5
@@ -13,12 +13,13 @@ import (
// FWIW: At the time of this commit, including a timestamp in task increases
// time cost of Push by 3%.
func BenchmarkTaskQueuePush(b *testing.B) {
q := newTaskQueue()
q := newPRQ()
peers := []peer.ID{
testutil.RandPeerIDFatal(b),
testutil.RandPeerIDFatal(b),
testutil.RandPeerIDFatal(b),
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.Push(wantlist.Entry{Key: util.Key(i), Priority: math.MaxInt32}, peers[i%len(peers)])
}
......
@@ -59,7 +59,7 @@ type Engine struct {
// peerRequestQueue is a priority queue of requests received from peers.
// Requests are popped from the queue, packaged up, and placed in the
// outbox.
peerRequestQueue *taskQueue
peerRequestQueue peerRequestQueue
// FIXME it's a bit odd for the client and the worker to both share memory
// (both modify the peerRequestQueue) and also to communicate over the
@@ -82,7 +82,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
e := &Engine{
ledgerMap: make(map[peer.ID]*ledger),
bs: bs,
peerRequestQueue: newTaskQueue(),
peerRequestQueue: newPRQ(),
outbox: make(chan Envelope, sizeOutboxChan),
workSignal: make(chan struct{}),
}
@@ -180,8 +180,8 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
log.Debug("wants", entry.Key, entry.Priority)
l.Wants(entry.Key, entry.Priority)
if exists, err := e.bs.Has(entry.Key); err == nil && exists {
newWorkExists = true
e.peerRequestQueue.Push(entry.Entry, p)
newWorkExists = true
}
}
}
@@ -191,8 +191,8 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
l.ReceivedBytes(len(block.Data))
for _, l := range e.ledgerMap {
if entry, ok := l.WantListContains(block.Key()); ok {
newWorkExists = true
e.peerRequestQueue.Push(entry, l.Partner)
newWorkExists = true
}
}
}
......
package decision
import (
"math"
"strings"
"sync"
"testing"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
blocks "github.com/jbenet/go-ipfs/blocks"
blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
message "github.com/jbenet/go-ipfs/exchange/bitswap/message"
peer "github.com/jbenet/go-ipfs/p2p/peer"
testutil "github.com/jbenet/go-ipfs/util/testutil"
)
type peerAndEngine struct {
@@ -19,18 +21,20 @@ type peerAndEngine struct {
Engine *Engine
}
func newPeerAndLedgermanager(idStr string) peerAndEngine {
func newEngine(ctx context.Context, idStr string) peerAndEngine {
return peerAndEngine{
Peer: peer.ID(idStr),
//Strategy: New(true),
Engine: NewEngine(context.TODO(),
blockstore.NewBlockstore(sync.MutexWrap(ds.NewMapDatastore()))),
Engine: NewEngine(ctx,
blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))),
}
}
func TestConsistentAccounting(t *testing.T) {
sender := newPeerAndLedgermanager("Ernie")
receiver := newPeerAndLedgermanager("Bert")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sender := newEngine(ctx, "Ernie")
receiver := newEngine(ctx, "Bert")
// Send messages from Ernie to Bert
for i := 0; i < 1000; i++ {
@@ -62,8 +66,10 @@ func TestConsistentAccounting(t *testing.T) {
func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {
sanfrancisco := newPeerAndLedgermanager("sf")
seattle := newPeerAndLedgermanager("sea")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sanfrancisco := newEngine(ctx, "sf")
seattle := newEngine(ctx, "sea")
m := message.New()
@@ -91,3 +97,96 @@ func peerIsPartner(p peer.ID, e *Engine) bool {
}
return false
}
func TestOutboxClosedWhenEngineClosed(t *testing.T) {
t.SkipNow() // TODO implement *Engine.Close
e := NewEngine(context.Background(), blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())))
var wg sync.WaitGroup
wg.Add(1)
go func() {
for range e.Outbox() {
}
wg.Done()
}()
// e.Close()
wg.Wait()
if _, ok := <-e.Outbox(); ok {
t.Fatal("channel should be closed")
}
}
func TestPartnerWantsThenCancels(t *testing.T) {
alphabet := strings.Split("abcdefghijklmnopqrstuvwxyz", "")
vowels := strings.Split("aeiou", "")
type testCase [][]string
testcases := []testCase{
testCase{
alphabet, vowels,
},
testCase{
alphabet, stringsComplement(alphabet, vowels),
},
}
for _, testcase := range testcases {
set := testcase[0]
cancels := testcase[1]
keeps := stringsComplement(set, cancels)
bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
e := NewEngine(context.Background(), bs)
partner := testutil.RandPeerIDFatal(t)
for _, letter := range set {
block := blocks.NewBlock([]byte(letter))
bs.Put(block)
}
partnerWants(e, set, partner)
partnerCancels(e, cancels, partner)
assertPoppedInOrder(t, e, keeps)
}
}
func partnerWants(e *Engine, keys []string, partner peer.ID) {
add := message.New()
for i, letter := range keys {
block := blocks.NewBlock([]byte(letter))
add.AddEntry(block.Key(), math.MaxInt32-i)
}
e.MessageReceived(partner, add)
}
func partnerCancels(e *Engine, keys []string, partner peer.ID) {
cancels := message.New()
for _, k := range keys {
block := blocks.NewBlock([]byte(k))
cancels.Cancel(block.Key())
}
e.MessageReceived(partner, cancels)
}
func assertPoppedInOrder(t *testing.T, e *Engine, keys []string) {
for _, k := range keys {
envelope := <-e.Outbox()
received := envelope.Message.Blocks()[0]
expected := blocks.NewBlock([]byte(k))
if received.Key() != expected.Key() {
t.Fatal("received", string(received.Data), "expected", string(expected.Data))
}
}
}
func stringsComplement(set, subset []string) []string {
m := make(map[string]struct{})
for _, letter := range subset {
m[letter] = struct{}{}
}
var complement []string
for _, letter := range set {
if _, exists := m[letter]; !exists {
complement = append(complement, letter)
}
}
return complement
}
package decision
import (
"sync"
"time"
pq "github.com/jbenet/go-ipfs/exchange/bitswap/decision/pq"
wantlist "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
peer "github.com/jbenet/go-ipfs/p2p/peer"
u "github.com/jbenet/go-ipfs/util"
)
type peerRequestQueue interface {
// Pop returns the next peerRequestTask. Returns nil if the peerRequestQueue is empty.
Pop() *peerRequestTask
Push(entry wantlist.Entry, to peer.ID)
Remove(k u.Key, p peer.ID)
// NB: cannot simply expose taskQueue.Len because trashed elements
// may exist. These trashed elements should not contribute to the count.
}
func newPRQ() peerRequestQueue {
return &prq{
taskMap: make(map[string]*peerRequestTask),
taskQueue: pq.New(wrapCmp(V1)),
}
}
var _ peerRequestQueue = &prq{}
// TODO: at some point, the strategy needs to plug in here
// to help decide how to sort tasks (on add) and how to select
// tasks (on getnext). For now, we are assuming a dumb/nice strategy.
type prq struct {
lock sync.Mutex
taskQueue pq.PQ
taskMap map[string]*peerRequestTask
}
// Push adds a new peerRequestTask to the queue, or updates the priority of
// an existing task for the same peer and key
func (tl *prq) Push(entry wantlist.Entry, to peer.ID) {
tl.lock.Lock()
defer tl.lock.Unlock()
if task, ok := tl.taskMap[taskKey(to, entry.Key)]; ok {
task.Entry.Priority = entry.Priority
tl.taskQueue.Update(task.index)
return
}
task := &peerRequestTask{
Entry: entry,
Target: to,
created: time.Now(),
}
tl.taskQueue.Push(task)
tl.taskMap[task.Key()] = task
}
// Pop 'pops' the next task to be performed. Returns nil if no task exists.
func (tl *prq) Pop() *peerRequestTask {
tl.lock.Lock()
defer tl.lock.Unlock()
var out *peerRequestTask
for tl.taskQueue.Len() > 0 {
out = tl.taskQueue.Pop().(*peerRequestTask)
delete(tl.taskMap, out.Key())
if out.trash {
continue // discarding tasks that have been removed
}
break // and return |out|
}
return out
}
// Remove removes a task from the queue
func (tl *prq) Remove(k u.Key, p peer.ID) {
tl.lock.Lock()
t, ok := tl.taskMap[taskKey(p, k)]
if ok {
// remove the task "lazily"
// simply mark it as trash, so it'll be dropped when popped off the
// queue.
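// This keeps Remove cheap: marking is O(1), and the real discard
// happens in Pop. It is also why the interface cannot simply expose
// Len: trashed tasks still occupy the underlying heap.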
t.trash = true
}
tl.lock.Unlock()
}
type peerRequestTask struct {
Entry wantlist.Entry
Target peer.ID // required
// trash is a book-keeping field
trash bool
// created marks the time that the task was added to the queue
created time.Time
index int // book-keeping field used by the pq container
}
// Key uniquely identifies a task.
func (t *peerRequestTask) Key() string {
return taskKey(t.Target, t.Entry.Key)
}
func (t *peerRequestTask) Index() int {
return t.index
}
func (t *peerRequestTask) SetIndex(i int) {
t.index = i
}
// taskKey returns a key that uniquely identifies a task.
func taskKey(p peer.ID, k u.Key) string {
return p.String() + k.String()
}
// FIFO is a basic task comparator that returns tasks in the order created.
var FIFO = func(a, b *peerRequestTask) bool {
return a.created.Before(b.created)
}
// V1 respects the target peer's wantlist priority. For tasks involving
// different peers, the oldest task is prioritized.
var V1 = func(a, b *peerRequestTask) bool {
if a.Target == b.Target {
return a.Entry.Priority > b.Entry.Priority
}
return FIFO(a, b)
}
func wrapCmp(f func(a, b *peerRequestTask) bool) func(a, b pq.Elem) bool {
return func(a, b pq.Elem) bool {
return f(a.(*peerRequestTask), b.(*peerRequestTask))
}
}
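The TODO above notes that a strategy should eventually decide how tasks
are sorted. A minimal sketch of how a comparator could plug in, reusing
only names defined in this file (newPRQWith is a hypothetical helper,
not part of this commit):

func newPRQWith(cmp func(a, b *peerRequestTask) bool) peerRequestQueue {
	return &prq{
		taskMap:   make(map[string]*peerRequestTask),
		taskQueue: pq.New(wrapCmp(cmp)),
	}
}

// e.g. newPRQWith(FIFO) ignores wantlist priority and serves oldest tasks first.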
package decision
import (
"math"
"math/rand"
"sort"
"strings"
"testing"
"github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
"github.com/jbenet/go-ipfs/util"
"github.com/jbenet/go-ipfs/util/testutil"
)
func TestPushPop(t *testing.T) {
prq := newPRQ()
partner := testutil.RandPeerIDFatal(t)
alphabet := strings.Split("abcdefghijklmnopqrstuvwxyz", "")
vowels := strings.Split("aeiou", "")
consonants := func() []string {
var out []string
for _, letter := range alphabet {
skip := false
for _, vowel := range vowels {
if letter == vowel {
skip = true
}
}
if !skip {
out = append(out, letter)
}
}
return out
}()
sort.Strings(alphabet)
sort.Strings(vowels)
sort.Strings(consonants)
// add a bunch of blocks. cancel some. drain the queue. the queue should only have the kept entries
for _, index := range rand.Perm(len(alphabet)) { // add blocks for all letters
letter := alphabet[index]
t.Log(partner.String())
prq.Push(wantlist.Entry{Key: util.Key(letter), Priority: math.MaxInt32 - index}, partner)
}
for _, consonant := range consonants {
prq.Remove(util.Key(consonant), partner)
}
for _, expected := range vowels {
received := prq.Pop().Entry.Key
if received != util.Key(expected) {
t.Fatal("received", string(received), "expected", string(expected))
}
}
}
package pq
import "container/heap"
// PQ is a basic priority queue.
type PQ interface {
// Push adds the element to the queue.
Push(Elem)
// Pop returns the highest priority Elem in PQ.
Pop() Elem
// Len returns the number of elements in the PQ.
Len() int
// Update re-establishes heap order after the element at the given index has been mutated.
Update(index int)
// TODO explain why this interface should not be extended
// It does not support Remove. This is because...
}
// Elem describes elements that can be added to the PQ. Clients must implement
// this interface.
type Elem interface {
// SetIndex stores the int index.
SetIndex(int)
// Index returns the last index given by SetIndex(int).
Index() int
}
// ElemComparator returns true if pri(a) > pri(b)
type ElemComparator func(a, b Elem) bool
// New creates a PQ with a client-supplied comparator.
func New(cmp ElemComparator) PQ {
q := &wrapper{heapinterface{
elems: make([]Elem, 0),
cmp: cmp,
}}
heap.Init(&q.heapinterface)
return q
}
// wrapper exists because we cannot re-define Push. We want to expose
// Push(Elem) but heap.Interface requires Push(interface{})
type wrapper struct {
heapinterface
}
var _ PQ = &wrapper{}
func (w *wrapper) Push(e Elem) {
heap.Push(&w.heapinterface, e)
}
func (w *wrapper) Pop() Elem {
return heap.Pop(&w.heapinterface).(Elem)
}
func (w *wrapper) Update(index int) {
heap.Fix(&w.heapinterface, index)
}
// heapinterface handles dirty low-level details of managing the priority queue.
type heapinterface struct {
elems []Elem
cmp ElemComparator
}
var _ heap.Interface = &heapinterface{}
// public interface
func (q *heapinterface) Len() int {
return len(q.elems)
}
// Less delegates the decision to the comparator
func (q *heapinterface) Less(i, j int) bool {
return q.cmp(q.elems[i], q.elems[j])
}
// Swap swaps the elements with indexes i and j.
func (q *heapinterface) Swap(i, j int) {
q.elems[i], q.elems[j] = q.elems[j], q.elems[i]
q.elems[i].SetIndex(i)
q.elems[j].SetIndex(j)
}
// Note that Push and Pop in this interface are for package heap's
// implementation to call. To add and remove things from the heap, wrap with
// the pq struct to call heap.Push and heap.Pop.
func (q *heapinterface) Push(x interface{}) { // where to put the elem?
t := x.(Elem)
t.SetIndex(len(q.elems))
q.elems = append(q.elems, t)
}
func (q *heapinterface) Pop() interface{} {
old := q.elems
n := len(old)
elem := old[n-1] // remove the last
elem.SetIndex(-1) // for safety // FIXME why?
q.elems = old[0 : n-1] // shrink
return elem
}
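As the note above says, heapinterface's Push and Pop exist for package
heap to call; clients must go through the wrapper, or elements are
appended without restoring heap order. A sketch of the distinction
(TestElem and PriorityComparator are the test types defined below):

q := New(PriorityComparator)
q.Push(&TestElem{Key: "a", Priority: 5}) // heap.Push sifts the element into place

var h heapinterface
h.Push(&TestElem{Key: "b", Priority: 9}) // append only; heap invariant not maintained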
package pq
import (
"sort"
"testing"
)
type TestElem struct {
Key string
Priority int
index int
}
func (e *TestElem) Index() int {
return e.index
}
func (e *TestElem) SetIndex(i int) {
e.index = i
}
var PriorityComparator = func(i, j Elem) bool {
return i.(*TestElem).Priority > j.(*TestElem).Priority
}
func TestQueuesReturnTypeIsSameAsParameterToPush(t *testing.T) {
q := New(PriorityComparator)
expectedKey := "foo"
elem := &TestElem{Key: expectedKey}
q.Push(elem)
switch v := q.Pop().(type) {
case *TestElem:
if v.Key != expectedKey {
t.Fatal("the key doesn't match the pushed value")
}
default:
t.Fatal("the queue is not casting values appropriately")
}
}
func TestCorrectnessOfPop(t *testing.T) {
q := New(PriorityComparator)
tasks := []TestElem{
TestElem{Key: "a", Priority: 9},
TestElem{Key: "b", Priority: 4},
TestElem{Key: "c", Priority: 3},
TestElem{Key: "d", Priority: 0},
TestElem{Key: "e", Priority: 6},
}
for i := range tasks {
q.Push(&tasks[i]) // push pointers to slice elements, not the loop variable
}
var priorities []int
for q.Len() > 0 {
i := q.Pop().(*TestElem).Priority
t.Log("popped %v", i)
priorities = append(priorities, i)
}
if !sort.IntsAreSorted(priorities) {
t.Fatal("the values were not returned in sorted order")
}
}
func TestUpdate(t *testing.T) {
t.Log(`
Add 3 elements.
Update the highest priority element to have the lowest priority and fix the queue.
It should come out last.`)
q := New(PriorityComparator)
lowest := &TestElem{Key: "originallyLowest", Priority: 1}
middle := &TestElem{Key: "originallyMiddle", Priority: 2}
highest := &TestElem{Key: "toBeUpdated", Priority: 3}
q.Push(middle)
q.Push(highest)
q.Push(lowest)
if q.Pop().(*TestElem).Key != highest.Key {
t.Fatal("popped element doesn't have the highest priority")
}
q.Push(highest) // re-add the popped element
highest.Priority = 0 // update the PQ
q.Update(highest.Index()) // fix the PQ
if q.Pop().(*TestElem).Key != middle.Key {
t.Fatal("middle element should now have the highest priority")
}
}
package decision
import (
"fmt"
"sync"
"time"
wantlist "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
peer "github.com/jbenet/go-ipfs/p2p/peer"
u "github.com/jbenet/go-ipfs/util"
)
// TODO: at some point, the strategy needs to plug in here
// to help decide how to sort tasks (on add) and how to select
// tasks (on getnext). For now, we are assuming a dumb/nice strategy.
type taskQueue struct {
// TODO: make this into a priority queue
lock sync.Mutex
tasks []*task
taskmap map[string]*task
}
func newTaskQueue() *taskQueue {
return &taskQueue{
taskmap: make(map[string]*task),
}
}
type task struct {
Entry wantlist.Entry
Target peer.ID
Trash bool // TODO make private
created time.Time
}
func (t *task) String() string {
return fmt.Sprintf("<Task %s, %s, %v>", t.Target, t.Entry.Key, t.Trash)
}
// Push currently adds a new task to the end of the list
func (tl *taskQueue) Push(entry wantlist.Entry, to peer.ID) {
tl.lock.Lock()
defer tl.lock.Unlock()
if task, ok := tl.taskmap[taskKey(to, entry.Key)]; ok {
// TODO: when priority queue is implemented,
// rearrange this task
task.Entry.Priority = entry.Priority
return
}
task := &task{
Entry: entry,
Target: to,
created: time.Now(),
}
tl.tasks = append(tl.tasks, task)
tl.taskmap[taskKey(to, entry.Key)] = task
}
// Pop 'pops' the next task to be performed. Returns nil if no task exists.
func (tl *taskQueue) Pop() *task {
tl.lock.Lock()
defer tl.lock.Unlock()
var out *task
for len(tl.tasks) > 0 {
// TODO: instead of zero, use exponential distribution
// it will help reduce the chance of receiving
// the same block from multiple peers
out = tl.tasks[0]
tl.tasks = tl.tasks[1:]
delete(tl.taskmap, taskKey(out.Target, out.Entry.Key))
if out.Trash {
continue // discarding tasks that have been removed
}
break // and return |out|
}
return out
}
// Remove lazily removes a task from the queue
func (tl *taskQueue) Remove(k u.Key, p peer.ID) {
tl.lock.Lock()
t, ok := tl.taskmap[taskKey(p, k)]
if ok {
t.Trash = true
}
tl.lock.Unlock()
}
// taskKey returns a key that uniquely identifies a task.
func taskKey(p peer.ID, k u.Key) string {
return string(p) + string(k)
}