RequestProcessor
instance with limited throughput (probably
* set to 1), the IDE would try to run all your requests in parallel otherwise.
*
+ * + * Since version 6.3 there is conditional support for interruption of long-running tasks. + * It has always been possible to cancel a task that is not yet running using {@link RequestProcessor.Task#cancel } + * but if the task was already running, one was out of luck. Since version 6.3 + * the thread running the task is interrupted and the Runnable can check for that + * and terminate its execution sooner. In the runnable one should check for + * thread interruption (done from {@link RequestProcessor.Task#cancel }) and + * if true, return immediately as in this example: + *
+ * public void run () { + * while (veryLongTimeLook) { + * doAPieceOfIt (); + * + * if (Thread.interrupted ()) return; + * } + * } + *+ * * @author Petr Nejedly, Jaroslav Tulach */ public final class RequestProcessor { @@ -112,6 +130,9 @@ /** The maximal number of processors that can perform the requests sent * to this RequestProcessors. If 1, all the requests are serialized. */ private int throughput; + + /** support for interrupts or not? */ + private boolean interruptThread; /** Creates new RequestProcessor with automatically assigned unique name. */ public RequestProcessor() { @@ -131,10 +152,41 @@ * @since OpenAPI version 2.12 */ public RequestProcessor(String name, int throughput) { + this(name, throughput, false); + } + + /** Creates a new named RequestProcessor with defined throughput which + * can support interruption of the thread the processor runs in. + * There always was a way how to cancel not yet running task using {@link RequestProcessor.Task#cancel } + * but if the task was already running, one was out of luck. With this + * constructor one can create a {@link RequestProcessor} which threads + * thread running tasks are interrupted and the Runnable can check for that + * and terminate its execution sooner. In the runnable one shall check for + * thread interruption (done from {@link RequestProcessor.Task#cancel }) and + * if true, return immediatelly as in this example: + *
+ * public void run () { + * while (veryLongTimeLook) { + * doAPieceOfIt (); + * + * if (Thread.interrupted ()) return; + * } + * } + *+ * + * @param name the name to use for the request processor thread + * @param throughput the maximal count of requests allowed to run in parallel + * @param interruptThread true if {@link RequestProcessor.Task#cancel} shall interrupt the thread + * + * @since 6.3 + */ + public RequestProcessor(String name, int throughput, boolean interruptThread) { this.throughput = throughput; this.name = (name != null) ? name : ("OpenIDE-request-processor-" + (counter++)); + this.interruptThread = interruptThread; } - + + /** The getter for the shared instance of the
RequestProcessor
.
*
* @return an instance of RequestProcessor that is capable of performing
@@ -364,21 +416,19 @@
}
Task askForWork(Processor worker, String debug) {
- synchronized (processorLock) {
- if (stopped || queue.isEmpty()) { // no more work in this burst, return him
- processors.remove(worker);
- Processor.put(worker, debug);
- running--;
-
- return null;
- } else { // we have some work for the worker, pass it
-
- Item i = (Item) queue.remove(0);
- Task t = i.getTask();
- i.clear();
+ if (stopped || queue.isEmpty()) { // no more work in this burst, return him
+ processors.remove(worker);
+ Processor.put(worker, debug);
+ running--;
+
+ return null;
+ } else { // we have some work for the worker, pass it
+
+ Item i = (Item) queue.remove(0);
+ Task t = i.getTask();
+ i.clear(worker);
- return t;
- }
+ return t;
}
}
@@ -458,7 +508,7 @@
notifyRunning();
if (item != null) {
- item.clear();
+ item.clear(null);
}
item = new Item(this, RequestProcessor.this);
@@ -490,7 +540,19 @@
*/
public boolean cancel() {
synchronized (processorLock) {
- boolean success = (item == null) ? false : item.clear();
+ boolean success;
+
+ if (item == null) {
+ success = false;
+ } else {
+ Processor p = item.getProcessor();
+ success = item.clear(null);
+
+ if (p != null) {
+ p.interruptTask(this, RequestProcessor.this);
+ item = null;
+ }
+ }
if (success) {
notifyFinished(); // mark it as finished
@@ -541,17 +603,17 @@
*/
public void waitFinished() {
if (isRequestProcessorThread()) { //System.err.println("Task.waitFinished on " + this + " from other task in RP: " + Thread.currentThread().getName());
-
boolean toRun;
synchronized (processorLock) {
// correct line: toRun = (item == null) ? !isFinished (): (item.clear() && !isFinished ());
// the same: toRun = !isFinished () && (item == null ? true : item.clear ());
- toRun = !isFinished() && ((item == null) || item.clear());
+ toRun = !isFinished() && ((item == null) || item.clear(null));
}
if (toRun) { //System.err.println(" ## running it synchronously");
- run();
+ Processor processor = (Processor)Thread.currentThread();
+ processor.doEvaluate (this, processorLock, RequestProcessor.this);
} else { // it is already running in other thread of this RP
if (lastThread != Thread.currentThread()) {
@@ -586,7 +648,7 @@
boolean toRun;
synchronized (processorLock) {
- toRun = !isFinished() && ((item == null) || item.clear());
+ toRun = !isFinished() && ((item == null) || item.clear(null));
}
if (toRun) {
@@ -614,7 +676,7 @@
/* One item representing the task pending in the pending queue */
private static class Item extends Exception {
private final RequestProcessor owner;
- private Task action;
+ private Object action;
private boolean enqueued;
Item(Task task, RequestProcessor rp) {
@@ -624,22 +686,30 @@
}
Task getTask() {
- return action;
+ Object a = action;
+
+ return (a instanceof Task) ? (Task) a : null;
}
/** Annulate this request iff still possible.
* @returns true if it was possible to skip this item, false
* if the item was/is already processed */
- boolean clear() {
+ boolean clear(Processor processor) {
synchronized (owner.processorLock) {
- action = null;
+ action = processor;
return enqueued ? owner.queue.remove(this) : true;
}
}
+ Processor getProcessor() {
+ Object a = action;
+
+ return (a instanceof Processor) ? (Processor) a : null;
+ }
+
int getPriority() {
- return action.getPriority();
+ return getTask().getPriority();
}
public Throwable fillInStackTrace() {
@@ -669,6 +739,9 @@
//private Item task;
private RequestProcessor source;
+
+ /** task we are working on */
+ private RequestProcessor.Task todo;
private boolean idle = true;
/** Waiting lock */
@@ -771,7 +844,6 @@
}
}
- Task todo;
String debug = null;
ErrorManager em = logger();
@@ -782,8 +854,12 @@
}
// while we have something to do
- while ((todo = current.askForWork(this, debug)) != null) {
- // if(todo != null) {
+ for (;;) {
+ // need the same sync as interruptTask
+ synchronized (current.processorLock) {
+ todo = current.askForWork(this, debug);
+ if (todo == null) break;
+ }
setPrio(todo.getPriority());
try {
@@ -813,15 +889,54 @@
doNotify(todo, t);
}
- // to improve GC
- todo = null;
-
- // }
+ // need the same sync as interruptTask
+ synchronized (current.processorLock) {
+ // to improve GC
+ todo = null;
+ // and to clear any possible interrupted state
+ // set by calling Task.cancel ()
+ Thread.interrupted();
+ }
}
if (loggable) {
logger().log(ErrorManager.INFORMATIONAL, "Work finished " + getName()); // NOI18N
}
+ }
+ }
+
+ /** Runs the given task directly as this processor's current task, then restores the previous one.
+ */
+ final void doEvaluate (Task t, Object processorLock, RequestProcessor src) {
+ Task previous = todo;
+ boolean interrupted = Thread.interrupted();
+ try {
+ todo = t;
+ t.run ();
+ } finally {
+ synchronized (processorLock) {
+ todo = previous;
+ if (interrupted || todo.item == null) {
+ if (src.interruptThread) {
+ // reinterrupt the thread if it was interrupted and
+ // we support interrupts
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+ }
+
+ /** Called under the processorLock */
+ public void interruptTask(Task t, RequestProcessor src) {
+ if (t != todo) {
+ // not running this task, so there is nothing to interrupt
+ return;
+ }
+
+ if (src.interruptThread) {
+ // otherwise interrupt this thread
+ interrupt();
}
}
Index: openide/util/test/unit/src/org/openide/util/RequestProcessorTest.java
===================================================================
RCS file: /cvs/openide/util/test/unit/src/org/openide/util/RequestProcessorTest.java,v
retrieving revision 1.3
diff -u -r1.3 RequestProcessorTest.java
--- openide/util/test/unit/src/org/openide/util/RequestProcessorTest.java 20 Jun 2005 15:02:11 -0000 1.3
+++ openide/util/test/unit/src/org/openide/util/RequestProcessorTest.java 22 Jun 2005 21:15:43 -0000
@@ -690,7 +690,7 @@
}
public void testCancelInterruptsTheRunningThread () throws Exception {
- RequestProcessor rp = new RequestProcessor ("Cancellable");
+ RequestProcessor rp = new RequestProcessor ("Cancellable", 1, true);
class R implements Runnable {
public boolean checkBefore;
@@ -750,9 +750,72 @@
assertTrue ("But interupted after", r.checkAfter);
}
}
+
+ public void testCancelDoesNotInterruptTheRunningThread () throws Exception {
+ RequestProcessor rp = new RequestProcessor ("Not Cancellable", 1, false);
+
+ class R implements Runnable {
+ public boolean checkBefore;
+ public boolean checkAfter;
+ public boolean interrupted;
+
+ public synchronized void run () {
+ checkBefore = Thread.interrupted();
+
+ notifyAll ();
+
+ try {
+ wait ();
+ interrupted = false;
+ } catch (InterruptedException ex) {
+ interrupted = true;
+ }
+
+ notifyAll ();
+
+ try {
+ wait ();
+ } catch (InterruptedException ex) {
+ }
+
+ checkAfter = Thread.interrupted();
+
+ notifyAll ();
+ }
+ }
+
+ R r = new R ();
+ synchronized (r) {
+ RequestProcessor.Task t = rp.post (r);
+ r.wait ();
+ assertTrue ("The task is already running", !t.cancel ());
+ r.notifyAll ();
+ r.wait ();
+ r.notifyAll ();
+ r.wait ();
+ assertFalse ("The task has not been interrupted", r.interrupted);
+ assertTrue ("Not before", !r.checkBefore);
+ assertTrue ("Not after - as the notification was thru InterruptedException", !r.checkAfter);
+ }
+
+ // interrupt after the task has finished
+ r = new R ();
+ synchronized (r) {
+ RequestProcessor.Task t = rp.post (r);
+ r.wait ();
+ r.notifyAll ();
+ r.wait ();
+ assertTrue ("The task is already running", !t.cancel ());
+ r.notifyAll ();
+ r.wait ();
+ assertTrue ("The task has not been interrupted by exception", !r.interrupted);
+ assertFalse ("Not interupted before", r.checkBefore);
+ assertFalse ("Not interupted after", r.checkAfter);
+ }
+ }
public void testInterruptedStatusIsClearedBetweenTwoTaskExecution () throws Exception {
- RequestProcessor rp = new RequestProcessor ("testInterruptedStatusIsClearedBetweenTwoTaskExecution");
+ RequestProcessor rp = new RequestProcessor ("testInterruptedStatusIsClearedBetweenTwoTaskExecution", 1, true);
final RequestProcessor.Task[] task = new RequestProcessor.Task[1];
// test interrupted status is cleared after task ends
@@ -797,7 +860,7 @@
}
public void testInterruptedStatusWorksInInversedTasks() throws Exception {
- RequestProcessor rp = new RequestProcessor ("testInterruptedStatusWorksInInversedTasks");
+ RequestProcessor rp = new RequestProcessor ("testInterruptedStatusWorksInInversedTasks", 1, true);
class Fail implements Runnable {
public Fail (String n) {
@@ -874,7 +937,7 @@
}
public void testInterruptedStatusWorksInInversedTasksWhenInterruptedSoon() throws Exception {
- RequestProcessor rp = new RequestProcessor ("testInterruptedStatusWorksInInversedTasksWhenInterruptedSoon");
+ RequestProcessor rp = new RequestProcessor ("testInterruptedStatusWorksInInversedTasksWhenInterruptedSoon", 1, true);
class Fail implements Runnable {
public RequestProcessor.Task wait;