Sử dụng embedded kafka trong test
kafka
5
Scala
50
White

Hoàng Minh Trung viết ngày 17/08/2015

Khi sử dụng một số hệ thống tính toán phân tán mà setup sẽ mất thời gian như kafka chẳng hạn, đôi khi bạn muốn kafka chỉ chạy trong test session rồi sau đó biến mất, không giữ thông tin gì cả.

Thực ra kafka có hỗ trợ việc này bằng việc cung cấp một thứ gọi là KafkaServer mà nó sẽ chạy trên single node.
Tuy nhiên để chạy kafka thì lại cần ZooKeeper , và rất may mắn chúng ta cũng có một thứ tương tự là ZooKeeperServer chạy trên single node.
Tận dụng 2 thứ này thì chúng ta sẽ chạy được Kafka và ZooKeeper on memory. Để sử dụng 2 thằng này thì mình có sample code dưới đây các bạn có thể tham khảo:

ZookeeperServer

class EmbeddedZookeeper(val connectString: String = ZookeeperConnectionString) {

  val snapshotDir = createTempDir

  val logDir = createTempDir

  val server = new ZooKeeperServer(snapshotDir, logDir, 500)

  val (ip, port) = {
    val splits = connectString.split(":")
    (splits(0), splits(1).toInt)
  }

  val factory = new NIOServerCnxnFactory()
  factory.configure(new InetSocketAddress(ip, port), 16)
  factory.startup(server)
  println(s"ZooKeeperServer isRunning: $isRunning")

  def isRunning: Boolean = Try(server.isRunning) getOrElse false

  def shutdown(): Unit = {
    println(s"Shutting down ZK NIOServerCnxnFactory.")
    factory.shutdown()
    deleteRecursively(snapshotDir)
    deleteRecursively(logDir)
  }
}

KafkaServer

import java.io.File
import java.util.Properties

import scala.util.Try
import scala.concurrent.duration.{Duration, _}
import kafka.admin.AdminUtils
import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
import kafka.serializer.StringEncoder
import kafka.server.{KafkaConfig, KafkaServer}

final class EmbeddedKafka(val kafkaParams: Map[String,String])  {

  def this(groupId: String) = this(Map(
    "zookeeper.connect" -> ZookeeperConnectionString,
    "group.id" -> groupId,
    "auto.offset.reset" -> "smallest"))

  def this() = this(s"consumer-${scala.util.Random.nextInt(10000)}")

  /** Starts the ZK server. */
  private val zookeeper = new EmbeddedZookeeper()
  awaitCond(zookeeper.isRunning, 2000.millis)

  val kafkaConfig: KafkaConfig = {
    import scala.collection.JavaConversions._
    val map = Map(
      "broker.id" -> "0",
      "host.name" -> "127.0.0.1",
      "port" -> "9092",
      "advertised.host.name" -> "127.0.0.1",
      "advertised.port" -> "9092",
      "log.dir" -> createTempDir.getAbsolutePath,
      "zookeeper.connect" -> ZookeeperConnectionString,
      "replica.high.watermark.checkpoint.interval.ms" -> "5000",
      "log.flush.interval.messages" -> "1",
      "replica.socket.timeout.ms" -> "500",
      "controlled.shutdown.enable" -> "false",
      "auto.leader.rebalance.enable" -> "false"
    )
    val props = new Properties()
    props.putAll(map)
    new KafkaConfig(props)
  }

  val server = new KafkaServer(kafkaConfig)
  Thread.sleep(2000)

  println(s"Starting the Kafka server at $ZookeeperConnectionString")
  server.startup()
  Thread.sleep(2000)

  val producerConfig: ProducerConfig = {
    val p = new Properties()
    p.put("metadata.broker.list", kafkaConfig.hostName + ":" + kafkaConfig.port)
    p.put("serializer.class", classOf[StringEncoder].getName)
    new ProducerConfig(p)
  }

  val producer = new Producer[String, String](producerConfig)

  def createTopic(topic: String, numPartitions: Int = 1, replicationFactor: Int = 1) {
    AdminUtils.createTopic(server.zkClient, topic, numPartitions, replicationFactor)
    awaitPropagation(topic, 0, 2000.millis)
  }

