Commit 2c313a81 authored by Adin Schmahmann's avatar Adin Schmahmann

refactor: rename messageSender to peerMessageSender and added comments to...

refactor: rename messageSender to peerMessageSender and added comments to clarify what peerMessageSender and messageManager do
parent 138cb80a
...@@ -190,7 +190,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) ...@@ -190,7 +190,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
dht.Validator = cfg.validator dht.Validator = cfg.validator
dht.messageMgr = &messageManager{ dht.messageMgr = &messageManager{
host: h, host: h,
strmap: make(map[peer.ID]*messageSender), strmap: make(map[peer.ID]*peerMessageSender),
protocols: dht.protocols, protocols: dht.protocols,
} }
dht.protoMessenger, err = pb.NewProtocolMessenger(dht.messageMgr, pb.WithValidator(dht.Validator)) dht.protoMessenger, err = pb.NewProtocolMessenger(dht.messageMgr, pb.WithValidator(dht.Validator))
......
...@@ -19,10 +19,12 @@ import ( ...@@ -19,10 +19,12 @@ import (
"go.opencensus.io/tag" "go.opencensus.io/tag"
) )
// messageManager is responsible for sending requests and messages to peers efficiently, including reuse of streams.
// It also tracks metrics for sent requests and messages.
type messageManager struct { type messageManager struct {
host host.Host // the network services we need host host.Host // the network services we need
smlk sync.Mutex smlk sync.Mutex
strmap map[peer.ID]*messageSender strmap map[peer.ID]*peerMessageSender
protocols []protocol.ID protocols []protocol.ID
} }
...@@ -111,14 +113,14 @@ func (m *messageManager) SendMessage(ctx context.Context, p peer.ID, pmes *pb.Me ...@@ -111,14 +113,14 @@ func (m *messageManager) SendMessage(ctx context.Context, p peer.ID, pmes *pb.Me
return nil return nil
} }
func (m *messageManager) messageSenderForPeer(ctx context.Context, p peer.ID) (*messageSender, error) { func (m *messageManager) messageSenderForPeer(ctx context.Context, p peer.ID) (*peerMessageSender, error) {
m.smlk.Lock() m.smlk.Lock()
ms, ok := m.strmap[p] ms, ok := m.strmap[p]
if ok { if ok {
m.smlk.Unlock() m.smlk.Unlock()
return ms, nil return ms, nil
} }
ms = &messageSender{p: p, m: m, lk: newCtxMutex()} ms = &peerMessageSender{p: p, m: m, lk: newCtxMutex()}
m.strmap[p] = ms m.strmap[p] = ms
m.smlk.Unlock() m.smlk.Unlock()
...@@ -143,7 +145,8 @@ func (m *messageManager) messageSenderForPeer(ctx context.Context, p peer.ID) (* ...@@ -143,7 +145,8 @@ func (m *messageManager) messageSenderForPeer(ctx context.Context, p peer.ID) (*
return ms, nil return ms, nil
} }
type messageSender struct { // peerMessageSender is responsible for sending requests and messages to a particular peer
type peerMessageSender struct {
s network.Stream s network.Stream
r msgio.ReadCloser r msgio.ReadCloser
lk ctxMutex lk ctxMutex
...@@ -154,10 +157,10 @@ type messageSender struct { ...@@ -154,10 +157,10 @@ type messageSender struct {
singleMes int singleMes int
} }
// invalidate is called before this messageSender is removed from the strmap. // invalidate is called before this peerMessageSender is removed from the strmap.
// It prevents the messageSender from being reused/reinitialized and then // It prevents the peerMessageSender from being reused/reinitialized and then
// forgotten (leaving the stream open). // forgotten (leaving the stream open).
func (ms *messageSender) invalidate() { func (ms *peerMessageSender) invalidate() {
ms.invalid = true ms.invalid = true
if ms.s != nil { if ms.s != nil {
_ = ms.s.Reset() _ = ms.s.Reset()
...@@ -165,7 +168,7 @@ func (ms *messageSender) invalidate() { ...@@ -165,7 +168,7 @@ func (ms *messageSender) invalidate() {
} }
} }
func (ms *messageSender) prepOrInvalidate(ctx context.Context) error { func (ms *peerMessageSender) prepOrInvalidate(ctx context.Context) error {
if err := ms.lk.Lock(ctx); err != nil { if err := ms.lk.Lock(ctx); err != nil {
return err return err
} }
...@@ -178,7 +181,7 @@ func (ms *messageSender) prepOrInvalidate(ctx context.Context) error { ...@@ -178,7 +181,7 @@ func (ms *messageSender) prepOrInvalidate(ctx context.Context) error {
return nil return nil
} }
func (ms *messageSender) prep(ctx context.Context) error { func (ms *peerMessageSender) prep(ctx context.Context) error {
if ms.invalid { if ms.invalid {
return fmt.Errorf("message sender has been invalidated") return fmt.Errorf("message sender has been invalidated")
} }
...@@ -205,7 +208,7 @@ func (ms *messageSender) prep(ctx context.Context) error { ...@@ -205,7 +208,7 @@ func (ms *messageSender) prep(ctx context.Context) error {
// behaviour. // behaviour.
const streamReuseTries = 3 const streamReuseTries = 3
func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) error { func (ms *peerMessageSender) SendMessage(ctx context.Context, pmes *pb.Message) error {
if err := ms.lk.Lock(ctx); err != nil { if err := ms.lk.Lock(ctx); err != nil {
return err return err
} }
...@@ -242,7 +245,7 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro ...@@ -242,7 +245,7 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro
} }
} }
func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb.Message, error) { func (ms *peerMessageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb.Message, error) {
if err := ms.lk.Lock(ctx); err != nil { if err := ms.lk.Lock(ctx); err != nil {
return nil, err return nil, err
} }
...@@ -293,11 +296,11 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb ...@@ -293,11 +296,11 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb
} }
} }
func (ms *messageSender) writeMsg(pmes *pb.Message) error { func (ms *peerMessageSender) writeMsg(pmes *pb.Message) error {
return writeMsg(ms.s, pmes) return writeMsg(ms.s, pmes)
} }
func (ms *messageSender) ctxReadMsg(ctx context.Context, mes *pb.Message) error { func (ms *peerMessageSender) ctxReadMsg(ctx context.Context, mes *pb.Message) error {
errc := make(chan error, 1) errc := make(chan error, 1)
go func(r msgio.ReadCloser) { go func(r msgio.ReadCloser) {
defer close(errc) defer close(errc)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment