Unverified Commit 21efed75 authored by Raúl Kripalani's avatar Raúl Kripalani Committed by GitHub

experimental introspection support (#159)

Co-authored-by: default avatarAarsh Shah <aarshkshah1992@gmail.com>
Co-authored-by: default avatarRaúl Kripalani <raul@protocol.ai>
parent 0d53a552
......@@ -8,6 +8,7 @@ import (
"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/introspection"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
......@@ -73,3 +74,16 @@ type Host interface {
// EventBus returns the hosts eventbus
EventBus() event.Bus
}
// IntrospectableHost is implemented by Host implementations that are
// introspectable, that is, that may have introspection capability.
type IntrospectableHost interface {
// Introspector returns the introspector, or nil if one hasn't been
// registered. With it, the call can register data providers, and can fetch
// introspection data.
Introspector() introspection.Introspector
// IntrospectionEndpoint returns the introspection endpoint, or nil if one
// hasn't been registered.
IntrospectionEndpoint() introspection.Endpoint
}
// Package introspection is EXPERIMENTAL. It is subject to heavy change, and it
// WILL change. For now, it is the simplest implementation to power the
// proof-of-concept of the libp2p introspection framework.
//
// Package introspect contains the abstract skeleton of the introspection system
// of go-libp2p, and holds the introspection data schema.
package introspection
package introspection
// Endpoint is the interface to be implemented by introspection endpoints.
//
// An introspection endpoint makes introspection data accessible to external
// consumers, over, for example, WebSockets, or TCP, or libp2p itself.
//
// Experimental.
type Endpoint interface {
// Start starts the introspection endpoint. It must only be called once, and
// once the server is started, subsequent calls made without first calling
// Close will error.
Start() error
// Close stops the introspection endpoint. Calls to Close on an already
// closed endpoint (or an unstarted endpoint) must noop.
Close() error
// ListenAddrs returns the listen addresses of this endpoint.
ListenAddrs() []string
// Sessions returns the ongoing sessions of this endpoint.
Sessions() []*Session
}
// Session represents an introspection session.
type Session struct {
// RemoteAddr is the remote address of the session.
RemoteAddr string
}
package introspection
import (
"io"
"github.com/libp2p/go-libp2p-core/introspection/pb"
)
// Introspector is the interface to be satisfied by components that are capable
// of spelunking the state of the system, and representing in accordance with
// the introspection schema.
//
// It's very rare to build a custom implementation of this interface;
// it exists mostly for mocking. In most cases, you'll end up using the
// default introspector.
//
// Introspector implementations are usually injected in introspection endpoints
// to serve the data to clients, but they can also be used separately for
// embedding or testing.
//
// Experimental.
type Introspector interface {
io.Closer
// FetchRuntime returns the runtime information of the system.
FetchRuntime() (*pb.Runtime, error)
// FetchFullState returns the full state cross-cut of the running system.
FetchFullState() (*pb.State, error)
// EventChan returns the channel where all eventbus events are dumped,
// decorated with their corresponding event metadata, ready to send over
// the wire.
EventChan() <-chan *pb.Event
// EventMetadata returns the metadata of all events known to the
// Introspector.
EventMetadata() []*pb.EventType
}
PB = $(wildcard *.proto)
GO = $(PB:.proto=.pb.go)
all: $(GO)
%.pb.go: %.proto
protoc --proto_path=$(PWD):$(PWD)/../..:$(GOPATH)/src --gogofaster_out=Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types:. $<
clean:
rm -f *.pb.go
rm -f *.go
\ No newline at end of file
// Package introspection/pb contains the protobuf definitions and objects for
// that form the libp2p introspection protocol.
package pb
This diff is collapsed.
syntax = "proto3";
package pb;
// Version of schema
message Version {
uint32 version = 1;
}
// ResultCounter is a monotonically increasing counter that reports an ok/err breakdown of the total.
message ResultCounter {
uint32 total = 1;
uint32 ok = 2;
uint32 err = 3;
}
// Moving totals over sliding time windows. Models sensible time windows,
// we don't have to populate them all at once.
//
// Graphical example:
//
// time past -> present an event 16 min ago
// ======================================================X================>>
// | | 1m
// | |---| 5m
// | |-------------| 15m
// |------------X---------------| 30m
// |------------------------------------------X---------------| 60m
message SlidingCounter {
uint32 over_1m = 1;
uint32 over_5m = 2;
uint32 over_15m = 3;
uint32 over_30m = 4;
uint32 over_1hr = 5;
uint32 over_2hr = 6;
uint32 over_4hr = 7;
uint32 over_8hr = 8;
uint32 over_12hr = 9;
uint32 over_24hr = 10;
}
// DataGauge reports stats for data traffic in a given direction.
message DataGauge {
// Cumulative bytes.
uint64 cum_bytes = 1;
// Cumulative packets.
uint64 cum_packets = 2;
// Instantaneous bandwidth measurement (bytes/second).
uint64 inst_bw = 3;
}
// describes a type of event
message EventType {
// metadata about content types in event's top-level content JSON
message EventProperty {
// tells client how to sort, filter or display known content properties
enum PropertyType {
// for properties to treat as a simple primitive
STRING = 0; // default
NUMBER = 1;
// for properties with special human-readable formatting
TIME = 10;
PEERID = 11;
MULTIADDR = 12;
// for complex structures like nested arrays, object trees etc
JSON = 90;
}
// property name of content e.g. openTs
string name = 1;
// type to interpret content value as
PropertyType type = 2;
// if true, expect an array of values of `type`; else, singular
bool has_multiple = 3;
}
// name of event type, e.g. PeerConnecting
string name = 1;
// for runtime, send property_types for all events already seen in events list
// for events, only send property_types in the first event of a type not in runtime
repeated EventProperty property_types = 2;
}
// Runtime encapsulates runtime info about a node.
message Runtime {
// e.g. go-libp2p, js-libp2p, rust-libp2p, etc.
string implementation = 1;
// e.g. 1.2.3.
string version = 2;
// e.g. Windows, Unix, macOS, Chrome, Mozilla, etc.
string platform = 3;
// our peer id - the peer id of the host system
string peer_id = 4;
// metadata describing configured event types
repeated EventType event_types = 7;
}
// EndpointPair is a pair of multiaddrs.
message EndpointPair {
// the source multiaddr.
string src_multiaddr = 1;
// the destination multiaddr.
string dst_multiaddr = 2;
}
// The status of a connection or stream.
enum Status {
ACTIVE = 0;
CLOSED = 1;
OPENING = 2;
CLOSING = 3;
ERROR = 4;
}
// Our role in a connection or stream.
enum Role {
INITIATOR = 0;
RESPONDER = 1;
}
// Traffic encloses data transfer statistics.
message Traffic {
// snapshot of the data in metrics.
DataGauge traffic_in = 1;
// snapshot of the data out metrics.
DataGauge traffic_out = 2;
}
// a list of streams, by reference or inlined.
message StreamList {
// NOTE: only one of the next 2 fields can appear, but proto3
// doesn't support combining oneof and repeated.
//
// streams within this connection by reference.
repeated bytes stream_ids = 1;
// streams within this connection by inlining.
repeated Stream streams = 2;
}
// Connection reports metrics and state of a libp2p connection.
message Connection {
// Timeline contains the timestamps (ms since epoch) of the well-known milestones of a connection.
message Timeline {
// the instant when a connection was opened on the wire.
uint64 open_ts = 1;
// the instant when the upgrade process (handshake, security, multiplexing) finished.
uint64 upgraded_ts = 2;
// the instant when this connection was terminated.
uint64 close_ts = 3;
}
// Attributes encapsulates the attributes of this connection.
message Attributes {
// the multiplexer being used.
string multiplexer = 1;
// the encryption method being used.
string encryption = 2;
}
// the id of this connection, not to be shown in user tooling,
// used for (cross)referencing connections (e.g. relay).
bytes id = 1;
// the peer id of the other party.
string peer_id = 2;
// the status of this connection.
Status status = 3;
// a reference to the transport managing this connection.
bytes transport_id = 4;
// the endpoints participating in this connection.
EndpointPair endpoints = 5;
// the timeline of the connection, see Connection.Timeline.
Timeline timeline = 6;
// our role in this connection.
Role role = 7;
// traffic statistics.
Traffic traffic = 8;
// properties of this connection.
Attributes attribs = 9;
// the instantaneous latency of this connection in nanoseconds.
uint64 latency_ns = 10;
// streams within this connection.
StreamList streams = 11;
reserved 12 to 15;
// if this is a relayed connection, this points to the relaying connection.
// a default value here (empty bytes) indicates this is not a relayed connection.
oneof relayed_over {
bytes conn_id = 16;
Connection conn = 17;
}
// user provided tags.
repeated string user_provided_tags = 99;
}
// Stream reports metrics and state of a libp2p stream.
message Stream {
message ConnectionRef {
oneof connection {
// the parent connection inlined.
Connection conn = 1;
// the parent connection by reference.
bytes conn_id = 2;
}
}
// Timeline contains the timestamps (ms since epoch) of the well-known milestones of a stream.
message Timeline {
// the instant when the stream was opened.
uint64 open_ts = 1;
// the instant when the stream was terminated.
uint64 close_ts = 2;
}
// the id of this stream, not to be shown in user tooling,
// used for (cross)referencing streams.
bytes id = 1;
// the protocol pinned to this stream.
string protocol = 2;
// our role in this stream.
Role role = 3;
// traffic statistics.
Traffic traffic = 4;
// the connection this stream is hosted under.
ConnectionRef conn = 5;
// the timeline of the stream, see Stream.Timeline.
Timeline timeline = 6;
// the status of this stream.
Status status = 7;
// the instantaneous latency of this stream in nanoseconds.
// TODO: this is hard to calculate.
uint64 latency_ns = 16;
// user provided tags.
repeated string user_provided_tags = 99;
}
// DHT metrics and state.
message DHT {
message Params {
// routing table bucket size.
uint64 k = 1;
// concurrency of asynchronous requests.
uint64 alpha = 2;
// number of disjoint paths to use.
uint64 disjoint_paths = 3;
// number of peers closest to a target that must have responded
// in order for a given query path to complete
uint64 beta = 4;
}
// Peer in DHT
message PeerInDHT {
// The DHT's relationship with this peer
enum Status {
// Connected, in a bucket, ready to send/receive queries
ACTIVE = 0;
// Not currently connected, still "in" a bucket (e.g. temporarily disconnected)
MISSING = 1;
// Removed from a bucket or candidate list (e.g. connection lost or too slow)
REJECTED = 2;
// Was reachable when last checked, waiting to join a currently-full bucket
CANDIDATE = 3;
}
// the peer id of the host system
string peer_id = 1;
// the peer's status when data snapshot is taken
Status status = 2;
// age in bucket (ms)
uint32 age_in_bucket = 3;
}
// A "k-bucket" containing peers of a certain kadamelia distance
message Bucket {
// CPL (Common Prefix Length) is the length of the common prefix
// between the ids of every peer in this bucket and the DHT peer id
uint32 cpl = 1;
// Peers associated with this bucket
repeated PeerInDHT peers = 2;
// Bucket may need more fields depending on WIP remodeling
}
// Counters of query events, by status
message QueryGauge {
// Cumulative counter of queries with "SUCCESS" status
uint64 success = 1;
// Cumulative counter of queries with "ERROR" status
uint64 error = 2;
// Cumulative counter of queries with "TIMEOUT" status
uint64 timeout = 3;
}
// DHT protocol name
string protocol = 1;
// protocol enabled.
bool enabled = 2;
// timestamp (ms since epoch) of start up.
uint64 start_ts = 3;
// params of the dht.
Params params = 4;
// existing, intantiated buckets and their contents
repeated Bucket buckets = 5;
// counts inbound queries received from other peers
QueryGauge incoming_queries = 6;
// counts outbound queries dispatched by this peer
QueryGauge outgoing_queries = 7;
}
// Subsystems encapsulates all instrumented subsystems for a libp2p host.
message Subsystems {
// connections data, source agnostic but currently only supports the Swarm subsystem
repeated Connection connections = 1;
// the DHT subsystem.
DHT dht = 2;
}
// Connections and streams output for a time interval is one of these.
message State {
// list of connections
Subsystems subsystems = 1;
// overall traffic for this peer
Traffic traffic = 2;
// moment this data snapshot and instantaneous values were taken
uint64 instant_ts = 3;
// start of included data collection (cumulative values counted from here)
uint64 start_ts = 4;
// length of time up to instant_ts covered by this data snapshot
uint32 snapshot_duration_ms = 5;
}
// Event
message Event {
// definition of event type, containing only `name` unless this is first encounter of novel event
EventType type = 1;
// time this event occurred (ms since epoch)
uint64 ts = 2;
// stringified json; top-level keys and value types match EventProperty definitions
string content = 3;
}
// ServerMessage wraps messages to be sent to clients to allow extension
// based on new types of data sources
message ServerMessage {
// Version of this protobuf.
Version version = 1;
// The payload this message contains.
oneof payload {
State state = 2;
Runtime runtime = 3;
Event event = 4;
CommandResponse response = 5;
ServerNotice notice = 6;
}
}
// Configuration encapsulates configuration fields for the protocol and commands.
message Configuration {
uint64 retention_period_ms = 1;
uint64 state_snapshot_interval_ms = 2;
}
// ClientCommand is a command sent from the client to the server.
message ClientCommand {
enum Source {
STATE = 0; // full state snapshot.
RUNTIME = 1; // runtime data message.
EVENTS = 2; // eventbus events.
}
enum Command {
// HELLO is the first command that a client must send to greet the server.
// Connections that do not respect this invariant will be terminated.
HELLO = 0;
// REQUEST is applicable to STATE and RUNTIME sources.
REQUEST = 1;
// PUSH streams can only be started for STATE and EVENTS sources.
PUSH_ENABLE = 2; // enables pushing for a given source.
PUSH_DISABLE = 3; // disables pushing for a given source.
PUSH_PAUSE = 4; // pauses pushing for all sources.
PUSH_RESUME = 5; // resumes pushing for all sources.
// UPDATE_CONFIG requests a configuration update. The config field is
// compulsory.
//
// The server reserves the right to override the requested values, and
// will return the effective configuration in the response.
UPDATE_CONFIG = 7;
}
Version version = 1;
uint64 id = 2; // a unique ID for this request.
Command command = 3;
Source source = 4;
Configuration config = 5;
}
// CommandResponse is a response to a command sent by the client.
message CommandResponse {
enum Result {
OK = 0;
ERR = 1;
}
uint64 id = 1; // for correlation with the request.
Result result = 2;
string error = 3;
// effective_config is the effective configuration the server holds for
// this connection. It is returned in response to HELLO and UPDATE_CONFIG
// commands.
Configuration effective_config = 4;
}
// ServerNotice represents a NOTICE sent from the server to the client.
message ServerNotice {
enum Kind {
DISCARDING_EVENTS = 0;
}
Kind kind = 1;
}
\ No newline at end of file
......@@ -5,6 +5,7 @@ import (
"time"
"github.com/libp2p/go-flow-metrics"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)
......
......@@ -19,6 +19,10 @@ type Conn interface {
ConnSecurity
ConnMultiaddrs
// ID returns an identifier that uniquely identifies this Conn within this
// host, during this run. Connection IDs may repeat across restarts.
ID() string
// NewStream constructs a new Stream over this conn.
NewStream() (Stream, error)
......
......@@ -8,6 +8,7 @@ package network
import (
"context"
"io"
"time"
"github.com/jbenet/goprocess"
"github.com/libp2p/go-libp2p-core/peer"
......@@ -98,8 +99,12 @@ func (r Reachability) String() string {
// Stat stores metadata pertaining to a given Stream/Conn.
type Stat struct {
// Direction specifies whether this is an inbound or an outbound connection.
Direction Direction
Extra map[interface{}]interface{}
// Opened is the timestamp when this connection was opened.
Opened time.Time
// Extra stores additional metadata about this connection.
Extra map[interface{}]interface{}
}
// StreamHandler is the type of function used to listen for
......
......@@ -13,6 +13,10 @@ import (
type Stream interface {
mux.MuxedStream
// ID returns an identifier that uniquely identifies this Stream within this
// host, during this run. Stream IDs may repeat across restarts.
ID() string
Protocol() protocol.ID
SetProtocol(id protocol.ID)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment