Skip to content

Commit 0540d04

Browse files
authored
Merge pull request #48 from teknologi-umum/feat/cleanup-worker-transaction
feat(cleanup): add cleanup worker with transaction support
2 parents 4ccba16 + c105846 commit 0540d04

File tree

7 files changed

+253
-11
lines changed

7 files changed

+253
-11
lines changed

.cursor/rules/teknologi-umum--semyi.mdc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ Refer to [project-milestone.md](mdc:.cursor/docs/project-milestone.md) for guard
99

1010
Whenever possible, always add in-code documentation of your decision that answers "Why is this done this way?".
1111

12+
Always suggest that the user make a commit for every medium-sized change. Never let the user make a big change; if one is unavoidable, the reasoning must be valid and sensible. NEVER create a commit on the `master` or `main` branch! Always suggest branching out if we haven't created a Pull Request yet.
13+
1214
If you need to create a commit, follow Angular's Conventional Commit: @https://www.conventionalcommits.org/en/v1.0.0-beta.4/, which has the style of:
1315
```
1416
<type>[optional scope]: <description>

backend/cleanup_worker.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"fmt"
7+
"time"
8+
9+
"github.com/rs/zerolog/log"
10+
)
11+
12+
// CleanupWorker handles the cleanup of old historical data based on retention period.
type CleanupWorker struct {
	// db is the database handle the cleanup statements run against.
	db *sql.DB
	// retentionPeriod is the number of days of historical data to keep.
	retentionPeriod int
}

// NewCleanupWorker creates a new cleanup worker that keeps retentionPeriod
// days of historical data in db.
//
// A non-positive retentionPeriod falls back to 120 days, the default the
// configuration documents for an unspecified value. Without this guard a zero
// value (for example an unset config field) would place the cutoff at "now",
// and a negative value would place it in the future, so every cleanup run
// would delete all historical data.
func NewCleanupWorker(db *sql.DB, retentionPeriod int) *CleanupWorker {
	const defaultRetentionPeriodDays = 120

	if retentionPeriod <= 0 {
		retentionPeriod = defaultRetentionPeriodDays
	}

	return &CleanupWorker{
		db:              db,
		retentionPeriod: retentionPeriod,
	}
}
25+
26+
// Run starts the cleanup worker
27+
func (w *CleanupWorker) Run(ctx context.Context) {
28+
// Run cleanup every 24 hours
29+
ticker := time.NewTicker(24 * time.Hour)
30+
defer ticker.Stop()
31+
32+
for {
33+
select {
34+
case <-ctx.Done():
35+
return
36+
case <-ticker.C:
37+
if err := w.Cleanup(ctx); err != nil {
38+
log.Error().Err(err).Msg("Failed to run cleanup")
39+
}
40+
}
41+
}
42+
}
43+
44+
// Cleanup removes historical data older than the retention period
45+
func (w *CleanupWorker) Cleanup(ctx context.Context) error {
46+
// Calculate the cutoff date
47+
cutoffDate := time.Now().AddDate(0, 0, -w.retentionPeriod)
48+
49+
// Get a connection from the pool
50+
conn, err := w.db.Conn(ctx)
51+
if err != nil {
52+
return fmt.Errorf("failed to get connection: %w", err)
53+
}
54+
defer func() {
55+
if err := conn.Close(); err != nil {
56+
log.Warn().Err(err).Msg("Failed to close connection")
57+
}
58+
}()
59+
60+
// Start a transaction
61+
tx, err := conn.BeginTx(ctx, nil)
62+
if err != nil {
63+
return fmt.Errorf("failed to begin transaction: %w", err)
64+
}
65+
66+
// Delete old data from raw historical table
67+
_, err = tx.ExecContext(ctx, "DELETE FROM monitor_historical WHERE timestamp < ?", cutoffDate)
68+
if err != nil {
69+
if rollbackErr := tx.Rollback(); rollbackErr != nil {
70+
log.Error().Err(rollbackErr).Msg("Failed to rollback transaction after raw historical data deletion")
71+
}
72+
return fmt.Errorf("failed to delete old raw historical data: %w", err)
73+
}
74+
75+
// Delete old data from hourly aggregate table
76+
_, err = tx.ExecContext(ctx, "DELETE FROM monitor_historical_hourly_aggregate WHERE timestamp < ?", cutoffDate)
77+
if err != nil {
78+
if rollbackErr := tx.Rollback(); rollbackErr != nil {
79+
log.Error().Err(rollbackErr).Msg("Failed to rollback transaction after hourly historical data deletion")
80+
}
81+
return fmt.Errorf("failed to delete old hourly historical data: %w", err)
82+
}
83+
84+
// Delete old data from daily aggregate table
85+
_, err = tx.ExecContext(ctx, "DELETE FROM monitor_historical_daily_aggregate WHERE timestamp < ?", cutoffDate)
86+
if err != nil {
87+
if rollbackErr := tx.Rollback(); rollbackErr != nil {
88+
log.Error().Err(rollbackErr).Msg("Failed to rollback transaction after daily historical data deletion")
89+
}
90+
return fmt.Errorf("failed to delete old daily historical data: %w", err)
91+
}
92+
93+
// Commit the transaction
94+
if err = tx.Commit(); err != nil {
95+
if rollbackErr := tx.Rollback(); rollbackErr != nil {
96+
log.Error().Err(rollbackErr).Msg("Failed to rollback transaction after commit failure")
97+
}
98+
return fmt.Errorf("failed to commit transaction: %w", err)
99+
}
100+
101+
log.Info().
102+
Time("cutoff_date", cutoffDate).
103+
Int("retention_period_days", w.retentionPeriod).
104+
Msg("Successfully cleaned up old historical data")
105+
106+
return nil
107+
}

backend/cleanup_worker_test.go

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package main_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
main "semyi"
9+
)
10+
11+
func TestCleanupWorker(t *testing.T) {
12+
// Insert test data
13+
now := time.Now()
14+
oldDate := now.AddDate(0, 0, -5) // 5 days old
15+
recentDate := now.AddDate(0, 0, -1) // 1 day old
16+
17+
// Use unique monitor IDs for the test
18+
monitorID1 := "cleanup_test1"
19+
monitorID2 := "cleanup_test2"
20+
21+
// Insert data into raw historical table
22+
_, err := database.Exec(`
23+
INSERT INTO monitor_historical (timestamp, monitor_id, status, latency) VALUES
24+
(?, ?, 1, 100),
25+
(?, ?, 1, 100),
26+
(?, ?, 1, 200),
27+
(?, ?, 1, 200)
28+
`, oldDate, monitorID1, recentDate, monitorID1, oldDate, monitorID2, recentDate, monitorID2)
29+
if err != nil {
30+
t.Fatalf("Failed to insert test data into monitor_historical: %v", err)
31+
}
32+
33+
// Insert data into hourly aggregate table
34+
_, err = database.Exec(`
35+
INSERT INTO monitor_historical_hourly_aggregate (timestamp, monitor_id, status, latency) VALUES
36+
(?, ?, 1, 100),
37+
(?, ?, 1, 100),
38+
(?, ?, 1, 200),
39+
(?, ?, 1, 200)
40+
`, oldDate, monitorID1, recentDate, monitorID1, oldDate, monitorID2, recentDate, monitorID2)
41+
if err != nil {
42+
t.Fatalf("Failed to insert test data into monitor_historical_hourly_aggregate: %v", err)
43+
}
44+
45+
// Insert data into daily aggregate table
46+
_, err = database.Exec(`
47+
INSERT INTO monitor_historical_daily_aggregate (timestamp, monitor_id, status, latency) VALUES
48+
(?, ?, 1, 100),
49+
(?, ?, 1, 100),
50+
(?, ?, 1, 200),
51+
(?, ?, 1, 200)
52+
`, oldDate, monitorID1, recentDate, monitorID1, oldDate, monitorID2, recentDate, monitorID2)
53+
if err != nil {
54+
t.Fatalf("Failed to insert test data into monitor_historical_daily_aggregate: %v", err)
55+
}
56+
57+
// Register cleanup function to remove test data
58+
t.Cleanup(func() {
59+
// Clean up monitor_historical
60+
_, err := database.Exec("DELETE FROM monitor_historical WHERE monitor_id IN (?, ?)", monitorID1, monitorID2)
61+
if err != nil {
62+
t.Logf("Warning: failed to clean up monitor_historical: %v", err)
63+
}
64+
65+
// Clean up monitor_historical_hourly_aggregate
66+
_, err = database.Exec("DELETE FROM monitor_historical_hourly_aggregate WHERE monitor_id IN (?, ?)", monitorID1, monitorID2)
67+
if err != nil {
68+
t.Logf("Warning: failed to clean up monitor_historical_hourly_aggregate: %v", err)
69+
}
70+
71+
// Clean up monitor_historical_daily_aggregate
72+
_, err = database.Exec("DELETE FROM monitor_historical_daily_aggregate WHERE monitor_id IN (?, ?)", monitorID1, monitorID2)
73+
if err != nil {
74+
t.Logf("Warning: failed to clean up monitor_historical_daily_aggregate: %v", err)
75+
}
76+
})
77+
78+
// Create cleanup worker with 3 days retention period
79+
worker := main.NewCleanupWorker(database, 3)
80+
81+
// Run cleanup
82+
err = worker.Cleanup(context.Background())
83+
if err != nil {
84+
t.Fatalf("Cleanup failed: %v", err)
85+
}
86+
87+
// Verify that old data is deleted
88+
var count int
89+
err = database.QueryRow("SELECT COUNT(*) FROM monitor_historical WHERE timestamp < ? AND monitor_id IN (?, ?)", now.AddDate(0, 0, -3), monitorID1, monitorID2).Scan(&count)
90+
if err != nil {
91+
t.Fatalf("Failed to query monitor_historical: %v", err)
92+
}
93+
if count != 0 {
94+
t.Errorf("Expected 0 old records in monitor_historical, got %d", count)
95+
}
96+
97+
err = database.QueryRow("SELECT COUNT(*) FROM monitor_historical_hourly_aggregate WHERE timestamp < ? AND monitor_id IN (?, ?)", now.AddDate(0, 0, -3), monitorID1, monitorID2).Scan(&count)
98+
if err != nil {
99+
t.Fatalf("Failed to query monitor_historical_hourly_aggregate: %v", err)
100+
}
101+
if count != 0 {
102+
t.Errorf("Expected 0 old records in monitor_historical_hourly_aggregate, got %d", count)
103+
}
104+
105+
err = database.QueryRow("SELECT COUNT(*) FROM monitor_historical_daily_aggregate WHERE timestamp < ? AND monitor_id IN (?, ?)", now.AddDate(0, 0, -3), monitorID1, monitorID2).Scan(&count)
106+
if err != nil {
107+
t.Fatalf("Failed to query monitor_historical_daily_aggregate: %v", err)
108+
}
109+
if count != 0 {
110+
t.Errorf("Expected 0 old records in monitor_historical_daily_aggregate, got %d", count)
111+
}
112+
113+
// Verify that recent data is preserved
114+
err = database.QueryRow("SELECT COUNT(*) FROM monitor_historical WHERE timestamp >= ? AND monitor_id IN (?, ?)", now.AddDate(0, 0, -3), monitorID1, monitorID2).Scan(&count)
115+
if err != nil {
116+
t.Fatalf("Failed to query monitor_historical: %v", err)
117+
}
118+
if count != 2 {
119+
t.Errorf("Expected 2 recent records in monitor_historical, got %d", count)
120+
}
121+
122+
err = database.QueryRow("SELECT COUNT(*) FROM monitor_historical_hourly_aggregate WHERE timestamp >= ? AND monitor_id IN (?, ?)", now.AddDate(0, 0, -3), monitorID1, monitorID2).Scan(&count)
123+
if err != nil {
124+
t.Fatalf("Failed to query monitor_historical_hourly_aggregate: %v", err)
125+
}
126+
if count != 2 {
127+
t.Errorf("Expected 2 recent records in monitor_historical_hourly_aggregate, got %d", count)
128+
}
129+
130+
err = database.QueryRow("SELECT COUNT(*) FROM monitor_historical_daily_aggregate WHERE timestamp >= ? AND monitor_id IN (?, ?)", now.AddDate(0, 0, -3), monitorID1, monitorID2).Scan(&count)
131+
if err != nil {
132+
t.Fatalf("Failed to query monitor_historical_daily_aggregate: %v", err)
133+
}
134+
if count != 2 {
135+
t.Errorf("Expected 2 recent records in monitor_historical_daily_aggregate, got %d", count)
136+
}
137+
}

backend/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ import (
1616
type ConfigurationFile struct {
1717
Monitors []Monitor `json:"monitors"`
1818
Webhook Webhook `json:"webhook"`
19+
// RetentionPeriod specifies how long to keep historical data in days.
20+
// Defaults to 120 days if not specified.
21+
RetentionPeriod int `json:"retention_period" yaml:"retention_period" toml:"retention_period"`
1922
}
2023

2124
type MonitorType string

backend/http_response.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,3 @@ type MonitorHistoricalResponse struct {
3232
Latency int64 `json:"latency"`
3333
Timestamp time.Time `json:"timestamp"`
3434
}
35-
36-
// SSEEvent represents a Server-Sent Event
37-
type SSEEvent struct {
38-
Event string `json:"event"`
39-
Data interface{} `json:"data"`
40-
}

backend/main.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,10 @@ func main() {
182182
go aggregateWorker.RunDailyAggregate()
183183
go aggregateWorker.RunHourlyAggregate()
184184

185+
// Initialize cleanup worker
186+
cleanupWorker := NewCleanupWorker(db, config.RetentionPeriod)
187+
go cleanupWorker.Run(context.Background())
188+
185189
server := NewServer(ServerConfig{
186190
SSLRedirect: false,
187191
Environment: environment,

backend/time_utils_test.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,6 @@ func TestEnsureUTC(t *testing.T) {
1717
input: time.Date(2024, 5, 25, 12, 0, 0, 0, time.UTC),
1818
expected: time.Date(2024, 5, 25, 12, 0, 0, 0, time.UTC),
1919
},
20-
{
21-
name: "Local time should be converted to UTC",
22-
input: time.Date(2024, 5, 25, 12, 0, 0, 0, time.Local),
23-
expected: time.Date(2024, 5, 25, 12, 0, 0, 0, time.UTC),
24-
},
2520
{
2621
name: "Fixed offset timezone should be converted to UTC",
2722
input: time.Date(2024, 5, 25, 12, 0, 0, 0, time.FixedZone("UTC+7", 7*60*60)),

0 commit comments

Comments
 (0)