diag.go 7.02 KB
Newer Older
1 2 3
// Package diagnostics implements a network diagnostics service that
// allows a request to traverse the network and gather information
// on every node connected to it.
4
package diagnostics
Jeromy's avatar
Jeromy committed
5 6 7 8 9 10 11 12 13 14 15 16 17 18

import (
	"bytes"
	"encoding/json"
	"errors"
	"io"
	"sync"
	"time"

	"crypto/rand"

	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"

19
	pb "github.com/jbenet/go-ipfs/diagnostics/internal/pb"
Jeromy's avatar
Jeromy committed
20 21 22 23 24 25 26 27
	net "github.com/jbenet/go-ipfs/net"
	msg "github.com/jbenet/go-ipfs/net/message"
	peer "github.com/jbenet/go-ipfs/peer"
	util "github.com/jbenet/go-ipfs/util"
)

var log = util.Logger("diagnostics")

28 29
const ResponseTimeout = time.Second * 10

Jeromy's avatar
Jeromy committed
30 31
// Diagnostics is a net service that manages requesting and responding to diagnostic
// requests
Jeromy's avatar
Jeromy committed
32 33 34
type Diagnostics struct {
	network net.Network
	sender  net.Sender
35
	self    peer.Peer
Jeromy's avatar
Jeromy committed
36 37 38 39 40 41

	diagLock sync.Mutex
	diagMap  map[string]time.Time
	birth    time.Time
}

Jeromy's avatar
Jeromy committed
42
// NewDiagnostics instantiates a new diagnostics service running on the given network
43
func NewDiagnostics(self peer.Peer, inet net.Network, sender net.Sender) *Diagnostics {
Jeromy's avatar
Jeromy committed
44 45 46 47 48 49 50 51 52 53 54 55 56 57
	return &Diagnostics{
		network: inet,
		sender:  sender,
		self:    self,
		diagMap: make(map[string]time.Time),
		birth:   time.Now(),
	}
}

// connDiagInfo describes one peer in a DiagInfo's connection list.
type connDiagInfo struct {
	// Latency is the latency value reported by the peer object.
	Latency time.Duration

	// ID is the peer's ID in its pretty-printed string form.
	ID string
}

58
type DiagInfo struct {
Jeromy's avatar
Jeromy committed
59 60 61 62
	// This nodes ID
	ID string

	// A list of peers this node currently has open connections to
Jeromy's avatar
Jeromy committed
63
	Connections []connDiagInfo
Jeromy's avatar
Jeromy committed
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78

	// A list of keys provided by this node
	//    (currently not filled)
	Keys []string

	// How long this node has been running for
	LifeSpan time.Duration

	// Incoming Bandwidth Usage
	BwIn uint64

	// Outgoing Bandwidth Usage
	BwOut uint64

	// Information about the version of code this node is running
Jeromy's avatar
Jeromy committed
79 80 81
	CodeVersion string
}

Jeromy's avatar
Jeromy committed
82
// Marshal to json
83
func (di *DiagInfo) Marshal() []byte {
Jeromy's avatar
Jeromy committed
84 85 86 87 88 89 90 91
	b, err := json.Marshal(di)
	if err != nil {
		panic(err)
	}
	//TODO: also consider compressing this. There will be a lot of these
	return b
}

92
func (d *Diagnostics) getPeers() []peer.Peer {
93
	return d.network.GetPeerList()
Jeromy's avatar
Jeromy committed
94 95
}

96 97
func (d *Diagnostics) getDiagInfo() *DiagInfo {
	di := new(DiagInfo)
Jeromy's avatar
Jeromy committed
98
	di.CodeVersion = "github.com/jbenet/go-ipfs"
99
	di.ID = d.self.ID().Pretty()
Jeromy's avatar
Jeromy committed
100 101
	di.LifeSpan = time.Since(d.birth)
	di.Keys = nil // Currently no way to query datastore
102
	di.BwIn, di.BwOut = d.network.GetBandwidthTotals()
Jeromy's avatar
Jeromy committed
103 104

	for _, p := range d.getPeers() {
105 106
		d := connDiagInfo{p.GetLatency(), p.ID().Pretty()}
		di.Connections = append(di.Connections, d)
Jeromy's avatar
Jeromy committed
107 108 109 110 111 112 113 114 115 116
	}
	return di
}

// newID returns a fresh diagnostic request ID: 4 bytes read from crypto/rand,
// returned as a raw (not necessarily printable) string.
//
// It panics if the random source reports an error. An ID built from an
// unfilled buffer could repeat an earlier ID, and a node that has already
// recorded an ID answers a request carrying it with an empty response.
func newID() string {
	id := make([]byte, 4)
	if _, err := rand.Read(id); err != nil {
		panic("diagnostics: reading random bytes for request ID: " + err.Error())
	}
	return string(id)
}

Jeromy's avatar
Jeromy committed
117
// GetDiagnostic runs a diagnostics request across the entire network
118
func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*DiagInfo, error) {
Jeromy's avatar
Jeromy committed
119 120 121 122 123 124 125 126 127 128 129
	log.Debug("Getting diagnostic.")
	ctx, _ := context.WithTimeout(context.TODO(), timeout)

	diagID := newID()
	d.diagLock.Lock()
	d.diagMap[diagID] = time.Now()
	d.diagLock.Unlock()

	log.Debug("Begin Diagnostic")

	peers := d.getPeers()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
130
	log.Debugf("Sending diagnostic request to %d peers.", len(peers))
Jeromy's avatar
Jeromy committed
131

132
	var out []*DiagInfo
Jeromy's avatar
Jeromy committed
133 134 135 136
	di := d.getDiagInfo()
	out = append(out, di)

	pmes := newMessage(diagID)
137 138 139

	respdata := make(chan []byte)
	sends := 0
Jeromy's avatar
Jeromy committed
140
	for _, p := range peers {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
141
		log.Debugf("Sending getDiagnostic to: %s", p)
142
		sends++
143
		go func(p peer.Peer) {
144
			data, err := d.getDiagnosticFromPeer(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
145
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
146
				log.Errorf("GetDiagnostic error: %v", err)
147 148
				respdata <- nil
				return
Jeromy's avatar
Jeromy committed
149
			}
150 151 152 153 154 155 156 157
			respdata <- data
		}(p)
	}

	for i := 0; i < sends; i++ {
		data := <-respdata
		if data == nil {
			continue
Jeromy's avatar
Jeromy committed
158
		}
Jeromy's avatar
Jeromy committed
159
		out = appendDiagnostics(data, out)
Jeromy's avatar
Jeromy committed
160 161 162 163
	}
	return out, nil
}

Jeromy's avatar
Jeromy committed
164
func appendDiagnostics(data []byte, cur []*DiagInfo) []*DiagInfo {
165 166 167
	buf := bytes.NewBuffer(data)
	dec := json.NewDecoder(buf)
	for {
168
		di := new(DiagInfo)
169 170 171
		err := dec.Decode(di)
		if err != nil {
			if err != io.EOF {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
172
				log.Errorf("error decoding DiagInfo: %v", err)
173 174 175 176 177 178 179 180
			}
			break
		}
		cur = append(cur, di)
	}
	return cur
}

Jeromy's avatar
Jeromy committed
181
// TODO: this method no longer needed.
182
func (d *Diagnostics) getDiagnosticFromPeer(ctx context.Context, p peer.Peer, mes *pb.Message) ([]byte, error) {
Jeromy's avatar
Jeromy committed
183 184 185 186 187 188 189
	rpmes, err := d.sendRequest(ctx, p, mes)
	if err != nil {
		return nil, err
	}
	return rpmes.GetData(), nil
}

190 191
func newMessage(diagID string) *pb.Message {
	pmes := new(pb.Message)
Jeromy's avatar
Jeromy committed
192 193 194 195
	pmes.DiagID = proto.String(diagID)
	return pmes
}

