1
"""Markdown parser"""
2 1
import os
3 1
import logging
4 1
from typing import Dict, List
5 1
from . import constants as cts
6 1
import commonmark
7 1
import pandas as pd
8

9

10 1
# Configure the root logger: each record prints its timestamp and source
# location on one line, then its level name and message on the next line.
# The root logger level is ERROR, so lower-severity records are not emitted.
logging.basicConfig(
    format=(
        "[%(asctime)s]{%(pathname)s:%(lineno)d}\n"
        "%(levelname)s: %(message)s"
    ),
    datefmt="%Y-%m-%d:%H:%M:%S",
    level=logging.ERROR,
)
16

17

18 1
def read_file(path: str) -> str:
    """Return the whole text content of the file at *path*.

    The file is decoded explicitly as UTF-8 so the result does not depend on
    the platform's default locale encoding.

    Raises:
        OSError: if the file cannot be opened or read.
        UnicodeDecodeError: if the content is not valid UTF-8.
    """
    with open(path, encoding="utf-8") as input_file:
        return input_file.read()
23

24

25 1
def get_file_names(directory_name: str) -> List[str]:
    """Return the paths of the parseable files directly inside a directory.

    An entry is kept when its name ends with ``cts.MD_EXT`` or ``cts.TXT_EXT``
    and it is a regular file. The search is not recursive.

    Args:
        directory_name: directory to scan.

    Returns:
        Paths (``directory_name`` joined with the entry name), sorted by
        entry name so repeated runs yield the same order.

    Raises:
        OSError: if the directory cannot be listed.
    """
    file_list = []
    # os.listdir gives entries in arbitrary order; sort for reproducibility.
    for entry in sorted(os.listdir(directory_name)):
        filename = os.fsdecode(entry)
        if not (filename.endswith(cts.MD_EXT) or filename.endswith(cts.TXT_EXT)):
            continue
        full_path = os.path.join(directory_name, filename)
        # A directory may carry a matching suffix; it cannot be read as text.
        if os.path.isfile(full_path):
            file_list.append(full_path)
    return file_list
36

37

38 1
def merge_dict(dict_1, dict_2: Dict[str, str]) -> Dict[str, List[str]]:
    """Append the values of *dict_2* to the per-key lists of *dict_1*.

    Only the keys of *dict_1* are kept; keys that exist solely in *dict_2*
    are ignored. A key of *dict_1* that is missing from *dict_2* receives an
    empty string, so every list grows by exactly one element per call.

    Args:
        dict_1: accumulated dictionary, or ``None`` to start a new one whose
            keys are taken from *dict_2*. Values that are not lists yet are
            wrapped in single-element lists. An empty dictionary is returned
            unchanged.
        dict_2: dictionary whose values are appended.

    Returns:
        The merged dictionary mapping each key to its list of values. When
        *dict_1* already holds only lists it is updated in place and returned.
    """
    if dict_1 is None:
        dict_1 = {key: [] for key in dict_2}
    elif any(not isinstance(value, list) for value in dict_1.values()):
        # Normalise per value so a mix of plain values and lists is handled.
        dict_1 = {
            key: value if isinstance(value, list) else [value]
            for key, value in dict_1.items()
        }

    for key, values in dict_1.items():
        if key in dict_2:
            values.append(dict_2[key])
        else:
            # Pad with an empty string to keep all lists the same length.
            values.append("")
            logging.warning("Key does not exist: %r", key)

    return dict_1
52

53

54 1
def collect_md(directory: str, is_clean=True) -> Dict[str, List[str]]:
    """Parse every file found in *directory* and merge the results.

    Each file returned by ``get_file_names`` is read, parsed with
    ``md_parser`` and folded into one dictionary with ``merge_dict``.

    Args:
        directory: directory to scan for files.
        is_clean: forwarded to ``md_parser``.

    Returns:
        A dictionary mapping each heading to a list with one entry per file,
        or ``None`` when no file was found in *directory*.
    """
    main_md_dict = None
    for file_path in get_file_names(directory):
        parsed = md_parser(read_file(file_path), is_clean)
        main_md_dict = merge_dict(main_md_dict, parsed)
    return main_md_dict
62

63

64 1
def collect_md_text(directory: str, is_clean=True) -> List[str]:
    """Return the parsed text of every file found in *directory*.

    Each file returned by ``get_file_names`` is read and parsed with
    ``md_parser``; the text collected under its headings is then joined with
    single spaces into one string.

    Args:
        directory: directory to scan for files.
        is_clean: forwarded to ``md_parser``.

    Returns:
        One string per file, in the order given by ``get_file_names``; an
        empty list when no file was found.
    """
    return [
        " ".join(md_parser(read_file(file_path), is_clean).values())
        for file_path in get_file_names(directory)
    ]
73

74

75 1
def _heading_key(heading) -> str:
    """Return the lower-cased text used as the dictionary key for *heading*.

    Descends through first children until a node carrying a literal is found,
    so a heading that is empty or starts with inline markup does not raise.
    An empty string is returned when no literal is found.
    """
    node = heading.first_child
    while node is not None and node.literal is None:
        node = node.first_child
    return "" if node is None else node.literal.lower()


def md_parser(input_md: str, is_clean=True) -> Dict[str, str]:
    """Parse markdown text into a dictionary of headings and their text.

    Args:
        input_md: markdown source text.
        is_clean: when true, literals of nodes typed ``code_block``, ``link``,
            ``image``, ``code`` or ``block_quote`` are left out.

    Returns:
        A dictionary whose keys are lower-cased heading texts and whose
        values are the literals found after that heading, each followed by a
        single space. Text that appears before the first heading is stored
        under the empty-string key. A heading that occurs again restarts its
        entry with an empty value.
    """
    ast = commonmark.Parser().parse(input_md)
    skipped_types = (
        {"code_block", "link", "image", "code", "block_quote"}
        if is_clean
        else set()
    )
    md_dict: Dict[str, str] = {}
    cur_heading = ""
    for subnode, enter in ast.walker():
        if subnode.t == "heading" and enter:
            # Register the heading as a key and make it the current section.
            cur_heading = _heading_key(subnode)
            md_dict[cur_heading] = ""
            continue

        literal = subnode.literal
        # NOTE(review): the type filter only applies to nodes that carry a
        # literal themselves; whether text nested inside link, image or
        # block_quote nodes ends up excluded should be confirmed.
        if literal is None or subnode.t in skipped_types:
            continue
        # The heading's own text node is visited too; do not repeat it.
        if literal.lower() == cur_heading:
            continue
        # .get() covers text that precedes the first heading.
        md_dict[cur_heading] = md_dict.get(cur_heading, "") + literal + " "
    return md_dict
99

100

101 1
def build_pd(md_dict):
    """Build a pandas DataFrame from *md_dict*.

    Args:
        md_dict: data passed unchanged to the ``pd.DataFrame`` constructor,
            e.g. a dictionary mapping column names to lists of values.

    Returns:
        The resulting ``pd.DataFrame``.
    """
    return pd.DataFrame(md_dict)
104

105

106 1
if __name__ == "__main__":
    # Script entry point: parse the files under resources/test and build a
    # DataFrame from the merged result (the DataFrame is not used further).
    build_pd(collect_md("resources/test"))

Read our documentation on viewing source code.

Loading