HQueue: a message queue based on HBase
1. HQueue overview
HQueue is a distributed, persistent message queue built on HBase by the team behind a search engine's offline web-page crawling system. It stores message data in an HTable, uses an HBase Coprocessor to wrap the original KeyValue data into the message format for storage, and wraps the HBase Client API in an HQueue Client API for message access.
HQueue works well wherever time-series data needs to be stored, for example as the input and output of MapReduce Jobs and other consumers, or as a way for upstream and downstream systems to share data.
2. HQueue features
Because HQueue stores and retrieves messages through HBase, it stands on the shoulders of Hadoop and HBase and has the following characteristics:
(1) It supports multiple Partitions; the size of a Queue can be set according to demand, which allows highly concurrent access (HBase Regions);
(2) It supports automatic failover: if any machine goes down, its Partitions are automatically migrated to other machines (HBase failover mechanism);
(3) It supports dynamic load balancing: Partitions can be dynamically scheduled onto the most suitable machines (HBase LoadBalance mechanism, which can be adjusted dynamically);
(4) Messages are persisted through HBase, so no data is lost (HBase HLog and HDFS Append);
(5) The read/write pattern of a queue fits HBase's storage characteristics naturally, which gives good concurrent read and write performance (the newest messages are held in the MemStore and writes go directly into the MemStore, so in typical scenarios operations happen at memory level);
(6) Messages can be stored and accessed by Topic (the Qualifier in HBase);
(7) It supports message TTL with automatic cleanup of expired messages (HBase supports TTL at the KeyValue level);
(8) HQueue = HTable Schema Design + HQueue Coprocessor + HBase Client Wrapper. It is developed entirely as an extension, without any hacks, so it can be upgraded along with HBase;
(9) The HQueue Client API is a thin wrapper around the HBase Client. Because HBase ThriftServer provides multi-language APIs, HQueue is also easy to wrap in multi-language APIs;
(10) The HQueue Client API naturally supports the InputFormat mechanism of Hadoop MapReduce Jobs and other consumers, and uses data locality to schedule computation onto the machines closest to the stored data;
(11) HQueue supports a message subscription mechanism (HQueue 0.3 and later).
3. HQueue system design and processing flow
3.1. HQueue system structure
The HQueue system structure is shown in Figure (1):
Figure (1): HQueue system structure
(1) Each Queue corresponds to one HTable. The table can be pre-sharded when the Queue is created, which helps balance the load.
(2) Each Queue can have multiple Partitions (HBase Regions), which are evenly distributed across the Region Servers of the HBase cluster.
(3) Each Partition can migrate between the Region Servers of the HBase cluster. If a Region Server goes down, the HQueue Partitions running on it are automatically migrated to other Region Servers, and no data is lost. When the cluster load is unbalanced, the HMaster automatically migrates HQueue Partitions to Region Servers with lower load.
(4) Each Message corresponds to one HBase KeyValue pair and is stored in its Region in MessageID order, that is, in time order. A MessageID consists of a Timestamp and a SequenceID within that Timestamp; for details, see the Message storage structure section.
3.2. Message storage structure
The Message storage structure is shown in Figure (2):
Figure (2): Message storage structure
(1) RowKey: composed of the PartitionID and the MessageID.
- PartitionID: a Queue can have multiple Partitions; at most Short.MAX_VALUE Partitions are currently supported. The Partition ID may be left unspecified when the Message object is created and set when the Message is sent; if it is never specified, a random Partition ID is used.
- MessageID: the message ID, consisting of two parts, the Timestamp and the SequenceID. The Timestamp is the time at which the message was written into HQueue, in milliseconds. The SequenceID is the sequence number of the message within the same Timestamp; at most Short.MAX_VALUE Messages are currently supported per Timestamp.
(2) Column: composed of the Column Family and the Message Topic.
- Column Family: the HBase Column Family, here the fixed value "message".
- Message Topic: the HBase Column Qualifier, holding the Topic name of the message. Users can store Messages under different Topics as needed, and can fetch from the Queue only the Topics they are interested in.
(3) Value: the message content.
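The RowKey composition described above can be sketched in a few lines of Java. This is only an illustration: the source does not give the byte layout, so the 12-byte big-endian layout (2-byte PartitionID, 8-byte Timestamp, 2-byte SequenceID) and the names RowKeySketch, encode and timestampOf are assumptions. The point is that, with fixed-width big-endian fields, HBase's byte-wise RowKey ordering keeps the messages of a Partition in MessageID order.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical RowKey layout (an assumption, not taken from the HQueue source):
// PartitionID (2 bytes) + Timestamp in ms (8 bytes) + SequenceID (2 bytes).
class RowKeySketch {
    static final int LENGTH = Short.BYTES + Long.BYTES + Short.BYTES;

    static byte[] encode(short partitionId, long timestampMillis, short sequenceId) {
        if (partitionId < 0 || timestampMillis < 0 || sequenceId < 0) {
            throw new IllegalArgumentException("all fields must be non-negative");
        }
        // ByteBuffer is big-endian by default, so for non-negative fields the
        // unsigned byte-wise order of the keys matches the order of
        // (partition, timestamp, sequence).
        return ByteBuffer.allocate(LENGTH)
                .putShort(partitionId)
                .putLong(timestampMillis)
                .putShort(sequenceId)
                .array();
    }

    // Reads the Timestamp back from its fixed offset inside the key.
    static long timestampOf(byte[] rowKey) {
        return ByteBuffer.wrap(rowKey).getLong(Short.BYTES);
    }

    public static void main(String[] args) {
        byte[] earlier = encode((short) 3, 1_700_000_000_000L, (short) 0);
        byte[] later = encode((short) 3, 1_700_000_000_000L, (short) 1);
        // Same Partition and Timestamp: the SequenceID decides the order.
        System.out.println(Arrays.compareUnsigned(earlier, later) < 0);
    }
}
```

Putting the PartitionID first keeps each Partition's messages contiguous, which matches the one-Partition-per-Region design described in 3.1.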
3.3. HQueue message writing and Coprocessor processing flow
Message data is written through the HQueue Client API. To guarantee that messages are unique and ordered, HQueue uses a Coprocessor to process the MessageID of every message the user writes. The message is then placed into the HBase MemStore immediately so that it becomes readable, and finally it is persisted to the HLog. The processing logic is shown in Figure (3):
Figure (3): message writing and Coprocessor processing flow
(1) HQueue wraps its access layer as the HQueue Client API; users write messages into HQueue with the put method it provides.
(2) The HQueue Client uses Message.makeKeyValueRow() to convert the Message data structure into an HBase RowKey. The RowKey format required by HQueue is described in the section above.
(3) After converting the RowKey, the HQueue Client calls the HTable put method and writes the message through the standard HBase write path.
(4) HQueueCoprocessor is registered on the HQueue table; it extends BaseRegionObserver. Before the HRegion actually writes the message data, it calls the preBatchMutate method of HQueueCoprocessor, which mainly adjusts the MessageID to guarantee that MessageIDs are unique and ordered.
(5) The preBatchMutate method of HQueueCoprocessor also sets the Durability to SKIP_WAL, so that HBase itself does not persist the message data to the HLog.
(6) After the HRegion has written the message data, it calls the postBatchMutate method of HQueueCoprocessor, which mainly persists the message data to the HLog.
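The source does not say how preBatchMutate adjusts the MessageID, so the following is only one possible scheme, written in plain Java without the HBase API. The class MessageIdAssigner and its rollover rule (moving to the next millisecond when a Timestamp has used up its SequenceIDs) are assumptions made for illustration.

```java
// A minimal sketch of assigning unique, strictly increasing MessageIDs.
// Hypothetical helper: it does not reproduce HQueueCoprocessor itself.
class MessageIdAssigner {
    private long lastTimestamp = -1L;
    private short lastSequence = 0;

    // Returns {timestamp, sequenceId} for the next message.
    synchronized long[] next(long nowMillis) {
        if (nowMillis > lastTimestamp) {
            // A new millisecond: restart the sequence.
            lastTimestamp = nowMillis;
            lastSequence = 0;
        } else if (lastSequence < Short.MAX_VALUE - 1) {
            // Same millisecond, or the clock moved backwards: keep the last
            // timestamp and advance the sequence so the order is preserved.
            lastSequence++;
        } else {
            // Assumption: once Short.MAX_VALUE IDs were used for this
            // timestamp, continue with the next millisecond.
            lastTimestamp++;
            lastSequence = 0;
        }
        return new long[] {lastTimestamp, lastSequence};
    }
}
```

Calling next(100), next(100), next(99) and next(101) in that order yields (100, 0), (100, 1), (100, 2) and (101, 0): the IDs never repeat and never go backwards, even if the clock does.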
3.4. HQueue Scan process
To make it convenient to Scan data from a Queue, HQueue wraps ClientScanner and provides three Scanners for different scenarios: QueueScanner, PartitionScanner and CombinedPartitionScanner. The HQueue Scan process is shown in Figure (4):
Figure (4): HQueue Scan process
(1) Users obtain the Scanner they need from the HQueue Client. Three Scanners are currently provided:
- QueueScanner: scans the data in all Partitions of a Queue;
- PartitionScanner: scans the data in one specified Partition of a Queue;
- CombinedPartitionScanner: scans the data in several specified Partitions of a Queue.
(2) Once the user has a Scanner, they call its next method in a loop to fetch the next message; when no data is returned, the Scan has ended. After the Scan, users should close the Scanner to release resources promptly.
(3) When a previously created Queue object is no longer used, users should close the Queue to release resources promptly.
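The fetch-in-a-loop-then-close pattern from steps (2) and (3) can be sketched as follows. The interface SketchScanner is a hypothetical stand-in for an HQueue Scanner, and the convention that next returns null at the end of the Scan is an assumption; the real HQueue Client API is not reproduced here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for an HQueue Scanner (not the real API).
interface SketchScanner extends AutoCloseable {
    String next();   // assumed to return null when no data is left

    @Override
    void close();    // narrowed so that closing throws no checked exception
}

// In-memory scanner used only to make the sketch runnable.
class ListBackedScanner implements SketchScanner {
    private final Iterator<String> messages;
    boolean closed;

    ListBackedScanner(List<String> messages) {
        this.messages = messages.iterator();
    }

    @Override
    public String next() {
        return messages.hasNext() ? messages.next() : null;
    }

    @Override
    public void close() {
        closed = true;
    }
}

class ScanLoopSketch {
    // Fetches messages until the scanner reports the end, then closes it.
    static List<String> drain(SketchScanner scanner) {
        List<String> out = new ArrayList<>();
        try (SketchScanner s = scanner) {
            String message;
            while ((message = s.next()) != null) {
                out.add(message);
            }
        }
        return out;
    }
}
```

Using try-with-resources means the Scanner is closed even if processing a message throws, which is the "release resources promptly" advice expressed in code.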
3.5. HQueue subscription process
3.5.1. Overall subscription process
Since version 0.3, HQueue provides a subscription feature: a subscriber can subscribe to multiple Partitions and Topics of a Queue. Compared with users actively scanning message data with a Scanner, subscription has the following advantages: (1) as soon as message data is written into the Queue it is actively pushed to the subscribers, so messages are delivered more promptly; (2) subscribers receive new messages passively, which avoids redundant Scan operations against HQueue when there is no new message data and reduces system overhead.
The processing logic of HQueue subscription is shown in Figure (5):
Figure (5): HQueue subscription processing logic
(1) HQueue subscription consists mainly of three parts: the Subscriber, ZooKeeper and the Coprocessor. Among them:
- Subscriber: the subscriber. It mainly writes subscription information to ZooKeeper, starts listening, receives new messages and calls back the message processing functions (MessageListener) registered on it.
- ZooKeeper: holds the subscription information submitted by subscribers, including the Queue, Partitions and Topics a subscriber subscribes to, as well as information such as the subscriber's address and Checkpoint. For more details, refer to ...
- Coprocessor: mainly obtains the subscription information from ZooKeeper, uses an InternalScanner to Scan the latest messages from the Queue, sends new messages to the subscriber, updates the current Checkpoint in ZooKeeper, and ...
(2) The main process is as follows:
Step 1: Create the Subscriber, add the subscription information and the message processing functions, write the subscription information to ZooKeeper, then start listening and wait for new messages. The subscription information written to ZooKeeper mainly includes:
- The name of the Queue the subscriber subscribes to;
- The Partitions of the Queue the subscriber subscribes to and the initial message ID on each Partition. A subscriber can subscribe to multiple Partitions; if none is specified, it subscribes to all Partitions of the Queue.
- The message Topics the subscriber subscribes to. A subscriber can subscribe to multiple Topics; if none is specified, it subscribes to all Topics of the Queue.
- The subscriber's address/hostname and listening port. The user can specify the listening port when creating a subscriber; if none is specified, a currently available port is chosen at random.
Step 2: The Coprocessor obtains the subscription information from ZooKeeper and registers the related ZooKeeper Watchers, so that ZooKeeper can notify the Coprocessor promptly when the subscription information changes. After obtaining the subscription information, the Coprocessor creates SubscriptionWorker worker threads as needed to Scan messages from the HQueue Partitions and send them to the Subscriber.
Step 3: The Coprocessor Scans new messages from the HQueue Partition.
Step 4: The Coprocessor sends the new messages to the Subscriber.
Step 5: When the Subscriber receives a new message, it invokes the registered callback functions.
Step 6: After the new message has been sent successfully, the Coprocessor updates the message Checkpoint in ZooKeeper for later use.
Step 7: The Subscriber unsubscribes and deletes the corresponding subscription information from ZooKeeper.
Step 8: Through the registered Watcher, ZooKeeper notifies the Coprocessor that the Subscriber's subscription information has changed. The Coprocessor reacts to the change, for example by stopping SubscriptionWorker worker threads.
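Steps 3 to 6 amount to a scan, send, then advance-the-Checkpoint loop. The sketch below models that loop in memory under stated assumptions: the class CheckpointedDelivery is hypothetical, a TreeMap stands in for a Partition, a plain field stands in for the Checkpoint kept in ZooKeeper, and stopping at the first failed send is an assumed policy rather than documented HQueue behaviour.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Predicate;

// Hypothetical in-memory model of checkpointed delivery for one Partition.
class CheckpointedDelivery {
    // MessageID -> payload, kept in MessageID order like a Partition.
    private final NavigableMap<Long, String> partition = new TreeMap<>();
    // Stand-in for the Checkpoint that HQueue keeps in ZooKeeper.
    private long checkpoint;

    CheckpointedDelivery(long initialCheckpoint) {
        this.checkpoint = initialCheckpoint;
    }

    void append(long messageId, String payload) {
        partition.put(messageId, payload);
    }

    long checkpoint() {
        return checkpoint;
    }

    // Sends every message after the checkpoint. The checkpoint only moves
    // forward once a send succeeded; on the first failure delivery stops
    // (assumed policy), so the failed message is retried on the next call.
    int deliverPending(Predicate<String> sender) {
        int delivered = 0;
        for (Map.Entry<Long, String> e : partition.tailMap(checkpoint, false).entrySet()) {
            if (!sender.test(e.getValue())) {
                break;
            }
            checkpoint = e.getKey();
            delivered++;
        }
        return delivered;
    }
}
```

Because the Checkpoint is updated only after a successful send, a subscriber that was briefly unreachable resumes from the first undelivered message instead of losing it.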
3.5.2. HQueue Subscriber
The structure and main processing logic of the HQueue Subscriber are shown in Figure (6):
Figure (6): HQueue Subscriber structure and main processing logic
(1) The Subscriber consists mainly of two parts: SubscriberZooKeeper and a Thrift Server. SubscriberZooKeeper handles the ZooKeeper-related operations, including writing and deleting subscription information. Communication between the Coprocessor and the Subscriber goes through Thrift: the Subscriber starts a Thrift Server that listens on the specified port and waits for new messages sent by the Coprocessor.
(2) When the Subscriber receives a new message through the Thrift Server, it invokes the callback functions registered on it (MessageListeners) and returns a status code to the Coprocessor.
(3) Multiple MessageListeners can be registered on one Subscriber; they are called in turn.
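The callback behaviour in (2) and (3) can be illustrated with a small dispatcher. MessageListener is named in the text, but its method signature here, the ListenerDispatcher class and the two status code values are assumptions; the Thrift transport is left out entirely.

```java
import java.util.ArrayList;
import java.util.List;

// Assumed shape of the callback; the real MessageListener may differ.
interface MessageListener {
    void onMessage(String message);
}

// Hypothetical dispatcher: calls listeners in registration order and
// reports the outcome as a status code.
class ListenerDispatcher {
    static final int STATUS_OK = 0;      // assumed value
    static final int STATUS_FAILED = 1;  // assumed value

    private final List<MessageListener> listeners = new ArrayList<>();

    void register(MessageListener listener) {
        listeners.add(listener);
    }

    int dispatch(String message) {
        try {
            for (MessageListener listener : listeners) {
                listener.onMessage(message);
            }
            return STATUS_OK;
        } catch (RuntimeException e) {
            // A failing listener stops the chain and is reported to the sender.
            return STATUS_FAILED;
        }
    }
}
```

Returning a status code instead of throwing lets the sending side decide whether to advance its Checkpoint or retry.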
3.5.3. HQueue Coprocessor
The structure and main processing logic of the HQueue Coprocessor are shown in Figure (7):
Figure (7): HQueue Coprocessor structure and main processing logic
Among them:
(1) Coprocessor: consists mainly of two parts, SubscriptionZooKeeper and SubscriptionWorker.
- SubscriptionZooKeeper: handles the ZooKeeper-related work, including obtaining subscription information from ZooKeeper, registering Watchers, and updating the Checkpoint in ZooKeeper on behalf of SubscriptionWorker.
- SubscriptionWorker: in turn consists of two parts, MessageScanner and MessageSender, and is responsible for scanning new messages, sending them to the Subscriber and updating the Checkpoint.
(2) MessageScanner: mainly creates the InternalScanner, Scans new messages from the Partition of the Queue and puts them into the buffer queue.
- When the buffer queue has no free space, MessageScanner waits until MessageSender has consumed messages from the buffer queue and freed up space.
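The hand-off between MessageScanner and MessageSender is a classic bounded producer/consumer buffer. A minimal sketch, assuming a java.util.concurrent.ArrayBlockingQueue as the buffer queue (the source does not say which data structure HQueue uses) and with the hypothetical class BufferQueueSketch standing in for both components:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of the MessageScanner -> buffer -> MessageSender hand-off.
class BufferQueueSketch {
    static List<String> run(int capacity, int messageCount) {
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(capacity);

        // Plays the MessageScanner role: put() blocks while the buffer is
        // full, so the scanner waits until the sender has freed space.
        Thread scanner = new Thread(() -> {
            try {
                for (int i = 0; i < messageCount; i++) {
                    buffer.put("message-" + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        scanner.start();

        // Plays the MessageSender role: take() blocks while the buffer is empty.
        List<String> sent = new ArrayList<>();
        try {
            for (int i = 0; i < messageCount; i++) {
                sent.add(buffer.take());
            }
            scanner.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while sending", e);
        }
        return sent;
    }
}
```

With a capacity of 2 and 5 messages, the scanner thread is forced to wait several times, yet the messages still arrive at the sender in scan order because the queue is FIFO.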