aiven'blog

zaqar客户端操作(es)

1. 说明

本文描述了消息服务客户端操作,目前客户端命令全部带有es标签,由于需求对zaqar的改动较大,在不大范围修改原有client的代码前提下,独立出新的的命令行模块。

2. 功能模块简述

目前消息服务的客户端主要包括以下几大功能模块:

  • 队列管理
  • 主题管理
  • 订阅管理
  • 消息管理

所有客户端操作命令如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
[root@node-145 ~(keystone_admin)]# openstack -h | grep es-
[--os-queues-api-version <queues-api-version>]
--os-queues-api-version <queues-api-version>
es-messaging message-consume Consume messages and return a list of consume messages (python-zaqarclient)
es-messaging message-delete Delete consume messages (python-zaqarclient)
es-messaging message-list List all messages for a given queue (python-zaqarclient)
es-messaging message-post Post messages for a given queue (python-zaqarclient)
es-messaging message-publish Publish messages for a given topic (python-zaqarclient)
es-messaging queue-create Create a queue (python-zaqarclient)
es-messaging queue-delete Delete a queue (python-zaqarclient)
es-messaging queue-list List available queues (python-zaqarclient)
es-messaging queue-purge Purge a queue (python-zaqarclient)
es-messaging queue-show Get queue monitor (python-zaqarclient)
es-messaging queue-update Set queue metadata (python-zaqarclient)
es-messaging subscription-create Create a subscription for topic (python-zaqarclient)
es-messaging subscription-delete Delete a subscription (python-zaqarclient)
es-messaging subscription-list List available subscriptions (python-zaqarclient)
es-messaging subscription-show Display subscription details (python-zaqarclient)
es-messaging subscription-update Update a subscription (python-zaqarclient)
es-messaging topic-create Create a topic (python-zaqarclient)
es-messaging topic-delete Delete a topic (python-zaqarclient)
es-messaging topic-list List available topics (python-zaqarclient)
es-messaging topic-show Get topic monitor (python-zaqarclient)
es-messaging topic-update Set topic metadata (python-zaqarclient)

3.命令操作

3.1 队列管理

队列管理主要是针对消息服务中队列资源的生命周期管理,主要功能包括:队列的创建,队列删除,队列查询,队列清空,队列修改,队列监控。

  • 队列创建

队列创建需要传递的参数主要是队列的名称,名称必须是项目里唯一,否则不会创建新的队列。命令为:

1
openstack es-messaging queue-create {queue_name}

举例说明:

1
2
3
4
5
6
[root@node-145 ~(keystone_admin)]# openstack es-messaging queue-create test
+-------+-------+
| Field | Value |
+-------+-------+
| Name | test |
+-------+-------+
  • 队列更新

根据队列的名称更新队列的元数据,可更新的内容包括,最大长度,生命周期,独占时间,延迟时间。

1
openstack es-messaging queue-update {queue_name} {queue_metadata}

输入参数说明:

queue_metadata: 元数据字典,可选。
_max_messages_post_size:最大长度,可选。
_default_message_ttl:生命周期,可选。
claim_ttl:独占时间,可选。
delay_ttl:延迟时间,可选。

举例说明:

1
[root@node-145 ~(keystone_admin)]# openstack es-messaging queue-update test '{"_max_messages_post_size": 233333}'
  • 队列列表

查询队列的列表。支持detail参数,表明获取详细列表。命令为:

1
openstack es-messaging queue-list --detail

输出说明:

Name: 队列名称。
Metadata_Dict:队列的元数据字典,包括生命周期,最大长度,延迟时间,独占时间。
Href:资源链接,暂时无意义。
Created_at:队列创建时间,utc时间。
Updated_at:队列的更新时间,utc时间,初始化和创建时间相同。

举例说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[root@node-145 ~(keystone_admin)]# openstack es-messaging queue-list
+-------------+
| Name |
+-------------+
| test |
| test_queues |
+-------------+
[root@node-145 ~(keystone_admin)]# openstack es-messaging queue-list --detail
+-------------+---------------------------------------------------------------------------------------------------------+------------------------+----------------------+----------------------+
| Name | Metadata_Dict | Href | Created_at | Updated_at |
+-------------+---------------------------------------------------------------------------------------------------------+------------------------+----------------------+----------------------+
| test | {u'_max_messages_post_size': 65536, u'claim_ttl': 30, u'delay_ttl': 0, u'_default_message_ttl': 345600} | /v2/queues/test | 2017-10-16T03:32:49Z | 2017-10-16T03:32:49Z |
| test_queues | {u'_max_messages_post_size': 65536, u'claim_ttl': 30, u'delay_ttl': 0, u'_default_message_ttl': 345600} | /v2/queues/test_queues | 2017-10-16T03:32:54Z | 2017-10-16T03:32:54Z |
+-------------+---------------------------------------------------------------------------------------------------------+------------------------+----------------------+----------------------+
  • 队列查询

根据队列的名字查询队列的详细信息。该操作输出包括队列的监控信息。命令为:

1
openstack es-messaging queue-show {queue_name}

输出说明:

Name: 队列名称。
Metadata_Dict: 队列的元数据字典,包括生命周期,最大长度,延迟时间,独占时间。
msg_counts: 非批量发送消息个数。
msg_bytes: 非批量发送消息的大小。
bulk_msg_counts:批量发送消息的个数。
bulk_msg_bytes: 批量发送消息的大小。
consume_msg_counts: 消费消息的个数。
consume_msg_bytes: 消费消息的大小。
active_msgs: 活动消息个数。
inactive_msgs:非活动消息个数,指消费冷却时间中。
delayed_msgs: 处于延迟状态消息的个数。
deleted_msgs: 已删除的消息的个数。
Created_at: 队列创建时间。
Updated_at: 队列更新时间。

举例说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
[root@node-145 ~(keystone_admin)]# openstack es-messaging queue-show test
+--------------------+---------------------------------------------------------------------------------------------------------+
| Field | Value |
+--------------------+---------------------------------------------------------------------------------------------------------+
| Name | test |
| Metadata | {u'_max_messages_post_size': 65536, u'claim_ttl': 30, u'delay_ttl': 0, u'_default_message_ttl': 345600} |
| msg_counts | 0 |
| msg_bytes | 0.0 |
| bulk_msg_counts | 0 |
| bulk_msg_bytes | 0.0 |
| consume_msg_counts | 0 |
| consume_msg_bytes | 0.0 |
| active_msgs | 0 |
| inactive_msgs | 0 |
| delayed_msgs | 0 |
| deleted_msgs | 0 |
| Created_at | 2017-10-16T03:32:49Z |
| Updated_at | 2017-10-16T03:32:49Z |
+--------------------+---------------------------------------------------------------------------------------------------------+
  • 队列清空

根据队列的名称清空队列里所有消息。命令为:

1
openstack es-messaging queue-purge {queue_name}

举例说明:

1
[root@node-145 ~(keystone_admin)]# openstack es-messaging queue-purge test
  • 队列删除

根据队列名称删除队列,删除不存在的队列名也不会报错。命令为:

1
openstack es-messaging queue-delete {queue_name}

举例说明:

1
[root@node-145 ~(keystone_admin)]# openstack es-messaging queue-delete test

3.2 主题管理

主题管理主要是针对消息服务中主题资源的生命周期管理,主要功能包括:主题创建,主题删除,主题查询,主题修改,主题监控。

  • 主题创建

主题创建需要传递的参数主要是主题的名称,名称必须是项目里唯一,否则不会创建新的主题。命令为:

1
openstack es-messaging topic-create {topic_name}

举例说明:

1
2
3
4
5
6
[root@node-145 ~(keystone_admin)]# openstack es-messaging topic-create test
+-------+-------+
| Field | Value |
+-------+-------+
| Name | test |
+-------+-------+
  • 主题更新

根据主题的名称更新主题的元数据,可更新的内容包括,最大长度,生命周期。

1
openstack es-messaging topic-update {topic_name} {topic_metadata}

输入参数说明:

topic_metadata: 元数据字典,可选。
_max_messages_post_size:最大长度,可选。
_default_message_ttl:生命周期,可选。

举例说明:

1
[root@node-145 ~(keystone_admin)]# openstack es-messaging topic-update test '{"_max_messages_post_size": 233333}'
  • 主题列表

查询主题的列表。支持detail参数,表明获取详细列表。命令为:

1
openstack es-messaging topic-list --detail

输出说明:

Name: 主题名称。
Metadata_Dict:主题的元数据字典,包括生命周期,最大长度。
Href:资源链接,暂时无意义。
Created_at:主题创建时间,utc时间。
Updated_at:主题的更新时间,utc时间,初始化和创建时间相同。

举例说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[root@node-145 ~(keystone_admin)]# openstack es-messaging topic-list
+-------+
| Name |
+-------+
| test |
| test1 |
+-------+
[root@node-145 ~(keystone_admin)]# openstack es-messaging topic-list --detail
+-------+---------------------------------------------------------------------+------------------+----------------------+----------------------+
| Name | Metadata_Dict | Href | Created_at | Updated_at |
+-------+---------------------------------------------------------------------+------------------+----------------------+----------------------+
| test | {u'_max_messages_post_size': 65536, u'_default_message_ttl': 86400} | /v2/topics/test | 2017-10-13T08:00:49Z | 2017-10-13T08:00:49Z |
| test1 | {u'_max_messages_post_size': 65536, u'_default_message_ttl': 86400} | /v2/topics/test1 | 2017-10-16T04:15:21Z | 2017-10-16T04:15:21Z |
+-------+---------------------------------------------------------------------+------------------+----------------------+----------------------+
  • 主题查询

根据主题的名字查询主题的详细信息。该操作输出包括主题的监控信息。命令为:

1
openstack es-messaging topic-show {topic_name}

输出说明:

Name: 主题名称。
Metadata_Dict:主题的元数据字典,包括生命周期,最大长度。
msg_counts:非批量发布消息个数。
msg_bytes:非批量发布消息的大小。
bulk_msg_counts:批量发布消息的个数。
bulk_msg_bytes:批量发布消息的大小。
sub_msg_counts: 成功发送订阅终端消息的个数。
sub_msg_bytes:成功发送订阅终端消息的大小。
total_sub_msg_counts: 总的发送订阅终端消息个数,包括失败的。
total_sub_msg_bytes: 总的发送订阅终端消息的大小,包括失败的。
Created_at: 主题创建时间。
Updated_at: 主题更新时间。

举例说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[root@node-145 ~(keystone_admin)]# openstack es-messaging topic-show test
+----------------------+----------------------------------------------------------------------+
| Field | Value |
+----------------------+----------------------------------------------------------------------+
| Name | test |
| Metadata | {u'_max_messages_post_size': 233333, u'_default_message_ttl': 86400} |
| msg_counts | 0 |
| msg_bytes | 0.0 |
| bulk_msg_counts | 0 |
| bulk_msg_bytes | 0.0 |
| sub_msg_counts | 0 |
| sub_msg_bytes | 0.0 |
| total_sub_msg_counts | 0 |
| total_sub_msg_bytes | 0.0 |
| Created_at | 2017-10-13T08:00:49Z |
| Updated_at | 2017-10-16T04:16:54Z |
+----------------------+----------------------------------------------------------------------+
  • 主题删除

根据主题名称删除主题,删除不存在的主题名也不会报错。命令为:

1
openstack es-messaging topic-delete {topic_name}

举例说明:

1
[root@node-145 ~(keystone_admin)]# openstack es-messaging topic-delete test

3.3 订阅管理

订阅管理主要是消息服务中对主题的订阅进行管理,主题被订阅后,发到主题的消息会根据订阅的终端进行分发,目前终端支持webhookqueue两种类型。

  • 订阅创建

主题的订阅创建需要传递的参数主要是主题的名称,订阅终端,及options可选项。命令为:

1
openstack es-messaging subscription-create {topic_name} {subscriber} --options

输入参数说明:

subscriber: 订阅终端webhook:http://test.test.com,该调用方法为POSTbody为消息体。
      订阅终端queue:queue:{queue_name},将消息直接转存到该queue中,元数据与queue相同,必选。
