Commit 66c43397 authored by hannahhoward's avatar hannahhoward

refactor(peertaskqueue): extract to package

parent b711d589
......@@ -54,6 +54,8 @@ github.com/ipfs/go-ipfs-util v0.0.1 h1:Wz9bL2wB2YBJqggkA4dD7oSmqB4cAnpNbGrlHJulv
github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc=
github.com/ipfs/go-log v0.0.1 h1:9XTUN/rW64BCG1YhPK9Hoy3q8nr4gOmHHBpgFdfw6Lc=
github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM=
github.com/ipfs/go-peertaskqueue v0.0.1 h1:6K9LYgpwaXV5xFI/iu8tkeGnrAPT6RnYo+qhSlnGqzc=
github.com/ipfs/go-peertaskqueue v0.0.1/go.mod h1:03H8fhyeMfKNFWqzYEVyMbcPUeYrqP1MX6Kd+aN+rMQ=
github.com/ipld/go-ipld-prime v0.0.0-20190227132703-aaea73ad73c5 h1:Fm+tk5iaitTL/3L1nRlwibgLFtI5Eld4nIrjX4ajcKY=
github.com/ipld/go-ipld-prime v0.0.0-20190227132703-aaea73ad73c5/go.mod h1:hSGXgXt4BSdqvjA3Kkxhzcg4Rsk9yvIeEuEVCPCi7/A=
github.com/ipld/go-ipld-prime v0.0.0-20190306022502-066284669cf6 h1:FJW7rDl8g/580E6ZuR5rXzDT/9SsJvCuAEQCq8cbdPQ=
......@@ -94,6 +96,8 @@ github.com/libp2p/go-libp2p-blankhost v0.0.1/go.mod h1:Ibpbw/7cPPYwFb7PACIWdvxxv
github.com/libp2p/go-libp2p-circuit v0.0.1/go.mod h1:Dqm0s/BiV63j8EEAs8hr1H5HudqvCAeXxDyic59lCwE=
github.com/libp2p/go-libp2p-crypto v0.0.1 h1:JNQd8CmoGTohO/akqrH16ewsqZpci2CbgYH/LmYl8gw=
github.com/libp2p/go-libp2p-crypto v0.0.1/go.mod h1:yJkNyDmO341d5wwXxDUGO0LykUVT72ImHNUqh5D/dBE=
github.com/libp2p/go-libp2p-crypto v0.0.2 h1:TTdJ4y6Uoa6NxQcuEaVkQfFRcQeCE2ReDk8Ok4I0Fyw=
github.com/libp2p/go-libp2p-crypto v0.0.2/go.mod h1:eETI5OUfBnvARGOHrJz2eWNyTUxEGZnBxMcbUjfIj4I=
github.com/libp2p/go-libp2p-discovery v0.0.1/go.mod h1:ZkkF9xIFRLA1xCc7bstYFkd80gBGK8Fc1JqGoU2i+zI=
github.com/libp2p/go-libp2p-host v0.0.1 h1:dnqusU+DheGcdxrE718kG4XgHNuL2n9eEv8Rg5zy8hQ=
github.com/libp2p/go-libp2p-host v0.0.1/go.mod h1:qWd+H1yuU0m5CwzAkvbSjqKairayEHdR5MMl7Cwa7Go=
......@@ -111,6 +115,8 @@ github.com/libp2p/go-libp2p-netutil v0.0.1 h1:LgD6+skofkOx8z6odD9+MZHKjupv3ng1u6
github.com/libp2p/go-libp2p-netutil v0.0.1/go.mod h1:GdusFvujWZI9Vt0X5BKqwWWmZFxecf9Gt03cKxm2f/Q=
github.com/libp2p/go-libp2p-peer v0.0.1 h1:0qwAOljzYewINrU+Kndoc+1jAL7vzY/oY2Go4DCGfyY=
github.com/libp2p/go-libp2p-peer v0.0.1/go.mod h1:nXQvOBbwVqoP+T5Y5nCjeH4sP9IX/J0AMzcDUVruVoo=
github.com/libp2p/go-libp2p-peer v0.1.1 h1:qGCWD1a+PyZcna6htMPo26jAtqirVnJ5NvBQIKV7rRY=
github.com/libp2p/go-libp2p-peer v0.1.1/go.mod h1:jkF12jGB4Gk/IOo+yomm+7oLWxF278F7UnrYUQ1Q8es=
github.com/libp2p/go-libp2p-peerstore v0.0.1 h1:twKovq8YK5trLrd3nB7PD2Zu9JcyAIdm7Bz9yBWjhq8=
github.com/libp2p/go-libp2p-peerstore v0.0.1/go.mod h1:RabLyPVJLuNQ+GFyoEkfi8H4Ti6k/HtZJ7YKgtSq+20=
github.com/libp2p/go-libp2p-protocol v0.0.1 h1:+zkEmZ2yFDi5adpVE3t9dqh/N9TbpFWywowzeEzBbLM=
......@@ -141,8 +147,12 @@ github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16 h1:5W7KhL8HVF3XCFOweFD3BNESdnO8ewyYTFT2R+/b8FQ=
github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U=
github.com/minio/sha256-simd v0.0.0-20190328051042-05b4dd3047e5 h1:l16XLUUJ34wIz+RIvLhSwGvLvKyy+W598b135bJN6mg=
github.com/minio/sha256-simd v0.0.0-20190328051042-05b4dd3047e5/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U=
github.com/mr-tron/base58 v1.1.0 h1:Y51FGVJ91WBqCEabAi5OPUz38eAx8DakuAm5svLcsfQ=
github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8=
github.com/mr-tron/base58 v1.1.2 h1:ZEw4I2EgPKDJ2iEw0cNmLB3ROrEmkOtXIkaG7wZg+78=
github.com/mr-tron/base58 v1.1.2/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/multiformats/go-base32 v0.0.3 h1:tw5+NhuwaOjJCC5Pp82QuXbrmLzWg7uxlMFp8Nq/kkI=
github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA=
github.com/multiformats/go-multiaddr v0.0.1 h1:/QUV3VBMDI6pi6xfiw7lr6xhDWWvQKn9udPn68kLSdY=
......@@ -156,6 +166,8 @@ github.com/multiformats/go-multibase v0.0.1 h1:PN9/v21eLywrFWdFNsFKaU04kLJzuYzmr
github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs=
github.com/multiformats/go-multihash v0.0.1 h1:HHwN1K12I+XllBCrqKnhX949Orn4oawPkegHMu2vDqQ=
github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U=
github.com/multiformats/go-multihash v0.0.5 h1:1wxmCvTXAifAepIMyF39vZinRw5sbqjPs/UIi93+uik=
github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcMtlJvakRUn/0Ual8po=
github.com/multiformats/go-multistream v0.0.1 h1:JV4VfSdY9n7ECTtY59/TlSyFCzRILvYx4T4Ws8ZgihU=
github.com/multiformats/go-multistream v0.0.1/go.mod h1:fJTiDfXJVmItycydCnNx4+wSzZ5NwG2FEVAI30fiovg=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
......@@ -167,6 +179,10 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/polydawn/refmt v0.0.0-20190221155625-df39d6c2d992 h1:bzMe+2coZJYHnhGgVlcQKuRy4FSny4ds8dLQjw5P1XE=
github.com/polydawn/refmt v0.0.0-20190221155625-df39d6c2d992/go.mod h1:uIp+gprXxxrWSjjklXD+mN4wed/tMfjMMmN/9+JsA9o=
github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a/go.mod h1:7AyxJNCJ7SBZ1MfVQCWD6Uqo2oubI2Eq2y2eqf+A5r0=
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
......@@ -190,10 +206,15 @@ golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b h1:+/WWzjwW6gidDJnMKWLKLX
golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25 h1:jsG6UpNLt9iAsb0S2AGW28DveNzzgmbXR+ENoPjUeIU=
golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734 h1:p/H982KKEjUnLJkM3tt/LemDnOc1GiZL5FCVlORJ5zo=
golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20180524181706-dfa909b99c79/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190227160552-c95aed5357e7 h1:C2F/nMkR/9sfUTpvR3QrjBuTdvMUC/cFajkphs1YLQo=
golang.org/x/net v0.0.0-20190227160552-c95aed5357e7/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
......@@ -202,6 +223,8 @@ golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpbl
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e h1:ZytStCyV048ZqDsWHiYDdoI2Vd4msMcrDECFxS+tL9c=
golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
......
......@@ -14,7 +14,7 @@ import (
"github.com/ipfs/go-graphsync/requestmanager"
"github.com/ipfs/go-graphsync/responsemanager"
"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
"github.com/ipfs/go-graphsync/responsemanager/peertaskqueue"
"github.com/ipfs/go-peertaskqueue"
logging "github.com/ipfs/go-log"
ipld "github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-peer"
......
package peertask
import (
"time"
pq "github.com/ipfs/go-ipfs-pq"
peer "github.com/libp2p/go-libp2p-peer"
)
// FIFOCompare is a basic task comparator that returns tasks in the order created.
var FIFOCompare = func(a, b *TaskBlock) bool {
return a.created.Before(b.created)
}
// PriorityCompare respects the target peer's task priority. For tasks involving
// different peers, the oldest task is prioritized.
var PriorityCompare = func(a, b *TaskBlock) bool {
if a.Target == b.Target {
return a.Priority > b.Priority
}
return FIFOCompare(a, b)
}
// WrapCompare wraps a TaskBlock comparison function so it can be used as
// comparison for a priority queue
func WrapCompare(f func(a, b *TaskBlock) bool) func(a, b pq.Elem) bool {
return func(a, b pq.Elem) bool {
return f(a.(*TaskBlock), b.(*TaskBlock))
}
}
// Identifier is a unique identifier for a task. It's used by the client library
// to act on a task once it exits the queue.
type Identifier interface{}
// Task is a single task to be executed as part of a task block.
type Task struct {
Identifier Identifier
Priority int
}
// TaskBlock is a block of tasks to execute on a single peer.
type TaskBlock struct {
Tasks []Task
Priority int
Target peer.ID
// A callback to signal that this task block has been completed
Done func([]Task)
// toPrune are the tasks that have already been taken care of as part of
// a different task block which can be removed from the task block.
toPrune map[Identifier]struct{}
created time.Time // created marks the time that the task was added to the queue
index int // book-keeping field used by the pq container
}
// NewTaskBlock creates a new task block with the given tasks, priority, target
// peer, and task completion function.
func NewTaskBlock(tasks []Task, priority int, target peer.ID, done func([]Task)) *TaskBlock {
return &TaskBlock{
Tasks: tasks,
Priority: priority,
Target: target,
Done: done,
toPrune: make(map[Identifier]struct{}, len(tasks)),
created: time.Now(),
}
}
// MarkPrunable marks any tasks with the given identifier as prunable at the time
// the task block is pulled of the queue to execute (because they've already been removed).
func (pt *TaskBlock) MarkPrunable(identifier Identifier) {
pt.toPrune[identifier] = struct{}{}
}
// PruneTasks removes all tasks previously marked as prunable from the lists of
// tasks in the block
func (pt *TaskBlock) PruneTasks() {
newTasks := make([]Task, 0, len(pt.Tasks)-len(pt.toPrune))
for _, task := range pt.Tasks {
if _, ok := pt.toPrune[task.Identifier]; !ok {
newTasks = append(newTasks, task)
}
}
pt.Tasks = newTasks
}
// Index implements pq.Elem.
func (pt *TaskBlock) Index() int {
return pt.index
}
// SetIndex implements pq.Elem.
func (pt *TaskBlock) SetIndex(i int) {
pt.index = i
}
package peertaskqueue
import (
"sync"
"github.com/ipfs/go-graphsync/responsemanager/peertaskqueue/peertask"
"github.com/ipfs/go-graphsync/responsemanager/peertaskqueue/peertracker"
pq "github.com/ipfs/go-ipfs-pq"
peer "github.com/libp2p/go-libp2p-peer"
)
// PeerTaskQueue is a prioritized list of tasks to be executed on peers.
// The queue puts tasks on in blocks, then alternates between peers (roughly)
// to execute the block with the highest priority, or otherwise the one added
// first if priorities are equal.
type PeerTaskQueue struct {
lock sync.Mutex
pQueue pq.PQ
peerTrackers map[peer.ID]*peertracker.PeerTracker
frozenPeers map[peer.ID]struct{}
}
// New creates a new PeerTaskQueue
func New() *PeerTaskQueue {
return &PeerTaskQueue{
peerTrackers: make(map[peer.ID]*peertracker.PeerTracker),
frozenPeers: make(map[peer.ID]struct{}),
pQueue: pq.New(peertracker.PeerCompare),
}
}
// PushBlock adds a new block of tasks for the given peer to the queue
func (ptq *PeerTaskQueue) PushBlock(to peer.ID, tasks ...peertask.Task) {
ptq.lock.Lock()
defer ptq.lock.Unlock()
peerTracker, ok := ptq.peerTrackers[to]
if !ok {
peerTracker = peertracker.New()
ptq.pQueue.Push(peerTracker)
ptq.peerTrackers[to] = peerTracker
}
peerTracker.PushBlock(to, tasks, func(e []peertask.Task) {
ptq.lock.Lock()
for _, task := range e {
peerTracker.TaskDone(task.Identifier)
}
ptq.pQueue.Update(peerTracker.Index())
ptq.lock.Unlock()
})
ptq.pQueue.Update(peerTracker.Index())
}
// PopBlock 'pops' the next block of tasks to be performed. Returns nil if no block exists.
func (ptq *PeerTaskQueue) PopBlock() *peertask.TaskBlock {
ptq.lock.Lock()
defer ptq.lock.Unlock()
if ptq.pQueue.Len() == 0 {
return nil
}
peerTracker := ptq.pQueue.Pop().(*peertracker.PeerTracker)
out := peerTracker.PopBlock()
ptq.pQueue.Push(peerTracker)
return out
}
// Remove removes a task from the queue.
func (ptq *PeerTaskQueue) Remove(identifier peertask.Identifier, p peer.ID) {
ptq.lock.Lock()
peerTracker, ok := ptq.peerTrackers[p]
if ok {
peerTracker.Remove(identifier)
// we now also 'freeze' that partner. If they sent us a cancel for a
// block we were about to send them, we should wait a short period of time
// to make sure we receive any other in-flight cancels before sending
// them a block they already potentially have
if !peerTracker.IsFrozen() {
ptq.frozenPeers[p] = struct{}{}
}
peerTracker.Freeze()
ptq.pQueue.Update(peerTracker.Index())
}
ptq.lock.Unlock()
}
// FullThaw completely thaws all peers in the queue so they can execute tasks.
func (ptq *PeerTaskQueue) FullThaw() {
ptq.lock.Lock()
defer ptq.lock.Unlock()
for p := range ptq.frozenPeers {
peerTracker, ok := ptq.peerTrackers[p]
if ok {
peerTracker.FullThaw()
delete(ptq.frozenPeers, p)
ptq.pQueue.Update(peerTracker.Index())
}
}
}
// ThawRound unthaws peers incrementally, so that those have been frozen the least
// become unfrozen and able to execute tasks first.
func (ptq *PeerTaskQueue) ThawRound() {
ptq.lock.Lock()
defer ptq.lock.Unlock()
for p := range ptq.frozenPeers {
peerTracker, ok := ptq.peerTrackers[p]
if ok {
if peerTracker.Thaw() {
delete(ptq.frozenPeers, p)
}
ptq.pQueue.Update(peerTracker.Index())
}
}
}
package peertaskqueue
import (
"fmt"
"math"
"math/rand"
"sort"
"strings"
"testing"
"github.com/ipfs/go-graphsync/responsemanager/peertaskqueue/peertask"
"github.com/ipfs/go-graphsync/testutil"
)
func TestPushPop(t *testing.T) {
ptq := New()
partner := testutil.GeneratePeers(1)[0]
alphabet := strings.Split("abcdefghijklmnopqrstuvwxyz", "")
vowels := strings.Split("aeiou", "")
consonants := func() []string {
var out []string
for _, letter := range alphabet {
skip := false
for _, vowel := range vowels {
if letter == vowel {
skip = true
}
}
if !skip {
out = append(out, letter)
}
}
return out
}()
sort.Strings(alphabet)
sort.Strings(vowels)
sort.Strings(consonants)
// add a bunch of blocks. cancel some. drain the queue. the queue should only have the kept tasks
for _, index := range rand.Perm(len(alphabet)) { // add blocks for all letters
letter := alphabet[index]
t.Log(partner.String())
ptq.PushBlock(partner, peertask.Task{Identifier: letter, Priority: math.MaxInt32 - index})
}
for _, consonant := range consonants {
ptq.Remove(consonant, partner)
}
ptq.FullThaw()
var out []string
for {
received := ptq.PopBlock()
if received == nil {
break
}
for _, task := range received.Tasks {
out = append(out, task.Identifier.(string))
}
}
// Tasks popped should already be in correct order
for i, expected := range vowels {
if out[i] != expected {
t.Fatal("received", out[i], "expected", expected)
}
}
}
// This test checks that peers wont starve out other peers
func TestPeerRepeats(t *testing.T) {
ptq := New()
peers := testutil.GeneratePeers(4)
a := peers[0]
b := peers[1]
c := peers[2]
d := peers[3]
// Have each push some blocks
for i := 0; i < 5; i++ {
is := fmt.Sprint(i)
ptq.PushBlock(a, peertask.Task{Identifier: is})
ptq.PushBlock(b, peertask.Task{Identifier: is})
ptq.PushBlock(c, peertask.Task{Identifier: is})
ptq.PushBlock(d, peertask.Task{Identifier: is})
}
// now, pop off four tasks, there should be one from each
var targets []string
var tasks []*peertask.TaskBlock
for i := 0; i < 4; i++ {
t := ptq.PopBlock()
targets = append(targets, t.Target.Pretty())
tasks = append(tasks, t)
}
expected := []string{a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty()}
sort.Strings(expected)
sort.Strings(targets)
t.Log(targets)
t.Log(expected)
for i, s := range targets {
if expected[i] != s {
t.Fatal("unexpected peer", s, expected[i])
}
}
// Now, if one of the tasks gets finished, the next task off the queue should
// be for the same peer
for blockI := 0; blockI < 4; blockI++ {
for i := 0; i < 4; i++ {
// its okay to mark the same task done multiple times here (JUST FOR TESTING)
tasks[i].Done(tasks[i].Tasks)
ntask := ptq.PopBlock()
if ntask.Target != tasks[i].Target {
t.Fatal("Expected task from peer with lowest active count")
}
}
}
}
package peertracker
import (
"sync"
"github.com/ipfs/go-graphsync/responsemanager/peertaskqueue/peertask"
pq "github.com/ipfs/go-ipfs-pq"
peer "github.com/libp2p/go-libp2p-peer"
)
// PeerTracker tracks task blocks for a single peer, as well as active tasks
// for that peer
type PeerTracker struct {
// Active is the number of track tasks this peer is currently
// processing
// active must be locked around as it will be updated externally
activelk sync.Mutex
active int
activeTasks map[peertask.Identifier]struct{}
// total number of task tasks for this task
numTasks int
// for the PQ interface
index int
freezeVal int
taskMap map[peertask.Identifier]*peertask.TaskBlock
// priority queue of tasks belonging to this peer
taskBlockQueue pq.PQ
}
// New creates a new PeerTracker
func New() *PeerTracker {
return &PeerTracker{
taskBlockQueue: pq.New(peertask.WrapCompare(peertask.PriorityCompare)),
taskMap: make(map[peertask.Identifier]*peertask.TaskBlock),
activeTasks: make(map[peertask.Identifier]struct{}),
}
}
// PeerCompare implements pq.ElemComparator
// returns true if peer 'a' has higher priority than peer 'b'
func PeerCompare(a, b pq.Elem) bool {
pa := a.(*PeerTracker)
pb := b.(*PeerTracker)
// having no tasks means lowest priority
// having both of these checks ensures stability of the sort
if pa.numTasks == 0 {
return false
}
if pb.numTasks == 0 {
return true
}
if pa.freezeVal > pb.freezeVal {
return false
}
if pa.freezeVal < pb.freezeVal {
return true
}
if pa.active == pb.active {
// sorting by taskQueue.Len() aids in cleaning out trash tasks faster
// if we sorted instead by requests, one peer could potentially build up
// a huge number of cancelled tasks in the queue resulting in a memory leak
return pa.taskBlockQueue.Len() > pb.taskBlockQueue.Len()
}
return pa.active < pb.active
}
// StartTask signals that a task was started for this peer.
func (p *PeerTracker) StartTask(identifier peertask.Identifier) {
p.activelk.Lock()
p.activeTasks[identifier] = struct{}{}
p.active++
p.activelk.Unlock()
}
// TaskDone signals that a task was completed for this peer.
func (p *PeerTracker) TaskDone(identifier peertask.Identifier) {
p.activelk.Lock()
delete(p.activeTasks, identifier)
p.active--
if p.active < 0 {
panic("more tasks finished than started!")
}
p.activelk.Unlock()
}
// Index implements pq.Elem.
func (p *PeerTracker) Index() int {
return p.index
}
// SetIndex implements pq.Elem.
func (p *PeerTracker) SetIndex(i int) {
p.index = i
}
// PushBlock adds a new block of tasks on to a peers queue from the given
// peer ID, list of tasks, and task block completion function
func (p *PeerTracker) PushBlock(target peer.ID, tasks []peertask.Task, done func(e []peertask.Task)) {
p.activelk.Lock()
defer p.activelk.Unlock()
var priority int
newTasks := make([]peertask.Task, 0, len(tasks))
for _, task := range tasks {
if _, ok := p.activeTasks[task.Identifier]; ok {
continue
}
if taskBlock, ok := p.taskMap[task.Identifier]; ok {
if task.Priority > taskBlock.Priority {
taskBlock.Priority = task.Priority
p.taskBlockQueue.Update(taskBlock.Index())
}
continue
}
if task.Priority > priority {
priority = task.Priority
}
newTasks = append(newTasks, task)
}
if len(newTasks) == 0 {
return
}
taskBlock := peertask.NewTaskBlock(newTasks, priority, target, done)
p.taskBlockQueue.Push(taskBlock)
for _, task := range newTasks {
p.taskMap[task.Identifier] = taskBlock
}
p.numTasks += len(newTasks)
}
// PopBlock removes a block of tasks from this peers queue
func (p *PeerTracker) PopBlock() *peertask.TaskBlock {
var out *peertask.TaskBlock
for p.taskBlockQueue.Len() > 0 && p.freezeVal == 0 {
out = p.taskBlockQueue.Pop().(*peertask.TaskBlock)
for _, task := range out.Tasks {
delete(p.taskMap, task.Identifier)
}
out.PruneTasks()
if len(out.Tasks) > 0 {
for _, task := range out.Tasks {
p.numTasks--
p.StartTask(task.Identifier)
}
} else {
out = nil
continue
}
break
}
return out
}
// Remove removes the task with the given identifier from this peers queue
func (p *PeerTracker) Remove(identifier peertask.Identifier) {
taskBlock, ok := p.taskMap[identifier]
if ok {
taskBlock.MarkPrunable(identifier)
p.numTasks--
}
}
// Freeze increments the freeze value for this peer. While a peer is frozen
// (freeze value > 0) it will not execute tasks.
func (p *PeerTracker) Freeze() {
p.freezeVal++
}
// Thaw decrements the freeze value for this peer. While a peer is frozen
// (freeze value > 0) it will not execute tasks.
func (p *PeerTracker) Thaw() bool {
p.freezeVal -= (p.freezeVal + 1) / 2
return p.freezeVal <= 0
}
// FullThaw completely unfreezes this peer so it can execute tasks.
func (p *PeerTracker) FullThaw() {
p.freezeVal = 0
}
// IsFrozen returns whether this peer is frozen and unable to execute tasks.
func (p *PeerTracker) IsFrozen() bool {
return p.freezeVal > 0
}
......@@ -10,7 +10,7 @@ import (
"github.com/ipfs/go-graphsync/ipldbridge"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
"github.com/ipfs/go-graphsync/responsemanager/peertaskqueue/peertask"
"github.com/ipfs/go-peertaskqueue/peertask"
peer "github.com/libp2p/go-libp2p-peer"
)
......
......@@ -12,7 +12,7 @@ import (
cid "github.com/ipfs/go-cid"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
"github.com/ipfs/go-graphsync/responsemanager/peertaskqueue/peertask"
"github.com/ipfs/go-peertaskqueue/peertask"
"github.com/ipfs/go-graphsync/testbridge"
"github.com/ipfs/go-graphsync/testutil"
ipld "github.com/ipld/go-ipld-prime"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment