chore: move mapping logic to seperate struct

This commit is contained in:
Johan Siebens
2024-01-04 09:23:35 +01:00
parent d6cc55cf5b
commit 473c3370ce
2 changed files with 198 additions and 154 deletions
+12 -154
View File
@@ -9,9 +9,7 @@ import (
"github.com/jsiebens/ionscale/internal/mapping"
"github.com/labstack/echo/v4"
"net/http"
"net/netip"
"tailscale.com/tailcfg"
"tailscale.com/types/opt"
"time"
)
@@ -90,10 +88,9 @@ func (h *PollNetMapHandler) handleUpdate(c echo.Context, binder bind.Binder, m *
return c.String(http.StatusOK, "")
}
var syncedPeers = make(map[uint64]bool)
var derpMapChecksum = ""
mapper := mapping.NewPollNetMapper(mapRequest, m.ID, h.repository, h.sessionManager)
response, syncedPeers, derpMapChecksum, err := h.createMapResponse(m, binder, mapRequest, false, make(map[uint64]bool), derpMapChecksum)
response, err := createMapResponse(mapper, binder, false, mapRequest.Compress)
if err != nil {
return logError(err)
}
@@ -104,7 +101,7 @@ func (h *PollNetMapHandler) handleUpdate(c echo.Context, binder bind.Binder, m *
// Listen to connection close
notify := c.Request().Context().Done()
keepAliveResponse, err := h.createKeepAliveResponse(binder, mapRequest)
keepAliveResponse, err := createKeepAliveResponse(binder, mapRequest)
if err != nil {
return logError(err)
}
@@ -157,7 +154,7 @@ func (h *PollNetMapHandler) handleUpdate(c echo.Context, binder bind.Binder, m *
var payload []byte
var payloadErr error
payload, syncedPeers, derpMapChecksum, payloadErr = h.createMapResponse(machine, binder, mapRequest, true, syncedPeers, derpMapChecksum)
payload, payloadErr = createMapResponse(mapper, binder, true, mapRequest.Compress)
if payloadErr != nil {
return payloadErr
@@ -186,16 +183,17 @@ func (h *PollNetMapHandler) handleReadOnly(c echo.Context, binder bind.Binder, m
return logError(err)
}
response, _, _, err := h.createMapResponse(m, binder, request, false, map[uint64]bool{}, "")
mapper := mapping.NewPollNetMapper(request, m.ID, h.repository, h.sessionManager)
payload, err := createMapResponse(mapper, binder, false, request.Compress)
if err != nil {
return logError(err)
}
_, err = c.Response().Write(response)
_, err = c.Response().Write(payload)
return logError(err)
}
func (h *PollNetMapHandler) createKeepAliveResponse(binder bind.Binder, request *tailcfg.MapRequest) ([]byte, error) {
func createKeepAliveResponse(binder bind.Binder, request *tailcfg.MapRequest) ([]byte, error) {
mapResponse := &tailcfg.MapResponse{
KeepAlive: true,
}
@@ -203,150 +201,10 @@ func (h *PollNetMapHandler) createKeepAliveResponse(binder bind.Binder, request
return binder.Marshal(request.Compress, mapResponse)
}
func (h *PollNetMapHandler) createMapResponse(m *domain.Machine, binder bind.Binder, request *tailcfg.MapRequest, delta bool, prevSyncedPeerIDs map[uint64]bool, prevDerpMapChecksum string) ([]byte, map[uint64]bool, string, error) {
ctx := context.TODO()
prc := &primaryRoutesCollector{flagged: map[netip.Prefix]bool{}}
tailnet, err := h.repository.GetTailnet(ctx, m.TailnetID)
func createMapResponse(m *mapping.PollNetMapper, binder bind.Binder, delta bool, compress string) ([]byte, error) {
response, err := m.CreateMapResponse(context.Background(), delta)
if err != nil {
return nil, nil, "", err
return nil, err
}
serviceUser, _, err := h.repository.GetOrCreateServiceUser(ctx, tailnet)
if err != nil {
return nil, nil, "", err
}
hostinfo := tailcfg.Hostinfo(m.HostInfo)
node, user, err := mapping.ToNode(m, tailnet, serviceUser, false, true, prc.filter)
if err != nil {
return nil, nil, "", err
}
policies := tailnet.ACLPolicy
var users = []tailcfg.UserProfile{*user}
var changedPeers []*tailcfg.Node
var removedPeers []tailcfg.NodeID
candidatePeers, err := h.repository.ListMachinePeers(ctx, m.TailnetID, m.MachineKey)
if err != nil {
return nil, nil, "", err
}
syncedPeerIDs := map[uint64]bool{}
syncedUserIDs := map[tailcfg.UserID]bool{user.ID: true}
for _, peer := range candidatePeers {
if peer.IsExpired() {
continue
}
if policies.IsValidPeer(m, &peer) || policies.IsValidPeer(&peer, m) {
isConnected := h.sessionManager.HasSession(peer.TailnetID, peer.ID)
n, u, err := mapping.ToNode(&peer, tailnet, serviceUser, true, isConnected, prc.filter)
if err != nil {
return nil, nil, "", err
}
changedPeers = append(changedPeers, n)
syncedPeerIDs[peer.ID] = true
delete(prevSyncedPeerIDs, peer.ID)
if _, ok := syncedUserIDs[u.ID]; !ok {
users = append(users, *u)
syncedUserIDs[u.ID] = true
}
}
}
for p, _ := range prevSyncedPeerIDs {
removedPeers = append(removedPeers, tailcfg.NodeID(p))
}
dnsConfig := tailnet.DNSConfig
derpMap, err := m.Tailnet.GetDERPMap(ctx, h.repository)
if err != nil {
return nil, nil, "", err
}
filterRules := policies.BuildFilterRules(candidatePeers, m)
controlTime := time.Now().UTC()
var mapResponse *tailcfg.MapResponse
if !delta {
mapResponse = &tailcfg.MapResponse{
KeepAlive: false,
Node: node,
DNSConfig: mapping.ToDNSConfig(m, &m.Tailnet, &dnsConfig),
PacketFilter: filterRules,
DERPMap: &derpMap.DERPMap,
Domain: domain.SanitizeTailnetName(m.Tailnet.Name),
Peers: changedPeers,
UserProfiles: users,
ControlTime: &controlTime,
CollectServices: optBool(tailnet.ServiceCollectionEnabled),
Debug: &tailcfg.Debug{
DisableLogTail: true,
},
}
} else {
mapResponse = &tailcfg.MapResponse{
Node: node,
DNSConfig: mapping.ToDNSConfig(m, &m.Tailnet, &dnsConfig),
PacketFilter: filterRules,
Domain: domain.SanitizeTailnetName(m.Tailnet.Name),
PeersChanged: changedPeers,
PeersRemoved: removedPeers,
UserProfiles: users,
ControlTime: &controlTime,
CollectServices: optBool(tailnet.ServiceCollectionEnabled),
}
if prevDerpMapChecksum != derpMap.Checksum {
mapResponse.DERPMap = &derpMap.DERPMap
}
}
if tailnet.SSHEnabled && hostinfo.TailscaleSSHEnabled() {
mapResponse.SSHPolicy = policies.BuildSSHPolicy(candidatePeers, m)
}
if request.OmitPeers {
mapResponse.PeersChanged = nil
mapResponse.PeersRemoved = nil
mapResponse.Peers = nil
}
payload, err := binder.Marshal(request.Compress, mapResponse)
return payload, syncedPeerIDs, derpMap.Checksum, nil
}
func optBool(v bool) opt.Bool {
b := opt.Bool("")
b.Set(v)
return b
}
type primaryRoutesCollector struct {
flagged map[netip.Prefix]bool
}
func (p *primaryRoutesCollector) filter(m *domain.Machine) []netip.Prefix {
var result = []netip.Prefix{}
for _, r := range m.AllowIPs {
if _, ok := p.flagged[r]; r.Bits() != 0 && !ok {
result = append(result, r)
p.flagged[r] = true
}
}
for _, r := range m.AutoAllowIPs {
if _, ok := p.flagged[r]; r.Bits() != 0 && !ok {
result = append(result, r)
p.flagged[r] = true
}
}
return result
return binder.Marshal(compress, response)
}
+186
View File
@@ -0,0 +1,186 @@
package mapping
import (
"context"
"github.com/jsiebens/ionscale/internal/core"
"github.com/jsiebens/ionscale/internal/domain"
"net/netip"
"sync"
"tailscale.com/tailcfg"
"tailscale.com/types/opt"
"time"
)
func NewPollNetMapper(req *tailcfg.MapRequest, machineID uint64, repository domain.Repository, sessionManager core.PollMapSessionManager) *PollNetMapper {
return &PollNetMapper{
req: req,
machineID: machineID,
prevSyncedPeerIDs: make(map[uint64]bool),
prevDerpMapChecksum: "",
repository: repository,
sessionManager: sessionManager,
}
}
type PollNetMapper struct {
sync.Mutex
req *tailcfg.MapRequest
machineID uint64
prevSyncedPeerIDs map[uint64]bool
prevDerpMapChecksum string
repository domain.Repository
sessionManager core.PollMapSessionManager
}
func (h *PollNetMapper) CreateMapResponse(ctx context.Context, delta bool) (*tailcfg.MapResponse, error) {
h.Lock()
defer h.Unlock()
m, err := h.repository.GetMachine(ctx, h.machineID)
if err != nil {
return nil, err
}
hostinfo := tailcfg.Hostinfo(m.HostInfo)
tailnet := m.Tailnet
policies := tailnet.ACLPolicy
dnsConfig := tailnet.DNSConfig
serviceUser, _, err := h.repository.GetOrCreateServiceUser(ctx, &tailnet)
if err != nil {
return nil, err
}
derpMap, err := m.Tailnet.GetDERPMap(ctx, h.repository)
if err != nil {
return nil, err
}
prc := &primaryRoutesCollector{flagged: map[netip.Prefix]bool{}}
node, user, err := ToNode(m, &tailnet, serviceUser, false, true, prc.filter)
if err != nil {
return nil, err
}
var users = []tailcfg.UserProfile{*user}
var changedPeers []*tailcfg.Node
var removedPeers []tailcfg.NodeID
candidatePeers, err := h.repository.ListMachinePeers(ctx, m.TailnetID, m.MachineKey)
if err != nil {
return nil, err
}
syncedPeerIDs := map[uint64]bool{}
syncedUserIDs := map[tailcfg.UserID]bool{user.ID: true}
for _, peer := range candidatePeers {
if peer.IsExpired() {
continue
}
if policies.IsValidPeer(m, &peer) || policies.IsValidPeer(&peer, m) {
isConnected := h.sessionManager.HasSession(peer.TailnetID, peer.ID)
n, u, err := ToNode(&peer, &tailnet, serviceUser, true, isConnected, prc.filter)
if err != nil {
return nil, err
}
changedPeers = append(changedPeers, n)
syncedPeerIDs[peer.ID] = true
delete(h.prevSyncedPeerIDs, peer.ID)
if _, ok := syncedUserIDs[u.ID]; !ok {
users = append(users, *u)
syncedUserIDs[u.ID] = true
}
}
}
for p, _ := range h.prevSyncedPeerIDs {
removedPeers = append(removedPeers, tailcfg.NodeID(p))
}
filterRules := policies.BuildFilterRules(candidatePeers, m)
controlTime := time.Now().UTC()
var mapResponse *tailcfg.MapResponse
if !delta {
mapResponse = &tailcfg.MapResponse{
KeepAlive: false,
Node: node,
DNSConfig: ToDNSConfig(m, &m.Tailnet, &dnsConfig),
PacketFilter: filterRules,
DERPMap: &derpMap.DERPMap,
Domain: domain.SanitizeTailnetName(m.Tailnet.Name),
Peers: changedPeers,
UserProfiles: users,
ControlTime: &controlTime,
CollectServices: optBool(tailnet.ServiceCollectionEnabled),
Debug: &tailcfg.Debug{
DisableLogTail: true,
},
}
} else {
mapResponse = &tailcfg.MapResponse{
Node: node,
DNSConfig: ToDNSConfig(m, &m.Tailnet, &dnsConfig),
PacketFilter: filterRules,
Domain: domain.SanitizeTailnetName(m.Tailnet.Name),
PeersChanged: changedPeers,
PeersRemoved: removedPeers,
UserProfiles: users,
ControlTime: &controlTime,
CollectServices: optBool(tailnet.ServiceCollectionEnabled),
}
if h.prevDerpMapChecksum != derpMap.Checksum {
mapResponse.DERPMap = &derpMap.DERPMap
}
}
if tailnet.SSHEnabled && hostinfo.TailscaleSSHEnabled() {
mapResponse.SSHPolicy = policies.BuildSSHPolicy(candidatePeers, m)
}
if h.req.OmitPeers {
mapResponse.PeersChanged = nil
mapResponse.PeersRemoved = nil
mapResponse.Peers = nil
}
h.prevSyncedPeerIDs = syncedPeerIDs
h.prevDerpMapChecksum = derpMap.Checksum
return mapResponse, nil
}
type primaryRoutesCollector struct {
flagged map[netip.Prefix]bool
}
func (p *primaryRoutesCollector) filter(m *domain.Machine) []netip.Prefix {
var result []netip.Prefix
for _, r := range m.AllowIPs {
if _, ok := p.flagged[r]; r.Bits() != 0 && !ok {
result = append(result, r)
p.flagged[r] = true
}
}
for _, r := range m.AutoAllowIPs {
if _, ok := p.flagged[r]; r.Bits() != 0 && !ok {
result = append(result, r)
p.flagged[r] = true
}
}
return result
}
func optBool(v bool) opt.Bool {
b := opt.Bool("")
b.Set(v)
return b
}