airflow/models/connection.py
changed.
Other files ignored by Codecov
tests/cli/commands/test_connection_command.py
has changed.
28 | 28 | from airflow.exceptions import AirflowNotFoundException |
|
29 | 29 | from airflow.hooks.base import BaseHook |
|
30 | 30 | from airflow.models import Connection |
|
31 | - | from airflow.secrets.local_filesystem import _create_connection, load_connections_dict |
|
31 | + | from airflow.secrets.local_filesystem import load_connections_dict |
|
32 | 32 | from airflow.utils import cli as cli_utils, yaml |
|
33 | 33 | from airflow.utils.cli import suppress_logs_and_warning |
|
34 | 34 | from airflow.utils.session import create_session |
238 | 238 | ||
239 | 239 | @cli_utils.action_logging |
|
240 | 240 | def connections_import(args): |
|
241 | - | """Imports connections from a given file""" |
|
241 | + | """Imports connections from a file""" |
|
242 | 242 | if os.path.exists(args.file): |
|
243 | 243 | _import_helper(args.file) |
|
244 | 244 | else: |
246 | 246 | ||
247 | 247 | ||
248 | 248 | def _import_helper(file_path): |
|
249 | - | """Helps import connections from a file""" |
|
249 | + | """Load connections from a file and save them to the DB. On collision, skip.""" |
|
250 | 250 | connections_dict = load_connections_dict(file_path) |
|
251 | 251 | with create_session() as session: |
|
252 | - | for conn_id, conn_values in connections_dict.items(): |
|
252 | + | for conn_id, conn in connections_dict.items(): |
|
253 | 253 | if session.query(Connection).filter(Connection.conn_id == conn_id).first(): |
|
254 | 254 | print(f'Could not import connection {conn_id}: connection already exists.') |
|
255 | 255 | continue |
|
256 | 256 | ||
257 | - | allowed_fields = [ |
|
258 | - | 'extra', |
|
259 | - | 'description', |
|
260 | - | 'conn_id', |
|
261 | - | 'login', |
|
262 | - | 'conn_type', |
|
263 | - | 'host', |
|
264 | - | 'password', |
|
265 | - | 'schema', |
|
266 | - | 'port', |
|
267 | - | 'uri', |
|
268 | - | 'extra_dejson', |
|
269 | - | ] |
|
270 | - | filtered_connection_values = { |
|
271 | - | key: value for key, value in conn_values.items() if key in allowed_fields |
|
272 | - | } |
|
273 | - | connection = _create_connection(conn_id, filtered_connection_values) |
|
274 | - | session.add(connection) |
|
257 | + | session.add(conn) |
|
275 | 258 | session.commit() |
|
276 | 259 | print(f'Imported connection {conn_id}') |
19 | 19 | import json |
|
20 | 20 | import warnings |
|
21 | 21 | from json import JSONDecodeError |
|
22 | - | from typing import Dict, Optional |
|
22 | + | from typing import Dict, Optional, Union |
|
23 | 23 | from urllib.parse import parse_qsl, quote, unquote, urlencode, urlparse |
|
24 | 24 | ||
25 | 25 | from sqlalchemy import Boolean, Column, Integer, String, Text |
117 | 117 | password: Optional[str] = None, |
|
118 | 118 | schema: Optional[str] = None, |
|
119 | 119 | port: Optional[int] = None, |
|
120 | - | extra: Optional[str] = None, |
|
120 | + | extra: Optional[Union[str, dict]] = None, |
|
121 | 121 | uri: Optional[str] = None, |
|
122 | 122 | ): |
|
123 | 123 | super().__init__() |
|
124 | 124 | self.conn_id = conn_id |
|
125 | 125 | self.description = description |
|
126 | + | if extra and not isinstance(extra, str): |
|
127 | + | extra = json.dumps(extra) |
|
126 | 128 | if uri and ( # pylint: disable=too-many-boolean-expressions |
|
127 | 129 | conn_type or host or login or password or schema or port or extra |
|
128 | 130 | ): |
Files | Coverage |
---|---|
airflow | 88.44% |
Project Totals (1243 files) | 88.44% |