001/* 002 * Copyright (c) 2013 Nu Echo Inc. All rights reserved. 003 */ 004 005package com.nuecho.rivr.core.servlet.session; 006 007import java.util.*; 008 009import org.slf4j.*; 010 011import com.nuecho.rivr.core.channel.*; 012import com.nuecho.rivr.core.dialogue.*; 013import com.nuecho.rivr.core.servlet.*; 014import com.nuecho.rivr.core.util.*; 015 016/** 017 * Stores {@link Session} and check for expirations. 018 * <p> 019 * Rivr has its own SessionContainer independent from the Web server. This 020 * solves many issues related to cookies and encoding of session ID in the URI. 021 * <p> 022 * A clean-up thread checks periodically that every session has not timed-out. 023 * The scan period and the session time-out value can be specified in the 024 * {@link #SessionContainer(Logger, Duration, Duration, String) constructor}, 025 * although they are normally specified via the {@link DialogueServlet}. 026 * 027 * @see DialogueServlet#setSessionScanPeriod(Duration) 028 * @see DialogueServlet#setSessionTimeout(Duration) 029 * @author Nu Echo Inc. 030 * @param <F> type of {@link FirstTurn} 031 * @param <L> type of {@link LastTurn} 032 * @param <O> type of {@link OutputTurn} 033 * @param <I> type of {@link InputTurn} 034 * @param <C> type of {@link DialogueContext} 035 */ 036public final class SessionContainer<I extends InputTurn, O extends OutputTurn, F extends FirstTurn, L extends LastTurn, C extends DialogueContext<I, O>> { 037 038 private final Logger mLogger; 039 private final Duration mSessionTimeout; 040 private final Duration mTimeoutCheckScanPeriod; 041 private final String mName; 042 043 private final Map<String, Session<I, O, F, L, C>> mSessions = new HashMap<String, Session<I, O, F, L, C>>(); 044 private final Map<String, Long> mLastAccess = new HashMap<String, Long>(); 045 private boolean mStopped; 046 private Thread mTimeoutCheckScanThread; 047 048 public SessionContainer(Logger logger, Duration sessionTimeout, Duration timeoutCheckScanPeriod, String name) { 049 mLogger = logger; 050 mSessionTimeout = sessionTimeout; 051 mTimeoutCheckScanPeriod = timeoutCheckScanPeriod; 052 mName = name; 053 launchThread(); 054 } 055 056 private void launchThread() { 057 058 Runnable cleanUpRunnable = new Runnable() { 059 060 @Override 061 public void run() { 062 063 while (!mStopped) { 064 try { 065 Iterator<String> sessionIdsIterator = mSessions.keySet().iterator(); 066 067 long currentTime = System.currentTimeMillis(); 068 while (sessionIdsIterator.hasNext()) { 069 String sessionId = sessionIdsIterator.next(); 070 Long lastAccess = mLastAccess.get(sessionId); 071 if (lastAccess != null) { 072 if (currentTime - lastAccess > mSessionTimeout.getMilliseconds()) { 073 Session<I, O, F, L, C> sessionContext = mSessions.get(sessionId); 074 sessionContext.stop(); 075 } 076 } 077 } 078 079 Thread.sleep(mTimeoutCheckScanPeriod.getMilliseconds()); 080 } catch (InterruptedException interruptedException) { 081 if (mStopped) { 082 //Interrupts can be swallowed if you know the thread is about to exit 083 } else { 084 Thread.currentThread().interrupt(); 085 } 086 } catch (Throwable throwable) { 087 mLogger.error("Error during session time-out check.", throwable); 088 } 089 } 090 091 Iterator<String> sessionIdsIterator = mSessions.keySet().iterator(); 092 093 //stopping all sessions 094 while (sessionIdsIterator.hasNext()) { 095 mSessions.get(sessionIdsIterator.next()).stop(); 096 } 097 } 098 }; 099 100 mTimeoutCheckScanThread = new Thread(cleanUpRunnable, "Session cleanup thread for " + mName); 101 mTimeoutCheckScanThread.setDaemon(true); 102 mTimeoutCheckScanThread.start(); 103 } 104 105 public void addSession(Session<I, O, F, L, C> session) { 106 String sessionId = session.getId(); 107 mSessions.put(sessionId, session); 108 updateLastAccessTime(sessionId); 109 } 110 111 public void removeSession(String sessionId) { 112 mSessions.remove(sessionId); 113 mLastAccess.remove(sessionId); 114 } 115 116 public Session<I, O, F, L, C> getSession(String sessionId) { 117 updateLastAccessTime(sessionId); 118 Session<I, O, F, L, C> session = mSessions.get(sessionId); 119 if (session != null) { 120 session.keepAlive(); 121 } 122 return session; 123 } 124 125 private void updateLastAccessTime(String sessionId) { 126 mLastAccess.put(sessionId, System.currentTimeMillis()); 127 } 128 129 public synchronized void stop() { 130 if (!mStopped) { 131 132 final Collection<Session<I, O, F, L, C>> sessions = new HashSet<Session<I, O, F, L, C>>(mSessions.values()); 133 134 for (Session<I, O, F, L, C> session : sessions) { 135 try { 136 mLogger.info("Stopping session {}.", session.getId()); 137 session.stop(); 138 } catch (Throwable throwable) { 139 mLogger.error("Unable to stop session {}.", session.getId(), throwable); 140 } 141 } 142 143 Runnable waitForDialoguesToTerminate = new Runnable() { 144 @Override 145 public void run() { 146 for (Session<I, O, F, L, C> session : sessions) { 147 String sessionId = session.getId(); 148 try { 149 mLogger.info("Waiting for dialogue thread {} to terminate.", sessionId); 150 session.getDialogueChannel().join(Duration.seconds(10)); 151 mLogger.info("Dialogue thread {} terminated.", sessionId); 152 } catch (InterruptedException throwable) { 153 mLogger.error("Stopped waiting for dialogue threads to terminate.", sessionId); 154 return; 155 } 156 } 157 } 158 }; 159 160 Thread waitForDialoguesToTerminateThread = new Thread(waitForDialoguesToTerminate, 161 "Session container/waiting for dialogue termination"); 162 163 waitForDialoguesToTerminateThread.start(); 164 try { 165 waitForDialoguesToTerminateThread.join(Duration.seconds(10).getMilliseconds()); 166 if (waitForDialoguesToTerminateThread.isAlive()) { 167 waitForDialoguesToTerminateThread.interrupt(); 168 waitForDialoguesToTerminateThread.join(Duration.seconds(2).getMilliseconds()); 169 } 170 171 for (Session<I, O, F, L, C> session : sessions) { 172 if (!session.getDialogueChannel().isDialogueDone()) { 173 mLogger.warn("Dialogue {} is still not terminated. Possible leak detected.", session.getId()); 174 } 175 } 176 177 } catch (InterruptedException exception) { 178 waitForDialoguesToTerminateThread.interrupt(); 179 } finally { 180 mStopped = true; 181 mTimeoutCheckScanThread.interrupt(); 182 } 183 } 184 } 185 186 public Collection<String> getSessionIds() { 187 return new HashSet<String>(mSessions.keySet()); 188 } 189}