messagequeue.go 6.05 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
package messagequeue

import (
	"context"
	"time"

	bsmsg "github.com/ipfs/go-bitswap/message"
	bsnet "github.com/ipfs/go-bitswap/network"
	wantlist "github.com/ipfs/go-bitswap/wantlist"
	logging "github.com/ipfs/go-log"
	peer "github.com/libp2p/go-libp2p-peer"
)

// log is the package-level logger, obtained from go-log under the name
// "bitswap".
var log = logging.Logger("bitswap")

16 17
// maxRetries is the upper bound on how many send attempts sendMessage makes
// for a single message before giving up on it.
const maxRetries = 10

18
// MessageNetwork is any network that can connect peers and generate a message
19
// sender.
20 21 22 23 24
type MessageNetwork interface {
	ConnectTo(context.Context, peer.ID) error
	NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error)
}

25 26 27 28
// request is a unit of work submitted through the newRequests channel and
// executed by the run loop.
type request interface {
	// handle applies the request to the given queue.
	handle(queue *MessageQueue)
}

29
// MessageQueue implements queue of want messages to send to peers.
30
type MessageQueue struct {
31 32
	ctx     context.Context
	p       peer.ID
33
	network MessageNetwork
34

35 36 37 38 39 40 41 42 43 44 45
	newRequests      chan request
	outgoingMessages chan bsmsg.BitSwapMessage
	done             chan struct{}

	// do not touch out of run loop
	wl          *wantlist.SessionTrackedWantlist
	nextMessage bsmsg.BitSwapMessage
	sender      bsnet.MessageSender
}

type messageRequest struct {
46
	entries []bsmsg.Entry
47 48
	ses     uint64
}
49

50 51
type wantlistRequest struct {
	wl *wantlist.SessionTrackedWantlist
52 53
}

54
// New creats a new MessageQueue.
55
func New(ctx context.Context, p peer.ID, network MessageNetwork) *MessageQueue {
56
	return &MessageQueue{
57 58 59 60 61 62 63
		ctx:              ctx,
		wl:               wantlist.NewSessionTrackedWantlist(),
		network:          network,
		p:                p,
		newRequests:      make(chan request, 16),
		outgoingMessages: make(chan bsmsg.BitSwapMessage),
		done:             make(chan struct{}),
64 65 66
	}
}

67
// AddMessage adds new entries to an outgoing message for a given session.
68
func (mq *MessageQueue) AddMessage(entries []bsmsg.Entry, ses uint64) {
69
	select {
70 71
	case mq.newRequests <- &messageRequest{entries, ses}:
	case <-mq.ctx.Done():
72 73 74
	}
}

75
// AddWantlist adds a complete session tracked want list to a message queue
76 77 78
func (mq *MessageQueue) AddWantlist(initialWants *wantlist.SessionTrackedWantlist) {
	wl := wantlist.NewSessionTrackedWantlist()
	initialWants.CopyWants(wl)
79

80 81 82
	select {
	case mq.newRequests <- &wantlistRequest{wl}:
	case <-mq.ctx.Done():
83
	}
84
}
85

86 87
// Startup starts the processing of messages, and creates an initial message
// based on the given initial wantlist.
88 89 90
func (mq *MessageQueue) Startup() {
	go mq.runQueue()
	go mq.sendMessages()
91 92
}

93
// Shutdown stops the processing of messages for a message queue.
94 95 96
func (mq *MessageQueue) Shutdown() {
	close(mq.done)
}
97

98 99 100 101 102 103 104 105
func (mq *MessageQueue) runQueue() {
	outgoingMessages := func() chan bsmsg.BitSwapMessage {
		if mq.nextMessage == nil {
			return nil
		}
		return mq.outgoingMessages
	}

106 107
	for {
		select {
108 109 110 111
		case newRequest := <-mq.newRequests:
			newRequest.handle(mq)
		case outgoingMessages() <- mq.nextMessage:
			mq.nextMessage = nil
112 113 114 115 116
		case <-mq.done:
			if mq.sender != nil {
				mq.sender.Close()
			}
			return
117
		case <-mq.ctx.Done():
118 119 120 121 122 123 124 125
			if mq.sender != nil {
				mq.sender.Reset()
			}
			return
		}
	}
}

126 127 128 129 130 131 132 133 134 135 136 137 138 139
// handle merges the request's entries into the queue under the request's
// session id.
func (mr *messageRequest) handle(mq *MessageQueue) {
	entries, session := mr.entries, mr.ses
	mq.addEntries(entries, session)
}

