Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions backend/internal/api/app/app_dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,27 +392,27 @@ func (app *App) createWorkers() workers.Workers {
invoiceRepo := corepersistence.NewGormInvoiceRepository(app.core.db)
contractsRepo := corepersistence.NewGormUserContractDataRepository(app.core.db)

billingService := services.NewBillingService(
userRepo, contractsRepo, transferRecordsRepo, clusterRepo,
app.infra.graphql, app.infra.gridClient,
uint64(app.config.MinimumTFTAmountInWallet), services.Discount(app.config.AppliedDiscount),
)

workersService := services.NewWorkersService(
app.core.appCtx, userRepo, contractsRepo, invoiceRepo, clusterRepo, transferRecordsRepo,
app.communication.mailService, app.infra.gridClient, app.core.ewfEngine,
app.communication.notificationDispatcher, app.infra.graphql, app.infra.firesquidClient,
app.config.Invoice, app.config.SystemAccount.Mnemonic,
billingService, app.config.Invoice,
app.config.Currency, app.config.ClusterHealthCheckIntervalInHours,
app.config.NodeHealthCheck.ReservedNodeHealthCheckIntervalInHours,
app.config.NodeHealthCheck.ReservedNodeHealthCheckTimeoutInMinutes,
app.config.NodeHealthCheck.ReservedNodeHealthCheckWorkersNum,
app.config.SettleTransferRecordsIntervalInMinutes,
app.config.NotifyAdminsForPendingRecordsInHours,
app.config.MinimumTFTAmountInWallet, services.Discount(app.config.AppliedDiscount),
app.config.MinimumTFTAmountInWallet,
app.config.UsersBalanceCheckIntervalInHours,
app.config.CheckUserDebtIntervalInHours,
)

billingService := services.NewBillingService(
userRepo, contractsRepo, transferRecordsRepo, clusterRepo,
app.infra.graphql, app.infra.gridClient,
uint64(app.config.MinimumTFTAmountInWallet), services.Discount(app.config.AppliedDiscount),
)

return workers.NewWorkers(app.core.appCtx, workersService, billingService, app.core.metrics, app.core.db)
}
34 changes: 0 additions & 34 deletions backend/internal/api/handlers/deployment_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,23 +224,6 @@ func (h *DeploymentHandler) HandleDeployCluster(c *gin.Context) {
return
}

user, err := h.svc.GetUserByID(config.UserID)
if err != nil {
if errors.Is(err, models.ErrUserNotFound) {
NotFound(c, "User not found")
return
}
reqLog.Error().Err(err).Send()
InternalServerError(c)
return
}

if err := h.billingService.FundUserToFulfillDiscount(c.Request.Context(), &user, nil, cluster.Nodes); err != nil {
reqLog.Error().Err(err).Send()
InternalServerError(c)
return
}

projectName := kubedeployer.GetProjectName(config.UserID, cluster.Name)
logWithProject := reqLog.With().Str("project_name", projectName).Logger()
reqLog = &logWithProject
Expand Down Expand Up @@ -428,23 +411,6 @@ func (h *DeploymentHandler) HandleAddNode(c *gin.Context) {
}
}

user, err := h.svc.GetUserByID(config.UserID)
if err != nil {
if errors.Is(err, models.ErrUserNotFound) {
NotFound(c, "User not found")
return
}
reqLog.Error().Err(err).Send()
InternalServerError(c)
return
}

if err := h.billingService.FundUserToFulfillDiscount(c.Request.Context(), &user, nil, cluster.Nodes); err != nil {
reqLog.Error().Err(err).Send()
InternalServerError(c)
return
}

