Unverified Commit f0529d7b authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #8 from ipfs/feat/proto-ext-poc

Extend peer task queue to work with want-have / want-block
parents 1e8b1e8c fd33b913
......@@ -3,7 +3,7 @@ module github.com/ipfs/go-peertaskqueue
go 1.12
require (
github.com/ipfs/go-ipfs-pq v0.0.1
github.com/ipfs/go-ipfs-pq v0.0.2
github.com/libp2p/go-libp2p-core v0.0.1
github.com/multiformats/go-multihash v0.0.5 // indirect
)
......@@ -15,14 +15,20 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gxed/hashland/keccakpg v0.0.1 h1:wrk3uMNaMxbXiHibbPO4S0ymqJMm41WiudyFSs7UnsU=
github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU=
github.com/gxed/hashland/murmur3 v0.0.1 h1:SheiaIt0sda5K+8FLz952/1iWS9zrnKsEJaOJu4ZbSc=
github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-ipfs-pq v0.0.0-20191101181110-8122fa6a9529 h1:izQqDLe/uSPKe6NYr3FjwnvU0AAg0im/4DLVXplLFUQ=
github.com/ipfs/go-ipfs-pq v0.0.0-20191101181110-8122fa6a9529/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/ipfs/go-ipfs-pq v0.0.1 h1:zgUotX8dcAB/w/HidJh1zzc1yFq6Vm8J7T2F4itj/RU=
github.com/ipfs/go-ipfs-pq v0.0.1/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY=
github.com/ipfs/go-ipfs-pq v0.0.2/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
......
......@@ -8,90 +8,72 @@ import (
)
// FIFOCompare is a basic task comparator that returns tasks in the order created.
var FIFOCompare = func(a, b *TaskBlock) bool {
var FIFOCompare = func(a, b *QueueTask) bool {
return a.created.Before(b.created)
}
// PriorityCompare respects the target peer's task priority. For tasks involving
// different peers, the oldest task is prioritized.
var PriorityCompare = func(a, b *TaskBlock) bool {
var PriorityCompare = func(a, b *QueueTask) bool {
if a.Target == b.Target {
return a.Priority > b.Priority
}
return FIFOCompare(a, b)
}
// WrapCompare wraps a TaskBlock comparison function so it can be used as
// WrapCompare wraps a QueueTask comparison function so it can be used as
// comparison for a priority queue
func WrapCompare(f func(a, b *TaskBlock) bool) func(a, b pq.Elem) bool {
func WrapCompare(f func(a, b *QueueTask) bool) func(a, b pq.Elem) bool {
return func(a, b pq.Elem) bool {
return f(a.(*TaskBlock), b.(*TaskBlock))
return f(a.(*QueueTask), b.(*QueueTask))
}
}
// Identifier is a unique identifier for a task. It's used by the client library
// Topic is a non-unique name for a task. It's used by the client library
// to act on a task once it exits the queue.
type Identifier interface{}
type Topic interface{}
// Task is a single task to be executed as part of a task block.
type Task struct {
Identifier Identifier
Priority int
}
// Data is used by the client to associate extra information with a Task
type Data interface{}
// TaskBlock is a block of tasks to execute on a single peer.
type TaskBlock struct {
Tasks []Task
// Task is a single task to be executed in Priority order.
type Task struct {
// Topic for the task
Topic Topic
// Priority of the task
Priority int
Target peer.ID
// A callback to signal that this task block has been completed
Done func([]Task)
// The size of the task
// - peers with most active work are deprioritized
// - peers with most pending work are prioritized
Work int
// Arbitrary data associated with this Task by the client
Data Data
}
// toPrune are the tasks that have already been taken care of as part of
// a different task block which can be removed from the task block.
toPrune map[Identifier]struct{}
// QueueTask contains a Task, and also some bookkeeping information.
// It is used internally by the PeerTracker to keep track of tasks.
type QueueTask struct {
Task
Target peer.ID
created time.Time // created marks the time that the task was added to the queue
index int // book-keeping field used by the pq container
}
// NewTaskBlock creates a new task block with the given tasks, priority, target
// peer, and task completion function.
func NewTaskBlock(tasks []Task, priority int, target peer.ID, done func([]Task)) *TaskBlock {
return &TaskBlock{
Tasks: tasks,
Priority: priority,
Target: target,
Done: done,
toPrune: make(map[Identifier]struct{}, len(tasks)),
created: time.Now(),
}
}
// MarkPrunable marks any tasks with the given identifier as prunable at the time
// the task block is pulled of the queue to execute (because they've already been removed).
func (pt *TaskBlock) MarkPrunable(identifier Identifier) {
pt.toPrune[identifier] = struct{}{}
}
// PruneTasks removes all tasks previously marked as prunable from the lists of
// tasks in the block
func (pt *TaskBlock) PruneTasks() {
newTasks := make([]Task, 0, len(pt.Tasks)-len(pt.toPrune))
for _, task := range pt.Tasks {
if _, ok := pt.toPrune[task.Identifier]; !ok {
newTasks = append(newTasks, task)
}
// NewQueueTask creates a new QueueTask from the given Task.
func NewQueueTask(task Task, target peer.ID, created time.Time) *QueueTask {
return &QueueTask{
Task: task,
Target: target,
created: created,
}
pt.Tasks = newTasks
}
// Index implements pq.Elem.
func (pt *TaskBlock) Index() int {
func (pt *QueueTask) Index() int {
return pt.index
}
// SetIndex implements pq.Elem.
func (pt *TaskBlock) SetIndex(i int) {
func (pt *QueueTask) SetIndex(i int) {
pt.index = i
}
......@@ -19,7 +19,7 @@ const (
type hookFunc func(p peer.ID, event peerTaskQueueEvent)
// PeerTaskQueue is a prioritized list of tasks to be executed on peers.
// The queue puts tasks on in blocks, then alternates between peers (roughly)
// Tasks are added to the queue, then popped off alternately between peers (roughly)
// to execute the block with the highest priority, or otherwise the one added
// first if priorities are equal.
type PeerTaskQueue struct {
......@@ -29,6 +29,7 @@ type PeerTaskQueue struct {
frozenPeers map[peer.ID]struct{}
hooks []hookFunc
ignoreFreezing bool
taskMerger peertracker.TaskMerger
}
// Option is a function that configures the peer task queue
......@@ -51,6 +52,16 @@ func IgnoreFreezing(ignoreFreezing bool) Option {
}
}
// TaskMerger is an option that specifies merge behaviour when pushing a task
// with the same Topic as an existing Topic.
func TaskMerger(tmfp peertracker.TaskMerger) Option {
return func(ptq *PeerTaskQueue) Option {
previous := ptq.taskMerger
ptq.taskMerger = tmfp
return TaskMerger(previous)
}
}
func removeHook(hook hookFunc) Option {
return func(ptq *PeerTaskQueue) Option {
for i, testHook := range ptq.hooks {
......@@ -96,6 +107,7 @@ func New(options ...Option) *PeerTaskQueue {
peerTrackers: make(map[peer.ID]*peertracker.PeerTracker),
frozenPeers: make(map[peer.ID]struct{}),
pQueue: pq.New(peertracker.PeerCompare),
taskMerger: &peertracker.DefaultTaskMerger{},
}
ptq.Options(options...)
return ptq
......@@ -120,56 +132,98 @@ func (ptq *PeerTaskQueue) callHooks(to peer.ID, event peerTaskQueueEvent) {
}
}
// PushBlock adds a new block of tasks for the given peer to the queue
func (ptq *PeerTaskQueue) PushBlock(to peer.ID, tasks ...peertask.Task) {
// PushTasks adds a new group of tasks for the given peer to the queue
func (ptq *PeerTaskQueue) PushTasks(to peer.ID, tasks ...peertask.Task) {
ptq.lock.Lock()
defer ptq.lock.Unlock()
peerTracker, ok := ptq.peerTrackers[to]
if !ok {
peerTracker = peertracker.New(to)
peerTracker = peertracker.New(to, ptq.taskMerger)
ptq.pQueue.Push(peerTracker)
ptq.peerTrackers[to] = peerTracker
ptq.callHooks(to, peerAdded)
}
peerTracker.PushBlock(to, tasks, func(e []peertask.Task) {
ptq.lock.Lock()
for _, task := range e {
peerTracker.TaskDone(task.Identifier)
}
ptq.pQueue.Update(peerTracker.Index())
ptq.lock.Unlock()
})
peerTracker.PushTasks(tasks...)
ptq.pQueue.Update(peerTracker.Index())
}
// PopBlock 'pops' the next block of tasks to be performed. Returns nil if no block exists.
func (ptq *PeerTaskQueue) PopBlock() *peertask.TaskBlock {
// PopTasks finds the peer with the highest priority and pops as many tasks
// off the peer's queue as necessary to cover targetMinWork, in priority order.
// If there are not enough tasks to cover targetMinWork it just returns
// whatever is in the peer's queue.
// - Peers with the most "active" work are deprioritized.
// This heuristic is for fairness, we try to keep all peers "busy".
// - Peers with the most "pending" work are prioritized.
// This heuristic is so that peers with a lot to do get asked for work first.
// The third response argument is pending work: the amount of work in the
// queue for this peer.
func (ptq *PeerTaskQueue) PopTasks(targetMinWork int) (peer.ID, []*peertask.Task, int) {
ptq.lock.Lock()
defer ptq.lock.Unlock()
if ptq.pQueue.Len() == 0 {
return nil
return "", nil, -1
}
var peerTracker *peertracker.PeerTracker
// Choose the highest priority peer
peerTracker = ptq.pQueue.Peek().(*peertracker.PeerTracker)
if peerTracker == nil {
return "", nil, -1
}
peerTracker := ptq.pQueue.Pop().(*peertracker.PeerTracker)
out := peerTracker.PopBlock()
// Get the highest priority tasks for the given peer
out, pendingWork := peerTracker.PopTasks(targetMinWork)
// If the peer has no more tasks, remove its peer tracker
if peerTracker.IsIdle() {
ptq.pQueue.Pop()
target := peerTracker.Target()
delete(ptq.peerTrackers, target)
delete(ptq.frozenPeers, target)
ptq.callHooks(target, peerRemoved)
} else {
ptq.pQueue.Push(peerTracker)
// We may have modified the peer tracker's state (by popping tasks), so
// update its position in the priority queue
ptq.pQueue.Update(peerTracker.Index())
}
return out
return peerTracker.Target(), out, pendingWork
}
// TasksDone is called to indicate that the given tasks have completed
// for the given peer
func (ptq *PeerTaskQueue) TasksDone(to peer.ID, tasks ...*peertask.Task) {
ptq.lock.Lock()
defer ptq.lock.Unlock()
// Get the peer tracker for the peer
peerTracker, ok := ptq.peerTrackers[to]
if !ok {
return
}
// Tell the peer tracker that the tasks have completed
for _, task := range tasks {
peerTracker.TaskDone(task)
}
// This may affect the peer's position in the peer queue, so update if
// necessary
ptq.pQueue.Update(peerTracker.Index())
}
// Remove removes a task from the queue.
func (ptq *PeerTaskQueue) Remove(identifier peertask.Identifier, p peer.ID) {
func (ptq *PeerTaskQueue) Remove(topic peertask.Topic, p peer.ID) {
ptq.lock.Lock()
defer ptq.lock.Unlock()
peerTracker, ok := ptq.peerTrackers[p]
if ok {
if peerTracker.Remove(identifier) {
if peerTracker.Remove(topic) {
// we now also 'freeze' that partner. If they sent us a cancel for a
// block we were about to send them, we should wait a short period of time
// to make sure we receive any other in-flight cancels before sending
......@@ -184,7 +238,6 @@ func (ptq *PeerTaskQueue) Remove(identifier peertask.Identifier, p peer.ID) {
ptq.pQueue.Update(peerTracker.Index())
}
}
ptq.lock.Unlock()
}
// FullThaw completely thaws all peers in the queue so they can execute tasks.
......
......@@ -41,9 +41,10 @@ func TestPushPop(t *testing.T) {
for _, index := range rand.Perm(len(alphabet)) { // add blocks for all letters
letter := alphabet[index]
t.Log(partner.String())
t.Log(letter)
ptq.PushBlock(partner, peertask.Task{Identifier: letter, Priority: math.MaxInt32 - index})
// add tasks out of order, but with in-order priority
ptq.PushTasks(partner, peertask.Task{Topic: letter, Priority: math.MaxInt32 - index})
}
for _, consonant := range consonants {
ptq.Remove(consonant, partner)
......@@ -53,13 +54,13 @@ func TestPushPop(t *testing.T) {
var out []string
for {
received := ptq.PopBlock()
if received == nil {
_, received, _ := ptq.PopTasks(100)
if len(received) == 0 {
break
}
for _, task := range received.Tasks {
out = append(out, task.Identifier.(string))
for _, task := range received {
out = append(out, task.Topic.(string))
}
}
......@@ -79,14 +80,13 @@ func TestFreezeUnfreeze(t *testing.T) {
c := peers[2]
d := peers[3]
// Have each push some blocks
// Push 5 blocks to each peer
for i := 0; i < 5; i++ {
is := fmt.Sprint(i)
ptq.PushBlock(a, peertask.Task{Identifier: is})
ptq.PushBlock(b, peertask.Task{Identifier: is})
ptq.PushBlock(c, peertask.Task{Identifier: is})
ptq.PushBlock(d, peertask.Task{Identifier: is})
ptq.PushTasks(a, peertask.Task{Topic: is, Work: 1})
ptq.PushTasks(b, peertask.Task{Topic: is, Work: 1})
ptq.PushTasks(c, peertask.Task{Topic: is, Work: 1})
ptq.PushTasks(d, peertask.Task{Topic: is, Work: 1})
}
// now, pop off four tasks, there should be one from each
......@@ -121,10 +121,10 @@ func TestFreezeUnfreezeNoFreezingOption(t *testing.T) {
for i := 0; i < 5; i++ {
is := fmt.Sprint(i)
ptq.PushBlock(a, peertask.Task{Identifier: is})
ptq.PushBlock(b, peertask.Task{Identifier: is})
ptq.PushBlock(c, peertask.Task{Identifier: is})
ptq.PushBlock(d, peertask.Task{Identifier: is})
ptq.PushTasks(a, peertask.Task{Topic: is, Work: 1})
ptq.PushTasks(b, peertask.Task{Topic: is, Work: 1})
ptq.PushTasks(c, peertask.Task{Topic: is, Work: 1})
ptq.PushTasks(d, peertask.Task{Topic: is, Work: 1})
}
// now, pop off four tasks, there should be one from each
......@@ -132,45 +132,103 @@ func TestFreezeUnfreezeNoFreezingOption(t *testing.T) {
ptq.Remove("1", b)
// b should be frozen, causing it to get skipped in the rotation
// b should not be frozen, so it wont get skipped in the rotation
matchNTasks(t, ptq, 4, a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty())
}
// This test checks that peers wont starve out other peers
func TestPeerRepeats(t *testing.T) {
// This test checks that ordering of peers is correct
func TestPeerOrder(t *testing.T) {
ptq := New()
peers := testutil.GeneratePeers(4)
peers := testutil.GeneratePeers(3)
a := peers[0]
b := peers[1]
c := peers[2]
d := peers[3]
// Have each push some blocks
ptq.PushTasks(a, peertask.Task{Topic: "1", Work: 3, Priority: 2})
ptq.PushTasks(a, peertask.Task{Topic: "2", Work: 1, Priority: 1})
for i := 0; i < 5; i++ {
is := fmt.Sprint(i)
ptq.PushBlock(a, peertask.Task{Identifier: is})
ptq.PushBlock(b, peertask.Task{Identifier: is})
ptq.PushBlock(c, peertask.Task{Identifier: is})
ptq.PushBlock(d, peertask.Task{Identifier: is})
ptq.PushTasks(b, peertask.Task{Topic: "3", Work: 1, Priority: 3})
ptq.PushTasks(b, peertask.Task{Topic: "4", Work: 3, Priority: 2})
ptq.PushTasks(b, peertask.Task{Topic: "5", Work: 1, Priority: 1})
ptq.PushTasks(c, peertask.Task{Topic: "6", Work: 2, Priority: 2})
ptq.PushTasks(c, peertask.Task{Topic: "7", Work: 2, Priority: 1})
// All peers have nothing in their active queue, so equal chance of any
// peer being chosen
var ps []string
var ids []string
for i := 0; i < 3; i++ {
p, tasks, _ := ptq.PopTasks(1)
ps = append(ps, p.String())
ids = append(ids, fmt.Sprint(tasks[0].Topic))
}
matchArrays(t, ps, []string{a.String(), b.String(), c.String()})
matchArrays(t, ids, []string{"1", "3", "6"})
// Active queues:
// a: 3 Pending: [1]
// b: 1 Pending: [3, 1]
// c: 2 Pending: [2]
// So next peer should be b (least work in active queue)
p, tsk, pending := ptq.PopTasks(1)
if len(tsk) != 1 || p != b || tsk[0].Topic != "4" {
t.Fatal("Expected ID 4 from peer b")
}
if pending != 1 {
t.Fatal("Expected pending work to be 1")
}
// now, pop off four tasks, there should be one from each
tasks := matchNTasks(t, ptq, 4, a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty())
// Now, if one of the tasks gets finished, the next task off the queue should
// be for the same peer
for blockI := 0; blockI < 4; blockI++ {
for i := 0; i < 4; i++ {
// its okay to mark the same task done multiple times here (JUST FOR TESTING)
tasks[i].Done(tasks[i].Tasks)
ntask := ptq.PopBlock()
if ntask.Target != tasks[i].Target {
t.Fatal("Expected task from peer with lowest active count")
}
}
// Active queues:
// a: 3 Pending: [1]
// b: 1 + 3 Pending: [1]
// c: 2 Pending: [2]
// So next peer should be c (least work in active queue)
p, tsk, pending = ptq.PopTasks(1)
if len(tsk) != 1 || p != c || tsk[0].Topic != "7" {
t.Fatal("Expected ID 7 from peer c")
}
if pending != 0 {
t.Fatal("Expected pending work to be 0")
}
// Active queues:
// a: 3 Pending: [1]
// b: 1 + 3 Pending: [1]
// c: 2 + 2
// So next peer should be a (least work in active queue)
p, tsk, pending = ptq.PopTasks(1)
if len(tsk) != 1 || p != a || tsk[0].Topic != "2" {
t.Fatal("Expected ID 2 from peer a")
}
if pending != 0 {
t.Fatal("Expected pending work to be 0")
}
// Active queues:
// a: 3 + 1
// b: 1 + 3 Pending: [1]
// c: 2 + 2
// a & c have no more pending tasks, so next peer should be b
p, tsk, pending = ptq.PopTasks(1)
if len(tsk) != 1 || p != b || tsk[0].Topic != "5" {
t.Fatal("Expected ID 5 from peer b")
}
if pending != 0 {
t.Fatal("Expected pending work to be 0")
}
// Active queues:
// a: 3 + 1
// b: 1 + 3 + 1
// c: 2 + 2
// No more pending tasks, so next pop should return nothing
_, tsk, pending = ptq.PopTasks(1)
if len(tsk) != 0 {
t.Fatal("Expected no more tasks")
}
if pending != 0 {
t.Fatal("Expected pending work to be 0")
}
}
......@@ -187,8 +245,8 @@ func TestHooks(t *testing.T) {
peers := testutil.GeneratePeers(2)
a := peers[0]
b := peers[1]
ptq.PushBlock(a, peertask.Task{Identifier: "1"})
ptq.PushBlock(b, peertask.Task{Identifier: "2"})
ptq.PushTasks(a, peertask.Task{Topic: "1"})
ptq.PushTasks(b, peertask.Task{Topic: "2"})
expected := []string{a.Pretty(), b.Pretty()}
sort.Strings(expected)
sort.Strings(peersAdded)
......@@ -201,12 +259,12 @@ func TestHooks(t *testing.T) {
}
}
task := ptq.PopBlock()
task.Done(task.Tasks)
task = ptq.PopBlock()
task.Done(task.Tasks)
ptq.PopBlock()
ptq.PopBlock()
p, task, _ := ptq.PopTasks(100)
ptq.TasksDone(p, task...)
p, task, _ = ptq.PopTasks(100)
ptq.TasksDone(p, task...)
ptq.PopTasks(100)
ptq.PopTasks(100)
sort.Strings(peersRemoved)
if len(peersRemoved) != len(expected) {
......@@ -218,6 +276,7 @@ func TestHooks(t *testing.T) {
}
}
}
func TestCleaningUpQueues(t *testing.T) {
ptq := New()
......@@ -225,51 +284,57 @@ func TestCleaningUpQueues(t *testing.T) {
var peerTasks []peertask.Task
for i := 0; i < 5; i++ {
is := fmt.Sprint(i)
peerTasks = append(peerTasks, peertask.Task{Identifier: is})
peerTasks = append(peerTasks, peertask.Task{Topic: is})
}
// push a block, pop a block, complete everything, should be removed
ptq.PushBlock(peer, peerTasks...)
task := ptq.PopBlock()
task.Done(task.Tasks)
task = ptq.PopBlock()
ptq.PushTasks(peer, peerTasks...)
p, task, _ := ptq.PopTasks(100)
ptq.TasksDone(p, task...)
_, task, _ = ptq.PopTasks(100)
if task != nil || len(ptq.peerTrackers) > 0 || ptq.pQueue.Len() > 0 {
if len(task) != 0 || len(ptq.peerTrackers) > 0 || ptq.pQueue.Len() > 0 {
t.Fatal("PeerTracker should have been removed because it's idle")
}
// push a block, remove each of its entries, should be removed
ptq.PushBlock(peer, peerTasks...)
ptq.PushTasks(peer, peerTasks...)
for _, peerTask := range peerTasks {
ptq.Remove(peerTask.Identifier, peer)
ptq.Remove(peerTask.Topic, peer)
}
task = ptq.PopBlock()
_, task, _ = ptq.PopTasks(100)
if task != nil || len(ptq.peerTrackers) > 0 || ptq.pQueue.Len() > 0 {
if len(task) != 0 || len(ptq.peerTrackers) > 0 || ptq.pQueue.Len() > 0 {
t.Fatal("Partner should have been removed because it's idle")
}
}
func matchNTasks(t *testing.T, ptq *PeerTaskQueue, n int, expected ...string) []*peertask.TaskBlock {
func matchNTasks(t *testing.T, ptq *PeerTaskQueue, n int, expected ...string) {
var targets []string
var tasks []*peertask.TaskBlock
for i := 0; i < n; i++ {
t := ptq.PopBlock()
targets = append(targets, t.Target.Pretty())
tasks = append(tasks, t)
p, tsk, _ := ptq.PopTasks(1)
if len(tsk) != 1 {
t.Fatal("expected 1 task at a time")
}
targets = append(targets, p.Pretty())
}
sort.Strings(expected)
sort.Strings(targets)
matchArrays(t, expected, targets)
}
t.Log(targets)
t.Log(expected)
for i, s := range targets {
if expected[i] != s {
t.Fatal("unexpected peer", s, expected[i])
}
func matchArrays(t *testing.T, str1, str2 []string) {
if len(str1) != len(str2) {
t.Fatal("array lengths did not match", str1, str2)
}
return tasks
sort.Strings(str1)
sort.Strings(str2)
t.Log(str1)
t.Log(str2)
for i, s := range str2 {
if str1[i] != s {
t.Fatal("unexpected peer", s, str1[i])
}
}
}
......@@ -2,44 +2,68 @@ package peertracker
import (
"sync"
"time"
pq "github.com/ipfs/go-ipfs-pq"
"github.com/ipfs/go-peertaskqueue/peertask"
peer "github.com/libp2p/go-libp2p-core/peer"
)
// TaskMerger is an interface that is used to merge new tasks into the active
// and pending queues
type TaskMerger interface {
// HasNewInfo indicates whether the given task has more information than
// the existing group of tasks (which have the same Topic), and thus should
// be merged.
HasNewInfo(task peertask.Task, existing []peertask.Task) bool
// Merge copies relevant fields from a new task to an existing task.
Merge(task peertask.Task, existing *peertask.Task)
}
// DefaultTaskMerger is the TaskMerger used by default. It never overwrites an
// existing task (with the same Topic).
type DefaultTaskMerger struct{}
func (*DefaultTaskMerger) HasNewInfo(task peertask.Task, existing []peertask.Task) bool {
return false
}
func (*DefaultTaskMerger) Merge(task peertask.Task, existing *peertask.Task) {
}
// PeerTracker tracks task blocks for a single peer, as well as active tasks
// for that peer
type PeerTracker struct {
target peer.ID
// Active is the number of track tasks this peer is currently
// processing
// active must be locked around as it will be updated externally
activelk sync.Mutex
active int
activeTasks map[peertask.Identifier]struct{}
// total number of task tasks for this task
numTasks int
// Tasks that are pending being made active
pendingTasks map[peertask.Topic]*peertask.QueueTask
// Tasks that have been made active
activeTasks map[*peertask.Task]struct{}
// activeWork must be locked around as it will be updated externally
activelk sync.Mutex
activeWork int
// for the PQ interface
index int
freezeVal int
taskMap map[peertask.Identifier]*peertask.TaskBlock
// priority queue of tasks belonging to this peer
taskBlockQueue pq.PQ
taskQueue pq.PQ
taskMerger TaskMerger
}
// New creates a new PeerTracker
func New(target peer.ID) *PeerTracker {
func New(target peer.ID, taskMerger TaskMerger) *PeerTracker {
return &PeerTracker{
target: target,
taskBlockQueue: pq.New(peertask.WrapCompare(peertask.PriorityCompare)),
taskMap: make(map[peertask.Identifier]*peertask.TaskBlock),
activeTasks: make(map[peertask.Identifier]struct{}),
target: target,
taskQueue: pq.New(peertask.WrapCompare(peertask.PriorityCompare)),
pendingTasks: make(map[peertask.Topic]*peertask.QueueTask),
activeTasks: make(map[*peertask.Task]struct{}),
taskMerger: taskMerger,
}
}
......@@ -49,15 +73,17 @@ func PeerCompare(a, b pq.Elem) bool {
pa := a.(*PeerTracker)
pb := b.(*PeerTracker)
// having no tasks means lowest priority
// having both of these checks ensures stability of the sort
if pa.numTasks == 0 {
// having no pending tasks means lowest priority
paPending := len(pa.pendingTasks)
pbPending := len(pb.pendingTasks)
if paPending == 0 {
return false
}
if pb.numTasks == 0 {
if pbPending == 0 {
return true
}
// Frozen peers have lowest priority
if pa.freezeVal > pb.freezeVal {
return false
}
......@@ -65,32 +91,16 @@ func PeerCompare(a, b pq.Elem) bool {
return true
}
if pa.active == pb.active {
// sorting by taskQueue.Len() aids in cleaning out trash tasks faster
// if we sorted instead by requests, one peer could potentially build up
// a huge number of cancelled tasks in the queue resulting in a memory leak
return pa.taskBlockQueue.Len() > pb.taskBlockQueue.Len()
// If each peer has an equal amount of work in its active queue, choose the
// peer with the most amount of work pending
if pa.activeWork == pb.activeWork {
return paPending > pbPending
}
return pa.active < pb.active
}
// StartTask signals that a task was started for this peer.
func (p *PeerTracker) StartTask(identifier peertask.Identifier) {
p.activelk.Lock()
p.activeTasks[identifier] = struct{}{}
p.active++
p.activelk.Unlock()
}
// TaskDone signals that a task was completed for this peer.
func (p *PeerTracker) TaskDone(identifier peertask.Identifier) {
p.activelk.Lock()
delete(p.activeTasks, identifier)
p.active--
if p.active < 0 {
panic("more tasks finished than started!")
}
p.activelk.Unlock()
// Choose the peer with the least amount of work in its active queue.
// This way we "keep peers busy" by sending them as much data as they can
// process.
return pa.activeWork < pb.activeWork
}
// Target returns the peer that this peer tracker tracks tasks for
......@@ -102,7 +112,8 @@ func (p *PeerTracker) Target() peer.ID {
func (p *PeerTracker) IsIdle() bool {
p.activelk.Lock()
defer p.activelk.Unlock()
return p.numTasks == 0 && p.active == 0
return len(p.pendingTasks) == 0 && len(p.activeTasks) == 0
}
// Index implements pq.Elem.
......@@ -115,75 +126,109 @@ func (p *PeerTracker) SetIndex(i int) {
p.index = i
}
// PushBlock adds a new block of tasks on to a peers queue from the given
// peer ID, list of tasks, and task block completion function
func (p *PeerTracker) PushBlock(target peer.ID, tasks []peertask.Task, done func(e []peertask.Task)) {
// PushTasks adds a group of tasks onto a peer's queue
func (p *PeerTracker) PushTasks(tasks ...peertask.Task) {
now := time.Now()
p.activelk.Lock()
defer p.activelk.Unlock()
var priority int
newTasks := make([]peertask.Task, 0, len(tasks))
for _, task := range tasks {
if _, ok := p.activeTasks[task.Identifier]; ok {
// If the new task doesn't add any more information over what we
// already have in the active queue, then we can skip the new task
if !p.taskHasMoreInfoThanActiveTasks(task) {
continue
}
if taskBlock, ok := p.taskMap[task.Identifier]; ok {
if task.Priority > taskBlock.Priority {
taskBlock.Priority = task.Priority
p.taskBlockQueue.Update(taskBlock.Index())
// If there is already a non-active task with this Topic
if existingTask, ok := p.pendingTasks[task.Topic]; ok {
// If the new task has a higher priority than the old task,
if task.Priority > existingTask.Priority {
// Update the priority and the task's position in the queue
existingTask.Priority = task.Priority
p.taskQueue.Update(existingTask.Index())
}
p.taskMerger.Merge(task, &existingTask.Task)
// A task with the Topic exists, so we don't need to add
// the new task to the queue
continue
}
if task.Priority > priority {
priority = task.Priority
}
newTasks = append(newTasks, task)
// Push the new task onto the queue
qTask := peertask.NewQueueTask(task, p.target, now)
p.pendingTasks[task.Topic] = qTask
p.taskQueue.Push(qTask)
}
}
if len(newTasks) == 0 {
return
// PopTasks pops as many tasks off the queue as necessary to cover
// targetMinWork, in priority order. If there are not enough tasks to cover
// targetMinWork it just returns whatever is in the queue.
// The second response argument is pending work: the amount of work in the
// queue for this peer.
func (p *PeerTracker) PopTasks(targetMinWork int) ([]*peertask.Task, int) {
var out []*peertask.Task
work := 0
for p.taskQueue.Len() > 0 && p.freezeVal == 0 && work < targetMinWork {
// Pop the next task off the queue
t := p.taskQueue.Pop().(*peertask.QueueTask)
// Start the task (this makes it "active")
p.startTask(&t.Task)
out = append(out, &t.Task)
work += t.Work
}
taskBlock := peertask.NewTaskBlock(newTasks, priority, target, done)
p.taskBlockQueue.Push(taskBlock)
for _, task := range newTasks {
p.taskMap[task.Identifier] = taskBlock
return out, p.getPendingWork()
}
// startTask signals that a task was started for this peer.
func (p *PeerTracker) startTask(task *peertask.Task) {
p.activelk.Lock()
defer p.activelk.Unlock()
// Remove task from pending queue
delete(p.pendingTasks, task.Topic)
// Add task to active queue
if _, ok := p.activeTasks[task]; !ok {
p.activeTasks[task] = struct{}{}
p.activeWork += task.Work
}
p.numTasks += len(newTasks)
}
// PopBlock removes a block of tasks from this peers queue
func (p *PeerTracker) PopBlock() *peertask.TaskBlock {
var out *peertask.TaskBlock
for p.taskBlockQueue.Len() > 0 && p.freezeVal == 0 {
out = p.taskBlockQueue.Pop().(*peertask.TaskBlock)
func (p *PeerTracker) getPendingWork() int {
total := 0
for _, t := range p.pendingTasks {
total += t.Work
}
return total
}
for _, task := range out.Tasks {
delete(p.taskMap, task.Identifier)
}
out.PruneTasks()
// TaskDone signals that a task was completed for this peer.
func (p *PeerTracker) TaskDone(task *peertask.Task) {
p.activelk.Lock()
defer p.activelk.Unlock()
if len(out.Tasks) > 0 {
for _, task := range out.Tasks {
p.numTasks--
p.StartTask(task.Identifier)
}
} else {
out = nil
continue
// Remove task from active queue
if _, ok := p.activeTasks[task]; ok {
delete(p.activeTasks, task)
p.activeWork -= task.Work
if p.activeWork < 0 {
panic("more tasks finished than started!")
}
break
}
return out
}
// Remove removes the task with the given identifier from this peers queue
func (p *PeerTracker) Remove(identifier peertask.Identifier) bool {
taskBlock, ok := p.taskMap[identifier]
// Remove removes the task with the given topic from this peer's queue
func (p *PeerTracker) Remove(topic peertask.Topic) bool {
t, ok := p.pendingTasks[topic]
if ok {
taskBlock.MarkPrunable(identifier)
p.numTasks--
delete(p.pendingTasks, topic)
p.taskQueue.Remove(t.Index())
}
return ok
}
......@@ -210,3 +255,21 @@ func (p *PeerTracker) FullThaw() {
func (p *PeerTracker) IsFrozen() bool {
return p.freezeVal > 0
}
// Indicates whether the new task adds any more information over tasks that are
// already in the active task queue
func (p *PeerTracker) taskHasMoreInfoThanActiveTasks(task peertask.Task) bool {
var tasksWithTopic []peertask.Task
for at := range p.activeTasks {
if task.Topic == at.Topic {
tasksWithTopic = append(tasksWithTopic, *at)
}
}
// No tasks with that topic, so the new task adds information
if len(tasksWithTopic) == 0 {
return true
}
return p.taskMerger.HasNewInfo(task, tasksWithTopic)
}
package peertracker
import (
"testing"
"github.com/ipfs/go-peertaskqueue/peertask"
"github.com/ipfs/go-peertaskqueue/testutil"
)
func TestEmpty(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tasks, _ := tracker.PopTasks(100)
if len(tasks) != 0 {
t.Fatal("Expected no tasks")
}
}
func TestPushPop(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tasks := []peertask.Task{
peertask.Task{
Topic: "1",
Priority: 1,
Work: 10,
},
}
tracker.PushTasks(tasks...)
popped, _ := tracker.PopTasks(100)
if len(popped) != 1 {
t.Fatal("Expected 1 task")
}
if popped[0].Topic != "1" {
t.Fatal("Expected same task")
}
}
func TestPopNegativeOrZeroSize(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tasks := []peertask.Task{
peertask.Task{
Topic: "1",
Priority: 1,
Work: 10,
},
}
tracker.PushTasks(tasks...)
popped, _ := tracker.PopTasks(-1)
if len(popped) != 0 {
t.Fatal("Expected 0 tasks")
}
popped, _ = tracker.PopTasks(0)
if len(popped) != 0 {
t.Fatal("Expected 0 tasks")
}
}
func TestPushPopSizeAndOrder(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tasks := []peertask.Task{
peertask.Task{
Topic: "1",
Priority: 10,
Work: 10,
},
peertask.Task{
Topic: "2",
Priority: 20,
Work: 10,
},
peertask.Task{
Topic: "3",
Priority: 15,
Work: 10,
},
}
tracker.PushTasks(tasks...)
popped, pending := tracker.PopTasks(10)
if len(popped) != 1 {
t.Fatal("Expected 1 task")
}
if popped[0].Topic != "2" {
t.Fatal("Expected tasks in order")
}
if pending != 20 {
t.Fatal("Expected pending work to be 20")
}
popped, pending = tracker.PopTasks(100)
if len(popped) != 2 {
t.Fatal("Expected 2 tasks")
}
if popped[0].Topic != "3" || popped[1].Topic != "1" {
t.Fatal("Expected tasks in order")
}
if pending != 0 {
t.Fatal("Expected pending work to be 0")
}
popped, pending = tracker.PopTasks(100)
if len(popped) != 0 {
t.Fatal("Expected 0 tasks")
}
if pending != 0 {
t.Fatal("Expected pending work to be 0")
}
}
func TestPopFirstItemAlways(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tasks := []peertask.Task{
peertask.Task{
Topic: "1",
Priority: 20,
Work: 10,
},
peertask.Task{
Topic: "2",
Priority: 10,
Work: 5,
},
}
tracker.PushTasks(tasks...)
// Pop with target size 7.
// PopTasks should always return the first task even if it's under target work.
popped, _ := tracker.PopTasks(7)
if len(popped) != 1 || popped[0].Topic != "1" {
t.Fatal("Expected first task to be popped")
}
popped, _ = tracker.PopTasks(100)
if len(popped) != 1 {
t.Fatal("Expected 1 task")
}
}
func TestPopItemsToCoverTargetWork(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tasks := []peertask.Task{
peertask.Task{
Topic: "1",
Priority: 20,
Work: 5,
},
peertask.Task{
Topic: "2",
Priority: 10,
Work: 5,
},
peertask.Task{
Topic: "3",
Priority: 5,
Work: 5,
},
}
tracker.PushTasks(tasks...)
// Pop with target size 7.
// PopTasks should return enough items to cover the target work.
popped, _ := tracker.PopTasks(7)
if len(popped) != 2 || popped[0].Topic != "1" || popped[1].Topic != "2" {
t.Fatal("Expected first two tasks to be popped")
}
popped, _ = tracker.PopTasks(100)
if len(popped) != 1 {
t.Fatal("Expected 1 task")
}
}
func TestRemove(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tasks := []peertask.Task{
peertask.Task{
Topic: "1",
Priority: 10,
Work: 10,
},
peertask.Task{
Topic: "2",
Priority: 20,
Work: 10,
},
peertask.Task{
Topic: "3",
Priority: 15,
Work: 10,
},
}
tracker.PushTasks(tasks...)
tracker.Remove("2")
popped, _ := tracker.PopTasks(100)
if len(popped) != 2 {
t.Fatal("Expected 2 tasks")
}
if popped[0].Topic != "3" || popped[1].Topic != "1" {
t.Fatal("Expected tasks in order")
}
}
func TestRemoveMulti(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tasks := []peertask.Task{
peertask.Task{
Topic: "1",
Priority: 10,
Work: 10,
},
peertask.Task{
Topic: "1",
Priority: 20,
Work: 1,
},
peertask.Task{
Topic: "2",
Priority: 15,
Work: 10,
},
}
tracker.PushTasks(tasks...)
tracker.Remove("1")
popped, _ := tracker.PopTasks(100)
if len(popped) != 1 {
t.Fatal("Expected 1 task")
}
if popped[0].Topic != "2" {
t.Fatal("Expected remaining task")
}
}
func TestTaskDone(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tasks := []peertask.Task{
peertask.Task{
Topic: "1",
Priority: 10,
Work: 10,
Data: "a",
},
peertask.Task{
Topic: "1",
Priority: 20,
Work: 10,
Data: "b",
},
}
// Push task "a"
tracker.PushTasks(tasks[0]) // Topic "1"
// Pop task "a". This makes the task active.
popped, _ := tracker.PopTasks(10)
if len(popped) != 1 {
t.Fatal("Expected 1 task")
}
// Mark task "a" as done.
tracker.TaskDone(popped[0])
// Push task "b"
tracker.PushTasks(tasks[1]) // Topic "1"
// Pop all tasks. Task "a" was done so task "b" should have been allowed to
// be added.
popped, _ = tracker.PopTasks(100)
if len(popped) != 1 {
t.Fatal("Expected 1 task")
}
}
type permissiveTaskMerger struct{}
func (*permissiveTaskMerger) HasNewInfo(task peertask.Task, existing []peertask.Task) bool {
return true
}
func (*permissiveTaskMerger) Merge(task peertask.Task, existing *peertask.Task) {
existing.Data = task.Data
existing.Work = task.Work
}
func TestReplaceTaskPermissive(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &permissiveTaskMerger{})
tasks := []peertask.Task{
peertask.Task{
Topic: "1",
Priority: 10,
Work: 10,
Data: "a",
},
peertask.Task{
Topic: "1",
Priority: 20,
Work: 10,
Data: "b",
},
}
// Push task "a"
tracker.PushTasks(tasks[0]) // Topic "1"
// Push task "b". Has same topic and permissive task merger, so should
// replace task "a".
tracker.PushTasks(tasks[1]) // Topic "1"
// Pop all tasks, should only be task "b".
popped, _ := tracker.PopTasks(100)
if len(popped) != 1 {
t.Fatal("Expected 1 task")
}
if popped[0].Data != "b" {
t.Fatal("Expected b to replace a")
}
if popped[0].Priority != 20 {
t.Fatal("Expected higher Priority to replace lower Priority")
}
}
func TestReplaceTaskSize(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &permissiveTaskMerger{})
tasks := []peertask.Task{
peertask.Task{
Topic: "1",
Priority: 10,
Work: 10,
Data: "a",
},
peertask.Task{
Topic: "1",
Priority: 10,
Work: 20,
Data: "b",
},
peertask.Task{
Topic: "2",
Priority: 5,
Work: 5,
Data: "c",
},
}
// Push task "a"
tracker.PushTasks(tasks[0]) // Topic "1"
// Push task "b". Has same topic as task "a" and permissive task merger,
// so should replace task "a", and update its Work from 10 to 20.
tracker.PushTasks(tasks[1]) // Topic "1"
// Push task "c"
tracker.PushTasks(tasks[2]) // Topic "2"
// Pop with target size 15. Should only pop task "a" because its Work
// is now 20 (was 10)
popped, pending := tracker.PopTasks(15)
if len(popped) != 1 || popped[0].Data != "b" {
t.Fatal("Expected 1 task")
}
if pending != 5 {
t.Fatal("Expected pending work to be 5")
}
popped, pending = tracker.PopTasks(30)
if len(popped) != 1 {
t.Fatal("Expected 1 task")
}
if pending != 0 {
t.Fatal("Expected pending work to be 0")
}
}
func TestReplaceActiveTask(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &permissiveTaskMerger{})
tasks := []peertask.Task{
peertask.Task{
Topic: "1",
Priority: 10,
Work: 10,
Data: "a",
},
peertask.Task{
Topic: "1",
Priority: 20,
Work: 10,
Data: "b",
},
}
// Push task "a"
tracker.PushTasks(tasks[0]) // Topic "1"
// Pop task "a". This makes the task active.
popped, _ := tracker.PopTasks(10)
if len(popped) != 1 {
t.Fatal("Expected 1 task")
}
// Push task "b"
tracker.PushTasks(tasks[1]) // Topic "1"
// Pop all tasks. Task "a" was active so task "b" should have been moved to
// the pending queue.
popped, _ = tracker.PopTasks(100)
if len(popped) != 1 {
t.Fatal("Expected 1 task")
}
}
func TestReplaceActiveTaskNonPermissive(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tasks := []peertask.Task{
peertask.Task{
Topic: "1",
Priority: 10,
Work: 10,
Data: "a",
},
peertask.Task{
Topic: "1",
Priority: 20,
Work: 10,
Data: "b",
},
}
// Push task "a"
tracker.PushTasks(tasks[0]) // Topic "1"
// Pop task "a". This makes the task active.
popped, _ := tracker.PopTasks(10)
if len(popped) != 1 {
t.Fatal("Expected 1 task")
}
// Push task "b". Task merger is not permissive, so should ignore task "b".
tracker.PushTasks(tasks[1]) // Topic "1"
// Pop all tasks.
popped, _ = tracker.PopTasks(100)
if len(popped) != 0 {
t.Fatal("Expected no tasks")
}
}
func TestReplaceTaskThatIsActiveAndPending(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &permissiveTaskMerger{})
tasks := []peertask.Task{
peertask.Task{
Topic: "1",
Priority: 10,
Work: 10,
Data: "a",
},
peertask.Task{
Topic: "1",
Priority: 10,
Work: 10,
Data: "b",
},
peertask.Task{
Topic: "1",
Priority: 10,
Work: 10,
Data: "c",
},
}
// Push task "a"
tracker.PushTasks(tasks[0]) // Topic "1"
// Pop task "a". This makes the task active.
popped, _ := tracker.PopTasks(10)
if len(popped) != 1 {
t.Fatal("Expected 1 task")
}
// Push task "b". Same Topic so should be added to the pending queue.
tracker.PushTasks(tasks[1]) // Topic "1"
// Push task "c". Permissive task merger so should replace pending task "b"
// with same Topic.
tracker.PushTasks(tasks[2]) // Topic "1"
// Pop all tasks.
popped, _ = tracker.PopTasks(100)
if len(popped) != 1 {
t.Fatal("Expected 1 task")
}
if popped[0].Data != "c" {
t.Fatalf("Expected last task to overwrite pending task")
}
}
func TestRemoveActive(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &permissiveTaskMerger{})
tasks := []peertask.Task{
peertask.Task{
Topic: "1",
Priority: 10,
Work: 10,
Data: "a",
},
peertask.Task{
Topic: "1",
Priority: 20,
Work: 10,
Data: "b",
},
peertask.Task{
Topic: "2",
Priority: 15,
Work: 10,
Data: "c",
},
}
// Push task "a"
tracker.PushTasks(tasks[0]) // Topic "1"
// Pop task "a". This makes the task active.
popped, _ := tracker.PopTasks(10)
if len(popped) != 1 {
t.Fatal("Expected 1 task")
}
// Push task "b" and "c"
tracker.PushTasks(tasks[1]) // Topic "1"
tracker.PushTasks(tasks[2]) // Topic "2"
// Remove all tasks with Topic "1".
// This should remove task "b" from the pending queue.
tracker.Remove("1")
popped, _ = tracker.PopTasks(100)
if len(popped) != 1 {
t.Fatal("Expected 1 task")
}
if popped[0].Topic != "2" {
t.Fatal("Expected tasks in order")
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment