securisec / chepy
1 4
import sys
2 4
import base64
3 4
import binascii
4 4
import pathlib
5 4
import webbrowser
6 4
import logging
7 4
import inspect
8 4
import io
9 4
import itertools
10 4
import subprocess
11 4
from configparser import ConfigParser
12 4
from urllib.parse import urljoin
13 4
from pprint import pformat
14 4
from typing import Any, Tuple, List, Union
15

16 4
import pyperclip
17 4
import ujson
18 4
import jsonpickle
19 4
import regex as re
20 4
from decorator import decorator
21

22 4
from .modules.internal.colors import yellow, cyan, green, magenta, blue, red
23

24

25 4
class ChepyDecorators(object):
    """Namespace class that holds the decorators used by Chepy."""

    @staticmethod
    @decorator
    def call_stack(func, *args, **kwargs):
        """Record a method call on the instance's ``_stack`` and run it.

        The method name and its bound arguments (defaults applied, ``self``
        removed) are appended to ``self._stack`` before the wrapped method
        is executed. The data from ``self._stack`` is predominantly used to
        save recipes.

        Args:
            func: The method being wrapped.
            *args: Positional arguments of the call; ``args[0]`` is the instance.
            **kwargs: Keyword arguments of the call.

        Returns:
            Any: Whatever the wrapped method returns.
        """
        instance = args[0]

        # Bind against the signature so that default values are recorded too
        bound = inspect.signature(func).bind(*args, **kwargs)
        bound.apply_defaults()

        call_args = dict(bound.arguments)
        call_args.pop("self")

        instance._stack.append({"function": func.__name__, "args": call_args})
        return func(*args, **kwargs)
49

50

51 4
class ChepyCore(object):
52
    """The `ChepyCore` class for Chepy is primarily used as an interface 
53
    for all the current modules/classes in Chepy, or for plugin development. 
54
    The `ChepyCore` class is what provides the various attributes like **states**, 
55
    **buffers**, etc and is required to use and extend Chepy.
56
    
57
    Args:
58
        \*data (tuple): The core class takes arbitrary number of arguments as \*args.
59

60
    Attributes:
61
        states (dict): Contains all the current states. Each arg passed to 
62
            the ChepyCore class will be considered a state.
63
        buffers (dict): Contains all the current buffers if a buffer is saved. 
64
        state (Any): The data in the current state. The state changes each time a 
65
            Chepy method is called. 
66
    
67
    Returns:
68
        Chepy: The Chepy object. 
69
    """
70

71 4
    def __init__(self, *data):
72 4
        self.states = dict(list(enumerate(data)))
73
        #: Holder for the initial state
74 4
        self.__initial_states = dict(list(enumerate(data)))
75
        #: Value of the initial state
76 4
        self._current_index = 0
77 4
        self.buffers = dict()
78
        #: Alias for `write_to_file`
79 4
        self.write = self.write_to_file
80
        #: Alias for `out`
81 4
        self.bake = self.out
82
        #: Alias for `web`
83 4
        self.cyberchef = self.web
84
        #: Alias for `load_file`
85 4
        self.read_file = self.load_file
86
        #: Holds all the methods that are called/chanined and their args
87 4
        self._stack = list()
88

89
        #: Log level
90 4
        self.log_level = logging.INFO
91
        #: Log format message
92 4
        self.log_format = "%(levelname)-2s - %(message)s"
93 4
        logging.getLogger().setLevel(self.log_level)
94 4
        logging.basicConfig(format=self.log_format)
95

96 4
    @property
97
    def state(self):
98 4
        return self.states[self._current_index]
99

100 4
    @state.setter
101
    def state(self, val):
102 4
        self.states[self._current_index] = val
103

104 4
    def __str__(self):
105 4
        try:
106 4
            if isinstance(self.state, bytearray):
107 4
                return re.sub(rb"[^\x00-\x7f]", b".", self.state).decode()
108
            else:
109 4
                return self._convert_to_str()
110
        except UnicodeDecodeError:  # pragma: no cover
111
            return "Could not convert to str, but the data exists in the states. Use o, output or out() to access the values"
112
        except:  # pragma: no cover
113
            logging.exception(
114
                "\n\nCannot print current state. Either chain with "
115
                "another method, or use one of the output methods "
116
                "Example: .o, .output, .state or .out()\n\n"
117
            )
118
            return ""
119

120 4
    def _pickle_class(self, obj: Any) -> Any:
        """Serialise an object with jsonpickle and parse the result.

        The object is encoded to a JSON string by ``jsonpickle.encode``
        (with ``unpicklable=True``) and that string is parsed back with
        ``ujson.loads``.

        Args:
            obj (Any): Any object

        Returns:
            Any: The parsed JSON as a python object.
        """
        encoded = jsonpickle.encode(obj, unpicklable=True)
        return ujson.loads(encoded)
132

133 4
    def _load_as_file(self) -> object:
134
        """This method is used when a function or a method expects 
135
        a file path to load a file. Instead of passing a file path, 
136
        this method allows passing an io.BytesIO object instead.
137
        
138
        Returns:
139
            object: io.BytesIO object
140
        """
141 4
        return io.BytesIO(self._convert_to_bytes())
142

143 4
    def _abs_path(self, path: str):
144
        """Returns the absolute path by expanding home dir
145
        
146
        Args:
147
            path (str): Path to expand
148
        
149
        Returns:
150
            object: Path object
151
        """
152 4
        return pathlib.Path(path).expanduser().absolute()
153

154 4
    def _info_logger(self, data: str) -> None:
        """Log a message at INFO level, passed through ``blue`` first.

        Args:
            data (str): Message to log

        Returns:
            None: Nothing is returned.
        """
        message = blue(data)
        logging.info(message)
165

166
    def _warning_logger(self, data: str) -> None:  # pragma: no cover
        """Log a message at WARNING level, passed through ``yellow`` first.

        Args:
            data (str): Message to log

        Returns:
            None: Nothing is returned.
        """
        message = yellow(data)
        logging.warning(message)
177

178
    def _error_logger(self, data: str) -> None:  # pragma: no cover
        """Log a message at ERROR level, passed through ``red`` first.

        Args:
            data (str): Message to log

        Returns:
            None: Nothing is returned.
        """
        message = red(data)
        logging.error(message)
189

190 4
    def fork(self, methods: List[Tuple[Union[str, object], dict]]):
191
        """Run multiple methods on all available states
192
        
193
        Method names in a list of tuples. If using in the cli, 
194
        this should not contain any spaces.
195

196
        Args:
197
            methods (List[Tuple[Union[str, object], dict]]): Required. 
198
                List of tuples
199
        
200
        Returns:
201
            Chepy: The Chepy object. 
202

203
        Examples:
204
            This method takes an array of method names and their args as an list of 
205
            tuples; the first value of the tuple is the method name as either a string, 
206
            or as an object, and the second value is a ditionary of arguments. The keys of 
207
            in the dictionary are method argument names, while the values are argument 
208
            values.
209

210
            >>> from chepy import Chepy
211
            >>> c = Chepy("some", "data")
212
            >>> c.fork([("to_hex",), ("hmac_hash", {"secret_key": "key"})])
213
            >>> # this is how to use fork methods with a string
214
            >>> c.fork([(c.to_hex,), (c.hmac_hash, {"secret_key": "key"})])
215
            >>> # This is how to use fork using methods
216
            >>> print(c.states)
217
            {0: 'e46dfcf050c0a0d135b73856ab8e3298f9cc4105', 1: '1863d1542629590e3838543cbe3bf6a4f7c706ff'}
218
        """
219 4
        for i in self.states:
220 4
            self.change_state(i)
221 4
            for method in methods:
222 4
                if type(method[0]).__name__ == "method":
223 4
                    method_name = method[0].__name__  # type: ignore
224 4
                elif isinstance(method[0], str):
225 4
                    method_name = method[0]
226 4
                if len(method) > 1:
227 4
                    self.states[i] = getattr(self, method_name)(**method[1]).o
228
                else:
229 4
                    self.states[i] = getattr(self, method_name)().o
230 4
        return self
231

232 4
    @ChepyDecorators.call_stack
    def set_state(self, data: Any):
        """Replace the value of the current state with arbitrary data

        Whatever is stored in the currently selected state is overwritten
        with ``data``.

        Args:
            data (Any): Any data type

        Returns:
            Chepy: The Chepy object.

        Examples:
            >>> c = Chepy("some data")
            >>> print(c.state)
            some data
            >>> c.set_state("New data")
            >>> print(c.state)
            New data
        """
        new_value = data
        self.state = new_value
        return self
255

256 4
    @ChepyDecorators.call_stack
    def create_state(self):
        """Create a new empty state

        The new state holds an empty dict and is stored under the first
        unused integer key at or above the current number of states.

        Returns:
            Chepy: The Chepy object.
        """
        new_index = len(self.states)
        # After a delete_state() the key equal to len(states) may still be in
        # use; skip ahead instead of overwriting an existing state.
        while new_index in self.states:
            new_index += 1
        self.states[new_index] = {}
        return self
265

266 4
    @ChepyDecorators.call_stack
    def copy_state(self, index: int):
        """Store the value of the current state under another index

        The value is assigned as-is (no copy of the object is made); if the
        current index has no state, ``None`` is stored.

        Args:
            index (int): Index of new state

        Returns:
            Chepy: The Chepy object.
        """
        source = self.states.get(self._current_index)
        self.states[index] = source
        return self
278

279 4
    @ChepyDecorators.call_stack
    def change_state(self, index: int):
        """Change current state by index

        Same behaviour as switch_state

        Args:
            index (int): Index of new state

        Raises:
            TypeError: If specified index does not exist

        Returns:
            Chepy: The Chepy object.
        """
        # Check for the key itself: comparing against len(states) let
        # index == len(states) through and is wrong once a state was deleted.
        if index not in self.states:  # pragma: no cover
            raise TypeError("Specified index does not exist")
        self._current_index = index
        return self
298

299 4
    @ChepyDecorators.call_stack
    def switch_state(self, index: int):  # pragma: no cover
        """Switch current state by index

        Same behaviour as change_state

        Args:
            index (int): Index of new state

        Raises:
            TypeError: If specified index does not exist

        Returns:
            Chepy: The Chepy object.
        """
        # Check for the key itself: comparing against len(states) let
        # index == len(states) through and is wrong once a state was deleted.
        if index not in self.states:  # pragma: no cover
            raise TypeError("Specified index does not exist")
        self._current_index = index
        return self
318

319 4
    @ChepyDecorators.call_stack
    def delete_state(self, index: int):
        """Remove the state stored under the given index

        A warning is logged, and nothing else happens, when no state
        exists under that index.

        Args:
            index (int): Index of state

        Returns:
            Chepy: The Chepy object.
        """
        if index in self.states:
            del self.states[index]
        else:  # pragma: no cover
            logging.warning("{} does not exist".format(index))
        return self
334

335 4
    @ChepyDecorators.call_stack
    def get_state(self, index: int) -> Any:
        """Return the value stored in the specified state.

        This method does not chain with other methods of Chepy

        Args:
            index (int): The index of the state

        Returns:
            Any: The value in the specified state, or None when the index
            does not exist
        """
        value = self.states.get(index)
        return value
348

349 4
    @ChepyDecorators.call_stack
    def save_buffer(self, index: Union[int, None] = None):
        """Save current state in a buffer

        Buffers are temporary holding areas for anything that is in the state.
        The state can change, but the buffer does not. Can be chained with other
        methods. Use in conjunction with `load_buffer` to load buffer back into
        the state.

        Args:
            index (int, optional): The index to save the state in. When None,
                the first unused integer key at or above the current number
                of buffers is used.

        Returns:
            Chepy: The Chepy object.
        """
        if index is None:
            index = len(self.buffers)
            # After a delete_buffer() the key equal to len(buffers) may still
            # be in use; skip ahead instead of overwriting a saved buffer.
            while index in self.buffers:
                index += 1
        self.buffers[index] = self.state
        return self
369

370 4
    @ChepyDecorators.call_stack
    def load_buffer(self, index: int):
        """Load the specified buffer into the current state

        Args:
            index (int): Index key of an existing buffer

        Returns:
            Chepy: The Chepy object.

        Examples:
            >>> c = Chepy("A").save_buffer()
            >>> # the current value of the state is now saved in a new buffer
            >>> c.to_hex()
            >>> # operate on the state, in this case, convert to hex.
            >>> c.state
            "41"
            >>> c.buffers
            {0: "A"}
            >>> c.load_buffer(0)
            >>> # the content of the buffer is back in the current state.
            >>> c.state
            "A"
        """
        stored = self.buffers[index]
        self.state = stored
        return self
396

397 4
    @ChepyDecorators.call_stack
    def delete_buffer(self, index: int):
        """Remove a buffer item

        A warning is logged, and nothing else happens, when no buffer
        exists under that key.

        Args:
            index (int): Key of buffer item

        Returns:
            Chepy: The Chepy object.
        """
        if index in self.buffers:
            del self.buffers[index]
        else:  # pragma: no cover
            logging.warning("{} does not exist".format(index))
        return self
412

413 4
    @ChepyDecorators.call_stack
    def substring(self, pattern: Union[str, bytes], group: int = 0):
        """Choose a substring from current state as string

        The state is replaced by the matched text, so methods chained after
        this one only run on the substring and not on the original state.
        Group capture is supported.

        Args:
            pattern (Union[str, bytes]): Pattern to match.
            group (int, optional): Group to match. Defaults to 0.

        Returns:
            Chepy: The Chepy object.
        """
        haystack = self._convert_to_str()
        # NOTE(review): a pattern that does not match leaves ``found`` as None
        # and the next line fails with AttributeError.
        found = re.search(pattern, haystack)
        self.state = found.group(group)
        return self
429

430 4
    def _convert_to_bytes(self) -> bytes:
431
        """This method is used to coerce the curret object in 
432
        the state variable into a string. The method should be 
433
        called inside any method that operates on a string object 
434
        instead of calling `self.state` directly to avoid errors. 
435
        
436
        Raises:
437
            NotImplementedError: If type coercian isnt available 
438
                for the current state type.
439
        """
440 4
        if isinstance(self.state, bytes):
441 4
            return self.state
442 4
        elif isinstance(self.state, str):
443 4
            return self.state.encode()
444 4
        elif isinstance(self.state, int):
445 4
            return str(self.state).encode()
446 4
        elif isinstance(self.state, dict):
447 4
            return str(self.state).encode()
448 4
        elif isinstance(self.state, list):
449 4
            return str(self.state).encode()
450
        elif isinstance(self.state, bool):  # pragma: no cover
451
            return str(self.state).encode()
452 4
        elif isinstance(self.state, bytearray):
453 4
            return bytes(self.state)
454
        else:  # pragma: no cover
455
            # todo check more types here
456
            raise NotImplementedError
457

458 4
    def _convert_to_bytearray(self) -> bytearray:
459
        """Attempts to coerce the current state into a 
460
        `bytesarray` object
461
        """
