Replace database locks with transactions (#1701)
This commits removes the locks used to guard data integrity for the database and replaces them with Transactions, turns out that SQL had a way to deal with this all along. This reduces the complexity we had with multiple locks that might stack or recurse (database, nofitifer, mapper). All notifications and state updates are now triggered _after_ a database change. Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>
This commit is contained in:
parent
cbf57e27a7
commit
83769ba715
32 changed files with 1496 additions and 1128 deletions
|
@ -4,12 +4,15 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/juanfont/headscale/hscontrol/db"
|
||||
"github.com/juanfont/headscale/hscontrol/mapper"
|
||||
"github.com/juanfont/headscale/hscontrol/types"
|
||||
"github.com/rs/zerolog/log"
|
||||
xslices "golang.org/x/exp/slices"
|
||||
"gorm.io/gorm"
|
||||
"tailscale.com/tailcfg"
|
||||
)
|
||||
|
||||
|
@ -128,10 +131,14 @@ func (h *Headscale) handlePoll(
|
|||
|
||||
if h.ACLPolicy != nil {
|
||||
// update routes with peer information
|
||||
err = h.db.EnableAutoApprovedRoutes(h.ACLPolicy, node)
|
||||
update, err := h.db.EnableAutoApprovedRoutes(h.ACLPolicy, node)
|
||||
if err != nil {
|
||||
logErr(err, "Error running auto approved routes")
|
||||
}
|
||||
|
||||
if update != nil {
|
||||
sendUpdate = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -146,7 +153,7 @@ func (h *Headscale) handlePoll(
|
|||
}
|
||||
|
||||
if sendUpdate {
|
||||
if err := h.db.NodeSave(node); err != nil {
|
||||
if err := h.db.DB.Save(node).Error; err != nil {
|
||||
logErr(err, "Failed to persist/update node in the database")
|
||||
http.Error(writer, "", http.StatusInternalServerError)
|
||||
|
||||
|
@ -161,7 +168,9 @@ func (h *Headscale) handlePoll(
|
|||
Message: "called from handlePoll -> update -> new hostinfo",
|
||||
}
|
||||
if stateUpdate.Valid() {
|
||||
ctx := types.NotifyCtx(context.Background(), "poll-nodeupdate-peers-hostinfochange", node.Hostname)
|
||||
h.nodeNotifier.NotifyWithIgnore(
|
||||
ctx,
|
||||
stateUpdate,
|
||||
node.MachineKey.String())
|
||||
}
|
||||
|
@ -174,7 +183,9 @@ func (h *Headscale) handlePoll(
|
|||
ChangeNodes: types.Nodes{node},
|
||||
}
|
||||
if selfUpdate.Valid() {
|
||||
ctx := types.NotifyCtx(context.Background(), "poll-nodeupdate-self-hostinfochange", node.Hostname)
|
||||
h.nodeNotifier.NotifyByMachineKey(
|
||||
ctx,
|
||||
selfUpdate,
|
||||
node.MachineKey)
|
||||
}
|
||||
|
@ -183,7 +194,7 @@ func (h *Headscale) handlePoll(
|
|||
}
|
||||
}
|
||||
|
||||
if err := h.db.NodeSave(node); err != nil {
|
||||
if err := h.db.DB.Save(node).Error; err != nil {
|
||||
logErr(err, "Failed to persist/update node in the database")
|
||||
http.Error(writer, "", http.StatusInternalServerError)
|
||||
|
||||
|
@ -195,7 +206,9 @@ func (h *Headscale) handlePoll(
|
|||
ChangePatches: []*tailcfg.PeerChange{&change},
|
||||
}
|
||||
if stateUpdate.Valid() {
|
||||
ctx := types.NotifyCtx(context.Background(), "poll-nodeupdate-peers-patch", node.Hostname)
|
||||
h.nodeNotifier.NotifyWithIgnore(
|
||||
ctx,
|
||||
stateUpdate,
|
||||
node.MachineKey.String())
|
||||
}
|
||||
|
@ -251,7 +264,7 @@ func (h *Headscale) handlePoll(
|
|||
}
|
||||
}
|
||||
|
||||
if err := h.db.NodeSave(node); err != nil {
|
||||
if err := h.db.DB.Save(node).Error; err != nil {
|
||||
logErr(err, "Failed to persist/update node in the database")
|
||||
http.Error(writer, "", http.StatusInternalServerError)
|
||||
|
||||
|
@ -288,7 +301,10 @@ func (h *Headscale) handlePoll(
|
|||
// update ACLRules with peer informations (to update server tags if necessary)
|
||||
if h.ACLPolicy != nil {
|
||||
// update routes with peer information
|
||||
err = h.db.EnableAutoApprovedRoutes(h.ACLPolicy, node)
|
||||
// This state update is ignored as it will be sent
|
||||
// as part of the whole node
|
||||
// TODO(kradalby): figure out if that is actually correct
|
||||
_, err = h.db.EnableAutoApprovedRoutes(h.ACLPolicy, node)
|
||||
if err != nil {
|
||||
logErr(err, "Error running auto approved routes")
|
||||
}
|
||||
|
@ -324,11 +340,17 @@ func (h *Headscale) handlePoll(
|
|||
Message: "called from handlePoll -> new node added",
|
||||
}
|
||||
if stateUpdate.Valid() {
|
||||
ctx := types.NotifyCtx(context.Background(), "poll-newnode-peers", node.Hostname)
|
||||
h.nodeNotifier.NotifyWithIgnore(
|
||||
ctx,
|
||||
stateUpdate,
|
||||
node.MachineKey.String())
|
||||
}
|
||||
|
||||
if len(node.Routes) > 0 {
|
||||
go h.pollFailoverRoutes(logErr, "new node", node)
|
||||
}
|
||||
|
||||
// Set up the client stream
|
||||
h.pollNetMapStreamWG.Add(1)
|
||||
defer h.pollNetMapStreamWG.Done()
|
||||
|
@ -346,15 +368,9 @@ func (h *Headscale) handlePoll(
|
|||
|
||||
keepAliveTicker := time.NewTicker(keepAliveInterval)
|
||||
|
||||
ctx = context.WithValue(ctx, nodeNameContextKey, node.Hostname)
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
ctx, cancel := context.WithCancel(context.WithValue(ctx, nodeNameContextKey, node.Hostname))
|
||||
defer cancel()
|
||||
|
||||
if len(node.Routes) > 0 {
|
||||
go h.db.EnsureFailoverRouteIsAvailable(node)
|
||||
}
|
||||
|
||||
for {
|
||||
logInfo("Waiting for update on stream channel")
|
||||
select {
|
||||
|
@ -403,6 +419,7 @@ func (h *Headscale) handlePoll(
|
|||
return
|
||||
}
|
||||
|
||||
startMapResp := time.Now()
|
||||
switch update.Type {
|
||||
case types.StateFullUpdate:
|
||||
logInfo("Sending Full MapResponse")
|
||||
|
@ -411,6 +428,7 @@ func (h *Headscale) handlePoll(
|
|||
case types.StatePeerChanged:
|
||||
logInfo(fmt.Sprintf("Sending Changed MapResponse: %s", update.Message))
|
||||
|
||||
isConnectedMap := h.nodeNotifier.ConnectedMap()
|
||||
for _, node := range update.ChangeNodes {
|
||||
// If a node is not reported to be online, it might be
|
||||
// because the value is outdated, check with the notifier.
|
||||
|
@ -418,7 +436,7 @@ func (h *Headscale) handlePoll(
|
|||
// this might be because it has announced itself, but not
|
||||
// reached the stage to actually create the notifier channel.
|
||||
if node.IsOnline != nil && !*node.IsOnline {
|
||||
isOnline := h.nodeNotifier.IsConnected(node.MachineKey)
|
||||
isOnline := isConnectedMap[node.MachineKey]
|
||||
node.IsOnline = &isOnline
|
||||
}
|
||||
}
|
||||
|
@ -434,7 +452,7 @@ func (h *Headscale) handlePoll(
|
|||
if len(update.ChangeNodes) == 1 {
|
||||
logInfo("Sending SelfUpdate MapResponse")
|
||||
node = update.ChangeNodes[0]
|
||||
data, err = mapp.LiteMapResponse(mapRequest, node, h.ACLPolicy)
|
||||
data, err = mapp.LiteMapResponse(mapRequest, node, h.ACLPolicy, types.SelfUpdateIdentifier)
|
||||
} else {
|
||||
logInfo("SelfUpdate contained too many nodes, this is likely a bug in the code, please report.")
|
||||
}
|
||||
|
@ -449,8 +467,11 @@ func (h *Headscale) handlePoll(
|
|||
return
|
||||
}
|
||||
|
||||
log.Trace().Str("node", node.Hostname).TimeDiff("timeSpent", time.Now(), startMapResp).Str("mkey", node.MachineKey.String()).Int("type", int(update.Type)).Msg("finished making map response")
|
||||
|
||||
// Only send update if there is change
|
||||
if data != nil {
|
||||
startWrite := time.Now()
|
||||
_, err = writer.Write(data)
|
||||
if err != nil {
|
||||
logErr(err, "Could not write the map response")
|
||||
|
@ -468,6 +489,7 @@ func (h *Headscale) handlePoll(
|
|||
|
||||
return
|
||||
}
|
||||
log.Trace().Str("node", node.Hostname).TimeDiff("timeSpent", time.Now(), startWrite).Str("mkey", node.MachineKey.String()).Int("type", int(update.Type)).Msg("finished writing mapresp to node")
|
||||
|
||||
log.Info().
|
||||
Caller().
|
||||
|
@ -487,7 +509,7 @@ func (h *Headscale) handlePoll(
|
|||
go h.updateNodeOnlineStatus(false, node)
|
||||
|
||||
// Failover the node's routes if any.
|
||||
go h.db.FailoverNodeRoutesWithNotify(node)
|
||||
go h.pollFailoverRoutes(logErr, "node closing connection", node)
|
||||
|
||||
// The connection has been closed, so we can stop polling.
|
||||
return
|
||||
|
@ -500,6 +522,22 @@ func (h *Headscale) handlePoll(
|
|||
}
|
||||
}
|
||||
|
||||
func (h *Headscale) pollFailoverRoutes(logErr func(error, string), where string, node *types.Node) {
|
||||
update, err := db.Write(h.db.DB, func(tx *gorm.DB) (*types.StateUpdate, error) {
|
||||
return db.EnsureFailoverRouteIsAvailable(tx, h.nodeNotifier.ConnectedMap(), node)
|
||||
})
|
||||
if err != nil {
|
||||
logErr(err, fmt.Sprintf("failed to ensure failover routes, %s", where))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if update != nil && !update.Empty() && update.Valid() {
|
||||
ctx := types.NotifyCtx(context.Background(), fmt.Sprintf("poll-%s-routes-ensurefailover", strings.ReplaceAll(where, " ", "-")), node.Hostname)
|
||||
h.nodeNotifier.NotifyWithIgnore(ctx, *update, node.MachineKey.String())
|
||||
}
|
||||
}
|
||||
|
||||
// updateNodeOnlineStatus records the last seen status of a node and notifies peers
|
||||
// about change in their online/offline status.
|
||||
// It takes a StateUpdateType of either StatePeerOnlineChanged or StatePeerOfflineChanged.
|
||||
|
@ -519,10 +557,13 @@ func (h *Headscale) updateNodeOnlineStatus(online bool, node *types.Node) {
|
|||
},
|
||||
}
|
||||
if statusUpdate.Valid() {
|
||||
h.nodeNotifier.NotifyWithIgnore(statusUpdate, node.MachineKey.String())
|
||||
ctx := types.NotifyCtx(context.Background(), "poll-nodeupdate-onlinestatus", node.Hostname)
|
||||
h.nodeNotifier.NotifyWithIgnore(ctx, statusUpdate, node.MachineKey.String())
|
||||
}
|
||||
|
||||
err := h.db.UpdateLastSeen(node)
|
||||
err := h.db.DB.Transaction(func(tx *gorm.DB) error {
|
||||
return db.UpdateLastSeen(tx, node.ID, *node.LastSeen)
|
||||
})
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Cannot update node LastSeen")
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue