# -*- coding: utf-8 -*-
"""
Batchrunner
===========

A single class to manage a batch run or parameter sweep of a given model.

"""
9 6
import copy
10 6
import random
11 6
from itertools import product, count
12 6
from multiprocessing import Pool, cpu_count
13 6
import pandas as pd
14 6
from tqdm import tqdm
15 6
from collections import OrderedDict
16

17

18 6
class ParameterError(TypeError):
    """Error describing parameter names that did not match model parameters.

    The offending names are stored on ``bad_names`` and substituted into the
    class-level ``MESSAGE`` template whenever the error is rendered as text.
    """

    MESSAGE = (
        "Parameters must map a name to a value. "
        "These names did not match parameters: {}"
    )

    def __init__(self, bad_names):
        """Remember the names that could not be matched.

        Args:
            bad_names: The names to report; formatted as-is into ``MESSAGE``.
        """
        self.bad_names = bad_names

    def __str__(self):
        """Return ``MESSAGE`` with ``bad_names`` filled in."""
        template = self.MESSAGE
        return template.format(self.bad_names)
29

30

31 6
class VariableParameterError(ParameterError):
    """ParameterError variant for variable parameters given non-sequence values.

    Only the ``MESSAGE`` template differs from the parent class; storage of
    ``bad_names`` and string rendering are inherited.
    """

    MESSAGE = (
        "Variable_parameters must map a name to a sequence of values. "
        "These parameters were given with non-sequence values: {}"
    )

    def __init__(self, bad_names):
        """Store ``bad_names`` through the parent initialiser.

        Args:
            bad_names: The parameter names to report in the message.
        """
        super().__init__(bad_names)
39

40

41 6
class FixedBatchRunner:
    """ This class is instantiated with a model class, and model parameters
    associated with one or more values. It is also instantiated with model and
    agent-level reporters, dictionaries mapping a variable name to a function
    which collects some data from the model or its agents at the end of the run
    and stores it.

    Note that by default, the reporters only collect data at the *end* of the
    run. To get step by step data, simply have a reporter store the model's
    entire DataCollector object.
    """
52

53 6
    def __init__(
54
            self,
55
            model_cls,
56
            parameters_list=None,
57
            fixed_parameters=None,
58
            iterations=1,
59
            max_steps=1000,
60
            model_reporters=None,
61
            agent_reporters=None,
62
            display_progress=True,
63
    ):
64
        """ Create a new BatchRunner for a given model with the given
65
        parameters.
66

67
        Args:
68
            model_cls: The class of model to batch-run.
69
            parameters_list: A list of dictionaries of parameter sets.
70
                The model will be run with dictionary of parameters.
71
                For example, given parameters_list of
72
                    [{"homophily": 3, "density": 0.8, "minority_pc": 0.2},
73
                    {"homophily": 2, "density": 0.9, "minority_pc": 0.1},
74
                    {"homophily": 4, "density": 0.6, "minority_pc": 0.5}]
75
                3 models will be run, one for each provided set of parameters.
76
            fixed_parameters: Dictionary of parameters that stay same through
77
                all batch runs. For example, given fixed_parameters of
78
                    {"constant_parameter": 3},
79
                every instantiated model will be passed constant_parameter=3
80
                as a kwarg.
81
            iterations: The total number of times to run the model for each set
82
                of parameters.
83
            max_steps: Upper limit of steps above which each run will be halted
84
                if it hasn't halted on its own.
85
            model_reporters: The dictionary of variables to collect on each run
86
                at the end, with variable names mapped to a function to collect
87
                them. For example:
88
                    {"agent_count": lambda m: m.schedule.get_agent_count()}
89
            agent_reporters: Like model_reporters, but each variable is now
90
                collected at the level of each agent present in the model at
91
                the end of the run.
92
            display_progress: Display progresss bar with time estimation?
93

94
        """
95 6
        self.model_cls = model_cls
96 6
        if parameters_list is None:
97 6
            parameters_list = []
98 6
        self.parameters_list = list(parameters_list)
99 6
        self.fixed_parameters = fixed_parameters or {}
100 6
        self._include_fixed = len(self.fixed_parameters.keys()) > 0
101 6
        self.iterations = iterations
102 6
        self.max_steps = max_steps
103

104 6
        self.model_reporters = model_reporters
105 6
        self.agent_reporters = agent_reporters
106

107 6
        if self.model_reporters:
108 6
            self.model_vars = {}
109

110 6
        if self.agent_reporters:
111 6
            self.agent_vars = {}
112

113
        # Make Compatible with Python 3.5
114 6
        self.datacollector_model_reporters = OrderedDict()
115 6
        self.datacollector_agent_reporters = OrderedDict()
116

117 6
        self.display_progress = display_progress
118

119 6
    @property
120 2
    def _make_model_args(self):
121
        """Prepare all combinations of parameter values for `run_all`
122

123
        Returns:
124
            Tuple with the form:
125
            (total_iterations, all_kwargs, all_param_values)
126
        """
127 6
        total_iterations = self.iterations
128 6
        all_kwargs = []
129 6
        all_param_values = []
130

131 6
        _count = len(self.parameters_list)
132 6
        if _count:
133 6
            for params in self.parameters_list:
134 6
                kwargs = params.copy()
135 6
                kwargs.update(self.fixed_parameters)
136 6
                all_kwargs.append(kwargs)
137 6
                all_param_values.append(list(params.values()))
138

139 6
        elif len(self.fixed_parameters):
140 6
            _count = 1
141 6
            kwargs = self.fixed_parameters.copy()
142 6
            all_kwargs.append(kwargs)
143 6
            all_param_values.append(list(kwargs.values()))
144

145 6
        total_iterations *= _count
146

147 6
        return total_iterations, all_kwargs, all_param_values
148

149 6
    def _make_model_args_mp(self):
150
        """Prepare all combinations of parameter values for `run_all`
151
        Due to multiprocessing requirements of @StaticMethod takes different input, hence the similar function
152
        Returns:
153
            List of list with the form:
154
            [[model_object, dictionary_of_kwargs, max_steps, iterations]]
155
        """
156 6
        total_iterations = self.iterations
157 6
        all_kwargs = []
158

159 6
        _count = len(self.parameters_list)
160 6
        if _count:
161 6
            for params in self.parameters_list:
162 6
                kwargs = params.copy()
163 6
                kwargs.update(self.fixed_parameters)
164
                # run each iterations specific number of times
165 6
                for _iter in range(self.iterations):
166 6
                    kwargs_repeated = kwargs.copy()
167 6
                    all_kwargs.append([self.model_cls, kwargs_repeated, self.max_steps, _iter])
168

169 6
        elif len(self.fixed_parameters):
170 0
            _count = 1
171 0
            kwargs = self.fixed_parameters.copy()
172 0
            all_kwargs.append(kwargs)
173

174 6
        total_iterations *= _count
175

176 6
        return all_kwargs, total_iterations
177

178 6
    def run_all(self):
        """ Run the model at all parameter combinations and store results.

        Each run configuration from `_make_model_args` is executed
        ``iterations`` times through `run_iteration`, with a run counter that
        increases across all runs.
        """
        run_count = count()
        total_iterations, all_kwargs, all_param_values = self._make_model_args

        # The total has to be given by keyword: tqdm's first positional
        # parameter is the iterable to wrap, not the expected count.
        with tqdm(total=total_iterations, disable=not self.display_progress) as pbar:
            for kwargs, param_values in zip(all_kwargs, all_param_values):
                for _ in range(self.iterations):
                    self.run_iteration(kwargs, param_values, next(run_count))
                    pbar.update()
189

190 6
    def run_iteration(self, kwargs, param_values, run_count):
191 6
        kwargs_copy = copy.deepcopy(kwargs)
192 6
        model = self.model_cls(**kwargs_copy)
193 6
        results = self.run_model(model)
194 6
        if param_values is not None:
195 6
            model_key = tuple(param_values) + (run_count,)
196
        else:
197 0
            model_key = (run_count,)
198

199 6
        if self.model_reporters:
200 6
            self.model_vars[model_key] = self.collect_model_vars(model)
201 6
        if self.agent_reporters:
202 6
            agent_vars = self.collect_agent_vars(model)
203 6
            for agent_id, reports in agent_vars.items():
204 6
                agent_key = model_key + (agent_id,)
205 6
                self.agent_vars[agent_key] = reports
206
        # Collects data from datacollector object in model
207 6
        if results is not None:
208 6
            if results.model_reporters is not None:
209 6
                self.datacollector_model_reporters[model_key] = results.get_model_vars_dataframe()
210 6
            if results.agent_reporters is not None:
211 6
                self.datacollector_agent_reporters[model_key] = results.get_agent_vars_dataframe()
212

213 6
        return (getattr(self, "model_vars", None), getattr(self, "agent_vars", None),
214
                getattr(self, "datacollector_model_reporters", None),
215
                getattr(self, "datacollector_agent_reporters", None))
