Merge branch 'main' into db-error-handling

This commit is contained in:
Kristoffer Dalby 2022-05-31 10:18:13 +02:00 committed by GitHub
commit 0676aa11a9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
36 changed files with 1522 additions and 554 deletions

123
poll.go
View file

@ -84,21 +84,10 @@ func (h *Headscale) PollNetMapHandler(ctx *gin.Context) {
log.Trace().
Str("handler", "PollNetMap").
Str("id", ctx.Param("id")).
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Msg("Found machine in database")
hname, err := NormalizeToFQDNRules(
req.Hostinfo.Hostname,
h.cfg.OIDC.StripEmaildomain,
)
if err != nil {
log.Error().
Caller().
Str("func", "handleAuthKey").
Str("hostinfo.name", req.Hostinfo.Hostname).
Err(err)
}
machine.Name = hname
machine.Hostname = req.Hostinfo.Hostname
machine.HostInfo = HostInfo(*req.Hostinfo)
machine.DiscoKey = DiscoPublicKeyStripPrefix(req.DiscoKey)
now := time.Now().UTC()
@ -110,7 +99,7 @@ func (h *Headscale) PollNetMapHandler(ctx *gin.Context) {
log.Error().
Caller().
Str("func", "handleAuthKey").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Err(err)
}
}
@ -146,7 +135,7 @@ func (h *Headscale) PollNetMapHandler(ctx *gin.Context) {
log.Error().
Str("handler", "PollNetMap").
Str("id", ctx.Param("id")).
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Err(err).
Msg("Failed to get Map response")
ctx.String(http.StatusInternalServerError, ":(")
@ -162,7 +151,7 @@ func (h *Headscale) PollNetMapHandler(ctx *gin.Context) {
log.Debug().
Str("handler", "PollNetMap").
Str("id", ctx.Param("id")).
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Bool("readOnly", req.ReadOnly).
Bool("omitPeers", req.OmitPeers).
Bool("stream", req.Stream).
@ -171,7 +160,7 @@ func (h *Headscale) PollNetMapHandler(ctx *gin.Context) {
if req.ReadOnly {
log.Info().
Str("handler", "PollNetMap").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Msg("Client is starting up. Probably interested in a DERP map")
ctx.Data(http.StatusOK, "application/json; charset=utf-8", data)
@ -189,27 +178,27 @@ func (h *Headscale) PollNetMapHandler(ctx *gin.Context) {
log.Trace().
Str("handler", "PollNetMap").
Str("id", ctx.Param("id")).
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Msg("Loading or creating update channel")
const chanSize = 8
updateChan := make(chan struct{}, chanSize)
pollDataChan := make(chan []byte, chanSize)
defer closeChanWithLog(pollDataChan, machine.Name, "pollDataChan")
defer closeChanWithLog(pollDataChan, machine.Hostname, "pollDataChan")
keepAliveChan := make(chan []byte)
if req.OmitPeers && !req.Stream {
log.Info().
Str("handler", "PollNetMap").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Msg("Client sent endpoint update and is ok with a response without peer list")
ctx.Data(http.StatusOK, "application/json; charset=utf-8", data)
// It sounds like we should update the nodes when we have received a endpoint update
// even tho the comments in the tailscale code dont explicitly say so.
updateRequestsFromNode.WithLabelValues(machine.Namespace.Name, machine.Name, "endpoint-update").
updateRequestsFromNode.WithLabelValues(machine.Namespace.Name, machine.Hostname, "endpoint-update").
Inc()
updateChan <- struct{}{}
@ -217,7 +206,7 @@ func (h *Headscale) PollNetMapHandler(ctx *gin.Context) {
} else if req.OmitPeers && req.Stream {
log.Warn().
Str("handler", "PollNetMap").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Msg("Ignoring request, don't know how to handle it")
ctx.String(http.StatusBadRequest, "")
@ -226,19 +215,19 @@ func (h *Headscale) PollNetMapHandler(ctx *gin.Context) {
log.Info().
Str("handler", "PollNetMap").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Msg("Client is ready to access the tailnet")
log.Info().
Str("handler", "PollNetMap").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Msg("Sending initial map")
pollDataChan <- data
log.Info().
Str("handler", "PollNetMap").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Msg("Notifying peers")
updateRequestsFromNode.WithLabelValues(machine.Namespace.Name, machine.Name, "full-update").
updateRequestsFromNode.WithLabelValues(machine.Namespace.Name, machine.Hostname, "full-update").
Inc()
updateChan <- struct{}{}
@ -254,7 +243,7 @@ func (h *Headscale) PollNetMapHandler(ctx *gin.Context) {
log.Trace().
Str("handler", "PollNetMap").
Str("id", ctx.Param("id")).
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Msg("Finished stream, closing PollNetMap session")
}
@ -289,7 +278,7 @@ func (h *Headscale) PollNetMapStream(
return
}
ctx := context.WithValue(ctx.Request.Context(), machineNameContextKey, machine.Name)
ctx := context.WithValue(ctx.Request.Context(), machineNameContextKey, machine.Hostname)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -307,19 +296,19 @@ func (h *Headscale) PollNetMapStream(
ctx.Stream(func(writer io.Writer) bool {
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Msg("Waiting for data to stream...")
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Msgf("pollData is %#v, keepAliveChan is %#v, updateChan is %#v", pollDataChan, keepAliveChan, updateChan)
select {
case data := <-pollDataChan:
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Str("channel", "pollData").
Int("bytes", len(data)).
Msg("Sending data received via pollData channel")
@ -327,7 +316,7 @@ func (h *Headscale) PollNetMapStream(
if err != nil {
log.Error().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Str("channel", "pollData").
Err(err).
Msg("Cannot write data")
@ -336,18 +325,18 @@ func (h *Headscale) PollNetMapStream(
}
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Str("channel", "pollData").
Int("bytes", len(data)).
Msg("Data from pollData channel written successfully")
// TODO(kradalby): Abstract away all the database calls, this can cause race conditions
// when an outdated machine object is kept alive, e.g. db is update from
// command line, but then overwritten.
err = h.UpdateMachine(machine)
err = h.UpdateMachineFromDatabase(machine)
if err != nil {
log.Error().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Str("channel", "pollData").
Err(err).
Msg("Cannot update machine from database")
@ -359,7 +348,7 @@ func (h *Headscale) PollNetMapStream(
now := time.Now().UTC()
machine.LastSeen = &now
lastStateUpdate.WithLabelValues(machine.Namespace.Name, machine.Name).
lastStateUpdate.WithLabelValues(machine.Namespace.Name, machine.Hostname).
Set(float64(now.Unix()))
machine.LastSuccessfulUpdate = &now
@ -367,14 +356,14 @@ func (h *Headscale) PollNetMapStream(
if err != nil {
log.Error().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Str("channel", "pollData").
Err(err).
Msg("Cannot update machine LastSuccessfulUpdate")
} else {
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Str("channel", "pollData").
Int("bytes", len(data)).
Msg("Machine entry in database updated successfully after sending pollData")
@ -385,7 +374,7 @@ func (h *Headscale) PollNetMapStream(
case data := <-keepAliveChan:
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Str("channel", "keepAlive").
Int("bytes", len(data)).
Msg("Sending keep alive message")
@ -393,7 +382,7 @@ func (h *Headscale) PollNetMapStream(
if err != nil {
log.Error().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Str("channel", "keepAlive").
Err(err).
Msg("Cannot write keep alive message")
@ -402,18 +391,18 @@ func (h *Headscale) PollNetMapStream(
}
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Str("channel", "keepAlive").
Int("bytes", len(data)).
Msg("Keep alive sent successfully")
// TODO(kradalby): Abstract away all the database calls, this can cause race conditions
// when an outdated machine object is kept alive, e.g. db is update from
// command line, but then overwritten.
err = h.UpdateMachine(machine)
err = h.UpdateMachineFromDatabase(machine)
if err != nil {
log.Error().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Str("channel", "keepAlive").
Err(err).
Msg("Cannot update machine from database")
@ -428,14 +417,14 @@ func (h *Headscale) PollNetMapStream(
if err != nil {
log.Error().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Str("channel", "keepAlive").
Err(err).
Msg("Cannot update machine LastSeen")
} else {
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Str("channel", "keepAlive").
Int("bytes", len(data)).
Msg("Machine updated successfully after sending keep alive")
@ -446,10 +435,10 @@ func (h *Headscale) PollNetMapStream(
case <-updateChan:
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Str("channel", "update").
Msg("Received a request for update")
updateRequestsReceivedOnChannel.WithLabelValues(machine.Namespace.Name, machine.Name).
updateRequestsReceivedOnChannel.WithLabelValues(machine.Namespace.Name, machine.Hostname).
Inc()
if h.isOutdated(machine) {
var lastUpdate time.Time
@ -458,15 +447,15 @@ func (h *Headscale) PollNetMapStream(
}
log.Debug().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Time("last_successful_update", lastUpdate).
Time("last_state_change", h.getLastStateChange(machine.Namespace.Name)).
Msgf("There has been updates since the last successful update to %s", machine.Name)
Msgf("There has been updates since the last successful update to %s", machine.Hostname)
data, err := h.getMapResponse(machineKey, mapRequest, machine)
if err != nil {
log.Error().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Str("channel", "update").
Err(err).
Msg("Could not get the map update")
@ -475,21 +464,21 @@ func (h *Headscale) PollNetMapStream(
if err != nil {
log.Error().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Str("channel", "update").
Err(err).
Msg("Could not write the map response")
updateRequestsSentToNode.WithLabelValues(machine.Namespace.Name, machine.Name, "failed").
updateRequestsSentToNode.WithLabelValues(machine.Namespace.Name, machine.Hostname, "failed").
Inc()
return false
}
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Str("channel", "update").
Msg("Updated Map has been sent")
updateRequestsSentToNode.WithLabelValues(machine.Namespace.Name, machine.Name, "success").
updateRequestsSentToNode.WithLabelValues(machine.Namespace.Name, machine.Hostname, "success").
Inc()
// Keep track of the last successful update,
@ -499,11 +488,11 @@ func (h *Headscale) PollNetMapStream(
// TODO(kradalby): Abstract away all the database calls, this can cause race conditions
// when an outdated machine object is kept alive, e.g. db is update from
// command line, but then overwritten.
err = h.UpdateMachine(machine)
err = h.UpdateMachineFromDatabase(machine)
if err != nil {
log.Error().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Str("channel", "update").
Err(err).
Msg("Cannot update machine from database")
@ -514,7 +503,7 @@ func (h *Headscale) PollNetMapStream(
}
now := time.Now().UTC()
lastStateUpdate.WithLabelValues(machine.Namespace.Name, machine.Name).
lastStateUpdate.WithLabelValues(machine.Namespace.Name, machine.Hostname).
Set(float64(now.Unix()))
machine.LastSuccessfulUpdate = &now
@ -522,7 +511,7 @@ func (h *Headscale) PollNetMapStream(
if err != nil {
log.Error().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Str("channel", "update").
Err(err).
Msg("Cannot update machine LastSuccessfulUpdate")
@ -534,10 +523,10 @@ func (h *Headscale) PollNetMapStream(
}
log.Trace().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Time("last_successful_update", lastUpdate).
Time("last_state_change", h.getLastStateChange(machine.Namespace.Name)).
Msgf("%s is up to date", machine.Name)
Msgf("%s is up to date", machine.Hostname)
}
return true
@ -545,16 +534,16 @@ func (h *Headscale) PollNetMapStream(
case <-ctx.Request.Context().Done():
log.Info().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Msg("The client has closed the connection")
// TODO: Abstract away all the database calls, this can cause race conditions
// when an outdated machine object is kept alive, e.g. db is update from
// command line, but then overwritten.
err := h.UpdateMachine(machine)
err := h.UpdateMachineFromDatabase(machine)
if err != nil {
log.Error().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Str("channel", "Done").
Err(err).
Msg("Cannot update machine from database")
@ -569,7 +558,7 @@ func (h *Headscale) PollNetMapStream(
if err != nil {
log.Error().
Str("handler", "PollNetMapStream").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Str("channel", "Done").
Err(err).
Msg("Cannot update machine LastSeen")
@ -620,16 +609,16 @@ func (h *Headscale) scheduledPollWorker(
log.Debug().
Str("func", "keepAlive").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Msg("Sending keepalive")
keepAliveChan <- data
case <-updateCheckerTicker.C:
log.Debug().
Str("func", "scheduledPollWorker").
Str("machine", machine.Name).
Str("machine", machine.Hostname).
Msg("Sending update request")
updateRequestsFromNode.WithLabelValues(machine.Namespace.Name, machine.Name, "scheduled-update").
updateRequestsFromNode.WithLabelValues(machine.Namespace.Name, machine.Hostname, "scheduled-update").
Inc()
updateChan <- struct{}{}
}