diff --git a/core/config.go b/core/config.go new file mode 100644 index 0000000..467fcc1 --- /dev/null +++ b/core/config.go @@ -0,0 +1,345 @@ +package core + +import ( + "errors" + "log" + "sync" + "time" + + "github.com/Tapjoy/dynamiq/app/compressor" + "github.com/Tapjoy/dynamiq/app/stats" + "github.com/hashicorp/memberlist" +) + +// StatsConfig represents config info for sending data to any statsd like system +// and the client itself +type StatsConfig struct { + Address string + FlushInterval int + Prefix string + Client *stats.Client +} + +// HTTPConfig represents config info for the HTTP server +type HTTPConfig struct { + APIVersion string + Port uint16 +} + +// DiscoveryConfig represents config info for how Dynamiq nodes discovery eachother via Memberlist +type DiscoveryConfig struct { + Port int + Memberlist *memberlist.Memberlist +} + +// RiakConfig represents config info and the connection pool for Riak +type RiakConfig struct { + Addresses []string + Port uint16 + Service *RiakService + ConfigSyncInterval time.Duration +} + +// Config is the parent struct of all the individual configuration sections +type Config struct { + Riak *RiakConfig + Discovery *DiscoveryConfig + HTTP *HTTPConfig + Stats *StatsConfig + Compressor *compressor.Compressor + Queues *Queues + Topics *Topics + // Channels / Timer for syncing the config + syncScheduler *time.Ticker + syncKiller chan bool +} + +var ( + //ErrUnknownTopic is + ErrUnknownTopic = errors.New("There is no known topic by that name") + + //ErrUnknownTopic is + ErrUnknownQueue = errors.New("There is no known queue by that name") +) + +// GetConfig Parses and returns a config object +func GetConfig() (*Config, error) { + // TODO settle on an actual config package + + discoveryConfig := &DiscoveryConfig{ + Port: 7000, + } + + httpConfig := &HTTPConfig{ + Port: 8081, + APIVersion: "2", + } + + riakConfig := &RiakConfig{ + Addresses: []string{"127.0.0.1"}, + Port: 8087, + ConfigSyncInterval: 2 * time.Second, + } + + rs, err := NewRiakService(riakConfig.Addresses, riakConfig.Port) + if err != nil { + return nil, err + } + riakConfig.Service = rs + + t, err := LoadTopicsFromRiak(riakConfig) + if err != nil { + return nil, err + } + q, err := LoadQueuesFromRiak(riakConfig) + if err != nil { + return nil, err + } + + cfg := &Config{ + Riak: riakConfig, + Discovery: discoveryConfig, + HTTP: httpConfig, + Queues: q, + Topics: t, + syncScheduler: time.NewTicker(riakConfig.ConfigSyncInterval), + syncKiller: make(chan bool), + } + cfg.beginSync() + return cfg, nil +} + +// TopicNames is +func (cfg *Config) TopicNames() []string { + cfg.Topics.configLock.RLock() + list := make([]string, 0) + + for name := range cfg.Topics.KnownTopics { + list = append(list, name) + } + cfg.Topics.configLock.RUnlock() + return list +} + +// GetTopic is +func (cfg *Config) GetTopic(name string) (*Topic, error) { + cfg.Topics.configLock.RLock() + t, ok := cfg.Topics.KnownTopics[name] + cfg.Topics.configLock.RUnlock() + if ok { + return t, nil + } + return nil, ErrUnknownTopic +} + +// TopicNames is +func (cfg *Config) QueueNames() []string { + cfg.Queues.configLock.RLock() + list := make([]string, 0) + + for name := range cfg.Queues.KnownQueues { + list = append(list, name) + } + cfg.Queues.configLock.RUnlock() + return list +} + +func (cfg *Config) GetQueueConfig(name string) (map[string]string, error) { + cfg.Queues.configLock.RLock() + m, err := cfg.Riak.Service.GetQueueConfigMap(name) + if err != nil { + return nil, err + } + results := make(map[string]string, len(m.Registers)) + for k, v := range m.Registers { + results[k] = string(v) + } + cfg.Queues.configLock.RUnlock() + return results, nil +} + +func (cfg *Config) beginSync() { + // Go routine to listen to either the scheduler or the killer + go func(config *Config) { + for { + select { + // Check to see if we have a tick + case <-config.syncScheduler.C: + config.syncConfig() + // Check to see if we've been stopped + case <-config.syncKiller: + config.syncScheduler.Stop() + return + } + } + }(cfg) + return +} + +func (cfg *Config) syncConfig() error { + log.Println("Syncing") + // First - Refresh the list of topics and their metadata + tcfg, err := cfg.Riak.Service.GetTopicsConfigMap() + if err != nil { + if err == ErrConfigMapNotFound { + tcfg, err = cfg.Riak.Service.CreateTopicsConfigMap() + if err != nil { + return err + } + } + return err + } + var oldSet [][]byte + var ok bool + cfg.Topics.configLock.Lock() + defer cfg.Topics.configLock.Unlock() + + if oldSet, ok = tcfg.Sets["topics"]; !ok { + // There were no known topics ? + // No known topics should not prevent the entire sync + // just the topic portion + log.Println(ErrNoKnownTopics) + return ErrNoKnownTopics + } + + if len(oldSet) == 0 { + log.Println(ErrNoKnownTopics) + + return ErrNoKnownTopics + } + + cfg.Topics.Config = tcfg + var newSet [][]byte + if newSet, ok = cfg.Topics.Config.Sets["topics"]; !ok { + log.Println(ErrNoKnownTopics) + + return ErrNoKnownTopics + } + + // If a topic exists in topicsToKeep... + // ... and does exist in Topics, update it's config Object + // ... and does not exist in Topics, initialize it from Riak + // Else + // ... It is not in toKeep and is in Topics, remove it + + topicsToKeep := make(map[string]bool) + for _, topic := range newSet { + tName := string(topic) + // Record this so we know who to evict + topicsToKeep[tName] = true + // Add or Update the topic to the known set + var t *Topic + if t, ok = cfg.Topics.KnownTopics[tName]; !ok { + // Didn't exist in memory, so create it + // TODO centralize this, don't just re-write initialization logic + t = &Topic{ + Name: tName, + configLock: sync.RWMutex{}, + } + } + // get the config and set it on the topic + topcfg, err := cfg.Riak.Service.GetTopicConfigMap(tName) + if err != nil { + // If the config object isn't found - no big deal + // Only config presently is a list of queues, the object will + // be created when the first queue is set + // This is a NOOP, but left in for documentation purposes + } + t.Config = topcfg + cfg.Topics.KnownTopics[tName] = t + } + + // Go through the old list of topics + for _, topic := range oldSet { + tName := string(topic) + if _, ok := topicsToKeep[tName]; !ok { + // It wasn't in the old list, evict it + delete(cfg.Topics.KnownTopics, tName) + } + } + + // Now, do the above but for the queues. Added effect - any queue removed + // must also be removed from a topic + + qcfg, err := cfg.Riak.Service.GetQueuesConfigMap() + if err != nil { + if err != nil { + if err == ErrConfigMapNotFound { + qcfg, err = cfg.Riak.Service.CreateQueuesConfigMap() + if err != nil { + return err + } + } + return err + } + } + + cfg.Queues.configLock.Lock() + defer cfg.Queues.configLock.Unlock() + if oldSet, ok = qcfg.Sets["queues"]; !ok { + // There were no known topics ? + // No known topics should not prevent the entire sync + // just the topic portion + log.Println(ErrNoKnownQueues) + return ErrNoKnownQueues + } + + if len(oldSet) == 0 { + log.Println(ErrNoKnownQueues) + return ErrNoKnownQueues + } + + cfg.Queues.Config = qcfg + if newSet, ok = cfg.Queues.Config.Sets["queues"]; !ok { + log.Println(ErrNoKnownQueues) + return ErrNoKnownQueues + } + + queuesToRemove := make([]string, 0) + queuesToKeep := make(map[string]bool) + + for _, queue := range newSet { + qName := string(queue) + // Record this so we know who to evict + queuesToKeep[qName] = true + // Add or Update the topic to the known set + var q *Queue + if q, ok = cfg.Queues.KnownQueues[qName]; !ok { + // TODO see node in LoadQueuesFromRiak about the need For + // and independent LoadFromRiak method + q, err = LoadQueueFromRiak(cfg.Riak.Service, qName) + if err != nil { + return err + } + } + cfg.Queues.KnownQueues[qName] = q + } + + // Go through the old list of queues + for _, queue := range oldSet { + qName := string(queue) + if _, ok := queuesToKeep[qName]; !ok { + // It wasn't in the old list, evict it + delete(cfg.Queues.KnownQueues, qName) + queuesToRemove = append(queuesToRemove, qName) + } + } + + // Go through any evicted queues, and remove them from topic subscriptions + // 3 loops... there has to be a better way to manage all of this + for _, queue := range queuesToRemove { + for tName, topic := range cfg.Topics.KnownTopics { + for _, q := range topic.Config.Sets["queues"] { + qName := string(q) + if qName == queue { + _, err := cfg.Riak.Service.UpdateTopicSubscription(tName, qName, false) + if err != nil { + return err + } + } + } + } + } + log.Println("Syncing Done") + return nil +} diff --git a/core/queue.go b/core/queue.go new file mode 100644 index 0000000..620b207 --- /dev/null +++ b/core/queue.go @@ -0,0 +1,44 @@ +package core + +import ( + "sync" + + "github.com/StabbyCutyou/partition_ring" + "github.com/basho/riak-go-client" +) + +// Queue represents a bucket in Riak used to hold messages, and the behaviors that +// may be taken over such an object +type Queue struct { + // the definition of a queue + // name of the queue + Name string + // the PartitionRing for this queue + ring *partitionring.PartitionRing + // the RiakService + riakService *RiakService + // Mutex for protecting rw access to the Config object + configLock sync.RWMutex + // Individual settings for the queue + Config *riak.Map +} + +// Define statistics keys suffixes + +// QueueSentStatsSuffix is +const QueueSentStatsSuffix = "sent.count" + +// QueueReceivedStatsSuffix is +const QueueReceivedStatsSuffix = "received.count" + +// QueueDeletedStatsSuffix is +const QueueDeletedStatsSuffix = "deleted.count" + +// QueueDepthStatsSuffix is +const QueueDepthStatsSuffix = "depth.count" + +// QueueDepthAprStatsSuffix is +const QueueDepthAprStatsSuffix = "approximate_depth.count" + +// QueueFillDeltaStatsSuffix +const QueueFillDeltaStatsSuffix = "fill.count" diff --git a/core/queues.go b/core/queues.go new file mode 100644 index 0000000..9a7c730 --- /dev/null +++ b/core/queues.go @@ -0,0 +1,224 @@ +package core + +// Queues represents a collection of Queues, and the behaviors that may be taken +import ( + "errors" + "log" + "strconv" + "sync" + "time" + + "github.com/StabbyCutyou/partition_ring" + "github.com/basho/riak-go-client" +) + +// VisibilityTimeout is the name of the config setting name for controlling how long a message is "inflight" +const VisibilityTimeout = "visibility_timeout" + +// PartitionCount is +const PartitionStep = "partition_step" + +// CompressedMessages is the name of the config setting name for controlling if the queue is using compression or not +const CompressedMessages = "compressed_messages" + +var ( + // ErrQueueAlreadyExists is an error for when you try to create a queue that already exists + ErrQueueAlreadyExists = errors.New("Queue already exists") + // ErrConfigMapNotFound is + ErrConfigMapNotFound = errors.New("The Config map in Riak has not yet been created") + //ErrNoKnownQueues + ErrNoKnownQueues = errors.New("There are no known queues in the system") + // Settings Arrays and maps cannot be made immutable in golang + Settings = [...]string{VisibilityTimeout, PartitionStep, CompressedMessages} + // DefaultSettings is + DefaultSettings = map[string]string{VisibilityTimeout: "5s", PartitionStep: "922337203685477580", CompressedMessages: "false"} +) + +// Queues represents a collection of Queue objects, and the behaviors that may be +// taken over a collection of such objects +type Queues struct { + // Reference to shared riak client + riakService *RiakService + // Mutex for protecting rw access to the Config object + configLock sync.RWMutex + // Settings for Queues in general, ie queue list + Config *riak.Map + // a container for all queues + KnownQueues map[string]*Queue +} + +// LoadQueuesFromRiak is +func LoadQueuesFromRiak(cfg *RiakConfig) (*Queues, error) { + queues := &Queues{ + KnownQueues: make(map[string]*Queue), + riakService: cfg.Service, + configLock: sync.RWMutex{}, + } + + m, err := queues.riakService.GetQueuesConfigMap() + + if err == ErrConfigMapNotFound { + m, err = queues.riakService.CreateQueuesConfigMap() + if err != nil { + return nil, err + } + } else if err != nil { + return nil, err + } + queues.Config = m + + for _, q := range queues.Config.Sets["queues"] { + queueName := string(q) + rQueue, err := LoadQueueFromRiak(queues.riakService, queueName) + if err != nil { + return nil, err + } + queues.KnownQueues[queueName] = rQueue + } + // Need to use a general LoadFromRiak method + + return queues, nil +} + +func LoadQueueFromRiak(rs *RiakService, queueName string) (*Queue, error) { + cfg, err := rs.GetQueueConfigMap(queueName) + if err != nil { + return nil, err + } + + step := cfg.Registers[PartitionStep] + intStep, err := strconv.ParseInt(string(step), 10, 64) + if err != nil { + return nil, err + } + visTimeout := cfg.Registers[VisibilityTimeout] + timeDuration, err := time.ParseDuration(string(visTimeout)) + + if err != nil { + return nil, err + } + + q := &Queue{ + Name: queueName, + Config: cfg, + configLock: sync.RWMutex{}, + riakService: rs, + ring: partitionring.New(0, partitionring.MaxPartitionUpperBound, intStep, timeDuration), + } + + return q, nil +} + +// Create will register a new queue with the default config +// This queue will be available to be used once all the nodes have had their config +// refreshed +func (queues *Queues) Create(queueName string, options map[string]string) (bool, error) { + // This function intentionally not optimized because it should + // not be a high-throughput operation + + if ok, err := queues.Exists(queueName); ok || err != nil { + if err == nil { + return false, ErrQueueAlreadyExists + } + return false, err + } + + // build the operation to update the set + op := &riak.MapOperation{} + op.AddToSet("queues", []byte(queueName)) + _, err := queues.riakService.CreateOrUpdateMap("config", "queues_config", op) + if err != nil { + return false, err + } + + //cfgOps := make([]*riak.MapOperation, 0) + // Create the config + cOp := &riak.MapOperation{} + for name, defaultValue := range DefaultSettings { + if val, ok := options[name]; ok { + cOp.SetRegister(name, []byte(val)) + } else { + cOp.SetRegister(name, []byte(defaultValue)) + } + //cfgOps = append(cfgOps, cOp) + } + + _, err = queues.riakService.CreateOrUpdateMap("config", queueConfigRecordName(queueName), cOp) + if err != nil { + return false, err + } + + return true, nil +} + +// Delete will remove a queue from the system, and remove it from any topics +// that it is subscribed to. This will not delete any un-acknowledged messages +// although in the future it will expand to cover this as well. +func (queues *Queues) Delete(queueName string) (bool, error) { + // This function intentionally not optimized because it should + // not be a high-throughput operation + + // Also, not implemented yet! + return false, nil +} + +// Exists checks is the given queue name is already created or not +func (queues *Queues) Exists(queueName string) (bool, error) { + m, err := queues.riakService.GetQueuesConfigMap() + if err != nil { + return false, err + } + + if set, ok := m.Sets["queues"]; ok { + for _, item := range set { + if queueName == string(item) { + return true, nil + } + } + } + + return false, nil +} + +func (q *Queues) GetMessage(queueName string, id string) (string, error) { + return q.riakService.GetMessage(queueName, id) +} + +func (q *Queues) SaveMessage(queueName string, data string) (string, error) { + return q.riakService.StoreMessage(queueName, data) +} + +// DeleteMessage is +func (queues *Queues) DeleteMessage(name string, id string) (map[string]bool, error) { + return queues.DeleteMessages(name, []string{id}) +} + +// DeleteMessages is +func (queues *Queues) DeleteMessages(name string, ids []string) (map[string]bool, error) { + return queues.riakService.DeleteMessages(name, ids) +} + +// PollMessages does a range scan over the queue bucket and returns a map of message ids to bodies +func (queues *Queues) PollMessages(name string, batchSize uint32) (map[string]string, error) { + queues.configLock.RLock() + defer queues.configLock.RUnlock() + queue, ok := queues.KnownQueues[name] + if !ok { + return nil, ErrUnknownQueue + } + lower, upper, err := queue.ring.ReserveNext() + if err != nil { + log.Println(err) + return nil, err + } + riakObjects, err := queue.riakService.RangeScanMessages(queue.Name, batchSize, lower, upper) + if err != nil { + log.Println(err) + return nil, err + } + results := make(map[string]string, len(riakObjects)) + for _, obj := range riakObjects { + results[obj.Key] = string(obj.Value) + } + return results, nil +} diff --git a/core/riak_service.go b/core/riak_service.go new file mode 100644 index 0000000..fb985dc --- /dev/null +++ b/core/riak_service.go @@ -0,0 +1,408 @@ +package core + +import ( + "crypto/rand" + "fmt" + "math" + "math/big" + "sync" + "time" + + "github.com/basho/riak-go-client" +) + +// MaxMessageID is +var MaxMessageID = *big.NewInt(math.MaxInt64) + +// RiakService is an abstraction over the client, to DRY up some common lookups +type RiakService struct { + Pool *riak.Client +} + +type deletedMessage struct { + key string + deleted bool +} + +// NewRiakService creates a RiakService to use in the app +func NewRiakService(addresses []string, port uint16) (*RiakService, error) { + // Create a new Riak Client + clientOpts := &riak.NewClientOptions{ + RemoteAddresses: addresses, + Port: port, + } + + client, err := riak.NewClient(clientOpts) + + if err != nil { + return nil, err + } + // Slap it into our struct so we can add some common lookups to it + return &RiakService{ + Pool: client, + }, nil +} + +// Execute is lazy method to ferry to the underlying clients Execute +func (rs *RiakService) Execute(cmd riak.Command) error { + return rs.Pool.Execute(cmd) +} + +// GetQueueConfigMap loads the primary queue configuration data from Riak, as a native riak.Map +func (rs *RiakService) GetQueuesConfigMap() (*riak.Map, error) { + return rs.GetMap("queues_config") +} + +// GetTopicConfigMap loads the primary topic configuration data from Riak, as a native riak.Map +func (rs *RiakService) GetTopicsConfigMap() (*riak.Map, error) { + return rs.GetMap("topics_config") +} + +// CreateQueueConfigMap is +func (rs *RiakService) CreateQueuesConfigMap() (*riak.Map, error) { + op := &riak.MapOperation{} + op.SetRegister("created", []byte(time.Now().String())) + + return rs.CreateOrUpdateMap("config", "queues_config", op) +} + +// CreateTopicConfigMap is +func (rs *RiakService) CreateTopicsConfigMap() (*riak.Map, error) { + op := &riak.MapOperation{} + op.SetRegister("created", []byte(time.Now().String())) + + return rs.CreateOrUpdateMap("config", "topics_config", op) +} + +// GetMap loads a CRDT Map from Riak +func (rs *RiakService) GetMap(name string) (*riak.Map, error) { + // TODO solution for how to centralize this knowledge + // few things need to reach into config and they're confined to + // private methods in 2 packages, but still. Magic strings + not DRY + fmc, err := riak.NewFetchMapCommandBuilder(). + WithBucketType("maps"). + WithBucket("config"). + WithKey(name).Build() + + if err != nil { + return nil, err + } + + if err = rs.Execute(fmc); err != nil { + return nil, err + } + res := fmc.(*riak.FetchMapCommand) + if res.Error() != nil { + return nil, res.Error() + } + if res.Response.IsNotFound { + // There will be no error from res.Error() here, as it isn't an error + return nil, ErrConfigMapNotFound + } + return res.Response.Map, nil +} + +// CreateOrUpdateMap does exactly that because thats what the riak lib allows for +func (rs *RiakService) CreateOrUpdateMap(bucket string, key string, op *riak.MapOperation) (*riak.Map, error) { + cmd, err := riak.NewUpdateMapCommandBuilder(). + WithBucketType("maps"). + WithBucket(bucket). + WithReturnBody(true). + WithKey(key). + WithMapOperation(op).Build() + + if err != nil { + return nil, err + } + + if err = rs.Execute(cmd); err != nil { + return nil, err + } + + res := cmd.(*riak.UpdateMapCommand) + return res.Response.Map, res.Error() +} + +// CreateQueueConfig will create a configuration for a new queue +func (rs *RiakService) CreateQueueConfig(queueName string, values map[string]string) (*riak.Map, error) { + op := &riak.MapOperation{} + for key, value := range values { + op.SetRegister(key, []byte(value)) + } + + return rs.CreateOrUpdateMap("config", queueConfigRecordName(queueName), op) +} + +// RangeScanMessages is +func (rs *RiakService) RangeScanMessages(queueName string, numMessages uint32, lowerBound int64, upperBound int64) ([]*riak.Object, error) { + cmd, err := riak.NewSecondaryIndexQueryCommandBuilder(). + WithBucketType("messages"). + WithBucket(queueName). + WithIntRange(lowerBound, upperBound). + WithMaxResults(numMessages). + WithIndexName("id_int").Build() + + if err != nil { + return nil, err + } + + if err = rs.Execute(cmd); err != nil { + return nil, err + } + + res := cmd.(*riak.SecondaryIndexQueryCommand) + if res.Error() != nil { + return nil, res.Error() + } + return rs.lookupMessagesForRangeScanResults(queueName, res.Response.Results) +} + +func (rs *RiakService) GetMessage(queueName string, messageKey string) (string, error) { + cmd, err := riak.NewFetchValueCommandBuilder(). + WithBucketType("messages"). + WithBucket(queueName). + WithKey(messageKey).Build() + + if err != nil { + return "", err + } + if err = rs.Execute(cmd); err != nil { + return "", err + } + + res := cmd.(*riak.FetchValueCommand) + if res.Error() != nil || res.Response.IsNotFound { + return "", res.Error() + } + + if len(res.Response.Values) > 1 { + for _, obj := range res.Response.Values[1:len(res.Response.Values)] { + _, err := rs.StoreMessage(queueName, string(obj.Value)) + if err != nil { + // Couldn't save that Message + // That would mean it's lost + // need to incorporate a retry mechanic + } + } + } + + return string(res.Response.Values[0].Value), nil +} + +func (rs *RiakService) lookupMessagesForRangeScanResults(queueName string, results []*riak.SecondaryIndexQueryResult) ([]*riak.Object, error) { + // Channel for holding the results of the io calls + objChan := make(chan []*riak.Object, len(results)) + // Waitgroup to gate the function completing + wg := &sync.WaitGroup{} + // Seed it with the expected number of ops + wg.Add(len(results)) + for _, item := range results { + key := string(item.ObjectKey) + // Go wide with IO requests, use a channel to communicate with a consumer, below + go func(riakService *RiakService, w *sync.WaitGroup, c chan []*riak.Object, messageKey string) { + defer wg.Done() + cmd, err := riak.NewFetchValueCommandBuilder(). + WithBucketType("messages"). + WithBucket(queueName). + WithKey(messageKey).Build() + + if err != nil { + return // Can't do anything if there was an error. Probably good to log here? + } + + if err = riakService.Execute(cmd); err != nil { + return // Can't do anything? + } + + res := cmd.(*riak.FetchValueCommand) + if res.Error() != nil || res.Response.IsNotFound { + return // ? + } + + c <- res.Response.Values + return + }(rs, wg, objChan, key) + } + + // Kickoff the waitgroup to close the conn once they all report in + // Boy, I hope no weird goroutine scheduling stuff occurs or this could get...racey + go func(waitGroup *sync.WaitGroup, oChan chan []*riak.Object) { + waitGroup.Wait() + close(oChan) + }(wg, objChan) + + // Allocate it all up front to save some time + foundMessages := make([]*riak.Object, len(results)) + // Keep track of how many actually returned, for later + foundCount := 0 + for objs := range objChan { + if len(objs) > 1 { + for _, o := range objs[1:] { + // Rewrite the sibling back into the system + // Message siblings indicate unique id collisions, and should + // be re-published into the system for later delivery + _, err := rs.StoreMessage(queueName, string(o.Value)) + if err != nil { + // Couldn't save that Message + // That would mean it's lost + // need to incorporate a retry mechanic + } + } + } + + foundMessages[foundCount] = objs[0] + foundCount++ + } + // Return only the slice of messages found + return foundMessages[:foundCount], nil +} + +func (rs *RiakService) StoreMessage(queueName string, message string) (string, error) { + randID, err := rand.Int(rand.Reader, &MaxMessageID) + if err != nil { + return "", err + } + + id := randID.String() + + obj := &riak.Object{ + ContentType: "application/json", + Charset: "utf-8", + ContentEncoding: "utf-8", + Value: []byte(message), + } + + obj.AddToIntIndex("id_int", int(randID.Int64())) + + cmd, err := riak.NewStoreValueCommandBuilder(). + WithBucketType("messages"). + WithBucket(queueName). + WithKey(id). + WithContent(obj).Build() + + if err != nil { + return "", err + } + + if err = rs.Execute(cmd); err != nil { + return "", err + } + + res := cmd.(*riak.StoreValueCommand) + if res.Error() != nil { + return "", err + } + + return id, nil +} + +func (rs *RiakService) DeleteMessage(queueName string, key string) (bool, error) { + cmd, err := riak.NewDeleteValueCommandBuilder(). + WithBucketType("messages"). + WithBucket(queueName). + WithKey(key).Build() + + if err != nil { + return false, err + } + + if err = rs.Execute(cmd); err != nil { + return false, err + } + + res := cmd.(*riak.DeleteValueCommand) + if res.Error() != nil { + return false, err + } + + return res.Response, nil + +} + +func (rs *RiakService) DeleteMessages(queueName string, keys []string) (map[string]bool, error) { + // Channel for holding the results of the io calls + boolChan := make(chan *deletedMessage, len(keys)) + // Waitgroup to gate the function completing + wg := &sync.WaitGroup{} + // Seed it with the expected number of ops + wg.Add(len(keys)) + + results := make(map[string]bool) + + for _, mKey := range keys { + // Kick off a go routine to delete the message + go func(riakService *RiakService, w *sync.WaitGroup, c chan *deletedMessage, messageKey string) { + defer w.Done() + + deleted, err := riakService.DeleteMessage(queueName, messageKey) + if err == nil { + // Pop the results onto the channel + c <- &deletedMessage{key: messageKey, deleted: deleted} + } + return + }(rs, wg, boolChan, mKey) + } + + // Kickoff the waitgroup to close the conn once they all report in + // Boy, I hope no weird goroutine scheduling stuff occurs or this could get...racey + go func(waitGroup *sync.WaitGroup, c chan *deletedMessage) { + waitGroup.Wait() + close(c) + }(wg, boolChan) + + // Harvest until the channel closes + for obj := range boolChan { + results[obj.key] = obj.deleted + } + return results, nil +} + +func (rs *RiakService) UpdateTopicSubscription(topicName string, queueName string, addQueue bool) (bool, error) { + // Probably less awkward to just have the calling code build the cmd but... + // I'm trying to keep all the riak stuff in one place + + // Also - this needs to verify both exist before attempting to map them + // It doesn't, currently + op := &riak.MapOperation{} + if addQueue { + op.AddToSet("queues", []byte(queueName)) + } else { + op.RemoveFromSet("queues", []byte(queueName)) + } + cmd, err := riak.NewUpdateMapCommandBuilder(). + WithBucketType("maps"). + WithBucket("config"). + WithKey(topicConfigRecordName(topicName)). + WithMapOperation(op).Build() + + if err != nil { + return false, err + } + + if err = rs.Execute(cmd); err != nil { + return false, err + } + res := cmd.(*riak.UpdateMapCommand) + if res.Error() != nil { + return false, res.Error() + } + + // If the op is a success, its a success + return res.Success(), nil +} + +func (rs *RiakService) GetQueueConfigMap(queueName string) (*riak.Map, error) { + return rs.GetMap(queueConfigRecordName(queueName)) +} + +func (rs *RiakService) GetTopicConfigMap(topicName string) (*riak.Map, error) { + return rs.GetMap(topicConfigRecordName(topicName)) +} + +func queueConfigRecordName(queueName string) string { + return fmt.Sprintf("queue_%s_config", queueName) +} + +func topicConfigRecordName(topicName string) string { + return fmt.Sprintf("topic_%s_config", topicName) +} diff --git a/core/topic.go b/core/topic.go new file mode 100644 index 0000000..2510ae2 --- /dev/null +++ b/core/topic.go @@ -0,0 +1,31 @@ +package core + +import ( + "sync" + + "github.com/basho/riak-go-client" +) + +// Topic represents a bucket in Riak used to hold messages, and the behaviors that +// may be taken over such an object +type Topic struct { + // the definition of a queue + // name of the queue + Name string + // RiakService for interacting with Riak + // Mutex for protecting rw access to the Config object + configLock sync.RWMutex + // Individual settings for the queue + Config *riak.Map +} + +// GetQueueNames is +func (t *Topic) GetQueueNames() []string { + t.configLock.RLock() + names := make([]string, 0) + for _, q := range t.Config.Sets["queues"] { + names = append(names, string(q)) + } + t.configLock.RUnlock() + return names +} diff --git a/core/topics.go b/core/topics.go new file mode 100644 index 0000000..826f608 --- /dev/null +++ b/core/topics.go @@ -0,0 +1,130 @@ +package core + +import ( + "errors" + "log" + "sync" + "time" + + "github.com/basho/riak-go-client" +) + +// Topics is +type Topics struct { + riakService *RiakService + + syncScheduler *time.Ticker + syncKiller chan bool + + configLock sync.RWMutex + Config *riak.Map + KnownTopics map[string]*Topic +} + +var ( + //ErrNoKnownTopics is + ErrNoKnownTopics = errors.New("There are no known topics in the system") + // ErrTopicAlreadyExists is + ErrTopicAlreadyExists = errors.New("Topic already exists") +) + +// LoadTopicsFromRiak is +func LoadTopicsFromRiak(cfg *RiakConfig) (*Topics, error) { + topics := &Topics{ + KnownTopics: make(map[string]*Topic), + syncScheduler: time.NewTicker(cfg.ConfigSyncInterval), + syncKiller: make(chan bool, 0), + riakService: cfg.Service, + configLock: sync.RWMutex{}, + } + + m, err := topics.riakService.GetTopicsConfigMap() + if err == ErrConfigMapNotFound { + m, err = topics.riakService.CreateTopicsConfigMap() + if err != nil { + return nil, err + } + } else if err != nil { + return nil, err + } + topics.Config = m + return topics, nil +} + +// Create will register a new topic +// This queue will be available to be used once all the nodes have had their config +// refreshed +func (topics *Topics) Create(queueName string) (bool, error) { + // This function intentionally not optimized because it should + // not be a high-throughput operation + + if ok, err := topics.Exists(queueName); ok || err != nil { + if err == nil { + return false, ErrTopicAlreadyExists + } + return false, err + } + // build the operation to update the set + op := &riak.MapOperation{} + op.AddToSet("topics", []byte(queueName)) + _, err := topics.riakService.CreateOrUpdateMap("config", "topics_config", op) + if err != nil { + log.Println(err) + return false, err + } + return true, nil +} + +// Delete will remove a topic from the system. +// this will not delete any queues or messages. +func (topics *Topics) Delete(queueName string) (bool, error) { + // This function intentionally not optimized because it should + // not be a high-throughput operation + return false, nil +} + +// Exists checks is the given topic name is already created or not +func (topics *Topics) Exists(topicName string) (bool, error) { + m, err := topics.riakService.GetTopicsConfigMap() + if err != nil { + return false, err + } + if set, ok := m.Sets["topics"]; ok { + for _, item := range set { + if topicName == string(item) { + return true, nil + } + } + } + return false, nil +} + +func (topics *Topics) SubscribeQueue(topicName string, queueName string) (bool, error) { + // Add the queue to the topic + return topics.riakService.UpdateTopicSubscription(topicName, queueName, true) +} + +func (topics *Topics) UnsubscribeQueue(topicName string, queueName string) (bool, error) { + // Remove the queue from the topic + return topics.riakService.UpdateTopicSubscription(topicName, queueName, false) +} + +func (t *Topics) BroadcastMessage(topicName string, data string) ([]map[string]interface{}, error) { + t.configLock.RLock() + topic, ok := t.KnownTopics[topicName] + if !ok { + return nil, ErrUnknownTopic + } + results := make([]map[string]interface{}, 0) + for _, q := range topic.Config.Sets["queues"] { + queueName := string(q) + id, err := t.riakService.StoreMessage(queueName, data) + result := map[string]interface{}{"id": id, "queue": queueName} + if err != nil { + result[queueName] = err.Error() + } + results = append(results, result) + } + t.configLock.RUnlock() + return results, nil +} diff --git a/dynamiq.go b/dynamiq.go index e80f3cb..7e78b4b 100644 --- a/dynamiq.go +++ b/dynamiq.go @@ -2,12 +2,30 @@ package main import ( "flag" + "log" "github.com/Sirupsen/logrus" "github.com/Tapjoy/dynamiq/app" + "github.com/Tapjoy/dynamiq/core" + "github.com/Tapjoy/dynamiq/server/http/v2" ) +func main2() { + cfg, err := core.GetConfig() + if err != nil { + log.Fatal(err) + } + + httpServer, err := httpv2.New(cfg) + if err != nil { + log.Println(err) + } + httpServer.Listen() +} + func main() { + main2() + return //Get some Command line options configFile := flag.String("c", "./lib/config.gcfg", "location of config file") flag.Parse() diff --git a/server/http/v2/queue_handlers.go b/server/http/v2/queue_handlers.go new file mode 100644 index 0000000..9ccbfbd --- /dev/null +++ b/server/http/v2/queue_handlers.go @@ -0,0 +1,105 @@ +package httpv2 + +import ( + "encoding/json" + "net/http" + "strconv" + + "github.com/Tapjoy/dynamiq/core" + "github.com/gorilla/mux" +) + +func (h *HTTPApi) queueList(w http.ResponseWriter, r *http.Request) { + response(w, http.StatusOK, map[string]interface{}{"queues": h.context.QueueNames()}) +} + +func (h *HTTPApi) queueDetails(w http.ResponseWriter, r *http.Request) { + queueName := mux.Vars(r)["queue"] + conf, err := h.context.GetQueueConfig(queueName) + if err != nil { + errorResponse(w, err) + return + } + response(w, http.StatusOK, map[string]interface{}{"queue": queueName, "config": conf}) +} + +func (h *HTTPApi) queueConfigure(w http.ResponseWriter, r *http.Request) { + +} + +func (h *HTTPApi) queueCreate(w http.ResponseWriter, r *http.Request) { + queueName := mux.Vars(r)["queue"] + ok, err := h.context.Queues.Create(queueName, core.DefaultSettings) + if err != nil { + // return 500 for now - should return contextually correct errors otherwise + errorResponse(w, err) + return + } + response(w, http.StatusOK, map[string]interface{}{"queue": queueName, "created": ok}) + +} + +func (h *HTTPApi) queueDelete(w http.ResponseWriter, r *http.Request) { + +} + +func (h *HTTPApi) queueSubmitMessage(w http.ResponseWriter, r *http.Request) { + queueName := mux.Vars(r)["queue"] + msgData := "" // TODO read from body + + id, err := h.context.Queues.SaveMessage(queueName, msgData) + if err != nil { + errorResponse(w, err) + return + } + response(w, http.StatusOK, map[string]interface{}{"queue": queueName, "id": id}) +} + +func (h *HTTPApi) queueGetMessage(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + queueName := vars["queue"] + id := vars["id"] + msg, err := h.context.Queues.GetMessage(queueName, id) + if err != nil { + errorResponse(w, err) + return + } + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(msg) +} + +func (h *HTTPApi) queuePollMessage(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + queueName := vars["queue"] + count := vars["count"] + + uCount, err := strconv.ParseUint(count, 10, 32) + if err != nil { + errorResponse(w, err) + return + } + + msgs, err := h.context.Queues.PollMessages(queueName, uint32(uCount)) + + if err != nil { + errorResponse(w, err) + return + } + + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(msgs) +} + +func (h *HTTPApi) queueDeleteMessage(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + queueName := vars["queue"] + id := vars["id"] + + ok, err := h.context.Queues.DeleteMessage(queueName, id) + if err != nil { + errorResponse(w, err) + return + } + response(w, http.StatusOK, map[string]interface{}{"queue": queueName, "deleted": ok}) + +} diff --git a/server/http/v2/status_handlers.go b/server/http/v2/status_handlers.go new file mode 100644 index 0000000..e7107cb --- /dev/null +++ b/server/http/v2/status_handlers.go @@ -0,0 +1,20 @@ +package httpv2 + +import ( + "encoding/json" + "fmt" + "net/http" +) + +func (h *HTTPApi) statusServers(w http.ResponseWriter, r *http.Request) { + response := make([]string, 0) + for _, member := range h.context.Discovery.Memberlist.Members() { + response = append(response, fmt.Sprintf("Member: %s %s", member.Name, member.Addr)) + } + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(response) +} + +func (h *HTTPApi) statusPartitionRange(w http.ResponseWriter, r *http.Request) { + +} diff --git a/server/http/v2/topic_handlers.go b/server/http/v2/topic_handlers.go new file mode 100644 index 0000000..3a86b71 --- /dev/null +++ b/server/http/v2/topic_handlers.go @@ -0,0 +1,79 @@ +package httpv2 + +import ( + "encoding/json" + "io/ioutil" + "net/http" + + "github.com/gorilla/mux" +) + +func (h *HTTPApi) topicList(w http.ResponseWriter, r *http.Request) { + response(w, http.StatusOK, map[string]interface{}{"topics": h.context.TopicNames()}) +} + +func (h *HTTPApi) topicDetails(w http.ResponseWriter, r *http.Request) { + topicName := mux.Vars(r)["topic"] + t, err := h.context.GetTopic(topicName) + if err != nil { + errorResponse(w, err) + } + response(w, http.StatusOK, map[string]interface{}{"topic": topicName, "queues": t.GetQueueNames()}) +} + +func (h *HTTPApi) topicCreate(w http.ResponseWriter, r *http.Request) { + topicName := mux.Vars(r)["topic"] + res, err := h.context.Topics.Create(topicName) + if err != nil { + // return 500 for now - should return contextually correct errors otherwise + errorResponse(w, err) + } + response(w, http.StatusOK, map[string]interface{}{"topic": topicName, "created": res}) +} + +func (h *HTTPApi) topicDelete(w http.ResponseWriter, r *http.Request) { + +} + +func (h *HTTPApi) topicSubmitMessage(w http.ResponseWriter, r *http.Request) { + topicName := mux.Vars(r)["topic"] + msgData, err := ioutil.ReadAll(r.Body) + r.Body.Close() + + if err != nil { + errorResponse(w, err) + } + + results, err := h.context.Topics.BroadcastMessage(topicName, string(msgData)) + if err != nil { + errorResponse(w, err) + } + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(results) +} + +func (h *HTTPApi) topicSubscribe(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + topicName := vars["topic"] + queueName := vars["queue"] + + ok, err := h.context.Topics.SubscribeQueue(topicName, queueName) + resp := map[string]interface{}{"topic": topicName, "queue": queueName, "subscribed": ok} + if err != nil { + resp["error"] = err.Error() + } + response(w, http.StatusOK, resp) +} + +func (h *HTTPApi) topicUnsubscribe(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + topicName := vars["topic"] + queueName := vars["queue"] + + ok, err := h.context.Topics.UnsubscribeQueue(topicName, queueName) + resp := map[string]interface{}{"topic": topicName, "queue": queueName, "unsubscribed": ok} + if err != nil { + resp["error"] = err.Error() + } + response(w, http.StatusOK, resp) +} diff --git a/server/http/v2/v2.go b/server/http/v2/v2.go new file mode 100644 index 0000000..783c1d9 --- /dev/null +++ b/server/http/v2/v2.go @@ -0,0 +1,67 @@ +package httpv2 + +import ( + "encoding/json" + "fmt" + "log" + "net/http" + + "github.com/Tapjoy/dynamiq/core" + "github.com/gorilla/mux" +) + +// HTTPApi represents the object used to govern http calls into the system +type HTTPApi struct { + context *core.Config +} + +// New initializes a new +func New(cfg *core.Config) (*HTTPApi, error) { + h := &HTTPApi{ + context: cfg, + } + router := mux.NewRouter().PathPrefix("/v2").Subrouter() + + statusRoutes := router.PathPrefix("/status").Subrouter() + topicRoutes := router.PathPrefix("/topics").Subrouter() + queueRoutes := router.PathPrefix("/queues").Subrouter() + + statusRoutes.HandleFunc("/server", h.statusServers).Methods("GET") + statusRoutes.HandleFunc("/partitionrange", h.statusPartitionRange).Methods("GET") + + topicRoutes.HandleFunc("/", h.topicList).Methods("GET") + topicRoutes.HandleFunc("/{topic}", h.topicDetails).Methods("GET") + topicRoutes.HandleFunc("/{topic}", h.topicCreate).Methods("PUT") + topicRoutes.HandleFunc("/{topic}", h.topicDelete).Methods("DELETE") + topicRoutes.HandleFunc("/{topic}", h.topicSubmitMessage).Methods("POST") + topicRoutes.HandleFunc("/{topic}/queues/{queue}", h.topicSubscribe).Methods("PUT") + topicRoutes.HandleFunc("/{topic}/queues/{queue}", h.topicUnsubscribe).Methods("DELETE") + + queueRoutes.HandleFunc("/", h.queueList).Methods("GET") + queueRoutes.HandleFunc("/{queue}", h.queueDetails).Methods("GET") + queueRoutes.HandleFunc("/{queue}", h.queueConfigure).Methods("PATCH") + queueRoutes.HandleFunc("/{queue}", h.queueCreate).Methods("PUT") + queueRoutes.HandleFunc("/{queue}", h.queueDelete).Methods("DELETE") + queueRoutes.HandleFunc("/{queue}", h.queueSubmitMessage).Methods("POST") + queueRoutes.HandleFunc("/{queue}/{id}", h.queueGetMessage).Methods("GET") + queueRoutes.HandleFunc("/{queue}/{id}", h.queueDeleteMessage).Methods("DELETE") + queueRoutes.HandleFunc("/{queue}/poll/{count}", h.queuePollMessage).Methods("GET") + + http.Handle("/", router) + return h, nil +} + +// Listen is +func (h *HTTPApi) Listen() { + log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", h.context.HTTP.Port), nil)) +} + +func response(w http.ResponseWriter, status int, responsePayload map[string]interface{}) { + w.WriteHeader(status) + json.NewEncoder(w).Encode(responsePayload) +} + +func errorResponse(w http.ResponseWriter, err error) { + w.WriteHeader(http.StatusInternalServerError) + json.NewEncoder(w).Encode(map[string]error{"error": err}) +}