diag.go 6.04 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
package diagnostic

import (
	"bytes"
	"encoding/json"
	"errors"
	"io"
	"sync"
	"time"

	"crypto/rand"

	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"

	net "github.com/jbenet/go-ipfs/net"
	msg "github.com/jbenet/go-ipfs/net/message"
	peer "github.com/jbenet/go-ipfs/peer"
	util "github.com/jbenet/go-ipfs/util"
)

// log is the package-level logger, obtained from util.Logger under the name
// "diagnostics".
var log = util.Logger("diagnostics")

24 25
// ResponseTimeout is the timeout (10 seconds) of the context that
// handleDiagnostic uses when it forwards a diagnostic request to its peers.
const ResponseTimeout = time.Second * 10

Jeromy's avatar
Jeromy committed
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
// Diagnostics collects DiagInfo reports from the local node and from the
// peers listed by its network.
//
// Fields:
//   - network:  supplies the peer list (see getPeers).
//   - sender:   used by sendRequest to send a request and obtain the reply.
//   - self:     the peer whose ID is reported as this node's ID.
//   - diagLock: held around every access to diagMap.
//   - diagMap:  maps a diagnostic ID to the time it was first recorded;
//     handleDiagnostic consults it to skip IDs it has already seen.
//   - birth:    set to the current time by NewDiagnostics; the reported
//     LifeSpan is measured from it.
type Diagnostics struct {
	network net.Network
	sender  net.Sender
	self    *peer.Peer

	diagLock sync.Mutex
	diagMap  map[string]time.Time
	birth    time.Time
}

// NewDiagnostics builds a Diagnostics for the local peer self. Peer lists are
// taken from inet and requests are issued through sender. The returned value
// starts with an empty map of seen diagnostic IDs and with its birth time set
// to the moment of the call.
func NewDiagnostics(self *peer.Peer, inet net.Network, sender net.Sender) *Diagnostics {
	d := new(Diagnostics)
	d.self = self
	d.network = inet
	d.sender = sender
	d.birth = time.Now()
	d.diagMap = make(map[string]time.Time)
	return d
}

// connDiagInfo describes one connection in a DiagInfo report: the latency
// reported for the remote peer and that peer's pretty-printed ID.
type connDiagInfo struct {
	Latency time.Duration
	ID      string
}

// DiagInfo is the diagnostic report produced for a single node.
//
// As filled in by getDiagInfo: ID is the node's pretty-printed peer ID,
// Connections holds one entry per peer in the node's peer list, Keys is
// currently always nil, LifeSpan is the time elapsed since the node's
// Diagnostics value was created, and CodeVersion is a fixed identifier string.
type DiagInfo struct {
	ID          string
	Connections []connDiagInfo
	Keys        []string
	LifeSpan    time.Duration
	CodeVersion string
}

// Marshal returns the JSON encoding of di.
//
// It panics if encoding fails; every field of DiagInfo is a plain string,
// integer duration or slice of those, so a failure would indicate a
// programming error rather than bad input.
func (di *DiagInfo) Marshal() []byte {
	b, err := json.Marshal(di)
	if err != nil {
		panic(err)
	}
	//TODO: also consider compressing this. There will be a lot of these
	return b
}

func (d *Diagnostics) getPeers() []*peer.Peer {
69
	return d.network.GetPeerList()
Jeromy's avatar
Jeromy committed
70 71
}

72 73
func (d *Diagnostics) getDiagInfo() *DiagInfo {
	di := new(DiagInfo)
Jeromy's avatar
Jeromy committed
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
	di.CodeVersion = "github.com/jbenet/go-ipfs"
	di.ID = d.self.ID.Pretty()
	di.LifeSpan = time.Since(d.birth)
	di.Keys = nil // Currently no way to query datastore

	for _, p := range d.getPeers() {
		di.Connections = append(di.Connections, connDiagInfo{p.GetLatency(), p.ID.Pretty()})
	}
	return di
}

// newID returns a fresh diagnostic ID: 4 bytes read from crypto/rand and
// converted directly to a string. The result is opaque binary data and is
// generally neither printable nor valid UTF-8.
//
// It panics if the random source fails. Silently continuing would hand out
// an all-zero ID, which collides with every other failed ID and would cause
// requests to be dropped as duplicates.
func newID() string {
	id := make([]byte, 4)
	if _, err := rand.Read(id); err != nil {
		panic("diagnostic: reading random bytes for ID: " + err.Error())
	}
	return string(id)
}

91
func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*DiagInfo, error) {
Jeromy's avatar
Jeromy committed
92 93 94 95 96 97 98 99 100 101 102 103 104
	log.Debug("Getting diagnostic.")
	ctx, _ := context.WithTimeout(context.TODO(), timeout)

	diagID := newID()
	d.diagLock.Lock()
	d.diagMap[diagID] = time.Now()
	d.diagLock.Unlock()

	log.Debug("Begin Diagnostic")

	peers := d.getPeers()
	log.Debug("Sending diagnostic request to %d peers.", len(peers))

105
	var out []*DiagInfo
Jeromy's avatar
Jeromy committed
106 107 108 109
	di := d.getDiagInfo()
	out = append(out, di)

	pmes := newMessage(diagID)
110 111 112

	respdata := make(chan []byte)
	sends := 0
Jeromy's avatar
Jeromy committed
113 114
	for _, p := range peers {
		log.Debug("Sending getDiagnostic to: %s", p)
115 116 117
		sends++
		go func(p *peer.Peer) {
			data, err := d.getDiagnosticFromPeer(ctx, p, pmes)
Jeromy's avatar
Jeromy committed
118
			if err != nil {
119 120 121
				log.Error("GetDiagnostic error: %v", err)
				respdata <- nil
				return
Jeromy's avatar
Jeromy committed
122
			}
123 124 125 126 127 128 129 130
			respdata <- data
		}(p)
	}

	for i := 0; i < sends; i++ {
		data := <-respdata
		if data == nil {
			continue
Jeromy's avatar
Jeromy committed
131
		}
132
		out = AppendDiagnostics(data, out)
Jeromy's avatar
Jeromy committed
133 134 135 136
	}
	return out, nil
}

137
func AppendDiagnostics(data []byte, cur []*DiagInfo) []*DiagInfo {
138 139 140
	buf := bytes.NewBuffer(data)
	dec := json.NewDecoder(buf)
	for {
141
		di := new(DiagInfo)
142 143 144
		err := dec.Decode(di)
		if err != nil {
			if err != io.EOF {
145
				log.Error("error decoding DiagInfo: %v", err)
146 147 148 149 150 151 152 153
			}
			break
		}
		cur = append(cur, di)
	}
	return cur
}

Jeromy's avatar
Jeromy committed
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
// getDiagnosticFromPeer sends mes to p via sendRequest and returns the Data
// payload of the reply, or the error sendRequest reported.
//
// TODO: this method no longer needed.
func (d *Diagnostics) getDiagnosticFromPeer(ctx context.Context, p *peer.Peer, mes *Message) ([]byte, error) {
	reply, err := d.sendRequest(ctx, p, mes)
	if err == nil {
		return reply.GetData(), nil
	}
	return nil, err
}

// newMessage returns a Message whose DiagID is set to diagID and whose other
// fields are left at their zero values.
func newMessage(diagID string) *Message {
	return &Message{DiagID: proto.String(diagID)}
}

// sendRequest wraps pmes in a network message addressed to p, sends it
// through the sender and decodes the reply into a Message.
//
// It returns an error if the message cannot be built, if sending fails, if
// no reply comes back, or if the reply cannot be unmarshalled. The time spent
// waiting for the reply is logged.
func (d *Diagnostics) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message) (*Message, error) {
	outgoing, err := msg.FromObject(p, pmes)
	if err != nil {
		return nil, err
	}

	sentAt := time.Now()

	incoming, err := d.sender.SendRequest(ctx, outgoing)
	switch {
	case err != nil:
		return nil, err
	case incoming == nil:
		return nil, errors.New("no response to request")
	}

	elapsed := time.Since(sentAt)
	log.Info("diagnostic request took: %s", elapsed.String())

	reply := new(Message)
	if err = proto.Unmarshal(incoming.Data(), reply); err != nil {
		return nil, err
	}
	return reply, nil
}

func (d *Diagnostics) handleDiagnostic(p *peer.Peer, pmes *Message) (*Message, error) {
Jeromy's avatar
Jeromy committed
198
	log.Debug("HandleDiagnostic from %s for id = %s", p, pmes.GetDiagID())
Jeromy's avatar
Jeromy committed
199 200 201 202 203 204 205 206 207 208 209 210 211 212
	resp := newMessage(pmes.GetDiagID())
	d.diagLock.Lock()
	_, found := d.diagMap[pmes.GetDiagID()]
	if found {
		d.diagLock.Unlock()
		return resp, nil
	}
	d.diagMap[pmes.GetDiagID()] = time.Now()
	d.diagLock.Unlock()

	buf := new(bytes.Buffer)
	di := d.getDiagInfo()
	buf.Write(di.Marshal())

213
	ctx, _ := context.WithTimeout(context.TODO(), ResponseTimeout)
Jeromy's avatar
Jeromy committed
214

215 216
	respdata := make(chan []byte)
	sendcount := 0
Jeromy's avatar
Jeromy committed
217 218
	for _, p := range d.getPeers() {
		log.Debug("Sending diagnostic request to peer: %s", p)
219
		sendcount++
220 221 222 223 224 225 226 227 228 229 230 231 232 233
		go func(p *peer.Peer) {
			out, err := d.getDiagnosticFromPeer(ctx, p, pmes)
			if err != nil {
				log.Error("getDiagnostic error: %v", err)
				respdata <- nil
				return
			}
			respdata <- out
		}(p)
	}

	for i := 0; i < sendcount; i++ {
		out := <-respdata
		_, err := buf.Write(out)
Jeromy's avatar
Jeromy committed
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289
		if err != nil {
			log.Error("getDiagnostic write output error: %v", err)
			continue
		}
	}

	resp.Data = buf.Bytes()
	return resp, nil
}

// HandleMessage processes an incoming network message for the diagnostics
// service and returns the reply to send back, or nil when there is none.
//
// nil is returned (after logging) when the message has no data or no peer,
// when its payload is not a valid Message, when handleDiagnostic fails or
// yields no reply, or when the reply cannot be encoded.
func (d *Diagnostics) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.NetMessage {
	payload := mes.Data()
	if payload == nil {
		log.Error("message did not include Data")
		return nil
	}

	from := mes.Peer()
	if from == nil {
		log.Error("message did not include a Peer")
		return nil
	}

	// Decode the protobuf request.
	request := new(Message)
	if err := proto.Unmarshal(payload, request); err != nil {
		log.Error("Failed to decode protobuf message: %v", err)
		return nil
	}

	log.Info("[peer: %s] Got message from [%s]\n",
		d.self.ID.Pretty(), from.ID.Pretty())

	// Run the handler; a nil reply is passed through without encoding.
	reply, err := d.handleDiagnostic(from, request)
	if err != nil {
		log.Error("handleDiagnostic error: %s", err)
		return nil
	}
	if reply == nil {
		return nil
	}

	// Encode the reply, addressed back to the sender.
	encoded, err := msg.FromObject(from, reply)
	if err != nil {
		log.Error("Failed to encode protobuf message: %v", err)
		return nil
	}
	return encoded
}