You've already forked ionscale
mirror of
https://github.com/jsiebens/ionscale.git
synced 2026-04-05 12:32:58 +01:00
chore: fixes
This commit is contained in:
@@ -6,7 +6,6 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/lib/pq"
|
||||
"github.com/pkg/errors"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -16,13 +15,9 @@ type pgPubsub struct {
|
||||
target Pubsub
|
||||
}
|
||||
|
||||
// NewPubsub creates a new Pubsub implementation using a PostgreSQL connection.
|
||||
func NewPubsub(ctx context.Context, database *sql.DB, connectURL string) (Pubsub, error) {
|
||||
// Creates a new listener using pq.
|
||||
errCh := make(chan error)
|
||||
listener := pq.NewListener(connectURL, time.Second, time.Minute, func(event pq.ListenerEventType, err error) {
|
||||
// This callback gets events whenever the connection state changes.
|
||||
// Don't send if the errChannel has already been closed.
|
||||
select {
|
||||
case <-errCh:
|
||||
return
|
||||
@@ -35,14 +30,14 @@ func NewPubsub(ctx context.Context, database *sql.DB, connectURL string) (Pubsub
|
||||
select {
|
||||
case err := <-errCh:
|
||||
if err != nil {
|
||||
return nil, errors.Errorf("create pq listener: %w", err)
|
||||
return nil, fmt.Errorf("create pq listener: %w", err)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
|
||||
if err := listener.Listen("ionscale_events"); err != nil {
|
||||
return nil, errors.Errorf("listen: %w", err)
|
||||
return nil, fmt.Errorf("listen: %w", err)
|
||||
}
|
||||
|
||||
pubsub := &pgPubsub{
|
||||
@@ -55,7 +50,6 @@ func NewPubsub(ctx context.Context, database *sql.DB, connectURL string) (Pubsub
|
||||
return pubsub, nil
|
||||
}
|
||||
|
||||
// Close closes the pubsub instance.
|
||||
func (p *pgPubsub) Close() error {
|
||||
return p.pgListener.Close()
|
||||
}
|
||||
@@ -75,18 +69,14 @@ func (p *pgPubsub) Publish(tailnet uint64, message *Signal) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// This is safe because we are calling pq.QuoteLiteral. pg_notify doesn't
|
||||
// support the first parameter being a prepared statement.
|
||||
//nolint:gosec
|
||||
_, err = p.db.ExecContext(context.Background(), `select pg_notify(`+pq.QuoteLiteral("ionscale_events")+`, $1)`, payload)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return errors.Errorf("exec pg_notify: %w", err)
|
||||
return fmt.Errorf("exec pg_notify: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// listen begins receiving messages on the pq listener.
|
||||
func (p *pgPubsub) listen(ctx context.Context) {
|
||||
var (
|
||||
notif *pq.Notification
|
||||
|
||||
Reference in New Issue
Block a user