112 lines
2.4 KiB
Go
112 lines
2.4 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"log"
|
|
"os"
|
|
"os/signal"
|
|
"strings"
|
|
"syscall"
|
|
|
|
"SimpleArmaAdmin/internal/db"
|
|
"SimpleArmaAdmin/internal/db/sqlc"
|
|
"SimpleArmaAdmin/internal/nats"
|
|
"github.com/nats-io/nats.go/jetstream"
|
|
)
|
|
|
|
func main() {
|
|
// 1. Environment Configuration
|
|
natsURL := os.Getenv("NATS_URL")
|
|
if natsURL == "" {
|
|
natsURL = "nats://localhost:4222"
|
|
}
|
|
dbURL := os.Getenv("DB_URL")
|
|
if dbURL == "" {
|
|
dbURL = "postgres://admin:password@localhost:5432/master_db?sslmode=disable"
|
|
}
|
|
|
|
// 2. Database Connection & Migrations
|
|
err := db.RunMigrations(dbURL)
|
|
if err != nil {
|
|
log.Printf("Migration warning: %v", err)
|
|
}
|
|
|
|
stdDB, err := db.Connect(dbURL)
|
|
if err != nil {
|
|
log.Fatalf("Could not connect to database: %v", err)
|
|
}
|
|
defer stdDB.Close()
|
|
|
|
queries := sqlc.New(stdDB)
|
|
log.Println("Storage node connected to PostgreSQL (SQLC)")
|
|
|
|
// 3. NATS Connection
|
|
nc, err := nats.Connect(natsURL)
|
|
if err != nil {
|
|
log.Fatalf("Could not connect to NATS: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
ctx := context.Background()
|
|
|
|
// 4. Setup JetStream Consumer
|
|
stream, err := nc.JS.Stream(ctx, "LOGS")
|
|
if err != nil {
|
|
log.Fatalf("Stream 'LOGS' not found: %v", err)
|
|
}
|
|
|
|
consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
|
|
Durable: "storage_node_sqlc",
|
|
AckPolicy: jetstream.AckExplicitPolicy,
|
|
})
|
|
if err != nil {
|
|
log.Fatalf("Failed to create consumer: %v", err)
|
|
}
|
|
|
|
// 5. Start Consuming and Persisting
|
|
iter, err := consumer.Messages()
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
go func() {
|
|
for {
|
|
msg, err := iter.Next()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
subjectParts := strings.Split(msg.Subject(), ".")
|
|
logType := "unknown"
|
|
communityID := "unknown"
|
|
if len(subjectParts) >= 3 {
|
|
communityID = subjectParts[1]
|
|
logType = subjectParts[2]
|
|
}
|
|
|
|
// Use SQLC to create the log
|
|
_, err = queries.CreateEncryptedLog(ctx, sqlc.CreateEncryptedLogParams{
|
|
LogType: logType,
|
|
EncryptedPayload: msg.Data(),
|
|
ServerID: communityID,
|
|
BlindIndexHash: sql.NullString{String: "", Valid: false},
|
|
SessionID: sql.NullString{String: "", Valid: false},
|
|
})
|
|
|
|
if err != nil {
|
|
log.Printf("SQLC error: %v", err)
|
|
continue
|
|
}
|
|
|
|
log.Printf("Persisted %s log for %s via SQLC", logType, communityID)
|
|
msg.Ack()
|
|
}
|
|
}()
|
|
|
|
// Wait for shutdown
|
|
sig := make(chan os.Signal, 1)
|
|
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
|
|
<-sig
|
|
}
|