(mOSAIC) Component hub
Contents
Semantic and syntax
Overview
Notes:
- types of interfaces:
- management interfaces (the purpose of this "hub");
- service interfaces (the purpose of dedicated efficient protocols);
- technical solutions:
- message queues;
- custom middleware;
References:
CORBA & Components, slide 6;
Generic
Specifications:
<component-identifier> ::= <128bit-token> <group-identifier> ::= <128bit-token> <object-identifier> ::= <compound-name> <correlation> ::= <128bit-token> <component-selector> ::= ??? <error> ::= ??? <single-name> ::= /^\@[a-z0-9]([a-z0-9-]*[a-z0-9])$/ <compound-name> ::= <single-name> ++ ( "." ++ <single-name> ) * <128bit-token> ::= /^[0-9a-f]{32}$/ <utc> ::= <real> ;; (MUST) in seconds according to UTC
<generic-request> ::= { "__service__" : <compound-name>, "__version__" : <positive-integer>, "action" : <compound-name>, "component" : <component-identifier> | undefined, "correlation" : <correlation> | null | undefined, "expiration" : <utc>, ;; (SHOULD) at least 6 seconds in the future } <generic-return> ::= { "__service__" : <compound-name>, ;; (MUST) the same as for the request "__version__" : <positive-integer>, ;; (MUST) the same as for the request "action" : <compound-name>, ;; (SHOULD) the same as for the request ++ ".return" "component" : <component-identifier> | null | undefined, "ok" : <boolean>, "error" : <error> | undefined, ;; (MUST) when <stop-return>."ok" =:= false "correlation" : <correlation>, ;; (MUST) the same as for the request, if different from null or undefined "timestamp" : <utc>, (SHOULD) at most 6 seconds in the future }
;; on the wire for transports with "ordered attachment" support <attachments> ::= [ <attachment-identifier>, ... ] ;; on the wire for transports with "named attachment" support <attachments> ::= { <attachment-identifier> : <string>, ... } ;; on the wire for transports without attachment support <attachments> ::= { <attachment-identifier> : <base64>, ... } ;; in API's to give an "unified" JSON <attachments> ::= { <attachment-identifier> : <binary>, ... } <attachment-identifier> ::= <compound-name>
Control
Specifications:
- stop:
<stop-request> ::= { "__service__" : "mosaic.components.control", "__version__" : 1, "action" : "stop", "component" : <component-identifier>, "selector" : <component-selector>, "disposition" : null | <json>, "correlation" : <correlation>, "expiration" : <utc>, } <stop-return> ::= { "__service__" : "mosaic.components.control", "__version__" : 1, "action" : "stop.return", "component" : <component-identifier> | null, "ok" : <boolean>, "error" : <error> | undefined, ;; (MUST) when <stop-return>."ok" =:= false "correlation" : <correlation>, "timestamp" : <utc>, }
Registry
Specifications:
- register:
<register-request> ::= { "__service__" : "mosaic.components.registry", "__version__" : 1, "action" : "register", "component" : <component-identifier>, "groups" : [ <group-identifier>, ... ], "correlation" : <correlation>, "expiration" : <utc>, } <register-return> ::= { "__service__" : "mosaic.components.registry", "__version__" : 1, "action" : "register.return", "ok" : <boolean>, "error" : <error> | undefined, ;; (MUST) when <register-return>."ok" =:= false "correlation" : <correlation>, "timestamp" : <utc>, }
- unregister:
<unregister-request> ::= { "__service__" : "mosaic.components.registry", "__version__" : 1, "action" : "unregister", "component" : <component-identifier>, "groups" : [ <group-identifier>, ... ] | null, "correlation" : <correlation>, "expiration" : <utc>, } <unregister-return> ::= { "__service__" : "mosaic.components.registry", "__version__" : 1, "action" : "unregister.return", "ok" : <boolean>, "error" : <error> | undefined, ;; (MUST) when <unregister-return>."ok" =:= false "correlation" : <correlation>, "timestamp" : <utc>, }
Notes:
to unregister from all groups set the group attribute to null;
References:
Extended Process Registry for Erlang (Section 8 -- Improved model and implementation (gproc));
Exchange
Specifications:
- call:
<call-request> ::= { "__service__" : "mosaic.components.exchange", "__version__" : 1, "action" : "call", "component" : <component-identifier>, "selector" : <component-selector>, "object" : <object-identifier>, "operation" : <compound-name>, "inputs" : <json>, "attachments" : <attachments>, "correlation" : <correlation> | null | undefined, "expiration" : <utc>, } <call-return> ::= { "__service__" : "mosaic.components.exchange", "__version__" : 1, "action" : "call.return", "component" : <component-identifier> | null, "ok" : <boolean>, "outputs" : <json> | undefined, ;; (MUST) when <call-return>."ok" =:= true "error" : <error> | undefined, ;; (MUST) when <call-return>."ok" =:= false "correlation" : <correlation>, "timestamp" : <utc>, }
Notes:
to obtain a "fire-and-forget" semantic either (preferably) explicitly put the correlation attribute to null, or skip setting any value (the implicit variant);
- to obtain a multicast or broadcast semantic we use the selector;
- if the selector has a multicast or broadcast semantic, then the caller should expect multiple return messages with the same correlation value;
Notes:
- objects, as "children" of components;
- sessions, as state-full communication guards;
- notifications, as publish / subscribe mechanism;
References:
Attributes
Specifications:
- update:
<update-request> ::= { "__service__" : "mosaic.components.attributes", "__version__" : 1, "action" : "update", "component" : <component-identifier>, "include" : { <object-identifier> : { <attribute-identifier> : <json>, ... }, ... }, "exclude" : { <object-identifier-pattern> : [ <attribute-identifier-pattern>, ... ] | null, ... } | null, "correlation" : <correlation> | null | undefined, "expiration" : <utc>, } <update-return> ::= { "__service__" : "mosaic.component.attributes", "__version__" : 1, "action" : "update.return", "ok" : <boolean>, "error" : <error> | undefined, ;; (MUST) when <update-return>."ok" =:= false "correlation" : <correlation>, "timestamp" : <utc>, }
- select:
<select-request> ::= { "__service__" : "mosaic.components.attributes", "__version__" : 1, "action" : "select", "component" : <component-identifier>, "patterns" : { <object-identifier-pattern> : [ <attribute-identifier-pattern>, ... ] | null, ... } | null, "correlation" : <correlation>, "expiration" : <utc>, } <select-return> ::= { "__service__" : "mosaic.component.attributes", "__version__" : 1, "action" : "select.return", "ok" : <boolean>, "values" : { <object-identifier> : { <attribute-identifier> : <json>, ... }, ... }, ;; (MUST) when <select-return>."ok" =:= true "error" : <error> | undefined, ;; (MUST) when <select-return>."ok" =:= false "correlation" : <correlation>, "timestamp" : <utc>, }
Notes:
to completely replace all attributes, we could use the exclude field and put it to null; (the same for all the properties of a particular object;)
as in the case of exchange operations, to obtain a "fire-and-forget" semantic set the correlation field to null;
References:
Extended Process Registry for Erlang (Section 8 -- Improved model and implementation (gproc));
Monitoring
Notes:
- control;
- registry;
- attribute;
References:
Logging
References:
Connectors
ZeroMQ topology
Processes:
- component;
- hub;
Sockets:
- component:
of type XREQ;
- with identity set to the component identifier (as binary, i.e. 16 octets);
- connects to one or more hub frontends (as discovered from DNS, or well known endpoint);
- hub fronted:
of type XREQ;
- with identity set to a random identifier (as 16 octets binary);
Payloads:
component <-> hub frontend:
...
RabbitMQ topology
...
Related work
References:
Ice;
Implementations
Version 1
Details
- each component is identified as a set of processes (maybe constrained to a particular container or operating system isolation unit);
- for each component there is one "special" process called the "supervisor", which must implement the "component hub protocol"; (thus by "component" we usually understand this "supervisor" process;)
- usually this process also does the work, and the "special supervisor" role is implemented as some attached code running in its own thread; (currently all components act like this;)
- the protocol mandates the following:
- the supervisor is always started by the hub (or a delegate of it), which we'll call "harness";
the supervisor communicates with the hub via standard input / standard output (thus always through UNIX pipes, descriptors 0 and 1);
the supervisor must take care that all logging is done through standard error (descriptor 2), or else it will be terminated by the "harness";
- the supervisor reads and writes "messages" which are framed like (i.e. "length+value"):
<data-length : 32 bit big-endian unsigned integer> <data : arbitrary octet array of length `data-length`>
the messages are encoded as (thus only the data part, as the length is used only for framing):
<json-data : UTF-8 string delimited by a null byte `\0`> <binary-data : arbitrary octet array until the end of the frame>
it is obvious that any valid json-data does not contain any null bytes \0 (as it is against the UTF-8 standard);
it is obvious that the binary-data can contain any byte, including \0;
the json-data encodes various requests that are "understood" by the hub;
- common data-types:
<component-identifier> ::= <128bit-token> <group-identifier> ::= <128bit-token> <correlation> ::= <128bit-token> <128bit-token> ::= /^[0-9a-f]{32}$/
the call operation:
- it embodies the "request-reply" pattern;
- requests and replies are asynchronous and may be out-of-order;
- the supervisor sends (or receives for inbound requests):
{ "action" : "call", "component" : <component-identifier> | <group-identifier>, "operation" : <string>, "inputs" : <json>, "correlation" : <correlation> }
- the supervisor receives (or sends for inbound requests) one of the following:
{ "action" : "call-return", "correlation" : <correlation>, "ok" : true, "outputs" : <json> }
{ "action" : "call-return", "correlation" : <correlation>, "ok" : false, "error" : <json> }
the cast operation:
- it embodies the "send-and-forget" pattern;
- the supervisor sends or receives:
{ "action" : "cast", "component" : <component-identifier> | <group-identifier>, "operation" : <string>, "inputs" : <json> }
the register operation:
- it allows the supervisor to "register" the current component to one group;
- it is only initiated by the component;
- the supervisor sends:
{ "action" : "register", "group" : <group-identifier>, "correlation" : <correlation> }
- the supervisor receives one of the following:
{ "action" : "register-return", "correlation" : <correlation>, "ok" : true }
{ "action" : "register-return", "correlation" : <correlation>, "ok" : false, "error" : <json> }
Code
- Java implementation:
the BasicChannel is responsible for input / output, framing, encoding / decoding;
the BasicComponent is responsible for semantic;
AbacusComponentCallbacks -- simple example;
- Erlang implementation:
mosaic_harness_backend.erl -- handles input / output, and framing;
mosaic_harness_coders.erl -- handles low-level encoding / decoding;
mosaic_component_coders.erl -- handles high-level encoding / decoding;
mosaic_component_backend.erl -- acts as a "facade" for the developer;
mosaic_riak_kv_callbacks.erl -- Riak KV component supervisor;
mosaic_rabbitmq_callbacks.erl -- RabbitMQ component supervisor;
- NodeJS implementation:
- initial prototype:
mosaic_component_backend.js -- handles all concerns;
mosaic_component_abacus.js -- simple example;
- working prototype:
component-lib.js -- similar to mosaic_component_backend.js;
component-main.js -- "RealTime Feeds" components supervisor;
- initial prototype:
- Python implementation:
harness.py -- handles input / output, and framing;
messages.py -- handles encoding / decoding;
component.py (from `common`) -- handles life-cycle and dispatching;
component.py (from `backend`) -- acts as a "facade" for the developer;
abacus.py -- simple example;