Select Page

Getting familiar with Pthreads.


Download ph.c:

#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <assert.h>
#include <pthread.h>
#include <sys/time.h>

// SOL is defined but not referenced anywhere in this listing.
#define SOL
// Number of buckets (linked-list heads) in table.
#define NBUCKET 5
// Total number of keys generated by main() and inserted across all threads.
#define NKEYS 100000

// One node of a bucket's singly linked list.
struct entry {
  int key;               // lookup key; put()/get() choose the bucket as key % NBUCKET
  int value;             // value stored with the key (thread() passes its thread index)
  struct entry *next;    // next node in the same bucket, or 0 at the end of the list
};
// Bucket heads. File-scope storage is zero-initialized, so every list starts empty.
struct entry *table[NBUCKET];
// The keys to insert; filled by main() before any thread is created.
int keys[NKEYS];
// Number of worker threads; overwritten from argv[1] in main().
int nthread = 1;
// Number of threads that have finished their put phase; thread() spins on
// it until it reaches nthread.
volatile int done;


// Return the current wall-clock time in seconds as a double, combining the
// seconds and microseconds reported by gettimeofday(). Callers subtract two
// readings to measure elapsed time.
double
now(void)
{
  struct timeval tv;

  gettimeofday(&tv, NULL);
  // Convert explicitly so the integer-to-double step is visible.
  return (double) tv.tv_sec + (double) tv.tv_usec / 1000000.0;
}

static void
print(void)
{
  int i;
  struct entry *e;
  for (i = 0; i < NBUCKET; i++) {
    printf("%d: ", i);
    for (e = table[i]; e != 0; e = e->next) {
      printf("%d ", e->key);
    }
    printf("\n");
  }
}

// Allocate a new entry for (key, value), point its next at n, and store the
// new entry through p. put() calls this with p = &table[i] and n = table[i],
// which pushes the entry onto the front of bucket i.
// Not synchronized: n was read by the caller before the store to *p, so two
// threads inserting into the same bucket at once can both link to the same
// old head, and one store to *p then overwrites the other.
// NOTE(review): the malloc() result is dereferenced without a NULL check.
static void 
insert(int key, int value, struct entry **p, struct entry *n)
{
  struct entry *e = malloc(sizeof(struct entry));
  e->key = key;
  e->value = value;
  e->next = n;   // link to the head the caller read earlier
  *p = e;        // publish the new entry as the bucket head
}

// Insert (key, value) at the front of bucket key % NBUCKET.
// No lock is taken: table[i] is read as an argument and then overwritten
// inside insert(), so concurrent calls for the same bucket can lose entries.
// NOTE(review): a negative key would make i negative; main() fills keys[]
// from random(), so this presumably never happens here - confirm.
static 
void put(int key, int value)
{
  int i = key % NBUCKET;
  insert(key, value, &table[i], table[i]);
}

// Look up key in bucket key % NBUCKET.
// Returns the first entry in that bucket whose key matches, or 0 when the
// list is exhausted without a match. Takes no lock.
static struct entry*
get(int key)
{
  struct entry *cur = table[key % NBUCKET];

  // Advance until we either run off the list or land on a matching key.
  while (cur != 0 && cur->key != key)
    cur = cur->next;
  return cur;
}

// Worker thread body. xa carries the thread index (0 .. nthread-1) cast to a
// pointer by main().
//   1. put() this thread's contiguous share of keys[] (NKEYS/nthread keys),
//      storing the thread index as the value, and print the elapsed time.
//   2. Wait until every thread has finished step 1.
//   3. get() all NKEYS keys, count the ones not found, and print the elapsed
//      time and the count.
// Always returns NULL.
static void *
thread(void *xa)
{
  const long id = (long) xa;
  const int share = NKEYS/nthread;
  const int *slice = keys + share*id;   // first key owned by this thread
  int missing = 0;
  int idx;
  double start, elapsed;

  start = now();
  for (idx = 0; idx < share; idx++)
    put(slice[idx], id);
  elapsed = now() - start;
  printf("%ld: put time = %f\n", id, elapsed);

  // Hand-rolled barrier (pthread_barrier is not available on MacOS): count
  // ourselves as finished, then spin until all nthread threads have done so.
  __sync_fetch_and_add(&done, 1);
  while (done < nthread) {
    // spin
  }

  start = now();
  for (idx = 0; idx < NKEYS; idx++)
    missing += (get(keys[idx]) == 0);
  elapsed = now() - start;
  printf("%ld: get time = %f\n", id, elapsed);
  printf("%ld: %d keys missing\n", id, missing);
  return NULL;
}

// Entry point. Usage: ph nthread
//
// Fills keys[] with NKEYS pseudo-random values (fixed seed 0), starts
// nthread worker threads running thread(), waits for all of them, and prints
// the total wall-clock time.
//
// nthread must be a positive divisor of NKEYS so that every thread gets an
// equal share of the keys; anything else is rejected with a message and exit
// status -1. Thread creation and joining are checked with explicit tests
// rather than inside assert(), so they still run when NDEBUG is defined.
// The entries linked into table are not freed here; the process exits
// right after.
int
main(int argc, char *argv[])
{
  pthread_t *tha;
  char *end;
  long requested;
  long i;
  int rc;
  double t1, t0;

  if (argc < 2) {
    fprintf(stderr, "%s: %s nthread\n", argv[0], argv[0]);
    exit(-1);
  }

  // strtol lets us reject non-numeric input; 0 or a negative count would
  // otherwise lead to a division by zero or a bogus allocation size below.
  requested = strtol(argv[1], &end, 10);
  if (end == argv[1] || *end != '\0' ||
      requested < 1 || requested > NKEYS || NKEYS % requested != 0) {
    fprintf(stderr, "%s: nthread must be a positive divisor of %d\n",
            argv[0], NKEYS);
    exit(-1);
  }
  nthread = (int) requested;

  tha = malloc(sizeof *tha * nthread);
  if (tha == NULL) {
    perror("malloc");
    exit(-1);
  }

  srandom(0);
  for (i = 0; i < NKEYS; i++) {
    keys[i] = (int) random();
  }

  t0 = now();
  for (i = 0; i < nthread; i++) {
    // The thread index travels to thread() inside the pointer argument.
    rc = pthread_create(&tha[i], NULL, thread, (void *) i);
    if (rc != 0) {
      fprintf(stderr, "%s: pthread_create failed: error %d\n", argv[0], rc);
      exit(-1);
    }
  }
  for (i = 0; i < nthread; i++) {
    rc = pthread_join(tha[i], NULL);
    if (rc != 0) {
      fprintf(stderr, "%s: pthread_join failed: error %d\n", argv[0], rc);
      exit(-1);
    }
  }
  t1 = now();
  printf("completion time = %f\n", t1-t0);

  free(tha);
  return 0;
}

Read it. The program is basically a key-value store. There’s a table with NBUCKET columns, where each column holds a singly linked list of entries. The program creates nthread threads. There are NKEYS keys in total, and each thread is responsible for inserting (using put()) its own NKEYS/nthread share of them; the keys are random, so the column each one lands in (key % NBUCKET) is effectively random too. After all the threads have finished put()ing, each of them looks up every one of the NKEYS keys (using get()) to check whether it was successfully inserted into the table.

Compile, then run it:

peilin@PWN:~/6.828/2018/homework/threads_and_locking$ gcc -g -O2 -o ph ph.c -pthread
peilin@PWN:~/6.828/2018/homework/threads_and_locking$ ./ph 2
0: put time = 0.013939
1: put time = 0.014364
1: get time = 7.341332
1: 16425 keys missing
0: get time = 7.357036
0: 16425 keys missing
completion time = 7.371710

Yeah we’ve got 16425 keys missing. Why?

Consider the following scenario: say the first entry in table[2] is currently entry A. Thread 1 wants to insert entry B into table[2]. Unfortunately, thread 2 also wants to insert entry C into table[2] at the same time. What happens then? Take a look at the two lines of put() and at insert():

  // From put(): pick the bucket, then pass both its address and its
  // current head to insert().
  int i = key % NBUCKET;
  insert(key, value, &table[i], table[i]);   // the head table[i] is READ here...
// insert(): p is &table[i], n is the head value that put() read above.
static void 
insert(int key, int value, struct entry **p, struct entry *n)
{
  struct entry *e = malloc(sizeof(struct entry));
  e->key = key;
  e->value = value;
  e->next = n;   // link the new entry to the head that was read earlier
  *p = e;        // ...and the head is WRITTEN here; nothing stops another
                 // thread from doing its own read and write in between
}

Both threads read table[2] while it still contains a pointer to A, so that pointer is assigned to both B->next and C->next. The last step of insert(), however, is updating the linked list head, table[2], to the address of the newly allocated entry, namely &B or &C. This is where the collision happens: if &B gets copied to table[2] first, it will be overwritten by &C, and C->next still points at A. Later, when the threads perform get(), B won’t be on the linked list, and therefore won’t be found. And vice versa. The point is, one of the entries will be missing if two threads try to insert into the same column at the same time.

Solution: use locks. Pthreads provides a mutex type and some handy functions for us to use, including:

pthread_mutex_t lock;               // declare a lock
pthread_mutex_init(&lock, NULL);    // initialize it (NULL: no custom attributes)
pthread_mutex_lock(&lock);          // acquire the lock
pthread_mutex_unlock(&lock);        // release the lock

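When a thread wants to put(), it should first call pthread_mutex_lock() to acquire a lock. After put()ing, it “returns” the lock by calling pthread_mutex_unlock(). My solution defines one lock per table column, i.e. one for each element of table[], so that threads inserting into different columns do not have to wait for each other.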

// gcc -g -O2 -o ph ph.c -pthread

#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <assert.h>
#include <pthread.h>
#include <sys/time.h>

// SOL is defined but not referenced anywhere in this listing.
#define SOL
// Number of buckets (linked-list heads) in table, and of mutexes in locks.
#define NBUCKET 5
// Total number of keys generated by main() and inserted across all threads.
#define NKEYS 100000

// One node of a bucket's singly linked list.
struct entry {
  int key;               // lookup key; put()/get() choose the bucket as key % NBUCKET
  int value;             // value stored with the key (thread() passes its thread index)
  struct entry *next;    // next node in the same bucket, or 0 at the end of the list
};
// Bucket heads. File-scope storage is zero-initialized, so every list starts empty.
struct entry *table[NBUCKET];
// The keys to insert; filled by main() before any thread is created.
int keys[NKEYS];
// Number of worker threads; overwritten from argv[1] in main().
int nthread = 1;
// Number of threads that have finished their put phase; thread() spins on
// it until it reaches nthread.
volatile int done;

// One mutex per bucket: put() holds locks[i] while it modifies table[i].
// Initialized in main() before the threads are started.
pthread_mutex_t locks[NBUCKET];

// Current wall-clock time in seconds, as a double built from the seconds and
// microseconds fields filled in by gettimeofday(). Used in pairs to time a
// phase: elapsed = now() - earlier_now.
double
now(void)
{
  struct timeval tv;
  double seconds;

  gettimeofday(&tv, NULL);
  seconds = (double) tv.tv_sec;
  seconds += (double) tv.tv_usec / 1000000.0;
  return seconds;
}

// Debug helper: dump the table to stdout, one line per bucket, as
// "<bucket index>: " followed by each key in list order and a space.
static void
print(void)
{
  struct entry **head;
  struct entry *node;

  for (head = table; head < table + NBUCKET; head++) {
    printf("%d: ", (int) (head - table));
    node = *head;
    while (node != NULL) {
      printf("%d ", node->key);
      node = node->next;
    }
    printf("\n");
  }
}

// Allocate a new entry for (key, value), point its next at n, and store the
// new entry through p. put() calls this with p = &table[i] and n = table[i]
// while holding locks[i], which pushes the entry onto the front of bucket i.
// If the allocation fails, the error is reported and the process exits with
// status -1 instead of dereferencing a null pointer.
static void
insert(int key, int value, struct entry **p, struct entry *n)
{
  struct entry *e = malloc(sizeof *e);

  if (e == NULL) {
    perror("malloc");
    exit(-1);
  }
  e->key = key;
  e->value = value;
  e->next = n;
  // Publish last, once the entry is fully filled in.
  *p = e;
}

// Insert (key, value) at the front of the bucket selected by key.
// The bucket's mutex is held for the whole read-modify-write of table[i], so
// concurrent put() calls on the same bucket cannot overwrite each other's
// update, while calls on different buckets proceed in parallel.
static
void put(int key, int value)
{
  // Reduce the key as unsigned: for non-negative keys this is the same
  // bucket as key % NBUCKET, and a negative key can no longer produce a
  // negative (out-of-bounds) index into table[] and locks[].
  unsigned int i = (unsigned int) key % NBUCKET;

  // acquire locks[i] first before modifying table[i]
  pthread_mutex_lock(&locks[i]);

  insert(key, value, &table[i], table[i]);

  // release locks[i] after you are done with it
  pthread_mutex_unlock(&locks[i]);
}

// Look up key in its bucket.
// Returns the first entry in the bucket whose key matches, or NULL if there
// is none. Takes no lock; in this program thread() only calls it after every
// thread has finished its put() phase, so the lists are no longer changing.
static struct entry*
get(int key)
{
  struct entry *e;

  // Reduce the key as unsigned: identical to key % NBUCKET for non-negative
  // keys, and never a negative (out-of-bounds) index for negative ones.
  for (e = table[(unsigned int) key % NBUCKET]; e != NULL; e = e->next) {
    if (e->key == key)
      return e;
  }
  return NULL;
}

// Worker thread body. xa is the thread index (0 .. nthread-1) that main()
// passed through the pointer argument.
// Phase 1: put() this thread's NKEYS/nthread consecutive keys from keys[],
//          using the thread index as the value; report the time taken.
// Barrier: wait for all threads to complete phase 1.
// Phase 2: get() every one of the NKEYS keys and count the lookups that
//          fail; report the time taken and the number of missing keys.
// Always returns NULL.
static void *
thread(void *xa)
{
  long self = (long) xa;
  int per_thread = NKEYS/nthread;
  int absent = 0;
  int j;
  double began, ended;

  began = now();
  j = 0;
  while (j < per_thread) {
    put(keys[per_thread*self + j], self);
    j++;
  }
  ended = now();
  printf("%ld: put time = %f\n", self, ended-began);

  // Should use pthread_barrier, but MacOS doesn't support it, so: bump the
  // shared counter atomically and busy-wait until it reaches nthread.
  __sync_fetch_and_add(&done, 1);
  while (done < nthread)
    continue;

  began = now();
  for (j = 0; j < NKEYS; j++) {
    if (get(keys[j]) == 0)
      absent++;
  }
  ended = now();
  printf("%ld: get time = %f\n", self, ended-began);
  printf("%ld: %d keys missing\n", self, absent);
  return NULL;
}

// Entry point. Usage: ph nthread
//
// Fills keys[] with NKEYS pseudo-random values (fixed seed 0), initializes
// the per-bucket mutexes, starts nthread worker threads running thread(),
// waits for all of them, and prints the total wall-clock time.
//
// nthread must be a positive divisor of NKEYS so that every thread gets an
// equal share of the keys; anything else is rejected with a message and exit
// status -1. All pthread calls are checked with explicit tests rather than
// inside assert(), so they still run when NDEBUG is defined.
// The entries linked into table are not freed here; the process exits
// right after.
int
main(int argc, char *argv[])
{
  pthread_t *tha;
  char *end;
  long requested;
  long i;
  int rc;
  double t1, t0;

  if (argc < 2) {
    fprintf(stderr, "%s: %s nthread\n", argv[0], argv[0]);
    exit(-1);
  }

  // strtol lets us reject non-numeric input; 0 or a negative count would
  // otherwise lead to a division by zero or a bogus allocation size below.
  requested = strtol(argv[1], &end, 10);
  if (end == argv[1] || *end != '\0' ||
      requested < 1 || requested > NKEYS || NKEYS % requested != 0) {
    fprintf(stderr, "%s: nthread must be a positive divisor of %d\n",
            argv[0], NKEYS);
    exit(-1);
  }
  nthread = (int) requested;

  tha = malloc(sizeof *tha * nthread);
  if (tha == NULL) {
    perror("malloc");
    exit(-1);
  }

  srandom(0);
  for (i = 0; i < NKEYS; i++) {
    keys[i] = (int) random();
  }

  // initialize the locks (must happen before any thread can call put())
  for (i = 0; i < NBUCKET; i++) {
    rc = pthread_mutex_init(&locks[i], NULL);
    if (rc != 0) {
      fprintf(stderr, "%s: pthread_mutex_init failed: error %d\n", argv[0], rc);
      exit(-1);
    }
  }

  t0 = now();
  for (i = 0; i < nthread; i++) {
    // The thread index travels to thread() inside the pointer argument.
    rc = pthread_create(&tha[i], NULL, thread, (void *) i);
    if (rc != 0) {
      fprintf(stderr, "%s: pthread_create failed: error %d\n", argv[0], rc);
      exit(-1);
    }
  }
  for (i = 0; i < nthread; i++) {
    rc = pthread_join(tha[i], NULL);
    if (rc != 0) {
      fprintf(stderr, "%s: pthread_join failed: error %d\n", argv[0], rc);
      exit(-1);
    }
  }
  t1 = now();
  printf("completion time = %f\n", t1-t0);

  // All workers have been joined, so nothing can still hold a lock.
  for (i = 0; i < NBUCKET; i++) {
    pthread_mutex_destroy(&locks[i]);
  }
  free(tha);
  return 0;
}

Compile and run it again:

peilin@PWN:~/6.828/2018/homework/threads_and_locking$ gcc -g -O2 -o ph ph.c -pthread
peilin@PWN:~/6.828/2018/homework/threads_and_locking$ ./ph 4
2: put time = 0.012107
0: put time = 0.012344
3: put time = 0.012706
1: put time = 0.013270
0: get time = 7.150409
0: 0 keys missing
3: get time = 7.160527
3: 0 keys missing
2: get time = 7.161781
2: 0 keys missing
1: get time = 7.232478
1: 0 keys missing
completion time = 7.247822

Nice! No keys are missing, and we’ve done 4 times as much lookup work as a single thread would (each thread checks all NKEYS keys with get()), in roughly the same amount of time.

This concludes this simple homework. See you next time!