#902 BatchRunner and DataCollector Updates

Closed tpike3 tpike3 Pseudo commit used to compare (1a1f28c...9e6a9e6)
Missing base report.

Unable to compare commits because the base of the pull request did not upload a coverage report.

Changes found in between 1a1f28c...9e6a9e6 (pseudo...base) which prevent comparing this pull request.


@@ -7,24 +7,18 @@
Loading
7 7
8 8
"""
9 9
import copy
10 +
import random
10 11
from itertools import product, count
12 +
from multiprocessing import Pool, cpu_count
11 13
import pandas as pd
12 14
from tqdm import tqdm
13 -
14 -
import random
15 -
16 -
try:
17 -
    from pathos.multiprocessing import ProcessPool
18 -
except ImportError:
19 -
    pathos_support = False
20 -
else:
21 -
    pathos_support = True
15 +
from collections import OrderedDict
22 16
23 17
24 18
class ParameterError(TypeError):
25 19
    MESSAGE = (
26 -
        "parameters must map a name to a value. "
27 -
        "These names did not match paramerets: {}"
20 +
        "Parameters must map a name to a value. "
21 +
        "These names did not match parameters: {}"
28 22
    )
29 23
30 24
    def __init__(self, bad_names):
@@ -36,7 +30,7 @@
Loading
36 30
37 31
class VariableParameterError(ParameterError):
38 32
    MESSAGE = (
39 -
        "variable_parameters must map a name to a sequence of values. "
33 +
        "Variable_parameters must map a name to a sequence of values. "
40 34
        "These parameters were given with non-sequence values: {}"
41 35
    )
42 36
@@ -57,23 +51,23 @@
Loading
57 51
    """
58 52
59 53
    def __init__(
60 -
        self,
61 -
        model_cls,
62 -
        parameters_list=None,
63 -
        fixed_parameters=None,
64 -
        iterations=1,
65 -
        max_steps=1000,
66 -
        model_reporters=None,
67 -
        agent_reporters=None,
68 -
        display_progress=True,
54 +
            self,
55 +
            model_cls,
56 +
            parameters_list=None,
57 +
            fixed_parameters=None,
58 +
            iterations=1,
59 +
            max_steps=1000,
60 +
            model_reporters=None,
61 +
            agent_reporters=None,
62 +
            display_progress=True,
69 63
    ):
70 64
        """ Create a new BatchRunner for a given model with the given
71 65
        parameters.
72 66
73 67
        Args:
74 68
            model_cls: The class of model to batch-run.
75 69
            parameters_list: A list of dictionaries of parameter sets.
76 -
                The model will be run with dictionary of paramters.
70 +
                The model will be run with dictionary of parameters.
77 71
                For example, given parameters_list of
78 72
                    [{"homophily": 3, "density": 0.8, "minority_pc": 0.2},
79 73
                    {"homophily": 2, "density": 0.9, "minority_pc": 0.1},
@@ -116,8 +110,13 @@
Loading
116 110
        if self.agent_reporters:
117 111
            self.agent_vars = {}
118 112
113 +
        # Make Compatible with Python 3.5
114 +
        self.datacollector_model_reporters = OrderedDict()
115 +
        self.datacollector_agent_reporters = OrderedDict()
116 +
119 117
        self.display_progress = display_progress
120 118
119 +
    @property
121 120
    def _make_model_args(self):
122 121
        """Prepare all combinations of parameter values for `run_all`
123 122
@@ -129,27 +128,57 @@
Loading
129 128
        all_kwargs = []
130 129
        all_param_values = []
131 130
132 -
        count = len(self.parameters_list)
133 -
        if count:
131 +
        _count = len(self.parameters_list)
132 +
        if _count:
134 133
            for params in self.parameters_list:
135 134
                kwargs = params.copy()
136 135
                kwargs.update(self.fixed_parameters)
137 136
                all_kwargs.append(kwargs)
138 -
                all_param_values.append(params.values())
137 +
                all_param_values.append(list(params.values()))
138 +
139 139
        elif len(self.fixed_parameters):
140 -
            count = 1
140 +
            _count = 1
141 141
            kwargs = self.fixed_parameters.copy()
142 142
            all_kwargs.append(kwargs)
143 -
            all_param_values.append(kwargs.values())
143 +
            all_param_values.append(list(kwargs.values()))
144 +
145 +
        total_iterations *= _count
146 +
147 +
        return total_iterations, all_kwargs, all_param_values
144 148
145 -
        total_iterations *= count
149 +
    def _make_model_args_mp(self):
150 +
        """Prepare all combinations of parameter values for `run_all`
151 +
        Because multiprocessing requires a @staticmethod, this takes a different input format, hence the similar function
152 +
        Returns:
153 +
            List of list with the form:
154 +
            [[model_object, dictionary_of_kwargs, max_steps, iterations]]
155 +
        """
156 +
        total_iterations = self.iterations
157 +
        all_kwargs = []
146 158
147 -
        return (total_iterations, all_kwargs, all_param_values)
159 +
        _count = len(self.parameters_list)
160 +
        if _count:
161 +
            for params in self.parameters_list:
162 +
                kwargs = params.copy()
163 +
                kwargs.update(self.fixed_parameters)
164 +
                # repeat each parameter set for the specified number of iterations
165 +
                for _iter in range(self.iterations):
166 +
                    kwargs_repeated = kwargs.copy()
167 +
                    all_kwargs.append([self.model_cls, kwargs_repeated, self.max_steps, _iter])
168 +
169 +
        elif len(self.fixed_parameters):
170 +
            _count = 1
171 +
            kwargs = self.fixed_parameters.copy()
172 +
            all_kwargs.append(kwargs)
173 +
174 +
        total_iterations *= _count
175 +
176 +
        return all_kwargs, total_iterations
148 177
149 178
    def run_all(self):
150 179
        """ Run the model at all parameter combinations and store results. """
151 180
        run_count = count()
152 -
        total_iterations, all_kwargs, all_param_values = self._make_model_args()
181 +
        total_iterations, all_kwargs, all_param_values = self._make_model_args
153 182
154 183
        with tqdm(total_iterations, disable=not self.display_progress) as pbar:
155 184
            for i, kwargs in enumerate(all_kwargs):
@@ -159,11 +188,9 @@
Loading
159 188
                    pbar.update()
160 189
161 190
    def run_iteration(self, kwargs, param_values, run_count):
162 -
        kwargscopy = copy.deepcopy(kwargs)
163 -
        model = self.model_cls(**kwargscopy)
164 -
        self.run_model(model)
165 -
166 -
        # Collect and store results:
191 +
        kwargs_copy = copy.deepcopy(kwargs)
192 +
        model = self.model_cls(**kwargs_copy)
193 +
        results = self.run_model(model)
167 194
        if param_values is not None:
168 195
            model_key = tuple(param_values) + (run_count,)
169 196
        else:
@@ -176,7 +203,59 @@
Loading
176 203
            for agent_id, reports in agent_vars.items():
177 204
                agent_key = model_key + (agent_id,)
178 205
                self.agent_vars[agent_key] = reports
179 -
        return (getattr(self, "model_vars", None), getattr(self, "agent_vars", None))
206 +
        # Collects data from datacollector object in model
207 +
        if results is not None:
208 +
            if results.model_reporters is not None:
209 +
                self.datacollector_model_reporters[model_key] = results.get_model_vars_dataframe()
210 +
            if results.agent_reporters is not None:
211 +
                self.datacollector_agent_reporters[model_key] = results.get_agent_vars_dataframe()
212 +
213 +
        return (getattr(self, "model_vars", None), getattr(self, "agent_vars", None),
214 +
                getattr(self, "datacollector_model_reporters", None),
215 +
                getattr(self, "datacollector_agent_reporters", None))
216 +
217 +
    @staticmethod
218 +
    def run_wrappermp(iter_args):
219 +
        """
220 +
        Python multiprocessing requires the @staticmethod decorator;
221 +
        this is primarily to ensure functionality on Windows OS and does not impact Mac or Linux distros
222 +
223 +
        :param iter_args: List of arguments for model run
224 +
            iter_args[0] = model object
225 +
            iter_args[1] = key word arguments needed for model object
226 +
            iter_args[2] = maximum number of steps for model
227 +
            iter_args[3] = index of this repeated run (for stochastic/random variation with the same parameters)
228 +
        :return:
229 +
            tuple of param values which serves as a unique key for model results
230 +
            model object
231 +
        """
