Add worker reading extra_records_path from file (#2271)

* consolidate scheduled tasks into one goroutine

Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>

* rename Tailcfg dns struct

Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>

* add dns.extra_records_path option

Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>

* prettier lint

Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>

* go-fmt

Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>

---------

Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>
Kristoffer Dalby 2024-12-13 07:52:40 +00:00 committed by GitHub
parent 89a648c7dd
commit 380fcdba17
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 388 additions and 81 deletions
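The new dns.extra_records_path option points Headscale at a file of extra DNS records that a worker keeps re-reading at runtime, instead of the records being fixed at startup. Below is a minimal sketch of how such a file could map onto tailcfg.DNSRecord, the type the diff assigns to ExtraRecords; the sample domain, address, and JSON layout are illustrative assumptions, not taken from this commit:

package main

import (
    "encoding/json"
    "fmt"

    "tailscale.com/tailcfg"
)

func main() {
    // Hypothetical contents of the file referenced by dns.extra_records_path:
    // a JSON array whose entries decode into tailcfg.DNSRecord.
    const sample = `[
      {"name": "grafana.myvpn.example.com", "type": "A", "value": "100.64.0.3"}
    ]`

    var records []tailcfg.DNSRecord
    if err := json.Unmarshal([]byte(sample), &records); err != nil {
        panic(err)
    }

    // In the diff below, records like these end up in
    // cfg.TailcfgDNSConfig.ExtraRecords and are pushed to nodes
    // whenever the file changes.
    for _, r := range records {
        fmt.Printf("%s %s %s\n", r.Name, r.Type, r.Value)
    }
}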

hscontrol/app.go

@@ -27,6 +27,7 @@ import (
"github.com/juanfont/headscale/hscontrol/db"
"github.com/juanfont/headscale/hscontrol/derp"
derpServer "github.com/juanfont/headscale/hscontrol/derp/server"
"github.com/juanfont/headscale/hscontrol/dns"
"github.com/juanfont/headscale/hscontrol/mapper"
"github.com/juanfont/headscale/hscontrol/notifier"
"github.com/juanfont/headscale/hscontrol/policy"
@@ -88,8 +89,9 @@ type Headscale struct {
DERPMap *tailcfg.DERPMap
DERPServer *derpServer.DERPServer
polManOnce sync.Once
polMan policy.PolicyManager
polManOnce sync.Once
polMan policy.PolicyManager
extraRecordMan *dns.ExtraRecordsMan
mapper *mapper.Mapper
nodeNotifier *notifier.Notifier
@@ -184,7 +186,7 @@ func NewHeadscale(cfg *types.Config) (*Headscale, error) {
}
app.authProvider = authProvider
if app.cfg.DNSConfig != nil && app.cfg.DNSConfig.Proxied { // if MagicDNS
if app.cfg.TailcfgDNSConfig != nil && app.cfg.TailcfgDNSConfig.Proxied { // if MagicDNS
// TODO(kradalby): revisit why this takes a list.
var magicDNSDomains []dnsname.FQDN
@@ -196,11 +198,11 @@ func NewHeadscale(cfg *types.Config) (*Headscale, error) {
}
// we might have routes already from Split DNS
if app.cfg.DNSConfig.Routes == nil {
app.cfg.DNSConfig.Routes = make(map[string][]*dnstype.Resolver)
if app.cfg.TailcfgDNSConfig.Routes == nil {
app.cfg.TailcfgDNSConfig.Routes = make(map[string][]*dnstype.Resolver)
}
for _, d := range magicDNSDomains {
app.cfg.DNSConfig.Routes[d.WithoutTrailingDot()] = nil
app.cfg.TailcfgDNSConfig.Routes[d.WithoutTrailingDot()] = nil
}
}
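For context on the Routes loop above: each MagicDNS base domain is added to the split-DNS routes with a nil resolver slice, which, as the surrounding code suggests, marks the suffix as routed but without an upstream, leaving those names to MagicDNS. A small self-contained sketch of the shape being built, using an invented domain:

package main

import (
    "fmt"

    "tailscale.com/types/dnstype"
    "tailscale.com/util/dnsname"
)

func main() {
    // Invented base domain; headscale derives the real list from its config.
    d, err := dnsname.ToFQDN("example.headscale.net.")
    if err != nil {
        panic(err)
    }

    routes := make(map[string][]*dnstype.Resolver)
    // A nil resolver list is the same shape the diff writes into
    // TailcfgDNSConfig.Routes: the suffix exists as a route, but no
    // upstream resolver is attached to it.
    routes[d.WithoutTrailingDot()] = nil

    fmt.Println(routes)
}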
@@ -237,23 +239,38 @@ func (h *Headscale) redirect(w http.ResponseWriter, req *http.Request) {
http.Redirect(w, req, target, http.StatusFound)
}
// expireExpiredNodes expires nodes that have an explicit expiry set
// after that expiry time has passed.
func (h *Headscale) expireExpiredNodes(ctx context.Context, every time.Duration) {
ticker := time.NewTicker(every)
func (h *Headscale) scheduledTasks(ctx context.Context) {
expireTicker := time.NewTicker(updateInterval)
defer expireTicker.Stop()
lastCheck := time.Unix(0, 0)
var update types.StateUpdate
var changed bool
lastExpiryCheck := time.Unix(0, 0)
derpTicker := time.NewTicker(h.cfg.DERP.UpdateFrequency)
defer derpTicker.Stop()
// If we don't want auto update, just stop the ticker
if !h.cfg.DERP.AutoUpdate {
derpTicker.Stop()
}
var extraRecordsUpdate <-chan []tailcfg.DNSRecord
if h.extraRecordMan != nil {
extraRecordsUpdate = h.extraRecordMan.UpdateCh()
} else {
extraRecordsUpdate = make(chan []tailcfg.DNSRecord)
}
for {
select {
case <-ctx.Done():
ticker.Stop()
log.Info().Caller().Msg("scheduled task worker is shutting down.")
return
case <-ticker.C:
case <-expireTicker.C:
var update types.StateUpdate
var changed bool
if err := h.db.Write(func(tx *gorm.DB) error {
lastCheck, update, changed = db.ExpireExpiredNodes(tx, lastCheck)
lastExpiryCheck, update, changed = db.ExpireExpiredNodes(tx, lastExpiryCheck)
return nil
}); err != nil {
@@ -267,24 +284,8 @@ func (h *Headscale) expireExpiredNodes(ctx context.Context, every time.Duration)
ctx := types.NotifyCtx(context.Background(), "expire-expired", "na")
h.nodeNotifier.NotifyAll(ctx, update)
}
}
}
}
// scheduledDERPMapUpdateWorker refreshes the DERPMap stored on the global object
// at a set interval.
func (h *Headscale) scheduledDERPMapUpdateWorker(cancelChan <-chan struct{}) {
log.Info().
Dur("frequency", h.cfg.DERP.UpdateFrequency).
Msg("Setting up a DERPMap update worker")
ticker := time.NewTicker(h.cfg.DERP.UpdateFrequency)
for {
select {
case <-cancelChan:
return
case <-ticker.C:
case <-derpTicker.C:
log.Info().Msg("Fetching DERPMap updates")
h.DERPMap = derp.GetDERPMap(h.cfg.DERP)
if h.cfg.DERP.ServerEnabled && h.cfg.DERP.AutomaticallyAddEmbeddedDerpRegion {
@@ -297,6 +298,19 @@ func (h *Headscale) scheduledDERPMapUpdateWorker(cancelChan <-chan struct{}) {
Type: types.StateDERPUpdated,
DERPMap: h.DERPMap,
})
case records, ok := <-extraRecordsUpdate:
if !ok {
continue
}
h.cfg.TailcfgDNSConfig.ExtraRecords = records
ctx := types.NotifyCtx(context.Background(), "dns-extrarecord", "all")
h.nodeNotifier.NotifyAll(ctx, types.StateUpdate{
// TODO(kradalby): We can probably do better than sending a full update here,
// but for now this will ensure that all of the nodes get the new records.
Type: types.StateFullUpdate,
})
}
}
}
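The consolidated select loop above consumes h.extraRecordMan through a small surface: NewExtraRecordsManager, Records, Run, UpdateCh, and Close. The hscontrol/dns internals are not part of this diff, so the following is only a sketch of how such a manager could be built, assuming an fsnotify-based watcher over a JSON file of tailcfg.DNSRecord; the real implementation may differ, for example by watching the parent directory or hashing file contents:

package dns

import (
    "encoding/json"
    "os"
    "sync"

    "github.com/fsnotify/fsnotify"
    "tailscale.com/tailcfg"
)

// ExtraRecordsMan watches a JSON file of tailcfg.DNSRecord and republishes
// its contents whenever the file is rewritten. Sketch only; see note above.
type ExtraRecordsMan struct {
    mu      sync.RWMutex
    path    string
    records []tailcfg.DNSRecord

    watcher  *fsnotify.Watcher
    updateCh chan []tailcfg.DNSRecord
}

func NewExtraRecordsManager(path string) (*ExtraRecordsMan, error) {
    w, err := fsnotify.NewWatcher()
    if err != nil {
        return nil, err
    }
    if err := w.Add(path); err != nil {
        return nil, err
    }
    m := &ExtraRecordsMan{
        path:     path,
        watcher:  w,
        updateCh: make(chan []tailcfg.DNSRecord),
    }
    // Load the initial record set so Records() is populated before Run starts.
    if err := m.reload(); err != nil {
        return nil, err
    }
    return m, nil
}

// Records returns the record set from the most recent successful load.
func (m *ExtraRecordsMan) Records() []tailcfg.DNSRecord {
    m.mu.RLock()
    defer m.mu.RUnlock()
    return m.records
}

// UpdateCh delivers the full record set after every successful reload.
func (m *ExtraRecordsMan) UpdateCh() <-chan []tailcfg.DNSRecord {
    return m.updateCh
}

// Run blocks, re-reading the file on write/create events, until Close is called.
func (m *ExtraRecordsMan) Run() {
    for {
        select {
        case ev, ok := <-m.watcher.Events:
            if !ok {
                return // watcher closed
            }
            if ev.Op&(fsnotify.Write|fsnotify.Create) == 0 {
                continue
            }
            if err := m.reload(); err != nil {
                continue // keep the previous records if the new file is bad
            }
            m.updateCh <- m.Records()
        case _, ok := <-m.watcher.Errors:
            if !ok {
                return // watcher closed
            }
        }
    }
}

// Close stops the watcher, which in turn terminates Run.
func (m *ExtraRecordsMan) Close() {
    m.watcher.Close()
}

func (m *ExtraRecordsMan) reload() error {
    b, err := os.ReadFile(m.path)
    if err != nil {
        return err
    }
    var recs []tailcfg.DNSRecord
    if err := json.Unmarshal(b, &recs); err != nil {
        return err
    }
    m.mu.Lock()
    m.records = recs
    m.mu.Unlock()
    return nil
}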
@@ -568,12 +582,6 @@ func (h *Headscale) Serve() error {
go h.DERPServer.ServeSTUN()
}
if h.cfg.DERP.AutoUpdate {
derpMapCancelChannel := make(chan struct{})
defer func() { derpMapCancelChannel <- struct{}{} }()
go h.scheduledDERPMapUpdateWorker(derpMapCancelChannel)
}
if len(h.DERPMap.Regions) == 0 {
return errEmptyInitialDERPMap
}
@@ -591,9 +599,21 @@ func (h *Headscale) Serve() error {
h.ephemeralGC.Schedule(node.ID, h.cfg.EphemeralNodeInactivityTimeout)
}
expireNodeCtx, expireNodeCancel := context.WithCancel(context.Background())
defer expireNodeCancel()
go h.expireExpiredNodes(expireNodeCtx, updateInterval)
if h.cfg.DNSConfig.ExtraRecordsPath != "" {
h.extraRecordMan, err = dns.NewExtraRecordsManager(h.cfg.DNSConfig.ExtraRecordsPath)
if err != nil {
return fmt.Errorf("setting up extrarecord manager: %w", err)
}
h.cfg.TailcfgDNSConfig.ExtraRecords = h.extraRecordMan.Records()
go h.extraRecordMan.Run()
defer h.extraRecordMan.Close()
}
// Start all scheduled tasks, e.g. expiring nodes, derp updates and
// records updates
scheduleCtx, scheduleCancel := context.WithCancel(context.Background())
defer scheduleCancel()
go h.scheduledTasks(scheduleCtx)
if zl.GlobalLevel() == zl.TraceLevel {
zerolog.RespLog = true
@@ -847,7 +867,7 @@ func (h *Headscale) Serve() error {
Str("signal", sig.String()).
Msg("Received signal to stop, shutting down gracefully")
expireNodeCancel()
scheduleCancel()
h.ephemeralGC.Close()
// Gracefully shut down servers
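End to end, the flow added by this commit is: a change to the file at dns.extra_records_path is picked up by the manager, delivered over UpdateCh, copied into TailcfgDNSConfig.ExtraRecords, and announced to all nodes as a full state update. A hedged operator-side sketch of feeding that file follows; the path and helper are invented for illustration, and write-temp-then-rename is just one way to keep readers from ever seeing a half-written document (whether a rename is noticed depends on how the watcher is implemented):

package main

import (
    "encoding/json"
    "os"
    "path/filepath"

    "tailscale.com/tailcfg"
)

// writeExtraRecords is a hypothetical helper that replaces the file configured
// as dns.extra_records_path with a new record set.
func writeExtraRecords(path string, records []tailcfg.DNSRecord) error {
    b, err := json.MarshalIndent(records, "", "  ")
    if err != nil {
        return err
    }

    // Write to a temp file in the same directory, then rename into place so
    // the reader never observes a partially written JSON document.
    tmp, err := os.CreateTemp(filepath.Dir(path), ".extra-records-*")
    if err != nil {
        return err
    }
    if _, err := tmp.Write(b); err != nil {
        tmp.Close()
        os.Remove(tmp.Name())
        return err
    }
    if err := tmp.Close(); err != nil {
        os.Remove(tmp.Name())
        return err
    }
    return os.Rename(tmp.Name(), path)
}

func main() {
    // Illustrative path and record; adjust to the configured extra_records_path.
    err := writeExtraRecords("/var/lib/headscale/extra-records.json", []tailcfg.DNSRecord{
        {Name: "grafana.myvpn.example.com", Type: "A", Value: "100.64.0.3"},
    })
    if err != nil {
        panic(err)
    }
}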