216

217 6
    @staticmethod
218 2
    def run_wrappermp(iter_args):
219
        """
220
        Based on requirement of Python multiprocessing requires @staticmethod decorator;
221
        this is primarily to ensure functionality on Windows OS and doe not impact MAC or Linux distros
222

223
        :param iter_args: List of arguments for model run
224
            iter_args[0] = model object
225
            iter_args[1] = key word arguments needed for model object
226
            iter_args[2] = maximum number of steps for model
227
            iter_args[3] = number of time to run model for stochastic/random variation with same parameters
228
        :return:
229
            tuple of param values which serves as a unique key for model results
230
            model object
231
        """
232

233 6
        _model_i = iter_args[0]
234 6
        _kwargs = iter_args[1]
235 6
        _max_steps = iter_args[2]
236 6
        _iteration = iter_args[3]
237

238 6
        def run_iteration_mp(_model_i, _kwargs, _max_steps, _iteration):
239
            """
240
            :type _model_i: object of model
241
            :type _kwargs: dict of keyword argument dictionary for model
242
            :type _max_steps: int of number of steps
243
            :type _iteration: int of number of iterations
244
            """
245
            # instantiate version of model with correct parameters
246 6
            model = _model_i(**_kwargs)
247 6
            while model.running and model.schedule.steps < _max_steps:
248 6
                model.step()
249

250
            # add iteration number to dictionary to make unique_key
251 6
            _kwargs["iteration"] = _iteration
252

253
            # convert kwargs dict to tuple to  make consistent
254 6
            _param_values = tuple(_kwargs.values())
255

256 6
            return _param_values, model
257

258 6
        return run_iteration_mp(_model_i, _kwargs, _max_steps, _iteration)
259

260 6
    def run_model(self, model):
261
        """ Run a model object to completion, or until reaching max steps.
262

263
        If your model runs in a non-standard way, this is the method to modify
264
        in your subclass.
265

266
        """
267 6
        while model.running and model.schedule.steps < self.max_steps:
268 6
            model.step()
269

270 6
        if hasattr(model, "datacollector"):
271 6
            return model.datacollector
272
        else:
273 6
            return None
274

275 6
    def collect_model_vars(self, model):
276
        """ Run reporters and collect model-level variables. """
277 6
        model_vars = OrderedDict()
278 6
        for var, reporter in self.model_reporters.items():
279 6
            model_vars[var] = reporter(model)
280

281 6
        return model_vars
282

283 6
    def collect_agent_vars(self, model):
284
        """ Run reporters and collect agent-level variables. """
285 6
        agent_vars = OrderedDict()
286 6
        for agent in model.schedule._agents.values():
287 6
            agent_record = OrderedDict()
288 6
            for var, reporter in self.agent_reporters.items():
289 6
                agent_record[var] = getattr(agent, reporter)
290 6
            agent_vars[agent.unique_id] = agent_record
291 6
        return agent_vars
292

293 6
    def get_model_vars_dataframe(self):
294
        """ Generate a pandas DataFrame from the model-level variables
295
        collected.
296
        """
297

298 6
        return self._prepare_report_table(self.model_vars)
299

300 6
    def get_agent_vars_dataframe(self):
301
        """ Generate a pandas DataFrame from the agent-level variables
302
        collected.
303
        """
304

305 6
        return self._prepare_report_table(self.agent_vars, extra_cols=["AgentId"])
306

307 6
    def get_collector_model(self):
308
        """
309
        Passes pandas dataframes from datacollector module in dictionary format of model reporters
310
        :return: dict {(Param1, Param2,...,iteration): <DataCollector Pandas DataFrame>}
311
        """
312

313 6
        return self.datacollector_model_reporters
314

315 6
    def get_collector_agents(self):
316
        """
317
                Passes pandas dataframes from datacollector module in dictionary format of agent reporters
318
                :return: dict {(Param1, Param2,...,iteration): <DataCollector Pandas DataFrame>}
319
                """
320 6
        return self.datacollector_agent_reporters
321

322 6
    def _prepare_report_table(self, vars_dict, extra_cols=None):
323
        """
324
        Creates a dataframe from collected records and sorts it using 'Run'
325
        column as a key.
326
        """
327 6
        extra_cols = ["Run"] + (extra_cols or [])
328 6
        index_cols = set()
329 6
        for params in self.parameters_list:
330 6
            index_cols |= params.keys()
