Unverified Commit 41fab2da authored by Hannah Howard's avatar Hannah Howard Committed by GitHub

Merge pull request #3 from ipfs/feat/configurable-options

Add configuration options for ignoring freezing, listening for adds & removes
parents 89a81893 e97a7ce4
Pipeline #240 failed with stages
in 0 seconds
......@@ -9,24 +9,115 @@ import (
peer "github.com/libp2p/go-libp2p-peer"
)
type peerTaskQueueEvent int
const (
peerAdded = peerTaskQueueEvent(1)
peerRemoved = peerTaskQueueEvent(2)
)
type hookFunc func(p peer.ID, event peerTaskQueueEvent)
// PeerTaskQueue is a prioritized list of tasks to be executed on peers.
// The queue puts tasks on in blocks, then alternates between peers (roughly)
// to execute the block with the highest priority, or otherwise the one added
// first if priorities are equal.
type PeerTaskQueue struct {
lock sync.Mutex
pQueue pq.PQ
peerTrackers map[peer.ID]*peertracker.PeerTracker
frozenPeers map[peer.ID]struct{}
lock sync.Mutex
pQueue pq.PQ
peerTrackers map[peer.ID]*peertracker.PeerTracker
frozenPeers map[peer.ID]struct{}
hooks []hookFunc
ignoreFreezing bool
}
// Option is a function that configures the peer task queue
type Option func(*PeerTaskQueue) Option
func chain(firstOption Option, secondOption Option) Option {
return func(ptq *PeerTaskQueue) Option {
firstReverse := firstOption(ptq)
secondReverse := secondOption(ptq)
return chain(secondReverse, firstReverse)
}
}
// IgnoreFreezing is an option that can make the task queue ignore freezing and unfreezing
func IgnoreFreezing(ignoreFreezing bool) Option {
return func(ptq *PeerTaskQueue) Option {
previous := ptq.ignoreFreezing
ptq.ignoreFreezing = ignoreFreezing
return IgnoreFreezing(previous)
}
}
func removeHook(hook hookFunc) Option {
return func(ptq *PeerTaskQueue) Option {
for i, testHook := range ptq.hooks {
if &hook == &testHook {
ptq.hooks = append(ptq.hooks[:i], ptq.hooks[i+1:]...)
break
}
}
return addHook(hook)
}
}
func addHook(hook hookFunc) Option {
return func(ptq *PeerTaskQueue) Option {
ptq.hooks = append(ptq.hooks, hook)
return removeHook(hook)
}
}
// OnPeerAddedHook adds a hook function that gets called whenever the ptq adds a new peer
func OnPeerAddedHook(onPeerAddedHook func(p peer.ID)) Option {
hook := func(p peer.ID, event peerTaskQueueEvent) {
if event == peerAdded {
onPeerAddedHook(p)
}
}
return addHook(hook)
}
// OnPeerRemovedHook adds a hook function that gets called whenever the ptq adds a new peer
func OnPeerRemovedHook(onPeerRemovedHook func(p peer.ID)) Option {
hook := func(p peer.ID, event peerTaskQueueEvent) {
if event == peerRemoved {
onPeerRemovedHook(p)
}
}
return addHook(hook)
}
// New creates a new PeerTaskQueue
func New() *PeerTaskQueue {
return &PeerTaskQueue{
func New(options ...Option) *PeerTaskQueue {
ptq := &PeerTaskQueue{
peerTrackers: make(map[peer.ID]*peertracker.PeerTracker),
frozenPeers: make(map[peer.ID]struct{}),
pQueue: pq.New(peertracker.PeerCompare),
}
ptq.Options(options...)
return ptq
}
// Options uses configuration functions to configure the peer task queue.
// It returns an Option that can be called to reverse the changes.
func (ptq *PeerTaskQueue) Options(options ...Option) Option {
if len(options) == 0 {
return nil
}
if len(options) == 1 {
return options[0](ptq)
}
reverse := options[0](ptq)
return chain(ptq.Options(options[1:]...), reverse)
}
func (ptq *PeerTaskQueue) callHooks(to peer.ID, event peerTaskQueueEvent) {
for _, hook := range ptq.hooks {
hook(to, event)
}
}
// PushBlock adds a new block of tasks for the given peer to the queue
......@@ -38,6 +129,7 @@ func (ptq *PeerTaskQueue) PushBlock(to peer.ID, tasks ...peertask.Task) {
peerTracker = peertracker.New(to)
ptq.pQueue.Push(peerTracker)
ptq.peerTrackers[to] = peerTracker
ptq.callHooks(to, peerAdded)
}
peerTracker.PushBlock(to, tasks, func(e []peertask.Task) {
......@@ -65,6 +157,7 @@ func (ptq *PeerTaskQueue) PopBlock() *peertask.TaskBlock {
target := peerTracker.Target()
delete(ptq.peerTrackers, target)
delete(ptq.frozenPeers, target)
ptq.callHooks(target, peerRemoved)
} else {
ptq.pQueue.Push(peerTracker)
}
......@@ -81,11 +174,13 @@ func (ptq *PeerTaskQueue) Remove(identifier peertask.Identifier, p peer.ID) {
// block we were about to send them, we should wait a short period of time
// to make sure we receive any other in-flight cancels before sending
// them a block they already potentially have
if !peerTracker.IsFrozen() {
ptq.frozenPeers[p] = struct{}{}
}
if !ptq.ignoreFreezing {
if !peerTracker.IsFrozen() {
ptq.frozenPeers[p] = struct{}{}
}
peerTracker.Freeze()
peerTracker.Freeze()
}
ptq.pQueue.Update(peerTracker.Index())
}
ptq.lock.Unlock()
......
......@@ -10,6 +10,7 @@ import (
"github.com/ipfs/go-peertaskqueue/peertask"
"github.com/ipfs/go-peertaskqueue/testutil"
peer "github.com/libp2p/go-libp2p-peer"
)
func TestPushPop(t *testing.T) {
......@@ -70,8 +71,7 @@ func TestPushPop(t *testing.T) {
}
}
// This test checks that peers wont starve out other peers
func TestPeerRepeats(t *testing.T) {
func TestFreezeUnfreeze(t *testing.T) {
ptq := New()
peers := testutil.GeneratePeers(4)
a := peers[0]
......@@ -90,26 +90,69 @@ func TestPeerRepeats(t *testing.T) {
}
// now, pop off four tasks, there should be one from each
var targets []string
var tasks []*peertask.TaskBlock
for i := 0; i < 4; i++ {
t := ptq.PopBlock()
targets = append(targets, t.Target.Pretty())
tasks = append(tasks, t)
matchNTasks(t, ptq, 4, a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty())
ptq.Remove(peertask.Task{Identifier: "1"}, b)
// b should be frozen, causing it to get skipped in the rotation
matchNTasks(t, ptq, 3, a.Pretty(), c.Pretty(), d.Pretty())
ptq.ThawRound()
matchNTasks(t, ptq, 1, b.Pretty())
}
func TestFreezeUnfreezeNoFreezingOption(t *testing.T) {
ptq := New(IgnoreFreezing(true))
peers := testutil.GeneratePeers(4)
a := peers[0]
b := peers[1]
c := peers[2]
d := peers[3]
// Have each push some blocks
for i := 0; i < 5; i++ {
is := fmt.Sprint(i)
ptq.PushBlock(a, peertask.Task{Identifier: is})
ptq.PushBlock(b, peertask.Task{Identifier: is})
ptq.PushBlock(c, peertask.Task{Identifier: is})
ptq.PushBlock(d, peertask.Task{Identifier: is})
}
expected := []string{a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty()}
sort.Strings(expected)
sort.Strings(targets)
// now, pop off four tasks, there should be one from each
matchNTasks(t, ptq, 4, a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty())
t.Log(targets)
t.Log(expected)
for i, s := range targets {
if expected[i] != s {
t.Fatal("unexpected peer", s, expected[i])
}
ptq.Remove(peertask.Task{Identifier: "1"}, b)
// b should be frozen, causing it to get skipped in the rotation
matchNTasks(t, ptq, 4, a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty())
}
// This test checks that peers wont starve out other peers
func TestPeerRepeats(t *testing.T) {
ptq := New()
peers := testutil.GeneratePeers(4)
a := peers[0]
b := peers[1]
c := peers[2]
d := peers[3]
// Have each push some blocks
for i := 0; i < 5; i++ {
is := fmt.Sprint(i)
ptq.PushBlock(a, peertask.Task{Identifier: is})
ptq.PushBlock(b, peertask.Task{Identifier: is})
ptq.PushBlock(c, peertask.Task{Identifier: is})
ptq.PushBlock(d, peertask.Task{Identifier: is})
}
// now, pop off four tasks, there should be one from each
tasks := matchNTasks(t, ptq, 4, a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty())
// Now, if one of the tasks gets finished, the next task off the queue should
// be for the same peer
for blockI := 0; blockI < 4; blockI++ {
......@@ -125,6 +168,50 @@ func TestPeerRepeats(t *testing.T) {
}
}
func TestHooks(t *testing.T) {
var peersAdded []string
var peersRemoved []string
onPeerAdded := func(p peer.ID) {
peersAdded = append(peersAdded, p.Pretty())
}
onPeerRemoved := func(p peer.ID) {
peersRemoved = append(peersRemoved, p.Pretty())
}
ptq := New(OnPeerAddedHook(onPeerAdded), OnPeerRemovedHook(onPeerRemoved))
peers := testutil.GeneratePeers(2)
a := peers[0]
b := peers[1]
ptq.PushBlock(a, peertask.Task{Identifier: "1"})
ptq.PushBlock(b, peertask.Task{Identifier: "2"})
expected := []string{a.Pretty(), b.Pretty()}
sort.Strings(expected)
sort.Strings(peersAdded)
if len(peersAdded) != len(expected) {
t.Fatal("Incorrect number of peers added")
}
for i, s := range peersAdded {
if expected[i] != s {
t.Fatal("unexpected peer", s, expected[i])
}
}
task := ptq.PopBlock()
task.Done(task.Tasks)
task = ptq.PopBlock()
task.Done(task.Tasks)
ptq.PopBlock()
ptq.PopBlock()
sort.Strings(peersRemoved)
if len(peersRemoved) != len(expected) {
t.Fatal("Incorrect number of peers removed")
}
for i, s := range peersRemoved {
if expected[i] != s {
t.Fatal("unexpected peer", s, expected[i])
}
}
}
func TestCleaningUpQueues(t *testing.T) {
ptq := New()
......@@ -157,3 +244,26 @@ func TestCleaningUpQueues(t *testing.T) {
}
}
func matchNTasks(t *testing.T, ptq *PeerTaskQueue, n int, expected ...string) []*peertask.TaskBlock {
var targets []string
var tasks []*peertask.TaskBlock
for i := 0; i < n; i++ {
t := ptq.PopBlock()
targets = append(targets, t.Target.Pretty())
tasks = append(tasks, t)
}
sort.Strings(expected)
sort.Strings(targets)
t.Log(targets)
t.Log(expected)
for i, s := range targets {
if expected[i] != s {
t.Fatal("unexpected peer", s, expected[i])
}
}
return tasks
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment