001/*
002 * Copyright (c) 2013 Nu Echo Inc. All rights reserved.
003 */
004
005package com.nuecho.rivr.core.servlet.session;
006
007import java.util.*;
008
009import org.slf4j.*;
010
011import com.nuecho.rivr.core.channel.*;
012import com.nuecho.rivr.core.dialogue.*;
013import com.nuecho.rivr.core.servlet.*;
014import com.nuecho.rivr.core.util.*;
015
016/**
017 * Stores {@link Session} and check for expirations.
018 * <p>
019 * Rivr has its own SessionContainer independent from the Web server. This
020 * solves many issues related to cookies and encoding of session ID in the URI.
021 * <p>
022 * A clean-up thread checks periodically that every session has not timed-out.
023 * The scan period and the session time-out value can be specified in the
024 * {@link #SessionContainer(Logger, Duration, Duration, String) constructor},
025 * although they are normally specified via the {@link DialogueServlet}.
026 * 
027 * @see DialogueServlet#setSessionScanPeriod(Duration)
028 * @see DialogueServlet#setSessionTimeout(Duration)
029 * @author Nu Echo Inc.
030 * @param <F> type of {@link FirstTurn}
031 * @param <L> type of {@link LastTurn}
032 * @param <O> type of {@link OutputTurn}
033 * @param <I> type of {@link InputTurn}
034 * @param <C> type of {@link DialogueContext}
035 */
036public final class SessionContainer<I extends InputTurn, O extends OutputTurn, F extends FirstTurn, L extends LastTurn, C extends DialogueContext<I, O>> {
037
038    private final Logger mLogger;
039    private final Duration mSessionTimeout;
040    private final Duration mTimeoutCheckScanPeriod;
041    private final String mName;
042
043    private final Map<String, Session<I, O, F, L, C>> mSessions = new HashMap<String, Session<I, O, F, L, C>>();
044    private final Map<String, Long> mLastAccess = new HashMap<String, Long>();
045    private boolean mStopped;
046    private Thread mTimeoutCheckScanThread;
047
048    public SessionContainer(Logger logger, Duration sessionTimeout, Duration timeoutCheckScanPeriod, String name) {
049        mLogger = logger;
050        mSessionTimeout = sessionTimeout;
051        mTimeoutCheckScanPeriod = timeoutCheckScanPeriod;
052        mName = name;
053        launchThread();
054    }
055
056    private void launchThread() {
057
058        Runnable cleanUpRunnable = new Runnable() {
059
060            @Override
061            public void run() {
062
063                while (!mStopped) {
064                    try {
065                        Iterator<String> sessionIdsIterator = mSessions.keySet().iterator();
066
067                        long currentTime = System.currentTimeMillis();
068                        while (sessionIdsIterator.hasNext()) {
069                            String sessionId = sessionIdsIterator.next();
070                            Long lastAccess = mLastAccess.get(sessionId);
071                            if (lastAccess != null) {
072                                if (currentTime - lastAccess > mSessionTimeout.getMilliseconds()) {
073                                    Session<I, O, F, L, C> sessionContext = mSessions.get(sessionId);
074                                    sessionContext.stop();
075                                }
076                            }
077                        }
078
079                        Thread.sleep(mTimeoutCheckScanPeriod.getMilliseconds());
080                    } catch (InterruptedException interruptedException) {
081                        if (mStopped) {
082                            //Interrupts can be swallowed if you know the thread is about to exit
083                        } else {
084                            Thread.currentThread().interrupt();
085                        }
086                    } catch (Throwable throwable) {
087                        mLogger.error("Error during session time-out check.", throwable);
088                    }
089                }
090
091                Iterator<String> sessionIdsIterator = mSessions.keySet().iterator();
092
093                //stopping all sessions
094                while (sessionIdsIterator.hasNext()) {
095                    mSessions.get(sessionIdsIterator.next()).stop();
096                }
097            }
098        };
099
100        mTimeoutCheckScanThread = new Thread(cleanUpRunnable, "Session cleanup thread for " + mName);
101        mTimeoutCheckScanThread.setDaemon(true);
102        mTimeoutCheckScanThread.start();
103    }
104
105    public void addSession(Session<I, O, F, L, C> session) {
106        String sessionId = session.getId();
107        mSessions.put(sessionId, session);
108        updateLastAccessTime(sessionId);
109    }
110
111    public void removeSession(String sessionId) {
112        mSessions.remove(sessionId);
113        mLastAccess.remove(sessionId);
114    }
115
116    public Session<I, O, F, L, C> getSession(String sessionId) {
117        updateLastAccessTime(sessionId);
118        Session<I, O, F, L, C> session = mSessions.get(sessionId);
119        if (session != null) {
120            session.keepAlive();
121        }
122        return session;
123    }
124
125    private void updateLastAccessTime(String sessionId) {
126        mLastAccess.put(sessionId, System.currentTimeMillis());
127    }
128
129    public synchronized void stop() {
130        if (!mStopped) {
131
132            final Collection<Session<I, O, F, L, C>> sessions = new HashSet<Session<I, O, F, L, C>>(mSessions.values());
133
134            for (Session<I, O, F, L, C> session : sessions) {
135                try {
136                    mLogger.info("Stopping session {}.", session.getId());
137                    session.stop();
138                } catch (Throwable throwable) {
139                    mLogger.error("Unable to stop session {}.", session.getId(), throwable);
140                }
141            }
142
143            Runnable waitForDialoguesToTerminate = new Runnable() {
144                @Override
145                public void run() {
146                    for (Session<I, O, F, L, C> session : sessions) {
147                        String sessionId = session.getId();
148                        try {
149                            mLogger.info("Waiting for dialogue thread {} to terminate.", sessionId);
150                            session.getDialogueChannel().join(Duration.seconds(10));
151                            mLogger.info("Dialogue thread {} terminated.", sessionId);
152                        } catch (InterruptedException throwable) {
153                            mLogger.error("Stopped waiting for dialogue threads to terminate.", sessionId);
154                            return;
155                        }
156                    }
157                }
158            };
159
160            Thread waitForDialoguesToTerminateThread = new Thread(waitForDialoguesToTerminate,
161                                                                  "Session container/waiting for dialogue termination");
162
163            waitForDialoguesToTerminateThread.start();
164            try {
165                waitForDialoguesToTerminateThread.join(Duration.seconds(10).getMilliseconds());
166                if (waitForDialoguesToTerminateThread.isAlive()) {
167                    waitForDialoguesToTerminateThread.interrupt();
168                    waitForDialoguesToTerminateThread.join(Duration.seconds(2).getMilliseconds());
169                }
170
171                for (Session<I, O, F, L, C> session : sessions) {
172                    if (!session.getDialogueChannel().isDialogueDone()) {
173                        mLogger.warn("Dialogue {} is still not terminated.  Possible leak detected.", session.getId());
174                    }
175                }
176
177            } catch (InterruptedException exception) {
178                waitForDialoguesToTerminateThread.interrupt();
179            } finally {
180                mStopped = true;
181                mTimeoutCheckScanThread.interrupt();
182            }
183        }
184    }
185
186    public Collection<String> getSessionIds() {
187        return new HashSet<String>(mSessions.keySet());
188    }
189}