package p2p import ( "io" "sync" net "gx/ipfs/QmQSbtGXCyNrj34LWL8EgXyNNYDZ8r3SwQcpW5pPxVhLnM/go-libp2p-net" peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" manet "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net" ifconnmgr "gx/ipfs/QmVz2p8ZVZ5GcWPNWGs2HZHiZyHumZcJpQdMRpxkMDhc2C/go-libp2p-interface-connmgr" ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr" protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" ) const CMGR_TAG = "stream-fwd" // Stream holds information on active incoming and outgoing p2p streams. type Stream struct { id uint64 Protocol protocol.ID OriginAddr ma.Multiaddr TargetAddr ma.Multiaddr peer peer.ID Local manet.Conn Remote net.Stream Registry *StreamRegistry } // close closes stream endpoints and deregisters it func (s *Stream) close() error { s.Registry.Close(s) return nil } // reset closes stream endpoints and deregisters it func (s *Stream) reset() error { s.Registry.Reset(s) return nil } func (s *Stream) startStreaming() { go func() { _, err := io.Copy(s.Local, s.Remote) if err != nil { s.reset() } else { s.close() } }() go func() { _, err := io.Copy(s.Remote, s.Local) if err != nil { s.reset() } else { s.close() } }() } // StreamRegistry is a collection of active incoming and outgoing proto app streams. type StreamRegistry struct { sync.Mutex Streams map[uint64]*Stream conns map[peer.ID]int nextID uint64 ifconnmgr.ConnManager } // Register registers a stream to the registry func (r *StreamRegistry) Register(streamInfo *Stream) { r.Lock() defer r.Unlock() r.ConnManager.TagPeer(streamInfo.peer, CMGR_TAG, 20) r.conns[streamInfo.peer]++ streamInfo.id = r.nextID r.Streams[r.nextID] = streamInfo r.nextID++ } // Deregister deregisters stream from the registry func (r *StreamRegistry) Deregister(streamID uint64) { r.Lock() defer r.Unlock() s, ok := r.Streams[streamID] if !ok { return } p := s.peer r.conns[p]-- if r.conns[p] < 1 { delete(r.conns, p) r.ConnManager.UntagPeer(p, CMGR_TAG) } delete(r.Streams, streamID) } // close closes stream endpoints and deregisters it func (r *StreamRegistry) Close(s *Stream) error { s.Local.Close() s.Remote.Close() s.Registry.Deregister(s.id) return nil } // reset closes stream endpoints and deregisters it func (r *StreamRegistry) Reset(s *Stream) error { s.Local.Close() s.Remote.Reset() s.Registry.Deregister(s.id) return nil }