options:订阅的元数据选项,该参数可以不选。如果不选默认push_policy=EXPONENTIAL_DECAY_RETRY
push_policy: 订阅的重试策略,在将消息发送给订阅终端的时候如果发送失败会进行重试,具体策略规则见设计文档,可选。
tags:订阅的过滤标签,标签支持多个,目前没有限制,过滤规则见设计文档,可选。

举例说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[root@node-145 ~(keystone_admin)]# openstack es-messaging subscription-create test http://test.test.com
+------------+----------------------------------------------+
| Field | Value |
+------------+----------------------------------------------+
| ID | 59e4562424ed673f607dceb7 |
| Subscriber | http://test.test.com |
| Options | {u'push_policy': u'EXPONENTIAL_DECAY_RETRY'} |
+------------+----------------------------------------------+
[root@node-145 ~(keystone_admin)]# openstack es-messaging subscription-create test queue:test --options '{"push_policy" : "BACKOFF_RETRY", "tags" : [ "test", "test2" ]}'
+------------+------------------------------------------------------------------+
| Field | Value |
+------------+------------------------------------------------------------------+
| ID | 59e44e0024ed673f277dcec3 |
| Subscriber | queue:test |
| Options | {u'push_policy': u'BACKOFF_RETRY', u'tags': [u'test', u'test2']} |
+------------+------------------------------------------------------------------+

  • 订阅更新

根据订阅的ID更新订阅的属性,目前可更新的属性有重试策略和标签。

1
openstack es-messaging subscription-update {topic_name} {subscription_id} --options {options}

输入参数说明:

topic_name: 主题名称,必选。
subscription_id:订阅id,必选。
options: 需要修改的属性,可选。
push_policy: 订阅的重试策略,在将消息发送给订阅终端的时候如果发送失败会进行重试,具体策略规则见设计文档,可选。
tags:订阅的过滤标签,标签支持多个,目前没有限制,过滤规则见设计文档,可选。

举例说明:

1
2
3
4
5
6
7
[root@node-145 ~(keystone_admin)]# openstack es-messaging subscription-update test 59e4562424ed673f607dceb7 --options '{"push_policy" : "EXPONENTIAL_DECAY_RETRY", "tags" : [ "test", "test2" ]}'
+---------+----------------------------------------------------------------------------+
| Field | Value |
+---------+----------------------------------------------------------------------------+
| ID | 59e4562424ed673f607dceb7 |
| Options | {u'push_policy': u'EXPONENTIAL_DECAY_RETRY', u'tags': [u'test', u'test2']} |
+---------+----------------------------------------------------------------------------+
  • 订阅列表

查询主题订阅的列表。命令为:

1
openstack es-messaging subscription-list {topic_name}

输出说明:

ID: 订阅ID
Subscriber:订阅终端。
Confirmed:订阅确认,目前暂不提供,默认不确认。
Options:订阅属性。
push_policy: 订阅的重试策略。
tags:订阅的过滤标签,标签支持多个。

举例说明:

1
2
3
4
5
6
7
[root@node-145 ~(keystone_admin)]# openstack es-messaging subscription-list test
+--------------------------+----------------------+-----------+----------------------------------------------------------------------------+
| ID | Subscriber | Confirmed | Options |
+--------------------------+----------------------+-----------+----------------------------------------------------------------------------+
| 59e44e0024ed673f277dcec3 | queue:test | False | {u'push_policy': u'BACKOFF_RETRY', u'tags': [u'test', u'test2']} |
| 59e4562424ed673f607dceb7 | http://test.test.com | False | {u'push_policy': u'EXPONENTIAL_DECAY_RETRY', u'tags': [u'test', u'test2']} |
+--------------------------+----------------------+-----------+----------------------------------------------------------------------------+
  • 订阅查询

根据主题的名字及订阅id查询主题订阅的的信息。命令为:

1
openstack es-messaging subscription-show {topic_name} {subscription_id}

输出说明:

ID: 订阅ID
Subscriber:订阅终端。
Confirmed:订阅确认,目前暂不提供,默认不确认。
Options:订阅属性。
push_policy: 订阅的重试策略。
tags:订阅的过滤标签,标签支持多个。

举例说明:

