Commit bddc3094 authored by tavit ohanian's avatar tavit ohanian

Merge remote-tracking branch 'upstream/master' into reference

parents 26d2987b 0d567453
Pipeline #236 failed with stages
in 0 seconds
blank_issues_enabled: false
contact_links:
- name: Getting Help on IPFS
url: https://ipfs.io/help
about: All information about how and where to get help on IPFS.
- name: IPFS Official Forum
url: https://discuss.ipfs.io
about: Please post general questions, support requests, and discussions here.
---
name: Open an issue
about: Only for actionable issues relevant to this repository.
title: ''
labels: need/triage
assignees: ''
---
<!--
Hello! To ensure this issue is correctly addressed as soon as possible by the IPFS team, please try to make sure:
- This issue is relevant to this repository's topic or codebase.
- A clear description is provided. It should include as much relevant information as possible and a clear scope for the issue to be actionable.
FOR GENERAL DISCUSSION, HELP OR QUESTIONS, please see the options at https://ipfs.io/help or head directly to https://discuss.ipfs.io.
(you can delete this section after reading)
-->
0.1.1: QmZPNrj6EvRRh1TyzvkyJ9dGyMPn5wBNMCNdXEaY2UzdeK
os:
- linux
language: go
go:
- 1.11.x
env:
global:
- GOTFLAGS="-race"
matrix:
- BUILD_DEPTYPE=gomod
# disable travis install
install:
- true
script:
- bash <(curl -s https://raw.githubusercontent.com/ipfs/ci-helpers/master/travis-ci/run-standard-tests.sh)
cache:
directories:
- $GOPATH/src/gx
- $GOPATH/pkg/mod
- $HOME/.cache/go-build
notifications:
email: false
Copyright 2019. Protocol Labs, Inc.
This library is dual-licensed under Apache 2.0 and MIT terms.
Copyright 2019. Protocol Labs, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Copyright 2019. Protocol Labs, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32 h1:qkOC5Gd33k54tobS36cXdAzJbeHaduLtnLQQwNoIi78=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
github.com/btcsuite/btcutil v0.0.0-20190207003914-4c204d697803/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg=
github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY=
github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc=
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495 h1:6IyqGr3fnd0tM3YxipK27TUskaOVUjU2nG45yzwcQKY=
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gxed/hashland/keccakpg v0.0.1 h1:wrk3uMNaMxbXiHibbPO4S0ymqJMm41WiudyFSs7UnsU=
github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU=
github.com/gxed/hashland/murmur3 v0.0.1 h1:SheiaIt0sda5K+8FLz952/1iWS9zrnKsEJaOJu4ZbSc=
github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-ipfs-pq v0.0.0-20191101181110-8122fa6a9529 h1:izQqDLe/uSPKe6NYr3FjwnvU0AAg0im/4DLVXplLFUQ=
github.com/ipfs/go-ipfs-pq v0.0.0-20191101181110-8122fa6a9529/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/ipfs/go-ipfs-pq v0.0.1 h1:zgUotX8dcAB/w/HidJh1zzc1yFq6Vm8J7T2F4itj/RU=
github.com/ipfs/go-ipfs-pq v0.0.1/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY=
github.com/ipfs/go-ipfs-pq v0.0.2/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8=
github.com/libp2p/go-libp2p-core v0.0.1 h1:HSTZtFIq/W5Ue43Zw+uWZyy2Vl5WtF0zDjKN8/DT/1I=
github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16 h1:5W7KhL8HVF3XCFOweFD3BNESdnO8ewyYTFT2R+/b8FQ=
github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U=
github.com/minio/sha256-simd v0.0.0-20190328051042-05b4dd3047e5 h1:l16XLUUJ34wIz+RIvLhSwGvLvKyy+W598b135bJN6mg=
github.com/minio/sha256-simd v0.0.0-20190328051042-05b4dd3047e5/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U=
github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8=
github.com/mr-tron/base58 v1.1.1 h1:OJIdWOWYe2l5PQNgimGtuwHY8nDskvJ5vvs//YnzRLs=
github.com/mr-tron/base58 v1.1.1/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8=
github.com/mr-tron/base58 v1.1.2 h1:ZEw4I2EgPKDJ2iEw0cNmLB3ROrEmkOtXIkaG7wZg+78=
github.com/mr-tron/base58 v1.1.2/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA=
github.com/multiformats/go-multiaddr v0.0.2 h1:RBysRCv5rv3FWlhKWKoXv8tnsCUpEpIZpCmqAGZos2s=
github.com/multiformats/go-multiaddr v0.0.2/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44=
github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs=
github.com/multiformats/go-multihash v0.0.1 h1:HHwN1K12I+XllBCrqKnhX949Orn4oawPkegHMu2vDqQ=
github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U=
github.com/multiformats/go-multihash v0.0.5 h1:1wxmCvTXAifAepIMyF39vZinRw5sbqjPs/UIi93+uik=
github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcMtlJvakRUn/0Ual8po=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a h1:/eS3yfGjQKG+9kayBkj0ip1BGhq6zJ3eaVksphxAaek=
github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a/go.mod h1:7AyxJNCJ7SBZ1MfVQCWD6Uqo2oubI2Eq2y2eqf+A5r0=
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 h1:RC6RW7j+1+HkWaX/Yh71Ee5ZHaHYt7ZP4sQgUrm6cDU=
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b h1:+/WWzjwW6gidDJnMKWLKLX1gxn7irUTF1fLpQovfQ5M=
golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734 h1:p/H982KKEjUnLJkM3tt/LemDnOc1GiZL5FCVlORJ5zo=
golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190219092855-153ac476189d h1:Z0Ahzd7HltpJtjAHHxX8QFP3j1yYgiuvjbjRzDj/KH0=
golang.org/x/sys v0.0.0-20190219092855-153ac476189d/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
package peertask
import (
"time"
pq "github.com/ipfs/go-ipfs-pq"
peer "github.com/libp2p/go-libp2p-core/peer"
)
// FIFOCompare is a basic task comparator that returns tasks in the order created:
// the task created earlier wins.
var FIFOCompare = func(a, b *QueueTask) bool {
	return b.created.After(a.created)
}
// PriorityCompare respects the target peer's task priority. For tasks involving
// different peers, the oldest task is prioritized (FIFO order).
var PriorityCompare = func(a, b *QueueTask) bool {
	if a.Target != b.Target {
		// Different peers: fall back to creation-time ordering.
		return FIFOCompare(a, b)
	}
	return a.Priority > b.Priority
}
// WrapCompare adapts a QueueTask comparison function into a pq.ElemComparator
// so it can drive a priority queue of QueueTasks.
func WrapCompare(cmp func(a, b *QueueTask) bool) func(a, b pq.Elem) bool {
	return func(x, y pq.Elem) bool {
		// The queue only ever holds *QueueTask elements, so the
		// assertions are safe by construction.
		return cmp(x.(*QueueTask), y.(*QueueTask))
	}
}
// Topic is a non-unique name for a task. It's used by the client library
// to act on a task once it exits the queue (e.g. to Remove or merge tasks
// sharing the same Topic).
type Topic interface{}

// Data is used by the client to associate extra information with a Task.
// The queue itself never inspects it.
type Data interface{}
// Task is a single task to be executed in Priority order.
type Task struct {
	// Topic for the task (non-unique; used to merge/remove by name)
	Topic Topic
	// Priority of the task; higher values are popped first for the same peer
	Priority int
	// The size of the task (units defined by the client):
	// - peers with most active work are deprioritized
	// - peers with most pending work are prioritized
	Work int
	// Arbitrary data associated with this Task by the client
	Data Data
}
// QueueTask contains a Task, and also some bookkeeping information.
// It is used internally by the PeerTracker to keep track of tasks.
type QueueTask struct {
	Task
	// Target is the peer this task will be executed on
	Target peer.ID
	created time.Time // created marks the time that the task was added to the queue
	index   int       // book-keeping field used by the pq container
}
// NewQueueTask creates a new QueueTask from the given Task, recording the
// target peer and the creation time used for FIFO ordering. The pq index
// is left at its zero value; the queue assigns it via SetIndex.
func NewQueueTask(task Task, target peer.ID, created time.Time) *QueueTask {
	qt := &QueueTask{
		Task:    task,
		Target:  target,
		created: created,
	}
	return qt
}
// Index implements pq.Elem. It returns the task's current position in the
// priority queue's backing storage.
func (pt *QueueTask) Index() int {
	return pt.index
}

// SetIndex implements pq.Elem. It is called by the priority queue whenever
// the task moves within the heap.
func (pt *QueueTask) SetIndex(i int) {
	pt.index = i
}
package peertaskqueue
import (
	"reflect"
	"sync"

	pq "github.com/ipfs/go-ipfs-pq"
	"github.com/ipfs/go-peertaskqueue/peertask"
	"github.com/ipfs/go-peertaskqueue/peertracker"
	peer "github.com/libp2p/go-libp2p-core/peer"
)
// peerTaskQueueEvent identifies a change to the queue's set of tracked peers,
// delivered to registered hooks.
type peerTaskQueueEvent int

const (
	// peerAdded is emitted when the queue starts tracking a new peer (PushTasks).
	peerAdded = peerTaskQueueEvent(1)
	// peerRemoved is emitted when an idle peer is dropped from the queue (PopTasks).
	peerRemoved = peerTaskQueueEvent(2)
)

// hookFunc is a callback invoked with the affected peer and the event that occurred.
type hookFunc func(p peer.ID, event peerTaskQueueEvent)
// PeerTaskQueue is a prioritized list of tasks to be executed on peers.
// Tasks are added to the queue, then popped off alternately between peers (roughly)
// to execute the block with the highest priority, or otherwise the one added
// first if priorities are equal.
type PeerTaskQueue struct {
	lock sync.Mutex // guards all fields below; all exported methods take it
	// pQueue orders peer trackers by peertracker.PeerCompare
	pQueue pq.PQ
	// peerTrackers maps each tracked peer to its per-peer task state
	peerTrackers map[peer.ID]*peertracker.PeerTracker
	// frozenPeers is the set of peers currently frozen (see Remove / ThawRound)
	frozenPeers map[peer.ID]struct{}
	// hooks are called on peerAdded/peerRemoved events
	hooks []hookFunc
	// ignoreFreezing disables the freezing behavior in Remove when true
	ignoreFreezing bool
	// taskMerger decides how tasks with duplicate Topics combine
	taskMerger peertracker.TaskMerger
}
// Option is a function that configures the peer task queue
type Option func(*PeerTaskQueue) Option

// chain composes two Options into a single Option. Applying the result
// applies first then second, and returns an Option that undoes both in
// reverse order (so the whole change remains reversible).
func chain(first Option, second Option) Option {
	return func(ptq *PeerTaskQueue) Option {
		undoFirst := first(ptq)
		undoSecond := second(ptq)
		return chain(undoSecond, undoFirst)
	}
}
// IgnoreFreezing is an option that can make the task queue ignore freezing and unfreezing.
// The returned Option, when applied, restores the previous setting.
func IgnoreFreezing(ignoreFreezing bool) Option {
	return func(ptq *PeerTaskQueue) Option {
		old := ptq.ignoreFreezing
		ptq.ignoreFreezing = ignoreFreezing
		// Capture the prior value so the change can be reversed.
		return IgnoreFreezing(old)
	}
}
// TaskMerger is an option that specifies merge behaviour when pushing a task
// with the same Topic as an existing Topic. The returned Option restores the
// previously configured merger.
func TaskMerger(tmfp peertracker.TaskMerger) Option {
	return func(ptq *PeerTaskQueue) Option {
		old := ptq.taskMerger
		ptq.taskMerger = tmfp
		// Capture the prior merger so the change can be reversed.
		return TaskMerger(old)
	}
}
// removeHook returns an Option that removes the given hook from the queue's
// hook list; it is the reversal of addHook, and applying the Option it
// returns re-adds the hook.
//
// Bug fix: Go function values are not comparable with ==, and the original
// code compared &hook == &testHook — the addresses of two distinct local
// variables — which is always false, so hooks were never actually removed.
// We instead match hooks by their code pointer via reflect.
// NOTE(review): closures created at the same call site share a code pointer,
// so if several such hooks are registered this removes the first of them.
func removeHook(hook hookFunc) Option {
	return func(ptq *PeerTaskQueue) Option {
		target := reflect.ValueOf(hook).Pointer()
		for i, testHook := range ptq.hooks {
			if reflect.ValueOf(testHook).Pointer() == target {
				ptq.hooks = append(ptq.hooks[:i], ptq.hooks[i+1:]...)
				break
			}
		}
		return addHook(hook)
	}
}
// addHook appends a hook to the queue's hook list and returns an Option
// that removes it again (the reversal).
func addHook(hook hookFunc) Option {
	return func(ptq *PeerTaskQueue) Option {
		ptq.hooks = append(ptq.hooks, hook)
		return removeHook(hook)
	}
}
// OnPeerAddedHook adds a hook function that gets called whenever the ptq adds a new peer
func OnPeerAddedHook(onPeerAddedHook func(p peer.ID)) Option {
	// Wrap the callback so it only fires on peerAdded events.
	return addHook(func(p peer.ID, event peerTaskQueueEvent) {
		if event == peerAdded {
			onPeerAddedHook(p)
		}
	})
}
// OnPeerRemovedHook adds a hook function that gets called whenever the ptq
// removes a peer.
func OnPeerRemovedHook(onPeerRemovedHook func(p peer.ID)) Option {
	// Wrap the callback so it only fires on peerRemoved events.
	hook := func(p peer.ID, event peerTaskQueueEvent) {
		if event == peerRemoved {
			onPeerRemovedHook(p)
		}
	}
	return addHook(hook)
}
// New creates a new PeerTaskQueue and applies any given configuration options.
func New(options ...Option) *PeerTaskQueue {
	queue := &PeerTaskQueue{
		pQueue:       pq.New(peertracker.PeerCompare),
		peerTrackers: make(map[peer.ID]*peertracker.PeerTracker),
		frozenPeers:  make(map[peer.ID]struct{}),
		taskMerger:   &peertracker.DefaultTaskMerger{},
	}
	queue.Options(options...)
	return queue
}
// Options uses configuration functions to configure the peer task queue.
// It returns an Option that can be called to reverse the changes.
func (ptq *PeerTaskQueue) Options(options ...Option) Option {
	switch len(options) {
	case 0:
		return nil
	case 1:
		return options[0](ptq)
	default:
		// Apply the first option, then the rest recursively; the combined
		// reversal undoes them in reverse order.
		undoFirst := options[0](ptq)
		undoRest := ptq.Options(options[1:]...)
		return chain(undoRest, undoFirst)
	}
}
// callHooks invokes every registered hook with the given peer and event.
// Callers must hold ptq.lock.
func (ptq *PeerTaskQueue) callHooks(to peer.ID, event peerTaskQueueEvent) {
	for _, h := range ptq.hooks {
		h(to, event)
	}
}
// PushTasks adds a new group of tasks for the given peer to the queue
func (ptq *PeerTaskQueue) PushTasks(to peer.ID, tasks ...peertask.Task) {
	ptq.lock.Lock()
	defer ptq.lock.Unlock()

	tracker, ok := ptq.peerTrackers[to]
	if !ok {
		// First tasks for this peer: start tracking it and notify hooks.
		tracker = peertracker.New(to, ptq.taskMerger)
		ptq.pQueue.Push(tracker)
		ptq.peerTrackers[to] = tracker
		ptq.callHooks(to, peerAdded)
	}

	tracker.PushTasks(tasks...)
	// Pending work changed, so re-sort this peer in the priority queue.
	ptq.pQueue.Update(tracker.Index())
}
// PopTasks finds the peer with the highest priority and pops as many tasks
// off the peer's queue as necessary to cover targetMinWork, in priority order.
// If there are not enough tasks to cover targetMinWork it just returns
// whatever is in the peer's queue.
//   - Peers with the most "active" work are deprioritized.
//     This heuristic is for fairness, we try to keep all peers "busy".
//   - Peers with the most "pending" work are prioritized.
//     This heuristic is so that peers with a lot to do get asked for work first.
//
// The third response argument is pending work: the amount of work in the
// queue for this peer.
func (ptq *PeerTaskQueue) PopTasks(targetMinWork int) (peer.ID, []*peertask.Task, int) {
	ptq.lock.Lock()
	defer ptq.lock.Unlock()

	if ptq.pQueue.Len() == 0 {
		return "", nil, -1
	}

	// Choose the highest priority peer.
	tracker := ptq.pQueue.Peek().(*peertracker.PeerTracker)
	if tracker == nil {
		return "", nil, -1
	}

	// Get the highest priority tasks for the given peer.
	out, pendingWork := tracker.PopTasks(targetMinWork)

	if tracker.IsIdle() {
		// The peer has no more tasks: stop tracking it entirely.
		ptq.pQueue.Pop()
		target := tracker.Target()
		delete(ptq.peerTrackers, target)
		delete(ptq.frozenPeers, target)
		ptq.callHooks(target, peerRemoved)
	} else {
		// We may have modified the peer tracker's state (by popping tasks),
		// so update its position in the priority queue.
		ptq.pQueue.Update(tracker.Index())
	}

	return tracker.Target(), out, pendingWork
}
// TasksDone is called to indicate that the given tasks have completed
// for the given peer
func (ptq *PeerTaskQueue) TasksDone(to peer.ID, tasks ...*peertask.Task) {
	ptq.lock.Lock()
	defer ptq.lock.Unlock()

	tracker, ok := ptq.peerTrackers[to]
	if !ok {
		// Unknown peer (e.g. already removed); nothing to do.
		return
	}

	// Tell the peer tracker that the tasks have completed.
	for _, t := range tasks {
		tracker.TaskDone(t)
	}

	// Completing tasks changes the peer's active work, which may change its
	// position in the peer queue.
	ptq.pQueue.Update(tracker.Index())
}
// Remove removes a task from the queue.
func (ptq *PeerTaskQueue) Remove(topic peertask.Topic, p peer.ID) {
	ptq.lock.Lock()
	defer ptq.lock.Unlock()

	tracker, ok := ptq.peerTrackers[p]
	if !ok {
		return
	}
	if !tracker.Remove(topic) {
		// Nothing was removed for this topic; queue state is unchanged.
		return
	}

	// We now also 'freeze' that partner. If they sent us a cancel for a
	// block we were about to send them, we should wait a short period of time
	// to make sure we receive any other in-flight cancels before sending
	// them a block they already potentially have.
	if !ptq.ignoreFreezing {
		if !tracker.IsFrozen() {
			ptq.frozenPeers[p] = struct{}{}
		}
		tracker.Freeze()
	}
	ptq.pQueue.Update(tracker.Index())
}
// FullThaw completely thaws all peers in the queue so they can execute tasks.
func (ptq *PeerTaskQueue) FullThaw() {
	ptq.lock.Lock()
	defer ptq.lock.Unlock()

	for p := range ptq.frozenPeers {
		tracker, ok := ptq.peerTrackers[p]
		if !ok {
			continue
		}
		tracker.FullThaw()
		delete(ptq.frozenPeers, p)
		// Thawing changes the peer's effective priority.
		ptq.pQueue.Update(tracker.Index())
	}
}
// ThawRound unthaws peers incrementally, so that those that have been frozen
// the least become unfrozen and able to execute tasks first.
func (ptq *PeerTaskQueue) ThawRound() {
	ptq.lock.Lock()
	defer ptq.lock.Unlock()

	for p := range ptq.frozenPeers {
		tracker, ok := ptq.peerTrackers[p]
		if !ok {
			continue
		}
		// Thaw reports whether the peer is now fully unfrozen.
		if tracker.Thaw() {
			delete(ptq.frozenPeers, p)
		}
		ptq.pQueue.Update(tracker.Index())
	}
}
package peertaskqueue
import (
"fmt"
"math"
"math/rand"
"sort"
"strings"
"testing"
"github.com/ipfs/go-peertaskqueue/peertask"
"github.com/ipfs/go-peertaskqueue/testutil"
peer "github.com/libp2p/go-libp2p-core/peer"
)
// TestPushPop pushes a task for every letter of the alphabet in random order
// (with alphabetical priority), cancels the consonants, and verifies that
// only the vowels pop out, in priority order.
func TestPushPop(t *testing.T) {
	ptq := New()
	partner := testutil.GeneratePeers(1)[0]
	alphabet := strings.Split("abcdefghijklmnopqrstuvwxyz", "")
	vowels := strings.Split("aeiou", "")
	consonants := func() []string {
		var out []string
		for _, letter := range alphabet {
			skip := false
			for _, vowel := range vowels {
				if letter == vowel {
					skip = true
				}
			}
			if !skip {
				out = append(out, letter)
			}
		}
		return out
	}()
	sort.Strings(alphabet)
	sort.Strings(vowels)
	sort.Strings(consonants)

	// add a bunch of blocks. cancel some. drain the queue. the queue should only have the kept tasks
	for _, index := range rand.Perm(len(alphabet)) { // add blocks for all letters
		letter := alphabet[index]
		t.Log(letter)
		// add tasks out of order, but with in-order priority
		ptq.PushTasks(partner, peertask.Task{Topic: letter, Priority: math.MaxInt32 - index})
	}
	for _, consonant := range consonants {
		ptq.Remove(consonant, partner)
	}

	// Remove freezes the peer; thaw so PopTasks can serve it.
	ptq.FullThaw()

	var out []string
	for {
		_, received, _ := ptq.PopTasks(100)
		if len(received) == 0 {
			break
		}
		for _, task := range received {
			out = append(out, task.Topic.(string))
		}
	}

	// Tasks popped should already be in correct order
	for i, expected := range vowels {
		if out[i] != expected {
			t.Fatal("received", out[i], "expected", expected)
		}
	}
}
// TestFreezeUnfreeze verifies that removing a task freezes its peer (so the
// peer is skipped when popping), that ThawRound unfreezes it, and that
// removing a task that does not exist does not freeze the peer.
func TestFreezeUnfreeze(t *testing.T) {
	ptq := New()
	peers := testutil.GeneratePeers(4)
	a := peers[0]
	b := peers[1]
	c := peers[2]
	d := peers[3]

	// Push 5 blocks to each peer
	for i := 0; i < 5; i++ {
		is := fmt.Sprint(i)
		ptq.PushTasks(a, peertask.Task{Topic: is, Work: 1})
		ptq.PushTasks(b, peertask.Task{Topic: is, Work: 1})
		ptq.PushTasks(c, peertask.Task{Topic: is, Work: 1})
		ptq.PushTasks(d, peertask.Task{Topic: is, Work: 1})
	}

	// now, pop off four tasks, there should be one from each
	matchNTasks(t, ptq, 4, a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty())

	ptq.Remove("1", b)

	// b should be frozen, causing it to get skipped in the rotation
	matchNTasks(t, ptq, 3, a.Pretty(), c.Pretty(), d.Pretty())

	ptq.ThawRound()

	matchNTasks(t, ptq, 1, b.Pretty())

	// remove a non-existent task
	ptq.Remove("-1", b)

	// b should not be frozen
	matchNTasks(t, ptq, 4, a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty())
}
// TestFreezeUnfreezeNoFreezingOption verifies that with IgnoreFreezing(true),
// removing a task does not freeze the peer, so it stays in the rotation.
func TestFreezeUnfreezeNoFreezingOption(t *testing.T) {
	ptq := New(IgnoreFreezing(true))
	peers := testutil.GeneratePeers(4)
	a := peers[0]
	b := peers[1]
	c := peers[2]
	d := peers[3]

	// Have each push some blocks
	for i := 0; i < 5; i++ {
		is := fmt.Sprint(i)
		ptq.PushTasks(a, peertask.Task{Topic: is, Work: 1})
		ptq.PushTasks(b, peertask.Task{Topic: is, Work: 1})
		ptq.PushTasks(c, peertask.Task{Topic: is, Work: 1})
		ptq.PushTasks(d, peertask.Task{Topic: is, Work: 1})
	}

	// now, pop off four tasks, there should be one from each
	matchNTasks(t, ptq, 4, a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty())

	ptq.Remove("1", b)

	// b should not be frozen, so it wont get skipped in the rotation
	matchNTasks(t, ptq, 4, a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty())
}
// This test checks that ordering of peers is correct: peers with the least
// active work are served first, ties broken by most pending work, and within
// a peer tasks come out by Priority.
func TestPeerOrder(t *testing.T) {
	ptq := New()
	peers := testutil.GeneratePeers(3)
	a := peers[0]
	b := peers[1]
	c := peers[2]

	ptq.PushTasks(a, peertask.Task{Topic: "1", Work: 3, Priority: 2})
	ptq.PushTasks(a, peertask.Task{Topic: "2", Work: 1, Priority: 1})

	ptq.PushTasks(b, peertask.Task{Topic: "3", Work: 1, Priority: 3})
	ptq.PushTasks(b, peertask.Task{Topic: "4", Work: 3, Priority: 2})
	ptq.PushTasks(b, peertask.Task{Topic: "5", Work: 1, Priority: 1})

	ptq.PushTasks(c, peertask.Task{Topic: "6", Work: 2, Priority: 2})
	ptq.PushTasks(c, peertask.Task{Topic: "7", Work: 2, Priority: 1})

	// All peers have nothing in their active queue, so equal chance of any
	// peer being chosen
	var ps []string
	var ids []string
	for i := 0; i < 3; i++ {
		p, tasks, _ := ptq.PopTasks(1)
		ps = append(ps, p.String())
		ids = append(ids, fmt.Sprint(tasks[0].Topic))
	}
	matchArrays(t, ps, []string{a.String(), b.String(), c.String()})
	matchArrays(t, ids, []string{"1", "3", "6"})

	// Active queues:
	// a: 3            Pending: [1]
	// b: 1            Pending: [3, 1]
	// c: 2            Pending: [2]
	// So next peer should be b (least work in active queue)
	p, tsk, pending := ptq.PopTasks(1)
	if len(tsk) != 1 || p != b || tsk[0].Topic != "4" {
		t.Fatal("Expected ID 4 from peer b")
	}
	if pending != 1 {
		t.Fatal("Expected pending work to be 1")
	}

	// Active queues:
	// a: 3            Pending: [1]
	// b: 1 + 3        Pending: [1]
	// c: 2            Pending: [2]
	// So next peer should be c (least work in active queue)
	p, tsk, pending = ptq.PopTasks(1)
	if len(tsk) != 1 || p != c || tsk[0].Topic != "7" {
		t.Fatal("Expected ID 7 from peer c")
	}
	if pending != 0 {
		t.Fatal("Expected pending work to be 0")
	}

	// Active queues:
	// a: 3            Pending: [1]
	// b: 1 + 3        Pending: [1]
	// c: 2 + 2
	// So next peer should be a (least work in active queue)
	p, tsk, pending = ptq.PopTasks(1)
	if len(tsk) != 1 || p != a || tsk[0].Topic != "2" {
		t.Fatal("Expected ID 2 from peer a")
	}
	if pending != 0 {
		t.Fatal("Expected pending work to be 0")
	}

	// Active queues:
	// a: 3 + 1
	// b: 1 + 3        Pending: [1]
	// c: 2 + 2
	// a & c have no more pending tasks, so next peer should be b
	p, tsk, pending = ptq.PopTasks(1)
	if len(tsk) != 1 || p != b || tsk[0].Topic != "5" {
		t.Fatal("Expected ID 5 from peer b")
	}
	if pending != 0 {
		t.Fatal("Expected pending work to be 0")
	}

	// Active queues:
	// a: 3 + 1
	// b: 1 + 3 + 1
	// c: 2 + 2
	// No more pending tasks, so next pop should return nothing
	_, tsk, pending = ptq.PopTasks(1)
	if len(tsk) != 0 {
		t.Fatal("Expected no more tasks")
	}
	if pending != 0 {
		t.Fatal("Expected pending work to be 0")
	}
}
// TestHooks verifies that the OnPeerAddedHook and OnPeerRemovedHook callbacks
// fire exactly once per peer as peers are added (PushTasks) and later removed
// (when PopTasks finds them idle).
func TestHooks(t *testing.T) {
	var peersAdded []string
	var peersRemoved []string
	onPeerAdded := func(p peer.ID) {
		peersAdded = append(peersAdded, p.Pretty())
	}
	onPeerRemoved := func(p peer.ID) {
		peersRemoved = append(peersRemoved, p.Pretty())
	}
	ptq := New(OnPeerAddedHook(onPeerAdded), OnPeerRemovedHook(onPeerRemoved))
	peers := testutil.GeneratePeers(2)
	a := peers[0]
	b := peers[1]
	ptq.PushTasks(a, peertask.Task{Topic: "1"})
	ptq.PushTasks(b, peertask.Task{Topic: "2"})
	expected := []string{a.Pretty(), b.Pretty()}
	sort.Strings(expected)
	sort.Strings(peersAdded)
	if len(peersAdded) != len(expected) {
		t.Fatal("Incorrect number of peers added")
	}
	for i, s := range peersAdded {
		if expected[i] != s {
			t.Fatal("unexpected peer", s, expected[i])
		}
	}

	// Drain the queue: once tasks are marked done, the next PopTasks finds
	// each tracker idle, removes it, and fires the peerRemoved hook.
	p, task, _ := ptq.PopTasks(100)
	ptq.TasksDone(p, task...)
	p, task, _ = ptq.PopTasks(100)
	ptq.TasksDone(p, task...)
	ptq.PopTasks(100)
	ptq.PopTasks(100)

	sort.Strings(peersRemoved)
	if len(peersRemoved) != len(expected) {
		t.Fatal("Incorrect number of peers removed")
	}
	for i, s := range peersRemoved {
		if expected[i] != s {
			t.Fatal("unexpected peer", s, expected[i])
		}
	}
}
// TestCleaningUpQueues verifies that an idle peer's tracker is removed from
// both the map and the priority queue, whether its tasks were completed or
// individually removed.
func TestCleaningUpQueues(t *testing.T) {
	ptq := New()

	peer := testutil.GeneratePeers(1)[0]
	var peerTasks []peertask.Task
	for i := 0; i < 5; i++ {
		is := fmt.Sprint(i)
		peerTasks = append(peerTasks, peertask.Task{Topic: is})
	}

	// push a block, pop a block, complete everything, should be removed
	ptq.PushTasks(peer, peerTasks...)
	p, task, _ := ptq.PopTasks(100)
	ptq.TasksDone(p, task...)
	_, task, _ = ptq.PopTasks(100)
	if len(task) != 0 || len(ptq.peerTrackers) > 0 || ptq.pQueue.Len() > 0 {
		t.Fatal("PeerTracker should have been removed because it's idle")
	}

	// push a block, remove each of its entries, should be removed
	ptq.PushTasks(peer, peerTasks...)
	for _, peerTask := range peerTasks {
		ptq.Remove(peerTask.Topic, peer)
	}
	_, task, _ = ptq.PopTasks(100)
	if len(task) != 0 || len(ptq.peerTrackers) > 0 || ptq.pQueue.Len() > 0 {
		t.Fatal("Partner should have been removed because it's idle")
	}
}
// matchNTasks pops n single tasks off the queue, one at a time, and asserts
// that the set of peers they came from equals expected (order-insensitive).
func matchNTasks(t *testing.T, ptq *PeerTaskQueue, n int, expected ...string) {
	targets := make([]string, 0, n)
	for i := 0; i < n; i++ {
		p, tsk, _ := ptq.PopTasks(1)
		if len(tsk) != 1 {
			t.Fatal("expected 1 task at a time")
		}
		targets = append(targets, p.Pretty())
	}

	matchArrays(t, expected, targets)
}
// matchArrays sorts both slices in place and fails the test unless they are
// element-wise equal.
func matchArrays(t *testing.T, str1, str2 []string) {
	if len(str1) != len(str2) {
		t.Fatal("array lengths did not match", str1, str2)
	}

	sort.Strings(str1)
	sort.Strings(str2)
	t.Log(str1)
	t.Log(str2)

	for i := range str2 {
		if str1[i] != str2[i] {
			t.Fatal("unexpected peer", str2[i], str1[i])
		}
	}
}
package peertracker
import (
"sync"
"time"
pq "github.com/ipfs/go-ipfs-pq"
"github.com/ipfs/go-peertaskqueue/peertask"
peer "github.com/libp2p/go-libp2p-core/peer"
)
// TaskMerger is an interface that is used to merge new tasks into the active
// and pending queues
type TaskMerger interface {
	// HasNewInfo indicates whether the given task has more information than
	// the existing group of tasks (which have the same Topic), and thus should
	// be merged.
	HasNewInfo(task peertask.Task, existing []peertask.Task) bool
	// Merge copies relevant fields from a new task to an existing task.
	// It is expected to mutate existing in place.
	Merge(task peertask.Task, existing *peertask.Task)
}
// DefaultTaskMerger is the TaskMerger used by default. It never overwrites an
// existing task (with the same Topic).
type DefaultTaskMerger struct{}

// HasNewInfo always reports false: a new task is never considered to add
// information over an existing one.
func (*DefaultTaskMerger) HasNewInfo(task peertask.Task, existing []peertask.Task) bool {
	return false
}

// Merge is a no-op for the default merger.
func (*DefaultTaskMerger) Merge(task peertask.Task, existing *peertask.Task) {
}
// PeerTracker tracks task blocks for a single peer, as well as active tasks
// for that peer
type PeerTracker struct {
	// target is the peer whose tasks this tracker manages
	target peer.ID
	// Tasks that are pending being made active
	pendingTasks map[peertask.Topic]*peertask.QueueTask
	// Tasks that have been made active
	activeTasks map[*peertask.Task]struct{}

	// activeWork must be locked around as it will be updated externally
	activelk   sync.Mutex
	activeWork int

	// for the PQ interface
	index int

	// freezeVal affects ordering: a higher value sorts this peer later in
	// PeerCompare (frozen peers have lowest priority)
	freezeVal int

	// priority queue of tasks belonging to this peer
	taskQueue pq.PQ

	// taskMerger decides whether/how tasks with duplicate Topics are merged
	taskMerger TaskMerger
}
// New creates a new PeerTracker that manages tasks for the given peer,
// using taskMerger to decide how duplicate-Topic tasks are combined.
func New(target peer.ID, taskMerger TaskMerger) *PeerTracker {
	tracker := &PeerTracker{
		target:       target,
		taskMerger:   taskMerger,
		pendingTasks: make(map[peertask.Topic]*peertask.QueueTask),
		activeTasks:  make(map[*peertask.Task]struct{}),
		taskQueue:    pq.New(peertask.WrapCompare(peertask.PriorityCompare)),
	}
	return tracker
}
// PeerCompare implements pq.ElemComparator.
// It reports whether peer 'a' has higher priority than peer 'b'.
func PeerCompare(a, b pq.Elem) bool {
	ta := a.(*PeerTracker)
	tb := b.(*PeerTracker)

	// Having no pending tasks means lowest priority.
	aPending := len(ta.pendingTasks)
	bPending := len(tb.pendingTasks)
	switch {
	case aPending == 0:
		return false
	case bPending == 0:
		return true
	}

	// Frozen peers have lowest priority.
	if ta.freezeVal != tb.freezeVal {
		return ta.freezeVal < tb.freezeVal
	}

	// If each peer has an equal amount of work in its active queue, choose
	// the peer with the most amount of work pending.
	if ta.activeWork == tb.activeWork {
		return aPending > bPending
	}

	// Choose the peer with the least amount of work in its active queue.
	// This way we "keep peers busy" by sending them as much data as they can
	// process.
	return ta.activeWork < tb.activeWork
}
// Target returns the peer that this peer tracker tracks tasks for.
func (p *PeerTracker) Target() peer.ID {
	return p.target
}
// IsIdle returns true if the peer has no active tasks or queued tasks.
func (p *PeerTracker) IsIdle() bool {
	p.activelk.Lock()
	defer p.activelk.Unlock()

	noPending := len(p.pendingTasks) == 0
	noActive := len(p.activeTasks) == 0
	return noPending && noActive
}
// Index implements pq.Elem.
// It returns this tracker's current position in the peer priority queue.
func (p *PeerTracker) Index() int {
	return p.index
}

// SetIndex implements pq.Elem.
// It is called by the priority queue to record this tracker's position.
func (p *PeerTracker) SetIndex(i int) {
	p.index = i
}
// PushTasks adds a group of tasks onto a peer's queue.
func (p *PeerTracker) PushTasks(tasks ...peertask.Task) {
	now := time.Now()

	p.activelk.Lock()
	defer p.activelk.Unlock()

	for _, t := range tasks {
		// Skip tasks that add nothing over what is already active for
		// the same Topic.
		if !p.taskHasMoreInfoThanActiveTasks(t) {
			continue
		}

		// If a pending (non-active) task already exists for this Topic,
		// merge into it instead of queueing a duplicate.
		if pending, ok := p.pendingTasks[t.Topic]; ok {
			if t.Priority > pending.Priority {
				// Bump the priority and re-sort the queue entry.
				pending.Priority = t.Priority
				p.taskQueue.Update(pending.Index())
			}
			p.taskMerger.Merge(t, &pending.Task)
			continue
		}

		// No pending task with this Topic: enqueue a new one.
		queued := peertask.NewQueueTask(t, p.target, now)
		p.pendingTasks[t.Topic] = queued
		p.taskQueue.Push(queued)
	}
}
// PopTasks pops as many tasks off the queue as necessary to cover
// targetMinWork, in priority order. If there are not enough tasks to cover
// targetMinWork it just returns whatever is in the queue.
// The second response argument is pending work: the amount of work in the
// queue for this peer.
func (p *PeerTracker) PopTasks(targetMinWork int) ([]*peertask.Task, int) {
	var popped []*peertask.Task

	// Keep popping while the peer is unfrozen, there are tasks queued, and
	// we have not yet accumulated targetMinWork worth of work.
	for accumulated := 0; accumulated < targetMinWork && p.freezeVal == 0 && p.taskQueue.Len() > 0; {
		next := p.taskQueue.Pop().(*peertask.QueueTask)
		// Mark the task active.
		p.startTask(&next.Task)
		popped = append(popped, &next.Task)
		accumulated += next.Work
	}

	return popped, p.getPendingWork()
}
// startTask signals that a task was started for this peer: it is removed
// from the pending map and recorded as active.
func (p *PeerTracker) startTask(task *peertask.Task) {
	p.activelk.Lock()
	defer p.activelk.Unlock()

	// The task is no longer pending.
	delete(p.pendingTasks, task.Topic)

	// Guard against double-counting the same task pointer.
	if _, active := p.activeTasks[task]; active {
		return
	}
	p.activeTasks[task] = struct{}{}
	p.activeWork += task.Work
}
// getPendingWork sums the Work of every task still waiting in the pending
// queue for this peer.
func (p *PeerTracker) getPendingWork() int {
	pending := 0
	for _, task := range p.pendingTasks {
		pending += task.Work
	}
	return pending
}
// TaskDone signals that a task was completed for this peer: it is removed
// from the active set and its work is subtracted from the active total.
func (p *PeerTracker) TaskDone(task *peertask.Task) {
	p.activelk.Lock()
	defer p.activelk.Unlock()

	// Ignore tasks that were never recorded as active.
	if _, active := p.activeTasks[task]; !active {
		return
	}

	delete(p.activeTasks, task)
	p.activeWork -= task.Work
	// Negative active work indicates a bookkeeping bug.
	if p.activeWork < 0 {
		panic("more tasks finished than started!")
	}
}
// Remove removes the task with the given topic from this peer's pending
// queue. It reports whether such a task was found.
func (p *PeerTracker) Remove(topic peertask.Topic) bool {
	task, found := p.pendingTasks[topic]
	if !found {
		return false
	}
	delete(p.pendingTasks, topic)
	p.taskQueue.Remove(task.Index())
	return true
}
// Freeze increments the freeze value for this peer. While a peer is frozen
// (freeze value > 0) it will not execute tasks.
func (p *PeerTracker) Freeze() {
	p.freezeVal++
}

// Thaw decrements the freeze value for this peer. While a peer is frozen
// (freeze value > 0) it will not execute tasks.
// Each call removes (freezeVal+1)/2 from the freeze value — roughly half,
// rounding the removed amount up — so repeated calls decay it to zero.
// It returns true once the peer is no longer frozen.
func (p *PeerTracker) Thaw() bool {
	p.freezeVal -= (p.freezeVal + 1) / 2
	return p.freezeVal <= 0
}

// FullThaw completely unfreezes this peer so it can execute tasks.
func (p *PeerTracker) FullThaw() {
	p.freezeVal = 0
}

// IsFrozen returns whether this peer is frozen and unable to execute tasks.
func (p *PeerTracker) IsFrozen() bool {
	return p.freezeVal > 0
}
// taskHasMoreInfoThanActiveTasks indicates whether the new task adds any
// more information over tasks that are already in the active task queue.
func (p *PeerTracker) taskHasMoreInfoThanActiveTasks(task peertask.Task) bool {
	// Collect the active tasks that share the new task's Topic.
	var matching []peertask.Task
	for active := range p.activeTasks {
		if active.Topic == task.Topic {
			matching = append(matching, *active)
		}
	}

	// No active tasks with that topic, so the new task adds information.
	if len(matching) == 0 {
		return true
	}

	// Otherwise defer to the configured merge policy.
	return p.taskMerger.HasNewInfo(task, matching)
}
package peertracker
import (
"testing"
"github.com/ipfs/go-peertaskqueue/peertask"
"github.com/ipfs/go-peertaskqueue/testutil"
)
// TestEmpty verifies that popping from a fresh tracker yields no tasks.
func TestEmpty(t *testing.T) {
	partner := testutil.GeneratePeers(1)[0]
	tracker := New(partner, &DefaultTaskMerger{})

	if tasks, _ := tracker.PopTasks(100); len(tasks) != 0 {
		t.Fatal("Expected no tasks")
	}
}
// TestPushPop verifies that a pushed task comes back out of PopTasks.
func TestPushPop(t *testing.T) {
	partner := testutil.GeneratePeers(1)[0]
	tracker := New(partner, &DefaultTaskMerger{})

	tracker.PushTasks(peertask.Task{
		Topic:    "1",
		Priority: 1,
		Work:     10,
	})

	popped, _ := tracker.PopTasks(100)
	if len(popped) != 1 {
		t.Fatal("Expected 1 task")
	}
	if popped[0].Topic != "1" {
		t.Fatal("Expected same task")
	}
}
// TestPopNegativeOrZeroSize verifies that a non-positive target pops nothing.
func TestPopNegativeOrZeroSize(t *testing.T) {
	partner := testutil.GeneratePeers(1)[0]
	tracker := New(partner, &DefaultTaskMerger{})

	tracker.PushTasks(peertask.Task{
		Topic:    "1",
		Priority: 1,
		Work:     10,
	})

	if popped, _ := tracker.PopTasks(-1); len(popped) != 0 {
		t.Fatal("Expected 0 tasks")
	}
	if popped, _ := tracker.PopTasks(0); len(popped) != 0 {
		t.Fatal("Expected 0 tasks")
	}
}
// TestPushPopSizeAndOrder verifies that tasks pop in priority order and that
// the reported pending work tracks what remains in the queue.
func TestPushPopSizeAndOrder(t *testing.T) {
	partner := testutil.GeneratePeers(1)[0]
	tracker := New(partner, &DefaultTaskMerger{})

	tracker.PushTasks(
		peertask.Task{Topic: "1", Priority: 10, Work: 10},
		peertask.Task{Topic: "2", Priority: 20, Work: 10},
		peertask.Task{Topic: "3", Priority: 15, Work: 10},
	)

	// Highest priority ("2") comes off first; 20 work remains pending.
	popped, pending := tracker.PopTasks(10)
	if len(popped) != 1 {
		t.Fatal("Expected 1 task")
	}
	if popped[0].Topic != "2" {
		t.Fatal("Expected tasks in order")
	}
	if pending != 20 {
		t.Fatal("Expected pending work to be 20")
	}

	// The rest come off in priority order: "3" then "1".
	popped, pending = tracker.PopTasks(100)
	if len(popped) != 2 {
		t.Fatal("Expected 2 tasks")
	}
	if popped[0].Topic != "3" || popped[1].Topic != "1" {
		t.Fatal("Expected tasks in order")
	}
	if pending != 0 {
		t.Fatal("Expected pending work to be 0")
	}

	// The queue is now empty.
	popped, pending = tracker.PopTasks(100)
	if len(popped) != 0 {
		t.Fatal("Expected 0 tasks")
	}
	if pending != 0 {
		t.Fatal("Expected pending work to be 0")
	}
}
// TestPopFirstItemAlways verifies that PopTasks returns at least the first
// task even when its work exceeds the requested target.
func TestPopFirstItemAlways(t *testing.T) {
	partner := testutil.GeneratePeers(1)[0]
	tracker := New(partner, &DefaultTaskMerger{})

	tracker.PushTasks(
		peertask.Task{Topic: "1", Priority: 20, Work: 10},
		peertask.Task{Topic: "2", Priority: 10, Work: 5},
	)

	// Pop with target size 7.
	// PopTasks should always return the first task even if it's under target work.
	popped, _ := tracker.PopTasks(7)
	if len(popped) != 1 || popped[0].Topic != "1" {
		t.Fatal("Expected first task to be popped")
	}

	// Only the second task remains.
	popped, _ = tracker.PopTasks(100)
	if len(popped) != 1 {
		t.Fatal("Expected 1 task")
	}
}
// TestPopItemsToCoverTargetWork verifies that PopTasks keeps popping until
// the accumulated work covers the requested target.
func TestPopItemsToCoverTargetWork(t *testing.T) {
	partner := testutil.GeneratePeers(1)[0]
	tracker := New(partner, &DefaultTaskMerger{})

	tracker.PushTasks(
		peertask.Task{Topic: "1", Priority: 20, Work: 5},
		peertask.Task{Topic: "2", Priority: 10, Work: 5},
		peertask.Task{Topic: "3", Priority: 5, Work: 5},
	)

	// Pop with target size 7.
	// PopTasks should return enough items to cover the target work.
	popped, _ := tracker.PopTasks(7)
	if len(popped) != 2 || popped[0].Topic != "1" || popped[1].Topic != "2" {
		t.Fatal("Expected first two tasks to be popped")
	}

	// Only the third task remains.
	popped, _ = tracker.PopTasks(100)
	if len(popped) != 1 {
		t.Fatal("Expected 1 task")
	}
}
// TestRemove verifies that a removed topic no longer pops, while the rest
// still come out in priority order.
func TestRemove(t *testing.T) {
	partner := testutil.GeneratePeers(1)[0]
	tracker := New(partner, &DefaultTaskMerger{})

	tracker.PushTasks(
		peertask.Task{Topic: "1", Priority: 10, Work: 10},
		peertask.Task{Topic: "2", Priority: 20, Work: 10},
		peertask.Task{Topic: "3", Priority: 15, Work: 10},
	)

	tracker.Remove("2")

	popped, _ := tracker.PopTasks(100)
	if len(popped) != 2 {
		t.Fatal("Expected 2 tasks")
	}
	if popped[0].Topic != "3" || popped[1].Topic != "1" {
		t.Fatal("Expected tasks in order")
	}
}
// TestRemoveMulti verifies that removing a topic removes the (merged)
// pending entry for that topic, leaving other topics intact.
func TestRemoveMulti(t *testing.T) {
	partner := testutil.GeneratePeers(1)[0]
	tracker := New(partner, &DefaultTaskMerger{})

	tracker.PushTasks(
		peertask.Task{Topic: "1", Priority: 10, Work: 10},
		peertask.Task{Topic: "1", Priority: 20, Work: 1},
		peertask.Task{Topic: "2", Priority: 15, Work: 10},
	)

	tracker.Remove("1")

	popped, _ := tracker.PopTasks(100)
	if len(popped) != 1 {
		t.Fatal("Expected 1 task")
	}
	if popped[0].Topic != "2" {
		t.Fatal("Expected remaining task")
	}
}
// TestTaskDone verifies that once an active task is marked done, a new task
// with the same Topic can be queued again.
func TestTaskDone(t *testing.T) {
	partner := testutil.GeneratePeers(1)[0]
	tracker := New(partner, &DefaultTaskMerger{})

	taskA := peertask.Task{Topic: "1", Priority: 10, Work: 10, Data: "a"}
	taskB := peertask.Task{Topic: "1", Priority: 20, Work: 10, Data: "b"}

	// Push task "a" and pop it, making it active.
	tracker.PushTasks(taskA) // Topic "1"
	popped, _ := tracker.PopTasks(10)
	if len(popped) != 1 {
		t.Fatal("Expected 1 task")
	}

	// Mark task "a" as done.
	tracker.TaskDone(popped[0])

	// Push task "b". Task "a" was done so task "b" should have been allowed
	// to be added.
	tracker.PushTasks(taskB) // Topic "1"
	popped, _ = tracker.PopTasks(100)
	if len(popped) != 1 {
		t.Fatal("Expected 1 task")
	}
}
// permissiveTaskMerger always merges: a new task's Data and Work replace
// those of the existing pending task with the same Topic.
type permissiveTaskMerger struct{}

// HasNewInfo always reports true, so every new task is merged.
func (*permissiveTaskMerger) HasNewInfo(peertask.Task, []peertask.Task) bool {
	return true
}

// Merge overwrites the existing task's Data and Work with the new task's.
func (*permissiveTaskMerger) Merge(task peertask.Task, existing *peertask.Task) {
	existing.Data = task.Data
	existing.Work = task.Work
}
// TestReplaceTaskPermissive verifies that with a permissive merger a new
// task with the same Topic replaces the pending one, keeping the higher
// priority.
func TestReplaceTaskPermissive(t *testing.T) {
	partner := testutil.GeneratePeers(1)[0]
	tracker := New(partner, &permissiveTaskMerger{})

	taskA := peertask.Task{Topic: "1", Priority: 10, Work: 10, Data: "a"}
	taskB := peertask.Task{Topic: "1", Priority: 20, Work: 10, Data: "b"}

	// Push task "a", then task "b". Same topic and permissive task merger,
	// so "b" should replace "a".
	tracker.PushTasks(taskA) // Topic "1"
	tracker.PushTasks(taskB) // Topic "1"

	// Pop all tasks, should only be task "b".
	popped, _ := tracker.PopTasks(100)
	if len(popped) != 1 {
		t.Fatal("Expected 1 task")
	}
	if popped[0].Data != "b" {
		t.Fatal("Expected b to replace a")
	}
	if popped[0].Priority != 20 {
		t.Fatal("Expected higher Priority to replace lower Priority")
	}
}
// TestReplaceTaskSize verifies that merging a task with the same Topic
// updates the pending task's Work, and that PopTasks accounts for the
// updated size.
func TestReplaceTaskSize(t *testing.T) {
	partner := testutil.GeneratePeers(1)[0]
	tracker := New(partner, &permissiveTaskMerger{})
	tasks := []peertask.Task{
		{Topic: "1", Priority: 10, Work: 10, Data: "a"},
		{Topic: "1", Priority: 10, Work: 20, Data: "b"},
		{Topic: "2", Priority: 5, Work: 5, Data: "c"},
	}

	// Push task "a"
	tracker.PushTasks(tasks[0]) // Topic "1"

	// Push task "b". Has same topic as task "a" and permissive task merger,
	// so should replace task "a", and update its Work from 10 to 20.
	tracker.PushTasks(tasks[1]) // Topic "1"

	// Push task "c"
	tracker.PushTasks(tasks[2]) // Topic "2"

	// Pop with target size 15. Should only pop task "b" (which replaced
	// task "a") because its Work is now 20 (was 10), already covering the
	// target on its own.
	// (The original comment said task "a" would be popped, but "a" was
	// replaced by "b" above — the assertion below checks Data == "b".)
	popped, pending := tracker.PopTasks(15)
	if len(popped) != 1 || popped[0].Data != "b" {
		t.Fatal("Expected 1 task")
	}
	if pending != 5 {
		t.Fatal("Expected pending work to be 5")
	}

	// Only task "c" (Work 5) remains.
	popped, pending = tracker.PopTasks(30)
	if len(popped) != 1 {
		t.Fatal("Expected 1 task")
	}
	if pending != 0 {
		t.Fatal("Expected pending work to be 0")
	}
}
// TestReplaceActiveTask verifies that a task pushed while a same-Topic task
// is active lands in the pending queue (permissive merger).
func TestReplaceActiveTask(t *testing.T) {
	partner := testutil.GeneratePeers(1)[0]
	tracker := New(partner, &permissiveTaskMerger{})

	taskA := peertask.Task{Topic: "1", Priority: 10, Work: 10, Data: "a"}
	taskB := peertask.Task{Topic: "1", Priority: 20, Work: 10, Data: "b"}

	// Push task "a" and pop it, making it active.
	tracker.PushTasks(taskA) // Topic "1"
	popped, _ := tracker.PopTasks(10)
	if len(popped) != 1 {
		t.Fatal("Expected 1 task")
	}

	// Push task "b". Task "a" was active so task "b" should have been moved
	// to the pending queue.
	tracker.PushTasks(taskB) // Topic "1"
	popped, _ = tracker.PopTasks(100)
	if len(popped) != 1 {
		t.Fatal("Expected 1 task")
	}
}
// TestReplaceActiveTaskNonPermissive verifies that with the default merger a
// task pushed while a same-Topic task is active is dropped.
func TestReplaceActiveTaskNonPermissive(t *testing.T) {
	partner := testutil.GeneratePeers(1)[0]
	tracker := New(partner, &DefaultTaskMerger{})

	taskA := peertask.Task{Topic: "1", Priority: 10, Work: 10, Data: "a"}
	taskB := peertask.Task{Topic: "1", Priority: 20, Work: 10, Data: "b"}

	// Push task "a" and pop it, making it active.
	tracker.PushTasks(taskA) // Topic "1"
	popped, _ := tracker.PopTasks(10)
	if len(popped) != 1 {
		t.Fatal("Expected 1 task")
	}

	// Push task "b". Task merger is not permissive, so should ignore task "b".
	tracker.PushTasks(taskB) // Topic "1"
	popped, _ = tracker.PopTasks(100)
	if len(popped) != 0 {
		t.Fatal("Expected no tasks")
	}
}
// TestReplaceTaskThatIsActiveAndPending verifies that when a Topic is both
// active and pending, a new push merges into the pending entry.
func TestReplaceTaskThatIsActiveAndPending(t *testing.T) {
	partner := testutil.GeneratePeers(1)[0]
	tracker := New(partner, &permissiveTaskMerger{})

	taskA := peertask.Task{Topic: "1", Priority: 10, Work: 10, Data: "a"}
	taskB := peertask.Task{Topic: "1", Priority: 10, Work: 10, Data: "b"}
	taskC := peertask.Task{Topic: "1", Priority: 10, Work: 10, Data: "c"}

	// Push task "a" and pop it, making it active.
	tracker.PushTasks(taskA) // Topic "1"
	popped, _ := tracker.PopTasks(10)
	if len(popped) != 1 {
		t.Fatal("Expected 1 task")
	}

	// Push task "b". Same Topic so should be added to the pending queue.
	tracker.PushTasks(taskB) // Topic "1"

	// Push task "c". Permissive task merger so should replace pending task
	// "b" with same Topic.
	tracker.PushTasks(taskC) // Topic "1"

	// Pop all tasks: only the merged pending entry remains.
	popped, _ = tracker.PopTasks(100)
	if len(popped) != 1 {
		t.Fatal("Expected 1 task")
	}
	if popped[0].Data != "c" {
		t.Fatalf("Expected last task to overwrite pending task")
	}
}
// TestRemoveActive verifies that Remove drops the pending task for a topic
// even while another task with that topic is active.
func TestRemoveActive(t *testing.T) {
	partner := testutil.GeneratePeers(1)[0]
	tracker := New(partner, &permissiveTaskMerger{})

	taskA := peertask.Task{Topic: "1", Priority: 10, Work: 10, Data: "a"}
	taskB := peertask.Task{Topic: "1", Priority: 20, Work: 10, Data: "b"}
	taskC := peertask.Task{Topic: "2", Priority: 15, Work: 10, Data: "c"}

	// Push task "a" and pop it, making it active.
	tracker.PushTasks(taskA) // Topic "1"
	popped, _ := tracker.PopTasks(10)
	if len(popped) != 1 {
		t.Fatal("Expected 1 task")
	}

	// Push task "b" and "c".
	tracker.PushTasks(taskB) // Topic "1"
	tracker.PushTasks(taskC) // Topic "2"

	// Remove all tasks with Topic "1".
	// This should remove task "b" from the pending queue.
	tracker.Remove("1")

	popped, _ = tracker.PopTasks(100)
	if len(popped) != 1 {
		t.Fatal("Expected 1 task")
	}
	if popped[0].Topic != "2" {
		t.Fatal("Expected tasks in order")
	}
}
package testutil
import (
	"strconv"

	peer "github.com/libp2p/go-libp2p-core/peer"
)
// peerSeq is a monotonically increasing counter used to mint unique peer
// ids. It is not synchronized; callers are expected to generate peers from
// a single goroutine.
var peerSeq int

// GeneratePeers creates n distinct peer ids.
func GeneratePeers(n int) []peer.ID {
	peerIds := make([]peer.ID, 0, n)
	for i := 0; i < n; i++ {
		peerSeq++
		// Use the decimal string of the counter. The previous form,
		// peer.ID(peerSeq), was an int-to-string conversion that yields
		// the rune with that code point (flagged by go vet's
		// stringintconv check), producing confusing single-rune ids.
		// Callers only need ids to be distinct, which this preserves.
		peerIds = append(peerIds, peer.ID(strconv.Itoa(peerSeq)))
	}
	return peerIds
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment