iyuan 发表于 2013-2-1 11:05:56

zeroMQ初体验-32.发布/订阅模式进阶-克隆模式-上

在发布/订阅模式中,特别是现实应用中,总会因为这样那样的问题导致订阅者丢失了所需的数据,如此,便有了重新获得的需求。通常来说,这个会由订阅者来完成,不过"千百个哈姆雷特"从工程的角度来看,实在不忍睹,完全违背了"复用"的概念。于是乎,"克隆模式"便呼之待出了。在发布端存储下这些消息,为了避免队列的堆积这样的杯具,也为了更好的订阅体验,kev-value似乎是不错的选择。

注意:这里的kev-value并非目前红火的nosql(虽然有些类似),可以理解成发布者的数据仓库(应该可以这么理解吧)。

为了简单明了,这里将会对整个机制做一个拆解。

更新数据的存储
模型图:
http://www.agoit.com/bbs/https://github.com/imatix/zguide/raw/master/images/fig68.png
服务器:
////  Clone server Model One////  Lets us build this source without creating a library#include "kvsimple.c"int main (void){    //  Prepare our context and publisher socket    zctx_t *ctx = zctx_new ();    void *publisher = zsocket_new (ctx, ZMQ_PUB);    zsocket_bind (publisher, "tcp://*:5556");    zclock_sleep (200);    zhash_t *kvmap = zhash_new ();    int64_t sequence = 0;    srandom ((unsigned) time (NULL));    while (!zctx_interrupted) {        //  Distribute as key-value message        kvmsg_t *kvmsg = kvmsg_new (++sequence);        kvmsg_fmt_key  (kvmsg, "%d", randof (10000));        kvmsg_fmt_body (kvmsg, "%d", randof (1000000));        kvmsg_send     (kvmsg, publisher);        kvmsg_store   (&kvmsg, kvmap);    }    printf (" Interrupted\n%d messages out\n", (int) sequence);    zhash_destroy (&kvmap);    zctx_destroy (&ctx);    return 0;}
客户端:
////  Clone client Model One////  Lets us build this source without creating a library#include "kvsimple.c"int main (void){    //  Prepare our context and updates socket    zctx_t *ctx = zctx_new ();    void *updates = zsocket_new (ctx, ZMQ_SUB);    zsocket_connect (updates, "tcp://localhost:5556");    zhash_t *kvmap = zhash_new ();    int64_t sequence = 0;    while (TRUE) {        kvmsg_t *kvmsg = kvmsg_recv (updates);        if (!kvmsg)            break;          //  Interrupted        kvmsg_store (&kvmsg, kvmap);        sequence++;    }    printf (" Interrupted\n%d messages in\n", (int) sequence);    zhash_destroy (&kvmap);    zctx_destroy (&ctx);    return 0;}
key-value库:
/*  =====================================================================    kvsimple - simple key-value message class for example applications    ---------------------------------------------------------------------    Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>    Copyright other contributors as noted in the AUTHORS file.    This file is part of the ZeroMQ Guide: http://zguide.zeromq.org    This is free software; you can redistribute it and/or modify it under    the terms of the GNU Lesser General Public License as published by    the Free Software Foundation; either version 3 of the License, or (at    your option) any later version.    This software is distributed in the hope that it will be useful, but    WITHOUT ANY WARRANTY; without even the implied warranty of    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU    Lesser General Public License for more details.    You should have received a copy of the GNU Lesser General Public    License along with this program. If not, see    <http://www.gnu.org/licenses/>.    =====================================================================*/#include "kvsimple.h"#include "zlist.h"//  Keys are short strings#define KVMSG_KEY_MAX   255//  Message is formatted on wire as 4 frames://  frame 0: key (0MQ string)//  frame 1: sequence (8 bytes, network order)//  frame 2: body (blob)#define FRAME_KEY       0#define FRAME_SEQ       1#define FRAME_BODY      2#define KVMSG_FRAMES    3//  Structure of our classstruct _kvmsg {    //  Presence indicators for each frame    int present ;    //  Corresponding 0MQ message frames, if any    zmq_msg_t frame ;    //  Key, copied into safe C string    char key ;};//  ---------------------------------------------------------------------//  Constructor, sets sequence as providedkvmsg_t *kvmsg_new (int64_t sequence){    kvmsg_t        *self;    self = (kvmsg_t *) zmalloc (sizeof (kvmsg_t));    kvmsg_set_sequence (self, sequence);    return self;}//  ---------------------------------------------------------------------//  Destructor//  Free shim, compatible with zhash_free_fnvoidkvmsg_free (void *ptr){    if (ptr) {        kvmsg_t *self = (kvmsg_t *) ptr;        //  Destroy message frames if any        int frame_nbr;        for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++)            if (self->present )                zmq_msg_close (&self->frame );        //  Free object itself        free (self);    }}voidkvmsg_destroy (kvmsg_t **self_p){    assert (self_p);    if (*self_p) {        kvmsg_free (*self_p);        *self_p = NULL;    }}//  ---------------------------------------------------------------------//  Reads key-value message from socket, returns new kvmsg instance.kvmsg_t *kvmsg_recv (void *socket){    assert (socket);    kvmsg_t *self = kvmsg_new (0);    //  Read all frames off the wire, reject if bogus    int frame_nbr;    for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {        if (self->present )            zmq_msg_close (&self->frame );        zmq_msg_init (&self->frame );        self->present  = 1;        if (zmq_recvmsg (socket, &self->frame , 0) == -1) {            kvmsg_destroy (&self);            break;        }        //  Verify multipart framing        int rcvmore = (frame_nbr < KVMSG_FRAMES - 1)? 1: 0;        if (zsockopt_rcvmore (socket) != rcvmore) {            kvmsg_destroy (&self);            break;        }    }    return self;}//  ---------------------------------------------------------------------//  Send key-value message to socket; any empty frames are sent as such.voidkvmsg_send (kvmsg_t *self, void *socket){    assert (self);    assert (socket);    int frame_nbr;    for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {        zmq_msg_t copy;        zmq_msg_init (&copy);        if (self->present )            zmq_msg_copy (&copy, &self->frame );        zmq_sendmsg (socket, &copy,            (frame_nbr < KVMSG_FRAMES - 1)? ZMQ_SNDMORE: 0);        zmq_msg_close (&copy);    }}//  ---------------------------------------------------------------------//  Return key from last read message, if any, else NULLchar *kvmsg_key (kvmsg_t *self){    assert (self);    if (self->present ) {        if (!*self->key) {            size_t size = zmq_msg_size (&self->frame );            if (size > KVMSG_KEY_MAX)                size = KVMSG_KEY_MAX;            memcpy (self->key,                zmq_msg_data (&self->frame ), size);            self->key  = 0;        }        return self->key;    }    else        return NULL;}//  ---------------------------------------------------------------------//  Return sequence nbr from last read message, if anyint64_tkvmsg_sequence (kvmsg_t *self){    assert (self);    if (self->present ) {        assert (zmq_msg_size (&self->frame ) == 8);        byte *source = zmq_msg_data (&self->frame );        int64_t sequence = ((int64_t) (source ) << 56)                         + ((int64_t) (source ) << 48)                         + ((int64_t) (source ) << 40)                         + ((int64_t) (source ) << 32)                         + ((int64_t) (source ) << 24)                         + ((int64_t) (source ) << 16)                         + ((int64_t) (source ) << 8)                         +  (int64_t) (source );        return sequence;    }    else        return 0;}//  ---------------------------------------------------------------------//  Return body from last read message, if any, else NULLbyte *kvmsg_body (kvmsg_t *self){    assert (self);    if (self->present )        return (byte *) zmq_msg_data (&self->frame );    else        return NULL;}//  ---------------------------------------------------------------------//  Return body size from last read message, if any, else zerosize_tkvmsg_size (kvmsg_t *self){    assert (self);    if (self->present )        return zmq_msg_size (&self->frame );    else        return 0;}//  ---------------------------------------------------------------------//  Set message key as providedvoidkvmsg_set_key (kvmsg_t *self, char *key){    assert (self);    zmq_msg_t *msg = &self->frame ;    if (self->present )        zmq_msg_close (msg);    zmq_msg_init_size (msg, strlen (key));    memcpy (zmq_msg_data (msg), key, strlen (key));    self->present  = 1;}//  ---------------------------------------------------------------------//  Set message sequence numbervoidkvmsg_set_sequence (kvmsg_t *self, int64_t sequence){    assert (self);    zmq_msg_t *msg = &self->frame ;    if (self->present )        zmq_msg_close (msg);    zmq_msg_init_size (msg, 8);    byte *source = zmq_msg_data (msg);    source  = (byte) ((sequence >> 56) & 255);    source  = (byte) ((sequence >> 48) & 255);    source  = (byte) ((sequence >> 40) & 255);    source  = (byte) ((sequence >> 32) & 255);    source  = (byte) ((sequence >> 24) & 255);    source  = (byte) ((sequence >> 16) & 255);    source  = (byte) ((sequence >> 8)  & 255);    source  = (byte) ((sequence)       & 255);    self->present  = 1;}//  ---------------------------------------------------------------------//  Set message bodyvoidkvmsg_set_body (kvmsg_t *self, byte *body, size_t size){    assert (self);    zmq_msg_t *msg = &self->frame ;    if (self->present )        zmq_msg_close (msg);    self->present  = 1;    zmq_msg_init_size (msg, size);    memcpy (zmq_msg_data (msg), body, size);}//  ---------------------------------------------------------------------//  Set message key using printf formatvoidkvmsg_fmt_key (kvmsg_t *self, char *format, …){    char value ;    va_list args;    assert (self);    va_start (args, format);    vsnprintf (value, KVMSG_KEY_MAX, format, args);    va_end (args);    kvmsg_set_key (self, value);}//  ---------------------------------------------------------------------//  Set message body using printf formatvoidkvmsg_fmt_body (kvmsg_t *self, char *format, …){    char value ;    va_list args;    assert (self);    va_start (args, format);    vsnprintf (value, 255, format, args);    va_end (args);    kvmsg_set_body (self, (byte *) value, strlen (value));}//  ---------------------------------------------------------------------//  Store entire kvmsg into hash map, if key/value are set//  Nullifies kvmsg reference, and destroys automatically when no longer//  needed.voidkvmsg_store (kvmsg_t **self_p, zhash_t *hash){    assert (self_p);    if (*self_p) {        kvmsg_t *self = *self_p;        assert (self);        if (self->present         &&  self->present ) {            zhash_update (hash, kvmsg_key (self), self);            zhash_freefn (hash, kvmsg_key (self), kvmsg_free);        }        *self_p = NULL;    }}//  ---------------------------------------------------------------------//  Dump message to stderr, for debugging and tracingvoidkvmsg_dump (kvmsg_t *self){    if (self) {        if (!self) {            fprintf (stderr, "NULL");            return;        }        size_t size = kvmsg_size (self);        byte  *body = kvmsg_body (self);        fprintf (stderr, "", kvmsg_sequence (self));        fprintf (stderr, "", kvmsg_key (self));        fprintf (stderr, " ", size);        int char_nbr;        for (char_nbr = 0; char_nbr < size; char_nbr++)            fprintf (stderr, "%02X", body );        fprintf (stderr, "\n");    }    else        fprintf (stderr, "NULL message\n");}//  ---------------------------------------------------------------------//  Runs self test of classintkvmsg_test (int verbose){    kvmsg_t        *kvmsg;    printf (" * kvmsg: ");    //  Prepare our context and sockets    zctx_t *ctx = zctx_new ();    void *output = zsocket_new (ctx, ZMQ_DEALER);    int rc = zmq_bind (output, "ipc://kvmsg_selftest.ipc");    assert (rc == 0);    void *input = zsocket_new (ctx, ZMQ_DEALER);    rc = zmq_connect (input, "ipc://kvmsg_selftest.ipc");    assert (rc == 0);    zhash_t *kvmap = zhash_new ();    //  Test send and receive of simple message    kvmsg = kvmsg_new (1);    kvmsg_set_key  (kvmsg, "key");    kvmsg_set_body (kvmsg, (byte *) "body", 4);    if (verbose)        kvmsg_dump (kvmsg);    kvmsg_send (kvmsg, output);    kvmsg_store (&kvmsg, kvmap);    kvmsg = kvmsg_recv (input);    if (verbose)        kvmsg_dump (kvmsg);    assert (streq (kvmsg_key (kvmsg), "key"));    kvmsg_store (&kvmsg, kvmap);    //  Shutdown and destroy all objects    zhash_destroy (&kvmap);    zctx_destroy (&ctx);    printf ("OK\n");    return 0;}

根据key获取数据
其实,当订阅者可以发出key来获取数据的时候,它已经不是一个纯粹的订阅者了,或许客户端的称谓会更合适些。
模型图:
http://www.agoit.com/bbs/https://github.com/imatix/zguide/raw/master/images/fig69.png

服务器:
////  Clone server Model Two////  Lets us build this source without creating a library#include "kvsimple.c"static int s_send_single (char *key, void *data, void *args);static void state_manager (void *args, zctx_t *ctx, void *pipe);int main (void){    //  Prepare our context and sockets    zctx_t *ctx = zctx_new ();    void *publisher = zsocket_new (ctx, ZMQ_PUB);    zsocket_bind (publisher, "tcp://*:5557");    int64_t sequence = 0;    srandom ((unsigned) time (NULL));    //  Start state manager and wait for synchronization signal    void *updates = zthread_fork (ctx, state_manager, NULL);    free (zstr_recv (updates));    while (!zctx_interrupted) {        //  Distribute as key-value message        kvmsg_t *kvmsg = kvmsg_new (++sequence);        kvmsg_fmt_key  (kvmsg, "%d", randof (10000));        kvmsg_fmt_body (kvmsg, "%d", randof (1000000));        kvmsg_send     (kvmsg, publisher);        kvmsg_send     (kvmsg, updates);        kvmsg_destroy (&kvmsg);    }    printf (" Interrupted\n%d messages out\n", (int) sequence);    zctx_destroy (&ctx);    return 0;}//  Routing information for a key-value snapshottypedef struct {    void *socket;           //  ROUTER socket to send to    zframe_t *identity;     //  Identity of peer who requested state} kvroute_t;//  Send one state snapshot key-value pair to a socket//  Hash item data is our kvmsg object, ready to sendstatic ints_send_single (char *key, void *data, void *args){    kvroute_t *kvroute = (kvroute_t *) args;    //  Send identity of recipient first    zframe_send (&kvroute->identity,        kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);    kvmsg_t *kvmsg = (kvmsg_t *) data;    kvmsg_send (kvmsg, kvroute->socket);    return 0;}//  This thread maintains the state and handles requests from//  clients for snapshots.//static voidstate_manager (void *args, zctx_t *ctx, void *pipe){    zhash_t *kvmap = zhash_new ();    zstr_send (pipe, "READY");    void *snapshot = zsocket_new (ctx, ZMQ_ROUTER);    zsocket_bind (snapshot, "tcp://*:5556");    zmq_pollitem_t items [] = {        { pipe, 0, ZMQ_POLLIN, 0 },        { snapshot, 0, ZMQ_POLLIN, 0 }    };    int64_t sequence = 0;       //  Current snapshot version number    while (!zctx_interrupted) {        int rc = zmq_poll (items, 2, -1);        if (rc == -1 && errno == ETERM)            break;              //  Context has been shut down        //  Apply state update from main thread        if (items .revents & ZMQ_POLLIN) {            kvmsg_t *kvmsg = kvmsg_recv (pipe);            if (!kvmsg)                break;          //  Interrupted            sequence = kvmsg_sequence (kvmsg);            kvmsg_store (&kvmsg, kvmap);        }        //  Execute state snapshot request        if (items .revents & ZMQ_POLLIN) {            zframe_t *identity = zframe_recv (snapshot);            if (!identity)                break;          //  Interrupted            //  Request is in second frame of message            char *request = zstr_recv (snapshot);            if (streq (request, "ICANHAZ?"))                free (request);            else {                printf ("E: bad request, aborting\n");                break;            }            //  Send state snapshot to client            kvroute_t routing = { snapshot, identity };            //  For each entry in kvmap, send kvmsg to client            zhash_foreach (kvmap, s_send_single, &routing);            //  Now send END message with sequence number            printf ("Sending state shapshot=%d\n", (int) sequence);            zframe_send (&identity, snapshot, ZFRAME_MORE);            kvmsg_t *kvmsg = kvmsg_new (sequence);            kvmsg_set_key  (kvmsg, "KTHXBAI");            kvmsg_set_body (kvmsg, (byte *) "", 0);            kvmsg_send     (kvmsg, snapshot);            kvmsg_destroy (&kvmsg);        }    }    zhash_destroy (&kvmap);}
客户端:
////  Clone client Model Two////  Lets us build this source without creating a library#include "kvsimple.c"int main (void){    //  Prepare our context and subscriber    zctx_t *ctx = zctx_new ();    void *snapshot = zsocket_new (ctx, ZMQ_DEALER);    zsocket_connect (snapshot, "tcp://localhost:5556");    void *subscriber = zsocket_new (ctx, ZMQ_SUB);    zsocket_connect (subscriber, "tcp://localhost:5557");    zhash_t *kvmap = zhash_new ();    //  Get state snapshot    int64_t sequence = 0;    zstr_send (snapshot, "ICANHAZ?");    while (TRUE) {        kvmsg_t *kvmsg = kvmsg_recv (snapshot);        if (!kvmsg)            break;          //  Interrupted        if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {            sequence = kvmsg_sequence (kvmsg);            printf ("Received snapshot=%d\n", (int) sequence);            kvmsg_destroy (&kvmsg);            break;          //  Done        }        kvmsg_store (&kvmsg, kvmap);    }    //  Now apply pending updates, discard out-of-sequence messages    while (!zctx_interrupted) {        kvmsg_t *kvmsg = kvmsg_recv (subscriber);        if (!kvmsg)            break;          //  Interrupted        if (kvmsg_sequence (kvmsg) > sequence) {            sequence = kvmsg_sequence (kvmsg);            kvmsg_store (&kvmsg, kvmap);        }        else            kvmsg_destroy (&kvmsg);    }    zhash_destroy (&kvmap);    zctx_destroy (&ctx);    return 0;}

