Skip to content

Commit a2a7a44

Browse files
authored
fix(core): add synchronization for race conditions (#101)
Due to the addition of hot-reloading config and the realization in #99, nylon has many sections of code that exhibit race conditions. This PR tries to address some of the more obvious ones.
1 parent 0242f9c commit a2a7a44

17 files changed

Lines changed: 522 additions & 327 deletions

.github/workflows/go-test.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ jobs:
2525
with:
2626
go-version: ${{ env.GO_VERSION }}
2727
- name: Run unit test
28-
run: go run gotest.tools/gotestsum@latest -- -tags=router_test ./...
28+
run: go run gotest.tools/gotestsum@latest -- --race -tags=router_test ./...
2929
integration:
3030
runs-on: ubuntu-latest
3131
steps:
@@ -35,7 +35,7 @@ jobs:
3535
with:
3636
go-version: ${{ env.GO_VERSION }}
3737
- name: Run integration
38-
run: go run gotest.tools/gotestsum@latest -- -tags=integration ./integration/...
38+
run: go run gotest.tools/gotestsum@latest -- --race -tags=integration ./integration/...
3939
e2e:
4040
runs-on: ubuntu-latest
4141
steps:

core/entrypoint.go

Lines changed: 3 additions & 148 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,15 @@
11
package core
22

33
import (
4-
"context"
5-
"errors"
64
"fmt"
75
"log"
86
"log/slog"
97
"net/http"
108
"os"
11-
"os/signal"
12-
"path"
13-
"reflect"
14-
"runtime"
159
"runtime/trace"
16-
"syscall"
17-
"time"
1810

19-
"github.com/encodeous/nylon/perf"
2011
"github.com/encodeous/nylon/state"
21-
"github.com/encodeous/tint"
2212
"github.com/goccy/go-yaml"
23-
slogmulti "github.com/samber/slog-multi"
2413
)
2514

2615
func setupDebugging() {
@@ -135,146 +124,12 @@ func Bootstrap(centralPath, nodePath, logPath string, verbose bool) {
135124
if err != nil {
136125
panic(err)
137126
}
138-
err = Start(*centralCfg, *nodeCfg, level, centralPath, nil, nil)
127+
n, err := NewNylon(*centralCfg, *nodeCfg, level, centralPath, nil)
139128
if err != nil {
140129
panic(err)
141130
}
142-
}
143-
144-
func Start(ccfg state.CentralCfg, ncfg state.LocalCfg, logLevel slog.Level, configPath string, aux map[string]any, initNylon **Nylon) error {
145-
ctx, cancel := context.WithCancelCause(context.Background())
146-
147-
dispatch := make(chan func() error, 128)
148-
149-
handlers := make([]slog.Handler, 0)
150-
if state.DBG_log_json {
151-
handlers = append(handlers,
152-
slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
153-
Level: logLevel,
154-
}),
155-
)
156-
} else {
157-
handlers = append(handlers,
158-
tint.NewHandler(os.Stderr, &tint.Options{
159-
Level: logLevel,
160-
AddSource: false,
161-
CustomPrefix: string(ncfg.Id),
162-
ReplaceAttr: func(groups []string, attr slog.Attr) slog.Attr {
163-
if attr.Key == "time" {
164-
return slog.Attr{}
165-
}
166-
return attr
167-
},
168-
}))
169-
}
170-
171-
if ncfg.LogPath != "" {
172-
err := os.MkdirAll(path.Dir(ncfg.LogPath), 0700)
173-
if err != nil {
174-
return err
175-
}
176-
f, err := os.OpenFile(ncfg.LogPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0700)
177-
if err != nil {
178-
return err
179-
}
180-
handlers = append(handlers, slog.NewTextHandler(f, &slog.HandlerOptions{Level: logLevel}))
181-
}
182-
183-
logger := slog.New(
184-
slogmulti.Fanout(handlers...))
185-
186-
if ncfg.InterfaceName == "" {
187-
ncfg.InterfaceName = "nylon"
188-
}
189-
190-
n := &Nylon{
191-
Trace: &NylonTrace{},
192-
ConfigState: state.ConfigState{
193-
CentralCfg: ccfg,
194-
LocalCfg: ncfg,
195-
},
196-
Context: ctx,
197-
Cancel: cancel,
198-
DispatchChannel: dispatch,
199-
Log: logger,
200-
ConfigPath: configPath,
201-
AuxConfig: aux,
202-
}
203-
204-
n.Log.Info("init modules")
205-
206-
err := n.Init()
131+
err = n.Start()
207132
if err != nil {
208-
return err
209-
}
210-
if initNylon != nil {
211-
*initNylon = n
212-
}
213-
n.Log.Info("init modules complete")
214-
215-
n.Log.Info("Nylon has been initialized. To gracefully exit, send SIGINT or Ctrl+C.")
216-
217-
c := make(chan os.Signal, 1)
218-
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
219-
go func() {
220-
select {
221-
case _ = <-c:
222-
n.Cancel(errors.New("received shutdown signal"))
223-
case <-ctx.Done():
224-
return
225-
}
226-
}()
227-
228-
err = MainLoop(n, dispatch)
229-
if err != nil {
230-
return err
231-
}
232-
return nil
233-
}
234-
235-
func MainLoop(n *Nylon, dispatch <-chan func() error) error {
236-
n.Log.Debug("started main loop")
237-
for {
238-
select {
239-
case fun := <-dispatch:
240-
if fun == nil {
241-
goto endLoop
242-
}
243-
//n.Log.Debug("start")
244-
start := time.Now()
245-
err := fun()
246-
if err != nil {
247-
n.Log.Error("error occurred during dispatch: ", "error", err)
248-
n.Cancel(err)
249-
}
250-
elapsed := time.Since(start)
251-
perf.DispatchLatency.Add(float64(elapsed.Microseconds()))
252-
if elapsed > time.Millisecond*4 {
253-
n.Log.Warn("dispatch took a long time!", "fun", runtime.FuncForPC(reflect.ValueOf(fun).Pointer()).Name(), "elapsed", elapsed, "len", len(dispatch))
254-
}
255-
//n.Log.Debug("done", "elapsed", elapsed)
256-
case <-n.Context.Done():
257-
goto endLoop
258-
}
133+
panic(err)
259134
}
260-
endLoop:
261-
n.Log.Info("stopped main loop", "reason", context.Cause(n.Context).Error())
262-
Stop(n)
263-
return nil
264-
}
265-
266-
func Stop(n *Nylon) {
267-
n.cleanupOnce.Do(func() {
268-
n.Cancel(context.Canceled)
269-
if n.DispatchChannel != nil {
270-
close(n.DispatchChannel)
271-
n.DispatchChannel = nil
272-
}
273-
n.Log.Info("cleaning up modules")
274-
err := n.Cleanup()
275-
if err != nil {
276-
n.Log.Error("error occurred during Stop: ", "error", err)
277-
}
278-
n.Log.Info("stopped")
279-
})
280135
}

core/ipc_handler.go

Lines changed: 37 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -59,18 +59,37 @@ func HandleNylonIPC(n *Nylon, rw *bufio.ReadWriter) error {
5959
}
6060
return device.ErrIPCStatusHandled
6161
}
62-
var resp *protocol.IpcResponse
63-
switch req.Request.(type) {
64-
case *protocol.IpcRequest_Status:
65-
resp = handleStatus(n, req.GetStatus())
66-
case *protocol.IpcRequest_Probe:
67-
resp = handleIPCProbe(n, req.GetProbe())
68-
case *protocol.IpcRequest_Reload:
69-
resp = handleIPCReload(n, req.GetReload())
70-
case *protocol.IpcRequest_Trace:
62+
63+
// trace is blocking, so we dont dispatch
64+
if _, ok := req.Request.(*protocol.IpcRequest_Trace); ok {
7165
return handleTrace(n, rw)
72-
default:
73-
resp = errResponse("unknown method")
66+
}
67+
68+
done := make(chan *protocol.IpcResponse, 1)
69+
n.Dispatch(func() error {
70+
var resp *protocol.IpcResponse
71+
switch req.Request.(type) {
72+
case *protocol.IpcRequest_Status:
73+
resp = handleStatus(n, req.GetStatus())
74+
case *protocol.IpcRequest_Probe:
75+
resp = handleIPCProbe(n, req.GetProbe())
76+
case *protocol.IpcRequest_Reload:
77+
resp = handleIPCReload(n, req.GetReload())
78+
default:
79+
resp = errResponse("unknown method")
80+
}
81+
done <- resp
82+
return nil
83+
})
84+
85+
var resp *protocol.IpcResponse
86+
select {
87+
case resp = <-done:
88+
case <-n.Context.Done():
89+
resp = errResponse("nylon shutting down")
90+
case <-time.After(1 * time.Second):
91+
// nylon is too busy to handle IPC requests
92+
resp = errResponse("timed out waiting for dispatch")
7493
}
7594
if err := writeResponse(rw, resp); err != nil {
7695
return err
@@ -231,15 +250,15 @@ func buildRouteTables(n *Nylon) *protocol.RouteTables {
231250
slices.SortFunc(tables.Selected, func(a, b *protocol.SelRoute) int {
232251
return comparePubRoute(a.PubRoute, b.PubRoute)
233252
})
234-
for prefix, route := range n.router.ForwardTable.All() {
253+
for prefix, route := range n.router.ForwardTable.Load().All() {
235254
tables.Forward = append(tables.Forward, &protocol.RouteTableEntry{
236255
Prefix: prefix.String(),
237256
Nh: string(route.Nh),
238257
Blackhole: route.Blackhole,
239258
})
240259
}
241260
sortRouteTableEntries(tables.Forward)
242-
for prefix, route := range n.router.ExitTable.All() {
261+
for prefix, route := range n.router.ExitTable.Load().All() {
243262
tables.Exit = append(tables.Exit, &protocol.RouteTableEntry{
244263
Prefix: prefix.String(),
245264
Nh: string(route.Nh),
@@ -401,7 +420,7 @@ func handleIPCProbe(n *Nylon, req *protocol.ProbeRequest) *protocol.IpcResponse
401420
for _, ep := range neigh.Eps {
402421
nep := ep.AsNylonEndpoint()
403422
addr := nep.DynEP.Value
404-
err := n.Probe(neigh.Id, nep)
423+
err := n.Probe(neigh.Id, nep, true)
405424
r := &protocol.EndpointProbeResult{Address: addr, Success: err == nil}
406425
if err != nil {
407426
r.Error = err.Error()
@@ -420,10 +439,12 @@ func handleIPCReload(n *Nylon, req *protocol.ReloadRequest) *protocol.IpcRespons
420439
return errResponse(fmt.Sprintf("read file: %v", err))
421440
}
422441
var cfg state.CentralCfg
423-
if err := yaml.Unmarshal(data, &cfg); err != nil {
442+
if err = yaml.Unmarshal(data, &cfg); err != nil {
424443
return errResponse(fmt.Sprintf("parse config: %v", err))
425444
}
426-
result, err := applyCentralConfigSync(n, cfg)
445+
// We're running on the dispatch goroutine, so call ApplyCentralConfig
446+
// directly rather than re-dispatching (which would deadlock).
447+
result, err := n.ApplyCentralConfig(&cfg)
427448
msg := ""
428449
if err != nil {
429450
msg = err.Error()
@@ -448,28 +469,6 @@ func handleIPCReload(n *Nylon, req *protocol.ReloadRequest) *protocol.IpcRespons
448469
}
449470
}
450471

451-
func applyCentralConfigSync(n *Nylon, cfg state.CentralCfg) (ApplyResult, error) {
452-
type result struct {
453-
applyResult ApplyResult
454-
err error
455-
}
456-
done := make(chan result, 1)
457-
n.Dispatch(func() error {
458-
applyResult, err := n.ApplyCentralConfig(cfg)
459-
done <- result{applyResult: applyResult, err: err}
460-
return nil
461-
})
462-
463-
select {
464-
case r := <-done:
465-
return r.applyResult, r.err
466-
case <-n.Context.Done():
467-
return ApplyRejected, context.Cause(n.Context)
468-
case <-time.After(30 * time.Second):
469-
return ApplyRejected, fmt.Errorf("timed out waiting for config reload")
470-
}
471-
}
472-
473472
func handleTrace(n *Nylon, rw *bufio.ReadWriter) error {
474473
if !state.DBG_trace_tc {
475474
if err := writeResponse(rw, errResponse("tracing not enabled; restart with --dbg-trace-tc")); err != nil {

0 commit comments

Comments
 (0)