462 4
        return bytearray(self._convert_to_bytes())
463

464 4
    def _convert_to_str(self) -> str:
465
        """This method is used to coerce the curret object in 
466
        the state variable into bytes. The method should be 
467
        called inside any method that operates on a bytes object 
468
        instead of calling `self.state` directly to avoid errors. 
469
        
470
        Raises:
471
            NotImplementedError: If type coercian isnt available 
472
                for the current state type.
473
        """
474 4
        if isinstance(self.state, bytes):
475 4
            return self.state.decode()
476 4
        elif isinstance(self.state, str):
477 4
            return self.state
478 4
        elif isinstance(self.state, int):
479 4
            return str(self.state)
480 4
        elif isinstance(self.state, dict):
481 4
            return str(self.state)
482 4
        elif isinstance(self.state, list):
483 4
            return str(self.state)
484
        elif isinstance(self.state, bool):  # pragma: no cover
485
            return str(self.state)
486 4
        elif isinstance(self.state, bytearray):
487 4
            return bytearray(self.state).decode()
488
        else:  # pragma: no cover
489
            # todo check more types here
490
            raise NotImplementedError
491

492 4
    def _convert_to_int(self) -> int:
493
        """This method is used to coerce the curret object in 
494
        the state variable into an int. The method should be 
495
        called inside any method that operates on a int types 
496
        instead of calling `self.state` directly to avoid errors. 
497
        
498
        Raises:
499
            NotImplementedError: If type coercian isnt available 
500
                for the current state type.
501
        """
502 4
        if isinstance(self.state, int):
503 4
            return self.state
504 4
        elif isinstance(self.state, str) or isinstance(self.state, bytes):
505 4
            return int(self.state)
506
        else:  # pragma: no cover
507
            raise NotImplementedError
508

509 4
    @property
510
    def o(self):
511
        """Get the final output
512
        
513
        Returns:
514
            Any: Final output
515
        """
516 4
        return self.state
517

518 4
    @property
519
    def output(self):
520
        """Get the final output
521
        
522
        Returns:
523
            Any: Final output
524
        """
525 4
        return self.state
526

527 4
    @ChepyDecorators.call_stack
    def out(self) -> Any:
        """Return the final output

        Returns:
            Any: The value of the current state
        """
        result = self.state
        return result
535

536 4
    @ChepyDecorators.call_stack
    def out_as_str(self) -> str:
        """Return the current value coerced to str

        Returns:
            str: Current value as a string
        """
        text = self._convert_to_str()
        return text
544

545 4
    @ChepyDecorators.call_stack
    def out_as_bytes(self) -> bytes:
        """Return the current value coerced to bytes

        Returns:
            bytes: Current value as bytes
        """
        raw = self._convert_to_bytes()
        return raw
553

554 4
    @ChepyDecorators.call_stack
    def get_by_index(self, index: int):
        """Replace the state with the item found at an index of the state

        Args:
            index (int): Index number to get

        Returns:
            Chepy: The Chepy object.
        """
        item = self.state[index]
        self.state = item
        return self
566

567 4
    @ChepyDecorators.call_stack
    def get_by_key(self, key: str):
        """Replace a dict state with the value stored under a key

        A key that is not present results in a state of ``None``.

        Args:
            key (str): A valid key

        Raises:
            TypeError: If the state is not a dictionary

        Returns:
            Chepy: The Chepy object.
        """
        if not isinstance(self.state, dict):  # pragma: no cover
            raise TypeError("State is not a dictionary")
        self.state = self.state.get(key)
        return self
582

583 4
    @ChepyDecorators.call_stack
    def copy_to_clipboard(self) -> None:  # pragma: no cover
        """Copy to clipboard

        The current state, coerced to a string, is handed to
        ``pyperclip.copy``. If an error is raised, refer to the
        documentation on the error.

        Returns:
            None: Copies final output to the clipboard
        """
        text = self._convert_to_str()
        pyperclip.copy(text)
595

596 4
    @ChepyDecorators.call_stack
    def copy(self) -> None:  # pragma: no cover
        """Copy to clipboard

        Delegates to `copy_to_clipboard`. If an error is raised, refer to
        the documentation on the error.

        Returns:
            None: Copies final output to the clipboard
        """
        self.copy_to_clipboard()
608

609 4
    @ChepyDecorators.call_stack
    def web(
        self,
        magic: bool = False,
        cyberchef_url: str = "https://gchq.github.io/CyberChef/",
    ) -> None:  # pragma: no cover
        """Opens the current state in CyberChef on the browser as hex

        The state is hex encoded, then base64 encoded with the trailing
        ``=`` padding removed, and placed in the ``input`` part of the
        CyberChef url together with a ``From_Hex`` recipe.

        Args:
            magic (bool, optional): Start with the magic method in CyberChef
            cyberchef_url (string, optional): Base url for Cyberchef

        Returns:
            None: Opens the current data in CyberChef
        """
        encoded = base64.b64encode(binascii.hexlify(self._convert_to_bytes()))
        # Strip the padding on the bytes value itself; the previous regex
        # substitution mixed a bytes pattern with a str replacement.
        data = encoded.rstrip(b"=").decode()
        if magic:
            fragment = "#recipe=From_Hex('None')Magic(3,false,false,'')&input={}".format(
                data
            )
        else:
            fragment = "#recipe=From_Hex('None')&input={}".format(data)
        webbrowser.open_new_tab(urljoin(cyberchef_url, fragment))
        return None
