服务端也就是后系统系统如何对客流端以下请求进行配置。 conn.setRequestProperty("Content-Type", "application/json;charset=utf-8"); //5.设置Accept conn.setRequestProperty("Accept", "text/event-stream");
时间: 2025-08-06 10:04:22 AIGC 浏览: 21
<think>我们有两个需求:
1. 客户端请求头中可能包含 `Content-Type: application/json;charset=utf-8`,表示客户端发送的是JSON格式的数据。
2. 客户端请求头中包含 `Accept: text/event-stream`,表示客户端希望接收SSE(Server-Sent Events)格式的响应。
服务端需要同时支持这两种头信息:
- 对于请求体(request body)为JSON格式的数据,服务端需要能够正确解析。
- 对于响应,服务端需要按照SSE的规范返回事件流。
由于SSE是一种长连接,服务端会持续发送数据,因此通常使用异步非阻塞的方式处理。这里我们以Spring Boot为例(使用Spring Web MVC或WebFlux)来展示如何配置。
注意:在同一个请求中,客户端发送的是JSON,但要求返回的是事件流,这通常用于客户端上传一个请求(例如订阅某些数据的条件),然后服务端持续推送相关事件。
实现步骤:
1. 定义控制器方法,处理POST请求(因为客户端发送JSON数据,通常用POST)。
2. 方法参数使用`@RequestBody`接收JSON并映射到Java对象。
3. 方法返回类型为`SseEmitter`(如果使用Spring Web MVC)或者`Flux<ServerSentEvent>`(如果使用Spring WebFlux,响应式编程)。
这里我们分别给出两种实现:
方案一:Spring Web MVC(使用SseEmitter)
```java
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@RestController
public class SseController {
// 客户端发送JSON请求体,并希望接收事件流
@PostMapping(path = "/stream", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter handleSse(@RequestBody MyRequest request) {
// 创建一个SseEmitter,可以设置超时时间,默认是30秒
SseEmitter emitter = new SseEmitter(0L); // 0表示不超时
// 异步处理,避免阻塞
CompletableFuture.runAsync(() -> {
try {
// 模拟持续发送事件
for (int i = 0; i < 10; i++) {
// 构建事件数据
SseEmitter.SseEventBuilder event = SseEmitter.event()
.name("event-name") // 事件名称,可选
.id(String.valueOf(i)) // 事件ID,可选
.data("Message " + i + " for request: " + request.getParameter()) // 数据
.reconnectTime(5000); // 重连时间,可选
// 发送事件
emitter.send(event);
// 间隔1秒
Thread.sleep(1000);
}
// 完成
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return emitter;
}
}
```
方案二:Spring WebFlux(响应式,使用Flux)
```java
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
@RestController
public class ReactiveSseController {
@PostMapping(path = "/reactive-stream", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> handleReactiveSse(@RequestBody Mono<MyRequest> requestMono) {
// 将请求体转换为Flux,然后持续发送事件
return requestMono.flatMapMany(request ->
Flux.interval(Duration.ofSeconds(1))
.map(sequence -> ServerSentEvent.<String>builder()
.id(String.valueOf(sequence))
.event("periodic-event")
.data("Data " + sequence + " for request: " + request.getParameter())
.build()
)
.take(10); // 发送10次
}
}
```
关键配置说明:
- `consumes = MediaType.APPLICATION_JSON_VALUE`:指定该方法只处理Content-Type为`application/json`的请求。
- `produces = MediaType.TEXT_EVENT_STREAM_VALUE`:指定响应的Content-Type为`text/event-stream`,这样Spring会自动设置响应头`Content-Type: text/event-stream`,同时也会设置其他必要的头(如`Cache-Control: no-cache`等)。
注意:在WebFlux中,我们返回的是`Flux<ServerSentEvent>`,Spring会自动将其转换为符合SSE规范的响应流。
另外,关于客户端请求头中的`Accept: text/event-stream`,Spring框架在匹配请求处理方法时会检查`produces`属性,因此设置`produces = MediaType.TEXT_EVENT_STREAM_VALUE`后,只有请求头`Accept`包含`text/event-stream`的请求才会匹配到该方法。
但是,有些客户端可能发送的`Accept`头包含多个值(如`*/*`),为了更灵活,我们可以不强制要求`Accept`头,而是通过`produces`属性来设置响应类型。这样,即使客户端没有发送`Accept: text/event-stream`,只要服务端设置了`produces`,就会返回事件流,但客户端需要能够处理事件流。
如果客户端明确要求`Accept: text/event-stream`,那么服务端通过设置`produces`属性已经满足要求。
补充:如何设置跨域?
如果需要跨域,可以在控制器方法上添加`@CrossOrigin`注解,或者使用全局CORS配置。
全局CORS配置示例(Spring Boot):
```java
@Configuration
public class CorsConfig implements WebMvcConfigurer {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**")
.allowedOrigins("*")
.allowedMethods("*")
.allowedHeaders("*")
.exposedHeaders("Content-Type", "Accept") // 暴露需要的响应头
.allowCredentials(false).maxAge(3600);
}
}
```
对于WebFlux,可以使用以下配置:
```java
@Configuration
public class CorsGlobalConfiguration implements WebFluxConfigurer {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**")
.allowedOrigins("*")
.allowedMethods("*")
.allowedHeaders("*")
.exposedHeaders("Content-Type", "Accept")
.allowCredentials(false).maxAge(3600);
}
}
```
这样,服务端就可以处理客户端发送的JSON请求体,并返回事件流了。
最后,注意在服务端实现中,要确保资源的管理(如SseEmitter的超时和完成处理)以避免内存泄漏。
相关问题:
1. 如何在Spring Boot中处理SSE连接断开?
2. 如何对SSE事件流进行单元测试?
3. 如何保证SSE服务端的高可用性和负载均衡?</think>### 服务端配置支持特定请求头(JSON请求 + SSE响应)指南
服务端需要同时支持两种关键请求头:
1. **请求处理**:`Content-Type: application/json;charset=utf-8`(客户端发送JSON数据)
2. **响应处理**:`Accept: text/event-stream`(客户端要求SSE事件流)
以下是具体实现方案(以Spring Boot为例):
---
### 一、核心配置步骤
#### 1. 请求处理器配置(处理JSON请求体)
```java
@RestController
public class SseController {
// 关键配置:consumes声明处理JSON请求体
@PostMapping(
path = "/sse-stream",
consumes = MediaType.APPLICATION_JSON_UTF8_VALUE, // 处理JSON请求
produces = MediaType.TEXT_EVENT_STREAM_VALUE // 返回事件流
)
public SseEmitter handleSseRequest(@RequestBody UserRequest request) {
// 1. 解析JSON请求体
String userId = request.getUserId();
// 2. 创建SSE发射器
SseEmitter emitter = new SseEmitter(0L); // 0=无超时限制
// 3. 启动异步线程发送事件
CompletableFuture.runAsync(() -> {
try {
for (int i = 0; i < 10; i++) {
// 构建SSE格式事件
emitter.send(
SseEmitter.event()
.name("userUpdate")
.id(String.valueOf(i))
.data(new UserData(userId, "Event"+i))
);
Thread.sleep(1000);
}
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return emitter;
}
// 请求体DTO
static class UserRequest {
private String userId;
// getters/setters
}
// 响应体DTO
static class UserData {
private String userId;
private String message;
// 构造方法/getters
}
}
```
#### 2. 响应头自动配置(Spring Boot自动处理)
当使用`produces = MediaType.TEXT_EVENT_STREAM_VALUE`时,Spring Boot会自动设置以下响应头:
```http
HTTP/1.1 200 OK
Content-Type: text/event-stream;charset=UTF-8
Cache-Control: no-cache
Connection: keep-alive
Transfer-Encoding: chunked
```
---
### 二、关键配置说明
| 配置项 | 作用 | 实现方式 |
|--------------------------------|----------------------------------------------------------------------|--------------------------------------------------------------------------|
| **处理JSON请求** | 解析客户端发送的JSON数据 | `consumes = MediaType.APPLICATION_JSON_UTF8_VALUE` + `@RequestBody` |
| **生成事件流响应** | 符合SSE协议规范的事件流 | `produces = MediaType.TEXT_EVENT_STREAM_VALUE` + `SseEmitter` |
| **连接管理** | 保持长连接并持续发送事件 | `SseEmitter(0L)`设置无超时限制 + 异步线程发送 |
| **数据格式转换** | 自动将Java对象转换为JSON | Spring Boot默认使用Jackson库进行序列化 |
| **SSE事件格式** | 符合`data: {...}\n\n`规范 | `SseEmitter.event().data()`自动处理格式 |
---
### 三、完整工作流程
```mermaid
sequenceDiagram
participant Client as 客户端
participant Server as 服务端
Client->>Server: POST /sse-stream
Note right of Client: Headers:<br/>Content-Type: application/json<br/>Accept: text/event-stream
Client->>Server: {"userId": "123"}
Server->>Server: 解析JSON请求体
Server->>Server: 创建SseEmitter
Server-->>Client: HTTP 200 (text/event-stream)
loop 事件推送
Server->>Client: data:{"userId":"123","message":"Event0"}
Server->>Client: data:{"userId":"123","message":"Event1"}
Server->>Client: ...(持续推送)
end
Server->>Client: [连接关闭]
```
---
### 四、高级配置建议
1. **跨域支持**(若需要)
```java
@Configuration
public class CorsConfig implements WebMvcConfigurer {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/sse-stream")
.allowedOrigins("*")
.allowedMethods("POST")
.allowedHeaders("Content-Type", "Accept");
}
}
```
2. **连接中断处理**
```java
emitter.onCompletion(() -> {
log.info("SSE连接完成: {}", request.getUserId());
});
emitter.onTimeout(() -> {
log.warn("SSE连接超时: {}", request.getUserId());
});
```
3. **性能优化**
```java
// 在application.properties中
server.servlet.context-path=/api
server.max-http-header-size=8192
spring.mvc.async.request-timeout=0 # 禁用异步超时
```
---
### 五、客户端请求示例
```javascript
// 使用fetch API发送请求
fetch('/sse-stream', {
method: 'POST',
headers: {
'Content-Type': 'application/json;charset=utf-8',
'Accept': 'text/event-stream'
},
body: JSON.stringify({ userId: '123' })
})
.then(response => {
const reader = response.body.getReader();
const readChunk = () => {
reader.read().then(({ value, done }) => {
if (done) {
console.log('Stream complete');
return;
}
// 解码并处理SSE事件
const text = new TextDecoder().decode(value);
text.split('\n\n').forEach(event => {
if(event.includes('data:')) {
const data = JSON.parse(event.match(/data: ({.*})/)[1]);
console.log('Received:', data);
}
});
readChunk();
});
};
readChunk();
});
```
---
阅读全文