From 6cf790510fe23997940c547bb116fe143476ea5c Mon Sep 17 00:00:00 2001 From: Whyrusleeping Date: Thu, 20 Jun 2019 16:59:53 +0200 Subject: [PATCH] eventbus abstraction + initial events (#17) --- event/bus.go | 48 +++++++++++++++++++++++++++++++++++++++++++++++ event/doc.go | 11 +++++++++++ event/protocol.go | 26 +++++++++++++++++++++++++ host/host.go | 4 ++++ protocol/id.go | 20 ++++++++++++++++++++ 5 files changed, 109 insertions(+) create mode 100644 event/bus.go create mode 100644 event/doc.go create mode 100644 event/protocol.go diff --git a/event/bus.go b/event/bus.go new file mode 100644 index 0000000..2538839 --- /dev/null +++ b/event/bus.go @@ -0,0 +1,48 @@ +package event + +import "io" + +// SubscriptionOpt represents a subscriber option. Use the options exposed by the implementation of choice. +type SubscriptionOpt = func(interface{}) error + +// EmitterOpt represents an emitter option. Use the options exposed by the implementation of choice. +type EmitterOpt = func(interface{}) error + +// CancelFunc closes a subscriber. +type CancelFunc = func() + +// Emitter represents an actor that emits events onto the eventbus. +type Emitter interface { + io.Closer + + // Emit emits an event onto the eventbus. If any channel subscribed to the topic is blocked, + // calls to Emit will block. + // + // Calling this function with wrong event type will cause a panic. + Emit(evt interface{}) +} + +// Bus is an interface for a type-based event delivery system. +type Bus interface { + // Subscribe creates a new subscription. + // + // Failing to drain the channel may cause publishers to block. CancelFunc must return after + // last send to the channel. + // + // Example: + // ch := make(chan EventT, 10) + // defer close(ch) + // cancel, err := eventbus.Subscribe(ch) + // defer cancel() + Subscribe(typedChan interface{}, opts ...SubscriptionOpt) (CancelFunc, error) + + // Emitter creates a new event emitter. + // + // eventType accepts typed nil pointers, and uses the type information for wiring purposes. + // + // Example: + // em, err := eventbus.Emitter(new(EventT)) + // defer em.Close() // MUST call this after being done with the emitter + // em.Emit(EventT{}) + Emitter(eventType interface{}, opts ...EmitterOpt) (Emitter, error) +} diff --git a/event/doc.go b/event/doc.go new file mode 100644 index 0000000..3a070f3 --- /dev/null +++ b/event/doc.go @@ -0,0 +1,11 @@ +// Package event contains the abstractions for a local event bus, along with the standard events +// that libp2p subsystems may emit. +// +// Source code is arranged as follows: +// * doc.go: this file. +// * bus.go: abstractions for the event bus. +// * rest: event structs, sensibly categorised in files by entity, and following this naming convention: +// Evt[Entity (noun)][Event (verb past tense / gerund)] +// The past tense is used to convey that something happened, whereas the gerund form of the verb (-ing) +// expresses that a process is in progress. Examples: EvtConnEstablishing, EvtConnEstablished. +package event diff --git a/event/protocol.go b/event/protocol.go new file mode 100644 index 0000000..87b4312 --- /dev/null +++ b/event/protocol.go @@ -0,0 +1,26 @@ +package event + +import ( + peer "github.com/libp2p/go-libp2p-core/peer" + protocol "github.com/libp2p/go-libp2p-core/protocol" +) + +// EvtPeerProtocolsUpdated should be emitted when a peer we're connected to adds or removes protocols from their stack. +type EvtPeerProtocolsUpdated struct { + // Peer is the peer whose protocols were updated. + Peer peer.ID + // Added enumerates the protocols that were added by this peer. + Added []protocol.ID + // Removed enumerates the protocols that were removed by this peer. + Removed []protocol.ID +} + +// EvtLocalProtocolsUpdated should be emitted when stream handlers are attached or detached from the local host. +// For handlers attached with a matcher predicate (host.SetStreamHandlerMatch()), only the protocol ID will be +// included in this event. +type EvtLocalProtocolsUpdated struct { + // Added enumerates the protocols that were added locally. + Added []protocol.ID + // Removed enumerates the protocols that were removed locally. + Removed []protocol.ID +} diff --git a/host/host.go b/host/host.go index d4b3c03..9aad5e1 100644 --- a/host/host.go +++ b/host/host.go @@ -7,6 +7,7 @@ import ( "context" "github.com/libp2p/go-libp2p-core/connmgr" + "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" @@ -68,4 +69,7 @@ type Host interface { // ConnManager returns this hosts connection manager ConnManager() connmgr.ConnManager + + // EventBus returns the hosts eventbus + EventBus() event.Bus } diff --git a/protocol/id.go b/protocol/id.go index f7e4a32..9df3b5b 100644 --- a/protocol/id.go +++ b/protocol/id.go @@ -7,3 +7,23 @@ type ID string const ( TestingID ID = "/p2p/_testing" ) + +// ConvertFromStrings is a convenience function that takes a slice of strings and +// converts it to a slice of protocol.ID. +func ConvertFromStrings(ids []string) (res []ID) { + res = make([]ID, 0, len(ids)) + for _, id := range ids { + res = append(res, ID(id)) + } + return res +} + +// ConvertToStrings is a convenience function that takes a slice of protocol.ID and +// converts it to a slice of strings. +func ConvertToStrings(ids []ID) (res []string) { + res = make([]string, 0, len(ids)) + for _, id := range ids { + res = append(res, string(id)) + } + return res +} -- GitLab