Arquitectura de Software 16 Oct 2006 06:32 pm

Patrones: Reactor

Este patrón es uno de los bonitos si tu proyecto tiene que ver con el par problema/solución en el que es aplicable. El Reactor está especialmente indicado en aplicaciones que deben responder ante diferentes eventos de forma que no tengamos que preocuparnos por el típico bucle de espera de eventos que luego deben ser despachados al método o función adecuados.

La idea es tener todo un esqueleto hecho (volvemos a ser vagos) y sólo programar la parte específica de la aplicación. De esta forma primero registramos en pares evento-manejador de evento, y luego es el Framework que implementa el patrón el que nos llama cuando ha habido una ocurrencia de algo a lo estábamos registrados.

Esto es lo que se llama inversión de control: No somos nosotros los que llamamos, es el framework el que lo hace.

Reactor

Descripción
Es un patrón de arquitectura que nos permite a aplicaciones orientadas a eventos demultiplexar y despachar las peticiones de servicio que llegan desde uno o más clientes.

Contexto de uso
En Osmius tanto en el Centro de Supervisión como en el Agente Maestro se repite la situación que nos permite aprovecharnos de este patrón. Ambos procesos tiene un servicio para la recepción de mensajes a través de varios sockets. En el caso del centro de supervisión recibiremos conexiones y datos de múltiples agentes maestros y, en el caso de éstos últimos, recibirán peticiones de conexión y de envío de mensajes de los agentes gestionados.
Los agentes también manejan varios orígenes de eventos como son los sockets para la recepción de órdenes y un nuevo tipo de origen constituido por las expiraciones de los contadores de tiempo asociados a los periodos de monitorización de las diferentes variables de las instancias monitorizadas.

Necesitamos quedarnos a la espera de la aparición de eventos en determinados recursos, detectar su aparición, enviar o despachar su tratamiento a los métodos adecuados y realizar el tratamiento adecuado en cada caso.

Además de nuevo nos encontramos con el requerimiento multiplataforma y la deseada reusabilidad, con lo que programar los mecanismos de demultiplexación de forma fija para determinada plataforma nos impedirá alcanzar los objetivos además de perder tiempo y recursos.

Beneficios en el caso de Osmius:

  • Reutilización del código no específico.
  • Claridad de separación de demultiplexación y despacho de eventos, de su tratamiento. Nos centramos en codificar éste último.
  • Portabilidad.

Ejemplo de Uso
En este patrón hay que cambiar la mentalidad típica de programación de aplicaciones en las que es la aplicación la que programa el bucle de captura de eventos y se encarga de llamar de manera activa a las funciones para el tratamiento de cada evento.

Este patrón y el framework en que se encuadra dentro de Osmius implementan inversión de control. Por inversión de control entendemos que es el framework el que llama a determinados métodos gancho de objetos que habremos previamente registrado con el Reactor, al ocurrir los eventos.

En el caso de los eventos Osmius que deben ejecutarse al vencer determinado periodo de tiempo lo primero que hacemos en crear nuestra clase haciendo que sea derivada de ACE_Event_Handler utilizando la herencia.

class OSM_Event : public ACE_Event_Handler
{
public:
// Constructor al que le pasamos un reactor por si queremos uno distinto del de
// por defecto.
OSM_Event (ACE_Reactor *r= ACE_Reactor::instance());

// Método gancho que se llama siempre así que será llamado cuando expiren los contadores
// de tiempo registrador.
// ACE_Time_Value es otra clase de la Wrapper Facades para encapsular el tratamiento de
// tiempos y fechas de forma portable.
virtual int handle_timeout (const ACE_Time_Value &tv,
const void *);
// Tratamiento específico de inicialización para Osmius.
int open(const ACE_TCHAR* name, const OSM_Instance_Base* instance,
const int interval , const int i_delay);

// Cierre del objeto.
int close(void);

// Método gancho llamado cuando este objeto se saca del framework del reactor y es
// destruido.
virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
ACE_Reactor_Mask = 0);

private:
..........
..........
..........
};

En la implementación de handle_timeout() ponemos el tratamiento específico de nuestra aplicación, en nuetro caso ejecutamos la acción contra la instancia que nos devuelve un valor que metemos en un mensaje que le pasamos al manejador de mensajes MsgManager a través de su método put().

int
OSM_Event::handle_timeout(const ACE_Time_Value &tv, const void *)
{
......
......
ACE_Time_Value time_before = ACE_OS::gettimeofday();
// Creamos el mensaje y le ponemos los campos iniciales
OSM_Message* osmius_message=0;
ACE_NEW (osmius_message, OSM_Message);
osmius_message.typ_message ("N", 1);
......
......
ACE_Time_Value time_after;
// Ejecutamos la acción que sea contra la instancia y recuperamos el valor en “value”.
this->action_.execute(this->cmd_line_,ACE_OS::strlen(this->cmd_line_),
this->timeout_,
val_text ,
&val_len , &value )

// Ponemos el OSM_Message dentro de un objeto ACE_Message_Block para su envío.
ACE_Message_Block* mblk=0;
mblk = (ACE_Message_Block*) &osmius_message;
// Prepare the message to be sent.
osmius_message->encode();

// Enviamos el mensaje al agente maestro o donde lo envíe ins_manager con put().
// The pointer to the instance manager is in our osm_instance owner.
this->osm_instance_->ins_manager()->put(mblk,0)

return 0;
};

Por último tenemos que registrarnos con el reactor para que nos llame cada vez que venza el periodo de tiempo (interval) asignado al evento.

int
OSM_Event::open(const ACE_TCHAR* name, const OSM_Instance_Base* instance,
const int interval, const int i_delay);
{
ACE_OS::strncpy(this->name_, name,OSM_Message::CODLEN);
this->cod_message_[OSM_Message::CODLEN]=0;
this->osm_instance_ = instance;

if (interval > 0)
this->interval_ = interval;
else
this->interval_ = 5;

if (initial_delay > 0)
this->initial_delay_ = initial_delay;
else
this->initial_delay_ = 5;

// Nos registramos con el reactor a través de la llamada para contadores de tiempo
// que se llama schedule_timer().
ACE_Time_Value tv_interval(this->interval_);
ACE_Time_Value tv_delay(this->initial_delay_);
this->reactor()->schedule_timer(this, 0, tv_delay,tv_interval)
......
......

return 0;
}

Si queremos aplicar este patrón orientado a cuando se reciban datos de red en determinado socket como es el caso en el servicio que recoje los mensajes en el Master Agent para a su vez enviarlos al Centro de Supervisión:
Declaramos la clase.

class ACE_Svc_Export OSM_MA_MsgReceiver
: public ACE_Svc_Handler
// Derivamos de ACE_Svc_Handler que es a su vez derivada de ACE_Event_Handler utilizando
// templates para indicar que utilizaremos sockets para la comunicación y que no queremos
// sincornización ya que no vamos a utilizar en este caso multihilo.
{
public:
// Constructor.
OSM_MA_MsgReceiver (OSM_MA_MsgSender *handler = 0)
: msg_sender_ (handler) {}

// Método gancho de inicialización que será llamado cuando un agente haga una nueva
// conexión.
virtual int open (void *);

// Método gancho para cierre y para desregistrarse con el reactor.
virtual int close (u_long = 0);

protected:
// Método gancho que se llamará cuando un cliente (agente) tenga un mensaje que
// enviar (con send()).

virtual int handle_input (ACE_HANDLE handle);

// Método gancho que se llama cuando un cliente termina.
// Cerramos el socket y quitamos el cliente de “connected_clients_”.
virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
ACE_Reactor_Mask = 0);

// Puntero al servicio MessageSender.
OSM_MA_MsgSender *msg_sender_;

// Lleva la cuenta de los descriptores de los clientes conectados.
ACE_Handle_Set connected_clients_;
};

Registro con el Reactor para cuando hay datos en el socket en el método open():

int OSM_MA_MsgReceiver::open (void *)
{
ACE_HANDLE handle = peer ().get_handle ();
if (reactor ()->register_handler
(handle, this, ACE_Event_Handler::READ_MASK) == -1)
return -1;
connected_clients_.set_bit (handle);
return 0;
}

Tratamiento en handle_input():

int OSM_MA_MsgReceiver::handle_input (ACE_HANDLE client_handle)
{
ACE_Message_Block *mblk = 0;
OSM_MsgHandler message_handler(client_handle);

// Creamos un bloque para poner el mensaje y enviarlo luego.
if (-1 == message_handler.recv_message (mblk))
{
return -1;
}

// Enviamos el mensaje.
if (-1 == msg_sender_->put(mblk))
{
// Could not put the message.
mblk->release ();
return -1;
}

mblk->release ();
return 0;

}

El patrón Reactor dentro del framework que proporciona ACE se utiliza extensamente en Osmius para envío y recepción de datos a través de sockets, capturar expiraciones de contadores del tiempo y para recoger llamadas desde señales como la pulsación de Control-C por un usuario.

Comments are closed.