Merge branch 'main' into configurable-mtls
This commit is contained in:
commit
7bf2a91dd0
56 changed files with 3033 additions and 643 deletions
325
app.go
325
app.go
|
@ -27,7 +27,6 @@ import (
|
|||
zerolog "github.com/philip-bui/grpc-zerolog"
|
||||
zl "github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/soheilhy/cmux"
|
||||
ginprometheus "github.com/zsais/go-gin-prometheus"
|
||||
"golang.org/x/crypto/acme"
|
||||
"golang.org/x/crypto/acme/autocert"
|
||||
|
@ -36,6 +35,7 @@ import (
|
|||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/peer"
|
||||
"google.golang.org/grpc/reflection"
|
||||
|
@ -72,6 +72,8 @@ const (
|
|||
type Config struct {
|
||||
ServerURL string
|
||||
Addr string
|
||||
GRPCAddr string
|
||||
GRPCAllowInsecure bool
|
||||
EphemeralNodeInactivityTimeout time.Duration
|
||||
IPPrefixes []netaddr.IPPrefix
|
||||
PrivateKeyPath string
|
||||
|
@ -126,8 +128,8 @@ type DERPConfig struct {
|
|||
type CLIConfig struct {
|
||||
Address string
|
||||
APIKey string
|
||||
Insecure bool
|
||||
Timeout time.Duration
|
||||
Insecure bool
|
||||
}
|
||||
|
||||
// Headscale represents the base app of the service.
|
||||
|
@ -295,20 +297,6 @@ func (h *Headscale) expireEphemeralNodesWorker() {
|
|||
}
|
||||
}
|
||||
|
||||
// WatchForKVUpdates checks the KV DB table for requests to perform tailnet upgrades
|
||||
// This is a way to communitate the CLI with the headscale server.
|
||||
func (h *Headscale) watchForKVUpdates(milliSeconds int64) {
|
||||
ticker := time.NewTicker(time.Duration(milliSeconds) * time.Millisecond)
|
||||
for range ticker.C {
|
||||
h.watchForKVUpdatesWorker()
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Headscale) watchForKVUpdatesWorker() {
|
||||
h.checkForNamespacesPendingUpdates()
|
||||
// more functions will come here in the future
|
||||
}
|
||||
|
||||
func (h *Headscale) grpcAuthenticationInterceptor(ctx context.Context,
|
||||
req interface{},
|
||||
info *grpc.UnaryServerInfo,
|
||||
|
@ -365,26 +353,26 @@ func (h *Headscale) grpcAuthenticationInterceptor(ctx context.Context,
|
|||
)
|
||||
}
|
||||
|
||||
// TODO(kradalby): Implement API key backend:
|
||||
// - Table in the DB
|
||||
// - Key name
|
||||
// - Encrypted
|
||||
// - Expiry
|
||||
//
|
||||
// Currently all other than localhost traffic is unauthorized, this is intentional to allow
|
||||
// us to make use of gRPC for our CLI, but not having to implement any of the remote capabilities
|
||||
// and API key auth
|
||||
return ctx, status.Error(
|
||||
codes.Unauthenticated,
|
||||
"Authentication is not implemented yet",
|
||||
)
|
||||
valid, err := h.ValidateAPIKey(strings.TrimPrefix(token, AuthPrefix))
|
||||
if err != nil {
|
||||
log.Error().
|
||||
Caller().
|
||||
Err(err).
|
||||
Str("client_address", client.Addr.String()).
|
||||
Msg("failed to validate token")
|
||||
|
||||
// if strings.TrimPrefix(token, AUTH_PREFIX) != a.Token {
|
||||
// log.Error().Caller().Str("client_address", p.Addr.String()).Msg("invalid token")
|
||||
// return ctx, status.Error(codes.Unauthenticated, "invalid token")
|
||||
// }
|
||||
return ctx, status.Error(codes.Internal, "failed to validate token")
|
||||
}
|
||||
|
||||
// return handler(ctx, req)
|
||||
if !valid {
|
||||
log.Info().
|
||||
Str("client_address", client.Addr.String()).
|
||||
Msg("invalid token")
|
||||
|
||||
return ctx, status.Error(codes.Unauthenticated, "invalid token")
|
||||
}
|
||||
|
||||
return handler(ctx, req)
|
||||
}
|
||||
|
||||
func (h *Headscale) httpAuthenticationMiddleware(ctx *gin.Context) {
|
||||
|
@ -407,19 +395,30 @@ func (h *Headscale) httpAuthenticationMiddleware(ctx *gin.Context) {
|
|||
|
||||
ctx.AbortWithStatus(http.StatusUnauthorized)
|
||||
|
||||
// TODO(kradalby): Implement API key backend
|
||||
// Currently all traffic is unauthorized, this is intentional to allow
|
||||
// us to make use of gRPC for our CLI, but not having to implement any of the remote capabilities
|
||||
// and API key auth
|
||||
//
|
||||
// if strings.TrimPrefix(authHeader, AUTH_PREFIX) != a.Token {
|
||||
// log.Error().Caller().Str("client_address", c.ClientIP()).Msg("invalid token")
|
||||
// c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error", "unauthorized"})
|
||||
valid, err := h.ValidateAPIKey(strings.TrimPrefix(authHeader, AuthPrefix))
|
||||
if err != nil {
|
||||
log.Error().
|
||||
Caller().
|
||||
Err(err).
|
||||
Str("client_address", ctx.ClientIP()).
|
||||
Msg("failed to validate token")
|
||||
|
||||
// return
|
||||
// }
|
||||
ctx.AbortWithStatus(http.StatusInternalServerError)
|
||||
|
||||
// c.Next()
|
||||
return
|
||||
}
|
||||
|
||||
if !valid {
|
||||
log.Info().
|
||||
Str("client_address", ctx.ClientIP()).
|
||||
Msg("invalid token")
|
||||
|
||||
ctx.AbortWithStatus(http.StatusUnauthorized)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
ctx.Next()
|
||||
}
|
||||
|
||||
// ensureUnixSocketIsAbsent will check if the given path for headscales unix socket is clear
|
||||
|
@ -433,15 +432,71 @@ func (h *Headscale) ensureUnixSocketIsAbsent() error {
|
|||
return os.Remove(h.cfg.UnixSocket)
|
||||
}
|
||||
|
||||
func (h *Headscale) createRouter(grpcMux *runtime.ServeMux) *gin.Engine {
|
||||
router := gin.Default()
|
||||
|
||||
prometheus := ginprometheus.NewPrometheus("gin")
|
||||
prometheus.Use(router)
|
||||
|
||||
router.GET(
|
||||
"/health",
|
||||
func(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"healthy": "ok"}) },
|
||||
)
|
||||
router.GET("/key", h.KeyHandler)
|
||||
router.GET("/register", h.RegisterWebAPI)
|
||||
router.POST("/machine/:id/map", h.PollNetMapHandler)
|
||||
router.POST("/machine/:id", h.RegistrationHandler)
|
||||
router.GET("/oidc/register/:mkey", h.RegisterOIDC)
|
||||
router.GET("/oidc/callback", h.OIDCCallback)
|
||||
router.GET("/apple", h.AppleMobileConfig)
|
||||
router.GET("/apple/:platform", h.ApplePlatformConfig)
|
||||
router.GET("/swagger", SwaggerUI)
|
||||
router.GET("/swagger/v1/openapiv2.json", SwaggerAPIv1)
|
||||
|
||||
api := router.Group("/api")
|
||||
api.Use(h.httpAuthenticationMiddleware)
|
||||
{
|
||||
api.Any("/v1/*any", gin.WrapF(grpcMux.ServeHTTP))
|
||||
}
|
||||
|
||||
router.NoRoute(stdoutHandler)
|
||||
|
||||
return router
|
||||
}
|
||||
|
||||
// Serve launches a GIN server with the Headscale API.
|
||||
func (h *Headscale) Serve() error {
|
||||
var err error
|
||||
|
||||
// Fetch an initial DERP Map before we start serving
|
||||
h.DERPMap = GetDERPMap(h.cfg.DERP)
|
||||
|
||||
if h.cfg.DERP.AutoUpdate {
|
||||
derpMapCancelChannel := make(chan struct{})
|
||||
defer func() { derpMapCancelChannel <- struct{}{} }()
|
||||
go h.scheduledDERPMapUpdateWorker(derpMapCancelChannel)
|
||||
}
|
||||
|
||||
go h.expireEphemeralNodes(updateInterval)
|
||||
|
||||
if zl.GlobalLevel() == zl.TraceLevel {
|
||||
zerolog.RespLog = true
|
||||
} else {
|
||||
zerolog.RespLog = false
|
||||
}
|
||||
|
||||
// Prepare group for running listeners
|
||||
errorGroup := new(errgroup.Group)
|
||||
|
||||
ctx := context.Background()
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
defer cancel()
|
||||
|
||||
//
|
||||
//
|
||||
// Set up LOCAL listeners
|
||||
//
|
||||
|
||||
err = h.ensureUnixSocketIsAbsent()
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to remove old socket file: %w", err)
|
||||
|
@ -470,32 +525,13 @@ func (h *Headscale) Serve() error {
|
|||
os.Exit(0)
|
||||
}(sigc)
|
||||
|
||||
networkListener, err := net.Listen("tcp", h.cfg.Addr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to bind to TCP address: %w", err)
|
||||
}
|
||||
|
||||
// Create the cmux object that will multiplex 2 protocols on the same port.
|
||||
// The two following listeners will be served on the same port below gracefully.
|
||||
networkMutex := cmux.New(networkListener)
|
||||
// Match gRPC requests here
|
||||
grpcListener := networkMutex.MatchWithWriters(
|
||||
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"),
|
||||
cmux.HTTP2MatchHeaderFieldSendSettings(
|
||||
"content-type",
|
||||
"application/grpc+proto",
|
||||
),
|
||||
)
|
||||
// Otherwise match regular http requests.
|
||||
httpListener := networkMutex.Match(cmux.Any())
|
||||
|
||||
grpcGatewayMux := runtime.NewServeMux()
|
||||
|
||||
// Make the grpc-gateway connect to grpc over socket
|
||||
grpcGatewayConn, err := grpc.Dial(
|
||||
h.cfg.UnixSocket,
|
||||
[]grpc.DialOption{
|
||||
grpc.WithInsecure(),
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
grpc.WithContextDialer(GrpcSocketDialer),
|
||||
}...,
|
||||
)
|
||||
|
@ -510,46 +546,80 @@ func (h *Headscale) Serve() error {
|
|||
return err
|
||||
}
|
||||
|
||||
router := gin.Default()
|
||||
// Start the local gRPC server without TLS and without authentication
|
||||
grpcSocket := grpc.NewServer(zerolog.UnaryInterceptor())
|
||||
|
||||
prometheus := ginprometheus.NewPrometheus("gin")
|
||||
prometheus.Use(router)
|
||||
v1.RegisterHeadscaleServiceServer(grpcSocket, newHeadscaleV1APIServer(h))
|
||||
reflection.Register(grpcSocket)
|
||||
|
||||
router.GET(
|
||||
"/health",
|
||||
func(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"healthy": "ok"}) },
|
||||
)
|
||||
router.GET("/key", h.KeyHandler)
|
||||
router.GET("/register", h.RegisterWebAPI)
|
||||
router.POST("/machine/:id/map", h.PollNetMapHandler)
|
||||
router.POST("/machine/:id", h.RegistrationHandler)
|
||||
router.GET("/oidc/register/:mkey", h.RegisterOIDC)
|
||||
router.GET("/oidc/callback", h.OIDCCallback)
|
||||
router.GET("/apple", h.AppleMobileConfig)
|
||||
router.GET("/apple/:platform", h.ApplePlatformConfig)
|
||||
router.GET("/swagger", SwaggerUI)
|
||||
router.GET("/swagger/v1/openapiv2.json", SwaggerAPIv1)
|
||||
errorGroup.Go(func() error { return grpcSocket.Serve(socketListener) })
|
||||
|
||||
api := router.Group("/api")
|
||||
api.Use(h.httpAuthenticationMiddleware)
|
||||
{
|
||||
api.Any("/v1/*any", gin.WrapF(grpcGatewayMux.ServeHTTP))
|
||||
//
|
||||
//
|
||||
// Set up REMOTE listeners
|
||||
//
|
||||
|
||||
tlsConfig, err := h.getTLSSettings()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to set up TLS configuration")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
router.NoRoute(stdoutHandler)
|
||||
//
|
||||
//
|
||||
// gRPC setup
|
||||
//
|
||||
|
||||
// Fetch an initial DERP Map before we start serving
|
||||
h.DERPMap = GetDERPMap(h.cfg.DERP)
|
||||
// We are sadly not able to run gRPC and HTTPS (2.0) on the same
|
||||
// port because the connection mux does not support matching them
|
||||
// since they are so similar. There is multiple issues open and we
|
||||
// can revisit this if changes:
|
||||
// https://github.com/soheilhy/cmux/issues/68
|
||||
// https://github.com/soheilhy/cmux/issues/91
|
||||
|
||||
if h.cfg.DERP.AutoUpdate {
|
||||
derpMapCancelChannel := make(chan struct{})
|
||||
defer func() { derpMapCancelChannel <- struct{}{} }()
|
||||
go h.scheduledDERPMapUpdateWorker(derpMapCancelChannel)
|
||||
if tlsConfig != nil || h.cfg.GRPCAllowInsecure {
|
||||
log.Info().Msgf("Enabling remote gRPC at %s", h.cfg.GRPCAddr)
|
||||
|
||||
grpcOptions := []grpc.ServerOption{
|
||||
grpc.UnaryInterceptor(
|
||||
grpc_middleware.ChainUnaryServer(
|
||||
h.grpcAuthenticationInterceptor,
|
||||
zerolog.NewUnaryServerInterceptor(),
|
||||
),
|
||||
),
|
||||
}
|
||||
|
||||
if tlsConfig != nil {
|
||||
grpcOptions = append(grpcOptions,
|
||||
grpc.Creds(credentials.NewTLS(tlsConfig)),
|
||||
)
|
||||
} else {
|
||||
log.Warn().Msg("gRPC is running without security")
|
||||
}
|
||||
|
||||
grpcServer := grpc.NewServer(grpcOptions...)
|
||||
|
||||
v1.RegisterHeadscaleServiceServer(grpcServer, newHeadscaleV1APIServer(h))
|
||||
reflection.Register(grpcServer)
|
||||
|
||||
grpcListener, err := net.Listen("tcp", h.cfg.GRPCAddr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to bind to TCP address: %w", err)
|
||||
}
|
||||
|
||||
errorGroup.Go(func() error { return grpcServer.Serve(grpcListener) })
|
||||
|
||||
log.Info().
|
||||
Msgf("listening and serving gRPC on: %s", h.cfg.GRPCAddr)
|
||||
}
|
||||
|
||||
// I HATE THIS
|
||||
go h.watchForKVUpdates(updateInterval)
|
||||
go h.expireEphemeralNodes(updateInterval)
|
||||
//
|
||||
//
|
||||
// HTTP setup
|
||||
//
|
||||
|
||||
router := h.createRouter(grpcGatewayMux)
|
||||
|
||||
httpServer := &http.Server{
|
||||
Addr: h.cfg.Addr,
|
||||
|
@ -562,65 +632,21 @@ func (h *Headscale) Serve() error {
|
|||
WriteTimeout: 0,
|
||||
}
|
||||
|
||||
if zl.GlobalLevel() == zl.TraceLevel {
|
||||
zerolog.RespLog = true
|
||||
} else {
|
||||
zerolog.RespLog = false
|
||||
}
|
||||
|
||||
grpcOptions := []grpc.ServerOption{
|
||||
grpc.UnaryInterceptor(
|
||||
grpc_middleware.ChainUnaryServer(
|
||||
h.grpcAuthenticationInterceptor,
|
||||
zerolog.NewUnaryServerInterceptor(),
|
||||
),
|
||||
),
|
||||
}
|
||||
|
||||
tlsConfig, err := h.getTLSSettings()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to set up TLS configuration")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
var httpListener net.Listener
|
||||
if tlsConfig != nil {
|
||||
httpServer.TLSConfig = tlsConfig
|
||||
|
||||
grpcOptions = append(grpcOptions, grpc.Creds(credentials.NewTLS(tlsConfig)))
|
||||
}
|
||||
|
||||
grpcServer := grpc.NewServer(grpcOptions...)
|
||||
|
||||
// Start the local gRPC server without TLS and without authentication
|
||||
grpcSocket := grpc.NewServer(zerolog.UnaryInterceptor())
|
||||
|
||||
v1.RegisterHeadscaleServiceServer(grpcServer, newHeadscaleV1APIServer(h))
|
||||
v1.RegisterHeadscaleServiceServer(grpcSocket, newHeadscaleV1APIServer(h))
|
||||
reflection.Register(grpcServer)
|
||||
reflection.Register(grpcSocket)
|
||||
|
||||
errorGroup := new(errgroup.Group)
|
||||
|
||||
errorGroup.Go(func() error { return grpcSocket.Serve(socketListener) })
|
||||
|
||||
// TODO(kradalby): Verify if we need the same TLS setup for gRPC as HTTP
|
||||
errorGroup.Go(func() error { return grpcServer.Serve(grpcListener) })
|
||||
|
||||
if tlsConfig != nil {
|
||||
errorGroup.Go(func() error {
|
||||
tlsl := tls.NewListener(httpListener, tlsConfig)
|
||||
|
||||
return httpServer.Serve(tlsl)
|
||||
})
|
||||
httpListener, err = tls.Listen("tcp", h.cfg.Addr, tlsConfig)
|
||||
} else {
|
||||
errorGroup.Go(func() error { return httpServer.Serve(httpListener) })
|
||||
httpListener, err = net.Listen("tcp", h.cfg.Addr)
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to bind to TCP address: %w", err)
|
||||
}
|
||||
|
||||
errorGroup.Go(func() error { return networkMutex.Serve() })
|
||||
errorGroup.Go(func() error { return httpServer.Serve(httpListener) })
|
||||
|
||||
log.Info().
|
||||
Msgf("listening and serving (multiplexed HTTP and gRPC) on: %s", h.cfg.Addr)
|
||||
Msgf("listening and serving HTTP on: %s", h.cfg.Addr)
|
||||
|
||||
return errorGroup.Wait()
|
||||
}
|
||||
|
@ -656,6 +682,7 @@ func (h *Headscale) getTLSSettings() (*tls.Config, error) {
|
|||
// service, which can be configured to run on any other port.
|
||||
go func() {
|
||||
log.Fatal().
|
||||
Caller().
|
||||
Err(http.ListenAndServe(h.cfg.TLSLetsEncryptListen, certManager.HTTPHandler(http.HandlerFunc(h.redirect)))).
|
||||
Msg("failed to set up a HTTP server")
|
||||
}()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue