001/*
002 * Copyright (c) 2013 Nu Echo Inc. All rights reserved.
003 */
004
005package com.nuecho.rivr.core.channel.synchronous;
006
007import java.util.*;
008import java.util.concurrent.*;
009
010import org.slf4j.*;
011import org.slf4j.helpers.*;
012
013import com.nuecho.rivr.core.channel.*;
014import com.nuecho.rivr.core.channel.synchronous.step.*;
015import com.nuecho.rivr.core.dialogue.*;
016import com.nuecho.rivr.core.servlet.*;
017import com.nuecho.rivr.core.util.*;
018
019/**
020 * Implementation of {@link DialogueChannel} allowing turns to be apparently
021 * exchanged in a regular synchronous fashion while underneath, it is done in
022 * asynchronously.
023 * <p>
024 * This is the most useful implementation of {@link DialogueChannel}. It is used
025 * by the {@link DialogueServlet} to handle different HTTP requests occurring in
026 * separate threads.
027 * <p>
028 * When the controller (such as the {@link DialogueServlet}) uses the
029 * {@link SynchronousDialogueChannel}, it uses the
030 * {@link #start(Dialogue, FirstTurn, Duration, DialogueContext)} method to
031 * start the dialogue and the {@link #doTurn(InputTurn, Duration)} method to
032 * send input turns to the dialogue. In exchange, the
033 * {@link SynchronousDialogueChannel} will return a {@link Step}:
034 * <ul>
035 * <li>{@link OutputTurnStep}: if the dialogue sends an OutputTurn in
036 * response</li>
037 * <li>{@link LastTurnStep}: when the dialogue is done</li>
038 * <li>{@link ErrorStep}: if an error occurred following the delivery of the
039 * {@link InputTurn}</li>
040 * </ul>
041 * <h3>States</h3>
042 * <p>
043 * The {@link SynchronousDialogueChannel} has states related to the dialogue:
044 * <ul>
045 * <li>started</li>
046 * <li>done</li>
047 * </ul>
048 * The {@link #isDialogueStarted()} and {@link #isDialogueDone()} methods
049 * correspond to those states. Additionally, the {@link #isDialogueActive()}
050 * method tells if the dialogue is <i>started</i> but not yet <i>done</i>.
051 * <h3>Time-out values</h3>
052 * <p>
053 * The {@link SynchronousDialogueChannel} internally keeps two
054 * {@link SynchronousQueue SynchronousQueues}:
055 * <ul>
056 * <li>one for communication of output turn from the {@link Dialogue} to the
057 * controller</li>
058 * <li>one for communication of input turn from the controller to the
059 * {@link Dialogue}</li>
060 * </ul>
061 * The default timeout values used while waiting from controller or dialogue can
062 * be set with {@link #setDefaultReceiveFromControllerTimeout(Duration)} and
063 * {@link #setDefaultReceiveFromDialogueTimeout(Duration)}. These properties are
064 * used if the <code>timeout</code> parameter is <code>null</code> for the
065 * {@link #doTurn(OutputTurn, Duration)},
066 * {@link #start(Dialogue, FirstTurn, Duration, DialogueContext)} and
067 * {@link #doTurn(InputTurn, Duration)} methods.
068 * <p>
069 * If not set, <code>defaultReceiveFromControllerTimeout</code> defaults to <b>5
070 * minutes</b> and <code>defaultReceiveFromDialogueTimeout</code> defaults to
071 * <b>1 minute</b>. This gives 5 minutes for the controller (like the
072 * {@link DialogueServlet}) to provide the {@link InputTurn} and 1 minute for
073 * the controller to provide the next turn or error before a {@link Timeout}
074 * exception is raised.
075 * <p>
076 * <b>Note:</b> There also exists a less important <code>sendTimeout</code>
077 * property which indicates the maximum duration for send operations. A send
078 * operation occurs when the controller sends the input turn to the dialogue and
079 * when the dialogue send the output turn, the last turn or an error to the
080 * controller. Since we expect the dialogue and the controller to already be in
081 * a waiting state when the other thread sends the turn, the send operation
082 * should not block. For this reason, the default value for this property is
083 * <b>5 seconds</b>. This property can be set with the
084 * {@link #setSendTimeout(Duration)} property.
085 * <p>
086 *
087 * @param <F> type of {@link FirstTurn}
088 * @param <L> type of {@link LastTurn}
089 * @param <O> type of {@link OutputTurn}
090 * @param <I> type of {@link InputTurn}
091 * @param <C> type of {@link DialogueContext}
092 * @see DialogueServlet
093 * @see Step
094 * @see OutputTurnStep
095 * @see LastTurnStep
096 * @see ErrorStep
097 * @author Nu Echo Inc.
098 */
099public final class SynchronousDialogueChannel<I extends InputTurn, O extends OutputTurn, F extends FirstTurn, L extends LastTurn, C extends DialogueContext<I, O>>
100        implements DialogueChannel<I, O> {
101
102    private NamedSynchronousQueue<Step<O, L>> mFromDialogueToController = new NamedSynchronousQueue<Step<O, L>>("dialogue to controller",
103                                                                                                                true);
104    private NamedSynchronousQueue<I> mFromControllerToDialogue = new NamedSynchronousQueue<I>("controller to dialogue",
105                                                                                              true);
106    private Thread mDialogueThread;
107
108    private Duration mSendTimeout = Duration.seconds(5);
109
110    private Duration mDefaultReceiveFromDialogueTimeout = Duration.minutes(1);
111    private Duration mDefaultReceiveFromControllerTimeout = Duration.minutes(5);
112
113    private final List<DialogueChannelListener<I, O>> mListener = new ArrayList<DialogueChannelListener<I, O>>();
114    private Logger mLogger = NOPLogger.NOP_LOGGER;
115
116    private volatile boolean mStopped;
117    private volatile boolean mDialogueStarted;
118    private volatile boolean mDialogueDone;
119
120    /**
121     * Gets the maximum duration for send operations. A send operation occurs
122     * when the controller sends the input turn to the dialogue and when the
123     * dialogue send the output turn, the last turn or an error to the
124     * controller.
125     *
126     * @return send duration
127     */
128    public Duration getSendTimeout() {
129        return mSendTimeout;
130    }
131
132    /**
133     * Sets the maximum duration for send operations. A send operation occurs
134     * when the controller sends the input turn to the dialogue and when the
135     * dialogue send the output turn, the last turn or an error to the
136     * controller. Since we expect the dialogue and the controller to already be
137     * in a waiting state when the other thread sends the turn, the send
138     * operation should not block. For this reason, the default value for this
139     * property is <b>5 seconds</b>.
140     *
141     * @param sendTimeout The timeout value. Cannot be <code>null</code>. A
142     *            value of Duration.ZERO (or equivalent) means to wait forever.
143     */
144    public void setSendTimeout(Duration sendTimeout) {
145        Assert.notNull(sendTimeout, "sendTimeout");
146        mSendTimeout = sendTimeout;
147    }
148
149    /**
150     * Retrieves the maximum duration for send operations. A send operation
151     * occurs when the controller sends the input turn to the dialogue and when
152     * the dialogue send the output turn, the last turn or an error to the
153     * controller.
154     *
155     * @return default time allowed for the dialogue to generate the next output
156     *         turn, the last turn or an error when not specified by the
157     *         controller.
158     */
159    public Duration getDefaultReceiveFromDialogueTimeout() {
160        return mDefaultReceiveFromDialogueTimeout;
161    }
162
163    /**
164     * Sets the maximum duration the controller thread can wait for a turn or an
165     * error from the dialogue thread when not specified by the controller. If
166     * this method is not called, it defaults to 1 minute.
167     *
168     * @param defaultReceiveFromDialogueTimeout The default timeout to use when
169     *            not specified by the controller. Cannot be <code>null</code>.
170     */
171    public void setDefaultReceiveFromDialogueTimeout(Duration defaultReceiveFromDialogueTimeout) {
172        Assert.notNull(defaultReceiveFromDialogueTimeout, "defaultReceiveFromDialogueTimeout");
173        mDefaultReceiveFromDialogueTimeout = defaultReceiveFromDialogueTimeout;
174    }
175
176    /**
177     * Gets the maximum duration the dialogue thread can wait for a turn from
178     * the controller thread when not specified by the dialogue.
179     *
180     * @return the maximum time allowed for the dialogue to wait for the
181     *         controller.
182     */
183    public Duration getDefaultReceiveFromControllerTimeout() {
184        return mDefaultReceiveFromControllerTimeout;
185    }
186
187    /**
188     * Sets the maximum duration the dialogue thread can wait for a turn from
189     * the controller thread when not specified by the dialogue.
190     *
191     * @param defaultReceiveFromControllerTimeout The default timeout to use
192     *            when not specified by the dialogue. Cannot be
193     *            <code>null</code>.
194     */
195    public void setDefaultReceiveFromControllerTimeout(Duration defaultReceiveFromControllerTimeout) {
196        Assert.notNull(defaultReceiveFromControllerTimeout, "defaultReceiveFromControllerTimeout");
197        mDefaultReceiveFromControllerTimeout = defaultReceiveFromControllerTimeout;
198    }
199
200    /**
201     * Sets the logger for this dialogue channel.
202     *
203     * @param logger The logger. Cannot be <code>null</code>.
204     */
205    public void setLogger(Logger logger) {
206        Assert.notNull(logger, "logger");
207        mLogger = logger;
208    }
209
210    /**
211     * Starts a {@link Dialogue} in a new thread.
212     *
213     * @param dialogue Dialogue to start. Cannot be <code>null</code>.
214     * @param firstTurn First turn used passed to
215     *            {@link Dialogue#run(FirstTurn, DialogueContext)} method of the
216     *            dialogue. Cannot be <code>null</code>.
217     * @param timeout maximum time allowed to receive the turn from the
218     *            dialogue. If <code>null</code>, uses the
219     *            <code>defaultReceiveFromDialogueTimeout</code> property. A
220     *            value of Duration.ZERO (or equivalent) means to wait forever.
221     * @param context Dialogue context to pass to
222     *            {@link Dialogue#run(FirstTurn, DialogueContext)} method of the
223     *            dialogue. Cannot be <code>null</code>.
224     * @return the first Step of the dialogue.
225     * @throws Timeout If no result can be obtain from dialogue after delay
226     *             specified by <code>timeout</code> parameter.
227     * @throws InterruptedException if the dialogue has been interrupted.
228     */
229    public Step<O, L> start(final Dialogue<I, O, F, L, C> dialogue, final F firstTurn, Duration timeout, final C context)
230            throws Timeout, InterruptedException {
231
232        Runnable runnable = new Runnable() {
233            @Override
234            public void run() {
235                mDialogueStarted = true;
236                for (DialogueChannelListener<I, O> listener : mListener) {
237                    listener.onStart(SynchronousDialogueChannel.this);
238                }
239
240                Step<O, L> lastStep;
241                try {
242                    L lastTurn = dialogue.run(firstTurn, context);
243                    lastStep = new LastTurnStep<O, L>(lastTurn);
244                } catch (Throwable throwable) {
245                    mLogger.error("Error in dialogue.", throwable);
246                    lastStep = new ErrorStep<O, L>(throwable);
247                }
248
249                try {
250                    if (!mStopped) {
251                        mLogger.trace("Last step: {}", lastStep);
252                        send(mFromDialogueToController, lastStep, mSendTimeout);
253                    }
254                } catch (Timeout exception) {
255                    mLogger.warn("Timeout while sending last step.", exception);
256                } catch (InterruptedException exception) {
257                    mLogger.info("Dialogue interrupted while sending last step.", exception);
258                    Thread.currentThread().interrupt();
259                } catch (Throwable throwable) {
260                    mLogger.info("Unexpected error while sending last step.", throwable);
261                } finally {
262                    mDialogueDone = true;
263                    mFromDialogueToController = null; // ensure we can't receive further output turns from dialogue
264                    mFromControllerToDialogue = null; // ensure we can't send further input turns to dialogue
265                    for (DialogueChannelListener<I, O> listener : mListener) {
266                        listener.onStop(SynchronousDialogueChannel.this);
267                    }
268                    mLogger.info("Dialogue ended.");
269                }
270            }
271        };
272
273        mDialogueThread = new Thread(runnable, "Dialogue " + context.getDialogueId());
274        mDialogueThread.start();
275        mLogger.info("Dialogue started.");
276
277        return receive(mFromDialogueToController, timeout);
278    }
279
280    /**
281     * Tells if the dialogue has started.
282     *
283     * @return <code>true</code> if the dialogue has started, <code>false</code>
284     *         otherwise.
285     */
286    public boolean isDialogueStarted() {
287        return mDialogueStarted;
288    }
289
290    /**
291     * Tells if the dialogue has ended.
292     *
293     * @return <code>true</code> if the dialogue has ended, <code>false</code>
294     *         otherwise.
295     */
296    public boolean isDialogueDone() {
297        return mDialogueDone;
298    }
299
300    /**
301     * Tells if the dialogue has started but not yet ended.
302     *
303     * @return <code>true</code> if the dialogue has started but not ended yet,
304     *         <code>false</code> otherwise.
305     */
306    public boolean isDialogueActive() {
307        return mDialogueStarted && !mDialogueDone;
308    }
309
310    /**
311     * Stops the dialogue and wait for the dialogue thread to end.
312     *
313     * @param timeout Time to wait for dialogue thread to terminate. A value of
314     *            Duration.ZERO (or equivalent) means to wait forever.
315     * @throws InterruptedException if the current thread was interrupted while
316     *             waiting for the dialogue thread to terminate.
317     */
318    public void stop(Duration timeout) throws InterruptedException {
319        stop();
320        join(timeout);
321    }
322
323    /**
324     * Stops the dialogue.
325     */
326    public void stop() {
327        mStopped = true;
328        mDialogueThread.interrupt();
329    }
330
331    /**
332     * Waits for the dialogue thread to end.
333     *
334     * @param timeout maximum time to wait for the thread to end. A value of
335     *            Duration.ZERO (or equivalent) means to wait forever.
336     * @throws InterruptedException if the current thread was interrupted while
337     *             waiting for the dialogue thread to terminate.
338     */
339    public void join(Duration timeout) throws InterruptedException {
340        mDialogueThread.join(timeout.getMilliseconds());
341    }
342
343    /**
344     * Performs a turn exchange: the dialogue channel will return the
345     * {@link InputTurn}
346     *
347     * @param turn The output turn to send. Cannot be <code>null</code>.
348     * @param timeout maximum time allowed to receive the turn from the
349     *            dialogue. If <code>null</code>, uses the
350     *            <code>defaultReceiveFromControllerTimeout</code>. A value of
351     *            Duration.ZERO (or equivalent) means to wait forever.
352     * @return the received {@link InputTurn}. Cannot be <code>null</code>.
353     * @throws Timeout if the dialogue channel has not been able to give the
354     *             InputTurn before <code>timeout</code> parameter.
355     * @throws InterruptedException if the thread has been interrupted while
356     *             waiting for the result.
357     */
358
359    @Override
360    public I doTurn(O turn, Duration timeout) throws Timeout, InterruptedException {
361        verifyState();
362        mLogger.trace("OutputTurn: {}", turn);
363        OutputTurnStep<O, L> turnStep = new OutputTurnStep<O, L>(turn);
364        if (timeout == null) {
365            timeout = mDefaultReceiveFromControllerTimeout;
366        }
367        return exchange(mFromDialogueToController, mFromControllerToDialogue, turnStep, mSendTimeout, timeout);
368    }
369
370    /**
371     * Performs a turn exchange: the dialogue channel will return the Step
372     *
373     * @param turn the input turn to send to the dialogue
374     * @param timeout maximum time allowed to receive the turn from the
375     *            dialogue. If <code>null</code>, uses the
376     *            <code>defaultReceiveFromDialogueTimeout</code> property. A
377     *            value of Duration.ZERO (or equivalent) means to wait forever.
378     * @throws Timeout If no result can be obtain from dialogue after delay
379     *             specified by <code>timeout</code> parameter.
380     * @throws InterruptedException if the thread was interrupted wile waiting
381     *             for the dialogue step.
382     * @return the {@link Step} wrapping the dialogue next step
383     */
384    public Step<O, L> doTurn(I turn, Duration timeout) throws Timeout, InterruptedException {
385        verifyState();
386        mLogger.trace("InputTurn: {}", turn);
387        if (timeout == null) {
388            timeout = mDefaultReceiveFromDialogueTimeout;
389        }
390        return exchange(mFromControllerToDialogue, mFromDialogueToController, turn, mSendTimeout, timeout);
391    }
392
393    private void verifyState() {
394        if (mDialogueThread == null) throw new IllegalStateException("Dialogue is not set");
395        if (!mDialogueThread.isAlive()) throw new IllegalStateException("Dialogue is not started");
396        if (mStopped) throw new IllegalStateException("Dialogue is stopped");
397    }
398
399    private <S, R> R exchange(NamedSynchronousQueue<S> sendQueue,
400                              NamedSynchronousQueue<R> receiveQueue,
401                              S itemToSend,
402                              Duration sendTimeout,
403                              Duration receiveTimeout) throws Timeout, InterruptedException {
404        try {
405            send(sendQueue, itemToSend, sendTimeout);
406            return receive(receiveQueue, receiveTimeout);
407        } catch (InterruptedException interruptedException) {
408            if (mStopped) throw new DialogueChannelStopped();
409            else throw interruptedException;
410        }
411    }
412
413    private static <R> R receive(NamedSynchronousQueue<R> receiveQueue, Duration timeout) throws Timeout,
414            InterruptedException {
415        if (receiveQueue == null) throw new IllegalStateException("Receive queue is closed.");
416        R result = receiveQueue.poll(timeout.getMilliseconds(), TimeUnit.MILLISECONDS);
417        if (result == null)
418            throw new Timeout("Timed-out in receive() after " + timeout + " in [" + receiveQueue.getName() + "]");
419        return result;
420    }
421
422    private static <S> void send(NamedSynchronousQueue<S> sendQueue, S itemToSend, Duration timeout) throws Timeout,
423            InterruptedException {
424        if (sendQueue == null) throw new IllegalStateException("Send queue is closed.");
425        boolean success = sendQueue.offer(itemToSend, timeout.getMilliseconds(), TimeUnit.MILLISECONDS);
426        if (!success) throw new Timeout("Timed-out in send() after " + timeout + " in [" + sendQueue.getName() + "]");
427    }
428
429    @Override
430    public void addListener(DialogueChannelListener<I, O> listener) {
431        mListener.add(listener);
432    }
433
434    @Override
435    public void removeListener(DialogueChannelListener<I, O> listener) {
436        mListener.remove(listener);
437    }
438}