#!/usr/bin/env python
# import base64
from contextlib import contextmanager
from typing import List, Tuple, Union, Dict  # noqa: F401
import logging
import os
import time
import sys
import inspect

import traitlets as T
from traitlets import default, validate, TraitError

# from traitlets import validate
from traitlets.config.configurable import Configurable
from traitlets.config import Config
from jsonextended import edict
from six import string_types
import jsonschema

import ipypublish
from ipypublish.utils import (
    pathlib,
    handle_error,
    read_file_from_directory,
    get_module_path,
    get_valid_filename,
    find_entry_point,
)
from ipypublish import schema
from ipypublish.convert.nbmerge import merge_notebooks
from ipypublish.convert.config_manager import (
    get_export_config_path,
    load_export_config,
    load_template,
    create_exporter_cls,
)


def dict_to_config(config, unflatten=True, key_as_tuple=False):
    """Build a traitlets ``Config`` object from a plain mapping.

    Parameters
    ----------
    config: dict
        the configuration values, possibly flattened (e.g. "a.b": 1)
    unflatten: bool
        if True, expand dotted keys into nested mappings first
    key_as_tuple: bool
        passed through to ``edict.unflatten``

    """
    mapping = config
    if unflatten:
        # expand e.g. {"Exporter.filters": x} -> {"Exporter": {"filters": x}}
        mapping = edict.unflatten(mapping, key_as_tuple=key_as_tuple, delim=".")
    return Config(mapping)


