Airflow ile Model Pipeline’ı Oluşturma ve Spark, Docker ve Airflow Ortamı Kurulumu

Enes Öztürk
9 min readMay 21, 2024

--

Docker, Airflow ve Spark gibi araçlar veri işleme süreçlerini otomatikleştirmek ve yönetmek için hayati öneme sahiptir. Bu yazıda, bu güçlü araçları kullanarak geliştirme ortamınızı nasıl kuracağınızı ve model pipeline’ı nasıl oluşturulup otomatikleştirildiğini detaylı bir şekilde açıklayacağım.

Sonuç olarak aşağıdaki çıktıya ulaşmış olacağız.

1. Terminal: Docker Ortamının Başlatılması

1.1. Docker Compose Dosyasını Kullanmak

Öncelikle, Docker Compose dosyasını kullanarak tüm gerekli bileşenleri başlatacağız. Bu dosya, tüm servislerin (Airflow, Spark, Postgres, Minio vb.) yapılandırmasını içerir.

1.1.1. Proje Klasörüne Giriş

İlk olarak, proje klasörüne girin. Bu klasör, Docker Compose yapılandırma dosyasını ve diğer gerekli dosyaları içerir.

cd 01_airflow_spark_sqoop

1.1.2. Docker Compose Kullanarak Servisleri Başlatma

Docker Compose dosyasını kullanarak servisleri başlatın ve inşa edin:

docker-compose up --build -d

Docker Compose yapılandırmasını kullanarak konteynerleri başlatır ve arka planda çalışmasını sağlar.

1.2. Konteynerlerin Durumunu Kontrol Etme

Docker konteynerlerinin düzgün çalıştığından emin olun:

docker ps --format "table {{.ID}}\t{{.Names}}\t{{.Status}}"
```
CONTAINER ID NAMES STATUS
c7e19ec72150 airflow-scheduler Up 7 minutes (healthy)
7029aa5d8bb5 airflow-webserver Up 7 minutes (healthy)
840940251eca sqoop Up 7 minutes
5a8996d4a374 spark_client Up 7 minutes
4cdc8a2487de postgres Up 7 minutes (healthy)
a1627aecee8c minio Up 7 minutes
```

Çalışan konteynerlerin durumunu tablo formatında gösterir. Sağlıklı durumda olduklarını doğrulayın.

1.2.1. Logları İnceleme

Airflow scheduler’ın düzgün çalıştığını kontrol etmek için logları inceleyin:

docker logs -f airflow-scheduler

Scheduler konteynerinin loglarını takip etmenizi sağlar ve olası hataları görmenize yardımcı olur.

1.3. Airflow Konteynerine Erişim

Airflow scheduler konteynerine bağlanarak dosya yapılandırmasını kontrol edebiliriz:

docker exec -it airflow-scheduler bash

1.3.1. DAG Dosyalarının Yerini Kontrol Etme

Airflow, DAG dosyalarını belirli bir dizinde arar. Bu dizini öğrenmek için airflow.cfg dosyasını inceleyin:

cat airflow.cfg | grep dags_folder
```
dags_folder = /opt/airflow/dags
```

DAG dosyalarının bulunduğu dizini gösterir (genellikle /opt/airflow/dags).

1.3.2. DAG Kontrol Süresini Öğrenme

Airflow’un DAG dosyalarını ne sıklıkla kontrol ettiğini öğrenmek için:

cat airflow.cfg | grep dag_dir_list_interval
```
dag_dir_list_interval = 300
```

Airflow’un her 300 saniyede bir DAG dizinini kontrol ettiğini gösterir. Bu süreyi ortam değişkenleriyle değiştirebilirsiniz.

1.4. Airflow Web Arayüzüne Erişim

Airflow web arayüzüne tarayıcınızdan şu adresle erişebilirsiniz:

http://localhost:8080

Giriş bilgileri:

