Commit b3712aae authored by Brian Tiger Chow's avatar Brian Tiger Chow Committed by Juan Batiz-Benet

rename to peerRequestQueue

this opens up the possibility of having multiple queues. And for all
outgoing messages to be managed by the decision engine

License: MIT
Signed-off-by: default avatarBrian Tiger Chow <brian@perfmode.com>
parent 0426b97e
...@@ -22,9 +22,10 @@ type Envelope struct { ...@@ -22,9 +22,10 @@ type Envelope struct {
} }
type Engine struct { type Engine struct {
// FIXME taskqueue isn't threadsafe nor is it protected by a mutex. consider // FIXME peerRequestQueue isn't threadsafe nor is it protected by a mutex.
// a way to avoid sharing the taskqueue between the worker and the receiver // consider a way to avoid sharing the peerRequestQueue between the worker
taskqueue *taskQueue // and the receiver
peerRequestQueue *taskQueue
workSignal chan struct{} workSignal chan struct{}
...@@ -41,7 +42,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { ...@@ -41,7 +42,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
e := &Engine{ e := &Engine{
ledgerMap: make(map[u.Key]*ledger), ledgerMap: make(map[u.Key]*ledger),
bs: bs, bs: bs,
taskqueue: newTaskQueue(), peerRequestQueue: newTaskQueue(),
outbox: make(chan Envelope, 4), // TODO extract constant outbox: make(chan Envelope, 4), // TODO extract constant
workSignal: make(chan struct{}), workSignal: make(chan struct{}),
} }
...@@ -51,7 +52,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { ...@@ -51,7 +52,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
func (e *Engine) taskWorker(ctx context.Context) { func (e *Engine) taskWorker(ctx context.Context) {
for { for {
nextTask := e.taskqueue.Pop() nextTask := e.peerRequestQueue.Pop()
if nextTask == nil { if nextTask == nil {
// No tasks in the list? // No tasks in the list?
// Wait until there are! // Wait until there are!
...@@ -128,11 +129,11 @@ func (e *Engine) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error { ...@@ -128,11 +129,11 @@ func (e *Engine) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
for _, entry := range m.Wantlist() { for _, entry := range m.Wantlist() {
if entry.Cancel { if entry.Cancel {
l.CancelWant(entry.Key) l.CancelWant(entry.Key)
e.taskqueue.Remove(entry.Key, p) e.peerRequestQueue.Remove(entry.Key, p)
} else { } else {
l.Wants(entry.Key, entry.Priority) l.Wants(entry.Key, entry.Priority)
newWorkExists = true newWorkExists = true
e.taskqueue.Push(entry.Key, entry.Priority, p) e.peerRequestQueue.Push(entry.Key, entry.Priority, p)
} }
} }
...@@ -142,7 +143,7 @@ func (e *Engine) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error { ...@@ -142,7 +143,7 @@ func (e *Engine) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error {
for _, l := range e.ledgerMap { for _, l := range e.ledgerMap {
if l.WantListContains(block.Key()) { if l.WantListContains(block.Key()) {
newWorkExists = true newWorkExists = true
e.taskqueue.Push(block.Key(), 1, l.Partner) e.peerRequestQueue.Push(block.Key(), 1, l.Partner)
} }
} }
} }
...@@ -163,7 +164,7 @@ func (e *Engine) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error { ...@@ -163,7 +164,7 @@ func (e *Engine) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error {
for _, block := range m.Blocks() { for _, block := range m.Blocks() {
l.SentBytes(len(block.Data)) l.SentBytes(len(block.Data))
l.wantList.Remove(block.Key()) l.wantList.Remove(block.Key())
e.taskqueue.Remove(block.Key(), p) e.peerRequestQueue.Remove(block.Key(), p)
} }
return nil return nil
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment