Big / Bug Data: analyzing the Apache Flink source code

Dec 15 2020

Applications used in the field of Big Data process huge amounts of information, and this often happens in real time. Naturally, such applications must be highly reliable so that no error in the code can interfere with data processing. To achieve high reliability, one needs to keep a wary eye on the code quality of projects developed for this area. The PVS-Studio static analyzer is one of the solutions to this problem. Today, the Apache Flink project developed by the Apache Software Foundation, one of the leaders in the Big Data software market, was chosen as a test subject for the analyzer.

So, what is Apache Flink? It is an open-source framework for distributed processing of large amounts of data. It was developed as an alternative to Hadoop MapReduce in 2010 at the Technical University of Berlin. The framework is based on the distributed execution engine for batch and streaming data processing applications. This engine is written in Java and Scala. Today, Apache Flink can be used in projects written using Java, Scala, Python, and even SQL.

Project analysis

After downloading the project's source code, I started building it using the 'mvn clean package -DskipTests' command specified in the instructions on GitHub. While the build was in progress, I used the CLOC utility and found out that there were 10838 Java files in the project, which had about 1.3 million lines of code. Moreover, there were as many as 3833 test Java files, which is more than 1/3 of all Java files. I also noticed that the project uses the FindBugs static code analyzer and the Cobertura utility. The latter provides information about code coverage by tests. Given all this, it becomes clear that the developers of Apache Flink carefully monitored the quality of the code and test coverage during development.

After a successful build, I opened the project in IntelliJ IDEA and started the analysis using the PVS-Studio for IDEA and Android Studio plugin. The analyzer's warnings were distributed as follows:

  • 183 High;
  • 759 Medium;
  • 545 Low.

Approximately 2/3 of the PVS-Studio analyzer's warnings were issued for test files. If we take into account this fact and the size of the project's code base, we can say for sure that the developers of Apache Flink managed to keep the code quality at a high level.

After studying the analyzer's warnings in more detail, I chose the most interesting ones in my opinion. So let's see what PVS-Studio found in this project!

Just a little carelessness

V6001 There are identical sub-expressions 'processedData' to the left and to the right of the '==' operator. CheckpointStatistics.java(229)

@Override
public boolean equals(Object o) 
{
  ....
  CheckpointStatistics that = (CheckpointStatistics) o;
  return id == that.id &&
    savepoint == that.savepoint &&
    triggerTimestamp == that.triggerTimestamp &&
    latestAckTimestamp == that.latestAckTimestamp &&
    stateSize == that.stateSize &&
    duration == that.duration &&
    alignmentBuffered == that.alignmentBuffered &&
    processedData == processedData &&                // <=
    persistedData == that.persistedData &&
    numSubtasks == that.numSubtasks &&
    numAckSubtasks == that.numAckSubtasks &&
    status == that.status &&
    Objects.equals(checkpointType, that.checkpointType) &&
    Objects.equals(
      checkpointStatisticsPerTask, 
      that.checkpointStatisticsPerTask);
}

This error is not very noticeable among other expressions in return. When overriding the equals method for the CheckpointStatistics class, the programmer made an error in the processedData == processedData expression, which doesn't make sense because it is always true. Similarly to the rest of the expressions in return, the fields of the current this object and that object had to be compared: processedData == that.processedData. This case is one of the typical error patterns found in comparison functions, described in detail in the article "The Evil within the comparison functions". So, it turns out that just "a little carelessness" broke the logic of checking the equivalence of objects of the CheckpointStatistics class.
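
To make the consequence concrete, here is a minimal sketch of the same pattern. The Stats class and its two fields are hypothetical and are not Flink code; they only reproduce the self-comparison next to the intended comparison.

```java
import java.util.Objects;

// Hypothetical two-field class, not Flink code: it only mirrors the pattern above.
final class Stats {
    private final long id;
    private final long processedData;

    Stats(long id, long processedData) {
        this.id = id;
        this.processedData = processedData;
    }

    // Buggy variant: the second operand compares the field with itself.
    boolean buggyEquals(Stats that) {
        return id == that.id && processedData == processedData;
    }

    // Intended variant: every field is compared with the field of the other object.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Stats)) return false;
        Stats that = (Stats) o;
        return id == that.id && processedData == that.processedData;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, processedData);
    }

    public static void main(String[] args) {
        Stats a = new Stats(1, 100);
        Stats b = new Stats(1, 200);
        System.out.println(a.buggyEquals(b)); // true: the difference goes unnoticed
        System.out.println(a.equals(b));      // false
    }
}
```

Two objects that differ only in processedData are reported as equal by the buggy variant, which is exactly what happens in CheckpointStatistics.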

Expression is always true

V6007 Expression 'input2.length > 0' is always true. Operator.java(283)

public static <T> Operator<T> createUnionCascade(Operator<T> input1, 
                                                 Operator<T>... input2) 
{
  if (input2 == null || input2.length == 0) 
  {
    return input1;                                // <=
  } 
  else if (input2.length == 1 && input1 == null) 
  {
    return input2[0];
  }
  ....
  if (input1 != null) 
  {
    ....
  } 
  else if (input2.length > 0 && input2[0] != null) // <=
  {
    ....
  } 
  else 
  {
    ....
  }
}

In this method, the analyzer turned out to be more attentive than a person and reported it in its own peculiar way, pointing out that the expression input2.length > 0 is always true. The reason is that if the length of the input2 array is 0, the condition input2 == null || input2.length == 0 of the first if in the method is true, and the method returns before execution reaches the line with the expression input2.length > 0.

All-seeing analyzer

V6007 Expression 'slotSharingGroup == null' is always false. StreamGraphGenerator.java(510)

private <T> Collection<Integer> transformFeedback(....)
{
  ....
  String slotSharingGroup = determineSlotSharingGroup(null, allFeedbackIds);
  if (slotSharingGroup == null)
  {
    slotSharingGroup = "SlotSharingGroup-" + iterate.getId();
  }
  ....
}

The analyzer reported that the expression slotSharingGroup == null is always false. This suggests that the determineSlotSharingGroup method will never return null. Is the analyzer so smart that it could calculate all the values that this method can return? Let's double-check it ourselves:

public class StreamGraphGenerator 
{
  ....
  public static final String DEFAULT_SLOT_SHARING_GROUP = "default";
  ....
  private String determineSlotSharingGroup(String specifiedGroup, 
                                           Collection<Integer> inputIds) 
  {
    if (specifiedGroup != null)
    {
      return specifiedGroup; // <= 1
    }
    else
    {
      String inputGroup = null;
      for (int id: inputIds)
      {
        String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);
        if (inputGroup == null)
        {
          inputGroup = inputGroupCandidate;
        }
        else if (!inputGroup.equals(inputGroupCandidate))
        {
          return DEFAULT_SLOT_SHARING_GROUP; // <= 2
        }
      }
      return inputGroup == null 
             ? DEFAULT_SLOT_SHARING_GROUP 
             : inputGroup; // <= 3
    }
  }
  ....
}

Let's go through all the returns in order and see what this method can return:

  • The first return yields the method's specifiedGroup argument, but only if it is not null;
  • The return in the for loop yields the value of the static final field DEFAULT_SLOT_SHARING_GROUP, initialized with a string literal;
  • The last return in the method yields the value of the inputGroup variable if it is not null. Otherwise, the value of the DEFAULT_SLOT_SHARING_GROUP field is returned.

It turns out that the analyzer really was able to work out that the determineSlotSharingGroup method cannot return null, and warned us about this by pointing out that the slotSharingGroup == null check is pointless. Although this particular case is not an error, the same reasoning lets the analyzer detect a real error in other cases, for example, when a method must return null under certain conditions.

Collect them all

V6007 Expression 'currentCount <= lastEnd' is always true. CountSlidingWindowAssigner.java(75)

V6007 Expression 'lastStart <= currentCount' is always true. CountSlidingWindowAssigner.java(75)

@Override
public Collection<CountWindow> assignWindows(....) throws IOException 
{
  Long countValue = count.value();
  long currentCount = countValue == null ? 0L : countValue;
  count.update(currentCount + 1);
  long lastId = currentCount / windowSlide;
  long lastStart = lastId * windowSlide;
  long lastEnd = lastStart + windowSize - 1;
  List<CountWindow> windows = new ArrayList<>();
  while (lastId >= 0 && 
         lastStart <= currentCount && 
         currentCount <= lastEnd) 
  {
    if (lastStart <= currentCount && currentCount <= lastEnd) // <=
    {
      windows.add(new CountWindow(lastId));
    }
    lastId--;
    lastStart -= windowSlide;
    lastEnd -= windowSlide;
  }
  return windows;
}

The analyzer warns that the expressions currentCount <= lastEnd and lastStart <= currentCount are always true. And indeed, if you look at the condition of the while loop, there are exactly the same expressions. This means that these expressions will always be true inside the loop, so all CountWindow objects created in the loop will be added to the windows list. There are many options for the appearance of this meaningless check, and the first thing that comes to mind is either a refactoring artifact or a developer's playing safe. But it can also be an error if the author wanted to check something else...
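
If the inner check is merely redundant, which the source alone cannot confirm, the loop can be written without it. The sketch below is a simplified stand-in rather than the Flink method: the SlidingWindowDemo class is hypothetical, plain long ids replace CountWindow objects, and the counter is passed in as a parameter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplification of assignWindows: ids stand in for CountWindow objects.
final class SlidingWindowDemo {
    // Returns the ids of all sliding windows that contain currentCount.
    static List<Long> windowIds(long currentCount, long windowSize, long windowSlide) {
        long lastId = currentCount / windowSlide;
        long lastStart = lastId * windowSlide;
        long lastEnd = lastStart + windowSize - 1;
        List<Long> ids = new ArrayList<>();
        // The loop condition already guarantees lastStart <= currentCount <= lastEnd,
        // so no second check is needed before adding the window.
        while (lastId >= 0 && lastStart <= currentCount && currentCount <= lastEnd) {
            ids.add(lastId);
            lastId--;
            lastStart -= windowSlide;
            lastEnd -= windowSlide;
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(windowIds(5, 4, 2)); // [2, 1]
    }
}
```

With a window size of 4 and a slide of 2, element number 5 falls into windows 2 (elements 4 to 7) and 1 (elements 2 to 5), and the loop stops at window 0 (elements 0 to 3).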

Incorrect order of arguments

V6029 Possible incorrect order of arguments passed to method: 'hasBufferForReleasedChannel', 'hasBufferForRemovedChannel'. NettyMessageClientDecoderDelegateTest.java(165), NettyMessageClientDecoderDelegateTest.java(166)

private void testNettyMessageClientDecoding(
       boolean hasEmptyBuffer,
       boolean hasBufferForReleasedChannel,
       boolean hasBufferForRemovedChannel) throws Exception 
{
  ....
  List<BufferResponse> messages = createMessageList (
    hasEmptyBuffer,
    hasBufferForReleasedChannel,
    hasBufferForRemovedChannel);
  ....
}

Java's lack of named parameters in method calls sometimes plays a cruel joke on developers. This is exactly what happened when calling the createMessageList method that the analyzer pointed to. The definition of this method explains why the hasBufferForRemovedChannel parameter must be passed to the method before the hasBufferForReleasedChannel parameter:

private List<BufferResponse> createMessageList(
  boolean hasEmptyBuffer,
  boolean hasBufferForRemovedChannel,
  boolean hasBufferForReleasedChannel) 
{
  ....
  if (hasBufferForReleasedChannel) {
    addBufferResponse(messages, 
                      releasedInputChannelId, 
                      Buffer.DataType.DATA_BUFFER, 
                      BUFFER_SIZE, 
                      seqNumber++);
  }
  if (hasBufferForRemovedChannel) {
    addBufferResponse(messages, 
                      new InputChannelID(), 
                      Buffer.DataType.DATA_BUFFER, 
                      BUFFER_SIZE, 
                      seqNumber++);
  }
  ....
  return messages;
}

However, when calling the method, the developer mixed up the order of these arguments, so the logic of the createMessageList method breaks whenever the two swapped arguments have different values.

Naughty copy-paste

V6032 It is odd that the body of method 'seekToFirst' is fully equivalent to the body of another method 'seekToLast'. RocksIteratorWrapper.java(53), RocksIteratorWrapper.java(59)

public class RocksIteratorWrapper implements RocksIteratorInterface, Closeable {
  ....
  private RocksIterator iterator;
  ....

  @Override
  public void seekToFirst() {
    iterator.seekToFirst(); // <=
    status(); 
  }
  
  @Override
  public void seekToLast() {
    iterator.seekToFirst();  // <=
    status();
  }
  
  ....
}

The bodies of the seekToFirst and seekToLast methods are the same. What is more, both methods are used in the code.

There is something screwy here! Indeed, if you look at the methods of the iterator object, it becomes clear what error the analyzer helped find:

public class RocksIterator extends AbstractRocksIterator<RocksDB>
{
  ....
}

public abstract class AbstractRocksIterator<....> extends ....
{
  ....
  public void seekToFirst() // <=
  {
    assert this.isOwningHandle();
    this.seekToFirst0(this.nativeHandle_);
  }
  
  public void seekToLast() // <=
  {
    assert this.isOwningHandle();
    this.seekToLast0(this.nativeHandle_);
  }
  ....
}

It turns out that the seekToLast method of the RocksIteratorWrapper class was a copy-paste of the seekToFirst method of the same class. However, for some reason, the developer forgot to replace the seekToFirst method call on iterator with seekToLast.

Confusion with format strings

V6046 Incorrect format. A different number of format items is expected. Arguments not used: 1. UnsignedTypeConversionITCase.java(102)

public static void prepareMariaDB() throws IllegalStateException {
  ....
  if (!initDbSuccess) {
    throw new IllegalStateException(
      String.format(
        "Initialize MySQL database instance failed after {} attempts," + // <=
        " please open an issue.", INITIALIZE_DB_MAX_RETRY));
  }
}

The format strings of the String.format method and of loggers differ in Java. In the format string of the String.format method, argument placeholders are specified using the '%' character, whereas format strings of loggers use the '{}' character pair instead. This confusion is to blame for the error. The String.format method is passed a format string that was most likely copied from another place where it was used in some logger. As a result, the value of the INITIALIZE_DB_MAX_RETRY field will not be substituted for '{}' in the IllegalStateException message. Whoever catches or logs this exception will not know how many attempts to connect to the database were made.
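
The difference is easy to reproduce in isolation. In the sketch below, the FormatDemo class and the MAX_RETRY constant are hypothetical stand-ins for the test class and its INITIALIZE_DB_MAX_RETRY field.

```java
// Hypothetical demo class; MAX_RETRY stands in for INITIALIZE_DB_MAX_RETRY.
final class FormatDemo {
    static final int MAX_RETRY = 3;

    static String loggerStyle() {
        // "{}" means nothing to String.format, so the extra argument is silently ignored.
        return String.format("failed after {} attempts", MAX_RETRY);
    }

    static String formatStyle() {
        // "%d" is the placeholder String.format expects for an integer argument.
        return String.format("failed after %d attempts", MAX_RETRY);
    }

    public static void main(String[] args) {
        System.out.println(loggerStyle()); // failed after {} attempts
        System.out.println(formatStyle()); // failed after 3 attempts
    }
}
```

String.format does not throw when it receives more arguments than placeholders, which is why a mistake like this survives until someone reads the message.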

Non-normal distribution

V6048 This expression can be simplified. Operand 'index' in the operation equals 0. CollectionUtil.java(76)

public static <T> Collection<List<T>> partition(Collection<T> elements, 
                                                int numBuckets) 
{
  Map<Integer, List<T>> buckets = new HashMap<>(numBuckets);
  
  int initialCapacity = elements.size() / numBuckets;

  int index = 0;
  for (T element : elements) 
  {
    int bucket = index % numBuckets;                                 // <=
    buckets.computeIfAbsent(bucket,
                            key -> new ArrayList<>(initialCapacity))
           .add(element);
  }

  return buckets.values();
}

The partition method splits the elements collection into several segments and returns them. However, due to the error indicated by the analyzer, no partitioning occurs. The expression index % numBuckets, which determines the segment number, is always 0 because index is always 0. Initially, I thought that the method had been refactored and the increment of the index variable in the for loop had been lost along the way. But after reviewing the commit where this method was added, it turned out that the error appeared together with the method. Correct variant of the code:

public static <T> Collection<List<T>> partition(Collection<T> elements, 
                                                int numBuckets) 
{
  Map<Integer, List<T>> buckets = new HashMap<>(numBuckets);
  
  int initialCapacity = elements.size() / numBuckets;

  int index = 0;
  for (T element : elements) 
  {
    int bucket = index % numBuckets; 
    buckets.computeIfAbsent(bucket, 
                            key -> new ArrayList<>(initialCapacity))
           .add(element);
    index++;
  }

  return buckets.values();
}

Incompatible type

V6066 The type of object passed as argument is incompatible with the type of collection: String, ListStateDescriptor<NextTransactionalIdHint>. FlinkKafkaProducer.java(1083)

public interface OperatorStateStore 
{
  Set<String> getRegisteredStateNames();
}
public class FlinkKafkaProducer<IN> extends ....
{
  ....
  private static final 
  ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>
  NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR = ....;

  @Override
  public void initializeState(FunctionInitializationContext context).... 
  {
    ....
    if (context.getOperatorStateStore()
               .getRegisteredStateNames()
               .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR))    // <=
    {
       migrateNextTransactionalIdHindState(context);
    }
    ....
  }
}

The expression indicated by the analyzer will always be false, which means that the migrateNextTransactionalIdHindState method will never be called. How is it that someone looks for a completely different type of element in a Set<String> type collection - ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>? Without the help of the analyzer, such an error would most likely live in the code for a very long time, since it is not noticeable, and it is simply impossible to find it without a thorough check of this method.
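
Such code compiles because Set.contains accepts any Object. The sketch below shows the effect with a hypothetical Descriptor class that is not the Flink type. It assumes the intended check was a lookup by the descriptor's name, which the excerpt suggests but does not confirm.

```java
import java.util.HashSet;
import java.util.Set;

final class ContainsDemo {
    // Hypothetical stand-in for a state descriptor; only the name matters here.
    static final class Descriptor {
        private final String name;

        Descriptor(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }
    }

    static boolean lookupByObject(Set<String> names, Descriptor descriptor) {
        // Compiles, but a Descriptor is never equal to a String, so this is always false.
        return names.contains(descriptor);
    }

    static boolean lookupByName(Set<String> names, Descriptor descriptor) {
        // Assumed intent: look up the descriptor's name in the set of names.
        return names.contains(descriptor.getName());
    }

    public static void main(String[] args) {
        Set<String> names = new HashSet<>();
        names.add("next-transactional-id-hint");
        Descriptor descriptor = new Descriptor("next-transactional-id-hint");
        System.out.println(lookupByObject(names, descriptor)); // false
        System.out.println(lookupByName(names, descriptor));   // true
    }
}
```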

Non-atomic change of the variable

V6074 Non-atomic modification of volatile variable. Inspect 'currentNumAcknowledgedSubtasks'. PendingCheckpointStats.java(131)

boolean reportSubtaskStats(JobVertexID jobVertexId, SubtaskStateStats subtask) {
  TaskStateStats taskStateStats = taskStats.get(jobVertexId);

  if (taskStateStats != null && taskStateStats.reportSubtaskStats(subtask)) {
    currentNumAcknowledgedSubtasks++;                // <=
    latestAcknowledgedSubtask = subtask;

    currentStateSize += subtask.getStateSize();      // <=

    long processedData = subtask.getProcessedData();
    if (processedData > 0) {
      currentProcessedData += processedData;         // <=
    }

    long persistedData = subtask.getPersistedData();
    if (persistedData > 0) {
      currentPersistedData += persistedData;         // <=
    }
    return true;
  } else {
    return false;
  }
}

Plus 3 more analyzer warnings in the same method:

  • V6074 Non-atomic modification of volatile variable. Inspect 'currentStateSize'. PendingCheckpointStats.java(134)
  • V6074 Non-atomic modification of volatile variable. Inspect 'currentProcessedData'. PendingCheckpointStats.java(138)
  • V6074 Non-atomic modification of volatile variable. Inspect 'currentPersistedData'. PendingCheckpointStats.java(143)

The analyzer reports that as many as 4 volatile fields in the method are modified non-atomically. And the analyzer, as always, turns out to be right, because the ++ and += operations are actually a read-modify-write sequence. The volatile modifier guarantees that a written value is visible to all threads, but it does not make such a sequence atomic, so if several threads call this method concurrently, a race condition may cause some of the updates to be lost. For more information about this, see the diagnostic description.
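
One common way to make such counters safe is java.util.concurrent.atomic.AtomicLong. The sketch below is an illustration, not a patch for Flink: the CounterDemo class is hypothetical, and whether reportSubtaskStats is actually called from several threads is not something the excerpt shows.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class comparing a volatile counter with an AtomicLong.
final class CounterDemo {
    private volatile long volatileCount;                      // ++ is read, add, write
    private final AtomicLong atomicCount = new AtomicLong();  // increment is one atomic step

    void unsafeIncrement() {
        volatileCount++;
    }

    void safeIncrement() {
        atomicCount.incrementAndGet();
    }

    // Runs both increments from several threads; returns {volatile total, atomic total}.
    static long[] run(int threads, int perThread) {
        CounterDemo demo = new CounterDemo();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    demo.unsafeIncrement();
                    demo.safeIncrement();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new long[] {demo.volatileCount, demo.atomicCount.get()};
    }

    public static void main(String[] args) {
        long[] totals = run(4, 100_000);
        System.out.println("volatile: " + totals[0]); // may be lower than 400000
        System.out.println("atomic:   " + totals[1]); // always 400000
    }
}
```

The atomic total always equals threads * perThread, while the volatile total can come out lower because concurrent increments overwrite each other. A synchronized block would also work and may be preferable when several fields have to change together.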

Conclusion

In Big Data projects, reliability is one of the key requirements, so the quality of the code in them must be closely monitored. The developers of Apache Flink were assisted by several tools, and they also wrote a significant number of tests. However, even under these conditions, the PVS-Studio analyzer was able to find errors. You can't completely get rid of errors, but regular use of various static code analysis tools will get you closer to that ideal. Yes, you read it right, it's all about regular use. Static analysis shows its effectiveness only when applied consistently, which is described in more detail in this article.


