Commit 280c7e7e authored by Jeromy's avatar Jeromy

implement diagnostics service

parent 7652f86c
{
"ImportPath": "github.com/jbenet/go-ipfs",
"GoVersion": "go1.3",
"GoVersion": "go1.3.3",
"Packages": [
"./..."
],
......
......@@ -81,6 +81,7 @@ func ipfsCmd(c *commander.Command, args []string) error {
}
func main() {
u.AllLoggersOn()
u.Debug = false
// setup logging
......
......@@ -13,6 +13,7 @@ import (
bserv "github.com/jbenet/go-ipfs/blockservice"
config "github.com/jbenet/go-ipfs/config"
ci "github.com/jbenet/go-ipfs/crypto"
diag "github.com/jbenet/go-ipfs/diagnostics"
exchange "github.com/jbenet/go-ipfs/exchange"
bitswap "github.com/jbenet/go-ipfs/exchange/bitswap"
merkledag "github.com/jbenet/go-ipfs/merkledag"
......@@ -64,6 +65,9 @@ type IpfsNode struct {
// the name system, resolves paths to hashes
Namesys namesys.NameSystem
// the diagnostics service
Diagnostics *diag.Diagnostics
}
// NewIpfsNode constructs a new IpfsNode based on the given config.
......@@ -103,12 +107,14 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
// TODO: refactor so we can use IpfsRouting interface instead of being DHT-specific
route *dht.IpfsDHT
exchangeSession exchange.Interface
diagnostics *diag.Diagnostics
)
if online {
dhtService := netservice.NewService(nil) // nil handler for now, need to patch it
exchangeService := netservice.NewService(nil) // nil handler for now, need to patch it
diagService := netservice.NewService(nil)
if err := dhtService.Start(ctx); err != nil {
return nil, err
......@@ -118,14 +124,18 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
}
net, err = inet.NewIpfsNetwork(context.TODO(), local, peerstore, &mux.ProtocolMap{
mux.ProtocolID_Routing: dhtService,
mux.ProtocolID_Exchange: exchangeService,
mux.ProtocolID_Routing: dhtService,
mux.ProtocolID_Exchange: exchangeService,
mux.ProtocolID_Diagnostic: diagService,
// add protocol services here.
})
if err != nil {
return nil, err
}
diagnostics = diag.NewDiagnostics(local, net, diagService)
diagService.Handler = diagnostics
route = dht.NewDHT(local, peerstore, net, dhtService, d)
// TODO(brian): perform this inside NewDHT factory method
dhtService.Handler = route // wire the handler to the service.
......@@ -149,16 +159,17 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
success = true
return &IpfsNode{
Config: cfg,
Peerstore: peerstore,
Datastore: d,
Blocks: bs,
DAG: dag,
Resolver: &path.Resolver{DAG: dag},
Exchange: exchangeSession,
Identity: local,
Routing: route,
Namesys: ns,
Config: cfg,
Peerstore: peerstore,
Datastore: d,
Blocks: bs,
DAG: dag,
Resolver: &path.Resolver{DAG: dag},
Exchange: exchangeSession,
Identity: local,
Routing: route,
Namesys: ns,
Diagnostics: diagnostics,
}, nil
}
......
......@@ -8,6 +8,7 @@ import (
"os"
"path"
"sync"
"time"
core "github.com/jbenet/go-ipfs/core"
"github.com/jbenet/go-ipfs/core/commands"
......@@ -136,6 +137,15 @@ func (dl *DaemonListener) handleConnection(conn net.Conn) {
err = commands.Publish(dl.node, command.Args, command.Opts, conn)
case "resolve":
err = commands.Resolve(dl.node, command.Args, command.Opts, conn)
case "diag":
log.Debug("DIAGNOSTIC!")
info, err := dl.node.Diagnostics.GetDiagnostic(time.Second * 20)
if err != nil {
fmt.Fprintln(conn, err)
return
}
enc := json.NewEncoder(conn)
err = enc.Encode(info)
default:
err = fmt.Errorf("Invalid Command: '%s'", command.Command)
}
......
package diagnostic
import (
"bytes"
"encoding/json"
"errors"
"io"
"sync"
"time"
"crypto/rand"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
net "github.com/jbenet/go-ipfs/net"
msg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
util "github.com/jbenet/go-ipfs/util"
)
var log = util.Logger("diagnostics")
type Diagnostics struct {
network net.Network
sender net.Sender
self *peer.Peer
diagLock sync.Mutex
diagMap map[string]time.Time
birth time.Time
}
func NewDiagnostics(self *peer.Peer, inet net.Network, sender net.Sender) *Diagnostics {
return &Diagnostics{
network: inet,
sender: sender,
self: self,
diagMap: make(map[string]time.Time),
birth: time.Now(),
}
}
type connDiagInfo struct {
Latency time.Duration
ID string
}
type diagInfo struct {
ID string
Connections []connDiagInfo
Keys []string
LifeSpan time.Duration
CodeVersion string
}
func (di *diagInfo) Marshal() []byte {
b, err := json.Marshal(di)
if err != nil {
panic(err)
}
//TODO: also consider compressing this. There will be a lot of these
return b
}
func (d *Diagnostics) getPeers() []*peer.Peer {
// <HACKY>
n, ok := d.network.(*net.IpfsNetwork)
if !ok {
return nil
}
s := n.GetSwarm()
return s.GetPeerList()
// </HACKY>
}
func (d *Diagnostics) getDiagInfo() *diagInfo {
di := new(diagInfo)
di.CodeVersion = "github.com/jbenet/go-ipfs"
di.ID = d.self.ID.Pretty()
di.LifeSpan = time.Since(d.birth)
di.Keys = nil // Currently no way to query datastore
for _, p := range d.getPeers() {
di.Connections = append(di.Connections, connDiagInfo{p.GetLatency(), p.ID.Pretty()})
}
return di
}
func newID() string {
id := make([]byte, 4)
rand.Read(id)
return string(id)
}
func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
log.Debug("Getting diagnostic.")
ctx, _ := context.WithTimeout(context.TODO(), timeout)
diagID := newID()
d.diagLock.Lock()
d.diagMap[diagID] = time.Now()
d.diagLock.Unlock()
log.Debug("Begin Diagnostic")
peers := d.getPeers()
log.Debug("Sending diagnostic request to %d peers.", len(peers))
var out []*diagInfo
di := d.getDiagInfo()
out = append(out, di)
pmes := newMessage(diagID)
for _, p := range peers {
log.Debug("Sending getDiagnostic to: %s", p)
data, err := d.getDiagnosticFromPeer(ctx, p, pmes)
if err != nil {
log.Error("GetDiagnostic error: %v", err)
continue
}
buf := bytes.NewBuffer(data)
dec := json.NewDecoder(buf)
for {
di := new(diagInfo)
err := dec.Decode(di)
if err != nil {
if err != io.EOF {
log.Error("error decoding diagInfo: %v", err)
}
break
}
out = append(out, di)
}
}
return out, nil
}
// TODO: this method no longer needed.
func (d *Diagnostics) getDiagnosticFromPeer(ctx context.Context, p *peer.Peer, mes *Message) ([]byte, error) {
rpmes, err := d.sendRequest(ctx, p, mes)
if err != nil {
return nil, err
}
return rpmes.GetData(), nil
}
func newMessage(diagID string) *Message {
pmes := new(Message)
pmes.DiagID = proto.String(diagID)
return pmes
}
func (d *Diagnostics) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message) (*Message, error) {
mes, err := msg.FromObject(p, pmes)
if err != nil {
return nil, err
}
start := time.Now()
rmes, err := d.sender.SendRequest(ctx, mes)
if err != nil {
return nil, err
}
if rmes == nil {
return nil, errors.New("no response to request")
}
rtt := time.Since(start)
log.Info("diagnostic request took: %s", rtt.String())
rpmes := new(Message)
if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil {
return nil, err
}
return rpmes, nil
}
// NOTE: not yet finished, low priority
func (d *Diagnostics) handleDiagnostic(p *peer.Peer, pmes *Message) (*Message, error) {
resp := newMessage(pmes.GetDiagID())
d.diagLock.Lock()
_, found := d.diagMap[pmes.GetDiagID()]
if found {
d.diagLock.Unlock()
return resp, nil
}
d.diagMap[pmes.GetDiagID()] = time.Now()
d.diagLock.Unlock()
buf := new(bytes.Buffer)
di := d.getDiagInfo()
buf.Write(di.Marshal())
ctx, _ := context.WithTimeout(context.TODO(), time.Second*10)
for _, p := range d.getPeers() {
log.Debug("Sending diagnostic request to peer: %s", p)
out, err := d.getDiagnosticFromPeer(ctx, p, pmes)
if err != nil {
log.Error("getDiagnostic error: %v", err)
continue
}
_, err = buf.Write(out)
if err != nil {
log.Error("getDiagnostic write output error: %v", err)
continue
}
}
resp.Data = buf.Bytes()
return resp, nil
}
func (d *Diagnostics) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.NetMessage {
mData := mes.Data()
if mData == nil {
log.Error("message did not include Data")
return nil
}
mPeer := mes.Peer()
if mPeer == nil {
log.Error("message did not include a Peer")
return nil
}
// deserialize msg
pmes := new(Message)
err := proto.Unmarshal(mData, pmes)
if err != nil {
log.Error("Failed to decode protobuf message: %v", err)
return nil
}
// Print out diagnostic
log.Info("[peer: %s] Got message from [%s]\n",
d.self.ID.Pretty(), mPeer.ID.Pretty())
// dispatch handler.
rpmes, err := d.handleDiagnostic(mPeer, pmes)
if err != nil {
log.Error("handleDiagnostic error: %s", err)
return nil
}
// if nil response, return it before serializing
if rpmes == nil {
return nil
}
// serialize response msg
rmes, err := msg.FromObject(mPeer, rpmes)
if err != nil {
log.Error("Failed to encode protobuf message: %v", err)
return nil
}
return rmes
}
// Code generated by protoc-gen-go.
// source: message.proto
// DO NOT EDIT!
/*
Package diagnostic is a generated protocol buffer package.
It is generated from these files:
message.proto
It has these top-level messages:
Message
*/
package diagnostic
import proto "code.google.com/p/goprotobuf/proto"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = math.Inf
type Message struct {
DiagID *string `protobuf:"bytes,1,req" json:"DiagID,omitempty"`
Data []byte `protobuf:"bytes,2,opt" json:"Data,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *Message) Reset() { *m = Message{} }
func (m *Message) String() string { return proto.CompactTextString(m) }
func (*Message) ProtoMessage() {}
func (m *Message) GetDiagID() string {
if m != nil && m.DiagID != nil {
return *m.DiagID
}
return ""
}
func (m *Message) GetData() []byte {
if m != nil {
return m.Data
}
return nil
}
func init() {
}
package diagnostic;
message Message {
required string DiagID = 1;
optional bytes Data = 2;
}
// Code generated by protoc-gen-gogo.
// Code generated by protoc-gen-go.
// source: mux.proto
// DO NOT EDIT!
......@@ -13,22 +13,21 @@ It has these top-level messages:
*/
package mux
import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto"
import json "encoding/json"
import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
import math "math"
// Reference proto, json, and math imports to suppress error if they are not otherwise used.
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = &json.SyntaxError{}
var _ = math.Inf
type ProtocolID int32
const (
ProtocolID_Test ProtocolID = 0
ProtocolID_Identify ProtocolID = 1
ProtocolID_Routing ProtocolID = 2
ProtocolID_Exchange ProtocolID = 3
ProtocolID_Test ProtocolID = 0
ProtocolID_Identify ProtocolID = 1
ProtocolID_Routing ProtocolID = 2
ProtocolID_Exchange ProtocolID = 3
ProtocolID_Diagnostic ProtocolID = 4
)
var ProtocolID_name = map[int32]string{
......@@ -36,12 +35,14 @@ var ProtocolID_name = map[int32]string{
1: "Identify",
2: "Routing",
3: "Exchange",
4: "Diagnostic",
}
var ProtocolID_value = map[string]int32{
"Test": 0,
"Identify": 1,
"Routing": 2,
"Exchange": 3,
"Test": 0,
"Identify": 1,
"Routing": 2,
"Exchange": 3,
"Diagnostic": 4,
}
func (x ProtocolID) Enum() *ProtocolID {
......
......@@ -5,6 +5,7 @@ enum ProtocolID {
Identify = 1; // setup
Routing = 2; // dht
Exchange = 3; // bitswap
Diagnostic = 4;
}
message PBProtocolMessage {
......
......@@ -107,3 +107,8 @@ func (n *IpfsNetwork) Close() error {
n.cancel = nil
return nil
}
// XXX
func (n *IpfsNetwork) GetSwarm() *swarm.Swarm {
return n.swarm
}
......@@ -122,10 +122,12 @@ func (s *Swarm) connSetup(c *conn.Conn) error {
// add to conns
s.connsLock.Lock()
if _, ok := s.conns[c.Peer.Key()]; ok {
log.Debug("Conn already open!")
s.connsLock.Unlock()
return ErrAlreadyOpen
}
s.conns[c.Peer.Key()] = c
log.Debug("Added conn to map!")
s.connsLock.Unlock()
// kick off reader goroutine
......
......@@ -201,5 +201,15 @@ func (s *Swarm) GetErrChan() chan error {
return s.errChan
}
func (s *Swarm) GetPeerList() []*peer.Peer {
var out []*peer.Peer
s.connsLock.Lock()
for _, p := range s.conns {
out = append(out, p.Peer)
}
s.connsLock.Unlock()
return out
}
// Temporary to ensure that the Swarm always matches the Network interface as we are changing it
// var _ Network = &Swarm{}
......@@ -153,6 +153,13 @@ func SetupLogging() {
}
}
func AllLoggersOn() {
for n, log := range loggers {
logging.SetLevel(logging.DEBUG, n)
log.Error("setting logger: %s to %v\n", n, logging.DEBUG)
}
}
// Logger retrieves a particular logger + initializes it at a particular level
func Logger(name string) *logging.Logger {
log := logging.MustGetLogger(name)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment