Simplify map session management (#1931)

This PR removes the complicated session management introduced in https://github.com/juanfont/headscale/pull/1791 which kept track of the sessions in a map, in addition to the channel already kept track of in the notifier.

Instead of trying to close the mapsession, it will now be replaced by the new one and closed after so all new updates goes to the right place.

The map session serve function is also split into a streaming and a non-streaming version for better readability.

RemoveNode in the notifier will not remove a node if the channel is not matching the one that has been passed (e.g. it has been replaced with a new one).

A new tuning parameter has been added to added to set timeout before the notifier gives up to send an update to a node.

Add a keep alive resetter so we wait with sending keep alives if a node has just received an update.

In addition it adds a bunch of env debug flags that can be set:

- `HEADSCALE_DEBUG_HIGH_CARDINALITY_METRICS`: make certain metrics include per node.id, not recommended to use in prod. 
- `HEADSCALE_DEBUG_PROFILING_ENABLED`: activate tracing 
- `HEADSCALE_DEBUG_PROFILING_PATH`: where to store traces 
- `HEADSCALE_DEBUG_DUMP_CONFIG`: calls `spew.Dump` on the config object startup
- `HEADSCALE_DEBUG_DEADLOCK`: enable go-deadlock to dump goroutines if it looks like a deadlock has occured, enabled in integration tests.

Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>
This commit is contained in:
Kristoffer Dalby 2024-05-24 09:15:34 +01:00 committed by GitHub
parent 8185a70dc7
commit c8ebbede54
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 426 additions and 285 deletions

View file

@ -3,22 +3,43 @@ package notifier
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"tailscale.com/envknob"
)
const prometheusNamespace = "headscale"
var debugHighCardinalityMetrics = envknob.Bool("HEADSCALE_DEBUG_HIGH_CARDINALITY_METRICS")
var notifierUpdateSent *prometheus.CounterVec
func init() {
if debugHighCardinalityMetrics {
notifierUpdateSent = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: prometheusNamespace,
Name: "notifier_update_sent_total",
Help: "total count of update sent on nodes channel",
}, []string{"status", "type", "trigger", "id"})
} else {
notifierUpdateSent = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: prometheusNamespace,
Name: "notifier_update_sent_total",
Help: "total count of update sent on nodes channel",
}, []string{"status", "type", "trigger"})
}
}
var (
notifierWaitersForLock = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: prometheusNamespace,
Name: "notifier_waiters_for_lock",
Help: "gauge of waiters for the notifier lock",
}, []string{"type", "action"})
notifierWaitForLock = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: prometheusNamespace,
Name: "notifier_wait_for_lock_seconds",
Help: "histogram of time spent waiting for the notifier lock",
Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.5, 1, 3, 5, 10},
}, []string{"action"})
notifierUpdateSent = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: prometheusNamespace,
Name: "notifier_update_sent_total",
Help: "total count of update sent on nodes channel",
}, []string{"status", "type", "trigger"})
notifierUpdateReceived = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: prometheusNamespace,
Name: "notifier_update_received_total",
@ -29,4 +50,19 @@ var (
Name: "notifier_open_channels_total",
Help: "total count open channels in notifier",
})
notifierBatcherWaitersForLock = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: prometheusNamespace,
Name: "notifier_batcher_waiters_for_lock",
Help: "gauge of waiters for the notifier batcher lock",
}, []string{"type", "action"})
notifierBatcherChanges = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: prometheusNamespace,
Name: "notifier_batcher_changes_pending",
Help: "gauge of full changes pending in the notifier batcher",
}, []string{})
notifierBatcherPatches = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: prometheusNamespace,
Name: "notifier_batcher_patches_pending",
Help: "gauge of patches pending in the notifier batcher",
}, []string{})
)

View file

