implement selfupdate and pass expiry (#1647)
This commit is contained in:
parent
a592ae56b4
commit
3b103280ef
11 changed files with 114 additions and 52 deletions
|
@ -340,6 +340,16 @@ func (hsdb *HSDatabase) nodeSetExpiry(node *types.Node, expiry time.Time) error
|
|||
)
|
||||
}
|
||||
|
||||
node.Expiry = &expiry
|
||||
|
||||
stateSelfUpdate := types.StateUpdate{
|
||||
Type: types.StateSelfUpdate,
|
||||
ChangeNodes: types.Nodes{node},
|
||||
}
|
||||
if stateSelfUpdate.Valid() {
|
||||
hsdb.notifier.NotifyByMachineKey(stateSelfUpdate, node.MachineKey)
|
||||
}
|
||||
|
||||
stateUpdate := types.StateUpdate{
|
||||
Type: types.StatePeerChangedPatch,
|
||||
ChangePatches: []*tailcfg.PeerChange{
|
||||
|
@ -350,7 +360,7 @@ func (hsdb *HSDatabase) nodeSetExpiry(node *types.Node, expiry time.Time) error
|
|||
},
|
||||
}
|
||||
if stateUpdate.Valid() {
|
||||
hsdb.notifier.NotifyAll(stateUpdate)
|
||||
hsdb.notifier.NotifyWithIgnore(stateUpdate, node.MachineKey.String())
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -856,7 +866,7 @@ func (hsdb *HSDatabase) ExpireExpiredNodes(lastCheck time.Time) time.Time {
|
|||
// checked everything.
|
||||
started := time.Now()
|
||||
|
||||
expired := make([]*tailcfg.PeerChange, 0)
|
||||
expiredNodes := make([]*types.Node, 0)
|
||||
|
||||
nodes, err := hsdb.listNodes()
|
||||
if err != nil {
|
||||
|
@ -872,17 +882,13 @@ func (hsdb *HSDatabase) ExpireExpiredNodes(lastCheck time.Time) time.Time {
|
|||
// It will notify about all nodes that has been expired.
|
||||
// It should only notify about expired nodes since _last check_.
|
||||
node.Expiry.After(lastCheck) {
|
||||
expired = append(expired, &tailcfg.PeerChange{
|
||||
NodeID: tailcfg.NodeID(node.ID),
|
||||
KeyExpiry: node.Expiry,
|
||||
})
|
||||
expiredNodes = append(expiredNodes, &nodes[index])
|
||||
|
||||
now := time.Now()
|
||||
// Do not use setNodeExpiry as that has a notifier hook, which
|
||||
// can cause a deadlock, we are updating all changed nodes later
|
||||
// and there is no point in notifiying twice.
|
||||
if err := hsdb.db.Model(nodes[index]).Updates(types.Node{
|
||||
Expiry: &now,
|
||||
Expiry: &started,
|
||||
}).Error; err != nil {
|
||||
log.Error().
|
||||
Err(err).
|
||||
|
@ -898,6 +904,15 @@ func (hsdb *HSDatabase) ExpireExpiredNodes(lastCheck time.Time) time.Time {
|
|||
}
|
||||
}
|
||||
|
||||
expired := make([]*tailcfg.PeerChange, len(expiredNodes))
|
||||
for idx, node := range expiredNodes {
|
||||
expired[idx] = &tailcfg.PeerChange{
|
||||
NodeID: tailcfg.NodeID(node.ID),
|
||||
KeyExpiry: &started,
|
||||
}
|
||||
}
|
||||
|
||||
// Inform the peers of a node with a lightweight update.
|
||||
stateUpdate := types.StateUpdate{
|
||||
Type: types.StatePeerChangedPatch,
|
||||
ChangePatches: expired,
|
||||
|
@ -906,5 +921,16 @@ func (hsdb *HSDatabase) ExpireExpiredNodes(lastCheck time.Time) time.Time {
|
|||
hsdb.notifier.NotifyAll(stateUpdate)
|
||||
}
|
||||
|
||||
// Inform the node itself that it has expired.
|
||||
for _, node := range expiredNodes {
|
||||
stateSelfUpdate := types.StateUpdate{
|
||||
Type: types.StateSelfUpdate,
|
||||
ChangeNodes: types.Nodes{node},
|
||||
}
|
||||
if stateSelfUpdate.Valid() {
|
||||
hsdb.notifier.NotifyByMachineKey(stateSelfUpdate, node.MachineKey)
|
||||
}
|
||||
}
|
||||
|
||||
return started
|
||||
}
|
||||
|
|
|
@ -21,7 +21,6 @@ import (
|
|||
"github.com/juanfont/headscale/hscontrol/util"
|
||||
"github.com/klauspost/compress/zstd"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/samber/lo"
|
||||
"golang.org/x/exp/maps"
|
||||
"tailscale.com/envknob"
|
||||
"tailscale.com/smallzstd"
|
||||
|
@ -459,6 +458,8 @@ func (m *Mapper) marshalMapResponse(
|
|||
switch {
|
||||
case resp.Peers != nil && len(resp.Peers) > 0:
|
||||
responseType = "full"
|
||||
case resp.Peers == nil && resp.PeersChanged == nil && resp.PeersChangedPatch == nil:
|
||||
responseType = "lite"
|
||||
case resp.PeersChanged != nil && len(resp.PeersChanged) > 0:
|
||||
responseType = "changed"
|
||||
case resp.PeersChangedPatch != nil && len(resp.PeersChangedPatch) > 0:
|
||||
|
@ -593,15 +594,6 @@ func nodeMapToList(nodes map[uint64]*types.Node) types.Nodes {
|
|||
return ret
|
||||
}
|
||||
|
||||
func filterExpiredAndNotReady(peers types.Nodes) types.Nodes {
|
||||
return lo.Filter(peers, func(item *types.Node, index int) bool {
|
||||
// Filter out nodes that are expired OR
|
||||
// nodes that has no endpoints, this typically means they have
|
||||
// registered, but are not configured.
|
||||
return !item.IsExpired() || len(item.Endpoints) > 0
|
||||
})
|
||||
}
|
||||
|
||||
// appendPeerChanges mutates a tailcfg.MapResponse with all the
|
||||
// necessary changes when peers have changed.
|
||||
func appendPeerChanges(
|
||||
|
@ -627,9 +619,6 @@ func appendPeerChanges(
|
|||
return err
|
||||
}
|
||||
|
||||
// Filter out peers that have expired.
|
||||
changed = filterExpiredAndNotReady(changed)
|
||||
|
||||
// If there are filter rules present, see if there are any nodes that cannot
|
||||
// access eachother at all and remove them from the peers.
|
||||
if len(rules) > 0 {
|
||||
|
|
|
@ -95,6 +95,21 @@ func (n *Notifier) NotifyWithIgnore(update types.StateUpdate, ignore ...string)
|
|||
}
|
||||
}
|
||||
|
||||
func (n *Notifier) NotifyByMachineKey(update types.StateUpdate, mKey key.MachinePublic) {
|
||||
log.Trace().Caller().Interface("type", update.Type).Msg("acquiring lock to notify")
|
||||
defer log.Trace().
|
||||
Caller().
|
||||
Interface("type", update.Type).
|
||||
Msg("releasing lock, finished notifing")
|
||||
|
||||
n.l.RLock()
|
||||
defer n.l.RUnlock()
|
||||
|
||||
if c, ok := n.nodes[mKey.String()]; ok {
|
||||
c <- update
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Notifier) String() string {
|
||||
n.l.RLock()
|
||||
defer n.l.RUnlock()
|
||||
|
|
|
@ -397,6 +397,14 @@ func (h *Headscale) handlePoll(
|
|||
case types.StatePeerRemoved:
|
||||
logInfo("Sending PeerRemoved MapResponse")
|
||||
data, err = mapp.PeerRemovedResponse(mapRequest, node, update.Removed)
|
||||
case types.StateSelfUpdate:
|
||||
if len(update.ChangeNodes) == 1 {
|
||||
logInfo("Sending SelfUpdate MapResponse")
|
||||
node = update.ChangeNodes[0]
|
||||
data, err = mapp.LiteMapResponse(mapRequest, node, h.ACLPolicy)
|
||||
} else {
|
||||
logInfo("SelfUpdate contained too many nodes, this is likely a bug in the code, please report.")
|
||||
}
|
||||
case types.StateDERPUpdated:
|
||||
logInfo("Sending DERPUpdate MapResponse")
|
||||
data, err = mapp.DERPMapResponse(mapRequest, node, update.DERPMap)
|
||||
|
|
|
@ -13,7 +13,7 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
MinimumCapVersion tailcfg.CapabilityVersion = 36
|
||||
MinimumCapVersion tailcfg.CapabilityVersion = 56
|
||||
)
|
||||
|
||||
// NoisePollNetMapHandler takes care of /machine/:id/map using the Noise protocol
|
||||
|
|
|
@ -91,6 +91,12 @@ const (
|
|||
StatePeerChanged
|
||||
StatePeerChangedPatch
|
||||
StatePeerRemoved
|
||||
// StateSelfUpdate is used to indicate that the node
|
||||
// has changed in control, and the client needs to be
|
||||
// informed.
|
||||
// The updated node is inside the ChangeNodes field
|
||||
// which should have a length of one.
|
||||
StateSelfUpdate
|
||||
StateDERPUpdated
|
||||
)
|
||||
|
||||
|
@ -142,6 +148,10 @@ func (su *StateUpdate) Valid() bool {
|
|||
if su.Removed == nil {
|
||||
panic("Mandatory field Removed is not set on StatePeerRemove update")
|
||||
}
|
||||
case StateSelfUpdate:
|
||||
if su.ChangeNodes == nil || len(su.ChangeNodes) != 1 {
|
||||
panic("Mandatory field ChangeNodes is not set for StateSelfUpdate or has more than one node")
|
||||
}
|
||||
case StateDERPUpdated:
|
||||
if su.DERPMap == nil {
|
||||
panic("Mandatory field DERPMap is not set on StateDERPUpdated update")
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue