Chapter 14

Thoughts on System Design for Big Data

Note: The examples below are abridged; the book contains more details.

  1. Partitioning
  2. Retry Policies
  3. Amazon DynamoDB - Part I
  4. Amazon DynamoDB - Part II
  5. Amazon DynamoDB - Part III
  6. Amazon DynamoDB - Part IV
  7. Amazon DynamoDB - Part V
  8. Amazon DynamoDB - Part VI

Partitioning

To illustrate how this partitioning scheme yields a balanced cluster assignment, we used 4,450 email addresses from the Enron dataset as stand-ins for arbitrary keys and computed how they would be assigned across our 5 clusters with the Python script below:

N = 5  # number of clusters
counts = [0] * N
with open('chapter14/enron.txt') as emails:
  for email in emails:
    # Assign each address to a cluster by hashing it modulo N.
    counts[hash(email.rstrip()) % N] += 1
print(counts)
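
One caveat when rerunning the script above: in Python 3 the built-in hash() of a string is salted per process unless PYTHONHASHSEED is set, so the per-cluster counts can change from run to run. A minimal sketch of a reproducible variant follows, assuming a SHA-256 digest is an acceptable stand-in for the hash function. The cluster_for helper and the synthetic addresses are hypothetical and not from the book.

```python
import hashlib

N = 5  # number of clusters, as in the script above


def cluster_for(key, n=N):
    """Map a key to a cluster index using a digest that is stable across runs."""
    digest = hashlib.sha256(key.encode('utf-8')).digest()
    # Interpret the first 8 bytes as an unsigned integer, then reduce modulo n.
    return int.from_bytes(digest[:8], 'big') % n


# Synthetic keys for illustration only; the book uses the Enron addresses.
keys = ['user{}@example.com'.format(i) for i in range(1000)]
counts = [0] * N
for key in keys:
    counts[cluster_for(key)] += 1
print(counts)
```

Because the digest depends only on the key, the same address always lands on the same cluster, which is what a partitioning scheme needs when several processes route requests.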

Retry Policies

Here’s an example of a probabilistic exponential backoff poller. It starts with an initial delay of 2 milliseconds, doubles the delay after each empty poll, and gives up once the total delay exceeds 60 milliseconds. After each wait it also resets the delay interval to its initial value with a probability of 0.42 (run it several times to see the schedule vary):

import java.util.Random;
import java.util.concurrent.TimeUnit;

class ProbabilisticExponentialBackoffPoller {
  public Object[] poll(
      final TimeUnit timeUnit,
      final int initialDelay,
      final int maxTotalDelay,
      final double delayResetProbability)
        throws InterruptedException {
    final Random random = new Random();
    Object[] messages = readMessages();
    int currentDelay = initialDelay;
    int totalDelay = 0;
    
    while (messages.length == 0 && totalDelay <= maxTotalDelay) {
      System.out.printf("Sleeping for %d %s\n", currentDelay, timeUnit);
      timeUnit.sleep(currentDelay);
      totalDelay += currentDelay;
      currentDelay *= 2;
      
      if (delayResetProbability > random.nextDouble()) {
        System.out.println("Resetting delay interval");
        currentDelay = initialDelay;
      }
      
      messages = readMessages();
    }
    return messages;
  }
  
  private Object[] readMessages() {
    return new Object[0];
  }
}

public class Main {
  public static void main(String[] args) throws Exception {
    new ProbabilisticExponentialBackoffPoller().poll(TimeUnit.MILLISECONDS, 2, 60, 0.42);
  }
}
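
To inspect the schedule without waiting, the same loop can be sketched as a pure function that returns the delays it would sleep for. This is an illustrative Python translation, not code from the book: backoff_delays and its rng parameter are hypothetical names, and the function assumes every poll comes back empty, as readMessages() does above.

```python
import random


def backoff_delays(initial_delay, max_total_delay, reset_probability, rng=None):
    """Return the delays the poller would sleep for if every poll is empty."""
    rng = rng or random.Random()
    delays = []
    current, total = initial_delay, 0
    while total <= max_total_delay:
        delays.append(current)
        total += current
        current *= 2  # exponential growth
        if reset_probability > rng.random():
            current = initial_delay  # probabilistic reset
    return delays


print(backoff_delays(2, 60, 0.0))   # no resets: [2, 4, 8, 16, 32]
print(backoff_delays(2, 60, 0.42))  # varies from run to run
```

With no resets the poller sleeps five times for a total of 62 milliseconds. Resets shorten individual waits, so the poller checks for messages more often before it gives up.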

Amazon DynamoDB - Part I

We created a table called reminder to store user-requested reminders for our previous example of a reminder bot. Now we can execute the following code to create reminders:

import boto3

dynamodb = boto3.resource('dynamodb')
dynamodb.Table('reminder').put_item(
  Item={
    'timestamp': 1511643670,
    'userID': '[email protected]',
    'ttl': 1511730070,
    'text': 'write a DynamoDB example',
  }
)
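
The table definition itself is not shown here. As a rough sketch of what it might look like, the queries later in this section imply that timestamp is the partition key and that userID completes the primary key. Everything else below is an assumption: the attribute types, the on-demand billing mode, and the helper names reminder_table_definition and create_reminder_table. The second function takes the boto3 resource as an argument, so nothing runs against AWS until it is called.

```python
def reminder_table_definition():
    """Keyword arguments for create_table; types and billing mode are assumed."""
    return {
        'TableName': 'reminder',
        'KeySchema': [
            {'AttributeName': 'timestamp', 'KeyType': 'HASH'},  # partition key
            {'AttributeName': 'userID', 'KeyType': 'RANGE'},    # sort key
        ],
        'AttributeDefinitions': [
            {'AttributeName': 'timestamp', 'AttributeType': 'N'},
            {'AttributeName': 'userID', 'AttributeType': 'S'},
        ],
        'BillingMode': 'PAY_PER_REQUEST',
    }


def create_reminder_table(dynamodb):
    """Create the table with a boto3 DynamoDB resource and wait until it exists."""
    table = dynamodb.create_table(**reminder_table_definition())
    table.wait_until_exists()
    return table
```

It would be called as create_reminder_table(boto3.resource('dynamodb')). Only key attributes are declared up front; other attributes such as ttl and text are supplied per item.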

Amazon DynamoDB - Part II

To retrieve the item we just created, we look it up by its full primary key with get_item:

from pprint import pprint

import boto3

dynamodb = boto3.resource('dynamodb')
response = dynamodb.Table('reminder').get_item(
  Key={
    'timestamp': 1511643670,
    'userID': '[email protected]',
  }
)
pprint(response['Item'])
## {u'text': u'write a DynamoDB example',
##   u'timestamp': Decimal('1511643670'),
##   u'ttl': Decimal('1511730070'),
##   u'userID': u'[email protected]'}
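
As the output shows, boto3 returns DynamoDB numbers as Decimal values. When a reminder has to be serialized, for instance to JSON, those values need converting first. A minimal sketch follows, assuming whole-number Decimals should become int and all others float. The plain helper is hypothetical and the sample item only mirrors the numeric and text fields above.

```python
import json
from decimal import Decimal


def plain(value):
    """Recursively replace Decimal values with int or float."""
    if isinstance(value, Decimal):
        return int(value) if value == value.to_integral_value() else float(value)
    if isinstance(value, dict):
        return {k: plain(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [plain(v) for v in value]
    return value


item = {
    'timestamp': Decimal('1511643670'),
    'ttl': Decimal('1511730070'),
    'text': 'write a DynamoDB example',
}
print(json.dumps(plain(item), sort_keys=True))
```

Converting to float can lose precision for values with many decimal places, so this is only appropriate when exact decimal arithmetic is not needed downstream.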

Amazon DynamoDB - Part III

DynamoDB also supports batch operations:

import boto3

dynamodb = boto3.resource('dynamodb')
with dynamodb.Table('reminder').batch_writer() as batch:
  batch.put_item(
    Item={
      'timestamp': 1511647270,
      'userID': '[email protected]',
      'ttl': 1511733670,
      'text': 'batch write example',
    }
  )
  batch.put_item(
    Item={
      'timestamp': 1511647270,
      'userID': '@voicera.ai',
      'ttl': 1511733670,
      'text': 'batch write example',
    }
  )  
  batch.put_item(
    Item={
      'timestamp': 1511650870,
      'userID': '[email protected]',
      'ttl': 1511737270,
      'text': 'another item to write',
    }
  )
  batch.delete_item(
    Key={
      'timestamp': 1511643670,
      'userID': '[email protected]',
    }
  )

Amazon DynamoDB - Part IV

Because we specified the timestamp field as the partition key, querying for reminders at a specific timestamp is efficient using a key condition:

from pprint import pprint

import boto3
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource('dynamodb')
response = dynamodb.Table('reminder').query(
  KeyConditionExpression=Key('timestamp').eq(1511647270)
)
pprint(response['Items'])
## [{u'text': u'batch write example',
##    u'timestamp': Decimal('1511647270'),
##    u'ttl': Decimal('1511733670'),
##    u'userID': u'[email protected]'},
##  {u'text': u'another user',
##    u'timestamp': Decimal('1511647270'),
##    u'ttl': Decimal('1511733670'),
##    u'userID': u'[email protected]'}]

Amazon DynamoDB - Part V

Because userID on its own is not the partition key, getting all reminders that belong to a specific user requires scanning the table with an attribute filter:

from pprint import pprint

import boto3
from boto3.dynamodb.conditions import Attr

dynamodb = boto3.resource('dynamodb')
response = dynamodb.Table('reminder').scan(
  FilterExpression=Attr('userID').eq('[email protected]')
)
pprint(response['Items'])
## [{u'text': u'batch write example',
##    u'timestamp': Decimal('1511647270'),
##    u'ttl': Decimal('1511733670'),
##    u'userID': u'[email protected]'},
##  {u'text': u'another item to write',
##    u'timestamp': Decimal('1511650870'),
##    u'ttl': Decimal('1511737270'),
##    u'userID': u'[email protected]'}]
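
A scan reads the whole table and applies the filter afterwards, and each call returns at most one page of results. When more data remains, the response carries a LastEvaluatedKey that is passed back as ExclusiveStartKey on the next call. A sketch of that loop follows, written against any object with a boto3-style scan method; scan_all is a hypothetical helper, not part of boto3.

```python
def scan_all(table, **scan_kwargs):
    """Collect the items from every page of a scan."""
    items = []
    while True:
        response = table.scan(**scan_kwargs)
        items.extend(response.get('Items', []))
        last_key = response.get('LastEvaluatedKey')
        if last_key is None:
            return items
        # Resume the next page where the previous one stopped.
        scan_kwargs['ExclusiveStartKey'] = last_key
```

It would be called with the table and the same FilterExpression as above, for example scan_all(dynamodb.Table('reminder'), FilterExpression=...). On a small table the single call shown earlier returns everything, so the loop only matters as the table grows.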

Amazon DynamoDB - Part VI

NoSQL databases are a good fit for loosely structured data: items in the same table can carry different attributes, as in the following example:

from pprint import pprint

import boto3
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('reminder')
with table.batch_writer() as batch:
  batch.put_item(
    Item={
      'timestamp': 1511652070,
      'userID': '[email protected]',
      'ttl': 1511738470,
      'checklist': ['milk', 'eggs', 'bread']
    }
  )
  batch.put_item(
    Item={
      'timestamp': 1511652070,
      'userID': '[email protected]',
      'ttl': 1511738470,
      'actions': [{
        'callback': 'example.com/callback',
        'payload': {'text': 'flash office lights'},
        'method': 'POST'
      }]
    }
  )

response = table.query(
  KeyConditionExpression=Key('timestamp').eq(1511652070)
)
pprint(response['Items'])
## [{u'checklist': [u'milk', u'eggs', u'bread'],
##   u'timestamp': Decimal('1511652070'),
##   u'ttl': Decimal('1511738470'),
##   u'userID': u'[email protected]'},
## {u'actions': [{u'callback': u'example.com/callback',
##     u'method': u'POST',
##     u'payload': {u'text': u'flash office lights'}}],
##   u'timestamp': Decimal('1511652070'),
##   u'ttl': Decimal('1511738470'),
##   u'userID': u'[email protected]'}]