Index: src/main/java/org/ow2/petals/component/framework/process/JBIAcceptorManager.java =================================================================== --- src/main/java/org/ow2/petals/component/framework/process/JBIAcceptorManager.java (revision 31597) +++ src/main/java/org/ow2/petals/component/framework/process/JBIAcceptorManager.java (working copy) @@ -21,10 +21,9 @@ package org.ow2.petals.component.framework.process; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; @@ -52,14 +51,16 @@ */ public class JBIAcceptorManager implements NotificationListener { - protected final Logger logger; + private final Logger logger; - private ThreadPoolExecutor acceptorPool = null; + private final List acceptorPool = new ArrayList(); + + private final JBIAcceptorThreadFactory acceptorFactory; + + private final JBIProcessorManager jbiProcessorManager; - protected final JBIProcessorManager jbiProcessorManager; + private final AbstractComponent component; - protected final AbstractComponent component; - private final RuntimeConfigurationNotifier runtimeConfiguration; /** @@ -69,35 +70,38 @@ * @since 3.0 * */ - private class JBIAcceptorThreadFactory implements ThreadFactory { + private class JBIAcceptorThreadFactory { private final AtomicInteger threadNumber = new AtomicInteger(1); private final ThreadGroup group; + + private final Logger logger; - public JBIAcceptorThreadFactory() { + private final AbstractComponent component; + + private final JBIProcessorManager jbiProcessorManager; + + public JBIAcceptorThreadFactory(final AbstractComponent component, final JBIProcessorManager jbiProcessorManager, final Logger logger) { + this.component = component; + this.jbiProcessorManager = jbiProcessorManager; + this.logger = logger; this.group = new ThreadGroup(Thread.currentThread().getThreadGroup(), - JBIAcceptorManager.this.component.getContext().getComponentName() + this.component.getContext().getComponentName() + "-JBIListener"); } - /* - * (non-Javadoc) - * - * @see java.util.concurrent.ThreadFactory#newThread(java.lang.Runnable) - */ - public Thread newThread(final Runnable r) { - final String name = JBIAcceptorManager.this.component.getContext().getComponentName() + public MessageExchangeAcceptor newThread() { + final String name = this.component.getContext().getComponentName() + " -JBI Acceptor Thread #" + Integer.toString(this.threadNumber.getAndIncrement()); - JBIAcceptorManager.this.logger.log(Level.FINE, + this.logger.log(Level.FINE, "Creating a new thread for JBIAcceptor Factory : " + name); final MessageExchangeAcceptor meaProcessor = new MessageExchangeAcceptor( - JBIAcceptorManager.this.component, JBIAcceptorManager.this.jbiProcessorManager - .getThreadPool(), JBIAcceptorManager.this.jbiProcessorManager - .getObjectPool(), JBIAcceptorManager.this.logger, this.group, name); + this.component, this.jbiProcessorManager.getThreadPool(), + this.jbiProcessorManager.getObjectPool(), this.logger, this.group, name); meaProcessor.setDaemon(true); return meaProcessor; } @@ -123,6 +127,7 @@ this.jbiProcessorManager = jbiProcessorManager; this.logger = log; this.runtimeConfiguration = runtimeConfiguration; + this.acceptorFactory = new JBIAcceptorThreadFactory(this.component, this.jbiProcessorManager, this.logger); } public void init() throws JBIException { @@ -138,11 +143,17 @@ public void start() throws JBIException { // Note : since core-pool-size and maximum-pool-size are the same, the // pool size is fixed. - this.acceptorPool = new ThreadPoolExecutor(this.component.getComponentConfiguration() + /*this.acceptorPool = new ThreadPoolExecutor(this.component.getComponentConfiguration() .getAcceptorPoolSize().getValue(), this.component.getComponentConfiguration() .getAcceptorPoolSize().getValue(), 60, TimeUnit.SECONDS, new SynchronousQueue(), new JBIAcceptorThreadFactory()); - this.acceptorPool.prestartAllCoreThreads(); + this.acceptorPool.prestartAllCoreThreads();*/ + final int corePoolSize = this.component.getComponentConfiguration().getAcceptorPoolSize().getValue(); + for (int i=0; i itAcceptorPool = this.acceptorPool.iterator(); + while (itAcceptorPool.hasNext()) { + final MessageExchangeAcceptor acceptorThread = itAcceptorPool.next(); + itAcceptorPool.remove(); + this.stopAcceptorThread(acceptorThread); + } } public void shutdown() throws JBIException { @@ -163,6 +183,21 @@ throw new JBIException("Error unregistering the runtime configuration listener", e); } } + + private void stopAcceptorThread(final MessageExchangeAcceptor acceptorThread) { + acceptorThread.ends(); + try { + // Wait the end of the pending task + acceptorThread.join(500); + + if (!acceptorThread.isInterrupted()) { + acceptorThread.interrupt(); + } + + } catch (InterruptedException e) { + this.logger.log(Level.WARNING, "The stop one acceptor of the acceptor thread pool was interrupted", e); + } + } /** * Reset the core Pool size. @@ -172,7 +207,26 @@ private void setCorePoolSize(final int size) { if (this.acceptorPool != null) { this.logger.info("Reset the acceptor thread pool size to " + size); - this.acceptorPool.setCorePoolSize(size); + if (size > this.acceptorPool.size()) { + final int nbThreadToCreate = size - this.acceptorPool.size(); + for (int i=0; i itAcceptorPool = this.acceptorPool.iterator(); + while (itAcceptorPool.hasNext() && nbThreadToStop > 0) { + final MessageExchangeAcceptor acceptorThread = itAcceptorPool.next(); + itAcceptorPool.remove(); + this.stopAcceptorThread(acceptorThread); + } + } + else { + this.logger.fine("Unchange value, so no acceptor thread created or stopped !"); + } } } Index: src/main/java/org/ow2/petals/component/framework/process/MessageExchangeAcceptor.java =================================================================== --- src/main/java/org/ow2/petals/component/framework/process/MessageExchangeAcceptor.java (revision 31597) +++ src/main/java/org/ow2/petals/component/framework/process/MessageExchangeAcceptor.java (working copy) @@ -51,7 +51,7 @@ private DeliveryChannel deliveryChannel; - private boolean running = false; + private volatile boolean running = false; private ThreadPoolExecutor jbiProcessorThreadPool; @@ -96,6 +96,7 @@ messageExchangeProcessor.setMessageExchange(exchange); this.jbiProcessorThreadPool.execute(messageExchangeProcessor); + } catch (final PEtALSCDKException e) { this.logger.log(Level.SEVERE, e.getMessage(), e); } catch (final RejectedExecutionException e) { @@ -119,4 +120,9 @@ } } } + + public void ends() { + this.running = false; + this.logger.fine("Thread '" + this.getName() + "' stops accepting JBI messages"); + } }