diff --git a/accounting/app/consumers/task_changes_consumer.rb b/accounting/app/consumers/task_changes_consumer.rb new file mode 100644 index 0000000..ea8d369 --- /dev/null +++ b/accounting/app/consumers/task_changes_consumer.rb @@ -0,0 +1,74 @@ +# frozen_string_literal: true + +# Example consumer that prints messages payloads +class TaskChangesConsumer < ApplicationConsumer + def consume + messages.each do |m| + message = m.payload + puts '-' * 80 + p message + puts '-' * 80 + + process_message(message) + end + end + + private + + def process_message(message) + case message['event_name'] + when 'Task.Created' # CUD + # ignore + when 'Task.Updated' # CUD + task = Task.find_by(public_id: message['data']['public_id']) + return unless task + + task.update!(title: message['data']['title']) + when 'Task.Added' # BE + Task.transaction do + task = Task.create!( + public_id: message['data']['public_id'], + title: message['data']['title'] + ) + user = User.find_or_create_by!(public_id: message['data']['assigned_to']['public_id']) + account = user.accounts.take || user.accounts.create!( + public_id: SecureRandom.uuid, + label: 'Default user account', + currency: Account::DEFAULT_CURRENCY + ) + account.statements.create!( + credit: task.assign_price, + description: "#{task.title} assigned", + ref: task + ) + account.update_balance! + task + end + when 'Task.Assigned' + task = Task.find_by!(public_id: message['data']['public_id']) + user = User.find_by!(public_id: message['data']['assigned_to']['public_id']) + account = user.accounts.take! + account.statements.create!( + credit: task.assign_price, + description: "Task #{task.title} reassigned", + ref: task + ) + account.update_balance! + task + when 'Task.Resolved' # BE + task = Task.find_by!(public_id: message['data']['public_id']) + user = User.find_by!(public_id: message['data']['assigned_to']['public_id']) + account = user.accounts.take + account.statements.create!( + debit: task.resolve_price, + description: "Task #{task.title} resolved", + ref: task + ) + account.update_balance! + task + end + rescue => e + puts "Processing failed: #{e}" + puts message + end +end diff --git a/accounting/app/consumers/user_changes_consumer.rb b/accounting/app/consumers/user_changes_consumer.rb new file mode 100644 index 0000000..9187648 --- /dev/null +++ b/accounting/app/consumers/user_changes_consumer.rb @@ -0,0 +1,73 @@ +# frozen_string_literal: true + +# Example consumer that prints messages payloads +class UserChangesConsumer < ApplicationConsumer + def consume + messages.each do |m| + message = m.payload + puts '-' * 80 + p message + puts '-' * 80 + + case message['event_name'] + when 'Account.Created' + user = fetch_user(message['data']['public_id']) + if user + user.update( + public_id: message['data']['public_id'], + email: message['data']['email'], + full_name: message['data']['full_name'], + role: message['data']['position'] + ) + else + create_user(message['data']) + end + when 'Account.Updated' + fetch_user(message['data']['public_id'])&.update!( + full_name: message['data']['full_name'] + ) + when 'Account.Deleted' + fetch_user(message['data']['public_id'])&.destroy! + # TODO: if you want + when 'Account.RoleChanged' + fetch_user(message['data']['public_id'])&.update!( + role: message['data']['role'] + ) + else + # store events in DB + end + end + end + + # Run anything upon partition being revoked + # def revoked + # end + + # Define here any teardown things you want when Karafka server stops + # def shutdown + # end + + + private + + def fetch_user(p_id) + User.find_by(public_id:p_id) + end + + def create_user(data) + User.transaction do + user = User.create!( + public_id: data['public_id'], + email: data['email'], + full_name: data['full_name'], + role: data['position'] + ) + user.accounts.create!( + public_id: SecureRandom.uuid, + label: 'Default user account', + currency: Account::DEFAULT_CURRENCY + ) + user + end + end +end diff --git a/accounting/app/controllers/accounts_controller.rb b/accounting/app/controllers/accounts_controller.rb new file mode 100644 index 0000000..19b3e69 --- /dev/null +++ b/accounting/app/controllers/accounts_controller.rb @@ -0,0 +1,12 @@ +class AccountsController < ApplicationController + before_action :require_user_logged_in! + + def index + @accounts = current_user.accounts.order(:id) + end + + def show + @account = current_user.accounts.find(params[:id]) + @statements = @account.statements.preload(:ref).order(id: :desc) + end +end diff --git a/accounting/app/helpers/accounts_helper.rb b/accounting/app/helpers/accounts_helper.rb new file mode 100644 index 0000000..d18d062 --- /dev/null +++ b/accounting/app/helpers/accounts_helper.rb @@ -0,0 +1,8 @@ +module AccountsHelper + def today_profit + @today_profit ||= begin + row = Statement.today.task_related.totals.take + (row&.total_credit || 0).abs + (row&.total_debit || 0) + end + end +end diff --git a/accounting/app/helpers/application_helper.rb b/accounting/app/helpers/application_helper.rb index de6be79..4601123 100644 --- a/accounting/app/helpers/application_helper.rb +++ b/accounting/app/helpers/application_helper.rb @@ -1,2 +1,5 @@ module ApplicationHelper + def current_user + request.env['warden'].user + end end diff --git a/accounting/app/models/account.rb b/accounting/app/models/account.rb new file mode 100644 index 0000000..4fb1971 --- /dev/null +++ b/accounting/app/models/account.rb @@ -0,0 +1,12 @@ +class Account < ApplicationRecord + DEFAULT_CURRENCY = 'INR' + belongs_to :user + has_many :statements + + def update_balance! + balance_row = statements.totals.take! + update!( + cached_balance: (balance_row.total_credit || 0) + (balance_row.total_debit || 0) + ) + end +end diff --git a/accounting/app/models/auth_provider.rb b/accounting/app/models/auth_provider.rb new file mode 100644 index 0000000..8224f6f --- /dev/null +++ b/accounting/app/models/auth_provider.rb @@ -0,0 +1,3 @@ +class AuthProvider < ApplicationRecord + belongs_to :user +end diff --git a/accounting/app/models/statement.rb b/accounting/app/models/statement.rb new file mode 100644 index 0000000..f2bc7d6 --- /dev/null +++ b/accounting/app/models/statement.rb @@ -0,0 +1,38 @@ +class Statement < ApplicationRecord + belongs_to :account + belongs_to :ref, polymorphic: true + + scope :totals, -> { select('SUM(credit) as total_credit, SUM(debit) as total_debit') } + scope :today, -> { where('DATE(created_at) = CURRENT_DATE') } + scope :for_date, ->(date) { where("DATE(created_at) = ?", date) } + scope :task_related, -> { where(ref_type: 'Task') } + + after_create do + statement = self + event_name = 'Statement.Created' + event = { + event_id: SecureRandom.uuid, + event_version: 1, + event_time: Time.now.to_s, + producer: 'accounting_service', + event_name: event_name, + data: { + owner: {public_id: statement.account.user.public_id}, + description: statement.description, + credit: statement.credit || 0, + debit: statement.debit || 0, + ref_type: statement.ref_type, + ref_public_id: statement.ref.public_id + } + } + + result = SchemaRegistry.validate_event(event, event_name.underscore, version: event[:event_version]) + + if result.success? + KAFKA_PRODUCER.produce_sync(topic: 'statements-stream', payload: event.to_json) + else + puts "Event validation error: #{result.failure}" + puts "Event data: #{event.inspect}" + end + end +end diff --git a/accounting/app/models/task.rb b/accounting/app/models/task.rb new file mode 100644 index 0000000..ca85cd9 --- /dev/null +++ b/accounting/app/models/task.rb @@ -0,0 +1,8 @@ +class Task < ApplicationRecord + def initialize(*, **) + super + + self.assign_price ||= rand(10) - 20 + self.resolve_price ||= rand(20) + 20 + end +end diff --git a/accounting/app/models/user.rb b/accounting/app/models/user.rb new file mode 100644 index 0000000..aa20fd9 --- /dev/null +++ b/accounting/app/models/user.rb @@ -0,0 +1,24 @@ +class User < ApplicationRecord + has_many :auth_providers + has_many :accounts + + def self.close_day(date = Date.yesterday) + User.find_each do |user| + user.accounts.each do |account| + next if account.statements.for_date(date).where(ref: user, description: "Payout for #{date}").exists? + + day_result = account.statements.task_related.for_date(date).totals.take + day_balance = (day_result.total_credit || 0) + (day_result.total_debit || 0) + if day_balance.positive? + account.statements.create!( + credit: -day_balance, + description: "Payout for #{date}", + ref: user, + created_at: date.end_of_day + ) + # Send mail + end + end + end + end +end diff --git a/accounting/app/views/accounts/_account.html.erb b/accounting/app/views/accounts/_account.html.erb new file mode 100644 index 0000000..3b15939 --- /dev/null +++ b/accounting/app/views/accounts/_account.html.erb @@ -0,0 +1,5 @@ + + <%= account.public_id %> + <%= link_to account.label, account %> + <%= account.cached_balance %> + diff --git a/accounting/app/views/accounts/index.html.erb b/accounting/app/views/accounts/index.html.erb new file mode 100644 index 0000000..a146551 --- /dev/null +++ b/accounting/app/views/accounts/index.html.erb @@ -0,0 +1,23 @@ +

