Skip to main content

How can i create an output pipe???

No replies
smart_one
Offline
Joined: 2011-11-22
Points: 0

dear all , i have a problem for serveral days.

when i create outputpipe ,it's have an exception throw that is: Output Pipe could not be resolved after 10000ms.

here is the exception code: outputPipe = pipeSev.createOutputPipe(pipeAdv, 10000);

------------------------------------------------------------------

and this is the source code:

first i run a peer as a server to accept the pipe:

public class Peer1 {

private DiscoveryService discoveryService = null;

private PipeService pipeService = null;

private PeerGroup restoNet = null;

private PeerGroupID peerGroupID = null;

private InputPipe inputPipe = null;

private ModuleClassID myService1ID;
public static void main(String[] args) {
Logger.getLogger("net.jxta").setLevel(Level.SEVERE);
Peer1 peer1 = new Peer1();

peer1.launchJXTA();

}
private void launchJXTA() {
try {
NetworkConfigurator config = new NetworkConfigurator(NetworkConfigurator.ADHOC_NODE,
new File(new File(".jxta"), "ServiceServer").toURI());
// config.setPeerID(IDFactory.newPeerID(IDFactory.newPeerGroupID()));
// config.setUseMulticast(true);
//----
// config.addSeedRelay(new URI("http://138.96.254.42:80"));
// config.addSeedRendezvous(new URI("http://138.96.254.42:80"));
config.setPrincipal("peer4");
config.setPassword("888888888");
// config.setTcpEnabled(true);
// config.setTcpIncoming(true);
// config.setTcpOutgoing(true);
config.setTcpInterfaceAddress("169.254.65.251");
config.setTcpEndPort(9701);
config.save();
restoNet = new NetPeerGroupFactory().getInterface();
peerGroupID = restoNet.getPeerGroupID();
} catch (Exception e) {
e.printStackTrace();
System.exit(-1);
}
discoveryService = restoNet.getDiscoveryService();
pipeService = restoNet.getPipeService();
startServer();
}

private void startServer() {
this.buildModuleAdvertisement();
this.buildModuleSpecificationAdvertisement();

}
private void buildModuleAdvertisement(){
ModuleClassAdvertisement myService1ModuleAdvertisement = (ModuleClassAdvertisement)AdvertisementFactory.newAdvertisement(ModuleClassAdvertisement.getAdvertisementType());
myService1ModuleAdvertisement.setName("JXTAMOD");
myService1ModuleAdvertisement.setDescription("Service 1");
myService1ID = IDFactory.newModuleClassID();
myService1ModuleAdvertisement.setModuleClassID(myService1ID);
System.out.println("Publishing our Module Advertisement......");
try {
discoveryService.publish(myService1ModuleAdvertisement);
discoveryService.remotePublish(myService1ModuleAdvertisement);
} catch (IOException e) {
System.out.println("Error during publish of Module Advertisement");
e.printStackTrace();
System.exit(-1);
}

}
private void buildModuleSpecificationAdvertisement(){
ModuleSpecAdvertisement moduleSpecAdv = (ModuleSpecAdvertisement)AdvertisementFactory.newAdvertisement(ModuleSpecAdvertisement.getAdvertisementType());
moduleSpecAdv.setName("JXTASPEC");
moduleSpecAdv.setVersion("Version 1.0");
moduleSpecAdv.setCreator("Lyndon");
moduleSpecAdv.setModuleSpecID(IDFactory.newModuleSpecID(myService1ID));
moduleSpecAdv.setSpecURI("<http://www.jxta.org/CH15EX2>");
System.out.println("Publishing Module Specification Advertisement...");

PipeAdvertisement pipeAdv = this.createPipeAdvertisement();
moduleSpecAdv.setPipeAdvertisement(pipeAdv);

StructuredTextDocument doc = (StructuredTextDocument) moduleSpecAdv.getDocument(MimeMediaType.XMLUTF8);
StringWriter out = new StringWriter();
try {
doc.sendToWriter(out);
System.out.println(out.toString());
out.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

try {
discoveryService.publish(moduleSpecAdv);
discoveryService.remotePublish(moduleSpecAdv);
inputPipe = pipeService.createInputPipe(pipeAdv);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
while(true){
System.out.println("Waiting for client message to arrive");
Message msg;
try {
msg = inputPipe.waitForMessage();
} catch (InterruptedException e) {
inputPipe.close();
System.out.println("ServiceServer:Error listening for message");
return;
}
String receiveContent = null;
Message.ElementIterator en = msg.getMessageElements();
if(!en.hasNext()){
return ;
}
MessageElement msgElement = msg.getMessageElement(null,"DataTag");
if(msgElement.toString()!=null){
receiveContent = msgElement.toString();
}
if(receiveContent!=null){
System.out.println("ServiceServer:receive message: "+receiveContent);
}else{
System.out.println("ServiceServer: error could not find the tag");
}

}

}

private PipeAdvertisement createPipeAdvertisement(){
PipeAdvertisement pipeAdvertisement = null;
pipeAdvertisement = (PipeAdvertisement)AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType());
pipeAdvertisement.setPipeID(createPipeID(peerGroupID));
pipeAdvertisement.setName("Peer1 Pipe");
pipeAdvertisement.setDescription("JXTA create first pipe");
pipeAdvertisement.setType(PipeService.UnicastType);
return pipeAdvertisement;
}

private PipeID createPipeID(PeerGroupID groupID){
PipeID pipeID = null;
pipeID = IDFactory.newPipeID(groupID);
return pipeID;
}
}

----------------------------------------------------------

and then the client code:

public class Peer2 {

private PeerGroup restoNet = null;//PeerGroup

private DiscoveryService disco;

private PipeService pipeSev;

private PipeAdvertisement pipeAdv = null;

private OutputPipe outputPipe;

public static void main(String[] args) {
// TODO Auto-generated method stub
// Logger.getLogger("net.jxta").setLevel(Level.SEVERE);
Peer2 peer2 = new Peer2();
peer2.launchJXTA();
}

private void launchJXTA() {
System.out.println("Lauching Peer into JXTA NetWork...");
try {
NetworkConfigurator config = new NetworkConfigurator(NetworkConfigurator.ADHOC_NODE,
new File(new File(".jxta"), "ServiceClient").toURI());
config.setPrincipal("peer3");
config.setPassword("888888888");
//----
// config.addSeedRelay(new URI("http://138.96.254.42:80"));
// config.addSeedRendezvous(new URI("http://138.96.254.42:80"));
// config.setTcpEnabled(true);
// config.setTcpIncoming(true);
// config.setTcpOutgoing(true);
config.setTcpInterfaceAddress("169.254.65.251");
config.setTcpEndPort(9702);
config.save();
restoNet = new NetPeerGroupFactory().getInterface();
} catch (Exception e) {
System.out.println("Unable to create PeerGroup - Failure");
System.exit(-1);
}
startClient();

}

private void getService(){
System.out.println("Getting Services....");
disco = restoNet.getDiscoveryService();
pipeSev = restoNet.getPipeService();
}

private void startClient(){
getService();
Enumeration en;
while(true){
try {
en = disco.getLocalAdvertisements(DiscoveryService.ADV, "Name", "JXTASPEC");
if((en!=null)&&en.hasMoreElements()){
break;
}
disco.getRemoteAdvertisements(null, DiscoveryService.ADV, "Name", "JXTASPEC", 1,null);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.print(".");
}
System.out.println("we found the service advertisement");
ModuleSpecAdvertisement moduleSpecAdv = (ModuleSpecAdvertisement)en.nextElement();

StructuredTextDocument doc = (StructuredTextDocument)moduleSpecAdv.getDocument(MimeMediaType.XMLUTF8);
StringWriter out = new StringWriter();
try {
doc.sendToWriter(out);
System.out.println(out.toString());
out.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

pipeAdv = moduleSpecAdv.getPipeAdvertisement();
if(null == pipeAdv){
System.out.println("pipeAdv is null");
}
try {
outputPipe = pipeSev.createOutputPipe(pipeAdv, 10000);//====================>here is the error
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
String data = "Hello my friend" ;
Message msg = new Message();

StringMessageElement sme = new StringMessageElement("DataTag",data,null);
msg.addMessageElement(null,sme);
try {
outputPipe.send(msg);
System.out.println("message \""+data+"\" sent ot the ServiceServer");
} catch (IOException e) {
e.printStackTrace();
System.out.println("ServiceClient:Error sending message to the service");
}

}

}

any body ,please ,help!!!

thanks advance!