Sử dụng embedded kafka trong test
kafka
5
Scala
50
White

Hoàng Minh Trung viết ngày 17/08/2015

Khi sử dụng một số hệ thống tính toán phân tán mà setup sẽ mất thời gian như kafka chẳng hạn, đôi khi bạn muốn kafka chỉ chạy trong test session rồi sau đó biến mất, không giữ thông tin gì cả.

Thực ra kafka có hỗ trợ việc này bằng việc cung cấp một thứ gọi là KafkaServer mà nó sẽ chạy trên single node.
Tuy nhiên để chạy kafka thì lại cần ZooKeeper , và rất may mắn chúng ta cũng có một thứ tương tự là ZooKeeperServer chạy trên single node.
Tận dụng 2 thứ này thì chúng ta sẽ chạy được Kafka và ZooKeeper on memory. Để sử dụng 2 thằng này thì mình có sample code dưới đây các bạn có thể tham khảo:

ZookeeperServer

class EmbeddedZookeeper(val connectString: String = ZookeeperConnectionString) {

  val snapshotDir = createTempDir

  val logDir = createTempDir

  val server = new ZooKeeperServer(snapshotDir, logDir, 500)

  val (ip, port) = {
    val splits = connectString.split(":")
    (splits(0), splits(1).toInt)
  }

  val factory = new NIOServerCnxnFactory()
  factory.configure(new InetSocketAddress(ip, port), 16)
  factory.startup(server)
  println(s"ZooKeeperServer isRunning: $isRunning")

  def isRunning: Boolean = Try(server.isRunning) getOrElse false

  def shutdown(): Unit = {
    println(s"Shutting down ZK NIOServerCnxnFactory.")
    factory.shutdown()
    deleteRecursively(snapshotDir)
    deleteRecursively(logDir)
  }
}

KafkaServer

import java.io.File
import java.util.Properties

import scala.util.Try
import scala.concurrent.duration.{Duration, _}
import kafka.admin.AdminUtils
import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
import kafka.serializer.StringEncoder
import kafka.server.{KafkaConfig, KafkaServer}

final class EmbeddedKafka(val kafkaParams: Map[String,String])  {

  def this(groupId: String) = this(Map(
    "zookeeper.connect" -> ZookeeperConnectionString,
    "group.id" -> groupId,
    "auto.offset.reset" -> "smallest"))

  def this() = this(s"consumer-${scala.util.Random.nextInt(10000)}")

  /** Starts the ZK server. */
  private val zookeeper = new EmbeddedZookeeper()
  awaitCond(zookeeper.isRunning, 2000.millis)

  val kafkaConfig: KafkaConfig = {
    import scala.collection.JavaConversions._
    val map = Map(
      "broker.id" -> "0",
      "host.name" -> "127.0.0.1",
      "port" -> "9092",
      "advertised.host.name" -> "127.0.0.1",
      "advertised.port" -> "9092",
      "log.dir" -> createTempDir.getAbsolutePath,
      "zookeeper.connect" -> ZookeeperConnectionString,
      "replica.high.watermark.checkpoint.interval.ms" -> "5000",
      "log.flush.interval.messages" -> "1",
      "replica.socket.timeout.ms" -> "500",
      "controlled.shutdown.enable" -> "false",
      "auto.leader.rebalance.enable" -> "false"
    )
    val props = new Properties()
    props.putAll(map)
    new KafkaConfig(props)
  }

  val server = new KafkaServer(kafkaConfig)
  Thread.sleep(2000)

  println(s"Starting the Kafka server at $ZookeeperConnectionString")
  server.startup()
  Thread.sleep(2000)

  val producerConfig: ProducerConfig = {
    val p = new Properties()
    p.put("metadata.broker.list", kafkaConfig.hostName + ":" + kafkaConfig.port)
    p.put("serializer.class", classOf[StringEncoder].getName)
    new ProducerConfig(p)
  }

  val producer = new Producer[String, String](producerConfig)

  def createTopic(topic: String, numPartitions: Int = 1, replicationFactor: Int = 1) {
    AdminUtils.createTopic(server.zkClient, topic, numPartitions, replicationFactor)
    awaitPropagation(topic, 0, 2000.millis)
  }

  def produceAndSendMessage(topic: String, sent: Map[String, Int]): Unit = {
    producer.send(createTestMessage(topic, sent): _*)
  }

