同一微服务连接多套RocketMQ集群

需要实现的通讯消息架构

通讯消息连接架构

分析

: 以下分析均在rocketmq4.0.0-incubating源码上进行

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;

this.checkConfig();

if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}

this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 从上一句可以看出:MQClientManager为Producer连接管理器,用户管理连接MQ的TCP客户端的连接

boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}

this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

if (startFactory) {
mQClientFactory.start();
}

log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "//
+ this.serviceState//
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}

this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}

org.apache.rocketmq.client.impl.MQClientManager#getAndCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
// 从上一句可以看出:找到对应的客户端通讯实例是通过一个clientId在factoryTable内存缓存中进行查询的

if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}

return instance;
}

org.apache.rocketmq.client.ClientConfig#buildMQClientId

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());

sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}

return sb.toString();
}
// 从以上方法可以看出:这个clientId信息中包含当前微服务的IP,当前模块实例名(默认通过changeInstanceNameToPID更改为进程ID值)和一个unitName

结论: 经以上源码可以得到一个模块需要连接多个RocketMQ集群,则需要生产多个MQClientInstance,换言之,则需要在获取MQClientInstance时传递不同的ClientID即可。
由于__ClientID=localIP + instanceName + unitName__,所以只需要创建Producer对象时传入不同的instanceName或unitName值即可。


观点仅代表自己,期待你的留言。

Java Script Engine

Java Script Engine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public static void main(String[] agrs) throws ScriptException {
final ScriptEngine javascriptEngine = new ScriptEngineManager().getEngineByName("javascript");
final Bindings globalBindings = javascriptEngine.getBindings(ScriptContext.GLOBAL_SCOPE);
globalBindings.put("a", 5);
System.out.println("-------Engine bindings scope--------------");
final Bindings javascriptEngineBindings = javascriptEngine.getBindings(ScriptContext.ENGINE_SCOPE);
javascriptEngineBindings.put("x", 20);
javascriptEngineBindings.put("y", 20.4);
javascriptEngineBindings.put("z", 1);
final String[] scriptArray = {"x*y+z", "x*(y+z)", "a+x*(y+z)"};
eval(scriptArray, javascriptEngine);

System.out.println("-------Local bindings scope--------------");
final Bindings localBinding = javascriptEngine.createBindings();
localBinding.put("x", 2);
localBinding.put("y", 3);
localBinding.put("z", 1);
eval(scriptArray, javascriptEngine, localBinding);
}

private static void eval(String[] scriptArray, ScriptEngine javascriptEngine) throws ScriptException {
Bindings aBindings = javascriptEngine.getBindings(ScriptContext.GLOBAL_SCOPE);
for (String key : aBindings.keySet()) {
System.out.println("Args (Global bindings scope) > " + key + "=" + aBindings.get(key));
}
aBindings = javascriptEngine.getBindings(ScriptContext.ENGINE_SCOPE);
for (String key : aBindings.keySet()) {
System.out.println("Args (Engine bindings scope) > " + key + "=" + aBindings.get(key));
}
for (String script : scriptArray) {
System.out.println("script > " + script + " = " + javascriptEngine.eval(script));
}
}

private static void eval(String[] scriptArray, ScriptEngine javascriptEngine, Bindings localBinding) throws ScriptException {
Bindings aBindings = javascriptEngine.getBindings(ScriptContext.GLOBAL_SCOPE);
for (String key : aBindings.keySet()) {
System.out.println("Args (Global bindings scope) > " + key + "=" + aBindings.get(key));
}
aBindings = javascriptEngine.getBindings(ScriptContext.ENGINE_SCOPE);
for (String key : aBindings.keySet()) {
System.out.println("Args (Engine bindings scope) > " + key + "=" + aBindings.get(key));
}
for (String s : localBinding.keySet()) {
System.out.println("Args (Local bindings scope) > " + s + "=" + aBindings.get(s));
}
for (String script : scriptArray) {
System.out.println("script > " + script + " = " + javascriptEngine.eval(script, localBinding));
}
}

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
-------Engine bindings scope--------------
Args (Global bindings scope) > a=5
Args (Engine bindings scope) > z=1
Args (Engine bindings scope) > y=20.4
Args (Engine bindings scope) > x=20
script > x*y+z = 409.0
script > x*(y+z) = 428.0
script > a+x*(y+z) = 433.0
-------Local bindings scope--------------
Args (Global bindings scope) > a=5
Args (Engine bindings scope) > println=sun.org.mozilla.javascript.internal.InterpretedFunction@45a23f67
Args (Engine bindings scope) > context=javax.script.SimpleScriptContext@1ef0a6e8
Args (Engine bindings scope) > z=1
Args (Engine bindings scope) > print=sun.org.mozilla.javascript.internal.InterpretedFunction@495dd936
Args (Engine bindings scope) > y=20.4
Args (Engine bindings scope) > x=20
Args (Local bindings scope) > z=1
Args (Local bindings scope) > y=20.4
Args (Local bindings scope) > x=20
script > x*y+z = 7.0
script > x*(y+z) = 8.0
script > a+x*(y+z) = 13.0

Process finished with exit code 0

Bindings变量的有效范围

  1. Global对应到ScriptEngineFactory,通过scriptEngine.getBindings(ScriptContext.GLOBAL_SCOPE)获得。
  2. Engine对应到ScriptEngine,通过scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE)获得。
  3. Local Binding每一次执行script,通过scriptEngine.createBindings()获得。

使用场景适用于

  1. 规则引擎
  2. 流程流转条件判定

观点仅代表自己,期待你的留言。

redis进阶

redis主从(读写分离)

问题:

  • redis为单进程程序,只能占用单核,无法充分使用多核的系统资源。
  • 单节点redis在出现故障时则无法继续提供数据存储服务,无法达到高可用要求。
    解决方法:
  • 增加redis slave节点通过replicas进行master-slave的数据复制,故障时手动恢复。
  • 通过增加sentinel(哨兵)监控redis master-slave节点的存活状态,当master出现故障时自动将slave升级为master继续提供服务。

data sharding(数据分片)

问题:
单台redis节点的内存总量有限,达到上限后想要扩容除了增加内存别无它法
解决方法:

  • redis client (部署多个独立的redis节点,通过在客户端代码中针对key进行hash,然后将数据按hash映射存储到不同的redis中)
  • Twemproxy (部署多个独立的redis节点,通过引入twitter开源中间件封装key的hash操作,最终将数据存储到不同的redis中)

redis cluster集群

问题:
单台redis节点的内存容量有限,达到上限后想要扩容除了增加内存别无它法
解决方法:

  • 将所有的redis节点的内存总容量划分为n个哈希槽(hash slot)(默认为16384个),每一个redis节点负责一段solt,存取数据时通过CRC16算法对key进行取余来决定应该从哪一个redis节点进行存取操作。
    redis节点两两连接共同形成一个集群,客户端代码连接集群中任意节点进行存取服务,集群中的节点会通过CRC16算法来将存取请求转发到目标redis节点完成数据的存取。
  • 通过在集群内挑选一部分节点设置为slave节点,通过master-slave构建高可用redis服务

RocketMQ使用指南及参数详解

一、使用指南

  • 客户端寻址方式

在代码中指定NameServer地址

1
Producer.setNamesrvAddr(“192.168.8.106:9876”);

1
Consumer.setNamesrvAddr(“192.168.8.106:9876”);

Java启动参数中指定NameServer地址

1
-Drocketmq.namesrv.addr=192.168.8.106:9876

环境变量指定NameServer地址·

1
export NAMESRV_ADDR=192.168.8.106:9876
  • http静态服务器寻址

客户端启动后,会定时访问一个静态的HTTP服务器,地址如下:

http://jmenv.tbsite.net:8080/rocketmq/msaddr

这个URL的返回内容如下:

192.168.8.106:9876

客户端默认每隔2分钟访问一次这个HTTP服务器,并更新本地的NameServer地址。URL已经在代码中写死,可通过修改/etc/hosts文件来改变要访问的服务器,例如在/etc/hosts增加如下配置:

10.232.22.67 jmenv.taobao.net

二、参数详解

  • 客户端的公共配置类:ClientConfig
参数名 默认值 说明
NamesrvAddr NameServer地址列表,多个nameServer地址用分号隔开
clientIP 本机IP 客户端本机IP地址,某些机器会发生无法识别客户端IP地址情况,需要应用在代码中强制指定
instanceName DEFAULT 客户端实例名称,客户端创建的多个Producer,Consumer实际是共用一个内部实例(这个实例包含网络连接,线程资源等)
clientCallbackExecutorThreads 4 通信层异步回调线程数
pollNameServerInteval 30000 轮训Name Server 间隔时间,单位毫秒
heartbeatBrokerInterval 30000 向Broker发送心跳间隔时间,单位毫秒
persistConsumerOffsetInterval 5000 持久化Consumer消费进度间隔时间,单位毫秒
  • Producer配置
参数名 默认值 说明
producerGroup DEFAULT_PRODUCER Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组。
createTopicKey TBW102 在发送消息时,自动创建服务器不存在的topic,需要指定key
defaultTopicQueueNums 4 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
sendMsgTimeout 10000 发送消息超时时间,单位毫秒
compressMsgBodyOverHowmuch 4096 消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节
retryAnotherBrokerWhenNotStoreOK FALSE 如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重试发送
maxMessageSize 131072 客户端限制的消息大小,超过报错,同时服务端也会限制(默认128K)
transactionCheckListener 事物消息回查监听器,如果发送事务消息,必须设置
checkThreadPoolMinSize 1 Broker回查Producer事务状态时,线程池大小
checkThreadPoolMaxSize 1 Broker回查Producer事务状态时,线程池大小
checkRequestHoldMax 2000 Broker回查Producer事务状态时,Producer本地缓冲请求队列大小
  • PushConsumer配置
参数名 默认值 说明
consumerGroup DEFAULT_CONSUMER Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应将它们归为同一组
messageModel CLUSTERING 消息模型,支持以下两种1.集群消费2.广播消费
consumeFromWhere CONSUME_FROM_LAST_OFFSET Consumer启动后,默认从什么位置开始消费
allocateMessageQueueStrategy AllocateMessageQueueAveragely Rebalance算法实现策略
Subscription {} 订阅关系
messageListener 消息监听器
offsetStore 消费进度存储
consumeThreadMin 10 消费线程池数量
consumeThreadMax 20 消费线程池数量
consumeConcurrentlyMaxSpan 2000 单队列并行消费允许的最大跨度
pullThresholdForQueue 1000 拉消息本地队列缓存消息最大数
Pullinterval 0 拉消息间隔,由于是长轮询,所以为0,但是如果应用了流控,也可以设置大于0的值,单位毫秒
consumeMessageBatchMaxSize 1 批量消费,一次消费多少条消息
pullBatchSize 32 批量拉消息,一次最多拉多少条
  • PullConsumer配置
参数名 默认值 说明
consumerGroup Conusmer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组
brokerSuspendMaxTimeMillis 20000 长轮询,Consumer拉消息请求在Broker挂起最长时间,单位毫秒
consumerPullTimeoutMillis 10000 非长轮询,拉消息超时时间,单位毫秒
consumerTimeoutMillisWhenSuspend 30000 长轮询,Consumer拉消息请求咋broker挂起超过指定时间,客户端认为超时,单位毫秒
messageModel BROADCASTING 消息模型,支持以下两种:1集群消费 2广播模式
messageQueueListener 监听队列变化
offsetStore 消费进度存储
registerTopics 注册的topic集合
allocateMessageQueueStrategy Rebalance算法实现策略
  • Broker配置参数
    查看Broker默认配置
1
sh mqbroker -m
参数名 默认值 说明
consumerGroup Conusmer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组
listenPort 10911 Broker对外服务的监听端口
namesrvAddr Null Name Server地址
brokerIP1 本机IP 本机IP地址,默认系统自动识别,但是某些多网卡机器会存在识别错误的情况,这种情况下可以人工配置。
brokerName 本机主机名
brokerClusterName DefaultCluster Broker所属哪个集群
brokerId 0 BrokerId,必须是大等于0的整数,0表示Master,>0表示Slave,一个Master可以挂多个Slave,Master和Slave通过BrokerName来配对
storePathCommitLog $HOME/store/commitlog commitLog存储路径
storePathConsumeQueue $HOME/store/consumequeue 消费队列存储路径
storePathIndex $HOME/store/index 消息索引存储队列
deleteWhen 4 删除时间时间点,默认凌晨4点
fileReservedTime 48 文件保留时间,默认48小时
maxTransferBytesOnMessageInMemory 262144 单次pull消息(内存)传输的最大字节数
maxTransferCountOnMessageInMemory 32 单次pull消息(内存)传输的最大条数
maxTransferBytesOnMessageInDisk 65535 单次pull消息(磁盘)传输的最大字节数
maxTransferCountOnMessageInDisk 8 单次pull消息(磁盘)传输的最大条数
messageIndexEnable TRUE 是否开启消息索引功能
messageIndexSafe FALSE 是否提供安全的消息索引机制,索引保证不丢
brokerRole ASYNC_MASTER Broker的角色 -ASYNC_MASTER异步复制Master -SYNC_MASTER同步双写Master -SLAVE
flushDiskType ASYNC_FLUSH 刷盘方式 -ASYNC_FLUSH异步刷盘 -SYNC_FLUSH同步刷盘
cleanFileForciblyEnable TRUE 磁盘满,且无过期文件情况下TRUE表示强制删除文件,优先保证服务可用 FALSE标记服务不可用,文件不删除

转自:https://www.cnblogs.com/xiaodf/p/5075167.html

观点仅代表自己,期待你的留言。

RocketMQ各组件特性和HA集群方案

一、简介

  • RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:

  • 能够保证严格的消息顺序

  • 提供丰富的消息拉取模式

  • 高效的订阅者水平扩展能力

  • 实时的消息订阅机制

  • 亿级消息堆积能力

二、组件特性

1. nameserver

相对来说,nameserver的稳定性非常高。原因有二:

  • nameserver互相独立,彼此没有通信关系,单台nameserver挂掉,不影响其他nameserver,即使全部挂掉,也不影响业务系统使用。

  • nameserver不会有频繁的读写,所以性能开销非常小,稳定性很高。

2. broker

与nameserver关系
连接:

单个broker和所有nameserver保持长连接

心跳:

心跳间隔:每隔30秒(此时间无法更改)向所有nameserver发送心跳,心跳包含了自身的topic配置信息。

心跳超时:nameserver每隔10秒钟(此时间无法更改),扫描所有还存活的broker连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则断开连接。

断开:

时机:broker挂掉;心跳超时导致nameserver主动关闭连接

动作:一旦连接断开,nameserver会立即感知,更新topc与队列的对应关系,但不会通知生产者和消费者

负载均衡:

  1. 一个topic分布在多个broker上,一个broker可以配置多个topic,它们是多对多的关系。

  2. 如果某个topic消息量很大,应该给它多配置几个队列,并且尽量多分布在不同broker上,减轻某个broker的压力。

  3. topic消息量都比较均匀的情况下,如果某个broker上的队列越多,则该broker压力越大。

可用性:
由于消息分布在各个broker上,一旦某个broker宕机,则该broker上的消息读写都会受到影响。所以rocketmq提供了master/slave的结构,salve定时从master同步数据,如果master宕机,则slave提供消费服务,但是不能写入消息,此过程对应用透明,由rocketmq内部解决。

这里有两个关键点:

  1. 一旦某个broker master宕机,生产者和消费者多久才能发现?受限于rocketmq的网络连接机制,默认情况下,最多需要30秒,但这个时间可由应用设定参数来缩短时间。这个时间段内,发往该broker的消息都是失败的,而且该broker的消息无法消费,因为此时消费者不知道该broker已经挂掉。

  2. 消费者得到master宕机通知后,转向slave消费,但是slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失。但是消息最终不会丢的,一旦master恢复,未同步过去的消息会被消费掉。

可靠性:

  1. 所有发往broker的消息,有同步刷盘和异步刷盘机制,总的来说,可靠性非常高

  2. 同步刷盘时,消息写入物理文件才会返回成功,因此非常可靠

  3. 异步刷盘时,只有机器宕机,才会产生消息丢失,broker挂掉可能会发生,但是机器宕机崩溃是很少发生的,除非突然断电

消息清理:

  • 扫描间隔

默认10秒,由broker配置参数cleanResourceInterval决定

  • 空间阈值

物理文件不能无限制的一直存储在磁盘,当磁盘空间达到阈值时,不再接受消息,broker打印出日志,消息发送失败,阈值为固定值85%

  • 清理时机

默认每天凌晨4点,由broker配置参数deleteWhen决定;或者磁盘空间达到阈值

  • 文件保留时长

默认72小时,由broker配置参数fileReservedTime决定

读写性能:

  1. 文件内存映射方式操作文件,避免read/write系统调用和实时文件读写,性能非常高

  2. 永远一个文件在写,其他文件在读

  3. 顺序写,随机读

  4. 利用linux的sendfile机制,将消息内容直接输出到sokect管道,避免系统调用

系统特性:

  1. 大内存,内存越大性能越高,否则系统swap会成为性能瓶颈

  2. IO密集

  3. cpu load高,使用率低,因为cpu占用后,大部分时间在IO WAIT

  4. 磁盘可靠性要求高,为了兼顾安全和性能,采用RAID10阵列

  5. 磁盘读取速度要求快,要求高转速大容量磁盘

3. 消费者

与nameserver关系
连接:

单个消费者和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,消费者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。

心跳:

与nameserver没有心跳

轮询时间:

默认情况下,消费者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,客户端最多要30秒才能感知。该时间由DefaultMQPushConsumer的pollNameServerInteval参数决定,可手动配置。

与broker关系

连接:

单个消费者和该消费者关联的所有broker保持长连接。

心跳:

默认情况下,消费者每隔30秒向所有broker发送心跳,该时间由DefaultMQPushConsumer的heartbeatBrokerInterval参数决定,可手动配置。broker每隔10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则关闭连接,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费

断开:

时机:消费者挂掉;心跳超时导致broker主动关闭连接

动作:一旦连接断开,broker会立即感知到,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费

负载均衡:
集群消费模式下,一个消费者集群多台机器共同消费一个topic的多个队列,一个队列只会被一个消费者消费。如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。

消费机制:

  • 本地队列
    消费者不间断的从broker拉取消息,消息拉取到本地队列,然后本地消费线程消费本地消息队列,只是一个异步过程,拉取线程不会等待本地消费线程,这种模式实时性非常高。对消费者对本地队列有一个保护,因此本地消息队列不能无限大,否则可能会占用大量内存,本地队列大小由DefaultMQPushConsumer的pullThresholdForQueue属性控制,默认1000,可手动设置。

  • 轮询间隔

消息拉取线程每隔多久拉取一次?间隔时间由DefaultMQPushConsumer的pullInterval属性控制,默认为0,可手动设置。

  • 消息消费数量

监听器每次接受本地队列的消息是多少条?这个参数由DefaultMQPushConsumer的consumeMessageBatchMaxSize属性控制,默认为1,可手动设置。

消费进度存储:
每隔一段时间将各个队列的消费进度存储到对应的broker上,该时间由DefaultMQPushConsumer的persistConsumerOffsetInterval属性控制,默认为5秒,可手动设置。

如果一个topic在某broker上有3个队列,一个消费者消费这3个队列,那么该消费者和这个broker有几个连接?
一个连接,消费单位与队列相关,消费连接只跟broker相关,事实上,消费者将所有队列的消息拉取任务放到本地的队列,挨个拉取,拉取完毕后,又将拉取任务放到队尾,然后执行下一个拉取任务

4. 生产者

与nameserver关系
连接:

单个生产者者和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,生产者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。

轮询时间:

默认情况下,生产者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,生产者最多要30秒才能感知,在此期间,发往该broker的消息发送失败。该时间由DefaultMQProducer的pollNameServerInteval参数决定,可手动配置。

心跳:

与nameserver没有心跳

与broker关系
连接:

单个生产者和该生产者关联的所有broker保持长连接。

心跳:

默认情况下,生产者每隔30秒向所有broker发送心跳,该时间由DefaultMQProducer的heartbeatBrokerInterval参数决定,可手动配置。broker每隔10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则关闭连接。

连接断开:

移除broker上的生产者信息

负载均衡:
生产者时间没有关系,每个生产者向队列轮流发送消息

三、集群方案

  1. 单个 Master
    这种方式风险较大,一旦Broker 重启或者宕机时,会导致整个服务不可用,不建议线上环境使用。

  2. 多 Master 模式
    一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master

    优点:配置简单,单个Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由与 RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。

    缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到受到影响。

先启动 NameServer,例如机器 IP 为:172.16.8.106:9876

1
nohup sh mqnamesrv &

在机器 A,启动第一个 Master

1
nohup sh mqbroker -n 172.16.8.106:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &

在机器 B,启动第二个 Master

1
nohup sh mqbroker -n 172.16.8.106:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &
  1. 多 Master 多 Slave 模式,异步复制
    每个 Master 配置一个 Slave,有多对Master-Slave,HA 采用异步复制方式,主备有短暂消息延迟,毫秒级。

    优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为 Master 宕机后,消费者仍然可以从 Slave 消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。

    缺点:Master 宕机,磁盘损坏情况,会丢失少量消息。

先启动 NameServer,例如机器 IP 为:172.16.8.106:9876

1
nohup sh mqnamesrv &

在机器 A,启动第一个 Master

1
nohup sh mqbroker -n 172.16.8.106:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &

在机器 B,启动第二个 Master

1
nohup sh mqbroker -n 172.16.8.106:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &

在机器 C,启动第一个 Slave

1
nohup sh mqbroker -n 172.16.8.106:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &

在机器 D,启动第二个 Slave

1
nohup sh mqbroker -n 172.16.8.106:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &
  1. 多 Master 多 Slave 模式,同步双写
    每个 Master 配置一个 Slave,有多对Master-Slave,HA 采用同步双写方式,主备都写成功,向应用返回成功。

    优点:数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高

    缺点:性能比异步复制模式略低,大约低 10%左右,发送单个消息的 RT 会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能。

先启动 NameServer,例如机器 IP 为:172.16.8.106:9876

1
nohup sh mqnamesrv &

在机器 A,启动第一个 Master

1
nohup sh mqbroker -n 172.16.8.106:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &

在机器 B,启动第二个 Master

1
nohup sh mqbroker -n 172.16.8.106:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &

在机器 C,启动第一个 Slave

1
nohup sh mqbroker -n 172.16.8.106:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &

在机器 D,启动第二个 Slave

1
nohup sh mqbroker -n 172.16.8.106:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &

以上 Broker 与 Slave 配对是通过指定相同的brokerName 参数来配对,Master 的 BrokerId 必须是 0,Slave的BrokerId 必须是大与 0 的数。另外一个 Master 下面可以挂载多个 Slave,同一 Master 下的多个 Slave 通过指定不同的 BrokerId 来区分。


转自:https://www.cnblogs.com/xiaodf/p/5075167.html

观点仅代表自己,期待你的留言。

分布式事务最终一致性解决方案-DTCS

简介

DTCS(Data Transfer Consistency Service):数据一致性传输服务,通过MQ+一致性保障所实现的数据一致性传输服务。为全方面保障数据传输的一致性,增加运维人员人工介入纠正未及时实现数据一致性的处理方式。

业务调用时序

业务调用时序

后续

表决心: 此方案后续坚持实现后,必将开源于Github,届时请同行不吝指正。


观点仅代表自己,期待你的留言。

ubuntu下thinkpad电池阀设置(待验证)

设置电池阀

sudo add-apt-repository ppa:linrunner/tlp
sudo apt-get update

sudo apt-get install tlp tlp-rdw
sudo apt-get install tp-smapi-dkms acpi-call-tools

sudo gedit /etc/default/tlp

Main battery (values in %)

START_CHARGE_THRESH_BAT0=10
STOP_CHARGE_THRESH_BAT0=96

之后执行

sudo tlp setcharge

重启 OK 电池阀设置完成了

查看电池阀设置

tone@ubuntu:~$ sudo tlp-stat –battery
— TLP 0.4 ——————————————–

+++ ThinkPad Extended Battery Functions
tp-smapi = active
tpacpi-bat = active

+++ ThinkPad Battery Status (Main)
/sys/devices/platform/smapi/BAT0/manufacturer = LGC
/sys/devices/platform/smapi/BAT0/model = 42T4865
/sys/devices/platform/smapi/BAT0/manufacture_date = 2011-11-24
/sys/devices/platform/smapi/BAT0/first_use_date = 2012-04-13
/sys/devices/platform/smapi/BAT0/cycle_count = 63
/sys/devices/platform/smapi/BAT0/design_capacity = 62160 [mWh]
/sys/devices/platform/smapi/BAT0/last_full_capacity = 59470 [mWh]
/sys/devices/platform/smapi/BAT0/remaining_capacity = 42410 [mWh]
/sys/devices/platform/smapi/BAT0/remaining_percent = 71 [%]
/sys/devices/platform/smapi/BAT0/remaining_running_time_now = 189 [min]
/sys/devices/platform/smapi/BAT0/remaining_charging_time = not_charging [min]
/sys/devices/platform/smapi/BAT0/power_now = -13432 [mW]
/sys/devices/platform/smapi/BAT0/power_avg = -12742 [mW]

tpacpi-bat.BAT0.startThreshold = 10 [%]
tpacpi-bat.BAT0.stopThreshold = 96 [%]
tpacpi-bat.BAT0.forceDischarge = 0


观点仅代表自己,期待你的留言。

Linux下通过nginx实现直播间功能的实验

实验环境

  • 系统环境
    1
    2
    wujianjun@wujianjun-work ~ $ uname -a
    Linux wujianjun-work 4.10.0-37-generic #41~16.04.1-Ubuntu SMP Fri Oct 6 22:42:59 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
  • 软件环境
    OBS(Open Broadcaster Software) v20.0.1 (Linux)

nginx version: nginx/1.13.6
built by gcc 5.4.0 20160609 (Ubuntu 5.4.0-6ubuntu1~16.04.5)
built with OpenSSL 1.0.2g 1 Mar 2016
TLS SNI support enabled
configure arguments: –with-pcre=pcre-8.38 –add-module=nginx-rtmp-module-1.1.11

Nginx+obs安装及配置

安装obs

1
2
3
4
5
wujianjun@wujianjun-work ~ $ sudo add-apt-repository ppa:kirillshkrogalev/ffmpeg-next
wujianjun@wujianjun-work ~ $ sudo apt-get update && sudo apt-get install ffmpeg
wujianjun@wujianjun-work ~ $ sudo apt-get install obs-studio
wujianjun@wujianjun-work ~ $ sudo add-apt-repository ppa:obsproject/obs-studio
wujianjun@wujianjun-work ~ $ sudo apt-get update && sudo apt-get install obs-studio

nginx加装rtmp模块

nginx-rtmp-module (https://github.com/arut/nginx-rtmp-module)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
wujianjun@wujianjun-work ~ $ sudo apt-get install build-essential
wujianjun@wujianjun-work ~ $ wget wget http://nginx.org/download/nginx-1.13.6.tar.gz
wujianjun@wujianjun-work ~/nginx-1.13.6 $ wget https://github.com/arut/nginx-rtmp-module/archive/v1.1.11.tar.gz
wujianjun@wujianjun-work ~/nginx-1.13.6 $ tar -xvf v1.1.11.tar.gz
wujianjun@wujianjun-work ~/nginx-1.13.6 $ wget http://jaist.dl.sourceforge.net/project/pcre/pcre/8.38/pcre-8.38.tar.gz
wujianjun@wujianjun-work ~/nginx-1.13.6 $ tar -xvf pcre-8.38.tar.gz
wujianjun@wujianjun-work ~/nginx-1.13.6 $ ls -all
总用量 748
drwxr-xr-x 9 wujianjun wujianjun 4096 10月 15 11:39 .
drwxr-xr-x 63 wujianjun wujianjun 4096 10月 15 11:33 ..
drwxr-xr-x 6 wujianjun wujianjun 4096 10月 15 11:33 auto
-rw-r--r-- 1 wujianjun wujianjun 282456 10月 10 23:22 CHANGES
-rw-r--r-- 1 wujianjun wujianjun 430416 10月 10 23:22 CHANGES.ru
drwxr-xr-x 2 wujianjun wujianjun 4096 10月 15 11:33 conf
-rwxr-xr-x 1 wujianjun wujianjun 2502 10月 10 23:22 configure
drwxr-xr-x 4 wujianjun wujianjun 4096 10月 15 11:33 contrib
drwxr-xr-x 2 wujianjun wujianjun 4096 10月 15 11:33 html
-rw-r--r-- 1 wujianjun wujianjun 1397 10月 10 23:22 LICENSE
drwxr-xr-x 2 wujianjun wujianjun 4096 10月 15 11:33 man
drwxrwxr-x 6 wujianjun wujianjun 4096 2月 13 2017 nginx-rtmp-module-1.1.11
drwxr-xr-x 7 wujianjun wujianjun 4096 11月 23 2015 pcre-8.38
-rw-r--r-- 1 wujianjun wujianjun 49 10月 10 23:22 README
drwxr-xr-x 9 wujianjun wujianjun 4096 10月 15 11:33 src
wujianjun@wujianjun-work ~/nginx-1.13.6 $ ./configure --with-pcre=pcre-8.38 --add-module=nginx-rtmp-module-1.1.11
wujianjun@wujianjun-work ~/nginx-1.13.6 $ make && sudo make install
wujianjun@wujianjun-work ~/nginx-1.13.6 $ ls -all /usr/local/nginx/
总用量 24
drwxr-xr-x 6 root root 4096 10月 15 16:11 .
drwxr-xr-x 11 root root 4096 10月 15 16:11 ..
drwxr-xr-x 2 root root 4096 10月 15 16:11 conf
drwxr-xr-x 2 root root 4096 10月 15 16:11 html
drwxr-xr-x 2 root root 4096 10月 15 16:11 logs
drwxr-xr-x 2 root root 4096 10月 15 16:11 sbin

增加rtmp协议配置

1
wujianjun@wujianjun-work ~/nginx-1.13.6 $ sudo vi /usr/local/nginx/conf/nginx.conf

在nginx.conf文件末尾增加以下rtmp协议的配置

1
2
3
4
5
6
7
8
9
10
11
rtmp {
server {
listen 1935;
chunk_size 4096;

application live {
live on;
record off;
}
}
}

启动&测试

  • 启动nginx
    1
    wujianjun@wujianjun-work ~/nginx-1.13.6 $ sudo /usr/local/nginx/sbin/nginx
  • 启动OBS
    打开刚安装的OBS软件,在来源处配置图像的推送来源(我这里选择窗口捕获),点击右下角”设置”,进行如下图配置流推送地址

配置完成后,点击”开始推流”

  • 启动支持网络流播放的视频播放器(演示使用vlc播放器)
    配置网络流播放的地址,如下图:

当点击”播放”后,稍等几秒,即可看到播放器显示了obs捕获的图像了。

由于视频流需要通过网络进行传输,所以直播图像会有几秒的延迟。

http访问直播视频

1、更改nginx.conf中配置,增加hls配置(hls是在流媒体服务器中用来存放流媒体的文件夹),再次hls所在目录设置为http协议访问目录即可,更改后的配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
rtmp {
server {
listen 1935;
chunk_size 4096;

application live {
live on;
hls on;
hls_path /usr/share/nginx/html/hls;
hls_fragment 5s;
}
}
}

http {
server {
listen 80;
.....
location / {
#root html;
root /usr/share/nginx/html;
index index.html index.htm;
}
.....
}
}

注意: hls所在目录nginx的用户必须有写入权限。

2、obs软件配置录制流名称
在配置obs推送流URL的下方有一个设置”流名称”的地方,这里可以随意填写一个名称(我这里示例填入”test”)

3、重启一下nginx与obs软件,我们即可在手机浏览器中输入 http://ip/hls/test.m3u8 即可通过手机播放直播视频。(直播延迟有点大,后续出文章优化)


观点仅代表自己,期待你的留言。

分布式下数据事务

一、概念

1、CAP定律

Consistency(一致性): 数据一致更新,针对集群内所有节点数据变动都是同步完成。
Availability(可用性): 在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。
Partition tolerance(分区容错性): 以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择。

It states, that though its desirable to have Consistency, High-Availability and Partition-tolerance in every system, unfortunately no system can achieve all three at the same time.
在分布式系统的设计中,没有一种设计可以同时满足一致性,可用性,分区容错性 3个特性

CAP定制

2、ACID模型

Atomicity(原子性): 事务作为一个整体被执行,包含在其中的对数据库的操作要么全部被执行,要么都不执行。
Consistency(一致性): 事务应确保数据库的状态从一个一致状态转变为另一个一致状态。一致状态的含义是数据库中的数据应满足完整性约束。
Isolation(隔离性): 多个事务并发执行时,一个事务的执行不应影响其他事务的执行。
Durability(持久性): 一个事务一旦提交,他对数据库的修改应该永久保存在数据库中。

3、BASE模型(反ACID模型)

Basically Available(基本可用): 基本可用是指分布式系统在出现不可预知故障的时候,允许损失部分可用性。
Soft state(软状态): 软状态和硬状态相对,是指允许系统中的数据存在中间状态,并认为该中间状态的存在不会影响系统的整体可用性,数据更改时允许集群的不同节点有一段时间进行数据状态同步。
Eventually consistent(最终一致性): 事务处理过程中,会有短暂不一致的情况,但通过恢复系统,可以让事务的数据达到最终一致的目标。

二、强一致性解决方案

1、2PC (二阶段提交)

算法思路: 参与者将操作成败通知协调者,再由协调者根据所有参与者的反馈情报决定各参与者是否要提交操作还是中止操作。
XA协议: XA是一个分布式事务协议,由Tuxedo提出。XA中大致分为两部分:事务管理器(Transaction Manager)和本地资源管理器(Local Resource Manager)。其中本地资源管理器往往由数据库实现,比如Oracle、DB2这些商业数据库都实现了XA接口,而事务管理器作为全局的调度者,负责各个本地资源的提交和回滚。

X/Open DTP模型

二阶段:

  1. 准备阶段: 事务协调者(事务管理器)给每个参与者(资源管理器)发送Prepare消息,每个参与者要么直接返回失败(如权限验证失败),要么在本地执行事务,写本地的redo和undo日志,但不提交本地事务。
  2. 提交阶段: 如果协调者收到了参与者的失败消息或者超时,直接给每个参与者发送回滚(Rollback)消息;否则,发送提交(Commit)消息;参与者根据协调者的指令执行提交或者回滚操作,释放所有事务处理过程中使用的锁资源。(注意:必须在最后阶段释放锁资源)

缺点:

  1. 同步阻塞问题。执行过程中,所有参与节点都是事务阻塞型的。当参与者占有公共资源时,其他第三方节点访问公共资源不得不处于阻塞状态。
  2. 单点故障。由于协调者的重要性,一旦协调者发生故障。参与者会一直阻塞下去。尤其在第二阶段,协调者发生故障,那么所有的参与者还都处于锁定事务资源的状态中,而无法继续完成事务操作。(如果是协调者挂掉,可以重新选举一个协调者,但是无法解决因为协调者宕机导致的参与者处于阻塞状态的问题)
  3. 数据不一致。在二阶段提交的阶段二中,当协调者向参与者发送commit请求之后,发生了局部网络异常或者在发送commit请求过程中协调者发生了故障,这回导致只有一部分参与者接受到了commit请求。而在这部分参与者接到commit请求之后就会执行commit操作。但是其他部分未接到commit请求的机器则无法执行事务提交。于是整个分布式系统便出现了数据部一致性的现象。
  4. 二阶段无法解决的问题:协调者再发出commit消息之后宕机,而唯一接收到这条消息的参与者同时也宕机了。那么即使协调者通过选举协议产生了新的协调者,这条事务的状态也是不确定的,没人知道事务是否被已经提交。

2、3PC (三阶段提交)

与两阶段提交不同的是,三阶段(CanCommit、PreCommit、DoCommit)提交有两个改动点:

  1. 引入超时机制。同时在协调者和参与者中都引入超时机制。
  2. 在第一阶段和第二阶段中插入一个准备阶段。保证了在最后提交阶段之前各参与节点的状态是一致的。

CanCommit阶段: 3PC的CanCommit阶段其实和2PC的准备阶段很像。协调者向参与者发送commit请求,参与者如果可以提交就返回Yes响应,否则返回No响应。
PreCommit阶段: 协调者根据参与者的反应情况来决定是否可以继续事务的PreCommit操作。如果参与者执行完本地事务操作则返回YES,假如有任何一个参与者向协调者发送了No响应,或者等待超时之后,协调者都没有接到参与者的响应,那么就执行事务的中断。
doCommit阶段: 该阶段由协调者通知参与者的PreCommit阶段反馈进行判断,最终决定是真正的事务提交,还是执行事务回滚。

缺点:
在doCommit阶段,如果参与者无法及时接收到来自协调者的doCommit或者rebort请求时,会在等待超时之后,会继续进行事务的提交。(其实这个应该是基于概率来决定的,当进入第三阶段时,说明参与者在第二阶段已经收到了PreCommit请求,那么协调者产生PreCommit请求的前提条件是他在第二阶段开始之前,收到所有参与者的CanCommit响应都是Yes。(一旦参与者收到了PreCommit,意味他知道大家其实都同意修改了)所以,一句话概括就是,当进入第三阶段时,由于网络超时等原因,虽然参与者没有收到commit或者abort响应,但是他有理由相信:成功提交的几率很大。 )

3、Paxos算法

待补充

三、最终一致性解决方案

1、本地消息表

设计思想: 是将远程分布式事务拆分成一系列的本地事务。借助关系型数据库中的表即可实现。
集群节点同步: 定时扫描本地消息表,将需要同步状态的消息产生MQ消息通过向时效性高的MQ放入消息,再由其它集群节点消费该消息完成通知,通过消息消费状态表来避免MQ消息的重复消费。

2、RocketMQ(事务消息)

以网传处理流程为例进行说明

说明:
1、当第3步confirmB失败时,则交由RMQ定时调用checkTransaction进行处理结果的检测,如果为失败,则rollback,否则发送消息到B账户。
2、当第5步consumeB失败时,则交由RMQ定时进行重试,但需要设置最大重试次数,如果达到最大次数依然失败,则需要人工介入进行修正。

所以RMQ需要一个人工修正的控制台,当系统通过重试无法进行修正时以人工做为做终的修正手段来做保障最终事务一致性。


观点仅代表自己,期待你的留言。

Cron表达式说明

CronTrigger

CronTriggers往往比SimpleTrigger更有用,如果您需要基于日历的概念,而非SimpleTrigger完全指定的时间间隔,复发的发射工作的时间表。
CronTrigger,你可以指定触发的时间表如“每星期五中午”,或“每个工作日9:30时”,甚至“每5分钟一班9:00和10:00逢星期一上午,星期三星期五“。
即便如此,SimpleTrigger一样,CronTrigger拥有的startTime指定的时间表时生效,指定的时间表时,应停止(可选)结束时间。

Cron Expressions

cron的表达式被用来配置CronTrigger实例。 cron的表达式是字符串,实际上是由七子表达式,描述个别细节的时间表。
这些子表达式是分开的空白,代表:

  1.    Seconds
    
  2.    Minutes
    
  3.    Hours
    
  4.    Day-of-Month
    
  5.    Month
    
  6.    Day-of-Week
    
  7.    Year (可选字段)
    
    例 “0 0 12 ? * WED” 在每星期三下午12:00 执行,

个别子表达式可以包含范围, 例如,在前面的例子里(“WED”)可以替换成 “MON-FRI”, “MON, WED, FRI”甚至”MON-WED,SAT”.

“* ” 代表整个时间段.

每一个字段都有一套可以指定有效值,如

Seconds (秒) :可以用数字0-59 表示,

Minutes(分) :可以用数字0-59 表示,

Hours(时) :可以用数字0-23表示,

Day-of-Month(天) :可以用数字1-31 中的任一一个值,但要注意一些特别的月份

Month(月) :可以用0-11 或用字符串 “JAN, FEB, MAR, APR, MAY, JUN, JUL, AUG, SEP, OCT, NOV and DEC” 表示

Day-of-Week(每周):可以用数字1-7表示(1 = 星期日)或用字符口串“SUN, MON, TUE, WED, THU, FRI and SAT”表示

“/”:为特别单位,表示为“每”如“0/15”表示每隔15分钟执行一次,“0”表示为从“0”分开始, “3/20”表示表示每隔20分钟执行一次,“3”表示从第3分钟开始执行

“?”:表示每月的某一天,或第周的某一天

“L”:用于每月,或每周,表示为每月的最后一天,或每个月的最后星期几如“6L”表示“每月的最后一个星期五”

“W”:表示为最近工作日,如“15W”放在每月(day-of-month)字段上表示为“到本月15日最近的工作日”

““#”:是用来指定“的”每月第n个工作日,例 在每周(day-of-week)这个字段中内容为”6#3” or “FRI#3” 则表示“每月第三个星期五”

1)Cron表达式的格式:秒 分 时 日 月 周 年(可选)。

