This repository was archived by the owner on Mar 5, 2024. It is now read-only.

Cache old partition states in case of ZooKeeper failure. #65

Open
wants to merge 4 commits into master
13 changes: 11 additions & 2 deletions serve.go
@@ -58,7 +58,11 @@ func (vs *version) serveProxied(w http.ResponseWriter, r *http.Request,

// Shuffle the peers, so we try them in a random order.
// TODO: We don't want to blacklist nodes, but we can weight them lower
peers := shuffle(vs.partitions.FindPeers(partition))
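// If we know fewer live peers than the replication factor, fall back to
// peers that recently disappeared from ZooKeeper.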
rawPeers, disappearedNodes := vs.partitions.FindPeers(partition)
if len(rawPeers) < vs.sequins.config.Sharding.Replication {
rawPeers = append(rawPeers, disappearedNodes...)
}
peers := shuffle(rawPeers)
if len(peers) == 0 {
log.Printf("No peers available for /%s/%s (version %s)", vs.db.name, key, vs.name)
w.WriteHeader(http.StatusBadGateway)
@@ -70,7 +74,12 @@
log.Println("Trying alternate partition for pathological key", key)

resp.Body.Close()
alternatePeers := shuffle(vs.partitions.FindPeers(alternatePartition))
rawPeers, disappearedNodes = vs.partitions.FindPeers(alternatePartition)
if len(rawPeers) < vs.sequins.config.Sharding.Replication {
rawPeers = append(rawPeers, disappearedNodes...)
}

alternatePeers := shuffle(rawPeers)
resp, peer, err = vs.proxy(r, alternatePeers)
}

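For clarity, here is a minimal standalone sketch of the fallback strategy this change introduces in serveProxied: when fewer live peers than the configured replication factor are known for a partition, recently disappeared peers are appended as extra proxy candidates before shuffling. The names here (pickProxyCandidates, live, replication) are hypothetical and not part of the patch.

package main

import (
	"fmt"
	"math/rand"
)

// pickProxyCandidates appends recently disappeared peers as fallback
// candidates whenever fewer live peers than the replication factor are
// known, then shuffles so candidates are tried in a random order.
func pickProxyCandidates(live, disappeared []string, replication int) []string {
	candidates := make([]string, 0, len(live)+len(disappeared))
	candidates = append(candidates, live...)
	if len(candidates) < replication {
		candidates = append(candidates, disappeared...)
	}
	rand.Shuffle(len(candidates), func(i, j int) {
		candidates[i], candidates[j] = candidates[j], candidates[i]
	})
	return candidates
}

func main() {
	// One live peer with replication 2: the disappeared peer is kept
	// as a fallback candidate.
	fmt.Println(pickProxyCandidates([]string{"node1"}, []string{"node2"}, 2))
}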
53 changes: 46 additions & 7 deletions sharding/partitions.go
@@ -33,6 +33,7 @@ type Partitions struct {
selected map[int]bool
local map[int]bool
remote map[int][]string
disappeared map[int][]string
numMissing int
readyClosed bool
shouldAdvertise bool
@@ -55,6 +56,7 @@ func WatchPartitions(zkWatcher *zk.Watcher, peers *Peers, db, version string, nu
replication: replication,
local: make(map[int]bool),
remote: make(map[int][]string),
disappeared: make(map[int][]string, 1024),
}

p.pickLocal()
@@ -69,6 +71,19 @@
return p
}

// dedupe deduplicates elements in a slice of strings, preserving
// first-occurrence order.
func dedupe(nodes []string) []string {
found := map[string]bool{}
dedupedNodes := make([]string, 0, len(nodes))
for _, node := range nodes {
if !found[node] {
found[node] = true
dedupedNodes = append(dedupedNodes, node)
}
}
return dedupedNodes
}

// pickLocal selects which partitions are local by iterating through
// them all, and checking the hashring to see if this peer is one of the
// replicas.
@@ -106,18 +121,23 @@ func (p *Partitions) sync(updates chan []string) {
}
}

// FindPeers returns the list of peers who have the given partition available.
func (p *Partitions) FindPeers(partition int) []string {
if p.peers == nil {
return nil
}

// FindPeers returns the list of peers that have the given partition
// available. It also returns the list of disappeared peers that are no
// longer registered in ZooKeeper.
func (p *Partitions) FindPeers(partition int) ([]string, []string) {
p.lock.RLock()
defer p.lock.RUnlock()

disappearedPeers := make([]string, len(p.disappeared[partition]))
copy(disappearedPeers, p.disappeared[partition])

if p.peers == nil {
return nil, disappearedPeers
}

peers := make([]string, len(p.remote[partition]))
copy(peers, p.remote[partition])
return peers
return peers, disappearedPeers
}

// Update updates the list of local partitions to the given list.
@@ -228,6 +248,25 @@ func (p *Partitions) updateRemote(nodes []string) {
}
}

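// Remember peers that were in the previous remote assignment but are
// gone from the new one.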
for partitionId, partition := range p.remote {
disappearedPeers := make([]string, 0, len(partition))
for _, oldPeer := range partition {
found := false
for _, newPeer := range remote[partitionId] {
if newPeer == oldPeer {
found = true
break
}
}
if !found {
disappearedPeers = append(disappearedPeers, oldPeer)
}
}
p.disappeared[partitionId] = dedupe(append(disappearedPeers, p.disappeared[partitionId]...))
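// Cap the remembered list at 1024 entries per partition; the newest
// disappearances sit at the front, so the oldest are dropped.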
if len(p.disappeared[partitionId]) >= 1024 {
p.disappeared[partitionId] = p.disappeared[partitionId][:1024]
}
}

p.remote = remote
p.updateMissing()
}
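The bookkeeping added to updateRemote can also be read as a single helper. The following is a sketch, not code from the patch; it assumes the same package so it can reuse dedupe, and the function and parameter names are hypothetical.

// trackDisappeared computes which peers from the previous assignment are
// missing from the current one, prepends them to the remembered list,
// deduplicates, and caps the result so memory stays bounded.
func trackDisappeared(previous, current, remembered []string, limit int) []string {
	currentSet := make(map[string]bool, len(current))
	for _, peer := range current {
		currentSet[peer] = true
	}
	gone := make([]string, 0, len(previous))
	for _, peer := range previous {
		if !currentSet[peer] {
			gone = append(gone, peer)
		}
	}
	// The most recently disappeared peers sit at the front, so the cap
	// drops the oldest entries.
	result := dedupe(append(gone, remembered...))
	if len(result) > limit {
		result = result[:limit]
	}
	return result
}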
23 changes: 23 additions & 0 deletions sharding/partitions_test.go
@@ -0,0 +1,23 @@
package sharding

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestDedupeRandom(t *testing.T) {
dupe := []string{"4", "1", "2", "1", "3", "2"}
expected := []string{"4", "1", "2", "3"}

deduped := dedupe(dupe)
assert.Equal(t, expected, deduped)
}

func TestDedupe(t *testing.T) {
dupe := []string{"1", "1", "1", "2", "2", "3", "3", "3", "4", "5", "5"}
expected := []string{"1", "2", "3", "4", "5"}

deduped := dedupe(dupe)
assert.Equal(t, expected, deduped)
}
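A test along these lines could also pin down the new FindPeers contract. This is a sketch that assumes a Partitions value can be constructed directly with the fields shown in the diff (and that its zero-value lock is usable), which may not hold if the struct carries further invariants.

// Sketch: FindPeers should return live peers and disappeared peers separately.
func TestFindPeersReturnsDisappeared(t *testing.T) {
	p := &Partitions{
		peers:       &Peers{},
		remote:      map[int][]string{0: {"node1"}},
		disappeared: map[int][]string{0: {"node2"}},
	}

	peers, disappeared := p.FindPeers(0)
	assert.Equal(t, []string{"node1"}, peers)
	assert.Equal(t, []string{"node2"}, disappeared)
}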