Skip to content

Commit c0be77d

Browse files
Add atomic load/store.
1 parent 71f7be2 commit c0be77d

3 files changed

Lines changed: 268 additions & 30 deletions

File tree

ext/io/buffer/atomic.c

Lines changed: 107 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,23 @@ static ID RB_IO_BUFFER_DATA_TYPE_S64;
8181
return val2num(result); \
8282
}
8383

84+
// Expands to a typed helper, atomic_load_<name>(self, offset), that reads
// `size` bytes at `offset` as one atomic access (seq_cst, host byte order)
// and boxes the result with `val2num`.
//
// The `num2val` argument is unused for loads; it is kept so every
// ATOMIC_*_IMPL macro in this file shares the same parameter list.
//
// NOTE(review): assumes the buffer address at `offset` is suitably aligned
// for an atomic access of this width — get_buffer_pointer presumably checks
// bounds, but alignment is not visible here; confirm.
#define ATOMIC_LOAD_IMPL(name, ctype, atomic_type, size, num2val, val2num) \
static VALUE atomic_load_##name(VALUE self, size_t offset) { \
    void *memory = get_buffer_pointer(self, offset, size); \
    ctype loaded = atomic_load((atomic_type*)memory); \
    return val2num(loaded); \
}
91+
92+
// Expands to a typed helper, atomic_store_<name>(self, offset, value), that
// unboxes `value` with `num2val` and writes it as one atomic access
// (seq_cst, host byte order). Returns `self` for method chaining.
//
// The `val2num` argument is unused for stores; it is kept so every
// ATOMIC_*_IMPL macro in this file shares the same parameter list.
#define ATOMIC_STORE_IMPL(name, ctype, atomic_type, size, num2val, val2num) \
static VALUE atomic_store_##name(VALUE self, size_t offset, VALUE value) { \
    void *memory = get_buffer_pointer(self, offset, size); \
    ctype native_value = (ctype)num2val(value); \
    atomic_store((atomic_type*)memory, native_value); \
    return self; \
}
100+
84101
// Macro for atomic compare and swap
85102
#define ATOMIC_CAS_IMPL(name, ctype, atomic_type, size, num2val, val2num) \
86103
static int atomic_cas_##name(VALUE self, size_t offset, VALUE expected_value, VALUE desired_value) { \
@@ -128,6 +145,25 @@ ATOMIC_BITWISE_IMPL(i32, int32_t, atomic_int_least32_t, 4, NUM2INT, INT2NUM, xor
128145
ATOMIC_BITWISE_IMPL(u64, uint64_t, atomic_uint_least64_t, 8, NUM2ULL, ULL2NUM, xor)
129146
ATOMIC_BITWISE_IMPL(i64, int64_t, atomic_int_least64_t, 8, NUM2LL, LL2NUM, xor)
130147

148+
// Atomic load/store work on raw bytes in host endian (both :u32 and :U32 map to same implementation)
149+
ATOMIC_LOAD_IMPL(u8, uint8_t, atomic_uint_least8_t, 1, NUM2UINT, UINT2NUM)
150+
ATOMIC_LOAD_IMPL(s8, int8_t, atomic_int_least8_t, 1, NUM2INT, INT2NUM)
151+
ATOMIC_LOAD_IMPL(u16, uint16_t, atomic_uint_least16_t, 2, NUM2UINT, UINT2NUM)
152+
ATOMIC_LOAD_IMPL(s16, int16_t, atomic_int_least16_t, 2, NUM2INT, INT2NUM)
153+
ATOMIC_LOAD_IMPL(u32, uint32_t, atomic_uint_least32_t, 4, NUM2UINT, UINT2NUM)
154+
ATOMIC_LOAD_IMPL(s32, int32_t, atomic_int_least32_t, 4, NUM2INT, INT2NUM)
155+
ATOMIC_LOAD_IMPL(u64, uint64_t, atomic_uint_least64_t, 8, NUM2ULL, ULL2NUM)
156+
ATOMIC_LOAD_IMPL(s64, int64_t, atomic_int_least64_t, 8, NUM2LL, LL2NUM)
157+
158+
ATOMIC_STORE_IMPL(u8, uint8_t, atomic_uint_least8_t, 1, NUM2UINT, UINT2NUM)
159+
ATOMIC_STORE_IMPL(s8, int8_t, atomic_int_least8_t, 1, NUM2INT, INT2NUM)
160+
ATOMIC_STORE_IMPL(u16, uint16_t, atomic_uint_least16_t, 2, NUM2UINT, UINT2NUM)
161+
ATOMIC_STORE_IMPL(s16, int16_t, atomic_int_least16_t, 2, NUM2INT, INT2NUM)
162+
ATOMIC_STORE_IMPL(u32, uint32_t, atomic_uint_least32_t, 4, NUM2UINT, UINT2NUM)
163+
ATOMIC_STORE_IMPL(s32, int32_t, atomic_int_least32_t, 4, NUM2INT, INT2NUM)
164+
ATOMIC_STORE_IMPL(u64, uint64_t, atomic_uint_least64_t, 8, NUM2ULL, ULL2NUM)
165+
ATOMIC_STORE_IMPL(s64, int64_t, atomic_int_least64_t, 8, NUM2LL, LL2NUM)
166+
131167
ATOMIC_CAS_IMPL(u8, uint8_t, atomic_uint_least8_t, 1, NUM2UINT, UINT2NUM)
132168
ATOMIC_CAS_IMPL(i8, int8_t, atomic_int_least8_t, 1, NUM2INT, INT2NUM)
133169
ATOMIC_CAS_IMPL(u16, uint16_t, atomic_uint_least16_t, 2, NUM2UINT, UINT2NUM)
@@ -242,6 +278,60 @@ static VALUE atomic_xor_impl(VALUE self, ID type_identifier, size_t offset, long
242278
return Qnil;
243279
}
244280

281+
// Dispatch an atomic load to the width/signedness-specific helper for
// `type_identifier`. Uppercase and lowercase symbols share a helper because
// atomic operations act on raw bytes in host endian — the endianness
// spelling only matters to get_value/set_value, not here.
// Raises ArgumentError for unrecognized type symbols.
static VALUE atomic_load_impl(VALUE self, ID type_identifier, size_t offset) {
#define DISPATCH_LOAD(symbol, helper) \
    if (type_identifier == RB_IO_BUFFER_DATA_TYPE_##symbol) { \
        return atomic_load_##helper(self, offset); \
    }

    // Single bytes have no endian variants, so only :U8/:S8 are dispatched.
    DISPATCH_LOAD(U8, u8)
    DISPATCH_LOAD(S8, s8)
    DISPATCH_LOAD(u16, u16)
    DISPATCH_LOAD(U16, u16)
    DISPATCH_LOAD(s16, s16)
    DISPATCH_LOAD(S16, s16)
    DISPATCH_LOAD(u32, u32)
    DISPATCH_LOAD(U32, u32)
    DISPATCH_LOAD(s32, s32)
    DISPATCH_LOAD(S32, s32)
    DISPATCH_LOAD(u64, u64)
    DISPATCH_LOAD(U64, u64)
    DISPATCH_LOAD(s64, s64)
    DISPATCH_LOAD(S64, s64)
#undef DISPATCH_LOAD

    rb_raise(rb_eArgError, "Unsupported type for atomic operations: %"PRIsVALUE, rb_id2str(type_identifier));
    return Qnil;
}
307+
308+
// Dispatch an atomic store to the width/signedness-specific helper for
// `type_identifier`. Uppercase and lowercase symbols share a helper because
// atomic operations act on raw bytes in host endian — the endianness
// spelling only matters to get_value/set_value, not here.
// Raises ArgumentError for unrecognized type symbols.
static VALUE atomic_store_impl(VALUE self, ID type_identifier, size_t offset, VALUE value) {
#define DISPATCH_STORE(symbol, helper) \
    if (type_identifier == RB_IO_BUFFER_DATA_TYPE_##symbol) { \
        return atomic_store_##helper(self, offset, value); \
    }

    // Single bytes have no endian variants, so only :U8/:S8 are dispatched.
    DISPATCH_STORE(U8, u8)
    DISPATCH_STORE(S8, s8)
    DISPATCH_STORE(u16, u16)
    DISPATCH_STORE(U16, u16)
    DISPATCH_STORE(s16, s16)
    DISPATCH_STORE(S16, s16)
    DISPATCH_STORE(u32, u32)
    DISPATCH_STORE(U32, u32)
    DISPATCH_STORE(s32, s32)
    DISPATCH_STORE(S32, s32)
    DISPATCH_STORE(u64, u64)
    DISPATCH_STORE(U64, u64)
    DISPATCH_STORE(s64, s64)
    DISPATCH_STORE(S64, s64)
#undef DISPATCH_STORE

    rb_raise(rb_eArgError, "Unsupported type for atomic operations: %"PRIsVALUE, rb_id2str(type_identifier));
    return Qnil;
}
334+
245335
static VALUE atomic_compare_and_swap_impl(VALUE self, ID type_identifier, size_t offset, VALUE expected_value, VALUE desired_value) {
246336
#define ATOMIC_CAS_CASE(name, impl_name) \
247337
if (type_identifier == RB_IO_BUFFER_DATA_TYPE_##name) { \
@@ -347,6 +437,20 @@ static VALUE atomic_xor(VALUE self, VALUE type_symbol, VALUE offset_value, VALUE
347437
return atomic_xor_impl(self, type_identifier, offset, value);
348438
}
349439

440+
// Ruby entry point: buffer.atomic_load(type, offset) -> Integer.
// Converts the Ruby arguments, then delegates to the typed dispatcher.
// The conversions stay sequenced so a bad type symbol raises before a bad
// offset, matching the original behavior.
static VALUE atomic_load_method(VALUE self, VALUE type_symbol, VALUE offset_value) {
    ID type = get_type_id(type_symbol);
    size_t byte_offset = NUM2SIZET(offset_value);
    return atomic_load_impl(self, type, byte_offset);
}
446+
447+
// Ruby entry point: buffer.atomic_store(type, offset, value) -> self.
// Converts the Ruby arguments, then delegates to the typed dispatcher.
// The conversions stay sequenced so a bad type symbol raises before a bad
// offset, matching the original behavior.
static VALUE atomic_store_method(VALUE self, VALUE type_symbol, VALUE offset_value, VALUE value) {
    ID type = get_type_id(type_symbol);
    size_t byte_offset = NUM2SIZET(offset_value);
    return atomic_store_impl(self, type, byte_offset, value);
}
453+
350454
static VALUE atomic_compare_and_swap(VALUE self, VALUE type_symbol, VALUE offset_value, VALUE expected_value, VALUE desired_value) {
351455
ID type_identifier = get_type_id(type_symbol);
352456
size_t offset = NUM2SIZET(offset_value);
@@ -371,7 +475,7 @@ void Init_IO_Buffer_Atomic(void) {
371475

372476
VALUE IO_Buffer = rb_const_get(rb_cIO, rb_intern("Buffer"));
373477

374-
// Initialize type IDs (matching Ruby's pattern)
478+
// Initialize type IDs (support both forms for compatibility, but atomic operations work on raw bytes in host endian)
375479
#define IO_BUFFER_DEFINE_DATA_TYPE(name) RB_IO_BUFFER_DATA_TYPE_##name = rb_intern_const(#name)
376480
IO_BUFFER_DEFINE_DATA_TYPE(U8);
377481
IO_BUFFER_DEFINE_DATA_TYPE(S8);
@@ -396,5 +500,7 @@ void Init_IO_Buffer_Atomic(void) {
396500
DEFINE_METHOD_IF_NOT_EXISTS(IO_Buffer, "atomic_and", atomic_and, 3);
397501
DEFINE_METHOD_IF_NOT_EXISTS(IO_Buffer, "atomic_or", atomic_or, 3);
398502
DEFINE_METHOD_IF_NOT_EXISTS(IO_Buffer, "atomic_xor", atomic_xor, 3);
503+
DEFINE_METHOD_IF_NOT_EXISTS(IO_Buffer, "atomic_load", atomic_load_method, 2);
504+
DEFINE_METHOD_IF_NOT_EXISTS(IO_Buffer, "atomic_store", atomic_store_method, 3);
399505
DEFINE_METHOD_IF_NOT_EXISTS(IO_Buffer, "atomic_compare_and_swap", atomic_compare_and_swap, 4);
400506
}

guides/getting-started/readme.md

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ $ bundle add io-buffer-atomic
1414

1515
`io-buffer-atomic` extends `IO::Buffer` with atomic operations that are safe for concurrent access across threads and processes. When multiple threads or processes share memory (via `IO::Buffer`), you need atomic operations to prevent race conditions and ensure data consistency.
1616

17+
**Important**: Atomic operations work on raw memory bytes in host endian (native byte order). Both `:u32` and `:U32` map to the same atomic operation since they operate on the same raw memory bytes. The endianness distinction only affects how Ruby's `get_value`/`set_value` interpret bytes, not atomic operations.
18+
1719
Use atomic operations when you need:
1820
- **Thread-safe counters**: Multiple threads updating shared counters without locks.
1921
- **Process-safe coordination**: Multiple processes coordinating via shared memory.
@@ -77,6 +79,38 @@ result = buffer.atomic_xor(:u32, 0, 0b1111)
7779
# => 0b0000
7880
```
7981

82+
### Atomic Load
83+
84+
Atomic load operations ensure you read a complete, consistent value from shared memory, preventing torn reads:
85+
86+
```ruby
87+
# Set a value:
88+
buffer.set_value(:u32, 0, 42)
89+
90+
# Atomically read the value (ensures no torn reads):
91+
result = buffer.atomic_load(:u32, 0)
92+
# => 42
93+
```
94+
95+
Use `atomic_load` instead of `get_value` when reading values that may be modified by other threads or processes: it performs the read as a single atomic access with memory ordering guarantees, preventing torn reads. Note that avoiding data races entirely also requires concurrent writers to use atomic operations (such as `atomic_store`).
96+
97+
### Atomic Store
98+
99+
Atomic store operations ensure you write a complete value atomically to shared memory:
100+
101+
```ruby
102+
# Atomically write a value:
103+
buffer.atomic_store(:u32, 0, 42)
104+
105+
# Read it back:
106+
result = buffer.atomic_load(:u32, 0)
107+
# => 42
108+
```
109+
110+
Use `atomic_store` instead of `set_value` when writing values that may be read by other threads or processes: it performs the write as a single atomic access with memory ordering guarantees, preventing torn writes. Note that avoiding data races entirely also requires concurrent readers to use atomic operations (such as `atomic_load`).
111+
112+
**Note**: `atomic_load` and `atomic_store` work on raw bytes in host endian, matching the atomic operations themselves. Both `:u32` and `:U32` are equivalent for atomic operations.
113+
80114
### Compare and Swap
81115

82116
Compare-and-swap operations enable lock-free algorithms and optimistic concurrency:
@@ -88,7 +122,7 @@ buffer.set_value(:u32, 0, 10)
88122
# Atomically swap if current value matches expected:
89123
swapped = buffer.atomic_compare_and_swap(:u32, 0, 10, 20)
90124
# => true
91-
buffer.get_value(:u32, 0)
125+
buffer.atomic_load(:u32, 0)
92126
# => 20
93127

94128
# If value doesn't match, swap fails:
@@ -98,6 +132,8 @@ swapped = buffer.atomic_compare_and_swap(:u32, 0, 10, 30)
98132

99133
## Supported Operations
100134

135+
- `atomic_load(type, offset)` - Atomically read a value (prevents torn reads).
136+
- `atomic_store(type, offset, value)` - Atomically write a value (prevents torn writes).
101137
- `atomic_increment(type, offset, value = 1)` - Atomically increment a value.
102138
- `atomic_decrement(type, offset, value = 1)` - Atomically decrement a value.
103139
- `atomic_add(type, offset, value)` - Atomically add a value.
@@ -106,8 +142,3 @@ swapped = buffer.atomic_compare_and_swap(:u32, 0, 10, 30)
106142
- `atomic_or(type, offset, value)` - Atomically perform bitwise OR.
107143
- `atomic_xor(type, offset, value)` - Atomically perform bitwise XOR.
108144
- `atomic_compare_and_swap(type, offset, expected, desired)` - Atomically compare and swap.
109-
110-
## Requirements
111-
112-
- Ruby \>= 3.2.6.
113-
- `IO::Buffer` support (available in Ruby 3.2+).

0 commit comments

Comments
 (0)