Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
# - windows
toolchain:
- stable
- 1.79
- 1.86
name: cargo test on ${{ matrix.os }}, rust ${{ matrix.toolchain }}
runs-on: ${{ matrix.os }}-latest
steps:
Expand Down
19 changes: 10 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ members = [
"differential-dataflow",
# "advent_of_code_2017",
"dogsdogsdogs",
"experiments",
"interactive",
"server",
"server/dataflows/degr_dist",
"server/dataflows/neighborhood",
"server/dataflows/random_graph",
"server/dataflows/reachability",
#"experiments",
#"interactive",
#"server",
#"server/dataflows/degr_dist",
#"server/dataflows/neighborhood",
#"server/dataflows/random_graph",
#"server/dataflows/reachability",
#"tpchlike",
"doop",
#"doop",
"mdbook",
]
resolver = "2"
Expand All @@ -21,8 +21,9 @@ edition = "2021"

[workspace.dependencies]
differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.19.1" }
timely = { version = "0.26", default-features = false }
#timely = { version = "0.26", default-features = false }
columnar = { version = "0.11", default-features = false }
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" }
#timely = { path = "../timely-dataflow/timely/", default-features = false }

[profile.release]
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ Alternately, here is a fragment that computes the set of nodes reachable from a

```rust
let reachable =
roots.iterate(|reach|
edges.enter(&reach.scope())
roots.iterate(|scope, reach|
edges.enter(&scope)
.semijoin(reach)
.map(|(src, dst)| dst)
.concat(reach)
Expand Down Expand Up @@ -328,7 +328,7 @@ Here is a direct implementation, in which we repeatedly take determine the set o
let k = 5;

// iteratively thin edges.
edges.iterate(|inner| {
edges.iterate(|scope, inner| {

// determine the active vertices /-- this is a lie --\
let active = inner.flat_map(|(src,dst)| [src,dst].into_iter())
Expand All @@ -337,7 +337,7 @@ edges.iterate(|inner| {
.map(|(node,_)| node);

// keep edges between active vertices
edges.enter(&inner.scope())
edges.enter(&scope)
.semijoin(active)
.map(|(src,dst)| (dst,src))
.semijoin(active)
Expand Down
12 changes: 6 additions & 6 deletions advent_of_code_2017/src/bin/day_01.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ fn main() {
let index = worker.index();
let peers = worker.peers();

let worker_input =
let worker_input =
input
.iter()
.map(|digit| digit - b'0')
Expand All @@ -28,17 +28,17 @@ fn main() {

let digits = scope.new_collection_from(worker_input).1;

// The two parts ask to line up elements with the next one in the sequence (part 1) and
// the one half way around the sequence (part 2). To find matches, we will shift the
// associated position field while keeping the part identifier as a field (`1` or `2`
// The two parts ask to line up elements with the next one in the sequence (part 1) and
// the one half way around the sequence (part 2). To find matches, we will shift the
// associated position field while keeping the part identifier as a field (`1` or `2`
// respectively). We then restrict both by the original `digits` to find matches.

let part1 = digits.map(move |(digit, position)| ((digit, (position + 1) % length), 1));
let part2 = digits.map(move |(digit, position)| ((digit, (position + length/2) % length), 2));

part1
.concat(&part2) // merge collections.
.semijoin(&digits) // restrict to matches.
.concat(part2) // merge collections.
.semijoin(digits) // restrict to matches.
.explode(|((digit, _pos), part)| Some((part, digit as isize))) // `part` with weight `digit`.
.consolidate() // consolidate weights by `part`.
.inspect(|elt| println!("part {} accumulation: {:?}", elt.0, elt.2)); // check out answers.
Expand Down
8 changes: 4 additions & 4 deletions advent_of_code_2017/src/bin/day_03.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ fn main() {
//
// The next odd square is arrived at by following four edges, each of which has length
// equal to the even number between the odd numbers. To determine the position of any
// given number, we can subtract the largest odd square from it and then subtract the
// given number, we can subtract the largest odd square from it and then subtract the
// multiples of the even length, at which point we have a distance along an edge, from
// which we can determine coordinates.
//
//
// Similarly, we can determine the sequence number from the coordinates, by determining
// which layer the point is in, adding the appropriate squared odd number, and then some
// cases to figure out where the point is in the sequence along that layer.
Expand All @@ -40,7 +40,7 @@ fn main() {

values
.map(|_val| ((0, 0), (1, 1)))
.iterate(|inner| {
.iterate(|_scope, inner| {

inner
.filter(move |&(_, (_, val))| val < input) // stop working when we've gotten enough
Expand Down Expand Up @@ -90,7 +90,7 @@ fn sequence(input: (isize, isize)) -> isize {
if -input.1 > layer { layer = -input.1; }

// input exists in layer from ((2 * layer - 1)^2, (2 * layer + 1)^2].
// note: excludes first number, includes last number.
// note: excludes first number, includes last number.

let base = (2 * layer - 1) * (2 * layer - 1);

Expand Down
22 changes: 11 additions & 11 deletions advent_of_code_2017/src/bin/day_05.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use differential_dataflow::operators::iterate::Variable;

fn main() {

let input =
let input =
"2
0
0
Expand Down Expand Up @@ -1051,7 +1051,7 @@ fn main() {
let index = worker.index();
let peers = worker.peers();

let worker_input =
let worker_input =
input
.split('\n')
.map(|phrase| phrase.parse::<isize>().unwrap())
Expand All @@ -1070,27 +1070,27 @@ fn main() {
let address = Variable::from(address.enter(nested));

// (addr, jump, steps)
let instruction = program.join(&address);
let instruction = program.join(address);

fn part1(jump: isize) -> isize { jump+1 }
// fn part2(jump: isize) -> isize { if jump >= 3 { jump-1 } else { jump+1 } }

let new_program =
let new_program =
instruction
.map(move |(addr, jump, _)| (addr, part1(jump)))
.concat(&instruction.map(|(addr, jump, _)| (addr, jump)).negate())
.concat(&program)
.concat(instruction.map(|(addr, jump, _)| (addr, jump)).negate())
.concat(program)
.consolidate();

let new_address =
let new_address =
instruction
.map(|(addr, jump, steps)| (addr + jump, steps + 1))
.concat(&instruction.map(|(addr, _, steps)| (addr, steps)).negate())
.concat(&address)
.concat(instruction.map(|(addr, _, steps)| (addr, steps)).negate())
.concat(address)
.consolidate();

program.set(&new_program);
address.set(&new_address)
program.set(new_program);
address.set(new_address)
.leave()
})
.consolidate()
Expand Down
18 changes: 9 additions & 9 deletions advent_of_code_2017/src/bin/day_06.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ fn main() {

timely::execute_from_args(std::env::args(), move |worker| {

let worker_input =
let worker_input =
input
.split_whitespace()
.map(|phrase| phrase.parse::<u8>().unwrap())
Expand All @@ -22,9 +22,9 @@ fn main() {

let banks = scope.new_collection_from(Some(worker_input)).1;

let stable = banks.iterate(|iter|
let stable = banks.iterate(|scope, iter|
iter.map_in_place(|banks| recycle(banks))
.concat(&banks.enter(&iter.scope()))
.concat(banks.enter(&scope))
.distinct()
);

Expand All @@ -36,14 +36,14 @@ fn main() {
// determine the repeated state by stepping all states and subtracting.
let loop_point = stable
.map_in_place(|banks| recycle(banks))
.concat(&stable.negate())
.concat(&banks);
.concat(stable.negate())
.concat(banks);

// restart iteration from known repeated element.
loop_point
.iterate(|iter|
loop_point
.iterate(|scope, iter|
iter.map_in_place(|banks| recycle(banks))
.concat(&loop_point.enter(&iter.scope()))
.concat(loop_point.enter(&scope))
.distinct()
)
.map(|_| ((),()))
Expand All @@ -67,5 +67,5 @@ fn recycle(banks: &mut [u8]) {
let banks_len = banks.len();
for i in 1 .. (redistribute + 1) {
banks[(max_idx + i) % banks_len] += 1;
}
}
}
12 changes: 6 additions & 6 deletions advent_of_code_2017/src/bin/day_07.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1095,23 +1095,23 @@ tvhftq (35)";

input.flat_map(|(_,_,links)| links)
.negate()
.concat(&input.map(|(name,_,_)| name))
.concat(input.map(|(name,_,_)| name))
.consolidate()
.inspect(|line| println!("part1: {:?}", line.0));

let weights = input.explode(|(name,weight,_)| Some((name, weight as isize)));
let parents = input.flat_map(|(name,_,links)| links.into_iter().map(move |link| (link,name.to_string())));

let total_weights: VecCollection<_,String> = weights
.iterate(|inner| {
parents.enter(&inner.scope())
.semijoin(&inner)
.iterate(|scope, inner| {
parents.enter(&scope)
.semijoin(inner)
.map(|(_, parent)| parent)
.concat(&weights.enter(&inner.scope()))
.concat(weights.enter(&scope))
});

parents
.semijoin(&total_weights)
.semijoin(total_weights)
.map(|(link,name)| (name,link))
.group(|key, input, output| {
if input.len() > 0 {
Expand Down
6 changes: 3 additions & 3 deletions advent_of_code_2017/src/bin/day_08.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1105,13 +1105,13 @@ wui inc -120 if i > -2038";
edits
.filter(|_| false)
.map(|_| ((0, String::new()), 0))
.iterate(|valid| {
.iterate(|scope, valid| {

let edits = edits.enter(&valid.scope());
let edits = edits.enter(&scope);

valid
.prefix_sum_at(edits.map(|(key,_)| key), 0, |_k,x,y| *x + *y)
.join(&edits)
.join(edits)
.filter(|&(_, sum, ((ref src_cmp, src_val), _))| match src_cmp.as_str() {
">" => sum > src_val,
">=" => sum >= src_val,
Expand Down
18 changes: 9 additions & 9 deletions advent_of_code_2017/src/bin/day_09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ fn main() {
let values = pp_broadcast(ranges, 0, [0, 1, 2, 3], |state, trans| trans[*state]);

// line up (position, symbol, state).
let symbols_state = input.join(&values);
let symbols_state = input.join(values);

// restrict the positions down to those that are neither '!' nor themselves cancelled.
let active = symbols_state.filter(|&(_, symbol, state)| symbol != '!' && (state == 0 || state == 1));
Expand All @@ -78,7 +78,7 @@ fn main() {
parens
.filter(|&(_pos, symbol, _)| symbol == '}')
.map(|(pos, symbol, _)| (pos, symbol))
.join(&values)
.join(values)
.explode(|(_pos, _sym, sum)| Some(((), sum)))
.consolidate()
.inspect(|x| println!("part1: {:?}", x.2));
Expand All @@ -105,7 +105,7 @@ where
let unit_ranges = collection.map(|(index, data)| ((index, 0), data));

unit_ranges
.iterate(|ranges|
.iterate(|scope, ranges|

// Each available range, of size less than usize::max_value(), advertises itself as the range
// twice as large, aligned to integer multiples of its size. Each range, which may contain at
Expand All @@ -120,7 +120,7 @@ where
if input.len() > 1 { result = combine(result, &(input[1].0).1); }
output.push((result, 1));
})
.concat(&unit_ranges.enter(&ranges.scope()))
.concat(unit_ranges.enter(&scope))
)
}

Expand All @@ -141,23 +141,23 @@ where
let zero_ranges =
ranges
.map(move |((pos, log),_)| ((pos, if log > 0 { log - 1 } else { 0 }), zero.clone()))
.antijoin(&ranges.map(|((pos, log),_)| (pos, log)));
.antijoin(ranges.map(|((pos, log),_)| (pos, log)));

let aggregates = ranges.concat(&zero_ranges);
let aggregates = ranges.concat(zero_ranges);

let init_state =
Some(((0, seed), Default::default(), 1))
.to_stream(&mut aggregates.scope())
.as_collection();

init_state
.iterate(|state| {
.iterate(|scope, state| {
aggregates
.filter(|&((_, log),_)| log < 64) // the log = 64 interval doesn't help us here (overflows).
.enter(&state.scope())
.enter(&scope)
.map(|((pos, log), data)| (pos, (log, data)))
.join_map(state, move |&pos, &(log, ref data), state| (pos + (1 << log), combine(state, data)))
.concat(&init_state.enter(&state.scope()))
.concat(init_state.enter(&scope))
.distinct()
})
.consolidate()
Expand Down
18 changes: 9 additions & 9 deletions advent_of_code_2017/src/bin/day_12.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2014,7 +2014,7 @@ fn main() {
let index = worker.index();
let peers = worker.peers();

let worker_input =
let worker_input =
input
.lines()
.enumerate()
Expand All @@ -2032,14 +2032,14 @@ fn main() {
let edges = scope.new_collection_from(worker_input).1;
let nodes = edges.map(|(src, _tgt)| (src, src)).distinct();

let labels =
let labels =
nodes
.iterate(|label| {
let edges = edges.enter(&label.scope());
let nodes = nodes.enter(&label.scope());
.iterate(|scope, label| {
let edges = edges.enter(&scope);
let nodes = nodes.enter(&scope);
label
.join_map(&edges, |_src, &lbl, &tgt| (tgt, lbl))
.concat(&nodes)
.join_map(edges, |_src, &lbl, &tgt| (tgt, lbl))
.concat(nodes)
.group(|_, input, output| output.push((*input[0].0, 1)))
})
.map(|(_src, lbl)| lbl);
Expand All @@ -2048,13 +2048,13 @@ fn main() {
.filter(|&lbl| lbl == 0)
.consolidate()
.inspect(|x| println!("part1: {:?}", x.2));

labels
.distinct()
.map(|_| ())
.consolidate()
.inspect(|x| println!("part2: {:?}", x.2));

});

}).unwrap();
Expand Down
Loading