641

642 4
    @ChepyDecorators.call_stack
    def http_request(
        self,
        method: str = "GET",
        params: dict = {},
        json: dict = None,
        headers: dict = {},
        cookies: dict = {},
    ):
        """Make a http/s request

        Make a HTTP/S request to the url held in the state and work with the
        response in Chepy. Most common http methods are supported; but some
        methods may not provide a response body.

        ``params``, ``headers`` and ``cookies`` may each be given as a dict or
        as a JSON string, which is parsed into a dict.

        Args:
            method (str, optional): Request method. Defaults to 'GET'.
            params (dict, optional): Query Args. Defaults to {}.
            json (dict, optional): Request payload. Defaults to None.
            headers (dict, optional): Headers for request. Defaults to {}.
            cookies (dict, optional): Cookies for request. Defaults to {}.

        Raises:
            NotImplementedError: If params, headers or cookies is neither a
                dictionary nor a string

        Returns:
            Chepy: The Chepy object. The state becomes a dictionary
            containing body, status and headers. If ``requests`` cannot be
            imported, an error is logged and the state is left unchanged.

        Examples:
            By default, this method will make a GET request, but it supports
            most common methods.

                >>> c = Chepy("http://example.com").http_request()
                >>> c.get_by_key("headers")

            This method can also be used to make more complex requests by specifying
            headers, cookies, body data etc.

                >>> c = Chepy("https://en4qpftrmznwq.x.pipedream.net")
                >>> c.http_request(
                >>>    method="POST",
                >>>    headers={"My-header": "some header"},
                >>>    json={"some": "data"}
                >>> )
                >>> print(c.get_by_key("body"))
                {"success": true}
        """

        def as_dict(obj):  # pragma: no cover
            # Inside this method ``json`` is the request payload argument, not
            # a module, so string input is parsed with ujson instead.
            if isinstance(obj, dict):
                return obj
            elif isinstance(obj, str):
                return ujson.loads(obj)
            else:
                raise NotImplementedError

        try:
            from requests import request
        except ImportError:  # pragma: no cover
            self._error_logger("Could not import requests. pip install requests")
            return self

        # The dict defaults in the signature are only read, never mutated.
        params = as_dict(params)
        headers = as_dict(headers)
        cookies = as_dict(cookies)
        res = request(
            method=method,
            url=self.state,
            params=params,
            json=json,
            headers=headers,
            cookies=cookies,
        )
        self.state = {
            "body": res.text,
            "status": res.status_code,
            "headers": dict(res.headers),
        }
        return self
721

722 4
    @ChepyDecorators.call_stack
    def load_from_url(
        self,
        method: str = "GET",
        params: dict = {},
        json: dict = None,
        headers: dict = {},
        cookies: dict = {},
    ):
        """Load binary content from a url

        The url is taken from the state. Most common http methods are
        supported; but some methods may not provide a response body.

        ``params``, ``headers`` and ``cookies`` may each be given as a dict or
        as a JSON string, which is parsed into a dict.

        Args:
            method (str, optional): Request method. Defaults to 'GET'.
            params (dict, optional): Query Args. Defaults to {}.
            json (dict, optional): Request payload. Defaults to None.
            headers (dict, optional): Headers for request. Defaults to {}.
            cookies (dict, optional): Cookies for request. Defaults to {}.

        Raises:
            NotImplementedError: If params, headers or cookies is neither a
                dictionary nor a string

        Returns:
            Chepy: The Chepy object. The state becomes the raw response
            content. If ``requests`` cannot be imported, an error is logged
            and the state is left unchanged.

        Examples:
            By default, this method will make a GET request, but it supports
            most common methods.

                >>> c = Chepy("http://example.com/file.png").load_from_url()
                >>> # the state now holds the raw bytes of the png file
        """

        def as_dict(obj):  # pragma: no cover
            # Inside this method ``json`` is the request payload argument, not
            # a module, so string input is parsed with ujson instead.
            if isinstance(obj, dict):
                return obj
            elif isinstance(obj, str):
                return ujson.loads(obj)
            else:
                raise NotImplementedError

        try:
            from requests import request
        except ImportError:  # pragma: no cover
            self._error_logger("Could not import requests. pip install requests")
            return self

        # The dict defaults in the signature are only read, never mutated.
        params = as_dict(params)
        headers = as_dict(headers)
        cookies = as_dict(cookies)
        res = request(
            method=method,
            url=self.state,
            params=params,
            json=json,
            headers=headers,
            cookies=cookies,
        )
        self.state = io.BytesIO(res.content).read()
        return self
784

