package main import ( "context" "database/sql" "log" "os" "os/signal" "strings" "syscall" "SimpleArmaAdmin/internal/db" "SimpleArmaAdmin/internal/db/sqlc" "SimpleArmaAdmin/internal/nats" "github.com/nats-io/nats.go/jetstream" ) func main() { // 1. Environment Configuration natsURL := os.Getenv("NATS_URL") if natsURL == "" { natsURL = "nats://localhost:4222" } dbURL := os.Getenv("DB_URL") if dbURL == "" { dbURL = "postgres://admin:password@localhost:5432/master_db?sslmode=disable" } // 2. Database Connection & Migrations err := db.RunMigrations(dbURL) if err != nil { log.Printf("Migration warning: %v", err) } stdDB, err := db.Connect(dbURL) if err != nil { log.Fatalf("Could not connect to database: %v", err) } defer stdDB.Close() queries := sqlc.New(stdDB) log.Println("Storage node connected to PostgreSQL (SQLC)") // 3. NATS Connection nc, err := nats.Connect(natsURL) if err != nil { log.Fatalf("Could not connect to NATS: %v", err) } defer nc.Close() ctx := context.Background() // 4. Setup JetStream Consumer stream, err := nc.JS.Stream(ctx, "LOGS") if err != nil { log.Fatalf("Stream 'LOGS' not found: %v", err) } consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ Durable: "storage_node_sqlc", AckPolicy: jetstream.AckExplicitPolicy, }) if err != nil { log.Fatalf("Failed to create consumer: %v", err) } // 5. Start Consuming and Persisting iter, err := consumer.Messages() if err != nil { log.Fatal(err) } go func() { for { msg, err := iter.Next() if err != nil { return } subjectParts := strings.Split(msg.Subject(), ".") logType := "unknown" communityID := "unknown" if len(subjectParts) >= 3 { communityID = subjectParts[1] logType = subjectParts[2] } // Use SQLC to create the log _, err = queries.CreateEncryptedLog(ctx, sqlc.CreateEncryptedLogParams{ LogType: logType, EncryptedPayload: msg.Data(), ServerID: communityID, BlindIndexHash: sql.NullString{String: "", Valid: false}, SessionID: sql.NullString{String: "", Valid: false}, }) if err != nil { log.Printf("SQLC error: %v", err) continue } log.Printf("Persisted %s log for %s via SQLC", logType, communityID) msg.Ack() } }() // Wait for shutdown sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) <-sig }