A Look at RocketMQ's ConsumeMode.ORDERLY
This article looks at RocketMQ's ConsumeMode.ORDERLY.
rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/annotation/ConsumeMode.java
rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
DefaultMessageListenerOrderly implements the MessageListenerOrderly interface. Its consumeMessage method iterates over msgs and calls rocketMQListener.onMessage(doConvertMessage(messageExt)) for each message in turn. If an exception is caught inside the loop, it calls context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis) and returns ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT.
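The control flow described above can be sketched without the RocketMQ client on the classpath. Everything in this block is a stand-in written for illustration: the Status enum, the Context class, and the consume method only mirror the names ConsumeOrderlyStatus, ConsumeOrderlyContext, and consumeMessage, and the 1000 ms suspend time is an assumed value. Returning SUCCESS when no exception occurs is also an assumption of the sketch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class OrderlyListenerSketch {
    // Stand-in for ConsumeOrderlyStatus; not the real RocketMQ enum.
    enum Status { SUCCESS, SUSPEND_CURRENT_QUEUE_A_MOMENT }

    // Stand-in for ConsumeOrderlyContext; it only carries the suspend time.
    static final class Context {
        long suspendCurrentQueueTimeMillis = -1;
    }

    // Assumed value, chosen for the illustration only.
    static final long SUSPEND_MILLIS = 1000L;

    static Status consume(List<String> msgs, Context context, Consumer<String> listener) {
        for (String msg : msgs) {
            try {
                // Hand the messages to the listener one by one, in order.
                listener.accept(msg);
            } catch (Exception e) {
                // On failure, ask for the queue to be paused and stop processing the batch.
                context.suspendCurrentQueueTimeMillis = SUSPEND_MILLIS;
                return Status.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
        }
        return Status.SUCCESS;
    }

    public static void main(String[] args) {
        Context ctx = new Context();
        Status status = consume(Arrays.asList("a", "b", "c"), ctx, m -> {
            if (m.equals("b")) {
                throw new IllegalStateException("simulated failure");
            }
        });
        System.out.println(status + " " + ctx.suspendCurrentQueueTimeMillis);
    }
}
```

In this sketch the message "c" is never handed to the listener, because the loop returns as soon as "b" fails.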
A Look at rocketmq-mysql's ColumnParser
Preface
This article looks at rocketmq-mysql's ColumnParser.
ColumnParser
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/ColumnParser.java
public abstract class ColumnParser {

    public static ColumnParser getColumnParser(String dataType, String colType, String charset) {
        switch (dataType) {
            case "tinyint":
            case "smallint":
            case "mediumint":
            case "int":
                return new IntColumnParser(dataType, colType);
            case "bigint":
                return new BigIntColumnParser(colType);
            case "tinytext":
            case "text":
            case "mediumtext":
            case "longtext":
            case "varchar":
            case "char":
                return new StringColumnParser(charset);
            case "date":
            case "datetime":
            case "timestamp":
                return new DateTimeColumnParser();
            case "time":
                return new TimeColumnParser();
            case "year":
                return new YearColumnParser();
            case "enum":
                return new EnumColumnParser(colType);
            case "set":
                return new SetColumnParser(colType);
            default:
                return new DefaultColumnParser();
        }
    }

    public static String[] extractEnumValues(String colType) {
        String[] enumValues = {};
        Matcher matcher = Pattern.compile("(enum|set)\\((.*)\\)").matcher(colType);
        if (matcher.matches()) {
            enumValues = matcher.group(2).replace("'", "").split(",");
        }
        return enumValues;
    }

    public abstract Object getValue(Object value);
}
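ColumnParser declares the abstract method getValue and provides the static factory method getColumnParser, which returns the ColumnParser implementation matching the given dataType. It also provides extractEnumValues, which extracts the member list from an enum or set column type.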
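To make the regular expression in extractEnumValues concrete, here is a minimal standalone sketch of the same parsing step. The class name EnumValuesSketch and the sample column type strings are made up for this illustration; the column type is assumed to look like enum('a','b') or set('a','b').

```java
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class EnumValuesSketch {
    // Same regex as the quoted method: group 2 is the text between the parentheses.
    private static final Pattern ENUM_OR_SET = Pattern.compile("(enum|set)\\((.*)\\)");

    static String[] extractEnumValues(String colType) {
        Matcher matcher = ENUM_OR_SET.matcher(colType);
        if (!matcher.matches()) {
            // Not an enum or set column type: no members.
            return new String[0];
        }
        // Strip the single quotes, then split the member list on commas.
        return matcher.group(2).replace("'", "").split(",");
    }

    public static void main(String[] args) {
        // prints [small, medium, large]
        System.out.println(Arrays.toString(extractEnumValues("enum('small','medium','large')")));
        // prints []
        System.out.println(Arrays.toString(extractEnumValues("int(11)")));
    }
}
```

Because the quotes are stripped before splitting, a member that itself contains a comma would be split into two entries.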
IntColumnParser
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/IntColumnParser.java
public class IntColumnParser extends ColumnParser {

    private int bits;
    private boolean signed;

    public IntColumnParser(String dataType, String colType) {
        switch (dataType) {
            case "tinyint":
                bits = 8;
                break;
            case "smallint":
                bits = 16;
                break;
            case "mediumint":
                bits = 24;
                break;
            case "int":
                bits = 32;
        }
        this.signed = !colType.matches(".* unsigned$");
    }

    @Override
    public Object getValue(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Long) {
            return value;
        }
        if (value instanceof Integer) {
            Integer i = (Integer) value;
            if (signed || i > 0) {
                return i;
            } else {
                return (1L << bits) + i;
            }
        }
        return value;
    }
}
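IntColumnParser handles the tinyint, smallint, mediumint, and int types. For unsigned columns, a negative Integer is converted to its unsigned value by adding 1L << bits.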
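The unsigned conversion is plain two's-complement arithmetic: a negative raw value in a column of width bits represents raw + 2^bits. A minimal sketch follows; the class and method names are made up for this illustration. One deliberate difference: the quoted code tests i > 0, so for an unsigned column a raw value of 0 takes the else branch and yields 1L << bits, whereas this sketch uses >= 0 so that 0 stays 0.

```java
final class UnsignedIntSketch {
    // bits is the column width: 8 for tinyint, 16 for smallint, 24 for mediumint, 32 for int.
    static long toUnsigned(int raw, int bits) {
        // Non-negative values are already correct; negative ones wrap around by 2^bits.
        return raw >= 0 ? raw : (1L << bits) + raw;
    }

    public static void main(String[] args) {
        System.out.println(toUnsigned(-1, 8));   // prints 255
        System.out.println(toUnsigned(-1, 32));  // prints 4294967295
    }
}
```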
BigIntColumnParser
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/BigIntColumnParser.java
public class BigIntColumnParser extends ColumnParser {

    private static BigInteger max = BigInteger.ONE.shiftLeft(64);
    private boolean signed;

    public BigIntColumnParser(String colType) {
        this.signed = !colType.matches(".* unsigned$");
    }

    @Override
    public Object getValue(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof BigInteger) {
            return value;
        }
        Long l = (Long) value;
        if (!signed && l < 0) {
            return max.add(BigInteger.valueOf(l));
        } else {
            return l;
        }
    }
}
BigIntColumnParser handles the bigint type. For unsigned columns, a negative Long is converted to a BigInteger by adding 2^64.
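The same wrap-around idea applies at 64 bits, where the result no longer fits in a long. A minimal sketch, with made-up class and method names; unlike the quoted code, which returns a Long for non-negative values, this version always returns a BigInteger to keep the return type uniform.

```java
import java.math.BigInteger;

final class UnsignedBigIntSketch {
    private static final BigInteger TWO_POW_64 = BigInteger.ONE.shiftLeft(64);

    static BigInteger toUnsigned(long raw) {
        BigInteger value = BigInteger.valueOf(raw);
        // A negative long read from an unsigned bigint column represents raw + 2^64.
        return raw < 0 ? TWO_POW_64.add(value) : value;
    }

    public static void main(String[] args) {
        System.out.println(toUnsigned(-1L)); // prints 18446744073709551615
    }
}
```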
StringColumnParser
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/StringColumnParser.java
public class StringColumnParser extends ColumnParser {

    private String charset;

    public StringColumnParser(String charset) {
        this.charset = charset.toLowerCase();
    }

    @Override
    public Object getValue(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof String) {
            return value;
        }
        byte[] bytes = (byte[]) value;
        switch (charset) {
            case "utf8":
            case "utf8mb4":
                return new String(bytes, Charsets.UTF_8);
            case "latin1":
            case "ascii":
                return new String(bytes, Charsets.ISO_8859_1);
            case "ucs2":
                return new String(bytes, Charsets.UTF_16);
            default:
                return new String(bytes, Charsets.toCharset(charset));
        }
    }
}
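StringColumnParser handles the tinytext, text, mediumtext, longtext, varchar, and char types. A String is returned as-is; a byte[] is decoded according to the column's charset.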
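The charset mapping matters because the same bytes decode to different strings under different charsets. The sketch below reproduces the switch using only JDK classes: StandardCharsets and Charset.forName are substituted for the Charsets helper in the quoted code, and the class and method names are made up for this illustration.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

final class CharsetDecodeSketch {
    static String decode(byte[] bytes, String mysqlCharset) {
        switch (mysqlCharset.toLowerCase(Locale.ROOT)) {
            case "utf8":
            case "utf8mb4":
                return new String(bytes, StandardCharsets.UTF_8);
            case "latin1":
            case "ascii":
                return new String(bytes, StandardCharsets.ISO_8859_1);
            case "ucs2":
                return new String(bytes, StandardCharsets.UTF_16);
            default:
                // Falls back to a JVM lookup; throws if the JVM does not know the name.
                return new String(bytes, Charset.forName(mysqlCharset));
        }
    }

    public static void main(String[] args) {
        byte[] eAcute = {(byte) 0xC3, (byte) 0xA9}; // UTF-8 encoding of U+00E9
        System.out.println(decode(eAcute, "utf8").length());   // prints 1
        System.out.println(decode(eAcute, "latin1").length()); // prints 2
    }
}
```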
DateTimeColumnParser
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java
public class DateTimeColumnParser extends ColumnParser {

    private static SimpleDateFormat dateTimeFormat;
    private static SimpleDateFormat dateTimeUtcFormat;

    static {
        dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        dateTimeUtcFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        dateTimeUtcFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
    }

    @Override
    public Object getValue(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Timestamp) {
            return dateTimeFormat.format(value);
        }
        if (value instanceof Long) {
            return dateTimeUtcFormat.format(new Date((Long) value));
        }
        return value;
    }
}
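DateTimeColumnParser handles the date, datetime, and timestamp types. A Timestamp is formatted as yyyy-MM-dd HH:mm:ss in the JVM's default time zone; a Long is treated as epoch milliseconds and formatted in UTC.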
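The Long branch can be tried in isolation. This sketch formats epoch milliseconds in UTC with the same pattern; the class and method names are made up for this illustration. Two choices here differ from the quoted code: a new SimpleDateFormat is created per call, since SimpleDateFormat is not thread-safe, and Locale.ROOT is passed so the output does not depend on the default locale.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

final class UtcFormatSketch {
    static String formatUtc(long epochMillis) {
        // Created per call so that no formatter instance is shared across threads.
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT);
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        return format.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        System.out.println(formatUtc(0L)); // prints 1970-01-01 00:00:00
    }
}
```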
TimeColumnParser
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/TimeColumnParser.java
public class TimeColumnParser extends ColumnParser {

    @Override
    public Object getValue(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Timestamp) {
            return new Time(((Timestamp) value).getTime());
        }
        return value;
    }
}
TimeColumnParser handles the time type, converting a Timestamp into a Time.
YearColumnParser
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/YearColumnParser.java
public class YearColumnParser extends ColumnParser {

    @Override
    public Object getValue(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Date) {
            Calendar calendar = Calendar.getInstance();
            calendar.setTime((Date) value);
            return calendar.get(Calendar.YEAR);
        }
        return value;
    }
}
YearColumnParser handles the year type, extracting the year from a Date via Calendar.
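The year extraction is short enough to try on its own. In this sketch the class and method names are made up, and GregorianCalendar is used explicitly instead of Calendar.getInstance() so that the result does not depend on the calendar system of the default locale; that substitution is a choice of the sketch, not of the quoted code.

```java
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;

final class YearSketch {
    static int yearOf(Date date) {
        // Uses the default time zone, like the quoted code.
        Calendar calendar = new GregorianCalendar();
        calendar.setTime(date);
        return calendar.get(Calendar.YEAR);
    }

    public static void main(String[] args) {
        Date midYear = new GregorianCalendar(2020, Calendar.JUNE, 15).getTime();
        System.out.println(yearOf(midYear)); // prints 2020
    }
}
```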
EnumColumnParser
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/EnumColumnParser.java
public class EnumColumnParser extends ColumnParser {

    private String[] enumValues;

    public EnumColumnParser(String colType) {
        enumValues = extractEnumValues(colType);
    }

    @Override
    public Object getValue(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof String) {
            return value;
        }
        Integer i = (Integer) value;
        if (i == 0) {
            return null;
        } else {
            return enumValues[i - 1];
        }
    }
}
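EnumColumnParser handles the enum type. A String is returned as-is; an Integer is treated as a 1-based index into the enum values, with 0 mapped to null.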
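The index lookup is easy to get off by one, so here is a minimal sketch of it. The class name, method name, and sample member list are made up for this illustration.

```java
final class EnumIndexSketch {
    static String lookup(String[] enumValues, int index) {
        // Members are numbered from 1; index 0 is mapped to null, as in the quoted code.
        return index == 0 ? null : enumValues[index - 1];
    }

    public static void main(String[] args) {
        String[] sizes = {"small", "medium", "large"};
        System.out.println(lookup(sizes, 2)); // prints medium
        System.out.println(lookup(sizes, 0)); // prints null
    }
}
```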
SetColumnParser
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/SetColumnParser.java
public class SetColumnParser extends ColumnParser {

    private String[] enumValues;

    public SetColumnParser(String colType) {
        enumValues = extractEnumValues(colType);
    }

    @Override
    public Object getValue(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof String) {
            return value;
        }
        StringBuilder builder = new StringBuilder();
        long l = (Long) value;
        boolean needSplit = false;
        for (int i = 0; i < enumValues.length; i++) {
            if (((l >> i) & 1) == 1) {
                if (needSplit)
                    builder.append(",");
                builder.append(enumValues[i]);
                needSplit = true;
            }
        }
        return builder.toString();
    }
}
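SetColumnParser handles the set type. A String is returned as-is; a Long is treated as a bitmask in which bit i selects the i-th member, and the selected members are joined with commas.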
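A worked example makes the bitmask decoding easier to follow: with members a, b, c, d, the mask 5 is binary 0101, so bits 0 and 2 are set and the result is "a,c". The sketch below mirrors the loop in the quoted code; the class name, method name, and sample members are made up for this illustration.

```java
final class SetMaskSketch {
    static String decode(String[] members, long mask) {
        StringBuilder builder = new StringBuilder();
        boolean needSplit = false;
        for (int i = 0; i < members.length; i++) {
            // Bit i of the mask says whether the i-th member is present.
            if (((mask >> i) & 1) == 1) {
                if (needSplit) {
                    builder.append(',');
                }
                builder.append(members[i]);
                needSplit = true;
            }
        }
        return builder.toString();
    }

    public static void main(String[] args) {
        String[] members = {"a", "b", "c", "d"};
        System.out.println(decode(members, 5L)); // prints a,c
    }
}
```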
DefaultColumnParser
rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DefaultColumnParser.java
public class DefaultColumnParser extends ColumnParser {

    @Override
    public Object getValue(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof byte[]) {
            return Base64.encodeBase64String((byte[]) value);
        }
        return value;
    }
}
DefaultColumnParser Base64-encodes byte arrays into a String and returns every other value unchanged.
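The fallback behaviour can be reproduced with the JDK alone. The sketch below swaps in java.util.Base64 for the Base64.encodeBase64String call in the quoted code (which presumably comes from Apache Commons Codec; the imports are not shown in the listing). The class and method names are made up for this illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class Base64Sketch {
    static Object toValue(Object value) {
        if (value instanceof byte[]) {
            // Binary data becomes a Base64 string so that it can travel as text.
            return Base64.getEncoder().encodeToString((byte[]) value);
        }
        // Everything else, including null, passes through unchanged.
        return value;
    }

    public static void main(String[] args) {
        System.out.println(toValue("hello".getBytes(StandardCharsets.UTF_8))); // prints aGVsbG8=
    }
}
```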
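Summary
ColumnParser declares the abstract method getValue and provides the static factory method getColumnParser, which returns the ColumnParser implementation matching the given dataType.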
doc
ColumnParser