messagequeue.go 6.56 KB
Newer Older
1 2 3 4
package messagequeue

import (
	"context"
5
	"sync"
6 7 8 9 10 11
	"time"

	bsmsg "github.com/ipfs/go-bitswap/message"
	bsnet "github.com/ipfs/go-bitswap/network"
	wantlist "github.com/ipfs/go-bitswap/wantlist"
	logging "github.com/ipfs/go-log"
Raúl Kripalani's avatar
Raúl Kripalani committed
12
	peer "github.com/libp2p/go-libp2p-core/peer"
13 14 15 16
)

// log is the package-level logger, created under the "bitswap" name.
var log = logging.Logger("bitswap")

17 18 19 20
const (
	// defaultRebroadcastInterval is the rebroadcast interval a queue built by
	// New starts with; SetRebroadcastInterval can change it afterwards.
	defaultRebroadcastInterval = 30 * time.Second
	// maxRetries caps how many send attempts sendMessage makes for a single
	// outgoing message.
	maxRetries                 = 10
)
21

22
// MessageNetwork is any network that can connect peers and generate a message
// sender.
type MessageNetwork interface {
	// ConnectTo is called by openSender, with a time-limited context, before
	// it asks for a sender to the same peer.
	ConnectTo(context.Context, peer.ID) error
	// NewMessageSender returns the sender the queue uses to deliver messages
	// to the given peer.
	NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error)
}

29
// MessageQueue implements queue of want messages to send to peers.
30
type MessageQueue struct {
31 32
	ctx     context.Context
	p       peer.ID
33
	network MessageNetwork
34

35 36
	outgoingWork chan struct{}
	done         chan struct{}
37 38

	// do not touch out of run loop
39 40 41 42 43 44 45
	wl                    *wantlist.SessionTrackedWantlist
	nextMessage           bsmsg.BitSwapMessage
	nextMessageLk         sync.RWMutex
	sender                bsnet.MessageSender
	rebroadcastIntervalLk sync.RWMutex
	rebroadcastInterval   time.Duration
	rebroadcastTimer      *time.Timer
46 47
}

48
// New creats a new MessageQueue.
49
func New(ctx context.Context, p peer.ID, network MessageNetwork) *MessageQueue {
50
	return &MessageQueue{
51 52 53 54 55 56 57
		ctx:                 ctx,
		wl:                  wantlist.NewSessionTrackedWantlist(),
		network:             network,
		p:                   p,
		outgoingWork:        make(chan struct{}, 1),
		done:                make(chan struct{}),
		rebroadcastInterval: defaultRebroadcastInterval,
58 59 60
	}
}

61
// AddMessage adds new entries to an outgoing message for a given session.
62
func (mq *MessageQueue) AddMessage(entries []bsmsg.Entry, ses uint64) {
63 64 65
	if !mq.addEntries(entries, ses) {
		return
	}
66
	select {
67 68
	case mq.outgoingWork <- struct{}{}:
	default:
69 70 71
	}
}

72
// AddWantlist adds a complete session tracked want list to a message queue
73
func (mq *MessageQueue) AddWantlist(initialWants *wantlist.SessionTrackedWantlist) {
74
	initialWants.CopyWants(mq.wl)
75 76 77 78 79 80 81
	mq.addWantlist()
}

// SetRebroadcastInterval sets a new interval on which to rebroadcast the full wantlist
func (mq *MessageQueue) SetRebroadcastInterval(delay time.Duration) {
	mq.rebroadcastIntervalLk.Lock()
	mq.rebroadcastInterval = delay
82 83 84
	if mq.rebroadcastTimer != nil {
		mq.rebroadcastTimer.Reset(delay)
	}
85
	mq.rebroadcastIntervalLk.Unlock()
86
}
87

88 89
// Startup starts the processing of messages, and creates an initial message
// based on the given initial wantlist.
90
func (mq *MessageQueue) Startup() {
91 92 93
	mq.rebroadcastIntervalLk.RLock()
	mq.rebroadcastTimer = time.NewTimer(mq.rebroadcastInterval)
	mq.rebroadcastIntervalLk.RUnlock()
94
	go mq.runQueue()
95 96
}

97
// Shutdown stops the processing of messages for a message queue.
98 99 100
func (mq *MessageQueue) Shutdown() {
	// Closing done is what runQueue selects on to stop. The close is
	// unconditional, so calling Shutdown a second time would panic.
	close(mq.done)
}
101

102
func (mq *MessageQueue) runQueue() {
103 104
	for {
		select {
105 106
		case <-mq.rebroadcastTimer.C:
			mq.rebroadcastWantlist()
107 108
		case <-mq.outgoingWork:
			mq.sendMessage()
109 110 111 112 113
		case <-mq.done:
			if mq.sender != nil {
				mq.sender.Close()
			}
			return
114
		case <-mq.ctx.Done():
115 116 117 118 119 120 121 122
			if mq.sender != nil {
				mq.sender.Reset()
			}
			return
		}
	}
}

