Concurrency, BlockingQueue not usable for simple use case?

3 replies
hjohn
Offline
Joined: 2007-05-04

I have this use case for a LinkedBlockingQueue, that I can't get to work because of what seems to be limitations in the concurrency package. Since a lot of time went into this new system, I can't help but wonder if I'm doing something wrong or perhaps am missing the point.

What I want is this:

I have a LinkedBlockingQueue on which a thread waits using the take() method. There are two use cases for this queue:

1) I want to add something to the queue and have it processed when my thread gets to it, adding as many items as I want -- so far so good, works like a charm.

2) I want to add something to the queue, but only if the queue is currently empty (the use case is: the user wants to start a certain task immediately, but is only allowed to do so when no tasks are currently running or queued).

No matter what I do, case 2 won't work with LinkedBlockingQueue (at least not in a thread-safe manner).

For example, I tried doing this:

Thread 1:

for (;;) {
    Object o;
    synchronized (queue) {
        o = queue.take();
    }
    // do stuff
}

Thread 2:

synchronized (queue) {
    if (queue.isEmpty()) {
        queue.add(new Object());
    }
}

Surprisingly, this code deadlocks on isEmpty(). For some reason, the take() method wants to obtain the synchronized lock on queue, which is not possible because Thread 2 already has it. I assume this is because internally isEmpty() wants to obtain the internal lock of the queue, and to get it Thread 1 has to wake up for a second.

This means there is no way to externally synchronize actions taken on a queue, and since there is no way to get at the internal lock the queue uses (it is not even available to subclasses), I can't add this functionality some other way either.

The only option left as far as I can see is to write my own BlockingQueue, but I'm hoping someone here has a better solution.

tarbo
Offline
Joined: 2006-12-18

The concurrency package is built mainly around not having to use the [i]synchronized[/i] statement. Consider removing it from your threads. Don't worry; locking and synchronising is done internally.

whartung
Offline
Joined: 2003-06-13

> The concurrency package is built mainly around not
> having to use the [i]synchronized[/i] statement.
> Consider removing it from your threads. Don't
> worry; locking and synchronising is done internally.

But, that's not the point.

The point is that he wants to atomically add an item to a queue, but only if the queue is empty.

[code]
if (queue.isEmpty()) {
    queue.add(new Object());
}
[/code]

This is not an atomic operation. Between the "isEmpty" and the "queue.add", an item can be added by another thread.

I don't have a solution to your "addIfEmpty" problem. The other option is to use a wrapper that you can synchronize around, rather than using the queue directly.
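
A minimal sketch of what such a wrapper might look like. The class name EmptyOnlyQueue and the method addIfEmpty are hypothetical (they are not part of java.util.concurrent), and the sketch uses a plain ArrayDeque guarded by the wrapper's own monitor rather than a LinkedBlockingQueue. As far as I can tell, the deadlock in the original post comes from Thread 1 still holding the monitor on the queue while it is blocked inside take(), so Thread 2 can never enter its synchronized block. Here the consumer blocks in wait(), which releases the monitor while it waits.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical wrapper: every operation runs under the same monitor,
// so the "is it empty?" check and the add happen as one atomic step.
class EmptyOnlyQueue<T> {
    private final Queue<T> items = new ArrayDeque<>();

    // Unconditional add (use case 1 in the original post).
    public synchronized void add(T item) {
        items.add(item);
        notifyAll(); // wake a consumer blocked in take()
    }

    // Adds the item only if nothing is queued (use case 2).
    // Returns true if the item was accepted.
    public synchronized boolean addIfEmpty(T item) {
        if (!items.isEmpty()) {
            return false;
        }
        items.add(item);
        notifyAll();
        return true;
    }

    // Blocks until an item is available. wait() releases the monitor
    // while blocked, so producers can still get in.
    public synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        return items.remove();
    }
}
```

Note that this only checks whether anything is queued. To also reject a task while another one is still being processed, the wrapper would need an extra flag that the consumer sets and clears under the same monitor; that part is left out here.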

pietblok
Offline
Joined: 2003-07-17

You might move the constraint from the put thread to the take thread and mark the object that has a constraint with some interface (e.g. OnlyOnEmptyQueue).

Switch from LinkedBlockingQueue to LinkedBlockingDeque.

Now, in the put thread, use putFirst unconditionally for the constrained object and use put or putLast for non-constrained objects.

In the take thread, implement the constraint as follows:

while (true) {
    Object nextItem = queue.take();
    if (nextItem instanceof OnlyOnEmptyQueue && queue.size() != 0) {
        // message user that nextItem will not be processed
    } else {
        // process nextItem
    }
}
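
A self-contained sketch of the above, assuming some illustrative names that are not in the original: the Task and ImmediateTask classes, the submit/submitImmediate/processNext methods and the log list are all hypothetical. processNext is one iteration of the take loop, split out so it can be called directly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

class DequeConstraintSketch {
    // Marker interface: the task may run only if nothing else is queued.
    interface OnlyOnEmptyQueue {}

    static class Task {
        final String name;
        Task(String name) { this.name = name; }
    }

    static class ImmediateTask extends Task implements OnlyOnEmptyQueue {
        ImmediateTask(String name) { super(name); }
    }

    final BlockingDeque<Task> queue = new LinkedBlockingDeque<>();
    final List<String> log = new ArrayList<>(); // stands in for real processing

    // Normal tasks go to the tail.
    void submit(Task task) throws InterruptedException {
        queue.putLast(task);
    }

    // Constrained tasks jump to the head, unconditionally.
    void submitImmediate(ImmediateTask task) throws InterruptedException {
        queue.putFirst(task);
    }

    // One iteration of the take loop: the constraint is checked here.
    void processNext() throws InterruptedException {
        Task next = queue.takeFirst();
        if (next instanceof OnlyOnEmptyQueue && !queue.isEmpty()) {
            log.add("rejected " + next.name);  // other work was queued
        } else {
            log.add("processed " + next.name);
        }
    }
}
```

One thing to be aware of: a constrained task submitted while another task is running, with nothing else queued, is not rejected here; it simply runs next. Whether that is acceptable depends on how strictly "immediately" has to be taken.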

Hope this helps.

Piet