metrics, tuning in tests, db cleanups, fix concurrency issue (#1895)

This commit is contained in:
Kristoffer Dalby 2024-04-21 18:28:17 +02:00 committed by GitHub
parent 7d8178406d
commit ba614a5e6c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
28 changed files with 328 additions and 201 deletions

View file

@ -64,7 +64,7 @@ func (h *Headscale) newMapSession(
w http.ResponseWriter,
node *types.Node,
) *mapSession {
warnf, tracef, infof, errf := logPollFunc(req, node)
warnf, infof, tracef, errf := logPollFunc(req, node)
// Use a buffered channel in case a node is not fully ready
// to receive a message to make sure we dont block the entire
@ -196,8 +196,10 @@ func (m *mapSession) serve() {
// return
err := m.handleSaveNode()
if err != nil {
mapResponseWriteUpdatesInStream.WithLabelValues("error").Inc()
return
}
mapResponseWriteUpdatesInStream.WithLabelValues("ok").Inc()
}
// Set up the client stream
@ -284,6 +286,7 @@ func (m *mapSession) serve() {
patches = filteredPatches
}
updateType := "full"
// When deciding what update to send, the following is considered,
// Full is a superset of all updates, when a full update is requested,
// send only that and move on, all other updates will be present in
@ -303,12 +306,15 @@ func (m *mapSession) serve() {
} else if changed != nil {
m.tracef(fmt.Sprintf("Sending Changed MapResponse: %v", lastMessage))
data, err = m.mapper.PeerChangedResponse(m.req, m.node, changed, patches, m.h.ACLPolicy, lastMessage)
updateType = "change"
} else if patches != nil {
m.tracef(fmt.Sprintf("Sending Changed Patch MapResponse: %v", lastMessage))
data, err = m.mapper.PeerChangedPatchResponse(m.req, m.node, patches, m.h.ACLPolicy)
updateType = "patch"
} else if derp {
m.tracef("Sending DERPUpdate MapResponse")
data, err = m.mapper.DERPMapResponse(m.req, m.node, m.h.DERPMap)
updateType = "derp"
}
if err != nil {
@ -324,19 +330,22 @@ func (m *mapSession) serve() {
startWrite := time.Now()
_, err = m.w.Write(data)
if err != nil {
mapResponseSent.WithLabelValues("error", updateType).Inc()
m.errf(err, "Could not write the map response, for mapSession: %p", m)
return
}
err = rc.Flush()
if err != nil {
mapResponseSent.WithLabelValues("error", updateType).Inc()
m.errf(err, "flushing the map response to client, for mapSession: %p", m)
return
}
log.Trace().Str("node", m.node.Hostname).TimeDiff("timeSpent", time.Now(), startWrite).Str("mkey", m.node.MachineKey.String()).Msg("finished writing mapresp to node")
m.infof("update sent")
mapResponseSent.WithLabelValues("ok", updateType).Inc()
m.tracef("update sent")
}
// reset
@ -364,7 +373,8 @@ func (m *mapSession) serve() {
// Consume all updates sent to node
case update := <-m.ch:
m.tracef("received stream update: %d %s", update.Type, update.Message)
m.tracef("received stream update: %s %s", update.Type.String(), update.Message)
mapResponseUpdateReceived.WithLabelValues(update.Type.String()).Inc()
switch update.Type {
case types.StateFullUpdate:
@ -404,27 +414,30 @@ func (m *mapSession) serve() {
data, err := m.mapper.KeepAliveResponse(m.req, m.node)
if err != nil {
m.errf(err, "Error generating the keep alive msg")
mapResponseSent.WithLabelValues("error", "keepalive").Inc()
return
}
_, err = m.w.Write(data)
if err != nil {
m.errf(err, "Cannot write keep alive message")
mapResponseSent.WithLabelValues("error", "keepalive").Inc()
return
}
err = rc.Flush()
if err != nil {
m.errf(err, "flushing keep alive to client, for mapSession: %p", m)
mapResponseSent.WithLabelValues("error", "keepalive").Inc()
return
}
mapResponseSent.WithLabelValues("ok", "keepalive").Inc()
}
}
}
func (m *mapSession) pollFailoverRoutes(where string, node *types.Node) {
update, err := db.Write(m.h.db.DB, func(tx *gorm.DB) (*types.StateUpdate, error) {
return db.FailoverNodeRoutesIfNeccessary(tx, m.h.nodeNotifier.ConnectedMap(), node)
return db.FailoverNodeRoutesIfNeccessary(tx, m.h.nodeNotifier.LikelyConnectedMap(), node)
})
if err != nil {
m.errf(err, fmt.Sprintf("failed to ensure failover routes, %s", where))
@ -454,7 +467,7 @@ func (h *Headscale) updateNodeOnlineStatus(online bool, node *types.Node) {
node.LastSeen = &now
change.LastSeen = &now
err := h.db.DB.Transaction(func(tx *gorm.DB) error {
err := h.db.Write(func(tx *gorm.DB) error {
return db.SetLastSeen(tx, node.ID, *node.LastSeen)
})
if err != nil {
@ -501,6 +514,7 @@ func (m *mapSession) handleEndpointUpdate() {
// If there is no changes and nothing to save,
// return early.
if peerChangeEmpty(change) && !sendUpdate {
mapResponseEndpointUpdates.WithLabelValues("noop").Inc()
return
}
@ -518,6 +532,7 @@ func (m *mapSession) handleEndpointUpdate() {
if err != nil {
m.errf(err, "Error processing node routes")
http.Error(m.w, "", http.StatusInternalServerError)
mapResponseEndpointUpdates.WithLabelValues("error").Inc()
return
}
@ -527,6 +542,7 @@ func (m *mapSession) handleEndpointUpdate() {
err := m.h.db.EnableAutoApprovedRoutes(m.h.ACLPolicy, m.node)
if err != nil {
m.errf(err, "Error running auto approved routes")
mapResponseEndpointUpdates.WithLabelValues("error").Inc()
}
}
@ -534,19 +550,19 @@ func (m *mapSession) handleEndpointUpdate() {
// has an updated packetfilter allowing the new route
// if it is defined in the ACL.
ctx := types.NotifyCtx(context.Background(), "poll-nodeupdate-self-hostinfochange", m.node.Hostname)
m.h.nodeNotifier.NotifyByMachineKey(
m.h.nodeNotifier.NotifyByNodeID(
ctx,
types.StateUpdate{
Type: types.StateSelfUpdate,
ChangeNodes: []types.NodeID{m.node.ID},
},
m.node.ID)
}
if err := m.h.db.DB.Save(m.node).Error; err != nil {
m.errf(err, "Failed to persist/update node in the database")
http.Error(m.w, "", http.StatusInternalServerError)
mapResponseEndpointUpdates.WithLabelValues("error").Inc()
return
}
@ -562,6 +578,7 @@ func (m *mapSession) handleEndpointUpdate() {
m.node.ID)
m.w.WriteHeader(http.StatusOK)
mapResponseEndpointUpdates.WithLabelValues("ok").Inc()
return
}
@ -639,7 +656,7 @@ func (m *mapSession) handleReadOnlyRequest() {
if err != nil {
m.errf(err, "Failed to create MapResponse")
http.Error(m.w, "", http.StatusInternalServerError)
mapResponseReadOnly.WithLabelValues("error").Inc()
return
}
@ -648,9 +665,12 @@ func (m *mapSession) handleReadOnlyRequest() {
_, err = m.w.Write(mapResp)
if err != nil {
m.errf(err, "Failed to write response")
mapResponseReadOnly.WithLabelValues("error").Inc()
return
}
m.w.WriteHeader(http.StatusOK)
mapResponseReadOnly.WithLabelValues("ok").Inc()
return
}