@ -11,25 +11,40 @@ import (
"github.com/juanfont/headscale/hscontrol/types"
"github.com/puzpuzpuz/xsync/v3"
"github.com/rs/zerolog/log"
"github.com/sasha-s/go-deadlock"
"tailscale.com/envknob"
"tailscale.com/tailcfg"
"tailscale.com/util/set"
)
var debugDeadlock = envknob.Bool("HEADSCALE_DEBUG_DEADLOCK")
var debugDeadlockTimeout = envknob.RegisterDuration("HEADSCALE_DEBUG_DEADLOCK_TIMEOUT")
func init() {
deadlock.Opts.Disable = !debugDeadlock
if debugDeadlock {
deadlock.Opts.DeadlockTimeout = debugDeadlockTimeout()
deadlock.Opts.PrintAllCurrentGoroutines = true
}
}
type Notifier struct {
l sync.RWMutex
l deadlock.Mutex
nodes map[types.NodeID]chan<- types.StateUpdate
connected *xsync.MapOf[types.NodeID, bool]
b *batcher
cfg *types.Config
}
func NewNotifier(cfg *types.Config) *Notifier {
n := &Notifier{
nodes: make(map[types.NodeID]chan<- types.StateUpdate),
connected: xsync.NewMapOf[types.NodeID, bool](),
cfg: cfg,
}
b := newBatcher(cfg.Tuning.BatchChangeDelay, n)
n.b = b
// TODO(kradalby): clean this up
go b.doWork()
return n
}
@ -39,59 +54,75 @@ func (n *Notifier) Close() {
n.b.close()
}
func (n *Notifier) AddNode(nodeID types.NodeID, c chan<- types.StateUpdate) {
log.Trace().Caller().Uint64("node.id", nodeID.Uint64()).Msg("acquiring lock to add node")
defer log.Trace().
Caller().
Uint64("node.id", nodeID.Uint64()).
Msg("releasing lock to add node")
func (n *Notifier) tracef(nID types.NodeID, msg string, args ...any) {
log.Trace().
Uint64("node.id", nID.Uint64()).
Int("open_chans", len(n.nodes)).Msgf(msg, args...)
}
func (n *Notifier) AddNode(nodeID types.NodeID, c chan<- types.StateUpdate) {
start := time.Now()
notifierWaitersForLock.WithLabelValues("lock", "add").Inc()
n.l.Lock()
defer n.l.Unlock()
notifierWaitersForLock.WithLabelValues("lock", "add").Dec()
notifierWaitForLock.WithLabelValues("add").Observe(time.Since(start).Seconds())
// If a channel exists, it means the node has opened a new
// connection. Close the old channel and replace it.
if curr, ok := n.nodes[nodeID]; ok {
n.tracef(nodeID, "channel present, closing and replacing")
close(curr)
}
n.nodes[nodeID] = c
n.connected.Store(nodeID, true)
log.Trace().
Uint64("node.id", nodeID.Uint64()).
Int("open_chans", len(n.nodes)).
Msg("Added new channel")
n.tracef(nodeID, "added new channel")
notifierNodeUpdateChans.Inc()
}
func (n *Notifier) RemoveNode(nodeID types.NodeID) {
log.Trace().Caller().Uint64("node.id", nodeID.Uint64()).Msg("acquiring lock to remove node")
defer log.Trace().
Caller().
Uint64("node.id", nodeID.Uint64()).
Msg("releasing lock to remove node")
// RemoveNode removes a node and a given channel from the notifier.
// It checks that the channel is the same as currently being updated
// and ignores the removal if it is not.
// RemoveNode reports if the node/chan was removed.
func (n *Notifier) RemoveNode(nodeID types.NodeID, c chan<- types.StateUpdate) bool {
start := time.Now()
notifierWaitersForLock.WithLabelValues("lock", "remove").Inc()
n.l.Lock()
defer n.l.Unlock()
notifierWaitersForLock.WithLabelValues("lock", "remove").Dec()
notifierWaitForLock.WithLabelValues("remove").Observe(time.Since(start).Seconds())
if len(n.nodes) == 0 {
return
return true
}
// If the channel exist, but it does not belong
// to the caller, ignore.
if curr, ok := n.nodes[nodeID]; ok {
if curr != c {
n.tracef(nodeID, "channel has been replaced, not removing")
return false
}
}
delete(n.nodes, nodeID)
n.connected.Store(nodeID, false)
log.Trace().
Uint64("node.id", nodeID.Uint64()).
Int("open_chans", len(n.nodes)).
Msg("Removed channel")
n.tracef(nodeID, "removed channel")
notifierNodeUpdateChans.Dec()
return true
}
// IsConnected reports if a node is connected to headscale and has a
// poll session open.
func (n *Notifier) IsConnected(nodeID types.NodeID) bool {
n.l.RLock()
defer n.l.RUnlock()
notifierWaitersForLock.WithLabelValues("lock", "conncheck").Inc()
n.l.Lock()
defer n.l.Unlock()
notifierWaitersForLock.WithLabelValues("lock", "conncheck").Dec()
if val, ok := n.connected.Load(nodeID); ok {
return val
@ -130,15 +161,11 @@ func (n *Notifier) NotifyByNodeID(
update types.StateUpdate,
nodeID types.NodeID,
) {
log.Trace().Caller().Str("type", update.Type.String()).Msg("acquiring lock to notify")
defer log.Trace().
Caller().
Str("type", update.Type.String()).
Msg("releasing lock, finished notifying")
start := time.Now()
n.l.RLock()
defer n.l.RUnlock()
notifierWaitersForLock.WithLabelValues("lock", "notify").Inc()
n.l.Lock()
defer n.l.Unlock()
notifierWaitersForLock.WithLabelValues("lock", "notify").Dec()
notifierWaitForLock.WithLabelValues("notify").Observe(time.Since(start).Seconds())
if c, ok := n.nodes[nodeID]; ok {
@ -150,50 +177,94 @@ func (n *Notifier) NotifyByNodeID(
Any("origin", types.NotifyOriginKey.Value(ctx)).
Any("origin-hostname", types.NotifyHostnameKey.Value(ctx)).
Msgf("update not sent, context cancelled")
notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), types.NotifyOriginKey.Value(ctx)).Inc()
if debugHighCardinalityMetrics {
notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), types.NotifyOriginKey.Value(ctx), nodeID.String()).Inc()
} else {
notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), types.NotifyOriginKey.Value(ctx)).Inc()
}
return
case c <- update:
log.Trace().
Uint64("node.id", nodeID.Uint64()).
Any("origin", ctx.Value("origin")).
Any("origin-hostname", ctx.Value("hostname")).
Msgf("update successfully sent on chan")
notifierUpdateSent.WithLabelValues("ok", update.Type.String(), types.NotifyOriginKey.Value(ctx)).Inc()
n.tracef(nodeID, "update successfully sent on chan, origin: %s, origin-hostname: %s", ctx.Value("origin"), ctx.Value("hostname"))
if debugHighCardinalityMetrics {
notifierUpdateSent.WithLabelValues("ok", update.Type.String(), types.NotifyOriginKey.Value(ctx), nodeID.String()).Inc()
} else {
notifierUpdateSent.WithLabelValues("ok", update.Type.String(), types.NotifyOriginKey.Value(ctx)).Inc()
}
}
}
}
func (n *Notifier) sendAll(update types.StateUpdate) {
start := time.Now()
n.l.RLock()
defer n.l.RUnlock()
notifierWaitersForLock.WithLabelValues("lock", "send-all").Inc()
n.l.Lock()
defer n.l.Unlock()
notifierWaitersForLock.WithLabelValues("lock", "send-all").Dec()
notifierWaitForLock.WithLabelValues("send-all").Observe(time.Since(start).Seconds())
for _, c := range n.nodes {
c <- update
notifierUpdateSent.WithLabelValues("ok", update.Type.String(), "send-all").Inc()
for id, c := range n.nodes {
// Whenever an update is sent to all nodes, there is a chance that the node
// has disconnected and the goroutine that was supposed to consume the update
// has shut down the channel and is waiting for the lock held here in RemoveNode.
// This means that there is potential for a deadlock which would stop all updates
// going out to clients. This timeout prevents that from happening by moving on to the
// next node if the context is cancelled. Afther sendAll releases the lock, the add/remove
// call will succeed and the update will go to the correct nodes on the next call.
ctx, cancel := context.WithTimeout(context.Background(), n.cfg.Tuning.NotifierSendTimeout)
defer cancel()
select {
case <-ctx.Done():
log.Error().
Err(ctx.Err()).
Uint64("node.id", id.Uint64()).
Msgf("update not sent, context cancelled")
if debugHighCardinalityMetrics {
notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), "send-all", id.String()).Inc()
} else {
notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), "send-all").Inc()
}
return
case c <- update:
if debugHighCardinalityMetrics {
notifierUpdateSent.WithLabelValues("ok", update.Type.String(), "send-all", id.String()).Inc()
} else {
notifierUpdateSent.WithLabelValues("ok", update.Type.String(), "send-all").Inc()
}
}
}
}
func (n *Notifier) String() string {
n.l.RLock()
defer n.l.RUnlock()
notifierWaitersForLock.WithLabelValues("lock", "string").Inc()
n.l.Lock()
defer n.l.Unlock()
notifierWaitersForLock.WithLabelValues("lock", "string").Dec()
var b strings.Builder
b.WriteString("chans:\n")
fmt.Fprintf(&b, "chans (%d):\n", len(n.nodes))
for k, v := range n.nodes {
fmt.Fprintf(&b, "\t%d: %p\n", k, v)
var keys []types.NodeID
n.connected.Range(func(key types.NodeID, value bool) bool {
keys = append(keys, key)
return true
})
sort.Slice(keys, func(i, j int) bool {
return keys[i] < keys[j]
})
for _, key := range keys {
fmt.Fprintf(&b, "\t%d: %p\n", key, n.nodes[key])
}
b.WriteString("\n")
b.WriteString("connected:\n")
fmt.Fprintf(&b, "connected (%d):\n", len(n.nodes))
n.connected.Range(func(k types.NodeID, v bool) bool {
fmt.Fprintf(&b, "\t%d: %t\n", k, v)
return true
})
for _, key := range keys {
val, _ := n.connected.Load(key)
fmt.Fprintf(&b, "\t%d: %t\n", key, val)
}
return b.String()
}
@ -230,13 +301,16 @@ func (b *batcher) close() {
// addOrPassthrough adds the update to the batcher, if it is not a
// type that is currently batched, it will be sent immediately.
func (b *batcher) addOrPassthrough(update types.StateUpdate) {
notifierBatcherWaitersForLock.WithLabelValues("lock", "add").Inc()
b.mu.Lock()
defer b.mu.Unlock()
notifierBatcherWaitersForLock.WithLabelValues("lock", "add").Dec()
switch update.Type {
case types.StatePeerChanged:
b.changedNodeIDs.Add(update.ChangeNodes...)
b.nodesChanged = true
notifierBatcherChanges.WithLabelValues().Set(float64(b.changedNodeIDs.Len()))
case types.StatePeerChangedPatch:
for _, newPatch := range update.ChangePatches {
@ -248,6 +322,7 @@ func (b *batcher) addOrPassthrough(update types.StateUpdate) {
}
}
b.patchesChanged = true
notifierBatcherPatches.WithLabelValues().Set(float64(len(b.patches)))
default:
b.n.sendAll(update)
@ -257,8 +332,10 @@ func (b *batcher) addOrPassthrough(update types.StateUpdate) {
// flush sends all the accumulated patches to all
// nodes in the notifier.
func (b *batcher) flush() {
notifierBatcherWaitersForLock.WithLabelValues("lock", "flush").Inc()
b.mu.Lock()
defer b.mu.Unlock()
notifierBatcherWaitersForLock.WithLabelValues("lock", "flush").Dec()
if b.nodesChanged || b.patchesChanged {
var patches []*tailcfg.PeerChange
@ -296,8 +373,10 @@ func (b *batcher) flush() {
}
b.changedNodeIDs = set.Slice[types.NodeID]{}
notifierBatcherChanges.WithLabelValues().Set(0)
b.nodesChanged = false
b.patches = make(map[types.NodeID]tailcfg.PeerChange, len(b.patches))
notifierBatcherPatches.WithLabelValues().Set(0)
b.patchesChanged = false
}
}

View file

@ -227,7 +227,7 @@ func TestBatcher(t *testing.T) {
ch := make(chan types.StateUpdate, 30)
defer close(ch)
n.AddNode(1, ch)
defer n.RemoveNode(1)
defer n.RemoveNode(1, ch)
for _, u := range tt.updates {
n.NotifyAll(context.Background(), u)