Привет, коллеги.

С недавнего времени у нас на проекте появился Spark. В процессе разработки мы сталкиваемся с множеством трудностей, и узнаём много нового. Хочется для себя систематизировать эти знания и заодно поделиться ими с окружающими. Поэтому я решил написать цикл статей про использование Apache Spark. Эта статья первая, и она будет вводной.

Итак, про сам Spark написано уже довольно много, в том числе и на самом Хабре раз и два. Поэтому придется немного повториться.

Apache Spark — это фреймворк с помощью которого можно создавать приложения для распределенной обработки данных. Со своей стороны Spark предоставляет программное API для работы с данными, в которое входят: загрузка, сохранение, трансформация и агрегация, плюс множество всяких мелочей, например, возможность локального запуска в целях разработки и отладки кода.

Кроме того, Spark отвечает за распределенное выполнение вашего приложения. Он сам раскидывает ваш код по всем узлам кластера, разбивает на подзадачи, создаёт план выполнения и следит за успешностью. Если на каком либо узле произошел сбой, и какая-то подзадача завершилась с ошибкой, она обязательно будет перезапущена.

Гибкость Spark заключается еще и в том, что ваши приложения могут быть запущены под управлением разных распределенных систем:

  • Stand-alone mode. В этом режиме вы можете самостоятельно развернуть инфраструктуру Spark, он сам будет управлять всеми ресурсами кластера и выполнять ваши приложения.
  • Yarn. Это вычислительная платформа, входящая в экосистему Hadoop. Ваше spark-приложение может быть запущено на Hadoop кластере, под управлением этой платформы.
  • Mesos. Еще одна альтернативная система управления ресурсами кластера.
  • Local mode. Локальный режим, создан для разработки и отладки, чтобы облегчить нашу с вами жизнь.

У всех этих систем есть свои преимущества, которые актуальны для разного рода задач и требований.

Почему Spark становится № 1?

Давайте разберемся, почему в последнее время популярность Spark растет, и почему он стал вытеснять старый добрый Hadoop MapReduce (далее просто MR).

Всё дело в новом архитектурном подходе, который значительно выигрывает в производительности у классических MR приложений.

Дело тут вот в чем: MR начинал разрабатываться в 2000-х годах, кода оперативная память стоила дорого, и 64-х битные системы еще не захватили мир. Поэтому разработчики пошли тогда по единственно верному пути — реализовали обмен промежуточными данными через жесткий диск (или, если быть точным, через распределенную файловую систему HDFS). Т.е все промежуточные данные между Map и Reduce фазами, сбрасывались в HDFS. Как следствие, много времени тратилось на дисковый ввод/вывод и репликацию данных между узлами Hadoop кластера.

Spark появился позже и уже в совершенно других условиях. Теперь промежуточные данные сериализуются и хранятся в оперативной памяти, а обмен данными между узлами происходит напрямую, через сеть, без лишних абстракций. Стоит сказать, что дисковый ввод/вывод всё-таки используется (на этапе shuffle). Но его интенсивность значительно меньше.

Кроме того, инициализация и запуск задач Spark происходит теперь намного быстрее за счет JVM оптимизаций. МapReduce запускает для каждой задачи новую JVM со всеми вытекающими последствиями (загрузка всех JAR файлов, JIT компиляция, итд), в то время как Spark на каждом узле держит запущенную JVM, управляя при этом запуском задач через RPC вызовы. Ну и наконец, Spark оперирует RDD абстракциями (Resilient Distributed Dataset), которые более универсальны, чем MapReduce. Хотя для справедливости надо сказать, что есть Cascading. Это обёртка над MR, призванная добавить гибкости.

Кроме того, есть еще одно, очень важное обстоятельство — Spark позволяет разрабатывать приложения не только для задач пакетной обработки данных (batch processing), но и для работы с потоками данных (stream processing). Предоставляя при этом единый подход и единое API (правда с небольшими различиями).

А как это выглядит в коде?

Spark API достаточно хорошо документирован на оф. сайте, но для целостности рассказа давайте кратко с ним познакомимся на примере, который вы можете запустить у себя локально.

Вычислим для каждого пользователя общее количество посещенных им сайтов. И вернем Топ10 в отсортированном по убыванию виде.

public class UsersActivities {

    public static void main( String[] args ) {

        final JavaSparkContext sc = new JavaSparkContext(
                new SparkConf()
                        .setAppName("Spark user-activity")
                        .setMaster("local[2]")            //local - означает запуск в локальном режиме.
                        .set("spark.driver.host", "localhost")    //это тоже необходимо для локального режима
        );

        //Здесь могла быть загрузка из файла sc.textFile("users-visits.log");
        //Но я решил применить к входным данным метод parallelize(); Для наглядности

        List<String> visitsLog = Arrays.asList(
                "user_id:0000, habrahabr.ru",
                "user_id:0001, habrahabr.ru",
                "user_id:0002, habrahabr.ru",
                "user_id:0000, abc.ru",
                "user_id:0000, yxz.ru",
                "user_id:0002, qwe.ru",
                "user_id:0002, zxc.ru",
                "user_id:0001, qwe.ru"
                //итд, дофантазируйте дальше сами :)
        );

        JavaRDD<String> visits = sc.parallelize(visitsLog);

        //из каждой записи делаем пары: ключ (user_id), значение (1 - как факт посещения)
        // (user_id:0000 : 1)
        JavaPairRDD<String, Integer> pairs = visits.mapToPair(
            (String s) -> {
                String[] kv = s.split(",");
                return new Tuple2<>(kv[0], 1);
            }
        );

        //суммируем факты посещений для каждого user_id
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey(
            (Integer a, Integer b) -> a + b
        );

        //сортируем по Value и возвращаем первые 10 записей
        List<Tuple2<String, Integer>> top10 = counts.takeOrdered(
                10,
                new CountComparator()
        );

        System.out.println(top10);
    }

    //Такие дела, компаратор должен быть Serializable. Иначе (в случае анонимного класса), получим исключение
    //SparkException: Task not serializable
    //http://stackoverflow.com/questions/29301704/apache-spark-simple-word-count-gets-sparkexception-task-not-serializable
    public static class CountComparator implements Comparator<Tuple2<String, Integer>>, Serializable {

        @Override
        public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {

            return o2._2()-o1._2();
        }
    }
}

Да, стоит сказать, что Spark API доступно для Scala, Java и Python. Но всё-таки изначально оно проектировалось именно под Scala. Как бы там ни было, у нас в проекте используется Java 8, и в целом мы вполне довольны. Переходить на скалу пока не видим никакого смысла.

В следующей статье мы подробно рассмотрим stream processing: зачем он нужен, как он используется у нас в проекте, и что такое SparkSQL.

Анатолий Никулин, Software Architect

Оригинал статьи на Хабре

RSS