chore: remove pubsub and introduce session manager

This commit is contained in:
Johan Siebens
2022-12-24 12:00:00 +01:00
parent a8e8d1aa49
commit 61d9b40144
21 changed files with 202 additions and 517 deletions
+1 -2
View File
@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"github.com/bufbuild/connect-go"
"github.com/jsiebens/ionscale/internal/broker"
"github.com/jsiebens/ionscale/internal/domain"
"github.com/jsiebens/ionscale/internal/errors"
"github.com/jsiebens/ionscale/internal/mapping"
@@ -57,7 +56,7 @@ func (s *Service) SetACLPolicy(ctx context.Context, req *connect.Request[api.Set
return nil, errors.Wrap(err, 0)
}
s.pubsub.Publish(tailnet.ID, &broker.Signal{ACLUpdated: true})
s.sessionManager.NotifyAll(tailnet.ID)
return connect.NewResponse(&api.SetACLPolicyResponse{}), nil
}
+2 -3
View File
@@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"github.com/bufbuild/connect-go"
"github.com/jsiebens/ionscale/internal/broker"
"github.com/jsiebens/ionscale/internal/domain"
"github.com/jsiebens/ionscale/internal/errors"
"github.com/jsiebens/ionscale/internal/util"
@@ -58,7 +57,7 @@ func (s *Service) SetDefaultDERPMap(ctx context.Context, req *connect.Request[ap
}
for _, t := range tailnets {
s.pubsub.Publish(t.ID, &broker.Signal{})
s.sessionManager.NotifyAll(t.ID)
}
return connect.NewResponse(&api.SetDefaultDERPMapResponse{Value: req.Msg.Value}), nil
@@ -82,7 +81,7 @@ func (s *Service) ResetDefaultDERPMap(ctx context.Context, req *connect.Request[
}
for _, t := range tailnets {
s.pubsub.Publish(t.ID, &broker.Signal{})
s.sessionManager.NotifyAll(t.ID)
}
return connect.NewResponse(&api.ResetDefaultDERPMapResponse{}), nil
+1 -2
View File
@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"github.com/bufbuild/connect-go"
"github.com/jsiebens/ionscale/internal/broker"
"github.com/jsiebens/ionscale/internal/config"
"github.com/jsiebens/ionscale/internal/domain"
"github.com/jsiebens/ionscale/internal/errors"
@@ -74,7 +73,7 @@ func (s *Service) SetDNSConfig(ctx context.Context, req *connect.Request[api.Set
return nil, errors.Wrap(err, 0)
}
s.pubsub.Publish(tailnet.ID, &broker.Signal{DNSUpdated: true})
s.sessionManager.NotifyAll(tailnet.ID)
resp := &api.SetDNSConfigResponse{Config: dnsConfig}
+9 -12
View File
@@ -4,8 +4,6 @@ import (
"context"
"fmt"
"github.com/bufbuild/connect-go"
"github.com/jsiebens/ionscale/internal/broker"
"github.com/jsiebens/ionscale/internal/config"
"github.com/jsiebens/ionscale/internal/domain"
"github.com/jsiebens/ionscale/internal/errors"
api "github.com/jsiebens/ionscale/pkg/gen/ionscale/v1"
@@ -22,10 +20,9 @@ func (s *Service) machineToApi(m *domain.Machine) *api.Machine {
name = fmt.Sprintf("%s-%d", m.Name, m.NameIdx)
}
online := false
online := s.sessionManager.HasSession(m.TailnetID, m.ID)
if m.LastSeen != nil {
lastSeen = timestamppb.New(*m.LastSeen)
online = m.LastSeen.After(time.Now().Add(-config.KeepAliveInterval()))
}
return &api.Machine{
@@ -127,7 +124,7 @@ func (s *Service) DeleteMachine(ctx context.Context, req *connect.Request[api.De
return nil, errors.Wrap(err, 0)
}
s.pubsub.Publish(m.TailnetID, &broker.Signal{PeersRemoved: []uint64{m.ID}})
s.sessionManager.NotifyAll(m.TailnetID)
return connect.NewResponse(&api.DeleteMachineResponse{}), nil
}
@@ -156,7 +153,7 @@ func (s *Service) ExpireMachine(ctx context.Context, req *connect.Request[api.Ex
return nil, errors.Wrap(err, 0)
}
s.pubsub.Publish(m.TailnetID, &broker.Signal{PeerUpdated: &m.ID})
s.sessionManager.NotifyAll(m.TailnetID)
return connect.NewResponse(&api.ExpireMachineResponse{}), nil
}
@@ -184,7 +181,7 @@ func (s *Service) AuthorizeMachine(ctx context.Context, req *connect.Request[api
}
}
s.pubsub.Publish(m.TailnetID, &broker.Signal{PeerUpdated: &m.ID})
s.sessionManager.NotifyAll(m.TailnetID)
return connect.NewResponse(&api.AuthorizeMachineResponse{}), nil
}
@@ -256,7 +253,7 @@ func (s *Service) EnableMachineRoutes(ctx context.Context, req *connect.Request[
return nil, errors.Wrap(err, 0)
}
s.pubsub.Publish(m.TailnetID, &broker.Signal{PeerUpdated: &m.ID})
s.sessionManager.NotifyAll(m.TailnetID)
response := api.EnableMachineRoutesResponse{
MachineId: m.ID,
@@ -305,7 +302,7 @@ func (s *Service) DisableMachineRoutes(ctx context.Context, req *connect.Request
return nil, errors.Wrap(err, 0)
}
s.pubsub.Publish(m.TailnetID, &broker.Signal{PeerUpdated: &m.ID})
s.sessionManager.NotifyAll(m.TailnetID)
response := api.DisableMachineRoutesResponse{
MachineId: m.ID,
@@ -352,7 +349,7 @@ func (s *Service) EnableExitNode(ctx context.Context, req *connect.Request[api.E
return nil, errors.Wrap(err, 0)
}
s.pubsub.Publish(m.TailnetID, &broker.Signal{PeerUpdated: &m.ID})
s.sessionManager.NotifyAll(m.TailnetID)
response := api.EnableExitNodeResponse{
MachineId: m.ID,
@@ -403,7 +400,7 @@ func (s *Service) DisableExitNode(ctx context.Context, req *connect.Request[api.
return nil, errors.Wrap(err, 0)
}
s.pubsub.Publish(m.TailnetID, &broker.Signal{PeerUpdated: &m.ID})
s.sessionManager.NotifyAll(m.TailnetID)
response := api.DisableExitNodeResponse{
MachineId: m.ID,
@@ -440,7 +437,7 @@ func (s *Service) SetMachineKeyExpiry(ctx context.Context, req *connect.Request[
return nil, errors.Wrap(err, 0)
}
s.pubsub.Publish(m.TailnetID, &broker.Signal{PeerUpdated: &m.ID})
s.sessionManager.NotifyAll(m.TailnetID)
return connect.NewResponse(&api.SetMachineKeyExpiryResponse{}), nil
}
+10 -10
View File
@@ -4,27 +4,27 @@ import (
"context"
"github.com/bufbuild/connect-go"
"github.com/jsiebens/ionscale/internal/auth"
"github.com/jsiebens/ionscale/internal/broker"
"github.com/jsiebens/ionscale/internal/config"
"github.com/jsiebens/ionscale/internal/core"
"github.com/jsiebens/ionscale/internal/domain"
"github.com/jsiebens/ionscale/internal/version"
api "github.com/jsiebens/ionscale/pkg/gen/ionscale/v1"
)
func NewService(config *config.Config, authProvider auth.Provider, repository domain.Repository, pubsub broker.Pubsub) *Service {
func NewService(config *config.Config, authProvider auth.Provider, repository domain.Repository, sessionManager core.PollMapSessionManager) *Service {
return &Service{
config: config,
authProvider: authProvider,
repository: repository,
pubsub: pubsub,
config: config,
authProvider: authProvider,
repository: repository,
sessionManager: sessionManager,
}
}
type Service struct {
config *config.Config
authProvider auth.Provider
repository domain.Repository
pubsub broker.Pubsub
config *config.Config
authProvider auth.Provider
repository domain.Repository
sessionManager core.PollMapSessionManager
}
func (s *Service) GetVersion(_ context.Context, _ *connect.Request[api.GetVersionRequest]) (*connect.Response[api.GetVersionResponse], error) {
+9 -10
View File
@@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"github.com/bufbuild/connect-go"
"github.com/jsiebens/ionscale/internal/broker"
"github.com/jsiebens/ionscale/internal/domain"
"github.com/jsiebens/ionscale/internal/errors"
"github.com/jsiebens/ionscale/internal/util"
@@ -141,7 +140,7 @@ func (s *Service) DeleteTailnet(ctx context.Context, req *connect.Request[api.De
return nil, errors.Wrap(err, 0)
}
s.pubsub.Publish(req.Msg.TailnetId, &broker.Signal{})
s.sessionManager.NotifyAll(req.Msg.TailnetId)
return connect.NewResponse(&api.DeleteTailnetResponse{}), nil
}
@@ -174,7 +173,7 @@ func (s *Service) SetDERPMap(ctx context.Context, req *connect.Request[api.SetDE
return nil, errors.Wrap(err, 0)
}
s.pubsub.Publish(tailnet.ID, &broker.Signal{})
s.sessionManager.NotifyAll(tailnet.ID)
raw, err := json.Marshal(derpMap)
if err != nil {
@@ -204,7 +203,7 @@ func (s *Service) ResetDERPMap(ctx context.Context, req *connect.Request[api.Res
return nil, errors.Wrap(err, 0)
}
s.pubsub.Publish(tailnet.ID, &broker.Signal{})
s.sessionManager.NotifyAll(tailnet.ID)
return connect.NewResponse(&api.ResetDERPMapResponse{}), nil
}
@@ -256,7 +255,7 @@ func (s *Service) EnableFileSharing(ctx context.Context, req *connect.Request[ap
return nil, errors.Wrap(err, 0)
}
s.pubsub.Publish(tailnet.ID, &broker.Signal{})
s.sessionManager.NotifyAll(tailnet.ID)
}
return connect.NewResponse(&api.EnableFileSharingResponse{}), nil
@@ -282,7 +281,7 @@ func (s *Service) DisableFileSharing(ctx context.Context, req *connect.Request[a
return nil, errors.Wrap(err, 0)
}
s.pubsub.Publish(tailnet.ID, &broker.Signal{})
s.sessionManager.NotifyAll(tailnet.ID)
}
return connect.NewResponse(&api.DisableFileSharingResponse{}), nil
@@ -308,7 +307,7 @@ func (s *Service) EnableServiceCollection(ctx context.Context, req *connect.Requ
return nil, errors.Wrap(err, 0)
}
s.pubsub.Publish(tailnet.ID, &broker.Signal{})
s.sessionManager.NotifyAll(tailnet.ID)
}
return connect.NewResponse(&api.EnableServiceCollectionResponse{}), nil
@@ -334,7 +333,7 @@ func (s *Service) DisableServiceCollection(ctx context.Context, req *connect.Req
return nil, errors.Wrap(err, 0)
}
s.pubsub.Publish(tailnet.ID, &broker.Signal{})
s.sessionManager.NotifyAll(tailnet.ID)
}
return connect.NewResponse(&api.DisableServiceCollectionResponse{}), nil
@@ -360,7 +359,7 @@ func (s *Service) EnableSSH(ctx context.Context, req *connect.Request[api.Enable
return nil, errors.Wrap(err, 0)
}
s.pubsub.Publish(tailnet.ID, &broker.Signal{})
s.sessionManager.NotifyAll(tailnet.ID)
}
return connect.NewResponse(&api.EnableSSHResponse{}), nil
@@ -386,7 +385,7 @@ func (s *Service) DisableSSH(ctx context.Context, req *connect.Request[api.Disab
return nil, errors.Wrap(err, 0)
}
s.pubsub.Publish(tailnet.ID, &broker.Signal{})
s.sessionManager.NotifyAll(tailnet.ID)
}
return connect.NewResponse(&api.DisableSSHResponse{}), nil
+1 -2
View File
@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"github.com/bufbuild/connect-go"
"github.com/jsiebens/ionscale/internal/broker"
"github.com/jsiebens/ionscale/internal/domain"
"github.com/jsiebens/ionscale/internal/errors"
api "github.com/jsiebens/ionscale/pkg/gen/ionscale/v1"
@@ -91,7 +90,7 @@ func (s *Service) DeleteUser(ctx context.Context, req *connect.Request[api.Delet
return nil, errors.Wrap(err, 0)
}
s.pubsub.Publish(user.TailnetID, &broker.Signal{})
s.sessionManager.NotifyAll(user.TailnetID)
return connect.NewResponse(&api.DeleteUserResponse{}), nil
}