Hadoop – spark – பகுதி 5

Spark என்பது hadoop-ன் துணைத்திட்டமாக 2009-ம் ஆண்டு உருவாக்கப்பட்டது. பின்னர் 2010-ல் திறந்த மூல மென்பொருள் கருவியாக BSD உரிமத்தின் கீழ் வெளியிடப்பட்டது. 2013-ம் ஆண்டு இது அறக்கட்டளையுடன் இணைந்தது முதல் சிறப்பாக செயல்பட்டு வருகிறது. இதிலும் தரவுகளை சேமிக்க hdfs-தான் பயன்படுகிறது. ஆனால் சேமிக்கப்பட்டுள்ள தரவுகளை அணுகுவதற்கு வெறும் mapreduce-யோடு நின்று விடாமல் spark sql, spark streaming,graphx, MLlib (Machine Learning Library) போன்ற பல்வேறு அம்சங்களை வழங்குகிறது. மேலும் java, scala, python போன்ற பல்வேறு மொழிகளை ஆதரிக்கிறது. எனவே Analytics-க்குத் தேவைப்படும் applications-ஐ இவைகளுக்கான API-ஐப் பயன்படுத்தி எந்த மொழியில் வேண்டுமானாலும் எழுதலாம்.

Spark என்பது தரவுகளின் மீது அதிவேகமாக இயங்கி கணக்கீடுகளை நிகழ்த்த உதவும் ஒரு கட்டமைப்பு ஆகும். Scalable, flexible, fault-taularant, cost-effective (இவைகளுக்கான விளக்கத்தைக் கீழே காணலாம்) போன்ற அனைத்து விஷயங்களிலும் Hadoop & Sparkஇரண்டும் சிறந்து விளங்கினாலும், வேகம் (speed) எனும் ஒரு விஷயத்தில் மட்டும் hadoop-ஐ விட sparkசிறந்து விளங்குகிறது. ஏனெனில் ‘in-memory cluster computing’ என்பது இதில் ஒரு சிறப்பு அம்சமாக விளங்குகிறது. அதாவது hadoopபோன்று ஒவ்வொரு முறையும் hard disk-ல் சென்று தரவுகளைத் தேடாமல் அதிகமாக அணுகப்படும் தரவுகளை RAM-ல் சேமித்து வைத்துக் கொண்டு வேண்டிய நேரத்தில் துரிதமாக எடுத்து வெளிப்படுத்துவதே ‘in-memory cluster computing’ எனப்படும். அதாவது computing-க்குத் தேவையானவற்றை in-memoryயிலும், referencing dataset-ஐ external storage-லும் சேமிக்கிறது. இத்தகைய ‘in-memory’ functions மூலம் disk-க்குச் செல்லும் read/write operations-ன் அளவு குறைவதால் இதன் வேகம் அதிகரிக்கிறது.

• scalable : நமது தேவைக்கேற்ப கணினிகளை இணைத்தோ/நீக்கியோ பயன்படுத்தும் வசதி. தரவுகளின் எண்ணிக்கை பெருகப் பெருக கணினிகளை சேர்த்துக்கொண்டே செல்வது scale-up என்றும், தேவையில்லாத தருணங்களில் இணைப்பிலிருந்து நீக்கிவிடுவது scale-down என்றும் அழைக்கப்படும்.
• flexible : நமது தேவைக்கேற்ப ஒன்றை முழுமையாக மாற்றி அமைக்கும் வசதி. உதாரணத்துக்கு libre office என்பது flexible அல்ல. ஏனெனில் இதன் config files-ஐ மாற்றி அமைக்கும் உரிமை நமக்குக் கிடையாது. ஆனால் hadoop & spark போன்றவற்றின் config files அனைத்தும் நமது கட்டுப்பாட்டுக்குள் உள்ளதால் அதனை நாம் எவ்வாறு வேண்டுமானாலும் மாற்றி அமைத்துக் கொள்ளலாம்.
• fault-tolerant : இதில் அவ்வளவு எந்தஒரு தகவல் இழப்போ, தவறுளோ சுலபமாக நிகழ்ந்து விடாத நிலை. இதில் தரவுகள் அனைத்தும் பிரித்து சேமிக்கப்படுகின்றன மற்றும் backup எடுத்து வைக்கப்படுகின்றன. எனவே ஏதேனும் ஒன்றில் தவறு நிகழ்ந்தால் கூட, மற்றொன்று செயல்பட்டு அதனை சரி செய்துவிடும்.
• cost-effective : பெருந்தொகை செலுத்தத் தேவையில்லை. திறந்த மூல நிரலாகக் கிடைக்கிறது.

முன்பெல்லாம் hadoop-ஆல் செய்யப்படும் batch-processing முறையே போதுமானதாக இருந்தது. ஒரு குறிப்பிட்ட விஷயத்தின் அடிப்படையில் தரவுகளைப் பல்வேறு தொகுதிகளாகப் பிரித்து, அவைகளைக் கொண்டு கணக்கீடு செய்வதே batch processing எனப்படும். இது பொதுவாக historical data-வை process செய்ய உதவும். உதாரணத்துக்கு மக்கள் தொகை கணக்கெடுப்பு எனும் நிகழ்வானது ஒட்டு மொத்தமாக கி.பி.1-லிருந்து கி.பி.2018-வரை என்று நிகழாமல், காலநேர அடிப்படையில் ….,1990-2000, 2001-2010,… எனும் பத்து பத்து வருடங்களாகப் பிரிக்கப்பட்டு நிகழ்த்தப்படுவதைச் சொல்லலாம். ஆனால் அண்மையில்தான், நிகழ்காலத்தில் வந்துகொண்டே இருக்கக்கூடிய தரவுகளைக் கையாள்வதற்கான தேவை ஏற்பட்டது. உதாரணத்துக்கு credit/debit அட்டைகள் வழங்கும் நிறுவனங்களில் ஒரு நிமிடத்திற்கு பல்லாயிரக்கணக்கானோர் அந்த அட்டைகளைத் தேய்த்து பணப் பரிமாற்றம் செய்யும் விவரங்கள் வந்த வண்ணம் இருக்கும். இதுவே continuous streaming எனப்படும். இம்முறையில் வரும் தரவுகள் ‘real-time data’ அல்லது ‘fast-data’ எனப்படும். hadoop-ஆல் continuous streaming-ல் வரக்கூடியவற்றைக் கையாள முடியாது. ஆனால் இவற்றைக் கையாள்வதற்கான யுக்திகள் spark-ல் காணப்படுகின்றன. எனவே batch applications, Iterative algorithmns, Interactive queries, continuous steaming போன்ற அதிக அளவு வேலைப்பலு கொண்ட துறைகளில் Spark பயன்படுத்தப்படுகின்றன.

Spark-ன் கட்டமைப்புக் கூறுகள்:
Spark-core என்பது இதன் அனைத்து விதமான செயல்பாடுகளுக்கும் அடித்தளமாக அமைகின்ற ஒரு விஷயம் ஆகும். இதன் சிறப்பம்சங்களான sql queries, streaming data, machine learning, graph algorithmnsஆகிய அனைத்தும் இதனை மையமாக வைத்தே செயல்படுகின்றன.

