Files
SimpleArmaAdmin/cmd/storage/main.go

112 lines
2.4 KiB
Go

package main
import (
"context"
"database/sql"
"log"
"os"
"os/signal"
"strings"
"syscall"
"SimpleArmaAdmin/internal/db"
"SimpleArmaAdmin/internal/db/sqlc"
"SimpleArmaAdmin/internal/nats"
"github.com/nats-io/nats.go/jetstream"
)
func main() {
// 1. Environment Configuration
natsURL := os.Getenv("NATS_URL")
if natsURL == "" {
natsURL = "nats://localhost:4222"
}
dbURL := os.Getenv("DB_URL")
if dbURL == "" {
dbURL = "postgres://admin:password@localhost:5432/master_db?sslmode=disable"
}
// 2. Database Connection & Migrations
err := db.RunMigrations(dbURL)
if err != nil {
log.Printf("Migration warning: %v", err)
}
stdDB, err := db.Connect(dbURL)
if err != nil {
log.Fatalf("Could not connect to database: %v", err)
}
defer stdDB.Close()
queries := sqlc.New(stdDB)
log.Println("Storage node connected to PostgreSQL (SQLC)")
// 3. NATS Connection
nc, err := nats.Connect(natsURL)
if err != nil {
log.Fatalf("Could not connect to NATS: %v", err)
}
defer nc.Close()
ctx := context.Background()
// 4. Setup JetStream Consumer
stream, err := nc.JS.Stream(ctx, "LOGS")
if err != nil {
log.Fatalf("Stream 'LOGS' not found: %v", err)
}
consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "storage_node_sqlc",
AckPolicy: jetstream.AckExplicitPolicy,
})
if err != nil {
log.Fatalf("Failed to create consumer: %v", err)
}
// 5. Start Consuming and Persisting
iter, err := consumer.Messages()
if err != nil {
log.Fatal(err)
}
go func() {
for {
msg, err := iter.Next()
if err != nil {
return
}
subjectParts := strings.Split(msg.Subject(), ".")
logType := "unknown"
communityID := "unknown"
if len(subjectParts) >= 3 {
communityID = subjectParts[1]
logType = subjectParts[2]
}
// Use SQLC to create the log
_, err = queries.CreateEncryptedLog(ctx, sqlc.CreateEncryptedLogParams{
LogType: logType,
EncryptedPayload: msg.Data(),
ServerID: communityID,
BlindIndexHash: sql.NullString{String: "", Valid: false},
SessionID: sql.NullString{String: "", Valid: false},
})
if err != nil {
log.Printf("SQLC error: %v", err)
continue
}
log.Printf("Persisted %s log for %s via SQLC", logType, communityID)
msg.Ack()
}
}()
// Wait for shutdown
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
<-sig
}