Orchestrating your first pipeline on Apache Airflow with LocalExecutor
Learn how to install and configure Airflow with the LocalExecutor, while developing and triggering your first parallel-task pipeline using the PythonOperator.
In this introductory article, you'll learn how to take your first steps with Airflow and task parallelism. Airflow is a powerful tool for task orchestration, combining great versatility, a rich user interface and an excellent logging/monitoring system for pipeline execution. However, installing and configuring the tool correctly is essential for building useful solutions in your environment.
Please note that this article won't cover the details of a DAG file (that would make it far too long), so please refer to the official documentation on DAG files if you need it!
So, how can we start molding Airflow to correctly handle our needs? Let’s get into it!
Understanding your Orchestrator
Before taking a hands-on approach with Airflow, it helps to understand why all of this setup is necessary in the first place, so let's take a quick look at Airflow's structure:
The diagram above represents Airflow's single-node structure, which is what you get from a fresh installation and what we will be using throughout this article.
So, to sum up the structure and its functionalities:
- Airflow.cfg: This is where all configurations are stored, and because of that, we will be working with this file quite a bit after our fresh installation.
- Web Server: This sub-system of Airflow is where you, as the end user, will spend most of your time. That's because all the logging, monitoring, variable management and DAG information are presented here.
- Scheduler: Think of the scheduler as the metronome of your system. If you're not a music person, the metronome is a tool that keeps the pace of the music, or, in our environment, it keeps things orchestrated and in sync. It's also responsible for creating DagRuns and TaskInstances before delegating them to the responsible sub-systems (metadata, executor, etc.). The Scheduler also contains the Executor, but to better understand the latter, I've decided to describe it separately.
- Metadata: This is, basically, the brains behind Airflow, and I didn't call it a brain for no reason. The metadata is a database responsible for bringing together all the information needed by the whole environment (even by the advanced multi-node structures that we will touch on later in this article). And since it's a database, it comes with limitations: even though it's possible to exchange data between tasks (through XComs; see the minimal sketch after this list), it's definitely not recommended to use Airflow for data processing, since you may run out of resources while doing so (although XComs are fine for conditional triggering and other lightweight uses). To handle these data limitations, there are plenty of Operators that delegate the heavy work to the environment best suited to handle it.
- Executors: Last, but not least, this sub-system is in charge of distributing and coordinating task execution among workers, which makes it a very important choice for your project, since every Executor has its own characteristics. Notice that, in the previous diagram, the Executor contains a queue element inside it; this holds for both the SequentialExecutor and the LocalExecutor, and it's called a single-node structure because the queue is bound to the same environment as the worker, which means that every sub-system has to run on the same machine.
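To make the XCom idea more concrete, here is a minimal sketch (not taken from this article's pipeline; the DAG id, task ids and key name are illustrative assumptions) of two PythonOperator tasks exchanging a small value on Airflow 1.10:

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def push_value(**context):
    # Push a small, metadata-friendly value (never a large dataset) to XCom.
    context["ti"].xcom_push(key="my_key", value=42)

def pull_value(**context):
    # Pull the value pushed by the upstream task and just log it.
    value = context["ti"].xcom_pull(task_ids="push_task", key="my_key")
    print("Received from XCom:", value)

with DAG(dag_id="xcom_sketch", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    push_task = PythonOperator(task_id="push_task", python_callable=push_value, provide_context=True)
    pull_task = PythonOperator(task_id="pull_task", python_callable=pull_value, provide_context=True)
    push_task >> pull_task

Remember that every XCom value is stored in the metadata database, which is exactly why this mechanism should only carry small pieces of information.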
For beginners, the first "difficulty" with Airflow arises because, by default, it comes configured to use the SequentialExecutor, which is, basically, a queue that executes one task at a time. That quickly becomes a problem once you start dealing with multiple tasks and conditional branching, since you will probably need to run many things at once and you really don't want to wait for each task to finish before the next one starts, and that's precisely what we will be covering from now on.
Setting the Stage
So, now that we understand the basic structure of our orchestrator and why it is so important to properly install and configure it, let’s define our basic needs.
Prerequisites:
First of all, this article has been tested on a Linux OS (Pop!_OS 20.04 LTS 64-bit), so make sure to adapt the next procedures to your OS or use a virtual machine.
- Python: Since Airflow is written in Python, we have to install the software needed to run Python scripts. For this article, Python 3.8 was installed, and you can find it here. You should also be able to follow this article with Python 2.x, but you will need to make some modifications to the installation process.
- Pip: Pip is the package installer for Python. You can use pip to install packages from the Python Package Index and other indexes. You can find its documentation here. For this article, pip v20.0.2 from the Python 3.8 package was installed.
- PostgreSQL: To use the LocalExecutor, we will need to configure the metadata database to use PostgreSQL as its engine, so you need to install PostgreSQL on your machine as well. You can find more information on how to install it here. We will configure it later, so just make sure to have it installed.
Installing Airflow:
1. First, make sure to update your system and install build-essential with:
sudo apt-get update
sudo apt-get install build-essential
(Optional) Create and activate a virtual environment for Airflow:
python3 -m venv <your_venv_name>
source <your_venv_name>/bin/activate
2. Install additional tools:
sudo apt-get install python3-dev
pip3 install wheel
3. Install Airflow:
pip3 install apache-airflow==1.10.14 --constraint https://raw.githubusercontent.com/apache/airflow/constraints-1.10.14/constraints-3.8.txt
Notice that the Airflow installation command contains two version references: 1.10.14 and constraints-3.8.txt. The first is the Airflow version we want to install, and the second (3.8) is the Python version we are using, so, if you want to install another version of Airflow, or if you're using another version of Python, those are the values you need to change. The constraints file pins the dependency versions known to work with that Airflow release, so everything installs consistently; feel free to explore this argument in the official documentation.
4. Check for Airflow paths:
airflow info
With this command you will get a lot of information about your Airflow installation, such as your Executor, your SQL Alchemy Connection, DAGs Folder, Plugins Folder, Base Log Folder and much more!
5. Initialize Airflow:
airflow db init
If everything succeeded with no errors (warnings may occur but will be fixed later), you can now run your Airflow Webserver with the following command:
airflow webserver
Now, if you go to http://localhost:8080 in your browser, you should see your Airflow Web Server UI. Success!
If this is your first time initializing Airflow, you'll probably see a "No user yet created…" warning message. That's because we need a user to interact with the Airflow UI, so don't worry, let's create one:
6. Create your user:
airflow users create -u <your_username> -p <your_password> -f <your_first_name> -l <your_last_name> -r Admin
Notice the -r parameter that we set to Admin; that's your user's role in Airflow. For this article it is OK to create it as an Admin, but make sure to give your users the proper roles in a real project; you can check this in the official documentation as well.
Now that your user is created, you can sign in to the Airflow Web Server, so we should be good to start creating our DAG file.
Define your score: myDAG.py
Now it's finally time to put some music into our orchestra, so open up your favorite Python IDE and let's start coding our first DAG file, named myDAG.py, which should contain the following code:
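Here is a minimal sketch of what myDAG.py can look like, assuming (as referenced later in the article) a DAG named MyDAG with a first_task followed by a few parallel tasks built with the PythonOperator; the callable, the sleep duration and the number of parallel tasks are illustrative assumptions:

from datetime import datetime
import time

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def my_task(task_name):
    # Sleep for a few seconds so the difference between sequential and
    # parallel execution is easy to spot in the Gantt chart.
    print("Running {}".format(task_name))
    time.sleep(10)

default_args = {"owner": "airflow", "start_date": datetime(2021, 1, 1)}

with DAG(dag_id="MyDAG", default_args=default_args, schedule_interval=None, catchup=False) as dag:
    first_task = PythonOperator(
        task_id="first_task",
        python_callable=my_task,
        op_kwargs={"task_name": "first_task"},
    )

    # Tasks meant to run in parallel once the LocalExecutor is configured.
    parallel_tasks = [
        PythonOperator(
            task_id="parallel_task_{}".format(i),
            python_callable=my_task,
            op_kwargs={"task_name": "parallel_task_{}".format(i)},
        )
        for i in range(1, 4)
    ]

    first_task >> parallel_tasks

With schedule_interval=None, the DAG only runs when triggered manually from the UI, which is exactly what we will do below.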
Save the file as myDAG.py inside your Airflow DAGs folder and start the Scheduler (since we've already started the Web Server, it should be up and running by now) with the following command:
airflow scheduler
Let the music play
If you’ve followed the steps above correctly, you should see your new DAG file in your Airflow UI:
So, click on the DAG name and you should be redirected to the DAG Tree View:
And by clicking on "Graph View", we get a better picture of how tasks will be executed with this DAG structure, which lets us see the parallel tasks after "first_task":
So, if you're seeing the same structure as in the image above, set your DAG state to "ON" (click the On/Off switch next to the DAG name), click "Trigger DAG" and let it run. Once it finishes running (it should not produce any errors, since we are only using simple Operators in this DAG), go to the "Gantt" tab and you should see something like:
By checking the Gantt chart for our pipeline execution, you will see that our "parallel" tasks were executed sequentially. That's because, by default, Airflow uses the SequentialExecutor, so it waits for each task to finish before starting the next one, which is precisely what we do not want in this article. So... how do we solve this?
A step beyond: parallelism
Thankfully, Airflow comes prepared to handle many task orchestration scenarios, and, since we eventually need to deal with parallelism, we can count on the LocalExecutor.
Configuring the LocalExecutor:
1. Change the executor on airflow.cfg:
# On airflow.cfg file, look for the line containing:
executor = SequentialExecutor
# and change it to:
executor = LocalExecutor
2. Configure your Postgres user:
# In your terminal
sudo -u postgres psql
postgres=# ALTER USER postgres PASSWORD 'postgres';
postgres=# \q
We have now redefined our Postgres credentials. Notice that I've set these credentials just to keep the Airflow configuration simple for this article, so, if you're using this for anything beyond learning purposes, I recommend taking proper care of your DB credentials.
3. Install Postgres package to Airflow:
# In your python environment (venv or not)
pip3 install 'apache-airflow[postgres]'
4. Change your SQL Alchemy Conn inside airflow.cfg:
By default, Airflow uses SQLite as the SQL Alchemy connection. But SQLite can't handle the concurrent access the LocalExecutor needs, so we need to point Airflow to our new Postgres DB by altering the following configuration line:
# On airflow.cfg, look for the line containing:
sql_alchemy_conn = sqlite:///.../airflow.db
# and change it to:
sql_alchemy_conn = postgresql+psycopg2://postgres:postgres@localhost/postgres
5. Check for DB connection:
# In your terminal:
airflow db check
If Airflow could successfully connect to your Postgres DB, you will see an INFO log containing a "Connection Successful" message, so now we are good to go.
6. Init your Airflow DB:
# In your terminal:
airflow db init
By running the db init command, you should see the "No user yet created" warning once again. That's because we've changed our database, and we've lost all our metadata with it, so be very careful when changing your database configuration!
7. Create our new Airflow user:
airflow users create -u <your_username> -p <your_password> -f <your_first_name> -l <your_last_name> -r Admin
Applying new configurations:
Save your airflow.cfg file and restart your Airflow instance by stopping all running services and then enter:
# In your terminal
airflow db init
airflow webserver
# On a second terminal:
airflow scheduler
Enjoy the show!
Now that our tasks will run through the LocalExecutor, you should be able to execute multiple tasks in parallel, so let's get back to our Web Server and test it out!
Trigger "MyDAG" once again and wait for it to finish, then head back to the "Gantt" tab and you should see this beautiful diagram:
Voilà!
Now all of our parallel tasks can be executed simultaneously, granting us all the power needed to run a lot of solutions on our local Airflow instance.
And thus we reach the end of this article with a fully functional and useful Airflow instance, capable of running fairly complex DAG solutions while keeping our time in time (pun intended). Please feel free to play around with the LocalExecutor and check the official documentation if needed!
Not quite your tempo?
Still here? Great!
Hopefully the LocalExecutor will be able to handle most of your projects, but if you intend to use Airflow in a real production environment, you're probably wondering whether it's possible to scale it out with clusters or something along those lines, right?
Well, the answer is YES.
Airflow comes with another executor, called the CeleryExecutor, which separates the workers from the queue, allowing us to process as many tasks as we want from as many machines as needed.
Unfortunately, that is a whole other Airflow configuration process, which will be covered in another article in the near future, so, if you're interested in this type of orchestration, stay tuned (pun intended as well)!