
Server Socket

dc5
Joined: 2008-08-31

Hey,

When a server socket is blocked waiting to accept incoming connections, is it possible to stop or close it? Or do you have to wait until it has received a connection or timed out?

Cheers,

DC5


boylejohnr
Joined: 2008-10-27

I do not think there is a way to exit the thread. However, if you are interested, I have extended JxtaServerPipe to remove the need for a thread. JXTA is rather wasteful of resources, especially when trying to scale to thousands of peer groups.

Every JxtaServerPipe also has three fixed executor threads; I consolidated these into one static executor and shut down the executor in the superclass.
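The "shut down the executor in the superclass" step can be sketched on its own, without JXTA. This is a minimal illustration only: `Parent`, `Child` and `parentExecutorIsShutdown` are hypothetical names, and `Parent` merely imitates a class that creates a private per-instance executor.

```java
import java.lang.reflect.Field;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParentExecutorShutdownSketch {

    /** Hypothetical parent that creates a private executor for every instance. */
    public static class Parent {
        private final ExecutorService executor = Executors.newFixedThreadPool(3);
    }

    /** Subclass that does not want the parent's executor and shuts it down. */
    public static class Child extends Parent {
        public Child() {
            shutdownParentExecutor();
        }

        private void shutdownParentExecutor() {
            try {
                parentExecutor().shutdown();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Unable to shut down the parent's executor", e);
            }
        }

        /** Reads the parent's private field via reflection. */
        private ExecutorService parentExecutor() throws ReflectiveOperationException {
            Field field = Parent.class.getDeclaredField("executor");
            field.setAccessible(true);
            return (ExecutorService) field.get(this);
        }

        /** Reports whether the parent's executor has been shut down. */
        public boolean parentExecutorIsShutdown() throws ReflectiveOperationException {
            return parentExecutor().isShutdown();
        }
    }
}
```

The approach depends on the field name staying the same in the parent class, which is why it is best treated as a stopgap.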

The code is a kludge; when I get time I would like to get more involved in JXTA to make it more lightweight and scalable. There has been a lot of good work on the NIO integration, but higher up it is a bit wasteful. For instance, a reliable retransmitter thread is required for every BiDi connection. These could be better pooled or avoided altogether.

You could try something similar for the JXTA socket, which I have avoided to date since it is based on blocking I/O semantics.
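For reference on blocking accept semantics: with a plain java.net.ServerSocket (not the JXTA classes), closing the socket from another thread makes a blocked accept() throw a SocketException, so the accepting thread can exit. The sketch below shows only that standard-library behaviour; whether the JXTA socket behaves the same way is not verified here, and the class and method names are made up for illustration.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.SocketException;
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseWhileAccepting {

    /** Returns true if close() from another thread ended the accept() call. */
    public static boolean closeUnblocksAccept() throws Exception {
        final ServerSocket server = new ServerSocket(0); // port 0: any free port
        final AtomicBoolean sawSocketException = new AtomicBoolean(false);

        Thread acceptor = new Thread(() -> {
            try {
                server.accept(); // blocks until a client connects or the socket is closed
            } catch (SocketException e) {
                sawSocketException.set(true); // raised because the socket was closed
            } catch (IOException e) {
                // some other I/O failure; leave the flag unset
            }
        });
        acceptor.setDaemon(true);
        acceptor.start();

        Thread.sleep(200);   // give the acceptor time to block in accept()
        server.close();      // close from this thread
        acceptor.join(5000); // the acceptor should finish promptly

        return sawSocketException.get() && !acceptor.isAlive();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("accept() unblocked by close(): " + closeUnblocksAccept());
    }
}
```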

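import java.io.IOException;
import java.lang.reflect.Field;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import net.jxta.document.AdvertisementFactory;
import net.jxta.document.StructuredDocument;
import net.jxta.document.StructuredDocumentFactory;
import net.jxta.document.XMLDocument;
import net.jxta.endpoint.Message;
import net.jxta.endpoint.MessageElement;
import net.jxta.endpoint.Messenger;
import net.jxta.peergroup.PeerGroup;
import net.jxta.pipe.PipeMsgEvent;
import net.jxta.protocol.PeerAdvertisement;
import net.jxta.protocol.PipeAdvertisement;
import net.jxta.util.JxtaBiDiPipe;
import net.jxta.util.JxtaServerPipe;

// Contract and log are helpers from the poster's own code base and are not shown here.
public class JxtaServerPipeNonBlockingAccept extends JxtaServerPipe
{
    /**
     * Previously one executor per instance; now one cached pool shared by
     * all instances.
     */
    private static final ExecutorService executor = Executors.newCachedThreadPool();
    private final ConnectionListener listener;

    public JxtaServerPipeNonBlockingAccept(PeerGroup netPeerGroup, PipeAdvertisement adv, ConnectionListener listener)
            throws IOException
    {
        super(netPeerGroup, adv);
        Contract.assertNotNull(listener, "Require a listener for connect events");
        this.listener = listener;
        unsetParentExecutorHack();
    }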

    private void unsetParentExecutorHack()
    {
        try
        {
            // The parent's executor field is not accessible from this
            // subclass, so reach it through reflection and shut it down.
            Field field = JxtaServerPipe.class.getDeclaredField("executor");
            field.setAccessible(true);
            // TODO: Need to promote change to the JXTA community to remove this
            // class completely.
            ExecutorService executorLocal = (ExecutorService) field.get(this);
            executorLocal.shutdown();
        } catch (Exception e)
        {
            throw new Error("Unable to clean up thread executor created by JXTAServerPipe", e);
        }
    }

    /**
     * To make accept non-blocking, the pipe event handler has to be
     * overridden. Direct lift from the parent.
     */
    @Override
    public void pipeMsgEvent(PipeMsgEvent event)
    {
        Message message = event.getMessage();
        if (message == null)
        {
            return;
        }
        ConnectionProcessor processor = new ConnectionProcessor(message);
        executor.execute(processor);
    }

    /**
     * Direct lift from parent.
     *
     * @author boylejohnr
     */
    private class ConnectionProcessor implements Runnable
    {
        private final Message message;

        ConnectionProcessor(Message message)
        {
            this.message = message;
        }

        public void run()
        {
            JxtaBiDiPipe bidi = processMessage(message);
            // make sure we have a pipe to hand back
            if (bidi != null)
            {
                // Instead of queuing the pipe for accept(), call the
                // listener back directly. A listener is always present,
                // since the class cannot be constructed without one.
                listener.onAccept(bidi);
            }
        }
    }

    @Override
    @Deprecated
    public JxtaBiDiPipe accept()
    {
        throw new UnsupportedOperationException("Accept is not supported on this class, must use listener");
    }

    public interface ConnectionListener
    {
        /**
         * The listener is expected to handle this call as quickly as
         * possible, or to hand the ongoing work to another thread and
         * return immediately.
         *
         * @param pipe the newly accepted bidirectional pipe
         */
        public void onAccept(JxtaBiDiPipe pipe);
    }

    /**
     * This is a direct lift from the parent, to enable thread reduction. Log
     * messages needed to be converted and the group accessed through its
     * getter.
     *
     * @param msg the incoming connection request message
     * @return the new pipe, or null if no messenger could be obtained or an
     *         IOException occurred
     */
    private JxtaBiDiPipe processMessage(Message msg)
    {
        PipeAdvertisement outputPipeAdv = null;
        PeerAdvertisement peerAdv = null;
        StructuredDocument credDoc = null;
        try
        {
            MessageElement el = msg.getMessageElement(nameSpace, credTag);

            if (el != null)
            {
                credDoc = StructuredDocumentFactory.newStructuredDocument(el);
            }

            el = msg.getMessageElement(nameSpace, reqPipeTag);
            if (el != null)
            {
                XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(el);
                outputPipeAdv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(asDoc);
            }

            el = msg.getMessageElement(nameSpace, remPeerTag);
            if (el != null)
            {
                XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(el);
                peerAdv = (PeerAdvertisement) AdvertisementFactory.newAdvertisement(asDoc);
            }

            el = msg.getMessageElement(nameSpace, reliableTag);
            boolean isReliable = false;
            if (el != null)
            {
                isReliable = Boolean.parseBoolean(el.toString());
                if (log.isTraceEnabled())
                {
                    log.trace("Connection request [isReliable] :" + isReliable);
                }
            }

            el = msg.getMessageElement(nameSpace, directSupportedTag);
            boolean directSupported = false;
            if (el != null)
            {
                directSupported = Boolean.parseBoolean(el.toString());
                if (log.isTraceEnabled())
                {
                    log.trace("Connection request [directSupported] :" + directSupported);
                }
            }

            // Prefer a direct messenger when the remote peer supports it,
            // falling back to a lightweight output pipe otherwise.
            Messenger msgr;
            boolean direct = false;
            if (directSupported)
            {
                msgr = JxtaBiDiPipe.getDirectMessenger(getGroup(), outputPipeAdv, peerAdv);
                if (msgr == null)
                {
                    msgr = JxtaBiDiPipe.lightweightOutputPipe(getGroup(), outputPipeAdv, peerAdv);
                } else
                {
                    direct = true;
                }
            } else
            {
                msgr = JxtaBiDiPipe.lightweightOutputPipe(getGroup(), outputPipeAdv, peerAdv);
            }

            if (msgr != null)
            {
                if (log.isTraceEnabled())
                {
                    log.trace("Reliability set to :" + isReliable);
                }
                PipeAdvertisement newpipe = newInputPipe(getGroup(), outputPipeAdv);
                JxtaBiDiPipe pipe = new JxtaBiDiPipe(getGroup(), msgr, newpipe, credDoc, isReliable, direct);

                pipe.setRemotePeerAdvertisement(peerAdv);
                pipe.setRemotePipeAdvertisement(outputPipeAdv);
                sendResponseMessage(getGroup(), msgr, newpipe);
                return pipe;
            }
        } catch (IOException e)
        {
            // Log the failure regardless of the trace setting, then fall
            // through and return null.
            log.error("IOException occurred", e);
        }
        return null;
    }

}
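
The overall pattern in the class above (one executor shared by all instances, and a listener callback in place of a blocking accept()) can be tried out without JXTA. The following is a minimal sketch under stated assumptions: `Listener`, `CallbackServer`, `onRequest` and `demo` are hypothetical stand-ins, and a String takes the place of the accepted pipe.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CallbackAcceptSketch {

    /** Hypothetical stand-in for ConnectionListener; a String replaces the pipe. */
    public interface Listener {
        void onAccept(String connection);
    }

    /** Hypothetical stand-in for the server pipe: no blocking accept, only callbacks. */
    public static class CallbackServer {
        // One pool shared by every instance, rather than one pool per instance.
        private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool();
        private final Listener listener;

        public CallbackServer(Listener listener) {
            if (listener == null) {
                throw new IllegalArgumentException("Require a listener for connect events");
            }
            this.listener = listener;
        }

        /** Called for each incoming request; hands the work to the pool and returns. */
        public void onRequest(String request) {
            if (request == null) {
                return;
            }
            EXECUTOR.execute(() -> listener.onAccept("connection-for-" + request));
        }

        /** Blocking accept is deliberately unsupported. */
        public String accept() {
            throw new UnsupportedOperationException("Accept is not supported, use the listener");
        }

        /** Stops the shared pool; no instance can process requests afterwards. */
        public static void shutdown() {
            EXECUTOR.shutdown();
        }
    }

    /** Sends n requests through one server and returns what the listener received. */
    public static List<String> demo(int n) throws InterruptedException {
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(n);
        CallbackServer server = new CallbackServer(connection -> {
            seen.add(connection);
            latch.countDown();
        });
        for (int i = 0; i < n; i++) {
            server.onRequest("peer" + i);
        }
        latch.await(5, TimeUnit.SECONDS); // wait for the callbacks to run
        CallbackServer.shutdown();
        return seen;
    }
}
```

Because the pool is static, shutting it down affects every instance, so in a real application that would normally happen once at application exit.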