Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ jobs:
apps: |
[
{"name": "ev-node-evm", "dockerfile": "apps/evm/Dockerfile"},
{"name": "ev-node-grpc", "dockerfile": "apps/grpc/Dockerfile"},
{"name": "ev-node-testapp", "dockerfile": "apps/testapp/Dockerfile"}
]

Expand Down
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ linters:
gosec:
excludes:
- G115
- G118
revive:
rules:
- name: package-comments
Expand Down
2 changes: 1 addition & 1 deletion apps/grpc/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ WORKDIR /ev-node
# Copy go mod files
COPY go.mod go.sum ./
COPY apps/grpc/go.mod apps/grpc/go.sum ./apps/grpc/
COPY core/go.mod ./core/
COPY core/go.mod core/go.sum ./core/
COPY execution/grpc/go.mod execution/grpc/go.sum ./execution/grpc/

# Download dependencies
Expand Down
41 changes: 33 additions & 8 deletions block/internal/cache/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import (
"github.com/evstack/ev-node/types"
)

// isNotFound is a convenience alias for store.IsNotFound.
var isNotFound = store.IsNotFound

const (
// HeaderDAIncludedPrefix is the store key prefix for header DA inclusion tracking.
HeaderDAIncludedPrefix = "cache/header-da-included/"
Expand Down Expand Up @@ -387,23 +390,45 @@ func (m *implementation) ClearFromStore() error {
return nil
}

// getMetadataUint64 reads an 8-byte little-endian uint64 from store metadata.
// Returns (0, false, nil) when the key is absent and a non-nil error for
// genuine backend failures or malformed values.
func getMetadataUint64(ctx context.Context, st store.Store, key string) (uint64, bool, error) {
b, err := st.GetMetadata(ctx, key)
if err != nil {
// Key absent — not an error, just missing.
if isNotFound(err) {
return 0, false, nil
}
return 0, false, fmt.Errorf("read metadata %q: %w", key, err)
}
if len(b) != 8 {
return 0, false, fmt.Errorf("invalid metadata length for %q: %d", key, len(b))
}
return binary.LittleEndian.Uint64(b), true, nil
}

// initDAHeightFromStore seeds maxDAHeight from the HeightToDAHeight metadata
// written by the submitter for the last finalized block. This ensures
// DaHeight() is non-zero on restart even when the in-flight snapshot is empty.
func (m *implementation) initDAHeightFromStore(ctx context.Context) {
daIncludedBytes, err := m.store.GetMetadata(ctx, store.DAIncludedHeightKey)
if err != nil || len(daIncludedBytes) != 8 {
daIncludedHeight, ok, err := getMetadataUint64(ctx, m.store, store.DAIncludedHeightKey)
if err != nil {
m.logger.Error().Err(err).Msg("failed to read DA included height from store")
return
}
daIncludedHeight := binary.LittleEndian.Uint64(daIncludedBytes)
if daIncludedHeight == 0 {
if !ok || daIncludedHeight == 0 {
return
}

if b, err := m.store.GetMetadata(ctx, store.GetHeightToDAHeightHeaderKey(daIncludedHeight)); err == nil && len(b) == 8 {
m.headerCache.setMaxDAHeight(binary.LittleEndian.Uint64(b))
if h, ok, err := getMetadataUint64(ctx, m.store, store.GetHeightToDAHeightHeaderKey(daIncludedHeight)); err != nil {
m.logger.Error().Err(err).Msg("failed to read header DA height from store")
} else if ok {
m.headerCache.setMaxDAHeight(h)
}
if b, err := m.store.GetMetadata(ctx, store.GetHeightToDAHeightDataKey(daIncludedHeight)); err == nil && len(b) == 8 {
m.dataCache.setMaxDAHeight(binary.LittleEndian.Uint64(b))
if h, ok, err := getMetadataUint64(ctx, m.store, store.GetHeightToDAHeightDataKey(daIncludedHeight)); err != nil {
m.logger.Error().Err(err).Msg("failed to read data DA height from store")
} else if ok {
m.dataCache.setMaxDAHeight(h)
}
}
37 changes: 15 additions & 22 deletions block/internal/cache/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,10 @@ func tempConfig(t *testing.T) config.Config {
return cfg
}

// helper to make an in-memory store
func memStore(t *testing.T) pkgstore.Store {
ds, err := pkgstore.NewTestInMemoryKVStore()
require.NoError(t, err)
return pkgstore.New(ds)
}

func TestManager_HeaderDataOperations(t *testing.T) {
t.Parallel()
cfg := tempConfig(t)
st := memStore(t)
st := testMemStore(t)

m, err := NewManager(cfg, st, zerolog.Nop())
require.NoError(t, err)
Expand All @@ -55,7 +48,7 @@ func TestManager_HeaderDataOperations(t *testing.T) {
func TestManager_PendingEventsCRUD(t *testing.T) {
t.Parallel()
cfg := tempConfig(t)
st := memStore(t)
st := testMemStore(t)

m, err := NewManager(cfg, st, zerolog.Nop())
require.NoError(t, err)
Expand Down Expand Up @@ -89,7 +82,7 @@ func TestManager_PendingEventsCRUD(t *testing.T) {
func TestManager_SaveAndRestoreFromStore(t *testing.T) {
t.Parallel()
cfg := tempConfig(t)
st := memStore(t)
st := testMemStore(t)
ctx := context.Background()

h1, d1 := types.GetRandomBlock(1, 1, "test-chain")
Expand Down Expand Up @@ -169,7 +162,7 @@ func TestManager_SaveAndRestoreFromStore(t *testing.T) {
func TestManager_GetNextPendingEvent_NonExistent(t *testing.T) {
t.Parallel()
cfg := tempConfig(t)
st := memStore(t)
st := testMemStore(t)

m, err := NewManager(cfg, st, zerolog.Nop())
require.NoError(t, err)
Expand All @@ -181,7 +174,7 @@ func TestManager_GetNextPendingEvent_NonExistent(t *testing.T) {

func TestPendingHeadersAndData_Flow(t *testing.T) {
t.Parallel()
st := memStore(t)
st := testMemStore(t)
ctx := context.Background()
logger := zerolog.Nop()

Expand Down Expand Up @@ -247,7 +240,7 @@ func TestPendingHeadersAndData_Flow(t *testing.T) {
func TestManager_TxOperations(t *testing.T) {
t.Parallel()
cfg := tempConfig(t)
st := memStore(t)
st := testMemStore(t)

m, err := NewManager(cfg, st, zerolog.Nop())
require.NoError(t, err)
Expand All @@ -268,7 +261,7 @@ func TestManager_TxOperations(t *testing.T) {
func TestManager_CleanupOldTxs(t *testing.T) {
t.Parallel()
cfg := tempConfig(t)
st := memStore(t)
st := testMemStore(t)

m, err := NewManager(cfg, st, zerolog.Nop())
require.NoError(t, err)
Expand Down Expand Up @@ -296,7 +289,7 @@ func TestManager_CleanupOldTxs(t *testing.T) {
func TestManager_CleanupOldTxs_SelectiveRemoval(t *testing.T) {
t.Parallel()
cfg := tempConfig(t)
st := memStore(t)
st := testMemStore(t)

m, err := NewManager(cfg, st, zerolog.Nop())
require.NoError(t, err)
Expand Down Expand Up @@ -330,7 +323,7 @@ func TestManager_CleanupOldTxs_SelectiveRemoval(t *testing.T) {
func TestManager_CleanupOldTxs_DefaultDuration(t *testing.T) {
t.Parallel()
cfg := tempConfig(t)
st := memStore(t)
st := testMemStore(t)

m, err := NewManager(cfg, st, zerolog.Nop())
require.NoError(t, err)
Expand Down Expand Up @@ -359,7 +352,7 @@ func TestManager_CleanupOldTxs_DefaultDuration(t *testing.T) {
func TestManager_CleanupOldTxs_NoTransactions(t *testing.T) {
t.Parallel()
cfg := tempConfig(t)
st := memStore(t)
st := testMemStore(t)

m, err := NewManager(cfg, st, zerolog.Nop())
require.NoError(t, err)
Expand All @@ -372,7 +365,7 @@ func TestManager_CleanupOldTxs_NoTransactions(t *testing.T) {
func TestManager_TxCache_NotPersistedToStore(t *testing.T) {
t.Parallel()
cfg := tempConfig(t)
st := memStore(t)
st := testMemStore(t)

// Create first manager and add transactions
m1, err := NewManager(cfg, st, zerolog.Nop())
Expand Down Expand Up @@ -400,7 +393,7 @@ func TestManager_TxCache_NotPersistedToStore(t *testing.T) {
func TestManager_DeleteHeight_PreservesTxCache(t *testing.T) {
t.Parallel()
cfg := tempConfig(t)
st := memStore(t)
st := testMemStore(t)

m, err := NewManager(cfg, st, zerolog.Nop())
require.NoError(t, err)
Expand Down Expand Up @@ -429,7 +422,7 @@ func TestManager_DeleteHeight_PreservesTxCache(t *testing.T) {
func TestManager_DAInclusionPersistence(t *testing.T) {
t.Parallel()
cfg := tempConfig(t)
st := memStore(t)
st := testMemStore(t)
ctx := context.Background()

// Create blocks and save to store
Expand Down Expand Up @@ -484,7 +477,7 @@ func TestManager_DaHeightAfterCacheClear(t *testing.T) {
t.Parallel()

cfg := tempConfig(t)
st := memStore(t)
st := testMemStore(t)
ctx := context.Background()

// Store a block first
Expand Down Expand Up @@ -529,7 +522,7 @@ func TestManager_DaHeightFromStoreOnRestore(t *testing.T) {
t.Parallel()

cfg := tempConfig(t)
st := memStore(t)
st := testMemStore(t)
ctx := context.Background()

// Store a block first
Expand Down
10 changes: 5 additions & 5 deletions block/internal/cache/pending_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
func TestPendingData_BasicFlow(t *testing.T) {
t.Parallel()
ctx := context.Background()
store := memStore(t)
store := testMemStore(t)

// three blocks with transactions
chainID := "pd-basic"
Expand Down Expand Up @@ -62,7 +62,7 @@ func TestPendingData_BasicFlow(t *testing.T) {
func TestPendingData_AdvancesPastEmptyData(t *testing.T) {
t.Parallel()
ctx := context.Background()
store := memStore(t)
store := testMemStore(t)

// Create blocks: non-empty, empty, empty, non-empty
chainID := "pd-empty"
Expand Down Expand Up @@ -108,7 +108,7 @@ func TestPendingData_AdvancesPastEmptyData(t *testing.T) {
func TestPendingData_AdvancesPastAllEmptyToEnd(t *testing.T) {
t.Parallel()
ctx := context.Background()
store := memStore(t)
store := testMemStore(t)

// Create blocks: non-empty, empty, empty (all remaining are empty)
chainID := "pd-all-empty"
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestPendingData_AdvancesPastAllEmptyToEnd(t *testing.T) {
func TestPendingData_AdvancesPastEmptyAtStart(t *testing.T) {
t.Parallel()
ctx := context.Background()
store := memStore(t)
store := testMemStore(t)

// Create blocks: empty, empty, non-empty
chainID := "pd-empty-start"
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestPendingData_InitFromMetadata(t *testing.T) {
func TestPendingData_GetPending_PropagatesFetchError(t *testing.T) {
t.Parallel()
ctx := context.Background()
store := memStore(t)
store := testMemStore(t)

// Set height to 1 but do not save any block data
batch, err := store.NewBatch(ctx)
Expand Down
4 changes: 2 additions & 2 deletions block/internal/cache/pending_headers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
func TestPendingHeaders_BasicFlow(t *testing.T) {
t.Parallel()
ctx := context.Background()
store := memStore(t)
store := testMemStore(t)

// create and persist three blocks
chainID := "ph-basic"
Expand Down Expand Up @@ -67,7 +67,7 @@ func TestPendingHeaders_BasicFlow(t *testing.T) {
func TestPendingHeaders_EmptyWhenUpToDate(t *testing.T) {
t.Parallel()
ctx := context.Background()
store := memStore(t)
store := testMemStore(t)

h, d := types.GetRandomBlock(1, 1, "ph-up")
batch, err := store.NewBatch(ctx)
Expand Down
42 changes: 20 additions & 22 deletions block/internal/submitting/submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,17 +349,16 @@ func (s *Submitter) processDAInclusionLoop() {
return
}

// Persist DA included height before advancing in-memory state
if err := putUint64Metadata(s.ctx, s.store, store.DAIncludedHeightKey, nextHeight); err != nil {
s.logger.Error().Err(err).Uint64("height", nextHeight).Msg("failed to persist DA included height")
break
}

// Update DA included height
s.SetDAIncludedHeight(nextHeight)
currentDAIncluded = nextHeight

// Persist DA included height
bz := make([]byte, 8)
binary.LittleEndian.PutUint64(bz, nextHeight)
if err := s.store.SetMetadata(s.ctx, store.DAIncludedHeightKey, bz); err != nil {
s.logger.Error().Err(err).Uint64("height", nextHeight).Msg("failed to persist DA included height")
}

// Delete height cache for that height
// This can only be performed after the height has been persisted to store
s.cache.DeleteHeight(nextHeight)
Expand Down Expand Up @@ -415,6 +414,13 @@ func (s *Submitter) initializeDAIncludedHeight(ctx context.Context) error {
return nil
}

// putUint64Metadata encodes val as 8-byte little-endian and writes it to the store.
func putUint64Metadata(ctx context.Context, st store.Store, key string, val uint64) error {
bz := make([]byte, 8)
binary.LittleEndian.PutUint64(bz, val)
return st.SetMetadata(ctx, key, bz)
}

// sendCriticalError sends a critical error to the error channel without blocking
func (s *Submitter) sendCriticalError(err error) {
if s.errorCh != nil {
Expand All @@ -431,41 +437,33 @@ func (s *Submitter) sendCriticalError(err error) {
func (s *Submitter) setNodeHeightToDAHeight(ctx context.Context, height uint64, data *types.Data, genesisInclusion bool) error {
dataHash := data.DACommitment()

headerDaHeightBytes := make([]byte, 8)
daHeightForHeader, ok := s.cache.GetHeaderDAIncludedByHeight(height)
if !ok {
return fmt.Errorf("header for height %d not found in cache", height)
}
binary.LittleEndian.PutUint64(headerDaHeightBytes, daHeightForHeader)

if err := s.store.SetMetadata(ctx, store.GetHeightToDAHeightHeaderKey(height), headerDaHeightBytes); err != nil {
if err := putUint64Metadata(ctx, s.store, store.GetHeightToDAHeightHeaderKey(height), daHeightForHeader); err != nil {
return err
}

genesisDAIncludedHeight := daHeightForHeader
dataDaHeightBytes := make([]byte, 8)
// For empty transactions, use the same DA height as the header
if bytes.Equal(dataHash, common.DataHashForEmptyTxs) {
binary.LittleEndian.PutUint64(dataDaHeightBytes, daHeightForHeader)
} else {
// For empty transactions, use the same DA height as the header.
dataDAHeight := daHeightForHeader
if !bytes.Equal(dataHash, common.DataHashForEmptyTxs) {
daHeightForData, ok := s.cache.GetDataDAIncludedByHeight(height)
if !ok {
return fmt.Errorf("data for height %d not found in cache", height)
}
binary.LittleEndian.PutUint64(dataDaHeightBytes, daHeightForData)

dataDAHeight = daHeightForData
// if data posted before header, use data da included height for genesis da height
genesisDAIncludedHeight = min(daHeightForData, genesisDAIncludedHeight)
}
if err := s.store.SetMetadata(ctx, store.GetHeightToDAHeightDataKey(height), dataDaHeightBytes); err != nil {
if err := putUint64Metadata(ctx, s.store, store.GetHeightToDAHeightDataKey(height), dataDAHeight); err != nil {
return err
}

if genesisInclusion {
genesisDAIncludedHeightBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(genesisDAIncludedHeightBytes, genesisDAIncludedHeight)

if err := s.store.SetMetadata(ctx, store.GenesisDAHeightKey, genesisDAIncludedHeightBytes); err != nil {
if err := putUint64Metadata(ctx, s.store, store.GenesisDAHeightKey, genesisDAIncludedHeight); err != nil {
return err
}

Expand Down
Loading