Commit 58ce371b authored by hannahhoward's avatar hannahhoward

feat(peertaskqueue): create task queue

Create a prioritized task queue for tracking blocks of tasks between peers
parent c27b30c6
......@@ -7,6 +7,7 @@ require (
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-cid v0.0.1
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-pq v0.0.1
github.com/ipfs/go-log v0.0.1
github.com/ipld/go-ipld-prime v0.0.0-20190320000329-46ca29fe25db
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
......
package peertask
import (
"time"
pq "github.com/ipfs/go-ipfs-pq"
peer "github.com/libp2p/go-libp2p-peer"
)
// FIFOCompare orders task blocks by creation time. It reports true when a was
// created strictly before b, so that older blocks leave the queue first.
var FIFOCompare = func(a, b *TaskBlock) bool {
	return b.created.After(a.created)
}
// PriorityCompare ranks two task blocks. Blocks aimed at the same target peer
// are ordered by Priority (larger value wins); blocks aimed at different peers
// fall back to FIFOCompare, i.e. the older block wins.
var PriorityCompare = func(a, b *TaskBlock) bool {
	if a.Target != b.Target {
		return FIFOCompare(a, b)
	}
	return a.Priority > b.Priority
}
// WrapCompare adapts a comparison over *TaskBlock values into a comparison
// over pq.Elem values so it can drive a priority queue. The returned function
// type-asserts both elements to *TaskBlock and panics if either is not one.
func WrapCompare(f func(a, b *TaskBlock) bool) func(a, b pq.Elem) bool {
	return func(x, y pq.Elem) bool {
		left, right := x.(*TaskBlock), y.(*TaskBlock)
		return f(left, right)
	}
}
// Identifier is a unique identifier for a task. It's used by the client library
// to act on a task once it exits the queue.
//
// Identifiers are used as map keys throughout this package, so the dynamic
// value stored in an Identifier must be a comparable type.
type Identifier interface{}
// Task is a single task to be executed as part of a task block.
type Task struct {
// Identifier names the task; it is the key used to mark the task prunable.
Identifier Identifier
// Priority is the relative importance of this task.
Priority int
}
// TaskBlock is a block of tasks to execute on a single peer.
type TaskBlock struct {
// Tasks are the tasks in this block; PruneTasks drops the ones marked prunable.
Tasks []Task
// Priority of the block as a whole; PriorityCompare prefers larger values
// among blocks with the same Target.
Priority int
// Target is the peer this block of tasks is for.
Target peer.ID
// A callback to signal that this task block has been completed
Done func([]Task)
// toPrune are the tasks that have already been taken care of as part of
// a different task block which can be removed from the task block.
toPrune map[Identifier]struct{}
created time.Time // created marks the time that the task was added to the queue
index int // book-keeping field used by the pq container
}
// NewTaskBlock builds a task block for the given target peer. The block holds
// the supplied tasks and priority, calls done to signal completion, starts
// with nothing marked prunable, and is stamped with the current time.
func NewTaskBlock(tasks []Task, priority int, target peer.ID, done func([]Task)) *TaskBlock {
	block := new(TaskBlock)
	block.Tasks = tasks
	block.Priority = priority
	block.Target = target
	block.Done = done
	block.toPrune = make(map[Identifier]struct{}, len(tasks))
	block.created = time.Now()
	return block
}
// MarkPrunable marks any tasks with the given identifier as prunable at the time
// the task block is pulled off the queue to execute (because they've already
// been removed). Marking the same identifier more than once has no further
// effect.
func (pt *TaskBlock) MarkPrunable(identifier Identifier) {
	// TaskBlock has exported fields, so it may have been built as a literal
	// rather than through NewTaskBlock; allocate the set lazily instead of
	// panicking on a write to a nil map.
	if pt.toPrune == nil {
		pt.toPrune = make(map[Identifier]struct{})
	}
	pt.toPrune[identifier] = struct{}{}
}
// PruneTasks removes all tasks previously marked as prunable from the list of
// tasks in the block. The surviving tasks keep their relative order and are
// stored in a freshly allocated slice. It is safe to call more than once.
func (pt *TaskBlock) PruneTasks() {
	// The prune set can be larger than the task list (for example on a second
	// call, or when an identifier that is not in the block was marked), so the
	// capacity hint must be clamped: make panics on a negative capacity.
	remaining := len(pt.Tasks) - len(pt.toPrune)
	if remaining < 0 {
		remaining = 0
	}
	kept := make([]Task, 0, remaining)
	for _, task := range pt.Tasks {
		if _, pruned := pt.toPrune[task.Identifier]; !pruned {
			kept = append(kept, task)
		}
	}
	pt.Tasks = kept
}
// Index implements pq.Elem. It returns the position last stored by SetIndex.
func (pt *TaskBlock) Index() int {
return pt.index
}
// SetIndex implements pq.Elem. It records i as this block's position so that
// Index can report it later.
func (pt *TaskBlock) SetIndex(i int) {
pt.index = i
}
package peertaskqueue
import (
"sync"
"github.com/ipfs/go-graphsync/responsemanager/peertaskqueue/peertask"
"github.com/ipfs/go-graphsync/responsemanager/peertaskqueue/peertracker"
pq "github.com/ipfs/go-ipfs-pq"
peer "github.com/libp2p/go-libp2p-peer"
)
// PeerTaskQueue is a prioritized list of tasks to be executed on peers.
// The queue puts tasks on in blocks, then alternates between peers (roughly)
// to execute the block with the highest priority, or otherwise the one added
// first if priorities are equal.
type PeerTaskQueue struct {
// lock is taken by every exported method before touching the fields below.
lock sync.Mutex
// pQueue orders the per-peer trackers using peertracker.PeerCompare.
pQueue pq.PQ
// peerTrackers holds the tracker created for each peer on its first PushBlock.
peerTrackers map[peer.ID]*peertracker.PeerTracker
// frozenPeers is the set of peers frozen by Remove and not yet thawed.
frozenPeers map[peer.ID]struct{}
}
// New returns an empty PeerTaskQueue whose peers are ordered by
// peertracker.PeerCompare.
func New() *PeerTaskQueue {
	queue := &PeerTaskQueue{pQueue: pq.New(peertracker.PeerCompare)}
	queue.peerTrackers = make(map[peer.ID]*peertracker.PeerTracker)
	queue.frozenPeers = make(map[peer.ID]struct{})
	return queue
}
// PushBlock adds a new block of tasks for the given peer to the queue. A
// tracker is created and enqueued for the peer the first time it is seen.
//
// The block's Done callback marks each task it is given as finished on the
// peer's tracker and re-sorts the peer. The callback takes the queue lock
// itself, so it must not be invoked while that lock is already held.
func (ptq *PeerTaskQueue) PushBlock(to peer.ID, tasks ...peertask.Task) {
	ptq.lock.Lock()
	defer ptq.lock.Unlock()

	peerTracker, ok := ptq.peerTrackers[to]
	if !ok {
		peerTracker = peertracker.New()
		ptq.pQueue.Push(peerTracker)
		ptq.peerTrackers[to] = peerTracker
	}

	peerTracker.PushBlock(to, tasks, func(e []peertask.Task) {
		ptq.lock.Lock()
		// Deferred so the queue lock is released even if TaskDone panics
		// (it does so when more tasks finish than were started).
		defer ptq.lock.Unlock()
		for _, task := range e {
			peerTracker.TaskDone(task.Identifier)
		}
		ptq.pQueue.Update(peerTracker.Index())
	})
	// The peer's pending work changed, so restore its position in the queue.
	ptq.pQueue.Update(peerTracker.Index())
}
// PopBlock takes the next block of tasks from the highest-ranked peer. It
// returns nil when no peer has been registered or when that peer has nothing
// it can hand out right now.
func (ptq *PeerTaskQueue) PopBlock() *peertask.TaskBlock {
	ptq.lock.Lock()
	defer ptq.lock.Unlock()

	if ptq.pQueue.Len() > 0 {
		next := ptq.pQueue.Pop().(*peertracker.PeerTracker)
		block := next.PopBlock()
		// Re-insert the tracker so it is re-ranked with its updated counters.
		ptq.pQueue.Push(next)
		return block
	}
	return nil
}
// Remove removes the task with the given identifier from peer p's queue and
// freezes that peer. It does nothing if the peer is unknown to the queue.
func (ptq *PeerTaskQueue) Remove(identifier peertask.Identifier, p peer.ID) {
	ptq.lock.Lock()
	// Deferred, like the other methods, so the lock is released on every path.
	defer ptq.lock.Unlock()

	peerTracker, ok := ptq.peerTrackers[p]
	if !ok {
		return
	}
	peerTracker.Remove(identifier)
	// we now also 'freeze' that partner. If they sent us a cancel for a
	// block we were about to send them, we should wait a short period of time
	// to make sure we receive any other in-flight cancels before sending
	// them a block they already potentially have
	if !peerTracker.IsFrozen() {
		ptq.frozenPeers[p] = struct{}{}
	}
	peerTracker.Freeze()
	ptq.pQueue.Update(peerTracker.Index())
}
// FullThaw unfreezes every frozen peer at once, removes it from the frozen set
// and re-ranks it in the queue.
func (ptq *PeerTaskQueue) FullThaw() {
	ptq.lock.Lock()
	defer ptq.lock.Unlock()

	for id := range ptq.frozenPeers {
		tracker, known := ptq.peerTrackers[id]
		if !known {
			continue
		}
		tracker.FullThaw()
		delete(ptq.frozenPeers, id)
		ptq.pQueue.Update(tracker.Index())
	}
}
// ThawRound thaws every frozen peer by one step. Peers that were frozen the
// least become fully thawed first; those are dropped from the frozen set.
// Every visited peer is re-ranked in the queue.
func (ptq *PeerTaskQueue) ThawRound() {
	ptq.lock.Lock()
	defer ptq.lock.Unlock()

	for id := range ptq.frozenPeers {
		tracker, known := ptq.peerTrackers[id]
		if !known {
			continue
		}
		thawed := tracker.Thaw()
		if thawed {
			delete(ptq.frozenPeers, id)
		}
		ptq.pQueue.Update(tracker.Index())
	}
}
package peertaskqueue
import (
"fmt"
"math"
"math/rand"
"sort"
"strings"
"testing"
"github.com/ipfs/go-graphsync/responsemanager/peertaskqueue/peertask"
"github.com/ipfs/go-graphsync/testutil"
)
// TestPushPop pushes one single-task block per letter of the alphabet in
// random order, cancels every consonant, then drains the queue. Exactly the
// vowels must come out, in alphabetical (priority) order.
func TestPushPop(t *testing.T) {
	ptq := New()
	partner := testutil.GeneratePeers(1)[0]
	t.Log(partner.String())

	alphabet := strings.Split("abcdefghijklmnopqrstuvwxyz", "")
	vowels := strings.Split("aeiou", "")
	isVowel := make(map[string]bool, len(vowels))
	for _, vowel := range vowels {
		isVowel[vowel] = true
	}
	var consonants []string
	for _, letter := range alphabet {
		if !isVowel[letter] {
			consonants = append(consonants, letter)
		}
	}
	sort.Strings(alphabet)
	sort.Strings(vowels)
	sort.Strings(consonants)

	// add a bunch of blocks. cancel some. drain the queue. the queue should only have the kept tasks
	for _, index := range rand.Perm(len(alphabet)) { // add blocks for all letters
		letter := alphabet[index]
		// Earlier letters get a higher priority, whatever the push order.
		ptq.PushBlock(partner, peertask.Task{Identifier: letter, Priority: math.MaxInt32 - index})
	}
	for _, consonant := range consonants {
		ptq.Remove(consonant, partner)
	}

	ptq.FullThaw()

	var out []string
	for {
		received := ptq.PopBlock()
		if received == nil {
			break
		}
		for _, task := range received.Tasks {
			out = append(out, task.Identifier.(string))
		}
	}

	// Check the count first: too few results would otherwise panic on out[i],
	// and leaked (cancelled) tasks at the tail would otherwise go unnoticed.
	if len(out) != len(vowels) {
		t.Fatalf("received %d tasks %v, expected %d tasks %v", len(out), out, len(vowels), vowels)
	}

	// Tasks popped should already be in correct order
	for i, expected := range vowels {
		if out[i] != expected {
			t.Fatal("received", out[i], "expected", expected)
		}
	}
}
// TestPeerRepeats checks that peers won't starve out other peers: with four
// peers each holding several blocks, the first four pops must serve one block
// per peer, and finishing a peer's block must make that peer the next served.
func TestPeerRepeats(t *testing.T) {
	ptq := New()
	peers := testutil.GeneratePeers(4)
	a := peers[0]
	b := peers[1]
	c := peers[2]
	d := peers[3]

	// Have each push some blocks
	for i := 0; i < 5; i++ {
		is := fmt.Sprint(i)
		ptq.PushBlock(a, peertask.Task{Identifier: is})
		ptq.PushBlock(b, peertask.Task{Identifier: is})
		ptq.PushBlock(c, peertask.Task{Identifier: is})
		ptq.PushBlock(d, peertask.Task{Identifier: is})
	}

	// now, pop off four tasks, there should be one from each
	var targets []string
	var tasks []*peertask.TaskBlock
	for i := 0; i < 4; i++ {
		// Named block (not t) so *testing.T stays usable inside the loop.
		block := ptq.PopBlock()
		if block == nil {
			t.Fatalf("pop %d returned no block", i)
		}
		targets = append(targets, block.Target.Pretty())
		tasks = append(tasks, block)
	}

	expected := []string{a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty()}
	sort.Strings(expected)
	sort.Strings(targets)

	t.Log(targets)
	t.Log(expected)
	for i, s := range targets {
		if expected[i] != s {
			t.Fatal("unexpected peer", s, expected[i])
		}
	}

	// Now, if one of the tasks gets finished, the next task off the queue should
	// be for the same peer
	for blockI := 0; blockI < 4; blockI++ {
		for i := 0; i < 4; i++ {
			// its okay to mark the same task done multiple times here (JUST FOR TESTING)
			tasks[i].Done(tasks[i].Tasks)

			ntask := ptq.PopBlock()
			if ntask == nil {
				t.Fatal("Expected a task block but the queue returned none")
			}
			if ntask.Target != tasks[i].Target {
				t.Fatal("Expected task from peer with lowest active count")
			}
		}
	}
}
package peertracker
import (
"sync"
"github.com/ipfs/go-graphsync/responsemanager/peertaskqueue/peertask"
pq "github.com/ipfs/go-ipfs-pq"
peer "github.com/libp2p/go-libp2p-peer"
)
// PeerTracker tracks task blocks for a single peer, as well as active tasks
// for that peer
type PeerTracker struct {
// active is the number of tasks this peer is currently processing and
// activeTasks is the set of their identifiers.
// activelk must be held around both as they will be updated externally
// (StartTask and TaskDone take it).
activelk sync.Mutex
active int
activeTasks map[peertask.Identifier]struct{}
// numTasks is the number of tasks queued for this peer that have been
// neither removed nor started; PeerCompare treats zero as lowest priority.
numTasks int
// for the PQ interface
index int
// freezeVal is greater than zero while the peer is frozen (see Freeze/Thaw).
freezeVal int
// taskMap maps a task identifier to the queued block that contains it; it
// is consulted on push to skip duplicates and on Remove to find the task.
taskMap map[peertask.Identifier]*peertask.TaskBlock
// priority queue of tasks belonging to this peer
taskBlockQueue pq.PQ
}
// New returns an empty PeerTracker whose task blocks are ordered by
// peertask.PriorityCompare.
func New() *PeerTracker {
	blockOrder := peertask.WrapCompare(peertask.PriorityCompare)
	tracker := &PeerTracker{taskBlockQueue: pq.New(blockOrder)}
	tracker.taskMap = make(map[peertask.Identifier]*peertask.TaskBlock)
	tracker.activeTasks = make(map[peertask.Identifier]struct{})
	return tracker
}
// PeerCompare implements pq.ElemComparator.
// It returns true if peer 'a' has higher priority than peer 'b'. The rules,
// in order of precedence:
//  1. a peer with no tasks ranks lowest;
//  2. a less frozen peer ranks above a more frozen one;
//  3. a peer with fewer active tasks ranks higher;
//  4. otherwise the peer with the longer block queue ranks higher.
func PeerCompare(a, b pq.Elem) bool {
	pa := a.(*PeerTracker)
	pb := b.(*PeerTracker)

	switch {
	// having no tasks means lowest priority
	// checking both sides ensures stability of the sort
	case pa.numTasks == 0:
		return false
	case pb.numTasks == 0:
		return true
	case pa.freezeVal != pb.freezeVal:
		return pa.freezeVal < pb.freezeVal
	case pa.active != pb.active:
		return pa.active < pb.active
	default:
		// sorting by taskQueue.Len() aids in cleaning out trash tasks faster
		// if we sorted instead by requests, one peer could potentially build up
		// a huge number of cancelled tasks in the queue resulting in a memory leak
		return pa.taskBlockQueue.Len() > pb.taskBlockQueue.Len()
	}
}
// StartTask records that the task with the given identifier has begun for
// this peer: the identifier joins the active set and the active count rises
// by one.
func (p *PeerTracker) StartTask(identifier peertask.Identifier) {
	p.activelk.Lock()
	defer p.activelk.Unlock()

	p.active++
	p.activeTasks[identifier] = struct{}{}
}
// TaskDone signals that a task was completed for this peer. The identifier is
// dropped from the active set and the active count is decremented.
//
// It panics if the active count would become negative, i.e. if more tasks are
// reported finished than were started.
func (p *PeerTracker) TaskDone(identifier peertask.Identifier) {
	p.activelk.Lock()
	// Deferred so the lock is released even when the invariant check below
	// panics; otherwise a recovered panic would leave activelk held forever.
	defer p.activelk.Unlock()

	delete(p.activeTasks, identifier)
	p.active--
	if p.active < 0 {
		panic("more tasks finished than started!")
	}
}
// Index implements pq.Elem. It returns the position last stored by SetIndex.
func (p *PeerTracker) Index() int {
return p.index
}
// SetIndex implements pq.Elem. It records i as this tracker's position so that
// Index can report it later.
func (p *PeerTracker) SetIndex(i int) {
p.index = i
}
// PushBlock adds a new block of tasks on to a peer's queue from the given
// peer ID, list of tasks, and task block completion function.
//
// Tasks that are currently active are skipped. Tasks that are already queued
// are not queued again; if the incoming priority is higher, the priority of
// the block already holding the task is raised instead. The remaining tasks
// form one new block whose priority is the highest priority among them. No
// block is queued when no tasks remain.
func (p *PeerTracker) PushBlock(target peer.ID, tasks []peertask.Task, done func(e []peertask.Task)) {
	p.activelk.Lock()
	defer p.activelk.Unlock()

	var priority int
	newTasks := make([]peertask.Task, 0, len(tasks))
	for _, task := range tasks {
		if _, active := p.activeTasks[task.Identifier]; active {
			continue
		}
		if queued, ok := p.taskMap[task.Identifier]; ok {
			if task.Priority > queued.Priority {
				queued.Priority = task.Priority
				p.taskBlockQueue.Update(queued.Index())
			}
			continue
		}
		// Seed the maximum with the first accepted task rather than with 0,
		// so a block made only of negative priorities is not promoted to 0.
		if len(newTasks) == 0 || task.Priority > priority {
			priority = task.Priority
		}
		newTasks = append(newTasks, task)
	}

	if len(newTasks) == 0 {
		return
	}

	taskBlock := peertask.NewTaskBlock(newTasks, priority, target, done)
	p.taskBlockQueue.Push(taskBlock)
	for _, task := range newTasks {
		p.taskMap[task.Identifier] = taskBlock
	}
	p.numTasks += len(newTasks)
}
// PopBlock removes the next block of tasks from this peer's queue, prunes the
// tasks that were removed while the block was waiting, and marks the surviving
// tasks as started. Blocks left empty by pruning are discarded and the next
// block is tried. It returns nil when the peer is frozen or has no block with
// tasks left.
func (p *PeerTracker) PopBlock() *peertask.TaskBlock {
	for p.freezeVal == 0 && p.taskBlockQueue.Len() > 0 {
		block := p.taskBlockQueue.Pop().(*peertask.TaskBlock)
		for _, task := range block.Tasks {
			// Only forget entries that still point at this block: a task that
			// was removed and then pushed again now belongs to a newer block.
			if p.taskMap[task.Identifier] == block {
				delete(p.taskMap, task.Identifier)
			}
		}
		block.PruneTasks()
		if len(block.Tasks) == 0 {
			continue
		}
		for _, task := range block.Tasks {
			p.numTasks--
			p.StartTask(task.Identifier)
		}
		return block
	}
	return nil
}

// Remove removes the task with the given identifier from this peer's queue.
// The task is marked prunable in its block, so it is dropped when that block
// is popped, and it is forgotten by the tracker straight away. Removing an
// identifier that is not queued (including one already removed) is a no-op.
func (p *PeerTracker) Remove(identifier peertask.Identifier) {
	taskBlock, ok := p.taskMap[identifier]
	if !ok {
		return
	}
	taskBlock.MarkPrunable(identifier)
	// Forget the identifier now, so that a repeated Remove cannot decrement
	// numTasks a second time and a later push of the same identifier is
	// queued afresh instead of being mistaken for the cancelled task.
	delete(p.taskMap, identifier)
	p.numTasks--
}
// Freeze increments the freeze value for this peer. While a peer is frozen
// (freeze value > 0) it will not execute tasks. Each call adds one, so
// repeated calls freeze the peer more deeply.
func (p *PeerTracker) Freeze() {
p.freezeVal++
}
// Thaw lowers the freeze value for this peer by half of its current value,
// rounded up. It returns true once the peer is no longer frozen (freeze value
// <= 0) and can execute tasks again.
func (p *PeerTracker) Thaw() bool {
	step := (p.freezeVal + 1) / 2
	p.freezeVal -= step
	return p.freezeVal <= 0
}
// FullThaw completely unfreezes this peer so it can execute tasks, by
// resetting the freeze value to zero regardless of how deeply it was frozen.
func (p *PeerTracker) FullThaw() {
p.freezeVal = 0
}
// IsFrozen returns whether this peer is frozen and unable to execute tasks,
// i.e. whether its freeze value is greater than zero.
func (p *PeerTracker) IsFrozen() bool {
return p.freezeVal > 0
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment