Skip to content
This repository was archived by the owner on Mar 28, 2023. It is now read-only.

Ethereum into Mobile #2044

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ipfs/pointers.go
Original file line number Diff line number Diff line change
@@ -89,6 +89,7 @@ func PutPointerToPeer(dht *routing.IpfsDHT, ctx context.Context, peer peer.ID, p

func GetPointersFromPeer(dht *routing.IpfsDHT, ctx context.Context, p peer.ID, key *cid.Cid) ([]*ps.PeerInfo, error) {
pmes := dhtpb.NewMessage(dhtpb.Message_GET_PROVIDERS, key.Bytes(), 0)
log.Debugf("Fetching pointers from: %v\n", p.Pretty())
resp, err := dht.SendRequest(ctx, p, pmes)
if err != nil {
return []*ps.PeerInfo{}, err
3 changes: 2 additions & 1 deletion mobile/node.go
Original file line number Diff line number Diff line change
@@ -480,12 +480,13 @@ func (n *Node) start() error {
SendAck: n.OpenBazaarNode.SendOfflineAck,
SendError: n.OpenBazaarNode.SendError,
})
go MR.ResetPointerList()
go MR.Run()
n.OpenBazaarNode.MessageRetriever = MR
PR := rep.NewPointerRepublisher(n.OpenBazaarNode.DHT, n.OpenBazaarNode.Datastore, n.OpenBazaarNode.PushNodes, n.OpenBazaarNode.IsModerator)
go PR.Run()
n.OpenBazaarNode.PointerRepublisher = PR
MR.Wait()
// MR.Wait()

n.OpenBazaarNode.PublishLock.Unlock()
publishUnlocked = true
31 changes: 27 additions & 4 deletions net/retriever/retriever.go
Original file line number Diff line number Diff line change
@@ -29,7 +29,11 @@ import (

const DefaultPointerPrefixLength = 14

var log = logging.MustGetLogger("retriever")
var (
// Initialize a clear pointerList for the DHT on start
pointerList = []string{}
log = logging.MustGetLogger("retriever")
)

type MRConfig struct {
Db repo.Datastore
@@ -66,6 +70,20 @@ type offlineMessage struct {
env pb.Envelope
}

func stringInSlice(str string, list []string) bool {
for _, v := range list {
if v == str {
return true
}
}
return false
}

// Reset on startup
func (m *MessageRetriever) ResetPointerList() {
pointerList = []string{}
}

func NewMessageRetriever(cfg MRConfig) *MessageRetriever {
var client *http.Client
if cfg.Dialer != nil {
@@ -100,8 +118,8 @@ func (m *MessageRetriever) Run() {
peers := time.NewTicker(time.Minute)
defer dht.Stop()
defer peers.Stop()
go m.fetchPointersFromDHT()
go m.fetchPointersFromPushNodes()
go m.fetchPointersFromDHT()
for {
select {
case <-dht.C:
@@ -159,7 +177,9 @@ func (m *MessageRetriever) downloadMessages(peerOut chan ps.PeerInfo) {
inFlight := make(map[string]bool)
// Iterate over the pointers, adding 1 to the waitgroup for each pointer found
for p := range peerOut {
if len(p.Addrs) > 0 && !m.db.OfflineMessages().Has(p.Addrs[0].String()) && !inFlight[p.Addrs[0].String()] {
if len(p.Addrs) > 0 && !m.db.OfflineMessages().Has(p.Addrs[0].String()) && !stringInSlice(p.Addrs[0].String(), pointerList) && !inFlight[p.Addrs[0].String()] {
pointerList = append(pointerList, p.Addrs[0].String())
log.Debugf("Looking for pointer [%v] at %v\n", p.ID.Pretty(), p.Addrs)
inFlight[p.Addrs[0].String()] = true
log.Debugf("Found pointer with location %s", p.Addrs[0].String())
// IPFS
@@ -215,12 +235,15 @@ func (m *MessageRetriever) getPointersFromDataPeersRoutine(peerOut chan ps.PeerI
wg.Add(1)
go func(pid peer.ID) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*35)
defer cancel()
time.Sleep(time.Second * 15)
provs, err := ipfs.GetPointersFromPeer(m.routing, ctx, pid, &k)
if err != nil {
log.Errorf("Could not get pointers from push node because: %v", err)
return
}
log.Debugf("Successfully queried %s for pointers", pid.Pretty())
for _, pi := range provs {
peerOut <- *pi
}
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"time"

kb "gx/ipfs/QmSNE1XryoCMnZCbRaj1D23k6YKCaTQ386eJciu1pAfu8M/go-libp2p-kbucket"
pb "gx/ipfs/QmSY3nkMNLzh9GdbFKK5tT7YMfLpf52iUZ8ZRkr29MJaa5/go-libp2p-kad-dht/pb"
@@ -65,13 +66,15 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
// since the query doesnt actually pass our context down
// we have to hack this here. whyrusleeping isnt a huge fan of goprocess
parent := ctx
ctx, _ = context.WithTimeout(ctx, time.Second*3)
query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
// For DHT query command
notif.PublishQueryEvent(parent, &notif.QueryEvent{
Type: notif.SendingQuery,
ID: p,
})

ctx, _ = context.WithTimeout(ctx, time.Second*3)
pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key))
if err != nil {
logger.Debugf("error getting closer peers: %s", err)
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"time"

kb "gx/ipfs/QmSNE1XryoCMnZCbRaj1D23k6YKCaTQ386eJciu1pAfu8M/go-libp2p-kbucket"
cid "gx/ipfs/QmTbxNB1NwDesLmKTscr4udL2tVP7MaxvXnD1D9yX7g3PN/go-cid"
@@ -65,13 +66,15 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
// since the query doesnt actually pass our context down
// we have to hack this here. whyrusleeping isnt a huge fan of goprocess
parent := ctx
ctx, _ = context.WithTimeout(ctx, time.Second*3)
query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
// For DHT query command
notif.PublishQueryEvent(parent, &notif.QueryEvent{
Type: notif.SendingQuery,
ID: p,
})

ctx, _ = context.WithTimeout(ctx, time.Second*3)
pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key))
if err != nil {
logger.Debugf("error getting closer peers: %s", err)