В предыдущей статье я рассказал, как организовать систему распределенного машинного обучения на GPU NVidia, используя язык Java с фреймворками Spring, Spark ML, XGBoost, DJL в standalone кластере Spark. Особенностью поставленной задачи являлось организация системы под управлением ОС Windows 10 Pro, в Docker-контейнерах. Эксперимент оказался не вполне успешным. В данной статье я покажу, как воспользоваться имеющимися наработками и запустить Spark Jobs в Kubernetes в режимах client и cluster, опишу особенности работы с Cassandra в Spark, покажу пример обучения модели и ее дальнейшего использования. В этот раз буду использовать язык Kotlin. Репозиторий с кодом доступен на GitLab.
Данная статья представляет интерес для тех, кто интересуется системами Big Data и стремится создать систему, позволяющую, в том числе, выполнять задачи распределенного машинного обучения на Spark в Kubernetes, используя GPU NVidia и Cassandra для хранения данных.
Версии библиотек и фреймворков
Первое, что нужно принять во внимание при построении подобной системы - она довольно сложная, с рядом элементов, которые не совместимы друг с другом. Так как используется Rapids, использовать Boot 3 не получится - для него требуется Java 17, и, хоть Spark и поддерживает данную версию, ускоритель GPU-вычислений Rapids на момент написания статьи не поддерживает данную версию в полной мере. Скорее всего, Java 17 будет добавлена в версии, которая также обзаведется поддержкой Spark 3.4.0. По этой причине, последний Spark (на момент написания статьи 3.4.0) брать за основу не стоит. К тому же, DataStax так же не обновила свой cassandra connector, и данная версия не поддерживается.
Версии компонентов, с которыми система заработала:
JDK 8
Spring Boot 2.7.11
Spark 3.3.2
NVidia Rapids 23.04.0
Cassandra 4.1.1
PostgreSQL 15.3-1.pgdg110+1
scala-library 2.12.15
spark-cassandra-connector_2.12 3.3.0
com.fasterxml.jackson.core 2.13.5
На уровне инфраструктуры:
Kubernetes 1.26.3
ContainerD 1.7.0
NVidia GPU Operator 0.13.0
NVidia Driver 530.30.02-1 (ставится с CUDA)
CUDA - желательно не ниже 11.8 в базовом образе NVidia, в последних же драйверах по умолчанию 12+
Ubuntu 22.04.2
Конфигурация стенда:
Узел
CPU
RAM
GPU
Адрес в ЛВС
master1
Intel i7-2700k
16 Gb
NVidia GTX 1650 4 Gb
192.168.0.150
worker1
AMD 3800x
32 Gb
NVidia RTX 2600 6 Gb
192.168.0.125
Особенности
Не использовать spring-boot-starter-parent. Он тянет за собой множество библиотек, которые конфликтуют с библиотеками Spark. В том числе, для последующего перехода на Java 17 (выпустят же когда-нибудь разработчики Rapids библиотеку, поддерживающую данную версию) лучше не использовать Tomcat, и использовать Undertow (на момент написания статьи были проблемы с classloader'ами Rapids и Tomcat, а на Undertow успешно запустилось). Но и тут есть особенности: необходимо подключить корректную версию Jackson (указана выше) и ряд библиотек jakarta и javax servlet:
Подключить scala-library 2.12.15. Зависимость spark-cassandra-connector_2.12 3.3.0 использует версию 2.12.11, а Spark 3.3.2 - 2.12.15. Если не заменить, будут конфликты SerialVersionUID. Чтобы заменить, в pom.xml просто нужно поставить scala-library перед spark-cassandra-connector.
Могут быть другие нюансы совместимости компонентов, следует быть готовым к отладке. Полностью рабочую конфигурацию можно посмотреть в репозитории.
Подготовка инфраструктуры
В этот раз постараюсь этот вопрос изложить покороче, но на важных моментах остановлюсь подробнее. Подготовка образа executor подробно описана в предыдущей статье, но в этой сам метод сборки будет отличаться. В первую очередь нужно подготовить кластер Kubernetes. Описывать установку не буду, скажу лишь, что взял KubeSpray и раскатал 1.26.3.
Начиная с версии Spark 3.1.1, поддержка Spark в Kubernetes доведена production-ready. Это означает, что появлилась возможность использовать Kubernetes как менеджер кластера Spark, указывая в качестве мастера Kube API Server:
k8s://https://${KUBE_API_SERVER}:6443
или
k8s://https://kubernetes.default.svc
Теперь, при запуске Spark Job, в кластере K8S будут запускаться Spark Executors с указанным Docker образом, ресурсами и прочими необходимыми конфигами. Если образ уже находится в локальном репозитории машины, POD поднимается довольно быстро, на моих машинах в течении 5 секунд после старта приложения поднимались 2 экзекуктора, полностью готовых к работе.
В связке со Spring приложением есть возможность создать Spring Bean с JavaSparkContext/SparkSession. Однако, есть другой способ: создавать SparkSession под каждую Spark Job, чтобы высвобождать ресурсы кластера. Это хорошо для редких задач по расписанию / запросу, в том числе запросам с различающимся количеством необходимых ресурсов (количество Spark Executors, CPU, память, видеопамять и т.п.), но имеет накладные расходы на поднятие экзекуторов и время выполнения задач. На моей практике все команды для работы со Spark требовали “прогрева”, т.е. каждая первая операция выполнялась дольше, чем все последующие. Стоит иметь это в виду, и, если необходимо постоянно запускать одинаковые задачи по расписанию, имеет смысл зарезервировать ресурсы кластера под спарковые экзекуторы.
Nvidia GPU Operator
Для использования ресурсов GPU внутри кластера Kubernetes необходимо установить Nvidia GPU Operator. Следует выполнить:
на каждой машине должно подняться по одному поду, и, если экзекнуться в них, команда nvidia-smi должна вывести результат. Обращаю внимание, что для запуска экзекуторов не нужно поднимать их вручную, K8S Scheduler сделает это сам. Манифест приведен для примера и проверки работоспособности GPU Operator, после проверки его можно удалить.
Образ Spark executor
Следует изменить spark/kubernetes/dockerfiles/Dockerfile:
entrypoint.sh из прошлой статьи пригодится, но для локального standalone кластера из одного воркера (подходит для отладки в Windows). spark/kubernetes/dockerfiles/entrypoint.sh следует изменить до стандартного вида:
entryoint.sh
#!/bin/bash # # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # echo commands to the terminal output set -ex # Check whether there is a passwd entry for the container UID myuid=$(id -u) mygid=$(id -g) # turn off -e for getent because it will return error code in anonymous uid case set +e uidentry=$(getent passwd $myuid) set -e # If there is no passwd entry for the container UID, attempt to create one if [ -z "$uidentry" ] ; then if [ -w /etc/passwd ] ; then echo "$myuid:x:$myuid:$mygid:${SPARK_USER_NAME:-anonymous uid}:$SPARK_HOME:/bin/false" >> /etc/passwd else echo "Container ENTRYPOINT failed to add passwd entry for anonymous UID" fi fi SPARK_CLASSPATH="$SPARK_CLASSPATH:${SPARK_HOME}/jars/*" env | grep SPARK_JAVA_OPT_ | sort -t_ -k4 -n | sed 's/[^=]*=(.*)/1/g' > /tmp/java_opts.txt readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt if [ -n "$SPARK_EXTRA_CLASSPATH" ]; then SPARK_CLASSPATH="$SPARK_CLASSPATH:$SPARK_EXTRA_CLASSPATH" fi if ! [ -z ${PYSPARK_PYTHON+x} ]; then export PYSPARK_PYTHON fi if ! [ -z ${PYSPARK_DRIVER_PYTHON+x} ]; then export PYSPARK_DRIVER_PYTHON fi # If HADOOP_HOME is set and SPARK_DIST_CLASSPATH is not set, set it here so Hadoop jars are available to the executor. # It does not set SPARK_DIST_CLASSPATH if already set, to avoid overriding customizations of this value from elsewhere e.g. Docker/K8s. if [ -n "${HADOOP_HOME}" ] && [ -z "${SPARK_DIST_CLASSPATH}" ]; then export SPARK_DIST_CLASSPATH="$($HADOOP_HOME/bin/hadoop classpath)" fi if ! [ -z ${HADOOP_CONF_DIR+x} ]; then SPARK_CLASSPATH="$HADOOP_CONF_DIR:$SPARK_CLASSPATH"; fi if ! [ -z ${SPARK_CONF_DIR+x} ]; then SPARK_CLASSPATH="$SPARK_CONF_DIR:$SPARK_CLASSPATH"; elif ! [ -z ${SPARK_HOME+x} ]; then SPARK_CLASSPATH="$SPARK_HOME/conf:$SPARK_CLASSPATH"; fi case "$1" in driver) shift 1 CMD=( "$SPARK_HOME/bin/spark-submit" --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS" --deploy-mode client "$@" ) ;; executor) shift 1 CMD=( ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP --resourceProfileId $SPARK_RESOURCE_PROFILE_ID ) ;; *) echo "Non-spark-on-k8s command provided, proceeding in pass-through mode..." CMD=("$@") ;; esac # Execute the container CMD under tini for better hygiene exec /usr/bin/tini -s -- "${CMD[@]}"
Файл spark/kubernetes/dockerfiles/decom.sh должен выглядеть следующим образом:
decom.sh
#!/usr/bin/env bash # # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # set -ex echo "Asked to decommission" # Find the pid to signal date | tee -a ${LOG} WORKER_PID=$(ps -o pid -C java | tail -n 1| awk '{ sub(/^[ ]+/, ""); print }') echo "Using worker pid $WORKER_PID" kill -s SIGPWR ${WORKER_PID} # For now we expect this to timeout, since we don't start exiting the backend. echo "Waiting for worker pid to exit" # If the worker does exit stop blocking the cleanup. timeout 60 tail --pid=${WORKER_PID} -f /dev/null date echo "Done" date sleep 1
Скрипт сборки образов, который собирает два образа - для чистого JVM и PySpark (на всякий случай):
Для драйвера необходим K8S Service Account с полными правами на неймспейс (в данном примере namespace default, но в продуктивном кластере, разумеется, следует выделять отдельный namespace). Следующий манифест создает необходимы SA и CRB:
На ряду с указанием мастера, необходимо указать параметр
spark.submit.deployMode
Значением cluster или client.
Cluster mode
В режиме работы Cluster приложение (spark driver) существует вне кластера K8S, его под не создается внутри кластера. Хорошо подходит для отладки, либо когда приложение запускается в отдельном кластере/сервере.
В режиме работы Client приложение (spark driver) существует в кластере, поднимается как POD. При этом следует указать дополнительные настройки (будет рассмотрено ниже, в разделе с конфигурацией приложения).
После применения манифеста автоматически поднимаются два Spark Executor в отдельных подах:
Cassandra
Cassandra — великолепный инструмент для хранения больших объемов данных. Это OLTP БД, те. рассчитанная на частые записи. Также особенностью этой БД является то, что модель данных нужно строить исходя из того, как будут запрашиваться данные: при не корректном задании ключей партиционирования и ключей кластеризации таблиц легко прийти к запросам с ALLOW FILTERING или организовать неравномерное заполнение узлов кластера Cassandra.
Для частных запросов на чтение больше подходят аналитические (OLAP) БД. Если посмотреть в сторону wide?column DB, то аналогом Cassandra может быть Clickhouse. Однако, бывают различные случаи принципиального выбора Cassandra как источника аналитических данных и невозможности использования Clickhouse: недостаток вычислительных ресурсов, компетенций команд разработки и администрирования, и прочие ограничения.
Результатом такого решения может быть сложность выборки необходимых данных, так как, в отличие от реляционных БД, в Cassandra нет механизма отношений и поддержки join?операций. В таком случае можно использовать Apache Spark, который позволяет выполнять запросы к Cassandra как к реляционной базе данных. Если имеется несколько Cassandra датацентров, один из них можно настроить для аналитических операций, разгрузив при этом датацентр(ы) для транзакций записи.
Связкой Spark?Cassandra можно реализовать ETL?процессы, как между таблицами Cassandra, так и в другие БД, например, в тот же Clickhouse, используя различные коннекторы Spark.
Сущности Big Data системы
Случайно или нет, статья публикуется во время сезона Big Data на Habr. Поскольку статья является руководством и ориентирована в том числе для новичков в этой области, кратко рассмотрим некоторые ключевые понятия: что такое ETL, Data Lake, DWH и Data Mart.
Data Lake — это централизованное хранилище данных, которое позволяет хранить данные в исходном формате без предварительной структуризации или преобразования. Data Lake предоставляет возможность хранить большие объемы разнородных данных, включая структурированные, полуструктурированные и неструктурированные данные. Это позволяет проводить различные виды анализа, исследований и обработки данных в будущем, когда появится необходимость.
DWH — это хранилище данных, и это специально организованная структура, предназначенная для хранения и управления данными. DWH интегрирует данные из разных источников, таких как транзакционные базы данных, файлы, веб?сервисы и другие источники данных. Он предоставляет единое и консолидированное представление данных, которое удобно для анализа и отчетности.
Data Mart (Витрина данных) представляет собой сегментированное подмножество данных из хранилища данных (т. е. является подмножеством DWH), ориентированное на конкретные потребности бизнеса или отделов компании. Data Mart содержит данные, специально организованные и структурированные для поддержки аналитических запросов и принятия решений в конкретной области или функциональном подразделении компании.
ETL (Извлечение, Трансформация и Загрузка) — это процесс, который обеспечивает передачу данных из исходных источников в целевые системы хранения данных. В процессе ETL данные извлекаются из различных источников, затем подвергаются трансформации, включающей очистку, фильтрацию, преобразование и объединение данных, и, наконец, загружаются в целевую систему хранения данных, такую как DWH или Data Mart. Процесс ETL играет важную роль в обеспечении актуальности, целостности и качества данных в аналитической системе.
Пример процесса работы с данными в рассматриваемой системе
Рассмотрим, как можно использовать получившуюся систему на конкретном примере. Предметная область — аналитика цен финансовых инструментов, торгующихся на фондовых рынках. Так система строится на основе вычислений на GPU, попробуем построить модель машинного обучения, которая предсказывает цену курса акций NVidia (тикер NVDA) на несколько периодов вперед. Анализ проводился на данных дневного таймфрейма (1 day) с предсказанием на 2 периода (2 дня) вперед.
Схематично, система выглядит следующим образом:
Красной пунктирной линией показаны ETL процессы из источников данных, которые в рассматриваемой системе можно принять за Data Lakes. Данные из них забираются сервисом Data Extractor, который развернут на k8s node pool только с CPU, и записываются в OLTP DWH кластер Cassandra.
По необходимости/расписанию/событию Analytics Service, который развернут на k8s node pool с GPU, забирает данные из нескольких таблиц DWH кластера, трансформирует их в единую структуру и записывает в Data Mart кластер Cassandra, который может быть настроен для OLAP операций. Это тоже ETL процесс, он обозначен черными сплошными линиями, выполняется посредством Spark Executors, которые назначены Analytics Service — в терминологии Spark это driver.
Говоря об инфраструктуре решения, не будет лишним упомянуть еще пару инструментов для работы в с Big Data системой:
Apache Zeppelin — мощный инструмент визуализации и анализа данных. Он предоставляет интерактивную среду для разработки, выполнения и представления результатов вычислений на основе больших объемов данных. С помощью Apache Zeppelin можно создавать и запускать ноутбуки, которые содержат код на различных языках программирования, включая Java, Scala, Kotlin и многие другие. Zeppelin обладает широким набором интегрированных визуализаций и возможностей интерактивного анализа данных, что делает его полезным инструментом для работы с результатами аналитики и машинного обучения. Zeppelin имеет возможность работы с интерпретатором Spark.
Spring Cloud Data Flow (SCDF) — это распределенная система управления потоками данных (Data Flow) в облачной среде. Она предоставляет инфраструктуру и инструменты для развертывания, управления и мониторинга сложных потоков данных между различными источниками и приемниками данных. SCDF позволяет создавать и конфигурировать потоки данных в виде графа, состоящего из различных компонентов обработки, таких как источники, преобразования, фильтры и назначения. С помощью SCDF можно управлять и масштабировать потоки данных в распределенной среде.
Оба эти инструмента, Apache Zeppelin и Spring Cloud Data Flow, могут быть полезными в контексте рассматриваемой системы для аналитики цен финансовых инструментов. Apache Zeppelin предоставит удобную среду для визуализации и анализа данных, а Spring Cloud Data Flow поможет в управлении потоками данных и обработке информации между различными компонентами системы.
Конфигурация приложения
Стоит подробнее рассмотреть конфигурацию приложения для работы со Spark.
(1) — если приложение (spark driver) не использует ресурсы GPU напрямую, следует указать в данной настройке «0», чтобы под него не резервировался необходимый Spark Executor'ам ресурс.
(2) — настройки cassandra spark connector. Для подключения экзекуторов к кластеру Cassandra следует указать данные настройки. Коннектор cassandra имеет большое число настроек, все можно найти здесь;
(3) — так как приложение может работать не только в k8s, настройки, которые предназначены только для него, не стоит указывать. Здесь должен быть указан, как минимум, необходимый для работы приложения образ spark executor, имя driver k8s Pod (можно получить с помощью InetAddress.getLocalHost().getHostName();), namespace, в котором будут создаваться executor Pods, и Service Account. Все это было создано ранее.
JavaSparkContext может быть передан в объект SparkSession, который далее будет использоваться для работы с executor'ами.
Работа с Cassandra в Spark
Получение данных
В сервисе будем использовать Dataset как объект с данными. Существуют так же RDD и Dataframe, и, почему в Java/Kotlin стоит использовать Dataset, можно почитать здесь и здесь.
Базовый метод получения данных из кассандра:
AbstractCassandraRepository
abstract class AbstractCassandraRepository constructor( private val sparkSession: SparkSession ) { companion object { internal const val keyspace: String = "instrument_data" } fun cassandraDataset(keyspace: String, table: String): Dataset<Row> { val cassandraDataset: Dataset<Row> = sparkSession.read() .format("org.apache.spark.sql.cassandra") .option("keyspace", keyspace) .option("table", table) .load() cassandraDataset.createOrReplaceTempView(table) return cassandraDataset } }
Здесь мы работаем со спарковой сессией: указываем ей, что Dataset<Row> должен быть прочитан из кассандры, указываем кейспейс и таблицу, указываем имя датасета, чтобы в последующих запросах можно было обращаться к колонкам таблицы по ее имени.
Естественно, никого не интересует получать все данные и делать full scan таблицы — у каждой есть Partition Key, по которому нужно искать необходимые данные.
Рассмотрим пример получения данных из таблицы instrument_data.time_series_history:
instrument_data.time_series_history
create table instrument_data.time_series_history ( ticker text, task_number uuid, datetime timestamp, timeframe text, close decimal, high decimal, low decimal, open decimal, volume bigint, primary key (ticker, task_number, datetime, timeframe) ) with compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}; create index time_series_history_datetime_index on instrument_data.time_series_history (datetime); create index time_series_history_task_number_index on instrument_data.time_series_history (task_number); create index time_series_history_timeframe_index on instrument_data.time_series_history (timeframe);
У нее имеется Partition Key ticker и три Clustering Column:
task_number - номер задания на получение и анализ данных в смежной системе;
datetime - дата и время записи;
timeframe - таймфрейм временного ряда (1day в рассматриваемом случае).
Задача — доставать данные по ticker, task_number и datetime between (dateStart, dateEnd) из всех таблиц.
Таким образом, можно обратиться к предыдущему методу и отфильтровать значения. Функционал данного метода будет аналогичен запросу данных из таблицы с указанными полями. При этом Spark не будет доставать весь датасет, затем его фильтровать — в экзекуторы попадут уже отфильтрованные данные.
Данный метод аналогичен запросу:
select * from instrument_data.time_series_history where ticker = ?1 AND task_number = ?2 AND datetime > ?3 and datetime < ?4;
Если не передавать partition key, увеличение времени запроса можно отследить на DAG запроса в Spark UI. Это лежит на поверхности, можно очень быстро разобраться, на получении каких данных запрос тормозит работу.
Чтобы выполнить select по определенным столбцам, уже следует создать репозиторий для отдельной таблицы и выполнить запрос следующим образом:
TimeSeriesRepository
@Component class TimeSeriesRepository( sparkSession: SparkSession ) : AbstractIndicatorRepository(sparkSession, "time_series_history") { fun getDataset( ticker: String, taskNumber : UUID, dateStart : LocalDate, dateEnd : LocalDate ) : Dataset<Row> { val dataset = getBaseDataSet(ticker, taskNumber, dateStart, dateEnd) .selectExpr( "to_date(datetime) as dateTime", "CAST(close AS Double) as close" ) dataset.createOrReplaceTempView("ts") return dataset } }
Тогда конечный датасет вернет таблицу из двух столбцов с датой и временем и ценой закрытия, преобразованной в Double. Double необходим при передаче данных на вычисление в GPU.
Здесь у Java/Kotlin разработчика может возникнуть желание сделать запрос данных через CompletableFuture<Dataset<Row>> и выполнить их все в одном тредпуле. Однако, не стоит этого делать. Спарк сам распараллеливает работу и создает FutureTask's, программисту не стоит распараллеливать код на разные потоки.
и комбинируем:
combineDatasets
private fun combineDatasets( timeSeries : Dataset<Row>, emaDataSet : Dataset<Row>, stochasticDataset : Dataset<Row>, bBandsDataset : Dataset<Row>, macdDataset : Dataset<Row>, rsiDataset : Dataset<Row>, smaDataset : Dataset<Row>, willrDataset : Dataset<Row> ): Dataset<Row> { val result = timeSeries .join(emaDataSet, timeSeries.col("datetime") .equalTo(emaDataSet.col("datetime")), "leftouter") .join(stochasticDataset, timeSeries.col("datetime") .equalTo(stochasticDataset.col("datetime")), "leftouter") .join(bBandsDataset, timeSeries.col("datetime") .equalTo(bBandsDataset.col("datetime")), "leftouter") .join(macdDataset, timeSeries.col("datetime") .equalTo(macdDataset.col("datetime")), "leftouter") .join(rsiDataset, timeSeries.col("datetime") .equalTo(rsiDataset.col("datetime")), "leftouter") .join(smaDataset, timeSeries.col("datetime") .equalTo(smaDataset.col("datetime")), "leftouter") .join(willrDataset, timeSeries.col("datetime") .equalTo(willrDataset.col("datetime")), "leftouter") .selectExpr( "ts.dateTime as dateTime", "ema.emaTimePeriod as emaTimePeriod", "ema.ema as ema", "stoch.fastKPeriod as fastKPeriod", "stoch.slowKPeriod as slowKPeriod", "stoch.slowDPeriod as slowDPeriod", "stoch.slowD as slowD", "stoch.slowK as slowK", "bb.timePeriod as bbTimePeriod", "bb.sd as bbSd", "bb.lowerBand as lowerBand", "bb.middleBand as middleBand", "bb.upperBand as upperBand", "macd.signalPeriod as signalPeriod", "macd.fastPeriod as fastPeriod", "macd.slowPeriod as slowPeriod", "macd.macd as macd", "macd.macdHist as macdHist", "macd.macdSignal as macdSignal", "rsi.timePeriod as rsiTimePeriod", "rsi.rsi as rsi", "sma.timePeriod as smaTimePeriod", "sma.sma as sma", "willr.timePeriod as willrTimePeriod", "willr.willr as willr", "ts.close as close" ) val windowSpec = Window.orderBy(functions.asc("dateTime")) return result .withColumn("id", functions.row_number().over(windowSpec)) .filter(functions.col("dateTime").isNotNull) }
В методе combineDatasets происходит объединение таблиц, которое Cassandra не поддерживает, а Spark предоставляет такую возможность.
В результирующий датасет добавляется колонка id, которая содержит номер строки. Здесь же предварительно сортируются данные по колонке dateTime и отфильтровываются строки с пустыми ячейками.
Стоит отметить, что есть иной способ работы с Dataset — можно обратиться к объекту sparkSession, вызвать метод sql() и передать в него обычный SQL запрос на объединение данных, и он будет работать.
Если все?таки предпочтительно использовать SQL запросы, можно это делать, различий в скорости работы в моих без учета предварительных фильтров для датасетов не было замечено. Spark поддерживает все основные функции, с единственной оговоркой — не стоит писать свои собственные функции (UDF) и стараться выполнить запрос, это плохо влияет на производительность. За помощью в этом вопросе следует обратиться к оф. документации. Spark понимает ANSI формат. Если нужно перевести запрос, написанный на другом диалекте, можно воспользоваться инструментом.
В абзаце выше не просто так выделено полужирным с подчеркиванием одно важное условие. Допустим, мы получили датасет timeSeries, и сразу же хотим через sparkSession.sql() нативным запросом сджойнить остальные таблицы. Опытным путем установлено, что sparkSession при запросе через метод sql() с переданным нативным запросом делает фильтр только по основной таблице timeSeries, и не применяет фильтры для остальных таблиц, тем самым получая все данные из присоединяемых таблиц кассандры, что самым негативным образом сказывется на производительности — от 2 до 3 и более раз.
Использовать sparkSession.sql() можно, но только на тех данных, которые невозможно предварительно отфильтровать при выполнении основной логики джобы. А еще лучше — не использовать, и писать запросы через методы.
Здесь происходит получение основных данных, затем они передаются в метод datasetWithLabel. Мы хотим натренировать модель машинного обучения, и нам нужен label - колонка со значениями цены закрытия торгов финансовым инструментом на дату offset позднее (в нашем случае 2 дня).
datasetWithLabel
private fun datasetWithLabel( mainDataset: Dataset<Row>, offset: Long ): Dataset<Row> { val labelDataset = getLabelDatasetFromMain(mainDataset, offset) val combinedDataset = labelDataset .join( mainDataset, labelDataset.col("id_eval") .equalTo(mainDataset.col("id")), LEFT_OUTER_JOIN ) .withColumn("dateTimeUnix", functions.unix_timestamp(functions.col("dateTime"))) .withColumn("labelDateTimeUnix", functions.unix_timestamp(functions.col("labelDateTime"))) return combinedDataset .withColumn( "id", functions.coalesce(combinedDataset.col("id_eval"), combinedDataset.col("id")) ) .drop("id_eval") .selectExpr(*allColumns) .filter(functions.col("dateTime").isNotNull) .orderBy(functions.asc("dateTime")) } private fun getLabelDatasetFromMain( mainDataset: Dataset<Row>, offset: Long ): Dataset<Row> { val windowSpec = Window.orderBy(functions.asc("dateTime")) val labelDateTimeColumn = functions.lead(mainDataset.col("dateTime"), offset.toInt()).over(windowSpec) val labelColumn = functions.lead(mainDataset.col("close"), offset.toInt()).over(windowSpec) val labelDataset = mainDataset .withColumn("labelDateTime", labelDateTimeColumn) .withColumn("label", labelColumn) .filter(functions.col("labelDateTime").isNotNull) .select("labelDateTime", "label") .orderBy("labelDateTime") return labelDataset .withColumn("id_eval", functions.row_number() .over(Window.orderBy(functions.asc("labelDateTime")))) .select("id_eval", "labelDateTime", "label") }
В методе getLabelDatasetFromMain мы используем полученный исходный датасет для как источник данных для новых колонок - используя функцию lead
val labelDateTimeColumn = functions.lead(mainDataset.col("dateTime"), offset.toInt()).over(windowSpec) val labelColumn = functions.lead(mainDataset.col("close"), offset.toInt()).over(windowSpec)
Мы определяем дату и цену финансового инструмента на offset значений вперед. Следует понимать, что на бирже ведутся не каждый день, есть выходные и праздничные дни, когда биржа не работает. Таким образом, в исходных датасетах всегда будут промежутки по датам. Можно решать это путем модификации данных — заполнять промежутки с отсутствующими датами ценой последнего закрытия. А можно воспользоваться функцией lead() на имеющемся датасете. В данном примере я пошел именно таким путем, заодно получился хороший пример использования функций spark sql и составления датасетов. В этом же методе присоединяется новая колонка с id_eval.
Затем в методе datasetWithLabel два датасета объединяются по значениям id_eval и id, в результате чего получается финальный датасет.
Нам интересно получать данные батчами и перекладывать их в новую таблицу.
У меня получился такой не хитрый код для этой задачи:
DataTransformService
@Service class DataTransformService( private val dataReaderService: DataReaderService, private val combinedDataRepository : CombinedDataRepository ) { companion object { private val log = LogManager.getLogger(this::class.java.name) } fun transformData( ticker : String, taskNumber : UUID, dateStart : LocalDate, dateEnd : LocalDate, offset : Long, batchSize : Int ) { var currentBatchOffset = 0 var i = 0 var tdf: Dataset<Row>? do { log.info("Data transformation: task {}, iteration {},: currentOffset {}", taskNumber,i + 1, currentBatchOffset) tdf = dataReaderService.getDatasetWithLabel( ticker, taskNumber, dateStart, dateEnd, offset, currentBatchOffset, batchSize ) if (tdf.isEmpty) break combinedDataRepository.saveData(tdf.selectExpr(*DataReaderService.allColumns), ticker, taskNumber) currentBatchOffset += batchSize i++ } while (tdf?.isEmpty == false) log.info("Data transformation of task {} completed", taskNumber) } }
в методе combinedDataRepository.saveData(tdf.selectExpr(*DataReaderService.allColumns), ticker, taskNumber) производится сохранение полученного батча.
Сохранение данных из Spark в Cassandra
Сохранение в Cassandra также производится через коннектор:
здесь, кроме уже очевидных параметров, есть два заслуживающих внимания:
(1) - режим, при котором производится запись. append - добавляет данные;
(2) - подтверждение удаления ранее записанных данных. Здесь выставлен false, и, в сочетании с предыдущей настройкой, спарк не будет стирать данные в кассандре при записи. С настройками
имеется возможность очистить таблицу перед записью новых данных.
Перед записью предварительно можно подшаманить датасет для совпадения столбцов и прочих нужд:
CombinedDataRepository
@Component class CombinedDataRepository(sparkSession: SparkSession) : AbstractIndicatorRepository(sparkSession, "combined_data") { fun saveData( dataset : Dataset<Row>, ticker : String, taskNumber : UUID ) { val modifiedDataset = dataset .withColumn("dateTime", from_unixtime(col("dateTimeUnix"))) .selectExpr( "dateTime as datetime", "bbTimePeriod as bbands_time_period", "bbSd as sd", "emaTimePeriod as ema_time_period", "ema", "fastKPeriod as fast_k_period", "slowKPeriod as slow_k_period", "slowDPeriod as slow_d_period", "slowD as slow_d", "slowK as slow_k", "lowerBand as lower_band", "middleBand as middle_band", "upperBand as upper_band", "signalPeriod as macd_signal_period", "fastPeriod as macd_fast_period", "slowPeriod as macd_slow_period", "macd", "macdHist as macd_hist", "macdSignal as macd_signal", "rsiTimePeriod as rsi_time_period", "rsi", "smaTimePeriod as sma_time_period", "sma", "willrTimePeriod as willr_time_period", "willr", "close") .withColumn("ticker", lit(ticker)) .withColumn("task_number", lit(taskNumber.toString())) .withColumn("timeframe", lit("1day")) .na().drop() saveDataSet(modifiedDataset) } }
Здесь происходит преобразование имен столбцов к формату целевой таблицы. Также UUID приводится к string, проставляется константа таймфрейма (потому что потому), и, если есть строки с пустыми значениями, таковые удаляются.
Маппинг результатов
Получить результат работы Dataset<Row> можно несколькими способами:
используя Encoder:
val result = joinResult.as(Encoders.bean(ModelTrainResult.class)); val resultList : List<ModelTrainResult> = result.collectAsList();
или вручную разбирая каждый объект Row:
ModelTrainResult
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY) data class ModelTrainResult @JsonCreator constructor( @get:JsonProperty("date_time") val dateTime : LocalDateTime, @get:JsonProperty("close") val close : Double, @get:JsonProperty("label_datetime") val labelDateTime : LocalDateTime, @get:JsonProperty("real_result") val realResult : Double, @get:JsonProperty("prediction") val prediction : Double, @get:JsonProperty("error") val error : Double ) { companion object { private val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss") fun listFromDataset(dataset: Dataset<Row>): List<ModelTrainResult> { val list = mutableListOf<ModelTrainResult>() val iterator = dataset.toLocalIterator() while (iterator.hasNext()) { val row = iterator.next() list.add(fromRow(row)) } return list } private fun fromRow(row: Row): ModelTrainResult { return ModelTrainResult( LocalDateTime.parse(row.getString(0), formatter), row.getDouble(1), LocalDateTime.parse(row.getString(2), formatter), row.getDouble(3), row.getDouble(4), row.getDouble(5) ) } }
По моему мнению, второй вариант предпочтительнее для сложных структур, требующих преобразования из типов данных в датасете в целевой тип данных, как, например, дата в примере выше. Но первый вариант так же имеет место быть, его можно использовать для структур, состоящих, к примеру, только из строк.
Заметки о производительности после рассмотрения запросов
Следует понимать, что операции join, groupBy, shuffle (выполняется самим спарком в момент мапинга строк датасета в java pojo, например) — очень дорогие по времени операции, обязательно требующие тюнинга. Спарк — распределенная система, и как всякая распределенная система, требует синхронизации для централизованного получения результата. Датасеты получаются на разных экзекуторах, и во время join операций экзекуторам приходится обмениваться имеющейся у них информацией, что явно влечет затраты на сериализацию, сетевой вызов, десериализацию, дисковый ввод?вывод и непосредственно сопоставление полученных данных.
Снизить негативное влияние shuffle и join можно, есть очень хорошая статья на эту тему.
В Spark вы пишете код, который преобразует данные, этот код обрабатывается лениво и под капотом преобразуется в план запроса, который материализуется, когда вы вызываете действие, такое как collect () или write (). Spark делит данные на разделы, которые обрабатываются испол нителями, каждый из которых будет обрабатывать набор разделов. Операции, выполняемые в пределах одного раздела, называются узкими операциями и включают в себя такие функции, как map или filter. С другой стороны, агрегации — это широкие операции.которые требуют перемещения данных между узлами, что очень дорого. Сам план запроса может быть двух основных типов: логический план и физический план, который мы обсудим позже.
Следует помнить, что главное правило, касающееся производительности Spark, звучит так: Minimize Data Shuffles. Это вызвано широкими операциями в Spark, такими как соединения или агрегации, которые очень дороги из?за перетасовки данных.
Но, к сожалению, зачастую, особенно, в распределенной системе, без shuffle не обойтись.
Обучение модели
Подробно останавливаться на обучении модели не вижу смысла, в предыдущей статье разобраны примеры. В этом примере покажу, как обучал XGBoostRegressor и опишу результаты и найденные проблемы.
trainModel
fun trainModel( ticker : String, taskNumber : UUID, dateStart : LocalDate, dateEnd : LocalDate, evalPivotPoint : Long, offset : Long, modelParameters : AnalyticsRequest.ModelParameters ) : ModelTrainResultResponse { val pivot = dateEnd.minusDays(evalPivotPoint) val tdf = dataReaderService.getDatasetWithLabel(ticker, taskNumber, dateStart, pivot, offset) val edf = dataReaderService.getDatasetWithLabel(ticker, taskNumber, pivot, dateEnd, offset) .selectExpr(*allColumns) val modelParams = createModelParams(modelParameters) val regressor = xgBoostRegressor(modelParams) val model: PredictionModel<Vector, XGBoostRegressionModel> = regressor.fit(tdf) val predictions = model.transform(edf) combinedDataRepository.saveData(tdf.selectExpr(*allColumns).unionAll(edf), ticker, taskNumber) modelService.saveModel(model, taskNumber, modelParameters) val result = predictions.withColumn("error", col("prediction").minus(col(labelName))) return ModelTrainResultResponse(ModelTrainResult.listFromDataset(result.selectExpr(*resultExp))) }
Получаем train и eval датасеты, создаем новую модель, обучаем и сохраняем в postgres. В этом примере используется датасет из таблицы, в которую мы предварительно трансформировали данных в предыдущем разделе статьи.
saveModel
fun saveModel( model : PredictionModel<Vector, XGBoostRegressionModel>, taskId : UUID, modelParameters : AnalyticsRequest.ModelParameters ) { val byteArrayOutputStream = ByteArrayOutputStream() ObjectOutputStream(byteArrayOutputStream).use { it.writeObject(model) } val modelByteArray: ByteArray = byteArrayOutputStream.toByteArray() val jsonParams : JsonNode = objectMapper.convertValue(modelParameters, JsonNode::class.java) val entity = ModelEntity(modelByteArray, taskId, jsonParams) modelRepository.save(entity) log.info("Model for task id {} saved. Parameters map: {}, jsonNode: {}", taskId, modelParameters, jsonParams) }
Здесь модель преобразуется в байтовый поток и сохраняется в ячейку с типом данных bytea.
Параметры модели преобразуются в Json и сохраняются в одну таблицу с моделью.
Использование сохраненной модели
Сохраненную модель можно загрузить для дальнейшего использования и/или переобучения.
loadModel
internal inline fun <reified T> loadModel(modelId: Long): T { val optional = modelRepository.findById(modelId) val entity = optional.get() val modelByteArray = entity.model val byteArrayInputStream = ByteArrayInputStream(modelByteArray) val modelObject = ObjectInputStream(byteArrayInputStream).use { it.readObject() } if (modelObject is T) { return modelObject } else { throw ServiceException.withMessage("Model id $modelId has incorrect format") } }
predictWithExistingModel
fun predictWithExistingModel( ticker : String, taskNumber : UUID, dateStart : LocalDate, dateEnd : LocalDate, modelId : Long ): StockPredictDto { val model: PredictionModel<Vector, XGBoostRegressionModel> = modelService.loadModel(modelId) val data = dataReaderService.getMainDataset(ticker, taskNumber, dateStart, dateEnd) var predictions = model.transform(data) predictions = predictions.select("dateTime", "prediction") return StockPredictDto.fromDataset(predictions) }
Проблемы
1. Я пытался реализовать инкрементальное обучение модели XGBoost, но столкнулся с тем, что его реализация на Java (в отличие от Python) не поддерживает данный тип, и модель с каждым инкрементом не апдейтится, а переобучается, поэтому требуется полный набор данных для обучения. Поэтому примера с инкрементальным и потоковым обучением показать в рамках данной статьи не получится, но есть простор для новых исследований.
Ответы разработчиков XGboost по этой теме привожу здесь:
2. В виду того, что данные преобразуются в Double для передачи на GPU, в результирующей таблице наблюдается классическая проблема с хранением типов с плавающей точкой:
способов решения данной проблемы не нашел, нужен дополнительный ресерч.
3. Определение партиций при использовании оконных функций в распределенной системе - если их не определить, то есть вероятность существенной деградации производительности. Тему мало ресерчил, нужно отдельно погружаться и много тестировать. При первом подходе получал не корректные результаты объединения таблиц.
Ладно тебе со своей биг датой, что с ценой акций?
Если вкратце, то результаты так себе (: С наскока тему аналитики движения цены финансовых инструментов не возьмешь, особенно основываясь только на технических индикаторах. В этой предметной области нужно подбирать параметры технических индикаторов, сами индикаторы, и другие данные, исходя из личной торговой стратегии. В данном примере используется по одному индикатору EMA и SMA, хотя в реальном случае их может быть больше, с различными параметрами. Также индикаторы делятся на опережающие и запаздывающие, и нужно исходить из задачи, что нужно получить, чтобы в итоге достичь результата. Дополнительно нужно делать модели, анализирующие фигуры технического анализа и потоки новостей из авторитетных источников, и комбинировать модели под свою задачу.
Особенно плохо получившаяся модель реагирует на «выбросы», так, на скриншоте можно видеть, что последний скачок цены с $305 до $379 предсказать не удалось, да и еще предикт пошел в сторону уменьшения.
Но в целом я результатом доволен: удалось реализовать инфраструктуру, обладающую широкими возможностями горизонтального масштабирования, благодаря Kubernetes, Spark и Cassandra, и при этом есть возможность писать код на Java/Kotlin, которые мне импонируют на порядок больше, чем Python.
Тесты производительности распределенной системы по сравнению с одним инстансом не проводились — возможно, будет темой отдельной статьи. Нужно найти достаточно большой датасет, подобрать параметры модели для приемлемых практических результатов и подготовить тестовые стенды.