0 0 投票数

## 自旋锁

#include "xchg.c"
#include "spin.h"
void spinlock_init(spinlock_t *lock) {
lock->flag = 0;
}
//自旋锁初始化函数
void spinlock_acquire(spinlock_t *lock) {
while (xchg(&lock->flag, 1) == 1)
;//进行自旋
}
//自旋锁申请函数
void spinlock_release(spinlock_t *lock) {
lock->flag = 0;
}
//自旋锁释放函数

## 互斥信号量

typedef struct __mutex_t {
int flag;
} mutex_t;
//互斥锁结构体
void mutex_init(mutex_t *lock) {
lock->flag = 0;
}
//自旋锁初始化函数

sys_futex()的调用我不仔细说了，主要用到的动作是FUTEX_WAITFUTEX_WAKE

void lock(loct_t* m) {
while (TestAndSet(&m->guard, 1) == 1)
; // acquire guard lock by spinning
if (m->flag == 0) {
m->flag = 1; // lock is acquired
m->guard = 0;
} else {
m->guard = 0;
park();
}
}

void unlock(){
while (TestAndSet(&m->guard, 1) == 1)
; // acquired guard lock by spinning
if (queue_empty(m->q))
m->flag = 0; // let go of lock; no one wants it
else
unpark(queue_remove(m->q)); // hold lock (for next thread!)
m->guard = 0;
}

if (queue_empty(m->q))
m->flag = 0; // let go of lock; no one wants it
else
unpark(queue_remove(m->q)); // hold lock (for next thread!)
m->guard = 0;

sys_futex(&lock->flag, FUTEX_WAKE, 1, NULL, NULL, 0);

void mutex_release(mutex_t *lock) {//自旋锁释放
xchg(&lock->flag, 0);
sys_futex(&lock->flag, FUTEX_WAKE, 1, NULL, NULL, 0);
}

void mutex_acquire(mutex_t *lock) {//自旋锁申请
if (xchg(&lock->flag, 1) == 0) {
return;
} else {
sys_futex(&lock->flag, FUTEX_WAIT, 1, NULL, NULL ,0);
mutex_acquire(lock);
}
}

sys_futex(&lock->flag, FUTEX_WAIT, 1, NULL, NULL ,0);

sys_futex(&lock->flag, FUTEX_WAIT, 1, NULL, NULL ,0);
mutex_acquire(lock);

## 线程安全的计数器

void counter_init(counter_t *c, int val) {
c->val = val;
lock_init((void*) &c->lock);
}

int counter_get_value(counter_t *c) {
lock_acquire((void*) &c->lock);
int val = c->val;
lock_release((void*) &c->lock);
return val;
}

void counter_increment(counter_t *c) {
lock_acquire((void*) &c->lock);
c->val++;
lock_release((void*) &c->lock);
}

void counter_decrement(counter_t *c) {
lock_acquire((void*) &c->lock);
c->val--;
lock_release((void*) &c->lock);
}

int getAcuracy(counter_t* c) {
for (int i = 0; i < NUMCPUS; i++){
c->global += c->local[i];
c->local[i] = 0;
}
int val = c->global;
return val;
}

void counter_increment(counter_t *c) {
}

void counter_decrement(counter_t *c) {
}

if (c->local[cpu] >= c->threshold || c->local[cpu] <= -c->threshold)

#include "lock.h"

typedef struct __counter_t {
int global;
int threshold;
#if LOCK_TYPE == SPIN_LOCK
#elif LOCK_TYPE ==  MUTEX_LOCK
#else
#endif//不同的lock_type对应不同的内容
} counter_t;
int DEFAULT_THRESHOLD = 1024;
void counter_init(counter_t *c, int value);
//计数器初始化函数
int counter_get_value(counter_t *c);
//获得计数器的当前值
void counter_increment(counter_t *c);
//计数增加
void counter_decrement(counter_t *c);
//技术减少
void update(counter_t* c, int threadID, int amt);
//更新计数器

void counter_init(counter_t* c, int value)
{//计数器初始化函数
c->threshold = DEFAULT_THRESHOLD;
c->global = value;
lock_init((void*)&c->glock);
int i;
for (i = 0; i < THREAD_NUM; i++){
c->local[i] = 0;
lock_init((void*)&c->llock[i]);
}//初始化申请的锁
//注意调用的是lock提供的接口函数而不是直接调用锁的函数
}

int counter_get_value(counter_t* c)
{//获得计数器的值
int i;
for (i = 0; i < THREAD_NUM; i++){
lock_acquire((void*)&c->llock[i]);
lock_acquire((void*)&c->glock);
c->global += c->local[i];
lock_release((void*)&c->glock);
c->local[i] = 0;
lock_release((void*)&c->llock[i]);
}
lock_acquire((void*)&c->glock);
int val = c->global;
lock_release((void*)&c->glock);
return val;
}

void update(counter_t* c, int threadID, int amt)
{//计数器更新函数
lock_acquire((void*)&c->llock[cpu]);
c->local[cpu]  += amt;
if (c->local[cpu] >= c->threshold || c->local[cpu] <= -(c->threshold)) {
lock_acquire((void*)&c->glock);
c->global += c->local[cpu];
lock_release((void*)&c->glock);
c->local[cpu] = 0;
}
lock_release((void*)&c->llock[cpu]);
}

void counter_increment(counter_t *c)
{//计数器加
}

void counter_decrement(counter_t *c)
{//计数器减
}

## 线程安全的链表

#include "lock.c"

void list_init(list_t* L)
{//链表初始化函数
lock_init((void*)&L->lock);
}

void list_insert(list_t *L, unsigned int key)
{//链表插入函数
node_t *new = malloc(sizeof(node_t));
if (new == NULL) {
perror("malloc");
return;
}//异常处理
new->key = key;
lock_acquire((void*)&L->lock);
lock_release((void*)&L->lock);
}

void* list_lookup(list_t *L, unsigned int key)
{//链表结点查询函数
void* rv = NULL;
lock_acquire((void*)&L->lock);
while (curr) {
if (curr->key == key) {
rv = curr;
break;
}
curr = curr->next;
}
lock_release((void*)&L->lock);
return rv;
}

void list_delete(list_t *L, unsigned int key)
{//链表结点删除函数
return;
lock_acquire((void*)&L->lock);
node_t *index = L->head, *pre = NULL;
while (index != NULL) {
if (index->key == key) {
break;
}
pre = index;
index = index->next;
}
if (index != NULL) {
if (pre == NULL) {
free(index);
}
else {
pre->next = index->next;
free(index);
}
}
lock_release((void*)&L->lock);
}
void list_free(list_t *L)
{//链表释放函数
if(L!=NULL)
{
node_t *tmp1,*tmp2;
while(tmp1!=NULL)
{//先释放node结点
tmp2=tmp1->next;
free(tmp1);
tmp1=tmp2;
}
free(L);//最后释放自己
}
}

## 线程安全的哈希表

Hash 函数可以用最简单的对 key 取模。

void hash_free(hash_t *H)
{
int i;
for(i=0;i<H->buckets;i++)
list_free((list_t*)&(hash->lists[i]));
free(H->lists);//释放指向list的指针的空间
free(H);//最后释放哈希表
}

int *a=(int*)malloc(sizeof(int)*100);
for(i=0;i<100;i++)
free(a+i);

#define BUCKETS (101)

typedef struct __hash_t {
int buckets;
list_t* lists;
} hash_t;//包含分类元和一个指向list的指针

void hash_init(hash_t *H, int buckets)
{//哈希表初始化
H->buckets = buckets;
H->lists = (list_t*) malloc(sizeof(list_t) * buckets);
for (int i = 0; i < buckets; ++i)
list_init((list_t*)&(H->lists[i]));
}

void hash_insert(hash_t *H, int key)
{//哈希表插入函数
int bucket = key % H->buckets;
return list_insert((list_t*)&(H->lists[bucket]), key);
}

void* hash_lookup(hash_t *H, int key)
{//哈希表查询函数
int bucket = key % H->buckets;
return list_lookup((list_t*)&(H->lists[bucket]), key);
}

void hash_delete(hash_t *hash, unsigned int key)
{//哈希表删除函数
int bucket = key % hash->buckets;
list_delete((list_t*)&(hash->lists[bucket]), key);
}

void hash_free(hash_t *H)
{//哈希表释放函数
int i;
for(i=0;i<H->buckets;i++)
{//先释放每个指针对应的list
list_t *L=H->lists+i;
if(L!=NULL)
{
node_t *tmp1,*tmp2;
while(tmp1!=NULL)
{
tmp2=tmp1->next;
free(tmp1);
tmp1=tmp2;
}
}
}//注意不能直接调用list_free函数
free(H->lists);//释放指向list的指针的空间
free(H);//最后释放哈希表
}

## 二相锁

// atomic_increment(mutex), lock->count++;
while (xchg(&(lock->atomic),1) == 1)
;
lock->count++;
lock->atomic = 0;

// atomic_decrement(mutex), lock->count--;
while (xchg(&(lock->atomic),1) == 1)
;
lock->count--;
lock->atomic = 0;
return;

futex_wait (mutex, v);的实现是：

sys_futex(&(lock->mutex), FUTEX_WAIT, v, 0, 0, 0);

futex_wake (mutex);的实现是：
sys_futex(&(lock->mutex), FUTEX_WAKE, lock->count, 0, 0, 0);

void mutex_lock (int *mutex) {
int v;
/* Bit 31 was clear, we got the mutex (this is the fastpath) */
if (atomic_bit_test_set (mutex, 31) == 0)
return;
atomic_increment (mutex);
while (1) {
if (atomic_bit_test_set (mutex, 31) == 0) {
atomic_decrement (mutex);
return;
}
/* We have to wait now. First make sure the futex value
we are monitoring is truly negative (i.e. locked). */
v = *mutex;
if (v >= 0)
continue;
futex_wait (mutex, v);
}
}

void mutex_unlock (int *mutex) {
/* Adding 0x80000000 to the counter results in 0 if and only if
there are not other interested threads */
return;
/* There are other threads waiting for this mutex,
wake one of them up. */
futex_wake (mutex);
}

/* Adding 0x80000000 to the counter results in 0 if and only if
there are not other interested threads */
return;
/* There are other threads waiting for this mutex,
wake one of them up. */

#include "two_phase.h"
#include "sys_futex.c"
#include "xchg.c"

typedef struct __two_phase_t {
int mutex;
int atomic;
int count;
}

void two_phase_init(two_phase_t* lock){
lock->mutex = 0;
lock->atomic = 0;
lock->count = 0;
}

void two_phase_acquire(two_phase_t* lock){
if (xchg(&(lock->mutex, 31)) == 0)
return;

// atomic_increment(mutex), lock->count++;
while (xchg(&(lock->atomic),1) == 1)
;
lock->count++;
lock->atomic = 0;

while (1) {
if (xchg(&(lock->mutex, 31)) == 0) {
// atomic_decrement(mutex), lock->count--;
while (xchg(&(lock->atomic),1) == 1)
;
lock->count--;
lock->atomic = 0;
return;
}

int v = lock->mutex;

if (v >= 0)
continue;

sys_futex(&(lock->mutex), FUTEX_WAIT, v, 0, 0, 0);
}

}

void two_phase_release(two_phase_t *lock) {
lock->mutex = 0;
if (lock->count != 0)
sys_futex(&(lock->mutex), FUTEX_WAKE, lock->count, 0, 0, 0);
}

## 公平性

typedef struct __lock_t {
int ticket;
int turn;
} lock_t;

void lock_init(lock_t* lock){
lock->ticket = 0;
lock->turn = 0;
}

void lock(lock_t* lock) {
while (lock->turn != myturn)
; // spin
}

void unlock(lock_t * lock){
lock->turn = lock->turn + 1;
}

typedef struct __lock_t {
int flag;
int guard;
queue_t *q;
} lock_t;

void lock_init(lock_t* lock){
m->flag = 0;
m->guard = 0;
queue_init(m->q);
}

void lock(loct_t* m) {
while (TestAndSet(&m->guard, 1) == 1)
; // acquire guard lock by spinning
if (m->flag == 0) {
m->flag = 1; // lock is acquired
m->guard = 0;
} else {
m->guard = 0;
park();
}
}

void unlock(){
while (TestAndSet(&m->guard, 1) == 1)
; // acquired guard lock by spinning
if (queue_empty(m->q))
m->flag = 0; // let go of lock; no one wants it
else
unpark(queue_remove(m->q)); // hold lock (for next thread!)
m->guard = 0;
}

typedef struct __node_t {
int value;
struct __node_t *next;
} node_t;

typedef struct __queue_t {
node_t *tail;
} queue_t;

void Queue_Init(queue_t *q) {
node_t *tmp = malloc(sizeof(node_t));
tmp->next = NULL;
}

void Queue_Enqueue(queue_t *q, int value) {
node_t *tmp = malloc(sizeof(node_t));
assert(tmp != NULL);
tmp->value = value;
tmp->next = NULL;

q->tail->next = tmp;
q->tail = tmp;
}

int Queue_Dequeue(queue_t *q, int *value) {
return -1; // queue was empty
}
free(tmp);
return 0;
}

#include "xchg.c"
#include "sys_futex.c"

typedef struct __node_t {
int value;
struct __node_t *next;
} node_t;

typedef struct __queue_t {
node_t *tail;
spinlock_t tailLock;
int size;
} queue_t;

void Queue_Init(queue_t *q) {
node_t *tmp = malloc(sizeof(node_t));
tmp->next = NULL;
spinlock_init(&q->tailLock);
q->size = 0;
}

void Queue_Enqueue(queue_t *q, int value) {
node_t *tmp = malloc(sizeof(node_t));
tmp->value = value;
tmp->next = NULL;

spinlock_acquire(&q->tailLock);
q->tail->next = tmp;
q->tail = tmp;
size++;
spinlock_release(&q->tailLock);
}

int Queue_Dequeue(queue_t *q, int *value) {
return -1; // queue was empty
}
size--;
free(tmp);
return 0;
}

typedef struct __justifylock_t {
int flag;
int guard;
queue_t *q;
} justifylock_t;

void justifylock_init(lock_t* lock){
m->flag = 0;
m->guard = 0;
Queue_Init(m->q);
}

void justifylock_acquire(loct_t* m) {
while (xchg(&m->guard, 1) == 1)
; // acquire guard lock by spinning
if (m->flag == 0) {
m->flag = 1; // lock is acquired
m->guard = 0;
} else {
Queue_Enqueue(m->q, gettid());
m->guard = 0;
park();
}
}

void justifylock_release(loct_t* m){
while (xchg(&m->guard, 1) == 1)
; // acquired guard lock by spinning
if (q->size == 0)
m->flag = 0; // let go of lock; no one wants it
else
unpark(Queue_Dequeue(m->q)); // hold lock (for next thread!)
m->guard = 0;
}

## 统一接口lock

#define PTHREAD_LOCK   0
#define SPIN_LOCK      1
#define MUTEX_LOCK     2

#define LOCK_TYPE 0
//标志锁的类型
void lock_init(void* lock_t);
//锁初始化函数
void lock_acquire(void *lock_t);
//锁申请函数
void lock_release(void *lock_t);
//锁释放函数

## 条件变量

#define atomic_add(P, V) __sync_add_and_fetch((P), (V))
#define cmpxchg(P, O, N) __sync_val_compare_and_swap((P), (O), (N))
#define INT_MAX 2147483647
#define cv_type 1
#define type_value 1
static inline unsigned xchg_32(void *ptr, unsigned x)
{//将x放入寄存器中与第一个指针参数所指的内容交换，返回所指内容原先的值
__asm__ __volatile__("xchgl %0,%1"
:"=r" ((unsigned) x)
:"m" (*(volatile unsigned *)ptr), "0" (x)
:"memory");

return x;
}
//32位寄存器交换指令
typedef struct __Conditional_variables
{
mutex_t *m;
int seq;
}CV_t;
//我们设计的条件变量的数据结构
void cv_init(CV_t *c,mutex_t *m)
{
c->m=(mutex_t*)malloc(sizeof(mutex_t));
mutex_init(c->m);
c->seq=0;
}
//条件变量初始化函数
void cv_free(CV_t *c)
{
free(c->m);
free(c);
}
//条件变量释放函数
//由于空间很少，在测试的时候这部分可忽略
void cv_signal(CV_t *c)
{
//唤醒一个进程
sys_futex(&c->seq,FUTEX_WAKE_PRIVATE,1,NULL,NULL,0);
}
//唤醒睡眠的单个进程
{
mutex_t *m=c->m;
if(m->flag==0)
//判断是否有进程在等待
return;
//唤醒每一个进程
sys_futex(&c->seq,FUTEX_REQUEUE_PRIVATE,1,(void *)INT_MAX,m,0);
return;
}
//广播唤醒进程
void cv_wait(CV_t *c,mutex_t *m)
{
int seq=c->seq;
if(c->m!=m)
{
//if(c->m) return ;
cmpxchg(&(c->m->flag),NULL,&(m->flag));
if(c->m!=m) return ;
}
mutex_release(m);
sys_futex(&c->seq,FUTEX_WAIT_PRIVATE,seq,NULL,NULL,0);
while(xchg_32(&(m->flag),2))
{
sys_futex(&m->flag,FUTEX_WAIT_PRIVATE,2,NULL,NULL,0);
}
return;
}
//让一个进程睡眠，直到被唤醒

pthread_t *p;
unsigned count=0;
mutex_t lock;
CV_t cv;
void increment_count()
{//计数增加函数
if (count == 0)
count = count + 1;
}
else
{//调用我们的条件变量
mutex_acquire(&lock);
if (count == 0)
//cv_signal(&cv);
count = count + 1;
mutex_release(&lock);
}
}

void decrement_count()
{//计数减少函数
while (count == 0)
count = count - 1;
}
else
{//调用我们的条件变量
mutex_acquire(&lock);
if (count == 0)
cv_wait(&cv,&lock);
count = count - 1;
mutex_release(&lock);
}
}

void test1(void *arg)
{//测试函数1，计数增加
int val = *(int*)arg;
while (val--)
{
increment_count();
}
}

void test2(void *arg)
{//测试函数2，计数减少
int val = *(int*)arg;
while (val--)
{
decrement_count();
}
}

for (i = 0; i < thread_no; ++i)
{//产生进程
//int ran=rand()%100+1;生成随机数
//一种思路是用随机数来引导程序进行加减法
//但问题在于不确定性很大，有可能出现死锁情况
//实际测试中，控制在一半有很大概率会发生死锁
//控制在0.51/0.49死锁概率仍然很高
//说明：先让一半的进程进行减法，再让一半的进程进行加法
//这样一定会发生饥饿现象
else
}
for (i = 0; i < thread_no; ++i)
{
//进程销毁
}

0 0 投票数