// Package messagequeue implements a queue of want messages to send to a peer.
package messagequeue

import (
	"context"
	"sync"
	"time"

	bsmsg "github.com/ipfs/go-bitswap/message"
	bsnet "github.com/ipfs/go-bitswap/network"
	wantlist "github.com/ipfs/go-bitswap/wantlist"
	logging "github.com/ipfs/go-log"
	peer "github.com/libp2p/go-libp2p-peer"
)

var log = logging.Logger("bitswap")

17 18
const maxRetries = 10

19
// MessageNetwork is any network that can connect peers and generate a message
20
// sender.
21 22 23 24 25
type MessageNetwork interface {
	ConnectTo(context.Context, peer.ID) error
	NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error)
}

26
// MessageQueue implements queue of want messages to send to peers.
27 28 29 30 31
type MessageQueue struct {
	p peer.ID

	outlk   sync.Mutex
	out     bsmsg.BitSwapMessage
32
	network MessageNetwork
33 34 35 36 37 38 39 40 41 42
	wl      *wantlist.ThreadSafe

	sender bsnet.MessageSender

	refcnt int

	work chan struct{}
	done chan struct{}
}

43
// New creats a new MessageQueue.
44
func New(p peer.ID, network MessageNetwork) *MessageQueue {
45 46 47 48 49 50 51 52 53 54
	return &MessageQueue{
		done:    make(chan struct{}),
		work:    make(chan struct{}, 1),
		wl:      wantlist.NewThreadSafe(),
		network: network,
		p:       p,
		refcnt:  1,
	}
}

55
// RefIncrement increments the refcount for a message queue.
56 57 58 59
func (mq *MessageQueue) RefIncrement() {
	mq.refcnt++
}

60
// RefDecrement decrements the refcount for a message queue and returns true
61
// if the refcount is now 0.
62 63 64 65 66
func (mq *MessageQueue) RefDecrement() bool {
	mq.refcnt--
	return mq.refcnt > 0
}

67
// AddMessage adds new entries to an outgoing message for a given session.
68
func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) {
69 70
	if !mq.addEntries(entries, ses) {
		return
71
	}
72 73 74
	select {
	case mq.work <- struct{}{}:
	default:
75 76 77
	}
}

78
// Startup starts the processing of messages, and creates an initial message
79
// based on the given initial wantlist.
80 81 82
func (mq *MessageQueue) Startup(ctx context.Context, initialEntries []*wantlist.Entry) {

	// new peer, we will want to give them our full wantlist
83 84 85 86 87 88 89
	if len(initialEntries) > 0 {
		fullwantlist := bsmsg.New(true)
		for _, e := range initialEntries {
			for k := range e.SesTrk {
				mq.wl.AddEntry(e, k)
			}
			fullwantlist.AddEntry(e.Cid, e.Priority)
90
		}
91 92
		mq.out = fullwantlist
		mq.work <- struct{}{}
93 94
	}
	go mq.runQueue(ctx)
95

96 97
}

98
// Shutdown stops the processing of messages for a message queue.
99 100 101
func (mq *MessageQueue) Shutdown() {
	close(mq.done)
}
102

103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
func (mq *MessageQueue) runQueue(ctx context.Context) {
	for {
		select {
		case <-mq.work: // there is work to be done
			mq.doWork(ctx)
		case <-mq.done:
			if mq.sender != nil {
				mq.sender.Close()
			}
			return
		case <-ctx.Done():
			if mq.sender != nil {
				mq.sender.Reset()
			}
			return
		}
	}
}

122 123
func (mq *MessageQueue) addEntries(entries []*bsmsg.Entry, ses uint64) bool {
	var work bool
124
	mq.outlk.Lock()
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
	defer mq.outlk.Unlock()
	// if we have no message held allocate a new one
	if mq.out == nil {
		mq.out = bsmsg.New(false)
	}

	// TODO: add a msg.Combine(...) method
	// otherwise, combine the one we are holding with the
	// one passed in
	for _, e := range entries {
		if e.Cancel {
			if mq.wl.Remove(e.Cid, ses) {
				work = true
				mq.out.Cancel(e.Cid)
			}
		} else {
			if mq.wl.Add(e.Cid, e.Priority, ses) {
				work = true
				mq.out.AddEntry(e.Cid, e.Priority)
			}
		}
	}

	return work
}

func (mq *MessageQueue) doWork(ctx context.Context) {

	wlm := mq.extractOutgoingMessage()
154 155 156 157 158
	if wlm == nil || wlm.Empty() {
		return
	}

	// NB: only open a stream if we actually have data to send
159 160 161 162 163
	err := mq.initializeSender(ctx)
	if err != nil {
		log.Infof("cant open message sender to peer %s: %s", mq.p, err)
		// TODO: cant connect, what now?
		return
164 165 166
	}

	// send wantlist updates
167
	for i := 0; i < maxRetries; i++ { // try to send this message until we fail.
168
		if mq.attemptSendAndRecovery(ctx, wlm) {
169 170
			return
		}
171 172
	}
}
173

174 175 176 177 178 179 180 181 182 183 184
func (mq *MessageQueue) initializeSender(ctx context.Context) error {
	if mq.sender != nil {
		return nil
	}
	nsender, err := openSender(ctx, mq.network, mq.p)
	if err != nil {
		return err
	}
	mq.sender = nsender
	return nil
}
185

186 187 188 189 190
func (mq *MessageQueue) attemptSendAndRecovery(ctx context.Context, wlm bsmsg.BitSwapMessage) bool {
	err := mq.sender.SendMsg(ctx, wlm)
	if err == nil {
		return true
	}
191

192 193 194 195 196 197 198 199 200 201 202 203 204
	log.Infof("bitswap send error: %s", err)
	mq.sender.Reset()
	mq.sender = nil

	select {
	case <-mq.done:
		return true
	case <-ctx.Done():
		return true
	case <-time.After(time.Millisecond * 100):
		// wait 100ms in case disconnect notifications are still propogating
		log.Warning("SendMsg errored but neither 'done' nor context.Done() were set")
	}
205

206 207 208 209 210 211 212 213
	err = mq.initializeSender(ctx)
	if err != nil {
		log.Infof("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
		// TODO(why): what do we do now?
		// I think the *right* answer is to probably put the message we're
		// trying to send back, and then return to waiting for new work or
		// a disconnect.
		return true
214
	}
215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232

	// TODO: Is this the same instance for the remote peer?
	// If its not, we should resend our entire wantlist to them
	/*
		if mq.sender.InstanceID() != mq.lastSeenInstanceID {
			wlm = mq.getFullWantlistMessage()
		}
	*/
	return false
}

func (mq *MessageQueue) extractOutgoingMessage() bsmsg.BitSwapMessage {
	// grab outgoing message
	mq.outlk.Lock()
	wlm := mq.out
	mq.out = nil
	mq.outlk.Unlock()
	return wlm
233 234
}

235
func openSender(ctx context.Context, network MessageNetwork, p peer.ID) (bsnet.MessageSender, error) {
236 237 238 239 240
	// allow ten minutes for connections this includes looking them up in the
	// dht dialing them, and handshaking
	conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
	defer cancel()

241
	err := network.ConnectTo(conctx, p)
242
	if err != nil {
243
		return nil, err
244 245
	}

246
	nsender, err := network.NewMessageSender(ctx, p)
247
	if err != nil {
248
		return nil, err
249 250
	}

251
	return nsender, nil
252
}