JPCERT コーディネーションセンター

CON09-C. ロックフリープログラミングの手法を使うときは ABA 問題を避ける

CON09-C. ロックフリープログラミングの手法を使うときは ABA 問題を避ける

ロックフリープログラミング (Lock-free programming) の手法を用いると、ロックを明示的に使用することなく、共有データ構造を同時に更新することができる。この手法を用いることで、長時間に渡って待機するスレッドがなくなり、プログラムの性能は向上する。

ロックフリープログラミングには下記の利点がある。

ロックフリープログラミングの手法では、CAS (compare and swap) や LL/SC (load linked/store conditional) といった特別なアトミックプロセッサ命令、あるいは C の標準関数である atomic_compare_exchange を使う必要がある

ロックフリープログラミングの応用例には次のものがある。

ABA 問題は、同期の過程で発生する次のような問題である。ある記憶域からの読み取りが2回行われ、2回とも同じ値が読み取られたとする。2回の読み出しの間に、他のスレッドが値を変更して何か作業を行った後、値を元に戻していたら、最初のスレッドは、値は変更されなかったという間違った判断をしてしまう。

違反コード

以下の違反コード例は、配列中の最大値の要素を0に書き換えようとしている。このコードはマルチスレッド環境で実行され、すべての変数は他のスレッドからアクセスできるものと仮定する。

#include <stdatomic.h>

/*
 * index が array の中で最大値を持つ要素を指すように設定し
 * value を array の最大値に設定する
 */
void find_max_element(atomic_int array[], size_t *index, int *value);

static atomic_int array[];

void func(void) {
  size_t index;
  int value;
  find_max_element(array, &index, &value);
  /* ... */
  if (!atomic_compare_exchange_strong(array[index], &value, 0)) {
    /* Handle error */
  }
}

CAS (compare-and-swap) 操作は、array[index] の現在の値が value である場合にのみ、これを0に設定する。しかし、次に挙げる理由により、このコードは配列の最大値を必ずしもゼロに書き換えない。

適合コード (Mutex)

以下の適合コードは、ミューテックスを使用して、処理の最中にデータが変更されることを防ぐ。このコードはスレッドセーフではあるが、ロックフリーではなくなる。

#include <stdatomic.h>
#include <threads.h>

static atomic_int array[];
static mtx_t array_mutex;

void func(void) {
  size_t index;
  int value;
  if (thrd_success != mtx_lock(&array_mutex)) {
    /* エラー処理 */
  }
  find_max_element(array, &index, &value);
  /* ... */
  if (!atomic_compare_exchange_strong(array[index], &value, 0)) {
    /* Handle error */
  }
  if (thrd_success != mtx_unlock(&array_mutex)) {
    /* エラー処理 */
  }
}
違反コード (GNU Glib)

このコードは、ロックフリープログラミングの手法を使ってキューデータ構造を実装している。glib ライブラリを使っている。関数 CAS() は内部的に g_atomic_pointer_compare_and_exchange() を使用している。

#include <glib.h>
#include <glib-object.h>

typedef struct node_s {
  void *data;
  Node *next;
} Node;

typedef struct queue_s {
  Node *head;
  Node *tail;
} Queue;

Queue* queue_new(void) {
  Queue *q = g_slice_new(sizeof(Queue));
  q->head = q->tail = g_slice_new(sizeof(Node));
  return q;
}

void queue_enqueue(Queue *q, gpointer data) {
  Node *node;
  Node *tail;
  Node *next;

  node = g_slice_new(Node);
  node->data = data;
  node->next = NULL;
  while (TRUE) {
    tail = q->tail;
    next = tail->next;
    if (tail != q->tail) {
      continue;
    }
    if (next != NULL) {
      CAS(&q->tail, tail, next);
      continue;
    }
    if (CAS(&tail->next, null, node)) {
      break;
    }
  }
  CAS(&q->tail, tail, node);
}

gpointer queue_dequeue(Queue *q) {
  Node *node;
  Node *head;
  Node *tail;
  Node *next;
  gpointer data;

  while (TRUE) {
    head = q->head;
    tail = q->tail;
    next = head->next;
    if (head != q->head) {
      continue;
    }
    if (next == NULL) {
      return NULL; /* Empty */
    }
    if (head == tail) {
      CAS(&q->tail, tail, next);
      continue;
    }
    data = next->data;
    if (CAS(&q->head, head, next)) {
      break;
    }
  }
  g_slice_free(Node, head);
  return data;
}

2つのスレッド(T1T2)が同時にキューに対する処理を行っていると想定する。キューは次のような状態である。

head -> A -> B -> C -> tail

このキューに対し、表に示す一連の処理が行われる。

スレッド

操作前のキュー

操作

操作後のキュー

T1

head -> A -> B -> C -> tail

queue_dequeue() 関数の処理に入る
head = A, tail = C
next = B
data = next->data; を実行後
このスレッドはプリエンプトされる

head -> A -> B -> C -> tail

T2

head -> A -> B -> C -> tail

ノードAを削除する

head -> B -> C -> tail

T2

head -> B -> C -> tail

ノードBを削除する

head -> C -> tail

T2

head -> C -> tail

ノードAをキューに追加する

head -> C -> A -> tail

T2

head -> C -> A -> tail

ノードCを削除する

head -> A -> tail

T2

head -> A -> tail

ノードDをキューに新規追加する
キューへの追加後、T2 はプリエンプトされる

head -> A -> D -> tail

T1

head -> A -> D -> tail

スレッド1の処理が再開する
local head = q->head = A を比較 (この場合、真)
q->head をノード Bに変更する (しかしノードBは削除されている)

未定義 {}

この表の処理順序に従うと、head は解放済みメモリを参照することになる。また、(munmap() などを使用して) 回収されたメモリがオペレーティングシステムに返されると、そのメモリへのアクセスは致命的なアクセス違反エラーになる。ABA 問題は、リストから削除されたノードの再利用または削除されたノードが占有していたメモリの回収が理由で発生している。

適合コード (GNU Glib、ハザードポインタ)

[Michael 2004] によると、核にある考え方は、ロックフリーな動的オブジェクトにアクセスしようとするスレッド毎に、ハザードポインタと呼ばれるシングルライター、マルチリーダーの共有ポインタを複数 (典型的には1つか2つ) 用意して関連づけるというものである。ハザードポインタの値は null か、ノードへの参照である。ハザードポインタは、スレッドがアクセスする可能性がまだ残っているノードを参照する。スレッドは、これらのノードがまだ有効なものであるかどうかをチェックせずにアクセスする可能性がある。各ハザードポインタはそのオーナーであるスレッドだけが書き込むことができ、他のスレッドは読み取りのみ行うことができる。

以下のコード例では、関連アルゴリズムとの通信は、ハザードポインタと、不要になったノードのアドレスを引数にしてスレッドが呼び出す RetireNode() 関数を介してのみ実行される。

擬似コード
/* ハザードポインタに関する型と構造体 */
structure HPRecType {
  HP[K]:*Nodetype;
  Next:*HPRecType;
}

/* HPRec list のヘッダ */
HeadHPRec: *HPRecType;
/* Per-thread private variables */
rlist: listType; /* 初期値は空 */
rcount: integer; /* 初期値は 0 */

/* 不要になったノードを処理するルーチン */
RetiredNode(node:*NodeType) {
  rlist.push(node);
  rcount++;
  if(rcount >= R)
    Scan(HeadHPRec);
}

/* スキャンルーチン */
Scan(head:*HPRecType) {
  /* ステージ 1: HP リストをスキャンし、非 null である値を plist へ挿入する */
  plist.init();
  hprec<-head;
  while (hprec != null) {
    for (i<-0 to K-1) {
      hptr<-hprec^HP[i];
      if (hptr!= null)
        plist.insert(hptr);
    }
    hprec<-hprec^Next;
  }

  /* ステージ 2: plist をサーチする */
  tmplist<-rlist.popAll();
  rcount<-0;
  node<-tmplist.pop();
  while (node != null) {
    if (plist.lookup(node)) {
      rlist.push(node);
      rcount++;
    }
    else {
      PrepareForReuse(node);
    }
    node<-tmplist.pop();
  }
  plist.free();
}

スキャンは2つのステージから成る。第一のステージでは、ハザードポインタリストをスキャンして 非 null 値を探す。非 null 値が見つかると、plist と呼ばれるローカルリストに挿入される。このリストは、ハッシュテーブルとして実装できる。第二のステージでは、rlist の各ノードが plist のポインタと照合される。照合した結果一致するものが見つからない場合、ノードは再利用できるものとして識別される。一致するものが見つかった場合、実行中のスレッドによる次のスキャンまで rlist に保持される。plist に対する挿入と検索は、定数時間で行うことができる。メモリ再利用メソッドの役目は、メモリの再利用を許可するとともに、回収したノードをいつ安全に再利用できるか判断することである。

この実装では、削除するポインタをハザードポインタに格納することで、ポインタが他のスレッドに再利用されることを防ぎ、ABA 問題を回避している。

コード
#include <glib.h>
#include <glib-object.h>

void queue_enqueue(Queue *q, gpointer data) {
  Node *node;
  Node *tail;
  Node *next;

  node = g_slice_new(Node);
  node->data = data;
  node->next = NULL;
  while (TRUE) {
    tail = q->tail;
    HAZARD_SET(0, tail);  /* tail を hazard に設定 */
    if (tail != q->tail) {  /* tail が変更されていないことを確認 */
      continue;
    }
    next = tail->next;
    if (tail != q->tail) {
      continue;
    }
    if (next != NULL) {
      CAS(&q->tail, tail, next);
      continue;
    }
    if (CAS(&tail->next, null, node) {
      break;
    }
  }
  CAS(&q->tail, tail, node);
}

gpointer queue_dequeue(Queue *q) {
  Node *node;
  Node *head;
  Node *tail;
  Node *next;
  gpointer data;

  while (TRUE) {
    head = q->head;
    LF_HAZARD_SET(0, head);  /* head を hazard に設定 */
    if (head != q->head) {  /* head が変更されていないことを確認 */
      continue;
    }
    tail = q->tail;
    next = head->next;
    LF_HAZARD_SET(1, next);  /* next を hazard に設定 */
    if (head != q->head) {
      continue;
    }
    if (next == NULL) {
      return NULL; /* 空 */
    }
    if (head == tail) {
      CAS(&q->tail, tail, next);
      continue;
    }
    data = next->data;
    if (CAS(&q->head, head, next)) {
      break;
    }
  }
  LF_HAZARD_UNSET(head);  /*
                           * head を回収し、
                           * 必要なら再利用する
                           */
  return data;
}
適合コード (GNU Glib、ミューテックス)

以下の適合コードでは、mtx_lock() を使ってキューをロックしている。スレッド1がキューをロックしてから処理を行うと、スレッド2 はキューに対する処理を実行できなくなるため、ABA 問題を回避することができる。

#include <threads.h>
#include <glib-object.h>

typedef struct node_s {
  void *data;
  Node *next;
} Node;

typedef struct queue_s {
  Node *head;
  Node *tail;
  mtx_t mutex;
} Queue;

Queue* queue_new(void) {
  Queue *q = g_slice_new(sizeof(Queue));
  q->head = q->tail = g_slice_new(sizeof(Node));
  return q;
}

int queue_enqueue(Queue *q, gpointer data) {
  Node *node;
  Node *tail;
  Node *next;

  /*
   * 内容にアクセスするまえにキューをロック
   * 返り値をチェックして成功したかどうかを確認
   */
  if (thrd_success != mtx_lock(&(q->mutex))) {
    return -1;  /* Indicate failure */
  } else {
    node = g_slice_new(Node);
    node->data = data;
    node->next = NULL;

    if(q->head == NULL) {
      q->head = node;
      q->tail = node;
    } else {
      q->tail->next = node;
      q->tail = node;
    }
    /* Unlock the mutex and check the return code */
    if (thrd_success != mtx_unlock(&(queue->mutex))) {
      return -1;  /* Indicate failure */
    }
  }
  return 0;
}

gpointer queue_dequeue(Queue *q) {
  Node *node;
  Node *head;
  Node *tail;
  Node *next;
  gpointer data;

  if (thrd_success != mtx_lock(&(q->mutex)) {
    return NULL;  /* Indicate failure */
  } else {
    head = q->head;
    tail = q->tail;
    next = head->next;
    data = next->data;
    q->head = next;
    g_slice_free(Node, head);
    if (thrd_success != mtx_unlock(&(queue->mutex))) {
      return NULL;  /* Indicate failure */
    }
  }
  return data;
}
リスク評価

競合状態が発生する可能性は低い。競合状態が発生すると、解放済みのメモリの読取りは、プログラムの異常終了や意図しない情報漏えいにつながる可能性がある。

レコメンデーション

深刻度

可能性

修正コスト

優先度

レベル

CON09-C

P2

L3

参考資料
[Apiki 2006] "Lock-Free Programming on AMD Multi-Core System"
[Asgher 2000] "Practical Lock-Free Buffers"
[Michael 2004] "Hazard Pointers"
翻訳元

これは以下のページを翻訳したものです。

CON09-C. Avoid the ABA problem when using lock-free algorithms (revision 49)

Top へ

Topへ
最新情報(RSSメーリングリストTwitter