#!/usr/bin/env python #-*- coding: iso8859-15 -*- ########################################################## # SIGNALS_FTP v1.0 # ########################################################## # Autor: Juan Miguel Taboada Godoy # # Fecha: Szczecin, 05 de Septiembre de 2006 # # Descripción: Adquisitor de ficheros de Telemando de # # tipo SIGNALS mediante FTP # # Versión: 2006122300 # # # # Codigo fuente bajo licencia GNU/GPL # # Centrologic (Computational Logistic Center) # # http://www.centrologic.com - info@centrologic.com # ########################################################## # Librerías que voy a usar {{{1 import time import datetime import os import shutil from lib.FTP import FTP from lib.DATE import tempo_limit from lib.HERENCIA import BASE,DEFAULTCONFIG # }}}1 # ### CONFIG ### ###################################### ## Acceso a Telemando mediante SIGNALS_FTP: configuracion por defecto class CONFIG(DEFAULTCONFIG): # Constructor {{{1 ## Constructor ## \param self - def __init__(self): # Cargo las acciones del padre DEFAULTCONFIG.__init__(self,"SIGNALS_FTP") # }}}1 # Config Telemando {{{1 # download_descarga {{{2 ## Actualizar cada n-segundos ## \param self - ## \param arg Segundos entre acutalizaciones def download_tempo(self,arg=None): if (arg!=None): self._download_tempo=arg return self._download_tempo; # }}}2 # server {{{ ## Servidor FTP ## \param self - ## \param arg Direccion IP o nombre del servidor FTP def server(self,arg=None): if (arg!=None): self._server=arg return self._server; # }}}2 # port {{{2 ## Puerto del servidor FTP ## \param self - ## \param arg Puerto def port(self,arg=None): if (arg!=None): self._port=arg return self._port; # }}}2 # user {{{2 ## Usuario de acceso ## \param self - ## \param arg Usuario de acceso al FTP def user(self,arg=None): if (arg!=None): self._user=arg return self._user; # }}}2 # passwd {{{2 ## Clave ## \param self - ## \param arg Clave del usuario para acceso por FTP def passwd(self,arg=None): if (arg!=None): self._passwd=arg return self._passwd; # }}}2 # recv {{{2 ## Directorio del servidor en el que estan los registros ## \param self - ## \param arg Directorio del servidor def recv(self,arg=None): if (arg!=None): self._recv=arg return self._recv # }}}2 # save {{{2 ## Directorio del servidor en el que guardar una copia de los registros ## \param self - ## \param arg Directorio del servidor def save(self,arg=None): if (arg!=None): self._save=arg return self._save # }}}2 # dirtemp {{{2 ## Directorio donde almacenar temporalmente los registros descargados ## \param self - ## \param arg Directorio local def dirtemp(self,arg=None): if (arg!=None): self._dirtemp=arg return self._dirtemp # }}}2 # bloqueo {{{2 ## Fichero que controla el bloqueo ## \param self - ## \param arg Nombre del fichero def bloqueo(self,arg=None): if (arg!=None): self._bloqueo=arg return self._bloqueo # }}}2 # espera_flush {{{2 ## Tiempo de espera del FLUSH ## \param self - ## \param arg Ciclos de espera def espera_flush(self,arg=None): if (arg!=None): self._espera_flush=arg return self._espera_flush # }}}2 # Guarda el tiempo de timeout de uso del FTP {{{2 ## Tiempo de timeout para el FTP ## \param self - ## \param arg Ciclos de espera para timeout del FTP def timeout_espera_ftp(self,arg=None): if (arg!=None): self._timeout_espera_ftp=arg return self._timeout_espera_ftp # }}}2 # }}}1 # Checker {{{1 ## Comprueba que el conjunto minimo de datos requeridos por la clase han sido declarados por el usuario: \n ## - download_tempo() ## - debug() ## - recv() ## - save() ## - dirtemp() ## - bloqueo() ## - espera_flush() ## - timeout_esperar_ftp() ## \param self - def check(self): self._check("download_tempo") self._check("debug") self._check("recv") self._check("save") self._check("dirtemp") self._check("bloqueo") self._check("espera_flush") self._check("timeout_espera_ftp") self.check_default() # }}}1 # ### TELEMANDO ### ################################### ## Acceso a Telemando mediante SIGNALS_FTP: Telemando class TELEMANDO(BASE): # Constructor {{{1 ## Constructor ## \param self - ## \param id Identificador del Telemando (ALFANUMERICO) def __init__(self,id): # Cargo las acciones del padre BASE.__init__(self,id,"SIGNALS_FTP") # Inicializo el estado interno del Telemando self.__incongruencias=[] # }}}1 # Guarda la config {{{1 ## Guarda la configuracion del Telemando ## \param self - ## \param config Copnfiguración por defecto obtenida de la clase CONFIG ## \param server Direccion IP o nombre del servidor FTP ## \param fichero Nombre del fichero a descargar ## \param user Usuario del servidor FTP ## \param password Clave del usuario FTP ## \param port Puerto del servidor FTP def config(self,config,server,fichero,user=None,password=None,port=None): # Configuración por defecto self.default_config(config) # Cargo los datos en la config self.server(server) self.fichero(fichero) if (user!=None): self.user(user) if (password!=None): self.passwd(password) if (port!=None): self.port(port) # Chequeo la config self.check() # }}}1 # Guarda el server {{{1 ## Servidor FTP ## \param self - ## \param arg Direccion IP o nombre del servidor FTP def server(self,arg=None): if (arg!=None): self._server=arg return self._server # }}}1 # Guarda el fichero {{{1 ## Fichero a descargar del servidor FTP ## \param self - ## \param arg Nombre del fichero def fichero(self,arg=None): if (arg!=None): self._fichero=arg return self._fichero # }}}1 # Guarda el usuario {{{1 ## Usuario de acceso ## \param self - ## \param arg Usuario de acceso al FTP def user(self,arg=None): if (arg!=None): self._user=arg return self._user # }}}1 # Guarda la clave {{{1 ## Clave ## \param self - ## \param arg Clave del usuario para acceso por FTP def passwd(self,arg=None): if (arg!=None): self._passwd=arg return self._passwd # }}}1 # Guarda el puerto {{{1 ## Puerto del servidor FTP ## \param self - ## \param arg Puerto def port(self,arg=None): if (arg!=None): self._port=arg return self._port # }}}1 # Guarda el tiempo de espera del FLUSH {{{1 ## Tiempo de espera del FLUSH ## \param self - ## \param arg Ciclos de espera def espera_flush(self,arg=None): if (arg!=None): self._espera_flush=arg return self._espera_flush # }}}1 # Guarda el tiempo de timeout de uso del FTP {{{1 ## Tiempo de timeout para el FTP ## \param self - ## \param arg Ciclos de espera para timeout del FTP def timeout_espera_ftp(self,arg=None): if (arg!=None): self._timeout_espera_ftp=arg return self._timeout_espera_ftp # }}}1 # Checker {{{1 ## Comprueba que el conjunto minimo de datos requeridos por la clase han sido declarados por el usuario: \n ## - server() ## - fichero() ## \param self - def check(self): self._check("server") self._check("fichero") self.check_default() # }}}1 # Descarga los datos del Telemando {{{1 ## \internal ## Descarga los datos del Telemando ## \param self - def download_extendido(self): # Creo el nombre del fichero origen {{{2 origen=self.valor("fichero") # }}}2 # Creo el nombre del fichero destino {{{2 date=datetime.datetime.fromtimestamp(time.time()) fecha=date.timetuple() destino="%s_%s.signal" % (self.internal_id(),tempo_limit(fecha,self.valor("download_tempo"))) # }}}2 self.debug("Inicia la descarga de datos [%s,%s]" % (origen,destino)) # Cambio al directorio destino {{{2 self.debug("Cambia al directorio %s" % (self.valor("dirtemp"))) pwd=os.getcwd() try: os.chdir(self.valor("dirtemp")) except Exception: # Genero un error porque no puedo cambiarme al directorio destino raise IOError,"Error al cambiar al directorio \"%s\"" % (self.valor("dirtemp")) # Habilito una incongruencia self.habilitar(("sincronizar1",pwd)) # }}}2 # Descarga de los registros del Telemando físico {{{2 # Revisa si el registro existe {{{3 self.debug("Comprobando existencia previa del registro") file_exists=((os.path.exists("bak/%s.dat" % (destino))) or (os.path.exists("bak/%s.dat.gz" % (destino)))) # }}}3 # Test de existencia: Si no dispone del registo se lo baja if (not (file_exists)): # Conecto al FTP {{{3 self.debug("Conectando al FTP... ftp://%s:%s" % (self.valor("server"),self.valor("port"))) try: ftp=FTP(self.valor("server"),self.valor("port"),self.valor("user"),self.valor("passwd")) except Exception,e: # Genero un error raise ConnectionError,"No se pudo conectar al servidor FTP: %s" % e # }}}3 # Obtengo la hora actual {{{3 self.debug("Obteniendo la hora") hora=time.time() # }}}3 # Recolecto el registro del servidor {{{3 self.debug("Recolectando el registro del servidor") comprimido=False try: comprimido=self.recolectar_registro(ftp,self.valor("recv"),self.valor("save"),self.valor("fichero"),hora) except Exception,e: # Desbloqueo el servidor {{{4 m="Registro no recolectado por error en el proceso: %s" % e try: self.debug("Desconectando del servidor FTP...") ftp.DISCONNECT() except Exception,e: m="%s\n%s" % (m,"No he podido desconectar del servidor de FTP: %s" % (e)) # }}}4 # Devuelvo un error de comunicaciones raise ConnectionError,m # }}}3 # Desconecto del servidor FTP sin importar lo que pase {{{3 try: self.debug("Desconectando del servidor FTP...") ftp.DISCONNECT() except Exception,e: self.debug("Ocultado error en Telemando %s al desconectar del servidor de FTP: %s" % (self.internal_id(),e)) # }}}3 # Procesa los datos descargados {{{3 self.debug("Procesando los datos descargados") # Copio el fichero de registros al directorio de salida {{{4 self.debug("Copiando el fichero de registros al directorio de salida") try: if (not comprimido): # Si no esta comprimido trabajo normalmente os.rename("%s.signal.dat" % (origen),"%s.dat" % (destino)) shutil.copyfile("%s.dat" % (destino),"dat/%s.dat" % (destino)) shutil.copyfile("%s.dat" % (destino),"bak/%s.dat" % (destino)) os.unlink("%s.dat" % (destino)) else: # Si esta comprimido trabajo con la extension gz os.rename("%s.signal.dat.gz" % (origen),"%s.dat.gz" % (destino)) shutil.copyfile("%s.dat.gz" % (destino),"dat/%s.dat.gz" % (destino)) shutil.copyfile("%s.dat.gz" % (destino),"bak/%s.dat.gz" % (destino)) os.unlink("%s.dat.gz" % (destino)) except Exception,e: self.debug("Error en Telemando %s al manejar el fichero de registros %s. Error: %s" % (self.internal_id(),origen,e)) # }}}4 # }}}3 else: # Muestro el mensaje de cancelacion self.debug("Cancelada descarga, no es necesario sincronizar") # Cambio al directorio origen try: os.chdir(pwd) except Exception,e: # Genero un error porque no puedo cambiarme al directorio destino raise TerminalError,"No se ha podido volver al directorio origen: %s" % (e) # Desabilito la incongruencia self.deshabilitar(("sincronizar1",pwd,self.valor("dirtemp"))) # }}}2 # Vuelvo al directorio origen {{{2 self.debug("Volviendo al directorio %s" % (pwd)) try: os.chdir(pwd) except Exception,e: # Genero un error porque no puedo cambiarme al directorio destino raise TerminalError,"No se ha podido volver al directorio origen: %s" % (e) # Desabilito la incongruencia self.deshabilitar(("sincronizar1",pwd,self.valor("dirtemp"))) # }}}2 # }}}1 # Recolector del registro {{{1 ## \internal ## Recolector del registro ## \param self - ## \param ftp Conector al servidor FTP ## \param recv Directorio remoto en el que esta el registro a recolectar ## \param save Directorio remoto en el que guardar una copia de seguridad ## \param fichero Nombre del fichero a recolectar ## \param hora Hora que poner al fichero local una vez recibido def recolectar_registro(self,ftp,recv,save,fichero,hora): # Me muevo al directorio de recepción de registros self.debug("[FTP] CD %s" % (recv)) try: ftp.CD(recv) except Exception,e: raise IOError,"Error al situarme en el directorio destino: %s" # Creo el mensaje FLUSH self.debug("[FLUSH] Creo localmente el fichero con la accion") try: FILE=open("msg.tmp","a") FILE.write("FLUSH") FILE.close() except Exception,e: raise IOError,"Error no he podido crear el fichero temporal con la accion FLUSH. Error: %s" % (e) # Envio el mensaje FLUSH self.debug("[FLUSH] Envio el fichero con la accion al Telemando") try: ftp.PUT("msg.tmp") except Exception,e: os.unlink("msg.tmp") raise IOError,"No he podido enviar el fichero con la accion FLUSH. Error: %s" % (e) # Borro el mensaje local os.unlink("msg.tmp") # Renombro el mensaje FLUSH para que se ejecute su accion self.debug("[FLUSH] Renombro en el Telemando el fichero para que se ejecute su accion") try: ftp.RENAME("msg.tmp","%s.signal.dat.flush" % (fichero)) except Exception,e: self.debug("[FLUSH] No he podido renombrar el fichero del Telemando. Error: %s" % (e)) m="No he podido renombrar el fichero con la accion FLUSH del Telemando. Error: %s" % (e) try: ftp.REMOVE("msg.tmp") except Exception,e: self.debug("[FLUSH] No he podido eliminar el mensaje temporal del Telemando. Error: %s" % (e)) m="%s\n%s" % (m,"No he podido eliminar el mensaje temporal con la accion FLUSH del Telemando. Error: %s" % (e)) raise IOError,m # Espero a que la accion este realizada self.debug("[FLUSH] Esperando a que se realize la accion") procesado=False veces=0 while ((not procesado) and (veces<=self.valor("espera_flush"))): # Recojo si el servidor ha procesado el FLUSH o no procesado=(not ftp.EXISTF("%s.signal.dat.flush" % (fichero))) # Si el mensaje no ha sido procesado if (not procesado): # Duerme 1 segundos time.sleep(1) # Incremento las veces veces=veces+1 # Si salimos del bucle por error (indico que hubo un error) if (not procesado): # Envio el mensaje de debug self.debug("[FLUSH] El fichero con la accion no ha sido procesado por el Telemando") # Elimino la accion del servidor m="El fichero con la accion FLUSH no ha sido procesado por el Telemando" try: ftp.REMOVE("%s.signal.dat.flush" % (fichero)) except Exception,e: self.debug("[FLUSH] No se pudo eliminar del Telemando el fichero con la solicitud. Error: %s" % (e)) m="%s\n%s" % (m,"No se pudo eliminar el fichero con la solicitud FLUSH del Telemando. Error: %s" % (e)) # Genero una excepcion con el error raise MotorError,m # Timeout timeout=self.valor("timeout_espera_ftp") # Compruebo si el registro esta comprimido comprimido=(ftp.EXISTF("%s.signal.dat.reg.gz" % (fichero))) # Descargo el registro if (not comprimido): # Si no esta comprimido lo descargo normalmente self.debug("[FTP] GET %s.signal.dat.reg (Timeout: %s segundos)" % (fichero,timeout)) ftp.GET("%s.signal.dat.reg" % (fichero),timeout) else: # Si esta comprimido lo descargo con extension gz self.debug("[FTP] GET %s.signal.dat.reg.gz (Timeout: %s segundos)" % (fichero,timeout)) ftp.GET("%s.signal.dat.reg.gz" % (fichero),timeout) # Creo un nuevo nombre para el regisro (para ser guardados) nuevo_registro="%d_%s" % (int(hora),fichero) # Subo los registros a la carpeta de guardados self.debug("[FTP-BAK] PWD") actual=ftp.PWD() self.debug("[FTP-BAK] CD %s" % (save)) ftp.CD(save) # Subo el fichero if (not comprimido): # Si no esta comprimido lo subo normalmente self.debug("[FTP-BAK] PUT %s.signal.dat.reg (Timeout: %s segundos)" % (fichero,timeout)) ftp.PUT("%s.signal.dat.reg" % (fichero),timeout) # Renombro el fichero remoto self.debug("[FTP-BAK] RENAME %s.signal.dat.reg %s.signal.dat" % (fichero,nuevo_registro)) ftp.RENAME("%s.signal.dat.reg" % (fichero),"%s.signal.dat" % (nuevo_registro)) else: # Si esta comprimido lo descargo con extension gz self.debug("[FTP-BAK] PUT %s.signal.dat.reg.gz (Timeout: %s segundos)" % (fichero,timeout)) ftp.PUT("%s.signal.dat.reg.gz" % (fichero),timeout) # Renombro el fichero remoto self.debug("[FTP-BAK] RENAME %s.signal.dat.reg.gz %s.signal.dat.gz" % (fichero,nuevo_registro)) ftp.RENAME("%s.signal.dat.reg.gz" % (fichero),"%s.signal.dat.gz" % (nuevo_registro)) # Vuelvo al directorio general self.debug("[FTP-BAK] CD %s" % (actual)) ftp.CD(actual) # Renombro el fichero local if (not comprimido): # Si no esta comprimido lo renombro normalmente self.debug("Renombra el fichero local a la extension esperada") os.rename("%s.signal.dat.reg" % (fichero),"%s.signal.dat" % (fichero)) else: # Si esta comprimido uso extension gz self.debug("Renombra el fichero local a la extension esperada") os.rename("%s.signal.dat.reg.gz" % (fichero),"%s.signal.dat.gz" % (fichero)) # Creo el mensaje CLEAN porque ya hemos terminado el proceso completo self.debug("[CLEAN] Creo localmente el fichero con la accion") try: FILE=open("msg.tmp","a") FILE.write("CLEAN") FILE.close() except Exception,e: raise IOError,"Error no he podido crear el fichero temporal con la accion CLEAN. Error: %s" % (e) # Envio el mensaje CLEAN self.debug("[CLEAN] Envio el fichero con la accion al Telemando") try: ftp.PUT("msg.tmp") except Exception,e: os.unlink("msg.tmp") raise IOError,"No he podido enviar el fichero con la accion CLEAN. Error: %s" % (e) # Borro el mensaje local os.unlink("msg.tmp") # Renombro el mensaje CLEAN para que se ejecute su accion self.debug("[CLEAN] Renombro en el Telemando el fichero para que se ejecute su accion") try: ftp.RENAME("msg.tmp","%s.signal.dat.clean" % (fichero)) except Exception,e: self.debug("[CLEAN] No he podido renombrar el fichero del Telemando. Error: %s" % (e)) m="No he podido renombrar el fichero con la accion CLEAN del Telemando. Error: %s" % (e) try: ftp.REMOVE("msg.tmp") except Exception,e: self.debug("[CLEAN] No he podido eliminar el mensaje temporal del Telemando. Error: %s" % (e)) m="%s\n%s" % (m,"No he podido eliminar el mensaje temporal con la accion CLEAN del Telemando. Error: %s" % (e)) raise IOError,m # Devuelve si el fichero descargado está comprimido o no return comprimido # }}}1 # Control de descarga de los Telemandos {{{1 ## Control de descarga de los Telemandos (esta clase simplifica el proceso de recolectar Excepciones) ## \param self - def download(self): self.debug("Descargando datos para el Telemando '%s'" % (self.internal_id())) try: self.download_extendido() except Exception,e: # Error definitivo de entrada/salida # Si hay error al sincronizar, no se valida el TELEMANDO (podría disponer del resultado en la BD) # ANULADO: raise InvalidarTelemando, "%s" % (e) self.debug("El Telemando %s no ha sincronizado debido a un error: %s" % (self.internal_id(),e)) # Necesito estabilizar el Telemando para seguir usándolo self.estabilizar() raise self.debug("") # }}}1 # Método para resolver incongruencias {{{1 ## \internal ## Metodo para resolver incongruencias ## \param self - def estabilizar(self): # Si hay incongruencias que resolver, las resuelvo for elemento in self.__incongruencias: # Sincronizar 1 if (elemento[0]=="sincronizar1"): self.debug("Resolviendo incongruencia 'sincronizar1'") # Obtengo el directorio al que debo cambiar pwd=elemento[1] try: # Cambio al directorio os.chdir(pwd) except Exception,e: raise TerminalError,"ESTABILIZADOR: No se ha podido volver al directorio origen: %s" % (e) # Si es una incongruencia desconocida else: # La situación es incongruente y no puede ser resuelta (lanzo una excepción Terminal) raise TerminalError,"ESTABILIZADOR: Se han detectado incongruencias desconocidas" # Todas las incongruencias han sido resueltas (la lista queda vacía) self.__incongruencias=[] # }}}1 # Método para habilitar incongruencias {{{1 ## \internal ## Metodo para habilitar incongruencias ## \param self - ## \param incongruencia ID Alfanumerico de la incongruencia def habilitar(self,incongruencia): # Las incongruencias las inserto al principio (mantengo lista invertida) self.debug("Activando control de incongruencia '%s'" % (incongruencia[0])) self.__incongruencias.insert(0,incongruencia) # }}}1 # Método para deshabilitar incongruencias {{{1 ## \internal ## Metodo para deshabilitar incongruencias ## \param self - ## \param incongruencia ID Alfanumerico de la incongruencia def deshabilitar(self,incongruencia): # Inicializo una lista vacía temp=[] # Para toda la lista (manteniendo el orden establecido) for elemento in self.__incongruencias: # Me quedo con las que no coincidan if (elemento!=incongruencia): self.debug("Desactivando control de incongruencia '%s'" % (incongruencia[0])) # Cargo el elemento a la lista temporal temp.append(elemento) # Rehago las lista de incongruencias con la temporal self.__incongruencias=temp # }}}1 # EXCEPTION CLASSES {{{1 # Excepciones básicas # Except (General Exception) {{{2 class Except(Exception): def __init__(self,string): self.string=string def __str__(self): return self.string #}}}2 # ConnectionError (Conexión errónea al servidor) {{{2 class ConnectionError(Exception): def __init__(self,string): self.string=string def __str__(self): return self.string # }}}2 # IOError (Error de entrada/salida) {{{2 class IOError(Exception): def __init__(self,string): self.string=string def __str__(self): return self.string # }}}2 # ExecutionError (Error de ejecucción) {{{2 class ExecutionError(Exception): def __init__(self,string): self.string=string def __str__(self): return self.string # }}}2 # StructureError (Error de estructura) {{{2 class StructureError(Exception): def __init__(self,string): self.string=string def __str__(self): return self.string # }}}2 # LockError (Server locked error) {{{2 class LockError(Exception): def __init__(self,string): self.string=string def __str__(self): return self.string # }}}2 # MotorError (Error en el motor del Telemando) {{{2 class MotorError(Exception): def __init__(self,string): self.string=string def __str__(self): return self.string # }}}2 # Excepciones graves # TerminalError (Error irrecuperable del programa) {{{2 class TerminalError(Exception): def __init__(self,string): self.string=string def __str__(self): return self.string # }}}2 # }}}1