Мы используем куки, чтобы пользоваться сайтом было удобно.
Хорошо
to the top
>
>
>
Big/Bug Data: анализируем исходный код …

Big/Bug Data: анализируем исходный код Apache Flink

15 Дек 2020

Приложения, использующиеся в области Big Data, обрабатывают огромные объемы информации, причем часто это происходит в реальном времени. Естественно, такие приложения должны обладать высокой надежностью, чтобы никакая ошибка в коде не могла помешать обработке данных. Для достижения высокой надежности необходимо пристально следить за качеством кода проектов, разрабатываемых для этой области. Решением данной проблемы и занимается статический анализатор PVS-Studio. Сегодня в качестве подопытного для анализатора был выбран проект Apache Flink, разработанный организацией Apache Software Foundation - одним из лидеров на рынке ПО для Big Data.

0781_Apache_Flink_ru/image1.png

Что же такое Apache Flink? Это open-source фреймворк для распределенной обработки больших объемов данных. Он был разработан как альтернатива Hadoop MapReduce в 2010 году в Техническом университете Берлина. Основу фреймворка составляет движок распределенного исполнения приложений пакетной и потоковой обработки данных. Написан этот движок на языках Java и Scala. Сегодня Apache Flink возможно использовать в проектах, написанных с использованием языков Java, Scala, Python и даже SQL.

Анализ проекта

Загрузив исходный код проекта, я запустил сборку проекта командой 'mvn clean package -DskipTests', указанной в инструкции на GitHub. Пока шла сборка, я при помощи утилиты CLOC выяснил что в проекте 10838 Java-файлов, в которых имеется около 1.3 миллионов строк кода. Причем тестовых Java файлов оказалось аж 3833 штуки, а это больше 1/3 всех Java файлов. Также я заметил, что в проекте используется статический анализатор кода FindBugs и утилита Cobertura, предоставляющая информацию о покрытии кода тестами. Учитывая все это, становится ясно, что разработчики Apache Flink тщательно следили за качеством кода и покрытием тестами при разработке.

После удачной сборки я открыл проект в IntelliJ IDEA и запустил анализ при помощи плагина PVS-Studio for IDEA and Android Studio. Предупреждения анализатора распределились следующим образом:

  • 183 High;
  • 759 Medium;
  • 545 Low.

Примерно 2/3 срабатываний анализатора PVS-Studio выданы на тестовые файлы. Если учесть этот факт и размер кодовой базы проекта, то можно сказать, что разработчикам Apache Flink удалось сохранить качество кода на высоте.

Изучив предупреждения анализатора более подробно, я выбрал самые интересные на мой взгляд. Так давайте же посмотрим, что же удалось найти PVS-Studio в данном проекте!

GetFreeTrialImage

Всего лишь немного невнимательности

V6001 There are identical sub-expressions 'processedData' to the left and to the right of the '==' operator. CheckpointStatistics.java(229)

@Override
public boolean equals(Object o) 
{
  ....
  CheckpointStatistics that = (CheckpointStatistics) o;
  return id == that.id &&
    savepoint == that.savepoint &&
    triggerTimestamp == that.triggerTimestamp &&
    latestAckTimestamp == that.latestAckTimestamp &&
    stateSize == that.stateSize &&
    duration == that.duration &&
    alignmentBuffered == that.alignmentBuffered &&
    processedData == processedData &&                // <=
    persistedData == that.persistedData &&
    numSubtasks == that.numSubtasks &&
    numAckSubtasks == that.numAckSubtasks &&
    status == that.status &&
    Objects.equals(checkpointType, that.checkpointType) &&
    Objects.equals(
      checkpointStatisticsPerTask, 
      that.checkpointStatisticsPerTask);
}

На фоне других выражений в return данная ошибка не сильно бросается в глаза. При переопределении метода equals для класса CheckpointStatistics программист допустил ошибку в выражении processedData == processedData, которое не имеет смысла, потому что всегда истинно. Аналогично остальным выражениям в return должны были сравниваться поля текущего объекта this и объекта that: processedData == that.processedData. Данная ситуация это один из типичных паттернов ошибок, найденных в функциях сравнения, которые подробно описаны в статье "Зло живет в функциях сравнения". Вот так и получается, что всего лишь "немного невнимательности" сломало логику проверки эквивалентности объектов класса CheckpointStatistics.

Выражение всегда истинно

V6007 Expression 'input2.length > 0' is always true. Operator.java(283)

public static <T> Operator<T> createUnionCascade(Operator<T> input1, 
                                                 Operator<T>... input2) 
{
  if (input2 == null || input2.length == 0) 
  {
    return input1;                                // <=
  } 
  else if (input2.length == 1 && input1 == null) 
  {
    return input2[0];
  }
  ....
  if (input1 != null) 
  {
    ....
  } 
  else if (input2.length > 0 && input2[0] != null) // <=
  {
    ....
  } 
  else 
  {
    ....
  }
}

В этом методе анализатор оказался внимательнее человека, о чем и решил сообщить в своей своеобразной манере, указав на то, что выражение input2.length > 0 всегда будет истинно. Причина в том, что если длина массива input2 будет равна 0, то условие input2 == null || input2.length == 0 первого if в методе будет истинно, и выполнение метода будет прервано, так и не дойдя до строчки с выражением input2.length > 0.

Всевидящий анализатор

V6007 Expression 'slotSharingGroup == null' is always false. StreamGraphGenerator.java(510)

private <T> Collection<Integer> transformFeedback(....)
{
  ....
  String slotSharingGroup = determineSlotSharingGroup(null, allFeedbackIds);
  if (slotSharingGroup == null)
  {
    slotSharingGroup = "SlotSharingGroup-" + iterate.getId();
  }
  ....
}

Анализатор сообщил, что выражение slotSharingGroup == null всегда ложно. А это наталкивает на мысль о том, что метод determineSlotSharingGroup никогда не вернет null. Неужели анализатор настолько умный, что смог вычислить все значения, которые может вернуть этот метод? Давайте-ка лучше все перепроверим сами:

public class StreamGraphGenerator 
{
  ....
  public static final String DEFAULT_SLOT_SHARING_GROUP = "default";
  ....
  private String determineSlotSharingGroup(String specifiedGroup, 
                                           Collection<Integer> inputIds) 
  {
    if (specifiedGroup != null)
    {
      return specifiedGroup; // <= 1
    }
    else
    {
      String inputGroup = null;
      for (int id: inputIds)
      {
        String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);
        if (inputGroup == null)
        {
          inputGroup = inputGroupCandidate;
        }
        else if (!inputGroup.equals(inputGroupCandidate))
        {
          return DEFAULT_SLOT_SHARING_GROUP; // <= 2
        }
      }
      return inputGroup == null 
             ? DEFAULT_SLOT_SHARING_GROUP 
             : inputGroup; // <= 3
    }
  }
  ....
}

По порядку пройдемся по всем return и посмотрим, что же может вернуть данный метод:

  • В первом return вернется аргумент метода specifiedGroup, но только если он не будет равен null.
  • return в цикле for вернет значение статического финального поля DEFAULT_SLOT_SHARING_GROUP, инициализированного строковым литералом;
  • И последний return в методе вернет значение переменной inputGroup, если оно не будет равно null. В противном случае вернется значение поля DEFAULT_SLOT_SHARING_GROUP.

Получается, что анализатор действительно смог вычислить невозможность возврата null из метода determineSlotSharingGroup и предупредил нас об этом, указав на бессмысленность проверки slotSharingGroup == null. И хотя данная ситуация не является ошибочной, однако подобная дополнительная защита анализатора сможет обнаружить ошибку в каком-нибудь другом случае. Например, когда необходимо, чтобы метод возвращал null при определенных условиях.

Собери их всех

V6007 Expression 'currentCount <= lastEnd' is always true. CountSlidingWindowAssigner.java(75)

V6007 Expression 'lastStart <= currentCount' is always true. CountSlidingWindowAssigner.java(75)

@Override
public Collection<CountWindow> assignWindows(....) throws IOException 
{
  Long countValue = count.value();
  long currentCount = countValue == null ? 0L : countValue;
  count.update(currentCount + 1);
  long lastId = currentCount / windowSlide;
  long lastStart = lastId * windowSlide;
  long lastEnd = lastStart + windowSize - 1;
  List<CountWindow> windows = new ArrayList<>();
  while (lastId >= 0 && 
         lastStart <= currentCount && 
         currentCount <= lastEnd) 
  {
    if (lastStart <= currentCount && currentCount <= lastEnd) // <=
    {
      windows.add(new CountWindow(lastId));
    }
    lastId--;
    lastStart -= windowSlide;
    lastEnd -= windowSlide;
  }
  return windows;
}

Анализатор предупреждает, что выражения currentCount <= lastEnd и lastStart <= currentCount всегда истинны. И ведь действительно, если посмотреть на условие цикла while, то там имеются точно такие же выражения. Это значит, что внутри цикла эти выражения всегда будут истинны, поэтому в список windows будут добавлены все объекты типа CountWindow созданные в цикле. Вариантов появления этой бессмысленной проверки множество, и первое, что приходит в голову, либо артефакт рефакторинга, либо перестраховка разработчика. Но это может быть и ошибка, если хотелось проверить что-то иное...

Некорректный порядок аргументов

V6029 Possible incorrect order of arguments passed to method: 'hasBufferForReleasedChannel', 'hasBufferForRemovedChannel'. NettyMessageClientDecoderDelegateTest.java(165), NettyMessageClientDecoderDelegateTest.java(166)

private void testNettyMessageClientDecoding(
       boolean hasEmptyBuffer,
       boolean hasBufferForReleasedChannel,
       boolean hasBufferForRemovedChannel) throws Exception 
{
  ....
  List<BufferResponse> messages = createMessageList (
    hasEmptyBuffer,
    hasBufferForReleasedChannel,
    hasBufferForRemovedChannel);
  ....
}

Отсутствие в Java возможности вызова метода с именованными параметрами иногда играет злую шутку с разработчиками. Именно это и произошло при вызове метода createMessageList, на который указал анализатор. При взгляде на определение этого метода становится ясно, что параметр hasBufferForRemovedChannel должен передаваться в метод перед параметром hasBufferForReleasedChannel:

private List<BufferResponse> createMessageList(
  boolean hasEmptyBuffer,
  boolean hasBufferForRemovedChannel,
  boolean hasBufferForReleasedChannel) 
{
  ....
  if (hasBufferForReleasedChannel) {
    addBufferResponse(messages, 
                      releasedInputChannelId, 
                      Buffer.DataType.DATA_BUFFER, 
                      BUFFER_SIZE, 
                      seqNumber++);
  }
  if (hasBufferForRemovedChannel) {
    addBufferResponse(messages, 
                      new InputChannelID(), 
                      Buffer.DataType.DATA_BUFFER, 
                      BUFFER_SIZE, 
                      seqNumber++);
  }
  ....
  return messages;
}

Однако при вызове метода разработчик перепутал порядок этих аргументов, из-за чего логика метода createMessageList будет нарушена, если значения перепутанных аргументов будут отличаться.

Ох уж этот копипаст

V6032 It is odd that the body of method 'seekToFirst' is fully equivalent to the body of another method 'seekToLast'. RocksIteratorWrapper.java(53), RocksIteratorWrapper.java(59)

public class RocksIteratorWrapper implements RocksIteratorInterface, Closeable {
  ....
  private RocksIterator iterator;
  ....

  @Override
  public void seekToFirst() {
    iterator.seekToFirst(); // <=
    status(); 
  }
  
  @Override
  public void seekToLast() {
    iterator.seekToFirst();  // <=
    status();
  }
  
  ....
}

Тела методов seekToFirst и seekToLast совпадают. Причем оба метода используются в коде.

Что-то здесь нечисто! И действительно, если посмотреть какие методы имеются у объекта iterator, то станет понятно, какую ошибку помог найти анализатор:

public class RocksIterator extends AbstractRocksIterator<RocksDB>
{
  ....
}

public abstract class AbstractRocksIterator<....> extends ....
{
  ....
  public void seekToFirst() // <=
  {
    assert this.isOwningHandle();
    this.seekToFirst0(this.nativeHandle_);
  }
  
  public void seekToLast() // <=
  {
    assert this.isOwningHandle();
    this.seekToLast0(this.nativeHandle_);
  }
  ....
}

Получается, что метод seekToLast класса RocksIteratorWrapper был создан копипастом метода seekToFirst этого же класса. Однако по каким-то причинам разработчик забыл заменить вызов метода seekToFirst у iterator на seekToLast.

Путаница с форматными строками

V6046 Incorrect format. A different number of format items is expected. Arguments not used: 1. UnsignedTypeConversionITCase.java(102)

public static void prepareMariaDB() throws IllegalStateException {
  ....
  if (!initDbSuccess) {
    throw new IllegalStateException(
      String.format(
        "Initialize MySQL database instance failed after {} attempts," + // <=
        " please open an issue.", INITIALIZE_DB_MAX_RETRY));
  }
}

Форматные строки метода String.format и логгеров в Java различаются. В отличие от форматной строки метода String.format, где места подстановки аргументов указываются при помощи символа '%', в форматных строках логгеров вместо этого используется комбинация символов '{}'. Из-за этой путаницы и произошла эта ошибка. В качестве форматной строки в метод String.format передается строка, которая, скорее всего, была скопирована из другого места, где она использовалась в каком-нибудь логгере. В результате, в сообщении исключения IllegalStateException не произойдет подстановки значения поля INITIALIZE_DB_MAX_RETRY вместо '{}', и тот, кто поймает или залоггирует это исключение, так и не узнает сколько попыток подключения к БД было произведено.

Ненормальное распределение

V6048 This expression can be simplified. Operand 'index' in the operation equals 0. CollectionUtil.java(76)

public static <T> Collection<List<T>> partition(Collection<T> elements, 
                                                int numBuckets) 
{
  Map<Integer, List<T>> buckets = new HashMap<>(numBuckets);
  
  int initialCapacity = elements.size() / numBuckets;

  int index = 0;
  for (T element : elements) 
  {
    int bucket = index % numBuckets;                                 // <=
    buckets.computeIfAbsent(bucket, 
                            key -> new ArrayList<>(initialCapacity))

           .add(element); 
  }

  return buckets.values();
}

Метод partition разделяет элементы из коллекции elements на несколько сегментов, после чего возвращает эти сегменты. Однако из-за ошибки, на которую указал анализатор, никакого разделения происходить не будет. Выражение, при помощи которого определяют номер сегмента index % numBuckets, всегда будет равно 0, потому что index всегда равен 0. Изначально я подумал, что код этого метода был подвергнут рефакторингу, в результате которого забыли добавить увеличение переменной index в цикле for. Но, просмотрев коммит, где этот метод был добавлен, выяснилось, что эта ошибка появилась вместе с этим методом. Исправленный вариант кода:

public static <T> Collection<List<T>> partition(Collection<T> elements, 
                                                int numBuckets) 
{
  Map<Integer, List<T>> buckets = new HashMap<>(numBuckets);
  
  int initialCapacity = elements.size() / numBuckets;

  int index = 0;
  for (T element : elements) 
  {
    int bucket = index % numBuckets; 
    buckets.computeIfAbsent(bucket, 
                            key -> new ArrayList<>(initialCapacity))
           .add(element);
    index++;
  }

  return buckets.values();
}

Несовместимый тип

V6066 The type of object passed as argument is incompatible with the type of collection: String, ListStateDescriptor<NextTransactionalIdHint>. FlinkKafkaProducer.java(1083)

public interface OperatorStateStore 
{
  Set<String> getRegisteredStateNames();
}
public class FlinkKafkaProducer<IN> extends ....
{
  ....
  private static final 
  ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>
  NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR = ....;

  @Override
  public void initializeState(FunctionInitializationContext context).... 
  {
    ....
    if (context.getOperatorStateStore()
               .getRegisteredStateNames()
               .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR))    // <=
    {
       migrateNextTransactionalIdHindState(context);
    }
    ....
  }
}

Выражение, на которое указал анализатор, всегда будет ложно, а значит вызова метода migrateNextTransactionalIdHindState никогда не произойдет. Как же так случилось, что кто-то ищет в коллекции типа Set<String> элемент совсем другого типа - ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>? Без помощи анализатора такая ошибка, скорее всего, очень долго бы жила в коде, так как в глаза она не бросается и без тщательной проверки данного метода её просто невозможно найти.

Неатомарное изменение переменной

V6074 Non-atomic modification of volatile variable. Inspect 'currentNumAcknowledgedSubtasks'. PendingCheckpointStats.java(131)

boolean reportSubtaskStats(JobVertexID jobVertexId, SubtaskStateStats subtask) {
  TaskStateStats taskStateStats = taskStats.get(jobVertexId);

  if (taskStateStats != null && taskStateStats.reportSubtaskStats(subtask)) {
    currentNumAcknowledgedSubtasks++;                // <=
    latestAcknowledgedSubtask = subtask;

    currentStateSize += subtask.getStateSize();      // <=

    long processedData = subtask.getProcessedData();
    if (processedData > 0) {
      currentProcessedData += processedData;         // <=
    }

    long persistedData = subtask.getPersistedData();
    if (persistedData > 0) {
      currentPersistedData += persistedData;         // <=
    }
    return true;
  } else {
    return false;
  }
}

Плюс еще 3 предупреждения анализатора в том же самом методе:

  • V6074 Non-atomic modification of volatile variable. Inspect 'currentStateSize'. PendingCheckpointStats.java(134)
  • V6074 Non-atomic modification of volatile variable. Inspect 'currentProcessedData'. PendingCheckpointStats.java(138)
  • V6074 Non-atomic modification of volatile variable. Inspect 'currentPersistedData'. PendingCheckpointStats.java(143)

Анализатор подсказал, что аж 4 volatile поля в методе изменяются неатомарно. И анализатор, как всегда, оказывается прав, потому что операции ++ и +=, на самом деле, - это последовательность из нескольких операций чтения-изменения-записи. Как известно, значение volatile поля видно всем потокам, а это значит, что из-за состояния гонки часть изменений поля может быть утеряна. Более подробную информацию об этом вы можете прочитать в описании диагностики.

Заключение

В Big Data проектах надежность является одним из ключевых требований, поэтому за качеством кода в них необходимо пристально следить. В этом разработчикам Apache Flink помогали несколько инструментов, а также они написали значительное количество тестов. Однако даже в таких условиях анализатор PVS-Studio смог найти ошибки. От ошибок невозможно полностью избавиться, но использование различных инструментов статического анализа кода регулярно позволит приблизится к этому идеалу. Да-да, именно регулярно. Только при регулярном использовании статический анализ показывает свою эффективность, о чём более подробно рассказано в этой статье.

Популярные статьи по теме


Комментарии (0)

Следующие комментарии next comments
close comment form
close form

Заполните форму в два простых шага ниже:

Ваши контактные данные:

Шаг 1
Поздравляем! У вас есть промокод!

Тип желаемой лицензии:

Шаг 2
Team license
Enterprise license
** Нажимая на кнопку, вы даете согласие на обработку
своих персональных данных. См. Политику конфиденциальности
close form
Запросите информацию о ценах
Новая лицензия
Продление лицензии
--Выберите валюту--
USD
EUR
RUB
* Нажимая на кнопку, вы даете согласие на обработку
своих персональных данных. См. Политику конфиденциальности

close form
Бесплатная лицензия PVS‑Studio для специалистов Microsoft MVP
* Нажимая на кнопку, вы даете согласие на обработку
своих персональных данных. См. Политику конфиденциальности

close form
Для получения лицензии для вашего открытого
проекта заполните, пожалуйста, эту форму
* Нажимая на кнопку, вы даете согласие на обработку
своих персональных данных. См. Политику конфиденциальности

close form
Мне интересно попробовать плагин на:
* Нажимая на кнопку, вы даете согласие на обработку
своих персональных данных. См. Политику конфиденциальности

close form
check circle
Ваше сообщение отправлено.

Мы ответим вам на


Если вы так и не получили ответ, пожалуйста, проверьте, отфильтровано ли письмо в одну из следующих стандартных папок:

  • Промоакции
  • Оповещения
  • Спам