Getting familiar with Pthreads.
Download ph.c:
#include <stdlib.h> #include <unistd.h> #include <stdio.h> #include <assert.h> #include <pthread.h> #include <sys/time.h> #define SOL #define NBUCKET 5 #define NKEYS 100000 struct entry { int key; int value; struct entry *next; }; struct entry *table[NBUCKET]; int keys[NKEYS]; int nthread = 1; volatile int done; double now() { struct timeval tv; gettimeofday(&tv, 0); return tv.tv_sec + tv.tv_usec / 1000000.0; } static void print(void) { int i; struct entry *e; for (i = 0; i < NBUCKET; i++) { printf("%d: ", i); for (e = table[i]; e != 0; e = e->next) { printf("%d ", e->key); } printf("\n"); } } static void insert(int key, int value, struct entry **p, struct entry *n) { struct entry *e = malloc(sizeof(struct entry)); e->key = key; e->value = value; e->next = n; *p = e; } static void put(int key, int value) { int i = key % NBUCKET; insert(key, value, &table[i], table[i]); } static struct entry* get(int key) { struct entry *e = 0; for (e = table[key % NBUCKET]; e != 0; e = e->next) { if (e->key == key) break; } return e; } static void * thread(void *xa) { long n = (long) xa; int i; int b = NKEYS/nthread; int k = 0; double t1, t0; // printf("b = %d\n", b); t0 = now(); for (i = 0; i < b; i++) { // printf("%d: put %d\n", n, b*n+i); put(keys[b*n + i], n); } t1 = now(); printf("%ld: put time = %f\n", n, t1-t0); // Should use pthread_barrier, but MacOS doesn't support it ... __sync_fetch_and_add(&done, 1); while (done < nthread) ; t0 = now(); for (i = 0; i < NKEYS; i++) { struct entry *e = get(keys[i]); if (e == 0) k++; } t1 = now(); printf("%ld: get time = %f\n", n, t1-t0); printf("%ld: %d keys missing\n", n, k); return NULL; } int main(int argc, char *argv[]) { pthread_t *tha; void *value; long i; double t1, t0; if (argc < 2) { fprintf(stderr, "%s: %s nthread\n", argv[0], argv[0]); exit(-1); } nthread = atoi(argv[1]); tha = malloc(sizeof(pthread_t) * nthread); srandom(0); assert(NKEYS % nthread == 0); for (i = 0; i < NKEYS; i++) { keys[i] = random(); } t0 = now(); for(i = 0; i < nthread; i++) { assert(pthread_create(&tha[i], NULL, thread, (void *) i) == 0); } for(i = 0; i < nthread; i++) { assert(pthread_join(tha[i], &value) == 0); } t1 = now(); printf("completion time = %f\n", t1-t0); }
Read it. The program is basically a key-value store. There’s a table
with NBUCKET
columns, where each column corresponds to a singly linked list of entry
s. The program creates nthread
number of threads. There are in total NKEYS
keys, and each thread is responsible for “randomly” inserting (using put()
) NKEYS/nthread
number of entry
s into one of the table
columns. After all the threads have done put()
ing, they try to walkthrough the entire table (using get()
) to check whether these NKEYS
keys have been successfully inserted into table
.
Compile, then run it:
peilin@PWN:~/6.828/2018/homework/threads_and_locking$ gcc -g -O2 -o ph ph.c -pthread
peilin@PWN:~/6.828/2018/homework/threads_and_locking$ ./ph 2
0: put time = 0.013939
1: put time = 0.014364
1: get time = 7.341332
1: 16425 keys missing
0: get time = 7.357036
0: 16425 keys missing
completion time = 7.371710
Yeah we’ve got 16425 keys missing. Why?
The thing is, consider the following scenario: Say now the first entry
in table[2]
is entry
A. Thread 1 wants to insert entry
B into table[2]
. Unfortunately, thread 2 also wants to insert entry
C into table[2]
, at the same time. What would happen, then? Take a look at insert()
:
int i = key % NBUCKET; insert(key, value, &table[i], table[i]);
static void insert(int key, int value, struct entry **p, struct entry *n) { struct entry *e = malloc(sizeof(struct entry)); e->key = key; e->value = value; e->next = n; *p = e; }
Now table[2]
contains a pointer to C, which will be assigned to both A->next
and B->next
. The last step of insert()
, however, is updating the linked list head, table[2]
to the address of the newly allocated entry
, namely &A and &B. This is where the collision happens: If &A get copied to table[2]
first, it will be overwritten by &B. B->next
still points at C. Later when the threads perform get()
, A won’t be on the linked list, therefore won’t be found. And vice versa. The point is, one of the entry
will be missing if two threads try to insert into the same column at the same time.
Solution: use locks. Pthreads defined some handy functions / definitions for us to use, including:
pthread_mutex_t lock; pthread_mutex_init(&lock, NULL); pthread_mutex_lock(&lock); pthread_mutex_unlock(&lock);
When a thread wants to put()
, it should first do pthread_mutex_lock()
to acquire a lock. After put()
ing, it “return”s the lock by doing pthread_mutex_unlock()
. My solution defines one lock for each table
entries.
// gcc -g -O2 -o ph ph.c -pthread #include <stdlib.h> #include <unistd.h> #include <stdio.h> #include <assert.h> #include <pthread.h> #include <sys/time.h> #define SOL #define NBUCKET 5 #define NKEYS 100000 struct entry { int key; int value; struct entry *next; }; struct entry *table[NBUCKET]; int keys[NKEYS]; int nthread = 1; volatile int done; // declare one lock for each table entry pthread_mutex_t locks[NBUCKET]; double now() { struct timeval tv; gettimeofday(&tv, 0); return tv.tv_sec + tv.tv_usec / 1000000.0; } static void print(void) { int i; struct entry *e; for (i = 0; i < NBUCKET; i++) { printf("%d: ", i); for (e = table[i]; e != 0; e = e->next) { printf("%d ", e->key); } printf("\n"); } } static void insert(int key, int value, struct entry **p, struct entry *n) { struct entry *e = malloc(sizeof(struct entry)); e->key = key; e->value = value; e->next = n; *p = e; } static void put(int key, int value) { int i = key % NBUCKET; // acquire locks[i] first before modifying table[i] pthread_mutex_lock(&locks[i]); insert(key, value, &table[i], table[i]); // release locks[i] after you are done with it pthread_mutex_unlock(&locks[i]); } static struct entry* get(int key) { struct entry *e = 0; for (e = table[key % NBUCKET]; e != 0; e = e->next) { if (e->key == key) break; } return e; } static void * thread(void *xa) { long n = (long) xa; int i; int b = NKEYS/nthread; int k = 0; double t1, t0; // printf("b = %d\n", b); t0 = now(); for (i = 0; i < b; i++) { // printf("%d: put %d\n", n, b*n+i); put(keys[b*n + i], n); } t1 = now(); printf("%ld: put time = %f\n", n, t1-t0); // Should use pthread_barrier, but MacOS doesn't support it ... __sync_fetch_and_add(&done, 1); while (done < nthread) ; t0 = now(); for (i = 0; i < NKEYS; i++) { struct entry *e = get(keys[i]); if (e == 0) k++; } t1 = now(); printf("%ld: get time = %f\n", n, t1-t0); printf("%ld: %d keys missing\n", n, k); return NULL; } int main(int argc, char *argv[]) { pthread_t *tha; void *value; long i; double t1, t0; if (argc < 2) { fprintf(stderr, "%s: %s nthread\n", argv[0], argv[0]); exit(-1); } nthread = atoi(argv[1]); tha = malloc(sizeof(pthread_t) * nthread); srandom(0); assert(NKEYS % nthread == 0); for (i = 0; i < NKEYS; i++) { keys[i] = random(); } // initialize the locks for(i = 0; i < NBUCKET; i++) { pthread_mutex_init(&locks[i], NULL); } t0 = now(); for(i = 0; i < nthread; i++) { assert(pthread_create(&tha[i], NULL, thread, (void *) i) == 0); } for(i = 0; i < nthread; i++) { assert(pthread_join(tha[i], &value) == 0); } t1 = now(); printf("completion time = %f\n", t1-t0); }
Compile and run it again:
peilin@PWN:~/6.828/2018/homework/threads_and_locking$ gcc -g -O2 -o ph ph.c -pthread
peilin@PWN:~/6.828/2018/homework/threads_and_locking$ ./ph 4
2: put time = 0.012107
0: put time = 0.012344
3: put time = 0.012706
1: put time = 0.013270
0: get time = 7.150409
0: 0 keys missing
3: get time = 7.160527
3: 0 keys missing
2: get time = 7.161781
2: 0 keys missing
1: get time = 7.232478
1: 0 keys missing
completion time = 7.247822
Nice! No keys missing, and we’ve done 4 times work (each thread checks all the NKEYS
number of keys in get()
) than without multithreading, using roughly same amount of time.
This concludes this simple homework. See you next time!