001/* 002 * Copyright (c) 2013 Nu Echo Inc. All rights reserved. 003 */ 004 005package com.nuecho.rivr.core.channel.synchronous; 006 007import java.util.*; 008import java.util.concurrent.*; 009 010import org.slf4j.*; 011import org.slf4j.helpers.*; 012 013import com.nuecho.rivr.core.channel.*; 014import com.nuecho.rivr.core.channel.synchronous.step.*; 015import com.nuecho.rivr.core.dialogue.*; 016import com.nuecho.rivr.core.servlet.*; 017import com.nuecho.rivr.core.util.*; 018 019/** 020 * Implementation of {@link DialogueChannel} allowing turns to be apparently 021 * exchanged in a regular synchronous fashion while underneath, it is done in 022 * asynchronously. 023 * <p> 024 * This is the most useful implementation of {@link DialogueChannel}. It is used 025 * by the {@link DialogueServlet} to handle different HTTP requests occurring in 026 * separate threads. 027 * <p> 028 * When the controller (such as the {@link DialogueServlet}) uses the 029 * {@link SynchronousDialogueChannel}, it uses the 030 * {@link #start(Dialogue, FirstTurn, Duration, DialogueContext)} method to 031 * start the dialogue and the {@link #doTurn(InputTurn, Duration)} method to 032 * send input turns to the dialogue. In exchange, the 033 * {@link SynchronousDialogueChannel} will return a {@link Step}: 034 * <ul> 035 * <li>{@link OutputTurnStep}: if the dialogue sends an OutputTurn in 036 * response</li> 037 * <li>{@link LastTurnStep}: when the dialogue is done</li> 038 * <li>{@link ErrorStep}: if an error occurred following the delivery of the 039 * {@link InputTurn}</li> 040 * </ul> 041 * <h3>States</h3> 042 * <p> 043 * The {@link SynchronousDialogueChannel} has states related to the dialogue: 044 * <ul> 045 * <li>started</li> 046 * <li>done</li> 047 * </ul> 048 * The {@link #isDialogueStarted()} and {@link #isDialogueDone()} methods 049 * correspond to those states. Additionally, the {@link #isDialogueActive()} 050 * method tells if the dialogue is <i>started</i> but not yet <i>done</i>. 051 * <h3>Time-out values</h3> 052 * <p> 053 * The {@link SynchronousDialogueChannel} internally keeps two 054 * {@link SynchronousQueue SynchronousQueues}: 055 * <ul> 056 * <li>one for communication of output turn from the {@link Dialogue} to the 057 * controller</li> 058 * <li>one for communication of input turn from the controller to the 059 * {@link Dialogue}</li> 060 * </ul> 061 * The default timeout values used while waiting from controller or dialogue can 062 * be set with {@link #setDefaultReceiveFromControllerTimeout(Duration)} and 063 * {@link #setDefaultReceiveFromDialogueTimeout(Duration)}. These properties are 064 * used if the <code>timeout</code> parameter is <code>null</code> for the 065 * {@link #doTurn(OutputTurn, Duration)}, 066 * {@link #start(Dialogue, FirstTurn, Duration, DialogueContext)} and 067 * {@link #doTurn(InputTurn, Duration)} methods. 068 * <p> 069 * If not set, <code>defaultReceiveFromControllerTimeout</code> defaults to <b>5 070 * minutes</b> and <code>defaultReceiveFromDialogueTimeout</code> defaults to 071 * <b>1 minute</b>. This gives 5 minutes for the controller (like the 072 * {@link DialogueServlet}) to provide the {@link InputTurn} and 1 minute for 073 * the controller to provide the next turn or error before a {@link Timeout} 074 * exception is raised. 075 * <p> 076 * <b>Note:</b> There also exists a less important <code>sendTimeout</code> 077 * property which indicates the maximum duration for send operations. A send 078 * operation occurs when the controller sends the input turn to the dialogue and 079 * when the dialogue send the output turn, the last turn or an error to the 080 * controller. Since we expect the dialogue and the controller to already be in 081 * a waiting state when the other thread sends the turn, the send operation 082 * should not block. For this reason, the default value for this property is 083 * <b>5 seconds</b>. This property can be set with the 084 * {@link #setSendTimeout(Duration)} property. 085 * <p> 086 * 087 * @param <F> type of {@link FirstTurn} 088 * @param <L> type of {@link LastTurn} 089 * @param <O> type of {@link OutputTurn} 090 * @param <I> type of {@link InputTurn} 091 * @param <C> type of {@link DialogueContext} 092 * @see DialogueServlet 093 * @see Step 094 * @see OutputTurnStep 095 * @see LastTurnStep 096 * @see ErrorStep 097 * @author Nu Echo Inc. 098 */ 099public final class SynchronousDialogueChannel<I extends InputTurn, O extends OutputTurn, F extends FirstTurn, L extends LastTurn, C extends DialogueContext<I, O>> 100 implements DialogueChannel<I, O> { 101 102 private NamedSynchronousQueue<Step<O, L>> mFromDialogueToController = new NamedSynchronousQueue<Step<O, L>>("dialogue to controller", 103 true); 104 private NamedSynchronousQueue<I> mFromControllerToDialogue = new NamedSynchronousQueue<I>("controller to dialogue", 105 true); 106 private Thread mDialogueThread; 107 108 private Duration mSendTimeout = Duration.seconds(5); 109 110 private Duration mDefaultReceiveFromDialogueTimeout = Duration.minutes(1); 111 private Duration mDefaultReceiveFromControllerTimeout = Duration.minutes(5); 112 113 private final List<DialogueChannelListener<I, O>> mListener = new ArrayList<DialogueChannelListener<I, O>>(); 114 private Logger mLogger = NOPLogger.NOP_LOGGER; 115 116 private volatile boolean mStopped; 117 private volatile boolean mDialogueStarted; 118 private volatile boolean mDialogueDone; 119 120 /** 121 * Gets the maximum duration for send operations. A send operation occurs 122 * when the controller sends the input turn to the dialogue and when the 123 * dialogue send the output turn, the last turn or an error to the 124 * controller. 125 * 126 * @return send duration 127 */ 128 public Duration getSendTimeout() { 129 return mSendTimeout; 130 } 131 132 /** 133 * Sets the maximum duration for send operations. A send operation occurs 134 * when the controller sends the input turn to the dialogue and when the 135 * dialogue send the output turn, the last turn or an error to the 136 * controller. Since we expect the dialogue and the controller to already be 137 * in a waiting state when the other thread sends the turn, the send 138 * operation should not block. For this reason, the default value for this 139 * property is <b>5 seconds</b>. 140 * 141 * @param sendTimeout The timeout value. Cannot be <code>null</code>. A 142 * value of Duration.ZERO (or equivalent) means to wait forever. 143 */ 144 public void setSendTimeout(Duration sendTimeout) { 145 Assert.notNull(sendTimeout, "sendTimeout"); 146 mSendTimeout = sendTimeout; 147 } 148 149 /** 150 * Retrieves the maximum duration for send operations. A send operation 151 * occurs when the controller sends the input turn to the dialogue and when 152 * the dialogue send the output turn, the last turn or an error to the 153 * controller. 154 * 155 * @return default time allowed for the dialogue to generate the next output 156 * turn, the last turn or an error when not specified by the 157 * controller. 158 */ 159 public Duration getDefaultReceiveFromDialogueTimeout() { 160 return mDefaultReceiveFromDialogueTimeout; 161 } 162 163 /** 164 * Sets the maximum duration the controller thread can wait for a turn or an 165 * error from the dialogue thread when not specified by the controller. If 166 * this method is not called, it defaults to 1 minute. 167 * 168 * @param defaultReceiveFromDialogueTimeout The default timeout to use when 169 * not specified by the controller. Cannot be <code>null</code>. 170 */ 171 public void setDefaultReceiveFromDialogueTimeout(Duration defaultReceiveFromDialogueTimeout) { 172 Assert.notNull(defaultReceiveFromDialogueTimeout, "defaultReceiveFromDialogueTimeout"); 173 mDefaultReceiveFromDialogueTimeout = defaultReceiveFromDialogueTimeout; 174 } 175 176 /** 177 * Gets the maximum duration the dialogue thread can wait for a turn from 178 * the controller thread when not specified by the dialogue. 179 * 180 * @return the maximum time allowed for the dialogue to wait for the 181 * controller. 182 */ 183 public Duration getDefaultReceiveFromControllerTimeout() { 184 return mDefaultReceiveFromControllerTimeout; 185 } 186 187 /** 188 * Sets the maximum duration the dialogue thread can wait for a turn from 189 * the controller thread when not specified by the dialogue. 190 * 191 * @param defaultReceiveFromControllerTimeout The default timeout to use 192 * when not specified by the dialogue. Cannot be 193 * <code>null</code>. 194 */ 195 public void setDefaultReceiveFromControllerTimeout(Duration defaultReceiveFromControllerTimeout) { 196 Assert.notNull(defaultReceiveFromControllerTimeout, "defaultReceiveFromControllerTimeout"); 197 mDefaultReceiveFromControllerTimeout = defaultReceiveFromControllerTimeout; 198 } 199 200 /** 201 * Sets the logger for this dialogue channel. 202 * 203 * @param logger The logger. Cannot be <code>null</code>. 204 */ 205 public void setLogger(Logger logger) { 206 Assert.notNull(logger, "logger"); 207 mLogger = logger; 208 } 209 210 /** 211 * Starts a {@link Dialogue} in a new thread. 212 * 213 * @param dialogue Dialogue to start. Cannot be <code>null</code>. 214 * @param firstTurn First turn used passed to 215 * {@link Dialogue#run(FirstTurn, DialogueContext)} method of the 216 * dialogue. Cannot be <code>null</code>. 217 * @param timeout maximum time allowed to receive the turn from the 218 * dialogue. If <code>null</code>, uses the 219 * <code>defaultReceiveFromDialogueTimeout</code> property. A 220 * value of Duration.ZERO (or equivalent) means to wait forever. 221 * @param context Dialogue context to pass to 222 * {@link Dialogue#run(FirstTurn, DialogueContext)} method of the 223 * dialogue. Cannot be <code>null</code>. 224 * @return the first Step of the dialogue. 225 * @throws Timeout If no result can be obtain from dialogue after delay 226 * specified by <code>timeout</code> parameter. 227 * @throws InterruptedException if the dialogue has been interrupted. 228 */ 229 public Step<O, L> start(final Dialogue<I, O, F, L, C> dialogue, final F firstTurn, Duration timeout, final C context) 230 throws Timeout, InterruptedException { 231 232 Runnable runnable = new Runnable() { 233 @Override 234 public void run() { 235 mDialogueStarted = true; 236 for (DialogueChannelListener<I, O> listener : mListener) { 237 listener.onStart(SynchronousDialogueChannel.this); 238 } 239 240 Step<O, L> lastStep; 241 try { 242 L lastTurn = dialogue.run(firstTurn, context); 243 lastStep = new LastTurnStep<O, L>(lastTurn); 244 } catch (Throwable throwable) { 245 mLogger.error("Error in dialogue.", throwable); 246 lastStep = new ErrorStep<O, L>(throwable); 247 } 248 249 try { 250 if (!mStopped) { 251 mLogger.trace("Last step: {}", lastStep); 252 send(mFromDialogueToController, lastStep, mSendTimeout); 253 } 254 } catch (Timeout exception) { 255 mLogger.warn("Timeout while sending last step.", exception); 256 } catch (InterruptedException exception) { 257 mLogger.info("Dialogue interrupted while sending last step.", exception); 258 Thread.currentThread().interrupt(); 259 } catch (Throwable throwable) { 260 mLogger.info("Unexpected error while sending last step.", throwable); 261 } finally { 262 mDialogueDone = true; 263 mFromDialogueToController = null; // ensure we can't receive further output turns from dialogue 264 mFromControllerToDialogue = null; // ensure we can't send further input turns to dialogue 265 for (DialogueChannelListener<I, O> listener : mListener) { 266 listener.onStop(SynchronousDialogueChannel.this); 267 } 268 mLogger.info("Dialogue ended."); 269 } 270 } 271 }; 272 273 mDialogueThread = new Thread(runnable, "Dialogue " + context.getDialogueId()); 274 mDialogueThread.start(); 275 mLogger.info("Dialogue started."); 276 277 return receive(mFromDialogueToController, timeout); 278 } 279 280 /** 281 * Tells if the dialogue has started. 282 * 283 * @return <code>true</code> if the dialogue has started, <code>false</code> 284 * otherwise. 285 */ 286 public boolean isDialogueStarted() { 287 return mDialogueStarted; 288 } 289 290 /** 291 * Tells if the dialogue has ended. 292 * 293 * @return <code>true</code> if the dialogue has ended, <code>false</code> 294 * otherwise. 295 */ 296 public boolean isDialogueDone() { 297 return mDialogueDone; 298 } 299 300 /** 301 * Tells if the dialogue has started but not yet ended. 302 * 303 * @return <code>true</code> if the dialogue has started but not ended yet, 304 * <code>false</code> otherwise. 305 */ 306 public boolean isDialogueActive() { 307 return mDialogueStarted && !mDialogueDone; 308 } 309 310 /** 311 * Stops the dialogue and wait for the dialogue thread to end. 312 * 313 * @param timeout Time to wait for dialogue thread to terminate. A value of 314 * Duration.ZERO (or equivalent) means to wait forever. 315 * @throws InterruptedException if the current thread was interrupted while 316 * waiting for the dialogue thread to terminate. 317 */ 318 public void stop(Duration timeout) throws InterruptedException { 319 stop(); 320 join(timeout); 321 } 322 323 /** 324 * Stops the dialogue. 325 */ 326 public void stop() { 327 mStopped = true; 328 mDialogueThread.interrupt(); 329 } 330 331 /** 332 * Waits for the dialogue thread to end. 333 * 334 * @param timeout maximum time to wait for the thread to end. A value of 335 * Duration.ZERO (or equivalent) means to wait forever. 336 * @throws InterruptedException if the current thread was interrupted while 337 * waiting for the dialogue thread to terminate. 338 */ 339 public void join(Duration timeout) throws InterruptedException { 340 mDialogueThread.join(timeout.getMilliseconds()); 341 } 342 343 /** 344 * Performs a turn exchange: the dialogue channel will return the 345 * {@link InputTurn} 346 * 347 * @param turn The output turn to send. Cannot be <code>null</code>. 348 * @param timeout maximum time allowed to receive the turn from the 349 * dialogue. If <code>null</code>, uses the 350 * <code>defaultReceiveFromControllerTimeout</code>. A value of 351 * Duration.ZERO (or equivalent) means to wait forever. 352 * @return the received {@link InputTurn}. Cannot be <code>null</code>. 353 * @throws Timeout if the dialogue channel has not been able to give the 354 * InputTurn before <code>timeout</code> parameter. 355 * @throws InterruptedException if the thread has been interrupted while 356 * waiting for the result. 357 */ 358 359 @Override 360 public I doTurn(O turn, Duration timeout) throws Timeout, InterruptedException { 361 verifyState(); 362 mLogger.trace("OutputTurn: {}", turn); 363 OutputTurnStep<O, L> turnStep = new OutputTurnStep<O, L>(turn); 364 if (timeout == null) { 365 timeout = mDefaultReceiveFromControllerTimeout; 366 } 367 return exchange(mFromDialogueToController, mFromControllerToDialogue, turnStep, mSendTimeout, timeout); 368 } 369 370 /** 371 * Performs a turn exchange: the dialogue channel will return the Step 372 * 373 * @param turn the input turn to send to the dialogue 374 * @param timeout maximum time allowed to receive the turn from the 375 * dialogue. If <code>null</code>, uses the 376 * <code>defaultReceiveFromDialogueTimeout</code> property. A 377 * value of Duration.ZERO (or equivalent) means to wait forever. 378 * @throws Timeout If no result can be obtain from dialogue after delay 379 * specified by <code>timeout</code> parameter. 380 * @throws InterruptedException if the thread was interrupted wile waiting 381 * for the dialogue step. 382 * @return the {@link Step} wrapping the dialogue next step 383 */ 384 public Step<O, L> doTurn(I turn, Duration timeout) throws Timeout, InterruptedException { 385 verifyState(); 386 mLogger.trace("InputTurn: {}", turn); 387 if (timeout == null) { 388 timeout = mDefaultReceiveFromDialogueTimeout; 389 } 390 return exchange(mFromControllerToDialogue, mFromDialogueToController, turn, mSendTimeout, timeout); 391 } 392 393 private void verifyState() { 394 if (mDialogueThread == null) throw new IllegalStateException("Dialogue is not set"); 395 if (!mDialogueThread.isAlive()) throw new IllegalStateException("Dialogue is not started"); 396 if (mStopped) throw new IllegalStateException("Dialogue is stopped"); 397 } 398 399 private <S, R> R exchange(NamedSynchronousQueue<S> sendQueue, 400 NamedSynchronousQueue<R> receiveQueue, 401 S itemToSend, 402 Duration sendTimeout, 403 Duration receiveTimeout) throws Timeout, InterruptedException { 404 try { 405 send(sendQueue, itemToSend, sendTimeout); 406 return receive(receiveQueue, receiveTimeout); 407 } catch (InterruptedException interruptedException) { 408 if (mStopped) throw new DialogueChannelStopped(); 409 else throw interruptedException; 410 } 411 } 412 413 private static <R> R receive(NamedSynchronousQueue<R> receiveQueue, Duration timeout) throws Timeout, 414 InterruptedException { 415 if (receiveQueue == null) throw new IllegalStateException("Receive queue is closed."); 416 R result = receiveQueue.poll(timeout.getMilliseconds(), TimeUnit.MILLISECONDS); 417 if (result == null) 418 throw new Timeout("Timed-out in receive() after " + timeout + " in [" + receiveQueue.getName() + "]"); 419 return result; 420 } 421 422 private static <S> void send(NamedSynchronousQueue<S> sendQueue, S itemToSend, Duration timeout) throws Timeout, 423 InterruptedException { 424 if (sendQueue == null) throw new IllegalStateException("Send queue is closed."); 425 boolean success = sendQueue.offer(itemToSend, timeout.getMilliseconds(), TimeUnit.MILLISECONDS); 426 if (!success) throw new Timeout("Timed-out in send() after " + timeout + " in [" + sendQueue.getName() + "]"); 427 } 428 429 @Override 430 public void addListener(DialogueChannelListener<I, O> listener) { 431 mListener.add(listener); 432 } 433 434 @Override 435 public void removeListener(DialogueChannelListener<I, O> listener) { 436 mListener.remove(listener); 437 } 438}