Today profit: <%= today_profit %>

+

User accounts (<%= current_user.full_name %>, <%= current_user.public_id %>)

+ +
+ + + + + + + <% @accounts.each do |account| %> + <%= render account %> + + + + +

+ +

+ <% end %> +
Public IDNameBalance
+ +
diff --git a/accounting/app/views/accounts/show.html.erb b/accounting/app/views/accounts/show.html.erb new file mode 100644 index 0000000..1187f83 --- /dev/null +++ b/accounting/app/views/accounts/show.html.erb @@ -0,0 +1,28 @@ +

+ Account # <%= @account.label %> +

+ +
+ + + + + + + + + + + + <% @statements.each do |statement| %> + + + + + + + + <% end %> + +
RefDescCreditDebitDate
<%= statement.ref.class %> #<%= statement.ref.public_id %><%= statement.description %><%= statement.credit %><%= statement.debit %><%= statement.created_at %>
+
diff --git a/accounting/config/routes.rb b/accounting/config/routes.rb index 262ffd5..abee594 100644 --- a/accounting/config/routes.rb +++ b/accounting/config/routes.rb @@ -3,4 +3,11 @@ # Defines the root path route ("/") # root "articles#index" + + get '/logout', to: 'oauth_session#destroy' + get '/auth/:provider/callback', to: 'oauth_session#create' + + resources :accounts, only: %i[index show] do + resources :statements, only: %i[index] + end end diff --git a/accounting/config/schedule.rb b/accounting/config/schedule.rb new file mode 100644 index 0000000..d50de74 --- /dev/null +++ b/accounting/config/schedule.rb @@ -0,0 +1,24 @@ +# Use this file to easily define all of your cron jobs. +# +# It's helpful, but not entirely necessary to understand cron before proceeding. +# http://en.wikipedia.org/wiki/Cron + +# Example: +# +# set :output, "/path/to/my/cron_log.log" +# +# every 2.hours do +# command "/usr/bin/some_great_command" +# runner "MyModel.some_method" +# rake "some:great:rake:task" +# end +# +# every 4.days do +# runner "AnotherModel.prune_old_records" +# end + +# Learn more: http://github.com/javan/whenever + +every 1.day, at: '1:00 am' do + runner "User.close_day" +end diff --git a/accounting/db/migrate/20221008144037_create_users.rb b/accounting/db/migrate/20221008144037_create_users.rb new file mode 100644 index 0000000..d01a771 --- /dev/null +++ b/accounting/db/migrate/20221008144037_create_users.rb @@ -0,0 +1,12 @@ +class CreateUsers < ActiveRecord::Migration[7.0] + def change + create_table :users do |t| + t.string :public_id + t.string :full_name + t.string :email + t.string :role + + t.timestamps + end + end +end diff --git a/accounting/db/migrate/20221008144527_create_auth_providers.rb b/accounting/db/migrate/20221008144527_create_auth_providers.rb new file mode 100644 index 0000000..c7c3f87 --- /dev/null +++ b/accounting/db/migrate/20221008144527_create_auth_providers.rb @@ -0,0 +1,13 @@ +class CreateAuthProviders < ActiveRecord::Migration[7.0] + def change + create_table :auth_providers do |t| + t.integer :user_id + t.string :uid + t.string :provider + t.string :username + t.jsonb :user_info + + t.timestamps + end + end +end diff --git a/accounting/db/migrate/20221016172422_create_accounts.rb b/accounting/db/migrate/20221016172422_create_accounts.rb new file mode 100644 index 0000000..6ad9401 --- /dev/null +++ b/accounting/db/migrate/20221016172422_create_accounts.rb @@ -0,0 +1,13 @@ +class CreateAccounts < ActiveRecord::Migration[7.0] + def change + create_table :accounts do |t| + t.integer :user_id + t.string :public_id + t.string :currency + t.string :label + t.integer :cached_balance + + t.timestamps + end + end +end diff --git a/accounting/db/migrate/20221016172623_create_statements.rb b/accounting/db/migrate/20221016172623_create_statements.rb new file mode 100644 index 0000000..8246ebd --- /dev/null +++ b/accounting/db/migrate/20221016172623_create_statements.rb @@ -0,0 +1,13 @@ +class CreateStatements < ActiveRecord::Migration[7.0] + def change + create_table :statements do |t| + t.integer :account_id + t.string :description + t.integer :credit + t.integer :debit + t.references :ref, null: false, polymorphic: true + + t.timestamps + end + end +end diff --git a/accounting/db/migrate/20221016172806_create_tasks.rb b/accounting/db/migrate/20221016172806_create_tasks.rb new file mode 100644 index 0000000..9c62aac --- /dev/null +++ b/accounting/db/migrate/20221016172806_create_tasks.rb @@ -0,0 +1,12 @@ +class CreateTasks < ActiveRecord::Migration[7.0] + def change + create_table :tasks do |t| + t.string :public_id + t.string :title + t.integer :assign_price + t.integer :resolve_price + + t.timestamps + end + end +end diff --git a/accounting/db/schema.rb b/accounting/db/schema.rb new file mode 100644 index 0000000..a6af854 --- /dev/null +++ b/accounting/db/schema.rb @@ -0,0 +1,67 @@ +# This file is auto-generated from the current state of the database. Instead +# of editing this file, please use the migrations feature of Active Record to +# incrementally modify your database, and then regenerate this schema definition. +# +# This file is the source Rails uses to define your schema when running `bin/rails +# db:schema:load`. When creating a new database, `bin/rails db:schema:load` tends to +# be faster and is potentially less error prone than running all of your +# migrations from scratch. Old migrations may fail to apply correctly if those +# migrations use external dependencies or application code. +# +# It's strongly recommended that you check this file into your version control system. + +ActiveRecord::Schema[7.0].define(version: 2022_10_16_172806) do + # These are extensions that must be enabled in order to support this database + enable_extension "plpgsql" + + create_table "accounts", force: :cascade do |t| + t.integer "user_id" + t.string "public_id" + t.string "currency" + t.string "label" + t.integer "cached_balance" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + end + + create_table "auth_providers", force: :cascade do |t| + t.integer "user_id" + t.string "uid" + t.string "provider" + t.string "username" + t.jsonb "user_info" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + end + + create_table "statements", force: :cascade do |t| + t.integer "account_id" + t.string "description" + t.integer "credit" + t.integer "debit" + t.string "ref_type", null: false + t.bigint "ref_id", null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["ref_type", "ref_id"], name: "index_statements_on_ref" + end + + create_table "tasks", force: :cascade do |t| + t.string "public_id" + t.string "title" + t.integer "assign_price" + t.integer "resolve_price" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + end + + create_table "users", force: :cascade do |t| + t.string "public_id" + t.string "full_name" + t.string "email" + t.string "role" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + end + +end diff --git a/accounting/karafka.rb b/accounting/karafka.rb new file mode 100644 index 0000000..5a17262 --- /dev/null +++ b/accounting/karafka.rb @@ -0,0 +1,41 @@ +# frozen_string_literal: true + +class KarafkaApp < Karafka::App + setup do |config| + config.kafka = { 'bootstrap.servers': '127.0.0.1:9092' } + config.client_id = 'acc_app' + # Recreate consumers with each batch. This will allow Rails code reload to work in the + # development mode. Otherwise Karafka process would not be aware of code changes + config.consumer_persistence = !Rails.env.development? + end + + # Comment out this part if you are not using instrumentation and/or you are not + # interested in logging events for certain environments. Since instrumentation + # notifications add extra boilerplate, if you want to achieve max performance, + # listen to only what you really need for given environment. + # Karafka.monitor.subscribe(Karafka::Instrumentation::LoggerListener.new) + # Karafka.monitor.subscribe(Karafka::Instrumentation::ProctitleListener.new) + + routes.draw do + # Uncomment this if you use Karafka with ActiveJob + # You ned to define the topic per each queue name you use + # active_job_topic :default + consumer_group :real_work do + topic :'accounts-stream' do + consumer UserChangesConsumer + end + + topic :'accounts' do + consumer UserChangesConsumer + end + + topic :'tasks' do + consumer TaskChangesConsumer + end + + topic :'tasks-stream' do + consumer TaskChangesConsumer + end + end + end +end diff --git a/analytics/app/consumers/account_changes_consumer.rb b/analytics/app/consumers/account_changes_consumer.rb new file mode 100644 index 0000000..f002034 --- /dev/null +++ b/analytics/app/consumers/account_changes_consumer.rb @@ -0,0 +1,56 @@ +# frozen_string_literal: true + +# Example consumer that prints messages payloads +class AccountChangesConsumer < ApplicationConsumer + def consume + messages.each do |m| + message = m.payload + puts '-' * 80 + p message + puts '-' * 80 + + process_message(message) + end + end + + # Run anything upon partition being revoked + # def revoked + # end + + # Define here any teardown things you want when Karafka server stops + # def shutdown + # end + + + private + + def process_message(message) + case message['event_name'] + when 'Account.Created' + account = fetch_account(message['data']['public_id']) || Account.new + account.update( + public_id: message['data']['public_id'], + email: message['data']['email'], + full_name: message['data']['full_name'], + role: message['data']['position'] + ) + when 'Account.Updated' + fetch_account(message['data']['public_id'])&.update!( + full_name: message['data']['full_name'] + ) + when 'Account.Deleted' + fetch_account(message['data']['public_id'])&.destroy! + # TODO: if you want + when 'Account.RoleChanged' + fetch_account(message['data']['public_id'])&.update!( + role: message['data']['role'] + ) + else + # store events in DB + end + end + + def fetch_account(p_id) + Account.find_by(public_id:p_id) + end +end diff --git a/analytics/app/consumers/statement_changes_consumer.rb b/analytics/app/consumers/statement_changes_consumer.rb new file mode 100644 index 0000000..626960e --- /dev/null +++ b/analytics/app/consumers/statement_changes_consumer.rb @@ -0,0 +1,36 @@ +# frozen_string_literal: true + +# Example consumer that prints messages payloads +class StatementChangesConsumer < ApplicationConsumer + def consume + messages.each do |m| + message = m.payload + puts '-' * 80 + p message + puts '-' * 80 + + process_message(message) + end + end + + private + + def process_message(message) + case message['event_name'] + when 'Statement.Created' + Statement.create!( + account_public_id: message['data']['owner']['public_id'], + description: message['data']['description'], + credit: message['data']['credit'], + debit: message['data']['debit'], + ref_type: message['data']['ref_type'], + ref_public_id: message['data']['ref_public_id'] + ) + else + # store events in DB + end + rescue => e + puts "Processing failed: #{e}" + puts message + end +end diff --git a/analytics/app/consumers/task_changes_consumer.rb b/analytics/app/consumers/task_changes_consumer.rb new file mode 100644 index 0000000..dd6f0b0 --- /dev/null +++ b/analytics/app/consumers/task_changes_consumer.rb @@ -0,0 +1,47 @@ +# frozen_string_literal: true + +# Example consumer that prints messages payloads +class TaskChangesConsumer < ApplicationConsumer + def consume + messages.each do |m| + message = m.payload + puts '-' * 80 + p message + puts '-' * 80 + + process_message(message) + end + end + + private + + def process_message(message) + case message['event_name'] + when 'Task.Created' + Task.create!( + public_id: message['data']['public_id'], + title: message['data']['title'], + state: message['data']['state'] + ) + when 'Task.Updated' # CUD + task = Task.find_by(public_id: message['data']['public_id']) + return unless task + + task.update!( + public_id: message['data']['public_id'], + title: message['data']['title'], + state: message['data']['state'] + ) + when 'Task.Resolved' # BE + task = Task.find_by(public_id: message['data']['public_id']) + return unless task + + task.update!(state: 'resolved') + else + # log + end + rescue => e + puts "Processing failed: #{e}" + puts message + end +end diff --git a/analytics/app/controllers/application_controller.rb b/analytics/app/controllers/application_controller.rb index 09705d1..d0d13f0 100644 --- a/analytics/app/controllers/application_controller.rb +++ b/analytics/app/controllers/application_controller.rb @@ -1,2 +1,9 @@ class ApplicationController < ActionController::Base + def current_account + request.env['warden'].user + end + + def require_user_logged_in! + redirect_to '/' unless current_account.admin? + end end diff --git a/analytics/app/controllers/dashboard_controller.rb b/analytics/app/controllers/dashboard_controller.rb new file mode 100644 index 0000000..d60eebd --- /dev/null +++ b/analytics/app/controllers/dashboard_controller.rb @@ -0,0 +1,7 @@ +class DashboardController < ApplicationController + before_action :require_user_logged_in! + + def index + + end +end diff --git a/analytics/app/controllers/oauth_session_controller.rb b/analytics/app/controllers/oauth_session_controller.rb new file mode 100644 index 0000000..c858ea1 --- /dev/null +++ b/analytics/app/controllers/oauth_session_controller.rb @@ -0,0 +1,49 @@ +class OauthSessionController < ApplicationController + def destroy + request.env['warden'].set_user(nil) + + redirect_to '/' + end + + def create + puts request.env['omniauth.auth'] + account = fetch_account || create_account + + return head(:forbidden) unless account.admin? + + request.env['warden'].set_user(account) + redirect_to '/dashboard' + end + + private + + def fetch_account + auth = AuthProvider.where(provider: request.env['omniauth.auth'].provider, uid: request.env['omniauth.auth'].uid).take + return unless auth + + auth.account + end + + def create_account + Account.transaction do + account = Account.find_by(public_id: payload['info']['public_id']) || Account.create!( + public_id: payload['info']['public_id'], + full_name: payload['info']['full_name'], + email: payload['info']['email'], + role: payload['info']['role'] + ) + account.auth_providers.create!( + uid: payload.uid, + provider: payload.provider, + username: payload['info']['email'], + user_info: payload.to_h + ) + + account + end + end + + def payload + request.env['omniauth.auth'] + end +end diff --git a/analytics/app/helpers/dashboard_helper.rb b/analytics/app/helpers/dashboard_helper.rb new file mode 100644 index 0000000..b46534a --- /dev/null +++ b/analytics/app/helpers/dashboard_helper.rb @@ -0,0 +1,25 @@ +module DashboardHelper + def daily_profit + @daily_profit ||= begin + row = Statement.today.task_related.totals.take + (row&.total_credit || 0).abs + (row&.total_debit || 0) + end + end + + def credited_accounts + @credited_accounts ||= begin + grouped_data = Statement.today.select('account_public_id, SUM(credit) as total_credit, SUM(debit) as total_debit') + .today.task_related.group(:account_public_id) + + grouped_data.count { (_1.total_credit + _1.total_debit).negative? } + end + end + + def top_tasks + @top_tasks = begin + Statement.task_related.where("debit > 0").preload(:task) + .group_by(&:execution_date) + .transform_values { _1.max_by(&:debit) } + end + end +end diff --git a/analytics/app/models/account.rb b/analytics/app/models/account.rb new file mode 100644 index 0000000..f740cea --- /dev/null +++ b/analytics/app/models/account.rb @@ -0,0 +1,7 @@ +class Account < ApplicationRecord + has_many :auth_providers + + def admin? + role == 'admin' + end +end diff --git a/analytics/app/models/auth_provider.rb b/analytics/app/models/auth_provider.rb new file mode 100644 index 0000000..dcaf4a9 --- /dev/null +++ b/analytics/app/models/auth_provider.rb @@ -0,0 +1,3 @@ +class AuthProvider < ApplicationRecord + belongs_to :account +end diff --git a/analytics/app/models/statement.rb b/analytics/app/models/statement.rb new file mode 100644 index 0000000..04be911 --- /dev/null +++ b/analytics/app/models/statement.rb @@ -0,0 +1,12 @@ +class Statement < ApplicationRecord + belongs_to :task, foreign_key: :ref_public_id, primary_key: :public_id + + scope :totals, -> { select('SUM(credit) as total_credit, SUM(debit) as total_debit') } + scope :today, -> { where('DATE(created_at) = CURRENT_DATE') } + scope :for_date, ->(date) { where("DATE(created_at) = ?", date) } + scope :task_related, -> { where(ref_type: 'Task') } + + def execution_date + created_at.to_date + end +end diff --git a/analytics/app/models/task.rb b/analytics/app/models/task.rb new file mode 100644 index 0000000..3c23424 --- /dev/null +++ b/analytics/app/models/task.rb @@ -0,0 +1,2 @@ +class Task < ApplicationRecord +end diff --git a/analytics/app/views/dashboard/index.html.erb b/analytics/app/views/dashboard/index.html.erb new file mode 100644 index 0000000..28525ad --- /dev/null +++ b/analytics/app/views/dashboard/index.html.erb @@ -0,0 +1,13 @@ +

