web/backend/secretapi/store.go

789 lines
30 KiB
Go

package main
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"time"
_ "modernc.org/sqlite"
)
// errNotFound is the sentinel returned by store lookups when no row matches,
// and by removeMemberChannelBinding when no active binding was updated.
var errNotFound = errors.New("record not found")
// store is the persistence layer of this package: a thin wrapper around a
// database/sql handle on which all query and upsert methods are defined.
type store struct {
// db is the underlying connection pool; openStore limits it to one
// open connection.
db *sql.DB
}
// openStore opens the SQLite database at path using the "sqlite" driver,
// limits the pool to a single open connection and runs the schema migration.
// When the migration fails the handle is closed and the migration error is
// returned.
func openStore(path string) (*store, error) {
	handle, openErr := sql.Open("sqlite", path)
	if openErr != nil {
		return nil, openErr
	}
	handle.SetMaxOpenConns(1)

	st := &store{db: handle}
	migrateErr := st.migrate(context.Background())
	if migrateErr == nil {
		return st, nil
	}
	// Best-effort cleanup: the migration failure is the error worth reporting.
	_ = handle.Close()
	return nil, migrateErr
}
// close closes the underlying database handle and returns the error, if any,
// reported by database/sql.
func (s *store) close() error {
return s.db.Close()
}
// migrate brings the schema up to date. It first executes every CREATE TABLE /
// CREATE INDEX statement (all guarded by IF NOT EXISTS, so re-running is
// harmless) and then adds the quotes columns that databases created before
// those columns existed may be missing. The first failing step aborts the
// migration; DDL failures are wrapped with a "migrate:" prefix.
func (s *store) migrate(ctx context.Context) error {
	schema := []string{
		`CREATE TABLE IF NOT EXISTS designations (
code TEXT PRIMARY KEY,
display_token TEXT NOT NULL,
intent_id TEXT NOT NULL UNIQUE,
nonce TEXT NOT NULL,
origin TEXT NOT NULL,
locale TEXT NOT NULL,
address TEXT NOT NULL,
chain_id INTEGER NOT NULL,
issued_at TEXT NOT NULL,
expires_at TEXT NOT NULL,
verified_at TEXT,
membership_status TEXT NOT NULL DEFAULT 'none',
membership_tx_hash TEXT,
activated_at TEXT
);`,
		`CREATE INDEX IF NOT EXISTS idx_designations_intent ON designations(intent_id);`,
		`CREATE INDEX IF NOT EXISTS idx_designations_address ON designations(address);`,
		`CREATE TABLE IF NOT EXISTS quotes (
quote_id TEXT PRIMARY KEY,
designation_code TEXT NOT NULL,
address TEXT NOT NULL,
payer_address TEXT,
chain_id INTEGER NOT NULL,
currency TEXT NOT NULL,
amount_atomic TEXT NOT NULL,
decimals INTEGER NOT NULL,
contract_address TEXT NOT NULL,
method TEXT NOT NULL,
calldata TEXT NOT NULL,
value_hex TEXT NOT NULL,
sponsorship_mode TEXT NOT NULL DEFAULT 'self',
sponsor_org_root_id TEXT,
created_at TEXT NOT NULL,
expires_at TEXT NOT NULL,
confirmed_at TEXT,
confirmed_tx_hash TEXT,
FOREIGN KEY(designation_code) REFERENCES designations(code)
);`,
		`CREATE INDEX IF NOT EXISTS idx_quotes_designation ON quotes(designation_code);`,
		`CREATE TABLE IF NOT EXISTS governance_principals (
wallet TEXT PRIMARY KEY,
org_root_id TEXT NOT NULL,
principal_id TEXT NOT NULL,
principal_role TEXT NOT NULL,
entitlement_id TEXT NOT NULL,
entitlement_status TEXT NOT NULL,
access_class TEXT NOT NULL,
availability_state TEXT NOT NULL,
lease_expires_at TEXT,
updated_at TEXT NOT NULL
);`,
		`CREATE INDEX IF NOT EXISTS idx_governance_principals_org_root ON governance_principals(org_root_id);`,
		`CREATE TABLE IF NOT EXISTS governance_install_tokens (
install_token TEXT PRIMARY KEY,
wallet TEXT NOT NULL,
org_root_id TEXT NOT NULL,
principal_id TEXT NOT NULL,
principal_role TEXT NOT NULL,
device_id TEXT NOT NULL,
entitlement_id TEXT NOT NULL,
package_hash TEXT NOT NULL,
runtime_version TEXT NOT NULL,
policy_hash TEXT NOT NULL,
issued_at TEXT NOT NULL,
expires_at TEXT NOT NULL,
consumed_at TEXT
);`,
		`CREATE INDEX IF NOT EXISTS idx_gov_install_tokens_wallet ON governance_install_tokens(wallet);`,
		`CREATE INDEX IF NOT EXISTS idx_gov_install_tokens_device ON governance_install_tokens(device_id);`,
		`CREATE TABLE IF NOT EXISTS governance_installs (
install_token TEXT PRIMARY KEY,
wallet TEXT NOT NULL,
device_id TEXT NOT NULL,
entitlement_id TEXT NOT NULL,
runtime_version TEXT NOT NULL,
package_hash TEXT NOT NULL,
policy_hash TEXT NOT NULL,
launcher_receipt_ref TEXT,
installed_at TEXT NOT NULL,
activated_at TEXT NOT NULL
);`,
		`CREATE INDEX IF NOT EXISTS idx_governance_installs_wallet ON governance_installs(wallet);`,
		`CREATE INDEX IF NOT EXISTS idx_governance_installs_device ON governance_installs(device_id);`,
		`CREATE TABLE IF NOT EXISTS member_channel_bindings (
channel_binding_id TEXT PRIMARY KEY,
wallet TEXT NOT NULL,
chain_id INTEGER NOT NULL,
device_id TEXT NOT NULL,
platform TEXT NOT NULL,
org_root_id TEXT NOT NULL,
principal_id TEXT NOT NULL,
principal_role TEXT NOT NULL,
app_version TEXT NOT NULL,
push_provider TEXT NOT NULL,
push_token TEXT,
status TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
removed_at TEXT,
UNIQUE(wallet, device_id)
);`,
		`CREATE INDEX IF NOT EXISTS idx_member_channel_bindings_wallet ON member_channel_bindings(wallet);`,
		`CREATE INDEX IF NOT EXISTS idx_member_channel_bindings_org ON member_channel_bindings(org_root_id);`,
		`CREATE TABLE IF NOT EXISTS member_channel_events (
seq INTEGER PRIMARY KEY AUTOINCREMENT,
event_id TEXT NOT NULL UNIQUE,
wallet TEXT NOT NULL,
org_root_id TEXT NOT NULL,
principal_id TEXT NOT NULL,
class TEXT NOT NULL,
created_at TEXT NOT NULL,
title TEXT NOT NULL,
body TEXT NOT NULL,
dedupe_key TEXT NOT NULL,
requires_ack INTEGER NOT NULL DEFAULT 1,
policy_hash TEXT NOT NULL,
payload_json TEXT NOT NULL,
visibility_scope TEXT NOT NULL,
UNIQUE(wallet, org_root_id, dedupe_key)
);`,
		`CREATE INDEX IF NOT EXISTS idx_member_channel_events_wallet_seq ON member_channel_events(wallet, seq);`,
		`CREATE INDEX IF NOT EXISTS idx_member_channel_events_org_seq ON member_channel_events(org_root_id, seq);`,
		`CREATE TABLE IF NOT EXISTS member_channel_event_acks (
event_id TEXT NOT NULL,
wallet TEXT NOT NULL,
device_id TEXT NOT NULL,
acknowledged_at TEXT NOT NULL,
PRIMARY KEY(event_id, wallet, device_id)
);`,
		`CREATE TABLE IF NOT EXISTS member_support_tickets (
ticket_id TEXT PRIMARY KEY,
wallet TEXT NOT NULL,
org_root_id TEXT NOT NULL,
principal_id TEXT NOT NULL,
category TEXT NOT NULL,
summary TEXT NOT NULL,
context_json TEXT NOT NULL,
status TEXT NOT NULL,
created_at TEXT NOT NULL
);`,
		`CREATE INDEX IF NOT EXISTS idx_member_support_tickets_wallet ON member_support_tickets(wallet);`,
	}
	for i := range schema {
		if _, err := s.db.ExecContext(ctx, schema[i]); err != nil {
			return fmt.Errorf("migrate: %w", err)
		}
	}

	// Backward-compatible column adds for already-initialized local DBs.
	// Order matters only in that it mirrors the column order of a fresh table.
	quoteColumns := [...]struct{ name, def string }{
		{"payer_address", "TEXT"},
		{"sponsorship_mode", "TEXT NOT NULL DEFAULT 'self'"},
		{"sponsor_org_root_id", "TEXT"},
	}
	for _, col := range quoteColumns {
		if err := s.ensureColumn(ctx, "quotes", col.name, col.def); err != nil {
			return err
		}
	}
	return nil
}
// ensureColumn adds column (declared as columnDef) to table unless
// PRAGMA table_info already lists a column with that name, compared
// case-insensitively.
//
// table, column and columnDef are spliced into the SQL text verbatim, so they
// must be trusted identifiers; migrate only passes string literals.
func (s *store) ensureColumn(ctx context.Context, table, column, columnDef string) error {
	info, err := s.db.QueryContext(ctx, "PRAGMA table_info("+table+")")
	if err != nil {
		return fmt.Errorf("migrate table_info %s: %w", table, err)
	}
	defer info.Close()

	for info.Next() {
		// table_info yields: cid, name, type, notnull, dflt_value, pk.
		var (
			colName, colType string
			cid, notNull, pk int
			dflt             sql.NullString
		)
		if scanErr := info.Scan(&cid, &colName, &colType, &notNull, &dflt, &pk); scanErr != nil {
			return fmt.Errorf("migrate scan table_info %s: %w", table, scanErr)
		}
		if strings.EqualFold(colName, column) {
			// Column already present: nothing to do.
			return nil
		}
	}
	if iterErr := info.Err(); iterErr != nil {
		return fmt.Errorf("migrate iterate table_info %s: %w", table, iterErr)
	}

	alter := "ALTER TABLE " + table + " ADD COLUMN " + column + " " + columnDef
	if _, execErr := s.db.ExecContext(ctx, alter); execErr != nil {
		return fmt.Errorf("migrate add column %s.%s: %w", table, column, execErr)
	}
	return nil
}
// putDesignation inserts rec into designations, or overwrites every other
// column of the existing row that has the same code.
//
// Timestamps are stored as RFC 3339 text normalized to UTC, matching the
// other writers in this file; getDesignationByAddress orders rows by the
// issued_at text, so mixing zone offsets would make that ordering unreliable.
// membership_status is lower-cased; an empty MembershipTxHash and nil/zero
// VerifiedAt or ActivatedAt are stored as NULL.
func (s *store) putDesignation(ctx context.Context, rec designationRecord) error {
	const stmt = `
INSERT INTO designations (
code, display_token, intent_id, nonce, origin, locale, address, chain_id, issued_at, expires_at, verified_at, membership_status, membership_tx_hash, activated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(code) DO UPDATE SET
display_token=excluded.display_token,
intent_id=excluded.intent_id,
nonce=excluded.nonce,
origin=excluded.origin,
locale=excluded.locale,
address=excluded.address,
chain_id=excluded.chain_id,
issued_at=excluded.issued_at,
expires_at=excluded.expires_at,
verified_at=excluded.verified_at,
membership_status=excluded.membership_status,
membership_tx_hash=excluded.membership_tx_hash,
activated_at=excluded.activated_at
`
	_, err := s.db.ExecContext(ctx, stmt,
		rec.Code,
		rec.DisplayToken,
		rec.IntentID,
		rec.Nonce,
		rec.Origin,
		rec.Locale,
		rec.Address,
		rec.ChainID,
		rec.IssuedAt.UTC().Format(time.RFC3339Nano),
		rec.ExpiresAt.UTC().Format(time.RFC3339Nano),
		formatNullableTime(rec.VerifiedAt),
		strings.ToLower(rec.MembershipStatus),
		nullableString(rec.MembershipTxHash),
		formatNullableTime(rec.ActivatedAt),
	)
	return err
}
// getDesignationByIntent loads the designation whose intent_id equals
// intentID (matched exactly as given). It returns errNotFound when no row
// matches.
func (s *store) getDesignationByIntent(ctx context.Context, intentID string) (designationRecord, error) {
	const query = `
SELECT code, display_token, intent_id, nonce, origin, locale, address, chain_id, issued_at, expires_at, verified_at, membership_status, membership_tx_hash, activated_at
FROM designations
WHERE intent_id = ?
`
	return scanDesignation(s.db.QueryRowContext(ctx, query, intentID))
}
// getDesignationByCode loads the designation whose primary key equals code
// (matched exactly as given). It returns errNotFound when no row matches.
func (s *store) getDesignationByCode(ctx context.Context, code string) (designationRecord, error) {
	const query = `
SELECT code, display_token, intent_id, nonce, origin, locale, address, chain_id, issued_at, expires_at, verified_at, membership_status, membership_tx_hash, activated_at
FROM designations
WHERE code = ?
`
	return scanDesignation(s.db.QueryRowContext(ctx, query, code))
}
// getDesignationByAddress returns the designation for address that sorts last
// by its issued_at text. The address is trimmed and lower-cased before the
// lookup. It returns errNotFound when no row matches.
func (s *store) getDesignationByAddress(ctx context.Context, address string) (designationRecord, error) {
	const query = `
SELECT code, display_token, intent_id, nonce, origin, locale, address, chain_id, issued_at, expires_at, verified_at, membership_status, membership_tx_hash, activated_at
FROM designations
WHERE address = ?
ORDER BY issued_at DESC
LIMIT 1
`
	normalized := strings.ToLower(strings.TrimSpace(address))
	return scanDesignation(s.db.QueryRowContext(ctx, query, normalized))
}
// scanDesignation decodes one designations row, in the column order used by
// the getDesignation* queries, into a designationRecord. sql.ErrNoRows is
// translated to errNotFound; any other scan error is returned unchanged.
// Nullable text columns become "" and nullable timestamps become nil.
func scanDesignation(row interface{ Scan(dest ...any) error }) (designationRecord, error) {
	var (
		out                       designationRecord
		issuedRaw, expiresRaw     sql.NullString
		verifiedRaw, activatedRaw sql.NullString
		txHashRaw                 sql.NullString
	)
	scanErr := row.Scan(
		&out.Code,
		&out.DisplayToken,
		&out.IntentID,
		&out.Nonce,
		&out.Origin,
		&out.Locale,
		&out.Address,
		&out.ChainID,
		&issuedRaw,
		&expiresRaw,
		&verifiedRaw,
		&out.MembershipStatus,
		&txHashRaw,
		&activatedRaw,
	)
	switch {
	case errors.Is(scanErr, sql.ErrNoRows):
		return designationRecord{}, errNotFound
	case scanErr != nil:
		return designationRecord{}, scanErr
	}

	out.IssuedAt = parseRFC3339Nullable(issuedRaw)
	out.ExpiresAt = parseRFC3339Nullable(expiresRaw)
	out.VerifiedAt = parseRFC3339Ptr(verifiedRaw)
	out.MembershipTxHash = txHashRaw.String
	out.ActivatedAt = parseRFC3339Ptr(activatedRaw)
	return out, nil
}
// putQuote inserts quote into quotes, or overwrites every other column of the
// existing row that has the same quote_id.
//
// created_at and expires_at are stored as RFC 3339 text normalized to UTC,
// consistent with the other writers in this file. sponsorship_mode is trimmed
// and lower-cased; blank PayerAddress, SponsorOrgRootID and ConfirmedTxHash
// and a nil/zero ConfirmedAt are stored as NULL.
func (s *store) putQuote(ctx context.Context, quote quoteRecord) error {
	const stmt = `
INSERT INTO quotes (
quote_id, designation_code, address, payer_address, chain_id, currency, amount_atomic, decimals, contract_address, method, calldata, value_hex, sponsorship_mode, sponsor_org_root_id, created_at, expires_at, confirmed_at, confirmed_tx_hash
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(quote_id) DO UPDATE SET
designation_code=excluded.designation_code,
address=excluded.address,
payer_address=excluded.payer_address,
chain_id=excluded.chain_id,
currency=excluded.currency,
amount_atomic=excluded.amount_atomic,
decimals=excluded.decimals,
contract_address=excluded.contract_address,
method=excluded.method,
calldata=excluded.calldata,
value_hex=excluded.value_hex,
sponsorship_mode=excluded.sponsorship_mode,
sponsor_org_root_id=excluded.sponsor_org_root_id,
created_at=excluded.created_at,
expires_at=excluded.expires_at,
confirmed_at=excluded.confirmed_at,
confirmed_tx_hash=excluded.confirmed_tx_hash
`
	_, err := s.db.ExecContext(ctx, stmt,
		quote.QuoteID,
		quote.DesignationCode,
		quote.Address,
		nullableString(quote.PayerAddress),
		quote.ChainID,
		quote.Currency,
		quote.AmountAtomic,
		quote.Decimals,
		quote.ContractAddress,
		quote.Method,
		quote.Calldata,
		quote.ValueHex,
		strings.ToLower(strings.TrimSpace(quote.SponsorshipMode)),
		nullableString(quote.SponsorOrgRootID),
		quote.CreatedAt.UTC().Format(time.RFC3339Nano),
		quote.ExpiresAt.UTC().Format(time.RFC3339Nano),
		formatNullableTime(quote.ConfirmedAt),
		nullableString(quote.ConfirmedTxHash),
	)
	return err
}
// getQuote loads the quote whose quote_id equals quoteID (matched exactly as
// given). It returns errNotFound when no row matches. NULL text columns are
// returned as "" and a NULL confirmed_at as a nil ConfirmedAt.
func (s *store) getQuote(ctx context.Context, quoteID string) (quoteRecord, error) {
	const query = `
SELECT quote_id, designation_code, address, payer_address, chain_id, currency, amount_atomic, decimals, contract_address, method, calldata, value_hex, sponsorship_mode, sponsor_org_root_id, created_at, expires_at, confirmed_at, confirmed_tx_hash
FROM quotes
WHERE quote_id = ?
`
	var (
		out                                quoteRecord
		payerRaw, sponsorRaw, txHashRaw    sql.NullString
		createdRaw, expiresRaw, confirmRaw sql.NullString
	)
	scanErr := s.db.QueryRowContext(ctx, query, quoteID).Scan(
		&out.QuoteID,
		&out.DesignationCode,
		&out.Address,
		&payerRaw,
		&out.ChainID,
		&out.Currency,
		&out.AmountAtomic,
		&out.Decimals,
		&out.ContractAddress,
		&out.Method,
		&out.Calldata,
		&out.ValueHex,
		&out.SponsorshipMode,
		&sponsorRaw,
		&createdRaw,
		&expiresRaw,
		&confirmRaw,
		&txHashRaw,
	)
	switch {
	case errors.Is(scanErr, sql.ErrNoRows):
		return quoteRecord{}, errNotFound
	case scanErr != nil:
		return quoteRecord{}, scanErr
	}

	out.PayerAddress = payerRaw.String
	out.SponsorOrgRootID = sponsorRaw.String
	out.CreatedAt = parseRFC3339Nullable(createdRaw)
	out.ExpiresAt = parseRFC3339Nullable(expiresRaw)
	out.ConfirmedAt = parseRFC3339Ptr(confirmRaw)
	out.ConfirmedTxHash = txHashRaw.String
	return out, nil
}
// putGovernancePrincipal inserts rec into governance_principals, or
// overwrites every other column of the existing row for the same wallet.
// updated_at is stored as UTC RFC 3339 text; a nil/zero LeaseExpiresAt is
// stored as NULL.
//
// NOTE(review): the wallet is stored as given, while getGovernancePrincipal
// trims and lower-cases its argument — callers presumably normalize first.
func (s *store) putGovernancePrincipal(ctx context.Context, rec governancePrincipalRecord) error {
	const stmt = `
INSERT INTO governance_principals (
wallet, org_root_id, principal_id, principal_role, entitlement_id, entitlement_status, access_class, availability_state, lease_expires_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(wallet) DO UPDATE SET
org_root_id=excluded.org_root_id,
principal_id=excluded.principal_id,
principal_role=excluded.principal_role,
entitlement_id=excluded.entitlement_id,
entitlement_status=excluded.entitlement_status,
access_class=excluded.access_class,
availability_state=excluded.availability_state,
lease_expires_at=excluded.lease_expires_at,
updated_at=excluded.updated_at
`
	args := []any{
		rec.Wallet,
		rec.OrgRootID,
		rec.PrincipalID,
		rec.PrincipalRole,
		rec.EntitlementID,
		rec.EntitlementStatus,
		rec.AccessClass,
		rec.AvailabilityState,
		formatNullableTime(rec.LeaseExpiresAt),
		rec.UpdatedAt.UTC().Format(time.RFC3339Nano),
	}
	_, err := s.db.ExecContext(ctx, stmt, args...)
	return err
}
// getGovernancePrincipal loads the governance principal for wallet, which is
// trimmed and lower-cased before the lookup. It returns errNotFound when no
// row matches. A NULL lease_expires_at yields a nil LeaseExpiresAt.
func (s *store) getGovernancePrincipal(ctx context.Context, wallet string) (governancePrincipalRecord, error) {
	const query = `
SELECT wallet, org_root_id, principal_id, principal_role, entitlement_id, entitlement_status, access_class, availability_state, lease_expires_at, updated_at
FROM governance_principals
WHERE wallet = ?
`
	var (
		out                 governancePrincipalRecord
		leaseRaw, updateRaw sql.NullString
	)
	walletKey := strings.ToLower(strings.TrimSpace(wallet))
	scanErr := s.db.QueryRowContext(ctx, query, walletKey).Scan(
		&out.Wallet,
		&out.OrgRootID,
		&out.PrincipalID,
		&out.PrincipalRole,
		&out.EntitlementID,
		&out.EntitlementStatus,
		&out.AccessClass,
		&out.AvailabilityState,
		&leaseRaw,
		&updateRaw,
	)
	switch {
	case errors.Is(scanErr, sql.ErrNoRows):
		return governancePrincipalRecord{}, errNotFound
	case scanErr != nil:
		return governancePrincipalRecord{}, scanErr
	}

	out.LeaseExpiresAt = parseRFC3339Ptr(leaseRaw)
	out.UpdatedAt = parseRFC3339Nullable(updateRaw)
	return out, nil
}
// putGovernanceInstallToken inserts rec into governance_install_tokens, or
// overwrites every other column of the existing row with the same
// install_token. issued_at and expires_at are stored as UTC RFC 3339 text; a
// nil/zero ConsumedAt is stored as NULL.
func (s *store) putGovernanceInstallToken(ctx context.Context, rec governanceInstallTokenRecord) error {
	const stmt = `
INSERT INTO governance_install_tokens (
install_token, wallet, org_root_id, principal_id, principal_role, device_id, entitlement_id, package_hash, runtime_version, policy_hash, issued_at, expires_at, consumed_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(install_token) DO UPDATE SET
wallet=excluded.wallet,
org_root_id=excluded.org_root_id,
principal_id=excluded.principal_id,
principal_role=excluded.principal_role,
device_id=excluded.device_id,
entitlement_id=excluded.entitlement_id,
package_hash=excluded.package_hash,
runtime_version=excluded.runtime_version,
policy_hash=excluded.policy_hash,
issued_at=excluded.issued_at,
expires_at=excluded.expires_at,
consumed_at=excluded.consumed_at
`
	args := []any{
		rec.InstallToken,
		rec.Wallet,
		rec.OrgRootID,
		rec.PrincipalID,
		rec.PrincipalRole,
		rec.DeviceID,
		rec.EntitlementID,
		rec.PackageHash,
		rec.RuntimeVersion,
		rec.PolicyHash,
		rec.IssuedAt.UTC().Format(time.RFC3339Nano),
		rec.ExpiresAt.UTC().Format(time.RFC3339Nano),
		formatNullableTime(rec.ConsumedAt),
	}
	_, err := s.db.ExecContext(ctx, stmt, args...)
	return err
}
// getGovernanceInstallToken loads the install-token row for token, which is
// trimmed before the lookup. It returns errNotFound when no row matches. A
// NULL consumed_at yields a nil ConsumedAt.
func (s *store) getGovernanceInstallToken(ctx context.Context, token string) (governanceInstallTokenRecord, error) {
	const query = `
SELECT install_token, wallet, org_root_id, principal_id, principal_role, device_id, entitlement_id, package_hash, runtime_version, policy_hash, issued_at, expires_at, consumed_at
FROM governance_install_tokens
WHERE install_token = ?
`
	var (
		out                             governanceInstallTokenRecord
		issuedRaw, expiresRaw, consumed sql.NullString
	)
	scanErr := s.db.QueryRowContext(ctx, query, strings.TrimSpace(token)).Scan(
		&out.InstallToken,
		&out.Wallet,
		&out.OrgRootID,
		&out.PrincipalID,
		&out.PrincipalRole,
		&out.DeviceID,
		&out.EntitlementID,
		&out.PackageHash,
		&out.RuntimeVersion,
		&out.PolicyHash,
		&issuedRaw,
		&expiresRaw,
		&consumed,
	)
	switch {
	case errors.Is(scanErr, sql.ErrNoRows):
		return governanceInstallTokenRecord{}, errNotFound
	case scanErr != nil:
		return governanceInstallTokenRecord{}, scanErr
	}

	out.IssuedAt = parseRFC3339Nullable(issuedRaw)
	out.ExpiresAt = parseRFC3339Nullable(expiresRaw)
	out.ConsumedAt = parseRFC3339Ptr(consumed)
	return out, nil
}
// putGovernanceInstall inserts rec into governance_installs, or overwrites
// every other column of the existing row with the same install_token.
// installed_at and activated_at are stored as UTC RFC 3339 text; a blank
// LauncherReceiptRef is stored as NULL.
func (s *store) putGovernanceInstall(ctx context.Context, rec governanceInstallRecord) error {
	const stmt = `
INSERT INTO governance_installs (
install_token, wallet, device_id, entitlement_id, runtime_version, package_hash, policy_hash, launcher_receipt_ref, installed_at, activated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(install_token) DO UPDATE SET
wallet=excluded.wallet,
device_id=excluded.device_id,
entitlement_id=excluded.entitlement_id,
runtime_version=excluded.runtime_version,
package_hash=excluded.package_hash,
policy_hash=excluded.policy_hash,
launcher_receipt_ref=excluded.launcher_receipt_ref,
installed_at=excluded.installed_at,
activated_at=excluded.activated_at
`
	args := []any{
		rec.InstallToken,
		rec.Wallet,
		rec.DeviceID,
		rec.EntitlementID,
		rec.RuntimeVersion,
		rec.PackageHash,
		rec.PolicyHash,
		nullableString(rec.LauncherReceiptRef),
		rec.InstalledAt.UTC().Format(time.RFC3339Nano),
		rec.ActivatedAt.UTC().Format(time.RFC3339Nano),
	}
	_, err := s.db.ExecContext(ctx, stmt, args...)
	return err
}
// getGovernanceInstallByToken loads the install recorded for token, which is
// trimmed before the lookup. It returns errNotFound when no row matches.
func (s *store) getGovernanceInstallByToken(ctx context.Context, token string) (governanceInstallRecord, error) {
	const query = `
SELECT install_token, wallet, device_id, entitlement_id, runtime_version, package_hash, policy_hash, launcher_receipt_ref, installed_at, activated_at
FROM governance_installs
WHERE install_token = ?
`
	tokenKey := strings.TrimSpace(token)
	return scanGovernanceInstall(s.db.QueryRowContext(ctx, query, tokenKey))
}
// getGovernanceInstallByDevice returns the install for the wallet/device pair
// that sorts last by its activated_at text. wallet is trimmed and lower-cased
// and deviceID is trimmed before the lookup. It returns errNotFound when no
// row matches.
func (s *store) getGovernanceInstallByDevice(ctx context.Context, wallet, deviceID string) (governanceInstallRecord, error) {
	const query = `
SELECT install_token, wallet, device_id, entitlement_id, runtime_version, package_hash, policy_hash, launcher_receipt_ref, installed_at, activated_at
FROM governance_installs
WHERE wallet = ? AND device_id = ?
ORDER BY activated_at DESC
LIMIT 1
`
	walletKey := strings.ToLower(strings.TrimSpace(wallet))
	deviceKey := strings.TrimSpace(deviceID)
	return scanGovernanceInstall(s.db.QueryRowContext(ctx, query, walletKey, deviceKey))
}
// scanGovernanceInstall decodes one governance_installs row, in the column
// order used by the getGovernanceInstall* queries. sql.ErrNoRows is translated
// to errNotFound; any other scan error is returned unchanged. A NULL
// launcher_receipt_ref becomes "".
func scanGovernanceInstall(row interface{ Scan(dest ...any) error }) (governanceInstallRecord, error) {
	var (
		out                        governanceInstallRecord
		receiptRaw                 sql.NullString
		installedRaw, activatedRaw sql.NullString
	)
	scanErr := row.Scan(
		&out.InstallToken,
		&out.Wallet,
		&out.DeviceID,
		&out.EntitlementID,
		&out.RuntimeVersion,
		&out.PackageHash,
		&out.PolicyHash,
		&receiptRaw,
		&installedRaw,
		&activatedRaw,
	)
	switch {
	case errors.Is(scanErr, sql.ErrNoRows):
		return governanceInstallRecord{}, errNotFound
	case scanErr != nil:
		return governanceInstallRecord{}, scanErr
	}

	out.LauncherReceiptRef = receiptRaw.String
	out.InstalledAt = parseRFC3339Nullable(installedRaw)
	out.ActivatedAt = parseRFC3339Nullable(activatedRaw)
	return out, nil
}
// putMemberChannelBinding inserts rec into member_channel_bindings. When a
// row for the same (wallet, device_id) pair already exists, it is updated in
// place instead: every column except wallet, device_id and created_at is
// overwritten, so the original creation time is kept. Timestamps are stored
// as UTC RFC 3339 text; a blank PushToken and a nil/zero RemovedAt are stored
// as NULL.
//
// NOTE(review): wallet and device_id are stored as given, while the binding
// lookups trim (and lower-case the wallet) — callers presumably normalize.
func (s *store) putMemberChannelBinding(ctx context.Context, rec memberChannelBindingRecord) error {
	const stmt = `
INSERT INTO member_channel_bindings (
channel_binding_id, wallet, chain_id, device_id, platform, org_root_id, principal_id, principal_role, app_version, push_provider, push_token, status, created_at, updated_at, removed_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(wallet, device_id) DO UPDATE SET
channel_binding_id=excluded.channel_binding_id,
chain_id=excluded.chain_id,
platform=excluded.platform,
org_root_id=excluded.org_root_id,
principal_id=excluded.principal_id,
principal_role=excluded.principal_role,
app_version=excluded.app_version,
push_provider=excluded.push_provider,
push_token=excluded.push_token,
status=excluded.status,
updated_at=excluded.updated_at,
removed_at=excluded.removed_at
`
	args := []any{
		rec.ChannelBindingID,
		rec.Wallet,
		rec.ChainID,
		rec.DeviceID,
		rec.Platform,
		rec.OrgRootID,
		rec.PrincipalID,
		rec.PrincipalRole,
		rec.AppVersion,
		rec.PushProvider,
		nullableString(rec.PushToken),
		rec.Status,
		rec.CreatedAt.UTC().Format(time.RFC3339Nano),
		rec.UpdatedAt.UTC().Format(time.RFC3339Nano),
		formatNullableTime(rec.RemovedAt),
	}
	_, err := s.db.ExecContext(ctx, stmt, args...)
	return err
}
// getMemberChannelBinding loads the binding for the wallet/device pair, but
// only if its status is 'active'. wallet is trimmed and lower-cased and
// deviceID is trimmed before the lookup. It returns errNotFound when no
// active row matches.
func (s *store) getMemberChannelBinding(ctx context.Context, wallet, deviceID string) (memberChannelBindingRecord, error) {
	const query = `
SELECT channel_binding_id, wallet, chain_id, device_id, platform, org_root_id, principal_id, principal_role, app_version, push_provider, push_token, status, created_at, updated_at, removed_at
FROM member_channel_bindings
WHERE wallet = ? AND device_id = ? AND status = 'active'
`
	walletKey := strings.ToLower(strings.TrimSpace(wallet))
	deviceKey := strings.TrimSpace(deviceID)
	return scanMemberChannelBinding(s.db.QueryRowContext(ctx, query, walletKey, deviceKey))
}
// getMemberChannelBindingByPrincipal returns the active binding for the given
// wallet, org root and principal that sorts last by its updated_at text. All
// three arguments are trimmed (the wallet is also lower-cased) before the
// lookup. It returns errNotFound when no active row matches.
func (s *store) getMemberChannelBindingByPrincipal(ctx context.Context, wallet, orgRootID, principalID string) (memberChannelBindingRecord, error) {
	const query = `
SELECT channel_binding_id, wallet, chain_id, device_id, platform, org_root_id, principal_id, principal_role, app_version, push_provider, push_token, status, created_at, updated_at, removed_at
FROM member_channel_bindings
WHERE wallet = ? AND org_root_id = ? AND principal_id = ? AND status = 'active'
ORDER BY updated_at DESC
LIMIT 1
`
	walletKey := strings.ToLower(strings.TrimSpace(wallet))
	orgKey := strings.TrimSpace(orgRootID)
	principalKey := strings.TrimSpace(principalID)
	return scanMemberChannelBinding(s.db.QueryRowContext(ctx, query, walletKey, orgKey, principalKey))
}
// scanMemberChannelBinding decodes one member_channel_bindings row, in the
// column order used by the getMemberChannelBinding* queries. sql.ErrNoRows is
// translated to errNotFound; any other scan error is returned unchanged. A
// NULL push_token becomes "" and a NULL removed_at a nil RemovedAt.
func scanMemberChannelBinding(row interface{ Scan(dest ...any) error }) (memberChannelBindingRecord, error) {
	var (
		out                    memberChannelBindingRecord
		tokenRaw               sql.NullString
		createdRaw, updatedRaw sql.NullString
		removedRaw             sql.NullString
	)
	dest := []any{
		&out.ChannelBindingID,
		&out.Wallet,
		&out.ChainID,
		&out.DeviceID,
		&out.Platform,
		&out.OrgRootID,
		&out.PrincipalID,
		&out.PrincipalRole,
		&out.AppVersion,
		&out.PushProvider,
		&tokenRaw,
		&out.Status,
		&createdRaw,
		&updatedRaw,
		&removedRaw,
	}
	switch scanErr := row.Scan(dest...); {
	case errors.Is(scanErr, sql.ErrNoRows):
		return memberChannelBindingRecord{}, errNotFound
	case scanErr != nil:
		return memberChannelBindingRecord{}, scanErr
	}

	out.PushToken = tokenRaw.String
	out.CreatedAt = parseRFC3339Nullable(createdRaw)
	out.UpdatedAt = parseRFC3339Nullable(updatedRaw)
	out.RemovedAt = parseRFC3339Ptr(removedRaw)
	return out, nil
}
// removeMemberChannelBinding soft-deletes the active binding for the
// wallet/device pair: status becomes 'removed' and both removed_at and
// updated_at are set to removedAt (UTC). wallet is trimmed and lower-cased
// and deviceID is trimmed before matching. It returns errNotFound when no
// active row was updated.
func (s *store) removeMemberChannelBinding(ctx context.Context, wallet, deviceID string, removedAt time.Time) error {
	const stmt = `
UPDATE member_channel_bindings
SET status = 'removed', removed_at = ?, updated_at = ?
WHERE wallet = ? AND device_id = ? AND status = 'active'
`
	// The same instant is written to both timestamp columns.
	stamp := removedAt.UTC().Format(time.RFC3339Nano)
	walletKey := strings.ToLower(strings.TrimSpace(wallet))
	deviceKey := strings.TrimSpace(deviceID)

	res, err := s.db.ExecContext(ctx, stmt, stamp, stamp, walletKey, deviceKey)
	if err != nil {
		return err
	}
	switch changed, countErr := res.RowsAffected(); {
	case countErr != nil:
		return countErr
	case changed == 0:
		return errNotFound
	default:
		return nil
	}
}
// putMemberChannelEvent appends rec to member_channel_events. When an event
// with the same (wallet, org_root_id, dedupe_key) already exists the insert
// is silently skipped and nil is returned.
//
// A blank EventID is replaced with "evt_" plus randomHex(8); the generated ID
// is assigned to this function's copy of rec only, so the caller does not see
// it. Wallet is trimmed and lower-cased; OrgRootID, PrincipalID, Class and
// VisibilityScope are trimmed; created_at is stored as UTC RFC 3339 text.
func (s *store) putMemberChannelEvent(ctx context.Context, rec memberChannelEventRecord) error {
	const stmt = `
INSERT INTO member_channel_events (
event_id, wallet, org_root_id, principal_id, class, created_at, title, body, dedupe_key, requires_ack, policy_hash, payload_json, visibility_scope
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(wallet, org_root_id, dedupe_key) DO NOTHING
`
	if strings.TrimSpace(rec.EventID) == "" {
		suffix, genErr := randomHex(8)
		if genErr != nil {
			return genErr
		}
		rec.EventID = "evt_" + suffix
	}

	args := []any{
		rec.EventID,
		strings.ToLower(strings.TrimSpace(rec.Wallet)),
		strings.TrimSpace(rec.OrgRootID),
		strings.TrimSpace(rec.PrincipalID),
		strings.TrimSpace(rec.Class),
		rec.CreatedAt.UTC().Format(time.RFC3339Nano),
		rec.Title,
		rec.Body,
		rec.DedupeKey,
		boolToInt(rec.RequiresAck),
		rec.PolicyHash,
		rec.PayloadJSON,
		strings.TrimSpace(rec.VisibilityScope),
	}
	_, err := s.db.ExecContext(ctx, stmt, args...)
	return err
}
// getMemberChannelEventSeqByID returns the seq value of the event whose
// event_id equals eventID (trimmed before the lookup). It returns
// (0, errNotFound) when no such event exists.
func (s *store) getMemberChannelEventSeqByID(ctx context.Context, eventID string) (int64, error) {
	const query = `
SELECT seq
FROM member_channel_events
WHERE event_id = ?
`
	var position int64
	scanErr := s.db.QueryRowContext(ctx, query, strings.TrimSpace(eventID)).Scan(&position)
	switch {
	case scanErr == nil:
		return position, nil
	case errors.Is(scanErr, sql.ErrNoRows):
		return 0, errNotFound
	default:
		return 0, scanErr
	}
}
// listMemberChannelEvents returns up to limit events for the wallet/org pair
// in ascending seq order, starting after the event identified by cursor.
//
// Parameters:
//   - wallet is trimmed and lower-cased, orgRootID is trimmed.
//   - includeOwnerAdmin widens the result from 'member'-scoped events to
//     'member' and 'owner_admin' events.
//   - cursor is the event_id of the last event already seen. A blank or
//     unknown cursor starts from the beginning.
//   - limit is passed to SQL LIMIT unchanged; SQLite treats a negative LIMIT
//     as "no upper bound".
//
// The second result is the EventID of the last returned event, to be used as
// the next cursor, or "" when no events were returned.
//
// Unlike a merely unknown cursor, a cursor lookup that fails for any other
// reason (for example a cancelled context or a database error) is returned
// to the caller rather than silently restarting the listing from seq 0.
func (s *store) listMemberChannelEvents(ctx context.Context, wallet, orgRootID string, includeOwnerAdmin bool, cursor string, limit int) ([]memberChannelEventRecord, string, error) {
	cursorSeq := int64(0)
	if strings.TrimSpace(cursor) != "" {
		seq, err := s.getMemberChannelEventSeqByID(ctx, cursor)
		switch {
		case err == nil:
			cursorSeq = seq
		case errors.Is(err, errNotFound):
			// Unknown cursor: fall back to listing from the start.
		default:
			return nil, "", err
		}
	}

	// Both alternatives are fixed literals, so splicing the clause into the
	// query text cannot inject caller-controlled SQL.
	visibilityClause := "visibility_scope = 'member'"
	if includeOwnerAdmin {
		visibilityClause = "(visibility_scope = 'member' OR visibility_scope = 'owner_admin')"
	}

	rows, err := s.db.QueryContext(ctx, `
SELECT seq, event_id, wallet, org_root_id, principal_id, class, created_at, title, body, dedupe_key, requires_ack, policy_hash, payload_json, visibility_scope
FROM member_channel_events
WHERE wallet = ? AND org_root_id = ? AND seq > ? AND `+visibilityClause+`
ORDER BY seq ASC
LIMIT ?
`, strings.ToLower(strings.TrimSpace(wallet)), strings.TrimSpace(orgRootID), cursorSeq, limit)
	if err != nil {
		return nil, "", err
	}
	defer rows.Close()

	// make panics on a negative capacity, while the query itself accepts a
	// negative limit, so only use limit as a capacity hint when it is valid.
	capHint := limit
	if capHint < 0 {
		capHint = 0
	}
	events := make([]memberChannelEventRecord, 0, capHint)
	nextCursor := ""
	for rows.Next() {
		var rec memberChannelEventRecord
		var createdAt sql.NullString
		var requiresAck int
		if err := rows.Scan(
			&rec.Seq,
			&rec.EventID,
			&rec.Wallet,
			&rec.OrgRootID,
			&rec.PrincipalID,
			&rec.Class,
			&createdAt,
			&rec.Title,
			&rec.Body,
			&rec.DedupeKey,
			&requiresAck,
			&rec.PolicyHash,
			&rec.PayloadJSON,
			&rec.VisibilityScope,
		); err != nil {
			return nil, "", err
		}
		rec.CreatedAt = parseRFC3339Nullable(createdAt)
		rec.RequiresAck = requiresAck == 1
		events = append(events, rec)
		// Rows arrive in ascending seq order, so the last one seen wins.
		nextCursor = rec.EventID
	}
	if err := rows.Err(); err != nil {
		return nil, "", err
	}
	return events, nextCursor, nil
}
// getMemberChannelEventByID loads the event whose event_id equals eventID
// (trimmed before the lookup). It returns errNotFound when no row matches.
// RequiresAck is true only when the stored requires_ack value is exactly 1.
func (s *store) getMemberChannelEventByID(ctx context.Context, eventID string) (memberChannelEventRecord, error) {
	const query = `
SELECT seq, event_id, wallet, org_root_id, principal_id, class, created_at, title, body, dedupe_key, requires_ack, policy_hash, payload_json, visibility_scope
FROM member_channel_events
WHERE event_id = ?
`
	var (
		out        memberChannelEventRecord
		createdRaw sql.NullString
		ackFlag    int
	)
	scanErr := s.db.QueryRowContext(ctx, query, strings.TrimSpace(eventID)).Scan(
		&out.Seq,
		&out.EventID,
		&out.Wallet,
		&out.OrgRootID,
		&out.PrincipalID,
		&out.Class,
		&createdRaw,
		&out.Title,
		&out.Body,
		&out.DedupeKey,
		&ackFlag,
		&out.PolicyHash,
		&out.PayloadJSON,
		&out.VisibilityScope,
	)
	switch {
	case errors.Is(scanErr, sql.ErrNoRows):
		return memberChannelEventRecord{}, errNotFound
	case scanErr != nil:
		return memberChannelEventRecord{}, scanErr
	}

	out.CreatedAt = parseRFC3339Nullable(createdRaw)
	out.RequiresAck = ackFlag == 1
	return out, nil
}
// putMemberChannelEventAck records that the wallet/device pair acknowledged
// the event and returns the stored acknowledgement time. If the triple was
// already acknowledged the insert is skipped, so the time returned is the one
// from the first acknowledgement rather than acknowledgedAt.
//
// eventID and deviceID are trimmed and wallet is trimmed and lower-cased.
// Any error from the read-back (including sql.ErrNoRows) is returned as is.
func (s *store) putMemberChannelEventAck(ctx context.Context, eventID, wallet, deviceID string, acknowledgedAt time.Time) (time.Time, error) {
	const insertStmt = `
INSERT INTO member_channel_event_acks (event_id, wallet, device_id, acknowledged_at)
VALUES (?, ?, ?, ?)
ON CONFLICT(event_id, wallet, device_id) DO NOTHING
`
	const selectStmt = `
SELECT acknowledged_at
FROM member_channel_event_acks
WHERE event_id = ? AND wallet = ? AND device_id = ?
`
	eventKey := strings.TrimSpace(eventID)
	walletKey := strings.ToLower(strings.TrimSpace(wallet))
	deviceKey := strings.TrimSpace(deviceID)
	stamp := acknowledgedAt.UTC().Format(time.RFC3339Nano)

	if _, err := s.db.ExecContext(ctx, insertStmt, eventKey, walletKey, deviceKey, stamp); err != nil {
		return time.Time{}, err
	}

	// Read back whatever is stored: it differs from stamp on a repeat ack.
	var stored sql.NullString
	if err := s.db.QueryRowContext(ctx, selectStmt, eventKey, walletKey, deviceKey).Scan(&stored); err != nil {
		return time.Time{}, err
	}
	return parseRFC3339Nullable(stored), nil
}
// putMemberChannelSupportTicket inserts rec into member_support_tickets. It
// is a plain INSERT, so a duplicate ticket_id surfaces as a database error.
// Wallet is trimmed and lower-cased; OrgRootID, PrincipalID, Category and
// Status are trimmed; created_at is stored as UTC RFC 3339 text.
func (s *store) putMemberChannelSupportTicket(ctx context.Context, rec memberChannelSupportTicketRecord) error {
	const stmt = `
INSERT INTO member_support_tickets (
ticket_id, wallet, org_root_id, principal_id, category, summary, context_json, status, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
`
	args := []any{
		rec.TicketID,
		strings.ToLower(strings.TrimSpace(rec.Wallet)),
		strings.TrimSpace(rec.OrgRootID),
		strings.TrimSpace(rec.PrincipalID),
		strings.TrimSpace(rec.Category),
		rec.Summary,
		rec.ContextJSON,
		strings.TrimSpace(rec.Status),
		rec.CreatedAt.UTC().Format(time.RFC3339Nano),
	}
	_, err := s.db.ExecContext(ctx, stmt, args...)
	return err
}
// parseRFC3339Nullable converts a nullable RFC 3339 text column into a UTC
// time.Time. It returns the zero time when the column is NULL, blank, or not
// a valid RFC 3339 timestamp.
//
// Surrounding whitespace is ignored: the value is trimmed once and the
// trimmed text is used both for the emptiness check and for parsing, so a
// padded but otherwise valid timestamp is no longer discarded.
func parseRFC3339Nullable(raw sql.NullString) time.Time {
	if !raw.Valid {
		return time.Time{}
	}
	text := strings.TrimSpace(raw.String)
	if text == "" {
		return time.Time{}
	}
	ts, err := time.Parse(time.RFC3339Nano, text)
	if err != nil {
		// The signature has no error result; malformed values read as "unset".
		return time.Time{}
	}
	return ts.UTC()
}
// parseRFC3339Ptr is the pointer-returning variant of parseRFC3339Nullable:
// it returns nil whenever that function yields the zero time, and otherwise a
// pointer to the parsed time.
func parseRFC3339Ptr(raw sql.NullString) *time.Time {
	if parsed := parseRFC3339Nullable(raw); !parsed.IsZero() {
		return &parsed
	}
	return nil
}
// formatNullableTime prepares an optional timestamp for use as a SQL
// argument: nil for a nil pointer or a zero time (stored as NULL), otherwise
// the time converted to UTC and formatted as RFC 3339 with nanoseconds.
func formatNullableTime(ts *time.Time) any {
	if ts != nil && !ts.IsZero() {
		return ts.UTC().Format(time.RFC3339Nano)
	}
	return nil
}
// nullableString prepares an optional string for use as a SQL argument: nil
// (stored as NULL) when v is empty or whitespace-only, otherwise v itself,
// returned unmodified (not trimmed).
func nullableString(v string) any {
	if strings.TrimSpace(v) != "" {
		return v
	}
	return nil
}
// boolToInt maps true to 1 and false to 0, the integer encoding used for the
// requires_ack column.
func boolToInt(v bool) int {
	flag := 0
	if v {
		flag = 1
	}
	return flag
}