mirror of
https://github.com/langgenius/dify-sdk-go.git
synced 2026-07-01 20:39:37 -04:00
fix: lost the first message in streaming mode
This commit is contained in:
@@ -13,12 +13,6 @@ type API struct {
|
||||
secret string
|
||||
}
|
||||
|
||||
type ErrorResponse struct {
|
||||
Code string `json:"code"`
|
||||
Message string `json:"message"`
|
||||
Params string `json:"params"`
|
||||
}
|
||||
|
||||
func (api *API) WithSecret(secret string) *API {
|
||||
api.secret = secret
|
||||
return api
|
||||
|
||||
+11
-24
@@ -6,6 +6,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
@@ -45,36 +46,19 @@ func (api *API) ChatMessagesStream(ctx context.Context, req *ChatMessageRequest)
|
||||
}
|
||||
|
||||
func (api *API) chatMessagesStreamHandle(ctx context.Context, resp *http.Response, streamChannel chan ChatMessageStreamChannelResponse) {
|
||||
var (
|
||||
body = resp.Body
|
||||
reader = bufio.NewReader(body)
|
||||
|
||||
err error
|
||||
line []byte
|
||||
)
|
||||
|
||||
defer resp.Body.Close()
|
||||
defer close(streamChannel)
|
||||
|
||||
if line, _, err = reader.ReadLine(); err == nil {
|
||||
var errResp ErrorResponse
|
||||
var _err error
|
||||
if _err = json.Unmarshal(line, &errResp); _err == nil {
|
||||
streamChannel <- ChatMessageStreamChannelResponse{
|
||||
Err: errors.New(string(line)),
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
reader := bufio.NewReader(resp.Body)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
if line, err = reader.ReadBytes('\n'); err != nil {
|
||||
line, err := reader.ReadBytes('\n')
|
||||
if err != nil {
|
||||
streamChannel <- ChatMessageStreamChannelResponse{
|
||||
Err: errors.New("Error reading line: " + err.Error()),
|
||||
Err: fmt.Errorf("error reading line: %w", err),
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -82,14 +66,17 @@ func (api *API) chatMessagesStreamHandle(ctx context.Context, resp *http.Respons
|
||||
if !bytes.HasPrefix(line, []byte("data:")) {
|
||||
continue
|
||||
}
|
||||
|
||||
line = bytes.TrimPrefix(line, []byte("data:"))
|
||||
line = bytes.TrimSpace(line)
|
||||
|
||||
var resp ChatMessageStreamChannelResponse
|
||||
if err = json.Unmarshal(line, &resp); err != nil {
|
||||
streamChannel <- ChatMessageStreamChannelResponse{
|
||||
Err: errors.New("Error unmarshalling event: " + err.Error()),
|
||||
Err: fmt.Errorf("error unmarshalling event: %w", err),
|
||||
}
|
||||
return
|
||||
} else if resp.Event == "error" {
|
||||
streamChannel <- ChatMessageStreamChannelResponse{
|
||||
Err: errors.New("error streaming event: " + string(line)),
|
||||
}
|
||||
return
|
||||
} else if resp.Answer == "" {
|
||||
|
||||
Reference in New Issue
Block a user