331 6
        index_cols = list(index_cols) + extra_cols
332

333 6
        records = []
334 6
        for param_key, values in vars_dict.items():
335 6
            record = dict(zip(index_cols, param_key))
336 6
            record.update(values)
337 6
            records.append(record)
338

339 6
        df = pd.DataFrame(records)
340 6
        rest_cols = set(df.columns) - set(index_cols)
341 6
        ordered = df[index_cols + list(sorted(rest_cols))]
342 6
        ordered.sort_values(by="Run", inplace=True)
343 6
        if self._include_fixed:
344 6
            for param in self.fixed_parameters.keys():
345 6
                val = self.fixed_parameters[param]
346

347
                # avoid error when val is an iterable
348 6
                _vallist = [val for i in range(ordered.shape[0])]
349 6
                ordered[param] = _vallist
350 6
        return ordered
351

352

353 6
class ParameterProduct:
    """Iterator over every combination of the given parameter value lists.

    Each item is a dict mapping every parameter name to one value, produced
    from ``itertools.product`` over the value lists.
    """

    def __init__(self, variable_parameters):
        """Snapshot the parameter lists and start the product.

        Args:
            variable_parameters: Mapping of parameter name to an iterable of
                values; deep-copied so later changes do not affect iteration.
        """
        snapshot = copy.deepcopy(variable_parameters)
        self.param_names, self.param_lists = zip(*snapshot.items())
        self._product = product(*self.param_lists)

    def __iter__(self):
        return self

    def __next__(self):
        combination = next(self._product)
        return dict(zip(self.param_names, combination))
365

366

367
# Roughly inspired by sklearn.model_selection.ParameterSampler.  Does not handle
# distributions, only lists.
369 6
class ParameterSampler:
    """Iterator yielding ``n`` randomly drawn parameter combinations.

    Each item is a dict in which every parameter name maps to one value chosen
    from that parameter's list with ``random_state.choice``.
    """

    def __init__(self, parameter_lists, n, random_state=None):
        """Snapshot the parameter lists and set up the random source.

        Args:
            parameter_lists: Mapping of parameter name to a sequence of
                candidate values; deep-copied on construction.
            n: Number of samples to yield before the iterator is exhausted.
            random_state: None for a fresh ``random.Random``, an int used as
                the seed of a new ``random.Random``, or any object that is
                used as-is and must provide ``choice``.
        """
        snapshot = copy.deepcopy(parameter_lists)
        self.param_names, self.param_lists = zip(*snapshot.items())
        self.n = n

        if random_state is None:
            rng = random.Random()
        elif isinstance(random_state, int):
            rng = random.Random(random_state)
        else:
            rng = random_state
        self.random_state = rng

        self.count = 0

    def __iter__(self):
        return self

    def __next__(self):
        # the counter advances on every call, including calls after exhaustion
        self.count += 1
        if self.count > self.n:
            raise StopIteration()

        picks = [self.random_state.choice(p_list) for p_list in self.param_lists]
        return dict(zip(self.param_names, picks))
396

397

398 6
class BatchRunner(FixedBatchRunner):
    """ Batch runner that sweeps every combination of variable parameters.

    It is instantiated with a model class and model parameters associated with
    one or more values, plus model and agent-level reporters: dictionaries
    mapping a variable name to a reporter which collects some data from the
    model or its agents at the end of the run and stores it.

    Note that by default, the reporters only collect data at the *end* of the
    run. To get step by step data, simply have a reporter store the model's
    entire DataCollector object.

    """

    def __init__(
            self,
            model_cls,
            variable_parameters=None,
            fixed_parameters=None,
            iterations=1,
            max_steps=1000,
            model_reporters=None,
            agent_reporters=None,
            display_progress=True,
    ):
        """ Create a new BatchRunner for a given model with the given
        parameters.

        Args:
            model_cls: The class of model to batch-run.
            variable_parameters: Dictionary of parameters to lists of values.
                The model will be run with every combination of these
                parameters. For example, given variable_parameters of
                    {"param_1": range(5),
                     "param_2": [1, 5, 10]}
                models will be run with {param_1=0, param_2=1},
                    {param_1=0, param_2=5}, ..., {param_1=4, param_2=10}.
                None means there are no variable parameters.
            fixed_parameters: Dictionary of parameters that stay same through
                all batch runs. For example, given fixed_parameters of
                    {"constant_parameter": 3},
                every instantiated model will be passed constant_parameter=3
                as a kwarg.
            iterations: The total number of times to run the model for each
                combination of parameters.
            max_steps: Upper limit of steps above which each run will be halted
                if it hasn't halted on its own.
            model_reporters: The dictionary of variables to collect on each run
                at the end, with variable names mapped to a function to collect
                them. For example:
                    {"agent_count": lambda m: m.schedule.get_agent_count()}
            agent_reporters: Like model_reporters, but each variable is now
                collected at the level of each agent present in the model at
                the end of the run.
            display_progress: Display progress bar with time estimation?

        """
        # Expand the variable parameters into explicit parameter sets; None is
        # handed to the parent unchanged.
        if variable_parameters is None:
            parameters_list = None
        else:
            parameters_list = ParameterProduct(variable_parameters)

        super().__init__(
            model_cls,
            parameters_list,
            fixed_parameters,
            iterations,
            max_steps,
            model_reporters,
            agent_reporters,
            display_progress,
        )