1
2
3
4
5
6
7
8
9
[root@node-145 ~(keystone_admin)]# openstack es-messaging subscription-show test 59e44e0024ed673f277dcec3
+------------+------------------------------------------------------------------+
| Field | Value |
+------------+------------------------------------------------------------------+
| ID | 59e44e0024ed673f277dcec3 |
| Subscriber | queue:test |
| Confirmed | False |
| Options | {u'push_policy': u'BACKOFF_RETRY', u'tags': [u'test', u'test2']} |
+------------+------------------------------------------------------------------+
  • 订阅删除

根据主题名称及订阅ID删除订阅,删除不存在的订阅也不会报错。命令为:

1
openstack es-messaging subscription-delete {topic_name} {subscription_id}

举例说明:

1
[root@node-145 ~(keystone_admin)]# openstack es-messaging subscription-delete test 59e44e0024ed673f277dcec3

3.4 消息管理

消息管理主要是消息服务中对消息的发送,消费,订阅等操作的处理。

  • 向队列发送消息

向指定队列里发送消息,支持批量发送,必须指定client-id。命令为:

1
openstack es-messaging message-post {queue_name} {messages} --client-id {client-id}

输入参数说明:

queue_name: 发送消息的目标队列,必选。
messages:消息列表,格式为JSON,必选。
body: 消息体,必须存在。内容为JSON,必选。
ttl:消息的生命周期,可选项。
delay_ttl: 消息的延迟时间,可选项。
client-id:发送消息的客户端id。必选项。

举例说明:

1
[root@node-145 ~(keystone_admin)]# openstack es-messaging message-post test '[{"body":{"yzy":"test", "msg": "你好111111111111111"}, "ttl": 63, "delay_ttl": 30},{"body":{"yzy":"test", "msg": "你好222222"}, "ttl": 631, "delay_ttl": 32}]' --client-id 38b4aee5-0a05-47f1-a627-f7c4c3e24382
  • 队列中消息列表查询

根据队列的名称查询该队列中的所有消息。

1
openstack es-messaging message-list {queue_name} --client-id {client_id} --echo --include-delayed --include-claimed --limit 1

输入参数说明:

queue_name: 队列名称,必选。
client-id:获取该client-id向该队列发送的消息,必选。
echo: 如果为True则证明要获取该client的消息,否则获取除了该client的消息。默认为False,可选。
include-delayed: 返回结果是否包括处于延迟状态的消息,可选。
include-claimed:返回结果是否处于消费状态的消息,可选。
limit:返回结果个数限制,可选。

输出说明:

ID: 消息ID
TTL:消息生命周期。
Age: 消息存活时间。
Status: 消息当前状态,目前包括active(可消费),inactive(消费冷却中),delayed(延迟中)。
Status_End_Time:状态过渡到active的时间,只有inactivedelay时有效。

举例说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
[root@node-145 ~(keystone_admin)]# openstack es-messaging message-list test --client-id 38b4aee5-0a05-47f1-a627-f7c4c3e24382 --echo --include-delayed --include-claimed
+--------------------------+------+-----+---------+----------------------+
| ID | TTL | Age | Status | Status_End_Time |
+--------------------------+------+-----+---------+----------------------+
| 59e4721a24ed673f277dcec6 | 6300 | 3103| inactive| 2017-10-16T09:39:23Z |
| 59e4721a24ed673f277dcec7 | 6310 | 35 | active | -- |
| 59e4721d24ed673f277dcec8 | 6300 | 32 | active | -- |
| 59e4721d24ed673f277dcec9 | 6310 | 32 | active | -- |
| 59e4721f24ed673f277dceca | 6300 | 30 | active | -- |
| 59e4721f24ed673f277dcecb | 6310 | 30 | delayed | 2017-10-16T08:47:59Z |
| 59e4722224ed673f277dcecc | 6300 | 27 | delayed | 2017-10-16T08:48:00Z |
| 59e4722224ed673f277dcecd | 6310 | 27 | delayed | 2017-10-16T08:48:02Z |
+--------------------------+------+-----+---------+----------------------+
  • 从队列中消费消息

从指定队列中消费消息,支持批量消费消息,支持自动删除消费消息。命令为:

1
openstack es-messaging message-consume {queue_name} --limit {limit} --auto-delete {1/0}

输入参数说明:

queue_name: 队列名称,必选。
limit:消息消费的个数,可选。
auto-delete: 如果为1则消费到的消息会被自动删除,默认为不删除,可选。

输出说明:

Messages: 消费得到的消息体。
Handle:消费的handle
Message_ID:消息ID
TTL:消息的生命周期。
Age: 消息已经存活的时间。
Times:消息被消费过的次数。
Initial Time:消息的第一次消费时间,utc时间。
Next Time:消息下一次可以消费的时间,只要当前时间大于该时间,则可以消费,utc时间。
Created At:消息的创建时间,utc时间。

举例说明:

1
2
3
4
5
6
7
8
[root@node-145 ~(keystone_admin)]# openstack es-messaging message-consume test --limit 3 --auto-delete 1
+-----------------------------------------------------------+--------------------------+--------------------------+------+------+-------+----------------------+----------------------+----------------------+
| Messages | Handle | Message_ID | TTL | Age | Times | Initial Time | Next Time | Created At |
+-----------------------------------------------------------+--------------------------+--------------------------+------+------+-------+----------------------+----------------------+----------------------+
| {u'msg': u'\u4f60\u597d222222', u'yzy': u'test'} | 59e47b0524ed673f277dced1 | 59e4721a24ed673f277dcec7 | 6310 | 2284 | 1 | 2017-10-16T09:25:25Z | 2017-10-16T09:25:55Z | 2017-10-16T08:47:22Z |
| {u'msg': u'\u4f60\u597d111111111111111', u'yzy': u'test'} | 59e47b0524ed673f277dced2 | 59e4721d24ed673f277dcec8 | 6300 | 2281 | 1 | 2017-10-16T09:25:25Z | 2017-10-16T09:25:55Z | 2017-10-16T08:47:25Z |
| {u'msg': u'\u4f60\u597d222222', u'yzy': u'test'} | 59e47b0624ed673f277dced3 | 59e4721d24ed673f277dcec9 | 6310 | 2281 | 1 | 2017-10-16T09:25:25Z | 2017-10-16T09:25:55Z | 2017-10-16T08:47:25Z |
+-----------------------------------------------------------+--------------------------+--------------------------+------+------+-------+----------------------+----------------------+----------------------+
  • 删除消息

根据消费的handle删除该消息,只有消费过的消息才能被删除,并且该handle没有过期,支持批量删除。命令为:

1
openstack es-messaging message-delete {queue_name} {message_handles}

输入参数说明:

queue_name: 队列名称,必选。
message_handles:消息消费的handles,支持多个,多个用逗号隔开,必选。

输出说明:

Expired: 过期的消息handle列表,删除失败。
Invalid:错误的消息handle列表,删除失败。
Success:删除成功的消息handle列表。

举例说明:

1
2
3
4
5
6
7
8
[root@node-145 ~(keystone_admin)]# openstack es-messaging message-delete test 59e47e8424ed673f607dcebb,59e47e8424ed673f607dcebc,59e47e8424ed673f607dcebs
+---------+------------------------------------------------------------+
| Field | Value |
+---------+------------------------------------------------------------+
| Expired | [u'59e47e8424ed673f607dcebb'] |
| Invalid | [u'59e47e8424ed673f607dcebs'] |
| Success | [u'59e47e8424ed673f607dcebc'] |
+---------+------------------------------------------------------------+
  • 向主题发布消息

向指定主题发送消息,支持批量。如果有订阅者,则订阅者会接收到该消息,如果订阅终端为webhook,则要求存在该WEB-Service,否则无法成功订阅。命令为:

1
openstack es-messaging message-publish {topic_name} {messages} --client-id {client-id}

输入参数说明:

topic_name: 主题名称,必选。
messages:消息列表,格式为JSON,必选。
body: 消息体,必须存在。内容为JSON,必选。
tags:消息的过滤标签,可选项。
client-id:发送消息的客户端id。必选项。

举例说明:

1
[root@node-145 ~(keystone_admin)]# openstack es-messaging message-publish test '[{"body":{"yzy":"test"},"tags": ["test"]}]' --client-id 38b4aee5-0a05-47f1-a627-f7c4c3e24382

热评文章