  def produceAndSendMessage(topic: String, sent: Map[String, Int]): Unit = {
    producer.send(createTestMessage(topic, sent): _*)
  }

  private def createTestMessage(topic: String, send: Map[String, Int]): Seq[KeyedMessage[String, String]] =
    (for ((s, freq) <- send; i <- 0 until freq) yield new KeyedMessage[String, String](topic, s)).toSeq

  def awaitPropagation(topic: String, partition: Int, timeout: Duration): Unit =
    awaitCond(
      server.apis.metadataCache.getPartitionInfo(topic, partition)
        .exists(_.leaderIsrAndControllerEpoch.leaderAndIsr.leader >= 0),
      max = timeout,
      message = s"Partition [$topic, $partition] metadata not propagated after timeout"
    )

  def shutdown(): Unit = try {
    println(s"Shutting down Kafka server.")
    Option(producer).map(_.close())
    //https://issues.apache.org/jira/browse/KAFKA-1887
    Try(server.kafkaController.shutdown())
    Try(server.shutdown())
    server.awaitShutdown()
    server.config.logDirs.foreach(f => deleteRecursively(new File(f)))
    zookeeper.shutdown()
    awaitCond(!zookeeper.isRunning, 2000.millis)
    println(s"ZooKeeper server shut down.")
    Thread.sleep(2000)
  } catch { case e: java.io.IOException => }
}

Sử dụng 2 class này chúng ta có thể tạo kafka in memory một cách dễ dàng:

 val data = Seq(
    """{"user":"helena","commits":98, "month":3, "year":2015}""",
    """{"user":"jacek-lewandowski", "commits":72, "month":3, "year":2015}""",
    """{"user":"pkolaczk", "commits":42, "month":3, "year":2015}""")

  /* Kafka (embedded) setup */
  val kafka = new EmbeddedKafka
  kafka.createTopic("github")

  // simulate another process streaming data to Kafka
  val producer = new Producer[String, String](kafka.producerConfig)
  data.foreach (m => producer.send(new KeyedMessage[String, String]("github", "githubstats", m)))
  producer.close()
Bình luận


White
{{ comment.user.name }}
Bỏ hay Hay
{{comment.like_count}}
Male avatar
{{ comment_error }}
Hủy
   

Hiển thị thử

Chỉnh sửa

White

Hoàng Minh Trung

21 bài viết.
53 người follow
Kipalog
{{userFollowed ? 'Following' : 'Follow'}}
Cùng một tác giả
White
17 15
(Ảnh) Mục đích của bài viết là hướng dẫn cơ bản nhất cho những ai chưa biết về docker, môi trường thực hiện là mac OS. Chuẩn bị Cài đặt virtua...
Hoàng Minh Trung viết hơn 2 năm trước
17 15
White
16 1
Bài viết dịch từ http://arslan.io/tenusefultechniquesingo Sử dụng một GOPATH duy nhất Sử dụng đồng thời nhiều GOPATH sẽ không giúp cho hệ thống ...
Hoàng Minh Trung viết hơn 1 năm trước
16 1
White
13 0
Bài viết dịch từ https://github.com/luciotato/golangnotes/blob/master/OOP.md Mục đích bài viết Học golang dễ dàng hơn với những kiến thức bạn đ...
Hoàng Minh Trung viết hơn 1 năm trước
13 0
Bài viết liên quan
White
2 0
Bạn có một file log, bạn muốn đổ log vào một topic nào đó trên kafka cho một ai đó dùng. Một cách nhanh chóng có thể sử dụng producer console như s...
Quăng viết hơn 1 năm trước
2 0
White
3 0
Cài đặt Download kafka từ một trong các link dưới đây: https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.2.0/kafka_2.100.8.2.0.tgz Kafka là ...
Hoàng Minh Trung viết hơn 2 năm trước
3 0
{{like_count}}

kipalog

{{ comment_count }}

bình luận

{{liked ? "Đã kipalog" : "Kipalog"}}


White
{{userFollowed ? 'Following' : 'Follow'}}
21 bài viết.
53 người follow

 Đầu mục bài viết

Vẫn còn nữa! x

Kipalog vẫn còn rất nhiều bài viết hay và chủ đề thú vị chờ bạn khám phá!