232 +
233 +
        _model_i = iter_args[0]
234 +
        _kwargs = iter_args[1]
235 +
        _max_steps = iter_args[2]
236 +
        _iteration = iter_args[3]
237 +
238 +
        def run_iteration_mp(_model_i, _kwargs, _max_steps, _iteration):
239 +
            """
240 +
            :type _model_i: object of model
241 +
            :type _kwargs: dict of keyword argument dictionary for model
242 +
            :type _max_steps: int of number of steps
243 +
            :type _iteration: int of number of iterations
244 +
            """
245 +
            # instantiate version of model with correct parameters
246 +
            model = _model_i(**_kwargs)
247 +
            while model.running and model.schedule.steps < _max_steps:
248 +
                model.step()
249 +
250 +
            # add iteration number to dictionary to make unique_key
251 +
            _kwargs["iteration"] = _iteration
252 +
253 +
            # convert kwargs dict to tuple to make consistent
254 +
            _param_values = tuple(_kwargs.values())
255 +
256 +
            return _param_values, model
257 +
258 +
        return run_iteration_mp(_model_i, _kwargs, _max_steps, _iteration)
180 259
181 260
    def run_model(self, model):
182 261
        """ Run a model object to completion, or until reaching max steps.
@@ -188,18 +267,24 @@
Loading
188 267
        while model.running and model.schedule.steps < self.max_steps:
189 268
            model.step()
190 269
270 +
        if hasattr(model, "datacollector"):
271 +
            return model.datacollector
272 +
        else:
273 +
            return None
274 +
191 275
    def collect_model_vars(self, model):
192 276
        """ Run reporters and collect model-level variables. """
193 -
        model_vars = {}
277 +
        model_vars = OrderedDict()
194 278
        for var, reporter in self.model_reporters.items():
195 279
            model_vars[var] = reporter(model)
280 +
196 281
        return model_vars
197 282
198 283
    def collect_agent_vars(self, model):
199 284
        """ Run reporters and collect agent-level variables. """
200 -
        agent_vars = {}
285 +
        agent_vars = OrderedDict()
201 286
        for agent in model.schedule._agents.values():
202 -
            agent_record = {}
287 +
            agent_record = OrderedDict()
203 288
            for var, reporter in self.agent_reporters.items():
204 289
                agent_record[var] = getattr(agent, reporter)
205 290
            agent_vars[agent.unique_id] = agent_record
@@ -208,17 +293,32 @@
Loading
208 293
    def get_model_vars_dataframe(self):
209 294
        """ Generate a pandas DataFrame from the model-level variables
210 295
        collected.
211 -
212 296
        """
297 +
213 298
        return self._prepare_report_table(self.model_vars)
214 299
215 300
    def get_agent_vars_dataframe(self):
216 301
        """ Generate a pandas DataFrame from the agent-level variables
217 302
        collected.
218 -
219 303
        """
304 +
220 305
        return self._prepare_report_table(self.agent_vars, extra_cols=["AgentId"])
221 306
307 +
    def get_collector_model(self):
308 +
        """
309 +
        Passes pandas dataframes from datacollector module in dictionary format of model reporters
310 +
        :return: dict {(Param1, Param2,...,iteration): <DataCollector Pandas DataFrame>}
311 +
        """
312 +
313 +
        return self.datacollector_model_reporters
314 +
315 +
    def get_collector_agents(self):
316 +
        """
317 +
                Passes pandas dataframes from datacollector module in dictionary format of agent reporters
318 +
                :return: dict {(Param1, Param2,...,iteration): <DataCollector Pandas DataFrame>}
319 +
                """
320 +
        return self.datacollector_agent_reporters
321 +
222 322
    def _prepare_report_table(self, vars_dict, extra_cols=None):
223 323
        """
224 324
        Creates a dataframe from collected records and sorts it using 'Run'
@@ -245,12 +345,11 @@
Loading
245 345
                val = self.fixed_parameters[param]
246 346
247 347
                # avoid error when val is an iterable
