Введение в PySpark

МЕНЮ


Искусственный интеллект
Поиск
Регистрация на сайте
Помощь проекту

ТЕМЫ


Новости ИИРазработка ИИВнедрение ИИРабота разума и сознаниеМодель мозгаРобототехника, БПЛАТрансгуманизмОбработка текстаТеория эволюцииДополненная реальностьЖелезоКиберугрозыНаучный мирИТ индустрияРазработка ПОТеория информацииМатематикаЦифровая экономика

Авторизация



RSS


RSS новости


Python считается из основных языков программирования в областях Data Science и Big Data, поэтому не удивительно, что Apache Spark предлагает интерфейс и для него. Data Scientist’ы, которые знают Python, могут запросто производить параллельные вычисления с PySpark. Читайте в нашей статье об инициализации Spark-приложения в Python, различии между Pandas и PySpark, доступных форматов для чтения и записи, а также интеграция с базами данных.

Инициализация через SparkContext, SparkConf и SparkSession

В первую очередь, Spark создает SparkContext — объект, который определяет, как получить доступ к кластеру в момент выполнения программы. Также определяются параметры конфигурации через SparkConf. К ним может относиться кластерный менеджер (master), с которым соединяется приложение через URL, название приложения, количество ядер и т.д (с полным списком можно ознакомиться в документации). Вот так может выглядеть инициализация Spark:

conf = SparkConf().setAppName('appName').setMaster('local[*]')  sc = SparkContext(conf=conf)  spark = SparkSession(sc)  

Начиная со Spark 2.0, был представлен SparkSession, который служит единой точкой входа и устраняет необходимость явно создавать SparkConf и SparkContext, поскольку они инкапсулируются в SparkSession. Все функции, которые были доступны в SparkContext, теперь доступны в SparkSession. Например, код выше может быть переписан следующим образом:

spark = SparkSession.builder   	.master('local[*]')   	.appName('appName')   	.getOrCreate()  

Pandas и PySpark

Python-библиотека Pandas — главный инструмент Data Scientist’а при работе с данными. Интерфейсы Pandas и PySpark имеют множество сходств, поэтому тем, кто уже знаком с Pandas, не трудно понять и PySpark. В частности, оба используют DataFrame для представления табличных данных, отсюда следует множество схожих методов. При разных названиях, они пересекаются с операциями SQL: методы distinct, where в PySpark соответствуют методам unique, filter в Pandas.

Главное отличие между PySpark и Pandas состоит в режиме выполнения. PySpark реализует lazy execution (ленивое выполнение), в то время как Pandas — eager execution (мгновенное выполнение). Допустим, требуется загрузить данные на диске и применить к ним операции трансформации (map). Pandas выполнит их тут же. PySpark же сохранит всю последовательность необходимых операций и выполнит их в том случае, когда данные понадобятся. Ниже приведены примеры с сортировкой данных в PySpark и Pandas.

import pandas as pd  data = pd.read_csv(“file.csv”)  data = data.apply(lambda x: sorted(x)) # сразу же изменяются данные  
data = spark.read.csv(“file.csv”)  def sort_cols_asc(input_df):      return input_df.select(*sorted(input_df.columns))  data = data.transform(sort_cols_asc) # в ожидании выполнения операций  data.show() # вот теперь применит  

Отметим также — очень просто перейти от PySpark к Pandas и наоборот:

# Из Pandas в PySpark   spark_df = spark.createDataFrame(pandasDF)    # Из PySpark в Pandas   pandasDF = spark_df.toPandas()  

Доступные форматы для чтения и записи

PySpark поддерживает такие основные форматы, как CSV, JSON, ORC, Parquet. Разберемся с синтаксисом чтения и записи, который практически одинаковый.

  • CSV (Comma-separated values, значения, разделенные запятыми) — наиболее часто используемый формат для хранения датасетов:
    data = spark.read.csv(“file.csv”) # прочитать  data.write.csv(”file_dir”, sep=',') # записать  	
  • JSON (JavaScript Object Notation) применяется для сериализации данных, используется также в MongoDB:
    data = spark.read.json(“file.json”)  data.write.json(”file_dir”)  
  • ORC (Optimized Row Columnar) — форматхранения данных экосистем Apache Hadoop:
    data = spark.read.orc(“orc_file”)  data.write.orc()  
  • Parquet- еще один формат экосистем Apache Hadoop, который может изменяться в соответствии с изменением данных, а также поддерживает слияние схем:
    data = spark.read.parquet(“parquet_file”)  data.write.parquet(”file_dir”)  

Такие примеры записи написаны больше в Python-стиле. PySpark также поддерживает функциональный стиль программирования. Вот так, например, будет выглядеть чтение CSV-файла:

data = spark.read   	.format(“json”)   	.load(“file.json”)   	.option("header", True)  

Доступные базы данных

Также PySpark может взаимодействовать с SQL и NoSQL базами данных. Рассмотрим также доступные базы данных и их синтаксис взаимодействия. Ниже будет уже подразумеваться, что все базы данных подключены, поэтому остается только установить взаимодействие с PySpark.

  • Реляционные СУБД, например, MySQLили PostgreSQL. Для чтения необходимо указать соединение с базой данных, соответствующую таблицу и пароль:
    dbURL = "jdbc:mysql://localhost/bigdataschool"  data = spark.read.   	.format("jdbc")   	.options(  		url=dbURL, database="bigdataschool",   		dbtable='some_table', user="root",  		password="your_pass")   	.load()  
  • Cassandra- распределенная NoSQL СУБД с упором на надежность и работу с большими данными (Big Data). Здесь необходимо указать таблицу, а также пространство ключей:
    data = spark.read   	.format("org.apache.spark.sql.cassandra")   	.option(table="t2", keyspace="test")   	.load()  
  • MongoDB- также является NoSQL-СУБД, которая использует формат JSON. Нужно указать соединение с коллекцией:
    data = spark.read   	.format("com.mongodb.spark.sql.DefaultSource")  	.option("uri","mongodb://127.0.0.1/bigdataschool.courses")   	.load()  
  • ApacheHive — СУБД на основе платформы Hadoop. В методе table указывается названии таблицы в формате <база_данных>.<таблица>:
    courses = spark.table("bigdataschool.courses")  

Подобным образом, DataFrame можно записать в базу данных. Правда, для Apache Hive при инициализации SparkSession необходимо включить его поддержку, вызвав метод enableHiveSupport [1]:

spark = SparkSession       .builder       .appName("Python Spark SQL Hive integration example")       .config("spark.sql.warehouse.dir", "spark-warehouse")       .enableHiveSupport()   	.getOrCreate()  

В следующий раз рассмотрим пример выполнения SQL-операций PySpark на конкретном датасете в Google Colab. А как на практике использовать PySpark в проектах аналитики больших данных, вы узнаете на специализированном курсе «Анализ данных с Apache Spark» в нашем лицензированном учебном центре обучения и повышения квалификации разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве.


Источник: www.bigdataschool.ru

Комментарии: