Sriram
Sriram Creator of geeky things, mostly unused junk

Distributed Thinking: A gentle introduction to distributed processing using Apache Storm and Apache Spark - Part 2

Apache Spark LogoApache Storm Logo

Welcome to Part 2 of Distributed Thinking! Here I discuss about How to look at data in a distributed environment. In case you missed anything, click here for Part 0.

1. Distributed Thinking

1.3 How to look at data?

As discussed before, our primary goal is to transform the data into the form that we need. Let us take a classic example of counting lines in a text file, starting with a simple version using Java:

This is a simple transformation where the Data Source DS is the file in args[0], the connector is the try block with a BufferedReader and the Extract XT method is the br.readLine() method.
We filter the data by checking if the line read is null or not. The Transform process TR merely counts all lines that aren’t filtered out. It Transforms a line into the number 1 and adds it to the count.
The Load method LD is the System.out.println method which prints the lineCount onto the screen, which acts as the Data Sink SNK here.

Let’s break this program into blocks:

Line Count

The Connector block is responsible for getting the data from the underlying system.
The Distributed Transformation Code block takes the list of lines Line 0 to Line (N-1) and transforms it into a number 1 and then sums it up to N, which is the number of lines in the file.
The Output block is responsible for delivering the result to the output system, which may be a Data Sink DS or the screen.

Let’s rewrite this program to look like the blocks:

Here you can see that the FileConnector is responsible for getting the data from the underlying file and providing it for the Transformation code. The Transformation code is now in two methods. The first one is called transform1 which converts the String input, which is the line, into the Integer 1. The second one is called transform2 which adds the count to the dataSink which is an AtomicInteger in our case, but can be a Database or any other system.

Immutability and Side Effects

Here I need to bring up a very important concept called Immutability. While there are several ways to explain it, this is the basic concept:

An Object x is said to be Immutable if there is no function F which can change its value.
i.e. there is no F(x) = y where the value of x changes after calling F(). In other words, you can apply F1(x), F2(x) … Fn(x) but the value of x does not change.

Let’s look at an example of an object that is immutable and an object that is mutable:

1
2
3
4
5
6
7
8
9
10
11
12
13
int x = 10
def f1(y) = {
    y = y + 20
    print(y) // Prints 30
}
print(x) // Prints 10

AtomicInteger i = 10
def f2(y) = {
    y.incrementAndGet(20)
    print(y) // Prints 30
}
print(i) // Prints 30

Here you can see that the integer x is immutable. There is no method that can be called on x that will change the value 10. However, the AtomicInteger i is mutable, and the method f2 changes its value each time it’s called.

This brings us to another important concept: Avoiding Side Effects in Functions.

A side effect of a function F is the changes that the function causes to the state of the system outside the function. Let us look at the following method:

1
2
3
4
5
AtomicInteger countA = 0 
def containsA(x) {
    if (x.contains("A"))
        countA.incr()
}

Here the increment method containsA modifies the value countA, which is a side effect of the method. The called of the method does not have any control over the modifications that the method made. Why is this a problem? Look at the following code:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def runSomething() {
    for (String x : xList) {
        int retries = 3
        do {
            try {
                containsA(x)
                doSomething()
                break
            } catch (e) {
                retries--
            }
        } while (retries > 0)
    }
}

Here the method runSomething() loops through a list of Strings xList and counts the number of As, then does something with doSomething(). Now in order to ensure that every line in xList is definitely accounted for, runSomething() performs retries on failures. This retry-on-failure is a common feature of distributed systems in order to guarantee reliability in large clusters where there can be many causes of failures - nodes are down, network is down, data is corrupt, etc.
In this case if doSomething() fails, the String x is retried. Now if containsA has found A in the String, it counts twice, thus leading to inaccuracy.
Let’s rewrite this code to avoid Side Effects:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
AtomicInteger countA = 0 
def containsA(x): Int {
    if (x.contains("A"))
        return 1
    return 0
}

def runSomething() {
    for (String x : xList) {
        int retries = 3
        do {
            int inc = 0
            try {
                inc = containsA(x)
                countA.incr(inc)
                doSomething()
                break
            } catch (e) {
                countA.decr(inc)
                retries--
            }
        } while (retries > 0)
    }
}

Now no matter the number of times containsA is called, the system remains unchanged, at the caller has the ability to now avoid side effects, thus making a more deterministic, reliable system.

Another important reason why we would avoid side effects is if the system depends on a global state that changes when a method is called, there is no easy way to replicate this change across all nodes of the cluster. Global counters on one machine do not hold the values of the counters in another machine.

To be able to process your code in a distributed way, the following pointers will help:

  1. Break the data into small immutable blocks which are only transformed from one immutable form to another immutable form
  2. Do not transform with side effects - avoid changing the state of the system in the transformations in order to allow retries and reproducible results in a distributed environment.

Often, the reason for an unreliable system yielding non-deterministic results (Different runs of the same data yields different results) is because either the data is mutable or the functions have side effects.

In the example above with the line counting, Transformation method transform2 is not free of side effects as it modifies the counter that is passed to it, which means that transform2 cannot be reliably used in a distributed environment.

In most problems, the final step of collecting the data into the data sink is not straight forward. Hence, Spark and Storm both provide means to collect the data in such a way that all the results of the previous step is grouped together on one machine in the end. This step is also called Reduce in the Map-Reduce terminology.

How to write these collectors and sinks for a distributed environment will be discussed later.


In the next post, let’s look at a more complex example, Word Count. Click here to continue reading.

Click on any of the links below to go directly to any part:
Part 0: Introduction
Part 1: When and Where
Part 2: How to look at data
Part 3: How to look at data (continued)
Part 4: Processing Data Streams
Part 5: Processing Large Data Chunks

comments powered by Disqus