248 -
                vallist = [val for i in range(ordered.shape[0])]
249 -
                ordered[param] = vallist
348 +
                _vallist = [val for i in range(ordered.shape[0])]
349 +
                ordered[param] = _vallist
250 350
        return ordered
251 351
252 352
253 -
# This is kind of a useless class, but it does carry the 'source' parameters with it
254 353
class ParameterProduct:
255 354
    def __init__(self, variable_parameters):
256 355
        self.param_names, self.param_lists = zip(
@@ -296,6 +395,7 @@
Loading
296 395
        raise StopIteration()
297 396
298 397
398 +
# TODO: No difference- deleting will remove whitespace issue
299 399
class BatchRunner(FixedBatchRunner):
300 400
    """ This class is instantiated with a model class, and model parameters
301 401
    associated with one or more values. It is also instantiated with model and
@@ -310,15 +410,15 @@
Loading
310 410
    """
311 411
312 412
    def __init__(
313 -
        self,
314 -
        model_cls,
315 -
        variable_parameters=None,
316 -
        fixed_parameters=None,
317 -
        iterations=1,
318 -
        max_steps=1000,
319 -
        model_reporters=None,
320 -
        agent_reporters=None,
321 -
        display_progress=True,
413 +
            self,
414 +
            model_cls,
415 +
            variable_parameters=None,
416 +
            fixed_parameters=None,
417 +
            iterations=1,
418 +
            max_steps=1000,
419 +
            model_reporters=None,
420 +
            agent_reporters=None,
421 +
            display_progress=True,
322 422
    ):
323 423
        """ Create a new BatchRunner for a given model with the given
324 424
        parameters.
@@ -348,7 +448,7 @@
Loading
348 448
            agent_reporters: Like model_reporters, but each variable is now
349 449
                collected at the level of each agent present in the model at
350 450
                the end of the run.
351 -
            display_progress: Display progresss bar with time estimation?
451 +
            display_progress: Display progress bar with time estimation?
352 452
353 453
        """
354 454
        super().__init__(
@@ -363,67 +463,87 @@
Loading
363 463
        )
364 464
365 465
366 -
class MPSupport(Exception):
367 -
    def __str__(self):
368 -
        return (
369 -
            "BatchRunnerMP depends on pathos, which is either not "
370 -
            "installed, or the path can not be found. "
371 -
        )
372 -
373 -
374 466
class BatchRunnerMP(BatchRunner):
375 467
    """ Child class of BatchRunner, extended with multiprocessing support. """
376 468
377 -
    def __init__(self, model_cls, nr_processes=2, **kwargs):
469 +
    def __init__(self, model_cls, nr_processes=None, **kwargs):
378 470
        """ Create a new BatchRunnerMP for a given model with the given
379 471
        parameters.
380 472
381 -
        Args:
382 -
            model_cls: The class of model to batch-run.
383 -
            nr_processes: the number of separate processes the BatchRunner
384 -
                should start, all running in parallel.
385 -
            kwargs: the kwargs required for the parent BatchRunner class
473 +
        model_cls: The class of model to batch-run.
474 +
        nr_processes: int
475 +
                      the number of separate processes the BatchRunner
476 +
                      should start, all running in parallel.
477 +
        kwargs: the kwargs required for the parent BatchRunner class
386 478
        """
387 -
        if not pathos_support:
388 -
            raise MPSupport
479 +
        if nr_processes is None:
480 +
            # identify the number of processors available on the user's machine
481 +
            available_processors = cpu_count()
482 +
            self.processes = available_processors
483 +
            print("BatchRunner MP will use {} processors.".format(self.processes))
484 +
        else:
485 +
            self.processes = nr_processes
486 +
389 487
        super().__init__(model_cls, **kwargs)
390 -
        self.pool = ProcessPool(nodes=nr_processes)
488 +
        self.pool = Pool(self.processes)
489 +
490 +
    def result_prep_mp(self, results):
491 +
        """
492 +
        Helper Function
493 +
        :param results: Takes results dictionary from Processpool and single processor debug run and fixes format to
494 +
        make compatible with BatchRunner Output
495 +
        :updates model_vars and agents_vars so consistent across all batchrunner
496 +
        """
497 +
        # Take results and convert to dictionary so dataframe can be called
498 +
        for model_key, model in results.items():
499 +
            if self.model_reporters:
500 +
                self.model_vars[model_key] = self.collect_model_vars(model)
501 +
            if self.agent_reporters:
502 +
                agent_vars = self.collect_agent_vars(model)
503 +
                for agent_id, reports in agent_vars.items():
504 +
                    agent_key = model_key + (agent_id,)
505 +
                    self.agent_vars[agent_key] = reports
506 +
            if hasattr(model, "datacollector"):
507 +
                if model.datacollector.model_reporters is not None:
508 +
                    self.datacollector_model_reporters[model_key] = model.datacollector.get_model_vars_dataframe()
509 +
                if model.datacollector.agent_reporters is not None:
510 +
                    self.datacollector_agent_reporters[model_key] = model.datacollector.get_agent_vars_dataframe()
511 +
512 +
        # Make results consistent
513 +
        if len(self.datacollector_model_reporters.keys()) == 0:
514 +
            self.datacollector_model_reporters = None
515 +
        if len(self.datacollector_agent_reporters.keys()) == 0:
516 +
            self.datacollector_agent_reporters = None
391 517
392 518
    def run_all(self):
393 519
        """
394 520
        Run the model at all parameter combinations and store results,
395 521
        overrides run_all from BatchRunner.
396 522
        """
397 -
        run_count = count()
398 -
        total_iterations, all_kwargs, all_param_values = self._make_model_args()
399 523
524 +
        run_iter_args, total_iterations = self._make_model_args_mp()
400 525
        # register the process pool and init a queue
401 -
        job_queue = []
402 -
        with tqdm(total_iterations, disable=not self.display_progress) as pbar:
403 -
            for i, kwargs in enumerate(all_kwargs):
404 -
                param_values = all_param_values[i]
405 -
                for _ in range(self.iterations):
406 -
                    # make a new process and add it to the queue
407 -
                    job_queue.append(
408 -
                        self.pool.uimap(
409 -
                            self.run_iteration,
410 -
                            (kwargs,),
411 -
                            (param_values,),
412 -
                            (next(run_count),),
413 -
                        )
414 -
                    )
415 -
            # empty the queue
416 -
            results = []
417 -
            for task in job_queue:
418 -
                for model_vars, agent_vars in list(task):
419 -
                    results.append((model_vars, agent_vars))
420 -
                pbar.update()
421 -
422 -
            # store the results
423 -
            for model_vars, agent_vars in results:
424 -
                if self.model_reporters:
425 -
                    for model_key, model_val in model_vars.items():
426 -
                        self.model_vars[model_key] = model_val
427 -
                if self.agent_reporters:
428 -
                    for agent_key, reports in agent_vars.items():
429 -
                        self.agent_vars[agent_key] = reports
526 +
        # store results in a dictionary keyed by parameter values
527 +
        results = {}
528 +
529 +
        if self.processes > 1:
530 +
            with tqdm(total_iterations, disable=not self.display_progress) as pbar:
531 +
                for params, model in self.pool.imap_unordered(self.run_wrappermp, run_iter_args):
532 +
                    results[params] = model
533 +
                    pbar.update()
534 +
535 +
                self.result_prep_mp(results)
536 +
        # For debugging model due to difficulty of getting errors during multiprocessing
537 +
        else:
538 +
            for run in run_iter_args:
539 +
                params, model_data = self.run_wrappermp(run)
540 +
                results[params] = model_data
541 +
542 +
            self.result_prep_mp(results)
543 +
544 +
        # Close multi-processing
545 +
        self.pool.close()
546 +
547 +
        return (getattr(self, "model_vars", None), getattr(self, "agent_vars", None),
548 +
                getattr(self, "datacollector_model_reporters", None),
549 +
                getattr(self, "datacollector_agent_reporters", None))

@@ -40,6 +40,7 @@
Loading
40 40
import itertools
41 41
from operator import attrgetter
42 42
import pandas as pd
43 +
import types
43 44
44 45
45 46
class DataCollector:
@@ -57,7 +58,6 @@
Loading
57 58
58 59
    def __init__(self, model_reporters=None, agent_reporters=None, tables=None):
59 60
        """ Instantiate a DataCollector with lists of model and agent reporters.
60 -
61 61
        Both model_reporters and agent_reporters accept a dictionary mapping a
62 62
        variable name to either an attribute name, or a method.
63 63
        For example, if there was only one model-level reporter for number of
@@ -84,6 +84,17 @@
Loading
84 84
            If you want to pickle your model you must not use lambda functions.
85 85
            If your model includes a large number of agents, you should *only*
86 86
            use attribute names for the agent reporter, it will be much faster.
87 +
88 +
            Model reporters can take four types of arguments:
89 +
            lambda like above:
90 +
            {"agent_count": lambda m: m.schedule.get_agent_count() }
91 +
            method with @property decorators
92 +
            {"agent_count": schedule.get_agent_count()}
93 +
            class attributes of model
94 +
            {"model_attribute": "model_attribute"}
95 +
            functions with parameters that have been placed in a list
96 +
            {"Model_Function":[function, [param_1, param_2]]}
97 +
87 98
        """
88 99
        self.model_reporters = {}
89 100
        self.agent_reporters = {}
@@ -153,18 +164,36 @@
Loading
153 164
        else:
154 165
155 166
            def get_reports(agent):
156 -
                prefix = (agent.model.schedule.steps, agent.unique_id)
167 +
                _prefix = (agent.model.schedule.steps, agent.unique_id)
157 168
                reports = tuple(rep(agent) for rep in rep_funcs)
158 -
                return prefix + reports
169 +
                return _prefix + reports
159 170
160 171
        agent_records = map(get_reports, model.schedule.agents)
161 172
        return agent_records
162 173
163 174
    def collect(self, model):
164 175
        """ Collect all the data for the given model object. """
165 176
        if self.model_reporters:
177 +
166 178
            for var, reporter in self.model_reporters.items():
167 -
                self.model_vars[var].append(reporter(model))
179 +
                # Check if Lambda operator
180 +
                if isinstance(reporter, types.LambdaType):
181 +
                    self.model_vars[var].append(reporter(model))
182 +
                # Check if model attribute
183 +
                elif isinstance(reporter, partial):
184 +
                    self.model_vars[var].append(reporter(model))
185 +
                # Check if function with arguments
186 +
                elif isinstance(reporter, list):
187 +
                    self.model_vars[var].append(reporter[0](*reporter[1]))
188 +
                else:
189 +
                    try:
190 +
                        self.model_vars[var].append(reporter)
191 +
                    except TypeError:
192 +
                        print("Reporters must be in dictionary in one of the following forms: /n\
193 +
                        - key_name: <class attribute> /n \
194 +
                        - key_name: function (e.g schedule.get_agent_count) /n \
195 +
                        - key_name: lambda function /n \
196 +
                        - key_name: [function, [<arguments for function>]")
168 197
169 198
        if self.agent_reporters:
170 199
            agent_records = self._record_agents(model)
@@ -192,9 +221,9 @@
Loading
192 221
                raise Exception("Could not insert row with missing column")
193 222
194 223
    @staticmethod
195 -
    def _getattr(name, object):
224 +
    def _getattr(name, _object):
196 225
        """ Turn around arguments of getattr to make it partially callable."""
197 -
        return getattr(object, name, None)
226 +
        return getattr(_object, name, None)
198 227
199 228
    def get_model_vars_dataframe(self):
200 229
        """ Create a pandas DataFrame from the model variables.

Unable to process changes.

No base report to compare against.

50 Commits

Hiding 1 contextual commit
+65
+59
+9
-3
Hiding 7 contextual commits
-64
-70
-9
+15
Hiding 8 contextual commits
+64
+70
+9
-15
Hiding 6 contextual commits
+3 Files
+49
+48
+1
Hiding 1 contextual commit
+9
-2
-7
Hiding 2 contextual commits Hiding 1 contextual commit
Hiding 7 contextual commits
+1
+1
Hiding 4 contextual commits
-3 Files
-50
-58
+2
+6
Pull Request Base Commit
Files Coverage
mesa 86.41%
Project Totals (20 files) 86.41%
Loading