Commit c93269ee authored by Brian Tiger Chow's avatar Brian Tiger Chow Committed by Juan Batiz-Benet

it's not a queue yet but it's okay to name it as such

License: MIT
Signed-off-by: default avatarBrian Tiger Chow <brian@perfmode.com>
parent fcaf7f56
......@@ -26,9 +26,9 @@ type LedgerManager struct {
lock sync.RWMutex
ledgerMap ledgerMap
bs bstore.Blockstore
// FIXME tasklist isn't threadsafe nor is it protected by a mutex. consider
// a way to avoid sharing the tasklist between the worker and the receiver
tasklist *taskList
// FIXME taskqueue isn't threadsafe nor is it protected by a mutex. consider
// a way to avoid sharing the taskqueue between the worker and the receiver
taskqueue *taskQueue
outbox chan Envelope
workSignal chan struct{}
}
......@@ -37,7 +37,7 @@ func NewLedgerManager(ctx context.Context, bs bstore.Blockstore) *LedgerManager
lm := &LedgerManager{
ledgerMap: make(ledgerMap),
bs: bs,
tasklist: newTaskList(),
taskqueue: newTaskQueue(),
outbox: make(chan Envelope, 4), // TODO extract constant
workSignal: make(chan struct{}),
}
......@@ -47,7 +47,7 @@ func NewLedgerManager(ctx context.Context, bs bstore.Blockstore) *LedgerManager
func (lm *LedgerManager) taskWorker(ctx context.Context) {
for {
nextTask := lm.tasklist.Pop()
nextTask := lm.taskqueue.Pop()
if nextTask == nil {
// No tasks in the list?
// Wait until there are!
......@@ -124,11 +124,11 @@ func (lm *LedgerManager) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) er
for _, e := range m.Wantlist() {
if e.Cancel {
l.CancelWant(e.Key)
lm.tasklist.Cancel(e.Key, p)
lm.taskqueue.Cancel(e.Key, p)
} else {
l.Wants(e.Key, e.Priority)
newWorkExists = true
lm.tasklist.Push(e.Key, e.Priority, p)
lm.taskqueue.Push(e.Key, e.Priority, p)
}
}
......@@ -138,7 +138,7 @@ func (lm *LedgerManager) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) er
for _, l := range lm.ledgerMap {
if l.WantListContains(block.Key()) {
newWorkExists = true
lm.tasklist.Push(block.Key(), 1, l.Partner)
lm.taskqueue.Push(block.Key(), 1, l.Partner)
}
}
}
......@@ -159,7 +159,7 @@ func (lm *LedgerManager) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error
for _, block := range m.Blocks() {
l.SentBytes(len(block.Data))
l.wantList.Remove(block.Key())
lm.tasklist.Cancel(block.Key(), p)
lm.taskqueue.Cancel(block.Key(), p)
}
return nil
......
......@@ -8,13 +8,13 @@ import (
// TODO: at some point, the strategy needs to plug in here
// to help decide how to sort tasks (on add) and how to select
// tasks (on getnext). For now, we are assuming a dumb/nice strategy.
type taskList struct {
type taskQueue struct {
tasks []*Task
taskmap map[string]*Task
}
func newTaskList() *taskList {
return &taskList{
func newTaskQueue() *taskQueue {
return &taskQueue{
taskmap: make(map[string]*Task),
}
}
......@@ -27,7 +27,7 @@ type Task struct {
// Push currently adds a new task to the end of the list
// TODO: make this into a priority queue
func (tl *taskList) Push(block u.Key, priority int, to peer.Peer) {
func (tl *taskQueue) Push(block u.Key, priority int, to peer.Peer) {
if task, ok := tl.taskmap[taskKey(to, block)]; ok {
// TODO: when priority queue is implemented,
// rearrange this Task
......@@ -44,7 +44,7 @@ func (tl *taskList) Push(block u.Key, priority int, to peer.Peer) {
}
// Pop 'pops' the next task to be performed. Returns nil no task exists.
func (tl *taskList) Pop() *Task {
func (tl *taskQueue) Pop() *Task {
var out *Task
for len(tl.tasks) > 0 {
// TODO: instead of zero, use exponential distribution
......@@ -63,7 +63,7 @@ func (tl *taskList) Pop() *Task {
}
// Cancel lazily cancels the sending of a block to a given peer
func (tl *taskList) Cancel(k u.Key, p peer.Peer) {
func (tl *taskQueue) Cancel(k u.Key, p peer.Peer) {
t, ok := tl.taskmap[taskKey(p, k)]
if ok {
t.theirPriority = -1
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment