fly_hyp 发表于 2013-1-15 02:57:37

这是我自己使用memcached工具

是python 写的,只适用到最基本的类库,应该各个版本都能够运行。是一个命令提示符界面。启动以后会有简单提示。
 
#! /usr/bin/env python# -*- coding: iso-8859-1 -*-import telnetlibimport timeimport stringimport sysimport osimport os.pathENCODING = sys.getfilesystemencoding()host = {}host['ip']=''host['port']=11211tn = Nonedef sendCmd(tn,cmd):tn.write( cmd + "\n")time.sleep(0.5)print "\t --" + cmd    #print tn.read_some()    #print "\t--read_very_eager"#print tn.read_very_eager()   print tn.read_very_lazy()   print tn.read_very_eager() def test():    global tntn = telnetlib.Telnet(host['ip'],host['port'])#tn.set_debuglevel(2)#print tn.read_some( )   #sendCmd(tn,"stats")#sendCmd(tn,"stats sizes")#sendCmd(tn,"stats items")#sendCmd(tn,"stats slabs")#sendCmd(tn,"stats malloc")#sendCmd(tn,"stats maps")#sendCmd(tn,"stats cachedump 1 10000")    #sendCmd(tn,"stats detail dump")#stats      tn.close()class Shell:    shortCmdMap = {}    def statsCmd(self,uCmd):    global tn    if tn == None:      outPut(u" memcached is not connect")      return   cmd = uCmd.encode(ENCODING)    sendCmd(tn,cmd)      def open(self,uCmd):    global tn    parts = string.split(uCmd," ")    if len(parts) == 3 :      host['ip'] = parts      host['port'] = parts      tn = telnetlib.Telnet(parts,int(parts))    else:      outPut(u" open connent param error")                            def handleCmd(self,uCmd):    global tn,host    if tn == None:      outPut(u" memcached is not connect")    else:      outPut(u"connect to:" + host['ip'] + u"," + host['port'])    if uCmd == u"env":      print os.environ      return    if uCmd == u"list" or uCmd == u"l":      files = os.listdir("")      self.shortCmdMap = {}      index = 0      for f in files:                uFile = f.decode(ENCODING)      #outPut(uFile)      uFile = uFile.lower()      if uFile.endswith(".bat"):          index += 1          uShortCmd = u"#" + str(index)          self.shortCmdMap = uFile          outPut(uShortCmd +" " +uFile)      return    if uCmd == u"help" or uCmd == u"h":      printHelpInfo()      return    if uCmd.startswith(u"#"):      uRealCmd = self.shortCmdMap      if uRealCmd != None:      os.system(uRealCmd)      return    print uCmd    if uCmd.startswith(u"open "):      self.open(uCmd)      return   if uCmd.startswith(u"stats"):      self.statsCmd(uCmd)      return         def processCmd(self):    while True:      cmd = raw_input(u'\n$:'.encode(ENCODING))      uCmd = cmd.decode(ENCODING)      uCmd = string.strip(uCmd)      if uCmd == u"exit":      outPut("bye-bye")      break       else:            self.handleCmd(uCmd)      outPut(uCmd)    def printHelpInfo():outPut(u"help(h) print help ")    outPut(u"list(l) list all command")outPut(u"env display all env info")outPut(u"open host port")outPut(u"stats")    outPut(u"stats reset")outPut(u"stats sizes")outPut(u"stats items")outPut(u"stats slabs")outPut(u"stats malloc")outPut(u"stats maps")outPut(u"stats maps")outPut(u"stats cachedump slab_id limit_num")outPut(u"exit")    def outPut(ustr):print ustr.encode(ENCODING)if __name__ == '__main__':    outPut(u"memcached stat shell:")s = Shell()printHelpInfo()s.processCmd()
页: [1]
查看完整版本: 这是我自己使用memcached工具