Unverified Commit e8245a51 authored by Hannah Howard's avatar Hannah Howard Committed by GitHub

Merge pull request #44 from ipfs/chore/update-peertaskqueue

Update peertaskqueue
parents 2cbb73ff 6c1b5b2f
......@@ -12,7 +12,7 @@ require (
github.com/ipfs/go-ipfs-blockstore v0.0.1
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-log v0.0.1
github.com/ipfs/go-peertaskqueue v0.0.4
github.com/ipfs/go-peertaskqueue v0.1.2-0.20191111205511-fd33b91329d3
github.com/ipld/go-ipld-prime v0.0.2-0.20191025153308-092ea9a7696d
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
github.com/libp2p/go-eventbus v0.0.3 // indirect
......
......@@ -102,6 +102,8 @@ github.com/ipfs/go-ipfs-ds-help v0.0.1 h1:QBg+Ts2zgeemK/dB0saiF/ykzRGgfoFMT90Rzo
github.com/ipfs/go-ipfs-ds-help v0.0.1/go.mod h1:gtP9xRaZXqIQRh1HRpp595KbBEdgqWFxefeVKOV8sxo=
github.com/ipfs/go-ipfs-pq v0.0.1 h1:zgUotX8dcAB/w/HidJh1zzc1yFq6Vm8J7T2F4itj/RU=
github.com/ipfs/go-ipfs-pq v0.0.1/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY=
github.com/ipfs/go-ipfs-pq v0.0.2/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/ipfs/go-ipfs-util v0.0.1 h1:Wz9bL2wB2YBJqggkA4dD7oSmqB4cAnpNbGrlHJulv50=
github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc=
github.com/ipfs/go-log v0.0.1 h1:9XTUN/rW64BCG1YhPK9Hoy3q8nr4gOmHHBpgFdfw6Lc=
......@@ -110,6 +112,8 @@ github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fG
github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY=
github.com/ipfs/go-peertaskqueue v0.0.4 h1:i0JprfjjILYcWM1xguO/1MCS8XKVxLSl+ECEVr6i8nw=
github.com/ipfs/go-peertaskqueue v0.0.4/go.mod h1:03H8fhyeMfKNFWqzYEVyMbcPUeYrqP1MX6Kd+aN+rMQ=
github.com/ipfs/go-peertaskqueue v0.1.2-0.20191111205511-fd33b91329d3 h1:c9CXamXsukIP0Ij/wIY4VECIq8uUnLPBBBy/XR3wlI8=
github.com/ipfs/go-peertaskqueue v0.1.2-0.20191111205511-fd33b91329d3/go.mod h1:5/eNrBEbtSKWCG+kQK8K8fGNixoYUnr+P7jivavs9lY=
github.com/ipld/go-ipld-prime v0.0.0-20190730002952-369bb56ad071 h1:+jRGf/jb5MnxBsLYczZF0u5pr3nzfbS8pPt/49Yxekw=
github.com/ipld/go-ipld-prime v0.0.0-20190730002952-369bb56ad071/go.mod h1:bDDSvVz7vaK12FNvMeRYnpRFkSUPNQOiCYQezMD/P3w=
github.com/ipld/go-ipld-prime v0.0.1-filecoin h1:jK1bUG/z73GNeKrNlVfdexkEIIWp0BEsBVjHkh9WvXo=
......
......@@ -42,9 +42,10 @@ type responseTaskData struct {
// QueryQueue is an interface that can receive new selector query tasks
// and prioritize them as needed, and pop them off later
type QueryQueue interface {
PushBlock(to peer.ID, tasks ...peertask.Task)
PopBlock() *peertask.TaskBlock
Remove(identifier peertask.Identifier, p peer.ID)
PushTasks(to peer.ID, tasks ...peertask.Task)
PopTasks(targetMinWork int) (peer.ID, []*peertask.Task, int)
Remove(topic peertask.Topic, p peer.ID)
TasksDone(to peer.ID, tasks ...*peertask.Task)
ThawRound()
}
......@@ -139,20 +140,20 @@ func (rm *ResponseManager) processQueriesWorker() {
taskDataChan := make(chan *responseTaskData)
var taskData *responseTaskData
for {
nextTaskBlock := rm.queryQueue.PopBlock()
for nextTaskBlock == nil {
p, nextTasks, _ := rm.queryQueue.PopTasks(1)
for nextTasks == nil {
select {
case <-rm.ctx.Done():
return
case <-rm.workSignal:
nextTaskBlock = rm.queryQueue.PopBlock()
p, nextTasks, _ = rm.queryQueue.PopTasks(1)
case <-rm.ticker.C:
rm.queryQueue.ThawRound()
nextTaskBlock = rm.queryQueue.PopBlock()
p, nextTasks, _ = rm.queryQueue.PopTasks(1)
}
}
for _, task := range nextTaskBlock.Tasks {
key := task.Identifier.(responseKey)
for _, task := range nextTasks {
key := task.Topic.(responseKey)
select {
case rm.messages <- &responseDataRequest{key, taskDataChan}:
case <-rm.ctx.Done():
......@@ -169,8 +170,7 @@ func (rm *ResponseManager) processQueriesWorker() {
case <-rm.ctx.Done():
}
}
nextTaskBlock.Done(nextTaskBlock.Tasks)
rm.queryQueue.TasksDone(p, nextTasks...)
}
}
......@@ -249,7 +249,11 @@ func (prm *processRequestMessage) handle(rm *ResponseManager) {
root: request.Root(),
selector: request.Selector(),
}
rm.queryQueue.PushBlock(prm.p, peertask.Task{Identifier: key, Priority: int(request.Priority())})
rm.queryQueue.PushTasks(prm.p, peertask.Task{
Topic: key,
Priority: int(request.Priority()),
Work: 1,
})
select {
case rm.workSignal <- struct{}{}:
default:
......
......@@ -21,55 +21,61 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
)
type taskRQ struct {
tasks []*peertask.Task
target peer.ID
}
type fakeQueryQueue struct {
popWait sync.WaitGroup
queriesLk sync.RWMutex
queries []*peertask.TaskBlock
queries []*taskRQ
}
func (fqq *fakeQueryQueue) PushBlock(to peer.ID, tasks ...peertask.Task) {
func (fqq *fakeQueryQueue) PushTasks(to peer.ID, tasks ...peertask.Task) {
fqq.queriesLk.Lock()
fqq.queries = append(fqq.queries, &peertask.TaskBlock{
Tasks: tasks,
Priority: tasks[0].Priority,
Target: to,
Done: func([]peertask.Task) {},
var ptrs []*peertask.Task
for _, t := range tasks {
ptrs = append(ptrs, &t)
}
fqq.queries = append(fqq.queries, &taskRQ{
tasks: ptrs,
target: to,
})
fqq.queriesLk.Unlock()
}
func (fqq *fakeQueryQueue) PopBlock() *peertask.TaskBlock {
func (fqq *fakeQueryQueue) PopTasks(targetMinWork int) (peer.ID, []*peertask.Task, int) {
fqq.popWait.Wait()
fqq.queriesLk.Lock()
defer fqq.queriesLk.Unlock()
if len(fqq.queries) == 0 {
return nil
return "", nil, 0
}
block := fqq.queries[0]
trq := fqq.queries[0]
fqq.queries = fqq.queries[1:]
return block
return trq.target, trq.tasks, 0
}
func (fqq *fakeQueryQueue) Remove(identifier peertask.Identifier, p peer.ID) {
func (fqq *fakeQueryQueue) Remove(topic peertask.Topic, p peer.ID) {
fqq.queriesLk.Lock()
defer fqq.queriesLk.Unlock()
for i, query := range fqq.queries {
if query.Target == p {
for j, task := range query.Tasks {
if task.Identifier == identifier {
query.Tasks = append(query.Tasks[:j], query.Tasks[j+1:]...)
if query.target == p {
for j, task := range query.tasks {
if task.Topic == topic {
query.tasks = append(query.tasks[:j], query.tasks[j+1:]...)
}
}
if len(query.Tasks) == 0 {
if len(query.tasks) == 0 {
fqq.queries = append(fqq.queries[:i], fqq.queries[i+1:]...)
}
}
}
}
func (fqq *fakeQueryQueue) ThawRound() {
}
func (fqq *fakeQueryQueue) ThawRound() {}
func (fqq *fakeQueryQueue) TasksDone(to peer.ID, tasks ...*peertask.Task) {}
type fakePeerManager struct {
lastPeer peer.ID
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment