Skip to content

Commit

Permalink
Merge pull request #430 from knowsys/feature/421-aggregates-and-arith…
Browse files Browse the repository at this point in the history
…metics

Add ability to use arithmetic expressions and aggregates together
  • Loading branch information
rlwww authored Dec 4, 2023
2 parents 71296e2 + abc9e0b commit 9f06c83
Show file tree
Hide file tree
Showing 22 changed files with 260 additions and 95 deletions.
7 changes: 0 additions & 7 deletions nemo-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,6 @@ fn run(mut cli: CliApp) -> Result<(), Error> {
log::info!("Rules parsed");
log::trace!("{:?}", program);

for atom in program.rules().iter().flat_map(|rule| rule.head()) {
if atom.aggregates().next().is_some() {
log::warn!("Program is using the experimental aggregates feature and currently depends on the internally chosen variable orders for predicates.",);
break;
}
}

let parsed_facts = cli
.tracing
.traced_facts
Expand Down
7 changes: 5 additions & 2 deletions nemo-cli/tests/blackbox_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,18 @@ impl TestCase {
.lines()
.map(|s| s.to_string())
.collect::<Vec<_>>();
let mut expected_lines = read_to_string(expected_file)
let mut expected_lines = read_to_string(expected_file.clone())
.unwrap()
.trim()
.lines()
.map(|s| s.to_string())
.collect::<Vec<_>>();
output_lines.sort();
expected_lines.sort();
assert_eq!(output_lines, expected_lines);
assert_eq!(
output_lines, expected_lines,
"actual output does not match expected output from {expected_file:?}"
);
});
Ok(())
}
Expand Down
3 changes: 1 addition & 2 deletions nemo/src/execution/planning/aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,8 @@ pub(super) fn generate_node_aggregate(
let reordering =
ProjectReordering::from_vector(reordering_column_indices, variable_order.len());

node = current_plan.plan_mut().project(node, reordering);

current_plan.add_temporary_table(node.clone(), "Subtable Aggregate Reorder");
node = current_plan.plan_mut().project(node, reordering);

// Update variable order to reordering
variable_order = variable_order_after_reordering;
Expand Down
3 changes: 3 additions & 0 deletions nemo/src/execution/planning/arithmetic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ pub(super) fn generate_node_arithmetic(
constructor.variable().clone(),
first_unused_index + constructor_index,
);

// Replace arithmetic instructions in term tree by the aggregate placeholder variables

constructor_instructions.push(AppendInstruction::Arithmetic(compile_termtree(
constructor.term(),
variable_order,
Expand Down
15 changes: 11 additions & 4 deletions nemo/src/execution/planning/plan_body_seminaive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use nemo_physical::management::execution_plan::ExecutionNodeRef;
use crate::{
execution::execution_engine::RuleInfo,
model::{
chase_model::{ChaseAggregate, ChaseRule, Constructor, AGGREGATE_VARIABLE_PREFIX},
chase_model::{variable::is_aggregate_variable, ChaseAggregate, ChaseRule, Constructor},
Variable,
},
program_analysis::{analysis::RuleAnalysis, variable_order::VariableOrder},
Expand Down Expand Up @@ -65,9 +65,9 @@ impl SeminaiveStrategy {
None
} else {
// Compute group-by variables for all aggregates in the rule
// This is the set of all universal variables in the head except for the aggregated variables
Some(analysis.head_variables.iter().filter(|variable| match variable {
Variable::Universal(identifier) => !identifier.0.starts_with(AGGREGATE_VARIABLE_PREFIX),
// This is the set of all universal variables in the head (before any arithmetic operations) except for the aggregated variables
Some(used_variables_before_arithmetic_operations.iter().filter(|variable| match variable {
Variable::Universal(_) => !is_aggregate_variable(variable),
Variable::Existential(_) => panic!("existential head variables are currently not supported together with aggregates"),
}).cloned().collect())
};
Expand Down Expand Up @@ -138,6 +138,13 @@ impl BodyStrategy for SeminaiveStrategy {
&self.aggregates,
aggregate_group_by_variables,
);

// This check can be removed when [`nemo_physical::tabular::operations::triescan_aggregate::TrieScanAggregateWrapper`] is removed
// Currently, this wrapper can only be turned into a partial trie scan using materialization
if !self.constructors.is_empty() {
current_plan
.add_temporary_table(node_seminaive.clone(), "Subtable Aggregate Arithmetics");
}
}

// Cut away layers not used after arithmetic operations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub struct StrategyStratifiedNegation<SubStrategy: RuleSelectionStrategy> {
}

impl<SubStrategy: RuleSelectionStrategy> StrategyStratifiedNegation<SubStrategy> {
fn build_graph(rule_analyses: &Vec<&RuleAnalysis>) -> NegationGraph {
fn build_graph(rule_analyses: &[&RuleAnalysis]) -> NegationGraph {
let mut predicate_to_rules_body_positive = HashMap::<Identifier, Vec<usize>>::new();
let mut predicate_to_rules_body_negative = HashMap::<Identifier, Vec<usize>>::new();
let mut predicate_to_rules_head = HashMap::<Identifier, Vec<usize>>::new();
Expand Down
1 change: 1 addition & 0 deletions nemo/src/io/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1276,6 +1276,7 @@ impl<'a> RuleParser<'a> {
map_error(
alt((
self.parse_function_term(),
self.parse_aggregate(),
map(self.parse_primitive_term(), Term::Primitive),
self.parse_parenthesised_term(),
)),
Expand Down
3 changes: 0 additions & 3 deletions nemo/src/io/parser/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,9 +319,6 @@ pub enum ParseError {
/// An aggregate term occurs in the body of a rule.
#[error(r#"An aggregate term ("{0}") occurs in the body of a rule"#)]
AggregateInBody(Aggregate),
/// An aggregate may not be used within a complex term.
#[error(r#"A term ("{0}") may not contain an aggregate as a subterm."#)]
AggregateSubterm(String),
/// Unknown aggregate operation
#[error(r#"Aggregate operation "{0}" is not known"#)]
UnknownAggregateOperation(String),
Expand Down
2 changes: 2 additions & 0 deletions nemo/src/model/chase_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ pub use atom::*;

mod constructor;
pub use constructor::*;

pub(crate) mod variable;
76 changes: 45 additions & 31 deletions nemo/src/model/chase_model/rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,15 @@ use std::collections::{HashMap, HashSet};

use crate::{
error::Error,
model::{Constraint, Identifier, Literal, PrimitiveTerm, Rule, Term, Variable},
model::{
chase_model::variable::{AGGREGATE_VARIABLE_PREFIX, CONSTRUCT_VARIABLE_PREFIX},
Constraint, Identifier, Literal, PrimitiveTerm, Rule, Term, Variable,
},
};

use super::{ChaseAggregate, Constructor, PrimitiveAtom, VariableAtom};

/// Prefix used for generated aggregate variables in a [`ChaseRule`]
pub const AGGREGATE_VARIABLE_PREFIX: &str = "_AGGREGATE_";
/// Prefix used for generated variables encoding equality constraints in a [`ChaseRule`]
pub const EQUALITY_VARIABLE_PREFIX: &str = "_EQUALITY_";
/// Prefix used for generated variables for storing the value of complex terms in a [`ChaseRule`].
pub const CONSTRUCT_VARIABLE_PREFIX: &str = "_CONSTRUCT_";
use super::{
variable::EQUALITY_VARIABLE_PREFIX, ChaseAggregate, Constructor, PrimitiveAtom, VariableAtom,
};

/// Representation of a rule in a [`super::ChaseProgram`].
///
Expand Down Expand Up @@ -156,8 +154,14 @@ impl ChaseRule {
}

impl ChaseRule {
fn generate_variable_name(prefix: &str, counter: usize) -> Identifier {
Identifier(format!("{}{}", prefix, counter))
/// Increments `next_variable_id`, but returns it's old value with a prefix.
fn generate_incrementing_variable_name(
prefix: &str,
next_variable_id: &mut usize,
) -> Identifier {
let i = Identifier(format!("{}{}", prefix, next_variable_id));
*next_variable_id += 1;
i
}

// Remove constraints of the form ?X = ?Y from the rule
Expand Down Expand Up @@ -202,39 +206,48 @@ impl ChaseRule {
// New constraints that will be introduced due to the flattening of the atom
let mut positive_constraints = Vec::<Constraint>::new();

let mut global_term_index: usize = 0;
let mut rule_next_variable_id: usize = 0;

// Head atoms may only contain primitive terms
for atom in rule.head_mut() {
for term in atom.terms_mut() {
if !term.is_primitive() {
let new_variable = if let Term::Aggregation(aggregate) = term {
let new_variable = Variable::Universal(Self::generate_variable_name(
AGGREGATE_VARIABLE_PREFIX,
global_term_index,
));
// Replace aggregate terms or aggregates inside of arithmetic expressions with placeholder variables
term.update_subterms_recursively(&mut |subterm| match subterm {
Term::Aggregation(aggregate) => {
let new_variable =
Variable::Universal(Self::generate_incrementing_variable_name(
AGGREGATE_VARIABLE_PREFIX,
&mut rule_next_variable_id,
));

aggregates.push(ChaseAggregate::from_aggregate(
aggregate.clone(),
new_variable.clone(),
));

new_variable
} else {
let new_variable = Variable::Universal(Self::generate_variable_name(
*subterm = Term::Primitive(PrimitiveTerm::Variable(new_variable));

false
}
_ => true,
});

debug_assert!(
!matches!(term, Term::Aggregation(_)),
"Aggregate terms should have been replaced with placeholder variables"
);

if !term.is_primitive() {
let new_variable =
Variable::Universal(Self::generate_incrementing_variable_name(
CONSTRUCT_VARIABLE_PREFIX,
global_term_index,
&mut rule_next_variable_id,
));

constructors.push(Constructor::new(new_variable.clone(), term.clone()));

new_variable
};
constructors.push(Constructor::new(new_variable.clone(), term.clone()));

*term = Term::Primitive(PrimitiveTerm::Variable(new_variable));
}

global_term_index += 1;
}
}

Expand All @@ -247,7 +260,10 @@ impl ChaseRule {

for term in atom.terms_mut() {
let new_variable = Term::Primitive(PrimitiveTerm::Variable(Variable::Universal(
Self::generate_variable_name(EQUALITY_VARIABLE_PREFIX, global_term_index),
Self::generate_incrementing_variable_name(
EQUALITY_VARIABLE_PREFIX,
&mut rule_next_variable_id,
),
)));

if let Term::Primitive(PrimitiveTerm::Variable(variable)) = term.clone() {
Expand All @@ -266,8 +282,6 @@ impl ChaseRule {
}

*term = new_variable;

global_term_index += 1;
}
}

Expand Down
44 changes: 44 additions & 0 deletions nemo/src/model/chase_model/variable.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use crate::model::{Identifier, Variable};

/// Prefix used for generated aggregate variables in a [`ChaseRule`]
pub(super) const AGGREGATE_VARIABLE_PREFIX: &str = "_AGGREGATE_";
/// Prefix used for generated variables encoding equality constraints in a [`ChaseRule`]
pub(super) const EQUALITY_VARIABLE_PREFIX: &str = "_EQUALITY_";
/// Prefix used for generated variables for storing the value of complex terms in a [`ChaseRule`].
pub(super) const CONSTRUCT_VARIABLE_PREFIX: &str = "_CONSTRUCT_";

fn is_aggregate_identifier(identifier: &Identifier) -> bool {
identifier.name().starts_with(AGGREGATE_VARIABLE_PREFIX)
}

/// Check if a variable is a aggregate placeholder variable, representing the output of an aggregate.
pub(crate) fn is_aggregate_variable(variable: &Variable) -> bool {
match variable {
Variable::Universal(identifier) => is_aggregate_identifier(identifier),
Variable::Existential(identifier) => {
debug_assert!(
!is_aggregate_identifier(identifier),
"aggregate variables must be universal variables"
);
false
}
}
}

fn is_construct_identifier(identifier: &Identifier) -> bool {
identifier.name().starts_with(CONSTRUCT_VARIABLE_PREFIX)
}

/// Check if a variable is a constructor variable
pub(crate) fn is_construct_variable(variable: &Variable) -> bool {
match variable {
Variable::Universal(identifier) => is_construct_identifier(identifier),
Variable::Existential(identifier) => {
debug_assert!(
!is_construct_identifier(identifier),
"construct variables must be universal variables"
);
false
}
}
}
7 changes: 0 additions & 7 deletions nemo/src/model/rule_model/rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,6 @@ impl Rule {
}
}

// Check if aggregate is used within another complex term
for term in head.iter().flat_map(|a| a.terms()) {
if term.aggregate_subterm() {
return Err(ParseError::AggregateSubterm(term.to_string()));
}
}

Ok(Rule {
head,
body,
Expand Down
45 changes: 26 additions & 19 deletions nemo/src/model/rule_model/term.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,30 +398,37 @@ impl Term {
}
}

fn aggregate_subterm_recursive(term: &Term) -> bool {
match term {
Term::Primitive(_primitive) => false,
Term::Binary { lhs, rhs, .. } => {
Self::aggregate_subterm_recursive(lhs) || Self::aggregate_subterm(rhs)
fn subterms_mut(&mut self) -> Vec<&mut Term> {
match self {
Term::Primitive(_primitive) => Vec::new(),
Term::Binary {
ref mut lhs,
ref mut rhs,
..
} => {
vec![lhs, rhs]
}
Term::Unary(_, inner) => Self::aggregate_subterm_recursive(inner),
Term::Aggregation(_aggregate) => true,
Term::Function(_, subterms) => subterms.iter().any(Self::aggregate_subterm_recursive),
Term::Unary(_, ref mut inner) => vec![inner],
Term::Aggregation(_aggregate) => Vec::new(),
Term::Function(_, subterms) => subterms.iter_mut().collect(),
}
}

/// Checks if this term contains an aggregate as a sub term.
/// This is currently not allowed.
pub fn aggregate_subterm(&self) -> bool {
match self {
Term::Primitive(_primitive) => false,
Term::Binary { lhs, rhs, .. } => {
Self::aggregate_subterm_recursive(lhs) || Self::aggregate_subterm(rhs)
/// Mutate the term in place, calling the function `f` on itself and recursively on it's subterms if the function `f` returns true
///
/// This is used e.g. to rewrite aggregates inside of constructors with placeholder variables
pub fn update_subterms_recursively<F>(&mut self, f: &mut F)
where
F: FnMut(&mut Term) -> bool,
{
f(self);

for subterm in self.subterms_mut() {
let should_recurse = f(subterm);

if should_recurse {
subterm.update_subterms_recursively(f);
}
Term::Unary(_, inner) => Self::aggregate_subterm_recursive(inner),
// this is allowed, because the aggregate is on the top-level
Term::Aggregation(_aggregate) => false,
Term::Function(_, subterms) => subterms.iter().any(Self::aggregate_subterm_recursive),
}
}

Expand Down
Loading

0 comments on commit 9f06c83

Please sign in to comment.