// handle copies the request's wants into the queue's wantlist and, when the
// request carries any wants, adds each of them to the pending message.
func (wr *wantlistRequest) handle(mq *MessageQueue) {
	wants := wr.wl
	wants.CopyWants(mq.wl)

	// Nothing to announce: avoid creating an empty pending message.
	if wants.Len() <= 0 {
		return
	}

	if mq.nextMessage == nil {
		mq.nextMessage = bsmsg.New(false)
	}
	msg := mq.nextMessage
	for _, want := range wants.Entries() {
		msg.AddEntry(want.Cid, want.Priority)
	}
}
142

143
func (mq *MessageQueue) addEntries(entries []bsmsg.Entry, ses uint64) {
144 145 146
	for _, e := range entries {
		if e.Cancel {
			if mq.wl.Remove(e.Cid, ses) {
147 148 149 150
				if mq.nextMessage == nil {
					mq.nextMessage = bsmsg.New(false)
				}
				mq.nextMessage.Cancel(e.Cid)
151 152 153
			}
		} else {
			if mq.wl.Add(e.Cid, e.Priority, ses) {
154 155 156 157
				if mq.nextMessage == nil {
					mq.nextMessage = bsmsg.New(false)
				}
				mq.nextMessage.AddEntry(e.Cid, e.Priority)
158 159 160 161 162
			}
		}
	}
}

163 164 165 166 167 168 169 170 171 172
func (mq *MessageQueue) sendMessages() {
	for {
		select {
		case nextMessage := <-mq.outgoingMessages:
			mq.sendMessage(nextMessage)
		case <-mq.done:
			return
		case <-mq.ctx.Done():
			return
		}
173
	}
174 175 176
}

func (mq *MessageQueue) sendMessage(message bsmsg.BitSwapMessage) {
177

178
	err := mq.initializeSender()
179 180 181 182
	if err != nil {
		log.Infof("cant open message sender to peer %s: %s", mq.p, err)
		// TODO: cant connect, what now?
		return
183 184
	}

185
	for i := 0; i < maxRetries; i++ { // try to send this message until we fail.
186
		if mq.attemptSendAndRecovery(message) {
187 188
			return
		}
189 190
	}
}
191

192
func (mq *MessageQueue) initializeSender() error {
193 194 195
	if mq.sender != nil {
		return nil
	}
196
	nsender, err := openSender(mq.ctx, mq.network, mq.p)
197 198 199 200 201 202
	if err != nil {
		return err
	}
	mq.sender = nsender
	return nil
}
203

204 205
func (mq *MessageQueue) attemptSendAndRecovery(message bsmsg.BitSwapMessage) bool {
	err := mq.sender.SendMsg(mq.ctx, message)
206 207 208
	if err == nil {
		return true
	}
209

210 211 212 213 214 215 216
	log.Infof("bitswap send error: %s", err)
	mq.sender.Reset()
	mq.sender = nil

	select {
	case <-mq.done:
		return true
217
	case <-mq.ctx.Done():
218 219 220 221 222
		return true
	case <-time.After(time.Millisecond * 100):
		// wait 100ms in case disconnect notifications are still propogating
		log.Warning("SendMsg errored but neither 'done' nor context.Done() were set")
	}
223

224
	err = mq.initializeSender()
225 226 227 228 229 230 231
	if err != nil {
		log.Infof("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
		// TODO(why): what do we do now?
		// I think the *right* answer is to probably put the message we're
		// trying to send back, and then return to waiting for new work or
		// a disconnect.
		return true
232
	}
233 234 235 236 237 238 239 240 241 242 243 244

	// TODO: Is this the same instance for the remote peer?
	// If its not, we should resend our entire wantlist to them
	/*
		if mq.sender.InstanceID() != mq.lastSeenInstanceID {
			wlm = mq.getFullWantlistMessage()
		}
	*/
	return false
}

func openSender(ctx context.Context, network MessageNetwork, p peer.ID) (bsnet.MessageSender, error) {
245 246 247 248 249
	// allow ten minutes for connections this includes looking them up in the
	// dht dialing them, and handshaking
	conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
	defer cancel()

250
	err := network.ConnectTo(conctx, p)
251
	if err != nil {
252
		return nil, err
253 254
	}

255
	nsender, err := network.NewMessageSender(ctx, p)
256
	if err != nil {
257
		return nil, err
258 259
	}

260
	return nsender, nil
261
}