Pthread


■基本

■生成

#include <pthread.h>
int pthread_create(pthread_t *id, pthread_attr_t *attr, void* (*funcA)(void*), void* arg);

新しいスレッドが生成され、funcAに引数argが渡されて実行されます。
idには生成されたスレッドのスレッドIDが格納されます。
リソース的な問題で生成出来なかった場合は0以外が返ってきます。


■属性設定

●初期化

int pthread_attr_init(pthread_attr_t *attr);

attrオブジェクトを初期化します。

●破棄

int pthread_attr_destroy(pthread_attr_t *attr);

attrオブジェクトを破棄します。
一度破棄したオブジェクトを再度初期化をしなければ使用できません。


●デタッチ属性設定

int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate);
int pthread_attr_getdetachstate(const pthread_attr_t *attr,  int  *detachstate);

デタッチ属性を設定したり取得したりします。
設定値は、PTHREAD_CREATE_JOINABLEと、PTHREAD_CREATE_DETACHEDです。


●スケジューリングポリシー属性設定

int pthread_attr_setschedpolicy(pthread_attr_t *attr, int policy);
int pthread_attr_getschedpolicy(const pthread_attr_t *attr, int *policy);

スケジューリングポリシーの設定と取得をします。
設定値は、
SCHED_OTHER(デフォルト)
SCHED_RR(ラウンドロビン)
SCHED_FIFO(先入れ先出し)

工事中


●スケジューリングパラメーター属性設定

int pthread_attr_setschedparam(pthread_attr_t *attr, const struct sched_param *param);
int pthread_attr_getschedparam(const pthread_attr_t *attr, struct sched_param *param);

#include <sched.h>

int sched_setparam(pid_t pid, const struct sched_param *p);
int sched_getparam(pid_t pid, struct sched_param *p);

struct sched_param {
    ...
    int sched_priority;
    ...
};

スケジューリングポリシーがSCHED_OTHERのスレッドに対してスケジューリングパラメーターを設定します。
sched_param構造体のメンバーsched_priorityにプライオリティーを設定して使用します。


int sched_get_priority_max(int policy);
int sched_get_priority_min(int policy); 

policyにSCHED_OTHERを設定して、使える範囲のプライオリティー値を取得します。



●スケジューリング属性継承属性設定

int pthread_attr_setinheritsched(pthread_attr_t *attr, int inherit);
int pthread_attr_getinheritsched(const pthread_attr_t *attr, int *inherit);

スケジューリング属性を作成したスレッドから継承するか、明示的に設定するかをしていします。
設定値は、
PTHREAD_EXPLICIT_SCHED(デフォルト,明示的に設定する)
PTHREAD_INHERIT_SCHED(継承する)


■破棄

void pthread_exit(void *retval);

この関数を呼び出したスレッドを終了します。
retvalにはスレッドの戻りとして渡したい物のアドレスを入れて返します。これはpthread_joinの第二引数でもらい受けたりします。
スレッドは独自のスタックを持ちますが、通常のスタックと使い方は一緒なので、retvalの先にローカル変数を指定してはいけません。


■待ち合わせ

int pthread_join(pthread_t id, void **ret);

idの示すスレッドIDを持ったスレッドの終了を待ちます。(プロセスのwait関数のような存在)
retはスレッドからの戻り値を指し示すポインターへのポインターを指定します。
関数自体は成功すると0を、失敗するとそれ以外を返します。

待ち合わせをしなくても良いスレッドを、デタッチスレッドと呼びます。
デタッチスレッドは終了時にリソースを解放しますが、そうでないスレッドはwait待ち合わせの為にリソースを保存しておきます。

int pthread_detach(pthread_t id);

この関数が呼ばれるとスレッドはデタッチスレッドになります。
また、スレッド属性でデタッチスレッドである事を指定する事も出来ます。


■キャンセル

int pthread_cancel(pthread_t id);

idで指定したスレッドに取消要求を送ります。


int pthread_setcancelstate(int state, int *oldstate);

引数stateに要求に対してどうするかの設定をします。
設定値は、PTHREAD_CANCEL_ENABLE(デフォルト,受け入れる)と、PTHREAD_CANCEL_DISABLE(無視する)です。
oldstateは以前の設定値を取得するのに使います。以前の状態が不要な場合はNULLを指定します。


int pthread_setcanceltype(int type, int oldtype);

キャンセル要求に対しての挙動をどうするかを引数typeに設定します。
oldtypeには以前の設定値が入ります。不要な場合はNULLを設定します。
設定値は、PTHREAD_CANCEL_ASYNCHRONOUS(直ちに実行)、PTHREAD_CANCEL_DEFERRED(デフォルト,特定の関数が実行されるまで待つ)
特定の関数:pthread_join, pthread_cond_wait, pthread_cond_timedwait, pthread_testcancel, sem_wait
(実装依存ですが、システムコールのいくつかも取消ポイントになっているようです)







■ID確認

pthread_t pthread_self(void);

自分自身のスレッドIDを返します。

// sample -----------------------------------
#include <iostream>
#include <pthread.h>
using namespace std;

void* thread_funcA(void* arg){
  for(int i=0;i<3;i++){
    sleep(1);
    cout << (char*)arg << pthread_self() << endl;
  }
  pthread_exit((void*)"from thread1");
}

void* thread_funcB(void* arg){
  for(int i=0;i<3;i++){
    sleep(1);
    cout << (char*)arg << pthread_self() << endl;
  }
  pthread_exit((void*)"from thread2");
}

int main(){
  char str[] = "thread number is ";
  pthread_t threadID, threadID2;
  void *thread_res, *thread_res2;
  int res = pthread_create(&threadID, NULL, thread_funcA, (void*)str);
  int res2 = pthread_create(&threadID2, NULL, thread_funcB, (void*)str);

  sleep(5);//先に他のスレッド終る

  pthread_join(threadID, &thread_res);
  pthread_join(threadID2, &thread_res2);

  cout << (char*)thread_res << endl;
  cout << (char*)thread_res2 << endl;
  return 0;
}
// ------------------------------------------------

$ g++ -g -lpthread -D_REENTRANT main.cpp



■同期化

■セマフォ(手旗信号)

コードのある部分に対して門番のような役目を果たします。
POSIX Real-time Extensionsから取られたスレッド操作に使う物と、System Vセマフォと呼ばれるプロセス同期化に使う物の2種類あります。
互いに似ているものの、互換性は保証されておらず、使用する関数も別になります。

●初期化

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int val);

sem_t型のオブジェクトをvalで指定した値で初期化します。
psharedでセマフォの種類を特定します。0でプロセスローカルなセマフォになります。(それ以外はサポートしていないシステムもあります)

●増加

int sem_post(sem_t *sem);

semをアトミックな操作でインクリメントします。
0で初期化しておけば、最初sem_postで1になります。

●減少

int sem_wait(sem_t *sem);

semをアトミックな操作でデクリメントします。
semが0だった場合、semを他のスレッドがインクリメントしてくれるまで待ちます。

●破棄
int sem_destroy(sem_t *sem);

semを破棄します。
待機中のスレッドが有る場合にこの関数を実行するとエラーが返ります。




■mutex(mutal exclusion 相互排除)

コードのある部分を保護する排他制御装置の役目を果たします。

●初期化

int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);

pthread_mutex_t型のmutexを初期化します。
attrには属性を定義します。NULLをしていするとデフォルトの属性が設定されます。
デフォルト属性はfastとなります。
fastでは、既にロックがかかったmutexに対して再度ロックを掛けようとするとブロックされ、デッドロックに陥ります。
二重ロックに対してエラーを返したり、複数回ロックの再帰的に対応するように属性を変更する事ができます。



pthread_mutex_t mutex00 = PTHREAD_MUTEX_INITIALIZER;

このマクロを使う事によって実行のオーバーヘッドを減らして初期化する事が出来ます。


●ロック開始

int pthread_mutex_lock(pthread_mutex_t *mutex);

mutexオブジェクトを使ってクリティカルセクションの保護を開始します。


●ロック解除

int pthread_mutex_unlock(pthread_mutex_t *mutex);

mutexオブジェクトを使ってクリティカルセクションの保護を解放します。






※セマフォ、mutex共に使う関数は正常の場合0を返し、エラーの場合はそれ以外を返します。




※throughput(スループット)   単位時間当たりの処理数。沢山処理が出来る事を、スループットが高いと言います。特にバッチ処理で多く用いられる概念です。
※Response time(レスポンスタイム)     ある処理を要求してからその結果(処理結果でもエラーでも構いません)が返ってくるまでの時間で、特にオンライン処理で多く用いられる概念です。
※TAT(ターンアラウンドタイム、タット)   ある処理を要求してからその結果を「完全に」受け取るまでの時間です。この概念には広い意味があり、例えばオンラインショッピングをしたときに商品を注文 してから自宅に届くまでの時間のことを指したりすることもあります。



戻 る



おまけ

■Javaのスレッド


実装方法

●Thread クラスを継承すし、runメソッドをオーバーライドしstartメソッドを呼ぶ。
runメソッドを直接呼んでも、runメソッドが実 行されるだけで、スレッドは生成されません。

●Runnableインターフェイスを実装し、runメソッドをオーバーライドしstartメソッドを呼ぶ。
runメソッドを直接呼んでも、runメソッドが実 行されるだけで、スレッドは生成されません。


並 行と並列

通常のスレッド処理は、高速なスイッチングで実行スレッドを切替えるために、並行処理 (concurrent)になります。それに対して複数のCPUが搭載されたコンピューターではOSの実装依存ですが、並列処理(parallel)がさ れる事があります。

自分は昔、PC自作などでデュアルCPUに拘った時期がありましたが、CPUの高速化と、仕 事場が16CPUとかになってしまったので、あまり拘りがなくなってしまいした。


スレッドの終了

Javaの場合はメインスレッ ドが終了しても、他のスレッドが終らなければ処理は終了しません。
(スレッド生成時にsetDaemonメソッドでしていしたデーモ ンスレッドは除きます)


スレッドの停止

Threadクラスのsleepメソッドにmsec単位で引数指定します。nsec指定したい時は第二引数にしていします。(ナノ指定はパソコンだとあまりあてにならないと思います)


スレッドの同期

sychronizedキーワードでブロックを囲みます。(関数ブロックも)
関数ブロックの場合は戻り値の前にsychronized指定します。
ブロックの場合はsychronizedと書いた後に()でロックを取るインスタンスを指定します。
概ねthisを渡す事になると思います。

Javaのスレッド同期はインスタンス単位で行われます。
別インスタンスであれば、sychronizedブロックも同時実行されます。

staticなメソッドをsychronized指定した場合は、クラスオブジェクトのロックを使用します。
(そのクラスに対応したJava.lang.classのインスタンスを指定するらしいです)

sychronizedはスコープ内でreturnやexceptionが発生しても、スコープアウト時にロックを解放してくれます。


スレッドの通信

インスタンスは、それぞれwait setというwaitメソッドを実行している(待っている)スレッドの集合体を持っています。
wait setに居るスレッドを起こすには下記のパターンがあります。
●notifyメソッドで通知を受ける
●notifyallメソッドで、
●interruptメソッド
●waitメソッドのタイムアウトが来る


java.lang.Oblectのメソッドたち

waitメソッド

実行したスレッドをwait setに入れます。
スレッドがロックを持っている時に実行できます。(そうでない場合は例外が投げられます)
wait setに入る時にロックは解放されます。


notifyメソッド

wait setの中のスレッドのどれか(実装依存)一つ出します。
スレッドがロックを持っている時に実行できます。(そうでない場合は例外が投げられます)
メソッド実行後もロックは保持されているので、出されたスレッドはロック解放を待ちます。


notifyallメソッド

wait setの中のスレッド全てを出します。
スレッドがロックを持っている時に実行できます。(そうでない場合は例外が投げられます)
メソッド実行後もロックは保持されているので、出されたスレッドはロック解放を待ちます。
解放後に一つのスレッドがロックを獲得し、その他のスレッドはブロック状態でまちます。


race condition(レースコンディション)

スレッドの競争で起こる状況をいいます。


thread safe(スレッドセーフ)

マルチスレッドでも問題無い状態、または問題ない関数やオブジェクトの事をいいます。
フレームワーク側でスレッドセーフ化している場合、利用者側は意識しなくても良いです。


shared resource(シェアードリソース)

共有資源。スレッド間で共有されるオブジェクトの事をいいます。


critical section(クリティカルセクション)

シングルスレッドで動かす必要が有る部分をいいます。


dead lock(デッドロック)

スレッド同士が互いの持つロックの解放を待つ状態をいいます。

●複数の共有リソースが有る
●共有リソースのロックを持ったまま違う共有リソースのロックを取りに行く
●共有リソースを取りに行く順番が決まっていない


atomic(アトミック)

これ以上分割出来ないもの事いいます。
基本系の代入や参照はアトミックです。
int a;
a = 123;
a = 456;
この代入操作を複数のスレッドが違う行を実行しても、123か456の値がaには入ります。
Javaの言語使用だとこれがdoubleやlongだった場合は、volatile宣言されていなればアトミックではなく値は保証されません。
(実装依存で大概の環境ではアトミックとして実装されているようです)


Single Threaded Execution

保護すべき場所をクリティカルセクションとして設定してシングルスレッドのみが操作するようにする設計法です。


Immutable

生成時以外にフィールドを変更しないようにクラス設計をし、sychronizedを使用せずパフォーマンスを上げるようにする設計法です。

immutable(イミュータブル)
mutable(ミュータブル)
イミュターブルは不変を意味し、ミュータブルは変化するものを意味します


Guarded Suspension

実行可能かを表すフィールドを持ち、実行可能になるまで待つようにする設計法です。


Single Threaded Execution

保護すべき場所をクリティカルセクションとして設定してシングルスレッドのみが操作するようにする設計法です。



工事中





戻 る