1
#!/usr/bin/env python
2
# Copyright 2019-2020 AstroLab Software
3
# Author: Abhishek Chauhan, Julien Peloton
4
#
5
# Licensed under the Apache License, Version 2.0 (the "License");
6
# you may not use this file except in compliance with the License.
7
# You may obtain a copy of the License at
8
#
9
#     http://www.apache.org/licenses/LICENSE-2.0
10
#
11
# Unless required by applicable law or agreed to in writing, software
12
# distributed under the License is distributed on an "AS IS" BASIS,
13
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
# See the License for the specific language governing permissions and
15
# limitations under the License.
16 4
import unittest
17 4
import os
18 4
import io
19 4
import confluent_kafka
20 4
import fastavro
21 4
from fink_client.consumer import AlertConsumer
22 4
from fink_client.avroUtils import AlertReader
23 4
from fink_client.avroUtils import encode_into_avro
24 4
from fink_client.avroUtils import get_legal_topic_name
25

26 4
from fink_client.configuration import load_credentials
27

28

29 4
class TestIntegration(unittest.TestCase):
    """Exercise AlertConsumer against the Kafka servers from the credentials.

    Before every test, the alerts read from the local ``data`` directory
    are Avro-encoded and produced to Kafka, and a new AlertConsumer is
    created. The consumer is closed again after every test.
    """

    def setUp(self):
        """Produce the sample alerts and create the consumer under test."""
        data_path = os.path.abspath(os.path.join(
            os.path.dirname(__file__), 'data'))
        schema_path = os.path.abspath(os.path.join(
            os.path.dirname(__file__),
            '../schemas/distribution_schema_0p2.avsc'))

        r = AlertReader(data_path)
        alerts = r.to_list()

        conf = load_credentials(tmp=True)

        kafka_servers = conf['servers']
        p = confluent_kafka.Producer({
            'bootstrap.servers': kafka_servers})

        # Each alert is sent to the topic derived from its 'cdsxmatch' field.
        for alert in alerts:
            avro_data = encode_into_avro(alert, schema_path)
            topic = get_legal_topic_name(alert['cdsxmatch'])
            p.produce(topic, avro_data)
        # Flush the producer before the consumer is created.
        p.flush()

        # instantiate an AlertConsumer
        mytopics = conf["mytopics"]

        myconfig = {
            'bootstrap.servers': kafka_servers,
            'group_id': conf['group_id']}

        self.consumer = AlertConsumer(mytopics, myconfig, schema=schema_path)

    def test_poll(self):
        """poll() yields an alert that validates against the parsed schema."""
        topic, alert = self.consumer.poll()
        self.assertIsNotNone(alert)
        self.assertTrue(fastavro.validate(alert, self.consumer._parsed_schema))

    def test_consume(self):
        """consume(n) returns exactly n alerts."""
        num_messages = 1
        alerts = self.consumer.consume(num_messages)
        self.assertEqual(len(alerts), num_messages)

    def test_topics(self):
        """The 'rrlyr' topic is listed among the available topics."""
        topics = self.consumer.available_topics()
        # assertIn names the missing topic and the available ones on failure.
        self.assertIn('rrlyr', topics.keys())

    def test_broker_name(self):
        """Key 0 is listed among the available brokers."""
        brokers = self.consumer.available_brokers()
        # assertIn names the missing key and the available ones on failure.
        self.assertIn(0, brokers.keys())

    def tearDown(self):
        """Close the consumer created in setUp."""
        self.consumer.close()
83

84

85 4
class TestComponents(unittest.TestCase):
    """Unit checks for the private helpers of ``fink_client.consumer``."""

    def test_get_alert_schema(self):
        """_get_alert_schema() returns a dict."""
        # download and check if a valid schema is downloaded
        from fink_client.consumer import _get_alert_schema
        schema = _get_alert_schema()
        self.assertIsInstance(schema, dict)

    def test_get_kafka_config(self):
        """_get_kafka_config() output contains the expected Kafka keys."""
        from fink_client.consumer import _get_kafka_config
        myconfig = {
            "username": "Alice",
            "password": "Alice-secret",
            "group_id": "test_group"
        }
        kafka_config = _get_kafka_config(myconfig)

        expected_keys = (
            "security.protocol",
            "sasl.mechanism",
            "group.id",
            "bootstrap.servers",
        )

        # One assertion per key, so a failure names the missing key.
        for key in expected_keys:
            self.assertIn(key, kafka_config)

    def test_decode_avro_alert(self):
        """A record written with fastavro is decoded back unchanged."""
        from fink_client.consumer import _decode_avro_alert
        schema = {
            'name': 'test',
            'type': 'record',
            'fields': [
                {'name': 'name', 'type': 'string'},
                {'name': 'fav_num', 'type': 'int'}
            ]
        }
        record = {u'name': u'Alice', u'fav_num': 63}

        # The buffer is handed to the decoder as-is; it is not rewound here.
        b = io.BytesIO()
        fastavro.schemaless_writer(b, schema, record)
        read_record = _decode_avro_alert(b, schema)

        self.assertDictEqual(record, read_record)
128

129

130 4
if __name__ == "__main__":
    # Run the test cases of this module when it is executed as a script.
    unittest.main()

Read our documentation on viewing source code.

Loading