diag.go 8.17 KB
Newer Older
1 2 3
// Package diagnostics implements a network diagnostics service that
// allows a request to traverse the network and gather information
// on every node connected to it.
4
package diagnostics
Jeromy's avatar
Jeromy committed
5 6

import (
7
	"crypto/rand"
Jeromy's avatar
Jeromy committed
8 9
	"encoding/json"
	"errors"
10
	"fmt"
Jeromy's avatar
Jeromy committed
11 12 13
	"sync"
	"time"

14
	context "context"
15
	pb "github.com/ipfs/go-ipfs/diagnostics/pb"
Jeromy's avatar
Jeromy committed
16
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
17
	ctxio "gx/ipfs/QmTKsRYeY4simJyf37K93juSq75Lo8MVCDJ7owjmf46u8W/go-context/io"
Jeromy's avatar
Jeromy committed
18 19
	ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io"
	proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
20 21 22 23
	protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
	host "gx/ipfs/QmdML3R42PRSwnt46jSuEts9bHSqLctVYEjJqMR3UYV8ki/go-libp2p-host"
	inet "gx/ipfs/QmdXimY9QHaasZmw6hWojWnCJvfgxETjZQfg9g6ZrA9wMX/go-libp2p-net"
	peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
Jeromy's avatar
Jeromy committed
24 25
)

Jeromy's avatar
Jeromy committed
26
// log is the package-level logger, registered under the name "diagnostics".
var log = logging.Logger("diagnostics")
Jeromy's avatar
Jeromy committed
27

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
28
// ProtocolDiag is the diagnostics protocol.ID
29 30
var ProtocolDiag protocol.ID = "/ipfs/diag/net/1.0.0"
var ProtocolDiagOld protocol.ID = "/ipfs/diagnostics"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31

32 33
var ErrAlreadyRunning = errors.New("diagnostic with that ID already running")

34
const ResponseTimeout = time.Second * 10
35
const HopTimeoutDecrement = time.Second * 2
36

Jeromy's avatar
Jeromy committed
37 38
// Diagnostics is a net service that manages requesting and responding to diagnostic
// requests
Jeromy's avatar
Jeromy committed
39
type Diagnostics struct {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
40 41
	host host.Host
	self peer.ID
Jeromy's avatar
Jeromy committed
42 43 44 45 46 47

	diagLock sync.Mutex
	diagMap  map[string]time.Time
	birth    time.Time
}

Jeromy's avatar
Jeromy committed
48
// NewDiagnostics instantiates a new diagnostics service running on the given network
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
49
func NewDiagnostics(self peer.ID, h host.Host) *Diagnostics {
50
	d := &Diagnostics{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
51
		host:    h,
Jeromy's avatar
Jeromy committed
52 53
		self:    self,
		birth:   time.Now(),
54
		diagMap: make(map[string]time.Time),
Jeromy's avatar
Jeromy committed
55
	}
56

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
57
	h.SetStreamHandler(ProtocolDiag, d.handleNewStream)
58
	h.SetStreamHandler(ProtocolDiagOld, d.handleNewStream)
59
	return d
Jeromy's avatar
Jeromy committed
60 61 62 63 64
}

type (
	// connDiagInfo describes one peer the reporting node is connected to.
	connDiagInfo struct {
		// Latency is the latency estimate the peerstore holds for the peer.
		Latency time.Duration
		// ID is the peer's ID in its pretty-printed string form.
		ID string
		// Count is the number recorded for the peer by getPeers.
		Count int
	}

	// DiagInfo is the diagnostic report produced by a single node. It is
	// exchanged between nodes as JSON.
	DiagInfo struct {
		// This node's ID
		ID string

		// A list of peers this node currently has open connections to
		Connections []connDiagInfo

		// A list of keys provided by this node
		//    (currently not filled)
		Keys []string

		// How long this node has been running for
		// TODO rename Uptime
		LifeSpan time.Duration

		// Incoming Bandwidth Usage
		BwIn uint64

		// Outgoing Bandwidth Usage
		BwOut uint64

		// Information about the version of code this node is running
		CodeVersion string
	}
)

Jeromy's avatar
Jeromy committed
93
// Marshal to json
94
func (di *DiagInfo) Marshal() []byte {
Jeromy's avatar
Jeromy committed
95 96 97 98 99 100 101 102
	b, err := json.Marshal(di)
	if err != nil {
		panic(err)
	}
	//TODO: also consider compressing this. There will be a lot of these
	return b
}

103 104 105 106
func (d *Diagnostics) getPeers() map[peer.ID]int {
	counts := make(map[peer.ID]int)
	for _, p := range d.host.Network().Peers() {
		counts[p]++
107
	}
108

109
	return counts
Jeromy's avatar
Jeromy committed
110 111
}

112 113
func (d *Diagnostics) getDiagInfo() *DiagInfo {
	di := new(DiagInfo)
114
	di.CodeVersion = "github.com/ipfs/go-ipfs"
115
	di.ID = d.self.Pretty()
Jeromy's avatar
Jeromy committed
116 117
	di.LifeSpan = time.Since(d.birth)
	di.Keys = nil // Currently no way to query datastore
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
118 119

	// di.BwIn, di.BwOut = d.host.BandwidthTotals() //TODO fix this.
Jeromy's avatar
Jeromy committed
120

121 122 123 124 125 126
	for p, n := range d.getPeers() {
		d := connDiagInfo{
			Latency: d.host.Peerstore().LatencyEWMA(p),
			ID:      p.Pretty(),
			Count:   n,
		}
127
		di.Connections = append(di.Connections, d)
Jeromy's avatar
Jeromy committed
128 129 130 131 132
	}
	return di
}

// newID returns a fresh diagnostic ID: 16 bytes read from crypto/rand,
// returned as a raw (not necessarily printable) string.
//
// It panics if the random source fails. Continuing would hand out an
// all-zero ID that collides with every other failed read and would be
// rejected as already running by other nodes.
func newID() string {
	id := make([]byte, 16)
	if _, err := rand.Read(id); err != nil {
		panic("diagnostics: reading random bytes for diagnostic ID: " + err.Error())
	}
	return string(id)
}

Jeromy's avatar
Jeromy committed
138
// GetDiagnostic runs a diagnostics request across the entire network
139
func (d *Diagnostics) GetDiagnostic(ctx context.Context, timeout time.Duration) ([]*DiagInfo, error) {
140
	log.Debug("getting diagnostic")
141
	ctx, cancel := context.WithTimeout(ctx, timeout)
142
	defer cancel()
Jeromy's avatar
Jeromy committed
143 144 145 146 147 148

	diagID := newID()
	d.diagLock.Lock()
	d.diagMap[diagID] = time.Now()
	d.diagLock.Unlock()

149
	log.Debug("begin diagnostic")
Jeromy's avatar
Jeromy committed
150 151

	peers := d.getPeers()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
152
	log.Debugf("Sending diagnostic request to %d peers.", len(peers))
Jeromy's avatar
Jeromy committed
153 154

	pmes := newMessage(diagID)
155

156 157 158 159
	pmes.SetTimeoutDuration(timeout - HopTimeoutDecrement) // decrease timeout per hop
	dpeers, err := d.getDiagnosticFromPeers(ctx, d.getPeers(), pmes)
	if err != nil {
		return nil, fmt.Errorf("diagnostic from peers err: %s", err)
160 161
	}

162
	di := d.getDiagInfo()
163 164 165
	out := []*DiagInfo{di}
	for dpi := range dpeers {
		out = append(out, dpi)
Jeromy's avatar
Jeromy committed
166 167 168 169
	}
	return out, nil
}

170 171 172 173 174
func decodeDiagJson(data []byte) (*DiagInfo, error) {
	di := new(DiagInfo)
	err := json.Unmarshal(data, di)
	if err != nil {
		return nil, err
175 176
	}

177 178
	return di, nil
}
179

180 181 182
func (d *Diagnostics) getDiagnosticFromPeers(ctx context.Context, peers map[peer.ID]int, pmes *pb.Message) (<-chan *DiagInfo, error) {
	respdata := make(chan *DiagInfo)
	wg := sync.WaitGroup{}
rht's avatar
rht committed
183
	for p := range peers {
184
		wg.Add(1)
185 186
		log.Debugf("Sending diagnostic request to peer: %s", p)
		go func(p peer.ID) {
187
			defer wg.Done()
188 189
			out, err := d.getDiagnosticFromPeer(ctx, p, pmes)
			if err != nil {
190
				log.Debugf("Error getting diagnostic from %s: %s", p, err)
191 192
				return
			}
193
			for d := range out {
194 195 196 197 198
				select {
				case respdata <- d:
				case <-ctx.Done():
					return
				}
199
			}
200 201 202
		}(p)
	}

203 204 205 206
	go func() {
		wg.Wait()
		close(respdata)
	}()
Jeromy's avatar
Jeromy committed
207

208
	return respdata, nil
Jeromy's avatar
Jeromy committed
209 210
}

211
func (d *Diagnostics) getDiagnosticFromPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (<-chan *DiagInfo, error) {
212
	s, err := d.host.NewStream(ctx, p, ProtocolDiag, ProtocolDiagOld)
Jeromy's avatar
Jeromy committed
213 214 215
	if err != nil {
		return nil, err
	}
216

217 218
	cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func
	cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func
219 220
	r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax)
	w := ggio.NewDelimitedWriter(cw)
Jeromy's avatar
Jeromy committed
221 222 223

	start := time.Now()

224 225 226 227
	if err := w.WriteMsg(pmes); err != nil {
		return nil, err
	}

228 229
	out := make(chan *DiagInfo)
	go func() {
Jeromy's avatar
Jeromy committed
230

231 232 233 234 235 236
		defer func() {
			close(out)
			s.Close()
			rtt := time.Since(start)
			log.Infof("diagnostic request took: %s", rtt.String())
		}()
Jeromy's avatar
Jeromy committed
237

238 239 240
		for {
			rpmes := new(pb.Message)
			if err := r.ReadMsg(rpmes); err != nil {
241
				log.Debugf("Error reading diagnostic from stream: %s", err)
242 243 244
				return
			}
			if rpmes == nil {
245
				log.Debug("got no response back from diag request")
246 247
				return
			}
Jeromy's avatar
Jeromy committed
248

249 250
			di, err := decodeDiagJson(rpmes.GetData())
			if err != nil {
251
				log.Debug(err)
252 253
				return
			}
Jeromy's avatar
Jeromy committed
254

255 256 257 258 259
			select {
			case out <- di:
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
260 261
		}

262 263 264 265 266 267 268 269 270
	}()

	return out, nil
}

func newMessage(diagID string) *pb.Message {
	pmes := new(pb.Message)
	pmes.DiagID = proto.String(diagID)
	return pmes
Jeromy's avatar
Jeromy committed
271 272
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
273
func (d *Diagnostics) HandleMessage(ctx context.Context, s inet.Stream) error {
Jeromy's avatar
Jeromy committed
274

275 276
	cr := ctxio.NewReader(ctx, s)
	cw := ctxio.NewWriter(ctx, s)
277 278
	r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax) // maxsize
	w := ggio.NewDelimitedWriter(cw)
Jeromy's avatar
Jeromy committed
279 280

	// deserialize msg
281
	pmes := new(pb.Message)
282
	if err := r.ReadMsg(pmes); err != nil {
283
		log.Debugf("Failed to decode protobuf message: %v", err)
Jeromy's avatar
Jeromy committed
284 285 286 287
		return nil
	}

	// Print out diagnostic
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
288
	log.Infof("[peer: %s] Got message from [%s]\n",
289
		d.self.Pretty(), s.Conn().RemotePeer())
Jeromy's avatar
Jeromy committed
290

291 292
	// Make sure we havent already handled this request to prevent loops
	if err := d.startDiag(pmes.GetDiagID()); err != nil {
Jeromy's avatar
Jeromy committed
293 294 295
		return nil
	}

296 297 298
	resp := newMessage(pmes.GetDiagID())
	resp.Data = d.getDiagInfo().Marshal()
	if err := w.WriteMsg(resp); err != nil {
299
		log.Debugf("Failed to write protobuf message over stream: %s", err)
300
		return err
Jeromy's avatar
Jeromy committed
301 302
	}

303 304 305 306
	timeout := pmes.GetTimeoutDuration()
	if timeout < HopTimeoutDecrement {
		return fmt.Errorf("timeout too short: %s", timeout)
	}
rht's avatar
rht committed
307 308
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
309 310 311 312
	pmes.SetTimeoutDuration(timeout - HopTimeoutDecrement)

	dpeers, err := d.getDiagnosticFromPeers(ctx, d.getPeers(), pmes)
	if err != nil {
313
		log.Debugf("diagnostic from peers err: %s", err)
314
		return err
Jeromy's avatar
Jeromy committed
315
	}
316 317 318 319
	for b := range dpeers {
		resp := newMessage(pmes.GetDiagID())
		resp.Data = b.Marshal()
		if err := w.WriteMsg(resp); err != nil {
320
			log.Debugf("Failed to write protobuf message over stream: %s", err)
321 322 323 324 325 326
			return err
		}
	}

	return nil
}
Jeromy's avatar
Jeromy committed
327

328 329 330 331 332 333 334 335 336
func (d *Diagnostics) startDiag(id string) error {
	d.diagLock.Lock()
	_, found := d.diagMap[id]
	if found {
		d.diagLock.Unlock()
		return ErrAlreadyRunning
	}
	d.diagMap[id] = time.Now()
	d.diagLock.Unlock()
337 338 339
	return nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
340 341 342
func (d *Diagnostics) handleNewStream(s inet.Stream) {
	d.HandleMessage(context.Background(), s)
	s.Close()
Jeromy's avatar
Jeromy committed
343
}