In connections_import, each connection was deserialized and stored into a Connection model instance rather than a dictionary, so an erroneous call to the dictionary methods .items() resulted in an AttributeError. With this fix, connection information is loaded from dictionaries directly into the Connection constructor and committed to the DB.
Showing 2 of 3 files from the diff.
Newly tracked file
airflow/cli/commands/connection_command.py
changed.
Newly tracked file
airflow/models/connection.py
changed.
Other files ignored by Codecov
tests/cli/commands/test_connection_command.py
has changed.
@@ -28,7 +28,7 @@
Loading
28 | 28 | from airflow.exceptions import AirflowNotFoundException |
|
29 | 29 | from airflow.hooks.base import BaseHook |
|
30 | 30 | from airflow.models import Connection |
|
31 | - | from airflow.secrets.local_filesystem import _create_connection, load_connections_dict |
|
31 | + | from airflow.secrets.local_filesystem import _parse_secret_file |
|
32 | 32 | from airflow.utils import cli as cli_utils, yaml |
|
33 | 33 | from airflow.utils.cli import suppress_logs_and_warning |
|
34 | 34 | from airflow.utils.session import create_session |
@@ -238,7 +238,7 @@
Loading
238 | 238 | ||
239 | 239 | @cli_utils.action_logging |
|
240 | 240 | def connections_import(args): |
|
241 | - | """Imports connections from a given file""" |
|
241 | + | """Imports connections from a file""" |
|
242 | 242 | if os.path.exists(args.file): |
|
243 | 243 | _import_helper(args.file) |
|
244 | 244 | else: |
@@ -246,31 +246,16 @@
Loading
246 | 246 | ||
247 | 247 | ||
248 | 248 | def _import_helper(file_path): |
|
249 | - | """Helps import connections from a file""" |
|
250 | - | connections_dict = load_connections_dict(file_path) |
|
249 | + | """Load connections from a file and save them to the DB. On collision, skip.""" |
|
250 | + | connections_dict = _parse_secret_file(file_path) |
|
251 | 251 | with create_session() as session: |
|
252 | - | for conn_id, conn_values in connections_dict.items(): |
|
252 | + | for conn_id, conn_dict in connections_dict.items(): |
|
253 | 253 | if session.query(Connection).filter(Connection.conn_id == conn_id).first(): |
|
254 | 254 | print(f'Could not import connection {conn_id}: connection already exists.') |
|
255 | 255 | continue |
|
256 | 256 | ||
257 | - | allowed_fields = [ |
|
258 | - | 'extra', |
|
259 | - | 'description', |
|
260 | - | 'conn_id', |
|
261 | - | 'login', |
|
262 | - | 'conn_type', |
|
263 | - | 'host', |
|
264 | - | 'password', |
|
265 | - | 'schema', |
|
266 | - | 'port', |
|
267 | - | 'uri', |
|
268 | - | 'extra_dejson', |
|
269 | - | ] |
|
270 | - | filtered_connection_values = { |
|
271 | - | key: value for key, value in conn_values.items() if key in allowed_fields |
|
272 | - | } |
|
273 | - | connection = _create_connection(conn_id, filtered_connection_values) |
|
257 | + | # Add the connection to the DB |
|
258 | + | connection = Connection(conn_id, **dict(conn_dict.items())) |
|
274 | 259 | session.add(connection) |
|
275 | 260 | session.commit() |
|
276 | 261 | print(f'Imported connection {conn_id}') |
@@ -117,7 +117,7 @@
Loading
117 | 117 | password: Optional[str] = None, |
|
118 | 118 | schema: Optional[str] = None, |
|
119 | 119 | port: Optional[int] = None, |
|
120 | - | extra: Optional[Union[str,dict]] = None, |
|
120 | + | extra: Optional[Union[str, dict]] = None, |
|
121 | 121 | uri: Optional[str] = None, |
|
122 | 122 | ): |
|
123 | 123 | super().__init__() |
Files | Coverage |
---|---|
airflow | 88.45% |
Project Totals (1243 files) | 88.45% |
884112843
Sunburst
The inner-most circle is the entire project, moving away from the center are folders then, finally, a single file.
The size and color of each slice is representing the number of statements and the coverage, respectively.
Icicle
The top section represents the entire project. Proceeding with folders and finally individual files.
The size and color of each slice is representing the number of statements and the coverage, respectively.