package main import ( "context" "database/sql" "errors" "fmt" "strings" "time" _ "modernc.org/sqlite" ) var ( errNotFound = errors.New("record not found") ) type store struct { db *sql.DB } func openStore(path string) (*store, error) { db, err := sql.Open("sqlite", path) if err != nil { return nil, err } db.SetMaxOpenConns(1) s := &store{db: db} if err := s.migrate(context.Background()); err != nil { _ = db.Close() return nil, err } return s, nil } func (s *store) close() error { return s.db.Close() } func (s *store) migrate(ctx context.Context) error { statements := []string{ `CREATE TABLE IF NOT EXISTS designations ( code TEXT PRIMARY KEY, display_token TEXT NOT NULL, intent_id TEXT NOT NULL UNIQUE, nonce TEXT NOT NULL, origin TEXT NOT NULL, locale TEXT NOT NULL, address TEXT NOT NULL, chain_id INTEGER NOT NULL, issued_at TEXT NOT NULL, expires_at TEXT NOT NULL, verified_at TEXT, membership_status TEXT NOT NULL DEFAULT 'none', membership_tx_hash TEXT, activated_at TEXT, identity_assurance_level TEXT NOT NULL DEFAULT 'none', identity_attested_by TEXT, identity_attestation_id TEXT, identity_attested_at TEXT );`, `CREATE INDEX IF NOT EXISTS idx_designations_intent ON designations(intent_id);`, `CREATE INDEX IF NOT EXISTS idx_designations_address ON designations(address);`, `CREATE TABLE IF NOT EXISTS wallet_sessions ( session_token TEXT PRIMARY KEY, wallet TEXT NOT NULL, designation_code TEXT NOT NULL, chain_id INTEGER NOT NULL, issued_at TEXT NOT NULL, expires_at TEXT NOT NULL, last_seen_at TEXT, revoked_at TEXT );`, `CREATE INDEX IF NOT EXISTS idx_wallet_sessions_wallet ON wallet_sessions(wallet);`, `CREATE INDEX IF NOT EXISTS idx_wallet_sessions_expires ON wallet_sessions(expires_at);`, `CREATE TABLE IF NOT EXISTS quotes ( quote_id TEXT PRIMARY KEY, designation_code TEXT NOT NULL, address TEXT NOT NULL, payer_address TEXT, chain_id INTEGER NOT NULL, currency TEXT NOT NULL, amount_atomic TEXT NOT NULL, decimals INTEGER NOT NULL, contract_address TEXT NOT NULL, method TEXT NOT NULL, calldata TEXT NOT NULL, value_hex TEXT NOT NULL, sponsorship_mode TEXT NOT NULL DEFAULT 'self', sponsor_org_root_id TEXT, created_at TEXT NOT NULL, expires_at TEXT NOT NULL, confirmed_at TEXT, confirmed_tx_hash TEXT, FOREIGN KEY(designation_code) REFERENCES designations(code) );`, `CREATE INDEX IF NOT EXISTS idx_quotes_designation ON quotes(designation_code);`, `CREATE TABLE IF NOT EXISTS marketplace_quotes ( quote_id TEXT PRIMARY KEY, wallet TEXT NOT NULL, payer_wallet TEXT, offer_id TEXT NOT NULL, org_root_id TEXT, principal_id TEXT, principal_role TEXT, workspace_id TEXT, currency TEXT NOT NULL, amount_atomic TEXT NOT NULL, total_amount_atomic TEXT NOT NULL, decimals INTEGER NOT NULL, membership_included INTEGER NOT NULL DEFAULT 0, line_items_json TEXT NOT NULL, policy_hash TEXT NOT NULL, access_class TEXT NOT NULL, availability_state TEXT NOT NULL, expected_tx_to TEXT, expected_tx_data TEXT, expected_tx_value_hex TEXT, created_at TEXT NOT NULL, expires_at TEXT NOT NULL, confirmed_at TEXT, confirmed_tx_hash TEXT );`, `CREATE INDEX IF NOT EXISTS idx_marketplace_quotes_wallet ON marketplace_quotes(wallet);`, `CREATE TABLE IF NOT EXISTS marketplace_entitlements ( entitlement_id TEXT PRIMARY KEY, quote_id TEXT NOT NULL UNIQUE, offer_id TEXT NOT NULL, wallet TEXT NOT NULL, payer_wallet TEXT, org_root_id TEXT, principal_id TEXT, principal_role TEXT, workspace_id TEXT, state TEXT NOT NULL, access_class TEXT NOT NULL, availability_state TEXT NOT NULL, policy_hash TEXT NOT NULL, issued_at TEXT NOT NULL, tx_hash TEXT NOT NULL );`, `CREATE INDEX IF NOT EXISTS idx_marketplace_entitlements_wallet ON marketplace_entitlements(wallet);`, `CREATE TABLE IF NOT EXISTS governance_principals ( wallet TEXT PRIMARY KEY, org_root_id TEXT NOT NULL, principal_id TEXT NOT NULL, principal_role TEXT NOT NULL, entitlement_id TEXT NOT NULL, entitlement_status TEXT NOT NULL, access_class TEXT NOT NULL, availability_state TEXT NOT NULL, lease_expires_at TEXT, updated_at TEXT NOT NULL );`, `CREATE INDEX IF NOT EXISTS idx_governance_principals_org_root ON governance_principals(org_root_id);`, `CREATE TABLE IF NOT EXISTS governance_install_tokens ( install_token TEXT PRIMARY KEY, wallet TEXT NOT NULL, org_root_id TEXT NOT NULL, principal_id TEXT NOT NULL, principal_role TEXT NOT NULL, device_id TEXT NOT NULL, entitlement_id TEXT NOT NULL, package_hash TEXT NOT NULL, runtime_version TEXT NOT NULL, policy_hash TEXT NOT NULL, issued_at TEXT NOT NULL, expires_at TEXT NOT NULL, consumed_at TEXT );`, `CREATE INDEX IF NOT EXISTS idx_gov_install_tokens_wallet ON governance_install_tokens(wallet);`, `CREATE INDEX IF NOT EXISTS idx_gov_install_tokens_device ON governance_install_tokens(device_id);`, `CREATE TABLE IF NOT EXISTS governance_installs ( install_token TEXT PRIMARY KEY, wallet TEXT NOT NULL, device_id TEXT NOT NULL, entitlement_id TEXT NOT NULL, runtime_version TEXT NOT NULL, package_hash TEXT NOT NULL, policy_hash TEXT NOT NULL, launcher_receipt_ref TEXT, installed_at TEXT NOT NULL, activated_at TEXT NOT NULL );`, `CREATE INDEX IF NOT EXISTS idx_governance_installs_wallet ON governance_installs(wallet);`, `CREATE INDEX IF NOT EXISTS idx_governance_installs_device ON governance_installs(device_id);`, `CREATE TABLE IF NOT EXISTS member_channel_bindings ( channel_binding_id TEXT PRIMARY KEY, wallet TEXT NOT NULL, chain_id INTEGER NOT NULL, device_id TEXT NOT NULL, platform TEXT NOT NULL, org_root_id TEXT NOT NULL, principal_id TEXT NOT NULL, principal_role TEXT NOT NULL, app_version TEXT NOT NULL, push_provider TEXT NOT NULL, push_token TEXT, status TEXT NOT NULL, created_at TEXT NOT NULL, updated_at TEXT NOT NULL, removed_at TEXT, UNIQUE(wallet, device_id) );`, `CREATE INDEX IF NOT EXISTS idx_member_channel_bindings_wallet ON member_channel_bindings(wallet);`, `CREATE INDEX IF NOT EXISTS idx_member_channel_bindings_org ON member_channel_bindings(org_root_id);`, `CREATE TABLE IF NOT EXISTS member_channel_events ( seq INTEGER PRIMARY KEY AUTOINCREMENT, event_id TEXT NOT NULL UNIQUE, wallet TEXT NOT NULL, org_root_id TEXT NOT NULL, principal_id TEXT NOT NULL, class TEXT NOT NULL, created_at TEXT NOT NULL, title TEXT NOT NULL, body TEXT NOT NULL, dedupe_key TEXT NOT NULL, requires_ack INTEGER NOT NULL DEFAULT 1, policy_hash TEXT NOT NULL, payload_json TEXT NOT NULL, visibility_scope TEXT NOT NULL, UNIQUE(wallet, org_root_id, dedupe_key) );`, `CREATE INDEX IF NOT EXISTS idx_member_channel_events_wallet_seq ON member_channel_events(wallet, seq);`, `CREATE INDEX IF NOT EXISTS idx_member_channel_events_org_seq ON member_channel_events(org_root_id, seq);`, `CREATE TABLE IF NOT EXISTS member_channel_event_acks ( event_id TEXT NOT NULL, wallet TEXT NOT NULL, device_id TEXT NOT NULL, acknowledged_at TEXT NOT NULL, PRIMARY KEY(event_id, wallet, device_id) );`, `CREATE TABLE IF NOT EXISTS member_support_tickets ( ticket_id TEXT PRIMARY KEY, wallet TEXT NOT NULL, org_root_id TEXT NOT NULL, principal_id TEXT NOT NULL, category TEXT NOT NULL, summary TEXT NOT NULL, context_json TEXT NOT NULL, status TEXT NOT NULL, created_at TEXT NOT NULL );`, `CREATE INDEX IF NOT EXISTS idx_member_support_tickets_wallet ON member_support_tickets(wallet);`, } for _, stmt := range statements { if _, err := s.db.ExecContext(ctx, stmt); err != nil { return fmt.Errorf("migrate: %w", err) } } // Backward-compatible column adds for already-initialized local DBs. if err := s.ensureColumn(ctx, "quotes", "payer_address", "TEXT"); err != nil { return err } if err := s.ensureColumn(ctx, "quotes", "sponsorship_mode", "TEXT NOT NULL DEFAULT 'self'"); err != nil { return err } if err := s.ensureColumn(ctx, "quotes", "sponsor_org_root_id", "TEXT"); err != nil { return err } if err := s.ensureColumn(ctx, "designations", "identity_assurance_level", "TEXT NOT NULL DEFAULT 'none'"); err != nil { return err } if err := s.ensureColumn(ctx, "designations", "identity_attested_by", "TEXT"); err != nil { return err } if err := s.ensureColumn(ctx, "designations", "identity_attestation_id", "TEXT"); err != nil { return err } if err := s.ensureColumn(ctx, "designations", "identity_attested_at", "TEXT"); err != nil { return err } if err := s.ensureColumn(ctx, "marketplace_quotes", "expected_tx_to", "TEXT"); err != nil { return err } if err := s.ensureColumn(ctx, "marketplace_quotes", "expected_tx_data", "TEXT"); err != nil { return err } if err := s.ensureColumn(ctx, "marketplace_quotes", "expected_tx_value_hex", "TEXT"); err != nil { return err } return nil } func (s *store) ensureColumn(ctx context.Context, table, column, columnDef string) error { rows, err := s.db.QueryContext(ctx, "PRAGMA table_info("+table+")") if err != nil { return fmt.Errorf("migrate table_info %s: %w", table, err) } defer rows.Close() for rows.Next() { var ( cid int name string ctype string notNull int defaultVal sql.NullString pk int ) if err := rows.Scan(&cid, &name, &ctype, ¬Null, &defaultVal, &pk); err != nil { return fmt.Errorf("migrate scan table_info %s: %w", table, err) } if strings.EqualFold(name, column) { return nil } } if err := rows.Err(); err != nil { return fmt.Errorf("migrate iterate table_info %s: %w", table, err) } if _, err := s.db.ExecContext(ctx, "ALTER TABLE "+table+" ADD COLUMN "+column+" "+columnDef); err != nil { return fmt.Errorf("migrate add column %s.%s: %w", table, column, err) } return nil } func (s *store) putDesignation(ctx context.Context, rec designationRecord) error { _, err := s.db.ExecContext(ctx, ` INSERT INTO designations ( code, display_token, intent_id, nonce, origin, locale, address, chain_id, issued_at, expires_at, verified_at, membership_status, membership_tx_hash, activated_at, identity_assurance_level, identity_attested_by, identity_attestation_id, identity_attested_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(code) DO UPDATE SET display_token=excluded.display_token, intent_id=excluded.intent_id, nonce=excluded.nonce, origin=excluded.origin, locale=excluded.locale, address=excluded.address, chain_id=excluded.chain_id, issued_at=excluded.issued_at, expires_at=excluded.expires_at, verified_at=excluded.verified_at, membership_status=excluded.membership_status, membership_tx_hash=excluded.membership_tx_hash, activated_at=excluded.activated_at, identity_assurance_level=excluded.identity_assurance_level, identity_attested_by=excluded.identity_attested_by, identity_attestation_id=excluded.identity_attestation_id, identity_attested_at=excluded.identity_attested_at `, rec.Code, rec.DisplayToken, rec.IntentID, rec.Nonce, rec.Origin, rec.Locale, rec.Address, rec.ChainID, rec.IssuedAt.Format(time.RFC3339Nano), rec.ExpiresAt.Format(time.RFC3339Nano), formatNullableTime(rec.VerifiedAt), strings.ToLower(rec.MembershipStatus), nullableString(rec.MembershipTxHash), formatNullableTime(rec.ActivatedAt), normalizeAssuranceLevel(rec.IdentityAssurance), nullableString(rec.IdentityAttestedBy), nullableString(rec.IdentityAttestationID), formatNullableTime(rec.IdentityAttestedAt)) return err } func (s *store) getDesignationByIntent(ctx context.Context, intentID string) (designationRecord, error) { row := s.db.QueryRowContext(ctx, ` SELECT code, display_token, intent_id, nonce, origin, locale, address, chain_id, issued_at, expires_at, verified_at, membership_status, membership_tx_hash, activated_at, identity_assurance_level, identity_attested_by, identity_attestation_id, identity_attested_at FROM designations WHERE intent_id = ? `, intentID) return scanDesignation(row) } func (s *store) getDesignationByCode(ctx context.Context, code string) (designationRecord, error) { row := s.db.QueryRowContext(ctx, ` SELECT code, display_token, intent_id, nonce, origin, locale, address, chain_id, issued_at, expires_at, verified_at, membership_status, membership_tx_hash, activated_at, identity_assurance_level, identity_attested_by, identity_attestation_id, identity_attested_at FROM designations WHERE code = ? `, code) return scanDesignation(row) } func (s *store) getDesignationByAddress(ctx context.Context, address string) (designationRecord, error) { row := s.db.QueryRowContext(ctx, ` SELECT code, display_token, intent_id, nonce, origin, locale, address, chain_id, issued_at, expires_at, verified_at, membership_status, membership_tx_hash, activated_at, identity_assurance_level, identity_attested_by, identity_attestation_id, identity_attested_at FROM designations WHERE address = ? ORDER BY issued_at DESC LIMIT 1 `, strings.ToLower(strings.TrimSpace(address))) return scanDesignation(row) } func scanDesignation(row interface{ Scan(dest ...any) error }) (designationRecord, error) { var rec designationRecord var issued, expires, verified, activated, identityAttestedAt sql.NullString var membershipTx, identityAssurance, identityAttestedBy, identityAttestationID sql.NullString err := row.Scan(&rec.Code, &rec.DisplayToken, &rec.IntentID, &rec.Nonce, &rec.Origin, &rec.Locale, &rec.Address, &rec.ChainID, &issued, &expires, &verified, &rec.MembershipStatus, &membershipTx, &activated, &identityAssurance, &identityAttestedBy, &identityAttestationID, &identityAttestedAt) if err != nil { if errors.Is(err, sql.ErrNoRows) { return designationRecord{}, errNotFound } return designationRecord{}, err } rec.IssuedAt = parseRFC3339Nullable(issued) rec.ExpiresAt = parseRFC3339Nullable(expires) rec.VerifiedAt = parseRFC3339Ptr(verified) rec.MembershipTxHash = membershipTx.String rec.ActivatedAt = parseRFC3339Ptr(activated) rec.IdentityAssurance = normalizeAssuranceLevel(identityAssurance.String) rec.IdentityAttestedBy = identityAttestedBy.String rec.IdentityAttestationID = identityAttestationID.String rec.IdentityAttestedAt = parseRFC3339Ptr(identityAttestedAt) return rec, nil } func (s *store) putWalletSession(ctx context.Context, rec walletSessionRecord) error { _, err := s.db.ExecContext(ctx, ` INSERT INTO wallet_sessions ( session_token, wallet, designation_code, chain_id, issued_at, expires_at, last_seen_at, revoked_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(session_token) DO UPDATE SET wallet=excluded.wallet, designation_code=excluded.designation_code, chain_id=excluded.chain_id, issued_at=excluded.issued_at, expires_at=excluded.expires_at, last_seen_at=excluded.last_seen_at, revoked_at=excluded.revoked_at `, strings.TrimSpace(rec.SessionToken), strings.ToLower(strings.TrimSpace(rec.Wallet)), strings.TrimSpace(rec.DesignationCode), rec.ChainID, rec.IssuedAt.UTC().Format(time.RFC3339Nano), rec.ExpiresAt.UTC().Format(time.RFC3339Nano), formatNullableTime(rec.LastSeenAt), formatNullableTime(rec.RevokedAt)) return err } func (s *store) getWalletSession(ctx context.Context, token string) (walletSessionRecord, error) { row := s.db.QueryRowContext(ctx, ` SELECT session_token, wallet, designation_code, chain_id, issued_at, expires_at, last_seen_at, revoked_at FROM wallet_sessions WHERE session_token = ? `, strings.TrimSpace(token)) var rec walletSessionRecord var issuedAt sql.NullString var expiresAt sql.NullString var lastSeenAt sql.NullString var revokedAt sql.NullString err := row.Scan(&rec.SessionToken, &rec.Wallet, &rec.DesignationCode, &rec.ChainID, &issuedAt, &expiresAt, &lastSeenAt, &revokedAt) if err != nil { if errors.Is(err, sql.ErrNoRows) { return walletSessionRecord{}, errNotFound } return walletSessionRecord{}, err } rec.IssuedAt = parseRFC3339Nullable(issuedAt) rec.ExpiresAt = parseRFC3339Nullable(expiresAt) rec.LastSeenAt = parseRFC3339Ptr(lastSeenAt) rec.RevokedAt = parseRFC3339Ptr(revokedAt) return rec, nil } func (s *store) touchWalletSession(ctx context.Context, token string, touchedAt time.Time) error { res, err := s.db.ExecContext(ctx, ` UPDATE wallet_sessions SET last_seen_at = ? WHERE session_token = ? `, touchedAt.UTC().Format(time.RFC3339Nano), strings.TrimSpace(token)) if err != nil { return err } affected, err := res.RowsAffected() if err != nil { return err } if affected == 0 { return errNotFound } return nil } func (s *store) revokeWalletSession(ctx context.Context, token string, revokedAt time.Time) error { res, err := s.db.ExecContext(ctx, ` UPDATE wallet_sessions SET revoked_at = ? WHERE session_token = ? `, revokedAt.UTC().Format(time.RFC3339Nano), strings.TrimSpace(token)) if err != nil { return err } affected, err := res.RowsAffected() if err != nil { return err } if affected == 0 { return errNotFound } return nil } func (s *store) deleteExpiredWalletSessions(ctx context.Context, now time.Time) (int64, error) { res, err := s.db.ExecContext(ctx, ` DELETE FROM wallet_sessions WHERE expires_at <= ? `, now.UTC().Format(time.RFC3339Nano)) if err != nil { return 0, err } affected, err := res.RowsAffected() if err != nil { return 0, err } return affected, nil } func (s *store) putQuote(ctx context.Context, quote quoteRecord) error { _, err := s.db.ExecContext(ctx, ` INSERT INTO quotes ( quote_id, designation_code, address, payer_address, chain_id, currency, amount_atomic, decimals, contract_address, method, calldata, value_hex, sponsorship_mode, sponsor_org_root_id, created_at, expires_at, confirmed_at, confirmed_tx_hash ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(quote_id) DO UPDATE SET designation_code=excluded.designation_code, address=excluded.address, payer_address=excluded.payer_address, chain_id=excluded.chain_id, currency=excluded.currency, amount_atomic=excluded.amount_atomic, decimals=excluded.decimals, contract_address=excluded.contract_address, method=excluded.method, calldata=excluded.calldata, value_hex=excluded.value_hex, sponsorship_mode=excluded.sponsorship_mode, sponsor_org_root_id=excluded.sponsor_org_root_id, created_at=excluded.created_at, expires_at=excluded.expires_at, confirmed_at=excluded.confirmed_at, confirmed_tx_hash=excluded.confirmed_tx_hash `, quote.QuoteID, quote.DesignationCode, quote.Address, nullableString(quote.PayerAddress), quote.ChainID, quote.Currency, quote.AmountAtomic, quote.Decimals, quote.ContractAddress, quote.Method, quote.Calldata, quote.ValueHex, strings.ToLower(strings.TrimSpace(quote.SponsorshipMode)), nullableString(quote.SponsorOrgRootID), quote.CreatedAt.Format(time.RFC3339Nano), quote.ExpiresAt.Format(time.RFC3339Nano), formatNullableTime(quote.ConfirmedAt), nullableString(quote.ConfirmedTxHash)) return err } func (s *store) getQuote(ctx context.Context, quoteID string) (quoteRecord, error) { row := s.db.QueryRowContext(ctx, ` SELECT quote_id, designation_code, address, payer_address, chain_id, currency, amount_atomic, decimals, contract_address, method, calldata, value_hex, sponsorship_mode, sponsor_org_root_id, created_at, expires_at, confirmed_at, confirmed_tx_hash FROM quotes WHERE quote_id = ? `, quoteID) var rec quoteRecord var created, expires, confirmed sql.NullString var confirmedTx, payerAddress, sponsorOrgRootID sql.NullString err := row.Scan( &rec.QuoteID, &rec.DesignationCode, &rec.Address, &payerAddress, &rec.ChainID, &rec.Currency, &rec.AmountAtomic, &rec.Decimals, &rec.ContractAddress, &rec.Method, &rec.Calldata, &rec.ValueHex, &rec.SponsorshipMode, &sponsorOrgRootID, &created, &expires, &confirmed, &confirmedTx, ) if err != nil { if errors.Is(err, sql.ErrNoRows) { return quoteRecord{}, errNotFound } return quoteRecord{}, err } rec.PayerAddress = payerAddress.String rec.SponsorOrgRootID = sponsorOrgRootID.String rec.CreatedAt = parseRFC3339Nullable(created) rec.ExpiresAt = parseRFC3339Nullable(expires) rec.ConfirmedAt = parseRFC3339Ptr(confirmed) rec.ConfirmedTxHash = confirmedTx.String return rec, nil } func (s *store) putMarketplaceQuote(ctx context.Context, quote marketplaceQuoteRecord) error { _, err := s.db.ExecContext(ctx, ` INSERT INTO marketplace_quotes ( quote_id, wallet, payer_wallet, offer_id, org_root_id, principal_id, principal_role, workspace_id, currency, amount_atomic, total_amount_atomic, decimals, membership_included, line_items_json, policy_hash, access_class, availability_state, expected_tx_to, expected_tx_data, expected_tx_value_hex, created_at, expires_at, confirmed_at, confirmed_tx_hash ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(quote_id) DO UPDATE SET wallet=excluded.wallet, payer_wallet=excluded.payer_wallet, offer_id=excluded.offer_id, org_root_id=excluded.org_root_id, principal_id=excluded.principal_id, principal_role=excluded.principal_role, workspace_id=excluded.workspace_id, currency=excluded.currency, amount_atomic=excluded.amount_atomic, total_amount_atomic=excluded.total_amount_atomic, decimals=excluded.decimals, membership_included=excluded.membership_included, line_items_json=excluded.line_items_json, policy_hash=excluded.policy_hash, access_class=excluded.access_class, availability_state=excluded.availability_state, expected_tx_to=excluded.expected_tx_to, expected_tx_data=excluded.expected_tx_data, expected_tx_value_hex=excluded.expected_tx_value_hex, created_at=excluded.created_at, expires_at=excluded.expires_at, confirmed_at=excluded.confirmed_at, confirmed_tx_hash=excluded.confirmed_tx_hash `, quote.QuoteID, strings.ToLower(strings.TrimSpace(quote.Wallet)), nullableString(strings.ToLower(strings.TrimSpace(quote.PayerWallet))), strings.TrimSpace(quote.OfferID), nullableString(strings.TrimSpace(quote.OrgRootID)), nullableString(strings.TrimSpace(quote.PrincipalID)), nullableString(strings.ToLower(strings.TrimSpace(quote.PrincipalRole))), nullableString(strings.TrimSpace(quote.WorkspaceID)), strings.TrimSpace(quote.Currency), strings.TrimSpace(quote.AmountAtomic), strings.TrimSpace(quote.TotalAmountAtomic), quote.Decimals, boolToInt(quote.MembershipIncluded), quote.LineItemsJSON, strings.TrimSpace(quote.PolicyHash), strings.ToLower(strings.TrimSpace(quote.AccessClass)), strings.ToLower(strings.TrimSpace(quote.AvailabilityState)), nullableString(strings.ToLower(strings.TrimSpace(quote.ExpectedTxTo))), nullableString(strings.ToLower(strings.TrimSpace(quote.ExpectedTxData))), nullableString(strings.ToLower(strings.TrimSpace(quote.ExpectedTxValueHex))), quote.CreatedAt.UTC().Format(time.RFC3339Nano), quote.ExpiresAt.UTC().Format(time.RFC3339Nano), formatNullableTime(quote.ConfirmedAt), nullableString(strings.ToLower(strings.TrimSpace(quote.ConfirmedTxHash))), ) return err } func (s *store) getMarketplaceQuote(ctx context.Context, quoteID string) (marketplaceQuoteRecord, error) { row := s.db.QueryRowContext(ctx, ` SELECT quote_id, wallet, payer_wallet, offer_id, org_root_id, principal_id, principal_role, workspace_id, currency, amount_atomic, total_amount_atomic, decimals, membership_included, line_items_json, policy_hash, access_class, availability_state, expected_tx_to, expected_tx_data, expected_tx_value_hex, created_at, expires_at, confirmed_at, confirmed_tx_hash FROM marketplace_quotes WHERE quote_id = ? `, strings.TrimSpace(quoteID)) var rec marketplaceQuoteRecord var payerWallet, orgRootID, principalID, principalRole, workspaceID sql.NullString var expectedTxTo, expectedTxData, expectedTxValueHex sql.NullString var createdAt, expiresAt, confirmedAt, confirmedTxHash sql.NullString var membershipIncluded int err := row.Scan( &rec.QuoteID, &rec.Wallet, &payerWallet, &rec.OfferID, &orgRootID, &principalID, &principalRole, &workspaceID, &rec.Currency, &rec.AmountAtomic, &rec.TotalAmountAtomic, &rec.Decimals, &membershipIncluded, &rec.LineItemsJSON, &rec.PolicyHash, &rec.AccessClass, &rec.AvailabilityState, &expectedTxTo, &expectedTxData, &expectedTxValueHex, &createdAt, &expiresAt, &confirmedAt, &confirmedTxHash, ) if err != nil { if errors.Is(err, sql.ErrNoRows) { return marketplaceQuoteRecord{}, errNotFound } return marketplaceQuoteRecord{}, err } rec.PayerWallet = payerWallet.String rec.OrgRootID = orgRootID.String rec.PrincipalID = principalID.String rec.PrincipalRole = principalRole.String rec.WorkspaceID = workspaceID.String rec.ExpectedTxTo = expectedTxTo.String rec.ExpectedTxData = expectedTxData.String rec.ExpectedTxValueHex = expectedTxValueHex.String rec.MembershipIncluded = membershipIncluded == 1 rec.CreatedAt = parseRFC3339Nullable(createdAt) rec.ExpiresAt = parseRFC3339Nullable(expiresAt) rec.ConfirmedAt = parseRFC3339Ptr(confirmedAt) rec.ConfirmedTxHash = confirmedTxHash.String return rec, nil } func (s *store) putMarketplaceEntitlement(ctx context.Context, ent marketplaceEntitlementRecord) error { _, err := s.db.ExecContext(ctx, ` INSERT INTO marketplace_entitlements ( entitlement_id, quote_id, offer_id, wallet, payer_wallet, org_root_id, principal_id, principal_role, workspace_id, state, access_class, availability_state, policy_hash, issued_at, tx_hash ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(entitlement_id) DO UPDATE SET quote_id=excluded.quote_id, offer_id=excluded.offer_id, wallet=excluded.wallet, payer_wallet=excluded.payer_wallet, org_root_id=excluded.org_root_id, principal_id=excluded.principal_id, principal_role=excluded.principal_role, workspace_id=excluded.workspace_id, state=excluded.state, access_class=excluded.access_class, availability_state=excluded.availability_state, policy_hash=excluded.policy_hash, issued_at=excluded.issued_at, tx_hash=excluded.tx_hash `, strings.TrimSpace(ent.EntitlementID), strings.TrimSpace(ent.QuoteID), strings.TrimSpace(ent.OfferID), strings.ToLower(strings.TrimSpace(ent.Wallet)), nullableString(strings.ToLower(strings.TrimSpace(ent.PayerWallet))), nullableString(strings.TrimSpace(ent.OrgRootID)), nullableString(strings.TrimSpace(ent.PrincipalID)), nullableString(strings.ToLower(strings.TrimSpace(ent.PrincipalRole))), nullableString(strings.TrimSpace(ent.WorkspaceID)), strings.ToLower(strings.TrimSpace(ent.State)), strings.ToLower(strings.TrimSpace(ent.AccessClass)), strings.ToLower(strings.TrimSpace(ent.AvailabilityState)), strings.TrimSpace(ent.PolicyHash), ent.IssuedAt.UTC().Format(time.RFC3339Nano), strings.ToLower(strings.TrimSpace(ent.TxHash)), ) return err } func (s *store) getMarketplaceEntitlementByQuote(ctx context.Context, quoteID string) (marketplaceEntitlementRecord, error) { row := s.db.QueryRowContext(ctx, ` SELECT entitlement_id, quote_id, offer_id, wallet, payer_wallet, org_root_id, principal_id, principal_role, workspace_id, state, access_class, availability_state, policy_hash, issued_at, tx_hash FROM marketplace_entitlements WHERE quote_id = ? `, strings.TrimSpace(quoteID)) return scanMarketplaceEntitlement(row) } func (s *store) listMarketplaceEntitlementsByWallet(ctx context.Context, wallet string) ([]marketplaceEntitlementRecord, error) { rows, err := s.db.QueryContext(ctx, ` SELECT entitlement_id, quote_id, offer_id, wallet, payer_wallet, org_root_id, principal_id, principal_role, workspace_id, state, access_class, availability_state, policy_hash, issued_at, tx_hash FROM marketplace_entitlements WHERE wallet = ? ORDER BY issued_at DESC `, strings.ToLower(strings.TrimSpace(wallet))) if err != nil { return nil, err } defer rows.Close() records := make([]marketplaceEntitlementRecord, 0) for rows.Next() { rec, err := scanMarketplaceEntitlement(rows) if err != nil { return nil, err } records = append(records, rec) } if err := rows.Err(); err != nil { return nil, err } return records, nil } func (s *store) hasActiveEntitlement(ctx context.Context, wallet, offerID, orgRootID string) (bool, error) { wallet = strings.ToLower(strings.TrimSpace(wallet)) offerID = strings.TrimSpace(offerID) orgRootID = strings.TrimSpace(orgRootID) if wallet == "" || offerID == "" { return false, nil } query := ` SELECT 1 FROM marketplace_entitlements WHERE wallet = ? AND offer_id = ? AND state = 'active' ` args := []any{wallet, offerID} if orgRootID != "" { query += " AND org_root_id = ?" args = append(args, orgRootID) } query += " LIMIT 1" row := s.db.QueryRowContext(ctx, query, args...) var marker int if err := row.Scan(&marker); err != nil { if errors.Is(err, sql.ErrNoRows) { return false, nil } return false, err } return marker == 1, nil } func scanMarketplaceEntitlement(row interface{ Scan(dest ...any) error }) (marketplaceEntitlementRecord, error) { var rec marketplaceEntitlementRecord var payerWallet, orgRootID, principalID, principalRole, workspaceID sql.NullString var issuedAt sql.NullString err := row.Scan( &rec.EntitlementID, &rec.QuoteID, &rec.OfferID, &rec.Wallet, &payerWallet, &orgRootID, &principalID, &principalRole, &workspaceID, &rec.State, &rec.AccessClass, &rec.AvailabilityState, &rec.PolicyHash, &issuedAt, &rec.TxHash, ) if err != nil { if errors.Is(err, sql.ErrNoRows) { return marketplaceEntitlementRecord{}, errNotFound } return marketplaceEntitlementRecord{}, err } rec.PayerWallet = payerWallet.String rec.OrgRootID = orgRootID.String rec.PrincipalID = principalID.String rec.PrincipalRole = principalRole.String rec.WorkspaceID = workspaceID.String rec.IssuedAt = parseRFC3339Nullable(issuedAt) return rec, nil } func (s *store) putGovernancePrincipal(ctx context.Context, rec governancePrincipalRecord) error { _, err := s.db.ExecContext(ctx, ` INSERT INTO governance_principals ( wallet, org_root_id, principal_id, principal_role, entitlement_id, entitlement_status, access_class, availability_state, lease_expires_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(wallet) DO UPDATE SET org_root_id=excluded.org_root_id, principal_id=excluded.principal_id, principal_role=excluded.principal_role, entitlement_id=excluded.entitlement_id, entitlement_status=excluded.entitlement_status, access_class=excluded.access_class, availability_state=excluded.availability_state, lease_expires_at=excluded.lease_expires_at, updated_at=excluded.updated_at `, rec.Wallet, rec.OrgRootID, rec.PrincipalID, rec.PrincipalRole, rec.EntitlementID, rec.EntitlementStatus, rec.AccessClass, rec.AvailabilityState, formatNullableTime(rec.LeaseExpiresAt), rec.UpdatedAt.UTC().Format(time.RFC3339Nano)) return err } func (s *store) getGovernancePrincipal(ctx context.Context, wallet string) (governancePrincipalRecord, error) { row := s.db.QueryRowContext(ctx, ` SELECT wallet, org_root_id, principal_id, principal_role, entitlement_id, entitlement_status, access_class, availability_state, lease_expires_at, updated_at FROM governance_principals WHERE wallet = ? `, strings.ToLower(strings.TrimSpace(wallet))) var rec governancePrincipalRecord var leaseExpiresAt, updatedAt sql.NullString err := row.Scan(&rec.Wallet, &rec.OrgRootID, &rec.PrincipalID, &rec.PrincipalRole, &rec.EntitlementID, &rec.EntitlementStatus, &rec.AccessClass, &rec.AvailabilityState, &leaseExpiresAt, &updatedAt) if err != nil { if errors.Is(err, sql.ErrNoRows) { return governancePrincipalRecord{}, errNotFound } return governancePrincipalRecord{}, err } rec.LeaseExpiresAt = parseRFC3339Ptr(leaseExpiresAt) rec.UpdatedAt = parseRFC3339Nullable(updatedAt) return rec, nil } func (s *store) putGovernanceInstallToken(ctx context.Context, rec governanceInstallTokenRecord) error { _, err := s.db.ExecContext(ctx, ` INSERT INTO governance_install_tokens ( install_token, wallet, org_root_id, principal_id, principal_role, device_id, entitlement_id, package_hash, runtime_version, policy_hash, issued_at, expires_at, consumed_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(install_token) DO UPDATE SET wallet=excluded.wallet, org_root_id=excluded.org_root_id, principal_id=excluded.principal_id, principal_role=excluded.principal_role, device_id=excluded.device_id, entitlement_id=excluded.entitlement_id, package_hash=excluded.package_hash, runtime_version=excluded.runtime_version, policy_hash=excluded.policy_hash, issued_at=excluded.issued_at, expires_at=excluded.expires_at, consumed_at=excluded.consumed_at `, rec.InstallToken, rec.Wallet, rec.OrgRootID, rec.PrincipalID, rec.PrincipalRole, rec.DeviceID, rec.EntitlementID, rec.PackageHash, rec.RuntimeVersion, rec.PolicyHash, rec.IssuedAt.UTC().Format(time.RFC3339Nano), rec.ExpiresAt.UTC().Format(time.RFC3339Nano), formatNullableTime(rec.ConsumedAt)) return err } func (s *store) getGovernanceInstallToken(ctx context.Context, token string) (governanceInstallTokenRecord, error) { row := s.db.QueryRowContext(ctx, ` SELECT install_token, wallet, org_root_id, principal_id, principal_role, device_id, entitlement_id, package_hash, runtime_version, policy_hash, issued_at, expires_at, consumed_at FROM governance_install_tokens WHERE install_token = ? `, strings.TrimSpace(token)) var rec governanceInstallTokenRecord var issuedAt, expiresAt, consumedAt sql.NullString err := row.Scan(&rec.InstallToken, &rec.Wallet, &rec.OrgRootID, &rec.PrincipalID, &rec.PrincipalRole, &rec.DeviceID, &rec.EntitlementID, &rec.PackageHash, &rec.RuntimeVersion, &rec.PolicyHash, &issuedAt, &expiresAt, &consumedAt) if err != nil { if errors.Is(err, sql.ErrNoRows) { return governanceInstallTokenRecord{}, errNotFound } return governanceInstallTokenRecord{}, err } rec.IssuedAt = parseRFC3339Nullable(issuedAt) rec.ExpiresAt = parseRFC3339Nullable(expiresAt) rec.ConsumedAt = parseRFC3339Ptr(consumedAt) return rec, nil } func (s *store) putGovernanceInstall(ctx context.Context, rec governanceInstallRecord) error { _, err := s.db.ExecContext(ctx, ` INSERT INTO governance_installs ( install_token, wallet, device_id, entitlement_id, runtime_version, package_hash, policy_hash, launcher_receipt_ref, installed_at, activated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(install_token) DO UPDATE SET wallet=excluded.wallet, device_id=excluded.device_id, entitlement_id=excluded.entitlement_id, runtime_version=excluded.runtime_version, package_hash=excluded.package_hash, policy_hash=excluded.policy_hash, launcher_receipt_ref=excluded.launcher_receipt_ref, installed_at=excluded.installed_at, activated_at=excluded.activated_at `, rec.InstallToken, rec.Wallet, rec.DeviceID, rec.EntitlementID, rec.RuntimeVersion, rec.PackageHash, rec.PolicyHash, nullableString(rec.LauncherReceiptRef), rec.InstalledAt.UTC().Format(time.RFC3339Nano), rec.ActivatedAt.UTC().Format(time.RFC3339Nano)) return err } func (s *store) getGovernanceInstallByToken(ctx context.Context, token string) (governanceInstallRecord, error) { row := s.db.QueryRowContext(ctx, ` SELECT install_token, wallet, device_id, entitlement_id, runtime_version, package_hash, policy_hash, launcher_receipt_ref, installed_at, activated_at FROM governance_installs WHERE install_token = ? `, strings.TrimSpace(token)) return scanGovernanceInstall(row) } func (s *store) getGovernanceInstallByDevice(ctx context.Context, wallet, deviceID string) (governanceInstallRecord, error) { row := s.db.QueryRowContext(ctx, ` SELECT install_token, wallet, device_id, entitlement_id, runtime_version, package_hash, policy_hash, launcher_receipt_ref, installed_at, activated_at FROM governance_installs WHERE wallet = ? AND device_id = ? ORDER BY activated_at DESC LIMIT 1 `, strings.ToLower(strings.TrimSpace(wallet)), strings.TrimSpace(deviceID)) return scanGovernanceInstall(row) } func scanGovernanceInstall(row interface{ Scan(dest ...any) error }) (governanceInstallRecord, error) { var rec governanceInstallRecord var launcherReceiptRef, installedAt, activatedAt sql.NullString err := row.Scan(&rec.InstallToken, &rec.Wallet, &rec.DeviceID, &rec.EntitlementID, &rec.RuntimeVersion, &rec.PackageHash, &rec.PolicyHash, &launcherReceiptRef, &installedAt, &activatedAt) if err != nil { if errors.Is(err, sql.ErrNoRows) { return governanceInstallRecord{}, errNotFound } return governanceInstallRecord{}, err } rec.LauncherReceiptRef = launcherReceiptRef.String rec.InstalledAt = parseRFC3339Nullable(installedAt) rec.ActivatedAt = parseRFC3339Nullable(activatedAt) return rec, nil } func (s *store) putMemberChannelBinding(ctx context.Context, rec memberChannelBindingRecord) error { _, err := s.db.ExecContext(ctx, ` INSERT INTO member_channel_bindings ( channel_binding_id, wallet, chain_id, device_id, platform, org_root_id, principal_id, principal_role, app_version, push_provider, push_token, status, created_at, updated_at, removed_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(wallet, device_id) DO UPDATE SET channel_binding_id=excluded.channel_binding_id, chain_id=excluded.chain_id, platform=excluded.platform, org_root_id=excluded.org_root_id, principal_id=excluded.principal_id, principal_role=excluded.principal_role, app_version=excluded.app_version, push_provider=excluded.push_provider, push_token=excluded.push_token, status=excluded.status, updated_at=excluded.updated_at, removed_at=excluded.removed_at `, rec.ChannelBindingID, rec.Wallet, rec.ChainID, rec.DeviceID, rec.Platform, rec.OrgRootID, rec.PrincipalID, rec.PrincipalRole, rec.AppVersion, rec.PushProvider, nullableString(rec.PushToken), rec.Status, rec.CreatedAt.UTC().Format(time.RFC3339Nano), rec.UpdatedAt.UTC().Format(time.RFC3339Nano), formatNullableTime(rec.RemovedAt)) return err } func (s *store) getMemberChannelBinding(ctx context.Context, wallet, deviceID string) (memberChannelBindingRecord, error) { row := s.db.QueryRowContext(ctx, ` SELECT channel_binding_id, wallet, chain_id, device_id, platform, org_root_id, principal_id, principal_role, app_version, push_provider, push_token, status, created_at, updated_at, removed_at FROM member_channel_bindings WHERE wallet = ? AND device_id = ? AND status = 'active' `, strings.ToLower(strings.TrimSpace(wallet)), strings.TrimSpace(deviceID)) return scanMemberChannelBinding(row) } func (s *store) getMemberChannelBindingByPrincipal(ctx context.Context, wallet, orgRootID, principalID string) (memberChannelBindingRecord, error) { row := s.db.QueryRowContext(ctx, ` SELECT channel_binding_id, wallet, chain_id, device_id, platform, org_root_id, principal_id, principal_role, app_version, push_provider, push_token, status, created_at, updated_at, removed_at FROM member_channel_bindings WHERE wallet = ? AND org_root_id = ? AND principal_id = ? AND status = 'active' ORDER BY updated_at DESC LIMIT 1 `, strings.ToLower(strings.TrimSpace(wallet)), strings.TrimSpace(orgRootID), strings.TrimSpace(principalID)) return scanMemberChannelBinding(row) } func scanMemberChannelBinding(row interface{ Scan(dest ...any) error }) (memberChannelBindingRecord, error) { var rec memberChannelBindingRecord var pushToken, createdAt, updatedAt, removedAt sql.NullString err := row.Scan( &rec.ChannelBindingID, &rec.Wallet, &rec.ChainID, &rec.DeviceID, &rec.Platform, &rec.OrgRootID, &rec.PrincipalID, &rec.PrincipalRole, &rec.AppVersion, &rec.PushProvider, &pushToken, &rec.Status, &createdAt, &updatedAt, &removedAt, ) if err != nil { if errors.Is(err, sql.ErrNoRows) { return memberChannelBindingRecord{}, errNotFound } return memberChannelBindingRecord{}, err } rec.PushToken = pushToken.String rec.CreatedAt = parseRFC3339Nullable(createdAt) rec.UpdatedAt = parseRFC3339Nullable(updatedAt) rec.RemovedAt = parseRFC3339Ptr(removedAt) return rec, nil } func (s *store) removeMemberChannelBinding(ctx context.Context, wallet, deviceID string, removedAt time.Time) error { result, err := s.db.ExecContext(ctx, ` UPDATE member_channel_bindings SET status = 'removed', removed_at = ?, updated_at = ? WHERE wallet = ? AND device_id = ? AND status = 'active' `, removedAt.UTC().Format(time.RFC3339Nano), removedAt.UTC().Format(time.RFC3339Nano), strings.ToLower(strings.TrimSpace(wallet)), strings.TrimSpace(deviceID)) if err != nil { return err } affected, err := result.RowsAffected() if err != nil { return err } if affected == 0 { return errNotFound } return nil } func (s *store) putMemberChannelEvent(ctx context.Context, rec memberChannelEventRecord) error { if strings.TrimSpace(rec.EventID) == "" { id, err := randomHex(8) if err != nil { return err } rec.EventID = "evt_" + id } _, err := s.db.ExecContext(ctx, ` INSERT INTO member_channel_events ( event_id, wallet, org_root_id, principal_id, class, created_at, title, body, dedupe_key, requires_ack, policy_hash, payload_json, visibility_scope ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(wallet, org_root_id, dedupe_key) DO NOTHING `, rec.EventID, strings.ToLower(strings.TrimSpace(rec.Wallet)), strings.TrimSpace(rec.OrgRootID), strings.TrimSpace(rec.PrincipalID), strings.TrimSpace(rec.Class), rec.CreatedAt.UTC().Format(time.RFC3339Nano), rec.Title, rec.Body, rec.DedupeKey, boolToInt(rec.RequiresAck), rec.PolicyHash, rec.PayloadJSON, strings.TrimSpace(rec.VisibilityScope)) return err } func (s *store) getMemberChannelEventSeqByID(ctx context.Context, eventID string) (int64, error) { row := s.db.QueryRowContext(ctx, ` SELECT seq FROM member_channel_events WHERE event_id = ? `, strings.TrimSpace(eventID)) var seq int64 if err := row.Scan(&seq); err != nil { if errors.Is(err, sql.ErrNoRows) { return 0, errNotFound } return 0, err } return seq, nil } func (s *store) listMemberChannelEvents(ctx context.Context, wallet, orgRootID string, includeOwnerAdmin bool, cursor string, limit int) ([]memberChannelEventRecord, string, error) { cursorSeq := int64(0) if strings.TrimSpace(cursor) != "" { seq, err := s.getMemberChannelEventSeqByID(ctx, cursor) if err == nil { cursorSeq = seq } } visibilityClause := "visibility_scope = 'member'" if includeOwnerAdmin { visibilityClause = "(visibility_scope = 'member' OR visibility_scope = 'owner_admin')" } rows, err := s.db.QueryContext(ctx, ` SELECT seq, event_id, wallet, org_root_id, principal_id, class, created_at, title, body, dedupe_key, requires_ack, policy_hash, payload_json, visibility_scope FROM member_channel_events WHERE wallet = ? AND org_root_id = ? AND seq > ? AND `+visibilityClause+` ORDER BY seq ASC LIMIT ? `, strings.ToLower(strings.TrimSpace(wallet)), strings.TrimSpace(orgRootID), cursorSeq, limit) if err != nil { return nil, "", err } defer rows.Close() events := make([]memberChannelEventRecord, 0, limit) nextCursor := "" for rows.Next() { var rec memberChannelEventRecord var createdAt sql.NullString var requiresAck int if err := rows.Scan( &rec.Seq, &rec.EventID, &rec.Wallet, &rec.OrgRootID, &rec.PrincipalID, &rec.Class, &createdAt, &rec.Title, &rec.Body, &rec.DedupeKey, &requiresAck, &rec.PolicyHash, &rec.PayloadJSON, &rec.VisibilityScope, ); err != nil { return nil, "", err } rec.CreatedAt = parseRFC3339Nullable(createdAt) rec.RequiresAck = requiresAck == 1 events = append(events, rec) nextCursor = rec.EventID } if err := rows.Err(); err != nil { return nil, "", err } return events, nextCursor, nil } func (s *store) getMemberChannelEventByID(ctx context.Context, eventID string) (memberChannelEventRecord, error) { row := s.db.QueryRowContext(ctx, ` SELECT seq, event_id, wallet, org_root_id, principal_id, class, created_at, title, body, dedupe_key, requires_ack, policy_hash, payload_json, visibility_scope FROM member_channel_events WHERE event_id = ? `, strings.TrimSpace(eventID)) var rec memberChannelEventRecord var createdAt sql.NullString var requiresAck int err := row.Scan(&rec.Seq, &rec.EventID, &rec.Wallet, &rec.OrgRootID, &rec.PrincipalID, &rec.Class, &createdAt, &rec.Title, &rec.Body, &rec.DedupeKey, &requiresAck, &rec.PolicyHash, &rec.PayloadJSON, &rec.VisibilityScope) if err != nil { if errors.Is(err, sql.ErrNoRows) { return memberChannelEventRecord{}, errNotFound } return memberChannelEventRecord{}, err } rec.CreatedAt = parseRFC3339Nullable(createdAt) rec.RequiresAck = requiresAck == 1 return rec, nil } func (s *store) putMemberChannelEventAck(ctx context.Context, eventID, wallet, deviceID string, acknowledgedAt time.Time) (time.Time, error) { _, err := s.db.ExecContext(ctx, ` INSERT INTO member_channel_event_acks (event_id, wallet, device_id, acknowledged_at) VALUES (?, ?, ?, ?) ON CONFLICT(event_id, wallet, device_id) DO NOTHING `, strings.TrimSpace(eventID), strings.ToLower(strings.TrimSpace(wallet)), strings.TrimSpace(deviceID), acknowledgedAt.UTC().Format(time.RFC3339Nano)) if err != nil { return time.Time{}, err } row := s.db.QueryRowContext(ctx, ` SELECT acknowledged_at FROM member_channel_event_acks WHERE event_id = ? AND wallet = ? AND device_id = ? `, strings.TrimSpace(eventID), strings.ToLower(strings.TrimSpace(wallet)), strings.TrimSpace(deviceID)) var ackRaw sql.NullString if err := row.Scan(&ackRaw); err != nil { return time.Time{}, err } return parseRFC3339Nullable(ackRaw), nil } func (s *store) putMemberChannelSupportTicket(ctx context.Context, rec memberChannelSupportTicketRecord) error { _, err := s.db.ExecContext(ctx, ` INSERT INTO member_support_tickets ( ticket_id, wallet, org_root_id, principal_id, category, summary, context_json, status, created_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) `, rec.TicketID, strings.ToLower(strings.TrimSpace(rec.Wallet)), strings.TrimSpace(rec.OrgRootID), strings.TrimSpace(rec.PrincipalID), strings.TrimSpace(rec.Category), rec.Summary, rec.ContextJSON, strings.TrimSpace(rec.Status), rec.CreatedAt.UTC().Format(time.RFC3339Nano)) return err } func parseRFC3339Nullable(raw sql.NullString) time.Time { if !raw.Valid || strings.TrimSpace(raw.String) == "" { return time.Time{} } ts, err := time.Parse(time.RFC3339Nano, raw.String) if err != nil { return time.Time{} } return ts.UTC() } func parseRFC3339Ptr(raw sql.NullString) *time.Time { ts := parseRFC3339Nullable(raw) if ts.IsZero() { return nil } return &ts } func formatNullableTime(ts *time.Time) any { if ts == nil || ts.IsZero() { return nil } return ts.UTC().Format(time.RFC3339Nano) } func nullableString(v string) any { if strings.TrimSpace(v) == "" { return nil } return v } func boolToInt(v bool) int { if v { return 1 } return 0 }