AIRFLOW__WWM_USER_USERNAME=airflow
AIRFLOW__WWM_USER_PASSWORD=airflow

Bu arayüz üzerinden DAG’ları yönetebilir, çalıştırabilir ve izleyebilirsiniz.

2. Terminal: Spark Client Konteynerine Bağlanma ve Paket Kurulumu

2.1. Spark Client Konteynerine SSH ile Bağlanma

Spark client konteynerine bağlanarak gerekli paketleri kurabilir ve ortamı yapılandırabilirsiniz:

docker exec -it spark_client bash

2.1.1. Yeni Bir Klasör Oluşturma

Spark kodlarını ve diğer veri işleme kodlarını saklamak için bir klasör oluşturun:

mkdir dataops/
cd dataops

2.1.2. Sanal Ortam (virtualenv) Kurulumu

Python sanal ortam yönetimi için virtualenv paketini yükleyin:

python3 -m pip install virtualenv

2.1.3. Sanal Ortamın Oluşturulması ve Aktifleştirilmesi

Yeni bir sanal ortam oluşturun ve aktifleştirin:

python3 -m virtualenv airflowenv
source airflowenv/bin/activate

2.1.4. Gerekli Python Paketlerini Kurma

Proje için gerekli Python paketlerini yükleyin:

pip install jupyterlab pandas pyspark boto3

2.2. SSH Server ve sudo Kurulumu

SSH server ve sudo paketlerini yüklemek için:

apt update && apt install openssh-server sudo -y

2.2.1. Yeni Kullanıcı Oluşturma

SSH ile bağlanmak için yeni bir kullanıcı oluşturun ve şifresini ayarlayın:

useradd -rm -d /home/ssh_train -s /bin/bash -g root -G sudo -u 1000 ssh_train
echo 'ssh_train:Ankara06' | chpasswd

2.2.2. SSH Servisini Başlatma

SSH servisinin başlatılması için:

service ssh start

2.3. Jupyter Lab Başlatma

Jupyter Lab’i başlatmak için:

jupyter lab --ip 0.0.0.0 --port 8888 --allow-root

Jupyter Lab’i belirtilen IP ve port üzerinden erişilebilir hale getirir.

3. Terminal: Geliştirme Ortamının Kurulumu

3.1. Geliştirme Dizini Oluşturma ve Dosyaları Hazırlama

Geliştirme dizinini oluşturun ve gerekli Python dosyalarını oluşturun:

mkdir airflow_play/
cd airflow_play
touch airflow.py preprocess_data.py build_pipeline.py main.py

3.2. Dosyaları Konteynerlere Kopyalama

Geliştirme dosyalarını ilgili konteynerlere kopyalayın:

docker cp preprocess_data.py spark_client:/
docker cp build_pipeline.py spark_client:/
docker cp main.py spark_client:/
docker cp airflow.py airflow-scheduler:/opt/airflow/dags

Bu adımlar, Spark ve Airflow konteynerlerine kopyalayarak çalışma ortamını hazır hale getirir. Burada dikkat edilmesi gereken en önemli nokta, kodlarımız Airflow kaynaklarını kullanmadan sadece Spark kaynaklarını kullanmaktadır.

Docker, Airflow, Spark ve diğer bileşenlerle çalışmak için geliştirme ortamınızı nasıl kuracağınızı adım adım öğrendiniz. Docker Compose kullanarak ortamınızı kolayca başlattınız, Airflow ve Spark’ı yapılandırdınız ve gerekli Python paketlerini kurdunuz. Artık model kurma süreçlerinizi bu güçlü araçlar sayesinde otomatikleştirebilir ve yönetebilirsiniz.

Airflow ile SSH Üzerinden Spark İşlemleri (airflow.py)

Apache Airflow, veri işlem hatlarını (data pipelines) düzenlemek ve yönetmek için yaygın olarak kullanılan güçlü bir araçtır. Airflow kullanarak SSH üzerinden Spark işlemlerini nasıl gerçekleştirebileceğimizi göstereceğiz. Amacımız veri ön işleme, veri hattı oluşturma ve ana iş akışını çalıştıran bir DAG (Directed Acyclic Graph) oluşturmaktır.

Öncelikle, bu projede belirli tarihlerde çalışacak bir DAG tanımladık. DAG’ımız, belirli görevlerin sırasıyla yürütülmesini sağlar. Bu görevler, SSH bağlantısı kullanarak uzak bir sunucuda çalıştırılan Spark komutlarını içerir. Her bir görev, Airflow’un SSHOperator kullanılarak tanımlanmıştır ve bir Spark jobu çalıştırır.

Projemizin başlangıcında, gerekli ortamın ve bağımlılıkların tanımlanması önemlidir. source /dataops/airflowenv/bin/activate komutu ile sanal bir Python ortamı aktif hale getirilir. Bu ortamda gerekli olan tüm Python paketleri ve bağımlılıklar önceden yüklenmiş olmalıdır. Ardından, spark-submit komutu kullanılarak Spark jobları çalıştırılır. Bu komut, Delta Lake ile entegrasyonu sağlayan paketleri içerir ve veri işleme işlemlerini gerçekleştirmek için kullanılır.

Projede üç ana görev tanımladık:

  1. Veri Ön İşleme (preprocess_data_task):/preprocess_data.py script'ini çalıştırarak ham veriyi işleyip temizler. Temizlenen veri, daha sonraki adımlar için hazırlanır.
  2. Veri Hattı Oluşturma (build_pipeline_task):/build_pipeline.py script'ini çalıştırarak veri hattını oluşturur. Veri hattı, verinin işlenip analiz edilmesi için gerekli adımları içerir.
  3. Ana İş Akışı (main_task):/main.py script'ini çalıştırarak ana iş akışını yönetir. Bu adım, veri analizi ve raporlama gibi nihai işlemleri içerir.

Airflow’un esnek yapısı sayesinde görevler arasında bağımlılıklar tanımlayabiliriz. preprocess_data_task, build_pipeline_task görevinden önce çalıştırılır ve sonrasında main_task görevine geçilir. Bu sıralama, verinin doğru bir şekilde işlenmesini ve analiz edilmesini sağlar.

Apache Airflow kullanarak SSH üzerinden Spark joblarını nasıl çalıştırabileceğimizi öğrendik. Airflow’un güçlü zamanlama ve iş akışı yönetim yetenekleri, veri mühendisliği ve veri bilimi projelerinde büyük kolaylık sağlar. SSHOperator ile uzak sunucularda güvenli bir şekilde komutlar çalıştırabilir ve büyük veri işleme görevlerini otomatikleştirebiliriz.

Airflow’un sunduğu esneklik ve ölçeklenebilirlik, veri işleme süreçlerini optimize etmek ve yönetmek için ideal bir çözüm sunar. Bu projede öğrendiklerimizi daha geniş veri işleme hatlarına uygulayarak, veri analizi ve raporlama süreçlerimizi daha verimli hale getirebiliriz.

from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.models import Variable
from datetime import datetime, timedelta
from airflow import DAG
start_date = datetime(2024, 3, 2)default_args = {
'owner': 'airflow',
'start_date': start_date,
'retries': 1,
'retry_delay': timedelta(seconds=5)
}
ssh_command = '''
source /dataops/airflowenv/bin/activate &&
spark-submit --master local[*] --packages io.delta:delta-core_2.12:2.4.0 {script_path} -ep http://minio:9000 -aki dataops -sak Ankara06 -sfu https://raw.githubusercontent.com/enessoztrk/datatest/main/mock_data.csv
'''
with DAG('airflow_data_pipeline', default_args=default_args, schedule_interval='@once', catchup=False) as dag:
preprocess_data_task = SSHOperator(
task_id='preprocess_data_task',
ssh_conn_id='spark_ssh_conn',
command=ssh_command.format(script_path='/preprocess_data.py')
)
build_pipeline_task = SSHOperator(
task_id='build_pipeline_task',
ssh_conn_id='spark_ssh_conn',
command=ssh_command.format(script_path='/build_pipeline.py')
)
main_task = SSHOperator(
task_id='main_task',
ssh_conn_id='spark_ssh_conn',
command=ssh_command.format(script_path='/main.py')
)
preprocess_data_task >> build_pipeline_task >> main_task

Veri Ön İşleme (preprocess_data.py)

Amacımız, verileri dönüştürüp temizleyerek analiz ve modelleme için uygun hale getirmektir. İşlem adımlarımız şunlardır:

  1. Veri Türlerinin Dönüştürülmesi: Tarih ve sayısal sütunların uygun veri türlerine dönüştürülmesi.
  2. Feature Seçilmesi: Analiz için gerekli sütunların seçilmesi.
  3. Eksik Değerlerin Doldurulması: Eksik veya hatalı verilerin uygun stratejilerle doldurulması.
  4. Kategorik Değerlerin Düzenlenmesi: Cinsiyet sütunundaki hatalı değerlerin düzeltilmesi ve eksik değerlerin doldurulması.

Veri Türlerinin Dönüştürülmesi

İlk adım, veri sütunlarının uygun veri türlerine dönüştürülmesidir. Tarih sütunu date tipine, yaş, tıklama sayısı ve reklam fiyatı sütunları integer tipine dönüştürülür. Bu dönüşümler, veri analizi ve modelleme işlemleri için gereklidir.

df_s3 = df.withColumn("date", F.to_date(F.col("date"), "dd/MM/yyyy")) \
.withColumn("age", df["age"].cast("integer")) \
.withColumn("ads_click_count", df["ads_click_count"].cast("integer")) \
.withColumn("ads_price", df["ads_price"].cast("integer"))

Özelliklerin Seçilmesi

Analiz için gerekli olan sütunlar seçilir. Bu örnekte, customer_id, age, ads_click_count, ads_price, country ve gender sütunlarını seçiyoruz.

selected_features = ['customer_id', 'age', 'ads_click_count', 'ads_price', 'country', 'gender']
df_s3 = df_s3.select(selected_features)

Eksik Değerlerin Doldurulması

Yaş sütununda eksik veya sıfır değerler bulunabilir. Bu değerler, yaşın ortalaması ile doldurulur. Bu adım, veri analizi sırasında eksik değerlerin sorun yaratmasını engeller.

mean_age = df_s3.select(F.mean('age')).collect()[0][0]
df_s3 = df_s3.withColumn('age', F.when(df_s3['age'] == 0, mean_age).otherwise(df_s3['age']).cast(IntegerType()))

Reklam tıklama sayısı ve reklam fiyatı sütunlarındaki eksik değerler, medyan değeri ile doldurulur. Medyan, aşırı değerlerin etkisini azaltarak daha doğru bir veri seti oluşturur.

imputer = Imputer(strategy='median', inputCols=['ads_click_count', 'ads_price'], outputCols=['ads_click_count', 'ads_price'])
df_s3 = imputer.fit(df_s3).transform(df_s3)

Kategorik Değerlerin Düzenlenmesi

Cinsiyet sütunundaki hatalı değerler, Male ve Female değerlerine dönüştürülür. Bilinmeyen veya eksik cinsiyet değerleri Unknown olarak işaretlenir.

df_s3 = df_s3.withColumn("gender", F.when((df_s3["gender"] == "Male") | 
(df_s3["gender"] == "Female"),
df_s3["gender"]
).otherwise("Unknown"))
df_s3 = df_s3.fillna({'gender': 'Unknown'})

PySpark kullanarak veri ön işleme adımlarını gerçekleştirdik. Veri türlerini dönüştürdük, gerekli sütunları seçtik, eksik değerleri doldurduk ve kategorik verileri düzenledik. Bu adımlar, veri setimizi analiz ve modelleme için uygun hale getirdi.

Veri ön işleme, makine öğrenimi ve veri bilimi projelerinin temel taşlarından biridir. Doğru şekilde gerçekleştirilen veri ön işleme, model performansını artırır ve daha doğru sonuçlar elde etmenizi sağlar.

Data Pipeline Oluşturma (build_pipeline.py)

Müşteri verilerini işleyip makine öğrenimi modelleri için uygun hale getiren bir veri işleme pipeline oluşturacağız. Veri özelliklerinin birleştirilmesi, ölçeklendirilmesi ve kategorik verilerin kodlanması gibi işlemleri içerecektir.

Feature Hazırlanması

Öncelikle, işleme tabi tutulacak sütunları belirliyoruz. Bu örnekte, ads_click_count ve ads_price sütunlarını normalizasyon için seçiyoruz.

norm_col = ["ads_click_count", "ads_price"]

Feature Birleştirilmesi

VectorAssembler, birden fazla sütunu birleştirerek tek bir özellik vektörü oluşturur. Bu adım, modelleme sürecinde kullanılacak verilerin uygun formatta olmasını sağlar.

assembler = (VectorAssembler()
.setHandleInvalid("skip")
.setInputCols(norm_col)
.setOutputCol("unscaled_features"))

Feature Ölçeklendirilmesi

StandardScaler, özellikleri standart hale getirerek ölçekler arası farklılıkları minimize eder. Bu adım, özellikle gradyan tabanlı algoritmalar için önemlidir.

scaler = StandardScaler() \
.setInputCol("unscaled_features") \
.setOutputCol("features")

Kategorik Verilerin Kodlanması

Kategorik veriler, makine öğrenimi algoritmaları tarafından doğrudan işlenemez. Bu nedenle, bu verilerin sayısal formatta kodlanması gereklidir. StringIndexer, kategorik verileri sayısal indekslere dönüştürür.

gender_indexer = StringIndexer(inputCol="gender", outputCol="indexed_gender")
country_indexer = StringIndexer(inputCol="country", outputCol="indexed_country")

OneHotEncoder, indekslenmiş kategorik verileri ikili (binary) vektörlere dönüştürerek modelleme sürecine uygun hale getirir.

gender_encoder = OneHotEncoder(inputCol="indexed_gender", outputCol="encoded_gender")
country_encoder = OneHotEncoder(inputCol="indexed_country", outputCol="encoded_country")

Pipeline Oluşturulması

Tüm bu aşamaları bir araya getirerek bir Pipeline oluşturuyoruz. Pipeline, verinin ardışık adımlarla işlenmesini sağlayan bir yapı sunar.

pipeline_obj = Pipeline().setStages([gender_indexer, country_indexer, gender_encoder, country_encoder, assembler, scaler])

Pipeline modelini veri setimize uygulayarak eğitiyoruz.

pipeline_model = pipeline_obj.fit(df)

PySpark kullanarak bir veri işleme boru hattı nasıl oluşturulacağını öğrendik. Verileri birleştirip ölçeklendirdik, kategorik verileri kodladık ve tüm bu adımları bir boru hattı içinde topladık.

Main Workflow Oluşturma (main.py)

Bu kodda ise, PySpark kullanarak bir veri işleme boru hattı oluşturma ve S3 ile etkileşim kurma adımlarını hızlıca gözden geçireceğiz. Bu süreçte, verileri GitHub’dan veya veri tabanından çekip, S3'e yükleyip, Spark DataFrame’lerine dönüştürüp işleme adımlarını içeren bir uygulama geliştireceğiz.

Öncelikle gerekli kütüphaneleri ve Spark ortamını başlatıyoruz.