• Spark sql : structured மற்றும் semi-structured data-வைக் கையாள்கிறது. core-ன் மீது அமைக்கப்படும் ஒரு சிறு கருவி ஆகும். schema RDD எனப்படும் ஒரு புதிய முறையினை இது வழங்குகிறது. இதனைப் பற்றிக் கீழே விளக்கமாகக் காணலாம். இதன் மூலம் continuous streaming முறையில் வரும் தரவுகளை சிறுசிறு batches-ஆகப் பிரித்து அதனை RDD-ஆக மாற்றி analytics-ஐ நிகழ்த்துகிறது.
• Streaming data : நிகழ்காலத்தில் தரவுகள் வந்து கொண்டே இருப்பதே streaming data எனப்படும். Cricket scoring, facebook, twitter, banking, whether forecastingபோன்றவை இதற்கு உதாரணங்களாக அமையும்.
• MLlib : spark-ஆனது disk-ல் மட்டும் அல்லாமல் ram-யிலும் தரவுகளை சேமிப்பதால் இத்தகைய distributed memory based architecture-க்கு ஏற்ற வகையில் distributed machine learning framework-ஆக இது சிறந்து விளங்குகிறது. Hadoop கட்டமைப்பில் இருக்கும் mahout -ஐ விட spark-ன் MLlib-ஆனது 9 மடங்கு வேகத்துடன் சிறந்து விளங்குகிறது.
• GraphX : distributed graph processing-க்கான ஒரு framework ஆகும். உதாரணத்துக்கு facebook, twitter போன்ற வலைத்ததளங்களில் ஒருவர் எத்தனை பேருடன் இணைந்துள்ளார், அவர்களுடன் எத்தனை பேர் இணைந்துள்ளனர், எத்தனை பேர் பின் தொடர்கின்றனர் என்பது போன்ற விவரங்களெல்லாம் ஒரு அச்சுப்பட வடிவில் இணைக்கப்பட்டிருக்கும். இத்தகைய அச்சுப்படங்களை வைத்து தரவுகளைக் கணக்கீடு செய்ய உதவும் ஒரு API-ஆக graphx விளங்குகிறது. Pregal abstraction என்பது அந்த API பெயர் ஆகும்.

Spark-ஐ நிறுவுதல்:

இதனை பின்வரும் 3 விதங்களில் நிறுவலாம்.

• standalone : இதில் spark என்பது hdfs-ன் மீது இயங்குகிறது. அதற்கென்று தனியாக இட ஒதுக்கீடு அளிக்கிறது. இதில் spark-ம் mapreduce-ம் ஒத்தவாறு ஓடிக்கொண்டிருக்கும்.
• Hadoop yarn : cluster management என்பது spark-ன் இயல்பான செயல்பாடுகளில் ஒன்றாக அமைகிறது. அதற்கென்று தனியாக yarn-என்ற ஒன்றோ, mesos-என்ற ஒன்றோ தேவையில்லை. ஆனால் ஏற்கனவே hadoop-ன் மீது yarn செயல்பட்டுக் கொண்டிருப்பின் அதனை நீக்கிவிட்டு spark-ஐ நிறுவத் தேவையில்லை. அதனுடன் சேர்த்தே spark-ஐ நிறுவலாம்.
• Spark in Map Reduce – hadoop மற்றும் mareduce மட்டும் இயங்கிக்கொண்டிருக்கும் ஒரு கட்டமைப்பில், நமது தேவைக்கேற்ப spark-ஐயும் சேர்த்துக்கொள்ளலாம். இதற்கென்று எந்தஒரு சிறப்பு அனுமதிகளும் தேவையில்லை.

Spark-ஐ standalone mode-ல் நிறுவுவதற்கான படிகள் பின்வருமாறு.
1.Spark-ஐ பதிவிறக்கம் செய்து /usr/local -ல் மாற்றி வைக்கவும்.
$ wget archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
$ tar -xzvf spark-2.3.1-bin-hadoop2.7.tgz
$ sudo mv ~/spark-2.3.1-bin-hadoop2.7 /usr/local

2. நமது home directory-ல் உள்ள .bashrc -க்குள் spark-ன் விவரங்களை இணைக்கவும்.

$ vim ~/.bashrc
export PATH=$PATH:/usr/local/spark-2.3.1-bin-hadoop2.7/bin
$ source ~/.bashrc

3. பின்னர் command line-ல் சென்று pyspark என அடித்தால் spark shell வெளிப்படுவதைக் காணலாம்.

$ pyspark


Resilient Distributed Datasets:

இதுவே spark-ன் அடிப்படை. இம்முறையில்தான் spark-ல் தரவுகள் சேமிக்கப்படுகின்றன. ஆய்வாளர்கள் mapreduce-ன் பண்புகளான replication, serialization மற்றும் disk IOபோன்றவற்றால் அதன் வேகம் குறைவதைக் கண்டறிந்தனர். எனவே வேகத்தை அதிகரிக்கும் பொருட்டு ‘in-memory computing’ என்ற ஒன்றைக் கண்டறிந்தனர். RDD (Resilient Distributed Datasets) என்பது இதனை implement செய்யும் ஒரு framework-ஆகும்.