class IpyPubMain(Configurable):
    """Orchestrate the conversion of Jupyter notebooks to a published format.

    An instance is configured via traitlets (or a ``Config`` passed to the
    constructor) and then called with the path to a notebook file or folder;
    calling delegates to :meth:`publish`, which merges the notebook(s),
    validates their metadata, exports them with nbconvert and finally runs a
    chain of post-processors.
    """

    conversion = T.Unicode(
        "latex_ipypublish_main", help="key or path to conversion configuration"
    ).tag(config=True)

    plugin_folder_paths = T.Set(
        T.Unicode(),
        default_value=(),
        help="a list of folders containing conversion configurations",
    ).tag(config=True)

    @validate("plugin_folder_paths")
    def _validate_plugin_folder_paths(self, proposal):
        """Ensure every supplied plugin folder actually exists on disk."""
        folder_paths = proposal["value"]
        for path in folder_paths:
            if not os.path.exists(path):
                raise TraitError(
                    "the configuration folder path does not exist: " "{}".format(path)
                )
        return proposal["value"]

    outpath = T.Union(
        [T.Unicode(), T.Instance(pathlib.Path)],
        allow_none=True,
        default_value=None,
        help="path to output converted files",
    ).tag(config=True)

    folder_suffix = T.Unicode(
        "_files",
        help=(
            "suffix for the folder name where content will be dumped "
            "(e.g. internal images). "
            "It will be a sanitized version of the input filename, "
            "followed by the suffix"
        ),
    ).tag(config=True)

    ignore_prefix = T.Unicode(
        "_", help=("prefixes to ignore, " "when finding notebooks to merge")
    ).tag(config=True)

    meta_path_placeholder = T.Unicode(
        "${meta_path}",
        help=(
            "all string values in the export configuration containing "
            "this placeholder will be be replaced with the path to the "
            "notebook from which the metadata was obtained"
        ),
    ).tag(config=True)

    files_folder_placeholder = T.Unicode(
        "${files_path}",
        help=(
            "all string values in the export configuration containing "
            "this placeholder will be be replaced with the path "
            "(relative to outpath) to the folder where files will be dumped"
        ),
    ).tag(config=True)

    validate_nb_metadata = T.Bool(
        True,
        help=(
            "before running the exporter, validate that "
            "the notebook level metadata is valid again the schema"
        ),
    ).tag(config=True)

    pre_conversion_funcs = T.Dict(
        help=(
            "a mapping of file extensions to functions that can convert"
            "that file type Instance(nbformat.NotebookNode) = func(pathstr)"
        )
    ).tag(config=True)

    @default("pre_conversion_funcs")
    def _default_pre_conversion_funcs(self):
        """Default pre-converters: use jupytext (if installed) for .Rmd/.md files."""
        try:
            import jupytext  # noqa: F401
        except ImportError:
            # jupytext is optional; without it no pre-conversion is registered
            return {}

        try:
            from jupytext import read
        except ImportError:
            # this is deprecated in newer versions
            from jupytext import readf as read  # noqa: F401

        return {".Rmd": read, ".md": read}

    @validate("pre_conversion_funcs")
    def _validate_pre_conversion_funcs(self, proposal):
        """Check extensions start with '.' and funcs accept a single string arg."""
        for ext, func in proposal["value"].items():
            if not ext.startswith("."):
                raise TraitError(
                    "the extension key should start with a '.': " "{}".format(ext)
                )
            try:
                func("string")
                # TODO should do this safely with inspect,
                # but no obvious solution
                # to check if it only requires one string argument
            except TypeError:
                raise TraitError(
                    "the function for {} can not be "
                    "called with a single string arg: "
                    "{}".format(ext, func)
                )
            except Exception:
                # the call is expected to fail ("string" is not a real path);
                # we only care that the signature accepted one argument
                pass
        return proposal["value"]

    log_to_stdout = T.Bool(True, help="whether to log to sys.stdout").tag(config=True)

    log_level_stdout = T.Enum(
        ["debug", "info", "warning", "error", "DEBUG", "INFO", "WARNING", "ERROR"],
        default_value="INFO",
        help="the logging level to output to stdout",
    ).tag(config=True)

    log_stdout_formatstr = T.Unicode("%(levelname)s:%(name)s:%(message)s").tag(
        config=True
    )

    log_to_file = T.Bool(False, help="whether to log to file").tag(config=True)

    log_level_file = T.Enum(
        ["debug", "info", "warning", "error", "DEBUG", "INFO", "WARNING", "ERROR"],
        default_value="INFO",
        help="the logging level to output to file",
    ).tag(config=True)

    log_file_path = T.Unicode(
        None,
        allow_none=True,
        help="if None, will output to {outdir}/{ipynb_name}.nbpub.log",
    ).tag(config=True)

    log_file_formatstr = T.Unicode("%(levelname)s:%(name)s:%(message)s").tag(
        config=True
    )

    default_ppconfig_kwargs = T.Dict(
        trait=T.Bool(),
        default_value=(
            ("pdf_in_temp", False),
            ("pdf_debug", False),
            ("launch_browser", False),
        ),
        help=(
            "convenience arguments for constructing the post-processors "
            "default configuration"
        ),
    ).tag(config=True)

    default_pporder_kwargs = T.Dict(
        trait=T.Bool(),
        default_value=(
            ("dry_run", False),
            ("clear_existing", False),
            ("dump_files", False),
            ("create_pdf", False),
            ("serve_html", False),
            ("slides", False),
        ),
        help=(
            "convenience arguments for constructing the post-processors " "default list"
        ),
    ).tag(config=True)

    # TODO validate that default_ppconfig/pporder_kwargs can be parsed to funcs

    default_exporter_config = T.Dict(help="default configuration for exporters").tag(
        config=True
    )

    @default("default_exporter_config")
    def _default_exporter_config(self):
        """Default exporter config: dump extracted outputs into the files folder."""
        temp = "${files_path}/{unique_key}_{cell_index}_{index}{extension}"
        return {"ExtractOutputPreprocessor": {"output_filename_template": temp}}

    def _create_default_ppconfig(
        self, pdf_in_temp=False, pdf_debug=False, launch_browser=False
    ):
        """create a default config for postprocessors"""
        return Config(
            {
                "PDFExport": {
                    "files_folder": "${files_path}",
                    "convert_in_temp": pdf_in_temp,
                    "debug_mode": pdf_debug,
                    "open_in_browser": launch_browser,
                    "skip_mime": False,
                },
                "RunSphinx": {"open_in_browser": launch_browser},
                "RemoveFolder": {"files_folder": "${files_path}"},
                "CopyResourcePaths": {"files_folder": "${files_path}"},
                "ConvertBibGloss": {"files_folder": "${files_path}"},
            }
        )

    def _create_default_pporder(
        self,
        dry_run=False,
        clear_existing=False,
        dump_files=False,
        create_pdf=False,
        serve_html=False,
        slides=False,
    ):
        """create a default list of postprocessors to run"""
        default_pprocs = [
            "remove-blank-lines",
            "remove-trailing-space",
            "filter-output-files",
        ]
        if slides:
            default_pprocs.append("fix-slide-refs")
        if not dry_run:
            if clear_existing:
                default_pprocs.append("remove-folder")
            default_pprocs.append("write-text-file")
            if dump_files or create_pdf or serve_html:
                default_pprocs.extend(
                    ["write-resource-files", "copy-resource-paths", "convert-bibgloss"]
                )
            if create_pdf:
                default_pprocs.append("pdf-export")
            elif serve_html:
                default_pprocs.append("reveal-server")

        return default_pprocs

    @property
    def logger(self):
        """The logger used by all ipypublish conversions."""
        return logging.getLogger("ipypublish")

    @contextmanager
    def _log_handlers(self, ipynb_name, outdir):
        """Context manager attaching stdout/file handlers to the root logger.

        On exit the handlers are closed and removed again, and the root
        logger's level is restored.

        Parameters
        ----------
        ipynb_name: str
            used to construct the default log file name
        outdir: str
            folder for the default log file

        """
        root = logging.getLogger()
        root_level = root.level
        log_handlers = []

        try:
            # capture everything at the root; each handler filters by level
            root.setLevel(logging.DEBUG)

            if self.log_to_stdout:
                # setup logging to terminal
                slogger = logging.StreamHandler(sys.stdout)
                slogger.setLevel(getattr(logging, self.log_level_stdout.upper()))
                formatter = logging.Formatter(self.log_stdout_formatstr)
                slogger.setFormatter(formatter)
                root.addHandler(slogger)
                log_handlers.append(slogger)

            if self.log_to_file:
                # setup logging to file
                if self.log_file_path:
                    path = self.log_file_path
                else:
                    path = os.path.join(outdir, ipynb_name + ".nbpub.log")

                # only create a parent folder if the path actually has one
                # (os.makedirs("") raises for a bare filename)
                log_folder = os.path.dirname(path)
                if log_folder and not os.path.exists(log_folder):
                    os.makedirs(log_folder)

                flogger = logging.FileHandler(path, "w")
                flogger.setLevel(getattr(logging, self.log_level_file.upper()))
                formatter = logging.Formatter(self.log_file_formatstr)
                flogger.setFormatter(formatter)
                root.addHandler(flogger)
                log_handlers.append(flogger)

            yield

        finally:

            root.setLevel(root_level)
            for handler in log_handlers:
                handler.close()
                root.removeHandler(handler)

    def __init__(self, config=None):
        """
        Public constructor

        Parameters
        ----------
        config: traitlets.config.Config
            User configuration instance.

        """
        if config is None:
            config = {}
        if not isinstance(config, Config):
            config = Config(config)
        with_default_config = config

        super(IpyPubMain, self).__init__(config=with_default_config)

    def __call__(self, ipynb_path, nb_node=None):
        """see IpyPubMain.publish"""
        return self.publish(ipynb_path, nb_node)

    def publish(self, ipynb_path, nb_node=None):
        """ convert one or more Jupyter notebooks to a published format

        paths can be string of an existing file or folder,
        or a pathlib.Path like object

        all files linked in the documents are placed into a single files_folder

        Parameters
        ----------
        ipynb_path: str or pathlib.Path
            notebook file or directory
        nb_node: None or nbformat.NotebookNode
            a pre-converted notebook

        Returns
        --------
        outdata: dict
            containing keys;
            "outpath", "exporter", "stream", "main_filepath", "resources"

        """
        # setup the input and output paths
        if isinstance(ipynb_path, string_types):
            ipynb_path = pathlib.Path(ipynb_path)
        ipynb_name, ipynb_ext = os.path.splitext(ipynb_path.name)
        outdir = (
            os.path.join(os.getcwd(), "converted")
            if self.outpath is None
            else str(self.outpath)
        )

        with self._log_handlers(ipynb_name, outdir):

            if not ipynb_path.exists() and not nb_node:
                handle_error(
                    "the notebook path does not exist: {}".format(ipynb_path),
                    IOError,
                    self.logger,
                )

            # log start of conversion
            self.logger.info(
                "started ipypublish v{0} at {1}".format(
                    ipypublish.__version__, time.strftime("%c")
                )
            )
            self.logger.info(
                "logging to: {}".format(os.path.join(outdir, ipynb_name + ".nbpub.log"))
            )
            self.logger.info("running for ipynb(s) at: {0}".format(ipynb_path))
            self.logger.info(
                "with conversion configuration: {0}".format(self.conversion)
            )

            # run any registered pre-converter for this file extension
            # (e.g. jupytext for .Rmd/.md)
            if nb_node is None and ipynb_ext in self.pre_conversion_funcs:
                func = self.pre_conversion_funcs[ipynb_ext]
                self.logger.info(
                    "running pre-conversion with: {}".format(inspect.getmodule(func))
                )
                try:
                    nb_node = func(ipynb_path)
                except Exception as err:
                    handle_error(
                        "pre-conversion failed for {}: {}".format(ipynb_path, err),
                        err,
                        self.logger,
                    )

            # doesn't work with folders
            # if (ipynb_ext != ".ipynb" and nb_node is None):
            #     handle_error(
            #         'the file extension is not associated with any '
            #         'pre-converter: {}'.format(ipynb_ext),
            # TypeError, self.logger)

            if nb_node is None:
                # merge all notebooks
                # TODO allow notebooks to remain separate
                # (would require creating a main.tex with the preamble in etc )
                # Could make everything a 'PyProcess',
                # with support for multiple streams
                final_nb, meta_path = merge_notebooks(
                    ipynb_path, ignore_prefix=self.ignore_prefix
                )
            else:
                final_nb, meta_path = (nb_node, ipynb_path)

            # validate the notebook metadata against the schema
            if self.validate_nb_metadata:
                nb_metadata_schema = read_file_from_directory(
                    get_module_path(schema),
                    "doc_metadata.schema.json",
                    "doc_metadata.schema",
                    self.logger,
                    interp_ext=True,
                )
                try:
                    jsonschema.validate(final_nb.metadata, nb_metadata_schema)
                except jsonschema.ValidationError as err:
                    handle_error(
                        "validation of notebook level metadata failed: {}\n"
                        "see the doc_metadata.schema.json for full spec".format(
                            err.message
                        ),
                        jsonschema.ValidationError,
                        logger=self.logger,
                    )

            # set text replacements for export configuration
            replacements = {
                self.meta_path_placeholder: str(meta_path),
                self.files_folder_placeholder: "{}{}".format(
                    get_valid_filename(ipynb_name), self.folder_suffix
                ),
            }

            self.logger.debug("notebooks meta path: {}".format(meta_path))

            # load configuration file
            (
                exporter_cls,
                jinja_template,
                econfig,
                pprocs,
                pconfig,
            ) = self._load_config_file(replacements)

            # run nbconvert
            self.logger.info("running nbconvert")
            exporter, stream, resources = self.export_notebook(
                final_nb, exporter_cls, econfig, jinja_template
            )

            # postprocess results
            main_filepath = os.path.join(outdir, ipynb_name + exporter.file_extension)

            for post_proc_name in pprocs:
                proc_class = find_entry_point(
                    post_proc_name,
                    "ipypublish.postprocessors",
                    self.logger,
                    "ipypublish",
                )
                proc = proc_class(pconfig)
                stream, main_filepath, resources = proc.postprocess(
                    stream, exporter.output_mimetype, main_filepath, resources
                )

            self.logger.info("process finished successfully")

        return {
            "outpath": outdir,
            "exporter": exporter,
            "stream": stream,
            "main_filepath": main_filepath,
            "resources": resources,
        }

    def _load_config_file(self, replacements):
        """Resolve and load the conversion configuration.

        Returns
        -------
        tuple:
            (exporter class, jinja template, exporter config,
            post-processor names, post-processor config)

        """
        # find conversion configuration
        self.logger.info("finding conversion configuration: {}".format(self.conversion))
        export_config_path = None
        if isinstance(self.conversion, string_types):
            outformat_path = pathlib.Path(self.conversion)
        else:
            outformat_path = self.conversion
        if outformat_path.exists():  # TODO use pathlib approach
            # if is outformat is a path that exists, use that
            export_config_path = outformat_path
        else:
            # else search internally
            export_config_path = get_export_config_path(
                self.conversion, self.plugin_folder_paths
            )

        if export_config_path is None:
            handle_error(
                "could not find conversion configuration: {}".format(self.conversion),
                IOError,
                self.logger,
            )

        # read conversion configuration and create
        self.logger.info("loading conversion configuration")
        data = load_export_config(export_config_path)
        self.logger.info("creating exporter")
        exporter_cls = create_exporter_cls(data["exporter"]["class"])
        self.logger.info("creating template and loading filters")
        template_name = "template_file"
        jinja_template = load_template(template_name, data["template"])
        self.logger.info("creating process configuration")
        export_config = self._create_export_config(
            data["exporter"], template_name, replacements
        )
        pprocs, pproc_config = self._create_pproc_config(
            data.get("postprocessors", {}), replacements
        )

        return (exporter_cls, jinja_template, export_config, pprocs, pproc_config)

    def _create_export_config(self, exporter_data, template_name, replacements):
        # type: (dict, str, Dict[str, str]) -> Config
        """Build the nbconvert exporter configuration from the conversion data."""
        config = {}
        exporter_name = exporter_data["class"].split(".")[-1]

        config[exporter_name + ".template_file"] = template_name
        config[exporter_name + ".filters"] = exporter_data.get("filters", [])

        preprocessors = []
        for preproc in exporter_data.get("preprocessors", []):
            preprocessors.append(preproc["class"])
            preproc_name = preproc["class"].split(".")[-1]
            for name, val in preproc.get("args", {}).items():
                config[preproc_name + "." + name] = val

        config[exporter_name + ".preprocessors"] = preprocessors

        for name, val in exporter_data.get("other_args", {}).items():
            config[name] = val

        # copy, so the trait's default dict is not mutated across calls
        final_config = dict(self.default_exporter_config)
        final_config.update(config)

        replace_placeholders(final_config, replacements)

        return dict_to_config(final_config, True)

    def _create_pproc_config(self, pproc_data, replacements):
        """Build the post-processor name list and their configuration.

        ``pproc_data`` (from the conversion file) may override the default
        order ("order" key) and/or the default config ("config" key).
        """
        if "order" in pproc_data:
            pprocs_list = pproc_data["order"]
        else:
            pprocs_list = self._create_default_pporder(**self.default_pporder_kwargs)

        pproc_config = self._create_default_ppconfig(**self.default_ppconfig_kwargs)

        if "config" in pproc_data:
            override_config = pproc_data["config"]
            pproc_config.update(override_config)

        replace_placeholders(pproc_config, replacements)

        return pprocs_list, pproc_config

    def export_notebook(self, final_nb, exporter_cls, config, jinja_template):
        """Run the nbconvert exporter on the merged notebook.

        Returns
        -------
        tuple: (exporter instance, output body, resources dict)

        """
        kwargs = {"config": config}
        if jinja_template is not None:
            kwargs["extra_loaders"] = [jinja_template]
        try:
            exporter = exporter_cls(**kwargs)
        except TypeError:
            # fall back to a default-constructed exporter
            self.logger.warning(
                "the exporter class can not be parsed "
                "the arguments: {}".format(list(kwargs.keys()))
            )
            exporter = exporter_cls()

        body, resources = exporter.from_notebook_node(final_nb)
        return exporter, body, resources


def replace_placeholders(mapping, replacements):
    """ recursively substitute placeholder strings in a mapping, in-place

    Parameters
    ----------
    mapping:
        any object which has an items() attribute
    replacements: dict
        {placeholder: replacement}

    """
    for key, value in mapping.items():
        if hasattr(value, "items"):
            # descend into nested mappings
            replace_placeholders(value, replacements)
        elif isinstance(value, string_types):
            # apply every replacement in turn, then store the result back
            for placeholder, replacement in replacements.items():
                value = value.replace(placeholder, replacement)
            mapping[key] = value