785 4
    @ChepyDecorators.call_stack
    def load_dir(self, pattern: str = "*"):
        """Load all file paths in a directory

        The state is used as the directory. All existing states are
        replaced: each regular file matching ``pattern`` becomes one state
        holding the file path as a string, keyed 0, 1, 2, ...

        Args:
            pattern (str, optional): File pattern to match. Defaults to "*".

        Returns:
            Chepy: The Chepy object.
        """
        directory = pathlib.Path(self.state)
        # One is_file() check per entry is enough; it was done twice before.
        file_paths = (
            str(entry) for entry in directory.glob(pattern) if entry.is_file()
        )
        self.states = dict(enumerate(file_paths))
        return self
798

799 4
    @ChepyDecorators.call_stack
    def load_file(self, binary_mode: bool = False):
        """Treat the state as a file path and load that file into the state

        In text mode the file content is stored as a string; if it cannot
        be decoded, the file is read again in binary mode. Binary content
        is stored as a bytearray.

        Args:
            binary_mode (bool, optional): Force load in binary mode.

        Returns:
            Chepy: The Chepy object.

        Examples:
            >>> c = Chepy("/path/to/file")
            >>> # at the moment, the state only contains the string "/path/to/file"
            >>> c.load_file() # this will load the file content into the state
        """
        target = self._abs_path(str(self.state))

        def read_raw():
            # Binary read shared by the forced and the fallback path
            with open(target, "rb") as handle:
                return bytearray(handle.read())

        if binary_mode:
            self.states[self._current_index] = read_raw()
            return self

        try:
            with open(target, "r") as handle:  # type: ignore
                content = handle.read()
        except UnicodeDecodeError:
            content = read_raw()
        self.states[self._current_index] = content
        return self
826

827 4
    def write_to_file(self, path: str) -> None:
828
        """Save the state to disk. Return None.
829
        
830
        Args:
831
            path (str): The file path to save in.
832
        
833
        Returns:
834
            None: Returns None
835

836
        Examples:
837
            >>> c = Chepy("some data").write_to_file('/some/path/file', as_binary=True)
838
        """
839
        if isinstance(path, bytes):  # pragma: no cover
840
            path = path.decode()
841 4
        with open(str(self._abs_path(path)), "w+") as f:
842 4
            f.write(self._convert_to_str())
843 4
        self._info_logger("File written to {}".format(self._abs_path(path)))
844 4
        return None
845

846
    def write_binary(self, path: str) -> None:  # pragma: no cover
847
        """Save the state to disk. Return None.
848
        
849
        Args:
850
            path (str): The file path to save in.
851
        
852
        Returns:
853
            None: Returns None
854

855
        Examples:
856
            >>> c = Chepy("some data").write_binary('/some/path/file')
857
        """
858
        if isinstance(path, bytes):  # pragma: no cover
859
            path = path.decode()
860
        with open(str(self._abs_path(path)), "wb+") as f:
861
            f.write(self.state)
862
        self._info_logger("File written to {}".format(self._abs_path(path)))
863
        return None
864

865 4
    def save_recipe(self, path: str):
        """Save the current recipe

        A recipe is the list of all the previous methods called on the
        chepy instance along with their args, as collected in the call
        stack. It is written to the file as JSON.

        Args:
            path (str): The path to save the recipe

        Returns:
            Chepy: The Chepy object.

        Examples:
            >>> c = Chepy("some data").to_hex().base64_encode()
            >>> c.save_recipe("/path/to/recipe")
            >>> c.out()
            NzM2ZjZkNjUyMDY0NjE3NDYx
        """
        # Path objects are no longer usable as context managers on current
        # Python versions, so write through the path directly.
        self._abs_path(path).write_text(ujson.dumps(self._stack))
        self._info_logger("Saved recipe to {}".format(str(path)))
        return self
887

888 4
    def load_recipe(self, path: str):
        """Load and run a recipe

        The recipe file is parsed as a JSON list of entries, each with a
        `function` name and an `args` mapping. Every entry is replayed in
        order by calling the method of that name on this instance.

        Args:
            path (str): Path to recipe file

        Returns:
            Chepy: The Chepy object.

        Examples:
            >>> c = Chepy("some data").load_recipe("/path/to/recipe").out()
            NzM2ZjZkNjUyMDY0NjE3NDYx
        """
        # Read through the path object directly. Using a pathlib path as a
        # context manager is deprecated and no longer supported by newer
        # Python versions.
        recipes = ujson.loads(self._abs_path(path).read_text())
        for recipe in recipes:
            # NOTE(review): method names and arguments come straight from the
            # recipe file, so only load recipes from a trusted source.
            method = getattr(self, recipe["function"])
            # An empty args mapping is the same as calling with no arguments.
            method(**(recipe["args"] or {}))
        return self
911

912 4
    @ChepyDecorators.call_stack
913 4
    def loop(self, iterations: int, callback: str, args: dict = {}):
        """Loop and apply callback n times

        The callback is looked up by name on this instance and called
        `iterations` times with `args` as keyword arguments. Stack entries
        recorded by the callback calls are discarded afterwards, so the
        stack ends with the entry for this `loop` call.

        Args:
            iterations (int): Number of iterations to loop
            callback (str): The Chepy method to loop over
            args (dict, optional): Optional arguments for the callback. Defaults to {}.

        Returns:
            Chepy: The Chepy object.

        Raises:
            AssertionError: If an argument has the wrong type.

        Examples:
            Apply `hmac_hash` three times in a row.

            >>> c = Chepy("some data")
            >>> c.loop(iterations=3, callback='hmac_hash', args={'key': 'secret'})
        """
        assert isinstance(callback, str), "Callback must be a string"
        assert isinstance(iterations, int), "Iterations must be an integer"
        assert isinstance(args, dict), "Args must be a dict"

        # find the last index that this method was run; it has to be taken
        # before the callbacks run because they append to the stack
        stack_loop_index = next(
            itertools.dropwhile(
                lambda x: self._stack[x]["function"] != "loop",
                reversed(range(len(self._stack))),
            )
        )

        # `args` is only unpacked, never mutated, so the shared default is safe
        for _ in range(iterations):
            getattr(self, callback)(**args)

        # drop the entries the callback calls added to the stack
        self._stack = self._stack[: stack_loop_index + 1]
        return self
945

946 4
    @ChepyDecorators.call_stack
947 4
    def loop_list(self, callback: str, args: dict = {}):
        """Run a Chepy method on every item of the list in state

        Each item is placed in the state in turn, the method named by
        `callback` is called, and its output (`.o`) is collected. The state
        ends up as the list of collected outputs. If anything raises, the
        original list is put back in the state before re-raising.

        Args:
            callback (str): Chepy method name as string
            args (dict, optional): Dictionary of args. If in cli, dont use spaces. Defaults to {}.

        Returns:
            Chepy: The Chepy object

        Examples:
            >>> c = Chepy(["an", "array"])
            >>> c.loop_list('to_hex').loop_list('hmac_hash', {'key': 'secret'})
            ['5cbe6ca2a66b380aec1449d4ebb0d40ac5e1b92e', '30d75bf34740e8781cd4ec7b122e3efd8448e270']
        """
        assert isinstance(self.state, list), "State is not a list"
        assert isinstance(callback, str), "Callback must be a string"
        original_items = self.state
        # position of the most recent stack entry recorded for this method
        last_call_index = next(
            position
            for position in range(len(self._stack) - 1, -1, -1)
            if self._stack[position]["function"] == "loop_list"
        )
        if isinstance(args, str):  # pragma: no cover
            args = ujson.loads(args)
        try:
            outputs = []
            for item in original_items:
                self.state = item
                method = getattr(self, callback)
                result = method(**args) if args else method()
                outputs.append(result.o)
            # forget the stack entries added by the callback calls
            self._stack = self._stack[: last_call_index + 1]
            self.state = outputs
            return self
        except BaseException:  # pragma: no cover
            self.state = original_items
            raise
991

992 4
    @ChepyDecorators.call_stack
993 4
    def loop_dict(self, keys: list, callback: str, args: dict = {}):
        """Loop over a dictionary and apply the callback to the value

        For every key of the state dictionary that is listed in `keys` and
        whose value is not None, the value is placed in the state, the
        method named by `callback` is called and its output (`.o`) replaces
        the value. All other entries, including None values, are carried
        over unchanged, and the original key order is kept. If anything
        raises, the original dictionary is put back in the state before
        re-raising.

        Args:
            keys (list): List of keys to match. If in cli, dont use spaces.
            callback (str): Chepy method as string
            args (dict, optional): Dictionary of args. If in cli, dont use spaces. Defaults to {}.

        Returns:
            Chepy: The Chepy object.

        Examples:
            >>> c = Chepy({'some': 'hahahaha', 'lol': 'aahahah'})
            >>> c.loop_dict(['some'], 'hmac_hash', {'key': 'secret'}).o
            {'some': '99f77ec06a3c69a4a95371a7888245ba57f47f55', 'lol': 'aahahah'}

            We can combine `loop_list` and `loop_dict` to loop over a list of dictionaries.

            >>> data = [{"some": "val"}, {"some": "another"}, {"lol": "lol"}, {"another": "aaaa"}]
            >>> c = Chepy(data)
            >>> c.loop_list("loop_dict", {"keys": ["some", "lol"], "callback": "to_upper_case"})
            [
                {"some": "VAL"},
                {"some": "ANOTHER"},
                {"lol": "LOL"},
                {"another": "aaaa"},
            ]
        """
        assert isinstance(callback, str), "Callback must be a string"
        current_state = self.state
        # find the last index that this method was run
        stack_loop_index = next(
            itertools.dropwhile(
                lambda x: self._stack[x]["function"] != "loop_dict",
                reversed(range(len(self._stack))),
            )
        )

        if isinstance(keys, str):  # pragma: no cover
            keys = ujson.loads(keys)

        if isinstance(args, str):  # pragma: no cover
            args = ujson.loads(args)
        try:
            wanted = set(keys)
            hold = {}
            # Walk the dictionary itself so that no entry is lost (a matched
            # key holding None is kept as-is) and the key order is preserved.
            for key, value in current_state.items():
                if key in wanted and value is not None:
                    self.state = value
                    method = getattr(self, callback)
                    hold[key] = (method(**args) if args else method()).o
                else:
                    hold[key] = value
            # drop the entries the callback calls added to the stack
            self._stack = self._stack[: stack_loop_index + 1]
            self.state = hold
            return self
        except BaseException:  # pragma: no cover
            # restore the original dictionary before propagating the error
            self.state = current_state
            raise