字段名 允许的值 允许的特殊字符
0-59 , - * /
0-59 , - * /
小时 0-23 , - * /
1-31 , - * ? / L W C
1-12 or JAN-DEC , - * /
周几 1-7 or SUN-SAT , - * ? / L C #
年 (可选字段) empty, 1970-2099 , - * /
 “?”字符:表示不确定的值

 “,”字符:指定数个值

 “-”字符:指定一个值的范围

 “/”字符:指定一个值的增加幅度。n/m表示从n开始,每次增加m

 “L”字符:用在日表示一个月中的最后一天,用在周表示该月最后一个星期X

 “W”字符:指定离给定日期最近的工作日(周一到周五)

 “#”字符:表示该月第几个周X。6#3表示该月第3个周五

2)Cron表达式范例:

 每隔5秒执行一次:*/5 * * * * ?

 每隔1分钟执行一次:0 */1 * * * ?

 每天23点执行一次:0 0 23 * * ?

 每天凌晨1点执行一次:0 0 1 * * ?

 每月1号凌晨1点执行一次:0 0 1 1 * ?

 每月最后一天23点执行一次:0 0 23 L * ?

 每周星期天凌晨1点实行一次:0 0 1 ? * L

 在26分、29分、33分执行一次:0 26,29,33 * * * ?

 每天的0点、13点、18点、21点都执行一次:0 0 0,13,18,21 * * ?

转载自:http://www.cnblogs.com/maybo/p/5189617.html
观点仅代表自己,期待你的留言。