wfUUID, wfStatus, err := h.svc.AsyncAddNode(config, cl, cluster.Nodes[0])
if err != nil {
reqLog.Error().Err(err).Msg("failed to start add node workflow")
Expand Down
6 changes: 0 additions & 6 deletions backend/internal/api/handlers/node_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,12 +292,6 @@ func (h *NodeHandler) ReserveNodeHandler(c *gin.Context) {
return
}

if err := h.billingService.FundUserToFulfillDiscount(c.Request.Context(), &user, nil, nil); err != nil {
reqLog.Error().Err(err).Send()
InternalServerError(c)
return
}

wfUUID, err := h.svc.AsyncReserveNode(userID, user.Mnemonic, nodeID)
if err != nil {
reqLog.Error().Err(err).Msg("failed to start workflow to reserve node")
Expand Down
81 changes: 33 additions & 48 deletions backend/internal/core/services/billing_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func NewBillingService(userRepo models.UserRepository, contractsRepo models.Cont
}
}

func (svc *BillingService) Discount() Discount {
return svc.appliedDiscount
}

func (svc *BillingService) SettleUserUsage(user *models.User) error {
usageInUSDMillicent, err := svc.getUserLatestUsageInUSD(user.ID)
if err != nil {
Expand All @@ -65,92 +69,70 @@ func (svc *BillingService) SettleUserUsage(user *models.User) error {
}

func (svc *BillingService) AfterUserGetCredit(ctx context.Context, user *models.User) error {
if err := svc.CreateTransferRecordToChargeUserWithMinTFTAmount(user.ID, user.Username, user.Mnemonic); err != nil {
if err := svc.FundUserWithTFTs(ctx, user); err != nil {
return err
}

if err := svc.SettleUserUsage(user); err != nil {
return svc.SettleUserUsage(user)
}

func (svc *BillingService) FundUserWithTFTs(ctx context.Context, user *models.User) error {
userGridAccountTFTBalance, err := svc.gridClient.GetFreeBalanceTFT(user.Mnemonic)
if err != nil {
return err
}

return svc.FundUserToFulfillDiscount(ctx, user, nil, nil)
}

func (svc *BillingService) CreateTransferRecordToChargeUserWithMinTFTAmount(userID int, username, userMnemonic string) error {
userTFTBalance, err := svc.gridClient.GetFreeBalanceTFT(userMnemonic)
totalPendingTFTAmount, err := svc.transferRecordsRepo.CalculateTotalPendingTFTAmountPerUser(user.ID)
if err != nil {
return err
}

totalPendingTFTAmount, err := svc.transferRecordsRepo.CalculateTotalPendingTFTAmountPerUser(userID)
userBalanceInTFT, err := svc.gridClient.FromUSDMillicentToTFT(user.CreditedBalance + user.CreditCardBalance)
if err != nil {
return err
}

if userTFTBalance+totalPendingTFTAmount >= zeroTFTBalanceValue {
if userGridAccountTFTBalance+totalPendingTFTAmount >= userBalanceInTFT {
return nil
}

return svc.transferRecordsRepo.CreateTransferRecord(&models.TransferRecord{
UserID: userID,
Username: username,
TFTAmount: svc.minimumTFTAmountInWallet * TFTUnitFactor,
UserID: user.ID,
Username: user.Username,
TFTAmount: (userBalanceInTFT - (userGridAccountTFTBalance + totalPendingTFTAmount)) * TFTUnitFactor,
Operation: models.DepositOperation,
})
}

func (svc *BillingService) FundUserToFulfillDiscount(ctx context.Context, user *models.User, addedRentedNodes []types.Node, addedSharedNodes []kubedeployer.Node) error {
if user.CreditCardBalance+user.CreditedBalance-user.Debt <= 0 {
// user has no USD balance, skip
return nil
}

// calculate resources usage in USD applying discount
// I took the cluster nodes since only the new node is in cluster.Nodes
dailyUsageInUSDMillicent, err := svc.calculateResourcesUsageInUSDApplyingDiscount(ctx, user.ID, user.Mnemonic, addedRentedNodes, addedSharedNodes, svc.appliedDiscount)
if err != nil {
return err
}

dailyUsageInTFT, err := svc.gridClient.FromUSDMillicentToTFT(dailyUsageInUSDMillicent)
if err != nil {
return err
}

totalPendingTFTAmount, err := svc.transferRecordsRepo.CalculateTotalPendingTFTAmountPerUser(user.ID)
func (svc *BillingService) CreateTransferRecordToChargeUserWithMinTFTAmount(userID int, username, userMnemonic string) error {
userTFTBalance, err := svc.gridClient.GetFreeBalanceTFT(userMnemonic)
if err != nil {
return err
}

userTFTBalance, err := svc.gridClient.GetFreeBalanceTFT(user.Mnemonic)
totalPendingTFTAmount, err := svc.transferRecordsRepo.CalculateTotalPendingTFTAmountPerUser(userID)
if err != nil {
return err
}

// fund user to fulfill discount
// make sure no old payments will fund more than needed
if totalPendingTFTAmount+userTFTBalance < dailyUsageInTFT &&
dailyUsageInTFT > 0 {
if err := svc.transferRecordsRepo.CreateTransferRecord(&models.TransferRecord{
UserID: user.ID,
Username: user.Username,
TFTAmount: dailyUsageInTFT - userTFTBalance - totalPendingTFTAmount,
Operation: models.DepositOperation,
}); err != nil {
return err
}
if userTFTBalance+totalPendingTFTAmount >= zeroTFTBalanceValue {
return nil
}

return nil
return svc.transferRecordsRepo.CreateTransferRecord(&models.TransferRecord{
UserID: userID,
Username: username,
TFTAmount: svc.minimumTFTAmountInWallet * TFTUnitFactor,
Operation: models.DepositOperation,
})
}

func (svc *BillingService) calculateResourcesUsageInUSDApplyingDiscount(
func (svc *BillingService) calculateResourcesUsageInUSD(
ctx context.Context,
userID int,
userMnemonic string,
addedRentedNodes []types.Node,
addedSharedNodes []kubedeployer.Node,
configuredDiscount Discount,
) (uint64, error) {
calculator, err := svc.gridClient.NewCalculator(userMnemonic)
if err != nil {
Expand Down Expand Up @@ -251,8 +233,11 @@ func (svc *BillingService) calculateResourcesUsageInUSDApplyingDiscount(
}

totalResourcesCostMillicent += gridclient.FromUSDToUSDMillicent(float64(len(nameContracts)) * nameContractMonthlyCostInUSD)
return totalResourcesCostMillicent, nil
}

discount := getDiscountPackage(configuredDiscount).DurationInMonth
func (svc *BillingService) ApplyDiscountOnUsage(totalResourcesCostMillicent uint64) (uint64, error) {
discount := getDiscountPackage(svc.appliedDiscount).DurationInMonth
if discount == 0 {
return totalResourcesCostMillicent, nil
}
Expand Down
74 changes: 63 additions & 11 deletions backend/internal/core/services/workers_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ type WorkerService struct {
gridClient gridclient.GridClient
ewfEngine *ewf.Engine
notificationDispatcher *notification.NotificationDispatcher
billingService BillingService

// configs
systemMnemonic string
invoiceCompanyData config.InvoiceCompanyData
currency string
checkUserDebtIntervalInHours int
Expand All @@ -64,7 +64,6 @@ type WorkerService struct {
settleTransferRecordsIntervalInMinutes int
notifyAdminsForPendingRecordsInHours int
minimumTFTAmountInWallet int
appliedDiscount Discount
usersBalanceCheckIntervalInHours int
}

Expand All @@ -73,12 +72,12 @@ func NewWorkersService(
invoicesRepo models.InvoiceRepository, clusterRepo models.ClusterRepository, transferRecordsRepo models.TransferRecordRepository,
mailService mailservice.MailService,
gridClient gridclient.GridClient, ewfEngine *ewf.Engine, notificationDispatcher *notification.NotificationDispatcher,
graphql graphql.GraphQl, firesquidClient graphql.GraphQl,
invoiceCompanyData config.InvoiceCompanyData, systemMnemonic, currency string,
graphql graphql.GraphQl, firesquidClient graphql.GraphQl, billingService BillingService,
invoiceCompanyData config.InvoiceCompanyData, currency string,
clusterHealthCheckIntervalInHours, reservedNodeHealthCheckIntervalInHours,
reservedNodeHealthCheckTimeoutInMinutes, reservedNodeHealthCheckWorkersNum,
settleTransferRecordsIntervalInMinutes, notifyAdminsForPendingRecordsInHours,
minimumTFTAmountInWallet int, appliedDiscount Discount,
minimumTFTAmountInWallet,
usersBalanceCheckIntervalInHours,
checkUserDebtIntervalInHours int,
) WorkerService {
Expand All @@ -96,8 +95,8 @@ func NewWorkersService(
graphql: graphql,
firesquidClient: firesquidClient,
gridClient: gridClient,
billingService: billingService,

systemMnemonic: systemMnemonic,
invoiceCompanyData: invoiceCompanyData,
currency: currency,

Expand All @@ -110,7 +109,6 @@ func NewWorkersService(
notifyAdminsForPendingRecordsInHours: notifyAdminsForPendingRecordsInHours,

minimumTFTAmountInWallet: minimumTFTAmountInWallet,
appliedDiscount: appliedDiscount,
usersBalanceCheckIntervalInHours: usersBalanceCheckIntervalInHours,
}
}
Expand Down Expand Up @@ -168,7 +166,7 @@ func (svc WorkerService) GetUsersBalanceCheckInterval() time.Duration {
return time.Duration(svc.usersBalanceCheckIntervalInHours) * time.Hour
}

func (svc WorkerService) CreateUserInvoice(BillingService BillingService, user models.User) error {
func (svc WorkerService) CreateUserInvoice(user models.User) error {
now := time.Now()
timeMonthAgo := now.AddDate(0, -1, 0)

Expand All @@ -190,7 +188,7 @@ func (svc WorkerService) CreateUserInvoice(BillingService BillingService, user m
return err
}

totalAmountBilledInUSDMillicent, err := BillingService.calculateTotalUsageOfReportsInUSDMillicent(billReports.Reports)
totalAmountBilledInUSDMillicent, err := svc.billingService.calculateTotalUsageOfReportsInUSDMillicent(billReports.Reports)
if err != nil {
return err
}
Expand Down Expand Up @@ -435,7 +433,7 @@ func (svc WorkerService) SettlePendingPayments(records []models.TransferRecord)
var transferFailure string

// getting balance every time to ensure we have the latest balance
systemTFTBalance, err := svc.gridClient.GetFreeBalanceTFT(svc.systemMnemonic)
systemTFTBalance, err := svc.gridClient.GetSystemTFTBalance()
if err != nil {
log.Error().Err(err).Int("record_id", record.ID).Msg("Failed to get system TFT balance for pending record")
continue
Expand Down Expand Up @@ -517,7 +515,6 @@ func (svc *WorkerService) ResetUsersTFTsWithNoUSDBalance(users []models.User) er
}

func (svc WorkerService) NotifyAdminWithPendingRecords(records []models.TransferRecord) error {

admins, err := svc.userRepo.ListAdmins()
if err != nil {
return err
Expand All @@ -534,6 +531,61 @@ func (svc WorkerService) NotifyAdminWithPendingRecords(records []models.Transfer
return nil
}

func (svc WorkerService) NotifyAdminWithInsufficientBalance() error {
currentBalance, err := svc.gridClient.GetSystemTFTBalance()
if err != nil {
return err
}

users, err := svc.userRepo.ListAllUsers()
if err != nil {
return err
}

var totalUserDailyUsage uint64

for _, user := range users {
dailyUsageInUSDMillicent, err := svc.billingService.calculateResourcesUsageInUSD(svc.ctx, user.ID, user.Mnemonic, nil, nil)
if err != nil {
return err
}

dailyUsageInTFT, err := svc.gridClient.FromUSDMillicentToTFT(dailyUsageInUSDMillicent)
if err != nil {
return err
}

totalUserDailyUsage += dailyUsageInTFT
}

requiredBalance, err := svc.billingService.ApplyDiscountOnUsage(totalUserDailyUsage)
if err != nil {
return err
}

if requiredBalance <= currentBalance {
return nil
}

admins, err := svc.userRepo.ListAdmins()
if err != nil {
return err
}

for _, admin := range admins {
err = svc.mailService.SendInsufficientBalanceNotificationEmail(
admin.Email, float64(currentBalance)/TFTUnitFactor,
float64(requiredBalance)/TFTUnitFactor, fmt.Sprint(svc.billingService.Discount()),
)
if err != nil {
logger.ForOperation("balance_monitor", "send_admin_mail").Error().Err(err).Msg("Failed to send admin notification email")
continue
}
}

return nil
}

func (svc WorkerService) AsyncTrackClusterHealth(cluster models.Cluster) error {
wf, err := svc.ewfEngine.NewWorkflow(workflows.WorkflowTrackClusterHealth, ewf.WithDisplayName("Cluster Health Check"))
if err != nil {
Expand Down
Loading
Loading