Half-Sync/Half-Async Pattern
Intent
The Half-Sync/Half-Async pattern decouples synchronous I/O from asynchronous I/O in a system to simplify concurrent programming effort without degrading execution efficiency. Decompose the services of concurrent software into two separate layers, synchronous and asynchronous, and add a queueing layer to mediate communication between them.
Process higher-level services, such as domain functionality, database queries, or file transfers, synchronously in separate threads or processes. Conversely, process lower-level system services, such as short-lived protocol handlers driven by interrupts from network hardware, asynchronously. If services in the synchronous layer must communicate with services in the asynchronous layer, have them exchange messages via a queuing layer.
Note: Concurrent software often performs both asynchronous and synchronous service processing. Asynchrony is used to process low-level system services efficiently, and synchrony is used to simplify application service processing. To benefit from both programming models, however, it is essential to coordinate asynchronous and synchronous service processing efficiently.
Structure
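A structure diagram is not reproduced here, so as a stand-in, below is a minimal sketch of the three cooperating layers expressed as plain Java interfaces. The names are illustrative only and are not used in the implementation that follows; they simply restate the decomposition described in the Intent.

// Illustrative sketch only; these types are hypothetical and not part of the example below.
interface SynchronousLayer {
  // Worker threads block here while running higher-level application tasks to completion.
  void runTask(Runnable task) throws InterruptedException;
}

interface QueuingLayer {
  // Buffers work between the two layers and mediates their communication.
  void enqueue(Runnable task) throws InterruptedException;
  Runnable dequeue() throws InterruptedException;
}

interface AsynchronousLayer {
  // Reacts to external events (e.g. interrupts from network hardware) and enqueues work without blocking.
  void onEvent(Runnable work);
}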
Implementation
In the example below, the main method creates an asynchronous service that does not block the main thread while a task is being performed; the main thread continues its work, much like in the Async Method Invocation pattern. The difference is that here a queuing layer sits between the asynchronous layer and the synchronous layer, which allows for different communication patterns between the two layers. For example, a priority queue can be used as the queuing layer to prioritize the order in which tasks are executed. Our implementation is just one simple way of realizing this pattern; many variants are possible, as described in the Applicability section.
Step 1: Create an interface that represents a computation performed asynchronously together with its result. The computation is typically done in background threads, and the result is posted back in the form of callbacks. The callbacks do not include isComplete or cancel, as those are out of scope for this pattern. Note that this interface extends the Callable interface.
import java.util.concurrent.Callable;

public interface AsyncTask<O> extends Callable<O> {

  /**
   * Called in the context of the caller thread before the call to {@link #call()}. Large tasks
   * should not be performed in this method, as it will block the caller thread. Small tasks such
   * as validations can be performed here so that the performance penalty of context switching is
   * not incurred for invalid requests.
   */
  void onPreCall();

  /**
   * A callback invoked after the result is successfully computed by {@link #call()}. In our
   * implementation this method is called in the context of a background thread, but in some
   * variants, such as Android where only the UI thread may change the state of UI widgets, it is
   * called in the context of the UI thread.
   */
  void onPostCall(O result);

  /**
   * A callback invoked if computing the task results in an exception. This method is called when
   * either {@link #call()} or {@link #onPreCall()} throws an exception.
   *
   * @param throwable error cause
   */
  void onError(Throwable throwable);

  /**
   * This is where the computation of the task should reside. This method is called in the context
   * of a background thread.
   */
  @Override
  O call() throws Exception;
}
Step 2: This is the asynchronous layer, which does not block when a new request arrives. It simply hands the request to the queuing layer, a BlockingQueue, from which the synchronous layer, a pool of worker threads managed by a ThreadPoolExecutor, picks it up. One of the worker threads executes the task synchronously in the background, and the result is posted back to the caller via a callback.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class AsynchronousService {

  /*
   * This represents the queuing layer as well as the synchronous layer of the pattern. The thread
   * pool contains worker threads that execute the tasks in a blocking/synchronous manner. Long
   * running tasks should be performed in the background so that they do not affect the
   * performance of the main thread.
   */
  private final ExecutorService service;

  /**
   * Creates an asynchronous service using {@code workQueue} as the communication channel between
   * the asynchronous layer and the synchronous layer. Different types of queues, such as a
   * priority queue, can be used to control the pattern of communication between the layers.
   */
  public AsynchronousService(BlockingQueue<Runnable> workQueue) {
    service = new ThreadPoolExecutor(10, 10, 10, TimeUnit.SECONDS, workQueue);
  }

  /**
   * A non-blocking method that performs the given task in the background and returns immediately.
   * <p>
   * On successful completion of the task the result is posted back using the callback method
   * {@link AsyncTask#onPostCall(Object)}; if the task cannot complete normally because of an
   * exception, the cause of the error is posted back using {@link AsyncTask#onError(Throwable)}.
   * <p>
   * NOTE: In this implementation the results are posted back in the context of the background
   * thread.
   */
  public <T> void execute(final AsyncTask<T> task) {
    try {
      // Small tasks such as validation can be performed here in the caller thread.
      task.onPreCall();
    } catch (Exception e) {
      task.onError(e);
      return;
    }

    service.submit(new FutureTask<T>(task) {
      @Override
      protected void done() {
        super.done();
        try {
          /*
           * Called in the context of the background thread. Another variant is possible where the
           * result is posted back into a queue owned by the caller thread, which then picks it up
           * for processing. An example of such a system is Android, where UI elements may only be
           * updated from the UI thread, so results must be posted back on the UI thread.
           */
          task.onPostCall(get());
        } catch (InterruptedException e) {
          // Should not occur: done() is only invoked after the task has completed.
          Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
          task.onError(e.getCause());
        }
      }
    });
  }
}
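The comment inside done() above mentions a variant in which results are not delivered on the background thread but are instead posted back to a queue owned by the caller thread, as on Android, where only the UI thread may update UI widgets. A rough, hypothetical sketch of that variant (the class and method names below are illustrative and not part of the original example):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: results are handed back through a queue that the caller thread drains,
// instead of invoking onPostCall directly on the background thread.
public class CallerThreadDispatcher {

  private final BlockingQueue<Runnable> completions = new LinkedBlockingQueue<>();

  // Called from the background thread (e.g. in done()): enqueue the callback instead of running it.
  public <T> void postBack(AsyncTask<T> task, T result) {
    completions.add(() -> task.onPostCall(result));
  }

  // Called by the caller thread (e.g. a UI or event loop) to run pending callbacks in its own context.
  public void drain() {
    Runnable callback;
    while ((callback = completions.poll()) != null) {
      callback.run();
    }
  }
}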
Step 3: Let's test this design pattern
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Client {

  /**
   * Program entry point.
   *
   * @param args command line args
   */
  public static void main(String[] args) {
    BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
    AsynchronousService service = new AsynchronousService(workQueue);

    /*
     * A new task to calculate a sum is received, but as this is the main thread it should not
     * block. So it passes the task to the asynchronous layer for computation and proceeds with
     * handling other incoming requests. This is particularly useful when the main thread is
     * waiting on a socket for new incoming requests and must not wait for a particular request to
     * complete before accepting new ones.
     */
    service.execute(new ArithmeticSumTask(1000));

    /*
     * New tasks received; pass them to the asynchronous layer as well, so all requests execute in
     * parallel.
     */
    service.execute(new ArithmeticSumTask(500));
    service.execute(new ArithmeticSumTask(2000));
    service.execute(new ArithmeticSumTask(1));
  }

  /**
   * ArithmeticSumTask computes the arithmetic sum 1 + 2 + ... + n in the background.
   */
  static class ArithmeticSumTask implements AsyncTask<Long> {

    private final long n;

    public ArithmeticSumTask(long n) {
      this.n = n;
    }

    /*
     * This is the long running task that is performed in the background. In our example the long
     * running task is calculating the arithmetic sum with an artificial delay.
     */
    @Override
    public Long call() throws Exception {
      return ap(n);
    }

    /*
     * This is called in the context of the main thread, where some validation of the input can be
     * done, such as checking that it is not negative. It is a small computation that can safely
     * run on the main thread. If we validated the input in a background thread instead, we would
     * pay the cost of a context switch, which is much higher than validating it on the main
     * thread.
     */
    @Override
    public void onPreCall() {
      if (n < 0) {
        throw new IllegalArgumentException("n is less than 0");
      }
    }

    @Override
    public void onPostCall(Long result) {
      // Handle the result of the computation.
      System.out.println(result.toString());
    }

    @Override
    public void onError(Throwable throwable) {
      throw new IllegalStateException("Should not occur");
    }
  }

  private static long ap(long i) {
    try {
      // Artificial delay to simulate a long running computation.
      Thread.sleep(i);
    } catch (InterruptedException e) {
      System.out.println("Exception caught." + e);
    }
    return i * (i + 1) / 2;
  }
}
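As noted in Step 2, the queuing layer is the natural point of variation in this pattern. The sketch below is an illustrative assumption, not part of the original example: it swaps the unbounded LinkedBlockingQueue for a bounded ArrayBlockingQueue. A priority queue is another option mentioned earlier, though it would additionally require the queued Runnables to be comparable. With the ThreadPoolExecutor configured in Step 2 (10 worker threads), submissions that arrive while all workers are busy and the bounded queue is full are rejected by the executor's default policy, which gives the asynchronous layer a simple form of back-pressure.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical variant; assumes it sits in the same package as Client and AsynchronousService.
public class BoundedQueueClient {

  public static void main(String[] args) {
    // Bounded queuing layer: at most 100 tasks may wait for a worker thread.
    // Once all 10 workers are busy and the queue is full, further submissions are
    // rejected with a RejectedExecutionException by the executor's default policy.
    BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);
    AsynchronousService service = new AsynchronousService(workQueue);

    service.execute(new Client.ArithmeticSumTask(1000));
  }
}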
Applicability
Use the Half-Sync/Half-Async pattern when a system possesses the following characteristics:
- the system must perform tasks in response to external events that occur asynchronously, such as hardware interrupts in an OS
- it is inefficient to dedicate a separate thread of control to perform synchronous I/O for each external source of events
- the higher-level tasks in the system can be simplified significantly if I/O is performed synchronously.
- one or more tasks in a system must run in a single thread of control, while other tasks may benefit from multi-threading.