重新发布更新
上面的模型中,数据都集中在一点,或许会有服务器崩溃而导致数据丢失的顾虑,那么,把数据放到客户端呢?
模型图:
http://www.agoit.com/bbs/https://github.com/imatix/zguide/raw/master/images/fig70.png
服务器:
////  Clone server Model Three////  Lets us build this source without creating a library#include "kvsimple.c"static int s_send_single (char *key, void *data, void *args);//  Routing information for a key-value snapshottypedef struct {    void *socket;           //  ROUTER socket to send to    zframe_t *identity;     //  Identity of peer who requested state} kvroute_t;int main (void){    //  Prepare our context and sockets    zctx_t *ctx = zctx_new ();    void *snapshot = zsocket_new (ctx, ZMQ_ROUTER);    zsocket_bind (snapshot, "tcp://*:5556");    void *publisher = zsocket_new (ctx, ZMQ_PUB);    zsocket_bind (publisher, "tcp://*:5557");    void *collector = zsocket_new (ctx, ZMQ_PULL);    zsocket_bind (collector, "tcp://*:5558");    int64_t sequence = 0;    zhash_t *kvmap = zhash_new ();    zmq_pollitem_t items [] = {        { collector, 0, ZMQ_POLLIN, 0 },        { snapshot, 0, ZMQ_POLLIN, 0 }    };    while (!zctx_interrupted) {        int rc = zmq_poll (items, 2, 1000 * ZMQ_POLL_MSEC);        //  Apply state update sent from client        if (items .revents & ZMQ_POLLIN) {            kvmsg_t *kvmsg = kvmsg_recv (collector);            if (!kvmsg)                break;          //  Interrupted            kvmsg_set_sequence (kvmsg, ++sequence);            kvmsg_send (kvmsg, publisher);            kvmsg_store (&kvmsg, kvmap);            printf ("I: publishing update %5d\n", (int) sequence);        }        //  Execute state snapshot request        if (items .revents & ZMQ_POLLIN) {            zframe_t *identity = zframe_recv (snapshot);            if (!identity)                break;          //  Interrupted            //  Request is in second frame of message            char *request = zstr_recv (snapshot);            if (streq (request, "ICANHAZ?"))                free (request);            else {                printf ("E: bad request, aborting\n");                break;            }            //  Send state snapshot to client            kvroute_t routing = { snapshot, identity };            //  For each entry in kvmap, send kvmsg to client            zhash_foreach (kvmap, s_send_single, &routing);            //  Now send END message with sequence number            printf ("I: sending shapshot=%d\n", (int) sequence);            zframe_send (&identity, snapshot, ZFRAME_MORE);            kvmsg_t *kvmsg = kvmsg_new (sequence);            kvmsg_set_key  (kvmsg, "KTHXBAI");            kvmsg_set_body (kvmsg, (byte *) "", 0);            kvmsg_send     (kvmsg, snapshot);            kvmsg_destroy (&kvmsg);        }    }    printf (" Interrupted\n%d messages handled\n", (int) sequence);    zhash_destroy (&kvmap);    zctx_destroy (&ctx);    return 0;}//  Send one state snapshot key-value pair to a socket//  Hash item data is our kvmsg object, ready to sendstatic ints_send_single (char *key, void *data, void *args){    kvroute_t *kvroute = (kvroute_t *) args;    //  Send identity of recipient first    zframe_send (&kvroute->identity,        kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);    kvmsg_t *kvmsg = (kvmsg_t *) data;    kvmsg_send (kvmsg, kvroute->socket);    return 0;}
客户端:
////  Clone client Model Three////  Lets us build this source without creating a library#include "kvsimple.c"int main (void){    //  Prepare our context and subscriber    zctx_t *ctx = zctx_new ();    void *snapshot = zsocket_new (ctx, ZMQ_DEALER);    zsocket_connect (snapshot, "tcp://localhost:5556");    void *subscriber = zsocket_new (ctx, ZMQ_SUB);    zsocket_connect (subscriber, "tcp://localhost:5557");    void *publisher = zsocket_new (ctx, ZMQ_PUSH);    zsocket_connect (publisher, "tcp://localhost:5558");    zhash_t *kvmap = zhash_new ();    srandom ((unsigned) time (NULL));    //  Get state snapshot    int64_t sequence = 0;    zstr_send (snapshot, "ICANHAZ?");    while (TRUE) {        kvmsg_t *kvmsg = kvmsg_recv (snapshot);        if (!kvmsg)            break;          //  Interrupted        if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {            sequence = kvmsg_sequence (kvmsg);            printf ("I: received snapshot=%d\n", (int) sequence);            kvmsg_destroy (&kvmsg);            break;          //  Done        }        kvmsg_store (&kvmsg, kvmap);    }    int64_t alarm = zclock_time () + 1000;    while (!zctx_interrupted) {        zmq_pollitem_t items [] = { { subscriber, 0, ZMQ_POLLIN, 0 } };        int tickless = (int) ((alarm - zclock_time ()));        if (tickless < 0)            tickless = 0;        int rc = zmq_poll (items, 1, tickless * ZMQ_POLL_MSEC);        if (rc == -1)            break;              //  Context has been shut down        if (items .revents & ZMQ_POLLIN) {            kvmsg_t *kvmsg = kvmsg_recv (subscriber);            if (!kvmsg)                break;          //  Interrupted            //  Discard out-of-sequence kvmsgs, incl. heartbeats            if (kvmsg_sequence (kvmsg) > sequence) {                sequence = kvmsg_sequence (kvmsg);                kvmsg_store (&kvmsg, kvmap);                printf ("I: received update=%d\n", (int) sequence);            }            else                kvmsg_destroy (&kvmsg);        }        //  If we timed-out, generate a random kvmsg        if (zclock_time () >= alarm) {            kvmsg_t *kvmsg = kvmsg_new (0);            kvmsg_fmt_key  (kvmsg, "%d", randof (10000));            kvmsg_fmt_body (kvmsg, "%d", randof (1000000));            kvmsg_send     (kvmsg, publisher);            kvmsg_destroy (&kvmsg);            alarm = zclock_time () + 1000;        }    }    printf (" Interrupted\n%d messages in\n", (int) sequence);    zhash_destroy (&kvmap);    zctx_destroy (&ctx);    return 0;}
克隆子树
事实上,并不是所有的消费者都愿意消费发布者所提供的所有信息,那么,针对特别的群体,只需提供一个子集就可以了。
服务器:
////  Clone server Model Four////  Lets us build this source without creating a library#include "kvsimple.c"static int s_send_single (char *key, void *data, void *args);//  Routing information for a key-value snapshottypedef struct {    void *socket;           //  ROUTER socket to send to    zframe_t *identity;     //  Identity of peer who requested state    char *subtree;          //  Client subtree specification} kvroute_t;int main (void){    //  Prepare our context and sockets    zctx_t *ctx = zctx_new ();    void *snapshot = zsocket_new (ctx, ZMQ_ROUTER);    zsocket_bind (snapshot, "tcp://*:5556");    void *publisher = zsocket_new (ctx, ZMQ_PUB);    zsocket_bind (publisher, "tcp://*:5557");    void *collector = zsocket_new (ctx, ZMQ_PULL);    zsocket_bind (collector, "tcp://*:5558");    int64_t sequence = 0;    zhash_t *kvmap = zhash_new ();    zmq_pollitem_t items [] = {        { collector, 0, ZMQ_POLLIN, 0 },        { snapshot, 0, ZMQ_POLLIN, 0 }    };    while (!zctx_interrupted) {        int rc = zmq_poll (items, 2, 1000 * ZMQ_POLL_MSEC);        //  Apply state update sent from client        if (items .revents & ZMQ_POLLIN) {            kvmsg_t *kvmsg = kvmsg_recv (collector);            if (!kvmsg)                break;          //  Interrupted            kvmsg_set_sequence (kvmsg, ++sequence);            kvmsg_send (kvmsg, publisher);            kvmsg_store (&kvmsg, kvmap);            printf ("I: publishing update %5d\n", (int) sequence);        }        //  Execute state snapshot request        if (items .revents & ZMQ_POLLIN) {            zframe_t *identity = zframe_recv (snapshot);            if (!identity)                break;          //  Interrupted            //  Request is in second frame of message            char *request = zstr_recv (snapshot);            char *subtree = NULL;            if (streq (request, "ICANHAZ?")) {                free (request);                subtree = zstr_recv (snapshot);            }            else {                printf ("E: bad request, aborting\n");                break;            }            //  Send state snapshot to client            kvroute_t routing = { snapshot, identity, subtree };            //  For each entry in kvmap, send kvmsg to client            zhash_foreach (kvmap, s_send_single, &routing);            //  Now send END message with sequence number            printf ("I: sending shapshot=%d\n", (int) sequence);            zframe_send (&identity, snapshot, ZFRAME_MORE);            kvmsg_t *kvmsg = kvmsg_new (sequence);            kvmsg_set_key  (kvmsg, "KTHXBAI");            kvmsg_set_body (kvmsg, (byte *) subtree, 0);            kvmsg_send     (kvmsg, snapshot);            kvmsg_destroy (&kvmsg);            free (subtree);        }    }    printf (" Interrupted\n%d messages handled\n", (int) sequence);    zhash_destroy (&kvmap);    zctx_destroy (&ctx);    return 0;}//  Send one state snapshot key-value pair to a socket//  Hash item data is our kvmsg object, ready to sendstatic ints_send_single (char *key, void *data, void *args){    kvroute_t *kvroute = (kvroute_t *) args;    kvmsg_t *kvmsg = (kvmsg_t *) data;    if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))    &&  memcmp (kvroute->subtree,                kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) {        //  Send identity of recipient first        zframe_send (&kvroute->identity,            kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);        kvmsg_send (kvmsg, kvroute->socket);    }    return 0;}
客户端:
////  Clone client Model Four////  Lets us build this source without creating a library#include "kvsimple.c"#define SUBTREE "/client/"int main (void){    //  Prepare our context and subscriber    zctx_t *ctx = zctx_new ();    void *snapshot = zsocket_new (ctx, ZMQ_DEALER);    zsocket_connect (snapshot, "tcp://localhost:5556");    void *subscriber = zsocket_new (ctx, ZMQ_SUB);    zsocket_connect (subscriber, "tcp://localhost:5557");    zsockopt_set_subscribe (subscriber, SUBTREE);    void *publisher = zsocket_new (ctx, ZMQ_PUSH);    zsocket_connect (publisher, "tcp://localhost:5558");    zhash_t *kvmap = zhash_new ();    srandom ((unsigned) time (NULL));    //  Get state snapshot    int64_t sequence = 0;    zstr_sendm (snapshot, "ICANHAZ?");    zstr_send  (snapshot, SUBTREE);    while (TRUE) {        kvmsg_t *kvmsg = kvmsg_recv (snapshot);        if (!kvmsg)            break;          //  Interrupted        if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {            sequence = kvmsg_sequence (kvmsg);            printf ("I: received snapshot=%d\n", (int) sequence);            kvmsg_destroy (&kvmsg);            break;          //  Done        }        kvmsg_store (&kvmsg, kvmap);    }    int64_t alarm = zclock_time () + 1000;    while (!zctx_interrupted) {        zmq_pollitem_t items [] = { { subscriber, 0, ZMQ_POLLIN, 0 } };        int tickless = (int) ((alarm - zclock_time ()));        if (tickless < 0)            tickless = 0;        int rc = zmq_poll (items, 1, tickless * ZMQ_POLL_MSEC);        if (rc == -1)            break;              //  Context has been shut down        if (items .revents & ZMQ_POLLIN) {            kvmsg_t *kvmsg = kvmsg_recv (subscriber);            if (!kvmsg)                break;          //  Interrupted            //  Discard out-of-sequence kvmsgs, incl. heartbeats            if (kvmsg_sequence (kvmsg) > sequence) {                sequence = kvmsg_sequence (kvmsg);                kvmsg_store (&kvmsg, kvmap);                printf ("I: received update=%d\n", (int) sequence);            }            else                kvmsg_destroy (&kvmsg);        }        //  If we timed-out, generate a random kvmsg        if (zclock_time () >= alarm) {            kvmsg_t *kvmsg = kvmsg_new (0);            kvmsg_fmt_key  (kvmsg, "%s%d", SUBTREE, randof (10000));            kvmsg_fmt_body (kvmsg, "%d", randof (1000000));            kvmsg_send     (kvmsg, publisher);            kvmsg_destroy (&kvmsg);            alarm = zclock_time () + 1000;        }    }    printf (" Interrupted\n%d messages in\n", (int) sequence);    zhash_destroy (&kvmap);    zctx_destroy (&ctx);    return 0;}
(未完待续)
页: [1]
查看完整版本: zeroMQ初体验-32.发布/订阅模式进阶-克隆模式-上