  private def createTestMessage(topic: String, send: Map[String, Int]): Seq[KeyedMessage[String, String]] =
    (for ((s, freq) <- send; i <- 0 until freq) yield new KeyedMessage[String, String](topic, s)).toSeq

  def awaitPropagation(topic: String, partition: Int, timeout: Duration): Unit =
    awaitCond(
      server.apis.metadataCache.getPartitionInfo(topic, partition)
        .exists(_.leaderIsrAndControllerEpoch.leaderAndIsr.leader >= 0),
      max = timeout,
      message = s"Partition [$topic, $partition] metadata not propagated after timeout"
    )

  def shutdown(): Unit = try {
    println(s"Shutting down Kafka server.")
    Option(producer).map(_.close())
    //https://issues.apache.org/jira/browse/KAFKA-1887
    Try(server.kafkaController.shutdown())
    Try(server.shutdown())
    server.awaitShutdown()
    server.config.logDirs.foreach(f => deleteRecursively(new File(f)))
    zookeeper.shutdown()
    awaitCond(!zookeeper.isRunning, 2000.millis)
    println(s"ZooKeeper server shut down.")
    Thread.sleep(2000)
  } catch { case e: java.io.IOException => }
}

Sử dụng 2 class này chúng ta có thể tạo kafka in memory một cách dễ dàng:

 val data = Seq(
    """{"user":"helena","commits":98, "month":3, "year":2015}""",
    """{"user":"jacek-lewandowski", "commits":72, "month":3, "year":2015}""",
    """{"user":"pkolaczk", "commits":42, "month":3, "year":2015}""")

  /* Kafka (embedded) setup */
  val kafka = new EmbeddedKafka
  kafka.createTopic("github")

  // simulate another process streaming data to Kafka
  val producer = new Producer[String, String](kafka.producerConfig)
  data.foreach (m => producer.send(new KeyedMessage[String, String]("github", "githubstats", m)))
  producer.close()
Bình luận


White
{{ comment.user.name }}
Bỏ hay Hay
{{comment.like_count}}
Male avatar
{{ comment_error }}
Hủy
   

Hiển thị thử

Chỉnh sửa

White

Hoàng Minh Trung

23 bài viết.
65 người follow
Kipalog
{{userFollowed ? 'Following' : 'Follow'}}
Cùng một tác giả
White
22 1
Bài viết dịch từ http://arslan.io/tenusefultechniquesingo Sử dụng một GOPATH duy nhất Sử dụng đồng thời nhiều GOPATH sẽ không giúp cho hệ thống ...
Hoàng Minh Trung viết hơn 2 năm trước
22 1
White
19 15
(Ảnh) Mục đích của bài viết là hướng dẫn cơ bản nhất cho những ai chưa biết về docker, môi trường thực hiện là mac OS. Chuẩn bị Cài đặt virtua...
Hoàng Minh Trung viết hơn 3 năm trước
19 15
White
17 0
Bài viết dịch từ https://github.com/luciotato/golangnotes/blob/master/OOP.md Mục đích bài viết Học golang dễ dàng hơn với những kiến thức bạn đ...
Hoàng Minh Trung viết hơn 2 năm trước
17 0
Bài viết liên quan
White
3 0
Bạn có một file log, bạn muốn đổ log vào một topic nào đó trên kafka cho một ai đó dùng. Một cách nhanh chóng có thể sử dụng producer console như s...
Quăng viết hơn 2 năm trước
3 0
White
3 0
Cài đặt Download kafka từ một trong các link dưới đây: https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.2.0/kafka_2.100.8.2.0.tgz Kafka là ...
Hoàng Minh Trung viết hơn 3 năm trước
3 0
White
14 3
Giới thiệu (Ảnh) Về cơ bản kafka là hệ thống message pub/sub phân tán mà có khả năng scale rất tốt. Message của kafka được lưu trên đĩa cứng, đồ...
Hoàng Minh Trung viết hơn 3 năm trước
14 3
{{like_count}}

kipalog

{{ comment_count }}

bình luận

{{liked ? "Đã kipalog" : "Kipalog"}}


White
{{userFollowed ? 'Following' : 'Follow'}}
23 bài viết.
65 người follow

 Đầu mục bài viết

Vẫn còn nữa! x

Kipalog vẫn còn rất nhiều bài viết hay và chủ đề thú vị chờ bạn khám phá!