123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
// addWantlist queues every entry of the tracked wantlist into the pending
// message and signals the run loop. It does nothing when the wantlist is
// empty.
func (mq *MessageQueue) addWantlist() {
	mq.nextMessageLk.Lock()
	defer mq.nextMessageLk.Unlock()

	if mq.wl.Len() <= 0 {
		return
	}

	// Start a fresh message only if none is being assembled already.
	if mq.nextMessage == nil {
		mq.nextMessage = bsmsg.New(false)
	}
	for _, want := range mq.wl.Entries() {
		mq.nextMessage.AddEntry(want.Cid, want.Priority)
	}

	// Non-blocking signal; an already pending token is sufficient.
	select {
	case mq.outgoingWork <- struct{}{}:
	default:
	}
}

// rebroadcastWantlist re-arms the rebroadcast timer with the current interval
// and then queues the full wantlist for sending.
func (mq *MessageQueue) rebroadcastWantlist() {
	// Scope the read lock to the timer reset only; addWantlist takes a
	// different lock of its own.
	func() {
		mq.rebroadcastIntervalLk.RLock()
		defer mq.rebroadcastIntervalLk.RUnlock()
		mq.rebroadcastTimer.Reset(mq.rebroadcastInterval)
	}()

	mq.addWantlist()
}

150 151 152 153 154 155 156
func (mq *MessageQueue) addEntries(entries []bsmsg.Entry, ses uint64) bool {
	var work bool
	mq.nextMessageLk.Lock()
	defer mq.nextMessageLk.Unlock()
	// if we have no message held allocate a new one
	if mq.nextMessage == nil {
		mq.nextMessage = bsmsg.New(false)
157 158 159 160 161
	}

	for _, e := range entries {
		if e.Cancel {
			if mq.wl.Remove(e.Cid, ses) {
162
				work = true
163
				mq.nextMessage.Cancel(e.Cid)
164 165 166
			}
		} else {
			if mq.wl.Add(e.Cid, e.Priority, ses) {
167
				work = true
168
				mq.nextMessage.AddEntry(e.Cid, e.Priority)
169 170 171
			}
		}
	}
172
	return work
173 174
}

175 176 177 178 179 180 181
func (mq *MessageQueue) extractOutgoingMessage() bsmsg.BitSwapMessage {
	// grab outgoing message
	mq.nextMessageLk.Lock()
	message := mq.nextMessage
	mq.nextMessage = nil
	mq.nextMessageLk.Unlock()
	return message
182 183
}

184 185 186 187 188
func (mq *MessageQueue) sendMessage() {
	message := mq.extractOutgoingMessage()
	if message == nil || message.Empty() {
		return
	}
189

190
	err := mq.initializeSender()
191 192 193 194
	if err != nil {
		log.Infof("cant open message sender to peer %s: %s", mq.p, err)
		// TODO: cant connect, what now?
		return
195 196
	}

197
	for i := 0; i < maxRetries; i++ { // try to send this message until we fail.
198
		if mq.attemptSendAndRecovery(message) {
199 200
			return
		}
201 202
	}
}
203

204
func (mq *MessageQueue) initializeSender() error {
205 206 207
	if mq.sender != nil {
		return nil
	}
208
	nsender, err := openSender(mq.ctx, mq.network, mq.p)
209 210 211 212 213 214
	if err != nil {
		return err
	}
	mq.sender = nsender
	return nil
}
215

216 217
func (mq *MessageQueue) attemptSendAndRecovery(message bsmsg.BitSwapMessage) bool {
	err := mq.sender.SendMsg(mq.ctx, message)
218 219 220
	if err == nil {
		return true
	}
221

222 223 224 225 226 227 228
	log.Infof("bitswap send error: %s", err)
	mq.sender.Reset()
	mq.sender = nil

	select {
	case <-mq.done:
		return true
229
	case <-mq.ctx.Done():
230 231 232 233 234
		return true
	case <-time.After(time.Millisecond * 100):
		// wait 100ms in case disconnect notifications are still propogating
		log.Warning("SendMsg errored but neither 'done' nor context.Done() were set")
	}
235

236
	err = mq.initializeSender()
237 238 239 240 241 242 243
	if err != nil {
		log.Infof("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
		// TODO(why): what do we do now?
		// I think the *right* answer is to probably put the message we're
		// trying to send back, and then return to waiting for new work or
		// a disconnect.
		return true
244
	}
245 246 247 248 249 250 251 252 253 254 255 256

	// TODO: Is this the same instance for the remote peer?
	// If its not, we should resend our entire wantlist to them
	/*
		if mq.sender.InstanceID() != mq.lastSeenInstanceID {
			wlm = mq.getFullWantlistMessage()
		}
	*/
	return false
}

func openSender(ctx context.Context, network MessageNetwork, p peer.ID) (bsnet.MessageSender, error) {
257 258 259 260 261
	// allow ten minutes for connections this includes looking them up in the
	// dht dialing them, and handshaking
	conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
	defer cancel()

262
	err := network.ConnectTo(conctx, p)
263
	if err != nil {
264
		return nil, err
265 266
	}

267
	nsender, err := network.NewMessageSender(ctx, p)
268
	if err != nil {
269
		return nil, err
270 271
	}

272
	return nsender, nil
273
}