Commit 61a51d9f authored by hannahhoward's avatar hannahhoward

Revert "Merge pull request #44 from ipfs/chore/update-peertaskqueue"

This reverts commit e8245a51, reversing
changes made to 2cbb73ff.
parent e8245a51
...@@ -12,7 +12,7 @@ require ( ...@@ -12,7 +12,7 @@ require (
github.com/ipfs/go-ipfs-blockstore v0.0.1 github.com/ipfs/go-ipfs-blockstore v0.0.1
github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-log v0.0.1 github.com/ipfs/go-log v0.0.1
github.com/ipfs/go-peertaskqueue v0.1.2-0.20191111205511-fd33b91329d3 github.com/ipfs/go-peertaskqueue v0.0.4
github.com/ipld/go-ipld-prime v0.0.2-0.20191025153308-092ea9a7696d github.com/ipld/go-ipld-prime v0.0.2-0.20191025153308-092ea9a7696d
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
github.com/libp2p/go-eventbus v0.0.3 // indirect github.com/libp2p/go-eventbus v0.0.3 // indirect
......
...@@ -102,8 +102,6 @@ github.com/ipfs/go-ipfs-ds-help v0.0.1 h1:QBg+Ts2zgeemK/dB0saiF/ykzRGgfoFMT90Rzo ...@@ -102,8 +102,6 @@ github.com/ipfs/go-ipfs-ds-help v0.0.1 h1:QBg+Ts2zgeemK/dB0saiF/ykzRGgfoFMT90Rzo
github.com/ipfs/go-ipfs-ds-help v0.0.1/go.mod h1:gtP9xRaZXqIQRh1HRpp595KbBEdgqWFxefeVKOV8sxo= github.com/ipfs/go-ipfs-ds-help v0.0.1/go.mod h1:gtP9xRaZXqIQRh1HRpp595KbBEdgqWFxefeVKOV8sxo=
github.com/ipfs/go-ipfs-pq v0.0.1 h1:zgUotX8dcAB/w/HidJh1zzc1yFq6Vm8J7T2F4itj/RU= github.com/ipfs/go-ipfs-pq v0.0.1 h1:zgUotX8dcAB/w/HidJh1zzc1yFq6Vm8J7T2F4itj/RU=
github.com/ipfs/go-ipfs-pq v0.0.1/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY= github.com/ipfs/go-ipfs-pq v0.0.1/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY=
github.com/ipfs/go-ipfs-pq v0.0.2/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/ipfs/go-ipfs-util v0.0.1 h1:Wz9bL2wB2YBJqggkA4dD7oSmqB4cAnpNbGrlHJulv50= github.com/ipfs/go-ipfs-util v0.0.1 h1:Wz9bL2wB2YBJqggkA4dD7oSmqB4cAnpNbGrlHJulv50=
github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc= github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc=
github.com/ipfs/go-log v0.0.1 h1:9XTUN/rW64BCG1YhPK9Hoy3q8nr4gOmHHBpgFdfw6Lc= github.com/ipfs/go-log v0.0.1 h1:9XTUN/rW64BCG1YhPK9Hoy3q8nr4gOmHHBpgFdfw6Lc=
...@@ -112,8 +110,6 @@ github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fG ...@@ -112,8 +110,6 @@ github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fG
github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY= github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY=
github.com/ipfs/go-peertaskqueue v0.0.4 h1:i0JprfjjILYcWM1xguO/1MCS8XKVxLSl+ECEVr6i8nw= github.com/ipfs/go-peertaskqueue v0.0.4 h1:i0JprfjjILYcWM1xguO/1MCS8XKVxLSl+ECEVr6i8nw=
github.com/ipfs/go-peertaskqueue v0.0.4/go.mod h1:03H8fhyeMfKNFWqzYEVyMbcPUeYrqP1MX6Kd+aN+rMQ= github.com/ipfs/go-peertaskqueue v0.0.4/go.mod h1:03H8fhyeMfKNFWqzYEVyMbcPUeYrqP1MX6Kd+aN+rMQ=
github.com/ipfs/go-peertaskqueue v0.1.2-0.20191111205511-fd33b91329d3 h1:c9CXamXsukIP0Ij/wIY4VECIq8uUnLPBBBy/XR3wlI8=
github.com/ipfs/go-peertaskqueue v0.1.2-0.20191111205511-fd33b91329d3/go.mod h1:5/eNrBEbtSKWCG+kQK8K8fGNixoYUnr+P7jivavs9lY=
github.com/ipld/go-ipld-prime v0.0.0-20190730002952-369bb56ad071 h1:+jRGf/jb5MnxBsLYczZF0u5pr3nzfbS8pPt/49Yxekw= github.com/ipld/go-ipld-prime v0.0.0-20190730002952-369bb56ad071 h1:+jRGf/jb5MnxBsLYczZF0u5pr3nzfbS8pPt/49Yxekw=
github.com/ipld/go-ipld-prime v0.0.0-20190730002952-369bb56ad071/go.mod h1:bDDSvVz7vaK12FNvMeRYnpRFkSUPNQOiCYQezMD/P3w= github.com/ipld/go-ipld-prime v0.0.0-20190730002952-369bb56ad071/go.mod h1:bDDSvVz7vaK12FNvMeRYnpRFkSUPNQOiCYQezMD/P3w=
github.com/ipld/go-ipld-prime v0.0.1-filecoin h1:jK1bUG/z73GNeKrNlVfdexkEIIWp0BEsBVjHkh9WvXo= github.com/ipld/go-ipld-prime v0.0.1-filecoin h1:jK1bUG/z73GNeKrNlVfdexkEIIWp0BEsBVjHkh9WvXo=
......
...@@ -42,10 +42,9 @@ type responseTaskData struct { ...@@ -42,10 +42,9 @@ type responseTaskData struct {
// QueryQueue is an interface that can receive new selector query tasks // QueryQueue is an interface that can receive new selector query tasks
// and prioritize them as needed, and pop them off later // and prioritize them as needed, and pop them off later
type QueryQueue interface { type QueryQueue interface {
PushTasks(to peer.ID, tasks ...peertask.Task) PushBlock(to peer.ID, tasks ...peertask.Task)
PopTasks(targetMinWork int) (peer.ID, []*peertask.Task, int) PopBlock() *peertask.TaskBlock
Remove(topic peertask.Topic, p peer.ID) Remove(identifier peertask.Identifier, p peer.ID)
TasksDone(to peer.ID, tasks ...*peertask.Task)
ThawRound() ThawRound()
} }
...@@ -140,20 +139,20 @@ func (rm *ResponseManager) processQueriesWorker() { ...@@ -140,20 +139,20 @@ func (rm *ResponseManager) processQueriesWorker() {
taskDataChan := make(chan *responseTaskData) taskDataChan := make(chan *responseTaskData)
var taskData *responseTaskData var taskData *responseTaskData
for { for {
p, nextTasks, _ := rm.queryQueue.PopTasks(1) nextTaskBlock := rm.queryQueue.PopBlock()
for nextTasks == nil { for nextTaskBlock == nil {
select { select {
case <-rm.ctx.Done(): case <-rm.ctx.Done():
return return
case <-rm.workSignal: case <-rm.workSignal:
p, nextTasks, _ = rm.queryQueue.PopTasks(1) nextTaskBlock = rm.queryQueue.PopBlock()
case <-rm.ticker.C: case <-rm.ticker.C:
rm.queryQueue.ThawRound() rm.queryQueue.ThawRound()
p, nextTasks, _ = rm.queryQueue.PopTasks(1) nextTaskBlock = rm.queryQueue.PopBlock()
} }
} }
for _, task := range nextTasks { for _, task := range nextTaskBlock.Tasks {
key := task.Topic.(responseKey) key := task.Identifier.(responseKey)
select { select {
case rm.messages <- &responseDataRequest{key, taskDataChan}: case rm.messages <- &responseDataRequest{key, taskDataChan}:
case <-rm.ctx.Done(): case <-rm.ctx.Done():
...@@ -170,7 +169,8 @@ func (rm *ResponseManager) processQueriesWorker() { ...@@ -170,7 +169,8 @@ func (rm *ResponseManager) processQueriesWorker() {
case <-rm.ctx.Done(): case <-rm.ctx.Done():
} }
} }
rm.queryQueue.TasksDone(p, nextTasks...) nextTaskBlock.Done(nextTaskBlock.Tasks)
} }
} }
...@@ -249,11 +249,7 @@ func (prm *processRequestMessage) handle(rm *ResponseManager) { ...@@ -249,11 +249,7 @@ func (prm *processRequestMessage) handle(rm *ResponseManager) {
root: request.Root(), root: request.Root(),
selector: request.Selector(), selector: request.Selector(),
} }
rm.queryQueue.PushTasks(prm.p, peertask.Task{ rm.queryQueue.PushBlock(prm.p, peertask.Task{Identifier: key, Priority: int(request.Priority())})
Topic: key,
Priority: int(request.Priority()),
Work: 1,
})
select { select {
case rm.workSignal <- struct{}{}: case rm.workSignal <- struct{}{}:
default: default:
......
...@@ -21,61 +21,55 @@ import ( ...@@ -21,61 +21,55 @@ import (
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
) )
type taskRQ struct {
tasks []*peertask.Task
target peer.ID
}
type fakeQueryQueue struct { type fakeQueryQueue struct {
popWait sync.WaitGroup popWait sync.WaitGroup
queriesLk sync.RWMutex queriesLk sync.RWMutex
queries []*taskRQ queries []*peertask.TaskBlock
} }
func (fqq *fakeQueryQueue) PushTasks(to peer.ID, tasks ...peertask.Task) { func (fqq *fakeQueryQueue) PushBlock(to peer.ID, tasks ...peertask.Task) {
fqq.queriesLk.Lock() fqq.queriesLk.Lock()
var ptrs []*peertask.Task fqq.queries = append(fqq.queries, &peertask.TaskBlock{
for _, t := range tasks { Tasks: tasks,
ptrs = append(ptrs, &t) Priority: tasks[0].Priority,
} Target: to,
fqq.queries = append(fqq.queries, &taskRQ{ Done: func([]peertask.Task) {},
tasks: ptrs,
target: to,
}) })
fqq.queriesLk.Unlock() fqq.queriesLk.Unlock()
} }
func (fqq *fakeQueryQueue) PopTasks(targetMinWork int) (peer.ID, []*peertask.Task, int) { func (fqq *fakeQueryQueue) PopBlock() *peertask.TaskBlock {
fqq.popWait.Wait() fqq.popWait.Wait()
fqq.queriesLk.Lock() fqq.queriesLk.Lock()
defer fqq.queriesLk.Unlock() defer fqq.queriesLk.Unlock()
if len(fqq.queries) == 0 { if len(fqq.queries) == 0 {
return "", nil, 0 return nil
} }
trq := fqq.queries[0] block := fqq.queries[0]
fqq.queries = fqq.queries[1:] fqq.queries = fqq.queries[1:]
return trq.target, trq.tasks, 0 return block
} }
func (fqq *fakeQueryQueue) Remove(topic peertask.Topic, p peer.ID) { func (fqq *fakeQueryQueue) Remove(identifier peertask.Identifier, p peer.ID) {
fqq.queriesLk.Lock() fqq.queriesLk.Lock()
defer fqq.queriesLk.Unlock() defer fqq.queriesLk.Unlock()
for i, query := range fqq.queries { for i, query := range fqq.queries {
if query.target == p { if query.Target == p {
for j, task := range query.tasks { for j, task := range query.Tasks {
if task.Topic == topic { if task.Identifier == identifier {
query.tasks = append(query.tasks[:j], query.tasks[j+1:]...) query.Tasks = append(query.Tasks[:j], query.Tasks[j+1:]...)
} }
} }
if len(query.tasks) == 0 { if len(query.Tasks) == 0 {
fqq.queries = append(fqq.queries[:i], fqq.queries[i+1:]...) fqq.queries = append(fqq.queries[:i], fqq.queries[i+1:]...)
} }
} }
} }
} }
func (fqq *fakeQueryQueue) ThawRound() {} func (fqq *fakeQueryQueue) ThawRound() {
func (fqq *fakeQueryQueue) TasksDone(to peer.ID, tasks ...*peertask.Task) {}
}
type fakePeerManager struct { type fakePeerManager struct {
lastPeer peer.ID lastPeer peer.ID
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment