Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/go-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
with:
go-version: ${{ env.GO_VERSION }}
- name: Run unit test
run: go run gotest.tools/gotestsum@latest -- -tags=router_test ./...
run: go run gotest.tools/gotestsum@latest -- --race -tags=router_test ./...
integration:
runs-on: ubuntu-latest
steps:
Expand All @@ -35,7 +35,7 @@ jobs:
with:
go-version: ${{ env.GO_VERSION }}
- name: Run integration
run: go run gotest.tools/gotestsum@latest -- -tags=integration ./integration/...
run: go run gotest.tools/gotestsum@latest -- --race -tags=integration ./integration/...
e2e:
runs-on: ubuntu-latest
steps:
Expand Down
151 changes: 3 additions & 148 deletions core/entrypoint.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,15 @@
package core

import (
"context"
"errors"
"fmt"
"log"
"log/slog"
"net/http"
"os"
"os/signal"
"path"
"reflect"
"runtime"
"runtime/trace"
"syscall"
"time"

"github.com/encodeous/nylon/perf"
"github.com/encodeous/nylon/state"
"github.com/encodeous/tint"
"github.com/goccy/go-yaml"
slogmulti "github.com/samber/slog-multi"
)

func setupDebugging() {
Expand Down Expand Up @@ -135,146 +124,12 @@ func Bootstrap(centralPath, nodePath, logPath string, verbose bool) {
if err != nil {
panic(err)
}
err = Start(*centralCfg, *nodeCfg, level, centralPath, nil, nil)
n, err := NewNylon(*centralCfg, *nodeCfg, level, centralPath, nil)
if err != nil {
panic(err)
}
}

// Start builds a Nylon instance from the central and node-local configuration,
// initializes and starts it, and then runs the dispatch loop on the calling
// goroutine until the instance is cancelled.
//
// Log records go to stderr (JSON when state.DBG_log_json is set, tint
// otherwise) and, when ncfg.LogPath is non-empty, to that file as well. An
// empty ncfg.InterfaceName defaults to "nylon". If initNylon is non-nil it
// receives the instance once startup has succeeded. SIGINT and SIGTERM cancel
// the instance.
//
// It returns any error from preparing the log file, from n.Init or n.Start,
// or from MainLoop.
func Start(ccfg state.CentralCfg, ncfg state.LocalCfg, logLevel slog.Level, configPath string, aux map[string]any, initNylon **Nylon) error {
	// At most two handlers are installed: the stderr handler and the optional
	// file handler.
	handlers := make([]slog.Handler, 0, 2)
	if state.DBG_log_json {
		handlers = append(handlers, slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
			Level: logLevel,
		}))
	} else {
		handlers = append(handlers, tint.NewHandler(os.Stderr, &tint.Options{
			Level:        logLevel,
			AddSource:    false,
			CustomPrefix: string(ncfg.Id),
			// Drop the timestamp attribute from console output.
			ReplaceAttr: func(groups []string, attr slog.Attr) slog.Attr {
				if attr.Key == "time" {
					return slog.Attr{}
				}
				return attr
			},
		}))
	}

	if ncfg.LogPath != "" {
		if err := os.MkdirAll(path.Dir(ncfg.LogPath), 0700); err != nil {
			return err
		}
		// A log file needs no execute bit; owner read/write is enough.
		f, err := os.OpenFile(ncfg.LogPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
		if err != nil {
			return err
		}
		handlers = append(handlers, slog.NewTextHandler(f, &slog.HandlerOptions{Level: logLevel}))
	}

	if ncfg.InterfaceName == "" {
		ncfg.InterfaceName = "nylon"
	}

	// The context is created only after the fallible logging setup so that the
	// early returns above have nothing to release.
	ctx, cancel := context.WithCancelCause(context.Background())
	dispatch := make(chan func() error, 128)

	n := &Nylon{
		Trace: &NylonTrace{},
		ConfigState: state.ConfigState{
			CentralCfg: ccfg,
			LocalCfg:   ncfg,
		},
		Context:         ctx,
		Cancel:          cancel,
		DispatchChannel: dispatch,
		Log:             slog.New(slogmulti.Fanout(handlers...)),
		ConfigPath:      configPath,
		AuxConfig:       aux,
	}

	n.Log.Info("init modules")

	// Check each step separately: an Init failure must not be masked by the
	// result of Start. Cancelling releases the context on failed startup.
	if err := n.Init(); err != nil {
		cancel(err)
		return err
	}
	if err := n.Start(); err != nil {
		cancel(err)
		return err
	}
	if initNylon != nil {
		*initNylon = n
	}
	n.Log.Info("init modules complete")

	n.Log.Info("Nylon has been initialized. To gracefully exit, send SIGINT or Ctrl+C.")

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	// Stop relaying signals once the main loop has returned so later signals
	// are not swallowed by a channel nobody reads.
	defer signal.Stop(sigs)
	go func() {
		select {
		case <-sigs:
			n.Cancel(errors.New("received shutdown signal"))
		case <-ctx.Done():
		}
	}()

	return MainLoop(n, dispatch)
}

// MainLoop runs the functions received from dispatch one at a time on the
// calling goroutine. It stops when n.Context is done or when a nil function is
// received, which is also what a receive from a closed channel yields. Once
// the loop ends it logs the reason, calls Stop(n) and returns nil.
//
// An error returned by a dispatched function is logged and passed to n.Cancel.
// Each call's duration is added to perf.DispatchLatency, and calls that take
// longer than 4ms are reported with a warning.
func MainLoop(n *Nylon, dispatch <-chan func() error) error {
	n.Log.Debug("started main loop")

loop:
	for {
		select {
		case fun := <-dispatch:
			if fun == nil {
				break loop
			}
			start := time.Now()
			if err := fun(); err != nil {
				n.Log.Error("error occurred during dispatch: ", "error", err)
				n.Cancel(err)
			}
			elapsed := time.Since(start)
			perf.DispatchLatency.Add(float64(elapsed.Microseconds()))
			if elapsed > 4*time.Millisecond {
				// Resolve the function's name from its entry PC for the warning.
				name := runtime.FuncForPC(reflect.ValueOf(fun).Pointer()).Name()
				n.Log.Warn("dispatch took a long time!", "fun", name, "elapsed", elapsed, "len", len(dispatch))
			}
		case <-n.Context.Done():
			break loop
		}
	}

	// The loop can also end on a nil/closed dispatch while the context is
	// still live; context.Cause is nil then, so do not call Error() on it.
	reason := "dispatch channel closed"
	if cause := context.Cause(n.Context); cause != nil {
		reason = cause.Error()
	}
	n.Log.Info("stopped main loop", "reason", reason)
	Stop(n)
	return nil
}

