#!/usr/bin/env python
# Copyright 2019-2020 AstroLab Software
# Author: Abhishek Chauhan, Julien Peloton
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import io
from typing import Tuple

import confluent_kafka
from confluent_kafka import KafkaError

from fink_client.avroUtils import write_alert
from fink_client.avroUtils import _get_alert_schema
from fink_client.avroUtils import _decode_avro_alert
23

class AlertError(Exception):
    """Raised when a message received from the Fink broker carries a Kafka error."""
26

27

class AlertConsumer:
    """
    High level Kafka consumer to receive alerts from Fink broker
    """

    def __init__(self, topics: list, config: dict, schema=None):
        """Creates an instance of `AlertConsumer`

        Parameters
        ----------
        topics : list of str
            list of topics to subscribe
        config: dict
            Dictionary of configurations. Allowed keys are:
            username: str
                username for API access
            password: str
                password for API access
            group_id: str
                group.id for Kafka consumer
            bootstrap.servers: str, optional
                Kafka servers to connect to
        schema: str, optional
            Path to an Avro schema used to decode incoming alerts.
            Default is None, in which case `_get_alert_schema` resolves
            the schema on its own.
        """
        self._topics = topics
        self._kafka_config = _get_kafka_config(config)
        self._parsed_schema = _get_alert_schema(schema_path=schema)
        self._consumer = confluent_kafka.Consumer(self._kafka_config)
        self._consumer.subscribe(self._topics)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        # Parameters renamed from `type`/`value`/`traceback` so they no
        # longer shadow builtins; the `with` machinery passes them
        # positionally, so this stays backward-compatible.
        self._consumer.close()

    def _raise_on_error(self, msg):
        """Raise AlertError if `msg` carries a Kafka-level error.

        Parameters
        ----------
        msg: confluent_kafka.Message
            Raw message returned by the consumer

        Raises
        ----------
        AlertError
            If `msg.error()` is a KafkaError (it returns None otherwise).
        """
        # msg.error() returns None or KafkaError
        if msg.error():
            error_message = """
            Error: {} topic: {}[{}] at offset: {} with key: {}
            """.format(
                msg.error(), msg.topic(),
                msg.partition(), msg.offset(),
                str(msg.key())
            )
            raise AlertError(error_message)

    def _decode_msg(self, msg) -> Tuple[str, dict]:
        """Decode the Avro payload of a raw (error-free) Kafka message.

        Parameters
        ----------
        msg: confluent_kafka.Message
            Raw message returned by the consumer

        Returns
        ----------
        (topic, alert): tuple(str, dict)
        """
        avro_alert = io.BytesIO(msg.value())
        alert = _decode_avro_alert(avro_alert, self._parsed_schema)
        return msg.topic(), alert

    def poll(self, timeout: float = -1) -> Tuple[str, dict]:
        """ Consume one message from Fink server

        Parameters
        ----------
        timeout: float, optional
            maximum time to block waiting for a message
            if not set default is None i.e. wait indefinitely

        Returns
        ----------
        (topic, alert): tuple(str, dict)
            returns (None, None) on timeout

        Raises
        ----------
        AlertError
            If the received message carries a Kafka error.
        """
        msg = self._consumer.poll(timeout)
        if msg is None:
            return None, None

        self._raise_on_error(msg)
        return self._decode_msg(msg)

    def consume(self, num_alerts: int = 1, timeout: float = -1) -> list:
        """ Consume and return list of messages

        Parameters
        ----------
        num_alerts: int
            maximum number of messages to return
        timeout: float
            maximum time to block waiting for messages
            if not set default is None i.e. wait indefinitely

        Returns
        ----------
        list: [tuple(str, dict)]
            list of topic, alert
            returns an empty list on timeout

        Raises
        ----------
        AlertError
            If a received message carries a Kafka error.
        """
        msg_list = self._consumer.consume(num_alerts, timeout)

        alerts = []
        for msg in msg_list:
            # Surface broker-side errors explicitly (consistent with
            # `poll`) instead of failing obscurely while Avro-decoding
            # the payload of an error message.
            self._raise_on_error(msg)
            alerts.append(self._decode_msg(msg))

        return alerts

    def poll_and_write(
            self, outdir: str, timeout: float = -1,
            overwrite: bool = False) -> Tuple[str, dict]:
        """ Consume one message from Fink server, save alert on disk and
        return (topic, alert)

        Parameters
        ----------
        outdir: str
            Folder to store the alert. It must exists.
        timeout: float, optional
            maximum time to block waiting for messages
            if not set default is None i.e. wait indefinitely
        overwrite: bool, optional
            If True, allow an existing alert to be overwritten.
            Default is False.

        Returns
        ----------
        (topic, alert): tuple(str, dict)
            returns (None, None) on timeout

        Raises
        ----------
        AlertError
            If the received message carries a Kafka error.
        """
        topic, alert = self.poll(timeout)
        if topic is not None:
            write_alert(alert, self._parsed_schema, outdir, overwrite=overwrite)

        return topic, alert

    def available_topics(self) -> dict:
        """ Return available broker topics

        Note, this routine only display topics, but users need
        to be authorized to poll data.

        Returns
        ----------
        topics: dict
            Keys are topic names, values are metadata
        """
        return self._consumer.list_topics().topics

    def available_brokers(self) -> dict:
        """ Return available brokers

        Returns
        ----------
        brokers: dict
            Keys are broker ID, values are metadata with IP:PORT
        """
        return self._consumer.list_topics().brokers

    def close(self):
        """Close connection to Fink broker"""
        self._consumer.close()
185

186

187 4
def _get_kafka_config(config: dict) -> dict:
188
    """Returns configurations for a consumer instance
189

190
    Parameters
191
    ----------
192
    config: dict
193
        Dictionary of configurations
194

195
    Returns
196
    ----------
197
    kafka_config: dict
198
        Dictionary with configurations for creating an instance of
199
        a secured Kafka consumer
200
    """
201 4
    kafka_config = {}
202 4
    default_config = {
203
        "auto.offset.reset": "earliest"
204
    }
205

206 4
    if 'username' in config and 'password' in config:
207 4
        kafka_config["security.protocol"] = "sasl_plaintext"
208 4
        kafka_config["sasl.mechanism"] = "SCRAM-SHA-512"
209 4
        kafka_config["sasl.username"] = config["username"]
210 4
        kafka_config["sasl.password"] = config["password"]
211

212 4
    kafka_config["group.id"] = config["group_id"]
213

214 4
    kafka_config.update(default_config)
215

216
    # use servers if given
217 4
    if 'bootstrap.servers' in config:
218 4
        kafka_config["bootstrap.servers"] = config["bootstrap.servers"]
219
    else:
220
        # use default fink_servers
221 4
        fink_servers = [
222
            "localhost:9093",
223
            "localhost:9094",
224
            "localhost:9095"
225
        ]
226 4
        kafka_config["bootstrap.servers"] = "{}".format(",".join(fink_servers))
227

228 4
    return kafka_config