அதாவது RDD-ல் சேமிக்கப்படும் தரவுகள் காரிய முறைப்படி (logically) பல்வேறு தொகுதிகளாகப் பிரிக்கப்படும். அவை cluster-ல் இணைக்கப்பட்டுள்ள பல்வேறு nodes-ல் செயல்பட்டு அதனதன் தரவு ஆய்வுக்கான வேலையைத் துவங்குகின்றன. இவையே jobs எனப்படும். இத்தகைய rdd முறையில் தரவுகளை சேமிக்கும்போது அத்தரவுகளுக்கான memory-ஆனது ஒரு object-ஆக சேமிக்கப்பட்டு அவை அதனுடைய jobs-க்கிடையே பகிரப்படுகின்றன. இதுவே immutable distributed collection of objects என்றழைக்கப்படுகிறதுன. RDD என்பதனை python, java, scala போன்ற எந்தஒரு மொழியில் வேண்டுமானாலும் எழுதலாம்.

spark shell-ல் எவ்வாறு ஒரு rdd-ஐ உருவாக்கி அதற்குள் விவரங்களை செலுத்துவது என்று பார்க்கலாம். கீழ்க்கண்ட எடுத்துக்காட்டில் x எனும் பெயர் கொண்ட rdd-ஐ உருவாக்கி அதற்குள் employees எனும் கோப்பிற்குள் உள்ள விவரங்கள் உட்செலுத்தப்பட்டுள்ளன. பின்னர் அதன் மீது செயல்படும் collect() எனும் function-ஆனது rdd-ல் உள்ள விவரங்களை எடுத்து வெளிக்காட்டும்.

>>> x = sc.textFile (“file:///home/nithya/employees.txt”)
>>> x.collect()

>>> x.count() (தரவுகளின் எண்ணிக்கையை வெளிக்காட்டும்)

>>> x.first() (முதல் dataset-ஐ வெளிக்காட்டும்)

>>> x.take(3) (parameter-ன் எண்ணிக்கையைப் பொறுத்து dataset-ஐ எடுத்து வெளிக்காட்டும்)

>>> x.takeSample(False,4,1) (Random-ஆக எடுத்து dataset-ஐ வெளிக்காட்டும்)

Spark – Mysql இணைப்பை ஏற்படுத்துதல்:
Spark மற்றும் Mysql இரண்டையும் இணைத்து எவ்வாறு ஒரு database-ல் உள்ள தரவுகளை spark-க்குள் செலுத்துவது என்று பார்க்கலாம். இதில் உள்ள படிகள் பின்வருமாறு.
1. முதலில் ஒரு சிறிய அட்டவணையை database-ல் உருவாக்கவும்.

$ mysql -u nithya -p
> create database sample;
> use sample;
> create table info(id int,name varchar(20),age int);
> insert into info values(1,”Aarthi”,25),(2,”Sathya”,18);
> Select * from info;

2. mysql.jar எனும் கோப்பினை கீழ்க்கண்ட முகவரியிலிருந்து பதிவிறக்கம் செய்யவும். இதுவே database-வுடன் இணைப்பை ஏற்படுத்துவதற்கு உதவும்.

$ wget www.java2s.com/Code/JarDownload/mysql/mysql.jar.zip

$ unzip mysql.jar.zip

3. அடுத்ததாக pyspark கட்டளையை இந்த jar file-வுடன் இணைத்து பின்வருமாறு இயக்கவும். அதற்கு முன்னர் நீங்கள் இக்கட்டளையை இயக்குவதற்கு root-ஆக login செய்ய வேண்டும். இவை பின்வருமாறு.

$ sudo su –

$ /usr/local/spark-2.3.1-bin-hadoop2.7/bin/pyspark –jars /home/nithya/mysql.jar

இக்கட்டளை இயங்கி முடிந்த பின்னர் spark shell வெளிப்படுவதைக் காணலாம். இவ்வாறு pyspark-வுடன் mysql.jarஐ இணைத்து உருவாக்கப்படும் spark shell-ல் நாம் சுலபமாக database-ஐ அணுகி அதற்குள் உள்ள அட்டவணைகளில் இருக்கும் தகவல்களை rdd-ஆக மாற்ற முடியும்.

4. இப்போது mysql-க்குள் சற்று முன் நாம் உருவாக்கிய info அட்டவணையிலுள்ள தரவுகளை spark-க்குள் செலுத்துவதற்கு sqlContext என்பது பயன்படுகிறது. இது ஒரு dataframe-ஐ உருவாக்கி அதற்குள் அட்டவணையில் இருக்கும் தரவுகளை spark-க்குள் செலுத்துகிறது. கீழ்க்கண்ட எடுத்துக்காட்டில் df எனும் பெயர்கொண்ட dataframe உருவாக்கப்பட்டுள்ளது.

>>> df = sqlContext.read.format(“jdbc”).options( url=”jdbc:“,driver = “com.mysql.jdbc.Driver”,dbtable = “info”,user=”nithya”, password=”nithya”).load()

show() எனும் function தரவுகளை வெளிக்காட்டுவதற்குப் பயன்படுகிறது.
>>> df.show()

Dataframe என்பது rdd-ன் பின்புலமாக இயங்கி கணக்கீடுகளை நிகழ்த்த உதவும் ஒன்றாகும். pandas என்பதனை dataframe பயன்படுத்தி இக்கணக்கீடுகளை நிகழ்த்துகிறது. உதாரணத்துக்கு Excel application-ன் பின்புறத்தில் கணக்கீடுகளை நிகழ்த்த உதவும் matrix operations போல இவை இரண்டும் spark-ன் பின்புலமாக செயல்படுகின்றன.

Dataframes-ன் செயல்பாடுகள்:

country, countrylanguage எனும் 2 அட்டவணைகளை உள்ளடக்கிய ஒரு sql கோப்பானது ஒரு வலைத்தள முகவரியிலிருந்து கிடைக்கிறது. அதனை பதிவிறக்கம் செய்து இவ்விரண்டு அட்டவணைகளையும் இணைத்து எவ்வாறு நமக்கு வேண்டிய dataframe-ஐ உருவாக்குவது என்று இப்பகுதியில் பார்க்கப் போகிறோம். இதில் உள்ள படிகள் பின்வருமாறு.

1. முதலில் mysql-க்குள் சென்று ‘world’ எனும் பெயர் கொண்ட ஒரு database-ஐ உருவாக்கவும். இதில்தான் sql கோப்பினை பதிவேற்றம் செய்யப்போகிறோம்.

$ mysql -u nithya -p
> create database world;

2. கீழ்க்கண்ட முகவரியிலிருந்து அந்த sql கோப்பினை பதிவிறக்கம் செய்யவும். பின்னர் நாம் உருவாக்கியுள்ள world db-க்குள் கோப்பினை import செய்வதற்கான கட்டளை கொடுக்கப்பட்டுள்ளது.

$ wget downloads.mysql.com/docs/world.sql.zip
$ unzip world.sql.zip
$ mysql -u nithya -p<world.sql

3. இப்போது mysql-ல் சென்று பார்த்தால் city, country, countrylanguage எனும் 3 அட்டவணைகளும் world db-க்குள் செலுத்தப்பட்டிருப்பதைக் காணலாம்.

4. country, countrylanguage எனும் அட்டவணைகளில் உள்ள columns-ஐப் பின்வருமாறு காணலாம்.


5. இப்போது c dataframe-க்குள் country-ல் உள்ளவற்றையும், l dataframe-க்குள் countrylanguage-ல் உள்ளவற்றையும் உட்செலுத்தியுள்ளோம்.

>>> c = sqlContext.read.format(“jdbc”).options( url=”jdbc:“,driver = “com.mysql.jdbc.Driver”,dbtable = “country”,user=”nithya”, password=”nithya”).load()

>>> c.persist() (ஒவ்வொரு column-ஐயும் அதனதன் datatype-வுடன் வெளிப்படுத்துகிறது)

>>> l = sqlContext.read.format(“jdbc”).options( url=”jdbc:“,driver = “com.mysql.jdbc.Driver”,dbtable = “countrylanguage”,user=”nithya”, password=”nithya”).load().persist()

>>> c.columns (ஒரு dataframe-ல் உள்ள columns-ஐ வெளிப்படுத்துகிறது)

>>> l.columns

6. இப்போது ஒரு நாட்டினுடைய அடையாள எண், அந்நாட்டின் பெயர், அங்கு பேசப்படுகின்ற மொழி இம்மூன்றையும் எடுக்க, மேற்கூறிய 2 dataframe-ஐயும் countrycode-ஆல் இணைக்கலாம். இதிலுள்ள படிகள் பின்வருமாறு.

c frame-ல் உள்ள முதல் இரண்டு columns-ஆன code, name ஆகியவை ctry_name எனும் புதிய dataframe-ல் சேமிக்கப்படுகின்றன. அவ்வாறே l frame-ல் உள்ள முதல் இரண்டு columns-ஆன countrycode, language ஆகியவை ctry_lang எனும் frame-ல் சேமிக்கப்படுகின்றன. பின்னர் இவ்விரண்டு frame-ஐயும் join() மூலம் இணைத்து ctry_name_lang எனும் ஒரு புதிய frame உருவாக்கப்படுகிறது.

>>> ctry_name = c.rdd.map(lambda row:(row[0],row[1]))
>>> ctry_lang = l.rdd.map(lambda row : (row[0],row[1]))
>>> ctry_name_lang = ctry_name.join(ctry_lang)

அவ்வாறு உருவான புதிய frame-ன் மீது .take(3) எனக் கொடுத்து முதல் 3 records-ஐ எடுத்து நமக்கு வேண்டியவாறு உள்ளதா என சரிபார்க்கலாம்.
>>> ctry_name_lang.take(3)

அதன் மீதே distinct().count() எனக் கொடுத்து பல்வேறு நாடுகளில் பேசப்படுகின்ற தனித்தனி மொழிகளின் எண்ணிக்கையை எடுக்கலாம்.
>>> ctry_name_lang.distinct().count()

Text file-ஐprocess செய்தல்:

ஒரு text file-ல் உள்ளவற்றை rdd-ஆக மாற்றி அதன் மீது செயல்படும் பல்வேறு functions-ஐக் கீழே காணலாம். கீழ்க்கண்ட எடுத்துக்காட்டில் kanchi எனும் கோப்பிற்குள் உள்ளவை i எனும் பெயர் கொண்ட rdd-ல் சேமிக்கப்படுகிறது.
>>> i = sc.textFile(“file:///home/nithya/kanchi.txt”)

1. i-ன் மீது செயல்படும் collect() -ஆனது rdd-ல் எவ்வாறு அனைத்து வரிகளும் சேமிக்கப்பட்டுள்ளது என்பதைக் காட்டுகிறது. பொதுவாக rdd என்பது அதனுள் உள்ளவற்றை வரிகளாகப் பிரித்து list வடிவில் வைத்துக்கொள்ளும். அதனுள் உள்ள ஒவ்வொரு வரியையும் list-ன் item-ஆகப் பிரிக்கும். கீழ்க்கண்ட எடுத்துக்காட்டில் ஒவ்வொரு வரியின் துவக்கத்திலும் u’ எனக் காணப்படுவது இதை உணர்த்துகிறது.

>>> i.collect()

2. i-ன் மீது செயல்படும் map() -ஆனது rdd-ல் உள்ள ஒவ்வொரு வரியையும் அடைப்புக் குறிக்குள் [] பிரித்து, பின்னர் அதிலுள்ள ஒவ்வொரு வார்த்தையையும் இடைவெளிகளை வைத்துப் பிரிக்கிறது. எனவே இதன் வடிவம் [ [.,.,.,],[.,.,.,],[.,.,.,.] ] இவ்வாறு அமைகிறது. கீழ்க்கண்ட எடுத்துக்காட்டில் ஒவ்வொரு வார்த்தையின் துவக்கத்திலும் u’ எனக் காணப்பட்டாலும் ஒவ்வொரு வாக்கியமும் தனித்தனி அடைப்புக் குறிக்குள் [ ] அடங்கி, பிறகு மொத்தமாக ஒரு அடைப்புக் குறிக்குள் அடங்குவதைக் காணலாம். ஆகையால் இது ‘iterable of iterables’ என்று அழைக்கப்படுகிறது.

>>> a_map = i.map(lambda line : line.split(” “))
>>> a_map.take(2)

3. i-ன் மீது செயல்படும் flatmap() -ஆனது rdd-ல் உள்ள ஒவ்வொரு வார்த்தையையும் இடைவெளிகளை வைத்துப் பிரித்து அவற்றை list-ன் item-ஆக சேமிக்கிறது. கீழ்க்கண்ட எடுத்துக்காட்டில் ஒவ்வொரு வார்த்தையின் துவக்கத்திலும் u’ எனக் காணப்படுவது இதை உணர்த்துகிறது. இதன் வடிவம் [.,.,.,.,.,] என்று அமைகிறது. இது ‘iterable of strings’ என்று அழைக்கப்படுகிறது.

>>> b_flatmap = i.flatMap(lambda line : line.split(” “))
>>> b_flatmap.take(20)

4. i-ன் மீது செயல்படும் filter() -ஆனது கொடுக்கப்பட்டுள்ள வார்த்தை இடம்பெற்றுள்ள வரிகளை எடுத்து தனியாக வேறொரு rdd-ல் சேமித்துக்கொள்கிறது. அதன் மீது செயல்படும் count() அவ்வார்த்தை எத்தனை முறை இடம்பெற்றுள்ளது என்பதையும், collect() அவ்வார்த்தை இடம்பெற்றுள்ள வரிகளையும் வெளிப்படுத்துகிறது.
>>> c_fil = i.filter(lambda line : (“Kanchipuram” in line))
>>> c_fil.count()
>>> c_fil.collect()

5. தரவுகளின் எண்ணிக்கை அதிகமாக இருக்கும்போது மாதிரிக்கு ஒருசில வரிகளை எடுக்க விரும்பினால் sample-ஐப் பயன்படுத்தலாம். இது random-ஆக வரிகளை எடுத்து வெளிப்படுத்தும்.
>>> d_smp = i.sample(True,0.5,3)
>>> d_smp.collect()

6. ஒரு rdd-ன் மீது செயல்படும் getNumPartitions() என்பது rdd எத்தனை பகுதிகளாகப் பிரிக்கப்பட்டுள்ளது என்பதை வெளிப்படுத்தும். இங்கு rdd-ஐ உருவாக்கும்போதே அது எத்தனை பகுதிகளாகப் பிரிக்க வேண்டும் எனும் எண்ணிக்கை கொடுக்கப்பட்டுள்ளது.
>>> j = sc.textFile(“file:///home/nithya/Kanchi.txt”,4)
>>> j.getNumPartitions()

Union, Join, Intersection:
1. ஒரே வடிவம் கொண்ட rdd-ன் மீது செயல்படும் union() function-ஆனது அவை இரண்டையும் மொத்தமாக இணைத்து பின்வருமாறு வெளிப்படுத்தும். இதற்கான கட்டளைகளின் வடிவம் பின்வருமாறு.
>>> personal = [(“Loves”,”Baby”),(“Expert In”,”Cooking”),(“Hates”,”Misunderstandings”)]
>>> professional = [(“Loves”,”Technology”),(“Expert In”,”IOT”),(“Hates”,”Information hiding”)]
>>> per = sc.parallelize(personal)
>>> prof = sc.parallelize(professional)
>>> per.union(prof).collect()


2. அதுவே இவை இரண்டும் join() மூலம் இணைக்கப்பட்டால், அவை ஒரு பொதுவான மதிப்பால் இணைக்கப்பட்டு அதனடிப்படையில் இரண்டிலிருந்தும் தரவுகளை எடுத்து வெளிப்படுத்தும். இது பின்வருமாறு.
>>> per.join(prof).collect()

3. intersect() மூலம் இரண்டு rdd-ஐ இணைக்கும்போது அவற்றின் வடிவம் வெவ்வேறாக இருந்தாலும், அதிலிருக்கும் பொதுவான தகவல்களை மட்டும் எடுத்து வெளிப்படுத்தும். இது பின்வருமாறு.
>>> volunteers = [“Rubhini”,”Kala”,”Rukmani”,”Devi”,”Pavithra”,”Pranitha”,”Kamatchi”]
>>> members = [“Kavitha”,”Shruthi”,”Rubhini”,”Kala”,”Malathi”]
>>> vol = sc.parallelize(volunteers)
>>> mem = sc.parallelize(members)
>>> vol.intersection(mem).collect()


User defined functions:
நாமே நமக்கேற்றவாறு functions-ஐ உருவாக்கி எவ்வாறு ஒரு தனித்தியங்கும் spark application-ஐ உருவாக்குவது என்று இப்பகுதியில் பார்க்கலாம். girls.txt எனும் கோப்பிற்குள் பெண்களின் பெயர்கள், அவர்களின் வயது, அவர்கள் வகிக்கும் பதவி என்பது போன்ற விவரங்களெல்லாம் கொடுக்கப்பட்டுள்ளன. இதை வைத்து மொத்தம் எத்தனை பெண்கள் உள்ளனர், ஒவ்வொரு பதவியிலும் எத்தனை பெண்கள் உள்ளனர், அதில் 30 வயதுக்கு உட்பட்டவர்களும், 50 வயதுக்கு மேற்பட்டவர்களும் பதவி உயர்வுக்கு தகுதி இல்லாதவர்களாகக் கருதப்பட்டு அதில் எத்தனை பேர் வருகின்றனர் என்பது போன்ற விஷயங்களெல்லாம் user defined functions-ஆக எழுதப்பட்டுள்ளன. இதனை age_grouping.pyஎனும் கோப்பிற்குள் காணலாம்.


from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('MyFirstStandaloneApp')
sc = SparkContext(conf=conf)
fle = sc.textFile("file:///home/nithya/girls.txt")
def age_calc(data):
sno,fname,lname,age,desig,mob,location = data.split(",")
return fname,lname,age_group(int(age)),desig,location,int(age)
def age_group(age):
if age < 30 :
return '0-30'
elif age < 40:
return '30-40'
elif age < 50:
return '40-50'
elif age < 60:
return '50-60'
else :
return '60+'
age_map = fle.map(age_calc)
freq = age_map.map(lambda line : line[3]).countByValue()
print "Total no.of ladies",fle.count()
print "Total no.of ladies on each designation",dict(freq)
Under_age = sc.accumulator(0)
Over_age = sc.accumulator(0)
def promotion_check(data):
global Over_age, Under_age
age_grp = data[2]
if(age_grp == "50-60"):
Over_age +=1
if(age_grp == "0-30"):
Under_age +=1
return data
df = age_map.map(promotion_check).collect()
print "Not qualified for promotions due to under age",Under_age
print "Not qualified for promotions due to over age",Over_age

view raw

age_grouping.py

hosted with ❤ by GitHub

இந்த age_grouping.py கோப்பினை இயக்குவதற்கு முன்னர் spark-ன் conf-க்குள் சென்று log4j.properties.template என இருக்கும் கோப்பினை log4j.properties எனப் பெயர் மாற்றம் செய்து, அதில் log4j.logger.org=OFF எனும் பண்பினை சேர்க்கவும். இது பின்வருமாறு.
$ sudo cp /usr/local/spark-2.3.1-bin-hadoop2.7/conf/log4j.properties.template /usr/local/spark-2.3.1-bin-hadoop2.7/conf/log4j.properties
$ sudo vim /usr/local/spark-2.3.1-bin-hadoop2.7/conf/log4j.properties
log4j.logger.org=OFF

இப்போது age_grouping.py கோப்பினை இயக்குவதற்கான கட்டளை பின்வருமாறு.
$ /usr/local/spark-2.3.1-bin-hadoop2.7/bin/spark-submit ~/age_grouping.py

இதன் வெளியீடு நாம் எதிர்பார்த்த வண்ணம் உள்ளதைக் காணலாம்.

%d bloggers like this: