Skip to content
This repository was archived by the owner on Jun 14, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
345 changes: 345 additions & 0 deletions core/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,345 @@
package core

import (
"errors"
"log"
"sync"
"time"

"github.com/Tapjoy/dynamiq/app/compressor"
"github.com/Tapjoy/dynamiq/app/stats"
"github.com/hashicorp/memberlist"
)

// StatsConfig represents config info for sending data to any statsd like system
// and the client itself
type StatsConfig struct {
Address string
FlushInterval int
Prefix string
Client *stats.Client
}

// HTTPConfig represents config info for the HTTP server
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like how this is split out

type HTTPConfig struct {
APIVersion string
Port uint16
}

// DiscoveryConfig represents config info for how Dynamiq nodes discovery eachother via Memberlist
type DiscoveryConfig struct {
Port int
Memberlist *memberlist.Memberlist
}

// RiakConfig represents config info and the connection pool for Riak
type RiakConfig struct {
Addresses []string
Port uint16
Service *RiakService
ConfigSyncInterval time.Duration
}

// Config is the parent struct of all the individual configuration sections
type Config struct {
Riak *RiakConfig
Discovery *DiscoveryConfig
HTTP *HTTPConfig
Stats *StatsConfig
Compressor *compressor.Compressor
Queues *Queues
Topics *Topics
// Channels / Timer for syncing the config
syncScheduler *time.Ticker
syncKiller chan bool
}

var (
//ErrUnknownTopic is
ErrUnknownTopic = errors.New("There is no known topic by that name")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1


//ErrUnknownTopic is
ErrUnknownQueue = errors.New("There is no known queue by that name")
)

// GetConfig Parses and returns a config object
func GetConfig() (*Config, error) {
// TODO settle on an actual config package
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point I've come to the general conclusion that there is no universal way to do config in golang, and alot of times it comes down to either rolling you're own or pulling a package that works mostly as you want.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea... I punted on it for now. Whatever one we settle on, it barely matters. What matters is that it translates into a config object our app can consume, which is what I focused on here


discoveryConfig := &DiscoveryConfig{
Port: 7000,
}

httpConfig := &HTTPConfig{
Port: 8081,
APIVersion: "2",
}

riakConfig := &RiakConfig{
Addresses: []string{"127.0.0.1"},
Port: 8087,
ConfigSyncInterval: 2 * time.Second,
}

rs, err := NewRiakService(riakConfig.Addresses, riakConfig.Port)
if err != nil {
return nil, err
}
riakConfig.Service = rs

t, err := LoadTopicsFromRiak(riakConfig)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still want these tied to config?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably really be "BootstrapConfig" or something. This is a slight mix of load config, and bootstrap some core data. I hear what you're saying, it might be good to break this out... but not sure we "need" to...

I'm open to it, though.

Maybe a GetConfig and a Bootstrap(*Config) method...

if err != nil {
return nil, err
}
q, err := LoadQueuesFromRiak(riakConfig)
if err != nil {
return nil, err
}

cfg := &Config{
Riak: riakConfig,
Discovery: discoveryConfig,
HTTP: httpConfig,
Queues: q,
Topics: t,
syncScheduler: time.NewTicker(riakConfig.ConfigSyncInterval),
syncKiller: make(chan bool),
}
cfg.beginSync()
return cfg, nil
}

// TopicNames is
func (cfg *Config) TopicNames() []string {
cfg.Topics.configLock.RLock()
list := make([]string, 0)

for name := range cfg.Topics.KnownTopics {
list = append(list, name)
}
cfg.Topics.configLock.RUnlock()
return list
}

// GetTopic is
func (cfg *Config) GetTopic(name string) (*Topic, error) {
cfg.Topics.configLock.RLock()
t, ok := cfg.Topics.KnownTopics[name]
cfg.Topics.configLock.RUnlock()
if ok {
return t, nil
}
return nil, ErrUnknownTopic
}

// TopicNames is
func (cfg *Config) QueueNames() []string {
cfg.Queues.configLock.RLock()
list := make([]string, 0)

for name := range cfg.Queues.KnownQueues {
list = append(list, name)
}
cfg.Queues.configLock.RUnlock()
return list
}

func (cfg *Config) GetQueueConfig(name string) (map[string]string, error) {
cfg.Queues.configLock.RLock()
m, err := cfg.Riak.Service.GetQueueConfigMap(name)
if err != nil {
return nil, err
}
results := make(map[string]string, len(m.Registers))
for k, v := range m.Registers {
results[k] = string(v)
}
cfg.Queues.configLock.RUnlock()
return results, nil
}

func (cfg *Config) beginSync() {
// Go routine to listen to either the scheduler or the killer
go func(config *Config) {
for {
select {
// Check to see if we have a tick
case <-config.syncScheduler.C:
config.syncConfig()
// Check to see if we've been stopped
case <-config.syncKiller:
config.syncScheduler.Stop()
return
}
}
}(cfg)
return
}

func (cfg *Config) syncConfig() error {
log.Println("Syncing")
// First - Refresh the list of topics and their metadata
tcfg, err := cfg.Riak.Service.GetTopicsConfigMap()
if err != nil {
if err == ErrConfigMapNotFound {
tcfg, err = cfg.Riak.Service.CreateTopicsConfigMap()
if err != nil {
return err
}
}
return err
}
var oldSet [][]byte
var ok bool
cfg.Topics.configLock.Lock()
defer cfg.Topics.configLock.Unlock()

if oldSet, ok = tcfg.Sets["topics"]; !ok {
// There were no known topics ?
// No known topics should not prevent the entire sync
// just the topic portion
log.Println(ErrNoKnownTopics)
return ErrNoKnownTopics
}

if len(oldSet) == 0 {
log.Println(ErrNoKnownTopics)

return ErrNoKnownTopics
}

cfg.Topics.Config = tcfg
var newSet [][]byte
if newSet, ok = cfg.Topics.Config.Sets["topics"]; !ok {
log.Println(ErrNoKnownTopics)

return ErrNoKnownTopics
}

// If a topic exists in topicsToKeep...
// ... and does exist in Topics, update it's config Object
// ... and does not exist in Topics, initialize it from Riak
// Else
// ... It is not in toKeep and is in Topics, remove it

topicsToKeep := make(map[string]bool)
for _, topic := range newSet {
tName := string(topic)
// Record this so we know who to evict
topicsToKeep[tName] = true
// Add or Update the topic to the known set
var t *Topic
if t, ok = cfg.Topics.KnownTopics[tName]; !ok {
// Didn't exist in memory, so create it
// TODO centralize this, don't just re-write initialization logic
t = &Topic{
Name: tName,
configLock: sync.RWMutex{},
}
}
// get the config and set it on the topic
topcfg, err := cfg.Riak.Service.GetTopicConfigMap(tName)
if err != nil {
// If the config object isn't found - no big deal
// Only config presently is a list of queues, the object will
// be created when the first queue is set
// This is a NOOP, but left in for documentation purposes
}
t.Config = topcfg
cfg.Topics.KnownTopics[tName] = t
}

// Go through the old list of topics
for _, topic := range oldSet {
tName := string(topic)
if _, ok := topicsToKeep[tName]; !ok {
// It wasn't in the old list, evict it
delete(cfg.Topics.KnownTopics, tName)
}
}

// Now, do the above but for the queues. Added effect - any queue removed
// must also be removed from a topic

qcfg, err := cfg.Riak.Service.GetQueuesConfigMap()
if err != nil {
if err != nil {
if err == ErrConfigMapNotFound {
qcfg, err = cfg.Riak.Service.CreateQueuesConfigMap()
if err != nil {
return err
}
}
return err
}
}

cfg.Queues.configLock.Lock()
defer cfg.Queues.configLock.Unlock()
if oldSet, ok = qcfg.Sets["queues"]; !ok {
// There were no known topics ?
// No known topics should not prevent the entire sync
// just the topic portion
log.Println(ErrNoKnownQueues)
return ErrNoKnownQueues
}

if len(oldSet) == 0 {
log.Println(ErrNoKnownQueues)
return ErrNoKnownQueues
}

cfg.Queues.Config = qcfg
if newSet, ok = cfg.Queues.Config.Sets["queues"]; !ok {
log.Println(ErrNoKnownQueues)
return ErrNoKnownQueues
}

queuesToRemove := make([]string, 0)
queuesToKeep := make(map[string]bool)

for _, queue := range newSet {
qName := string(queue)
// Record this so we know who to evict
queuesToKeep[qName] = true
// Add or Update the topic to the known set
var q *Queue
if q, ok = cfg.Queues.KnownQueues[qName]; !ok {
// TODO see node in LoadQueuesFromRiak about the need For
// and independent LoadFromRiak method
q, err = LoadQueueFromRiak(cfg.Riak.Service, qName)
if err != nil {
return err
}
}
cfg.Queues.KnownQueues[qName] = q
}

// Go through the old list of queues
for _, queue := range oldSet {
qName := string(queue)
if _, ok := queuesToKeep[qName]; !ok {
// It wasn't in the old list, evict it
delete(cfg.Queues.KnownQueues, qName)
queuesToRemove = append(queuesToRemove, qName)
}
}

// Go through any evicted queues, and remove them from topic subscriptions
// 3 loops... there has to be a better way to manage all of this
for _, queue := range queuesToRemove {
for tName, topic := range cfg.Topics.KnownTopics {
for _, q := range topic.Config.Sets["queues"] {
qName := string(q)
if qName == queue {
_, err := cfg.Riak.Service.UpdateTopicSubscription(tName, qName, false)
if err != nil {
return err
}
}
}
}
}
log.Println("Syncing Done")
return nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dig this file, much cleaner - much simpler and a single sync loop. The only item I'd raise as a philosophical question is whether the config type should handle the both the topics/queues + the other config params

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason why I pulled this all the way up to config, was because it was the only thing that had access to EVERYthing, where I could remove queues from the Queues collection as well as remove them from Topics by way of un-subscribe logic and removing it from the individual topic in memory at the same time.

This, and the RiakService, are a bit god-class-y, and I don't love it. But I also don't see a majorly better approach atm either.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed it's much cleaner overall and splitting those two pieces out
probably wouldn't add any benefit - the KISS principle definitely applies

On Wed, Nov 11, 2015 at 4:25 PM, StabbyCutyou [email protected]
wrote:

In core/config.go
#89 (comment):

  • for _, queue := range queuesToRemove {
  •   for tName, topic := range cfg.Topics.KnownTopics {
    
  •       for _, q := range topic.Config.Sets["queues"] {
    
  •           qName := string(q)
    
  •           if qName == queue {
    
  •               _, err := cfg.Riak.Service.UpdateTopicSubscription(tName, qName, false)
    
  •               if err != nil {
    
  •                   return err
    
  •               }
    
  •           }
    
  •       }
    
  •   }
    
  • }
  • log.Println("Syncing Done")
  • return nil
    +}

