from typing import TYPE_CHECKING, Any, Callable
import numpy
from numpy.typing import ArrayLike
from policyengine_core import projectors
from policyengine_core.entities import Entity, Role
from policyengine_core.enums import EnumArray
from policyengine_core.populations.population import Population
from policyengine_core.periods.period_ import Period
from typing import Optional, Container
if TYPE_CHECKING:
from policyengine_core.simulations import Simulation
[docs]class GroupPopulation(Population):
def __init__(self, entity: Entity, members: Population):
super().__init__(entity)
self.members: Population = members
self._members_entity_id: ArrayLike = None
self._members_role: ArrayLike = None
self._members_position: ArrayLike = None
self._ordered_members_map = None
def __call__(
self,
variable_name: str,
period: Period = None,
options: Optional[Container[str]] = None,
):
variable = self.simulation.tax_benefit_system.variables.get(
variable_name
)
if variable.entity.is_person:
return self.sum(self.members(variable_name, period, options))
else:
return super().__call__(variable_name, period, options)
[docs] def clone(
self, simulation: "Simulation", members: Population
) -> "GroupPopulation":
result = GroupPopulation(self.entity, members)
result.simulation = simulation
result._holders = {
variable: holder.clone(self)
for (variable, holder) in self._holders.items()
}
result.count = self.count
result.ids = self.ids
result._members_entity_id = self._members_entity_id
result._members_role = self._members_role
result._members_position = self._members_position
result._ordered_members_map = self._ordered_members_map
return result
@property
def members_position(self) -> ArrayLike:
if (
self._members_position is None
and self.members_entity_id is not None
):
# We could use self.count and self.members.count , but with the current initilization, we are not sure count will be set before members_position is called
nb_entities = numpy.max(self.members_entity_id) + 1
nb_persons = len(self.members_entity_id)
self._members_position = numpy.empty_like(self.members_entity_id)
counter_by_entity = numpy.zeros(nb_entities)
for k in range(nb_persons):
entity_index = self.members_entity_id[k]
self._members_position[k] = counter_by_entity[entity_index]
counter_by_entity[entity_index] += 1
return self._members_position
@members_position.setter
def members_position(self, members_position: ArrayLike) -> None:
self._members_position = members_position
@property
def members_entity_id(self) -> ArrayLike:
return self._members_entity_id
@members_entity_id.setter
def members_entity_id(self, members_entity_id: ArrayLike) -> None:
self._members_entity_id = members_entity_id
@property
def members_role(self) -> ArrayLike:
if self._members_role is None:
default_role = self.entity.flattened_roles[0]
self._members_role = numpy.repeat(
default_role, len(self.members_entity_id)
)
return self._members_role
@members_role.setter
def members_role(self, members_role: ArrayLike):
if members_role is not None:
self._members_role = numpy.array(members_role)
@property
def ordered_members_map(self) -> ArrayLike:
"""
Mask to group the persons by entity
This function only caches the map value, to see what the map is used for, see value_nth_person method.
"""
if self._ordered_members_map is None:
self._ordered_members_map = numpy.argsort(self.members_entity_id)
return self._ordered_members_map
[docs] def get_role(self, role_name: str) -> Role:
return next(
(
role
for role in self.entity.flattened_roles
if role.key == role_name
),
None,
)
# Aggregation persons -> entity
[docs] @projectors.projectable
def sum(self, array: ArrayLike, role: Role = None) -> ArrayLike:
"""
Return the sum of ``array`` for the members of the entity.
``array`` must have the dimension of the number of persons in the simulation
If ``role`` is provided, only the entity member with the given role are taken into account.
Example:
>>> salaries = household.members('salary', '2018-01') # e.g. [2000, 1500, 0, 0, 0]
>>> household.sum(salaries)
>>> array([3500])
"""
self.entity.check_role_validity(role)
self.members.check_array_compatible_with_entity(array)
if role is not None:
role_filter = self.members.has_role(role)
return numpy.bincount(
self.members_entity_id[role_filter],
weights=array[role_filter],
minlength=self.count,
)
else:
return numpy.bincount(self.members_entity_id, weights=array)
[docs] @projectors.projectable
def any(self, array: ArrayLike, role: Role = None) -> ArrayLike:
"""
Return ``True`` if ``array`` is ``True`` for any members of the entity.
``array`` must have the dimension of the number of persons in the simulation
If ``role`` is provided, only the entity member with the given role are taken into account.
Example:
>>> salaries = household.members('salary', '2018-01') # e.g. [2000, 1500, 0, 0, 0]
>>> household.any(salaries >= 1800)
>>> array([True])
"""
sum_in_entity = self.sum(array, role=role)
return sum_in_entity > 0
[docs] @projectors.projectable
def reduce(
self,
array: ArrayLike,
reducer: Callable,
neutral_element: Any,
role: Role = None,
) -> ArrayLike:
self.members.check_array_compatible_with_entity(array)
self.entity.check_role_validity(role)
position_in_entity = self.members_position
role_filter = self.members.has_role(role) if role is not None else True
filtered_array = numpy.where(role_filter, array, neutral_element)
result = self.filled_array(
neutral_element
) # Neutral value that will be returned if no one with the given role exists.
# We loop over the positions in the entity
# Looping over the entities is tempting, but potentielly slow if there are a lot of entities
biggest_entity_size = numpy.max(position_in_entity) + 1
for p in range(biggest_entity_size):
values = self.value_nth_person(
p, filtered_array, default=neutral_element
)
result = reducer(result, values)
return result
[docs] @projectors.projectable
def all(self, array: ArrayLike, role: Role = None) -> ArrayLike:
"""
Return ``True`` if ``array`` is ``True`` for all members of the entity.
``array`` must have the dimension of the number of persons in the simulation
If ``role`` is provided, only the entity member with the given role are taken into account.
Example:
>>> salaries = household.members('salary', '2018-01') # e.g. [2000, 1500, 0, 0, 0]
>>> household.all(salaries >= 1800)
>>> array([False])
"""
return self.reduce(
array, reducer=numpy.logical_and, neutral_element=True, role=role
)
[docs] @projectors.projectable
def max(self, array: ArrayLike, role: Role = None) -> ArrayLike:
"""
Return the maximum value of ``array`` for the entity members.
``array`` must have the dimension of the number of persons in the simulation
If ``role`` is provided, only the entity member with the given role are taken into account.
Example:
>>> salaries = household.members('salary', '2018-01') # e.g. [2000, 1500, 0, 0, 0]
>>> household.max(salaries)
>>> array([2000])
"""
return self.reduce(
array,
reducer=numpy.maximum,
neutral_element=-numpy.infty,
role=role,
)
[docs] @projectors.projectable
def min(self, array: ArrayLike, role: Role = None) -> ArrayLike:
"""
Return the minimum value of ``array`` for the entity members.
``array`` must have the dimension of the number of persons in the simulation
If ``role`` is provided, only the entity member with the given role are taken into account.
Example:
>>> salaries = household.members('salary', '2018-01') # e.g. [2000, 1500, 0, 0, 0]
>>> household.min(salaries)
>>> array([0])
>>> household.min(salaries, role = Household.PARENT) # Assuming the 1st two persons are parents
>>> array([1500])
"""
return self.reduce(
array,
reducer=numpy.minimum,
neutral_element=numpy.infty,
role=role,
)
[docs] @projectors.projectable
def nb_persons(self, role: Role = None) -> ArrayLike:
"""
Returns the number of persons contained in the entity.
If ``role`` is provided, only the entity member with the given role are taken into account.
"""
if role:
if role.subroles:
role_condition = numpy.logical_or.reduce(
[self.members_role == subrole for subrole in role.subroles]
)
else:
role_condition = self.members_role == role
return self.sum(role_condition)
else:
return numpy.bincount(self.members_entity_id)
# Projection person -> entity
[docs] @projectors.projectable
def value_from_person(
self, array: ArrayLike, role: Role, default: Any = 0
) -> ArrayLike:
"""
Get the value of ``array`` for the person with the unique role ``role``.
``array`` must have the dimension of the number of persons in the simulation
If such a person does not exist, return ``default`` instead
The result is a vector which dimension is the number of entities
"""
self.entity.check_role_validity(role)
if role.max != 1:
raise Exception(
"You can only use value_from_person with a role that is unique in {}. Role {} is not unique.".format(
self.key, role.key
)
)
self.members.check_array_compatible_with_entity(array)
members_map = self.ordered_members_map
result = self.filled_array(default, dtype=array.dtype)
if isinstance(array, EnumArray):
result = EnumArray(result, array.possible_values)
role_filter = self.members.has_role(role)
entity_filter = self.any(role_filter)
result[entity_filter] = array[members_map][role_filter[members_map]]
return result
[docs] @projectors.projectable
def value_nth_person(
self, n: int, array: ArrayLike, default: Any = 0
) -> ArrayLike:
"""
Get the value of array for the person whose position in the entity is n.
Note that this position is arbitrary, and that members are not sorted.
If the nth person does not exist, return ``default`` instead.
The result is a vector which dimension is the number of entities.
"""
self.members.check_array_compatible_with_entity(array)
positions = self.members_position
nb_persons_per_entity = self.nb_persons()
members_map = self.ordered_members_map
result = self.filled_array(default, dtype=array.dtype)
# For households that have at least n persons, set the result as the value of criteria for the person for which the position is n.
# The map is needed b/c the order of the nth persons of each household in the persons vector is not necessarily the same than the household order.
result[nb_persons_per_entity > n] = array[members_map][
positions[members_map] == n
]
if isinstance(array, EnumArray):
result = EnumArray(result, array.possible_values)
return result
[docs] @projectors.projectable
def value_from_first_person(self, array: ArrayLike):
return self.value_nth_person(0, array)
# Projection entity -> person(s)
[docs] def project(self, array: ArrayLike, role: Role = None) -> ArrayLike:
self.check_array_compatible_with_entity(array)
self.entity.check_role_validity(role)
if role is None:
return array[self.members_entity_id]
else:
role_condition = self.members.has_role(role)
return numpy.where(
role_condition, array[self.members_entity_id], 0
)