问题是这样的,本来是在本地开发 flink 应用,然后想连接云主机 Doris 把内容写入到 Doris 中,但是一开始报错是返回了远程 Doris 运行时的一个内网地址。
于是改了以下源码的这个类:BackendV2 在其中增加了 convertToHostname() 方法将内网地址映射为公网地址
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.rest.models;
import com.car.common.Constant;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Objects;
/**
* Be response model
**/
@JsonIgnoreProperties(ignoreUnknown = true)
public class BackendV2 {
@JsonProperty(value = "backends")
private List<BackendRowV2> backends;
public void setBackends(List<BackendRowV2> backends) { this.backends = backends; }
public static class BackendRowV2 {
@JsonProperty("ip")
public String ip;
@JsonProperty(value="http_port")
public int httpPort;
@JsonProperty("is_alive")
public boolean isAlive;
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public int getHttpPort() {
return httpPort;
}
public void setHttpPort(int httpPort) {
this.httpPort = httpPort;
}
public boolean isAlive() {
return isAlive;
}
public void setAlive(boolean alive) {
isAlive = alive;
}
public String convertToHostname(String ip) {
System.out.println("================"+ip);
this.ip = ip;
if(Objects.equals(ip,"192.168.0.107")) {
return ip= Constant.HADOOP102;
} else if(Objects.equals(ip,"192.168.0.108")) {
return ip = Constant.HADOOP103;
} else if (Objects.equals(ip,"192.168.0.109")) {
return ip = Constant.HADOOP104;
} else {
return null;
}
}
public String toBackendString(){
return convertToHostname(ip) + ":" + httpPort;
//return ip + ":" + httpPort;
}
}
}
本来的源码是这样的:
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.rest.models;
import com.car.common.Constant;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Objects;
/**
* Be response model
**/
@JsonIgnoreProperties(ignoreUnknown = true)
public class BackendV2 {
@JsonProperty(value = "backends")
private List<BackendRowV2> backends;
public void setBackends(List<BackendRowV2> backends) { this.backends = backends; }
public static class BackendRowV2 {
@JsonProperty("ip")
public String ip;
@JsonProperty(value="http_port")
public int httpPort;
@JsonProperty("is_alive")
public boolean isAlive;
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public int getHttpPort() {
return httpPort;
}
public void setHttpPort(int httpPort) {
this.httpPort = httpPort;
}
public boolean isAlive() {
return isAlive;
}
public void setAlive(boolean alive) {
isAlive = alive;
}
public String toBackendString(){
return ip + ":" + httpPort;
}
}
}
但是替换完以后报错如下:
Caused by: org.apache.doris.shaded.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "http_port" (class org.apache.doris.flink.rest.models.BackendV2$BackendRowV2), not marked as ignorable (4 known properties: "httpPort", "isAlive", "alive", "ip"])
at [Source: (String)"{"backends":[{"ip":"192.168.0.109","http_port":7040,"is_alive":true}]}"; line: 1, column: 52] (through reference chain: org.apache.doris.flink.rest.models.BackendV2["backends"]->java.util.ArrayList[0]->org.apache.doris.flink.rest.models.BackendV2$BackendRowV2["http_port"])
at org.apache.doris.shaded.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
at org.apache.doris.shaded.com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:1127)
at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:2023)
at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1700)
at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownVanilla(BeanDeserializerBase.java:1678)
at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:319)
at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:176)
at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer._deserializeFromArray(CollectionDeserializer.java:355)
at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:244)
at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:28)
at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:129)
at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:313)
at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:176)
at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323)
at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4674)
at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3629)
at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3597)
at org.apache.doris.flink.rest.RestService.parseBackendV2(RestService.java:380)
替换前报错如下:
2023-07-25 20:54:43 WARN (org.apache.doris.flink.sink.writer.DorisWriter:tryHttpConnection) - Failed to connect to backend:http://192.168.0.109:7040
java.net.ConnectException: Connection timed out: connect
at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:607)
at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
at sun.net.www.http.HttpClient.New(HttpClient.java:339)
at sun.net.www.http.HttpClient.New(HttpClient.java:357)
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1228)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1162)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1056)
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:990)
at org.apache.doris.flink.sink.writer.DorisWriter.tryHttpConnection(DorisWriter.java:259)
at org.apache.doris.flink.sink.writer.DorisWriter.getAvailableBackend(DorisWriter.java:245)
at org.apache.doris.flink.sink.writer.DorisWriter.initializeLoad(DorisWriter.java:108)
at org.apache.doris.flink.sink.DorisSink.createWriter(DorisSink.java:64)
at org.apache.flink.streaming.api.transformations.SinkV1Adapter.createWriter(SinkV1Adapter.java:77)
at org.apache.flink.streaming.api.transformations.SinkV1Adapter$PlainSinkAdapter.createWriter(SinkV1Adapter.java:306)
at org.apache.flink.streaming.api.transformations.SinkV1Adapter$StatefulSinkAdapter.createWriter(SinkV1Adapter.java:315)
at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:117)
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:146)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.lang.Thread.run(Thread.java:750)
想问问大家有没遇到过本地 flink 应用远程连接 Doris 并写入数据的问题啊? 感谢大家!!!