subscriber.go 1.83 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
package responsemanager

import (
	"context"
	"errors"

	"github.com/ipfs/go-graphsync"
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/notifications"
	"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
	"github.com/libp2p/go-libp2p-core/peer"
)

// errNetworkError is the error placed in the errorRequestMessage that the
// subscriber enqueues when it receives a peerresponsemanager.Error event.
var errNetworkError = errors.New("network error")

// subscriber receives notifications (via OnNext/OnClose) about a single
// request and fans them out to the registered listeners, and, on error, to
// the messages channel.
type subscriber struct {
	// p is the peer passed to every listener notification and to the
	// errorRequestMessage enqueued on error.
	p                     peer.ID
	// request is passed to every listener notification; its ID identifies the
	// request in the errorRequestMessage enqueued on error.
	request               gsmsg.GraphSyncRequest
	// ctx bounds the send on messages: once it is done, a pending send is
	// abandoned.
	ctx                   context.Context
	// messages is the channel an errorRequestMessage is sent on when an Error
	// event arrives.
	messages              chan responseManagerMessage
	// blockSentListeners is notified when a Sent event arrives for a
	// graphsync.BlockData topic.
	blockSentListeners    BlockSentListeners
	// networkErrorListeners is notified when an Error event arrives for either
	// recognised topic type.
	networkErrorListeners NetworkErrorListeners
	// completedListeners is notified when a Sent event arrives for a
	// graphsync.ResponseStatusCode topic.
	completedListeners    CompletedListeners
}

// OnNext handles a notification published for this subscriber's request.
//
// Events that are not peerresponsemanager.Event values are ignored, as are
// topics that are neither a graphsync.BlockData nor a
// graphsync.ResponseStatusCode. For a recognised topic:
//   - a Sent event notifies the block-sent listeners (block-data topics) or
//     the completed listeners (status topics);
//   - an Error event is handled by onNetworkError, identically for both
//     topic kinds.
//
// Any other event name is ignored.
func (s *subscriber) OnNext(topic notifications.Topic, event notifications.Event) {
	responseEvent, ok := event.(peerresponsemanager.Event)
	if !ok {
		return
	}
	// Cases are tried in order, so a block-data topic is checked before a
	// status topic.
	switch t := topic.(type) {
	case graphsync.BlockData:
		switch responseEvent.Name {
		case peerresponsemanager.Error:
			s.onNetworkError(responseEvent)
		case peerresponsemanager.Sent:
			s.blockSentListeners.NotifyBlockSentListeners(s.p, s.request, t)
		}
	case graphsync.ResponseStatusCode:
		switch responseEvent.Name {
		case peerresponsemanager.Error:
			s.onNetworkError(responseEvent)
		case peerresponsemanager.Sent:
			s.completedListeners.NotifyCompletedListeners(s.p, s.request, t)
		}
	}
}

// onNetworkError notifies the network-error listeners with the event's error
// and then enqueues an errorRequestMessage carrying errNetworkError for this
// request on s.messages.
func (s *subscriber) onNetworkError(responseEvent peerresponsemanager.Event) {
	s.networkErrorListeners.NotifyNetworkErrorListeners(s.p, s.request, responseEvent.Err)
	// The response channel is buffered with capacity 1; nothing in this method
	// reads from it.
	msg := &errorRequestMessage{s.p, s.request.ID(), errNetworkError, make(chan error, 1)}
	select {
	case s.messages <- msg:
	case <-s.ctx.Done():
		// Context is done: give up on the send rather than block.
	}
}

// OnClose receives the topic being closed and does nothing with it.
func (s *subscriber) OnClose(topic notifications.Topic) {}