场景是想实现一个 dsl, 从 kafka 读取消息,sink 到 mysql。
现在遇到的问题是: 读取自定义的 schema, 转化成 scala 的 case class, 怎么获取它的类型?
以简化代码示例, 比较好解释:
import scala.reflect.runtime.universe._
import scala.tools.reflect.ToolBox
val cm = runtimeMirror(getClass.getClassLoader)
val toolBox = cm.mkToolBox()
toolBox.compile(q"""case class Foo(value:String) """)
// 如何获取 Foo 的类型, 因为下文需要将其传入序列化
// Deserializer[T]
val consumerSettings =
ConsumerSettings(system, new StringDeserializer, new Deserializer[Foo])
在 scala 的REPl可以获取它的类型, 这是怎么实现的?
scala> toolBox.compile(q"""case class Foo(value:String) """)
res19: () => Any = scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$ToolBoxGlobal$$Lambda$1471/913724364@5429565e
scala> new Foo("1")
res20: Foo = Foo(1)