Commit 9069a8aa authored by Brian Tiger Chow's avatar Brian Tiger Chow Committed by Juan Batiz-Benet

refactor: change Tasks to Outbox

notice that moving the blockstore fetch into the manager removes the
weird error handling case.

License: MIT
Signed-off-by: default avatarBrian Tiger Chow <brian@perfmode.com>
parent f03e629f
...@@ -239,18 +239,8 @@ func (bs *bitswap) taskWorker(ctx context.Context) { ...@@ -239,18 +239,8 @@ func (bs *bitswap) taskWorker(ctx context.Context) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case task := <-bs.ledgermanager.GetTaskChan(): case envelope := <-bs.ledgermanager.Outbox():
block, err := bs.blockstore.Get(task.Key) bs.send(ctx, envelope.Peer, envelope.Message)
if err != nil {
log.Errorf("Expected to have block %s, but it was not found!", task.Key)
continue
}
message := bsmsg.New()
message.AddBlock(block)
// TODO: maybe add keys from our wantlist?
bs.send(ctx, task.Target, message)
} }
} }
} }
......
...@@ -20,12 +20,17 @@ type ledgerMap map[peerKey]*ledger ...@@ -20,12 +20,17 @@ type ledgerMap map[peerKey]*ledger
// FIXME share this externally // FIXME share this externally
type peerKey u.Key type peerKey u.Key
type Envelope struct {
Peer peer.Peer
Message bsmsg.BitSwapMessage
}
type LedgerManager struct { type LedgerManager struct {
lock sync.RWMutex lock sync.RWMutex
ledgerMap ledgerMap ledgerMap ledgerMap
bs bstore.Blockstore bs bstore.Blockstore
tasklist *TaskList tasklist *TaskList
taskOut chan *Task outbox chan Envelope
workSignal chan struct{} workSignal chan struct{}
} }
...@@ -34,7 +39,7 @@ func NewLedgerManager(bs bstore.Blockstore, ctx context.Context) *LedgerManager ...@@ -34,7 +39,7 @@ func NewLedgerManager(bs bstore.Blockstore, ctx context.Context) *LedgerManager
ledgerMap: make(ledgerMap), ledgerMap: make(ledgerMap),
bs: bs, bs: bs,
tasklist: NewTaskList(), tasklist: NewTaskList(),
taskOut: make(chan *Task, 4), outbox: make(chan Envelope, 4), // TODO extract constant
workSignal: make(chan struct{}), workSignal: make(chan struct{}),
} }
go lm.taskWorker(ctx) go lm.taskWorker(ctx)
...@@ -54,17 +59,25 @@ func (lm *LedgerManager) taskWorker(ctx context.Context) { ...@@ -54,17 +59,25 @@ func (lm *LedgerManager) taskWorker(ctx context.Context) {
} }
continue continue
} }
block, err := lm.bs.Get(nextTask.Key)
if err != nil {
continue // TODO maybe return an error
}
// construct message here so we can make decisions about any additional
// information we may want to include at this time.
m := bsmsg.New()
m.AddBlock(block)
// TODO: maybe add keys from our wantlist?
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case lm.taskOut <- nextTask: case lm.outbox <- Envelope{Peer: nextTask.Target, Message: m}:
} }
} }
} }
func (lm *LedgerManager) GetTaskChan() <-chan *Task { func (lm *LedgerManager) Outbox() <-chan Envelope {
return lm.taskOut return lm.outbox
} }
// Returns a slice of Peers with whom the local node has active sessions // Returns a slice of Peers with whom the local node has active sessions
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment