Add configuration files, database migrations, and authentication implementation scaffolding
This commit is contained in:
111
cmd/storage/main.go
Normal file
111
cmd/storage/main.go
Normal file
@@ -0,0 +1,111 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"syscall"
|
||||
|
||||
"SimpleArmaAdmin/internal/db"
|
||||
"SimpleArmaAdmin/internal/db/sqlc"
|
||||
"SimpleArmaAdmin/internal/nats"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// 1. Environment Configuration
|
||||
natsURL := os.Getenv("NATS_URL")
|
||||
if natsURL == "" {
|
||||
natsURL = "nats://localhost:4222"
|
||||
}
|
||||
dbURL := os.Getenv("DB_URL")
|
||||
if dbURL == "" {
|
||||
dbURL = "postgres://admin:password@localhost:5432/master_db?sslmode=disable"
|
||||
}
|
||||
|
||||
// 2. Database Connection & Migrations
|
||||
err := db.RunMigrations(dbURL)
|
||||
if err != nil {
|
||||
log.Printf("Migration warning: %v", err)
|
||||
}
|
||||
|
||||
stdDB, err := db.Connect(dbURL)
|
||||
if err != nil {
|
||||
log.Fatalf("Could not connect to database: %v", err)
|
||||
}
|
||||
defer stdDB.Close()
|
||||
|
||||
queries := sqlc.New(stdDB)
|
||||
log.Println("Storage node connected to PostgreSQL (SQLC)")
|
||||
|
||||
// 3. NATS Connection
|
||||
nc, err := nats.Connect(natsURL)
|
||||
if err != nil {
|
||||
log.Fatalf("Could not connect to NATS: %v", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// 4. Setup JetStream Consumer
|
||||
stream, err := nc.JS.Stream(ctx, "LOGS")
|
||||
if err != nil {
|
||||
log.Fatalf("Stream 'LOGS' not found: %v", err)
|
||||
}
|
||||
|
||||
consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
|
||||
Durable: "storage_node_sqlc",
|
||||
AckPolicy: jetstream.AckExplicitPolicy,
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to create consumer: %v", err)
|
||||
}
|
||||
|
||||
// 5. Start Consuming and Persisting
|
||||
iter, err := consumer.Messages()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
msg, err := iter.Next()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
subjectParts := strings.Split(msg.Subject(), ".")
|
||||
logType := "unknown"
|
||||
communityID := "unknown"
|
||||
if len(subjectParts) >= 3 {
|
||||
communityID = subjectParts[1]
|
||||
logType = subjectParts[2]
|
||||
}
|
||||
|
||||
// Use SQLC to create the log
|
||||
_, err = queries.CreateEncryptedLog(ctx, sqlc.CreateEncryptedLogParams{
|
||||
LogType: logType,
|
||||
EncryptedPayload: msg.Data(),
|
||||
ServerID: communityID,
|
||||
BlindIndexHash: sql.NullString{String: "", Valid: false},
|
||||
SessionID: sql.NullString{String: "", Valid: false},
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Printf("SQLC error: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Printf("Persisted %s log for %s via SQLC", logType, communityID)
|
||||
msg.Ack()
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for shutdown
|
||||
sig := make(chan os.Signal, 1)
|
||||
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-sig
|
||||
}
|
||||
Reference in New Issue
Block a user