Merge branch 'main' into magic-dns-support

This commit is contained in:
Juan Font 2021-10-09 12:24:07 +02:00 committed by GitHub
commit c4487b73c4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 125 additions and 170 deletions

View file

@ -2,13 +2,13 @@ package headscale
import (
"encoding/json"
"errors"
"fmt"
"sort"
"strconv"
"strings"
"time"
"github.com/fatih/set"
"github.com/rs/zerolog/log"
"gorm.io/datatypes"
@ -66,7 +66,7 @@ func (h *Headscale) getDirectPeers(m *Machine) (Machines, error) {
if err := h.db.Preload("Namespace").Where("namespace_id = ? AND machine_key <> ? AND registered",
m.NamespaceID, m.MachineKey).Find(&machines).Error; err != nil {
log.Error().Err(err).Msg("Error accessing db")
return nil, err
return Machines{}, err
}
sort.Slice(machines, func(i, j int) bool { return machines[i].ID < machines[j].ID })
@ -88,7 +88,7 @@ func (h *Headscale) getShared(m *Machine) (Machines, error) {
sharedMachines := []SharedMachine{}
if err := h.db.Preload("Namespace").Preload("Machine").Where("namespace_id = ?",
m.NamespaceID).Find(&sharedMachines).Error; err != nil {
return nil, err
return Machines{}, err
}
peers := make(Machines, 0)
@ -112,7 +112,7 @@ func (h *Headscale) getPeers(m *Machine) (Machines, error) {
Str("func", "getPeers").
Err(err).
Msg("Cannot fetch peers")
return nil, err
return Machines{}, err
}
shared, err := h.getShared(m)
@ -121,7 +121,7 @@ func (h *Headscale) getPeers(m *Machine) (Machines, error) {
Str("func", "getDirectPeers").
Err(err).
Msg("Cannot fetch peers")
return nil, err
return Machines{}, err
}
peers := append(direct, shared...)
@ -214,118 +214,31 @@ func (m *Machine) GetHostInfo() (*tailcfg.Hostinfo, error) {
return &hostinfo, nil
}
func (h *Headscale) notifyChangesToPeers(m *Machine) {
peers, err := h.getPeers(m)
if err != nil {
log.Error().
Str("func", "notifyChangesToPeers").
Str("machine", m.Name).
Msgf("Error getting peers: %s", err)
return
}
for _, peer := range peers {
log.Info().
Str("func", "notifyChangesToPeers").
Str("machine", m.Name).
Str("peer", peer.Name).
Str("address", peer.IPAddress).
Msgf("Notifying peer %s (%s)", peer.Name, peer.IPAddress)
err := h.sendRequestOnUpdateChannel(&peer)
if err != nil {
log.Info().
Str("func", "notifyChangesToPeers").
Str("machine", m.Name).
Str("peer", peer.Name).
Msgf("Peer %s does not have an open update client, skipping.", peer.Name)
continue
}
log.Trace().
Str("func", "notifyChangesToPeers").
Str("machine", m.Name).
Str("peer", peer.Name).
Str("address", peer.IPAddress).
Msgf("Notified peer %s (%s)", peer.Name, peer.IPAddress)
}
}
func (h *Headscale) getOrOpenUpdateChannel(m *Machine) <-chan struct{} {
var updateChan chan struct{}
if storedChan, ok := h.clientsUpdateChannels.Load(m.ID); ok {
if unwrapped, ok := storedChan.(chan struct{}); ok {
updateChan = unwrapped
} else {
log.Error().
Str("handler", "openUpdateChannel").
Str("machine", m.Name).
Msg("Failed to convert update channel to struct{}")
}
} else {
log.Debug().
Str("handler", "openUpdateChannel").
Str("machine", m.Name).
Msg("Update channel not found, creating")
updateChan = make(chan struct{})
h.clientsUpdateChannels.Store(m.ID, updateChan)
}
return updateChan
}
func (h *Headscale) closeUpdateChannel(m *Machine) {
h.clientsUpdateChannelMutex.Lock()
defer h.clientsUpdateChannelMutex.Unlock()
if storedChan, ok := h.clientsUpdateChannels.Load(m.ID); ok {
if unwrapped, ok := storedChan.(chan struct{}); ok {
close(unwrapped)
}
}
h.clientsUpdateChannels.Delete(m.ID)
}
func (h *Headscale) sendRequestOnUpdateChannel(m *Machine) error {
h.clientsUpdateChannelMutex.Lock()
defer h.clientsUpdateChannelMutex.Unlock()
pUp, ok := h.clientsUpdateChannels.Load(uint64(m.ID))
if ok {
log.Info().
Str("func", "requestUpdate").
Str("machine", m.Name).
Msgf("Notifying peer %s", m.Name)
if update, ok := pUp.(chan struct{}); ok {
log.Trace().
Str("func", "requestUpdate").
Str("machine", m.Name).
Msgf("Update channel is %#v", update)
updateRequestsToNode.Inc()
update <- struct{}{}
log.Trace().
Str("func", "requestUpdate").
Str("machine", m.Name).
Msgf("Notified machine %s", m.Name)
}
} else {
err := errors.New("machine does not have an open update channel")
log.Info().
Str("func", "requestUpdate").
Str("machine", m.Name).
Msgf("Machine %s does not have an open update channel", m.Name)
return err
}
return nil
}
func (h *Headscale) isOutdated(m *Machine) bool {
err := h.UpdateMachine(m)
if err != nil {
// It does not seem meaningful to propagate this error as the end result
// will have to be that the machine has to be considered outdated.
return true
}
lastChange := h.getLastStateChange(m.Namespace.Name)
sharedMachines, _ := h.getShared(m)
namespaceSet := set.New(set.ThreadSafe)
namespaceSet.Add(m.Namespace.Name)
// Check if any of our shared namespaces has updates that we have
// not propagated.
for _, sharedMachine := range sharedMachines {
namespaceSet.Add(sharedMachine.Namespace.Name)
}
namespaces := make([]string, namespaceSet.Size())
for index, namespace := range namespaceSet.List() {
namespaces[index] = namespace.(string)
}
lastChange := h.getLastStateChange(namespaces...)
log.Trace().
Str("func", "keepAlive").
Str("machine", m.Name).