Schedule using Airflow
இனிவரும் பகுதியில் இதற்கு முன் பகுதியில் கண்ட அதே விஷயத்தை Airflow கொண்டு உருவாக்கித் திட்டமிடுவது பற்றிக் காணலாம். கீழ்கண்ட கட்டளைகள் Airflow இயங்குவதற்குத் தேவையான விஷயத்தை இன்ஸ்டால் செய்யும்.
$ sudo pip3 install apache-airflow
$ sudo pip3 install flask
$ sudo pip3 install flask_bcrypt
$ sudo pip3 install kombu==4.5.0
இவை இயங்கி முடிந்தவுடன் நமது கணினியின் ஹோம் டைரக்டரியில் airflow எனும் டைரக்டரி உருவாக்கப்படும்.
இதற்கு முன் பகுதியில் பகுதியில் நாம் உருவாக்கிய இரண்டு பணிகளுக்கும் ஷெல் ஸ்கிரிப்ட்க்கு பதிலாக, இரண்டு பைத்தான் புரோகிராமை உருவாக்கி அவற்றை task1, task2 எனும் பெயரில் dags டைரக்டரியின் கீழ் (~/airflow/dags) சேமிக்கவும்.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime | |
from airflow import DAG | |
import os | |
from airflow.operators.python_operator import PythonOperator | |
from airflow.models import Variable | |
def backup(): | |
backup_cmd = 'mysqldump -u root -ppassword sample > "/home/shrini/backups/"sample_"$(date +"%Y%m%d_%H%M%S").sql"' | |
os.system(backup_cmd) | |
dag = DAG('mysql_backup', description='Mysql backup every hour', schedule_interval='00 * * * *', start_date=datetime(2020, 11, 2), catchup=False) | |
backup_operator = PythonOperator(task_id='backup_task', python_callable=backup, dag=dag) |
நிரலுக்கான விளக்கம்:
Airflow-வில் பல ஆப்பரேட்டர்கள் உள்ளன. அவற்றில் எதன் துணைகொண்டு நாம் இந்தச் செயலை செய்யப் போகிறோமோ அதனை முதலில் import செய்து கொள்ள வேண்டும். இங்கு பைத்தான் ஆப்பரேட்டரை இம்போட் செய்துள்ளோம். பின்பு backup எடுப்பதற்கான கட்டளையை ஒரு function-க்குள் எழுதி இயக்கியுள்ளோம். எப்போதும் ஒரு ஷெல் கட்டளையை பைத்தான் மொழியில் இயக்குவதற்கு os.system() எனும் method பயன்படும். பின்பு இதற்கான dag வரையறுக்கப்படுகிறது. அது தனக்கென ஒரு id, வரையறை மற்றும் எப்போதெல்லாம் இயங்க வேண்டும் ஆகிய மதிப்புகளைப் பெற்றிருப்பதைக் காணலாம். இந்த id தான் Airflow UI திரையில் வெளிப்படும். மேலும் start_date என்பது எந்தத் தேதியிலிருந்து இச்செயலை செய்ய வேண்டும் என்பதைக் குறிக்கிறது. அடுத்து catchup=False என்றிருந்தால், இடையில் சர்வரின் இயக்கம் தடைபடுகின்ற நேரத்தில், அது இயங்க வேண்டிய சுழற்சிகளை நிறுத்திவிடும். அதாவது Airflow server-ன் இயக்கம் இன்று நிறுத்தப்பட்டு நாளை மீண்டும் தொடக்கினால், தொடங்கிய நேரம் முதல் jobs ஓடத் தொடங்கும். அதுவே catchup=True என்றிருந்தால், விட்ட நேரத்திற்கும் சேர்த்து இயங்கி முடிக்கும்.
கடைசியாக இந்த dag மற்றும் function ஆகியவற்றை மதிப்புகளாகக் கொண்ட ஒரு பைத்தான் ஆப்பரேட்டர் இச்செயலுக்காக உருவாக்கப்படுகிறது.
task2:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime | |
from airflow import DAG | |
import os | |
from airflow.operators.python_operator import PythonOperator | |
def pushfile(): | |
push_cmd = 'rsync -arv -e "ssh -i /home/shrini/shrini-freepem.pem" /home/shrini/backups ubuntu@35.166.185.40:/home/ubuntu' | |
os.system(push_cmd) | |
def clr(): | |
clr_cmd = 'rm -f /home/shrini/backups/*' | |
os.system(clr_cmd) | |
dag = DAG('push_remote', description='Pushing files to remote every day', schedule_interval='20 00 * * *', start_date=datetime(2020, 11, 2), catchup=False) | |
pushfile_operator = PythonOperator(task_id='pushfile_task', python_callable=pushfile, dag=dag) | |
clear_operator = PythonOperator(task_id='clearing_task', python_callable=clr, dag=dag) | |
pushfile_operator >> clear_operator |
நிரலுக்கான விளக்கம்:
இங்கு backup எடுக்கப்பட்ட கோப்புகளை ரிமோட் சர்வருக்கு அனுப்புதல், அனுப்பிய பின் அவற்றை தற்போதைய டைரக்டரியில் இருந்து நீக்குதல் ஆகிய இரண்டு செயல்களுக்கும் இரண்டு தனித்தனி function எழுதப்பட்டுள்ளது. இச்செயல்கள் ஒவ்வொரு நாளும் நள்ளிரவு 12:20 மணிக்கு நடக்குமாறு schedule செய்யப்பட்டுள்ளது. அதன் பிறகு இவ்விரண்டு செயல்களுக்கும் தனித்தனியே இரண்டு ஆப்பரேட்டர்கள் உருவாக்கப்பட்டுள்ளன. முதல் ஆப்பரேட்டரின் செயல்பாடு முடிந்தவுடன் இரண்டாவது ஆப்பரேட்டரின் செயல்பாடு துவங்க வேண்டும் என்பதை >> எனும் குறியீடு குறிக்கிறது.
மேற்கண்ட இரண்டு பணிகளுக்கான நிரல்களையும் dags டைரக்டரியின் கீழ் (~/airflow/dags) சேமித்தவுடன் Airflow-வைத் துவக்குவதற்கான கட்டளையை அளிக்கவும்.
$ airflow initdb
இது ‘Failed to import’ என்பது போன்று ஏதேனும் error-ஐ வெளிப்படுத்தினால், நம்முடைய நிரல்களில் ஏதோ தவறு இருக்கிறது என்று அர்த்தம். ஆகவே இக்கட்டளை எவ்வித இடையூறும் இல்லாமல் ஓடி முடிந்தபின், இதற்கான webserver மற்றும் scheduler ஆகிய இரண்டையும் இரண்டு தனித்தனி டெர்மினல்களில் ஓட விடவும்.
$ airflow webserver
$ airflow scheduler
Airflow UI
webserver மற்றும் scheduler ஆகிய இரண்டும் தனித்தனி டெர்மினல்களில் ஓடிக்கொண்டிருக்கும் போது கீழ்க்கண்ட முகவரியில் சென்று காணவும். இதுதான் நாம் உருவாக்கி ஷெட்யூல் செய்துள்ள பணிகளைத் திரையில் காண உதவும் இடைமுகப்பு ஆகும்.
192.168.1.15:8080/admin/
நம்முடைய dag பைத்தான் ஆப்பரேட்டர் கொண்டு எழுதப்பட்டது. இதேபோல bash ஆப்பரேட்டர், http ஆப்பரேட்டர், docker, hive, kubernetes என பலப்பல ஆப்பரேட்டர்கள் உள்ளன. அவை ஒவ்வொன்றுக்குமான உதாரணங்கள் தான் இங்கே இருப்பவை எல்லாம். இவற்றில் நம்முடைய dag-ஐ ஃபில்டர் செய்து கொள்ளவும்.
இதில் நம்முடைய dag-ன் பெயர், எப்போது schedule செய்யப்பட்டுள்ளது, சமீபமாக ஓடி முடிந்த dag-ன் நிலைகள், கடைசியாக எப்போது ஓடியது ஆகியவற்றைக் குறிக்கும் வகையில் பல்வேறு உப தலைப்புகள் உள்ளன. இவற்றில் Links எனும் உபதலைப்பின் கீழ் dag-ஐத் தூண்டுதல், அவற்றின் செயல்பாடுகளை வரைபட வடிவில் வெளிப்படுத்துதல், அவற்றுக்கான logs- ஐ வெளிப்படுத்துதல், மூல நிரலை வெளிப்படுத்துதல் போன்ற பல்வேறு செயல்களுக்கான ஐகான்கள் உள்ளன.
இவற்றில் Trigger Dag எனும் ஐகானின் மீது சொடுக்கினால், DAG Runs எனும் உபதலைப்பின் கீழ் இளம்பச்சை நிறத்தில் ஒரு வட்டம் உருவாவதைக் காணலாம். அதாவது dag ஓடத் தொடங்கியிருக்கிறது என்று அர்த்தம். Schedule-ன் படி இல்லாமால், நமக்கு எப்போது தேவையோ அப்போது dag-ஐ ஓட்டிப் பார்க்க இந்த ஐகான் பயன்படும். Recent Tasks, DAG Runs ஆகிய உப தலைப்புகளின் கீழ் உள்ள பல்வேறு நிற வட்டங்கள் dag ஓட்டத்தின் பல்வேறு நிலைகளைக் குறிக்கின்றன. அவை கரும்பச்சையாக இருந்தால் வெற்றிகரமாக ஓடியது என்றும், இளம்பச்சையாக இருந்தால் ஓடிக்கொண்டிருக்கிறது என்றும், சிகப்பாக இருந்தால் ஓடித் தோல்வியுற்றது என்றும் அர்த்தம்.
அடுத்து Tree View ஐகானின் மீது சொடுக்கினால் அது பின்வருமாறு ஒரு வரைபடத்தை வெளிப்படுத்தும். அதேபோல் Graph view-மீதும் சொடுக்கிப் பார்க்கவும்.
Logs ஐகானின் மீது சொடுக்கினால் அது பின்வருமாறு வெளிப்படுத்தும். அதேபோல் மூல நிரலைக் காட்டும் ஐகானின் மீது சொடுக்கினால் அதற்கான நிரல் திரையில் வெளிப்படுவதைக் காணலாம்.
Variables
backup எடுக்கும் பணிக்காக எழுதப்பட்ட பைத்தான் நிரலில் அக்கட்டளையைக் கொஞ்சம் கவனிக்கவும். Mysql-க்குள் உள் நுழைவதற்காக -u வைத் தொடர்ந்து பயனர்பெயர்(root) மற்றும் -p ஐத் தொடர்ந்து கடவுச்சொல் (password) ஆகியவை நேரடியாகவே கொடுக்கப்பட்டுள்ளன. ஒவ்வொரு முறை டேட்டாபேஸின் பாஸ்வேர்டை மாற்றும் போதும், புதிய பயனராக உள் நுழைய முயலும் போதும் இந்த மூல நிரலில் சென்று மாற்றம் செய்ய வேண்டும். இதற்கு பதிலாக இதற்கான variable-களை UI திரையில் உருவாக்கிக் கொண்டு அதனை நாம் நம்முடைய நிரல்களில் பயன்படுத்தலாம். இதன் மூலம் மதிப்பினை மாற்றினாலும் மூல நிரலில் கை வைக்கத் தேவையில்லை. UI திரையில் சென்று அந்த variable-களுக்கான மதிப்பினை மாற்றினால் போதும்.
முதலில் Airflow UI -ல் சென்று Action -> Variable -> Create ஆகிய இணைப்பின் மீது சொடுக்கவும். அது key, val ஆகிய பெட்டிகளைக் கொண்ட திரையை வெளிப்படுத்தும். அதில் நாம் உருவாக்க வேண்டிய variable-ன் பெயர் மற்றும் அதன் மதிப்பினை கீழ்கண்டவாறு அளிக்கவும்.
பின் list எனும் இணைப்பின் மீது சொடுக்கினால் நாம் உருவாக்கிய variable-களும் அவற்றின் மதிப்புகளும் பின்வருமாறு வெளிப்படும்.
திரையில் உருவாக்கப்பட்ட mysql_username, mysql_password ஆகியவை நம்முடைய நிரலுக்குள் import செய்யப்படுகின்றன. பின் இவ்விரு variable-களின் மதிப்பும் usr, pwd ஆகிய பெயரில் நிரலுக்குள் செலுத்தப்படுகின்றன. இது பின்வருமாறு.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime | |
from airflow import DAG | |
import os | |
from airflow.operators.python_operator import PythonOperator | |
from airflow.models import Variable | |
usr = Variable.get("mysql_username") | |
pwd = Variable.get("mysql_password") | |
def backup(): | |
backup_cmd = 'mysqldump -u '+usr+' -p'+pwd+' sample > "/home/shrini/backups/"sample_"$(date +"%Y%m%d_%H%M%S").sql"' | |
os.system(backup_cmd) | |
dag = DAG('mysql_backup', description='Mysql backup every hour', schedule_interval='00 * * * *', start_date=datetime(2020, 11, 2), catchup=False) | |
backup_operator = PythonOperator(task_id='backup_task', python_callable=backup, dag=dag) |
Bash Operator
மேற்கண்ட அதே நிரலை பைத்தான் ஆப்பரேட்டர் இல்லாமல் bash ஆப்பரேட்டர் கொண்டு உருவாக்கப்பட்ட ப்ரோக்ராம் பின்வருமாறு.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime | |
from airflow import DAG | |
from airflow.operators.bash_operator import BashOperator | |
dag = DAG('bash_exp', description='Mysql backup every hour', schedule_interval='00 * * * *', start_date=datetime(2020, 11, 2), catchup=False) | |
operator = BashOperator(task_id='backup_task', bash_command='mysqldump -u root -ppassword sample > "/home/shrini/backups/"sample_"$(date +"%Y%m%d_%H%M%S").sql"',dag=dag) |
இதில் ஷெல் கட்டளைகள் நேரடியாக இயக்கப்படுவதைக் காணலாம்.
பொதுவாக dag-ன் பெயர்தான் திரையில் வெளிப்படும். ஆகவே dag-ன் பெயரையே புரோகிராமின் பெயராக வைத்துக்கொள்வது நல்லது. புதிதாக bash ஆப்பரேட்டர் கொண்டு உருவாக்கப்பட்ட dag-ஐ திரையில் ரன் செய்து பார்க்கவும். அது தோல்வியுற்று சிகப்பு நிற வளையத்தை வெளிப்படுத்துவதைக் காணலாம்.
என்ன காரணம் என்று தெரிந்து கொள்ள அவ்வளையத்தின் மீது சொடுக்கினால் அதற்கான Run Id பின்வருமாறு வெளிப்படும்.
அந்த Id-ன் மீது சொடுக்கினால், அது பின்வருமாறு திரையை வெளிப்படுத்தும்.
அதில் view log-ன் மீது சொடுக்கவும். என்ன காரணம் என்பது பின்வருமாறு வெளிப்படுத்தப்பட்டுள்ளது.
[2020-11-09 11:59:08,149] {bash_operator.py:157} INFO – mysqldump: Got error: 2002: Can’t connect to local MySQL server through socket ‘/var/run/mysqld/mysqld.sock’ (2) when trying to connect
அதாவது Mysql-ஐத் தொடர்பு கொள்வதில் ஏதோ சிக்கல் என்று வெளிப்படுத்தியுள்ளது. எனவே Mysql இயங்குகின்ற இடத்தில் சென்று அதற்கான சேவையைத் துவக்கி அதன் இயக்கத்தை உறுதிபடுத்திக் கொள்ளவும்.
இதுவரை பல்வேறு சர்வர்களில் ஓடிக்கொண்டிருக்கும் பணிகளை மேலாண்மை செய்ய உதவும் Airflow கருவியை பற்றிப் பார்த்தோம். அடுத்ததாக பல்வேறு சர்வர்களில் ஒரே நேரத்தில் பல மென்பொருட்களை நிறுவுதல், மேம்படுத்துதல், நீக்குதல் போன்ற பல பணிகளைச் செய்ய உதவும் Ansible – ஐப் பற்றிப் பார்க்கலாம்.