聊聊rocketmq的ConsumeMode.ORDERLY

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了聊聊rocketmq的ConsumeMode.ORDERLY相关的知识,希望对你有一定的参考价值。

参考技术A 本文主要研究一下rocketmq的ConsumeMode.ORDERLY

rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/annotation/ConsumeMode.java

rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java

DefaultMessageListenerOrderly实现了MessageListenerOrderly接口,其consumeMessage方法遍历msgs,然后挨个调用rocketMQListener.onMessage(doConvertMessage(messageExt)),若循环中出现异常被捕获住了会执行context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis),然后返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT

聊聊rocketmq-mysql的ColumnParser

本文主要研究一下rocketmq-mysql的ColumnParser

ColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/ColumnParser.java

public abstract class ColumnParser {

  public static ColumnParser getColumnParser(String dataType, String colType, String charset) {

      switch (dataType) {
          case "tinyint":
          case "smallint":
          case "mediumint":
          case "int":
              return new IntColumnParser(dataType, colType);
          case "bigint":
              return new BigIntColumnParser(colType);
          case "tinytext":
          case "text":
          case "mediumtext":
          case "longtext":
          case "varchar":
          case "char":
              return new StringColumnParser(charset);
          case "date":
          case "datetime":
          case "timestamp":
              return new DateTimeColumnParser();
          case "time":
              return new TimeColumnParser();
          case "year":
              return new YearColumnParser();
          case "enum":
              return new EnumColumnParser(colType);
          case "set":
              return new SetColumnParser(colType);
          default:
              return new DefaultColumnParser();
      }
  }

  public static String[] extractEnumValues(String colType) {
      String[] enumValues = {};
      Matcher matcher = Pattern.compile("(enum|set)\\((.*)\\)").matcher(colType);
      if (matcher.matches()) {
          enumValues = matcher.group(2).replace("'", "").split(",");
      }

      return enumValues;
  }

  public abstract Object getValue(Object value);

}
  • ColumnParser定义了getValue抽象方法;它提供了getColumnParser方法用于根据dataType获取对应的ColumnParser实现类

IntColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/IntColumnParser.java

public class IntColumnParser extends ColumnParser {

  private int bits;
  private boolean signed;

  public IntColumnParser(String dataType, String colType) {

      switch (dataType) {
          case "tinyint":
              bits = 8;
              break;
          case "smallint":
              bits = 16;
              break;
          case "mediumint":
              bits = 24;
              break;
          case "int":
              bits = 32;
      }

      this.signed = !colType.matches(".* unsigned$");
  }

  @Override
  public Object getValue(Object value) {

      if (value == null) {
          return null;
      }

      if (value instanceof Long) {
          return value;
      }

      if (value instanceof Integer) {
          Integer i = (Integer) value;
          if (signed || i > 0) {
              return i;
          } else {
              return (1L << bits) + i;
          }
      }

      return value;
  }
}
  • IntColumnParser解析tinyint、smallint、mediumint、int类型

BigIntColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/BigIntColumnParser.java

public class BigIntColumnParser extends ColumnParser {

  private static BigInteger max = BigInteger.ONE.shiftLeft(64);

  private boolean signed;

  public BigIntColumnParser(String colType) {
      this.signed = !colType.matches(".* unsigned$");
  }

  @Override
  public Object getValue(Object value) {

      if (value == null) {
          return null;
      }

      if (value instanceof BigInteger) {
          return value;
      }

      Long l = (Long) value;
      if (!signed && l < 0) {
          return max.add(BigInteger.valueOf(l));
      } else {
          return l;
      }
  }
}
  • BigIntColumnParser解析bigint类型

StringColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/StringColumnParser.java

public class StringColumnParser extends ColumnParser {

  private String charset;

  public StringColumnParser(String charset) {
      this.charset = charset.toLowerCase();
  }

  @Override
  public Object getValue(Object value) {

      if (value == null) {
          return null;
      }

      if (value instanceof String) {
          return value;
      }

      byte[] bytes = (byte[]) value;

      switch (charset) {
          case "utf8":
          case "utf8mb4":
              return new String(bytes, Charsets.UTF_8);
          case "latin1":
          case "ascii":
              return new String(bytes, Charsets.ISO_8859_1);
          case "ucs2":
              return new String(bytes, Charsets.UTF_16);
          default:
              return new String(bytes, Charsets.toCharset(charset));

      }
  }
}
  • StringColumnParser解析tinytext、text、mediumtext、longtext、varchar、char类型

DateTimeColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java

public class DateTimeColumnParser extends ColumnParser {

  private static SimpleDateFormat dateTimeFormat;
  private static SimpleDateFormat dateTimeUtcFormat;

  static {
      dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      dateTimeUtcFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      dateTimeUtcFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
  }

  @Override
  public Object getValue(Object value) {

      if (value == null) {
          return null;
      }

      if (value instanceof Timestamp) {
          return dateTimeFormat.format(value);
      }

      if (value instanceof Long) {
          return dateTimeUtcFormat.format(new Date((Long) value));
      }

      return value;
  }
}
  • DateTimeColumnParser解析date、datetime、timestamp类型

TimeColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/TimeColumnParser.java

public class TimeColumnParser extends ColumnParser {

  @Override
  public Object getValue(Object value) {

      if (value == null) {
          return null;
      }

      if (value instanceof Timestamp) {

          return new Time(((Timestamp) value).getTime());
      }

      return value;
  }
}
  • TimeColumnParser解析time类型

YearColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/YearColumnParser.java

public class YearColumnParser extends ColumnParser {

  @Override
  public Object getValue(Object value) {

      if (value == null) {
          return null;
      }

      if (value instanceof Date) {
          Calendar calendar = Calendar.getInstance();
          calendar.setTime((Date) value);
          return calendar.get(Calendar.YEAR);
      }

      return value;
  }
}
  • YearColumnParser解析year类型

EnumColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/EnumColumnParser.java

public class EnumColumnParser extends ColumnParser {

  private String[] enumValues;

  public EnumColumnParser(String colType) {
      enumValues = extractEnumValues(colType);
  }

  @Override
  public Object getValue(Object value) {

      if (value == null) {
          return null;
      }

      if (value instanceof String) {
          return value;
      }

      Integer i = (Integer) value;
      if (i == 0) {
          return null;
      } else {
          return enumValues[i - 1];
      }
  }
}
  • EnumColumnParser解析enum类型

SetColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/SetColumnParser.java

public class SetColumnParser extends ColumnParser {

  private String[] enumValues;

  public SetColumnParser(String colType) {
      enumValues = extractEnumValues(colType);
  }

  @Override
  public Object getValue(Object value) {
      if (value == null) {
          return null;
      }

      if (value instanceof String) {
          return value;
      }

      StringBuilder builder = new StringBuilder();
      long l = (Long) value;

      boolean needSplit = false;
      for (int i = 0; i < enumValues.length; i++) {
          if (((l >> i) & 1) == 1) {
              if (needSplit)
                  builder.append(",");

              builder.append(enumValues[i]);
              needSplit = true;
          }
      }

      return builder.toString();
  }
}
  • SetColumnParser解析set类型

DefaultColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DefaultColumnParser.java

public class DefaultColumnParser extends ColumnParser {

  @Override
  public Object getValue(Object value) {

      if (value == null) {
          return null;
      }

      if (value instanceof byte[]) {
          return Base64.encodeBase64String((byte[]) value);
      }

      return value;
  }
}
  • DefaultColumnParser通过base64将byte数组转为string

小结

ColumnParser定义了getValue抽象方法;它提供了getColumnParser方法用于根据dataType获取对应的ColumnParser实现类

doc

  • ColumnParser


以上是关于聊聊rocketmq的ConsumeMode.ORDERLY的主要内容,如果未能解决你的问题,请参考以下文章

聊聊rocketmq的RemotingTooMuchRequestException

3万字聊聊什么是RocketMQ

3万字聊聊什么是RocketMQ

聊聊rocketmq的retryTimesWhenSendFailed

聊聊rocketmq的ListenerContainerConfiguration

聊聊rocketmq的sendBatchMessage