1055

1056 4
    @ChepyDecorators.call_stack
1057 4
    def debug(self, verbose: bool = False):
        """Print a summary of the current Chepy instance

        Shows the current state index, the number of states and buffers,
        and the type name of every state and buffer. This method does not
        change the state.

        Args:
            verbose (bool, optional): Also print the states and buffers
                themselves. Defaults to False.

        Returns:
            Chepy: The Chepy object.
        """

        def show(label, value):
            # one summary line: coloured label followed by the coloured value
            print(cyan(label), yellow(str(value)))

        show("Current state:", self._current_index)
        show("Current states:", len(self.states))
        show(
            "Current state types:",
            {index: type(item).__name__ for index, item in self.states.items()},
        )
        show("Current buffers:", len(self.buffers))
        show(
            "Current buffer types:",
            {index: type(item).__name__ for index, item in self.buffers.items()},
        )
        if verbose:
            print(magenta("States:"), self.states)
            print(magenta("Buffers:"), self.buffers)
        return self
1083

1084 4
    @ChepyDecorators.call_stack
1085
    def reset(self):
        """Reset states back to their initial values

        The initial states are copied rather than assigned directly.
        Otherwise `self.states` and the saved snapshot would be the same
        object, and any later in-place write to `self.states` would also
        overwrite the snapshot, making further resets ineffective.

        Returns:
            Chepy: The Chepy object.
        """
        # shallow copy: the mapping is independent, the stored values are shared
        self.states = dict(self.__initial_states)
        return self
1093

1094 4
    @ChepyDecorators.call_stack
1095
    def load_command(self):  # pragma: no cover
        """Run the command in state and store its output in the state

        The state is passed to `subprocess.getoutput`, so it is executed
        by the system shell. Only use this with commands you trust.

        Returns:
            Chepy: The Chepy object.

        Examples:
            This method can be used to interface with the shell and Chepy
            directly by ingesting a command's output in Chepy.

            >>> c = Chepy("ls -l").load_command().o
            test.html
            ...
            test.py
        """
        command = self.state
        self.state = subprocess.getoutput(command)
        return self
1112

1113 4
    @ChepyDecorators.call_stack
1114
    def pretty(self, indent: int = 2):  # pragma: no cover
        """Replace the state with its pretty-printed string form

        Args:
            indent (int, optional): Indent level. Converted with `int()`,
                so a numeric string also works. Defaults to 2.

        Returns:
            Chepy: The Chepy object.
        """
        current = self.state
        self.state = pformat(current, indent=int(indent))
        return self
1125

1126
    def plugins(self, enable: str) -> None:  # pragma: no cover
        """Use this method to enable or disable Chepy plugins.

        Valid options are `true` or `false`. The value is stored as
        `enableplugins` in the `Plugins` section of `~/.chepy/chepy.conf`.
        Once this method completes, it calls sys.exit(), so it never
        returns normally.

        Args:
            enable (str): Set to `true` or `false`

        Returns:
            None

        Raises:
            AssertionError: If `enable` is neither `true` nor `false`.
            SystemExit: Always, after the configuration has been written.
        """
        assert enable in ["true", "false"], "Valid values are true and false"
        conf_path = pathlib.Path.home() / ".chepy" / "chepy.conf"
        config = ConfigParser()
        config.read(conf_path)
        config.set("Plugins", "enableplugins", enable)
        with open(conf_path, "w") as f:
            config.write(f)
        # `enable` is a non-empty string either way, so its truthiness cannot
        # tell the two options apart; compare against the literal instead.
        if enable == "true":
            self._info_logger(
                green(
                    "Plugins have been enabled. Restart Chepy for effects to take place."
                )
            )
        else:
            self._info_logger(
                green(
                    "Plugins have been disabled. Restart Chepy for effects to take place."
                )
            )
        sys.exit()
1159

1160
    def set_plugin_path(self, path: str) -> None:  # pragma: no cover
        """Use this method to set the path for Chepy plugins.

        The resolved path is stored as `pluginpath` in the `Plugins`
        section of `~/.chepy/chepy.conf`. After the configuration has been
        written, sys.exit() is called.

        Args:
            path (str): Path to plugins directory

        Returns:
            None

        Raises:
            AttributeError: If the path does not exist.
        """
        plugin_dir = self._abs_path(path)
        if not plugin_dir.exists():
            raise AttributeError("The path does not exist")

        conf_path = pathlib.Path().home() / ".chepy" / "chepy.conf"
        parser = ConfigParser()
        parser.read(conf_path)
        parser.set("Plugins", "pluginpath", str(plugin_dir))
        with open(conf_path, "w") as conf_file:
            parser.write(conf_file)
        self._info_logger(green("Plugin path has been set. Restart for changes."))
        sys.exit()

Read our documentation on viewing source code .

Loading