Add compatibility with only websocket-capable clients (#2132)
* handle control protocol through websocket The necessary behaviour is already in place, but the wasm build only issued GETs, and the handler was not invoked. * get DERP-over-websocket working for wasm clients * Prepare for testing builtin websocket-over-DERP Still needs some way to assert that clients are connected through websockets, rather than the TCP hijacking version of DERP. * integration tests: properly differentiate between DERP transports * do not touch unrelated code * linter fixes * integration testing: unexport common implementation of derp server scenario * fixup! integration testing: unexport common implementation of derp server scenario * dockertestutil/logs: remove unhelpful comment * update changelog --------- Co-authored-by: Csaba Sarkadi <sarkadicsa@tutanota.de>
This commit is contained in:
parent
10a72e8d54
commit
1e61084898
14 changed files with 280 additions and 37 deletions
|
@ -242,5 +242,4 @@ func TestValidateResolvConf(t *testing.T) {
|
|||
}
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package dockertestutil
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
|
@ -13,6 +14,28 @@ import (
|
|||
|
||||
const filePerm = 0o644
|
||||
|
||||
func WriteLog(
|
||||
pool *dockertest.Pool,
|
||||
resource *dockertest.Resource,
|
||||
stdout io.Writer,
|
||||
stderr io.Writer,
|
||||
) error {
|
||||
return pool.Client.Logs(
|
||||
docker.LogsOptions{
|
||||
Context: context.TODO(),
|
||||
Container: resource.Container.ID,
|
||||
OutputStream: stdout,
|
||||
ErrorStream: stderr,
|
||||
Tail: "all",
|
||||
RawTerminal: false,
|
||||
Stdout: true,
|
||||
Stderr: true,
|
||||
Follow: false,
|
||||
Timestamps: false,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func SaveLog(
|
||||
pool *dockertest.Pool,
|
||||
resource *dockertest.Resource,
|
||||
|
@ -23,23 +46,8 @@ func SaveLog(
|
|||
return "", "", err
|
||||
}
|
||||
|
||||
var stdout bytes.Buffer
|
||||
var stderr bytes.Buffer
|
||||
|
||||
err = pool.Client.Logs(
|
||||
docker.LogsOptions{
|
||||
Context: context.TODO(),
|
||||
Container: resource.Container.ID,
|
||||
OutputStream: &stdout,
|
||||
ErrorStream: &stderr,
|
||||
Tail: "all",
|
||||
RawTerminal: false,
|
||||
Stdout: true,
|
||||
Stderr: true,
|
||||
Follow: false,
|
||||
Timestamps: false,
|
||||
},
|
||||
)
|
||||
var stdout, stderr bytes.Buffer
|
||||
err = WriteLog(pool, resource, &stdout, &stderr)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
|
|
|
@ -15,6 +15,11 @@ import (
|
|||
"github.com/ory/dockertest/v3"
|
||||
)
|
||||
|
||||
type ClientsSpec struct {
|
||||
Plain int
|
||||
WebsocketDERP int
|
||||
}
|
||||
|
||||
type EmbeddedDERPServerScenario struct {
|
||||
*Scenario
|
||||
|
||||
|
@ -22,6 +27,65 @@ type EmbeddedDERPServerScenario struct {
|
|||
}
|
||||
|
||||
func TestDERPServerScenario(t *testing.T) {
|
||||
spec := map[string]ClientsSpec{
|
||||
"user1": {
|
||||
Plain: len(MustTestVersions),
|
||||
WebsocketDERP: 0,
|
||||
},
|
||||
}
|
||||
|
||||
derpServerScenario(t, spec, func(scenario *EmbeddedDERPServerScenario) {
|
||||
allClients, err := scenario.ListTailscaleClients()
|
||||
assertNoErrListClients(t, err)
|
||||
t.Logf("checking %d clients for websocket connections", len(allClients))
|
||||
|
||||
for _, client := range allClients {
|
||||
if didClientUseWebsocketForDERP(t, client) {
|
||||
t.Logf(
|
||||
"client %q used websocket a connection, but was not expected to",
|
||||
client.Hostname(),
|
||||
)
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestDERPServerWebsocketScenario(t *testing.T) {
|
||||
spec := map[string]ClientsSpec{
|
||||
"user1": {
|
||||
Plain: 0,
|
||||
WebsocketDERP: len(MustTestVersions),
|
||||
},
|
||||
}
|
||||
|
||||
derpServerScenario(t, spec, func(scenario *EmbeddedDERPServerScenario) {
|
||||
allClients, err := scenario.ListTailscaleClients()
|
||||
assertNoErrListClients(t, err)
|
||||
t.Logf("checking %d clients for websocket connections", len(allClients))
|
||||
|
||||
for _, client := range allClients {
|
||||
if !didClientUseWebsocketForDERP(t, client) {
|
||||
t.Logf(
|
||||
"client %q does not seem to have used a websocket connection, even though it was expected to do so",
|
||||
client.Hostname(),
|
||||
)
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// This function implements the common parts of a DERP scenario,
|
||||
// we *want* it to show up in stacktraces,
|
||||
// so marking it as a test helper would be counterproductive.
|
||||
//
|
||||
//nolint:thelper
|
||||
func derpServerScenario(
|
||||
t *testing.T,
|
||||
spec map[string]ClientsSpec,
|
||||
furtherAssertions ...func(*EmbeddedDERPServerScenario),
|
||||
) {
|
||||
IntegrationSkip(t)
|
||||
// t.Parallel()
|
||||
|
||||
|
@ -34,20 +98,18 @@ func TestDERPServerScenario(t *testing.T) {
|
|||
}
|
||||
defer scenario.ShutdownAssertNoPanics(t)
|
||||
|
||||
spec := map[string]int{
|
||||
"user1": len(MustTestVersions),
|
||||
}
|
||||
|
||||
err = scenario.CreateHeadscaleEnv(
|
||||
spec,
|
||||
hsic.WithTestName("derpserver"),
|
||||
hsic.WithExtraPorts([]string{"3478/udp"}),
|
||||
hsic.WithEmbeddedDERPServerOnly(),
|
||||
hsic.WithPort(443),
|
||||
hsic.WithTLS(),
|
||||
hsic.WithHostnameAsServerURL(),
|
||||
hsic.WithConfigEnv(map[string]string{
|
||||
"HEADSCALE_DERP_AUTO_UPDATE_ENABLED": "true",
|
||||
"HEADSCALE_DERP_UPDATE_FREQUENCY": "10s",
|
||||
"HEADSCALE_LISTEN_ADDR": "0.0.0.0:443",
|
||||
}),
|
||||
)
|
||||
assertNoErrHeadscaleEnv(t, err)
|
||||
|
@ -76,6 +138,11 @@ func TestDERPServerScenario(t *testing.T) {
|
|||
}
|
||||
|
||||
success := pingDerpAllHelper(t, allClients, allHostnames)
|
||||
if len(allHostnames)*len(allClients) > success {
|
||||
t.FailNow()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
for _, client := range allClients {
|
||||
status, err := client.Status()
|
||||
|
@ -98,6 +165,9 @@ func TestDERPServerScenario(t *testing.T) {
|
|||
time.Sleep(30 * time.Second)
|
||||
|
||||
success = pingDerpAllHelper(t, allClients, allHostnames)
|
||||
if len(allHostnames)*len(allClients) > success {
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
for _, client := range allClients {
|
||||
status, err := client.Status()
|
||||
|
@ -114,10 +184,14 @@ func TestDERPServerScenario(t *testing.T) {
|
|||
}
|
||||
|
||||
t.Logf("Run2: %d successful pings out of %d", success, len(allClients)*len(allHostnames))
|
||||
|
||||
for _, check := range furtherAssertions {
|
||||
check(&scenario)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *EmbeddedDERPServerScenario) CreateHeadscaleEnv(
|
||||
users map[string]int,
|
||||
users map[string]ClientsSpec,
|
||||
opts ...hsic.Option,
|
||||
) error {
|
||||
hsServer, err := s.Headscale(opts...)
|
||||
|
@ -137,6 +211,7 @@ func (s *EmbeddedDERPServerScenario) CreateHeadscaleEnv(
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Printf("headscale server ip address: %s", hsServer.GetIP())
|
||||
|
||||
hash, err := util.GenerateRandomStringDNSSafe(scenarioHashLength)
|
||||
if err != nil {
|
||||
|
@ -149,14 +224,31 @@ func (s *EmbeddedDERPServerScenario) CreateHeadscaleEnv(
|
|||
return err
|
||||
}
|
||||
|
||||
err = s.CreateTailscaleIsolatedNodesInUser(
|
||||
hash,
|
||||
userName,
|
||||
"all",
|
||||
clientCount,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
if clientCount.Plain > 0 {
|
||||
// Containers that use default DERP config
|
||||
err = s.CreateTailscaleIsolatedNodesInUser(
|
||||
hash,
|
||||
userName,
|
||||
"all",
|
||||
clientCount.Plain,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if clientCount.WebsocketDERP > 0 {
|
||||
// Containers that use DERP-over-WebSocket
|
||||
err = s.CreateTailscaleIsolatedNodesInUser(
|
||||
hash,
|
||||
userName,
|
||||
"all",
|
||||
clientCount.WebsocketDERP,
|
||||
tsic.WithWebsocketDERP(true),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
key, err := s.CreatePreAuthKey(userName, true, false)
|
||||
|
|
|
@ -461,6 +461,12 @@ func (t *HeadscaleInContainer) Shutdown() (string, string, error) {
|
|||
return stdoutPath, stderrPath, t.pool.Purge(t.container)
|
||||
}
|
||||
|
||||
// WriteLogs writes the current stdout/stderr log of the container to
|
||||
// the given io.Writers.
|
||||
func (t *HeadscaleInContainer) WriteLogs(stdout, stderr io.Writer) error {
|
||||
return dockertestutil.WriteLog(t.pool, t.container, stdout, stderr)
|
||||
}
|
||||
|
||||
// SaveLog saves the current stdout log of the container to a path
|
||||
// on the host system.
|
||||
func (t *HeadscaleInContainer) SaveLog(path string) (string, string, error) {
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package integration
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/netip"
|
||||
"net/url"
|
||||
|
||||
|
@ -41,4 +42,6 @@ type TailscaleClient interface {
|
|||
// FailingPeersAsString returns a formatted-ish multi-line-string of peers in the client
|
||||
// and a bool indicating if the clients online count and peer count is equal.
|
||||
FailingPeersAsString() (string, bool, error)
|
||||
|
||||
WriteLogs(stdout, stderr io.Writer) error
|
||||
}
|
||||
|
|
|
@ -67,6 +67,7 @@ type TailscaleInContainer struct {
|
|||
// optional config
|
||||
headscaleCert []byte
|
||||
headscaleHostname string
|
||||
withWebsocketDERP bool
|
||||
withSSH bool
|
||||
withTags []string
|
||||
withEntrypoint []string
|
||||
|
@ -126,6 +127,14 @@ func WithTags(tags []string) Option {
|
|||
}
|
||||
}
|
||||
|
||||
// WithWebsocketDERP toggles a development knob to
|
||||
// force enable DERP connection through the new websocket protocol.
|
||||
func WithWebsocketDERP(enabled bool) Option {
|
||||
return func(tsic *TailscaleInContainer) {
|
||||
tsic.withWebsocketDERP = enabled
|
||||
}
|
||||
}
|
||||
|
||||
// WithSSH enables SSH for the Tailscale instance.
|
||||
func WithSSH() Option {
|
||||
return func(tsic *TailscaleInContainer) {
|
||||
|
@ -206,6 +215,14 @@ func New(
|
|||
// },
|
||||
Entrypoint: tsic.withEntrypoint,
|
||||
ExtraHosts: tsic.withExtraHosts,
|
||||
Env: []string{},
|
||||
}
|
||||
|
||||
if tsic.withWebsocketDERP {
|
||||
tailscaleOptions.Env = append(
|
||||
tailscaleOptions.Env,
|
||||
fmt.Sprintf("TS_DEBUG_DERP_WS_CLIENT=%t", tsic.withWebsocketDERP),
|
||||
)
|
||||
}
|
||||
|
||||
if tsic.headscaleHostname != "" {
|
||||
|
@ -351,6 +368,15 @@ func (t *TailscaleInContainer) Execute(
|
|||
return stdout, stderr, nil
|
||||
}
|
||||
|
||||
// Retrieve container logs.
|
||||
func (t *TailscaleInContainer) Logs(stdout, stderr io.Writer) error {
|
||||
return dockertestutil.WriteLog(
|
||||
t.pool,
|
||||
t.container,
|
||||
stdout, stderr,
|
||||
)
|
||||
}
|
||||
|
||||
// Up runs the login routine on the given Tailscale instance.
|
||||
// This login mechanism uses the authorised key for authentication.
|
||||
func (t *TailscaleInContainer) Login(
|
||||
|
@ -999,10 +1025,21 @@ func (t *TailscaleInContainer) WriteFile(path string, data []byte) error {
|
|||
// on the host system.
|
||||
func (t *TailscaleInContainer) SaveLog(path string) error {
|
||||
// TODO(kradalby): Assert if tailscale logs contains panics.
|
||||
// NOTE(enoperm): `t.WriteLog | countMatchingLines`
|
||||
// is probably most of what is for that,
|
||||
// but I'd rather not change the behaviour here,
|
||||
// as it may affect all the other tests
|
||||
// I have not otherwise touched.
|
||||
_, _, err := dockertestutil.SaveLog(t.pool, t.container, path)
|
||||
return err
|
||||
}
|
||||
|
||||
// WriteLogs writes the current stdout/stderr log of the container to
|
||||
// the given io.Writers.
|
||||
func (t *TailscaleInContainer) WriteLogs(stdout, stderr io.Writer) error {
|
||||
return dockertestutil.WriteLog(t.pool, t.container, stdout, stderr)
|
||||
}
|
||||
|
||||
// ReadFile reads a file from the Tailscale container.
|
||||
// It returns the content of the file as a byte slice.
|
||||
func (t *TailscaleInContainer) ReadFile(path string) ([]byte, error) {
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
package integration
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"io"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -78,6 +81,25 @@ func assertContains(t *testing.T, str, subStr string) {
|
|||
}
|
||||
}
|
||||
|
||||
func didClientUseWebsocketForDERP(t *testing.T, client TailscaleClient) bool {
|
||||
t.Helper()
|
||||
|
||||
buf := &bytes.Buffer{}
|
||||
err := client.WriteLogs(buf, buf)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to fetch client logs: %s: %s", client.Hostname(), err)
|
||||
}
|
||||
|
||||
count, err := countMatchingLines(buf, func(line string) bool {
|
||||
return strings.Contains(line, "websocket: connected to ")
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to process client logs: %s: %s", client.Hostname(), err)
|
||||
}
|
||||
|
||||
return count > 0
|
||||
}
|
||||
|
||||
func pingAllHelper(t *testing.T, clients []TailscaleClient, addrs []string, opts ...tsic.PingOption) int {
|
||||
t.Helper()
|
||||
success := 0
|
||||
|
@ -113,7 +135,7 @@ func pingDerpAllHelper(t *testing.T, clients []TailscaleClient, addrs []string)
|
|||
tsic.WithPingUntilDirect(false),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to ping %s from %s: %s", addr, client.Hostname(), err)
|
||||
t.Logf("failed to ping %s from %s: %s", addr, client.Hostname(), err)
|
||||
} else {
|
||||
success++
|
||||
}
|
||||
|
@ -321,6 +343,25 @@ func dockertestMaxWait() time.Duration {
|
|||
return wait
|
||||
}
|
||||
|
||||
func countMatchingLines(in io.Reader, predicate func(string) bool) (int, error) {
|
||||
count := 0
|
||||
scanner := bufio.NewScanner(in)
|
||||
{
|
||||
const logBufferInitialSize = 1024 << 10 // preallocate 1 MiB
|
||||
buff := make([]byte, logBufferInitialSize)
|
||||
scanner.Buffer(buff, len(buff))
|
||||
scanner.Split(bufio.ScanLines)
|
||||
}
|
||||
|
||||
for scanner.Scan() {
|
||||
if predicate(scanner.Text()) {
|
||||
count += 1
|
||||
}
|
||||
}
|
||||
|
||||
return count, scanner.Err()
|
||||
}
|
||||
|
||||
// func dockertestCommandTimeout() time.Duration {
|
||||
// timeout := 10 * time.Second //nolint
|
||||
//
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue