pumpwood_communication.airflow

MicroService class to communicate with Airflow.

  1"""MicroService class to comunicate with Airflow."""
  2import math
  3import string
  4import random
  5import datetime
  6import pandas as pd
  7import copy
  8from typing import List
  9from airflow_client import client
 10from airflow_client.client.api import dag_api
 11from airflow_client.client.api import dag_run_api
 12from airflow_client.client.model.dag_run import DAGRun
 13from airflow_client.client.api import monitoring_api
 14from pumpwood_communication.exceptions import AirflowMicroServiceException
 15
 16
 17class AirflowMicroService():
 18    """Class to facilitate interaction with Airflow API."""
 19
 20    _airflow_config = None
 21    _api_client = None
 22
 23    def __init__(self, server_url: str = None, username: str = None,
 24                 password: str = None) -> None:
 25        """
 26        Create new AirflowMicroService object.
 27
 28        Args:
 29            server_url:
 30                URL of the server that will be connected.
 31            username:
 32                Username that will be logged on.
 33            password:
 34                Variable to be converted to JSON and posted along.
 35
 36        Returns:
 37            AirflowMicroService: New AirflowMicroService object
 38        """
 39        if (server_url is not None) and (username is not None) and (
 40                password is not None):
 41            self._airflow_config = client.Configuration(
 42                host=server_url, username=username,
 43                password=password)
 44            self._api_client = client.ApiClient(self._airflow_config)
 45            self.initiated = True
 46            try:
 47                self.health_check()
 48            except Exception:
 49                msg = (
 50                    "!! AirflowMicroService initiated, but health "
 51                    "check failed !!")
 52                print(msg)
 53        else:
 54            self.initiated = False
 55
    def init(self, server_url: str, username: str, password: str) -> None:
        """
        Initiate an AirflowMicroService object created without credentials.

        Args:
            server_url:
                URL of the Airflow server to connect to.
            username:
                Username used to authenticate on Airflow.
            password:
                Password used to authenticate on Airflow.
        Returns:
            No return.
        Raises:
            AirflowMicroServiceException: If any of the arguments is None.
        """
        if (server_url is not None) and (username is not None) and (
                password is not None):
            self._airflow_config = client.Configuration(
                host=server_url,
                username=username,
                password=password)
            self._api_client = client.ApiClient(self._airflow_config)
            self.initiated = True
            try:
                self.health_check()
            except Exception:
                msg = (
                    "!! AirflowMicroService initiated, but health "
                    "check failed !!")
                print(msg)
        else:
            msg = (
                "AirflowMicroService object init must have server_url, "
                "username and password not None.")
            raise AirflowMicroServiceException(message=msg)

    def health_check(self):
        """
        Test connection to Airflow API.

        Args:
            No args.
        Returns:
            Health status response from the Airflow monitoring API.
        Raises:
            AirflowMicroServiceException: If the object was not initiated or
                the Airflow health endpoint cannot be reached; the underlying
                error message is propagated.
        """
        if not self.initiated:
            msg = "AirflowMicroService not initiated"
            raise AirflowMicroServiceException(message=msg)

        api_instance = monitoring_api.MonitoringApi(self._api_client)
        try:
            # Fetch instance health status from the monitoring endpoint
            api_response = api_instance.get_health()
            return api_response
        except Exception as e:
            raise AirflowMicroServiceException(message=str(e))

    def get_dag(self, dag_id: str) -> dict:
        """
        Get DAG information using its dag_id.

        Args:
            dag_id:
                ID of the DAG to get information.

        Returns [dict]:
            Return a dictionary with DAG information.
        """
        self.health_check()

        dag_api_instance = dag_api.DAGApi(self._api_client)
        try:
            dag_info = dag_api_instance.get_dag(dag_id)
        except Exception as e:
            msg = "Dag id not found on Airflow, full error:\n{}".format(
                str(e))
            raise AirflowMicroServiceException(
                msg, payload={"dag_id": dag_id})
        # Serialize the DAG model so the return matches the annotation
        return dag_info.to_dict()

    def list_dags(self, only_active: bool = True, tags: List[str] = [],
                  max_results: int = math.inf) -> list:
        """
        List all DAGs on Airflow.

        Args:
            only_active:
                List only active DAGs.
            tags:
                Filter DAGs using tags.
            max_results:
                Limit query results; fetching stops once this many DAGs
                have been collected.

        Returns [list]:
            Return a list of dictionaries with DAG information.
        """
        self.health_check()

        dag_api_instance = dag_api.DAGApi(self._api_client)

        offset = 0
        list_all_dags = []
        while True:
            # Fetch pages of up to 100 DAGs until the API is exhausted
            dags = dag_api_instance.get_dags(
                limit=100, offset=offset, tags=tags,
                only_active=only_active,
                order_by="-next_dagrun")["dags"]

            # Check if all results were fetched
            if len(dags) == 0:
                break

            list_all_dags.extend(dags)
            offset = len(list_all_dags)

            # Stop once max_results is reached; the result may exceed it
            # by up to one page
            if max_results <= len(list_all_dags):
                break
        return [x.to_dict() for x in list_all_dags]

    def run_dag(self, dag_id: str, arguments: dict = {},
                paused_raise_error: bool = True,
                dag_run_id: str = None,
                dag_run_id_sufix: str = None) -> dict:
        """
        Run an Airflow DAG, passing arguments as its run configuration.

        Args:
            dag_id:
                DAG id that will be called.
            arguments:
                Dictionary with arguments to be passed to the DAG run on
                Airflow as conf.
            paused_raise_error:
                Raise error if DAG is paused or inactive at the moment it is
                asked to run.
            dag_run_id:
                Custom id for the DAG run; if None, an id is generated from
                the current timestamp and random letters.
            dag_run_id_sufix:
                Suffix appended to the DAG run id; if None, a
                "__dag_id[{dag_id}]" suffix is used, and if an empty string,
                no suffix is appended.

        Return [dict]:
            Return dictionary with DAG run information.

        Raise:
            AirflowMicroServiceException: If DAG not found.
            AirflowMicroServiceException: If DAG is inactive.
            AirflowMicroServiceException: If DAG paused and
                paused_raise_error=True.
            AirflowMicroServiceException: If other exception when asking DAG
                to run on Airflow.
        """
        self.health_check()

        # Check the DAG status before triggering a run
        dag_info = self.get_dag(dag_id)
        if not dag_info["is_active"]:
            msg = "DAG [{}] is not active".format(dag_id)
            raise AirflowMicroServiceException(
                message=msg, payload={"dag_id": dag_id})
        if dag_info["is_paused"] and paused_raise_error:
            msg = "DAG [{}] is paused and paused_raise_error=True".format(
                dag_id)
            raise AirflowMicroServiceException(
                message=msg, payload={"dag_id": dag_id})

        # Generate a unique run id if one was not supplied
        if dag_run_id is None:
            now_str = datetime.datetime.now().isoformat()
            random_letters = "".join(
                random.choices(string.ascii_uppercase, k=12))
            dag_run_id = (
                "{time}__{random_letters}").format(
                    time=now_str, random_letters=random_letters)

        if dag_run_id_sufix is None:
            dag_run_id = dag_run_id + "__dag_id[{}]".format(dag_id)
        else:
            if dag_run_id_sufix != "":
                dag_run_id = dag_run_id + "__" + dag_run_id_sufix

        # Create the DAG run object and send it to the Airflow queue
        dag_run_request = DAGRun(dag_run_id=dag_run_id, conf=arguments)
        dagrun_api_instance = dag_run_api.DAGRunApi(self._api_client)
        dagrun_result = dagrun_api_instance.post_dag_run(
            dag_id, dag_run_request)
        return dagrun_result.to_dict()

    def list_dag_runs(self, dag_id: str,
                      limit: int = 100,
                      execution_date_gte: str = None,
                      execution_date_lte: str = None,
                      start_date_gte: str = None,
                      start_date_lte: str = None,
                      end_date_gte: str = None,
                      end_date_lte: str = None,
                      state: list = None,
                      order_by: str = "-execution_date") -> list:
        """
        List DAG runs, by default in reverse order of execution date.

        Args:
            dag_id:
                Id of the DAG to list DAG runs.
            limit:
                Limit the number of DAG runs to be returned.
            execution_date_gte:
                Keep runs with execution date greater than or equal to this
                value (ISO string or datetime).
            execution_date_lte:
                Keep runs with execution date less than or equal to this
                value (ISO string or datetime).
            start_date_gte:
                Keep runs with start date greater than or equal to this
                value (ISO string or datetime).
            start_date_lte:
                Keep runs with start date less than or equal to this
                value (ISO string or datetime).
            end_date_gte:
                Keep runs with end date greater than or equal to this
                value (ISO string or datetime).
            end_date_lte:
                Keep runs with end date less than or equal to this
                value (ISO string or datetime).
            state:
                Keep only runs on the listed states.
            order_by:
                Field used to order the results.
        Return [list]:
            Return the DAG runs associated with dag_id.
        """
        self.health_check()

        api_instance = dag_run_api.DAGRunApi(self._api_client)
        get_dag_runs_args = {
            # Page size used when paginating over the API results
            "limit": 100,
            "dag_id": dag_id,
            "order_by": order_by
        }

        # Coerce ISO date strings to datetime and add only the filters
        # that were actually set to the query arguments
        date_filters = {
            "execution_date_gte": execution_date_gte,
            "execution_date_lte": execution_date_lte,
            "start_date_gte": start_date_gte,
            "start_date_lte": start_date_lte,
            "end_date_gte": end_date_gte,
            "end_date_lte": end_date_lte}
        for arg_name, arg_value in date_filters.items():
            if arg_value is not None:
                if isinstance(arg_value, str):
                    arg_value = pd.to_datetime(arg_value).to_pydatetime()
                get_dag_runs_args[arg_name] = arg_value

        if state is not None:
            get_dag_runs_args["state"] = state

        offset = 0
        all_results = []
        while True:
            temp_get_dag_runs_args = copy.deepcopy(get_dag_runs_args)
            temp_get_dag_runs_args["offset"] = offset
            results = api_instance.get_dag_runs(
                **temp_get_dag_runs_args)['dag_runs']

            if len(results) == 0:
                break

            all_results.extend(results)
            offset = len(all_results)

            # Stop once limit is reached; the result may exceed it by up
            # to one page
            if limit <= len(all_results):
                break
        return [x.to_dict() for x in all_results]

    def get_dag_run(self, dag_id: str, dag_run_id: str) -> dict:
        """
        Get DAG run information.

        Args:
            dag_id:
                Identification of the DAG.
            dag_run_id:
                Identification of the DAG run.
        Kwargs:
            No Kwargs.
        Return [dict]:
            Serialized DAG run information.
        """
        self.health_check()

        api_instance = dag_run_api.DAGRunApi(self._api_client)
        try:
            return api_instance.get_dag_run(dag_id, dag_run_id).to_dict()
        except client.ApiException as e:
            msg = (
                "It was not possible to find dag run with dag_id[{dag_id}] "
                "and dag_run_id[{dag_run_id}]. Error:\n{error}").format(
                    dag_id=dag_id, dag_run_id=dag_run_id, error=str(e))
            raise AirflowMicroServiceException(
                message=msg, payload={
                    "dag_id": dag_id, "dag_run_id": dag_run_id})
class AirflowMicroService:

Class to facilitate interaction with Airflow API.

AirflowMicroService(server_url: str = None, username: str = None, password: str = None)

Create new AirflowMicroService object.

Arguments:
  • server_url: URL of the Airflow server to connect to.
  • username: Username used to authenticate on Airflow.
  • password: Password used to authenticate on Airflow.
Returns:

AirflowMicroService: New AirflowMicroService object.
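
A minimal usage sketch; the server URL and credentials below are placeholders for your Airflow deployment:

from pumpwood_communication.airflow import AirflowMicroService

# Hypothetical connection values for illustration only
airflow_service = AirflowMicroService(
    server_url="http://airflow.example.com",
    username="airflow-user",
    password="airflow-password")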

def init(self, server_url: str, username: str, password: str) -> None:

Initiate an AirflowMicroService object created without credentials.

Arguments:
  • server_url: URL of the Airflow server to connect to.
  • username: Username used to authenticate on Airflow.
  • password: Password used to authenticate on Airflow.
Returns:

No return.

Raises:
  • AirflowMicroServiceException: If any of the arguments is None.
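
When credentials are not available at construction time, the object can be created empty and initiated later. A sketch, with placeholder values:

airflow_service = AirflowMicroService()  # initiated is False at this point
airflow_service.init(
    server_url="http://airflow.example.com",
    username="airflow-user",
    password="airflow-password")
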
def health_check(self):

Test connection to Airflow API.

Arguments:
  • No args.
Returns:

Health status response from the Airflow monitoring API.

Raises:
  • AirflowMicroServiceException: If the object was not initiated or the Airflow health endpoint cannot be reached; the underlying error message is propagated.
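
A short sketch of guarding calls with the health check; assumes airflow_service was initiated as in the examples above:

from pumpwood_communication.exceptions import AirflowMicroServiceException

try:
    health = airflow_service.health_check()
    print(health)
except AirflowMicroServiceException as error:
    # Raised when the object is not initiated or the API is unreachable
    print("Airflow health check failed:", error)
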
def get_dag(self, dag_id: str) -> dict:

Get DAG information using its dag_id.

Arguments:
  • dag_id: ID of the DAG to get information.

Returns [dict]: A dictionary with DAG information.
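
For illustration, fetching one DAG by id; example_dag_id is a placeholder:

dag_info = airflow_service.get_dag("example_dag_id")
print(dag_info["is_active"], dag_info["is_paused"])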

def list_dags( self, only_active: bool = True, tags: List[str] = [], max_results: int = inf):

List all DAGs on Airflow.

Arguments:
  • only_active: List only active DAGs.
  • tags: Filter DAGs using tags.
  • max_results: Limit query results; fetching stops once this many DAGs have been collected.

Returns [list]: A list of dictionaries with DAG information.
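
A sketch listing active DAGs filtered by a hypothetical tag:

dags = airflow_service.list_dags(
    only_active=True, tags=["etl"], max_results=50)
for dag in dags:
    print(dag["dag_id"])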
def run_dag( self, dag_id: str, arguments: dict = {}, paused_raise_error: bool = True, dag_run_id: str = None, dag_run_id_sufix: str = None) -> dict:

Run an Airflow DAG, passing arguments as its run configuration (conf).

Arguments:
  • dag_id: DAG id that will be called.
  • arguments: Dictionary with arguments to be passed to the DAG run on Airflow as conf.
  • paused_raise_error: Raise error if DAG is paused or inactive at the moment it is asked to run.
  • dag_run_id: Custom id for the DAG run; if None, an id is generated from the current timestamp and random letters.
  • dag_run_id_sufix: Suffix appended to the DAG run id; if None, a "__dag_id[...]" suffix is used; if an empty string, no suffix is appended.

Return [dict]: Return dictionary with DAG run information.

Raise:
  • AirflowMicroServiceException: If DAG not found.
  • AirflowMicroServiceException: If DAG is inactive.
  • AirflowMicroServiceException: If DAG paused and paused_raise_error=True.
  • AirflowMicroServiceException: If other exception when asking DAG to run on Airflow.
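
A sketch triggering a run with a conf payload; the dag id and arguments are placeholders:

dag_run = airflow_service.run_dag(
    dag_id="example_dag_id",
    arguments={"table": "sales", "full_refresh": False})
print(dag_run["dag_run_id"], dag_run["state"])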

def list_dag_runs( self, dag_id: str, limit: int = 100, execution_date_gte: str = None, execution_date_lte: str = None, start_date_gte: str = None, start_date_lte: str = None, end_date_gte: str = None, end_date_lte: str = None, state: list = None, order_by: str = '-execution_date') -> list:

List DAG runs, by default in reverse order of execution date.

Arguments:
  • dag_id: Id of the DAG to list DAG runs.
  • limit: Limit the number of DAG runs to be returned.
  • execution_date_gte: Keep runs with execution date greater than or equal to this value (ISO string or datetime).
  • execution_date_lte: Keep runs with execution date less than or equal to this value (ISO string or datetime).
  • start_date_gte: Keep runs with start date greater than or equal to this value (ISO string or datetime).
  • start_date_lte: Keep runs with start date less than or equal to this value (ISO string or datetime).
  • end_date_gte: Keep runs with end date greater than or equal to this value (ISO string or datetime).
  • end_date_lte: Keep runs with end date less than or equal to this value (ISO string or datetime).
  • state: Keep only runs on the listed states.
  • order_by: Field used to order the results.

Return [list]: Return the DAG runs associated with dag_id.
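
A sketch listing recent successful runs; the dag id and filter values are placeholders:

runs = airflow_service.list_dag_runs(
    dag_id="example_dag_id",
    execution_date_gte="2023-01-01T00:00:00",
    state=["success"])
for run in runs:
    print(run["dag_run_id"], run["state"])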

def get_dag_run(self, dag_id: str, dag_run_id: str) -> dict:

Get DAG run information.

Arguments:
  • dag_id: Identification of the DAG.
  • dag_run_id: Identification of the DAG run.
Kwargs:

No Kwargs.

Return [dict]: Serialized DAG run information.
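
A sketch polling the state of a run; dag_run here is the dictionary returned by run_dag in the example above:

run_info = airflow_service.get_dag_run(
    dag_id="example_dag_id",
    dag_run_id=dag_run["dag_run_id"])
print(run_info["state"])  # e.g. "queued", "running", "success", "failed"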