Program,Process,Thread
在介紹Thread之前,我們必須先把Program和Process這兩個觀念作一個釐清。
- Program:一群程式碼的集合,用以解決特定的問題。以物件導向的觀念來類比,相當於Class。
- Process:由Program所產生的執行個體,一個Program可以同時執行多次,產生多個Process。以物件導向的觀念來類比,相當於Object。每一個Process又由以下兩個東西組成
- 一個Memory Space。相當於Object的variable,不同Process的Memory Space也不同,彼此看不到對方的Memory Space。
- 一個以上的Thread。Thread代表從某個起始點開始(例如main),到目前為止所有函數的呼叫路徑,以及這些呼叫路徑上所用到的區域變數。當然程式的執行狀態,除了紀錄在主記憶體外,CPU內部的暫存器(如Program Counter, Stack Pointer, Program Status Word等)也需要一起紀錄。所以Thread又由下面兩項組成
- Stack:紀錄函數呼叫路徑,以及這些函數所用到的區域變數
- 目前CPU的狀態
如何產生Thread
Java以java.lang.Thread這個類別來表示Thread。Class Thread有兩個Constructor:
- Thread()
- Thread(Runnable)
第一個Constrctor沒有參數,第二個需要一個Runnable物件當參數。Runnable是一個interface,定義於java.lang內,其宣告為
public interface Runnable { public void run(); }
使用Thread()產生的Thread,其進入點為Thread裡的run();使用Thread(Runnable)產生的Thread,其進入點為Runnable物件裡的run()。當run()結束時,這個Thread也就結束了;這和main()結束有相同的效果。其用法以下面範例說明:
public class ThreadExample1 extends Thread { public void run() { // override Thread's run() System.out.println("Here is the starting point of Thread."); for (;;) { // infinite loop to print message System.out.println("User Created Thread"); } } public static void main(String[] argv) { Thread t = new ThreadExample1(); // 產生Thread物件 t.start(); // 開始執行t.run() for (;;) { System.out.println("Main Thread"); } } }
以上程式執行後,螢幕上會持續印出"User Created Thread"或"Main Thread"的字樣。利用Runnable的寫法如下
public class ThreadExample2 implements Runnable { public void run() { // implements Runnable run() System.out.println("Here is the starting point of Thread."); for (;;) { // infinite loop to print message System.out.println("User Created Thread"); } } public static void main(String[] argv) { Thread t = new Thread(new ThreadExample2()); // 產生Thread物件 t.start(); // 開始執行Runnable.run(); for (;;) { System.out.println("Main Thread"); } } }
Thread的優先權與影響資源的相關方法
Thread.setPriority(int)可以設定Thread的優先權,數字越大優先權越高。Thread定義了3個相關的static final variable
public static final int MAX_PRIORITY 10 public static final int MIN_PRIORITY 1 public static final int NORM_PRIORITY 5
要提醒讀者的是,優先權高的Thread其佔有CPU的機會比較高,但優先權低的也都會有機會執行到。其他有關Thread執行的方法有:
- yield():先讓給別的Thread執行
- sleep(int time):休息time mini second(1/1000秒)
- join():呼叫ThreadA.join()的執行緒會等到ThreadA結束後,才能繼續執行
你可以執行下面的程式,看看yield()的效果
public class ThreadExample1 extends Thread { public void run() { // overwrite Thread's run() System.out.println("Here is the starting point of Thread."); for (int i =0; i < 20; i++) { // infinite loop to print message System.out.println("User Created Thread"); yield(); } } public static void main(String[] argv) { Thread t = new ThreadExample1(); // 產生Thread物件 t.start(); // 開始執行t.run() for (int i =0; i < 20; i++) { System.out.println("Main Thread"); yield(); } } }
===================================================================================
Main Thread Main Thread Main Thread Main Thread Main Thread Main Thread Main Thread Main Thread Main Thread Main Thread Main Thread Main Thread Main Thread Main Thread Main Thread Main Thread Main Thread Main Thread Main Thread Main Thread Here is the starting point of Thread. User Created Thread User Created Thread User Created Thread User Created Thread User Created Thread User Created Thread User Created Thread User Created Thread User Created Thread User Created Thread User Created Thread User Created Thread User Created Thread User Created Thread User Created Thread User Created Thread User Created Thread User Created Thread User Created Thread User Created Thread
===================================================================================
觀看join的效果
public class JoinExample extends Thread { String myId; public JoinExample(String id) { myId = id; } public void run() { // overwrite Thread's run() for (int i=0; i < 20; i++) { System.out.println(myId+" Thread"); } } public static void main(String[] argv) { Thread t1 = new JoinExample("T1"); // 產生Thread物件 Thread t2 = new JoinExample("T2"); // 產生Thread物件 t1.start(); // 開始執行t1.run() t2.start(); try { t1.join(); // 等待t1結束 t2.join(); // 等待t2結束 } catch (InterruptedException e) {} for (int i=0;i < 5; i++) { System.out.println("Main Thread"); } } }
==================================================================================
T1 Thread T1 Thread T1 Thread T1 Thread T1 Thread T1 Thread T2 Thread T2 Thread T2 Thread T2 Thread T2 Thread T2 Thread T2 Thread T2 Thread T2 Thread T2 Thread T1 Thread T1 Thread T1 Thread T2 Thread T2 Thread T2 Thread T1 Thread T1 Thread T2 Thread T1 Thread T1 Thread T1 Thread T2 Thread T2 Thread T2 Thread T2 Thread T2 Thread T1 Thread T1 Thread T1 Thread T1 Thread T1 Thread T2 Thread T1 Thread Main Thread Main Thread Main Thread Main Thread Main Thread
======================================================================================
觀看sleep的效果
public class SleepExample extends Thread { String myId; public SleepExample(String id) { myId = id; } public void run() { // overwrite Thread's run() for (int i=0; i < 20; i++) { System.out.println(myId+" Thread"); try { sleep(100); } catch (InterruptedException e) {} } } public static void main(String[] argv) { Thread t1 = new SleepExample("T1"); // 產生Thread物件 Thread t2 = new SleepExample("T2"); // 產生Thread物件 t1.start(); // 開始執行t1.run() t2.start(); } }
=======================================================================================
T1 Thread T2 Thread T1 Thread T2 Thread T1 Thread T2 Thread T1 Thread T2 Thread T1 Thread T2 Thread T1 Thread T2 Thread T1 Thread T2 Thread T1 Thread T2 Thread T1 Thread T2 Thread T1 Thread T2 Thread T1 Thread T2 Thread T1 Thread T2 Thread T1 Thread T2 Thread T1 Thread T2 Thread T1 Thread T2 Thread T1 Thread T2 Thread T1 Thread T2 Thread T1 Thread T2 Thread T1 Thread T2 Thread T1 Thread T2 Thread
Critical Section(關鍵時刻)的保護措施
如果設計者沒有提供保護機制的話,Thread取得和失去CPU控制權的時機是由作業系統來決定。也就是說Thread可能在執行任何一個機器指令時,被作業系統取走CPU控制權,並交給另一個Thread。由於某些真實世界的動作是不可分割的,例如跨行轉帳X圓由A帳戶到B帳戶,轉帳前後這兩個帳戶的總金額必須相同,但以程式來實作時,卻無法用一個指令就完成,如轉帳可能要寫成下面的這一段程式碼
if (A >= X) { A = A - X; // 翻譯成3個機器指令LOAD A, SUB X, STORE A B = B +X; }
如果兩個Thread同時要存取A,B兩帳戶進行轉帳,假設當Thread one執行到SUBX後被中斷,Threadtwo接手執行完成另一個轉帳要求,然後Threadone繼續執行未完成的動作,請問這兩個轉帳動作正確嗎?我們以A=1000,B=0,分別轉帳100,200圓來說明此結果
LOAD A // Thread 1, 現在A還是1000 SUB 100 // Thread 1 LOAD A // 假設此時Thread 1被中斷,Thread 2接手, 因為Thread 1 還沒有執行STORE A, 所以變數A還是1000 SUB 200 // Thread 2 STORE A // Thread 2, A = 800 LOAD B // Thread 2, B現在是0 ADD 200 // Thread 2 STORE B // B=200 STORE A // Thread 1拿回控制權, A = 900 LOAD B // Thread 1, B = 200 ADD 100 // Thread 1 STORE B // B = 300
你會發現執行完成後A=900,B=300,也就是說銀行平白損失了200圓。當然另外的執行順序可能造成其他不正確的結果。我們把這問題再整理一下:
- 寫程式時假設指令會循序執行
- 某些不可分割的動作,需要以多個機器指令來完成
- Thread執行時可能在某個機器指令被中斷
- 兩個Thread可能執行同一段程式碼,存取同一個資料結構
- 這樣就破壞了第1點的假設
因此在撰寫多執行緒的程式時,必須特別考慮這種狀況(又稱為race condition)。Java的解決辦法是,JVM會在每個物件上擺一把鎖(lock),然後程式設計者可以宣告執行某一段程式(通常是用來存取共同資料結構的程式碼, 又稱為Critical Section)時,必須拿到某物件的鎖才行,這個鎖同時間最多只有一個執行緒可以擁有它。
public class Transfer extends Thread { public static Object lock = new Object(); public static int A = 1000; public static int B = 0; private int amount; public Transfer(int x) { amount = x; } public void run() { synchronized(lock) { // 取得lock,如果別的thread A已取得,則目前這個thread會等到thread A釋放該lock if (A >= amount) { A = A - amount; B = B + amount; } } // 離開synchronized區塊後,此thread會自動釋放lock } public static void main(String[] argv) { Thread t1 = new Transfer(100); Thread t2 = new Transfer(200); t1.start(); t2.start(); } }
除了synchronized(ref)的語法可以鎖定ref指到的物件外,synchronized也可以用在object method前面,表示要鎖定this物件才能執行該方法。以下是Queue結構的範例
public class Queue { private Object[] data; private int size; private int head; private int tail; public Queue(int maxLen) { data = new Object[maxLen]; } public synchronized Object deQueue() { Object tmp = data[head]; data[head] = null; head = (head+1)%data.length; size--; return tmp; } public synchronized void enQueue(Object c) { data[tail++] = c; tail %= data.length; size++; } }
雖然上面的程式正確無誤,但並未考慮資源不足時該如何處理。例如Queue已經沒有資料了,卻還想拿出來;或是Queue裡已經塞滿了資料,使用者卻還要放進去?我們當然可以使用Exception Handling的機制:
public class Queue { private Object[] data; private int size; private int head; private int tail; public Queue(int maxLen) { data = new Object[maxLen]; } public synchronized Object deQueue() throws Exception { if (size == 0) { throw new Exception(); } Object tmp = data[head]; data[head] = null; head = (head+1)%data.length; size--; return tmp; } public synchronized void enQueue(Object c) throws Exception { if (size >= maxLen) { throw new Exception(); } data[tail++] = c; tail %= data.length; size++; } }
但假設我們的執行環境是,某些Thread專門負責讀取使用者的需求,並把工作放到Queue裡面,某些Thread則專門由Queue裡抓取工作需求做進一步處理。這種架構的好處是,可以把慢速或不定速的輸入(如透過網路讀資料,連線速度可能差很多),和快速的處理分開,可使系統的反應速度更快,更節省資源。那麼以Exceptoin來處理Queue空掉或爆掉的情況並不合適,因為使用Queue的人必須處理例外狀況,並不斷的消耗CPU資源:
public class Getter extends Thread { Queue q; public Getter(Queue q) { this.q = q; } public void run() { for (;;) { try { Object data = q.deQueue(); // processing } catch(Exception e) { // if we try to sleep here, user may feel slow response // if we do not sleep, CPU will be wasted } } } } public class Putter extends Thread { Queue q; public Putter(Queue q) { this.q = q; } public void run() { for (;;) { try { Object data = null; // get user request q.enQueue(data); } catch(Exception e) { // if we try to sleep here, user may feel slow response // if we do not sleep, CPU will be wasted } } } } public class Main { public static void main(String[] argv) { Queue q = new Queue(10); Getter r1 = new Getter(q); Getter r2 = new Getter(q); Putter w1 = new Putter(q); Putter w2 = new Putter(q); r1.start(); r2.start(); w1.start(); w2.start(); } }
為了解決這類資源分配的問題,Java Object提供了下面三個method:
- wait():使呼叫此方法的Thread進入Blocking Mode,並設為等待該Object, 呼叫wait()時, 該Thread必須擁有該物件的lock。Blocking Mode下的Thread必須釋放所有手中的lock,並且無法使用CPU。
- notifyAll():讓等待該Object的所有Thread進入Runnable Mode。
- notify():讓等待該Object的某一個Thread進入Runnable Mode。
所謂Runnable Mode是指該Thread隨時可由作業系統分配CPU資源。Blocking Mode表示該Thread正在等待某個事件發生,作業系統不會讓這種Thread取得CPU資源。前一個Queue的範例就可以寫成:
public class Queue { private Object[] data; private int size; private int head; private int tail; public Queue(int maxLen) { data = new Object[maxLen]; } public synchronized Object deQueue() { while (size==0) { // When executing here, Thread must have got lock and be in running mode // Let current Thread wait this object(to sleeping mode) try { wait(); // to sleeping mode, and release all lock } catch(Exception ex) {}; } Object tmp = data[head]; data[head] = null; head = (head+1)%data.length; if (size==data.length) { // wake up all Threads waiting this object notifyAll(); } size--; return tmp; } // release lock public synchronized void enQueue(Object c) { while (size==data.length) { // When executing here, Thread must have got lock and be in running mode // Let current thread wait this object(to sleeping mode) try { wait(); // to sleeping mode, and release all lock } catch(Exception ex) {}; } data[tail++] = c; tail %= data.length; size++; if (size==1) { // wake up all Threads waiting this object notifyAll(); } } } public class ReaderWriter extends Thread { public static final int READER = 1; public static final int WRITER = 2; private Queue q; private int mode; public void run() { for (int i=0; i < 1000; i++) { if (mode==READER) { q.deQueue(); } else if (mode==WRITER) { q.enQueue(new Integer(i)); } } } public ReaderWriter(Queue q, int mode) { this.q = q; this.mode = mode; } public static void main(String[] args) { Queue q = new Queue(5); ReaderWriter r1, r2, w1, w2; (w1 = new ReaderWriter(q, WRITER)).start(); (w2 = new ReaderWriter(q, WRITER)).start(); (r1 = new ReaderWriter(q, READER)).start(); (r2 = new ReaderWriter(q, READER)).start(); try { w1.join(); // wait until w1 complete w2.join(); // wait until w2 complete r1.join(); // wait until r1 complete r2.join(); // wait until r2 complete } catch(InterruptedException epp) { } } }
Multiple Reader-Writer Monitors
上一節的Queue資料結構,不論是enQueue()或deQueue()都會更動到Queue的內容。而在許多應用裡,資料結構可以允許同時多個讀一個寫。本節舉出幾個不同的例子,說明多個Reader-Writer時的可能排程法。
Single Reader-Writer, 只同時允許一個執行緒存取
public class SingleReaderWriter { int n; // number of reader and write, 0 or 1 public synchronized void startReading() throws InterruptedException { while (n != 0) { wait(); } n = 1; } public synchronized void stopReading() { n = 0; notify(); } public synchronized void startWriting() throws InterruptedException { while (n != 0) { wait(); } n = 1; } public synchronized void stopWriting() { n = 0; notify(); } } // 這是一個使用範例, 程式能否正確執行要靠呼叫正確的start和stop public class WriterThread extends Thread { SingleReaderWriter srw; public WriterThread(SingleReaderWriter srw) { this.srw = srw; } public void run() { startWring(); // insert real job here stopWriting(); } } public class ReaderThread extends Thread { SingleReaderWriter srw; public ReaderThread(SingleReaderWriter srw) { this.srw = srw; } public void run() { startReading(); // insert real job here stopReading(); } } public class Test { public static void main(String[] argv) { SingleReaderWriter srw = new SingleReaderWriter; // create four threads (new WriterThread(srw)).start(); (new WriterThread(srw)).start(); (new ReaderThread(srw)).start(); (new ReaderThread(srw)).start(); } }
其他可能的策略實作如下:
Reader優先:
public class ReadersPreferredMonitor { int nr; // The number of threads currently reading, nr > = 0 int nw; // The number of threads currently writing, 0 or 1 int nrtotal; // The number of threads either reading or waiting to read, nrtotal > = nr int nwtotal; // The number of threads either writing or waiting to write public synchronized void startReading() throws InterruptedException { nrtotal++; // 想要read的thread又多了一個 while (nw != 0) { // 還有write thread正在write wait(); } nr++; // 正在讀的thread多了一個 } public synchronized void startWriting() throws InterruptedException { nwtotal++; // 想要寫的thread又多了一個 while (nrtotal+nw != 0) { // 只要有thread想要讀,或是有thread正在寫,禮讓 wait(); } nw = 1; } public synchronized void stopReading() { nr--; // 正在讀的少一個 nrtotal--; // 想要讀的少一個 if (nrtotal == 0) { // 如果沒有要讀的,叫醒想寫的 notify(); } } public synchronized void stopWriting() { nw = 0; // 沒有thread正在寫 nwtotal--; // 想寫的少一個 notifyAll(); // 叫醒所有想讀和想寫的 } }
Writer優先:
public class WritersPreferredMonitor { int nr; // The number of threads currently reading, nr > = 0 int nw; // The number of threads currently writing, 0 or 1 int nrtotal; // The number of threads either reading or waiting to read, nrtotal > = nr int nwtotal; // The number of threads either writing or waiting to write public synchronized void startReading() throws InterruptedException { nrtotal++; // 想要read的thread又多了一個 while (nwtotal != 0) { // 還有thread想要write wait(); } nr++; // 正在讀的thread多了一個 } public synchronized void startWriting() throws InterruptedException { nwtotal++; // 想要寫的thread又多了一個 while (nr+nw != 0) { // 有thread正在讀,或是有thread正在寫 wait(); } nw = 1; } public synchronized void stopReading() { nr--; // 正在讀的少一個 nrtotal--; // 想要讀的少一個 if (nr == 0) { // 如果沒有正在讀的,叫醒所有的(包括想寫的) notifyAll(); } } public synchronized void stopWriting() { nw = 0; // 沒有thread正在寫 nwtotal--; // 想寫的少一個 notifyAll(); // 叫醒所有想讀和想寫的 } }
Reader和Writer交互執行:
public class AlternatingReadersWritersMonitor { int[] nr = new int[2]; // The number of threads currently reading int thisBatch; // Index in nr of the batch of readers currently reading(0 or 1) int nextBatch = 1; // Index in nr of the batch of readers waitin to read(always 1-thisBatch) int nw; // The number of threads currently writing(0 or 1) int nwtotal; // The number of threads either writing or waiting to write public synchronized void startReading() throws InterruptedException { if (nwtotal == 0) { // 沒有thread要write, 將reader都放到目前要處理的這一批 nr[thisBatch]++; } else { nr[nextBatch]++; int myBatch = nextBatch; while (thisBatch != myBatch) { wait(); } } } public synchronized void stopReading() { nr[thisBatch]--; if (nr[thisBatch] == 0) { // 目前這批的reader都讀完了,找下一個writer notifyAll(); } } public synchronized void startWriting() throws InterruptedException { nwtotal++; while (nr[thisBatch]+nw != 0) { // 目前這批還沒完,或有thread正在寫 wait(); } nw = 1; } public synchronized void stopWriting() { nw = 0; nwtotal--; int tmp = thisBatch; // 交換下一批要讀的 thisBatch = nextBatch; nextBatch = tmp; notifyAll(); } }
給號依序執行
public class TakeANumberMonitor { int nr; // The number of threads currently reading int nextNumber; // The number to be taken by the next thread to arrive int nowServing; // The number of the thread to be served next public synchronized void startReading() throws InterruptedException { int myNumber = nextNumber++; while (nowServing != myNumber) { // 還沒輪到我 wait(); } nr++; // 多了一個Reader nowServing++; // 準備檢查下一個 notifyAll(); } public synchronized void startWriting() throws InterruptedException { int myNumber = nextNumber++; while (nowServing != myNumber) { // 還沒輪到我 wait(); } while (nr > 0) { // 要等所有的Reader結束 wait(); } } public synchronized void stopReading() { nr--; // 少了一個Reader if (nr == 0) { notifyAll(); } } public synchronized void stopWriting() { nowServing++; // 準備檢查下一個 notifyAll(); } }
Reference:
https://programming.im.ncnu.edu.tw/J_Chapter9.htm
No comments:
Post a Comment