Hi everyone.
I have a problem with my content download function.
In practice, it doesn't seem to close the task/thread, and I have no idea how to fix it.
Here is my log after the crash:
=================================================================
Error occurred for plugin 'dev':
Plugin context UI is running too many tasks (101). Aborting
=================================================================
BrightScript Micro Debugger.
Enter any BrightScript statement, debug commands, or HELP.
Suspending threads...
Thread selected: 1* ...ts/LoaderTasks/UriFetcher.brs(5) m.top.control = "RUN"
Current Function:
001: function init()
002: m.port = createObject("roMessagePort")
003: m.top.observeField("request", m.port)
004: m.top.functionName = "go"
005:* m.top.control = "RUN"
006: m.ret = true
007: m.maxNumberXfers = 20
008: end function
Source Digest(s):
pkg: dev 0.1.2 c1eb345f RMPlay
pkg: 632126 1.1.1 a093eae0 libplayready
Too many task threads (runtime error &h29) in pkg:/components/LoaderTasks/UriFetcher.brs(5)
Backtrace:
#0 Function init() As Dynamic
file/line: pkg:/components/LoaderTasks/UriFetcher.brs(6)
Local Variables:
global Interface:ifGlobal
m roAssociativeArray refcnt=2 count:3
Threads:
ID Location Source Code
0 pkg:/source/main.brs(49) msg = wait(0, m.port)
1* ...ts/LoaderTasks/UriFetcher.brs(5) m.top.control = "RUN"
2 ...ts/LoaderTasks/UriFetcher.brs(14) msg = wait(60000, m.port)
3 ...ts/LoaderTasks/UriFetcher.brs(14) msg = wait(60000, m.port)
4 ...ts/LoaderTasks/UriFetcher.brs(14) msg = wait(60000, m.port)
...
101 ...ts/LoaderTasks/UriFetcher.brs(14) msg = wait(60000, m.port)
102[u] ...s/LoaderLogic/MakeRequest.brs(10) if type(parameters) = "roAssociativeArray"
*selected [u]unattached(not debuggable)
Brightscript Debugger>
And my code ...
function init()
    m.port = createObject("roMessagePort")
    m.top.observeField("request", m.port)
    m.top.functionName = "go"
    m.top.control = "RUN"
    m.ret = true
    m.maxNumberXfers = 20
end function
function go() as void
    m.jobsById = {}
    m.cachedRequest = []
    while true
        msg = wait(60000, m.port)
        if msg = invalid
            print "No messages received, continuing loop."
            continue while
        end if
        mt = type(msg)
        currentTime = getCurrentTime()
        ' Only jobs that carry a startTime are checked for a timeout
        for each idKey in m.jobsById
            job = m.jobsById[idKey]
            if job.startTime <> invalid then
                if currentTime - job.startTime > 60 ' 60 seconds (getCurrentTime() returns seconds)
                    print "UriFetcher: Timeout for job id: "; idKey
                    job.xfer.AsyncCancel() ' Cancel the transfer (roUrlTransfer has no abort())
                    m.jobsById.delete(idKey) ' Remove the timed-out job
                end if
            end if
        end for
        if mt = "roSGNodeEvent"
            if msg.getField() = "request"
                m.ret = addRequest(msg.getData())
            else
                print "UriFetcher: unrecognized field '"; msg.getField(); "'"
            end if
        else if mt = "roUrlEvent"
            processResponse(msg)
        else
            print "UriFetcher: unrecognized event type '"; mt; "'"
        end if
        if m.ret = false
            print "UriFetcher: too many requests"
            m.cachedRequest.Push(msg.getData())
            print "UriFetcher: a request saved to cache "
        end if
        msg = invalid
        printStatus()
    end while
end function
function addRequest(request as object) as boolean
    if type(request) = "roAssociativeArray"
        context = request.context
        if type(context) = "roSGNode"
            parameters = context.parameters
            if type(parameters) = "roAssociativeArray"
                uri = parameters.uri
                if type(uri) = "roString"
                    if m.jobsById.Count() < m.maxNumberXfers
                        newXfer = CreateObject("roUrlTransfer")
                        newXfer.setUrl(uri)
                        newXfer.setPort(m.port)
                        ' newXfer.setTimeout(60000)
                        newXfer.setCertificatesFile("common:/certs/ca-bundle.crt")
                        newXfer.AddHeader("X-Roku-Reserved-Dev-Id", "")
                        newXfer.initClientCertificates()
                        newXfer.addHeader("Content-Type", "application/json")
                        if m.global.UserInfo <> invalid
                            print "Authorization" + "Bearer " + m.global.UserInfo.token
                            newXfer.addHeader("Authorization", "Bearer " + m.global.UserInfo.token)
                        end if
                        ' should transfer more stuff from parameters to urlXfer
                        idKey = stri(newXfer.getIdentity()).trim()
                        if parameters.requestBody <> invalid
                            postRequest = formatJson(parameters.requestBody)
                            ok = newXfer.AsyncPostFromString(postRequest)
                        else
                            ok = newXfer.AsyncGetToString()
                        end if
                        if ok
                            m.jobsById[idKey] = { context: context, xfer: newXfer }
                            print "UriFetcher: initiating transfer '"; idKey; "' for URI '"; uri; "'"; " succeeded: "; ok
                        else
                            print "UriFetcher: invalid uri: "; uri
                        end if
                    else
                        print "UriFetcher: urlTransferPool is fully used"
                        return false
                    end if
                end if
            end if
        end if
    end if
    return true
end function
function processResponse(msg as object)
    idKey = stri(msg.GetSourceIdentity()).trim()
    job = m.jobsById[idKey]
    if job <> invalid
        m.ret = true
        context = job.context
        parameters = context.parameters
        uri = parameters.uri
        result = { code: msg.getResponseCode(), content: msg.getString() }
        print "UriFetcher: response for transfer job idKey '"; idKey; "' for URI '"; uri; "'"
        ' the next lines could be used to test new API calls,
        ' just change the text to your specific endpoint
        ' if Instr(1, parameters.uri, "/api/v1/saveprofile") <> 0
        ' ? result
        ' end if
        ' could handle various error codes, retry, etc.
        if result.code = 429
            headers = msg.GetResponseHeaders()
            print "UriFetcher: Server error code: 429"
            print "UriFetcher: Server response: Too Many Requests, trying to repeat request after "; headers["retry-after"]; " seconds"
            sleep(headers["retry-after"].ToInt() * 1000)
            ok = addRequest(job)
            if not ok
                print "UriFetcher: too many requests in process"
                print "UriFetcher: a request saved to cache"
                m.cachedRequest.Push(job)
            end if
        else
            job.context.response = result
        end if
        m.jobsById.delete(idKey)
        job.xfer = invalid
        job.context = invalid
        'm.port = invalid
    else
        print "UriFetcher: event for unknown job "; idKey
    end if
    ' try to execute a cached job
    tryCachedRequests()
    ' job is invalid for unknown events, so check it before touching job.xfer
    if job <> invalid
        if job.xfer <> invalid
            job.xfer = invalid ' Release the roUrlTransfer object after completion
        end if
    end if
end function
function tryCachedRequests()
    if m.cachedRequest.Count() <> 0
        sleep(1000) ' Add a short delay to avoid overload
        request = m.cachedRequest.Shift() ' Take the request from the cache
        print "UriFetcher: trying to execute cached request"
        ok = addRequest(request)
        if not ok
            print "UriFetcher: too many requests in process"
            m.cachedRequest.Push(request) ' Put the request back in the cache if it cannot be processed
        end if
    end if
end function
function getCurrentTime() as Integer
    dt = CreateObject("roDateTime")
    return dt.AsSeconds() ' Returns the time in seconds (not milliseconds)
end function
sub printStatus()
    print "Number of requests in cached queue: "; m.cachedRequest.count()
    print "Number of jobs in processing: "; m.jobsById.count()
end sub
Who is holding on to the Task objects? That's where they need to be released. Your list of threads indicates that you're making a whole new task node for each download instead of having a single node with various downloads queued.
Unfortunately I'm having a hard time understanding where I could put the object so it can be recycled.
Can you kindly give me an example?
@MassimilianoBG where is your logic for creating what I assume is the `UriFetcher` component?
I don't understand English very well, unfortunately. Anyway... here is the xml configuration file
<?xml version="1.0" encoding="utf-8"?>
<component name="UriFetcher" extends="Task">
    <interface>
        <field id="request" type="assocarray" />
    </interface>
    <script type="text/brightscript" uri="UriFetcher.brs" />
    <children />
</component>
And here is where I call 'UriFetcher', in the makeRequest function:
sub makeRequest(parameters as object, callback as string)
    context = createObject("RoSGNode", "Node")
    if m.uriFetcher = invalid
        m.uriFetcher = createObject("roSGNode", "UriFetcher")
    end if
    if type(parameters) = "roAssociativeArray"
        context.addFields({ parameters: parameters, response: {} })
        context.observeField("response", callback) ' response callback is request-specific
        m.uriFetcher.request = { context: context }
    end if
end sub
and here is an example of use
sub init()
    ' set the name of the function in the Task node component to be executed when the state field changes to RUN
    ' in our case this method is executed after the following cmd: m.contentTask.control = "run" (see Init method in MainScene)
    m.top.functionName = "getExploreContent"
    m.callBack = "exploreDataResultDispatcher"
end sub
sub getExploreContent()
    ' request the content feed from the API
    print "Loading the explore data..."
    makeRequest({ uri: m.global.BaseURL + "/api/v1/get_explore_tv" }, m.callBack)
end sub
function exploreDataResultDispatcher(msg as object)
    mt = type(msg)
    if mt = "roSGNodeEvent"
        print "UriFetcher: results obtained", m.callBack
        context = msg.getRoSGNode()
        response = msg.getData()
        rt = type(response)
        if rt = "roAssociativeArray"
            parameters = context.parameters
            print " uri: "; parameters.uri
            ' ? " response: "; response
            if Instr(1, parameters.uri, "/get_explore_tv") <> 0
                ' explore content
                parsingRootContentData(response.content)
            else
                ' other uri conditions
            end if
        else
            print "UriFetcher: unknown response type '"; rt; "'"
        end if
    else
        print "UriFetcher: unknown msg type '"; mt; "'"
    end if
end function
I've already pulled all my hair out in anger. I'm bald now...
Nonetheless, I will try not to create a new instance of UriFetcher for every caller, by storing it on m.global like this...
Now I'll cross my fingers and toes and see if the thing holds up.
sub makeRequest(parameters as object, callback as string)
    context = createObject("RoSGNode", "Node")
    if m.global.uriFetcher = invalid
        uriFetcher = createObject("roSGNode", "UriFetcher")
        m.global.addFields({uriFetcher: uriFetcher})
    end if
    if type(parameters) = "roAssociativeArray"
        context.addFields({ parameters: parameters, response: {} })
        context.observeField("response", callback) ' response callback is request-specific
        m.global.uriFetcher.request = { context: context }
    end if
end sub
It seems to have solved it, precisely with the solution above.
Thanks to RokuBen for the tip!
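For anyone landing here later, here is a minimal sketch of the same idea with the shared task created once at startup, so makeRequest never has to check and create it. It assumes a scene component named MainScene whose init() runs before any request is made; that file name and the placement are my assumptions, not something shown in the posts above. The makeRequest body is otherwise the one from this thread.

```brightscript
' MainScene.brs (hypothetical scene component; the name is an assumption)
sub init()
    ' Create the single UriFetcher task and publish it on m.global.
    ' UriFetcher's own init() sets control = "RUN", so it starts once, here.
    uriFetcher = createObject("roSGNode", "UriFetcher")
    m.global.addFields({ uriFetcher: uriFetcher })
end sub

' Shared helper, included by every component that needs a download.
sub makeRequest(parameters as object, callback as string)
    if type(parameters) <> "roAssociativeArray" then return

    ' One lightweight context node per request; the task itself is reused.
    context = createObject("roSGNode", "Node")
    context.addFields({ parameters: parameters, response: {} })
    context.observeField("response", callback) ' response callback is request-specific
    m.global.uriFetcher.request = { context: context }
end sub
```

With this layout every caller writes to the same node's request field, so the thread list in the debugger should show a single UriFetcher thread instead of one per caller.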