"""
Copyright:
This product includes GeoLite2 data created by MaxMind, available from
https://www.maxmind.com
(attribution requirement)
"""

8 8
import logging
9

10 8
# Module-level logger, named after this module; used by API_AccessLogger below.
logger = logging.getLogger(__name__)
11

12

13 8
class API_AccessLogger:
    """
    Extract access information from HTTP requests to be saved by the database.

    Geo data is calculated using geoip2 if the library and its database file
    are available; otherwise, only the basic request information is extracted.
    """

    def __init__(self, geo_file_path):
        """
        Try to open the geoip2 database located at ``geo_file_path``.

        If the ``geoip2`` module cannot be imported, or the database file is
        not found, an error is logged and ``self.geoip2_reader`` stays ``None``;
        ``get_geoip2_data`` then returns an empty dict for every lookup.
        """

        self.geoip2_reader = None
        try:
            # Imported lazily so that geoip2 remains an optional dependency.
            import geoip2.database

            self.geoip2_reader = geoip2.database.Reader(geo_file_path)
            logger.info(f"Initialized geoip2 with {geo_file_path} successfully.")
        except ImportError:
            logger.error(
                "Cannot import geoip2 module. To use API access logging, you need "
                "to install it manually using `pip install geoip2`"
            )
        except FileNotFoundError:
            logger.error(
                f"Geoip cites file cannot be read from {geo_file_path}.\n"
                "Make sure to manually download the file from: \n"
                "https://geolite.maxmind.com/download/geoip/database/GeoLite2-City.tar.gz\n"
                "Then, set the geo_file_path in qcfractal_config.yaml in your base_folder "
                "(default base_folder is ~/.qca/qcfractal/qcfractal_config.yaml)."
            )

    def get_api_access_log(self, request, access_type=None, extra_params=None):
        """
        Build a dict describing one API access.

        Parameters
        ----------
        request
            The incoming request. This method reads its ``uri``, ``method``,
            ``headers`` (via ``.get``) and ``remote_ip`` attributes.
        access_type
            Label for the access. When falsy, the request URI without its
            first character (the leading ``/``) is used instead.
        extra_params
            Stored unchanged under the ``"extra_params"`` key.

        Returns
        -------
        dict
            Keys ``access_type``, ``access_method``, ``ip_address``,
            ``user_agent`` and ``extra_params``, plus whatever
            ``get_geoip2_data`` returns for the resolved IP address.
        """

        log = {}

        # Fall back to the request path (minus the leading "/") when no
        # explicit access type is supplied.
        log["access_type"] = access_type if access_type else request.uri[1:]

        log["access_method"] = request.method  # GET or POST

        # Get the real IP address behind a proxy or nginx.
        x_real_ip = request.headers.get("X-Real-IP", None)
        log["ip_address"] = x_real_ip or request.remote_ip

        # Clients are not required to send a User-Agent header; a missing one
        # must not make access logging fail with a KeyError.
        log["user_agent"] = request.headers.get("User-Agent", "")

        # TODO: extract needed info, maybe handle by caller
        log["extra_params"] = extra_params

        # Extra geo data, if available.
        log.update(self.get_geoip2_data(log["ip_address"]))

        return log

    def get_geoip2_data(self, ip_address):
        """
        Look up geo information for ``ip_address``.

        Returns an empty dict when no geoip2 reader is available. Otherwise
        the result is filled key by key with ``city``, ``country``,
        ``country_code``, ``ip_lat``, ``ip_long``, ``postal_code`` and
        ``subdivision``. If the lookup fails part-way, an error is logged and
        the keys collected so far are returned (best effort).
        """
        out = {}

        if not self.geoip2_reader:
            return out

        try:
            loc_data = self.geoip2_reader.city(ip_address)
            out["city"] = loc_data.city.name
            out["country"] = loc_data.country.name
            out["country_code"] = loc_data.country.iso_code

            # Coordinates are stored as strings, but a missing coordinate must
            # stay None instead of becoming the literal string "None".
            latitude = loc_data.location.latitude
            out["ip_lat"] = None if latitude is None else str(latitude)
            longitude = loc_data.location.longitude
            out["ip_long"] = None if longitude is None else str(longitude)

            out["postal_code"] = loc_data.postal.code
            out["subdivision"] = loc_data.subdivisions.most_specific.name
        except Exception:
            # Best effort: geo data is optional, so lookup failures are logged
            # and swallowed. Exception (not a bare except) lets
            # KeyboardInterrupt and SystemExit propagate.
            logger.error(f"Problem getting geoip2 data for {ip_address}")

        return out

Read our documentation on viewing source code .

Loading