196
func (d *Diagnostics) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
Jeromy's avatar
Jeromy committed
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213

	mes, err := msg.FromObject(p, pmes)
	if err != nil {
		return nil, err
	}

	start := time.Now()

	rmes, err := d.sender.SendRequest(ctx, mes)
	if err != nil {
		return nil, err
	}
	if rmes == nil {
		return nil, errors.New("no response to request")
	}

	rtt := time.Since(start)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
214
	log.Infof("diagnostic request took: %s", rtt.String())
Jeromy's avatar
Jeromy committed
215

216
	rpmes := new(pb.Message)
Jeromy's avatar
Jeromy committed
217 218 219 220 221 222 223
	if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil {
		return nil, err
	}

	return rpmes, nil
}

224
func (d *Diagnostics) handleDiagnostic(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
225
	log.Debugf("HandleDiagnostic from %s for id = %s", p, pmes.GetDiagID())
Jeromy's avatar
Jeromy committed
226
	resp := newMessage(pmes.GetDiagID())
Jeromy's avatar
Jeromy committed
227 228

	// Make sure we havent already handled this request to prevent loops
Jeromy's avatar
Jeromy committed
229 230 231 232 233 234 235 236 237 238 239 240 241
	d.diagLock.Lock()
	_, found := d.diagMap[pmes.GetDiagID()]
	if found {
		d.diagLock.Unlock()
		return resp, nil
	}
	d.diagMap[pmes.GetDiagID()] = time.Now()
	d.diagLock.Unlock()

	buf := new(bytes.Buffer)
	di := d.getDiagInfo()
	buf.Write(di.Marshal())

242
	ctx, _ := context.WithTimeout(context.TODO(), ResponseTimeout)
Jeromy's avatar
Jeromy committed
243

244 245
	respdata := make(chan []byte)
	sendcount := 0
Jeromy's avatar
Jeromy committed
246
	for _, p := range d.getPeers() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
247
		log.Debugf("Sending diagnostic request to peer: %s", p)
248
		sendcount++
249
		go func(p peer.Peer) {
250 251
			out, err := d.getDiagnosticFromPeer(ctx, p, pmes)
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
252
				log.Errorf("getDiagnostic error: %v", err)
253 254 255 256 257 258 259 260 261 262
				respdata <- nil
				return
			}
			respdata <- out
		}(p)
	}

	for i := 0; i < sendcount; i++ {
		out := <-respdata
		_, err := buf.Write(out)
Jeromy's avatar
Jeromy committed
263
		if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
264
			log.Errorf("getDiagnostic write output error: %v", err)
Jeromy's avatar
Jeromy committed
265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
			continue
		}
	}

	resp.Data = buf.Bytes()
	return resp, nil
}

func (d *Diagnostics) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.NetMessage {
	mData := mes.Data()
	if mData == nil {
		log.Error("message did not include Data")
		return nil
	}

	mPeer := mes.Peer()
	if mPeer == nil {
		log.Error("message did not include a Peer")
		return nil
	}

	// deserialize msg
287
	pmes := new(pb.Message)
Jeromy's avatar
Jeromy committed
288 289
	err := proto.Unmarshal(mData, pmes)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
290
		log.Errorf("Failed to decode protobuf message: %v", err)
Jeromy's avatar
Jeromy committed
291 292 293 294
		return nil
	}

	// Print out diagnostic
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
295
	log.Infof("[peer: %s] Got message from [%s]\n",
296
		d.self.ID().Pretty(), mPeer.ID().Pretty())
Jeromy's avatar
Jeromy committed
297 298 299 300

	// dispatch handler.
	rpmes, err := d.handleDiagnostic(mPeer, pmes)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
301
		log.Errorf("handleDiagnostic error: %s", err)
Jeromy's avatar
Jeromy committed
302 303 304 305 306 307 308 309 310 311 312
		return nil
	}

	// if nil response, return it before serializing
	if rpmes == nil {
		return nil
	}

	// serialize response msg
	rmes, err := msg.FromObject(mPeer, rpmes)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
313
		log.Errorf("Failed to encode protobuf message: %v", err)
Jeromy's avatar
Jeromy committed
314 315 316 317 318
		return nil
	}

	return rmes
}