Apache Airflow lets you programmatically author, schedule and monitor workflows as directed acyclic graphs (DAGs) of tasks, so scripts that would otherwise run ad hoc can be automated and tracked in one place.
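To make the idea of a DAG concrete, here is a minimal sketch of a DAG definition using the same Airflow 1.x-style imports as the example later in this tutorial. The DAG id, start date, schedule and task names below are placeholders for illustration only:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# Placeholder DAG id and schedule; adjust to your own workflow
dag = DAG(
    'hello_airflow',
    start_date=datetime(2019, 1, 1),
    schedule_interval='@daily',
)

# Two placeholder tasks that do nothing; real workflows would use
# operators such as PythonOperator or BashOperator here
extract = DummyOperator(task_id='extract', dag=dag)
load = DummyOperator(task_id='load', dag=dag)

# '>>' declares the dependency: 'extract' must finish before 'load' runs
extract >> load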
In this tutorial, we show you how to connect to an Amazon Redshift instance from Apache Airflow. You can use the same procedure to connect to any of your other data sources, including Salesforce, using a Progress DataDirect JDBC driver.
First, install the Progress DataDirect JDBC driver by running its installer:

java -jar PROGRESS_DATADIRECT_JDBC_INSTALL.jar

Next, save the following code as a Python file in your Airflow dags folder. The DAG connects to Redshift through the JDBC connection, writes the results of a query to a local CSV file, and then prints the rows back out:

from datetime import timedelta
import csv

import airflow
from airflow import DAG
from airflow.hooks.jdbc_hook import JdbcHook
from airflow.operators.python_operator import PythonOperator

# Create a JDBC hook from the Airflow connection with Conn ID 'Redshift'
JdbcConn = JdbcHook(jdbc_conn_id='Redshift')


def getconnection():
    # Fetch the connection details to verify the 'Redshift' Conn ID is configured
    JdbcConn.get_connection('Redshift')
    print("connected")


def writerrecords():
    # Run the query over JDBC and write the result set to a local CSV file
    records = JdbcConn.get_records(sql="SELECT * FROM customer")
    with open('records.csv', 'w') as csvFile:
        writer = csv.writer(csvFile)
        writer.writerows(records)


def displayrecords():
    # Read the CSV file back and print each row to the task log
    with open('records.csv', 'rt') as f:
        data = csv.reader(f)
        for row in data:
            print(row)


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'datadirect_sample',
    default_args=default_args,
    schedule_interval="@daily",
)

t1 = PythonOperator(
    task_id='getconnection',
    python_callable=getconnection,
    dag=dag,
)

t2 = PythonOperator(
    task_id='WriteRecords',
    python_callable=writerrecords,
    dag=dag,
)

t3 = PythonOperator(
    task_id='DisplayRecords',
    python_callable=displayrecords,
    dag=dag,
)

# Run the tasks in order: connect, then write, then display
t1 >> t2 >> t3
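The DAG above assumes that a JDBC connection with the Conn ID 'Redshift' already exists in Airflow, typically created under Admin > Connections in the web UI with the JDBC connection URL, the path to the installed DataDirect driver jar and the driver class name. If you prefer to register the connection programmatically, the following is a minimal sketch; the cluster endpoint, database name, credentials, jar path and driver class name are placeholders/assumptions for your own environment, and the extras keys reflect how the Airflow 1.x JdbcHook looks up the driver:

import json

from airflow import settings
from airflow.models import Connection

# Register the 'Redshift' JDBC connection in Airflow's metadata database,
# as an alternative to the Admin > Connections UI. Replace the placeholder
# endpoint, database, credentials and paths with your own values.
redshift_conn = Connection(
    conn_id='Redshift',  # must match jdbc_conn_id used by JdbcHook in the DAG
    conn_type='jdbc',
    host='jdbc:datadirect:redshift://<cluster-endpoint>:5439;DatabaseName=<database>',
    login='<username>',
    password='<password>',
    extra=json.dumps({
        # Keys the Airflow 1.x JdbcHook reads to locate and load the driver;
        # the jar path and class name below are assumptions for a typical
        # DataDirect Redshift driver install
        'extra__jdbc__drv_path': '/path/to/datadirect/lib/redshift.jar',
        'extra__jdbc__drv_clsname': 'com.ddtek.jdbc.redshift.RedshiftDriver',
    }),
)

session = settings.Session()
session.add(redshift_conn)
session.commit()
session.close()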
We hope this tutorial helped you get started with accessing Amazon Redshift data from Apache Airflow. Please contact us if you need help or have any questions.