ruby sync_offsets.rb

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ruby sync_offsets.rb相关的知识,希望对你有一定的参考价值。

####
#  Description:a ruby script to sync consumers offsets with brokers offsets.
#  Requirements: zookeeper
#                sudo gem install zookeeper
#
#####
require 'rubygems'
require 'zookeeper'
require 'socket'

class Partition
  attr_accessor :broker_id,:partition

  def initialize(broker_id, partition)
    @broker_id = broker_id
    @partition = partition
  end

  def to_s
    return "#{@broker_id}-#{@partition}"
  end

  def inspect
    return to_s
  end

  def  <=>(other)
    if @broker_id != other.broker_id
      return @broker_id <=> other.broker_id
    else
      return @partition <=> other.partition
    end
  end
end


class MetaQOffsetsSyncer

  def initialize(zk_servers, metaq_server="localhost:8123", zk_root="/avos-fetch-meta", topic="avos-fetch-tasks", group="avos-fetcher")
    @zk = Zookeeper.new(zk_servers)
    @metaq_server = metaq_server
    @topic = topic
    @zk_root = zk_root
    @group = group
    @broker_topic_path = "#{@zk_root}/brokers/topics"
    @consumer_offset_path = "#{@zk_root}/consumers/#{@group}/offsets/#{@topic}"
  end

  def run
    broker_id = get_broker_id
    parts = get_parts(broker_id)
    count = 0
    parts.each do |part|
      offset_in_zk =  get_zk_offset(part)
      offset_in_broker = get_broker_offset(part)
      if offset_in_zk != offset_in_broker
        set_zk_offset(part, offset_in_broker)
        puts "Synced #{part} offset"
        count = count + 1
      end
    end
    return count
  end

  def set_zk_offset(part, new_offset)
    @zk.set({ :path=> "#{@consumer_offset_path}/#{part}", :data => "0-#{new_offset}"})
  end

  def get_zk_offset(part)
    _,offset= safe_zk_get("#{@consumer_offset_path}/#{part}")[:data].split "-"
    return offset.to_i
  end

  def get_broker_offset(part)
    #line: avos-fetch-tasks part 6 min_offset 0 max_offset 72492782
    lines = stats_broker("offsets")
    pat = Regexp.new("#{@topic} part #{part.partition} min_offset [0-9]+ max_offset ([0-9]+)\r\n")
    lines.each do  |line|
      return $1.to_i if line =~ pat
    end
  end

  def safe_zk_get(path, count=0)
    begin
      return @zk.get({ :path => path })
    rescue Exception => e
      if count >= 3
        raise e
      else
        safe_zk_close()
        @zk = Zookeeper.new(@zk_servers)
        return safe_zk_get(path, count.succ)
      end
    end
  end

  def get_parts(broker_id)
    n_parts = safe_zk_get("#{@broker_topic_path}/#{@topic}/#{broker_id}-m")[:data].to_i
    (0..n_parts-1).collect do | n |
      Partition.new(broker_id, n)
    end
  end

  def get_broker_id
    lines = stats_broker
    lines.each do |line|
      if line =~ /broker_id (\d+)\r\n/
        return $1.to_i
      end
    end
  end

  def stats_broker(item="")
    host,port = @metaq_server.split ":"
    socket = TCPSocket.open(host,port.to_i)
    socket.write "stats #{item}\r\n"
    socket.flush
    lines = []
    line = socket.readline
    while line != "END\r\n"
      line = socket.readline
      lines << line
    end
    socket.close
    return lines
  end

end

if __FILE__ == $0
  begin
    brokers = ["localhost:8123"]
    brokers.each do  | broker|
      puts "Begin to sync broker '#{broker}' offsets"
      syncer = MetaQOffsetsSyncer.new("zk:2181",broker,zk_root="/avos-fetch-meta",topic="avos-connector-tasks",group="avos-importor")
      puts "Synced count:#{syncer.run}"
    end
  rescue => e
    puts "#{e.backtrace.join('\n')} #{e.message}"
  end
end

以上是关于ruby sync_offsets.rb的主要内容,如果未能解决你的问题,请参考以下文章

Ruby运算符

Ruby 25 岁了!Ruby 之父说 Ruby 3 有望 3 倍提速

如何学习ruby?Ruby学习技巧分享

ruby Ruby脚本,看看是否用openssl编译了ruby

什么是ruby?

ruby和ruby ee