475

476

477 6
class BatchRunnerMP(BatchRunner):
478
    """ Child class of BatchRunner, extended with multiprocessing support. """
479

480 6
    def __init__(self, model_cls, nr_processes=None, **kwargs):
        """ Create a new BatchRunnerMP for a given model with the given
        parameters.

        model_cls: The class of model to batch-run.
        nr_processes: int
                      the number of separate processes the BatchRunner
                      should start, all running in parallel. When None, the
                      value of cpu_count() is used and printed.
        kwargs: the kwargs required for the parent BatchRunner class
        """
        if nr_processes is not None:
            self.processes = nr_processes
        else:
            # default to the number of processors reported for this machine
            self.processes = cpu_count()
            print("BatchRunner MP will use {} processors.".format(self.processes))

        super().__init__(model_cls, **kwargs)
        self.pool = Pool(self.processes)
500

501 6
    def result_prep_mp(self, results):
        """
        Helper Function
        :param results: dictionary mapping each result key to the finished
        model, as produced by the process pool or the single-process debug run
        :updates model_vars, agent_vars and the data collector stores so the
        output is consistent with the other batch runners; data collector
        stores that end up empty are replaced by None
        """
        for model_key, model in results.items():
            if self.model_reporters:
                self.model_vars[model_key] = self.collect_model_vars(model)

            if self.agent_reporters:
                for agent_id, reports in self.collect_agent_vars(model).items():
                    self.agent_vars[model_key + (agent_id,)] = reports

            if not hasattr(model, "datacollector"):
                continue
            collector = model.datacollector
            if collector.model_reporters is not None:
                self.datacollector_model_reporters[model_key] = collector.get_model_vars_dataframe()
            if collector.agent_reporters is not None:
                self.datacollector_agent_reporters[model_key] = collector.get_agent_vars_dataframe()

        # Make results consistent: an empty store is reported as None
        if not self.datacollector_model_reporters.keys():
            self.datacollector_model_reporters = None
        if not self.datacollector_agent_reporters.keys():
            self.datacollector_agent_reporters = None
528

529 6
    def run_all(self):
        """
        Run the model at all parameter combinations and store results,
        overrides run_all from BatchRunner.

        With more than one process the runs are dispatched to the process
        pool; otherwise they are executed one by one in this process, which
        makes model errors easier to debug. The pool is closed afterwards,
        also when a run raises.

        :return: tuple (model_vars, agent_vars, datacollector_model_reporters,
            datacollector_agent_reporters), with None for stores that do not
            exist on this runner
        """
        run_iter_args, total_iterations = self._make_model_args_mp()
        # finished models keyed by the tuple returned from run_wrappermp
        results = {}

        try:
            if self.processes > 1:
                # The total has to be given by keyword: tqdm's first
                # positional parameter is the iterable to wrap.
                with tqdm(total=total_iterations, disable=not self.display_progress) as pbar:
                    for params, model in self.pool.imap_unordered(self.run_wrappermp, run_iter_args):
                        results[params] = model
                        pbar.update()
            else:
                # For debugging model due to difficulty of getting errors
                # during multiprocessing
                for run in run_iter_args:
                    params, model = self.run_wrappermp(run)
                    results[params] = model

            self.result_prep_mp(results)
        finally:
            # Close multi-processing even if a run failed
            self.pool.close()

        return (getattr(self, "model_vars", None), getattr(self, "agent_vars", None),
                getattr(self, "datacollector_model_reporters", None),
                getattr(self, "datacollector_agent_reporters", None))

Read our documentation on viewing source code .

Loading