// Stop tears the instance down: it cancels the context with context.Canceled,
// closes and clears the dispatch channel if one is set, and runs n.Cleanup,
// logging (not returning) any cleanup error. The teardown is wrapped in
// n.cleanupOnce.Do, so repeated calls are expected to run it only once.
func Stop(n *Nylon) {
	teardown := func() {
		n.Cancel(context.Canceled)

		if ch := n.DispatchChannel; ch != nil {
			close(ch)
			n.DispatchChannel = nil
		}

		n.Log.Info("cleaning up modules")
		if cleanupErr := n.Cleanup(); cleanupErr != nil {
			n.Log.Error("error occurred during Stop: ", "error", cleanupErr)
		}
		n.Log.Info("stopped")
	}
	n.cleanupOnce.Do(teardown)
}
75 changes: 37 additions & 38 deletions core/ipc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,37 @@ func HandleNylonIPC(n *Nylon, rw *bufio.ReadWriter) error {
}
return device.ErrIPCStatusHandled
}
var resp *protocol.IpcResponse
switch req.Request.(type) {
case *protocol.IpcRequest_Status:
resp = handleStatus(n, req.GetStatus())
case *protocol.IpcRequest_Probe:
resp = handleIPCProbe(n, req.GetProbe())
case *protocol.IpcRequest_Reload:
resp = handleIPCReload(n, req.GetReload())
case *protocol.IpcRequest_Trace:

// trace is blocking, so we dont dispatch
if _, ok := req.Request.(*protocol.IpcRequest_Trace); ok {
return handleTrace(n, rw)
default:
resp = errResponse("unknown method")
}

done := make(chan *protocol.IpcResponse, 1)
n.Dispatch(func() error {
var resp *protocol.IpcResponse
switch req.Request.(type) {
case *protocol.IpcRequest_Status:
resp = handleStatus(n, req.GetStatus())
case *protocol.IpcRequest_Probe:
resp = handleIPCProbe(n, req.GetProbe())
case *protocol.IpcRequest_Reload:
resp = handleIPCReload(n, req.GetReload())
default:
resp = errResponse("unknown method")
}
done <- resp
return nil
})

var resp *protocol.IpcResponse
select {
case resp = <-done:
case <-n.Context.Done():
resp = errResponse("nylon shutting down")
case <-time.After(1 * time.Second):
// nylon is too busy to handle IPC requests
resp = errResponse("timed out waiting for dispatch")
}
if err := writeResponse(rw, resp); err != nil {
return err
Expand Down Expand Up @@ -231,15 +250,15 @@ func buildRouteTables(n *Nylon) *protocol.RouteTables {
slices.SortFunc(tables.Selected, func(a, b *protocol.SelRoute) int {
return comparePubRoute(a.PubRoute, b.PubRoute)
})
for prefix, route := range n.router.ForwardTable.All() {
for prefix, route := range n.router.ForwardTable.Load().All() {
tables.Forward = append(tables.Forward, &protocol.RouteTableEntry{
Prefix: prefix.String(),
Nh: string(route.Nh),
Blackhole: route.Blackhole,
})
}
sortRouteTableEntries(tables.Forward)
for prefix, route := range n.router.ExitTable.All() {
for prefix, route := range n.router.ExitTable.Load().All() {
tables.Exit = append(tables.Exit, &protocol.RouteTableEntry{
Prefix: prefix.String(),
Nh: string(route.Nh),
Expand Down Expand Up @@ -401,7 +420,7 @@ func handleIPCProbe(n *Nylon, req *protocol.ProbeRequest) *protocol.IpcResponse
for _, ep := range neigh.Eps {
nep := ep.AsNylonEndpoint()
addr := nep.DynEP.Value
err := n.Probe(neigh.Id, nep)
err := n.Probe(neigh.Id, nep, true)
r := &protocol.EndpointProbeResult{Address: addr, Success: err == nil}
if err != nil {
r.Error = err.Error()
Expand All @@ -420,10 +439,12 @@ func handleIPCReload(n *Nylon, req *protocol.ReloadRequest) *protocol.IpcRespons
return errResponse(fmt.Sprintf("read file: %v", err))
}
var cfg state.CentralCfg
if err := yaml.Unmarshal(data, &cfg); err != nil {
if err = yaml.Unmarshal(data, &cfg); err != nil {
return errResponse(fmt.Sprintf("parse config: %v", err))
}
result, err := applyCentralConfigSync(n, cfg)
// We're running on the dispatch goroutine, so call ApplyCentralConfig
// directly rather than re-dispatching (which would deadlock).
result, err := n.ApplyCentralConfig(&cfg)
msg := ""
if err != nil {
msg = err.Error()
Expand All @@ -448,28 +469,6 @@ func handleIPCReload(n *Nylon, req *protocol.ReloadRequest) *protocol.IpcRespons
}
}

// applyCentralConfigSync hands n.ApplyCentralConfig(cfg) to n.Dispatch and
// waits for its outcome. It returns ApplyRejected together with the context's
// cause if n.Context finishes first, or with a timeout error if no outcome
// arrives within 30 seconds.
func applyCentralConfigSync(n *Nylon, cfg state.CentralCfg) (ApplyResult, error) {
	type outcome struct {
		applied ApplyResult
		err     error
	}

	// Capacity 1 lets the dispatched function deliver its outcome without
	// blocking even when this caller has already given up waiting.
	outcomes := make(chan outcome, 1)
	n.Dispatch(func() error {
		var o outcome
		o.applied, o.err = n.ApplyCentralConfig(cfg)
		outcomes <- o
		return nil
	})

	deadline := time.NewTimer(30 * time.Second)
	defer deadline.Stop()

	select {
	case o := <-outcomes:
		return o.applied, o.err
	case <-n.Context.Done():
		return ApplyRejected, context.Cause(n.Context)
	case <-deadline.C:
		return ApplyRejected, fmt.Errorf("timed out waiting for config reload")
	}
}

func handleTrace(n *Nylon, rw *bufio.ReadWriter) error {
if !state.DBG_trace_tc {
if err := writeResponse(rw, errResponse("tracing not enabled; restart with --dbg-trace-tc")); err != nil {
Expand Down
Loading
Loading