The reason why I pulled this all the way up to config, was because it was
the only thing that had access to EVERYthing, where I could remove queues
from the Queues collection as well as remove them from Topics by way of
un-subscribe logic and removing it from the individual topic in memory at
the same time.

This, and the RiakService, are a bit god-class-y, and I don't love it. But
I also don't see a majorly better approach atm either.


Reply to this email directly or view it on GitHub
https://github.com/Tapjoy/dynamiq/pull/89/files#r44588386.

44 changes: 44 additions & 0 deletions core/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package core

import (
"sync"

"github.com/StabbyCutyou/partition_ring"
"github.com/basho/riak-go-client"
)

// Queue represents a bucket in Riak used to hold messages, and the behaviors that
// may be taken over such an object
type Queue struct {
// the definition of a queue
// name of the queue
Name string
// the PartitionRing for this queue
ring *partitionring.PartitionRing
// the RiakService
riakService *RiakService
// Mutex for protecting rw access to the Config object
configLock sync.RWMutex
// Individual settings for the queue
Config *riak.Map
}

// Define statistics keys suffixes

// QueueSentStatsSuffix is
const QueueSentStatsSuffix = "sent.count"

// QueueReceivedStatsSuffix is
const QueueReceivedStatsSuffix = "received.count"

// QueueDeletedStatsSuffix is
const QueueDeletedStatsSuffix = "deleted.count"

// QueueDepthStatsSuffix is
const QueueDepthStatsSuffix = "depth.count"

// QueueDepthAprStatsSuffix is
const QueueDepthAprStatsSuffix = "approximate_depth.count"

// QueueFillDeltaStatsSuffix
const QueueFillDeltaStatsSuffix = "fill.count"
Loading