Dashboard

+ +

Daily profit - <%= daily_profit %>

+

Debtors - <%= credited_accounts %>

+

Top tasks

+ + diff --git a/analytics/config/routes.rb b/analytics/config/routes.rb index 262ffd5..5046e8c 100644 --- a/analytics/config/routes.rb +++ b/analytics/config/routes.rb @@ -3,4 +3,8 @@ # Defines the root path route ("/") # root "articles#index" + + get '/logout', to: 'oauth_session#destroy' + get '/auth/:provider/callback', to: 'oauth_session#create' + get '/dashboard', to: 'dashboard#index' end diff --git a/analytics/db/migrate/20221008144037_create_accounts.rb b/analytics/db/migrate/20221008144037_create_accounts.rb new file mode 100644 index 0000000..a144330 --- /dev/null +++ b/analytics/db/migrate/20221008144037_create_accounts.rb @@ -0,0 +1,12 @@ +class CreateAccounts < ActiveRecord::Migration[7.0] + def change + create_table :accounts do |t| + t.string :public_id + t.string :full_name + t.string :email + t.string :role + + t.timestamps + end + end +end diff --git a/analytics/db/migrate/20221008144527_create_auth_providers.rb b/analytics/db/migrate/20221008144527_create_auth_providers.rb new file mode 100644 index 0000000..b52e523 --- /dev/null +++ b/analytics/db/migrate/20221008144527_create_auth_providers.rb @@ -0,0 +1,13 @@ +class CreateAuthProviders < ActiveRecord::Migration[7.0] + def change + create_table :auth_providers do |t| + t.integer :account_id + t.string :uid + t.string :provider + t.string :username + t.jsonb :user_info + + t.timestamps + end + end +end diff --git a/analytics/db/migrate/20221016172623_create_statements.rb b/analytics/db/migrate/20221016172623_create_statements.rb new file mode 100644 index 0000000..6025652 --- /dev/null +++ b/analytics/db/migrate/20221016172623_create_statements.rb @@ -0,0 +1,14 @@ +class CreateStatements < ActiveRecord::Migration[7.0] + def change + create_table :statements do |t| + t.string :account_public_id + t.string :description + t.integer :credit, default: 0 + t.integer :debit, default: 0 + t.string :ref_type + t.string :ref_public_id + + t.timestamps + end + end +end diff --git a/analytics/db/migrate/20221016172806_create_tasks.rb b/analytics/db/migrate/20221016172806_create_tasks.rb new file mode 100644 index 0000000..38d0117 --- /dev/null +++ b/analytics/db/migrate/20221016172806_create_tasks.rb @@ -0,0 +1,11 @@ +class CreateTasks < ActiveRecord::Migration[7.0] + def change + create_table :tasks do |t| + t.string :public_id + t.string :title + t.string :state + + t.timestamps + end + end +end diff --git a/analytics/db/schema.rb b/analytics/db/schema.rb new file mode 100644 index 0000000..c39714e --- /dev/null +++ b/analytics/db/schema.rb @@ -0,0 +1,55 @@ +# This file is auto-generated from the current state of the database. Instead +# of editing this file, please use the migrations feature of Active Record to +# incrementally modify your database, and then regenerate this schema definition. +# +# This file is the source Rails uses to define your schema when running `bin/rails +# db:schema:load`. When creating a new database, `bin/rails db:schema:load` tends to +# be faster and is potentially less error prone than running all of your +# migrations from scratch. Old migrations may fail to apply correctly if those +# migrations use external dependencies or application code. +# +# It's strongly recommended that you check this file into your version control system. + +ActiveRecord::Schema[7.0].define(version: 2022_10_16_172806) do + # These are extensions that must be enabled in order to support this database + enable_extension "plpgsql" + + create_table "accounts", force: :cascade do |t| + t.string "public_id" + t.string "full_name" + t.string "email" + t.string "role" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + end + + create_table "auth_providers", force: :cascade do |t| + t.integer "account_id" + t.string "uid" + t.string "provider" + t.string "username" + t.jsonb "user_info" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + end + + create_table "statements", force: :cascade do |t| + t.integer "account_public_id" + t.string "description" + t.integer "credit", default: 0 + t.integer "debit", default: 0 + t.string "ref_type" + t.string "ref_public_id" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + end + + create_table "tasks", force: :cascade do |t| + t.string "public_id" + t.string "title" + t.string "state" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + end + +end diff --git a/analytics/karafka.rb b/analytics/karafka.rb new file mode 100644 index 0000000..bcbf08b --- /dev/null +++ b/analytics/karafka.rb @@ -0,0 +1,44 @@ +# frozen_string_literal: true + +class KarafkaApp < Karafka::App + setup do |config| + config.kafka = { 'bootstrap.servers': '127.0.0.1:9092' } + config.client_id = 'anal_app' + # Recreate consumers with each batch. This will allow Rails code reload to work in the + # development mode. Otherwise Karafka process would not be aware of code changes + config.consumer_persistence = !Rails.env.development? + end + + # Comment out this part if you are not using instrumentation and/or you are not + # interested in logging events for certain environments. Since instrumentation + # notifications add extra boilerplate, if you want to achieve max performance, + # listen to only what you really need for given environment. + # Karafka.monitor.subscribe(Karafka::Instrumentation::LoggerListener.new) + # Karafka.monitor.subscribe(Karafka::Instrumentation::ProctitleListener.new) + + routes.draw do + # Uncomment this if you use Karafka with ActiveJob + # You ned to define the topic per each queue name you use + # active_job_topic :default + + topic :'accounts-stream' do + consumer AccountChangesConsumer + end + + topic :'accounts' do + consumer AccountChangesConsumer + end + + topic :'tasks' do + consumer TaskChangesConsumer + end + + topic :'tasks-stream' do + consumer TaskChangesConsumer + end + topic :'statements-stream' do + consumer StatementChangesConsumer + end + + end +end diff --git a/auth/app/controllers/accounts_controller.rb b/auth/app/controllers/accounts_controller.rb index 2cad043..7b37471 100644 --- a/auth/app/controllers/accounts_controller.rb +++ b/auth/app/controllers/accounts_controller.rb @@ -27,30 +27,22 @@ def update new_role = @account.role != account_params[:role] ? account_params[:role] : nil if @account.update(account_params) - # ----------------------------- produce event ----------------------- - # Events::AccountUpdated.new(payload).to_h.to_json - event = { - **account_event_data, - event_name: 'AccountUpdated', - data: { - public_id: @account.public_id, - email: @account.email, - full_name: @account.full_name, - position: @account.role - } + payload = { + public_id: @account.public_id, + email: @account.email, + full_name: @account.full_name, + position: @account.role } - result = SchemaRegistry.validate_event(event, 'accounts.updated', version: 1) - - if result.success? - WaterDrop::SyncProducer.call(event.to_json, topic: 'accounts-stream') - else + build_event('accounts-stream', 'Account.Updated', meta: account_event_data, payload: payload) do |topic, event| + WaterDrop::SyncProducer.call(event.to_json, topic: topic) end - # -------------------------------------------------------------------- - - produce_be_event(@account.public_id, new_role) if new_role - # -------------------------------------------------------------------- + if new_role + build_event('accounts', 'Account.RoleChanged', meta: account_event_data, payload: { public_id: @account.public_id, role: new_role }) do |topic, event| + WaterDrop::SyncProducer.call(event.to_json, topic: topic) + end + end format.html { redirect_to root_path, notice: 'Account was successfully updated.' } format.json { render :index, status: :ok, location: @account } @@ -68,18 +60,9 @@ def update def destroy @account.update(active: false, disabled_at: Time.now) - # ----------------------------- produce event ----------------------- - event = { - **account_event_data, - event_name: 'AccountDeleted', - data: { public_id: @account.public_id } - } - result = SchemaRegistry.validate_event(event, 'accounts.deleted', version: 1) - - if result.success? - WaterDrop::SyncProducer.call(event.to_json, topic: 'accounts-stream') + build_event('accounts', 'Account.Deleted', meta: account_event_data, payload: { public_id: @account.public_id }) do |topic, event| + WaterDrop::SyncProducer.call(event.to_json, topic: topic) end - # -------------------------------------------------------------------- respond_to do |format| format.html { redirect_to root_path, notice: 'Account was successfully destroyed.' } @@ -115,16 +98,20 @@ def account_params params.require(:account).permit(:full_name, :role) end - def produce_be_event(public_id, role) + def build_event(topic, event_name, meta:, payload:) event = { - **account_event_data, - event_name: 'AccountRoleChanged', - data: { public_id: public_id, role: role } + **meta, + event_name: event_name, + data: payload } - result = SchemaRegistry.validate_event(event, 'accounts.role_changed', version: 1) + + result = SchemaRegistry.validate_event(event, event_name.underscore, version: event[:event_version]) if result.success? - WaterDrop::SyncProducer.call(event.to_json, topic: 'accounts') + yield topic, event + else + puts "Event validation error: #{result.failure}" + puts "Event data: #{event.inspect}" end end end diff --git a/auth/app/models/account.rb b/auth/app/models/account.rb index f333ba6..1c9edb7 100644 --- a/auth/app/models/account.rb +++ b/auth/app/models/account.rb @@ -30,7 +30,7 @@ class Account < ApplicationRecord event_version: 1, event_time: Time.now.to_s, producer: 'auth_service', - event_name: 'AccountCreated', + event_name: 'Account.Created', data: { public_id: account.public_id, email: account.email, @@ -38,7 +38,7 @@ class Account < ApplicationRecord position: account.role } } - result = SchemaRegistry.validate_event(event, 'accounts.created', version: 1) + result = SchemaRegistry.validate_event(event, 'account.created', version: 1) if result.success? puts "Validation success" diff --git a/event_schema_registry/schemas/accounts/created/1.json b/event_schema_registry/schemas/account/created/1.json similarity index 94% rename from event_schema_registry/schemas/accounts/created/1.json rename to event_schema_registry/schemas/account/created/1.json index 69c06e6..280b146 100644 --- a/event_schema_registry/schemas/accounts/created/1.json +++ b/event_schema_registry/schemas/account/created/1.json @@ -33,7 +33,7 @@ "properties": { "event_id": { "type": "string" }, "event_version": { "enum": [1] }, - "event_name": { "enum": ["AccountCreated"] }, + "event_name": { "enum": ["Account.Created"] }, "event_time": { "type": "string" }, "producer": { "type": "string" }, diff --git a/event_schema_registry/schemas/accounts/deleted/1.json b/event_schema_registry/schemas/account/deleted/1.json similarity index 89% rename from event_schema_registry/schemas/accounts/deleted/1.json rename to event_schema_registry/schemas/account/deleted/1.json index 6ecd49f..1173bbb 100644 --- a/event_schema_registry/schemas/accounts/deleted/1.json +++ b/event_schema_registry/schemas/account/deleted/1.json @@ -1,7 +1,7 @@ { "$schema": "http://json-schema.org/draft-04/schema#", - "title": "Accounts.Deleted.v1", + "title": "Account.Deleted.v1", "description": "json schema for CUD account events (version 1)", "definitions": { @@ -23,7 +23,7 @@ "properties": { "event_id": { "type": "string" }, "event_version": { "enum": [1] }, - "event_name": { "enum": ["AccountDeleted"] }, + "event_name": { "enum": ["Account.Deleted"] }, "event_time": { "type": "string" }, "producer": { "type": "string" }, diff --git a/event_schema_registry/schemas/accounts/role_changed/1.json b/event_schema_registry/schemas/account/role_changed/1.json similarity index 93% rename from event_schema_registry/schemas/accounts/role_changed/1.json rename to event_schema_registry/schemas/account/role_changed/1.json index 8bef89a..af59ac3 100644 --- a/event_schema_registry/schemas/accounts/role_changed/1.json +++ b/event_schema_registry/schemas/account/role_changed/1.json @@ -27,7 +27,7 @@ "properties": { "event_id": { "type": "string" }, "event_version": { "enum": [1] }, - "event_name": { "enum": ["AccountRoleChanged"] }, + "event_name": { "enum": ["Account.RoleChanged"] }, "event_time": { "type": "string" }, "producer": { "type": "string" }, diff --git a/event_schema_registry/schemas/accounts/updated/1.json b/event_schema_registry/schemas/account/updated/1.json similarity index 91% rename from event_schema_registry/schemas/accounts/updated/1.json rename to event_schema_registry/schemas/account/updated/1.json index ca7dbce..f38e7ab 100644 --- a/event_schema_registry/schemas/accounts/updated/1.json +++ b/event_schema_registry/schemas/account/updated/1.json @@ -1,7 +1,7 @@ { "$schema": "http://json-schema.org/draft-04/schema#", - "title": "Accounts.Updated.v1", + "title": "Account.Updated.v1", "description": "json schema for CUD account events (version 1)", "definitions": { @@ -32,7 +32,7 @@ "properties": { "event_id": { "type": "string" }, "event_version": { "enum": [1] }, - "event_name": { "enum": ["AccountUpdated"] }, + "event_name": { "enum": ["Account.Updated"] }, "event_time": { "type": "string" }, "producer": { "type": "string" }, diff --git a/event_schema_registry/schemas/accounts/created/2.json b/event_schema_registry/schemas/accounts/created/2.json deleted file mode 100644 index a3da053..0000000 --- a/event_schema_registry/schemas/accounts/created/2.json +++ /dev/null @@ -1,55 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-04/schema#", - - "title": "Accounts.Created.v2", - "description": "json schema for CUD account events (version 1)", - - "definitions": { - "event_data": { - "type": "object", - "properties": { - "public_id": { - "type": "string" - }, - "email": { - "type": "string" - }, - "first_name": { - "type": ["string", "null"] - }, - "last_name": { - "type": ["string", "null"] - }, - "position": { - "type": ["string", "null"] - } - }, - "required": [ - "public_id", - "email" - ] - } - }, - - "type": "object", - - "properties": { - "event_id": { "type": "string" }, - "event_version": { "enum": [1] }, - "event_name": { "enum": ["AccountCreated"] }, - "event_time": { "type": "string" }, - "producer": { "type": "string" }, - - "data": { "$ref": "#/definitions/event_data" } - }, - - "required": [ - "event_id", - "event_version", - "event_name", - "event_time", - "producer", - "data" - ] -} - diff --git a/event_schema_registry/schemas/statement/created/1.json b/event_schema_registry/schemas/statement/created/1.json new file mode 100644 index 0000000..f0874db --- /dev/null +++ b/event_schema_registry/schemas/statement/created/1.json @@ -0,0 +1,69 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + + "title": "Statement.Created.v1", + "description": "json schema for CUD Statement events (version 1)", + + "definitions": { + "event_data": { + "type": "object", + "properties": { + "owner": { + "type": "object", + "properties": { + "public_id": { + "type": "string" + } + }, + "required": ["public_id"] + }, + "description": { + "type": "string" + }, + "credit": { + "type": "integer" + }, + "debit": { + "type": "integer" + }, + "ref_type": { + "type": ["string", "null"], + "enum": ["Task", "User"] + }, + "ref_public_id": { + "type": ["string", "null"] + } + }, + "required": [ + "owner", + "description", + "credit", + "debit", + "ref_type", + "ref_public_id" + ] + } + }, + + "type": "object", + + "properties": { + "event_id": { "type": "string" }, + "event_version": { "enum": [1] }, + "event_name": { "enum": ["Statement.Created"] }, + "event_time": { "type": "string" }, + "producer": { "type": "string" }, + + "data": { "$ref": "#/definitions/event_data" } + }, + + "required": [ + "event_id", + "event_version", + "event_name", + "event_time", + "producer", + "data" + ] +} + diff --git a/event_schema_registry/schemas/task/added/1.json b/event_schema_registry/schemas/task/added/1.json new file mode 100644 index 0000000..1412e6a --- /dev/null +++ b/event_schema_registry/schemas/task/added/1.json @@ -0,0 +1,75 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + + "title": "Tasks.Added.v1", + "description": "json schema for BE task events (version 1)", + + "definitions": { + "event_data": { + "type": "object", + "properties": { + "public_id": { + "type": "string" + }, + "title": { + "type": "string" + }, + "state": { + "type": "string", + "enum": ["pending", "resolved"] + }, + "description": { + "type": ["string", "null"] + }, + "assigned_to": { + "type": "object", + "properties": { + "public_id": { + "type": "string" + } + }, + "required": ["public_id"] + }, + "created_by": { + "type": "object", + "properties": { + "public_id": { + "type": "string" + } + }, + "required": ["public_id"] + } + }, + "required": [ + "public_id", + "title", + "state", + "description", + "assigned_to", + "created_by" + ] + } + }, + + "type": "object", + + "properties": { + "event_id": { "type": "string" }, + "event_version": { "enum": [1] }, + "event_name": { "enum": ["Task.Added"] }, + "event_time": { "type": "string" }, + "producer": { "type": "string" }, + + "data": { "$ref": "#/definitions/event_data" } + }, + + "required": [ + "event_id", + "event_version", + "event_name", + "event_time", + "producer", + "data" + ] +} + diff --git a/event_schema_registry/schemas/task/added/2.json b/event_schema_registry/schemas/task/added/2.json new file mode 100644 index 0000000..d8f0b3c --- /dev/null +++ b/event_schema_registry/schemas/task/added/2.json @@ -0,0 +1,79 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + + "title": "Tasks.Added.v2", + "description": "json schema for BE task events (version 2)", + + "definitions": { + "event_data": { + "type": "object", + "properties": { + "public_id": { + "type": "string" + }, + "title": { + "type": "string", + "pattern": "[^\\[\\]]" + }, + "jira_id": { + "type": ["string", "null"] + }, + "state": { + "type": "string", + "enum": ["pending", "resolved"] + }, + "description": { + "type": ["string", "null"] + }, + "assigned_to": { + "type": "object", + "properties": { + "public_id": { + "type": "string" + } + }, + "required": ["public_id"] + }, + "created_by": { + "type": "object", + "properties": { + "public_id": { + "type": "string" + } + }, + "required": ["public_id"] + } + }, + "required": [ + "public_id", + "title", + "state", + "description", + "assigned_to", + "created_by" + ] + } + }, + + "type": "object", + + "properties": { + "event_id": { "type": "string" }, + "event_version": { "enum": [2] }, + "event_name": { "enum": ["Task.Added"] }, + "event_time": { "type": "string" }, + "producer": { "type": "string" }, + + "data": { "$ref": "#/definitions/event_data" } + }, + + "required": [ + "event_id", + "event_version", + "event_name", + "event_time", + "producer", + "data" + ] +} + diff --git a/event_schema_registry/schemas/task/assigned/1.json b/event_schema_registry/schemas/task/assigned/1.json new file mode 100644 index 0000000..b4a10c2 --- /dev/null +++ b/event_schema_registry/schemas/task/assigned/1.json @@ -0,0 +1,52 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + + "title": "Tasks.Assigned.v1", + "description": "json schema for BE task events (version 1)", + + "definitions": { + "event_data": { + "type": "object", + "properties": { + "public_id": { + "type": "string" + }, + "assigned_to": { + "type": "object", + "properties": { + "public_id": { + "type": "string" + } + }, + "required": ["public_id"] + } + }, + "required": [ + "public_id", + "assigned_to" + ] + } + }, + + "type": "object", + + "properties": { + "event_id": { "type": "string" }, + "event_version": { "enum": [1] }, + "event_name": { "enum": ["Task.Assigned"] }, + "event_time": { "type": "string" }, + "producer": { "type": "string" }, + + "data": { "$ref": "#/definitions/event_data" } + }, + + "required": [ + "event_id", + "event_version", + "event_name", + "event_time", + "producer", + "data" + ] +} + diff --git a/event_schema_registry/schemas/task/created/1.json b/event_schema_registry/schemas/task/created/1.json new file mode 100644 index 0000000..a5305f0 --- /dev/null +++ b/event_schema_registry/schemas/task/created/1.json @@ -0,0 +1,75 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + + "title": "Tasks.Created.v1", + "description": "json schema for CUD task events (version 1)", + + "definitions": { + "event_data": { + "type": "object", + "properties": { + "public_id": { + "type": "string" + }, + "title": { + "type": "string" + }, + "state": { + "type": "string", + "enum": ["pending", "resolved"] + }, + "description": { + "type": ["string", "null"] + }, + "assigned_to": { + "type": "object", + "properties": { + "public_id": { + "type": "string" + } + }, + "required": ["public_id"] + }, + "created_by": { + "type": "object", + "properties": { + "public_id": { + "type": "string" + } + }, + "required": ["public_id"] + } + }, + "required": [ + "public_id", + "title", + "state", + "description", + "assigned_to", + "created_by" + ] + } + }, + + "type": "object", + + "properties": { + "event_id": { "type": "string" }, + "event_version": { "enum": [1] }, + "event_name": { "enum": ["Task.Created"] }, + "event_time": { "type": "string" }, + "producer": { "type": "string" }, + + "data": { "$ref": "#/definitions/event_data" } + }, + + "required": [ + "event_id", + "event_version", + "event_name", + "event_time", + "producer", + "data" + ] +} + diff --git a/event_schema_registry/schemas/task/created/2.json b/event_schema_registry/schemas/task/created/2.json new file mode 100644 index 0000000..1b25c69 --- /dev/null +++ b/event_schema_registry/schemas/task/created/2.json @@ -0,0 +1,79 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + + "title": "Tasks.Created.v1", + "description": "json schema for CUD task events (version 1)", + + "definitions": { + "event_data": { + "type": "object", + "properties": { + "public_id": { + "type": "string" + }, + "title": { + "type": "string", + "pattern": "[^\\[\\]]" + }, + "jira_id": { + "type": ["string", "null"] + }, + "state": { + "type": "string", + "enum": ["pending", "resolved"] + }, + "description": { + "type": ["string", "null"] + }, + "assigned_to": { + "type": "object", + "properties": { + "public_id": { + "type": "string" + } + }, + "required": ["public_id"] + }, + "created_by": { + "type": "object", + "properties": { + "public_id": { + "type": "string" + } + }, + "required": ["public_id"] + } + }, + "required": [ + "public_id", + "title", + "state", + "description", + "assigned_to", + "created_by" + ] + } + }, + + "type": "object", + + "properties": { + "event_id": { "type": "string" }, + "event_version": { "enum": [2] }, + "event_name": { "enum": ["Task.Created"] }, + "event_time": { "type": "string" }, + "producer": { "type": "string" }, + + "data": { "$ref": "#/definitions/event_data" } + }, + + "required": [ + "event_id", + "event_version", + "event_name", + "event_time", + "producer", + "data" + ] +} + diff --git a/event_schema_registry/schemas/task/resolved/1.json b/event_schema_registry/schemas/task/resolved/1.json new file mode 100644 index 0000000..66bef69 --- /dev/null +++ b/event_schema_registry/schemas/task/resolved/1.json @@ -0,0 +1,52 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + + "title": "Tasks.Resolved.v1", + "description": "json schema for BE task events (version 1)", + + "definitions": { + "event_data": { + "type": "object", + "properties": { + "public_id": { + "type": "string" + }, + "assigned_to": { + "type": "object", + "properties": { + "public_id": { + "type": "string" + } + }, + "required": ["public_id"] + } + }, + "required": [ + "public_id", + "assigned_to" + ] + } + }, + + "type": "object", + + "properties": { + "event_id": { "type": "string" }, + "event_version": { "enum": [1] }, + "event_name": { "enum": ["Task.Resolved"] }, + "event_time": { "type": "string" }, + "producer": { "type": "string" }, + + "data": { "$ref": "#/definitions/event_data" } + }, + + "required": [ + "event_id", + "event_version", + "event_name", + "event_time", + "producer", + "data" + ] +} + diff --git a/event_schema_registry/schemas/task/updated/1.json b/event_schema_registry/schemas/task/updated/1.json new file mode 100644 index 0000000..9daee35 --- /dev/null +++ b/event_schema_registry/schemas/task/updated/1.json @@ -0,0 +1,75 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + + "title": "Tasks.Updated.v1", + "description": "json schema for CUD task events (version 1)", + + "definitions": { + "event_data": { + "type": "object", + "properties": { + "public_id": { + "type": "string" + }, + "title": { + "type": "string" + }, + "state": { + "type": "string", + "enum": ["pending", "resolved"] + }, + "description": { + "type": ["string", "null"] + }, + "assigned_to": { + "type": "object", + "properties": { + "public_id": { + "type": "string" + } + }, + "required": ["public_id"] + }, + "created_by": { + "type": "object", + "properties": { + "public_id": { + "type": "string" + } + }, + "required": ["public_id"] + } + }, + "required": [ + "public_id", + "title", + "state", + "description", + "assigned_to", + "created_by" + ] + } + }, + + "type": "object", + + "properties": { + "event_id": { "type": "string" }, + "event_version": { "enum": [1] }, + "event_name": { "enum": ["Task.Updated"] }, + "event_time": { "type": "string" }, + "producer": { "type": "string" }, + + "data": { "$ref": "#/definitions/event_data" } + }, + + "required": [ + "event_id", + "event_version", + "event_name", + "event_time", + "producer", + "data" + ] +} + diff --git a/event_schema_registry/schemas/task/updated/2.json b/event_schema_registry/schemas/task/updated/2.json new file mode 100644 index 0000000..1e997fc --- /dev/null +++ b/event_schema_registry/schemas/task/updated/2.json @@ -0,0 +1,79 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + + "title": "Tasks.Updated.v1", + "description": "json schema for CUD task events (version 1)", + + "definitions": { + "event_data": { + "type": "object", + "properties": { + "public_id": { + "type": "string" + }, + "title": { + "type": "string", + "pattern": "[^\\[\\]]" + }, + "jira_id": { + "type": ["string", "null"] + }, + "state": { + "type": "string", + "enum": ["pending", "resolved"] + }, + "description": { + "type": ["string", "null"] + }, + "assigned_to": { + "type": "object", + "properties": { + "public_id": { + "type": "string" + } + }, + "required": ["public_id"] + }, + "created_by": { + "type": "object", + "properties": { + "public_id": { + "type": "string" + } + }, + "required": ["public_id"] + } + }, + "required": [ + "public_id", + "title", + "state", + "description", + "assigned_to", + "created_by" + ] + } + }, + + "type": "object", + + "properties": { + "event_id": { "type": "string" }, + "event_version": { "enum": [2] }, + "event_name": { "enum": ["Task.Updated"] }, + "event_time": { "type": "string" }, + "producer": { "type": "string" }, + + "data": { "$ref": "#/definitions/event_data" } + }, + + "required": [ + "event_id", + "event_version", + "event_name", + "event_time", + "producer", + "data" + ] +} + diff --git a/task_manager/app/consumers/account_changes_consumer.rb b/task_manager/app/consumers/account_changes_consumer.rb index 81f0f3b..c41acbb 100644 --- a/task_manager/app/consumers/account_changes_consumer.rb +++ b/task_manager/app/consumers/account_changes_consumer.rb @@ -10,7 +10,7 @@ def consume puts '-' * 80 case message['event_name'] - when 'AccountCreated' + when 'Account.Created' account = fetch_account(message['data']['public_id']) || Account.new account.update( public_id: message['data']['public_id'], @@ -18,14 +18,14 @@ def consume full_name: message['data']['full_name'], role: message['data']['position'] ) - when 'AccountUpdated' + when 'Account.Updated' fetch_account(message['data']['public_id'])&.update!( full_name: message['data']['full_name'] ) - when 'AccountDeleted' + when 'Account.Deleted' fetch_account(message['data']['public_id'])&.destroy! # TODO: if you want - when 'AccountRoleChanged' + when 'Account.RoleChanged' fetch_account(message['data']['public_id'])&.update!( role: message['data']['role'] ) diff --git a/task_manager/app/controllers/oauth_session_controller.rb b/task_manager/app/controllers/oauth_session_controller.rb index d4671c0..db22cb2 100644 --- a/task_manager/app/controllers/oauth_session_controller.rb +++ b/task_manager/app/controllers/oauth_session_controller.rb @@ -36,6 +36,7 @@ def create_account username: payload['info']['email'], user_info: payload.to_h ) + account end end diff --git a/task_manager/app/controllers/tasks_controller.rb b/task_manager/app/controllers/tasks_controller.rb index a0a2aee..92b9440 100644 --- a/task_manager/app/controllers/tasks_controller.rb +++ b/task_manager/app/controllers/tasks_controller.rb @@ -5,6 +5,7 @@ class TasksController < ApplicationController # GET /tasks or /tasks.json def index @tasks = Task.all.preload(:assigned_to, :created_by).order(:id) + @tasks = @tasks.pending if params[:state] != 'all' end def my @@ -15,18 +16,15 @@ def shuffle events = Task.pending.each_with_object([]) do |t, acc| prev_user_id = t.assigned_to_id t.update!(assigned_to: fetch_account_to_assign) - event = { - **task_event_data, - event_name: 'Task.Assigned', - data: { + if prev_user_id != t.assigned_to_id + payload = { public_id: t.public_id, - assigned_to: { - public_id: t.assigned_to.public_id - } + assigned_to: {public_id: t.assigned_to.public_id} } - } - acc << {topic: 'tasks', payload: event.to_json} if prev_user_id != t.assigned_to_id - end + acc << build_event('tasks-stream', 'Task.Updated', meta: task_event_data, payload: task_data(t)) + acc << build_event('tasks', 'Task.Assigned', meta: task_event_data(1), payload: payload) + end + end.compact KAFKA_PRODUCER.produce_many_sync(events) redirect_to '/tasks' end @@ -52,12 +50,12 @@ def create respond_to do |format| if @task.save - data = task_data(@task) + payload = task_data(@task) KAFKA_PRODUCER.produce_many_sync( [ - {topic: 'tasks-stream', payload: {**task_event_data, event_name: 'Task.Created', data: data}.to_json}, - {topic: 'tasks', payload: {**task_event_data, event_name: 'Task.Added', data: data}.to_json} - ] + build_event('tasks-stream', 'Task.Created', meta: task_event_data, payload: payload), + build_event('tasks', 'Task.Added', meta: task_event_data, payload: payload) + ].compact ) format.html { redirect_to task_url(@task), notice: "Task was successfully created." } @@ -73,21 +71,19 @@ def create def update respond_to do |format| if @task.update(task_params) - data = task_data(@task) - events = [] + events = [ + build_event('tasks-stream', 'Task.Updated', meta: task_event_data, payload: task_data(@task)) + ] - events << {topic: 'tasks-stream', payload: {**task_event_data, event_name: 'Task.Updated', data: data}.to_json} if @task.previous_changes.has_key?('state') && @task.resolved? payload = { public_id: @task.public_id, - assigned_to: { - public_id: @task.assigned_to.public_id - } + assigned_to: {public_id: @task.assigned_to.public_id} } - events << {topic: 'tasks', payload: {**task_event_data, event_name: 'Task.Resolved', data: payload}.to_json} + events << build_event('tasks', 'Task.Resolved', meta: task_event_data(1), payload: payload) end - KAFKA_PRODUCER.produce_many_sync(events) + KAFKA_PRODUCER.produce_many_sync(events.compact) format.html { redirect_to task_url(@task), notice: "Task was successfully updated." } format.json { render :show, status: :ok, location: @task } else @@ -112,27 +108,61 @@ def fetch_account_to_assign Account.executors.random.take end - def task_event_data + def task_event_data(v = 2) { event_id: SecureRandom.uuid, - event_version: 1, + event_version: v, event_time: Time.now.to_s, producer: 'task_manager', } end - def task_data(task) - { - public_id: task.public_id, - title: task.title, - state: task.state, - description: task.description, - assigned_to: { - public_id: task.assigned_to.public_id - }, - created_by: { - public_id: task.created_by.public_id + def task_data(task, version=2) + case version + when 1 + { + public_id: task.public_id, + title: task.old_title, + state: task.state, + description: task.description, + assigned_to: { + public_id: task.assigned_to.public_id + }, + created_by: { + public_id: task.created_by.public_id + } + } + when 2 + { + public_id: task.public_id, + title: task.title, + jira_id: task.jira_id, + state: task.state, + description: task.description, + assigned_to: { + public_id: task.assigned_to.public_id + }, + created_by: { + public_id: task.created_by.public_id + } } + end + end + + def build_event(topic, event_name, meta:, payload:) + event = { + **meta, + event_name: event_name, + data: payload } + + result = SchemaRegistry.validate_event(event, event_name.underscore, version: event[:event_version]) + + if result.success? + {topic: topic, payload: event.to_json} + else + puts "Event validation error: #{result.failure}" + puts "Event data: #{event.inspect}" + end end end diff --git a/task_manager/app/models/task.rb b/task_manager/app/models/task.rb index fe1f7e8..60b912e 100644 --- a/task_manager/app/models/task.rb +++ b/task_manager/app/models/task.rb @@ -16,4 +16,20 @@ def initialize(*, **) self.state ||= :pending self.public_id ||= SecureRandom.uuid end + + def old_title + attributes['title'] + end + + def title + return if attributes['title'].nil? + + attributes['title'].split(/ - /).last + end + + def jira_id + return unless attributes['title'] =~ /\s\-\s/ + + attributes['title'].split(/ - /).first + end end