yjl49 发表于 2013-2-3 11:21:35

OF 同步异步问题的改进

 
之前的一篇文章中提到过OF中的同步异步问题,这里针对文章最后描述的两个缺点做些改进。
这里提供一个公共的专门用于处理S2S通信问题的IQ类S2SDataManager。
可以提供多种接口:1.阻塞等待固定时长;2.阻塞等待,并在接收到结果或超时后继续;3.发送后直接返回;4.发送后直接返回,提供回调接口。
 
public class S2SDataManager implements IQResultListener{    private XMPPServer server;    private RoutingTable routing;    private long timeout=3000;//3 秒超时    //用来存储接收到的S2S结果包    private Map<String,Packet> results = new ConcurrentHashMap<String,Packet>();    //用来存储同步锁    private Map<String,Notifyer> noties = new ConcurrentHashMap<String,Notifyer>();    //用来存储回调接口实现类对象    private Map<String,S2SIQResultListener> liteners = new ConcurrentHashMap<String,S2SIQResultListener>();    //用来存储超时处理对象    private Map<String,Long> gards = new ConcurrentHashMap<String,Long>();      private static class S2SDataManagerContainer{       private static S2SDataManager instance = new S2SDataManager();    }       public static S2SDataManager getInstance(){       return S2SDataManagerContainer.instance;    }   private S2SDataManager(){      //一些初始化    }    //方法1    public Packet send(Packet packet,int time){      //类似于OF中原有的方法    }    //方法2    public Packet send(Packet packet){      String id = packet.getID();      Notifyer noty = new Notifyer(id);      noties.put(id,noty);      addIQRouterListener(id);      synchronized(noty){         routing.routePacket(packet.getTo(),packet.true);         try{                            noty.wait(timeout);         }         catch(InterruptedException ex){            //log it         }      }      Packet reply =results.remove(packet.getID());      return reply;    }    @override    public void receivedAnswer(IQ iq){      this.results.put(iq.getID(),iq);      Notifyer noty = noties.remove(iq.getID());      if(noty!=null){            synchronized(noty){               try{                                    noty.notify();               }               catch(Exception ex){                  //log it               }            }      }    }      //方法3    public void send(Packet packet){      routing.routePacket(Packet.getTo(),packet,true);    }    //方法4    public void send(Packet packet,S2SIQResultListener listener){          String id = packet.getID();          addIQRouterListener(id);          listeners.put(id,listener);          gards.put(id,System.currentTimeMillis()+timeout);          routing.routePacket(packet.getTo(),packet,true);    }    @Override    public void answerTimeout(String id){       //外部超时通知处理    }    private class TimeOutTask extends TimerTask{               @Override      public void run(){            final Iterator<Map.Entry<String,Long> it = gards.entitySet().iterator();            while(it.hasNext){                final Map.Entry<String,Long> point= it.next();                if(System.currentTimeMillis()<point.getValue()){                  continue;                }                listeners.remove(point.getKey());                it.remove();            }      }    }}

 
页: [1]
查看完整版本: OF 同步异步问题的改进