pumpwood_communication.airflow
MicroService class to communicate with Airflow.
1"""MicroService class to comunicate with Airflow.""" 2import math 3import string 4import random 5import datetime 6import pandas as pd 7import copy 8from typing import List 9from airflow_client import client 10from airflow_client.client.api import dag_api 11from airflow_client.client.api import dag_run_api 12from airflow_client.client.model.dag_run import DAGRun 13from airflow_client.client.api import monitoring_api 14from pumpwood_communication.exceptions import AirflowMicroServiceException 15 16 17class AirflowMicroService(): 18 """Class to facilitate interaction with Airflow API.""" 19 20 _airflow_config = None 21 _api_client = None 22 23 def __init__(self, server_url: str = None, username: str = None, 24 password: str = None) -> None: 25 """ 26 Create new AirflowMicroService object. 27 28 Args: 29 server_url: 30 URL of the server that will be connected. 31 username: 32 Username that will be logged on. 33 password: 34 Variable to be converted to JSON and posted along. 35 36 Returns: 37 AirflowMicroService: New AirflowMicroService object 38 """ 39 if (server_url is not None) and (username is not None) and ( 40 password is not None): 41 self._airflow_config = client.Configuration( 42 host=server_url, username=username, 43 password=password) 44 self._api_client = client.ApiClient(self._airflow_config) 45 self.initiated = True 46 try: 47 self.health_check() 48 except Exception: 49 msg = ( 50 "!! AirflowMicroService initiated, but health " 51 "check failed !!") 52 print(msg) 53 else: 54 self.initiated = False 55 56 def init(self, server_url: str, username: str, password: str) -> None: 57 """ 58 Init AirflowMicroService object. 59 60 Args: 61 server_url: 62 url of the server that will be connected. 63 username: 64 Username that will be logged on. 65 password: 66 Variable to be converted to JSON and posted along 67 Returns: 68 No return 69 Raises: 70 AirflowMicroServiceException: If some of the argument is None. 71 """ 72 if (server_url is not None) and (username is not None) and ( 73 password is not None): 74 self._airflow_config = client.Configuration( 75 host=server_url, 76 username=username, 77 password=password) 78 self._api_client = client.ApiClient(self._airflow_config) 79 self.initiated = True 80 try: 81 self.health_check() 82 except Exception: 83 msg = ( 84 "!! AirflowMicroService initiated, but health " 85 "check failed !!") 86 print(msg) 87 else: 88 msg = ( 89 "AirflowMicroService object init must have server_url, " 90 "username and password not None.") 91 raise AirflowMicroServiceException(message=msg) 92 93 def health_check(self) -> None: 94 """ 95 Test connection to Airflow API. 96 97 Args: 98 No args. 99 Raises: 100 AirflowMicroServiceException: If it is not possible to list one 101 dag using the API and return its error. 102 """ 103 if not self.initiated: 104 msg = "AirflowMicroservice not initiated" 105 raise AirflowMicroServiceException(message=msg) 106 107 api_instance = monitoring_api.MonitoringApi(self._api_client) 108 try: 109 # Get instance status 110 api_response = api_instance.get_health() 111 return api_response 112 except Exception as e: 113 raise AirflowMicroServiceException(message=str(e)) 114 115 def get_dag(self, dag_id: str) -> dict: 116 """ 117 Get Dag information using its dag_id. 118 119 Args: 120 dag_id: 121 ID of the DAG to get information. 122 123 Returns [dict]: 124 Return a dictionary with dag information. 
125 """ 126 self.health_check() 127 128 dag_api_instance = dag_api.DAGApi(self._api_client) 129 try: 130 dag_info = dag_api_instance.get_dag(dag_id) 131 except Exception as e: 132 msg = "Dag id not found on Airflow, full error:\n{}".format(str(e)) 133 raise AirflowMicroServiceException(msg, payload={"dag_id": dag_id}) 134 return dag_info 135 136 def list_dags(self, only_active: bool = True, tags: List[str] = [], 137 max_results: int = math.inf): 138 """ 139 List all dags on Airflow. 140 141 Args: 142 only_active: 143 List only active DAGs. 144 tags: 145 Filter DASs using tags. 146 max_results: 147 Limit query results. 148 """ 149 self.health_check() 150 151 dag_api_instance = dag_api.DAGApi(self._api_client) 152 153 offset = 0 154 list_all_dags = [] 155 while True: 156 dags = dag_api_instance.get_dags( 157 limit=100, offset=offset, tags=tags, 158 only_active=only_active, 159 order_by="-next_dagrun")["dags"] 160 161 # Check if all results were fetched 162 if len(dags) == 0: 163 break 164 165 list_all_dags.extend(dags) 166 offset = len(list_all_dags) 167 168 # Check if fetched have passed max_results 169 if max_results <= len(list_all_dags): 170 break 171 return [x.to_dict() for x in list_all_dags] 172 173 def run_dag(self, dag_id: str, arguments: dict = {}, 174 paused_raise_error: bool = True, 175 dag_run_id: str = None, 176 dag_run_id_sufix: str = None) -> dict: 177 """ 178 Run an Airflow DAG passing arguments as arguments. 179 180 Args: 181 dag_id: 182 Dag id that will called. 183 arguments: 184 Dictionary with arguments to be passed to dag run on Airflow 185 as conf. 186 paused_raise_error: 187 Raise error if DAG is paused or inactive at the moment it is 188 asked to run. 189 190 Return [dict]: 191 Return dictionary with dag run information. 192 193 Raise: 194 AirflowMicroServiceException: If DAG not found. 195 AirflowMicroServiceException: If DAG is inactive. 196 AirflowMicroServiceException: If DAG paused and 197 paused_raise_error=True. 198 AirflowMicroServiceException: If other exception when asking DAG 199 to run on Airflow. 
200 """ 201 self.health_check() 202 203 # Checking on DAG object 204 dag_info = self.get_dag(dag_id) 205 if not dag_info["is_active"]: 206 msg = "DAG [{}] is not active".format(dag_id) 207 raise AirflowMicroServiceException( 208 message=msg, payload={"dag_id": dag_id}) 209 if dag_info["is_paused"] and paused_raise_error: 210 msg = "DAG [{}] is paused and paused_raise_error=True".format( 211 dag_id) 212 raise AirflowMicroServiceException( 213 message=msg, payload={"dag_id": dag_id}) 214 215 # Running DAG 216 if dag_run_id is None: 217 now_str = datetime.datetime.now().isoformat() 218 random_letters = "".join( 219 random.choices(string.ascii_uppercase, k=12)) 220 dag_run_id = ( 221 "{time}__{random_letters}").format( 222 time=now_str, random_letters=random_letters) 223 224 if dag_run_id_sufix is None: 225 dag_run_id = dag_run_id + "__dag_id[{}]".format(dag_id) 226 else: 227 if dag_run_id_sufix != "": 228 dag_run_id = dag_run_id + "__" + dag_run_id_sufix 229 230 # Create dag run object and sent it to Airflow queue 231 dag_run_id = DAGRun(dag_run_id=dag_run_id, conf=arguments) 232 dagrun_api_instance = dag_run_api.DAGRunApi(self._api_client) 233 dagrun_result = dagrun_api_instance.post_dag_run(dag_id, dag_run_id) 234 return dagrun_result.to_dict() 235 236 def list_dag_runs(self, dag_id: str, 237 limit: int = 100, 238 execution_date_gte: str = None, 239 execution_date_lte: str = None, 240 start_date_gte: str = None, 241 start_date_lte: str = None, 242 end_date_gte: str = None, 243 end_date_lte: str = None, 244 state: list = None, 245 order_by: str = "-execution_date") -> list: 246 """ 247 List dag runs ordered inverted to creation time. 248 249 Args: 250 dag_id: 251 Id of the dag to list dag runs. 252 limit: 253 Limit the number of dag runs to be returned. 254 execution_date_gte: 255 Query parameters. 256 execution_date_lte: 257 Query parameters. 258 start_date_gte: 259 Query parameters. 260 start_date_lte: 261 Query parameters. 262 end_date_gte: 263 Query parameters. 264 end_date_lte: 265 Query parameters. 266 state: 267 Query parameters. 268 order_by: 269 Query parameters. 270 Return [list]: 271 Return DAG run associated with ETLJob DAG. 
272 """ 273 self.health_check() 274 275 api_instance = dag_run_api.DAGRunApi(self._api_client) 276 get_dag_runs_args = { 277 "limit": 100, 278 "dag_id": dag_id, 279 "order_by": order_by 280 } 281 282 if execution_date_gte is not None: 283 if type(execution_date_gte) is str: 284 execution_date_gte = pd.to_datetime( 285 execution_date_gte).to_pydatetime() 286 get_dag_runs_args["execution_date_gte"] = execution_date_gte 287 288 if execution_date_lte is not None: 289 if type(execution_date_lte) is str: 290 execution_date_lte = pd.to_datetime( 291 execution_date_lte).to_pydatetime() 292 get_dag_runs_args["execution_date_lte"] = execution_date_lte 293 294 if start_date_gte is not None: 295 if type(start_date_gte) is str: 296 start_date_gte = pd.to_datetime( 297 start_date_gte).to_pydatetime() 298 get_dag_runs_args["start_date_gte"] = start_date_gte 299 300 if start_date_lte is not None: 301 if type(start_date_lte) is str: 302 start_date_lte = pd.to_datetime( 303 start_date_lte).to_pydatetime() 304 get_dag_runs_args["start_date_lte"] = start_date_lte 305 306 if end_date_gte is not None: 307 if type(end_date_gte) is str: 308 end_date_gte = pd.to_datetime( 309 end_date_gte).to_pydatetime() 310 get_dag_runs_args["end_date_gte"] = end_date_gte 311 312 if end_date_lte is not None: 313 if type(end_date_lte) is str: 314 end_date_lte = pd.to_datetime( 315 end_date_lte).to_pydatetime() 316 get_dag_runs_args["end_date_lte"] = end_date_lte 317 318 if state is not None: 319 get_dag_runs_args["state"] = state 320 321 offset = 0 322 all_results = [] 323 while True: 324 temp_get_dag_runs_args = copy.deepcopy(get_dag_runs_args) 325 temp_get_dag_runs_args["offset"] = offset 326 results = api_instance.get_dag_runs( 327 **temp_get_dag_runs_args)['dag_runs'] 328 329 if len(results) == 0: 330 break 331 332 all_results.extend(results) 333 offset = len(all_results) 334 335 if limit <= len(all_results): 336 break 337 return [x.to_dict() for x in all_results] 338 339 def get_dag_run(self, dag_id: str, dag_run_id: str) -> dict: 340 """ 341 Get DAG run information. 342 343 Args: 344 dag_id: 345 Identification of the DAG. 346 dag_run_id: 347 Identification of the DAG run. 348 Kwargs: 349 No Kwargs. 350 Return [dict]: 351 Serialized DAG run information. 352 """ 353 self.health_check() 354 355 api_instance = dag_run_api.DAGRunApi(self._api_client) 356 try: 357 return api_instance.get_dag_run(dag_id, dag_run_id).to_dict() 358 except client.ApiException as e: 359 msg = ( 360 "If was not possible to find dag run with dag_id[{dag_id}]" 361 "and dag_run_id[{dag_run_id}]. Error:\n{error}").format( 362 dag_id=dag_id, dag_run_id=dag_run_id, error=str(e)) 363 raise AirflowMicroServiceException( 364 message=msg, payload={ 365 "dag_id": dag_id, "dag_run_id": dag_run_id}) 366 print("Exception when calling DAGRunApi->get_dag_run: %s\n" % e)
AirflowMicroService: Class to facilitate interaction with the Airflow API.
`AirflowMicroService.__init__(server_url: str = None, username: str = None, password: str = None) -> None`
Create a new AirflowMicroService object.
Arguments:
- server_url: URL of the Airflow server to connect to.
- username: Username used to log in on the server.
- password: Password of the user that will be logged in.
Returns:
AirflowMicroService: New AirflowMicroService object
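A minimal construction sketch; the URL and credentials below are placeholders for a reachable Airflow REST API endpoint (the exact host path depends on the deployment):

```python
from pumpwood_communication.airflow import AirflowMicroService

# Placeholders: point these at a real Airflow REST API endpoint
airflow_ms = AirflowMicroService(
    server_url="http://airflow.example.com/api/v1",
    username="admin",
    password="admin")
print(airflow_ms.initiated)  # True when all three arguments were passed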
`AirflowMicroService.init(server_url: str, username: str, password: str) -> None`
Initialize an AirflowMicroService object that was created without credentials.
Arguments:
- server_url: URL of the Airflow server to connect to.
- username: Username used to log in on the server.
- password: Password of the user that will be logged in.
Returns:
No return
Raises:
- AirflowMicroServiceException: If any of the arguments is None.
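A sketch of deferred initialization, useful when credentials are only available later; the environment variable names are hypothetical:

```python
import os

from pumpwood_communication.airflow import AirflowMicroService

# Construct empty, then call init() once credentials are available
airflow_ms = AirflowMicroService()           # initiated == False
airflow_ms.init(
    server_url=os.environ["AIRFLOW_URL"],    # hypothetical variable names
    username=os.environ["AIRFLOW_USER"],
    password=os.environ["AIRFLOW_PASSWORD"])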
`AirflowMicroService.health_check()`
Test connection to the Airflow API.
Arguments:
- No args.
Raises:
- AirflowMicroServiceException: If the object was not initiated or the Airflow health endpoint could not be reached; the underlying error is used as the exception message.
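Usage sketch, continuing from the construction examples above:

```python
from pumpwood_communication.exceptions import AirflowMicroServiceException

try:
    # Returns the monitoring API health response on success
    health = airflow_ms.health_check()
    print(health)
except AirflowMicroServiceException as exc:
    print("Airflow is unreachable:", str(exc))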
`AirflowMicroService.get_dag(dag_id: str) -> dict`
Get DAG information using its dag_id.
Arguments:
- dag_id: ID of the DAG to get information about.
Returns [dict]: Dictionary with DAG information.
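Usage sketch (the dag_id is hypothetical); the returned information includes the flags run_dag checks before triggering a run:

```python
dag_info = airflow_ms.get_dag("example_etl_dag")  # hypothetical dag_id
print(dag_info["is_active"], dag_info["is_paused"])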
`AirflowMicroService.list_dags(only_active: bool = True, tags: List[str] = [], max_results: int = math.inf)`
List DAGs registered on Airflow.
Arguments:
- only_active: List only active DAGs.
- tags: Filter DAGs using tags.
- max_results: Maximum number of DAGs to return.
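Usage sketch (the tag value is a placeholder); the call paginates in pages of 100 behind the scenes:

```python
# At most 20 active DAGs tagged "etl"
active_dags = airflow_ms.list_dags(
    only_active=True, tags=["etl"], max_results=20)
for dag in active_dags:
    print(dag["dag_id"])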
`AirflowMicroService.run_dag(dag_id: str, arguments: dict = {}, paused_raise_error: bool = True, dag_run_id: str = None, dag_run_id_sufix: str = None) -> dict`
Run an Airflow DAG, passing `arguments` as the run configuration (conf).
Arguments:
- dag_id: Id of the DAG that will be called.
- arguments: Dictionary with arguments to be passed to the DAG run on Airflow as conf.
- paused_raise_error: Raise an error if the DAG is paused or inactive at the moment it is asked to run.
- dag_run_id: Custom id for the DAG run; if None, one is generated from the current timestamp and 12 random uppercase letters.
- dag_run_id_sufix: Suffix appended to dag_run_id; if None, a "__dag_id[<dag_id>]" suffix is used; an empty string appends nothing.
Return [dict]: Dictionary with DAG run information.
Raises:
- AirflowMicroServiceException: If DAG not found.
- AirflowMicroServiceException: If DAG is inactive.
- AirflowMicroServiceException: If DAG is paused and paused_raise_error=True.
- AirflowMicroServiceException: If another exception occurs when asking the DAG to run on Airflow.
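Usage sketch with a hypothetical dag_id and conf payload:

```python
run_info = airflow_ms.run_dag(
    dag_id="example_etl_dag",       # hypothetical DAG
    arguments={"table": "sales"},   # passed to the run as conf
    paused_raise_error=True)
print(run_info["dag_run_id"], run_info["state"])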
`AirflowMicroService.list_dag_runs(dag_id: str, limit: int = 100, execution_date_gte: str = None, execution_date_lte: str = None, start_date_gte: str = None, start_date_lte: str = None, end_date_gte: str = None, end_date_lte: str = None, state: list = None, order_by: str = "-execution_date") -> list`
List DAG runs, by default ordered from most recent to oldest execution date.
Arguments:
- dag_id: Id of the DAG whose runs will be listed.
- limit: Limit the number of DAG runs to be returned.
- execution_date_gte: Lower bound (inclusive) for the run execution date; accepts an ISO date string or datetime.
- execution_date_lte: Upper bound (inclusive) for the run execution date.
- start_date_gte: Lower bound (inclusive) for the run start date.
- start_date_lte: Upper bound (inclusive) for the run start date.
- end_date_gte: Lower bound (inclusive) for the run end date.
- end_date_lte: Upper bound (inclusive) for the run end date.
- state: List of run states to filter by (e.g., ["success"]).
- order_by: Field used to order the results; a "-" prefix inverts the order.
Return [list]: List of DAG runs associated with dag_id.
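Usage sketch (dag_id and dates are placeholders); string dates are coerced with pandas before being sent as query parameters:

```python
# The 10 most recent successful runs executed since 2024-01-01
runs = airflow_ms.list_dag_runs(
    dag_id="example_etl_dag",
    limit=10,
    execution_date_gte="2024-01-01T00:00:00",
    state=["success"])
for run in runs:
    print(run["dag_run_id"], run["state"])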
`AirflowMicroService.get_dag_run(dag_id: str, dag_run_id: str) -> dict`
Get DAG run information.
Arguments:
- dag_id: Identification of the DAG.
- dag_run_id: Identification of the DAG run.
Kwargs:
No Kwargs.
Return [dict]: Serialized DAG run information.
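A polling sketch built on get_dag_run, assuming run_info came from the run_dag example above; "queued" and "running" are standard Airflow run states:

```python
import time

# Poll the run triggered above until it leaves the queued/running states
run = airflow_ms.get_dag_run("example_etl_dag", run_info["dag_run_id"])
while run["state"] in ("queued", "running"):
    time.sleep(10)
    run = airflow_ms.get_dag_run("example_etl_dag", run_info["dag_run_id"])
print("final state:", run["state"])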