diff --git a/openfisca_survey_manager/input_dataframe_generator.py b/openfisca_survey_manager/input_dataframe_generator.py index 7da2c2fd..abc80df0 100644 --- a/openfisca_survey_manager/input_dataframe_generator.py +++ b/openfisca_survey_manager/input_dataframe_generator.py @@ -85,23 +85,18 @@ def make_input_dataframe_by_entity(tax_benefit_system, nb_persons, nb_groups): def random_data_generator(tax_benefit_system, nb_persons, nb_groups, variable_generators_by_period, collection = None): - """Generate randomn values for some variables of a tax-benefit system and store them in a specified collection + """ + Generate randomn values for some variables of a tax-benefit system and store them in a specified collection. Args: - TaxBenefitSystem: tax_benefit_system: the tax_benefit_system to use - int: nb_persons: the number of persons in the system - int: nb_groups: the number of collective entities in the system - dict: variable_generators_by_period: the specification of the periods and values of the generated variables - string: collection: the collection storing the produced data - tax_benefit_system: - nb_persons: - nb_groups: - variable_generators_by_period: - collection: (Default value = None) + tax_benefit_system (TaxBenefitSystem): tax_benefit_system: the tax_benefit_system to use + nb_persons (int): the number of persons in the data + nb_groups (int): the number of collective entities in the data + variable_generators_by_period (dict): parameters of the varaibles for every period + collection (str, optional): collection where to store the input survey. Defaults to None. Returns: - A dictionnary of the entities tables by period - + dict: The entities tables by period """ initial_input_dataframe_by_entity = make_input_dataframe_by_entity(tax_benefit_system, nb_persons, nb_groups) table_by_entity_by_period = dict() diff --git a/openfisca_survey_manager/scenarios/abstract_scenario.py b/openfisca_survey_manager/scenarios/abstract_scenario.py index 274e7dc1..4f5cf36b 100644 --- a/openfisca_survey_manager/scenarios/abstract_scenario.py +++ b/openfisca_survey_manager/scenarios/abstract_scenario.py @@ -1,8 +1,5 @@ """Abstract survey scenario definition.""" -from typing import Dict, List - - import logging import os import numpy as np @@ -11,14 +8,11 @@ from openfisca_core import periods from openfisca_survey_manager.simulations import Simulation # noqa analysis:ignore -from openfisca_core.simulation_builder import SimulationBuilder -from openfisca_core.indexed_enums import Enum from openfisca_core.periods import MONTH, YEAR from openfisca_core.tools.simulation_dumper import dump_simulation, restore_simulation from openfisca_survey_manager.calibration import Calibration from openfisca_survey_manager import default_config_files_directory -from openfisca_survey_manager.survey_collections import SurveyCollection from openfisca_survey_manager.surveys import Survey log = logging.getLogger(__name__) @@ -238,8 +232,8 @@ def compute_marginal_tax_rate(self, target_variable, period, simulation = None, modified_simulation = self.simulations[f"_modified_{simulation_name}"] - variables = self.reference_tax_benefit_system.variables - assert target_variable in self.reference_tax_benefit_system.variables + variables = simulation.tax_benefit_system.variables + assert target_variable in variables variables_belong_to_same_entity = ( variables[varying_variable].entity.key == variables[target_variable].entity.key @@ -254,7 +248,7 @@ def compute_marginal_tax_rate(self, target_variable, period, simulation = None, else: target_variable_entity_key = variables[target_variable].entity.key - def cast_to_target_entity(simulation): + def cast_to_target_entity(simulation: Simulation): population = simulation.populations[target_variable_entity_key] df = (pd.DataFrame( { @@ -307,8 +301,6 @@ def compute_pivot_table(self, aggfunc = 'mean', columns = None, baseline_simulat pd.DataFrame: Pivot table """ - assert aggfunc in ['count', 'mean', 'sum'] - assert columns or index or values assert (not difference) or (baseline_simulation is not None), "Can't have difference when not baseline simulation" simulation = self.simulations[simulation] @@ -379,15 +371,12 @@ def create_data_frame_by_entity(self, variables = None, expressions = None, filt dict or pandas.DataFrame: Dictionnary of dataframes by entities or dataframe with all the computed variables """ - # TODO: remove this method ? if simulation is None: assert len(self.simulations.keys()) == 1 simulation = list(self.simulations.values())[0] else: simulation = self.simulations[simulation] - id_variable_by_entity_key = self.id_variable_by_entity_key - return simulation.create_data_frame_by_entity( variables = variables, expressions = expressions, @@ -395,7 +384,6 @@ def create_data_frame_by_entity(self, variables = None, expressions = None, filt index = index, period = period, merge = merge, - id_variable_by_entity_key = id_variable_by_entity_key, ) def custom_input_data_frame(self, input_data_frame, **kwargs): @@ -436,105 +424,6 @@ def dump_simulations(self, directory: str): simulation = list(self.simulations.values())[0] dump_simulation(simulation, directory) - def init_all_entities(self, tax_benefit_system, input_data_frame, builder, period = None): - assert period is not None - log.info('Initialasing simulation using input_data_frame for period {}'.format(period)) - - if period.unit == YEAR: # 1. year - simulation = self.init_simulation_with_data_frame( - tax_benefit_system, - input_data_frame = input_data_frame, - period = period, - builder = builder, - ) - elif period.unit == MONTH and period.size == 3: # 2. quarter - for offset in range(period.size): - period_item = period.first_month.offset(offset, MONTH) - simulation = self.init_simulation_with_data_frame( - tax_benefit_system, - input_data_frame = input_data_frame, - period = period_item, - builder = builder, - ) - elif period.unit == MONTH and period.size == 1: # 3. months - simulation = self.init_simulation_with_data_frame( - tax_benefit_system, - input_data_frame = input_data_frame, - period = period, - builder = builder, - ) - else: - raise ValueError("Invalid period {}".format(period)) - - return simulation - - def filter_input_variables(self, input_data_frame = None): - """Filter the input data frame from variables that won't be used or are set to be computed. - - Args: - input_data_frame: Input dataframe (Default value = None) - - Returns: - pd.DataFrame: filtered dataframe - - """ - assert input_data_frame is not None - id_variable_by_entity_key = self.id_variable_by_entity_key - role_variable_by_entity_key = self.role_variable_by_entity_key - used_as_input_variables = self.used_as_input_variables - - tax_benefit_system = self.reference_tax_benefit_system - variables = tax_benefit_system.variables - - id_variables = [ - id_variable_by_entity_key[_entity.key] for _entity in tax_benefit_system.group_entities] - role_variables = [ - role_variable_by_entity_key[_entity.key] for _entity in tax_benefit_system.group_entities] - - log.debug('Variable used_as_input_variables in filter: \n {}'.format(used_as_input_variables)) - - unknown_columns = [] - for column_name in input_data_frame: - if column_name in id_variables + role_variables: - continue - if column_name not in variables: - unknown_columns.append(column_name) - - input_data_frame.drop(unknown_columns, axis = 1, inplace = True) - - if unknown_columns: - log.debug('The following unknown columns {}, are dropped from input table'.format( - sorted(unknown_columns))) - - used_columns = [] - dropped_columns = [] - for column_name in input_data_frame: - if column_name in id_variables + role_variables: - continue - variable = variables[column_name] - # Keeping the calculated variables that are initialized by the input data - if variable.formulas: - if column_name in used_as_input_variables: - used_columns.append(column_name) - continue - - dropped_columns.append(column_name) - - input_data_frame.drop(dropped_columns, axis = 1, inplace = True) - - if used_columns: - log.debug( - 'These columns are not dropped because present in used_as_input_variables:\n {}'.format( - sorted(used_columns))) - if dropped_columns: - log.debug( - 'These columns in survey are set to be calculated, we drop them from the input table:\n {}'.format( - sorted(dropped_columns))) - - log.info('Keeping the following variables in the input_data_frame:\n {}'.format( - sorted(list(input_data_frame.columns)))) - return input_data_frame - def generate_performance_data(self, output_dir: str): if not self.trace: raise ValueError("Method generate_performance_data cannot be used if trace hasn't been activated.") @@ -556,30 +445,8 @@ def inflate(self, inflator_by_variable = None, period = None, target_by_variable self.inflator_by_variable = inflator_by_variable self.target_by_variable = target_by_variable - for simulation_name, simulation in self.simulations.items(): - tax_benefit_system = simulation.tax_benefit_system - for variable_name in set(inflator_by_variable.keys()).union(set(target_by_variable.keys())): - assert variable_name in tax_benefit_system.variables, \ - "Variable {} is not a valid variable of the tax-benefit system".format(variable_name) - if variable_name in target_by_variable: - inflator = inflator_by_variable[variable_name] = \ - target_by_variable[variable_name] / self.compute_aggregate( - variable = variable_name, simulation = simulation_name, period = period) - log.info('Using {} as inflator for {} to reach the target {} '.format( - inflator, variable_name, target_by_variable[variable_name])) - else: - assert variable_name in inflator_by_variable, 'variable_name is not in inflator_by_variable' - log.info('Using inflator {} for {}. The target is thus {}'.format( - inflator_by_variable[variable_name], - variable_name, inflator_by_variable[variable_name] * self.compute_aggregate( - variable = variable_name, simulation = simulation_name, period = period) - )) - inflator = inflator_by_variable[variable_name] - - array = simulation.calculate_add(variable_name, period = period) - assert array is not None - simulation.delete_arrays(variable_name, period = period) # delete existing arrays - simulation.set_input(variable_name, period, inflator * array) # insert inflated array + for _, simulation in self.simulations.items(): + simulation.inflate(inflator_by_variable, period, target_by_variable) def init_from_data(self, calibration_kwargs = None, inflation_kwargs = None, rebuild_input_data = False, rebuild_kwargs = None, data = None, memory_config = None, use_marginal_tax_rate = False, @@ -600,10 +467,6 @@ def init_from_data(self, calibration_kwargs = None, inflation_kwargs = None, if data is not None: data_year = data.get("data_year", self.period) - self._set_id_variable_by_entity_key() - self._set_role_variable_by_entity_key() - self._set_used_as_input_variables_by_entity() - # When ``True`` it'll assume it is raw data and do all that described supra. # When ``False``, it'll assume data is ready for consumption. if rebuild_input_data: @@ -616,7 +479,8 @@ def init_from_data(self, calibration_kwargs = None, inflation_kwargs = None, trace = self.trace if use_marginal_tax_rate: - assert self.varying_variable in self.reference_tax_benefit_system.variables + for name, tax_benefit_system in self.tax_benefit_systems.items(): + assert self.varying_variable in tax_benefit_system.variables, f"Variable {self.varying_variable} is not present tax benefit system named {name}" # Inverting reform and baseline because we are more likely # to use baseline input in reform than the other way around @@ -639,287 +503,49 @@ def init_from_data(self, calibration_kwargs = None, inflation_kwargs = None, if inflation_kwargs: self.inflate(**inflation_kwargs) - def init_entity_structure(self, tax_benefit_system, entity, input_data_frame, builder): - """Initialize sthe simulation with tax_benefit_system entities and input_data_frame. - - Args: - tax_benefit_system(TaxBenfitSystem): The TaxBenefitSystem to get the structure from - entity(Entity): The entity to initialize structure - input_data_frame(pd.DataFrame): The input - builder(Builder): The builder - - """ - id_variables = [ - self.id_variable_by_entity_key[_entity.key] for _entity in tax_benefit_system.group_entities] - role_variables = [ - self.role_variable_by_entity_key[_entity.key] for _entity in tax_benefit_system.group_entities] - - if entity.is_person: - for id_variable in id_variables + role_variables: - assert id_variable in input_data_frame.columns, \ - "Variable {} is not present in input dataframe".format(id_variable) - - input_data_frame = self.filter_input_variables(input_data_frame = input_data_frame) - - ids = range(len(input_data_frame)) - if entity.is_person: - builder.declare_person_entity(entity.key, ids) - for group_entity in tax_benefit_system.group_entities: - _key = group_entity.key - _id_variable = self.id_variable_by_entity_key[_key] - _role_variable = self.role_variable_by_entity_key[_key] - group_population = builder.declare_entity(_key, input_data_frame[_id_variable].drop_duplicates().sort_values().values) - builder.join_with_persons( - group_population, - input_data_frame[_id_variable].astype('int').values, - input_data_frame[_role_variable].astype('int').values, - ) - - def init_entity_data(self, entity, input_data_frame, period, simulation): - used_as_input_variables = self.used_as_input_variables_by_entity[entity.key] - diagnose_variable_mismatch(used_as_input_variables, input_data_frame) - input_data_frame = self.filter_input_variables(input_data_frame = input_data_frame) - - for column_name, column_serie in input_data_frame.items(): - variable_instance = self.reference_tax_benefit_system.variables.get(column_name) - if variable_instance is None: - log.info(f"Ignoring {column_name} in input data") - continue - - if variable_instance.entity.key != entity.key: - log.info("Ignoring variable {} which is not part of entity {} but {}".format( - column_name, entity.key, variable_instance.entity.key)) - continue - init_variable_in_entity(simulation, entity.key, column_name, column_serie, period) - - def init_simulation_with_data_frame(self, tax_benefit_system, input_data_frame, period, builder): - """Initialize the simulation period with current input_data_frame for an entity if specified.""" - used_as_input_variables = self.used_as_input_variables - id_variable_by_entity_key = self.id_variable_by_entity_key - role_variable_by_entity_key = self.role_variable_by_entity_key - - diagnose_variable_mismatch(used_as_input_variables, input_data_frame) - - id_variables = [ - id_variable_by_entity_key[_entity.key] for _entity in tax_benefit_system.group_entities] - role_variables = [ - role_variable_by_entity_key[_entity.key] for _entity in tax_benefit_system.group_entities] - - for id_variable in id_variables + role_variables: - assert id_variable in input_data_frame.columns, \ - "Variable {} is not present in input dataframe".format(id_variable) - - input_data_frame = self.filter_input_variables(input_data_frame = input_data_frame) - - index_by_entity_key = dict() - - for entity in tax_benefit_system.entities: - self.init_entity_structure(tax_benefit_system, entity, input_data_frame, builder) - - if entity.is_person: - continue - - else: - index_by_entity_key[entity.key] = input_data_frame.loc[ - input_data_frame[role_variable_by_entity_key[entity.key]] == 0, - id_variable_by_entity_key[entity.key] - ].sort_values().index - - for column_name, column_serie in input_data_frame.items(): - if role_variable_by_entity_key is not None: - if column_name in role_variable_by_entity_key.values(): - continue - - if id_variable_by_entity_key is not None: - if column_name in id_variable_by_entity_key.values(): - continue - - simulation = builder.build(tax_benefit_system) - entity = tax_benefit_system.variables[column_name].entity - if entity.is_person: - init_variable_in_entity(simulation, entity.key, column_name, column_serie, period) - else: - init_variable_in_entity(simulation, entity.key, column_name, column_serie[index_by_entity_key[entity.key]], period) - - return simulation - def new_simulation(self, simulation_name, debug = False, trace = False, data = None, memory_config = None, marginal_tax_rate_only = False): tax_benefit_system = self.tax_benefit_systems[simulation_name] assert tax_benefit_system is not None period = periods.period(self.period) - # TODO create a class mathod for simulation to init a simulation - simulation = self.new_simulation_from_tax_benefit_system( + if 'custom_initialize' in dir(self): + custom_initialize = ( + None + if marginal_tax_rate_only + else self.custom_initialize + ) + else: + custom_initialize = None + + data["used_as_input_variables"] = self.used_as_input_variables + data["collection"] = self.collection + + simulation = Simulation.new_from_tax_benefit_system( tax_benefit_system = tax_benefit_system, debug = debug, trace = trace, data = data, memory_config = memory_config, period = period, - skip_custom_initialize = marginal_tax_rate_only, # Done after applying modifcation below + custom_initialize = custom_initialize, ) if marginal_tax_rate_only: self._apply_modification(simulation, period) - if 'custom_initialize' in dir(self): - self.custom_initialize(simulation) - + if custom_initialize: + custom_initialize(simulation) self.simulations[f"_modified_{simulation_name}"] = simulation - return simulation - - self.simulations[simulation_name] = simulation - - return simulation - - def new_simulation_from_tax_benefit_system(self, tax_benefit_system = None, debug = False, trace = False, data = None, memory_config = None, period = None, skip_custom_initialize = False): - assert tax_benefit_system is not None - self.neutralize_variables(tax_benefit_system) - # - simulation = self.init_simulation(tax_benefit_system, period, data) - simulation.debug = debug - simulation.trace = trace - simulation.opt_out_cache = True if self.cache_blacklist is not None else False - simulation.memory_config = memory_config - - if (not skip_custom_initialize): - if 'custom_initialize' in dir(self): - self.custom_initialize(simulation) - - return simulation - - def init_simulation(self, tax_benefit_system, period, data): - builder = SimulationBuilder() - builder.create_entities(tax_benefit_system) - - data_year = data.get("data_year", self.period) - survey = data.get('survey') - - default_source_types = [ - 'input_data_frame', - 'input_data_table', - 'input_data_frame_by_entity', - 'input_data_frame_by_entity_by_period', - 'input_data_table_by_entity_by_period', - 'input_data_table_by_period', - ] - source_types = [ - source_type_ - for source_type_ in default_source_types - if data.get(source_type_, None) is not None - ] - assert len(source_types) < 2, "There are too many data source types" - assert len(source_types) >= 1, "There should be one data source type included in {}".format( - default_source_types) - source_type = source_types[0] - source = data[source_type] - - if source_type == 'input_data_frame_by_entity': - assert data_year is not None - source_type = 'input_data_frame_by_entity_by_period' - source = {periods.period(data_year): source} - - input_data_survey_prefix = data.get("input_data_survey_prefix") if data is not None else None - - if source_type == 'input_data_frame': - simulation = self.init_all_entities(tax_benefit_system, source, builder, period) - - if source_type == 'input_data_table': - # Case 1: fill simulation with a unique input_data_frame given by the attribute - if input_data_survey_prefix is not None: - openfisca_survey_collection = SurveyCollection.load(collection = self.collection) - openfisca_survey = openfisca_survey_collection.get_survey("{}_{}".format( - input_data_survey_prefix, data_year)) - input_data_frame = openfisca_survey.get_values(table = "input").reset_index(drop = True) - else: - NotImplementedError - - self.custom_input_data_frame(input_data_frame, period = period) - simulation = self.init_all_entities(tax_benefit_system, input_data_frame, builder, period) # monolithic dataframes - - elif source_type == 'input_data_table_by_period': - # Case 2: fill simulation with input_data_frame by period containing all entity variables - for period, table in self.input_data_table_by_period.items(): - period = periods.period(period) - log.debug('From survey {} loading table {}'.format(survey, table)) - input_data_frame = self.load_table(survey = survey, table = table) - self.custom_input_data_frame(input_data_frame, period = period) - simulation = self.init_all_entities(tax_benefit_system, input_data_frame, builder, period) # monolithic dataframes - - elif source_type == 'input_data_frame_by_entity_by_period': - for period, input_data_frame_by_entity in source.items(): - period = periods.period(period) - for entity in tax_benefit_system.entities: - input_data_frame = input_data_frame_by_entity.get(entity.key) - if input_data_frame is None: - continue - self.custom_input_data_frame(input_data_frame, period = period, entity = entity.key) - self.init_entity_structure(tax_benefit_system, entity, input_data_frame, builder) - - simulation = builder.build(tax_benefit_system) - for period, input_data_frame_by_entity in source.items(): - for entity in tax_benefit_system.entities: - input_data_frame = input_data_frame_by_entity.get(entity.key) - if input_data_frame is None: - log.debug("No input_data_frame found for entity {} at period {}".format(entity, period)) - continue - self.custom_input_data_frame(input_data_frame, period = period, entity = entity.key) - self.init_entity_data(entity, input_data_frame, period, simulation) - - elif source_type == 'input_data_table_by_entity_by_period': - # Case 3: fill simulation with input_data_table by entity_by_period containing a dictionnary - # of all periods containing a dictionnary of entity variables - input_data_table_by_entity_by_period = source - simulation = None - for period, input_data_table_by_entity in input_data_table_by_entity_by_period.items(): - period = periods.period(period) - - if simulation is None: - for entity in tax_benefit_system.entities: - table = input_data_table_by_entity.get(entity.key) - if table is None: - continue - if survey is not None: - input_data_frame = self.load_table(survey = survey, table = table) - else: - input_data_frame = self.load_table(survey = 'input', table = table) - self.custom_input_data_frame(input_data_frame, period = period, entity = entity.key) - self.init_entity_structure(tax_benefit_system, entity, input_data_frame, builder) - - simulation = builder.build(tax_benefit_system) - - for entity in tax_benefit_system.entities: - table = input_data_table_by_entity.get(entity.key) - if table is None: - continue - if survey is not None: - input_data_frame = self.load_table(survey = survey, table = table) - else: - input_data_frame = self.load_table(survey = 'input', table = table) - self.custom_input_data_frame(input_data_frame, period = period, entity = entity.key) - self.init_entity_data(entity, input_data_frame, period, simulation) else: - pass + self.simulations[simulation_name] = simulation + + simulation.weight_variable_by_entity = self.weight_variable_by_entity if self.period is not None: simulation.period = periods.period(self.period) - simulation.weight_variable_by_entity = self.weight_variable_by_entity - return simulation - def load_table(self, variables = None, collection = None, survey = None, - table = None): - collection = collection or self.collection - survey_collection = SurveyCollection.load(collection = self.collection, config_files_directory=self.config_files_directory) - if survey is not None: - survey = survey - else: - survey = "{}_{}".format(self.input_data_survey_prefix, str(self.period)) - survey_ = survey_collection.get_survey(survey) - log.debug("Loading table {} in survey {} from collection {}".format(table, survey, collection)) - return survey_.get_values(table = table, variables = variables) - def memory_usage(self): for simulation_name, simulation in self.simulations.items(): print(f"simulation : {simulation_name}") # noqa analysis:ignore @@ -958,12 +584,16 @@ def restore_simulations(self, directory, **kwargs): self.simulations = dict() if use_sub_directories: for simulation_name, tax_benefit_system in self.tax_benefit_systems.items(): - self.simulations[simulation_name] = restore_simulation( + simulation = restore_simulation( os.path.join(directory, simulation_name), tax_benefit_system, **kwargs) + simulation.id_variable_by_entity_key = self.id_variable_by_entity_key + self.simulations[simulation_name] = simulation else: - self.simulations["unique_simulation"] = restore_simulation(directory, self.reference_tax_benefit_system, **kwargs) + simulation = restore_simulation(directory, list(self.tax_benefit_systems.values())[0], **kwargs) + simulation.id_variable_by_entity_key = self.id_variable_by_entity_key + self.simulations["unique_simulation"] = simulation def set_input_data_frame(self, input_data_frame): """Set the input dataframe. @@ -1050,132 +680,3 @@ def set_variable(varying_variable, varying_variable_value, period_): set_variable(varying_variable, varying_variable_value / 12, period_) else: ValueError() - - def _set_id_variable_by_entity_key(self) -> Dict[str, str]: - """Identify and sets the correct ids for the different entities.""" - if self.id_variable_by_entity_key is None: - log.debug("Use default id_variable names") - self.id_variable_by_entity_key = dict( - (entity.key, entity.key + '_id') for entity in self.reference_tax_benefit_system.entities) - - return self.id_variable_by_entity_key - - def _set_role_variable_by_entity_key(self) -> Dict[str, str]: - """Identify and sets the correct roles for the different entities.""" - if self.role_variable_by_entity_key is None: - self.role_variable_by_entity_key = dict( - (entity.key, entity.key + '_role_index') for entity in self.reference_tax_benefit_system.entities) - - return self.role_variable_by_entity_key - - def _set_used_as_input_variables_by_entity(self) -> Dict[str, List[str]]: - """Identify and sets the correct input variables for the different entities.""" - if self.used_as_input_variables_by_entity is not None: - return - - tax_benefit_system = self.reference_tax_benefit_system - - assert set(self.used_as_input_variables) <= set(tax_benefit_system.variables.keys()), \ - "Some variables used as input variables are not part of the tax benefit system:\n {}".format( - set(self.used_as_input_variables).difference(set(tax_benefit_system.variables.keys())) - ) - - self.used_as_input_variables_by_entity = dict() - - for entity in tax_benefit_system.entities: - self.used_as_input_variables_by_entity[entity.key] = [ - variable - for variable in self.used_as_input_variables - if tax_benefit_system.get_variable(variable).entity.key == entity.key - ] - - return self.used_as_input_variables_by_entity - - -# Helpers - -def init_variable_in_entity(simulation, entity, variable_name, series, period): - variable = simulation.tax_benefit_system.variables[variable_name] - - # np.issubdtype cannot handles categorical variables - if (not pd.api.types.is_categorical_dtype(series)) and np.issubdtype(series.values.dtype, np.floating): - if series.isnull().any(): - log.debug('There are {} NaN values for {} non NaN values in variable {}'.format( - series.isnull().sum(), series.notnull().sum(), variable_name)) - log.debug('We convert these NaN values of variable {} to {} its default value'.format( - variable_name, variable.default_value)) - series.fillna(variable.default_value, inplace = True) - assert series.notnull().all(), \ - 'There are {} NaN values for {} non NaN values in variable {}'.format( - series.isnull().sum(), series.notnull().sum(), variable_name) - - enum_variable_imputed_as_enum = ( - variable.value_type == Enum - and ( - pd.api.types.is_categorical_dtype(series) - or not ( - np.issubdtype(series.values.dtype, np.integer) - or np.issubdtype(series.values.dtype, float) - ) - ) - ) - - if enum_variable_imputed_as_enum: - if series.isnull().any(): - log.debug('There are {} NaN values ({}% of the array) in variable {}'.format( - series.isnull().sum(), series.isnull().mean() * 100, variable_name)) - log.debug('We convert these NaN values of variable {} to {} its default value'.format( - variable_name, variable.default_value._name_)) - series.fillna(variable.default_value._name_, inplace = True) - possible_values = variable.possible_values - index_by_category = dict(zip( - possible_values._member_names_, - range(len(possible_values._member_names_)) - )) - series.replace(index_by_category, inplace = True) - - if series.values.dtype != variable.dtype: - log.debug( - 'Converting {} from dtype {} to {}'.format( - variable_name, series.values.dtype, variable.dtype) - ) - - array = series.values.astype(variable.dtype) - # TODO is the next line needed ? - # Might be due to values returning also ndarray like objects - # for instance for categories or - np_array = np.array(array, dtype = variable.dtype) - if variable.definition_period == YEAR and period.unit == MONTH: - # Some variables defined for a year are present in month/quarter dataframes - # Cleaning the dataframe would probably be better in the long run - log.warn('Trying to set a monthly value for variable {}, which is defined on a year. The montly values you provided will be summed.' - .format(variable_name).encode('utf-8')) - - if simulation.get_array(variable_name, period.this_year) is not None: - array_sum = simulation.get_array(variable_name, period.this_year) + np_array - simulation.set_input(variable_name, period.this_year, array_sum) - else: - simulation.set_input(variable_name, period.this_year, np_array) - - else: - simulation.set_input(variable_name, period, np_array) - - -def diagnose_variable_mismatch(used_as_input_variables, input_data_frame): - """Diagnose variables mismatch. - - Args: - used_as_input_variables(lsit): List of variable to test presence - input_data_frame: DataFrame in which to test variables presence - - """ - variables_mismatch = set(used_as_input_variables).difference(set(input_data_frame.columns)) if used_as_input_variables else None - if variables_mismatch: - log.info( - 'The following variables are used as input variables are not present in the input data frame: \n {}'.format( - sorted(variables_mismatch))) - if variables_mismatch: - log.debug('The following variables are used as input variables: \n {}'.format( - sorted(used_as_input_variables))) - log.debug('The input_data_frame contains the following variables: \n {}'.format( - sorted(list(input_data_frame.columns)))) diff --git a/openfisca_survey_manager/scenarios/reform_scenario.py b/openfisca_survey_manager/scenarios/reform_scenario.py index 77f23c42..82fe1bac 100644 --- a/openfisca_survey_manager/scenarios/reform_scenario.py +++ b/openfisca_survey_manager/scenarios/reform_scenario.py @@ -5,7 +5,7 @@ import pandas as pd -from openfisca_survey_manager.scenarios.abstract_scenario import AbstractSurveyScenario, diagnose_variable_mismatch, init_variable_in_entity +from openfisca_survey_manager.scenarios.abstract_scenario import AbstractSurveyScenario # from openfisca_core import periods # from openfisca_survey_manager.simulations import Simulation # noqa analysis:ignore # from openfisca_core.simulation_builder import SimulationBuilder @@ -69,45 +69,6 @@ def calculate_variable(self, variable, period = None, use_baseline = False): return simulation.adaptative_calculate_variable(variable, period = period) - # def calibrate(self, period: int = None, target_margins_by_variable: dict = None, parameters: dict = None, target_entity_count: float = None): - # """Calibrate the scenario data. - - # Args: - # period (int, optionnal): Period of calibration. Defaults to scenario.year - # target_margins_by_variable (dict, optional): Variable targets margins. Defaults to None. - # parameters (dict, optional): Calibration parameters. Defaults to None. - # total_population (float, optional): Total population target. Defaults to None. - # """ - # survey_scenario = self - - # if period is None: - # assert survey_scenario.period is not None - # period = survey_scenario.period - - # if parameters is not None: - # assert parameters['method'] in ['linear', 'raking ratio', 'logit'], \ - # "Incorect parameter value: method should be 'linear', 'raking ratio' or 'logit'" - # if parameters['method'] == 'logit': - # assert parameters['invlo'] is not None - # assert parameters['up'] is not None - # else: - # parameters = dict(method = 'logit', up = 3, invlo = 3) - - # # TODO: filtering using filtering_variable_by_entity - # for simulation in [survey_scenario.simulation, survey_scenario.baseline_simulation]: - # if simulation is None: - # continue - # calibration = Calibration( - # simulation, - # target_margins_by_variable, - # period, - # target_entity_count = target_entity_count, - # parameters = parameters, - # # filter_by = self.filter_by, - # ) - # calibration.calibrate(inplace = True) - # simulation.calibration = calibration - def compute_aggregate(self, variable = None, aggfunc = 'sum', filter_by = None, period = None, use_baseline = False, difference = False, missing_variable_default_value = np.nan, weighted = True, alternative_weights = None): """Compute variable aggregate. @@ -376,276 +337,3 @@ def create_data_frame_by_entity(self, variables = None, expressions = None, filt merge = merge, id_variable_by_entity_key = id_variable_by_entity_key, ) - - def custom_input_data_frame(self, input_data_frame, **kwargs): - """Customize input data frame. - - Args: - input_data_frame: original input data frame. - kwargs: keyword arguments. - """ - pass - - def inflate(self, inflator_by_variable = None, period = None, target_by_variable = None): - assert inflator_by_variable or target_by_variable - assert period is not None - inflator_by_variable = dict() if inflator_by_variable is None else inflator_by_variable - target_by_variable = dict() if target_by_variable is None else target_by_variable - self.inflator_by_variable = inflator_by_variable - self.target_by_variable = target_by_variable - - assert self.simulation is not None - for use_baseline in [False, True]: - if use_baseline: - simulation = self.baseline_simulation - else: - assert self.simulation is not None - simulation = self.simulation - if (self.simulation == self.baseline_simulation): # Avoid inflating two times - continue - - if simulation is None: - continue - - tax_benefit_system = self.tax_benefit_system - for variable_name in set(inflator_by_variable.keys()).union(set(target_by_variable.keys())): - assert variable_name in tax_benefit_system.variables, \ - "Variable {} is not a valid variable of the tax-benefit system".format(variable_name) - if variable_name in target_by_variable: - inflator = inflator_by_variable[variable_name] = \ - target_by_variable[variable_name] / self.compute_aggregate( - variable = variable_name, use_baseline = use_baseline, period = period) - log.info('Using {} as inflator for {} to reach the target {} '.format( - inflator, variable_name, target_by_variable[variable_name])) - else: - assert variable_name in inflator_by_variable, 'variable_name is not in inflator_by_variable' - log.info('Using inflator {} for {}. The target is thus {}'.format( - inflator_by_variable[variable_name], - variable_name, inflator_by_variable[variable_name] * self.compute_aggregate( - variable = variable_name, use_baseline = use_baseline, period = period) - )) - inflator = inflator_by_variable[variable_name] - - array = simulation.calculate_add(variable_name, period = period) - assert array is not None - simulation.delete_arrays(variable_name, period = period) # delete existing arrays - simulation.set_input(variable_name, period, inflator * array) # insert inflated array - - def init_from_data(self, calibration_kwargs = None, inflation_kwargs = None, - rebuild_input_data = False, rebuild_kwargs = None, data = None, memory_config = None, use_marginal_tax_rate = False, - config_files_directory = default_config_files_directory): - """Initialise a survey scenario from data. - - Args: - rebuild_input_data(bool): Whether or not to clean, format and save data. Take a look at :func:`build_input_data` - data(dict): Contains the data, or metadata needed to know where to find it. - use_marginal_tax_rate(bool): True to go into marginal effective tax rate computation mode. - calibration_kwargs(dict): Calibration options (Default value = None) - inflation_kwargs(dict): Inflations options (Default value = None) - rebuild_input_data(bool): Whether to rebuild the data (Default value = False) - rebuild_kwargs: Rebuild options (Default value = None) - config_files_directory: Directory where to find the configuration files (Default value = default_config_files_directory) - """ - # When not ``None``, it'll try to get the data for *period*. - if data is not None: - data_year = data.get("data_year", self.period) - - self._set_id_variable_by_entity_key() - self._set_role_variable_by_entity_key() - self._set_used_as_input_variables_by_entity() - - # When ``True`` it'll assume it is raw data and do all that described supra. - # When ``False``, it'll assume data is ready for consumption. - if rebuild_input_data: - if rebuild_kwargs is not None: - self.build_input_data(year = data_year, **rebuild_kwargs) - else: - self.build_input_data(year = data_year) - - debug = self.debug - trace = self.trace - - if use_marginal_tax_rate: - assert self.varying_variable in self.tax_benefit_system.variables - - # Inverting reform and baseline because we are more likely - # to use baseline input in reform than the other way around - if self.baseline_tax_benefit_system is not None: - self.new_simulation(debug = debug, data = data, trace = trace, memory_config = memory_config, - use_baseline = True) - if use_marginal_tax_rate: - self.new_simulation(debug = debug, data = data, trace = trace, memory_config = memory_config, use_baseline = True, - marginal_tax_rate_only = True) - - # Note that I can pass a :class:`pd.DataFrame` directly, if I don't want to rebuild the data. - self.new_simulation(debug = debug, data = data, trace = trace, memory_config = memory_config) - - if use_marginal_tax_rate: - self.new_simulation(debug = debug, data = data, trace = trace, memory_config = memory_config, marginal_tax_rate_only = True) - - self.set_weight_variable_by_entity() - - if calibration_kwargs is not None: - assert set(calibration_kwargs.keys()).issubset(set( - ['target_margins_by_variable', 'parameters', 'total_population'])) - - if inflation_kwargs is not None: - assert set(inflation_kwargs.keys()).issubset(set(['inflator_by_variable', 'target_by_variable', 'period'])) - - if calibration_kwargs: - self.calibrate(**calibration_kwargs) - - if inflation_kwargs: - self.inflate(**inflation_kwargs) - - def init_entity_structure(self, tax_benefit_system, entity, input_data_frame, builder): - """Initialize sthe simulation with tax_benefit_system entities and input_data_frame. - - Args: - tax_benefit_system(TaxBenfitSystem): The TaxBenefitSystem to get the structure from - entity(Entity): The entity to initialize structure - input_data_frame(pd.DataFrame): The input - builder(Builder): The builder - - """ - id_variables = [ - self.id_variable_by_entity_key[_entity.key] for _entity in tax_benefit_system.group_entities] - role_variables = [ - self.role_variable_by_entity_key[_entity.key] for _entity in tax_benefit_system.group_entities] - - if entity.is_person: - for id_variable in id_variables + role_variables: - assert id_variable in input_data_frame.columns, \ - "Variable {} is not present in input dataframe".format(id_variable) - - input_data_frame = self.filter_input_variables(input_data_frame = input_data_frame) - - ids = range(len(input_data_frame)) - if entity.is_person: - builder.declare_person_entity(entity.key, ids) - for group_entity in tax_benefit_system.group_entities: - _key = group_entity.key - _id_variable = self.id_variable_by_entity_key[_key] - _role_variable = self.role_variable_by_entity_key[_key] - group_population = builder.declare_entity(_key, input_data_frame[_id_variable].drop_duplicates().sort_values().values) - builder.join_with_persons( - group_population, - input_data_frame[_id_variable].astype('int').values, - input_data_frame[_role_variable].astype('int').values, - ) - - def init_entity_data(self, entity, input_data_frame, period, simulation): - used_as_input_variables = self.used_as_input_variables_by_entity[entity.key] - diagnose_variable_mismatch(used_as_input_variables, input_data_frame) - input_data_frame = self.filter_input_variables(input_data_frame = input_data_frame) - - for column_name, column_serie in input_data_frame.items(): - variable_instance = self.tax_benefit_system.variables.get(column_name) - if variable_instance is None: - continue - - if variable_instance.entity.key != entity.key: - log.info("Ignoring variable {} which is not part of entity {} but {}".format( - column_name, entity.key, variable_instance.entity.key)) - continue - init_variable_in_entity(simulation, entity.key, column_name, column_serie, period) - - def init_simulation_with_data_frame(self, tax_benefit_system, input_data_frame, period, builder): - """Initialize the simulation period with current input_data_frame for an entity if specified.""" - used_as_input_variables = self.used_as_input_variables - id_variable_by_entity_key = self.id_variable_by_entity_key - role_variable_by_entity_key = self.role_variable_by_entity_key - - diagnose_variable_mismatch(used_as_input_variables, input_data_frame) - - id_variables = [ - id_variable_by_entity_key[_entity.key] for _entity in tax_benefit_system.group_entities] - role_variables = [ - role_variable_by_entity_key[_entity.key] for _entity in tax_benefit_system.group_entities] - - for id_variable in id_variables + role_variables: - assert id_variable in input_data_frame.columns, \ - "Variable {} is not present in input dataframe".format(id_variable) - - input_data_frame = self.filter_input_variables(input_data_frame = input_data_frame) - - index_by_entity_key = dict() - - for entity in tax_benefit_system.entities: - self.init_entity_structure(tax_benefit_system, entity, input_data_frame, builder) - - if entity.is_person: - continue - - else: - index_by_entity_key[entity.key] = input_data_frame.loc[ - input_data_frame[role_variable_by_entity_key[entity.key]] == 0, - id_variable_by_entity_key[entity.key] - ].sort_values().index - - for column_name, column_serie in input_data_frame.items(): - if role_variable_by_entity_key is not None: - if column_name in role_variable_by_entity_key.values(): - continue - - if id_variable_by_entity_key is not None: - if column_name in id_variable_by_entity_key.values(): - continue - - simulation = builder.build(tax_benefit_system) - entity = tax_benefit_system.variables[column_name].entity - if entity.is_person: - init_variable_in_entity(simulation, entity.key, column_name, column_serie, period) - else: - init_variable_in_entity(simulation, entity.key, column_name, column_serie[index_by_entity_key[entity.key]], period) - - return simulation - - # def memory_usage(self, use_baseline = False): - # if use_baseline: - # simulation = self.baseline_simulation - # else: - # simulation = self.simulation - - # memory_usage_by_variable = simulation.get_memory_usage()['by_variable'] - # try: - # usage_stats = simulation.tracer.usage_stats - # except AttributeError: - # log.warning("The simulation trace mode is not activated. You need to activate it to get stats about variable usage (hits).") - # usage_stats = None - # infos_lines = list() - - # for variable, infos in memory_usage_by_variable.items(): - # hits = usage_stats[variable]['nb_requests'] if usage_stats else None - # infos_lines.append(( - # infos['total_nb_bytes'], - # variable, "{}: {} periods * {} cells * item size {} ({}) = {} with {} hits".format( - # variable, - # infos['nb_arrays'], - # infos['nb_cells_by_array'], - # infos['cell_size'], - # infos['dtype'], - # humanize.naturalsize(infos['total_nb_bytes'], gnu = True), - # hits, - # ) - # )) - # infos_lines.sort() - # for _, _, line in infos_lines: - # print(line.rjust(100)) # noqa analysis:ignore - - def set_input_data_frame(self, input_data_frame): - """Set the input dataframe. - - Args: - input_data_frame (pd.DataFrame): Input data frame - - """ - self.input_data_frame = input_data_frame - - def set_weight_variable_by_entity(self, weight_variable_by_entity = None): - if weight_variable_by_entity is not None: - self.weight_variable_by_entity = weight_variable_by_entity - - for simulation in [self.simulation, self.baseline_simulation]: - if simulation is not None: - simulation.weight_variable_by_entity = self.weight_variable_by_entity diff --git a/openfisca_survey_manager/simulation_builder.py b/openfisca_survey_manager/simulation_builder.py new file mode 100644 index 00000000..297879e0 --- /dev/null +++ b/openfisca_survey_manager/simulation_builder.py @@ -0,0 +1,285 @@ +import logging +from typing import Dict, List + +from openfisca_core.model_api import MONTH, YEAR +from openfisca_core.simulations.simulation_builder import SimulationBuilder +from openfisca_survey_manager.utils import do_nothing + + +SimulationBuilder.id_variable_by_entity_key = None +SimulationBuilder.role_variable_by_entity_key = None +SimulationBuilder.used_as_input_variables = None +SimulationBuilder.used_as_input_variables_by_entity = None + + +log = logging.getLogger(__name__) + + +# Helpers + +def diagnose_variable_mismatch(used_as_input_variables, input_data_frame): + """Diagnose variables mismatch. + + Args: + used_as_input_variables(lsit): List of variable to test presence + input_data_frame: DataFrame in which to test variables presence + + """ + variables_mismatch = set(used_as_input_variables).difference(set(input_data_frame.columns)) if used_as_input_variables else None + if variables_mismatch: + log.info( + 'The following variables are used as input variables are not present in the input data frame: \n {}'.format( + sorted(variables_mismatch))) + if variables_mismatch: + log.debug('The following variables are used as input variables: \n {}'.format( + sorted(used_as_input_variables))) + log.debug('The input_data_frame contains the following variables: \n {}'.format( + sorted(list(input_data_frame.columns)))) + + +# SimulationBuilder monkey-patched methods + +def _set_id_variable_by_entity_key(builder) -> Dict[str, str]: + """Identify and sets the correct ids for the different entities.""" + if builder.id_variable_by_entity_key is None: + log.debug("Use default id_variable names") + builder.id_variable_by_entity_key = dict( + (entity.key, entity.key + '_id') for entity in builder.tax_benefit_system.entities) + + return builder.id_variable_by_entity_key + +def _set_role_variable_by_entity_key(builder) -> Dict[str, str]: + """Identify and sets the correct roles for the different entities.""" + if builder.role_variable_by_entity_key is None: + builder.role_variable_by_entity_key = dict( + (entity.key, entity.key + '_role_index') for entity in builder.tax_benefit_system.entities) + + return builder.role_variable_by_entity_key + +def _set_used_as_input_variables_by_entity(builder) -> Dict[str, List[str]]: + """Identify and sets the correct input variables for the different entities.""" + if builder.used_as_input_variables_by_entity is not None: + return + + tax_benefit_system = builder.tax_benefit_system + + assert set(builder.used_as_input_variables) <= set(tax_benefit_system.variables.keys()), \ + "Some variables used as input variables are not part of the tax benefit system:\n {}".format( + set(builder.used_as_input_variables).difference(set(tax_benefit_system.variables.keys())) + ) + + builder.used_as_input_variables_by_entity = dict() + + for entity in tax_benefit_system.entities: + builder.used_as_input_variables_by_entity[entity.key] = [ + variable + for variable in builder.used_as_input_variables + if tax_benefit_system.get_variable(variable).entity.key == entity.key + ] + + return builder.used_as_input_variables_by_entity + + +def filter_input_variables(builder, input_data_frame, tax_benefit_system): + """Filter the input data frame from variables that won't be used or are set to be computed. + + Args: + input_data_frame: Input dataframe (Default value = None) + + Returns: + pd.DataFrame: filtered dataframe + + """ + assert input_data_frame is not None + id_variable_by_entity_key = builder.id_variable_by_entity_key + role_variable_by_entity_key = builder.role_variable_by_entity_key + used_as_input_variables = builder.used_as_input_variables + + variables = tax_benefit_system.variables + + id_variables = [ + id_variable_by_entity_key[_entity.key] for _entity in tax_benefit_system.group_entities] + role_variables = [ + role_variable_by_entity_key[_entity.key] for _entity in tax_benefit_system.group_entities] + + log.debug('Variable used_as_input_variables in filter: \n {}'.format(used_as_input_variables)) + + unknown_columns = [] + for column_name in input_data_frame: + if column_name in id_variables + role_variables: + continue + if column_name not in variables: + unknown_columns.append(column_name) + + input_data_frame.drop(unknown_columns, axis = 1, inplace = True) + + if unknown_columns: + log.debug('The following unknown columns {}, are dropped from input table'.format( + sorted(unknown_columns))) + + used_columns = [] + dropped_columns = [] + for column_name in input_data_frame: + if column_name in id_variables + role_variables: + continue + variable = variables[column_name] + # Keeping the calculated variables that are initialized by the input data + if variable.formulas: + if column_name in used_as_input_variables: + used_columns.append(column_name) + continue + + dropped_columns.append(column_name) + + input_data_frame.drop(dropped_columns, axis = 1, inplace = True) + + if used_columns: + log.debug( + 'These columns are not dropped because present in used_as_input_variables:\n {}'.format( + sorted(used_columns))) + if dropped_columns: + log.debug( + 'These columns in survey are set to be calculated, we drop them from the input table:\n {}'.format( + sorted(dropped_columns))) + + log.info('Keeping the following variables in the input_data_frame:\n {}'.format( + sorted(list(input_data_frame.columns)))) + return input_data_frame + + +def init_all_entities(builder, tax_benefit_system, input_data_frame, period = None): + assert period is not None + log.info('Initialasing simulation using input_data_frame for period {}'.format(period)) + + if period.unit == YEAR: # 1. year + simulation = builder.init_simulation_with_data_frame( + tax_benefit_system, + input_data_frame = input_data_frame, + period = period, + ) + elif period.unit == MONTH and period.size == 3: # 2. quarter + for offset in range(period.size): + period_item = period.first_month.offset(offset, MONTH) + simulation = builder.init_simulation_with_data_frame( + tax_benefit_system, + input_data_frame = input_data_frame, + period = period_item, + builder = builder, + ) + elif period.unit == MONTH and period.size == 1: # 3. months + simulation = builder.init_simulation_with_data_frame( + tax_benefit_system, + input_data_frame = input_data_frame, + period = period, + ) + else: + raise ValueError("Invalid period {}".format(period)) + + assert builder.id_variable_by_entity_key is not None + simulation.id_variable_by_entity_key = builder.id_variable_by_entity_key + return simulation + + +def init_entity_structure(builder, tax_benefit_system, entity, input_data_frame): + """Initialize sthe simulation with tax_benefit_system entities and input_data_frame. + + Args: + tax_benefit_system(TaxBenfitSystem): The TaxBenefitSystem to get the structure from + entity(Entity): The entity to initialize structure + input_data_frame(pd.DataFrame): The input + builder(Builder): The builder + + """ + builder.tax_benefit_system = tax_benefit_system + builder._set_id_variable_by_entity_key() + builder._set_role_variable_by_entity_key() + builder._set_used_as_input_variables_by_entity() + + input_data_frame = builder.filter_input_variables(input_data_frame, tax_benefit_system) + + id_variables = [ + builder.id_variable_by_entity_key[_entity.key] for _entity in tax_benefit_system.group_entities] + role_variables = [ + builder.role_variable_by_entity_key[_entity.key] for _entity in tax_benefit_system.group_entities] + + if entity.is_person: + for id_variable in id_variables + role_variables: + assert id_variable in input_data_frame.columns, \ + "Variable {} is not present in input dataframe".format(id_variable) + + ids = range(len(input_data_frame)) + if entity.is_person: + builder.declare_person_entity(entity.key, ids) + for group_entity in tax_benefit_system.group_entities: + _key = group_entity.key + _id_variable = builder.id_variable_by_entity_key[_key] + _role_variable = builder.role_variable_by_entity_key[_key] + group_population = builder.declare_entity(_key, input_data_frame[_id_variable].drop_duplicates().sort_values().values) + builder.join_with_persons( + group_population, + input_data_frame[_id_variable].astype('int').values, + input_data_frame[_role_variable].astype('int').values, + ) + + +def init_simulation_with_data_frame(builder, tax_benefit_system, input_data_frame, period): + """Initialize the simulation period with current input_data_frame for an entity if specified.""" + used_as_input_variables = builder.used_as_input_variables + id_variable_by_entity_key = builder.id_variable_by_entity_key + role_variable_by_entity_key = builder.role_variable_by_entity_key + + diagnose_variable_mismatch(used_as_input_variables, input_data_frame) + + id_variables = [ + id_variable_by_entity_key[_entity.key] for _entity in tax_benefit_system.group_entities] + role_variables = [ + role_variable_by_entity_key[_entity.key] for _entity in tax_benefit_system.group_entities] + + for id_variable in id_variables + role_variables: + assert id_variable in input_data_frame.columns, \ + "Variable {} is not present in input dataframe".format(id_variable) + + input_data_frame = builder.filter_input_variables(input_data_frame, tax_benefit_system) + + index_by_entity_key = dict() + + for entity in tax_benefit_system.entities: + builder.init_entity_structure(tax_benefit_system, entity, input_data_frame) + + if entity.is_person: + continue + + else: + index_by_entity_key[entity.key] = input_data_frame.loc[ + input_data_frame[role_variable_by_entity_key[entity.key]] == 0, + id_variable_by_entity_key[entity.key] + ].sort_values().index + + for column_name, column_serie in input_data_frame.items(): + if role_variable_by_entity_key is not None: + if column_name in role_variable_by_entity_key.values(): + continue + + if id_variable_by_entity_key is not None: + if column_name in id_variable_by_entity_key.values(): + continue + + simulation = builder.build(tax_benefit_system) + entity = tax_benefit_system.variables[column_name].entity + if entity.is_person: + simulation.init_variable_in_entity(entity.key, column_name, column_serie, period) + else: + simulation.init_variable_in_entity(entity.key, column_name, column_serie[index_by_entity_key[entity.key]], period) + + assert builder.id_variable_by_entity_key is not None + simulation.id_variable_by_entity_key = builder.id_variable_by_entity_key + return simulation + + +SimulationBuilder._set_id_variable_by_entity_key = _set_id_variable_by_entity_key +SimulationBuilder._set_role_variable_by_entity_key = _set_role_variable_by_entity_key +SimulationBuilder._set_used_as_input_variables_by_entity = _set_used_as_input_variables_by_entity +SimulationBuilder.filter_input_variables = filter_input_variables +SimulationBuilder.init_all_entities = init_all_entities +SimulationBuilder.init_entity_structure = init_entity_structure +SimulationBuilder.init_simulation_with_data_frame = init_simulation_with_data_frame diff --git a/openfisca_survey_manager/simulations.py b/openfisca_survey_manager/simulations.py index ea58ca7f..eedd84e1 100644 --- a/openfisca_survey_manager/simulations.py +++ b/openfisca_survey_manager/simulations.py @@ -4,7 +4,7 @@ import numpy as np import pandas as pd import re -from typing import Dict, List +from typing import Dict, List, Optional, Union import humanize @@ -12,26 +12,33 @@ from openfisca_core import periods from openfisca_core.indexed_enums import Enum -from openfisca_core.periods import ETERNITY +from openfisca_core.periods import ETERNITY, MONTH, YEAR +from openfisca_core.types import Array, Period, Simulation, TaxBenefitSystem from openfisca_core.simulations import Simulation +from openfisca_survey_manager import default_config_files_directory +from openfisca_survey_manager.simulation_builder import diagnose_variable_mismatch, SimulationBuilder +from openfisca_survey_manager.survey_collections import SurveyCollection + from openfisca_survey_manager.statshelpers import mark_weighted_percentiles +from openfisca_survey_manager.utils import do_nothing + log = logging.getLogger(__name__) # Helpers -def assert_variables_in_same_entity(tax_benefit_system, variables): - """Asserts taht variables are in the same entity +def assert_variables_in_same_entity(tax_benefit_system: TaxBenefitSystem, variables: List): + """ + Assert that variables are in the same entity. Args: - survey_scenario: Host SurveyScenario - variables: Variables to check presence + tax_benefit_system (TaxBenefitSystem): Host tax benefit system + variables (List): Variables supposed to belong to the same entity Returns: - str: Unique entity key if variables all belongs to it - + str: Common entity of the variables """ entity = None for variable_name in variables: @@ -44,24 +51,40 @@ def assert_variables_in_same_entity(tax_benefit_system, variables): return entity.key -def get_words(text): +def load_table(variables = None, collection = None, survey = None, input_data_survey_prefix = None, data_year = None, table = None, config_files_directory = default_config_files_directory): + survey_collection = SurveyCollection.load(collection = collection, config_files_directory=config_files_directory) + if survey is not None: + survey = survey + else: + survey = "{}_{}".format(input_data_survey_prefix, str(data_year)) + survey_ = survey_collection.get_survey(survey) + log.debug("Loading table {} in survey {} from collection {}".format(table, survey, collection)) + return survey_.get_values(table = table, variables = variables) + + +def get_words(text: str): return re.compile('[A-Za-z_]+').findall(text) # Main functions -def adaptative_calculate_variable(simulation, variable = None, period = None): - assert variable is not None - assert simulation is not None - assert period is not None +def adaptative_calculate_variable(simulation: Simulation, variable: str, period: Optional[Union[int, str, Period]]) -> Array: + """ + Calculate variable by adpating it definition period to the target period. - tax_benefit_system = simulation.tax_benefit_system + Args: + simulation (Simulation): Simulation to suse + variable (str): Variable to be computed + period (Optional[Union[int, str, Period]]): Target period + + Returns: + Array: Values of the variable on the target period + """ - if isinstance(period, (int, np.integer)): - period = str(period) if not isinstance(period, periods.Period): - period = periods.period(period) - assert simulation is not None + period = periods.period(str(period)) + + tax_benefit_system = simulation.tax_benefit_system assert tax_benefit_system is not None assert variable in tax_benefit_system.variables, "{} is not a valid variable".format(variable) @@ -89,9 +112,9 @@ def adaptative_calculate_variable(simulation, variable = None, period = None): return values -def compute_aggregate(simulation, variable = None, aggfunc = 'sum', filter_by = None, period = None, - missing_variable_default_value = np.nan, weighted = True, alternative_weights = None, - filtering_variable_by_entity = None): +def compute_aggregate(simulation: Simulation, variable: str = None, aggfunc: str = 'sum', filter_by: str = None, period: Optional[Union[int, str, Period]] = None, + missing_variable_default_value = np.nan, weighted: bool = True, alternative_weights: Optional[Union[str, int, float, Array]] = None, + filtering_variable_by_entity: Dict = None): weight_variable_by_entity = simulation.weight_variable_by_entity tax_benefit_system = simulation.tax_benefit_system @@ -196,7 +219,7 @@ def compute_aggregate(simulation, variable = None, aggfunc = 'sum', filter_by = return aggregate -def compute_quantiles(simulation = None, variable = None, nquantiles = None, period = None, filter_by = None, +def compute_quantiles(simulation: Simulation = None, variable: Optional[str] = None, nquantiles = None, period: Optional[Union[int, str, Period]] = None, filter_by = None, weighted = True, alternative_weights = None, filtering_variable_by_entity = None): @@ -236,15 +259,17 @@ def compute_quantiles(simulation = None, variable = None, nquantiles = None, per return values -def compute_pivot_table(simulation = None, baseline_simulation = None, aggfunc = 'mean', +def compute_pivot_table(simulation: Simulation = None, baseline_simulation = None, aggfunc = 'mean', columns = None, difference = False, filter_by = None, index = None, - period = None, use_baseline_for_columns = None, values = None, + period: Optional[Union[int, str, Period]] = None, use_baseline_for_columns = None, values = None, missing_variable_default_value = np.nan, concat_axis = None, weighted = True, alternative_weights = None, filtering_variable_by_entity = None): weight_variable_by_entity = simulation.weight_variable_by_entity + admissible_aggfuncs = ['max', 'mean', 'min', 'sum', 'count', 'sum_abs'] assert aggfunc in admissible_aggfuncs + assert columns or index or values if baseline_simulation is not None: tax_benefit_system = baseline_simulation.tax_benefit_system @@ -437,10 +462,11 @@ def compute_pivot_table(simulation = None, baseline_simulation = None, aggfunc = return data_frame.pivot_table(index = index, columns = columns, values = weight_variable, aggfunc = 'sum') -def create_data_frame_by_entity(simulation, variables = None, expressions = None, filter_by = None, index = False, - period = None, merge = False, id_variable_by_entity_key = None): +def create_data_frame_by_entity(simulation: Simulation, variables = None, expressions = None, filter_by = None, index = False, + period: Optional[Union[int, str, Period]] = None, merge = False): assert simulation is not None + id_variable_by_entity_key = simulation.id_variable_by_entity_key tax_benefit_system = simulation.tax_benefit_system assert tax_benefit_system is not None @@ -576,7 +602,7 @@ def compute_winners_loosers( baseline_simulation, variable: str, filter_by = None, - period = None, + period: Optional[Union[int, str, Period]] = None, absolute_minimal_detected_variation: float = 0, relative_minimal_detected_variation: float = .01, observations_threshold: int = None, @@ -585,7 +611,8 @@ def compute_winners_loosers( filtering_variable_by_entity = None, ) -> Dict[str, int]: """ - Compute the number of winners and loosers for a given variable + Compute the number of winners and loosers for a given variable. + Args: simulation: The OpenFisca simulation object baseline_simulation: The OpenFisca simulation to compare @@ -688,7 +715,266 @@ def compute_winners_loosers( } -def print_memory_usage(simulation): +def init_entity_data(simulation: Simulation, entity, filtered_input_data_frame, period, used_as_input_variables_by_entity): + used_as_input_variables = used_as_input_variables_by_entity[entity.key] + input_data_frame = filtered_input_data_frame + # input_data_frame = self.filter_input_variables(input_data_frame = input_data_frame) + diagnose_variable_mismatch(used_as_input_variables, input_data_frame) + + for column_name, column_serie in input_data_frame.items(): + variable_instance = simulation.tax_benefit_system.variables.get(column_name) + if variable_instance is None: + log.info(f"Ignoring {column_name} in input data") + continue + + if variable_instance.entity.key != entity.key: + log.info("Ignoring variable {} which is not part of entity {} but {}".format( + column_name, entity.key, variable_instance.entity.key)) + continue + init_variable_in_entity(simulation, entity.key, column_name, column_serie, period) + + +def inflate(simulation: Simulation, inflator_by_variable = None, period: Optional[Union[int, str, Period]] = None, target_by_variable = None): + tax_benefit_system = simulation.tax_benefit_system + for variable_name in set(inflator_by_variable.keys()).union(set(target_by_variable.keys())): + assert variable_name in tax_benefit_system.variables, \ + "Variable {} is not a valid variable of the tax-benefit system".format(variable_name) + if variable_name in target_by_variable: + inflator = inflator_by_variable[variable_name] = \ + target_by_variable[variable_name] / simulation.compute_aggregate( + variable = variable_name, period = period) + log.info('Using {} as inflator for {} to reach the target {} '.format( + inflator, variable_name, target_by_variable[variable_name])) + else: + assert variable_name in inflator_by_variable, 'variable_name is not in inflator_by_variable' + log.info('Using inflator {} for {}. The target is thus {}'.format( + inflator_by_variable[variable_name], + variable_name, inflator_by_variable[variable_name] * simulation.compute_aggregate( + variable = variable_name, period = period) + )) + inflator = inflator_by_variable[variable_name] + + array = simulation.calculate_add(variable_name, period = period) + assert array is not None + simulation.delete_arrays(variable_name, period = period) # delete existing arrays + simulation.set_input(variable_name, period, inflator * array) # insert inflated array + + +def init_simulation(tax_benefit_system, period, data): + builder = SimulationBuilder() + builder.create_entities(tax_benefit_system) + + collection = data.get("collection") + custom_input_data_frame = data.get("custom_input_data_frame", do_nothing) + data_year = data.get("data_year") + survey = data.get('survey') + builder.used_as_input_variables = data.get("used_as_input_variables") + + default_source_types = [ + 'input_data_frame', + 'input_data_table', + 'input_data_frame_by_entity', + 'input_data_frame_by_entity_by_period', + 'input_data_table_by_entity_by_period', + 'input_data_table_by_period', + ] + source_types = [ + source_type_ + for source_type_ in default_source_types + if data.get(source_type_, None) is not None + ] + assert len(source_types) < 2, "There are too many data source types" + assert len(source_types) >= 1, "There should be one data source type included in {}".format( + default_source_types) + source_type = source_types[0] + source = data[source_type] + + if source_type == 'input_data_frame_by_entity': + assert data_year is not None + source_type = 'input_data_frame_by_entity_by_period' + source = {periods.period(data_year): source} + + input_data_survey_prefix = data.get("input_data_survey_prefix") if data is not None else None + + if source_type == 'input_data_frame': + simulation = builder.init_all_entities(tax_benefit_system, source, period) + + if source_type == 'input_data_table': + # Case 1: fill simulation with a unique input_data_frame given by the attribute + if input_data_survey_prefix is not None: + openfisca_survey_collection = SurveyCollection.load(collection = collection) + openfisca_survey = openfisca_survey_collection.get_survey("{}_{}".format( + input_data_survey_prefix, data_year)) + input_data_frame = openfisca_survey.get_values(table = "input").reset_index(drop = True) + else: + NotImplementedError + + custom_input_data_frame(input_data_frame, period = period) + simulation = builder.init_all_entities(tax_benefit_system, input_data_frame, builder, period) # monolithic dataframes + + elif source_type == 'input_data_table_by_period': + # Case 2: fill simulation with input_data_frame by period containing all entity variables + input_data_table_by_period = data.get("input_data_table_by_period") + for period, table in input_data_table_by_period.items(): + period = periods.period(period) + log.debug('From survey {} loading table {}'.format(survey, table)) + input_data_frame = load_table(collection = collection, survey = survey, input_data_survey_prefix = input_data_survey_prefix, table = table) + custom_input_data_frame(input_data_frame, period = period) + simulation = builder.init_all_entities(tax_benefit_system, input_data_frame, builder, period) # monolithic dataframes + + elif source_type == 'input_data_frame_by_entity_by_period': + for period, input_data_frame_by_entity in source.items(): + period = periods.period(period) + for entity in tax_benefit_system.entities: + input_data_frame = input_data_frame_by_entity.get(entity.key) + if input_data_frame is None: + continue + custom_input_data_frame(input_data_frame, period = period, entity = entity.key) + builder.init_entity_structure(tax_benefit_system, entity, input_data_frame) # TODO complete args + + simulation = builder.build(tax_benefit_system) + simulation.id_variable_by_entity_key = builder.id_variable_by_entity_key # Should be propagated to enhanced build + + for period, input_data_frame_by_entity in source.items(): + for entity in tax_benefit_system.entities: + input_data_frame = input_data_frame_by_entity.get(entity.key) + if input_data_frame is None: + log.debug("No input_data_frame found for entity {} at period {}".format(entity, period)) + continue + custom_input_data_frame(input_data_frame, period = period, entity = entity.key) + simulation.init_entity_data(entity, input_data_frame, period, builder.used_as_input_variables_by_entity) + + elif source_type == 'input_data_table_by_entity_by_period': + # Case 3: fill simulation with input_data_table by entity_by_period containing a dictionnary + # of all periods containing a dictionnary of entity variables + input_data_table_by_entity_by_period = source + simulation = None + for period, input_data_table_by_entity in input_data_table_by_entity_by_period.items(): + period = periods.period(period) + + if simulation is None: + for entity in tax_benefit_system.entities: + table = input_data_table_by_entity.get(entity.key) + if table is None: + continue + if survey is not None: + input_data_frame = load_table(collection = collection, survey = survey, table = table) + else: + input_data_frame = load_table(collection = collection, survey = 'input', table = table) + custom_input_data_frame(input_data_frame, period = period, entity = entity.key) + builder.init_entity_structure(tax_benefit_system, entity, input_data_frame) # TODO complete args + + simulation = builder.build(tax_benefit_system) + simulation.id_variable_by_entity_key = builder.id_variable_by_entity_key # Should be propagated to enhanced build + + for entity in tax_benefit_system.entities: + table = input_data_table_by_entity.get(entity.key) + if table is None: + continue + if survey is not None: + input_data_frame = load_table(collection = collection, survey = survey, table = table) + else: + input_data_frame = load_table(collection = collection, survey = 'input', table = table) + custom_input_data_frame(input_data_frame, period = period, entity = entity.key) + simulation.init_entity_data(entity, input_data_frame, period, builder.used_as_input_variables_by_entity) + else: + pass + + if data_year is not None: + simulation.period = periods.period(data_year) + + return simulation + + +def init_variable_in_entity(simulation: Simulation, entity, variable_name, series, period): + variable = simulation.tax_benefit_system.variables[variable_name] + + # np.issubdtype cannot handles categorical variables + if (not pd.api.types.is_categorical_dtype(series)) and np.issubdtype(series.values.dtype, np.floating): + if series.isnull().any(): + log.debug('There are {} NaN values for {} non NaN values in variable {}'.format( + series.isnull().sum(), series.notnull().sum(), variable_name)) + log.debug('We convert these NaN values of variable {} to {} its default value'.format( + variable_name, variable.default_value)) + series.fillna(variable.default_value, inplace = True) + assert series.notnull().all(), \ + 'There are {} NaN values for {} non NaN values in variable {}'.format( + series.isnull().sum(), series.notnull().sum(), variable_name) + + enum_variable_imputed_as_enum = ( + variable.value_type == Enum + and ( + pd.api.types.is_categorical_dtype(series) + or not ( + np.issubdtype(series.values.dtype, np.integer) + or np.issubdtype(series.values.dtype, float) + ) + ) + ) + + if enum_variable_imputed_as_enum: + if series.isnull().any(): + log.debug('There are {} NaN values ({}% of the array) in variable {}'.format( + series.isnull().sum(), series.isnull().mean() * 100, variable_name)) + log.debug('We convert these NaN values of variable {} to {} its default value'.format( + variable_name, variable.default_value._name_)) + series.fillna(variable.default_value._name_, inplace = True) + possible_values = variable.possible_values + index_by_category = dict(zip( + possible_values._member_names_, + range(len(possible_values._member_names_)) + )) + series.replace(index_by_category, inplace = True) + + if series.values.dtype != variable.dtype: + log.debug( + 'Converting {} from dtype {} to {}'.format( + variable_name, series.values.dtype, variable.dtype) + ) + + array = series.values.astype(variable.dtype) + # TODO is the next line needed ? + # Might be due to values returning also ndarray like objects + # for instance for categories or + np_array = np.array(array, dtype = variable.dtype) + if variable.definition_period == YEAR and period.unit == MONTH: + # Some variables defined for a year are present in month/quarter dataframes + # Cleaning the dataframe would probably be better in the long run + log.warn(f"Trying to set a monthly value for variable {variable_name}, which is defined on a year. The montly values you provided will be summed.") + + if simulation.get_array(variable_name, period.this_year) is not None: + array_sum = simulation.get_array(variable_name, period.this_year) + np_array + simulation.set_input(variable_name, period.this_year, array_sum) + else: + simulation.set_input(variable_name, period.this_year, np_array) + + else: + simulation.set_input(variable_name, period, np_array) + + +def new_from_tax_benefit_system( + tax_benefit_system, + debug = None, + trace = None, + data = None, + memory_config = None, + period: Optional[Union[int, str, Period]] = None, + custom_initialize = None, + ): + + simulation = Simulation.init_simulation(tax_benefit_system, period, data) + simulation.debug = debug + simulation.trace = trace + simulation.opt_out_cache = True if simulation.tax_benefit_system.cache_blacklist is not None else False + simulation.memory_config = memory_config + + if custom_initialize: + custom_initialize(simulation) + + return simulation + + +def print_memory_usage(simulation: Simulation): memory_usage_by_variable = simulation.get_memory_usage()['by_variable'] try: usage_stats = simulation.tracer.usage_stats @@ -723,7 +1009,7 @@ def set_weight_variable_by_entity( simulation.weight_variable_by_entity = weight_variable_by_entity -def summarize_variable(simulation, variable = None, weighted = False, force_compute = False): +def summarize_variable(simulation: Simulation, variable = None, weighted = False, force_compute = False): """Print a summary of a variable including its memory usage. Args: @@ -851,6 +1137,11 @@ def summarize_variable(simulation, variable = None, weighted = False, force_comp Simulation.create_data_frame_by_entity = create_data_frame_by_entity Simulation.compute_quantiles = compute_quantiles Simulation.compute_winners_loosers = compute_winners_loosers +Simulation.new_from_tax_benefit_system = new_from_tax_benefit_system +Simulation.inflate = inflate +Simulation.init_entity_data = init_entity_data +Simulation.init_simulation = init_simulation +Simulation.init_variable_in_entity = init_variable_in_entity Simulation.print_memory_usage = print_memory_usage Simulation.set_weight_variable_by_entity = set_weight_variable_by_entity Simulation.summarize_variable = summarize_variable diff --git a/openfisca_survey_manager/tests/test_create_data_frame_by_entity.py b/openfisca_survey_manager/tests/test_create_data_frame_by_entity.py index a12a1365..d2998ee4 100644 --- a/openfisca_survey_manager/tests/test_create_data_frame_by_entity.py +++ b/openfisca_survey_manager/tests/test_create_data_frame_by_entity.py @@ -6,33 +6,32 @@ log = logging.getLogger(__name__) -class TestCreateDataFrameByEntity(unittest.TestCase): - def test_create_data_frame_by_entity(self): - survey_scenario = create_randomly_initialized_survey_scenario() - period = '2017-01' - df_by_entity = survey_scenario.create_data_frame_by_entity( - variables = ['salary', 'rent'], - period = period, - ) - salary = survey_scenario.calculate_variable('salary', period = period) - rent = survey_scenario.calculate_variable('rent', period = period) - for entity, df in df_by_entity.items(): - assert not df.empty, "{} dataframe is empty".format(entity) - assert (df_by_entity['person']['salary'] == salary).all().all() - assert (df_by_entity['household']['rent'] == rent).all().all() +def test_create_data_frame_by_entity(): + survey_scenario = create_randomly_initialized_survey_scenario() + period = '2017-01' + df_by_entity = survey_scenario.create_data_frame_by_entity( + variables = ['salary', 'rent'], + period = period, + ) + salary = survey_scenario.calculate_variable('salary', period = period) + rent = survey_scenario.calculate_variable('rent', period = period) + for entity, df in df_by_entity.items(): + assert not df.empty, "{} dataframe is empty".format(entity) + assert (df_by_entity['person']['salary'] == salary).all().all() + assert (df_by_entity['household']['rent'] == rent).all().all() - def test_create_data_frame_by_entity_with_index(self): - survey_scenario = create_randomly_initialized_survey_scenario() - period = '2017-01' - data_frame_by_entity = survey_scenario.create_data_frame_by_entity( - variables = ['salary', 'rent', "person_id", "household_id"], - period = period, - index = True - ) - for entity, input_dataframe in data_frame_by_entity.items(): - print(f"{entity} for {period}") # noqa T201 - print(input_dataframe.columns) # noqa T201 - if entity == "person": - self.assertIn("person_id", input_dataframe.columns.to_list()) - if entity == "household": - self.assertIn("household_id", input_dataframe.columns.to_list()) +def test_create_data_frame_by_entity_with_index(): + survey_scenario = create_randomly_initialized_survey_scenario() + period = '2017-01' + data_frame_by_entity = survey_scenario.create_data_frame_by_entity( + variables = ['salary', 'rent', "person_id", "household_id"], + period = period, + index = True + ) + for entity, input_dataframe in data_frame_by_entity.items(): + print(f"{entity} for {period}") # noqa T201 + print(input_dataframe.columns) # noqa T201 + if entity == "person": + assert "person_id" in input_dataframe.columns.to_list() + if entity == "household": + assert "household_id" in input_dataframe.columns.to_list() diff --git a/openfisca_survey_manager/tests/test_marginal_tax_rate.py b/openfisca_survey_manager/tests/test_marginal_tax_rate.py index fb85ceb9..82d918f4 100644 --- a/openfisca_survey_manager/tests/test_marginal_tax_rate.py +++ b/openfisca_survey_manager/tests/test_marginal_tax_rate.py @@ -23,7 +23,7 @@ def test_compute_marginal_tax_rate(): (1 - .15), relative_error_margin = 1e-6, ) - survey_scenario.compute_marginal_tax_rate(target_variable = 'disposable_income', period = 2017, simulation = "baseline") + # survey_scenario.compute_marginal_tax_rate(target_variable = 'disposable_income', period = 2017, simulation = "baseline") if __name__ == "__main__": diff --git a/openfisca_survey_manager/utils.py b/openfisca_survey_manager/utils.py index 731f814e..100547cd 100644 --- a/openfisca_survey_manager/utils.py +++ b/openfisca_survey_manager/utils.py @@ -1,5 +1,3 @@ - - import logging import os import pandas as pd @@ -12,6 +10,10 @@ log = logging.getLogger(__name__) +def do_nothing(*args, **kwargs): + return None + + def inflate_parameters(parameters, inflator, base_year, last_year = None, ignore_missing_units = False, start_instant = None, round_ndigits = 2): """