From 2a1ee3ae3a8bfa8d5b9aa45eb86bcad9bae33c77 Mon Sep 17 00:00:00 2001
From: Jeromy <jeromyj@gmail.com>
Date: Wed, 30 Jul 2014 20:16:34 -0700
Subject: [PATCH] use datastore for local data

---
 routing/dht/dht.go | 60 +++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 51 insertions(+), 9 deletions(-)

diff --git a/routing/dht/dht.go b/routing/dht/dht.go
index a1f63f10..0f55ba45 100644
--- a/routing/dht/dht.go
+++ b/routing/dht/dht.go
@@ -3,9 +3,12 @@ package dht
 import (
 	"sync"
 
-	peer "github.com/jbenet/go-ipfs/peer"
-	swarm "github.com/jbenet/go-ipfs/swarm"
-	u "github.com/jbenet/go-ipfs/util"
+	peer	"github.com/jbenet/go-ipfs/peer"
+	swarm	"github.com/jbenet/go-ipfs/swarm"
+	u		"github.com/jbenet/go-ipfs/util"
+
+	ds "github.com/jbenet/datastore.go"
+
 	"code.google.com/p/goprotobuf/proto"
 )
 
@@ -18,8 +21,11 @@ type IpfsDHT struct {
 
 	network *swarm.Swarm
 
-	// local data (TEMPORARY: until we formalize data storage with datastore)
-	data map[string][]byte
+	// Local peer (yourself)
+	self *peer.Peer
+
+	// Local data
+	datastore ds.Datastore
 
 	// map of channels waiting for reply messages
 	listeners  map[uint64]chan *swarm.Message
@@ -29,6 +35,15 @@ type IpfsDHT struct {
 	shutdown chan struct{}
 }
 
+func NewDHT(p *peer.Peer) *IpfsDHT {
+	dht := new(IpfsDHT)
+	dht.self = p
+	dht.network = swarm.NewSwarm(p)
+	dht.listeners = make(map[uint64]chan *swarm.Message)
+	dht.shutdown = make(chan struct{})
+	return dht
+}
+
 // Read in all messages from swarm and handle them appropriately
 // NOTE: this function is just a quick sketch
 func (dht *IpfsDHT) handleMessages() {
@@ -61,10 +76,13 @@ func (dht *IpfsDHT) handleMessages() {
 			case DHTMessage_GET_VALUE:
 				dht.handleGetValue(mes.Peer, pmes)
 			case DHTMessage_PUT_VALUE:
+				dht.handlePutValue(mes.Peer, pmes)
 			case DHTMessage_FIND_NODE:
+				dht.handleFindNode(mes.Peer, pmes)
 			case DHTMessage_ADD_PROVIDER:
 			case DHTMessage_GET_PROVIDERS:
 			case DHTMessage_PING:
+				dht.handleFindNode(mes.Peer, pmes)
 			}
 
 		case <-dht.shutdown:
@@ -74,15 +92,22 @@ func (dht *IpfsDHT) handleMessages() {
 }
 
 func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) {
-	val, found := dht.data[pmes.GetKey()]
-	if found {
+	dskey := ds.NewKey(pmes.GetKey())
+	i_val, err := dht.datastore.Get(dskey)
+	if err == nil {
 		isResponse := true
 		resp := new(DHTMessage)
 		resp.Response = &isResponse
 		resp.Id = pmes.Id
 		resp.Key = pmes.Key
+
+		val := i_val.([]byte)
 		resp.Value = val
-	} else {
+
+		mes := new(swarm.Message)
+		mes.Peer = p
+		mes.Data = []byte(resp.String())
+	} else if err == ds.ErrNotFound {
 		// Find closest node(s) to desired key and reply with that info
 		// TODO: this will need some other metadata in the protobuf message
 		//			to signal to the querying node that the data its receiving
@@ -90,8 +115,14 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) {
 	}
 }
 
+// Store a value in this nodes local storage
 func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *DHTMessage) {
-	panic("Not implemented.")
+	dskey := ds.NewKey(pmes.GetKey())
+	err := dht.datastore.Put(dskey, pmes.GetValue())
+	if err != nil {
+		// For now, just panic, handle this better later maybe
+		panic(err)
+	}
 }
 
 func (dht *IpfsDHT) handleFindNode(p *peer.Peer, pmes *DHTMessage) {
@@ -121,6 +152,17 @@ func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan *swarm.Message {
 	return lchan
 }
 
+func (dht *IpfsDHT) Unlisten(mesid uint64) {
+	dht.listenLock.Lock()
+	ch, ok := dht.listeners[mesid]
+	if ok {
+		delete(dht.listeners, mesid)
+	}
+	dht.listenLock.Unlock()
+	close(ch)
+}
+
+
 // Stop all communications from this node and shut down
 func (dht *IpfsDHT) Halt() {
 	dht.shutdown <- struct{}{}
-- 
GitLab