Sử dụng embedded kafka trong test
kafka
8
Scala
49
White

Hoàng Minh Trung viết ngày 17/08/2015

Khi sử dụng một số hệ thống tính toán phân tán mà setup sẽ mất thời gian như kafka chẳng hạn, đôi khi bạn muốn kafka chỉ chạy trong test session rồi sau đó biến mất, không giữ thông tin gì cả.

Thực ra kafka có hỗ trợ việc này bằng việc cung cấp một thứ gọi là KafkaServer mà nó sẽ chạy trên single node.
Tuy nhiên để chạy kafka thì lại cần ZooKeeper , và rất may mắn chúng ta cũng có một thứ tương tự là ZooKeeperServer chạy trên single node.
Tận dụng 2 thứ này thì chúng ta sẽ chạy được Kafka và ZooKeeper on memory. Để sử dụng 2 thằng này thì mình có sample code dưới đây các bạn có thể tham khảo:

ZookeeperServer

class EmbeddedZookeeper(val connectString: String = ZookeeperConnectionString) {

 val snapshotDir = createTempDir

 val logDir = createTempDir

 val server = new ZooKeeperServer(snapshotDir, logDir, 500)

 val (ip, port) = {
  val splits = connectString.split(":")
  (splits(0), splits(1).toInt)
 }

 val factory = new NIOServerCnxnFactory()
 factory.configure(new InetSocketAddress(ip, port), 16)
 factory.startup(server)
 println(s"ZooKeeperServer isRunning: $isRunning")

 def isRunning: Boolean = Try(server.isRunning) getOrElse false

 def shutdown(): Unit = {
  println(s"Shutting down ZK NIOServerCnxnFactory.")
  factory.shutdown()
  deleteRecursively(snapshotDir)
  deleteRecursively(logDir)
 }
}

KafkaServer

import java.io.File
import java.util.Properties

import scala.util.Try
import scala.concurrent.duration.{Duration, _}
import kafka.admin.AdminUtils
import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
import kafka.serializer.StringEncoder
import kafka.server.{KafkaConfig, KafkaServer}

final class EmbeddedKafka(val kafkaParams: Map[String,String]) {

 def this(groupId: String) = this(Map(
  "zookeeper.connect" -> ZookeeperConnectionString,
  "group.id" -> groupId,
  "auto.offset.reset" -> "smallest"))

 def this() = this(s"consumer-${scala.util.Random.nextInt(10000)}")

 /** Starts the ZK server. */
 private val zookeeper = new EmbeddedZookeeper()
 awaitCond(zookeeper.isRunning, 2000.millis)

 val kafkaConfig: KafkaConfig = {
  import scala.collection.JavaConversions._
  val map = Map(
   "broker.id" -> "0",
   "host.name" -> "127.0.0.1",
   "port" -> "9092",
   "advertised.host.name" -> "127.0.0.1",
   "advertised.port" -> "9092",
   "log.dir" -> createTempDir.getAbsolutePath,
   "zookeeper.connect" -> ZookeeperConnectionString,
   "replica.high.watermark.checkpoint.interval.ms" -> "5000",
   "log.flush.interval.messages" -> "1",
   "replica.socket.timeout.ms" -> "500",
   "controlled.shutdown.enable" -> "false",
   "auto.leader.rebalance.enable" -> "false"
  )
  val props = new Properties()
  props.putAll(map)
  new KafkaConfig(props)
 }

 val server = new KafkaServer(kafkaConfig)
 Thread.sleep(2000)

 println(s"Starting the Kafka server at $ZookeeperConnectionString")
 server.startup()
 Thread.sleep(2000)

 val producerConfig: ProducerConfig = {
  val p = new Properties()
  p.put("metadata.broker.list", kafkaConfig.hostName + ":" + kafkaConfig.port)
  p.put("serializer.class", classOf[StringEncoder].getName)
  new ProducerConfig(p)
 }

 val producer = new Producer[String, String](producerConfig)

 def createTopic(topic: String, numPartitions: Int = 1, replicationFactor: Int = 1) {
  AdminUtils.createTopic(server.zkClient, topic, numPartitions, replicationFactor)
  awaitPropagation(topic, 0, 2000.millis)
 }

 def produceAndSendMessage(topic: String, sent: Map[String, Int]): Unit = {
  producer.send(createTestMessage(topic, sent): _*)
 }

 private def createTestMessage(topic: String, send: Map[String, Int]): Seq[KeyedMessage[String, String]] =
  (for ((s, freq) <- send; i <- 0 until freq) yield new KeyedMessage[String, String](topic, s)).toSeq

 def awaitPropagation(topic: String, partition: Int, timeout: Duration): Unit =
  awaitCond(
   server.apis.metadataCache.getPartitionInfo(topic, partition)
    .exists(_.leaderIsrAndControllerEpoch.leaderAndIsr.leader >= 0),
   max = timeout,
   message = s"Partition [$topic, $partition] metadata not propagated after timeout"
  )

 def shutdown(): Unit = try {
  println(s"Shutting down Kafka server.")
  Option(producer).map(_.close())
  //https://issues.apache.org/jira/browse/KAFKA-1887
  Try(server.kafkaController.shutdown())
  Try(server.shutdown())
  server.awaitShutdown()
  server.config.logDirs.foreach(f => deleteRecursively(new File(f)))
  zookeeper.shutdown()
  awaitCond(!zookeeper.isRunning, 2000.millis)
  println(s"ZooKeeper server shut down.")
  Thread.sleep(2000)
 } catch { case e: java.io.IOException => }
}

Sử dụng 2 class này chúng ta có thể tạo kafka in memory một cách dễ dàng:

 val data = Seq(
  """{"user":"helena","commits":98, "month":3, "year":2015}""",
  """{"user":"jacek-lewandowski", "commits":72, "month":3, "year":2015}""",
  """{"user":"pkolaczk", "commits":42, "month":3, "year":2015}""")

 /* Kafka (embedded) setup */
 val kafka = new EmbeddedKafka
 kafka.createTopic("github")

 // simulate another process streaming data to Kafka
 val producer = new Producer[String, String](kafka.producerConfig)
 data.foreach (m => producer.send(new KeyedMessage[String, String]("github", "githubstats", m)))
 producer.close()
Bình luận


White
{{ comment.user.name }}
Bỏ hay Hay
{{comment.like_count}}
Male avatar
{{ comment_error }}
Hủy
   

Hiển thị thử

Chỉnh sửa

White

Hoàng Minh Trung

23 bài viết.
74 người follow
Kipalog
{{userFollowed ? 'Following' : 'Follow'}}
Cùng một tác giả
White
24 1
Bài viết dịch từ http://arslan.io/tenusefultechniquesingo Sử dụng một GOPATH duy nhất Sử dụng đồng thời nhiều GOPATH sẽ không giúp cho hệ thống ...
Hoàng Minh Trung viết gần 6 năm trước
24 1
White
20 15
(Ảnh) Mục đích của bài viết là hướng dẫn cơ bản nhất cho những ai chưa biết về docker, môi trường thực hiện là mac OS. Chuẩn bị Cài đặt virtua...
Hoàng Minh Trung viết gần 7 năm trước
20 15
White
19 4
Giới thiệu (Ảnh) Về cơ bản kafka là hệ thống message pub/sub phân tán mà có khả năng scale rất tốt. Message của kafka được lưu trên đĩa cứng, đồ...
Hoàng Minh Trung viết gần 7 năm trước
19 4
Bài viết liên quan
White
5 0
Bạn có một file log, bạn muốn đổ log vào một topic nào đó trên kafka cho một ai đó dùng. Một cách nhanh chóng có thể sử dụng producer console như s...
Quăng viết gần 6 năm trước
5 0
White
10 0
Kí tự Regex cơ bản Về cơ bản thì các sử lý matching của scala.util.matching.Regex sẽ được "phó thác" (delegate) cho java Regex. Bạn có thể tạo một ...
huydx viết gần 7 năm trước
10 0
{{like_count}}

kipalog

{{ comment_count }}

bình luận

{{liked ? "Đã kipalog" : "Kipalog"}}


White
{{userFollowed ? 'Following' : 'Follow'}}
23 bài viết.
74 người follow

 Đầu mục bài viết

Vẫn còn nữa! x

Kipalog vẫn còn rất nhiều bài viết hay và chủ đề thú vị chờ bạn khám phá!