import findspark
findspark.init("/opt/spark")
import boto3
import logging
import botocore
import pandas as pd
from pyspark.sql import SparkSession
from build_pipeline import build_pipeline
from preprocess_data import preprocess_data

Spark Oturumu Oluşturma

Spark oturumu, veri işleme görevlerini gerçekleştireceğimiz ana bileşendir.

spark = SparkSession.builder \
.appName("Data&MLOps Engineer | Enes Ozturk") \
.master("local[2]") \
.config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0,org.postgresql:postgresql:42.7.1") \
.getOrCreate()

S3 Client Oluşturma

Verileri S3'ten alıp yazmak için bir S3 istemcisine ihtiyacımız var.

def get_s3_client():
return boto3.client('s3',
endpoint_url='http://minio:9000',
aws_access_key_id='dataops',
aws_secret_access_key='Ankara06',
config=botocore.client.Config(signature_version='s3v4'))

Veri Yükleme ve Kaydetme Fonksiyonları

Verileri GitHub’dan yükleyip S3'e kaydediyoruz.

def load_df_from_github(url):
try:
logging.info(f"Loading data from {url}")
return pd.read_csv(url, low_memory=False)
except Exception as e:
logging.exception("Error loading data from GitHub", e)
def write_df_to_s3(df, bucket, key):
s3 = get_s3_client()
try:
csv_buffer = df.to_csv(index=False)
s3.put_object(Bucket=bucket, Key=key, Body=csv_buffer)
logging.info(f"{key} saved to S3 bucket {bucket}")
except Exception as e:
logging.exception("Error writing DataFrame to S3", e)

S3'ten Veri Yükleme

S3'ten veriyi çekip Pandas DataFrame olarak yüklüyoruz.

def load_df_from_s3(bucket, key):
s3 = get_s3_client()
try:
logging.info(f"Loading {bucket}/{key}")
obj = s3.get_object(Bucket=bucket, Key=key)
return pd.read_csv(obj['Body'], low_memory=False)
except botocore.exceptions.ClientError as err:
logging.exception("Error loading data from S3", err)

Ana İşlem Bloğu

Veriyi yükleyip, ön işleme tabi tutup, sonrasında pipelinedan geçiriyoruz.

s3 = get_s3_client()
bucket_name = "airflow-demo"
# Bucket kontrolü ve oluşturma
existing_buckets = s3.list_buckets()['Buckets']
existing_bucket_names = [bucket['Name'] for bucket in existing_buckets]
if bucket_name not in existing_bucket_names:
s3.create_bucket(Bucket=bucket_name)
# Veriyi GitHub'dan yükleyip S3'e kaydetme
github_url = 'https://raw.githubusercontent.com/enessoztrk/datatest/main/mock_data.csv'
df = load_df_from_github(github_url)
write_df_to_s3(df, bucket='airflow-demo', key='mock_data.csv')
# S3'ten veriyi yükleyip Spark DataFrame'e dönüştürme
df_s3 = load_df_from_s3(bucket='airflow-demo', key='mock_data.csv')
df_s3 = spark.createDataFrame(df_s3)
df_s3 = preprocess_data(df_s3)
# Boru hattını oluşturma ve veriyi işleme
pipeline_model = build_pipeline(df_s3)
df_s3 = pipeline_model.transform(df_s3)
# İşlenmiş veriyi S3'e kaydetme
write_df_to_s3(df_s3.toPandas(), bucket='airflow-demo', key='processed_data.csv')

Veriyi GitHub’dan veya veritabanından alıp S3'e yükler, Spark DataFrame’ine dönüştürüp ön işler ve son olarak pipelinedan geçirir. İşlenmiş veri tekrar S3'e kaydedilir.

En son yazılardan haberdar olmak veya benimle iletişime geçmek için:

Linkedin: https://www.linkedin.com/in/enessoztrk/

Yeni yazılarımda görüşmek üzere…🧠

--

--